mechutil 0.8.1

Utility structures and functions for mechatronics applications.
Documentation
//
// Copyright (C) 2024 - 2025 Automated Design Corp. All Rights Reserved.
//

//! ModuleHandler trait - the unified interface for both internal and external modules.
//!
//! This trait is analogous to AsyncServelet but designed for external module development.
//! External modules implement this trait to handle IPC messages from autocore-server.

use async_trait::async_trait;

use super::command_message::{CommandMessage, MessageType};
use super::error::IpcError;

/// The ModuleHandler trait defines the interface that external modules implement
/// to process IPC messages from autocore-server.
///
/// This is the external module equivalent of AsyncServelet. While AsyncServelet
/// is used for in-process modules within autocore-server, ModuleHandler is used
/// for modules that run as separate processes and communicate via IPC.
///
/// # Example
///
/// ```ignore
/// use mechutil::ipc::{ModuleHandler, CommandMessage, MessageType};
/// use async_trait::async_trait;
///
/// struct MyModbusModule {
///     holding_registers: Vec<u16>,
/// }
///
/// #[async_trait]
/// impl ModuleHandler for MyModbusModule {
///     async fn handle_message(&mut self, msg: CommandMessage) -> CommandMessage {
///         // Get the subtopic (function name portion of the FQDN topic)
///         let subtopic = msg.subtopic().unwrap_or("");
///
///         match subtopic {
///             "read_holding" => {
///                 // Handle read request - return success with data
///                 msg.into_response(serde_json::json!(self.holding_registers))
///             }
///             _ => {
///                 // Unknown command - return error
///                 msg.into_error_response(format!("Unknown command: {}", subtopic))
///             }
///         }
///     }
///
///     async fn on_initialize(&mut self) -> Result<(), anyhow::Error> {
///         // Initialize hardware, connections, etc.
///         Ok(())
///     }
///
///     async fn on_finalize(&mut self) -> Result<(), anyhow::Error> {
///         // Cleanup resources
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait ModuleHandler: Send + Sync {
    /// Handle an incoming message and return a response.
    ///
    /// This is the main entry point for processing IPC messages. The module
    /// should examine the `topic` field (FQDN format: domain.subtopic) to
    /// determine what action to take, process the `data`, and return a response
    /// with `success` and either `data` or `error_message` populated.
    ///
    /// Use `msg.subtopic()` to extract just the function/subtopic portion,
    /// and `msg.into_response(data)` or `msg.into_error_response(error)` to
    /// create the response.
    ///
    /// # Arguments
    /// * `msg` - The incoming CommandMessage to process
    ///
    /// # Returns
    /// The response CommandMessage
    async fn handle_message(&mut self, msg: CommandMessage) -> CommandMessage;

    /// Called when the module should initialize.
    ///
    /// This is called after the IPC connection is established and the module
    /// has been registered with the server. Use this to initialize hardware,
    /// establish connections, load configuration, etc.
    async fn on_initialize(&mut self) -> Result<(), anyhow::Error>;

    /// Called when the module should shut down.
    ///
    /// This is called before the IPC connection is closed. Use this to
    /// clean up resources, close connections, save state, etc.
    async fn on_finalize(&mut self) -> Result<(), anyhow::Error>;

    /// Called when a client subscribes to a topic provided by this module.
    ///
    /// Override this to set up any necessary push notification mechanisms.
    /// The default implementation does nothing.
    ///
    /// # Arguments
    /// * `topic` - The topic being subscribed to
    /// * `subscriber_id` - Identifier of the subscriber
    async fn on_subscribe(&mut self, _topic: &str, _subscriber_id: &str) -> Result<(), anyhow::Error> {
        Ok(())
    }

    /// Called when a client unsubscribes from a topic.
    ///
    /// Override this to clean up any push notification mechanisms.
    /// The default implementation does nothing.
    ///
    /// # Arguments
    /// * `topic` - The topic being unsubscribed from
    /// * `subscriber_id` - Identifier of the subscriber
    async fn on_unsubscribe(&mut self, _topic: &str, _subscriber_id: &str) -> Result<(), anyhow::Error> {
        Ok(())
    }

    /// Handle a heartbeat message.
    ///
    /// Override this to perform custom heartbeat handling. The default
    /// implementation returns a heartbeat response.
    async fn on_heartbeat(&mut self) -> Result<(), anyhow::Error> {
        Ok(())
    }

    /// Get the module's domain name.
    ///
    /// This should return the unique identifier for this module instance.
    fn domain(&self) -> &str;

    /// Get the module's version string.
    ///
    /// Override to provide version information. Default returns "1.0.0".
    fn version(&self) -> &str {
        "1.0.0"
    }

    /// Get the list of capabilities/topics this module provides.
    ///
    /// Override to advertise what this module can do. This is used during
    /// registration to inform the server about available functionality.
    fn capabilities(&self) -> Vec<String> {
        Vec::new()
    }

    /// Get the catalog of FQDNs this module serves.
    fn get_catalog(&self) -> Vec<String> {
        Vec::new()
    }

    /// Return variable names to resolve when SHM is configured.
    ///
    /// Override this to request automatic SHM pointer resolution.
    /// Default: empty (module does not use automatic SHM resolution).
    fn shm_variable_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Called after SHM pointers are resolved.
    ///
    /// Wire them into your module state here.
    /// Default: no-op.
    async fn on_shm_configured(&mut self, _shm_map: crate::shm::ShmMap) -> Result<(), anyhow::Error> {
        Ok(())
    }
}

/// Extension trait for ModuleHandler that provides IPC-specific message handling.
///
/// This trait is automatically implemented for all ModuleHandler implementations
/// and handles different message types (heartbeat, control, subscribe, etc.).
#[async_trait]
pub trait ModuleHandlerExt: ModuleHandler {
    /// Process a CommandMessage and return a response.
    ///
    /// This wraps `handle_message` with IPC-specific handling for different
    /// message types (heartbeat, control, etc.).
    ///
    /// Returns (response, should_shutdown) where should_shutdown is true if
    /// the module should exit after sending the response.
    async fn process_message(&mut self, msg: CommandMessage) -> Result<(CommandMessage, bool), IpcError>;
}

#[async_trait]
impl<T: ModuleHandler + ?Sized> ModuleHandlerExt for T {
    async fn process_message(&mut self, msg: CommandMessage) -> Result<(CommandMessage, bool), IpcError> {
        match msg.message_type {
            MessageType::NoOp => {
                // NoOp always succeeds
                Ok((msg.into_response(serde_json::Value::Null), false))
            }

            MessageType::Heartbeat => {
                self.on_heartbeat().await.map_err(|e| IpcError::Handler(e.to_string()))?;
                Ok((CommandMessage::heartbeat(), false))
            }

            MessageType::Control => {
                // Extract control subtype from topic or data.action
                let subtopic = msg.subtopic();
                let control_type = msg.data.get("action")
                    .and_then(|a| a.as_str())
                    .unwrap_or(&subtopic);

                match control_type {
                    "initialize" => {
                        self.on_initialize().await.map_err(|e| IpcError::Handler(e.to_string()))?;
                        Ok((msg.into_response(serde_json::Value::Null), false))
                    }
                    "finalize" => {
                        log::info!("Received finalize command, shutting down...");
                        self.on_finalize().await.map_err(|e| IpcError::Handler(e.to_string()))?;
                        // Signal that we should shutdown after sending the response
                        Ok((msg.into_response(serde_json::json!({"finalized": true})), true))
                    }
                    _ => {
                        // Unknown control message, pass to handler
                        let response = self.handle_message(msg).await;
                        Ok((response, false))
                    }
                }
            }

            MessageType::Subscribe => {
                // Topic is in the msg.topic field, subscriber info in data
                let topic = &msg.topic;
                let subscriber = msg.data.get("subscriber")
                    .and_then(|v| v.as_str())
                    .unwrap_or("unknown");

                self.on_subscribe(topic, subscriber).await
                    .map_err(|e| IpcError::Handler(e.to_string()))?;

                Ok((msg.into_response(serde_json::Value::Null), false))
            }

            MessageType::Unsubscribe => {
                let topic = &msg.topic;
                let subscriber = msg.data.get("subscriber")
                    .and_then(|v| v.as_str())
                    .unwrap_or("unknown");

                self.on_unsubscribe(topic, subscriber).await
                    .map_err(|e| IpcError::Handler(e.to_string()))?;

                Ok((msg.into_response(serde_json::Value::Null), false))
            }

            MessageType::Request | MessageType::Read | MessageType::Write => {
                // Standard message handling
                let response = self.handle_message(msg).await;
                Ok((response, false))
            }

            MessageType::Response | MessageType::Broadcast => {
                // These shouldn't be received by the handler, just pass through
                Ok((msg, false))
            }
        }
    }
}

/// A simple base implementation of ModuleHandler that can be extended.
///
/// This provides a starting point for modules that want default behavior
/// for most operations.
pub struct BaseModuleHandler {
    domain: String,
    version: String,
    capabilities: Vec<String>,
    pub catalog: Vec<String>,
}

impl BaseModuleHandler {
    pub fn new(domain: &str) -> Self {
        Self {
            domain: domain.to_string(),
            version: "1.0.0".to_string(),
            capabilities: Vec::new(),
            catalog: Vec::new(),
        }
    }

    pub fn with_version(mut self, version: &str) -> Self {
        self.version = version.to_string();
        self
    }

    pub fn with_capabilities(mut self, caps: Vec<String>) -> Self {
        self.capabilities = caps;
        self
    }

    pub fn register_fqdn(&mut self, fqdn: String) {
        self.catalog.push(fqdn);
    }
}

#[async_trait]
impl ModuleHandler for BaseModuleHandler {
    async fn handle_message(&mut self, msg: CommandMessage) -> CommandMessage {
        if msg.subtopic() == "get_catalog" {
            return msg.into_response(serde_json::to_value(&self.catalog).unwrap_or(serde_json::Value::Null));
        }

        // Default implementation returns "not implemented"
        let subtopic = msg.subtopic().to_string();
        let error_msg = format!("Command '{}' not implemented", subtopic);
        msg.into_error_response(&error_msg)
    }

    async fn on_initialize(&mut self) -> Result<(), anyhow::Error> {
        log::info!("Module {} initialized", self.domain);
        Ok(())
    }

    async fn on_finalize(&mut self) -> Result<(), anyhow::Error> {
        log::info!("Module {} finalized", self.domain);
        Ok(())
    }

    fn domain(&self) -> &str {
        &self.domain
    }

    fn version(&self) -> &str {
        &self.version
    }

    fn capabilities(&self) -> Vec<String> {
        self.capabilities.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    struct TestModule {
        domain: String,
        initialized: bool,
    }

    impl TestModule {
        fn new(domain: &str) -> Self {
            Self {
                domain: domain.to_string(),
                initialized: false,
            }
        }
    }

    #[async_trait]
    impl ModuleHandler for TestModule {
        async fn handle_message(&mut self, msg: CommandMessage) -> CommandMessage {
            let subtopic = msg.subtopic().to_string();
            msg.into_response(serde_json::json!({"echo": subtopic}))
        }

        async fn on_initialize(&mut self) -> Result<(), anyhow::Error> {
            self.initialized = true;
            Ok(())
        }

        async fn on_finalize(&mut self) -> Result<(), anyhow::Error> {
            self.initialized = false;
            Ok(())
        }

        fn domain(&self) -> &str {
            &self.domain
        }
    }

    #[tokio::test]
    async fn test_module_handler() {
        let mut module = TestModule::new("TEST");

        // Test initialization
        module.on_initialize().await.unwrap();
        assert!(module.initialized);

        // Test message handling
        let msg = CommandMessage::read("TEST.ping");
        let response = module.handle_message(msg).await;

        assert!(response.success);
        assert_eq!(response.data["echo"], "ping");

        // Test finalization
        module.on_finalize().await.unwrap();
        assert!(!module.initialized);
    }
}