use crate::py_element::PyElement;
use crate::py_stream::PyStream;
use pyo3::prelude::*;
use std::rc::Rc;
use wingfoil::adapters::zmq::{ZeroMqPub, ZmqStatus, zmq_sub};
use wingfoil::{Node, Stream, StreamOperators};
/// Subscribe to a ZeroMQ publisher at `address`.
///
/// Returns a `(data, status)` pair of streams. Each `data` tick is a Python
/// `list` of `bytes` objects and each `status` tick is the string
/// `"connected"` or `"disconnected"` (see `streams_to_py`).
///
/// # Errors
///
/// If `zmq_sub` fails, its error is rendered with `to_string()` and raised
/// as a Python `RuntimeError`.
#[pyfunction]
pub fn py_zmq_sub(address: String) -> PyResult<(PyStream, PyStream)> {
    match zmq_sub::<Vec<u8>>(address.as_str()) {
        Ok((data, status)) => Ok(streams_to_py(data, status)),
        Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
    }
}
/// Subscribe to a ZeroMQ publisher by `name`, using an etcd registry.
///
/// An `EtcdRegistry` is built from `endpoint` and handed to `zmq_sub`
/// together with `name`. NOTE(review): presumably the registry resolves
/// `name` to the publisher's address; confirm against the wingfoil adapter.
///
/// Returns the same `(data, status)` stream pair as `py_zmq_sub`.
///
/// # Errors
///
/// If `zmq_sub` fails, its error is rendered with `to_string()` and raised
/// as a Python `RuntimeError`.
#[cfg(feature = "etcd")]
#[pyfunction]
pub fn py_zmq_sub_etcd(name: String, endpoint: String) -> PyResult<(PyStream, PyStream)> {
    use wingfoil::adapters::zmq::EtcdRegistry;

    let registry = EtcdRegistry::new(endpoint);
    zmq_sub::<Vec<u8>>((name.as_str(), registry))
        .map(|(data, status)| streams_to_py(data, status))
        .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))
}
fn streams_to_py(
data: Rc<dyn wingfoil::Stream<wingfoil::Burst<Vec<u8>>>>,
status: Rc<dyn wingfoil::Stream<ZmqStatus>>,
) -> (PyStream, PyStream) {
let data_py = data.map(|burst| {
Python::attach(|py| {
let items: Vec<Py<PyAny>> = burst
.into_iter()
.map(|b| pyo3::types::PyBytes::new(py, &b).into_any().unbind())
.collect();
PyElement::new(
pyo3::types::PyList::new(py, items)
.unwrap()
.into_any()
.unbind(),
)
})
});
let status_py = status.map(|s| {
Python::attach(|py| {
let s_str = match s {
ZmqStatus::Connected => "connected",
ZmqStatus::Disconnected => "disconnected",
};
PyElement::new(s_str.into_pyobject(py).unwrap().into_any().unbind())
})
});
(PyStream(data_py), PyStream(status_py))
}
/// Attach a ZeroMQ publisher to a stream of Python values.
///
/// Every element of `stream` is extracted as `Vec<u8>` and the resulting byte
/// stream is handed to `zmq_pub(port, ())`. An element that cannot be
/// extracted is reported through `log::error!` and replaced by an empty
/// `Vec`, so a bad value does not panic.
///
/// Returns the node produced by `zmq_pub`.
/// NOTE(review): the `()` argument presumably means "no registry"; confirm
/// against the wingfoil adapter.
pub fn py_zmq_pub_inner(stream: &Rc<dyn Stream<PyElement>>, port: u16) -> Rc<dyn Node> {
    let bytes_stream: Rc<dyn Stream<Vec<u8>>> = stream.map(|elem| {
        Python::attach(|py| match elem.as_ref().extract::<Vec<u8>>(py) {
            Ok(payload) => payload,
            Err(e) => {
                log::error!("zmq_pub: stream value is not bytes: {e}");
                Vec::new()
            }
        })
    });
    bytes_stream.zmq_pub(port, ())
}
/// Attach a ZeroMQ publisher to a stream of Python values, passing an etcd
/// registry entry to the adapter.
///
/// Elements of `stream` are converted to bytes by `py_bytes_stream` (log
/// label `"zmq_pub_etcd"`), then `zmq_pub` is called with `port` and the pair
/// `(name, EtcdRegistry::new(endpoint))`.
/// NOTE(review): presumably the adapter registers the publisher under `name`
/// in etcd; confirm against the wingfoil adapter.
///
/// Returns the node produced by `zmq_pub`.
#[cfg(feature = "etcd")]
pub fn py_zmq_pub_etcd_inner(
    stream: &Rc<dyn Stream<PyElement>>,
    name: String,
    port: u16,
    endpoint: String,
) -> Rc<dyn Node> {
    use wingfoil::adapters::zmq::EtcdRegistry;

    // Build the registry before the byte stream, as the original did.
    let discovery = (name.as_str(), EtcdRegistry::new(endpoint));
    py_bytes_stream(stream, "zmq_pub_etcd").zmq_pub(port, discovery)
}
/// Same as `py_zmq_pub_etcd_inner`, but also forwards an explicit `address`
/// to the adapter through `zmq_pub_on`.
///
/// Elements of `stream` are converted to bytes by `py_bytes_stream` (log
/// label `"zmq_pub_etcd_on"`), then `zmq_pub_on` is called with `&address`,
/// `port` and the pair `(name, EtcdRegistry::new(endpoint))`.
/// NOTE(review): how `address` is used (bind vs. advertised address) is not
/// visible here; confirm against the wingfoil adapter.
///
/// Returns the node produced by `zmq_pub_on`.
#[cfg(feature = "etcd")]
pub fn py_zmq_pub_etcd_on_inner(
    stream: &Rc<dyn Stream<PyElement>>,
    name: String,
    address: String,
    port: u16,
    endpoint: String,
) -> Rc<dyn Node> {
    use wingfoil::adapters::zmq::EtcdRegistry;

    // Build the registry before the byte stream, as the original did.
    let discovery = (name.as_str(), EtcdRegistry::new(endpoint));
    py_bytes_stream(stream, "zmq_pub_etcd_on").zmq_pub_on(&address, port, discovery)
}
/// Map a stream of Python values to a stream of raw byte vectors.
///
/// Each element is extracted as `Vec<u8>`. When extraction fails, the error
/// is logged with `log::error!`, prefixed by `label` so the caller can be
/// identified, and an empty `Vec` is emitted in its place.
#[cfg(feature = "etcd")]
fn py_bytes_stream(
    stream: &Rc<dyn Stream<PyElement>>,
    label: &'static str,
) -> Rc<dyn Stream<Vec<u8>>> {
    stream.map(move |elem| {
        Python::attach(|py| match elem.as_ref().extract::<Vec<u8>>(py) {
            Ok(payload) => payload,
            Err(e) => {
                log::error!("{label}: stream value is not bytes: {e}");
                Vec::new()
            }
        })
    })
}