use log::Level;
use pyo3::BoundObject;
use std::any::type_name;
use ::wingfoil::{Element, IntoStream, NodeOperators, Stream, StreamOperators};
use pyo3::conversion::IntoPyObject;
use pyo3::prelude::*;
use std::rc::Rc;
use crate::proxy_stream::*;
use crate::py_element::PyElement;
use crate::types::*;
use crate::*;
#[derive(Clone)]
#[pyclass(subclass, unsendable, name = "Stream")]
pub struct PyStream(pub Rc<dyn Stream<PyElement>>);
impl PyStream {
fn extract<T>(&self) -> Rc<dyn Stream<T>>
where
T: Element + for<'a, 'py> FromPyObject<'a, 'py>,
{
self.0.map(move |x: PyElement| {
Python::attach(|py| match x.as_ref().extract::<T>(py) {
Ok(val) => val,
Err(_err) => {
panic!("Failed to convert from python type to native rust type")
}
})
})
}
pub fn inner_stream(&self) -> Rc<dyn Stream<PyElement>> {
self.0.clone()
}
pub fn from_inner(inner: Rc<dyn Stream<PyElement>>) -> Self {
Self(inner)
}
}
pub fn to_pyany<T>(x: T) -> Py<PyAny>
where
T: for<'py> IntoPyObject<'py>,
{
Python::attach(|py| match x.into_pyobject(py) {
Ok(bound) => bound.into_any().unbind(),
Err(_) => panic!("Conversion to PyAny from type {} failed", type_name::<T>()),
})
}
pub fn vec_any_to_pyany(x: Vec<Py<PyAny>>) -> Py<PyAny> {
Python::attach(|py| x.into_pyobject(py).unwrap().into_any().unbind())
}
pub trait AsPyStream<T>
where
T: Element + for<'py> IntoPyObject<'py>,
{
fn as_py_stream(&self) -> PyStream;
}
impl<T> AsPyStream<T> for Rc<dyn Stream<T>>
where
T: Element + for<'py> IntoPyObject<'py>,
{
fn as_py_stream(&self) -> PyStream {
let strm = self.map(|x| {
let py_any = to_pyany(x);
PyElement::new(py_any)
});
PyStream(strm)
}
}
#[pymethods]
impl PyStream {
#[new]
fn new(inner: Py<PyAny>) -> Self {
let stream = PyProxyStream::new(inner);
let stream = stream.into_stream();
Self(stream)
}
#[pyo3(signature = (realtime=true, start=None, duration=None, cycles=None))]
fn run(
&self,
py: Python<'_>,
realtime: Option<bool>,
start: Option<Py<PyAny>>,
duration: Option<Py<PyAny>>,
cycles: Option<u32>,
) -> PyResult<()> {
let (run_mode, run_for) =
parse_run_args(py, realtime, start, duration, cycles).to_pyresult()?;
let stream_ptr = Rc::as_ptr(&self.0);
let (addr, vtable): (usize, usize) = unsafe { std::mem::transmute(stream_ptr) };
let result = py.detach(move || {
let stream_ptr: *const dyn Stream<PyElement> =
unsafe { std::mem::transmute((addr, vtable)) };
let stream = unsafe { Rc::from_raw(stream_ptr) };
let result = stream.run(run_mode, run_for);
std::mem::forget(stream); result
});
result.to_pyresult()?;
Ok(())
}
fn peek_value(&self) -> Py<PyAny> {
self.0.peek_value().value()
}
fn collect(&self) -> PyStream {
let strm = self.0.collect().map(|items| {
Python::attach(move |py| {
let items = items
.iter()
.map(|item| item.value.as_ref().clone_ref(py))
.collect::<Vec<_>>();
PyElement::new(vec_any_to_pyany(items))
})
});
PyStream(strm)
}
fn dataframe(&self) -> PyStream {
let time_stream = self.0.clone().as_node().ticked_at_elapsed();
let zipped = ::wingfoil::bimap(
Dep::Active(self.0.clone()),
Dep::Active(time_stream),
|val: PyElement, time: ::wingfoil::NanoTime| {
Python::attach(|py| {
let time_secs: f64 = time.into();
let py_tuple = pyo3::types::PyTuple::new(
py,
&[
time_secs.into_pyobject(py).unwrap().into_any(),
val.value().into_bound(py),
],
)
.unwrap();
PyElement::new(py_tuple.into_any().unbind())
})
},
);
let strm = zipped.collect().map(|items| {
Python::attach(move |py| {
let items = items
.iter()
.map(|item| item.value.as_ref().clone_ref(py))
.collect::<Vec<_>>();
PyElement::new(vec_any_to_pyany(items))
})
});
PyStream(strm)
}
fn average(&self) -> PyStream {
self.extract::<f64>().average().as_py_stream()
}
fn buffer(&self, capacity: usize) -> PyStream {
let strm = self.0.buffer(capacity).map(|items| {
Python::attach(move |py| {
let items = items
.iter()
.map(|item| item.as_ref().clone_ref(py))
.collect::<Vec<_>>();
PyElement::new(vec_any_to_pyany(items))
})
});
PyStream(strm)
}
fn finally(&self, func: Py<PyAny>) -> PyNode {
let node = self.0.finally(|py_elmnt, _| {
Python::attach(move |py| {
let res = py_elmnt.as_ref().clone_ref(py);
let args = (res,);
func.call1(py, args).unwrap();
});
Ok(())
});
PyNode(node)
}
fn for_each(&self, func: Py<PyAny>) -> PyNode {
let node = self.0.for_each(move |py_elmnt, t| {
Python::attach(|py| {
let res = py_elmnt.as_ref().clone_ref(py);
let t: f64 = t.into();
let args = (res, t);
func.call1(py, args).unwrap();
});
});
PyNode(node)
}
fn inspect(&self, func: Py<PyAny>) -> PyStream {
let stream = self.0.inspect(move |x| {
Python::attach(|py| {
func.call1(py, (x.value(),)).unwrap();
});
});
PyStream(stream)
}
fn difference(&self) -> PyStream {
PyStream(self.0.difference())
}
fn delay(&self, delay_secs: f64) -> PyStream {
let delay = Duration::from_secs_f64(delay_secs);
PyStream(self.0.delay(delay))
}
fn distinct(&self) -> PyStream {
PyStream(self.0.distinct())
}
fn filter(&self, keep_func: Py<PyAny>) -> PyStream {
let keep = self.0.map(move |x| {
Python::attach(|py| {
keep_func
.call1(py, (x.value(),))
.unwrap()
.extract::<bool>(py)
.unwrap()
})
});
PyStream(self.0.filter(keep))
}
fn limit(&self, limit: u32) -> PyStream {
PyStream(self.0.limit(limit))
}
fn logged(&self, label: String) -> PyStream {
PyStream(self.0.logged(&label, Level::Info))
}
fn map(&self, func: Py<PyAny>) -> PyStream {
let stream = self.0.map(move |x| {
Python::attach(|py| {
let res = func.call1(py, (x.value(),)).unwrap();
PyElement::new(res)
})
});
PyStream(stream)
}
fn not(&self) -> PyStream {
PyStream(self.0.not())
}
fn sample(&self, trigger: Py<PyAny>) -> PyStream {
Python::attach(|py| {
let obj = trigger.as_ref();
if let Ok(node) = obj.extract::<PyRef<PyNode>>(py) {
return PyStream(self.0.sample(node.0.clone()));
}
if let Ok(stream) = obj.extract::<PyRef<PyStream>>(py) {
return PyStream(self.0.sample(stream.0.clone()));
}
panic!("Expected a PyNode or PyStream");
})
}
fn sum(&self) -> PyStream {
PyStream(self.0.sum())
}
fn count(&self) -> PyStream {
self.0.count().as_py_stream()
}
fn with_time(&self) -> PyStream {
let strm = self.0.with_time().map(|(t, v)| {
Python::attach(|py| {
let time_secs: f64 = t.into();
let py_tuple = pyo3::types::PyTuple::new(
py,
&[
time_secs.into_pyobject(py).unwrap().into_any(),
v.value().into_bound(py),
],
)
.unwrap();
PyElement::new(py_tuple.into_any().unbind())
})
});
PyStream(strm)
}
#[pyo3(signature = (host, port, table, columns))]
fn kdb_write(
&self,
host: String,
port: u16,
table: String,
columns: Vec<(String, String)>,
) -> PyResult<PyNode> {
let conn = ::wingfoil::adapters::kdb::KdbConnection::new(host, port);
let node = crate::py_kdb::py_kdb_write_inner(conn, table, columns, &self.0)?;
Ok(PyNode::new(node))
}
fn zmq_pub(&self, port: u16) -> PyNode {
PyNode::new(crate::py_zmq::py_zmq_pub_inner(&self.0, port))
}
}