// haply 1.0.0
//
// Haply Robotics Client Library for the Inverse Service
// Documentation
//! Event channel interface for Haply devices. The channel usually uses port 10020.

use crate::websocket::InverseWebSocketClient;
use std::sync::Arc;
use tokio::sync::{ Mutex, broadcast };
use std::time::SystemTime;

use serde::{ Deserialize, Serialize };

/// Generic event wrapper with a typed payload.
///
/// Because `data` is flattened, the payload's `name` and `data` keys sit at the
/// same JSON level as `category`, `level` and `message`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Event {
    /// Typed payload, discriminated by the `name` key (see [`EventData`]).
    #[serde(flatten)]
    pub data: EventData,
    /// Category the event belongs to.
    pub category: Category,
    /// Severity of the event.
    pub level: Level,
    /// Optional free-form text; `None` when the key is absent or `null`.
    pub message: Option<String>,
}

/// Event categories for filtering and organization.
///
/// Variant names are (de)serialized in kebab-case, e.g. `"system"` or
/// `"device"`.
///
/// The type is a plain fieldless enum, so it is `Copy` and supports equality
/// and hashing; this lets callers filter with `==` or use it as a map/set key.
#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum Category {
    System,
    Input,
    Session,
    Device,
    Module,
    Feature,
}

/// Severity or importance of the event.
///
/// Variant names are (de)serialized in lowercase, e.g. `"info"` or `"warning"`.
///
/// The derived ordering follows declaration order, which reads as least to
/// most severe (`Info < Notice < Warning < Error < Critical < Panic`), so
/// callers can filter with comparisons such as `level >= Level::Warning`.
/// NOTE(review): keep the variants in ascending severity when adding new ones,
/// otherwise the derived ordering stops matching severity.
#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "lowercase")]
pub enum Level {
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Panic,
}

/// Event payload variants, each with its associated fields.
///
/// The enum is adjacently tagged: the variant is chosen by the `"name"` key
/// (kebab-case, e.g. `"device-connected"`) and its fields are read from the
/// `"data"` object.
///
/// `rename_all` on an enum renames the variants only; the field keys inside
/// `"data"` keep the snake_case spelling written below.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "name", content = "data", rename_all = "kebab-case")]
pub enum EventData {
    /// Tagged `"invalid-io-channel"`.
    InvalidIoChannel { received_data: String },
    /// Tagged `"command-deprecated"`.
    CommandDeprecated { command: String },
    /// Tagged `"device-connected"`.
    DeviceConnected {
        device_family: String,
        device_id: String,
    },
    /// Tagged `"device-disconnected"`.
    DeviceDisconnected {
        device_family: String,
        device_id: String,
    },
    /// Tagged `"system-rate-report"`; carries integer frequency statistics.
    SystemRateReport {
        min_frequency: u32,
        max_frequency: u32,
        mean_frequency: u32,
        median_frequency: u32,
    },
    /// Tagged `"device-sleep"`.
    DeviceSleep {
        device_family: String,
        device_id: String,
    },
    /// Tagged `"invalid-value"`.
    InvalidValue {
        field: String,
        reason: String,
        context: String,
    },
}

impl Event {
    /// Returns a short, human-readable name for the event type
    /// It matches with the "name" field in the JSON event payload except for "other" events which are not supported yet
    pub fn name(&self) -> &'static str {
        match &self.data {
            EventData::InvalidIoChannel { .. } => "invalid-io-channel",
            EventData::CommandDeprecated { .. } => "command-deprecated",
            EventData::DeviceConnected { .. } => "device-connected",
            EventData::DeviceDisconnected { .. } => "device-disconnected",
            EventData::SystemRateReport { .. } => "system-rate-report",
            EventData::DeviceSleep { .. } => "device-sleep",
            EventData::InvalidValue { .. } => "invalid-value",
        }
    }
}

/// An [`Event`] paired with a timestamp.
#[derive(Clone, Debug)]
pub struct TimestampedEvent {
    /// Time attached to the event. The listener in this file fills it with
    /// `SystemTime::now()` right after a message is parsed, so it is a local
    /// receipt time rather than a value taken from the payload.
    pub timestamp: SystemTime,
    /// The parsed event.
    pub event: Event,
}

/// Main struct representing an Event Channel for Haply devices.
/// Handles all event-driven interactions with the Haply Inverse Service.
pub struct EventChannel {
    /// WebSocket client kept by the channel; `shutdown` disconnects it.
    pub ws_client: InverseWebSocketClient,
    /// In-memory buffer of received events in arrival order (oldest first).
    /// The listener appends to it and `read_events` empties it.
    events: Arc<Mutex<Vec<TimestampedEvent>>>,
    /// Sending half of the broadcast channel; `subscribe` creates receivers
    /// from it.
    event_tx: broadcast::Sender<TimestampedEvent>,
}

impl EventChannel {
    /// Number of events the broadcast channel retains for its receivers.
    const BROADCAST_CAPACITY: usize = 128;

    /// Connects to the event WebSocket at `ws_url` and starts listening.
    ///
    /// Only a WebSocket connection is opened. Once it is established, a
    /// background task is spawned that parses each incoming message as an
    /// [`Event`], timestamps it, appends it to the internal buffer and
    /// broadcasts it to subscribers.
    ///
    /// Must be called from within a Tokio runtime, because the listener is
    /// started with `tokio::spawn`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the client's `connect` call.
    pub async fn new(ws_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let mut ws_client = InverseWebSocketClient::new(ws_url);
        ws_client.connect().await?;

        // Bounded broadcast channel. The initial receiver is dropped on
        // purpose: consumers obtain their own through `subscribe`.
        let (tx, _rx) = broadcast::channel(Self::BROADCAST_CAPACITY);

        let chan = Self {
            ws_client: ws_client.clone(),
            events: Arc::new(Mutex::new(Vec::new())),
            event_tx: tx,
        };

        // The listener task takes the original client; the channel keeps the
        // clone so that `shutdown` can disconnect later.
        chan.start_listening(ws_client);
        Ok(chan)
    }

    /// Shuts down the event channel by disconnecting its WebSocket client.
    ///
    /// NOTE(review): the listener task runs on a separate clone of the client;
    /// whether disconnecting here also ends that task depends on how
    /// `InverseWebSocketClient` implements `Clone` — confirm.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the client's `disconnect` call.
    pub async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.ws_client.disconnect().await
    }

    /// Returns a new receiver for the broadcast channel.
    ///
    /// The receiver only observes events sent after this call; events that
    /// arrived earlier remain available through [`EventChannel::read_events`]
    /// until that buffer is drained.
    pub fn subscribe(&self) -> broadcast::Receiver<TimestampedEvent> {
        self.event_tx.subscribe()
    }

    /// Drains and returns all buffered events, oldest first.
    ///
    /// The internal buffer is left empty, so each event is returned by at most
    /// one call. The buffer grows without bound until this method is called.
    pub async fn read_events(&self) -> Vec<TimestampedEvent> {
        std::mem::take(&mut *self.events.lock().await)
    }

    /// Spawns the background task that listens for events on `ws_client`.
    ///
    /// Every message that parses as an [`Event`] is timestamped with the
    /// current system time, pushed onto the buffer and sent on the broadcast
    /// channel. Messages that fail to parse are logged at trace level and
    /// otherwise ignored. The task is detached: its join handle is dropped.
    fn start_listening(&self, mut ws_client: InverseWebSocketClient) {
        let events = Arc::clone(&self.events);
        let tx = self.event_tx.clone();

        tokio::spawn(async move {
            ws_client
                .listen(move |message| {
                    // Each invocation needs its own handles because the
                    // returned future owns what it captures.
                    let events = Arc::clone(&events);
                    let tx = tx.clone();
                    async move {
                        match serde_json::from_str::<Event>(&message) {
                            Ok(event) => {
                                // The parsed event is moved, not cloned; the
                                // only copy made is the one kept in the buffer.
                                let stamped = TimestampedEvent {
                                    timestamp: SystemTime::now(),
                                    event,
                                };
                                events.lock().await.push(stamped.clone());

                                // `send` fails only when there is no receiver,
                                // which is not an error for this channel.
                                let _ = tx.send(stamped);
                            }
                            Err(e) => {
                                log::trace!(
                                    "Failed to parse event from WebSocket message: {}. Message was: {}",
                                    e,
                                    message
                                );
                            }
                        }
                    }
                })
                .await;
        });
    }
}