Skip to main content

wingfoil/
lib.rs

1mod proxy_stream;
2mod py_element;
3mod py_kdb;
4mod py_stream;
5mod types;
6
7use ::wingfoil::{Dep, Node, NodeOperators};
8use py_element::*;
9use py_stream::*;
10use types::ToPyResult;
11
12use pyo3::prelude::*;
13use std::rc::Rc;
14use std::time::Duration;
15
16#[pyclass(unsendable, name = "Node")]
17#[derive(Clone)]
18pub(crate) struct PyNode(Rc<dyn Node>);
19
20impl PyNode {
21    pub(crate) fn new(node: Rc<dyn Node>) -> Self {
22        Self(node)
23    }
24}
25
26#[pymethods]
27impl PyNode {
28    /// Counts how many times upstream node has ticked.
29    fn count(&self) -> PyStream {
30        self.0.count().as_py_stream()
31    }
32
33    #[pyo3(signature = (realtime=true, start=None, duration=None, cycles=None))]
34    fn run(
35        &self,
36        py: Python<'_>,
37        realtime: Option<bool>,
38        start: Option<Py<PyAny>>,
39        duration: Option<Py<PyAny>>,
40        cycles: Option<u32>,
41    ) -> PyResult<()> {
42        let (run_mode, run_for) =
43            types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
44
45        // Convert fat pointer to (addr, vtable) pair which is Send+Sync
46        let node_ptr = Rc::as_ptr(&self.0);
47        let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
48
49        // Release GIL during the run to allow async tasks to acquire it
50        // SAFETY: The Rc is kept alive by self for the duration of this call
51        let result = py.detach(move || {
52            // Reconstruct the fat pointer from (addr, vtable)
53            let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
54            // Temporarily reconstruct the Rc without taking ownership
55            let node = unsafe { Rc::from_raw(node_ptr) };
56            let result = ::wingfoil::NodeOperators::run(&node, run_mode, run_for);
57            std::mem::forget(node); // Don't drop the Rc (self.0 still owns it)
58            result
59        });
60        result.to_pyresult()?;
61        Ok(())
62    }
63}
64
65/// A node that ticks at the specified period
66#[pyfunction]
67fn ticker(seconds: f64) -> PyNode {
68    let ticker = ::wingfoil::ticker(Duration::from_secs_f64(seconds));
69    PyNode::new(ticker)
70}
71
72/// A stream that ticks once, on first engine cycle
73#[pyfunction]
74fn constant(val: Py<PyAny>) -> PyStream {
75    let strm = ::wingfoil::constant(PyElement::new(val));
76    PyStream(strm)
77}
78
79/// maps steams a amd b into a new stream using func (e.g lambda a, b: a + b)
80#[pyfunction]
81fn bimap(a: Py<PyAny>, b: Py<PyAny>, func: Py<PyAny>) -> PyStream {
82    Python::attach(|py| {
83        let a = a
84            .as_ref()
85            .extract::<PyRef<PyStream>>(py)
86            .unwrap()
87            .inner_stream();
88        let b = b
89            .as_ref()
90            .extract::<PyRef<PyStream>>(py)
91            .unwrap()
92            .inner_stream();
93        let stream = ::wingfoil::bimap(
94            Dep::Active(a),
95            Dep::Active(b),
96            move |a: PyElement, b: PyElement| {
97                Python::attach(|py: Python<'_>| {
98                    let res = func.call1(py, (a.value(), b.value())).unwrap();
99                    PyElement::new(res)
100                })
101            },
102        );
103        PyStream(stream)
104    })
105}
106
107#[pyclass(unsendable, name = "Graph")]
108#[derive(Clone)]
109pub(crate) struct PyGraph(Vec<Rc<dyn Node>>);
110
111#[pymethods]
112impl PyGraph {
113    #[new]
114    fn new(nodes: Vec<Py<PyAny>>) -> PyResult<Self> {
115        Python::attach(|py| {
116            let mut roots: Vec<Rc<dyn Node>> = Vec::new();
117            for obj in nodes {
118                if let Ok(stream) = obj.extract::<PyRef<PyStream>>(py) {
119                    roots.push(stream.0.clone().as_node());
120                } else if let Ok(node) = obj.extract::<PyRef<PyNode>>(py) {
121                    roots.push(node.0.clone());
122                } else {
123                    return Err(pyo3::exceptions::PyTypeError::new_err(
124                        "Graph components must be Stream or Node",
125                    ));
126                }
127            }
128            Ok(PyGraph(roots))
129        })
130    }
131
132    #[pyo3(signature = (realtime=true, start=None, duration=None, cycles=None))]
133    fn run(
134        &self,
135        py: Python<'_>,
136        realtime: Option<bool>,
137        start: Option<Py<PyAny>>,
138        duration: Option<Py<PyAny>>,
139        cycles: Option<u32>,
140    ) -> PyResult<()> {
141        let (run_mode, run_for) =
142            types::parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
143
144        let mut ptrs: Vec<(usize, usize)> = Vec::with_capacity(self.0.len());
145        for node in &self.0 {
146            let node_ptr = Rc::as_ptr(node);
147            let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(node_ptr) };
148            ptrs.push((addr, vtable));
149        }
150
151        let result = py.detach(move || {
152            let mut roots: Vec<Rc<dyn Node>> = Vec::with_capacity(ptrs.len());
153            for (addr, vtable) in ptrs {
154                let node_ptr: *const dyn Node = unsafe { std::mem::transmute((addr, vtable)) };
155                let node = unsafe { Rc::from_raw(node_ptr) };
156                roots.push(node.clone());
157                std::mem::forget(node);
158            }
159
160            let mut graph = ::wingfoil::Graph::new(roots, run_mode, run_for);
161            graph.run()
162        });
163        result.to_pyresult()?;
164        Ok(())
165    }
166}
167
168/// Wingfoil is a blazingly fast, highly scalable stream processing
169/// framework designed for latency-critical use cases such as electronic
170/// trading and real-time AI systems
171#[pymodule]
172fn _wingfoil(module: &Bound<'_, PyModule>) -> PyResult<()> {
173    let env = env_logger::Env::default().default_filter_or("info");
174    env_logger::Builder::from_env(env).init();
175    module.add_function(wrap_pyfunction!(ticker, module)?)?;
176    module.add_function(wrap_pyfunction!(constant, module)?)?;
177    module.add_function(wrap_pyfunction!(bimap, module)?)?;
178    module.add_function(wrap_pyfunction!(py_kdb::py_kdb_read, module)?)?;
179    module.add_function(wrap_pyfunction!(py_kdb::py_kdb_write, module)?)?;
180    module.add_class::<PyNode>()?;
181    module.add_class::<PyStream>()?;
182    module.add_class::<PyGraph>()?;
183    module.add("__version__", env!("CARGO_PKG_VERSION"))?;
184    Ok(())
185}