1mod proxy_stream;
2mod py_element;
3mod py_kdb;
4mod py_stream;
5mod types;
6
7use ::wingfoil::{Dep, Node, NodeOperators};
8use py_element::*;
9use py_stream::*;
10use types::ToPyResult;
11
12use pyo3::prelude::*;
13use std::rc::Rc;
14use std::time::Duration;
15
16#[pyclass(unsendable, name = "Node")]
17#[derive(Clone)]
18pub(crate) struct PyNode(Rc<dyn Node>);
19
20impl PyNode {
21 pub(crate) fn new(node: Rc<dyn Node>) -> Self {
22 Self(node)
23 }
24}
25
26#[pymethods]
27impl PyNode {
28 fn count(&self) -> PyStream {
30 self.0.count().as_py_stream()
31 }
32
33 #[pyo3(signature = (realtime=true, start=None, duration=None, cycles=None))]
34 fn run(
35 &self,
36 py: Python<'_>,
37 realtime: Option<bool>,
38 start: Option<Py<PyAny>>,
39 duration: Option<Py<PyAny>>,
40 cycles: Option<u32>,
41 ) -> PyResult<()> {
42 let (run_mode, run_for) =
43 types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
44
45 let node_ptr = Rc::as_ptr(&self.0);
47 let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
48
49 let result = py.detach(move || {
52 let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
54 let node = unsafe { Rc::from_raw(node_ptr) };
56 let result = ::wingfoil::NodeOperators::run(&node, run_mode, run_for);
57 std::mem::forget(node); result
59 });
60 result.to_pyresult()?;
61 Ok(())
62 }
63}
64
65#[pyfunction]
67fn ticker(seconds: f64) -> PyNode {
68 let ticker = ::wingfoil::ticker(Duration::from_secs_f64(seconds));
69 PyNode::new(ticker)
70}
71
72#[pyfunction]
74fn constant(val: Py<PyAny>) -> PyStream {
75 let strm = ::wingfoil::constant(PyElement::new(val));
76 PyStream(strm)
77}
78
79#[pyfunction]
81fn bimap(a: Py<PyAny>, b: Py<PyAny>, func: Py<PyAny>) -> PyStream {
82 Python::attach(|py| {
83 let a = a
84 .as_ref()
85 .extract::<PyRef<PyStream>>(py)
86 .unwrap()
87 .inner_stream();
88 let b = b
89 .as_ref()
90 .extract::<PyRef<PyStream>>(py)
91 .unwrap()
92 .inner_stream();
93 let stream = ::wingfoil::bimap(
94 Dep::Active(a),
95 Dep::Active(b),
96 move |a: PyElement, b: PyElement| {
97 Python::attach(|py: Python<'_>| {
98 let res = func.call1(py, (a.value(), b.value())).unwrap();
99 PyElement::new(res)
100 })
101 },
102 );
103 PyStream(stream)
104 })
105}
106
107#[pyclass(unsendable, name = "Graph")]
108#[derive(Clone)]
109pub(crate) struct PyGraph(Vec<Rc<dyn Node>>);
110
111#[pymethods]
112impl PyGraph {
113 #[new]
114 fn new(nodes: Vec<Py<PyAny>>) -> PyResult<Self> {
115 Python::attach(|py| {
116 let mut roots: Vec<Rc<dyn Node>> = Vec::new();
117 for obj in nodes {
118 if let Ok(stream) = obj.extract::<PyRef<PyStream>>(py) {
119 roots.push(stream.0.clone().as_node());
120 } else if let Ok(node) = obj.extract::<PyRef<PyNode>>(py) {
121 roots.push(node.0.clone());
122 } else {
123 return Err(pyo3::exceptions::PyTypeError::new_err(
124 "Graph components must be Stream or Node",
125 ));
126 }
127 }
128 Ok(PyGraph(roots))
129 })
130 }
131
132 #[pyo3(signature = (realtime=true, start=None, duration=None, cycles=None))]
133 fn run(
134 &self,
135 py: Python<'_>,
136 realtime: Option<bool>,
137 start: Option<Py<PyAny>>,
138 duration: Option<Py<PyAny>>,
139 cycles: Option<u32>,
140 ) -> PyResult<()> {
141 let (run_mode, run_for) =
142 types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
143
144 let mut ptrs: Vec<(usize, usize)> = Vec::with_capacity(self.0.len());
145 for node in &self.0 {
146 let node_ptr = Rc::as_ptr(node);
147 let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
148 ptrs.push((addr, vtable));
149 }
150
151 let result = py.detach(move || {
152 let mut roots: Vec<Rc<dyn Node>> = Vec::with_capacity(ptrs.len());
153 for (addr, vtable) in ptrs {
154 let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
155 let node = unsafe { Rc::from_raw(node_ptr) };
156 roots.push(node.clone());
157 std::mem::forget(node);
158 }
159
160 let mut graph = ::wingfoil::Graph::new(roots, run_mode, run_for);
161 graph.run()
162 });
163 result.to_pyresult()?;
164 Ok(())
165 }
166}
167
168#[pymodule]
172fn _wingfoil(module: &Bound<'_, PyModule>) -> PyResult<()> {
173 let env = env_logger::Env::default().default_filter_or("info");
174 env_logger::Builder::from_env(env).init();
175 module.add_function(wrap_pyfunction!(ticker, module)?)?;
176 module.add_function(wrap_pyfunction!(constant, module)?)?;
177 module.add_function(wrap_pyfunction!(bimap, module)?)?;
178 module.add_function(wrap_pyfunction!(py_kdb::py_kdb_read, module)?)?;
179 module.add_function(wrap_pyfunction!(py_kdb::py_kdb_write, module)?)?;
180 module.add_class::<PyNode>()?;
181 module.add_class::<PyStream>()?;
182 module.add_class::<PyGraph>()?;
183 module.add("__version__", env!("CARGO_PKG_VERSION"))?;
184 Ok(())
185}