use crate::websocket::InverseWebSocketClient;
use std::sync::Arc;
use tokio::sync::{ broadcast, oneshot, Mutex };
use tokio::task::JoinHandle;
use std::time::SystemTime;
use serde::{ Deserialize, Serialize };
/// A single event, as (de)serialized with serde.
///
/// The payload in [`EventData`] is flattened, so its `name` tag and `data`
/// content sit in the same object as `category`, `level` and `message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Variant-specific payload, flattened into the enclosing object.
    #[serde(flatten)]
    pub data: EventData,
    /// Category this event is filed under.
    pub category: Category,
    /// Level attached to this event.
    pub level: Level,
    /// Optional message text carried with the event.
    pub message: Option<String>,
}
/// Category an [`Event`] is filed under.
///
/// Variant names are (de)serialized in kebab-case (e.g. `System` is `"system"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Category {
    System,
    Input,
    Session,
    Device,
    Module,
    Feature,
}
/// Level attached to an [`Event`].
///
/// Variant names are (de)serialized in lowercase (e.g. `Warning` is `"warning"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Level {
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Panic,
}
/// Variant-specific payload of an [`Event`].
///
/// Adjacently tagged: the variant is identified by the `name` key (in
/// kebab-case) and its fields live under the `data` key.
///
/// NOTE(review): `rename_all` on the enum renames the variants only; the
/// fields inside each variant keep their snake_case names when serialized.
/// Confirm that this matches the wire format produced by the peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "name", content = "data", rename_all = "kebab-case")]
pub enum EventData {
    /// Payload of the `invalid-io-channel` event.
    InvalidIoChannel {
        received_data: String,
    },
    /// Payload of the `command-deprecated` event.
    CommandDeprecated {
        command: String,
    },
    /// Payload of the `device-connected` event.
    DeviceConnected {
        device_family: String,
        device_id: String,
    },
    /// Payload of the `device-disconnected` event.
    DeviceDisconnected {
        device_family: String,
        device_id: String,
    },
    /// Payload of the `system-rate-report` event.
    SystemRateReport {
        min_frequency: u32,
        max_frequency: u32,
        mean_frequency: u32,
        median_frequency: u32,
    },
    /// Payload of the `device-sleep` event.
    DeviceSleep {
        device_family: String,
        device_id: String,
    },
    /// Payload of the `invalid-value` event.
    InvalidValue {
        field: String,
        reason: String,
        context: String,
    },
}
impl Event {
    /// Returns the kebab-case name of this event's payload variant.
    ///
    /// The match is exhaustive on purpose: adding a variant to [`EventData`]
    /// without a name here is a compile error.
    pub fn name(&self) -> &'static str {
        let Self { data: payload, .. } = self;
        match payload {
            EventData::InvalidIoChannel { .. } => "invalid-io-channel",
            EventData::CommandDeprecated { .. } => "command-deprecated",
            EventData::DeviceConnected { .. } => "device-connected",
            EventData::DeviceDisconnected { .. } => "device-disconnected",
            EventData::SystemRateReport { .. } => "system-rate-report",
            EventData::DeviceSleep { .. } => "device-sleep",
            EventData::InvalidValue { .. } => "invalid-value",
        }
    }
}
/// An [`Event`] paired with a local timestamp.
#[derive(Clone, Debug)]
pub struct TimestampedEvent {
    /// Time associated with the event.
    pub timestamp: SystemTime,
    /// The event itself.
    pub event: Event,
}
/// Receives events over a WebSocket connection and makes them available both
/// as a buffered list and through a broadcast channel.
pub struct EventChannel {
    /// WebSocket client handle kept by the channel.
    pub ws_client: InverseWebSocketClient,
    /// Buffer of received events, shared with the listener task.
    events: Arc<Mutex<Vec<TimestampedEvent>>>,
    /// Sender side of the broadcast channel used to create subscriptions.
    event_tx: broadcast::Sender<TimestampedEvent>,
    /// Handle of the background listener task; `None` once it has been taken.
    listen_task: Option<JoinHandle<()>>,
    /// Sender half of the listener's stop channel; `None` once it has been taken.
    listen_stop_tx: Option<oneshot::Sender<()>>,
}
impl EventChannel {
    /// Capacity of the broadcast channel that fans events out to subscribers.
    const BROADCAST_CAPACITY: usize = 128;

    /// Connects to `ws_url` and spawns the background listener task.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the WebSocket client's `connect`.
    pub async fn new(ws_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let mut ws_client = InverseWebSocketClient::new(ws_url);
        ws_client.connect().await?;
        // The struct keeps a clone; the connected original moves into the listener task.
        let ws_client_for_struct = ws_client.clone();

        let (event_tx, _initial_rx) = broadcast::channel(Self::BROADCAST_CAPACITY);
        let events = Arc::new(Mutex::new(Vec::new()));
        let (stop_tx, stop_rx) = oneshot::channel();

        let listen_task =
            Self::spawn_listener(ws_client, Arc::clone(&events), event_tx.clone(), stop_rx);

        Ok(Self {
            ws_client: ws_client_for_struct,
            events,
            event_tx,
            listen_task: Some(listen_task),
            listen_stop_tx: Some(stop_tx),
        })
    }

    /// Stops the listener task, waits for it to finish, then disconnects.
    ///
    /// A listener task that ended abnormally (for example by panicking) is
    /// logged as a warning rather than silently ignored; the disconnect is
    /// still attempted afterwards.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the WebSocket client's `disconnect`.
    pub async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Dropping the sender closes the stop channel without sending a value.
        // NOTE(review): this presumes `listen` returns once `stop_rx` resolves,
        // including on a closed channel; otherwise the await below never
        // completes. Confirm against `InverseWebSocketClient::listen`.
        drop(self.listen_stop_tx.take());

        if let Some(handle) = self.listen_task.take() {
            if let Err(join_err) = handle.await {
                log::warn!("Event listener task did not exit cleanly: {}", join_err);
            }
        }

        self.ws_client.disconnect().await
    }

    /// Creates a new receiver on the broadcast channel.
    ///
    /// A receiver only observes events sent after it was created.
    pub fn subscribe(&self) -> broadcast::Receiver<TimestampedEvent> {
        self.event_tx.subscribe()
    }

    /// Takes all buffered events, leaving the buffer empty.
    ///
    /// The buffer is only emptied by this method, so it grows for as long as
    /// events arrive and nobody calls it.
    pub async fn read_events(&self) -> Vec<TimestampedEvent> {
        let mut buffered = self.events.lock().await;
        std::mem::take(&mut *buffered)
    }

    /// Spawns the task that runs `ws_client.listen` with a per-message handler.
    ///
    /// Each message that parses as an [`Event`] is timestamped, appended to
    /// `events` and sent on `tx`. Messages that fail to parse are logged at
    /// trace level and dropped.
    fn spawn_listener(
        mut ws_client: InverseWebSocketClient,
        events: Arc<Mutex<Vec<TimestampedEvent>>>,
        tx: broadcast::Sender<TimestampedEvent>,
        stop_rx: oneshot::Receiver<()>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            ws_client
                .listen(
                    move |message| {
                        // Each invocation gets its own handles so the returned
                        // future does not borrow from the closure.
                        let buffer = Arc::clone(&events);
                        let sender = tx.clone();
                        async move {
                            let event = match serde_json::from_str::<Event>(&message) {
                                Ok(event) => event,
                                Err(e) => {
                                    log::trace!(
                                        "Failed to parse event from WebSocket message: {}. Message was: {}",
                                        e,
                                        message
                                    );
                                    return;
                                }
                            };
                            let stamped = TimestampedEvent {
                                timestamp: SystemTime::now(),
                                event,
                            };
                            buffer.lock().await.push(stamped.clone());
                            // `send` fails when there are no receivers; the event
                            // is already buffered, so that is not an error here.
                            let _ = sender.send(stamped);
                        }
                    },
                    stop_rx,
                )
                .await;
        })
    }
}
impl Drop for EventChannel {
    /// Releases the stop sender and aborts the listener task if either is
    /// still held (i.e. was not already taken by `shutdown`).
    fn drop(&mut self) {
        // Overwriting with `None` drops any sender still stored here.
        self.listen_stop_tx = None;

        // `drop` cannot await the task, so abort it instead.
        if let Some(task) = self.listen_task.take() {
            task.abort();
        }
    }
}