rustpython_vm/
warn.rs

1use crate::{
2    builtins::{
3        PyDict, PyDictRef, PyListRef, PyStr, PyStrInterned, PyStrRef, PyTuple, PyTupleRef,
4        PyTypeRef,
5    },
6    convert::{IntoObject, TryFromObject},
7    types::PyComparisonOp,
8    AsObject, Context, Py, PyObjectRef, PyResult, VirtualMachine,
9};
10
11pub struct WarningsState {
12    filters: PyListRef,
13    _once_registry: PyDictRef,
14    default_action: PyStrRef,
15    filters_version: usize,
16}
17
18impl WarningsState {
19    fn create_filter(ctx: &Context) -> PyListRef {
20        ctx.new_list(vec![ctx
21            .new_tuple(vec![
22                ctx.new_str("__main__").into(),
23                ctx.types.none_type.as_object().to_owned(),
24                ctx.exceptions.warning.as_object().to_owned(),
25                ctx.new_str("ACTION").into(),
26                ctx.new_int(0).into(),
27            ])
28            .into()])
29    }
30
31    pub fn init_state(ctx: &Context) -> WarningsState {
32        WarningsState {
33            filters: Self::create_filter(ctx),
34            _once_registry: PyDict::new_ref(ctx),
35            default_action: ctx.new_str("default"),
36            filters_version: 0,
37        }
38    }
39}
40
41fn check_matched(obj: &PyObjectRef, arg: &PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
42    if obj.class().is(vm.ctx.types.none_type) {
43        return Ok(true);
44    }
45
46    if obj.rich_compare_bool(arg, PyComparisonOp::Eq, vm)? {
47        return Ok(false);
48    }
49
50    let result = obj.call((arg.to_owned(),), vm);
51    Ok(result.is_ok())
52}
53
54fn get_warnings_attr(
55    vm: &VirtualMachine,
56    attr_name: &'static PyStrInterned,
57    try_import: bool,
58) -> PyResult<Option<PyObjectRef>> {
59    let module = if try_import
60        && !vm
61            .state
62            .finalizing
63            .load(std::sync::atomic::Ordering::SeqCst)
64    {
65        match vm.import("warnings", 0) {
66            Ok(module) => module,
67            Err(_) => return Ok(None),
68        }
69    } else {
70        // TODO: finalizing support
71        return Ok(None);
72    };
73    Ok(Some(module.get_attr(attr_name, vm)?))
74}
75
76pub fn warn(
77    message: PyStrRef,
78    category: Option<PyTypeRef>,
79    stack_level: isize,
80    source: Option<PyObjectRef>,
81    vm: &VirtualMachine,
82) -> PyResult<()> {
83    let (filename, lineno, module, registry) = setup_context(stack_level, vm)?;
84    warn_explicit(
85        category, message, filename, lineno, module, registry, None, source, vm,
86    )
87}
88
89fn get_default_action(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
90    Ok(vm.state.warnings.default_action.clone().into())
91    // .map_err(|_| {
92    //     vm.new_value_error(format!(
93    //         "_warnings.defaultaction must be a string, not '{}'",
94    //         vm.state.warnings.default_action
95    //     ))
96    // })
97}
98
99fn get_filter(
100    category: PyObjectRef,
101    text: PyObjectRef,
102    lineno: usize,
103    module: PyObjectRef,
104    mut _item: PyTupleRef,
105    vm: &VirtualMachine,
106) -> PyResult {
107    let filters = vm.state.warnings.filters.as_object().to_owned();
108
109    let filters: PyListRef = filters
110        .try_into_value(vm)
111        .map_err(|_| vm.new_value_error("_warnings.filters must be a list".to_string()))?;
112
113    /* WarningsState.filters could change while we are iterating over it. */
114    for i in 0..filters.borrow_vec().len() {
115        let tmp_item = if let Some(tmp_item) = filters.borrow_vec().get(i).cloned() {
116            let tmp_item = PyTupleRef::try_from_object(vm, tmp_item)?;
117            (tmp_item.len() == 5).then_some(tmp_item)
118        } else {
119            None
120        }
121        .ok_or_else(|| vm.new_value_error(format!("_warnings.filters item {i} isn't a 5-tuple")))?;
122
123        /* Python code: action, msg, cat, mod, ln = item */
124        let action = if let Some(action) = tmp_item.first() {
125            action.str(vm).map(|action| action.into_object())
126        } else {
127            Err(vm.new_type_error("action must be a string".to_string()))
128        };
129
130        let good_msg = if let Some(msg) = tmp_item.get(1) {
131            check_matched(msg, &text, vm)?
132        } else {
133            false
134        };
135
136        let is_subclass = if let Some(cat) = tmp_item.get(2) {
137            category.fast_isinstance(cat.class())
138        } else {
139            false
140        };
141
142        let good_mod = if let Some(item_mod) = tmp_item.get(3) {
143            check_matched(item_mod, &module, vm)?
144        } else {
145            false
146        };
147
148        let ln = tmp_item.get(4).map_or(0, |ln_obj| {
149            ln_obj.try_int(vm).map_or(0, |ln| ln.as_u32_mask() as _)
150        });
151
152        if good_msg && good_mod && is_subclass && (ln == 0 || lineno == ln) {
153            _item = tmp_item;
154            return action;
155        }
156    }
157
158    get_default_action(vm)
159}
160
161fn already_warned(
162    registry: PyObjectRef,
163    key: PyObjectRef,
164    should_set: bool,
165    vm: &VirtualMachine,
166) -> PyResult<bool> {
167    let version_obj = registry.get_item(identifier!(&vm.ctx, version), vm).ok();
168    let filters_version = vm.ctx.new_int(vm.state.warnings.filters_version).into();
169
170    match version_obj {
171        Some(version_obj)
172            if version_obj.try_int(vm).is_ok() || version_obj.is(&filters_version) =>
173        {
174            let already_warned = registry.get_item(key.as_ref(), vm)?;
175            if already_warned.is_true(vm)? {
176                return Ok(true);
177            }
178        }
179        _ => {
180            let registry = registry.dict();
181            if let Some(registry) = registry.as_ref() {
182                registry.clear();
183                let r = registry.set_item("version", filters_version, vm);
184                if r.is_err() {
185                    return Ok(false);
186                }
187            }
188        }
189    }
190
191    /* This warning wasn't found in the registry, set it. */
192    if !should_set {
193        return Ok(false);
194    }
195
196    let item = vm.ctx.true_value.clone().into();
197    let _ = registry.set_item(key.as_ref(), item, vm); // ignore set error
198    Ok(true)
199}
200
201fn normalize_module(filename: &Py<PyStr>, vm: &VirtualMachine) -> Option<PyObjectRef> {
202    let obj = match filename.char_len() {
203        0 => vm.new_pyobj("<unknown>"),
204        len if len >= 3 && filename.as_str().ends_with(".py") => {
205            vm.new_pyobj(&filename.as_str()[..len - 3])
206        }
207        _ => filename.as_object().to_owned(),
208    };
209    Some(obj)
210}
211
212#[allow(clippy::too_many_arguments)]
213fn warn_explicit(
214    category: Option<PyTypeRef>,
215    message: PyStrRef,
216    filename: PyStrRef,
217    lineno: usize,
218    module: Option<PyObjectRef>,
219    registry: PyObjectRef,
220    source_line: Option<PyObjectRef>,
221    source: Option<PyObjectRef>,
222    vm: &VirtualMachine,
223) -> PyResult<()> {
224    let registry: PyObjectRef = registry
225        .try_into_value(vm)
226        .map_err(|_| vm.new_type_error("'registry' must be a dict or None".to_owned()))?;
227
228    // Normalize module.
229    let module = match module.or_else(|| normalize_module(&filename, vm)) {
230        Some(module) => module,
231        None => return Ok(()),
232    };
233
234    // Normalize message.
235    let text = message.as_str();
236
237    let category = if let Some(category) = category {
238        if !category.fast_issubclass(vm.ctx.exceptions.warning) {
239            return Err(vm.new_type_error(format!(
240                "category must be a Warning subclass, not '{}'",
241                category.class().name()
242            )));
243        }
244        category
245    } else {
246        vm.ctx.exceptions.user_warning.to_owned()
247    };
248
249    let category = if message.fast_isinstance(vm.ctx.exceptions.warning) {
250        message.class().to_owned()
251    } else {
252        category
253    };
254
255    // Create key.
256    let key = PyTuple::new_ref(
257        vec![
258            vm.ctx.new_int(3).into(),
259            vm.ctx.new_str(text).into(),
260            category.as_object().to_owned(),
261            vm.ctx.new_int(lineno).into(),
262        ],
263        &vm.ctx,
264    );
265
266    if !vm.is_none(registry.as_object()) && already_warned(registry, key.into_object(), false, vm)?
267    {
268        return Ok(());
269    }
270
271    let item = vm.ctx.new_tuple(vec![]);
272    let action = get_filter(
273        category.as_object().to_owned(),
274        vm.ctx.new_str(text).into(),
275        lineno,
276        module,
277        item,
278        vm,
279    )?;
280
281    if action.str(vm)?.as_str().eq("error") {
282        return Err(vm.new_type_error(message.to_string()));
283    }
284
285    if action.str(vm)?.as_str().eq("ignore") {
286        return Ok(());
287    }
288
289    call_show_warning(
290        // t_state,
291        category,
292        message,
293        filename,
294        lineno, // lineno_obj,
295        source_line,
296        source,
297        vm,
298    )
299}
300
301fn call_show_warning(
302    category: PyTypeRef,
303    message: PyStrRef,
304    filename: PyStrRef,
305    lineno: usize,
306    source_line: Option<PyObjectRef>,
307    source: Option<PyObjectRef>,
308    vm: &VirtualMachine,
309) -> PyResult<()> {
310    let Some(show_fn) =
311        get_warnings_attr(vm, identifier!(&vm.ctx, _showwarnmsg), source.is_some())?
312    else {
313        return show_warning(filename, lineno, message, category, source_line, vm);
314    };
315    if !show_fn.is_callable() {
316        return Err(
317            vm.new_type_error("warnings._showwarnmsg() must be set to a callable".to_owned())
318        );
319    }
320    let Some(warnmsg_cls) = get_warnings_attr(vm, identifier!(&vm.ctx, WarningMessage), false)?
321    else {
322        return Err(vm.new_type_error("unable to get warnings.WarningMessage".to_owned()));
323    };
324
325    let msg = warnmsg_cls.call(
326        vec![
327            message.into(),
328            category.into(),
329            filename.into(),
330            vm.new_pyobj(lineno),
331            vm.ctx.none(),
332            vm.ctx.none(),
333            vm.unwrap_or_none(source),
334        ],
335        vm,
336    )?;
337    show_fn.call((msg,), vm)?;
338    Ok(())
339}
340
341fn show_warning(
342    _filename: PyStrRef,
343    _lineno: usize,
344    text: PyStrRef,
345    category: PyTypeRef,
346    _source_line: Option<PyObjectRef>,
347    vm: &VirtualMachine,
348) -> PyResult<()> {
349    let stderr = crate::stdlib::sys::PyStderr(vm);
350    writeln!(stderr, "{}: {}", category.name(), text.as_str(),);
351    Ok(())
352}
353
354/// filename, module, and registry are new refs, globals is borrowed
355/// Returns `Ok` on success, or `Err` on error (no new refs)
356fn setup_context(
357    mut stack_level: isize,
358    vm: &VirtualMachine,
359) -> PyResult<
360    // filename, lineno, module, registry
361    (PyStrRef, usize, Option<PyObjectRef>, PyObjectRef),
362> {
363    let __warningregistry__ = "__warningregistry__";
364    let __name__ = "__name__";
365
366    let mut f = vm.current_frame().as_deref().cloned();
367
368    // Stack level comparisons to Python code is off by one as there is no
369    // warnings-related stack level to avoid.
370    if stack_level <= 0 || f.as_ref().map_or(false, |frame| frame.is_internal_frame()) {
371        loop {
372            stack_level -= 1;
373            if stack_level <= 0 {
374                break;
375            }
376            if let Some(tmp) = f {
377                f = tmp.f_back(vm);
378            } else {
379                break;
380            }
381        }
382    } else {
383        loop {
384            stack_level -= 1;
385            if stack_level <= 0 {
386                break;
387            }
388            if let Some(tmp) = f {
389                f = tmp.next_external_frame(vm);
390            } else {
391                break;
392            }
393        }
394    }
395
396    let (globals, filename, lineno) = if let Some(f) = f {
397        (f.globals.clone(), f.code.source_path, f.f_lineno())
398    } else {
399        (vm.current_globals().clone(), vm.ctx.intern_str("sys"), 1)
400    };
401
402    let registry = if let Ok(registry) = globals.get_item(__warningregistry__, vm) {
403        registry
404    } else {
405        let registry = PyDict::new_ref(&vm.ctx);
406        globals.set_item(__warningregistry__, registry.clone().into(), vm)?;
407        registry.into()
408    };
409
410    // Setup module.
411    let module = globals
412        .get_item(__name__, vm)
413        .unwrap_or_else(|_| vm.new_pyobj("<string>"));
414    Ok((filename.to_owned(), lineno, Some(module), registry))
415}