// mechutil 0.8.1
//
// Utility structures and functions for mechatronics applications.
// Documentation
//
// Copyright (C) 2025 Automated Design Corp.. All Rights Reserved.
// Created Date: 2025-02-18 14:01:54
// -----
// Last Modified: 2025-02-25 06:55:50
// -----
//
//

use async_trait::async_trait;
use std::pin::Pin;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex;

use super::subscription_handler::{
    SubscriptionHandler, SubscriptionResponse, SubscriptionResponseTx,
};

use super::filter::{DefaultSubscriptionFilter, SubscriptionFilter};

// Define the type
pub type BroadcastCallback<R> = Box<
    dyn Fn(String, R) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + Sync>>
        + Send
        + Sync,
>;

/// Represents a single subscription registered under a topic: its unique id
/// and the filter that decides whether a message is forwarded for it.
struct SubscriptionItem<R> {
    /// Unique id identifying the subscription.
    id: usize,

    /// A filter that returns true if the conditions of this topic match and the
    /// message should be broadcast to subscribers.
    filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,
}

/// Implements the SubscriptionHandler trait in a concrete struct that
/// manages subscriptions. Subscriptions are stored in a HashMap where the
/// key is the topic and the value is the list of subscription items
/// registered for that topic.
pub struct SubscriptionManager<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Topic name -> subscriptions registered for that topic, guarded by an
    /// async mutex.
    subscriptions: Arc<Mutex<HashMap<String, Vec<SubscriptionItem<R>>>>>,
    /// The most recently issued subscription id. Starts at 0, so the first
    /// id handed out is 1.
    id_count: usize,

    /// Callback invoked with the topic and the message when a broadcast
    /// passes a subscriber's filter.
    broadcast_callback: BroadcastCallback<R>,
}

impl<R> SubscriptionManager<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Builds a manager with no subscriptions.
    ///
    /// # Parameters
    /// - `broadcast_callback`: The callback stored on the manager and invoked
    ///   when a message is broadcast.
    ///
    /// The id counter starts at 0, so the first subscription receives id 1.
    pub fn new(broadcast_callback: BroadcastCallback<R>) -> Self {
        let subscriptions = Arc::new(Mutex::new(HashMap::new()));

        Self {
            subscriptions,
            id_count: 0,
            broadcast_callback,
        }
    }
}

#[async_trait]
impl<R> SubscriptionHandler<R> for SubscriptionManager<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Subscribes to a topic using the default filter and acknowledges the
    /// request with the id assigned to the new subscription.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `respond_to`: The channel on which the assigned subscription id is
    ///   sent back. A failed send is ignored.
    async fn subscribe(&mut self, topic: String, respond_to: SubscriptionResponseTx) {
        let id = self.id_count + 1;
        log::info!(
            "Subcription Manager subscribing to topic: {} with id {}",
            topic,
            id
        );

        // Register the subscription and read the subscriber count from the
        // same entry, so no second lookup is needed. The lock is released at
        // the end of this scope.
        let count = {
            let mut subscriptions = self.subscriptions.lock().await;
            let subscribers = subscriptions.entry(topic.clone()).or_default();
            subscribers.push(SubscriptionItem {
                id,
                filter: Box::new(DefaultSubscriptionFilter),
            });
            subscribers.len()
        };
        log::info!(
            "Subcription Manager topic: {} has {} subscribers",
            topic,
            count
        );

        self.id_count = id;
        // Send a response to acknowledge subscription
        let _ = respond_to.send(SubscriptionResponse { id });
    }

    /// Subscribes to a topic with a caller-supplied filter and acknowledges
    /// the request with the id assigned to the new subscription.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `filter`: The filter to apply to the subscription.
    /// - `respond_to`: The channel on which the assigned subscription id is
    ///   sent back. A failed send is ignored.
    async fn subscribe_with_filter(
        &mut self,
        topic: String,
        filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,
        respond_to: SubscriptionResponseTx,
    ) {
        let id = self.id_count + 1;

        self.subscriptions
            .lock()
            .await
            .entry(topic)
            .or_default()
            .push(SubscriptionItem { id, filter });

        self.id_count = id;
        let _ = respond_to.send(SubscriptionResponse { id });
    }

    /// Removes the subscription with the given id from every topic, drops
    /// topics left without subscribers, and acknowledges the request.
    ///
    /// # Parameters
    /// - `id`: The id of the subscription to remove, as returned when it was
    ///   created.
    /// - `respond_to`: The channel on which the acknowledgement (echoing
    ///   `id`) is sent. A failed send is ignored. The acknowledgement is sent
    ///   whether or not a subscription with that id existed.
    async fn unsubscribe(&mut self, id: usize, respond_to: SubscriptionResponseTx) {
        // Single pass: strip the subscription from each topic, report the
        // remaining count, and keep only topics that still have subscribers.
        self.subscriptions
            .lock()
            .await
            .retain(|topic, subscribers| {
                subscribers.retain(|subscriber| subscriber.id != id);
                log::info!(
                    "Subcription Manager topic: {} has {} subscribers",
                    topic,
                    subscribers.len()
                );
                !subscribers.is_empty()
            });

        // Send a response to acknowledge unsubscription
        let _ = respond_to.send(SubscriptionResponse { id });
    }

    /// Broadcasts a message for a topic through the broadcast callback.
    ///
    /// The callback is invoked at most once per call: the subscribers of
    /// `topic` are checked in order, and as soon as one subscriber's filter
    /// accepts the message the callback is called with the topic and the
    /// message. If the topic has no subscribers, or no filter accepts the
    /// message, nothing is sent.
    ///
    /// # Parameters
    /// - `topic`: The topic to broadcast the message to.
    /// - `message`: The message to broadcast.
    ///
    /// # Returns
    /// - `Result<(), anyhow::Error>`: The result of the broadcast callback,
    ///   or `Ok(())` when the callback was not invoked.
    async fn broadcast(&self, topic: String, message: R) -> Result<(), anyhow::Error> {
        log::debug!("Subcription Manager checking topic: {}", topic);

        // The guard stays alive until the end of the function, including
        // across the callback, so concurrent `broadcast` calls run the
        // callback one at a time.
        let subscriptions = self.subscriptions.lock().await;

        // Diagnostic dump of every topic. Skip the full scan unless debug
        // logging is actually enabled.
        if log::log_enabled!(log::Level::Debug) {
            for (k, v) in subscriptions.iter() {
                log::debug!(
                    "Subcription Manager topic: {} has {} subscribers",
                    k,
                    v.len()
                );

                if k == topic.as_str() {
                    log::debug!("k {} == topic {}", k, topic);
                } else {
                    log::debug!("k {} != topic {}", k, topic);
                }
            }
        }

        // `any` stops at the first accepting filter, so filters after it are
        // not evaluated.
        let accepted = match subscriptions.get(&topic) {
            Some(subscribers) => subscribers.iter().any(|subscriber| {
                log::debug!("Subcription Checking filter on: {}", topic);
                subscriber.filter.matches(&message)
            }),
            None => {
                log::debug!("Subcription Manager no subscribers for topic: {}", topic);
                false
            }
        };

        if !accepted {
            return Ok(());
        }

        log::debug!("Subcription Manager broadcasting topic: {}", topic);
        // `topic` and `message` are not needed afterwards, so they are moved
        // into the callback rather than cloned.
        (self.broadcast_callback)(topic, message).await
    }
}