use crate::websocket::InverseWebSocketClient;
use std::sync::Arc;
use tokio::sync::{ Mutex, broadcast };
use std::time::SystemTime;
use serde::{ Deserialize, Serialize };
/// A single event message, as parsed from the JSON text of a WebSocket message.
///
/// Because `data` is flattened, the `name` and `data` keys of [`EventData`]
/// sit at the same JSON level as `category`, `level` and `message`.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Event {
/// Event-specific payload; the variant is selected by the `name` key.
#[serde(flatten)]
pub data: EventData,
/// Category the event is filed under.
pub category: Category,
/// Severity level attached to the event.
pub level: Level,
/// Optional message text accompanying the event.
pub message: Option<String>,
}
/// Category an [`Event`] is filed under.
///
/// Variants are (de)serialized in kebab-case, e.g. `System` <-> `"system"`.
// NOTE(review): the meaning of each category is defined by the sender and is
// not visible in this file.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum Category {
System,
Input,
Session,
Device,
Module,
Feature,
}
/// Severity level of an [`Event`].
///
/// Variants are (de)serialized in lowercase, e.g. `Warning` <-> `"warning"`.
// NOTE(review): the variants look like they are listed in increasing severity,
// but no ordering trait is derived, so levels cannot be compared with `<`/`>`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Level {
Info,
Notice,
Warning,
Error,
Critical,
Panic,
}
/// Event-specific payload, selected by the event name.
///
/// Adjacently tagged: the kebab-case variant name is carried in the `name`
/// key and the variant's fields in the `data` key.
// NOTE(review): `rename_all` on an enum container renames the variants only,
// so the fields inside each variant keep their snake_case names on the wire
// (e.g. `received_data`). Confirm this matches what the server sends.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(tag = "name", content = "data")]
#[serde(rename_all = "kebab-case")]
pub enum EventData {
/// `invalid-io-channel`: carries the data that was received.
InvalidIoChannel {
received_data: String,
},
/// `command-deprecated`: carries the command concerned.
CommandDeprecated {
command: String,
},
/// `device-connected`: identifies the device by family and id.
DeviceConnected {
device_family: String,
device_id: String,
},
/// `device-disconnected`: identifies the device by family and id.
DeviceDisconnected {
device_family: String,
device_id: String,
},
/// `system-rate-report`: min / max / mean / median frequency figures.
// NOTE(review): the unit of these frequencies is not stated here - confirm.
SystemRateReport {
min_frequency: u32,
max_frequency: u32,
mean_frequency: u32,
median_frequency: u32,
},
/// `device-sleep`: identifies the device by family and id.
DeviceSleep {
device_family: String,
device_id: String,
},
/// `invalid-value`: names the field, a reason and a context string.
InvalidValue {
field: String,
reason: String,
context: String,
},
}
impl Event {
    /// Returns the kebab-case name of this event's payload variant.
    ///
    /// The returned string is a fixed literal per `EventData` variant, for
    /// example `"device-connected"` for `EventData::DeviceConnected`.
    pub fn name(&self) -> &'static str {
        // No pattern binds a field, so matching on the place expression
        // directly neither moves nor copies the payload.
        match self.data {
            // Device lifecycle.
            EventData::DeviceConnected { .. } => "device-connected",
            EventData::DeviceDisconnected { .. } => "device-disconnected",
            EventData::DeviceSleep { .. } => "device-sleep",
            // Input / command diagnostics.
            EventData::InvalidIoChannel { .. } => "invalid-io-channel",
            EventData::InvalidValue { .. } => "invalid-value",
            EventData::CommandDeprecated { .. } => "command-deprecated",
            // Periodic reporting.
            EventData::SystemRateReport { .. } => "system-rate-report",
        }
    }
}
/// An [`Event`] paired with a locally assigned timestamp.
#[derive(Debug, Clone)]
pub struct TimestampedEvent {
/// Local wall-clock time; the listener in this file sets it with
/// `SystemTime::now()` right after the message has been parsed.
pub timestamp: SystemTime,
/// The parsed event.
pub event: Event,
}
/// Receives events over a WebSocket and exposes them both as a drainable
/// buffer (`read_events`) and as a broadcast stream (`subscribe`).
pub struct EventChannel {
/// Client handle kept by the channel; `shutdown` disconnects through it.
pub ws_client: InverseWebSocketClient,
/// Events accumulated by the listener until `read_events` drains them.
events: Arc<Mutex<Vec<TimestampedEvent>>>,
/// Sending half of the broadcast channel each parsed event is published on.
event_tx: broadcast::Sender<TimestampedEvent>,
}
impl EventChannel {
    /// Capacity of the broadcast channel that fans events out to subscribers.
    const BROADCAST_CAPACITY: usize = 128;

    /// Connects a WebSocket client to `ws_url` and starts the background
    /// listener that parses incoming messages into events.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `InverseWebSocketClient::connect`.
    pub async fn new(ws_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let mut ws_client = InverseWebSocketClient::new(ws_url);
        ws_client.connect().await?;

        // The initial receiver is not kept: consumers obtain their own
        // receiver through `subscribe`.
        let (event_tx, _rx) = broadcast::channel(Self::BROADCAST_CAPACITY);

        let chan = Self {
            // A clone stays on the channel; the original client is moved
            // into the listener task below.
            ws_client: ws_client.clone(),
            events: Arc::new(Mutex::new(Vec::new())),
            event_tx,
        };
        chan.start_listening(ws_client);
        Ok(chan)
    }

    /// Disconnects the WebSocket client held by this channel.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `InverseWebSocketClient::disconnect`.
    // NOTE(review): the listener task owns a separate clone of the client;
    // whether disconnecting this handle also ends that task depends on
    // `InverseWebSocketClient` - confirm.
    pub async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.ws_client.disconnect().await
    }

    /// Returns a new broadcast receiver for events published by the listener.
    ///
    /// Per tokio's broadcast semantics a receiver only observes values sent
    /// after it was created, and one that falls more than the channel
    /// capacity behind is told that it lagged.
    pub fn subscribe(&self) -> broadcast::Receiver<TimestampedEvent> {
        self.event_tx.subscribe()
    }

    /// Drains the internal buffer and returns every event accumulated since
    /// the previous call, leaving the buffer empty.
    pub async fn read_events(&self) -> Vec<TimestampedEvent> {
        std::mem::take(&mut *self.events.lock().await)
    }

    /// Spawns a detached task that listens on `ws_client`, parses each
    /// message as an [`Event`], appends it to the buffer and broadcasts it.
    ///
    /// Messages that fail to parse are logged at trace level and skipped.
    fn start_listening(&self, mut ws_client: InverseWebSocketClient) {
        let events = Arc::clone(&self.events);
        let event_tx = self.event_tx.clone();

        tokio::spawn(async move {
            ws_client
                .listen(move |message| {
                    // Each invocation needs its own handles because the
                    // returned future takes ownership of them.
                    let events = Arc::clone(&events);
                    let event_tx = event_tx.clone();
                    async move {
                        match serde_json::from_str::<Event>(&message) {
                            Ok(event) => {
                                // The parsed event is moved, not cloned: it
                                // is not used again after this point.
                                let stamped = TimestampedEvent {
                                    timestamp: SystemTime::now(),
                                    event,
                                };
                                // The lock guard is a temporary and is
                                // released at the end of this statement.
                                events.lock().await.push(stamped.clone());
                                // `send` fails only when there are no
                                // subscribers, which is not an error here.
                                let _ = event_tx.send(stamped);
                            }
                            Err(e) => {
                                log::trace!(
                                    "Failed to parse event from WebSocket message: {}. Message was: {}",
                                    e,
                                    message
                                );
                            }
                        }
                    }
                })
                .await;
        });
    }
}