use crate::py_element::PyElement;
use crate::py_stream::PyStream;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyList};
use std::rc::Rc;
use std::time::Duration;
use wingfoil::adapters::etcd::{EtcdConnection, EtcdEntry, EtcdEventKind, etcd_pub, etcd_sub};
use wingfoil::{Burst, Node, Stream, StreamOperators};
#[pyfunction]
pub fn py_etcd_sub(endpoint: String, prefix: String) -> PyStream {
let conn = EtcdConnection::new(endpoint);
let stream = etcd_sub(conn, prefix);
let py_stream = stream.map(|burst| {
Python::attach(|py| {
let items: Vec<Py<PyAny>> = burst
.into_iter()
.map(|event| {
let dict = PyDict::new(py);
let kind_str = match event.kind {
EtcdEventKind::Put => "put",
EtcdEventKind::Delete => "delete",
};
dict.set_item("kind", kind_str).unwrap();
dict.set_item("key", &event.entry.key).unwrap();
dict.set_item("value", PyBytes::new(py, &event.entry.value))
.unwrap();
dict.set_item("revision", event.revision).unwrap();
dict.into_any().unbind()
})
.collect();
PyElement::new(PyList::new(py, items).unwrap().into_any().unbind())
})
});
PyStream(py_stream)
}
pub fn py_etcd_pub_inner(
stream: &Rc<dyn Stream<PyElement>>,
endpoint: String,
lease_ttl: Option<f64>,
force: bool,
) -> Rc<dyn Node> {
let conn = EtcdConnection::new(endpoint);
let lease_duration = lease_ttl.map(Duration::from_secs_f64);
let burst_stream: Rc<dyn Stream<Burst<EtcdEntry>>> = stream.map(move |elem| {
Python::attach(|py| {
let obj = elem.as_ref().bind(py);
if let Ok(dict) = obj.cast_exact::<PyDict>() {
std::iter::once(dict_to_entry(dict)).collect()
} else if let Ok(list) = obj.cast_exact::<PyList>() {
list.iter()
.filter_map(|item| item.cast_exact::<PyDict>().ok().map(|d| dict_to_entry(d)))
.collect()
} else {
log::error!("etcd_pub: stream value must be a dict or list of dicts");
Burst::new()
}
})
});
etcd_pub(conn, &burst_stream, lease_duration, force)
}
fn dict_to_entry(dict: &Bound<'_, PyDict>) -> EtcdEntry {
let key = dict
.get_item("key")
.ok()
.flatten()
.and_then(|v| v.extract::<String>().ok())
.unwrap_or_default();
let value = dict
.get_item("value")
.ok()
.flatten()
.and_then(|v| v.extract::<Vec<u8>>().ok())
.unwrap_or_default();
EtcdEntry { key, value }
}