Skip to main content

wingfoil/
lib.rs

1mod proxy_stream;
2mod py_csv;
3mod py_element;
4#[cfg(feature = "etcd")]
5mod py_etcd;
6mod py_fix;
7#[cfg(feature = "iceoryx2-beta")]
8mod py_iceoryx2;
9mod py_kdb;
10mod py_otlp;
11mod py_prometheus;
12mod py_stream;
13mod py_zmq;
14mod types;
15
16use ::wingfoil::{Dep, Node, NodeOperators};
17use py_element::*;
18use py_stream::*;
19use types::ToPyResult;
20
21use pyo3::prelude::*;
22use std::rc::Rc;
23use std::time::Duration;
24
25#[pyclass(unsendable, name = "Node")]
26#[derive(Clone)]
27pub(crate) struct PyNode(Rc<dyn Node>);
28
29impl PyNode {
30    pub(crate) fn new(node: Rc<dyn Node>) -> Self {
31        Self(node)
32    }
33}
34
35#[pymethods]
36impl PyNode {
37    /// Counts how many times upstream node has ticked.
38    fn count(&self) -> PyStream {
39        self.0.count().as_py_stream()
40    }
41
42    #[pyo3(signature = (realtime, start=None, duration=None, cycles=None))]
43    fn run(
44        &self,
45        py: Python<'_>,
46        realtime: bool,
47        start: Option<Py<PyAny>>,
48        duration: Option<Py<PyAny>>,
49        cycles: Option<u32>,
50    ) -> PyResult<()> {
51        let (run_mode, run_for) =
52            types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
53
54        // Convert fat pointer to (addr, vtable) pair which is Send+Sync
55        let node_ptr = Rc::as_ptr(&self.0);
56        let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
57
58        // Release GIL during the run to allow async tasks to acquire it
59        // SAFETY: The Rc is kept alive by self for the duration of this call
60        let result = py.detach(move || {
61            // Reconstruct the fat pointer from (addr, vtable)
62            let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
63            // Temporarily reconstruct the Rc without taking ownership
64            let node = unsafe { Rc::from_raw(node_ptr) };
65            let result = ::wingfoil::NodeOperators::run(&node, run_mode, run_for);
66            std::mem::forget(node); // Don't drop the Rc (self.0 still owns it)
67            result
68        });
69        result.to_pyresult()?;
70        Ok(())
71    }
72}
73
74/// A node that ticks at the specified period
75#[pyfunction]
76fn ticker(seconds: f64) -> PyNode {
77    let ticker = ::wingfoil::ticker(Duration::from_secs_f64(seconds));
78    PyNode::new(ticker)
79}
80
81/// A stream that ticks once, on first engine cycle
82#[pyfunction]
83fn constant(val: Py<PyAny>) -> PyStream {
84    let strm = ::wingfoil::constant(PyElement::new(val));
85    PyStream(strm)
86}
87
88/// maps steams a amd b into a new stream using func (e.g lambda a, b: a + b)
89#[pyfunction]
90fn bimap(a: Py<PyAny>, b: Py<PyAny>, func: Py<PyAny>) -> PyStream {
91    Python::attach(|py| {
92        let a = a
93            .as_ref()
94            .extract::<PyRef<PyStream>>(py)
95            .unwrap()
96            .inner_stream();
97        let b = b
98            .as_ref()
99            .extract::<PyRef<PyStream>>(py)
100            .unwrap()
101            .inner_stream();
102        let stream = ::wingfoil::bimap(
103            Dep::Active(a),
104            Dep::Active(b),
105            move |a: PyElement, b: PyElement| {
106                Python::attach(|py: Python<'_>| {
107                    let res = func.call1(py, (a.value(), b.value())).unwrap();
108                    PyElement::new(res)
109                })
110            },
111        );
112        PyStream(stream)
113    })
114}
115
116#[pyclass(unsendable, name = "Graph")]
117#[derive(Clone)]
118pub(crate) struct PyGraph(Vec<Rc<dyn Node>>);
119
120#[pymethods]
121impl PyGraph {
122    #[new]
123    fn new(nodes: Vec<Py<PyAny>>) -> PyResult<Self> {
124        Python::attach(|py| {
125            let mut roots: Vec<Rc<dyn Node>> = Vec::new();
126            for obj in nodes {
127                if let Ok(stream) = obj.extract::<PyRef<PyStream>>(py) {
128                    roots.push(stream.0.clone().as_node());
129                } else if let Ok(node) = obj.extract::<PyRef<PyNode>>(py) {
130                    roots.push(node.0.clone());
131                } else {
132                    return Err(pyo3::exceptions::PyTypeError::new_err(
133                        "Graph components must be Stream or Node",
134                    ));
135                }
136            }
137            Ok(PyGraph(roots))
138        })
139    }
140
141    #[pyo3(signature = (realtime, start=None, duration=None, cycles=None))]
142    fn run(
143        &self,
144        py: Python<'_>,
145        realtime: bool,
146        start: Option<Py<PyAny>>,
147        duration: Option<Py<PyAny>>,
148        cycles: Option<u32>,
149    ) -> PyResult<()> {
150        let (run_mode, run_for) =
151            types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
152
153        let mut ptrs: Vec<(usize, usize)> = Vec::with_capacity(self.0.len());
154        for node in &self.0 {
155            let node_ptr = Rc::as_ptr(node);
156            let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
157            ptrs.push((addr, vtable));
158        }
159
160        let result = py.detach(move || {
161            let mut roots: Vec<Rc<dyn Node>> = Vec::with_capacity(ptrs.len());
162            for (addr, vtable) in ptrs {
163                let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
164                let node = unsafe { Rc::from_raw(node_ptr) };
165                roots.push(node.clone());
166                std::mem::forget(node);
167            }
168
169            let mut graph = ::wingfoil::Graph::new(roots, run_mode, run_for);
170            graph.run()
171        });
172        result.to_pyresult()?;
173        Ok(())
174    }
175}
176
177/// Wingfoil is a blazingly fast, highly scalable stream processing
178/// framework designed for latency-critical use cases such as electronic
179/// trading and real-time AI systems
180#[pymodule]
181fn _wingfoil(module: &Bound<'_, PyModule>) -> PyResult<()> {
182    let env = env_logger::Env::default().default_filter_or("info");
183    env_logger::Builder::from_env(env).init();
184    module.add_function(wrap_pyfunction!(ticker, module)?)?;
185    module.add_function(wrap_pyfunction!(constant, module)?)?;
186    module.add_function(wrap_pyfunction!(bimap, module)?)?;
187    module.add_function(wrap_pyfunction!(py_csv::py_csv_read, module)?)?;
188    #[cfg(feature = "etcd")]
189    module.add_function(wrap_pyfunction!(py_etcd::py_etcd_sub, module)?)?;
190    module.add_function(wrap_pyfunction!(py_kdb::py_kdb_read, module)?)?;
191    module.add_function(wrap_pyfunction!(py_kdb::py_kdb_write, module)?)?;
192    module.add_function(wrap_pyfunction!(py_zmq::py_zmq_sub, module)?)?;
193    #[cfg(feature = "iceoryx2-beta")]
194    module.add_function(wrap_pyfunction!(py_iceoryx2::py_iceoryx2_sub, module)?)?;
195    #[cfg(feature = "etcd")]
196    module.add_function(wrap_pyfunction!(py_zmq::py_zmq_sub_etcd, module)?)?;
197    module.add_function(wrap_pyfunction!(py_fix::py_fix_connect, module)?)?;
198    module.add_function(wrap_pyfunction!(py_fix::py_fix_connect_tls, module)?)?;
199    module.add_function(wrap_pyfunction!(py_fix::py_fix_accept, module)?)?;
200    module.add_class::<PyNode>()?;
201    module.add_class::<PyStream>()?;
202    module.add_class::<PyGraph>()?;
203    #[cfg(feature = "iceoryx2-beta")]
204    module.add_class::<py_iceoryx2::PyIceoryx2ServiceVariant>()?;
205    #[cfg(feature = "iceoryx2-beta")]
206    module.add_class::<py_iceoryx2::PyIceoryx2Mode>()?;
207    module.add_class::<py_prometheus::PyPrometheusExporter>()?;
208    module.add("__version__", env!("CARGO_PKG_VERSION"))?;
209    Ok(())
210}