1#[cfg(feature = "threading")]
2use super::FramePtr;
3#[cfg(feature = "threading")]
4use crate::builtins::PyBaseExceptionRef;
5use crate::frame::Frame;
6use crate::{AsObject, PyObject, VirtualMachine};
7#[cfg(feature = "threading")]
8use alloc::sync::Arc;
9use core::{
10 cell::{Cell, RefCell},
11 ptr::NonNull,
12 sync::atomic::{AtomicPtr, Ordering},
13};
14use itertools::Itertools;
15use std::thread_local;
16
17#[cfg(all(unix, feature = "threading"))]
22pub const THREAD_DETACHED: i32 = 0;
23#[cfg(all(unix, feature = "threading"))]
24pub const THREAD_ATTACHED: i32 = 1;
25#[cfg(all(unix, feature = "threading"))]
26pub const THREAD_SUSPENDED: i32 = 2;
27
28#[cfg(feature = "threading")]
31pub struct ThreadSlot {
32 pub frames: parking_lot::Mutex<Vec<FramePtr>>,
35 pub exception: crate::PyAtomicRef<Option<crate::exceptions::types::PyBaseException>>,
36 #[cfg(unix)]
38 pub state: core::sync::atomic::AtomicI32,
39 #[cfg(unix)]
41 pub stop_requested: core::sync::atomic::AtomicBool,
42 #[cfg(unix)]
44 pub thread: std::thread::Thread,
45}
46
47#[cfg(feature = "threading")]
48pub type CurrentFrameSlot = Arc<ThreadSlot>;
49
50thread_local! {
51 pub(super) static VM_STACK: RefCell<Vec<NonNull<VirtualMachine>>> = Vec::with_capacity(1).into();
52
53 pub(crate) static COROUTINE_ORIGIN_TRACKING_DEPTH: Cell<u32> = const { Cell::new(0) };
54
55 #[cfg(feature = "threading")]
57 static CURRENT_THREAD_SLOT: RefCell<Option<CurrentFrameSlot>> = const { RefCell::new(None) };
58
59 pub(crate) static CURRENT_FRAME: AtomicPtr<Frame> =
65 const { AtomicPtr::new(core::ptr::null_mut()) };
66
67}
68
69scoped_tls::scoped_thread_local!(static VM_CURRENT: VirtualMachine);
70
71pub fn with_current_vm<R>(f: impl FnOnce(&VirtualMachine) -> R) -> R {
72 if !VM_CURRENT.is_set() {
73 panic!("call with_current_vm() but VM_CURRENT is null");
74 }
75 VM_CURRENT.with(f)
76}
77
78pub fn enter_vm<R>(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
79 VM_STACK.with(|vms| {
80 #[cfg(all(unix, feature = "threading"))]
82 let was_outermost = vms.borrow().is_empty();
83
84 vms.borrow_mut().push(vm.into());
85
86 #[cfg(feature = "threading")]
88 init_thread_slot_if_needed(vm);
89
90 #[cfg(all(unix, feature = "threading"))]
91 if was_outermost {
92 attach_thread(vm);
93 }
94
95 scopeguard::defer! {
96 #[cfg(all(unix, feature = "threading"))]
98 if vms.borrow().len() == 1 {
99 detach_thread();
100 }
101 vms.borrow_mut().pop();
102 }
103 VM_CURRENT.set(vm, f)
104 })
105}
106
107#[cfg(feature = "threading")]
110fn init_thread_slot_if_needed(vm: &VirtualMachine) {
111 CURRENT_THREAD_SLOT.with(|slot| {
112 if slot.borrow().is_none() {
113 let thread_id = crate::stdlib::_thread::get_ident();
114 let mut registry = vm.state.thread_frames.lock();
115 let new_slot = Arc::new(ThreadSlot {
116 frames: parking_lot::Mutex::new(Vec::new()),
117 exception: crate::PyAtomicRef::from(None::<PyBaseExceptionRef>),
118 #[cfg(unix)]
119 state: core::sync::atomic::AtomicI32::new(
120 if vm.state.stop_the_world.requested.load(Ordering::Acquire) {
121 THREAD_SUSPENDED
124 } else {
125 THREAD_DETACHED
126 },
127 ),
128 #[cfg(unix)]
129 stop_requested: core::sync::atomic::AtomicBool::new(false),
130 #[cfg(unix)]
131 thread: std::thread::current(),
132 });
133 registry.insert(thread_id, new_slot.clone());
134 drop(registry);
135 *slot.borrow_mut() = Some(new_slot);
136 }
137 });
138}
139
140#[cfg(all(unix, feature = "threading"))]
143fn wait_while_suspended(slot: &ThreadSlot) -> u64 {
144 let mut wait_yields = 0u64;
145 while slot.state.load(Ordering::Acquire) == THREAD_SUSPENDED {
146 wait_yields = wait_yields.saturating_add(1);
147 std::thread::park();
148 }
149 wait_yields
150}
151
152#[cfg(all(unix, feature = "threading"))]
153fn attach_thread(vm: &VirtualMachine) {
154 CURRENT_THREAD_SLOT.with(|slot| {
155 if let Some(s) = slot.borrow().as_ref() {
156 super::stw_trace(format_args!("attach begin"));
157 loop {
158 match s.state.compare_exchange(
159 THREAD_DETACHED,
160 THREAD_ATTACHED,
161 Ordering::AcqRel,
162 Ordering::Relaxed,
163 ) {
164 Ok(_) => {
165 super::stw_trace(format_args!("attach DETACHED->ATTACHED"));
166 break;
167 }
168 Err(THREAD_SUSPENDED) => {
169 super::stw_trace(format_args!("attach wait-suspended"));
171 let wait_yields = wait_while_suspended(s);
172 vm.state.stop_the_world.add_attach_wait_yields(wait_yields);
173 }
175 Err(state) => {
176 debug_assert!(false, "unexpected thread state in attach: {state}");
177 break;
178 }
179 }
180 }
181 }
182 });
183}
184
185#[cfg(all(unix, feature = "threading"))]
187fn detach_thread() {
188 CURRENT_THREAD_SLOT.with(|slot| {
189 if let Some(s) = slot.borrow().as_ref() {
190 match s.state.compare_exchange(
191 THREAD_ATTACHED,
192 THREAD_DETACHED,
193 Ordering::AcqRel,
194 Ordering::Acquire,
195 ) {
196 Ok(_) => {}
197 Err(THREAD_DETACHED) => {
198 debug_assert!(false, "detach called while already DETACHED");
199 return;
200 }
201 Err(state) => {
202 debug_assert!(false, "unexpected thread state in detach: {state}");
203 return;
204 }
205 }
206 super::stw_trace(format_args!("detach ATTACHED->DETACHED"));
207 }
208 });
209}
210
211#[cfg(all(unix, feature = "threading"))]
217pub fn allow_threads<R>(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
218 let should_transition = CURRENT_THREAD_SLOT.with(|slot| {
222 slot.borrow()
223 .as_ref()
224 .is_some_and(|s| s.state.load(Ordering::Acquire) == THREAD_ATTACHED)
225 });
226 if !should_transition {
227 return f();
228 }
229
230 detach_thread();
231 let reattach_guard = scopeguard::guard(vm, attach_thread);
232 let result = f();
233 drop(reattach_guard);
234 result
235}
236
237#[cfg(not(all(unix, feature = "threading")))]
239pub fn allow_threads<R>(_vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
240 f()
241}
242
243#[cfg(all(unix, feature = "threading"))]
247pub fn suspend_if_needed(stw: &super::StopTheWorldState) {
248 let should_suspend = CURRENT_THREAD_SLOT.with(|slot| {
249 slot.borrow()
250 .as_ref()
251 .is_some_and(|s| s.stop_requested.load(Ordering::Relaxed))
252 });
253 if !should_suspend {
254 return;
255 }
256
257 if !stw.requested.load(Ordering::Acquire) {
258 CURRENT_THREAD_SLOT.with(|slot| {
259 if let Some(s) = slot.borrow().as_ref() {
260 s.stop_requested.store(false, Ordering::Release);
261 }
262 });
263 return;
264 }
265
266 do_suspend(stw);
267}
268
269#[cfg(all(unix, feature = "threading"))]
270#[cold]
271fn do_suspend(stw: &super::StopTheWorldState) {
272 CURRENT_THREAD_SLOT.with(|slot| {
273 if let Some(s) = slot.borrow().as_ref() {
274 match s.state.compare_exchange(
276 THREAD_ATTACHED,
277 THREAD_SUSPENDED,
278 Ordering::AcqRel,
279 Ordering::Acquire,
280 ) {
281 Ok(_) => {
282 s.stop_requested.store(false, Ordering::Release);
284 }
285 Err(THREAD_DETACHED) => {
286 super::stw_trace(format_args!("suspend skip DETACHED"));
288 return;
289 }
290 Err(THREAD_SUSPENDED) => {
291 s.stop_requested.store(false, Ordering::Release);
293 super::stw_trace(format_args!("suspend skip already-suspended"));
294 return;
295 }
296 Err(state) => {
297 debug_assert!(false, "unexpected thread state in suspend: {state}");
298 return;
299 }
300 }
301 super::stw_trace(format_args!("suspend ATTACHED->SUSPENDED"));
302
303 if !stw.requested.load(Ordering::Acquire) {
306 s.state.store(THREAD_ATTACHED, Ordering::Release);
307 s.stop_requested.store(false, Ordering::Release);
308 super::stw_trace(format_args!("suspend abort requested-cleared"));
309 return;
310 }
311
312 stw.notify_suspended();
314 super::stw_trace(format_args!("suspend notified-requester"));
315
316 let wait_yields = wait_while_suspended(s);
318 stw.add_suspend_wait_yields(wait_yields);
319
320 loop {
322 match s.state.compare_exchange(
323 THREAD_DETACHED,
324 THREAD_ATTACHED,
325 Ordering::AcqRel,
326 Ordering::Acquire,
327 ) {
328 Ok(_) => break,
329 Err(THREAD_SUSPENDED) => {
330 let extra_wait = wait_while_suspended(s);
331 stw.add_suspend_wait_yields(extra_wait);
332 }
333 Err(THREAD_ATTACHED) => break,
334 Err(state) => {
335 debug_assert!(false, "unexpected post-suspend state: {state}");
336 break;
337 }
338 }
339 }
340 s.stop_requested.store(false, Ordering::Release);
341 super::stw_trace(format_args!("suspend resume -> ATTACHED"));
342 }
343 });
344}
345
346#[cfg(all(unix, feature = "threading"))]
347#[inline]
348pub fn stop_requested_for_current_thread() -> bool {
349 CURRENT_THREAD_SLOT.with(|slot| {
350 slot.borrow()
351 .as_ref()
352 .is_some_and(|s| s.stop_requested.load(Ordering::Relaxed))
353 })
354}
355
356#[cfg(feature = "threading")]
359pub fn push_thread_frame(fp: FramePtr) {
360 CURRENT_THREAD_SLOT.with(|slot| {
361 if let Some(s) = slot.borrow().as_ref() {
362 s.frames.lock().push(fp);
363 } else {
364 debug_assert!(
365 false,
366 "push_thread_frame called without initialized thread slot"
367 );
368 }
369 });
370}
371
372#[cfg(feature = "threading")]
375pub fn pop_thread_frame() {
376 CURRENT_THREAD_SLOT.with(|slot| {
377 if let Some(s) = slot.borrow().as_ref() {
378 s.frames.lock().pop();
379 } else {
380 debug_assert!(
381 false,
382 "pop_thread_frame called without initialized thread slot"
383 );
384 }
385 });
386}
387
388pub fn set_current_frame(frame: *const Frame) -> *const Frame {
391 CURRENT_FRAME.with(|c| c.swap(frame as *mut Frame, Ordering::Relaxed) as *const Frame)
392}
393
394pub fn get_current_frame() -> *const Frame {
397 CURRENT_FRAME.with(|c| c.load(Ordering::Relaxed) as *const Frame)
398}
399
400#[cfg(feature = "threading")]
403pub fn update_thread_exception(exc: Option<PyBaseExceptionRef>) {
404 CURRENT_THREAD_SLOT.with(|slot| {
405 if let Some(s) = slot.borrow().as_ref() {
406 let _old = unsafe { s.exception.swap(exc) };
409 }
410 });
411}
412
413#[cfg(feature = "threading")]
416pub fn get_all_current_exceptions(vm: &VirtualMachine) -> Vec<(u64, Option<PyBaseExceptionRef>)> {
417 let registry = vm.state.thread_frames.lock();
418 registry
419 .iter()
420 .map(|(id, slot)| (*id, slot.exception.to_owned()))
421 .collect()
422}
423
424#[cfg(feature = "threading")]
426pub fn cleanup_current_thread_frames(vm: &VirtualMachine) {
427 let thread_id = crate::stdlib::_thread::get_ident();
428 let current_slot = CURRENT_THREAD_SLOT.with(|slot| slot.borrow().as_ref().cloned());
429
430 #[cfg(all(unix, feature = "threading"))]
433 if let Some(slot) = ¤t_slot {
434 let _ = slot.state.compare_exchange(
435 THREAD_ATTACHED,
436 THREAD_DETACHED,
437 Ordering::AcqRel,
438 Ordering::Acquire,
439 );
440 }
441
442 let removed = if let Some(slot) = ¤t_slot {
445 let mut registry = vm.state.thread_frames.lock();
446 match registry.get(&thread_id) {
447 Some(registered) if Arc::ptr_eq(registered, slot) => registry.remove(&thread_id),
448 _ => None,
449 }
450 } else {
451 None
452 };
453 #[cfg(all(unix, feature = "threading"))]
454 if let Some(slot) = &removed
455 && vm.state.stop_the_world.requested.load(Ordering::Acquire)
456 && thread_id != vm.state.stop_the_world.requester_ident()
457 && slot.state.load(Ordering::Relaxed) != THREAD_SUSPENDED
458 {
459 vm.state.stop_the_world.notify_thread_gone();
462 }
463 CURRENT_THREAD_SLOT.with(|s| {
464 *s.borrow_mut() = None;
465 });
466}
467
468#[cfg(feature = "threading")]
475pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) {
476 let current_ident = crate::stdlib::_thread::get_ident();
477 let current_frames: Vec<FramePtr> = vm.frames.borrow().clone();
478 let new_slot = Arc::new(ThreadSlot {
479 frames: parking_lot::Mutex::new(current_frames),
480 exception: crate::PyAtomicRef::from(vm.topmost_exception()),
481 #[cfg(unix)]
482 state: core::sync::atomic::AtomicI32::new(THREAD_ATTACHED),
483 #[cfg(unix)]
484 stop_requested: core::sync::atomic::AtomicBool::new(false),
485 #[cfg(unix)]
486 thread: std::thread::current(),
487 });
488
489 let mut registry = vm.state.thread_frames.lock();
491 registry.clear();
492 registry.insert(current_ident, new_slot.clone());
493 drop(registry);
494
495 CURRENT_THREAD_SLOT.with(|s| {
496 *s.borrow_mut() = Some(new_slot);
497 });
498}
499
500pub fn with_vm<F, R>(obj: &PyObject, f: F) -> Option<R>
501where
502 F: Fn(&VirtualMachine) -> R,
503{
504 let vm_owns_obj = |interp: NonNull<VirtualMachine>| {
505 let vm = unsafe { interp.as_ref() };
507 obj.fast_isinstance(vm.ctx.types.object_type)
508 };
509 VM_STACK.with(|vms| {
510 let interp = match vms.borrow().iter().copied().exactly_one() {
511 Ok(x) => {
512 debug_assert!(vm_owns_obj(x));
513 x
514 }
515 Err(mut others) => others.find(|x| vm_owns_obj(*x))?,
516 };
517 let vm = unsafe { interp.as_ref() };
520 Some(VM_CURRENT.set(vm, || f(vm)))
521 })
522}
523
524#[must_use = "ThreadedVirtualMachine does nothing unless you move it to another thread and call .run()"]
525#[cfg(feature = "threading")]
526pub struct ThreadedVirtualMachine {
527 pub(super) vm: VirtualMachine,
528}
529
530#[cfg(feature = "threading")]
531impl ThreadedVirtualMachine {
532 pub fn make_spawn_func<F, R>(self, f: F) -> impl FnOnce() -> R
541 where
542 F: FnOnce(&VirtualMachine) -> R,
543 {
544 move || self.run(f)
545 }
546
547 pub fn run<F, R>(&self, f: F) -> R
556 where
557 F: FnOnce(&VirtualMachine) -> R,
558 {
559 let vm = &self.vm;
560 enter_vm(vm, || f(vm))
561 }
562}
563
564impl VirtualMachine {
565 #[cfg(feature = "threading")]
574 pub fn start_thread<F, R>(&self, f: F) -> std::thread::JoinHandle<R>
575 where
576 F: FnOnce(&Self) -> R,
577 F: Send + 'static,
578 R: Send + 'static,
579 {
580 let func = self.new_thread().make_spawn_func(f);
581 std::thread::spawn(func)
582 }
583
584 #[cfg(feature = "threading")]
607 pub fn new_thread(&self) -> ThreadedVirtualMachine {
608 let global_trace = self.state.global_trace_func.lock().clone();
609 let global_profile = self.state.global_profile_func.lock().clone();
610 let use_tracing = global_trace.is_some() || global_profile.is_some();
611
612 let vm = Self {
613 builtins: self.builtins.clone(),
614 sys_module: self.sys_module.clone(),
615 ctx: self.ctx.clone(),
616 frames: RefCell::new(vec![]),
617 datastack: core::cell::UnsafeCell::new(crate::datastack::DataStack::new()),
618 wasm_id: self.wasm_id.clone(),
619 exceptions: RefCell::default(),
620 import_func: self.import_func.clone(),
621 importlib: self.importlib.clone(),
622 profile_func: RefCell::new(global_profile.unwrap_or_else(|| self.ctx.none())),
623 trace_func: RefCell::new(global_trace.unwrap_or_else(|| self.ctx.none())),
624 use_tracing: Cell::new(use_tracing),
625 recursion_limit: self.recursion_limit.clone(),
626 signal_handlers: core::cell::OnceCell::new(),
627 signal_rx: None,
628 repr_guards: RefCell::default(),
629 state: self.state.clone(),
630 initialized: self.initialized,
631 recursion_depth: Cell::new(0),
632 c_stack_soft_limit: Cell::new(VirtualMachine::calculate_c_stack_soft_limit()),
633 async_gen_firstiter: RefCell::new(None),
634 async_gen_finalizer: RefCell::new(None),
635 asyncio_running_loop: RefCell::new(None),
636 asyncio_running_task: RefCell::new(None),
637 callable_cache: self.callable_cache.clone(),
638 };
639 ThreadedVirtualMachine { vm }
640 }
641}