Skip to main content

rustpython_vm/
warn.rs

1use crate::{
2    AsObject, Context, Py, PyObject, PyObjectRef, PyResult, VirtualMachine,
3    builtins::{
4        PyBaseExceptionRef, PyDictRef, PyListRef, PyStr, PyStrInterned, PyStrRef, PyTuple,
5        PyTupleRef, PyTypeRef,
6    },
7    convert::TryFromObject,
8};
9use core::sync::atomic::{AtomicUsize, Ordering};
10use rustpython_common::lock::OnceCell;
11
12pub struct WarningsState {
13    pub filters: PyListRef,
14    pub once_registry: PyDictRef,
15    pub default_action: PyStrRef,
16    pub filters_version: AtomicUsize,
17    pub context_var: OnceCell<PyObjectRef>,
18    lock_count: AtomicUsize,
19}
20
21impl WarningsState {
22    fn create_default_filters(ctx: &Context) -> PyListRef {
23        // init_filters(): non-debug default filter set.
24        ctx.new_list(vec![
25            ctx.new_tuple(vec![
26                ctx.new_str("default").into(),
27                ctx.none(),
28                ctx.exceptions.deprecation_warning.as_object().to_owned(),
29                ctx.new_str("__main__").into(),
30                ctx.new_int(0).into(),
31            ])
32            .into(),
33            ctx.new_tuple(vec![
34                ctx.new_str("ignore").into(),
35                ctx.none(),
36                ctx.exceptions.deprecation_warning.as_object().to_owned(),
37                ctx.none(),
38                ctx.new_int(0).into(),
39            ])
40            .into(),
41            ctx.new_tuple(vec![
42                ctx.new_str("ignore").into(),
43                ctx.none(),
44                ctx.exceptions
45                    .pending_deprecation_warning
46                    .as_object()
47                    .to_owned(),
48                ctx.none(),
49                ctx.new_int(0).into(),
50            ])
51            .into(),
52            ctx.new_tuple(vec![
53                ctx.new_str("ignore").into(),
54                ctx.none(),
55                ctx.exceptions.import_warning.as_object().to_owned(),
56                ctx.none(),
57                ctx.new_int(0).into(),
58            ])
59            .into(),
60            ctx.new_tuple(vec![
61                ctx.new_str("ignore").into(),
62                ctx.none(),
63                ctx.exceptions.resource_warning.as_object().to_owned(),
64                ctx.none(),
65                ctx.new_int(0).into(),
66            ])
67            .into(),
68        ])
69    }
70
71    pub fn init_state(ctx: &Context) -> Self {
72        Self {
73            filters: Self::create_default_filters(ctx),
74            once_registry: ctx.new_dict(),
75            default_action: ctx.new_str("default"),
76            filters_version: AtomicUsize::new(0),
77            context_var: OnceCell::new(),
78            lock_count: AtomicUsize::new(0),
79        }
80    }
81
82    pub fn acquire_lock(&self) {
83        self.lock_count.fetch_add(1, Ordering::SeqCst);
84    }
85
86    pub fn release_lock(&self) -> bool {
87        let prev = self.lock_count.load(Ordering::SeqCst);
88        if prev == 0 {
89            return false;
90        }
91        self.lock_count.fetch_sub(1, Ordering::SeqCst);
92        true
93    }
94
95    pub fn filters_mutated(&self) {
96        self.filters_version.fetch_add(1, Ordering::SeqCst);
97    }
98}
99
100/// None matches everything; plain strings do exact comparison;
101/// regex objects use .match().
102fn check_matched(obj: &PyObject, arg: &PyObject, vm: &VirtualMachine) -> PyResult<bool> {
103    if vm.is_none(obj) {
104        return Ok(true);
105    }
106    if obj.class().is(vm.ctx.types.str_type) {
107        return obj.rich_compare_bool(arg, crate::types::PyComparisonOp::Eq, vm);
108    }
109    let result = vm.call_method(obj, "match", (arg.to_owned(),))?;
110    result.is_true(vm)
111}
112
113fn get_warnings_attr(
114    vm: &VirtualMachine,
115    attr_name: &'static PyStrInterned,
116    try_import: bool,
117) -> PyResult<Option<PyObjectRef>> {
118    let module = if try_import
119        && !vm
120            .state
121            .finalizing
122            .load(core::sync::atomic::Ordering::SeqCst)
123    {
124        match vm.import("warnings", 0) {
125            Ok(module) => module,
126            Err(_) => return Ok(None),
127        }
128    } else {
129        match vm.sys_module.get_attr(identifier!(vm, modules), vm) {
130            Ok(modules) => match modules.get_item(vm.ctx.intern_str("warnings"), vm) {
131                Ok(module) => module,
132                Err(_) => return Ok(None),
133            },
134            Err(_) => return Ok(None),
135        }
136    };
137    match module.get_attr(attr_name, vm) {
138        Ok(attr) => Ok(Some(attr)),
139        Err(_) => Ok(None),
140    }
141}
142
143/// Get the warnings filters list from sys.modules['warnings'].filters,
144/// falling back to vm.state.warnings.filters.
145fn get_warnings_filters(vm: &VirtualMachine) -> PyResult<PyListRef> {
146    if let Some(filters_obj) = get_warnings_attr(vm, identifier!(&vm.ctx, filters), false)?
147        && let Ok(filters) = filters_obj.try_into_value::<PyListRef>(vm)
148    {
149        return Ok(filters);
150    }
151    Ok(vm.state.warnings.filters.clone())
152}
153
154/// Get the default action from sys.modules['warnings']._defaultaction,
155/// falling back to vm.state.warnings.default_action.
156fn get_default_action(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
157    if let Some(action) = get_warnings_attr(vm, identifier!(&vm.ctx, defaultaction), false)? {
158        if !action.class().is(vm.ctx.types.str_type) {
159            return Err(vm.new_type_error(format!(
160                "_warnings.defaultaction must be a string, not '{}'",
161                action.class().name()
162            )));
163        }
164        return Ok(action);
165    }
166    Ok(vm.state.warnings.default_action.clone().into())
167}
168
169/// Get the once registry from sys.modules['warnings']._onceregistry,
170/// falling back to vm.state.warnings.once_registry.
171fn get_once_registry(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
172    if let Some(registry) = get_warnings_attr(vm, identifier!(&vm.ctx, onceregistry), false)? {
173        if !registry.class().is(vm.ctx.types.dict_type) {
174            return Err(vm.new_type_error(format!(
175                "_warnings.onceregistry must be a dict, not '{}'",
176                registry.class().name()
177            )));
178        }
179        return Ok(registry);
180    }
181    Ok(vm.state.warnings.once_registry.clone().into())
182}
183
184fn already_warned(
185    registry: &PyObject,
186    key: PyObjectRef,
187    should_set: bool,
188    vm: &VirtualMachine,
189) -> PyResult<bool> {
190    if vm.is_none(registry) {
191        return Ok(false);
192    }
193
194    let current_version = vm.state.warnings.filters_version.load(Ordering::SeqCst);
195    let version_obj = registry.get_item(identifier!(&vm.ctx, version), vm).ok();
196
197    let version_matches = version_obj.as_ref().is_some_and(|v| {
198        v.try_int(vm)
199            .map(|i| i.as_u32_mask() as usize == current_version)
200            .unwrap_or(false)
201    });
202
203    if version_matches {
204        if let Ok(val) = registry.get_item(key.as_ref(), vm)
205            && val.is_true(vm)?
206        {
207            return Ok(true);
208        }
209    } else if let Ok(dict) = PyDictRef::try_from_object(vm, registry.to_owned()) {
210        dict.clear();
211        dict.set_item(
212            identifier!(&vm.ctx, version),
213            vm.ctx.new_int(current_version).into(),
214            vm,
215        )?;
216    }
217
218    if should_set {
219        registry.set_item(key.as_ref(), vm.ctx.true_value.clone().into(), vm)?;
220    }
221    Ok(false)
222}
223
224/// Create a `(text, category)` or `(text, category, 0)` key and record
225/// it in the registry via `already_warned`.
226fn update_registry(
227    registry: &PyObject,
228    text: &PyObject,
229    category: &PyObject,
230    add_zero: bool,
231    vm: &VirtualMachine,
232) -> PyResult<bool> {
233    let altkey: PyObjectRef = if add_zero {
234        PyTuple::new_ref(
235            vec![
236                text.to_owned(),
237                category.to_owned(),
238                vm.ctx.new_int(0).into(),
239            ],
240            &vm.ctx,
241        )
242        .into()
243    } else {
244        PyTuple::new_ref(vec![text.to_owned(), category.to_owned()], &vm.ctx).into()
245    };
246    already_warned(registry, altkey, true, vm)
247}
248
249fn normalize_module(filename: &Py<PyStr>, vm: &VirtualMachine) -> PyObjectRef {
250    match filename.byte_len() {
251        0 => vm.new_pyobj("<unknown>"),
252        len if len >= 3 && filename.as_bytes().ends_with(b".py") => {
253            vm.new_pyobj(&filename.as_wtf8()[..len - 3])
254        }
255        _ => filename.as_object().to_owned(),
256    }
257}
258
259/// Search the global filters list for a matching action.
260// TODO: split into filter_search() + get_filter() and support
261//       context-aware filters (get_warnings_context_filters).
262fn get_filter(
263    category: PyObjectRef,
264    text: PyObjectRef,
265    lineno: usize,
266    module: PyObjectRef,
267    vm: &VirtualMachine,
268) -> PyResult {
269    let filters = get_warnings_filters(vm)?;
270
271    // filters could change while we are iterating over it.
272    // Re-check list length each iteration (matches C behavior).
273    let mut i = 0;
274    while i < filters.borrow_vec().len() {
275        let Some(tmp_item) = filters.borrow_vec().get(i).cloned() else {
276            break;
277        };
278        let tmp_item = PyTupleRef::try_from_object(vm, tmp_item)
279            .ok()
280            .filter(|t| t.len() == 5)
281            .ok_or_else(|| {
282                vm.new_value_error(format!("_warnings.filters item {i} isn't a 5-tuple"))
283            })?;
284
285        /* action, msg, cat, mod, ln = item */
286        let action = &tmp_item[0];
287        let good_msg = check_matched(&tmp_item[1], &text, vm)?;
288        let is_subclass = category.is_subclass(&tmp_item[2], vm)?;
289        let good_mod = check_matched(&tmp_item[3], &module, vm)?;
290        let ln: usize = tmp_item[4].try_int(vm).map_or(0, |v| v.as_u32_mask() as _);
291
292        if good_msg && is_subclass && good_mod && (ln == 0 || lineno == ln) {
293            return Ok(action.to_owned());
294        }
295        i += 1;
296    }
297
298    get_default_action(vm)
299}
300
301pub fn warn(
302    message: PyObjectRef,
303    category: Option<PyTypeRef>,
304    stack_level: isize,
305    source: Option<PyObjectRef>,
306    vm: &VirtualMachine,
307) -> PyResult<()> {
308    warn_with_skip(message, category, stack_level, source, None, vm)
309}
310
311/// do_warn: resolve context via setup_context, then call warn_explicit.
312pub fn warn_with_skip(
313    message: PyObjectRef,
314    category: Option<PyTypeRef>,
315    mut stack_level: isize,
316    source: Option<PyObjectRef>,
317    skip_file_prefixes: Option<PyTupleRef>,
318    vm: &VirtualMachine,
319) -> PyResult<()> {
320    if let Some(ref prefixes) = skip_file_prefixes
321        && !prefixes.is_empty()
322        && stack_level < 2
323    {
324        stack_level = 2;
325    }
326    let (filename, lineno, module, registry) =
327        setup_context(stack_level, skip_file_prefixes.as_ref(), vm)?;
328    warn_explicit(
329        category, message, filename, lineno, module, registry, None, source, vm,
330    )
331}
332
333/// Core warning logic matching `warn_explicit()` in `_warnings.c`.
334#[allow(clippy::too_many_arguments)]
335pub(crate) fn warn_explicit(
336    category: Option<PyTypeRef>,
337    message: PyObjectRef,
338    filename: PyStrRef,
339    lineno: usize,
340    module: Option<PyObjectRef>,
341    registry: PyObjectRef,
342    source_line: Option<PyObjectRef>,
343    source: Option<PyObjectRef>,
344    vm: &VirtualMachine,
345) -> PyResult<()> {
346    // Normalize module. None → silent return (late-shutdown safety).
347    let module = module.unwrap_or_else(|| normalize_module(&filename, vm));
348    if vm.is_none(&module) {
349        return Ok(());
350    }
351
352    // Normalize message.
353    let is_warning = message.fast_isinstance(vm.ctx.exceptions.warning);
354    let (text, category, message) = if is_warning {
355        let text = message.str(vm)?;
356        let cat = message.class().to_owned();
357        (text, cat, message)
358    } else {
359        // For non-Warning messages, convert to string via str()
360        let text = message.str(vm)?;
361        let cat = category.unwrap_or_else(|| vm.ctx.exceptions.user_warning.to_owned());
362        let instance = cat.as_object().call((text.clone(),), vm)?;
363        (text, cat, instance)
364    };
365
366    let lineno_obj: PyObjectRef = vm.ctx.new_int(lineno).into();
367
368    // key = (text, category, lineno)
369    let key: PyObjectRef = PyTuple::new_ref(
370        vec![
371            text.clone().into(),
372            category.as_object().to_owned(),
373            lineno_obj.clone(),
374        ],
375        &vm.ctx,
376    )
377    .into();
378
379    // Check if already warned
380    if !vm.is_none(&registry) && already_warned(&registry, key.clone(), false, vm)? {
381        return Ok(());
382    }
383
384    // Get filter action
385    let action = get_filter(
386        category.as_object().to_owned(),
387        text.clone().into(),
388        lineno,
389        module,
390        vm,
391    )?;
392    let action_str = PyStrRef::try_from_object(vm, action)
393        .map_err(|_| vm.new_type_error("action must be a string"))?;
394
395    if action_str.as_bytes() == b"error" {
396        let exc = PyBaseExceptionRef::try_from_object(vm, message)?;
397        return Err(exc);
398    }
399    if action_str.as_bytes() == b"ignore" {
400        return Ok(());
401    }
402
403    // For everything except "always"/"all", record in registry then
404    // check per-action registries.
405    let already = if action_str.as_wtf8() != "always" && action_str.as_wtf8() != "all" {
406        if !vm.is_none(&registry) {
407            registry.set_item(&*key, vm.ctx.true_value.clone().into(), vm)?;
408        }
409
410        let action_s = action_str.to_str();
411        match action_s {
412            Some("once") => {
413                let reg = if vm.is_none(&registry) {
414                    get_once_registry(vm)?
415                } else {
416                    registry.clone()
417                };
418                update_registry(&reg, text.as_ref(), category.as_object(), false, vm)?
419            }
420            Some("module") => {
421                if !vm.is_none(&registry) {
422                    update_registry(&registry, text.as_ref(), category.as_object(), false, vm)?
423                } else {
424                    false
425                }
426            }
427            Some("default") => false,
428            _ => {
429                return Err(vm.new_runtime_error(format!(
430                    "Unrecognized action ({action_str}) in warnings.filters:\n {action_str}"
431                )));
432            }
433        }
434    } else {
435        false
436    };
437
438    if already {
439        return Ok(());
440    }
441
442    call_show_warning(
443        category,
444        text,
445        message,
446        filename,
447        lineno,
448        lineno_obj,
449        source_line,
450        source,
451        vm,
452    )
453}
454
455#[allow(clippy::too_many_arguments)]
456fn call_show_warning(
457    category: PyTypeRef,
458    text: PyStrRef,
459    message: PyObjectRef,
460    filename: PyStrRef,
461    lineno: usize,
462    lineno_obj: PyObjectRef,
463    source_line: Option<PyObjectRef>,
464    source: Option<PyObjectRef>,
465    vm: &VirtualMachine,
466) -> PyResult<()> {
467    let Some(show_fn) =
468        get_warnings_attr(vm, identifier!(&vm.ctx, _showwarnmsg), source.is_some())?
469    else {
470        return show_warning(filename, lineno, text, category, source_line, vm);
471    };
472    if !show_fn.is_callable() {
473        return Err(vm.new_type_error("warnings._showwarnmsg() must be set to a callable"));
474    }
475    let Some(warnmsg_cls) = get_warnings_attr(vm, identifier!(&vm.ctx, WarningMessage), false)?
476    else {
477        return Err(vm.new_runtime_error("unable to get warnings.WarningMessage"));
478    };
479
480    let msg = warnmsg_cls.call(
481        vec![
482            message,
483            category.into(),
484            filename.into(),
485            lineno_obj,
486            vm.ctx.none(),
487            vm.ctx.none(),
488            vm.unwrap_or_none(source),
489        ],
490        vm,
491    )?;
492    show_fn.call((msg,), vm)?;
493    Ok(())
494}
495
496fn show_warning(
497    filename: PyStrRef,
498    lineno: usize,
499    text: PyStrRef,
500    category: PyTypeRef,
501    _source_line: Option<PyObjectRef>,
502    vm: &VirtualMachine,
503) -> PyResult<()> {
504    let stderr = crate::stdlib::sys::PyStderr(vm);
505    writeln!(
506        stderr,
507        "{}:{}: {}: {}",
508        filename,
509        lineno,
510        category.name(),
511        text
512    );
513    Ok(())
514}
515
516/// Check if a frame's filename starts with any of the given prefixes.
517fn is_filename_to_skip(frame: &crate::frame::Frame, prefixes: &PyTupleRef) -> bool {
518    let filename = frame.f_code().co_filename();
519    let filename_bytes = filename.as_bytes();
520    prefixes.iter().any(|prefix| {
521        prefix
522            .downcast_ref::<PyStr>()
523            .is_some_and(|s| filename_bytes.starts_with(s.as_bytes()))
524    })
525}
526
527/// Like Frame::next_external_frame but also skips frames matching prefixes.
528fn next_external_frame_with_skip(
529    frame: &crate::frame::FrameRef,
530    skip_file_prefixes: Option<&PyTupleRef>,
531    vm: &VirtualMachine,
532) -> Option<crate::frame::FrameRef> {
533    let mut f = frame.f_back(vm);
534    loop {
535        let current: crate::frame::FrameRef = f.take()?;
536        if current.is_internal_frame()
537            || skip_file_prefixes.is_some_and(|p| is_filename_to_skip(&current, p))
538        {
539            f = current.f_back(vm);
540        } else {
541            return Some(current);
542        }
543    }
544}
545
546/// filename, module, and registry are new refs, globals is borrowed
547/// Returns `Ok` on success, or `Err` on error (no new refs)
548fn setup_context(
549    mut stack_level: isize,
550    skip_file_prefixes: Option<&PyTupleRef>,
551    vm: &VirtualMachine,
552) -> PyResult<(PyStrRef, usize, Option<PyObjectRef>, PyObjectRef)> {
553    let mut f = vm.current_frame();
554
555    // Stack level comparisons to Python code is off by one as there is no
556    // warnings-related stack level to avoid.
557    if stack_level <= 0 || f.as_ref().is_some_and(|frame| frame.is_internal_frame()) {
558        while {
559            stack_level -= 1;
560            stack_level > 0
561        } {
562            match f {
563                Some(tmp) => f = tmp.f_back(vm),
564                None => break,
565            }
566        }
567    } else {
568        while {
569            stack_level -= 1;
570            stack_level > 0
571        } {
572            match f {
573                Some(tmp) => f = next_external_frame_with_skip(&tmp, skip_file_prefixes, vm),
574                None => break,
575            }
576        }
577    }
578
579    let (globals, filename, lineno) = if let Some(f) = f {
580        (f.globals.clone(), f.code.source_path(), f.f_lineno())
581    } else if let Some(frame) = vm.current_frame() {
582        // We have a frame but it wasn't found during stack walking
583        (frame.globals.clone(), vm.ctx.intern_str("<sys>"), 1)
584    } else {
585        // No frames on the stack - use sys.__dict__ (interp->sysdict)
586        let globals = vm
587            .sys_module
588            .as_object()
589            .get_attr(identifier!(vm, __dict__), vm)
590            .and_then(|d| {
591                d.downcast::<crate::builtins::PyDict>()
592                    .map_err(|_| vm.new_type_error("sys.__dict__ is not a dictionary"))
593            })?;
594        (globals, vm.ctx.intern_str("<sys>"), 0)
595    };
596
597    let registry = match globals.get_item("__warningregistry__", vm) {
598        Ok(r) => r,
599        Err(_) => {
600            let r = vm.ctx.new_dict();
601            globals.set_item("__warningregistry__", r.clone().into(), vm)?;
602            r.into()
603        }
604    };
605
606    // Setup module.
607    let module = globals
608        .get_item("__name__", vm)
609        .unwrap_or_else(|_| vm.new_pyobj("<string>"));
610    Ok((filename.to_owned(), lineno, Some(module), registry))
611}