use std::time::{SystemTime, UNIX_EPOCH};
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use tokio::runtime::Runtime;
use crate::client::{ClientConfig, WireBandClient};
use crate::frame;
fn unix_ts() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
#[pyclass(name = "WireBandClient")]
pub struct PyWireBandClient {
client: WireBandClient,
runtime: Runtime,
}
#[pymethods]
impl PyWireBandClient {
#[new]
#[pyo3(signature = (
backend_url,
device_id,
api_key = None,
buffer_size = 50_000,
flush_batch = 200,
flush_interval_secs = 1.0,
))]
fn new(
backend_url: String,
device_id: String,
api_key: Option<String>,
buffer_size: usize,
flush_batch: usize,
flush_interval_secs: f64,
) -> PyResult<Self> {
let runtime = Runtime::new()
.map_err(|e| PyRuntimeError::new_err(format!("tokio runtime: {e}")))?;
let config = ClientConfig {
backend_url,
device_id,
api_key,
buffer_size,
flush_batch,
flush_interval: std::time::Duration::from_secs_f64(flush_interval_secs),
..Default::default()
};
let client = WireBandClient::new(config);
runtime.block_on(async { client.start() });
Ok(Self { client, runtime })
}
fn buffer_event(
&self,
py: Python<'_>,
topic: String,
symbol: u16,
data: String,
) -> PyResult<()> {
let payload: serde_json::Value = serde_json::from_str(&data)
.map_err(|e| PyValueError::new_err(format!("invalid JSON: {e}")))?;
let encoded = frame::encode(symbol, &topic, &payload);
let ts = unix_ts();
py.allow_threads(|| {
self.runtime.block_on(self.client.buffer_event(topic, symbol, encoded, ts));
});
Ok(())
}
fn ingest(
&self,
py: Python<'_>,
data: String,
topic: String,
symbol: Option<u16>,
) -> PyResult<()> {
let value: serde_json::Value = serde_json::from_str(&data)
.map_err(|e| PyValueError::new_err(format!("invalid JSON: {e}")))?;
py.allow_threads(|| {
self.runtime.block_on(self.client.ingest(value, &topic, symbol));
});
Ok(())
}
fn flush(&self, py: Python<'_>) -> u64 {
py.allow_threads(|| self.runtime.block_on(self.client.flush()))
}
fn buffer_depth(&self) -> usize {
self.client.buffer_depth()
}
fn stats<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
let s = self.runtime.block_on(self.client.stats());
let dict = PyDict::new_bound(py);
dict.set_item("events_ingested", s.events_ingested)?;
dict.set_item("events_flushed", s.events_flushed)?;
dict.set_item("events_dropped", s.events_dropped)?;
dict.set_item("flush_errors", s.flush_errors)?;
dict.set_item("bytes_sent", s.bytes_sent)?;
Ok(dict)
}
fn __repr__(&self) -> String {
let cfg = self.client.config();
format!(
"WireBandClient(device_id={:?}, backend_url={:?}, buffer_depth={})",
cfg.device_id,
cfg.backend_url,
self.client.buffer_depth()
)
}
}
impl Drop for PyWireBandClient {
fn drop(&mut self) {
self.runtime.block_on(self.client.stop());
}
}
#[pymodule]
fn wireband_edge_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyWireBandClient>()?;
Ok(())
}