querent_synapse/querent/
py_process.rs1use pyo3::{
2 prelude::*,
3 types::{IntoPyDict, PyDict, PyTuple},
4};
5
6pub struct Process<'a> {
7 py: Python<'a>,
8 py_process: &'a PyAny,
9 py_manager: &'a PyAny,
10
11 pool: Vec<&'a PyAny>,
12}
13
14impl<'a> Process<'a> {
15 pub fn new(py: Python<'a>) -> PyResult<Self> {
16 let py_multiprocessing = py.import("multiprocessing")?;
17 let py_process = py_multiprocessing.getattr("Process")?; let py_manager = py_multiprocessing.getattr("Manager")?.call0()?; Ok(Self { py, py_process, py_manager, pool: vec![] })
21 }
22
23 pub fn is_running(&self) -> bool {
24 !self.pool.is_empty()
25 }
26
27 pub fn spawn(
28 &mut self,
29 f: impl IntoPy<Py<PyAny>>,
30 args: impl IntoPy<Py<PyTuple>>,
31 kwargs: Option<&PyDict>,
32 ) -> PyResult<()> {
33 let f = f.into_py(self.py);
34 let f_args = args.into_py(self.py).into_py(self.py);
35 let f_kwargs = kwargs.or_else(|| Some(PyDict::new(self.py))).into_py(self.py);
36
37 let kwargs = [("target", f), ("args", f_args), ("kwargs", f_kwargs)].into_py_dict(self.py);
38
39 let p = self.py_process.call((), Some(kwargs))?;
40 p.call_method0("start")?;
41 self.pool.push(p);
42 Ok(())
43 }
44
45 pub fn spawn_mut(
46 &mut self,
47 f: impl IntoPy<Py<PyAny>>,
48 args: impl IntoPy<Py<PyTuple>>,
49 kwargs: Option<&PyDict>,
50 ) -> PyResult<(Py<PyAny>, Py<PyAny>)> {
51 let f = f.into_py(self.py);
52 let f_args: Py<PyTuple> = args.into_py(self.py);
53 let f_kwargs: Option<&PyDict> = kwargs.or_else(|| Some(PyDict::new(self.py)));
54
55 let f_args = self.py_manager.call_method1("list", (f_args,))?;
56 let f_kwargs = self.py_manager.call_method1("dict", (f_kwargs,))?;
57
58 let kwargs =
59 [("target", f), ("args", (f_args, f_kwargs).into_py(self.py))].into_py_dict(self.py);
60
61 let p = self.py_process.call((), Some(kwargs))?;
62 p.call_method0("start")?;
63 self.pool.push(p);
64 Ok((f_args.into_py(self.py), f_kwargs.into_py(self.py)))
65 }
66
67 pub fn join(&mut self) -> PyResult<()> {
68 for p in &self.pool {
69 p.call_method0("join")?;
70 }
71 self.pool.clear();
72 Ok(())
73 }
74}
75
76impl<'a> Drop for Process<'a> {
77 fn drop(&mut self) {
78 Python::with_gil(|py| -> PyResult<()> {
79 self.join()?; self.py_manager.call_method0("shutdown")?; py.import("threading")?.getattr("_shutdown")?.call0()?; Ok(())
83 })
84 .map_err(|e| {
85 Python::with_gil(|py| {
86 e.print_and_set_sys_last_vars(py);
87 })
88 })
89 .unwrap()
90 }
91}