robotrt-obs-core 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
Documentation
use crate::alert::{Alert, AlertSeverity};

/// Receives alerts dispatched by the bus.
pub trait AlertHandler: Send {
    /// Called synchronously when an alert is dispatched.
    fn on_alert(&mut self, alert: &Alert);
}

/// Filtering predicate - returns `true` if this alert should be delivered.
pub type AlertFilter = Box<dyn Fn(&Alert) -> bool + Send>;

struct Subscription {
    handler: Box<dyn AlertHandler>,
    min_severity: AlertSeverity,
    source_filter: Option<String>,
}

impl Subscription {
    fn matches(&self, alert: &Alert) -> bool {
        if alert.severity < self.min_severity {
            return false;
        }
        if let Some(ref src) = self.source_filter
            && alert.source != *src
        {
            return false;
        }
        true
    }
}

/// Central alert bus.
///
/// Components fire alerts via [`AlertBus::fire`]; subscribers receive them
/// synchronously in the same call.
///
/// # Example
/// ```
/// use obs_core::bus::AlertBus;
/// let bus = AlertBus::new();
/// assert_eq!(bus.subscriber_count(), 0);
/// ```
pub struct AlertBus {
    subscriptions: Vec<Subscription>,
    /// All alerts ever fired (bounded by `history_limit`).
    history: Vec<Alert>,
    history_limit: usize,
}

impl AlertBus {
    /// Create a new bus with the default history limit (1 000 entries).
    pub fn new() -> Self {
        Self::with_history_limit(1_000)
    }

    pub fn with_history_limit(limit: usize) -> Self {
        Self {
            subscriptions: Vec::new(),
            history: Vec::new(),
            history_limit: limit,
        }
    }

    /// Register a handler that receives all alerts at or above `min_severity`.
    /// If `source` is `Some`, only alerts from that source component are delivered.
    pub fn subscribe(
        &mut self,
        handler: Box<dyn AlertHandler>,
        min_severity: AlertSeverity,
        source: Option<String>,
    ) {
        self.subscriptions.push(Subscription {
            handler,
            min_severity,
            source_filter: source,
        });
    }

    /// Dispatch `alert` to all matching subscribers and add it to history.
    pub fn fire(&mut self, alert: Alert) {
        for sub in &mut self.subscriptions {
            if sub.matches(&alert) {
                sub.handler.on_alert(&alert);
            }
        }
        if self.history.len() >= self.history_limit {
            self.history.remove(0);
        }
        self.history.push(alert);
    }

    /// All alerts in history (oldest first).
    pub fn history(&self) -> &[Alert] {
        &self.history
    }

    /// History filtered to `severity >= min`.
    pub fn history_at_or_above(&self, min: AlertSeverity) -> impl Iterator<Item = &Alert> {
        self.history.iter().filter(move |a| a.severity >= min)
    }

    /// History filtered to a single `source`.
    pub fn history_for_source<'a>(&'a self, source: &str) -> impl Iterator<Item = &'a Alert> {
        self.history.iter().filter(move |a| a.source == source)
    }

    pub fn subscriber_count(&self) -> usize {
        self.subscriptions.len()
    }
}

impl Default for AlertBus {
    fn default() -> Self {
        Self::new()
    }
}