haply 1.3.1

Haply Robotics Client Library for the Inverse Service
Documentation
//! Event channel interface for Haply device. Usually uses port 10020.

use crate::websocket::InverseWebSocketClient;
use std::sync::Arc;
use tokio::sync::{ broadcast, oneshot, Mutex };
use tokio::task::JoinHandle;
use std::time::SystemTime;

use serde::{ Deserialize, Serialize };

#[derive(Debug, Deserialize, Serialize, Clone)]
/// Generic event wrapper with typed payload
pub struct Event {
    #[serde(flatten)]
    pub data: EventData,
    pub category: Category,
    pub level: Level,
    pub message: Option<String>,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "kebab-case")]
/// Event categories for filtering and organization
pub enum Category {
    System,
    Input,
    Session,
    Device,
    Module,
    Feature,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "lowercase")]
/// Severity or importance of the event
pub enum Level {
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Panic,
}

/// Specific event data variants with associated fields
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(tag = "name", content = "data")]
#[serde(rename_all = "kebab-case")]
pub enum EventData {
    InvalidIoChannel {
        received_data: String,
    },
    CommandDeprecated {
        command: String,
    },
    DeviceConnected {
        device_family: String,
        device_id: String,
    },
    DeviceDisconnected {
        device_family: String,
        device_id: String,
    },
    SystemRateReport {
        min_frequency: u32,
        max_frequency: u32,
        mean_frequency: u32,
        median_frequency: u32,
    },
    DeviceSleep {
        device_family: String,
        device_id: String,
    },
    InvalidValue {
        field: String,
        reason: String,
        context: String,
    },
}

impl Event {
    /// Returns a short, human-readable name for the event type
    /// It matches with the "name" field in the JSON event payload except for "other" events which are not supported yet
    pub fn name(&self) -> &'static str {
        match &self.data {
            EventData::InvalidIoChannel { .. } => "invalid-io-channel",
            EventData::CommandDeprecated { .. } => "command-deprecated",
            EventData::DeviceConnected { .. } => "device-connected",
            EventData::DeviceDisconnected { .. } => "device-disconnected",
            EventData::SystemRateReport { .. } => "system-rate-report",
            EventData::DeviceSleep { .. } => "device-sleep",
            EventData::InvalidValue { .. } => "invalid-value",
        }
    }
}

/// A timestamped wrapper around Event
#[derive(Debug, Clone)]
pub struct TimestampedEvent {
    pub timestamp: SystemTime,
    pub event: Event,
}

/// Main struct representing an Event Channel for Haply devices.
/// Handles all event-driven interactions with the Haply Inverse Service.
pub struct EventChannel {
    pub ws_client: InverseWebSocketClient,
    /// In‐memory FIFO buffer for events
    events: Arc<Mutex<Vec<TimestampedEvent>>>,
    /// Broadcast channel for events
    event_tx: broadcast::Sender<TimestampedEvent>,
    listen_task: Option<JoinHandle<()>>,
    listen_stop_tx: Option<oneshot::Sender<()>>,
}

impl EventChannel {
    /// Creates a new EventChannel instance and establishes connections.
    /// Initializes both HTTP and WebSocket connections, sets up state management,
    /// and begins listening for device updates.
    pub async fn new(ws_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let mut ws_client = InverseWebSocketClient::new(ws_url);
        ws_client.connect().await?;
        let ws_client_for_struct = ws_client.clone();

        let (tx, _rx) = broadcast::channel(128);
        let events = Arc::new(Mutex::new(Vec::new()));
        let (stop_tx, stop_rx) = oneshot::channel();
        let listen_task = Some(Self::spawn_listener(
            ws_client,
            Arc::clone(&events),
            tx.clone(),
            stop_rx,
        ));

        Ok(Self {
            ws_client: ws_client_for_struct,
            events,
            event_tx: tx,
            listen_task,
            listen_stop_tx: Some(stop_tx),
        })
    }

    /// shutdown the event channel and its connections
    /// This will close the WebSocket connection and clean up resources.
    pub async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        drop(self.listen_stop_tx.take());
        if let Some(handle) = self.listen_task.take() {
            let _ = handle.await;
        }
        self.ws_client.disconnect().await
    }

    /// Subscribers call this to get a Receiver
    pub fn subscribe(&self) -> broadcast::Receiver<TimestampedEvent> {
        self.event_tx.subscribe()
    }

    /// Return all buffered events
    pub async fn read_events(&self) -> Vec<TimestampedEvent> {
        std::mem::take(&mut *self.events.lock().await)
    }

    fn spawn_listener(
        mut ws_client: InverseWebSocketClient,
        events: Arc<Mutex<Vec<TimestampedEvent>>>,
        tx: broadcast::Sender<TimestampedEvent>,
        stop_rx: oneshot::Receiver<()>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            ws_client.listen(move |message| {
                let events_clone = Arc::clone(&events);
                let tx_clone = tx.clone();
                async move {
                    match serde_json::from_str::<Event>(&message) {
                        Ok(raw_evt) => {
                            let ts = TimestampedEvent {
                                timestamp: SystemTime::now(),
                                event: raw_evt,
                            };
                            events_clone.lock().await.push(ts.clone());
                            let _ = tx_clone.send(ts);
                        }
                        Err(e) => {
                            log::trace!(
                                "Failed to parse event from WebSocket message: {}. Message was: {}",
                                e,
                                message
                            );
                        }
                    }
                }
            }, stop_rx).await;
        })
    }
}

impl Drop for EventChannel {
    fn drop(&mut self) {
        drop(self.listen_stop_tx.take());
        if let Some(handle) = self.listen_task.take() {
            handle.abort();
        }
    }
}