Skip to main content

wingfoil/
lib.rs

1mod proxy_stream;
2mod py_element;
3mod py_stream;
4mod types;
5
6use ::wingfoil::{Dep, Node, NodeOperators};
7use py_element::*;
8use py_stream::*;
9
10use pyo3::prelude::*;
11use std::rc::Rc;
12use std::time::Duration;
13
/// Python-facing wrapper around a shared wingfoil node, exposed to Python as `Node`.
// NOTE(review): `unsendable` is presumably required because the wrapped `Rc`
// is not `Send` — confirm before changing the pointer type.
#[pyclass(unsendable, name = "Node")]
#[derive(Clone)]
struct PyNode(Rc<dyn Node>);
17
18impl PyNode {
19    fn new(node: Rc<dyn Node>) -> Self {
20        Self(node)
21    }
22}
23
24#[pymethods]
25impl PyNode {
26    /// Counts how many times upstream node has ticked.
27    fn count(&self) -> PyStream {
28        self.0.count().as_py_stream()
29    }
30}
31
32/// A node that ticks at the specified period
33#[pyfunction]
34fn ticker(seconds: f64) -> PyNode {
35    let ticker = ::wingfoil::ticker(Duration::from_secs_f64(seconds));
36    PyNode::new(ticker)
37}
38
39/// A atream that ticks once, on first engine cycle
40#[pyfunction]
41fn constant(val: Py<PyAny>) -> PyStream {
42    let strm = ::wingfoil::constant(PyElement::new(val));
43    PyStream(strm)
44}
45
46/// maps steams a amd b into a new stream using func (e.g lambda a, b: a + b)
47#[pyfunction]
48fn bimap(a: Py<PyAny>, b: Py<PyAny>, func: Py<PyAny>) -> PyStream {
49    Python::attach(|py| {
50        let a = a
51            .as_ref()
52            .extract::<PyRef<PyStream>>(py)
53            .unwrap()
54            .inner_stream();
55        let b = b
56            .as_ref()
57            .extract::<PyRef<PyStream>>(py)
58            .unwrap()
59            .inner_stream();
60        let stream = ::wingfoil::bimap(
61            Dep::Active(a),
62            Dep::Active(b),
63            move |a: PyElement, b: PyElement| {
64                Python::attach(|py: Python<'_>| {
65                    let res = func.call1(py, (a.value(), b.value())).unwrap();
66                    PyElement::new(res)
67                })
68            },
69        );
70        PyStream(stream)
71    })
72}
73
74/// Wingfoil is a blazingly fast, highly scalable stream processing
75/// framework designed for latency-critical use cases such as electronic
76/// trading and real-time AI systems
77#[pymodule]
78fn _wingfoil(module: &Bound<'_, PyModule>) -> PyResult<()> {
79    let env = env_logger::Env::default().default_filter_or("info");
80    env_logger::Builder::from_env(env).init();
81    module.add_function(wrap_pyfunction!(ticker, module)?)?;
82    module.add_function(wrap_pyfunction!(constant, module)?)?;
83    module.add_function(wrap_pyfunction!(bimap, module)?)?;
84    module.add_class::<PyNode>()?;
85    module.add_class::<PyStream>()?;
86    module.add("__version__", env!("CARGO_PKG_VERSION"))?;
87    Ok(())
88}