#[cfg(feature = "threading")]
use super::FramePtr;
#[cfg(feature = "threading")]
use crate::builtins::PyBaseExceptionRef;
use crate::frame::Frame;
use crate::{AsObject, PyObject, VirtualMachine};
#[cfg(feature = "threading")]
use alloc::sync::Arc;
use core::{
cell::{Cell, RefCell},
ptr::NonNull,
sync::atomic::{AtomicPtr, Ordering},
};
use itertools::Itertools;
use std::thread_local;
// Per-thread run-state values for the stop-the-world (STW) protocol below.
// Transitions are done with compare-exchange in `attach_thread`,
// `detach_thread`, and `do_suspend`.
#[cfg(all(unix, feature = "threading"))]
pub const THREAD_DETACHED: i32 = 0; // not executing VM code; safe for STW to ignore
#[cfg(all(unix, feature = "threading"))]
pub const THREAD_ATTACHED: i32 = 1; // actively executing VM code on this thread
#[cfg(all(unix, feature = "threading"))]
pub const THREAD_SUSPENDED: i32 = 2; // parked by a stop-the-world request
/// Per-thread bookkeeping shared between a thread and the global registry
/// (`vm.state.thread_frames`); created lazily by `init_thread_slot_if_needed`.
#[cfg(feature = "threading")]
pub struct ThreadSlot {
    // Stack of frames currently executing on this thread
    // (see `push_thread_frame` / `pop_thread_frame`).
    pub frames: parking_lot::Mutex<Vec<FramePtr>>,
    // Most recently published exception for this thread,
    // swapped atomically in `update_thread_exception`.
    pub exception: crate::PyAtomicRef<Option<crate::exceptions::types::PyBaseException>>,
    // One of THREAD_DETACHED / THREAD_ATTACHED / THREAD_SUSPENDED (see above).
    #[cfg(unix)]
    pub state: core::sync::atomic::AtomicI32,
    // Set by an STW requester; polled in `suspend_if_needed` and
    // `stop_requested_for_current_thread`, cleared once handled.
    #[cfg(unix)]
    pub stop_requested: core::sync::atomic::AtomicBool,
    // Handle to this OS thread. This file only parks (`std::thread::park`);
    // presumably the STW requester uses this handle to `unpark` — the unpark
    // call is not in this file, so confirm on the requester side.
    #[cfg(unix)]
    pub thread: std::thread::Thread,
}
/// Shared handle to a thread's slot; one clone lives in the thread-local
/// `CURRENT_THREAD_SLOT`, another in the global registry.
#[cfg(feature = "threading")]
pub type CurrentFrameSlot = Arc<ThreadSlot>;
thread_local! {
    /// Stack of VMs entered on this thread (pushed/popped by `enter_vm`);
    /// a stack so `enter_vm` calls can nest.
    pub(super) static VM_STACK: RefCell<Vec<NonNull<VirtualMachine>>> = Vec::with_capacity(1).into();
    /// Coroutine origin tracking depth. Only the cell is declared here —
    /// consumers live elsewhere; confirm usage at the call sites.
    pub(crate) static COROUTINE_ORIGIN_TRACKING_DEPTH: Cell<u32> = const { Cell::new(0) };
    /// This thread's slot, set lazily by `init_thread_slot_if_needed` and
    /// cleared by `cleanup_current_thread_frames`.
    #[cfg(feature = "threading")]
    static CURRENT_THREAD_SLOT: RefCell<Option<CurrentFrameSlot>> = const { RefCell::new(None) };
    /// Raw pointer to the frame currently executing on this thread
    /// (see `set_current_frame` / `get_current_frame`).
    pub(crate) static CURRENT_FRAME: AtomicPtr<Frame> =
        const { AtomicPtr::new(core::ptr::null_mut()) };
}
scoped_tls::scoped_thread_local!(static VM_CURRENT: VirtualMachine);
/// Run `f` with the VM currently active on this thread.
///
/// # Panics
/// Panics when no VM has been made current (i.e. outside `enter_vm`).
pub fn with_current_vm<R>(f: impl FnOnce(&VirtualMachine) -> R) -> R {
    // Same panic message as before; `assert!` just makes the guard clause terser.
    assert!(
        VM_CURRENT.is_set(),
        "call with_current_vm() but VM_CURRENT is null"
    );
    VM_CURRENT.with(f)
}
/// Make `vm` the current VM for the duration of `f`.
///
/// Maintains the per-thread `VM_STACK` so calls can nest; on unix+threading,
/// only the outermost `enter_vm` attaches the thread to the stop-the-world
/// protocol and the matching exit detaches it.
pub fn enter_vm<R>(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
    VM_STACK.with(|vms| {
        #[cfg(all(unix, feature = "threading"))]
        let was_outermost = vms.borrow().is_empty();
        vms.borrow_mut().push(vm.into());
        // Lazily create and register this thread's ThreadSlot.
        #[cfg(feature = "threading")]
        init_thread_slot_if_needed(vm);
        // Outermost entry: DETACHED -> ATTACHED.
        #[cfg(all(unix, feature = "threading"))]
        if was_outermost {
            attach_thread(vm);
        }
        scopeguard::defer! {
            // len == 1 means the pop below hasn't happened yet and this defer
            // belongs to the outermost enter_vm — mirrors `was_outermost`.
            // Detach runs even if `f` unwinds.
            #[cfg(all(unix, feature = "threading"))]
            if vms.borrow().len() == 1 {
                detach_thread();
            }
            vms.borrow_mut().pop();
        }
        VM_CURRENT.set(vm, f)
    })
}
/// Create and register this thread's `ThreadSlot` on first entry into a VM.
///
/// The initial `state` is chosen while holding the registry lock: a slot
/// created during an in-progress stop-the-world starts SUSPENDED instead of
/// DETACHED (so `attach_thread` will wait it out rather than attach).
#[cfg(feature = "threading")]
fn init_thread_slot_if_needed(vm: &VirtualMachine) {
    CURRENT_THREAD_SLOT.with(|slot| {
        if slot.borrow().is_none() {
            let thread_id = crate::stdlib::_thread::get_ident();
            // Hold the registry lock across both the STW check and the insert,
            // presumably so a concurrent requester sees a consistent registry —
            // confirm against the requester-side locking.
            let mut registry = vm.state.thread_frames.lock();
            let new_slot = Arc::new(ThreadSlot {
                frames: parking_lot::Mutex::new(Vec::new()),
                exception: crate::PyAtomicRef::from(None::<PyBaseExceptionRef>),
                #[cfg(unix)]
                state: core::sync::atomic::AtomicI32::new(
                    if vm.state.stop_the_world.requested.load(Ordering::Acquire) {
                        THREAD_SUSPENDED
                    } else {
                        THREAD_DETACHED
                    },
                ),
                #[cfg(unix)]
                stop_requested: core::sync::atomic::AtomicBool::new(false),
                #[cfg(unix)]
                thread: std::thread::current(),
            });
            registry.insert(thread_id, new_slot.clone());
            drop(registry);
            *slot.borrow_mut() = Some(new_slot);
        }
    });
}
/// Park the current thread until its slot leaves the SUSPENDED state.
///
/// Returns the number of park iterations performed (including spurious
/// wakeups, which `std::thread::park` permits); callers fold this into
/// STW wait statistics.
#[cfg(all(unix, feature = "threading"))]
fn wait_while_suspended(slot: &ThreadSlot) -> u64 {
    let mut parks = 0u64;
    loop {
        if slot.state.load(Ordering::Acquire) != THREAD_SUSPENDED {
            return parks;
        }
        parks = parks.saturating_add(1);
        std::thread::park();
    }
}
/// Transition this thread's slot DETACHED -> ATTACHED, waiting out any
/// in-progress stop-the-world suspension first. No-op if the slot is unset.
#[cfg(all(unix, feature = "threading"))]
fn attach_thread(vm: &VirtualMachine) {
    CURRENT_THREAD_SLOT.with(|slot| {
        if let Some(s) = slot.borrow().as_ref() {
            super::stw_trace(format_args!("attach begin"));
            loop {
                match s.state.compare_exchange(
                    THREAD_DETACHED,
                    THREAD_ATTACHED,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        super::stw_trace(format_args!("attach DETACHED->ATTACHED"));
                        break;
                    }
                    Err(THREAD_SUSPENDED) => {
                        // An STW holds us suspended: park until released,
                        // record the wait, then retry the CAS.
                        super::stw_trace(format_args!("attach wait-suspended"));
                        let wait_yields = wait_while_suspended(s);
                        vm.state.stop_the_world.add_attach_wait_yields(wait_yields);
                    }
                    Err(state) => {
                        // ATTACHED (double attach) or unknown value: a bug,
                        // but don't spin forever in release builds.
                        debug_assert!(false, "unexpected thread state in attach: {state}");
                        break;
                    }
                }
            }
        }
    });
}
/// Transition this thread's slot ATTACHED -> DETACHED (leaving VM code).
///
/// Any other current state indicates a protocol bug; release builds simply
/// return without touching the state.
#[cfg(all(unix, feature = "threading"))]
fn detach_thread() {
    CURRENT_THREAD_SLOT.with(|slot| {
        if let Some(s) = slot.borrow().as_ref() {
            match s.state.compare_exchange(
                THREAD_ATTACHED,
                THREAD_DETACHED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {}
                Err(THREAD_DETACHED) => {
                    debug_assert!(false, "detach called while already DETACHED");
                    return;
                }
                Err(state) => {
                    // e.g. SUSPENDED mid-detach would be unexpected here.
                    debug_assert!(false, "unexpected thread state in detach: {state}");
                    return;
                }
            }
            super::stw_trace(format_args!("detach ATTACHED->DETACHED"));
        }
    });
}
/// Run `f` with this thread temporarily DETACHED, so a stop-the-world
/// request does not have to wait for it (e.g. around blocking calls).
///
/// If the thread is not currently ATTACHED (or has no slot), `f` runs
/// without any state transition. Reattaches even if `f` unwinds.
#[cfg(all(unix, feature = "threading"))]
pub fn allow_threads<R>(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
    let attached = CURRENT_THREAD_SLOT.with(|slot| match slot.borrow().as_ref() {
        Some(s) => s.state.load(Ordering::Acquire) == THREAD_ATTACHED,
        None => false,
    });
    if attached {
        detach_thread();
        // Reattach on scope exit, panic-safe (same guarantee as the
        // scopeguard::guard form).
        scopeguard::defer! {
            attach_thread(vm);
        }
        f()
    } else {
        f()
    }
}
/// Fallback for targets without the unix stop-the-world machinery: there is
/// no thread state to transition, so just run the closure.
#[cfg(not(all(unix, feature = "threading")))]
pub fn allow_threads<R>(_vm: &VirtualMachine, f: impl FnOnce() -> R) -> R {
    f()
}
/// Fast-path check: if a stop-the-world request targeted this thread,
/// suspend here until resumed; otherwise return immediately.
#[cfg(all(unix, feature = "threading"))]
pub fn suspend_if_needed(stw: &super::StopTheWorldState) {
    // Relaxed load is the cheap poll; `do_suspend` re-validates with
    // stronger ordering before actually parking.
    let should_suspend = CURRENT_THREAD_SLOT.with(|slot| {
        slot.borrow()
            .as_ref()
            .is_some_and(|s| s.stop_requested.load(Ordering::Relaxed))
    });
    if !should_suspend {
        return;
    }
    // Per-thread flag is set but the global request is gone (cancelled or
    // completed): clear the stale flag and continue running.
    if !stw.requested.load(Ordering::Acquire) {
        CURRENT_THREAD_SLOT.with(|slot| {
            if let Some(s) = slot.borrow().as_ref() {
                s.stop_requested.store(false, Ordering::Release);
            }
        });
        return;
    }
    do_suspend(stw);
}
/// Slow path of `suspend_if_needed`: transition ATTACHED -> SUSPENDED,
/// notify the stop-the-world requester, park until resumed, then reattach.
#[cfg(all(unix, feature = "threading"))]
#[cold]
fn do_suspend(stw: &super::StopTheWorldState) {
    CURRENT_THREAD_SLOT.with(|slot| {
        if let Some(s) = slot.borrow().as_ref() {
            match s.state.compare_exchange(
                THREAD_ATTACHED,
                THREAD_SUSPENDED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // We own the suspension; acknowledge the request.
                    s.stop_requested.store(false, Ordering::Release);
                }
                Err(THREAD_DETACHED) => {
                    // Detached threads are already safe for STW; nothing to do.
                    super::stw_trace(format_args!("suspend skip DETACHED"));
                    return;
                }
                Err(THREAD_SUSPENDED) => {
                    // Already suspended by someone else; just clear the flag.
                    s.stop_requested.store(false, Ordering::Release);
                    super::stw_trace(format_args!("suspend skip already-suspended"));
                    return;
                }
                Err(state) => {
                    debug_assert!(false, "unexpected thread state in suspend: {state}");
                    return;
                }
            }
            super::stw_trace(format_args!("suspend ATTACHED->SUSPENDED"));
            // Re-check: the request may have been cancelled between the flag
            // poll and the state transition; if so, roll back to ATTACHED.
            if !stw.requested.load(Ordering::Acquire) {
                s.state.store(THREAD_ATTACHED, Ordering::Release);
                s.stop_requested.store(false, Ordering::Release);
                super::stw_trace(format_args!("suspend abort requested-cleared"));
                return;
            }
            // Tell the requester we reached a safe point, then park until the
            // state leaves SUSPENDED.
            stw.notify_suspended();
            super::stw_trace(format_args!("suspend notified-requester"));
            let wait_yields = wait_while_suspended(s);
            stw.add_suspend_wait_yields(wait_yields);
            // Resume path: the requester presumably releases us by storing
            // DETACHED (or SUSPENDED again for another round) — confirm on the
            // requester side. Reacquire ATTACHED before returning to VM code.
            loop {
                match s.state.compare_exchange(
                    THREAD_DETACHED,
                    THREAD_ATTACHED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(THREAD_SUSPENDED) => {
                        // Suspended again before we could reattach; keep waiting.
                        let extra_wait = wait_while_suspended(s);
                        stw.add_suspend_wait_yields(extra_wait);
                    }
                    Err(THREAD_ATTACHED) => break,
                    Err(state) => {
                        debug_assert!(false, "unexpected post-suspend state: {state}");
                        break;
                    }
                }
            }
            s.stop_requested.store(false, Ordering::Release);
            super::stw_trace(format_args!("suspend resume -> ATTACHED"));
        }
    });
}
/// Whether a stop-the-world request has flagged the current thread.
/// `false` when this thread has no slot yet.
#[cfg(all(unix, feature = "threading"))]
#[inline]
pub fn stop_requested_for_current_thread() -> bool {
    CURRENT_THREAD_SLOT.with(|slot| match slot.borrow().as_ref() {
        None => false,
        Some(s) => s.stop_requested.load(Ordering::Relaxed),
    })
}
/// Record `fp` as the newest frame on the current thread's frame stack.
///
/// Debug-asserts (and is a no-op in release) if the thread slot has not
/// been initialized via `init_thread_slot_if_needed`.
#[cfg(feature = "threading")]
pub fn push_thread_frame(fp: FramePtr) {
    CURRENT_THREAD_SLOT.with(|slot| match slot.borrow().as_ref() {
        Some(s) => s.frames.lock().push(fp),
        None => debug_assert!(
            false,
            "push_thread_frame called without initialized thread slot"
        ),
    });
}
/// Remove the newest frame from the current thread's frame stack.
///
/// Debug-asserts (and is a no-op in release) if the thread slot has not
/// been initialized via `init_thread_slot_if_needed`.
#[cfg(feature = "threading")]
pub fn pop_thread_frame() {
    CURRENT_THREAD_SLOT.with(|slot| match slot.borrow().as_ref() {
        Some(s) => {
            s.frames.lock().pop();
        }
        None => debug_assert!(
            false,
            "pop_thread_frame called without initialized thread slot"
        ),
    });
}
/// Install `frame` as this thread's current frame pointer, returning the
/// previous one (possibly null).
pub fn set_current_frame(frame: *const Frame) -> *const Frame {
    CURRENT_FRAME.with(|cell| {
        let previous = cell.swap(frame.cast_mut(), Ordering::Relaxed);
        previous.cast_const()
    })
}
/// Read this thread's current frame pointer (null when no frame is set).
pub fn get_current_frame() -> *const Frame {
    CURRENT_FRAME.with(|cell| cell.load(Ordering::Relaxed).cast_const())
}
/// Publish `exc` as the current thread's exception, replacing any previous
/// one. No-op when the thread slot has not been initialized.
#[cfg(feature = "threading")]
pub fn update_thread_exception(exc: Option<PyBaseExceptionRef>) {
    CURRENT_THREAD_SLOT.with(|slot| {
        if let Some(s) = slot.borrow().as_ref() {
            // NOTE(review): `PyAtomicRef::swap` is unsafe and its invariant is
            // not stated here — presumably the caller must not hold other raw
            // references to the old value; confirm against PyAtomicRef's docs.
            let _old = unsafe { s.exception.swap(exc) };
        }
    });
}
/// Snapshot each registered thread's (ident, current exception) pair.
///
/// Takes the registry lock for the duration of the scan.
#[cfg(feature = "threading")]
pub fn get_all_current_exceptions(vm: &VirtualMachine) -> Vec<(u64, Option<PyBaseExceptionRef>)> {
    let registry = vm.state.thread_frames.lock();
    let mut snapshot = Vec::with_capacity(registry.len());
    for (id, slot) in registry.iter() {
        snapshot.push((*id, slot.exception.to_owned()));
    }
    snapshot
}
/// Tear down this thread's slot when the thread leaves the VM for good:
/// best-effort detach, remove the slot from the global registry (only when
/// the registered slot is still ours), notify a pending STW requester if we
/// disappear out from under it, and clear the thread-local.
#[cfg(feature = "threading")]
pub fn cleanup_current_thread_frames(vm: &VirtualMachine) {
    let thread_id = crate::stdlib::_thread::get_ident();
    let current_slot = CURRENT_THREAD_SLOT.with(|slot| slot.borrow().as_ref().cloned());
    // Leave ATTACHED so an STW requester does not wait on a dying thread;
    // failure (already DETACHED/SUSPENDED) is deliberately ignored.
    #[cfg(all(unix, feature = "threading"))]
    if let Some(slot) = &current_slot {
        let _ = slot.state.compare_exchange(
            THREAD_ATTACHED,
            THREAD_DETACHED,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
    }
    let removed = if let Some(slot) = &current_slot {
        let mut registry = vm.state.thread_frames.lock();
        match registry.get(&thread_id) {
            // Only remove if the registered slot is literally ours — after a
            // fork/re-init another slot may sit under the same ident.
            Some(registered) if Arc::ptr_eq(registered, slot) => registry.remove(&thread_id),
            _ => None,
        }
    } else {
        None
    };
    // An in-flight STW (from another thread) may be counting on this thread to
    // suspend; tell the requester it is gone so it does not wait forever.
    // Skip if this thread already reached SUSPENDED (it was counted).
    #[cfg(all(unix, feature = "threading"))]
    if let Some(slot) = &removed
        && vm.state.stop_the_world.requested.load(Ordering::Acquire)
        && thread_id != vm.state.stop_the_world.requester_ident()
        && slot.state.load(Ordering::Relaxed) != THREAD_SUSPENDED
    {
        vm.state.stop_the_world.notify_thread_gone();
    }
    CURRENT_THREAD_SLOT.with(|s| {
        *s.borrow_mut() = None;
    });
}
/// Rebuild the thread registry after `fork()`: only the forking thread
/// survives in the child, so wipe every registered slot and install a fresh
/// ATTACHED slot for the current thread, carrying over its live frames and
/// topmost exception.
#[cfg(feature = "threading")]
pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) {
    let current_ident = crate::stdlib::_thread::get_ident();
    let current_frames: Vec<FramePtr> = vm.frames.borrow().clone();
    let new_slot = Arc::new(ThreadSlot {
        frames: parking_lot::Mutex::new(current_frames),
        exception: crate::PyAtomicRef::from(vm.topmost_exception()),
        // The forking thread was necessarily running VM code, so ATTACHED.
        #[cfg(unix)]
        state: core::sync::atomic::AtomicI32::new(THREAD_ATTACHED),
        #[cfg(unix)]
        stop_requested: core::sync::atomic::AtomicBool::new(false),
        #[cfg(unix)]
        thread: std::thread::current(),
    });
    let mut registry = vm.state.thread_frames.lock();
    // Slots of threads that did not survive the fork are dropped here.
    registry.clear();
    registry.insert(current_ident, new_slot.clone());
    drop(registry);
    CURRENT_THREAD_SLOT.with(|s| {
        *s.borrow_mut() = Some(new_slot);
    });
}
/// Run `f` against whichever VM on this thread's stack owns `obj`.
///
/// With exactly one VM on the stack it is used directly (ownership is only
/// debug-asserted); otherwise the first VM whose `object` type `obj` is an
/// instance of is used. Returns `None` when no VM matches (including an
/// empty stack).
pub fn with_vm<F, R>(obj: &PyObject, f: F) -> Option<R>
where
    F: Fn(&VirtualMachine) -> R,
{
    // `obj` belongs to a VM if it is an instance of that VM's `object` type.
    let vm_owns_obj = |interp: NonNull<VirtualMachine>| {
        // NOTE(review): assumes pointers in VM_STACK stay valid while on the
        // stack (they are pushed/popped by `enter_vm`) — confirm no caller
        // can outlive its `enter_vm` scope.
        let vm = unsafe { interp.as_ref() };
        obj.fast_isinstance(vm.ctx.types.object_type)
    };
    VM_STACK.with(|vms| {
        let interp = match vms.borrow().iter().copied().exactly_one() {
            Ok(x) => {
                debug_assert!(vm_owns_obj(x));
                x
            }
            // `exactly_one`'s error value iterates the stack's elements;
            // scan for an owner. `?` exits the closure with `None`.
            Err(mut others) => others.find(|x| vm_owns_obj(*x))?,
        };
        // See validity note above.
        let vm = unsafe { interp.as_ref() };
        Some(VM_CURRENT.set(vm, || f(vm)))
    })
}
/// A per-thread `VirtualMachine` produced by `VirtualMachine::new_thread`,
/// intended to be moved into a new OS thread and activated via `run`.
#[must_use = "ThreadedVirtualMachine does nothing unless you move it to another thread and call .run()"]
#[cfg(feature = "threading")]
pub struct ThreadedVirtualMachine {
    // The thread-local VM instance; shared interpreter state lives in `vm.state`.
    pub(super) vm: VirtualMachine,
}
#[cfg(feature = "threading")]
impl ThreadedVirtualMachine {
    /// Consume `self` and produce a closure suitable for `std::thread::spawn`:
    /// the closure moves this VM to the spawning thread and runs `f` in it.
    pub fn make_spawn_func<F, R>(self, f: F) -> impl FnOnce() -> R
    where
        F: FnOnce(&VirtualMachine) -> R,
    {
        move || self.run(f)
    }

    /// Run `f` with this VM made current on the calling thread (via `enter_vm`).
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&VirtualMachine) -> R,
    {
        enter_vm(&self.vm, || f(&self.vm))
    }
}
impl VirtualMachine {
    /// Spawn an OS thread that runs `f` inside a fresh per-thread view of
    /// this interpreter (see `new_thread`).
    #[cfg(feature = "threading")]
    pub fn start_thread<F, R>(&self, f: F) -> std::thread::JoinHandle<R>
    where
        F: FnOnce(&Self) -> R,
        F: Send + 'static,
        R: Send + 'static,
    {
        let func = self.new_thread().make_spawn_func(f);
        std::thread::spawn(func)
    }

    /// Create a per-thread `VirtualMachine` that shares this interpreter's
    /// global state (`ctx`, `state`, modules, caches) but has fresh
    /// thread-local parts: empty frame/data stacks, fresh exception and
    /// recursion bookkeeping, and trace/profile hooks seeded from the
    /// interpreter-wide globals.
    #[cfg(feature = "threading")]
    pub fn new_thread(&self) -> ThreadedVirtualMachine {
        let global_trace = self.state.global_trace_func.lock().clone();
        let global_profile = self.state.global_profile_func.lock().clone();
        // Tracing is on in the new thread iff a global hook is installed.
        let use_tracing = global_trace.is_some() || global_profile.is_some();
        let vm = Self {
            // Shared interpreter-wide handles (cheap clones of refcounted state).
            builtins: self.builtins.clone(),
            sys_module: self.sys_module.clone(),
            ctx: self.ctx.clone(),
            // Thread-local execution state starts empty.
            frames: RefCell::new(vec![]),
            datastack: core::cell::UnsafeCell::new(crate::datastack::DataStack::new()),
            wasm_id: self.wasm_id.clone(),
            exceptions: RefCell::default(),
            import_func: self.import_func.clone(),
            importlib: self.importlib.clone(),
            // Fall back to `None` objects when no global hook is set.
            profile_func: RefCell::new(global_profile.unwrap_or_else(|| self.ctx.none())),
            trace_func: RefCell::new(global_trace.unwrap_or_else(|| self.ctx.none())),
            use_tracing: Cell::new(use_tracing),
            recursion_limit: self.recursion_limit.clone(),
            signal_handlers: core::cell::OnceCell::new(),
            // No signal receiver — presumably signals are delivered to the
            // main thread only; confirm against signal-handling setup.
            signal_rx: None,
            repr_guards: RefCell::default(),
            state: self.state.clone(),
            initialized: self.initialized,
            recursion_depth: Cell::new(0),
            c_stack_soft_limit: Cell::new(VirtualMachine::calculate_c_stack_soft_limit()),
            async_gen_firstiter: RefCell::new(None),
            async_gen_finalizer: RefCell::new(None),
            asyncio_running_loop: RefCell::new(None),
            asyncio_running_task: RefCell::new(None),
            callable_cache: self.callable_cache.clone(),
        };
        ThreadedVirtualMachine { vm }
    }
}