use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(unix)]
use tracing::{debug, info, warn};
// Fallback logging shims for non-Unix targets, where this file does not import
// the `tracing` macros. Every shim accepts arbitrary tokens so that call sites
// written for `tracing` (including structured fields such as
// `client_id = %id, "message"`) still compile.

// No-op replacement for `debug!`; the arguments are discarded unevaluated.
#[cfg(not(unix))]
macro_rules! debug {
    ($($args:tt)*) => {};
}

// No-op replacement for `info!`; the arguments are discarded unevaluated.
#[cfg(not(unix))]
macro_rules! info {
    ($($args:tt)*) => {};
}

// Replacement for `warn!` that writes to stderr with a `[WARN]` prefix.
#[cfg(not(unix))]
macro_rules! warn {
    // `format!`-style call: the first token is the format string literal.
    ($fmt:literal $($rest:tt)*) => {
        eprintln!("[WARN] {}", format!($fmt $($rest)*))
    };
    // `tracing`-style call with structured fields. `format!` cannot parse that
    // syntax, so the raw tokens are printed instead; the field expressions are
    // not evaluated.
    ($($args:tt)*) => {
        eprintln!("[WARN] {}", stringify!($($args)*))
    };
}

// No-op replacement for `trace!`; the arguments are discarded unevaluated.
#[cfg(not(unix))]
macro_rules! trace {
    ($($args:tt)*) => {};
}

// Replacement for `error!` that writes to stderr with an `[ERROR]` prefix.
#[cfg(not(unix))]
macro_rules! error {
    // `format!`-style call: the first token is the format string literal.
    ($fmt:literal $($rest:tt)*) => {
        eprintln!("[ERROR] {}", format!($fmt $($rest)*))
    };
    // `tracing`-style call with structured fields: print the raw tokens.
    ($($args:tt)*) => {
        eprintln!("[ERROR] {}", stringify!($($args)*))
    };
}

// No-op function-like stand-in for `instrument`; expands to nothing.
#[cfg(not(unix))]
macro_rules! instrument {
    ($($args:tt)*) => {};
}
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
use warp::Filter;
use crate::ev_formats::streaming::Event;
/// A batch of events, stored as a plain growable vector of `Event`.
type Events = Vec<Event>;
use polars::prelude::*;
/// Configuration for the event visualization web server.
///
/// Derives serde's `Serialize`/`Deserialize` in addition to `Debug` and `Clone`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebServerConfig {
/// TCP port the server listens on.
pub port: u16,
/// Host name or address of the server, as reported in log output and in the
/// URL built by the Python wrapper.
pub host: String,
/// NOTE(review): presumably the maximum number of simultaneous clients; no
/// code visible alongside this definition reads it — confirm where it is enforced.
pub max_clients: usize,
/// NOTE(review): presumably the number of events per outgoing batch; not read
/// by the visible code — confirm against the batching logic.
pub event_batch_size: usize,
/// NOTE(review): presumably the interval between batches in milliseconds (per
/// the field name); not read by the visible code — confirm.
pub batch_interval_ms: u64,
}
impl Default for WebServerConfig {
    /// Returns the stock configuration: host `127.0.0.1`, port 3030, 100 clients,
    /// an event batch size of 1000 and a batch interval of 16.
    fn default() -> Self {
        let host = String::from("127.0.0.1");
        Self {
            port: 3030,
            host,
            max_clients: 100,
            event_batch_size: 1000,
            batch_interval_ms: 16,
        }
    }
}
/// A connected WebSocket client as tracked by the broadcaster.
#[derive(Debug)]
pub struct WebSocketClient {
/// Identifier of the connection; used as the key under which the client is registered.
pub id: Uuid,
/// Sending half of an unbounded channel carrying the WebSocket messages
/// destined for this client.
pub tx: tokio::sync::mpsc::UnboundedSender<Message>,
}
/// Buffers incoming events and sends serialized batches to every registered client.
pub struct EventBroadcaster {
// Registered clients, keyed by their id.
clients: HashMap<Uuid, WebSocketClient>,
// Events accumulated since the last batch was flushed.
event_buffer: Events,
}
impl Default for EventBroadcaster {
fn default() -> Self {
Self::new()
}
}
impl EventBroadcaster {
    /// Creates a broadcaster with no clients and a pre-allocated, empty event buffer.
    pub fn new() -> Self {
        Self {
            clients: HashMap::new(),
            event_buffer: Events::with_capacity(10000),
        }
    }

    /// Registers `client` under its id, replacing any existing entry with the same id.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    pub fn add_client(&mut self, client: WebSocketClient) -> Result<(), String> {
        self.clients.insert(client.id, client);
        Ok(())
    }

    /// Unregisters the client with the given id; unknown ids are ignored.
    pub fn remove_client(&mut self, id: &Uuid) {
        self.clients.remove(id);
    }

    /// Buffers `events` and, once a flush threshold is reached, sends the
    /// buffered batch to all registered clients.
    pub async fn broadcast_events(&mut self, events: Events) {
        self.broadcast_events_impl(events).await;
    }

    /// Collects `df`, converts its rows to events and broadcasts them like
    /// [`EventBroadcaster::broadcast_events`].
    ///
    /// # Errors
    ///
    /// Returns the `PolarsError` produced while converting the frame to events.
    pub async fn broadcast_events_from_dataframe(
        &mut self,
        df: LazyFrame,
    ) -> Result<(), PolarsError> {
        let events = dataframe_to_events_for_visualization(df)?;
        self.broadcast_events_impl(events).await;
        Ok(())
    }

    /// Appends `events` to the buffer and flushes it when it holds at least 100
    /// events, or at least 20 events while at least one client is registered.
    ///
    /// A flush serializes the whole buffer into one binary message and sends it
    /// to every client; clients whose channel rejects the message are removed.
    /// With no clients registered the flushed batch is simply dropped.
    async fn broadcast_events_impl(&mut self, events: Events) {
        let event_count = events.len();
        self.event_buffer.extend(events);

        let buffered = self.event_buffer.len();
        let should_flush = buffered >= 100 || (!self.clients.is_empty() && buffered >= 20);
        if !should_flush {
            // NOTE(review): this branch is only reached with fewer than 100
            // buffered events, so a non-zero multiple of 100 can never occur
            // and the log below is currently unreachable. Kept as-is pending a
            // decision on the intended logging cadence.
            if buffered % 100 == 0 && buffered != 0 {
                debug!(
                    event_count = event_count,
                    total = buffered,
                    clients = self.clients.len(),
                    "Buffering events"
                );
            }
            return;
        }

        // Both flush thresholds require at least 20 buffered events, so the
        // batch taken here is never empty.
        let batch = std::mem::replace(&mut self.event_buffer, Events::with_capacity(5000));
        let message = Self::serialize_events(&batch);
        if batch.len() > 50 {
            debug!(
                batch_len = batch.len(),
                message_len = message.len(),
                clients_len = self.clients.len(),
                "Broadcasting events to clients"
            );
        }

        // Send to every client and drop, in the same pass, those whose channel
        // can no longer accept messages.
        self.clients.retain(|id, client| {
            match client.tx.send(Message::binary(message.clone())) {
                Ok(()) => {
                    debug!(client_id = %id, "Message sent to client");
                    true
                }
                Err(e) => {
                    warn!(client_id = %id, error = ?e, "Failed to send message to client");
                    false
                }
            }
        });
    }

    /// Encodes `events` into the binary message sent to clients.
    ///
    /// Layout, all integers little-endian:
    /// - 1 byte with the value `1` (presumably a message-type or version tag — confirm),
    /// - `u64` timestamp of the first event (0 when `events` is empty),
    /// - `u32` event count,
    /// - per event: `x`, `y`, a `u64` timestamp and one polarity byte (1 or 0).
    ///
    /// Timestamps are `t * 1_000_000` truncated to `u64` (presumably seconds to
    /// microseconds — confirm the unit of `Event::t`).
    fn serialize_events(events: &[Event]) -> Vec<u8> {
        // 13 header bytes, plus 13 bytes per event assuming `x` and `y` are
        // 16-bit; the capacity is only a hint, so a different width stays correct.
        let mut buffer = Vec::with_capacity(13 + events.len() * 13);
        buffer.push(1u8);

        let first_timestamp_us = events
            .first()
            .map_or(0u64, |first_event| (first_event.t * 1_000_000.0) as u64);
        buffer.extend_from_slice(&first_timestamp_us.to_le_bytes());
        buffer.extend_from_slice(&(events.len() as u32).to_le_bytes());

        for event in events {
            buffer.extend_from_slice(&event.x.to_le_bytes());
            buffer.extend_from_slice(&event.y.to_le_bytes());
            let timestamp_us = (event.t * 1_000_000.0) as u64;
            buffer.extend_from_slice(&timestamp_us.to_le_bytes());
            // NOTE(review): this comparison treats `polarity` as numeric, while
            // other code in this file builds `Event` with a boolean polarity —
            // confirm the field type against the `Event` definition.
            let polarity = if event.polarity > 0 { 1u8 } else { 0u8 };
            buffer.push(polarity);
        }
        buffer
    }
}
/// Web server that serves the visualization page and a WebSocket endpoint
/// backed by a shared [`EventBroadcaster`].
pub struct EventWebServer {
// Server settings supplied at construction time.
config: WebServerConfig,
// Broadcaster shared between the server's connection handlers and any
// producer obtained through `broadcaster()`.
broadcaster: Arc<Mutex<EventBroadcaster>>,
}
impl EventWebServer {
pub fn new(config: WebServerConfig) -> Self {
Self {
config,
broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())),
}
}
pub async fn run(&self) {
let broadcaster = self.broadcaster.clone();
let websocket_route = warp::path("ws")
.and(warp::ws())
.and(warp::any().map(move || broadcaster.clone()))
.map(
|ws: warp::ws::Ws, broadcaster: Arc<Mutex<EventBroadcaster>>| {
ws.on_upgrade(move |socket| handle_websocket(socket, broadcaster))
},
);
let static_route = warp::path::end()
.and(warp::get())
.map(|| warp::reply::html(include_str!("../../static/index.html")));
let routes = websocket_route.or(static_route);
info!(
host = %self.config.host,
port = self.config.port,
"WebSocket server listening"
);
warp::serve(routes)
.run(([127, 0, 0, 1], self.config.port))
.await;
}
pub fn broadcaster(&self) -> Arc<Mutex<EventBroadcaster>> {
self.broadcaster.clone()
}
}
async fn handle_websocket(ws: WebSocket, broadcaster: Arc<Mutex<EventBroadcaster>>) {
let client_id = Uuid::new_v4();
let (mut ws_tx, mut ws_rx) = ws.split();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let client = WebSocketClient {
id: client_id,
tx: tx.clone(),
};
broadcaster.lock().await.add_client(client).unwrap();
info!(
client_id = %client_id,
total = broadcaster.lock().await.clients.len(),
"Client connected"
);
let mut send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if ws_tx.send(msg).await.is_err() {
break;
}
}
});
let mut recv_task = tokio::spawn(async move {
while let Some(result) = ws_rx.next().await {
match result {
Ok(msg) => {
if msg.is_close() {
break;
}
}
Err(_) => break,
}
}
});
tokio::select! {
_ = &mut send_task => recv_task.abort(),
_ = &mut recv_task => send_task.abort(),
}
broadcaster.lock().await.remove_client(&client_id);
info!(
client_id = %client_id,
remaining = broadcaster.lock().await.clients.len(),
"Client disconnected"
);
}
fn dataframe_to_events_for_visualization(df: LazyFrame) -> Result<Events, PolarsError> {
let df = df.collect()?;
let x_series = df.column("x")?;
let y_series = df.column("y")?;
let t_series = df.column("t")?;
let polarity_series = df.column("polarity")?;
let x_values = x_series.i64()?.into_no_null_iter().collect::<Vec<_>>();
let y_values = y_series.i64()?.into_no_null_iter().collect::<Vec<_>>();
let t_values = t_series.f64()?.into_no_null_iter().collect::<Vec<_>>();
let polarity_values = polarity_series
.i64()?
.into_no_null_iter()
.collect::<Vec<_>>();
let events = x_values
.into_iter()
.zip(y_values)
.zip(t_values)
.zip(polarity_values)
.map(|(((x, y), t), p)| Event {
x: x as u16,
y: y as u16,
t,
polarity: p > 0,
})
.collect();
Ok(events)
}
pub mod python {
use super::*;
use crate::from_numpy_arrays;
use numpy::PyReadonlyArray1;
use pyo3::prelude::*;
/// Python-facing wrapper around the Rust `WebServerConfig`.
#[pyclass]
#[derive(Clone)]
pub struct PyWebServerConfig {
// The wrapped Rust configuration.
pub inner: WebServerConfig,
}
#[pymethods]
impl PyWebServerConfig {
#[new]
#[pyo3(signature = (
port = None,
host = None,
max_clients = None,
event_batch_size = None,
batch_interval_ms = None
))]
pub fn new(
port: Option<u16>,
host: Option<String>,
max_clients: Option<usize>,
event_batch_size: Option<usize>,
batch_interval_ms: Option<u64>,
) -> Self {
let mut config = WebServerConfig::default();
if let Some(p) = port {
config.port = p;
}
if let Some(h) = host {
config.host = h;
}
if let Some(mc) = max_clients {
config.max_clients = mc;
}
if let Some(ebs) = event_batch_size {
config.event_batch_size = ebs;
}
if let Some(bi) = batch_interval_ms {
config.batch_interval_ms = bi;
}
Self { inner: config }
}
#[getter]
pub fn port(&self) -> u16 {
self.inner.port
}
#[getter]
pub fn host(&self) -> String {
self.inner.host.clone()
}
}
/// Python-facing handle to an `EventWebServer` together with the tokio runtime
/// used to drive it.
#[pyclass]
pub struct PyEventWebServer {
// The wrapped server; also the source of the broadcaster used to send events.
server: EventWebServer,
// Runtime used to run the server and to send events; `None` when creating
// the runtime failed at construction time.
runtime: Option<tokio::runtime::Runtime>,
}
#[pymethods]
impl PyEventWebServer {
#[new]
pub fn new(config: &PyWebServerConfig) -> Self {
let server = EventWebServer::new(config.inner.clone());
let runtime = tokio::runtime::Runtime::new().ok();
Self { server, runtime }
}
pub fn run(&mut self) -> PyResult<()> {
if let Some(ref runtime) = self.runtime {
runtime.block_on(self.server.run());
Ok(())
} else {
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Failed to create tokio runtime",
))
}
}
pub fn start(&mut self) -> PyResult<()> {
if let Some(ref runtime) = self.runtime {
let server = EventWebServer::new(self.server.config.clone());
runtime.spawn(async move {
server.run().await;
});
Ok(())
} else {
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Failed to create tokio runtime",
))
}
}
pub fn send_events(
&self,
xs: PyReadonlyArray1<i64>,
ys: PyReadonlyArray1<i64>,
ts: PyReadonlyArray1<f64>,
ps: PyReadonlyArray1<i64>,
) -> PyResult<()> {
let events = from_numpy_arrays(xs, ys, ts, ps);
if let Some(ref runtime) = self.runtime {
let broadcaster = self.server.broadcaster();
runtime.block_on(async {
broadcaster.lock().await.broadcast_events(events).await;
});
Ok(())
} else {
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Runtime not available",
))
}
}
pub fn get_url(&self) -> String {
format!(
"http://{host}:{port}",
host = self.server.config.host,
port = self.server.config.port
)
}
}
#[pyfunction]
pub fn create_web_server(config: Option<&PyWebServerConfig>) -> PyEventWebServer {
let config = config.map(|c| c.inner.clone()).unwrap_or_default();
let server = EventWebServer::new(config);
let runtime = tokio::runtime::Runtime::new().ok();
PyEventWebServer { server, runtime }
}
/// Returns a configuration wrapper holding `WebServerConfig::default()`.
#[pyfunction]
pub fn create_web_server_config() -> PyWebServerConfig {
    let inner = WebServerConfig::default();
    PyWebServerConfig { inner }
}
}