robotrt-middleware-core 0.1.0-beta.1

RobotRT modular robotics runtime and middleware components.
Documentation
use std::any::Any;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use core_types::BackpressureSignal;

use super::{TopicBus, TopicLoadEntry, TopicSlot};

impl TopicSlot {
    pub fn new(max_depth: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            max_depth,
        }
    }

    /// Push a message into the slot. Returns `true` when the slot was already
    /// at capacity (backpressure) and the message is not enqueued.
    pub fn push(&self, msg: Box<dyn Any + Send>) -> bool {
        let mut q = self.queue.lock().expect("topic slot lock poisoned");
        if q.len() >= self.max_depth {
            return true;
        }
        q.push_back(msg);
        false
    }

    /// Push a batch of messages and return the number accepted before
    /// reaching capacity.
    pub fn push_batch<I>(&self, msgs: I) -> usize
    where
        I: IntoIterator<Item = Box<dyn Any + Send>>,
    {
        let mut q = self.queue.lock().expect("topic slot lock poisoned");
        let mut accepted = 0usize;
        for msg in msgs {
            if q.len() >= self.max_depth {
                break;
            }
            q.push_back(msg);
            accepted += 1;
        }
        accepted
    }

    /// Pop the oldest message, if any.
    pub fn pop(&self) -> Option<Box<dyn Any + Send>> {
        self.queue
            .lock()
            .expect("topic slot lock poisoned")
            .pop_front()
    }

    /// Pop up to `max_items` messages, preserving FIFO order.
    pub fn pop_batch(&self, max_items: usize) -> Vec<Box<dyn Any + Send>> {
        if max_items == 0 {
            return Vec::new();
        }

        let mut q = self.queue.lock().expect("topic slot lock poisoned");
        let mut out = Vec::with_capacity(max_items.min(q.len()));
        for _ in 0..max_items {
            if let Some(msg) = q.pop_front() {
                out.push(msg);
            } else {
                break;
            }
        }
        out
    }

    /// Number of messages currently waiting in the slot.
    pub fn pending_count(&self) -> usize {
        self.queue.lock().expect("topic slot lock poisoned").len()
    }

    /// Remaining capacity before hitting queue depth limit.
    pub fn remaining_capacity(&self) -> usize {
        self.max_depth.saturating_sub(self.pending_count())
    }

    /// Backpressure state derived from queue utilization.
    /// - Hard: queue full
    /// - Soft: queue utilization >= 80%
    /// - Clear: otherwise
    pub fn backpressure_signal(&self) -> BackpressureSignal {
        if self.max_depth == 0 {
            return BackpressureSignal::Hard;
        }

        let pending = self.pending_count();
        if pending >= self.max_depth {
            return BackpressureSignal::Hard;
        }

        let utilization = pending as f64 / self.max_depth as f64;
        if utilization >= 0.8 {
            BackpressureSignal::Soft
        } else {
            BackpressureSignal::Clear
        }
    }

    pub fn max_depth(&self) -> usize {
        self.max_depth
    }
}

impl TopicBus {
    /// Get or create the slot for `topic` using the caller-supplied `depth`.
    /// Once a slot is created its depth is fixed for the lifetime of the bus.
    pub fn get_or_create(&mut self, topic: &str, depth: usize) -> Arc<TopicSlot> {
        self.slots
            .entry(topic.to_string())
            .or_insert_with(|| Arc::new(TopicSlot::new(depth)))
            .clone()
    }

    /// Get or create with the bus-wide default depth.
    pub fn get_or_create_default(&mut self, topic: &str) -> Arc<TopicSlot> {
        let depth = self.default_depth;
        self.get_or_create(topic, depth)
    }

    pub fn load_entries(&self) -> Vec<TopicLoadEntry> {
        let mut out = self
            .slots
            .iter()
            .map(|(topic, slot)| TopicLoadEntry {
                topic: topic.clone(),
                pending: slot.pending_count(),
                max_depth: slot.max_depth(),
            })
            .collect::<Vec<_>>();
        out.sort_by(|a, b| a.topic.cmp(&b.topic));
        out
    }
}