querent_synapse/querent/
py_process.rs

1use pyo3::{
2	prelude::*,
3	types::{IntoPyDict, PyDict, PyTuple},
4};
5
6pub struct Process<'a> {
7	py: Python<'a>,
8	py_process: &'a PyAny,
9	py_manager: &'a PyAny,
10
11	pool: Vec<&'a PyAny>,
12}
13
14impl<'a> Process<'a> {
15	pub fn new(py: Python<'a>) -> PyResult<Self> {
16		let py_multiprocessing = py.import("multiprocessing")?;
17		let py_process = py_multiprocessing.getattr("Process")?; // Adjusted
18		let py_manager = py_multiprocessing.getattr("Manager")?.call0()?; // Adjusted
19
20		Ok(Self { py, py_process, py_manager, pool: vec![] })
21	}
22
23	pub fn is_running(&self) -> bool {
24		!self.pool.is_empty()
25	}
26
27	pub fn spawn(
28		&mut self,
29		f: impl IntoPy<Py<PyAny>>,
30		args: impl IntoPy<Py<PyTuple>>,
31		kwargs: Option<&PyDict>,
32	) -> PyResult<()> {
33		let f = f.into_py(self.py);
34		let f_args = args.into_py(self.py).into_py(self.py);
35		let f_kwargs = kwargs.or_else(|| Some(PyDict::new(self.py))).into_py(self.py);
36
37		let kwargs = [("target", f), ("args", f_args), ("kwargs", f_kwargs)].into_py_dict(self.py);
38
39		let p = self.py_process.call((), Some(kwargs))?;
40		p.call_method0("start")?;
41		self.pool.push(p);
42		Ok(())
43	}
44
45	pub fn spawn_mut(
46		&mut self,
47		f: impl IntoPy<Py<PyAny>>,
48		args: impl IntoPy<Py<PyTuple>>,
49		kwargs: Option<&PyDict>,
50	) -> PyResult<(Py<PyAny>, Py<PyAny>)> {
51		let f = f.into_py(self.py);
52		let f_args: Py<PyTuple> = args.into_py(self.py);
53		let f_kwargs: Option<&PyDict> = kwargs.or_else(|| Some(PyDict::new(self.py)));
54
55		let f_args = self.py_manager.call_method1("list", (f_args,))?;
56		let f_kwargs = self.py_manager.call_method1("dict", (f_kwargs,))?;
57
58		let kwargs =
59			[("target", f), ("args", (f_args, f_kwargs).into_py(self.py))].into_py_dict(self.py);
60
61		let p = self.py_process.call((), Some(kwargs))?;
62		p.call_method0("start")?;
63		self.pool.push(p);
64		Ok((f_args.into_py(self.py), f_kwargs.into_py(self.py)))
65	}
66
67	pub fn join(&mut self) -> PyResult<()> {
68		for p in &self.pool {
69			p.call_method0("join")?;
70		}
71		self.pool.clear();
72		Ok(())
73	}
74}
75
76impl<'a> Drop for Process<'a> {
77	fn drop(&mut self) {
78		Python::with_gil(|py| -> PyResult<()> {
79			self.join()?; // Ensure subprocesses are joined
80			self.py_manager.call_method0("shutdown")?; // Correctly shutdown the manager
81			py.import("threading")?.getattr("_shutdown")?.call0()?; // Correctly shutdown threading
82			Ok(())
83		})
84		.map_err(|e| {
85			Python::with_gil(|py| {
86				e.print_and_set_sys_last_vars(py);
87			})
88		})
89		.unwrap()
90	}
91}