// robotrt-middleware-core 0.1.0-beta.2
//
// RobotRT modular robotics runtime and middleware components.
// Documentation
use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::TopicReliabilityPolicy;

mod service;
mod topic;

/// Load counters reported for a single topic.
///
/// Only `pending` and `max_depth` are interpreted by code in this block (see
/// [`TopicLoadEntry::utilization_ratio`]); the remaining counters are carried
/// as plain data.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicLoadEntry {
    /// Name of the topic this entry describes.
    pub topic: String,
    /// Pending count; numerator of the utilization ratio.
    pub pending: usize,
    /// Depth limit; denominator of the utilization ratio.
    pub max_depth: usize,
    /// Dropped-message counter.
    pub dropped_messages: usize,
    /// Lag counter, in messages.
    pub lag_messages: usize,
    /// Counter of in-flight retries (presumably reliable deliveries awaiting
    /// acknowledgement — confirm against the producer of this entry).
    pub retry_inflight: usize,
    /// Replay-attempt counter.
    pub replay_attempts: usize,
    /// Number of subscribers flagged as degraded.
    pub degraded_subscribers: usize,
}

impl TopicLoadEntry {
    /// Returns `pending / max_depth` as a floating-point ratio.
    ///
    /// A `max_depth` of zero yields `0.0` rather than dividing by zero. The
    /// result is not clamped, so it exceeds `1.0` whenever `pending` is larger
    /// than `max_depth`.
    pub fn utilization_ratio(&self) -> f64 {
        match self.max_depth {
            // No capacity configured: report the topic as idle.
            0 => 0.0,
            depth => self.pending as f64 / depth as f64,
        }
    }
}

/// Load counters reported for a single service.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServiceLoadEntry {
    /// Name of the service this entry describes.
    pub service: String,
    /// Count of pending requests (presumably queued and not yet taken by a
    /// server — confirm against the code that fills this entry).
    pub pending_requests: usize,
    /// Count of pending responses (presumably produced but not yet collected
    /// by a client — confirm against the code that fills this entry).
    pub pending_responses: usize,
}

// ─── Topic Bus ────────────────────────────────────────────────────────────────

/// Shared, type-erased FIFO slot that connects a [`BasicPublisher`] to one or
/// more [`BasicSubscriber`]s on the same topic within a single process.
pub struct TopicSlot {
    /// All mutable slot data (queue, sequence counters, subscriber table),
    /// kept behind a single mutex.
    state: Mutex<TopicSlotState>,
    /// Depth limit for this slot. Stored outside the mutex, so it can be read
    /// without locking. NOTE(review): presumably the maximum queue length;
    /// the enforcing code is not part of this file section.
    max_depth: usize,
}

/// Mutable state of a [`TopicSlot`]; held inside the slot's mutex.
struct TopicSlotState {
    /// Queued type-erased payloads, each paired with a `u64` (presumably the
    /// message's sequence number — confirm in the publish path).
    queue: VecDeque<(u64, Box<dyn Any + Send>)>,
    /// Starts at 1. NOTE(review): presumably the sequence number of the
    /// oldest message still in `queue` — confirm.
    head_sequence: u64,
    /// Starts at 1. NOTE(review): presumably the sequence number the next
    /// published message will receive — confirm.
    next_sequence: u64,
    /// Dropped-message counter; starts at 0.
    dropped_messages: usize,
    /// Reliability policy supplied when the state is created.
    reliability_policy: TopicReliabilityPolicy,
    /// Per-subscriber state keyed by a `u64` (presumably the subscriber id
    /// handed out from `next_subscriber_id`).
    subscribers: HashMap<u64, TopicSubscriberState>,
    /// Maps a subscriber name to a `u64` (presumably its key in
    /// `subscribers` — confirm).
    named_subscribers: HashMap<String, u64>,
    /// Starts at 1. NOTE(review): presumably the id the next registered
    /// subscriber will receive — confirm.
    next_subscriber_id: u64,
}

impl TopicSlotState {
    /// Creates slot state with an empty queue, no subscribers and a zero
    /// dropped-message count, using the given `reliability_policy`.
    ///
    /// `head_sequence`, `next_sequence` and `next_subscriber_id` all start
    /// at 1.
    fn new(reliability_policy: TopicReliabilityPolicy) -> Self {
        // Shared starting value for the sequence and subscriber-id counters.
        const FIRST: u64 = 1;

        let queue = VecDeque::new();
        let subscribers = HashMap::new();
        let named_subscribers = HashMap::new();

        Self {
            queue,
            head_sequence: FIRST,
            next_sequence: FIRST,
            dropped_messages: 0,
            reliability_policy,
            subscribers,
            named_subscribers,
            next_subscriber_id: FIRST,
        }
    }
}

/// Per-subscriber bookkeeping stored in a topic slot's subscriber table.
#[derive(Clone, Debug)]
struct TopicSubscriberState {
    /// NOTE(review): presumably the sequence number of the next message this
    /// subscriber should receive — confirm in the receive path.
    next_sequence: u64,
    /// Whether this subscriber is flagged as reliable.
    reliable: bool,
    /// Whether this subscriber has been flagged as degraded by policy
    /// (presumably the topic's `TopicReliabilityPolicy` — confirm).
    degraded_by_policy: bool,
    /// Delivery records currently tracked as in flight for this subscriber.
    inflight: VecDeque<InflightDelivery>,
}

/// Record of one in-flight delivery; small and `Copy`.
#[derive(Clone, Copy, Debug)]
struct InflightDelivery {
    /// Sequence number of the message this record refers to.
    sequence: u64,
    /// Timestamp of the most recent send of this message.
    last_sent_at: Instant,
    /// Retry counter (`u8`). NOTE(review): overflow/saturation behaviour is
    /// decided by the code that increments it, not visible here.
    retry_count: u8,
}

/// In-process topic bus: maps fully-resolved topic names to shared slots.
pub struct TopicBus {
    /// Topic name → shared slot.
    slots: HashMap<String, Arc<TopicSlot>>,
    /// Depth value held by the bus; 16 when built via `Default`.
    /// NOTE(review): presumably used as `max_depth` for newly created slots —
    /// confirm in the slot-creation code.
    default_depth: usize,
    /// Reliability policy held by the bus; `TopicReliabilityPolicy::default()`
    /// when built via `Default`.
    reliability_policy: TopicReliabilityPolicy,
}

impl Default for TopicBus {
    /// Builds a bus with no slots, a default depth of 16 and the default
    /// [`TopicReliabilityPolicy`].
    fn default() -> Self {
        // Depth used when the caller does not configure one.
        const DEFAULT_DEPTH: usize = 16;

        let slots = HashMap::new();
        let reliability_policy = TopicReliabilityPolicy::default();

        Self {
            slots,
            default_depth: DEFAULT_DEPTH,
            reliability_policy,
        }
    }
}

// ─── Service Bus ──────────────────────────────────────────────────────────────

/// Shared request/response channel between a [`BasicServiceClient`] and a
/// [`BasicServiceServer`] for the same service name within a single process.
///
/// `Default` is derived: a default channel has an empty request queue and an
/// empty response map, each behind its own mutex.
#[derive(Default)]
pub struct ServiceChannel {
    /// Queue of type-erased requests, each paired with a `u64` (presumably a
    /// request id — confirm in the client/server code).
    requests: Mutex<VecDeque<(u64, Box<dyn Any + Send>)>>,
    /// Type-erased responses keyed by a `u64` (presumably the id of the
    /// request being answered — confirm in the client/server code).
    responses: Mutex<HashMap<u64, Box<dyn Any + Send>>>,
}

/// In-process service bus: maps service names to shared channels.
///
/// The derived `Default` yields a bus with no channels.
#[derive(Default)]
pub struct ServiceBus {
    /// Service name → shared channel.
    channels: HashMap<String, Arc<ServiceChannel>>,
}