1use crate::{
2 builtins::{
3 PyDict, PyDictRef, PyListRef, PyStr, PyStrInterned, PyStrRef, PyTuple, PyTupleRef,
4 PyTypeRef,
5 },
6 convert::{IntoObject, TryFromObject},
7 types::PyComparisonOp,
8 AsObject, Context, Py, PyObjectRef, PyResult, VirtualMachine,
9};
10
11pub struct WarningsState {
12 filters: PyListRef,
13 _once_registry: PyDictRef,
14 default_action: PyStrRef,
15 filters_version: usize,
16}
17
18impl WarningsState {
19 fn create_filter(ctx: &Context) -> PyListRef {
20 ctx.new_list(vec![ctx
21 .new_tuple(vec![
22 ctx.new_str("__main__").into(),
23 ctx.types.none_type.as_object().to_owned(),
24 ctx.exceptions.warning.as_object().to_owned(),
25 ctx.new_str("ACTION").into(),
26 ctx.new_int(0).into(),
27 ])
28 .into()])
29 }
30
31 pub fn init_state(ctx: &Context) -> WarningsState {
32 WarningsState {
33 filters: Self::create_filter(ctx),
34 _once_registry: PyDict::new_ref(ctx),
35 default_action: ctx.new_str("default"),
36 filters_version: 0,
37 }
38 }
39}
40
41fn check_matched(obj: &PyObjectRef, arg: &PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
42 if obj.class().is(vm.ctx.types.none_type) {
43 return Ok(true);
44 }
45
46 if obj.rich_compare_bool(arg, PyComparisonOp::Eq, vm)? {
47 return Ok(false);
48 }
49
50 let result = obj.call((arg.to_owned(),), vm);
51 Ok(result.is_ok())
52}
53
54fn get_warnings_attr(
55 vm: &VirtualMachine,
56 attr_name: &'static PyStrInterned,
57 try_import: bool,
58) -> PyResult<Option<PyObjectRef>> {
59 let module = if try_import
60 && !vm
61 .state
62 .finalizing
63 .load(std::sync::atomic::Ordering::SeqCst)
64 {
65 match vm.import("warnings", 0) {
66 Ok(module) => module,
67 Err(_) => return Ok(None),
68 }
69 } else {
70 return Ok(None);
72 };
73 Ok(Some(module.get_attr(attr_name, vm)?))
74}
75
76pub fn warn(
77 message: PyStrRef,
78 category: Option<PyTypeRef>,
79 stack_level: isize,
80 source: Option<PyObjectRef>,
81 vm: &VirtualMachine,
82) -> PyResult<()> {
83 let (filename, lineno, module, registry) = setup_context(stack_level, vm)?;
84 warn_explicit(
85 category, message, filename, lineno, module, registry, None, source, vm,
86 )
87}
88
89fn get_default_action(vm: &VirtualMachine) -> PyResult<PyObjectRef> {
90 Ok(vm.state.warnings.default_action.clone().into())
91 }
98
99fn get_filter(
100 category: PyObjectRef,
101 text: PyObjectRef,
102 lineno: usize,
103 module: PyObjectRef,
104 mut _item: PyTupleRef,
105 vm: &VirtualMachine,
106) -> PyResult {
107 let filters = vm.state.warnings.filters.as_object().to_owned();
108
109 let filters: PyListRef = filters
110 .try_into_value(vm)
111 .map_err(|_| vm.new_value_error("_warnings.filters must be a list".to_string()))?;
112
113 for i in 0..filters.borrow_vec().len() {
115 let tmp_item = if let Some(tmp_item) = filters.borrow_vec().get(i).cloned() {
116 let tmp_item = PyTupleRef::try_from_object(vm, tmp_item)?;
117 (tmp_item.len() == 5).then_some(tmp_item)
118 } else {
119 None
120 }
121 .ok_or_else(|| vm.new_value_error(format!("_warnings.filters item {i} isn't a 5-tuple")))?;
122
123 let action = if let Some(action) = tmp_item.first() {
125 action.str(vm).map(|action| action.into_object())
126 } else {
127 Err(vm.new_type_error("action must be a string".to_string()))
128 };
129
130 let good_msg = if let Some(msg) = tmp_item.get(1) {
131 check_matched(msg, &text, vm)?
132 } else {
133 false
134 };
135
136 let is_subclass = if let Some(cat) = tmp_item.get(2) {
137 category.fast_isinstance(cat.class())
138 } else {
139 false
140 };
141
142 let good_mod = if let Some(item_mod) = tmp_item.get(3) {
143 check_matched(item_mod, &module, vm)?
144 } else {
145 false
146 };
147
148 let ln = tmp_item.get(4).map_or(0, |ln_obj| {
149 ln_obj.try_int(vm).map_or(0, |ln| ln.as_u32_mask() as _)
150 });
151
152 if good_msg && good_mod && is_subclass && (ln == 0 || lineno == ln) {
153 _item = tmp_item;
154 return action;
155 }
156 }
157
158 get_default_action(vm)
159}
160
161fn already_warned(
162 registry: PyObjectRef,
163 key: PyObjectRef,
164 should_set: bool,
165 vm: &VirtualMachine,
166) -> PyResult<bool> {
167 let version_obj = registry.get_item(identifier!(&vm.ctx, version), vm).ok();
168 let filters_version = vm.ctx.new_int(vm.state.warnings.filters_version).into();
169
170 match version_obj {
171 Some(version_obj)
172 if version_obj.try_int(vm).is_ok() || version_obj.is(&filters_version) =>
173 {
174 let already_warned = registry.get_item(key.as_ref(), vm)?;
175 if already_warned.is_true(vm)? {
176 return Ok(true);
177 }
178 }
179 _ => {
180 let registry = registry.dict();
181 if let Some(registry) = registry.as_ref() {
182 registry.clear();
183 let r = registry.set_item("version", filters_version, vm);
184 if r.is_err() {
185 return Ok(false);
186 }
187 }
188 }
189 }
190
191 if !should_set {
193 return Ok(false);
194 }
195
196 let item = vm.ctx.true_value.clone().into();
197 let _ = registry.set_item(key.as_ref(), item, vm); Ok(true)
199}
200
201fn normalize_module(filename: &Py<PyStr>, vm: &VirtualMachine) -> Option<PyObjectRef> {
202 let obj = match filename.char_len() {
203 0 => vm.new_pyobj("<unknown>"),
204 len if len >= 3 && filename.as_str().ends_with(".py") => {
205 vm.new_pyobj(&filename.as_str()[..len - 3])
206 }
207 _ => filename.as_object().to_owned(),
208 };
209 Some(obj)
210}
211
212#[allow(clippy::too_many_arguments)]
213fn warn_explicit(
214 category: Option<PyTypeRef>,
215 message: PyStrRef,
216 filename: PyStrRef,
217 lineno: usize,
218 module: Option<PyObjectRef>,
219 registry: PyObjectRef,
220 source_line: Option<PyObjectRef>,
221 source: Option<PyObjectRef>,
222 vm: &VirtualMachine,
223) -> PyResult<()> {
224 let registry: PyObjectRef = registry
225 .try_into_value(vm)
226 .map_err(|_| vm.new_type_error("'registry' must be a dict or None".to_owned()))?;
227
228 let module = match module.or_else(|| normalize_module(&filename, vm)) {
230 Some(module) => module,
231 None => return Ok(()),
232 };
233
234 let text = message.as_str();
236
237 let category = if let Some(category) = category {
238 if !category.fast_issubclass(vm.ctx.exceptions.warning) {
239 return Err(vm.new_type_error(format!(
240 "category must be a Warning subclass, not '{}'",
241 category.class().name()
242 )));
243 }
244 category
245 } else {
246 vm.ctx.exceptions.user_warning.to_owned()
247 };
248
249 let category = if message.fast_isinstance(vm.ctx.exceptions.warning) {
250 message.class().to_owned()
251 } else {
252 category
253 };
254
255 let key = PyTuple::new_ref(
257 vec![
258 vm.ctx.new_int(3).into(),
259 vm.ctx.new_str(text).into(),
260 category.as_object().to_owned(),
261 vm.ctx.new_int(lineno).into(),
262 ],
263 &vm.ctx,
264 );
265
266 if !vm.is_none(registry.as_object()) && already_warned(registry, key.into_object(), false, vm)?
267 {
268 return Ok(());
269 }
270
271 let item = vm.ctx.new_tuple(vec![]);
272 let action = get_filter(
273 category.as_object().to_owned(),
274 vm.ctx.new_str(text).into(),
275 lineno,
276 module,
277 item,
278 vm,
279 )?;
280
281 if action.str(vm)?.as_str().eq("error") {
282 return Err(vm.new_type_error(message.to_string()));
283 }
284
285 if action.str(vm)?.as_str().eq("ignore") {
286 return Ok(());
287 }
288
289 call_show_warning(
290 category,
292 message,
293 filename,
294 lineno, source_line,
296 source,
297 vm,
298 )
299}
300
301fn call_show_warning(
302 category: PyTypeRef,
303 message: PyStrRef,
304 filename: PyStrRef,
305 lineno: usize,
306 source_line: Option<PyObjectRef>,
307 source: Option<PyObjectRef>,
308 vm: &VirtualMachine,
309) -> PyResult<()> {
310 let Some(show_fn) =
311 get_warnings_attr(vm, identifier!(&vm.ctx, _showwarnmsg), source.is_some())?
312 else {
313 return show_warning(filename, lineno, message, category, source_line, vm);
314 };
315 if !show_fn.is_callable() {
316 return Err(
317 vm.new_type_error("warnings._showwarnmsg() must be set to a callable".to_owned())
318 );
319 }
320 let Some(warnmsg_cls) = get_warnings_attr(vm, identifier!(&vm.ctx, WarningMessage), false)?
321 else {
322 return Err(vm.new_type_error("unable to get warnings.WarningMessage".to_owned()));
323 };
324
325 let msg = warnmsg_cls.call(
326 vec![
327 message.into(),
328 category.into(),
329 filename.into(),
330 vm.new_pyobj(lineno),
331 vm.ctx.none(),
332 vm.ctx.none(),
333 vm.unwrap_or_none(source),
334 ],
335 vm,
336 )?;
337 show_fn.call((msg,), vm)?;
338 Ok(())
339}
340
341fn show_warning(
342 _filename: PyStrRef,
343 _lineno: usize,
344 text: PyStrRef,
345 category: PyTypeRef,
346 _source_line: Option<PyObjectRef>,
347 vm: &VirtualMachine,
348) -> PyResult<()> {
349 let stderr = crate::stdlib::sys::PyStderr(vm);
350 writeln!(stderr, "{}: {}", category.name(), text.as_str(),);
351 Ok(())
352}
353
354fn setup_context(
357 mut stack_level: isize,
358 vm: &VirtualMachine,
359) -> PyResult<
360 (PyStrRef, usize, Option<PyObjectRef>, PyObjectRef),
362> {
363 let __warningregistry__ = "__warningregistry__";
364 let __name__ = "__name__";
365
366 let mut f = vm.current_frame().as_deref().cloned();
367
368 if stack_level <= 0 || f.as_ref().map_or(false, |frame| frame.is_internal_frame()) {
371 loop {
372 stack_level -= 1;
373 if stack_level <= 0 {
374 break;
375 }
376 if let Some(tmp) = f {
377 f = tmp.f_back(vm);
378 } else {
379 break;
380 }
381 }
382 } else {
383 loop {
384 stack_level -= 1;
385 if stack_level <= 0 {
386 break;
387 }
388 if let Some(tmp) = f {
389 f = tmp.next_external_frame(vm);
390 } else {
391 break;
392 }
393 }
394 }
395
396 let (globals, filename, lineno) = if let Some(f) = f {
397 (f.globals.clone(), f.code.source_path, f.f_lineno())
398 } else {
399 (vm.current_globals().clone(), vm.ctx.intern_str("sys"), 1)
400 };
401
402 let registry = if let Ok(registry) = globals.get_item(__warningregistry__, vm) {
403 registry
404 } else {
405 let registry = PyDict::new_ref(&vm.ctx);
406 globals.set_item(__warningregistry__, registry.clone().into(), vm)?;
407 registry.into()
408 };
409
410 let module = globals
412 .get_item(__name__, vm)
413 .unwrap_or_else(|_| vm.new_pyobj("<string>"));
414 Ok((filename.to_owned(), lineno, Some(module), registry))
415}