1use crate::{
2 AsObject, Context, Py, PyObject, PyObjectRef, PyResult, VirtualMachine,
3 builtins::{
4 PyBaseExceptionRef, PyDictRef, PyListRef, PyStr, PyStrInterned, PyStrRef, PyTuple,
5 PyTupleRef, PyTypeRef,
6 },
7 convert::TryFromObject,
8};
9use core::sync::atomic::{AtomicUsize, Ordering};
10use rustpython_common::lock::OnceCell;
11
/// Interpreter-level state used by the warning helpers in this module.
///
/// The `filters`, `once_registry` and `default_action` fields are the
/// fallbacks used when the corresponding attribute cannot be obtained from
/// the `warnings` module (see `get_warnings_filters`, `get_once_registry`
/// and `get_default_action`).
pub struct WarningsState {
    /// Fallback filter list; each item is a 5-tuple as built by
    /// `create_default_filters`.
    pub filters: PyListRef,
    /// Fallback registry used by the `"once"` action.
    pub once_registry: PyDictRef,
    /// Fallback action returned when no filter matches.
    pub default_action: PyStrRef,
    /// Counter bumped by `filters_mutated`; `already_warned` compares it with
    /// the version stored inside a registry.
    pub filters_version: AtomicUsize,
    /// NOTE(review): initialised empty by `init_state` and not otherwise used
    /// in this part of the file — confirm its purpose against the callers.
    pub context_var: OnceCell<PyObjectRef>,
    /// Counter incremented by `acquire_lock` and decremented by `release_lock`.
    lock_count: AtomicUsize,
}
20
21impl WarningsState {
22 fn create_default_filters(ctx: &Context) -> PyListRef {
23 ctx.new_list(vec![
25 ctx.new_tuple(vec![
26 ctx.new_str("default").into(),
27 ctx.none(),
28 ctx.exceptions.deprecation_warning.as_object().to_owned(),
29 ctx.new_str("__main__").into(),
30 ctx.new_int(0).into(),
31 ])
32 .into(),
33 ctx.new_tuple(vec![
34 ctx.new_str("ignore").into(),
35 ctx.none(),
36 ctx.exceptions.deprecation_warning.as_object().to_owned(),
37 ctx.none(),
38 ctx.new_int(0).into(),
39 ])
40 .into(),
41 ctx.new_tuple(vec![
42 ctx.new_str("ignore").into(),
43 ctx.none(),
44 ctx.exceptions
45 .pending_deprecation_warning
46 .as_object()
47 .to_owned(),
48 ctx.none(),
49 ctx.new_int(0).into(),
50 ])
51 .into(),
52 ctx.new_tuple(vec![
53 ctx.new_str("ignore").into(),
54 ctx.none(),
55 ctx.exceptions.import_warning.as_object().to_owned(),
56 ctx.none(),
57 ctx.new_int(0).into(),
58 ])
59 .into(),
60 ctx.new_tuple(vec![
61 ctx.new_str("ignore").into(),
62 ctx.none(),
63 ctx.exceptions.resource_warning.as_object().to_owned(),
64 ctx.none(),
65 ctx.new_int(0).into(),
66 ])
67 .into(),
68 ])
69 }
70
71 pub fn init_state(ctx: &Context) -> Self {
72 Self {
73 filters: Self::create_default_filters(ctx),
74 once_registry: ctx.new_dict(),
75 default_action: ctx.new_str("default"),
76 filters_version: AtomicUsize::new(0),
77 context_var: OnceCell::new(),
78 lock_count: AtomicUsize::new(0),
79 }
80 }
81
82 pub fn acquire_lock(&self) {
83 self.lock_count.fetch_add(1, Ordering::SeqCst);
84 }
85
86 pub fn release_lock(&self) -> bool {
87 let prev = self.lock_count.load(Ordering::SeqCst);
88 if prev == 0 {
89 return false;
90 }
91 self.lock_count.fetch_sub(1, Ordering::SeqCst);
92 true
93 }
94
95 pub fn filters_mutated(&self) {
96 self.filters_version.fetch_add(1, Ordering::SeqCst);
97 }
98}
99
100fn check_matched(obj: &PyObject, arg: &PyObject, vm: &VirtualMachine) -> PyResult<bool> {
103 if vm.is_none(obj) {
104 return Ok(true);
105 }
106 if obj.class().is(vm.ctx.types.str_type) {
107 return obj.rich_compare_bool(arg, crate::types::PyComparisonOp::Eq, vm);
108 }
109 let result = vm.call_method(obj, "match", (arg.to_owned(),))?;
110 result.is_true(vm)
111}
112
113fn get_warnings_attr(
114 vm: &VirtualMachine,
115 attr_name: &'static PyStrInterned,
116 try_import: bool,
117) -> PyResult<Option<PyObjectRef>> {
118 let module = if try_import
119 && !vm
120 .state
121 .finalizing
122 .load(core::sync::atomic::Ordering::SeqCst)
123 {
124 match vm.import("warnings", 0) {
125 Ok(module) => module,
126 Err(_) => return Ok(None),
127 }
128 } else {
129 match vm.sys_module.get_attr(identifier!(vm, modules), vm) {
130 Ok(modules) => match modules.get_item(vm.ctx.intern_str("warnings"), vm) {
131 Ok(module) => module,
132 Err(_) => return Ok(None),
133 },
134 Err(_) => return Ok(None),
135 }
136 };
137 match module.get_attr(attr_name, vm) {
138 Ok(attr) => Ok(Some(attr)),
139 Err(_) => Ok(None),
140 }
141}
142
143fn get_warnings_filters(vm: &VirtualMachine) -> PyResult<PyListRef> {
146 if let Some(filters_obj) = get_warnings_attr(vm, identifier!(&vm.ctx, filters), false)?
147 && let Ok(filters) = filters_obj.try_into_value::<PyListRef>(vm)
148 {
149 return Ok(filters);
150 }
151 Ok(vm.state.warnings.filters.clone())
152}
153
154fn get_default_action(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
157 if let Some(action) = get_warnings_attr(vm, identifier!(&vm.ctx, defaultaction), false)? {
158 if !action.class().is(vm.ctx.types.str_type) {
159 return Err(vm.new_type_error(format!(
160 "_warnings.defaultaction must be a string, not '{}'",
161 action.class().name()
162 )));
163 }
164 return Ok(action);
165 }
166 Ok(vm.state.warnings.default_action.clone().into())
167}
168
169fn get_once_registry(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
172 if let Some(registry) = get_warnings_attr(vm, identifier!(&vm.ctx, onceregistry), false)? {
173 if !registry.class().is(vm.ctx.types.dict_type) {
174 return Err(vm.new_type_error(format!(
175 "_warnings.onceregistry must be a dict, not '{}'",
176 registry.class().name()
177 )));
178 }
179 return Ok(registry);
180 }
181 Ok(vm.state.warnings.once_registry.clone().into())
182}
183
184fn already_warned(
185 registry: &PyObject,
186 key: PyObjectRef,
187 should_set: bool,
188 vm: &VirtualMachine,
189) -> PyResult<bool> {
190 if vm.is_none(registry) {
191 return Ok(false);
192 }
193
194 let current_version = vm.state.warnings.filters_version.load(Ordering::SeqCst);
195 let version_obj = registry.get_item(identifier!(&vm.ctx, version), vm).ok();
196
197 let version_matches = version_obj.as_ref().is_some_and(|v| {
198 v.try_int(vm)
199 .map(|i| i.as_u32_mask() as usize == current_version)
200 .unwrap_or(false)
201 });
202
203 if version_matches {
204 if let Ok(val) = registry.get_item(key.as_ref(), vm)
205 && val.is_true(vm)?
206 {
207 return Ok(true);
208 }
209 } else if let Ok(dict) = PyDictRef::try_from_object(vm, registry.to_owned()) {
210 dict.clear();
211 dict.set_item(
212 identifier!(&vm.ctx, version),
213 vm.ctx.new_int(current_version).into(),
214 vm,
215 )?;
216 }
217
218 if should_set {
219 registry.set_item(key.as_ref(), vm.ctx.true_value.clone().into(), vm)?;
220 }
221 Ok(false)
222}
223
224fn update_registry(
227 registry: &PyObject,
228 text: &PyObject,
229 category: &PyObject,
230 add_zero: bool,
231 vm: &VirtualMachine,
232) -> PyResult<bool> {
233 let altkey: PyObjectRef = if add_zero {
234 PyTuple::new_ref(
235 vec![
236 text.to_owned(),
237 category.to_owned(),
238 vm.ctx.new_int(0).into(),
239 ],
240 &vm.ctx,
241 )
242 .into()
243 } else {
244 PyTuple::new_ref(vec![text.to_owned(), category.to_owned()], &vm.ctx).into()
245 };
246 already_warned(registry, altkey, true, vm)
247}
248
249fn normalize_module(filename: &Py<PyStr>, vm: &VirtualMachine) -> PyObjectRef {
250 match filename.byte_len() {
251 0 => vm.new_pyobj("<unknown>"),
252 len if len >= 3 && filename.as_bytes().ends_with(b".py") => {
253 vm.new_pyobj(&filename.as_wtf8()[..len - 3])
254 }
255 _ => filename.as_object().to_owned(),
256 }
257}
258
259fn get_filter(
263 category: PyObjectRef,
264 text: PyObjectRef,
265 lineno: usize,
266 module: PyObjectRef,
267 vm: &VirtualMachine,
268) -> PyResult {
269 let filters = get_warnings_filters(vm)?;
270
271 let mut i = 0;
274 while i < filters.borrow_vec().len() {
275 let Some(tmp_item) = filters.borrow_vec().get(i).cloned() else {
276 break;
277 };
278 let tmp_item = PyTupleRef::try_from_object(vm, tmp_item)
279 .ok()
280 .filter(|t| t.len() == 5)
281 .ok_or_else(|| {
282 vm.new_value_error(format!("_warnings.filters item {i} isn't a 5-tuple"))
283 })?;
284
285 let action = &tmp_item[0];
287 let good_msg = check_matched(&tmp_item[1], &text, vm)?;
288 let is_subclass = category.is_subclass(&tmp_item[2], vm)?;
289 let good_mod = check_matched(&tmp_item[3], &module, vm)?;
290 let ln: usize = tmp_item[4].try_int(vm).map_or(0, |v| v.as_u32_mask() as _);
291
292 if good_msg && is_subclass && good_mod && (ln == 0 || lineno == ln) {
293 return Ok(action.to_owned());
294 }
295 i += 1;
296 }
297
298 get_default_action(vm)
299}
300
/// Issues a warning, resolving the reporting location from the call stack.
///
/// Forwards to `warn_with_skip` with no `skip_file_prefixes`.
///
/// * `message` - warning text or warning instance.
/// * `category` - warning class; see `warn_explicit` for how a missing
///   category is resolved.
/// * `stack_level` - how far up the stack the warning should be attributed.
/// * `source` - optional object passed through to the warning display.
pub fn warn(
    message: PyObjectRef,
    category: Option<PyTypeRef>,
    stack_level: isize,
    source: Option<PyObjectRef>,
    vm: &VirtualMachine,
) -> PyResult<()> {
    warn_with_skip(message, category, stack_level, source, None, vm)
}
310
311pub fn warn_with_skip(
313 message: PyObjectRef,
314 category: Option<PyTypeRef>,
315 mut stack_level: isize,
316 source: Option<PyObjectRef>,
317 skip_file_prefixes: Option<PyTupleRef>,
318 vm: &VirtualMachine,
319) -> PyResult<()> {
320 if let Some(ref prefixes) = skip_file_prefixes
321 && !prefixes.is_empty()
322 && stack_level < 2
323 {
324 stack_level = 2;
325 }
326 let (filename, lineno, module, registry) =
327 setup_context(stack_level, skip_file_prefixes.as_ref(), vm)?;
328 warn_explicit(
329 category, message, filename, lineno, module, registry, None, source, vm,
330 )
331}
332
333#[allow(clippy::too_many_arguments)]
335pub(crate) fn warn_explicit(
336 category: Option<PyTypeRef>,
337 message: PyObjectRef,
338 filename: PyStrRef,
339 lineno: usize,
340 module: Option<PyObjectRef>,
341 registry: PyObjectRef,
342 source_line: Option<PyObjectRef>,
343 source: Option<PyObjectRef>,
344 vm: &VirtualMachine,
345) -> PyResult<()> {
346 let module = module.unwrap_or_else(|| normalize_module(&filename, vm));
348 if vm.is_none(&module) {
349 return Ok(());
350 }
351
352 let is_warning = message.fast_isinstance(vm.ctx.exceptions.warning);
354 let (text, category, message) = if is_warning {
355 let text = message.str(vm)?;
356 let cat = message.class().to_owned();
357 (text, cat, message)
358 } else {
359 let text = message.str(vm)?;
361 let cat = category.unwrap_or_else(|| vm.ctx.exceptions.user_warning.to_owned());
362 let instance = cat.as_object().call((text.clone(),), vm)?;
363 (text, cat, instance)
364 };
365
366 let lineno_obj: PyObjectRef = vm.ctx.new_int(lineno).into();
367
368 let key: PyObjectRef = PyTuple::new_ref(
370 vec![
371 text.clone().into(),
372 category.as_object().to_owned(),
373 lineno_obj.clone(),
374 ],
375 &vm.ctx,
376 )
377 .into();
378
379 if !vm.is_none(®istry) && already_warned(®istry, key.clone(), false, vm)? {
381 return Ok(());
382 }
383
384 let action = get_filter(
386 category.as_object().to_owned(),
387 text.clone().into(),
388 lineno,
389 module,
390 vm,
391 )?;
392 let action_str = PyStrRef::try_from_object(vm, action)
393 .map_err(|_| vm.new_type_error("action must be a string"))?;
394
395 if action_str.as_bytes() == b"error" {
396 let exc = PyBaseExceptionRef::try_from_object(vm, message)?;
397 return Err(exc);
398 }
399 if action_str.as_bytes() == b"ignore" {
400 return Ok(());
401 }
402
403 let already = if action_str.as_wtf8() != "always" && action_str.as_wtf8() != "all" {
406 if !vm.is_none(®istry) {
407 registry.set_item(&*key, vm.ctx.true_value.clone().into(), vm)?;
408 }
409
410 let action_s = action_str.to_str();
411 match action_s {
412 Some("once") => {
413 let reg = if vm.is_none(®istry) {
414 get_once_registry(vm)?
415 } else {
416 registry.clone()
417 };
418 update_registry(®, text.as_ref(), category.as_object(), false, vm)?
419 }
420 Some("module") => {
421 if !vm.is_none(®istry) {
422 update_registry(®istry, text.as_ref(), category.as_object(), false, vm)?
423 } else {
424 false
425 }
426 }
427 Some("default") => false,
428 _ => {
429 return Err(vm.new_runtime_error(format!(
430 "Unrecognized action ({action_str}) in warnings.filters:\n {action_str}"
431 )));
432 }
433 }
434 } else {
435 false
436 };
437
438 if already {
439 return Ok(());
440 }
441
442 call_show_warning(
443 category,
444 text,
445 message,
446 filename,
447 lineno,
448 lineno_obj,
449 source_line,
450 source,
451 vm,
452 )
453}
454
455#[allow(clippy::too_many_arguments)]
456fn call_show_warning(
457 category: PyTypeRef,
458 text: PyStrRef,
459 message: PyObjectRef,
460 filename: PyStrRef,
461 lineno: usize,
462 lineno_obj: PyObjectRef,
463 source_line: Option<PyObjectRef>,
464 source: Option<PyObjectRef>,
465 vm: &VirtualMachine,
466) -> PyResult<()> {
467 let Some(show_fn) =
468 get_warnings_attr(vm, identifier!(&vm.ctx, _showwarnmsg), source.is_some())?
469 else {
470 return show_warning(filename, lineno, text, category, source_line, vm);
471 };
472 if !show_fn.is_callable() {
473 return Err(vm.new_type_error("warnings._showwarnmsg() must be set to a callable"));
474 }
475 let Some(warnmsg_cls) = get_warnings_attr(vm, identifier!(&vm.ctx, WarningMessage), false)?
476 else {
477 return Err(vm.new_runtime_error("unable to get warnings.WarningMessage"));
478 };
479
480 let msg = warnmsg_cls.call(
481 vec![
482 message,
483 category.into(),
484 filename.into(),
485 lineno_obj,
486 vm.ctx.none(),
487 vm.ctx.none(),
488 vm.unwrap_or_none(source),
489 ],
490 vm,
491 )?;
492 show_fn.call((msg,), vm)?;
493 Ok(())
494}
495
496fn show_warning(
497 filename: PyStrRef,
498 lineno: usize,
499 text: PyStrRef,
500 category: PyTypeRef,
501 _source_line: Option<PyObjectRef>,
502 vm: &VirtualMachine,
503) -> PyResult<()> {
504 let stderr = crate::stdlib::sys::PyStderr(vm);
505 writeln!(
506 stderr,
507 "{}:{}: {}: {}",
508 filename,
509 lineno,
510 category.name(),
511 text
512 );
513 Ok(())
514}
515
516fn is_filename_to_skip(frame: &crate::frame::Frame, prefixes: &PyTupleRef) -> bool {
518 let filename = frame.f_code().co_filename();
519 let filename_bytes = filename.as_bytes();
520 prefixes.iter().any(|prefix| {
521 prefix
522 .downcast_ref::<PyStr>()
523 .is_some_and(|s| filename_bytes.starts_with(s.as_bytes()))
524 })
525}
526
527fn next_external_frame_with_skip(
529 frame: &crate::frame::FrameRef,
530 skip_file_prefixes: Option<&PyTupleRef>,
531 vm: &VirtualMachine,
532) -> Option<crate::frame::FrameRef> {
533 let mut f = frame.f_back(vm);
534 loop {
535 let current: crate::frame::FrameRef = f.take()?;
536 if current.is_internal_frame()
537 || skip_file_prefixes.is_some_and(|p| is_filename_to_skip(¤t, p))
538 {
539 f = current.f_back(vm);
540 } else {
541 return Some(current);
542 }
543 }
544}
545
546fn setup_context(
549 mut stack_level: isize,
550 skip_file_prefixes: Option<&PyTupleRef>,
551 vm: &VirtualMachine,
552) -> PyResult<(PyStrRef, usize, Option<PyObjectRef>, PyObjectRef)> {
553 let mut f = vm.current_frame();
554
555 if stack_level <= 0 || f.as_ref().is_some_and(|frame| frame.is_internal_frame()) {
558 while {
559 stack_level -= 1;
560 stack_level > 0
561 } {
562 match f {
563 Some(tmp) => f = tmp.f_back(vm),
564 None => break,
565 }
566 }
567 } else {
568 while {
569 stack_level -= 1;
570 stack_level > 0
571 } {
572 match f {
573 Some(tmp) => f = next_external_frame_with_skip(&tmp, skip_file_prefixes, vm),
574 None => break,
575 }
576 }
577 }
578
579 let (globals, filename, lineno) = if let Some(f) = f {
580 (f.globals.clone(), f.code.source_path(), f.f_lineno())
581 } else if let Some(frame) = vm.current_frame() {
582 (frame.globals.clone(), vm.ctx.intern_str("<sys>"), 1)
584 } else {
585 let globals = vm
587 .sys_module
588 .as_object()
589 .get_attr(identifier!(vm, __dict__), vm)
590 .and_then(|d| {
591 d.downcast::<crate::builtins::PyDict>()
592 .map_err(|_| vm.new_type_error("sys.__dict__ is not a dictionary"))
593 })?;
594 (globals, vm.ctx.intern_str("<sys>"), 0)
595 };
596
597 let registry = match globals.get_item("__warningregistry__", vm) {
598 Ok(r) => r,
599 Err(_) => {
600 let r = vm.ctx.new_dict();
601 globals.set_item("__warningregistry__", r.clone().into(), vm)?;
602 r.into()
603 }
604 };
605
606 let module = globals
608 .get_item("__name__", vm)
609 .unwrap_or_else(|_| vm.new_pyobj("<string>"));
610 Ok((filename.to_owned(), lineno, Some(module), registry))
611}