mechutil 0.8.1

Utility structures and functions for mechatronics applications.
Documentation
//
// Copyright (C) 2025 Automated Design Corp.. All Rights Reserved.
// Created Date: 2025-02-18 14:01:54
// -----
// Last Modified: 2025-02-24 21:21:37
// -----
//
//

use async_trait::async_trait;

use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio::sync::mpsc;

use super::subscription_handler::{
    SubscriptionFilter, 
    SubscriptionHandler, 
    SubscriptionResponse, 
    SubscriptionResponseTx,
    SubscriptionBroadcast,
    SubscriptionBroadcastTx,
    SubscriptionBroadcastRx
};




/// Represents a subscription item containing a broadcast channel and a filter.
struct SubscriptionItem<R> {
    /// unique id idenfiying the subscription
    id : usize,
    
    /// The sender that will broadcast the message to the subscriber.
    /// We use a direct pipeline to the subscriber to avoid backpressure issues.
    broadcast_tx: SubscriptionBroadcastTx<R>,
    
    /// A filter that returns true if the conditions of this topic match and the
    /// message should be broadcast to subscribers.
    filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,
}

/// A default filter that always returns true. In other words, no filter.
pub struct DefaultSubscriptionFilter;

impl<R> SubscriptionFilter<R> for DefaultSubscriptionFilter {
    fn matches(&self, _message: &R) -> bool {
        true
    }
}


#[async_trait]
pub trait SubscriptionNode<R> {
    async fn subscription_message(&mut self, message: R) -> Result<(), anyhow::Error>;
}

/// Implements the SubscriptionHandler trait in a concrete struct that
/// manages subscriptions. This struct will store subscriptions in a HashMap
/// where the key is the topic and the value is a list of subscription items.
pub struct SubscriptionManagerV2<R> 
where
    R: Clone + Send + Sync + 'static,
{
    subscriptions: Arc<Mutex<HashMap<String, Vec<SubscriptionItem<R>>>>>,  
    id_count : usize,
}

impl<R> SubscriptionManagerV2<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Creates a new SubscriptionManager instance.
    pub fn new() -> Self {
        Self {
            subscriptions: Arc::new(Mutex::new(HashMap::new())),
            id_count : 0
        }
    }

    /// Spawns a task to process incoming messages from the receiver.
    ///
    /// # Parameters
    /// - `receiver`: The receiver channel to listen for incoming messages.
    /// - `callback`: A callback function to handle each incoming message.
    pub fn spawn_message_processor<H>(
        &self,
        mut receiver: SubscriptionBroadcastRx<R>,
        mut handler: H,
    ) where
        H: SubscriptionNode<R> + Send + 'static,
        R: Clone + Send + Sync + 'static,
    {
    
        tokio::spawn(async move {
            while let Some(message) = receiver.recv().await {
                if let Err(err) = handler.subscription_message(
                    message.message
                ).await {
                    log::error!("Error in message handler: {}", err);
                }
    
                // // Broadcast the message to all subscribers
                // let subscriptions = subscriptions.lock().await;
                // for (topic, subscribers) in subscriptions.iter() {
                //     for subscriber in subscribers {
                //         if subscriber.filter.matches(&message) {
                //             if let Err(err) = subscriber.broadcast_tx.send(message.clone()).await {
                //                 log::error!("Failed to broadcast message to subscriber: {}", err);
                //             }
                //         }
                //     }
                // }
            }
        });
    }
}

#[async_trait]
impl<R> SubscriptionHandler<R> for SubscriptionManagerV2<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Subscribes to a topic with a given broadcast channel and responds to the sender.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `subscription_information`: Information about the subscription.
    /// - `broadcast_tx`: The broadcast channel to send messages to the subscriber.
    /// - `respond_to`: The channel to send a response to acknowledge the subscription.
    async fn subscribe(
        &mut self,
        topic: String,        
        broadcast_tx: SubscriptionBroadcastTx<R>,
        respond_to: SubscriptionResponseTx        
    ) {
        log::info!("Subcription Manager subscribing to topic: {} with id {}", topic, self.id_count + 1);

        let mut subscriptions = self.subscriptions.lock().await;
        subscriptions
            .entry(topic.clone())
            .or_insert_with(Vec::new)
            .push(SubscriptionItem {
                broadcast_tx,
                id : self.id_count + 1,
                filter: Box::new(DefaultSubscriptionFilter),
            });

        let count = subscriptions.get(&topic).unwrap().len();
        log::info!("Subcription Manager topic: {} has {} subscribers", topic, count);

        self.id_count += 1;
        // Send a response to acknowledge subscription
        let _ = respond_to.send(SubscriptionResponse{
            id : self.id_count
        }); 
    }

    /// Subscribes to a topic with a given broadcast channel and a filter, then responds to the sender.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `subscription_information`: Information about the subscription.
    /// - `filter`: The filter to apply to the subscription.
    /// - `broadcast_tx`: The broadcast channel to send messages to the subscriber.
    /// - `respond_to`: The channel to send a response to acknowledge the subscription.
    async fn subscribe_with_filter(
        &mut self,
        topic: String,
        filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,        
        broadcast_tx: SubscriptionBroadcastTx<R>,
        respond_to: SubscriptionResponseTx
    ) {
        let mut subscriptions = self.subscriptions.lock().await;
        subscriptions
            .entry(topic)
            .or_insert_with(Vec::new)
            .push(SubscriptionItem {
                broadcast_tx,
                id : self.id_count + 1,
                filter,
            });

        self.id_count += 1;
        let _ = respond_to.send(SubscriptionResponse{
            id : self.id_count
        }); 
    }

    /// Unsubscribes from a topic and responds to the sender.
    ///
    /// # Parameters
    /// - `topic`: The topic to unsubscribe from.
    /// - `subscription_information`: Information about the subscription.
    /// - `respond_to`: The channel to send a response to acknowledge the unsubscription.
    async fn unsubscribe(
        &mut self, 
        id: usize, 
        respond_to: SubscriptionResponseTx
    ) {
        let mut subscriptions = self.subscriptions.lock().await;
        for (_, subscribers) in subscriptions.iter_mut() {
            subscribers.retain(|subscriber| subscriber.id != id);
        }

        // Remove the topic if there are no more subscribers
        let mut topics_to_remove = Vec::new();
        for (topic, _) in subscriptions.iter() {
            log::info!("Subcription Manager topic: {} has {} subscribers", topic, subscriptions.get(topic).unwrap().len());
            if subscriptions.get(topic).map_or(true, |v| v.is_empty()) {
                topics_to_remove.push(topic.clone());
            }    
        }
        for topic in topics_to_remove {
            subscriptions.remove(&topic);
        }

        // Send a response to acknowledge unsubscription
        let _ = respond_to.send(SubscriptionResponse{
            id : id
        }); 
    }

    /// Broadcasts a message to all subscribers of a topic.
    ///
    /// # Parameters
    /// - `topic`: The topic to broadcast the message to.
    /// - `message`: The message to broadcast.
    ///
    /// # Returns
    /// - `Result<(), anyhow::Error>`: An empty result or an error if the broadcast fails.
    async fn broadcast(&self, topic: String, message: R) -> Result<(), anyhow::Error> {

        log::debug!("Subcription Manager checking topic: {}", topic);

        let subscriptions = self.subscriptions.lock().await;

        for (k,v) in subscriptions.iter() {
            log::debug!("Subcription Manager topic: {} has {} subscribers", k, v.len());

            if k == topic.as_str() {
                log::debug!("k {} == topic {}", k, topic);
            }
            else {
                log::debug!("k {} != topic {}", k, topic);
            }
        }
        
        if let Some(subscribers) = subscriptions.get(&topic) {
            
            for subscriber in subscribers {

                log::debug!("Subcription Checking filter on: {}", topic);

                if subscriber.filter.matches(&message) {

                    log::debug!("Subcription Manager broadcasting topic: {}", topic);

                    if let Err(err) = subscriber.broadcast_tx.send(
                        SubscriptionBroadcast {
                            id : subscriber.id,
                            topic : topic.clone(),
                            message : message.clone()
                        }
                    ).await {
                        log::error!("SubscriptionManager broadcast failed: {}", err);
                    }
                }
            }
        }
        else {
            log::debug!("Subcription Manager no subscribers for topic: {}", topic);
        }

        Ok(())
        
    }
    
}