use crate::py_element::PyElement;
use crate::py_stream::PyStream;
use pyo3::prelude::*;
use std::rc::Rc;
use wingfoil::adapters::zmq::{ZeroMqPub, ZmqStatus, zmq_sub};
use wingfoil::{Node, Stream, StreamOperators};
/// Subscribes to the ZeroMQ endpoint at `address` and wraps the result for Python.
///
/// Calls `zmq_sub::<Vec<u8>>(&address)` and returns its two streams as a
/// `(data, status)` pair:
///
/// * `data` - every burst is converted into a Python `list` holding one
///   `bytes` object per item of the burst.
/// * `status` - every [`ZmqStatus`] is converted into the Python string
///   `"connected"` or `"disconnected"`.
///
/// # Panics
///
/// Panics if the Python list or the Python string cannot be created.
#[pyfunction]
pub fn py_zmq_sub(address: String) -> (PyStream, PyStream) {
    use pyo3::types::{PyBytes, PyList};

    let (raw_data, raw_status) = zmq_sub::<Vec<u8>>(&address);

    // Payload side: burst of byte buffers -> Python list of `bytes`.
    let data_stream = raw_data.map(|burst| {
        Python::attach(|py| {
            let payloads: Vec<Py<PyAny>> = burst
                .into_iter()
                .map(|msg| PyBytes::new(py, &msg).into_any().unbind())
                .collect();
            let list = PyList::new(py, payloads).unwrap();
            PyElement::new(list.into_any().unbind())
        })
    });

    // Status side: connection state -> Python string.
    let status_stream = raw_status.map(|state| {
        // Picking the label needs no interpreter access, so do it up front.
        let label = match state {
            ZmqStatus::Connected => "connected",
            ZmqStatus::Disconnected => "disconnected",
        };
        Python::attach(|py| PyElement::new(label.into_pyobject(py).unwrap().into_any().unbind()))
    });

    (PyStream(data_stream), PyStream(status_stream))
}
/// Publishes a stream of Python values over ZeroMQ on `port`.
///
/// Each element of `stream` is extracted as a `Vec<u8>` and the resulting
/// byte stream is handed to `zmq_pub(port)`; the node it returns is the
/// return value of this function.
///
/// If an element cannot be extracted as `Vec<u8>`, the failure is logged via
/// `log::error!` and an empty byte vector takes the element's place in the
/// byte stream.
pub fn py_zmq_pub_inner(stream: &Rc<dyn Stream<PyElement>>, port: u16) -> Rc<dyn Node> {
    let payloads: Rc<dyn Stream<Vec<u8>>> = stream.map(|elem| {
        Python::attach(|py| match elem.as_ref().extract::<Vec<u8>>(py) {
            Ok(bytes) => bytes,
            Err(e) => {
                // Best-effort: report the bad value and fall back to an empty payload.
                log::error!("zmq_pub: stream value is not bytes: {e}");
                Vec::new()
            }
        })
    });

    payloads.zmq_pub(port)
}