Skip to main content

middleware_core/
bus.rs

1use std::any::Any;
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::TopicReliabilityPolicy;
7
8mod service;
9mod topic;
10
11#[derive(Clone, Debug, Eq, PartialEq)]
12pub struct TopicLoadEntry {
13    pub topic: String,
14    pub pending: usize,
15    pub max_depth: usize,
16    pub dropped_messages: usize,
17    pub lag_messages: usize,
18    pub retry_inflight: usize,
19    pub replay_attempts: usize,
20    pub degraded_subscribers: usize,
21}
22
23impl TopicLoadEntry {
24    pub fn utilization_ratio(&self) -> f64 {
25        if self.max_depth == 0 {
26            return 0.0;
27        }
28        self.pending as f64 / self.max_depth as f64
29    }
30}
31
32#[derive(Clone, Debug, Eq, PartialEq)]
33pub struct ServiceLoadEntry {
34    pub service: String,
35    pub pending_requests: usize,
36    pub pending_responses: usize,
37}
38
39// ─── Topic Bus ────────────────────────────────────────────────────────────────
40
41/// Shared, type-erased FIFO slot that connects a [`BasicPublisher`] to one or
42/// more [`BasicSubscriber`]s on the same topic within a single process.
43pub struct TopicSlot {
44    state: Mutex<TopicSlotState>,
45    max_depth: usize,
46}
47
48struct TopicSlotState {
49    queue: VecDeque<(u64, Box<dyn Any + Send>)>,
50    head_sequence: u64,
51    next_sequence: u64,
52    dropped_messages: usize,
53    reliability_policy: TopicReliabilityPolicy,
54    subscribers: HashMap<u64, TopicSubscriberState>,
55    named_subscribers: HashMap<String, u64>,
56    next_subscriber_id: u64,
57}
58
59impl TopicSlotState {
60    fn new(reliability_policy: TopicReliabilityPolicy) -> Self {
61        Self {
62            queue: VecDeque::new(),
63            head_sequence: 1,
64            next_sequence: 1,
65            dropped_messages: 0,
66            reliability_policy,
67            subscribers: HashMap::new(),
68            named_subscribers: HashMap::new(),
69            next_subscriber_id: 1,
70        }
71    }
72}
73
74#[derive(Clone, Debug)]
75struct TopicSubscriberState {
76    next_sequence: u64,
77    reliable: bool,
78    degraded_by_policy: bool,
79    inflight: VecDeque<InflightDelivery>,
80}
81
82#[derive(Clone, Copy, Debug)]
83struct InflightDelivery {
84    sequence: u64,
85    last_sent_at: Instant,
86    retry_count: u8,
87}
88
89/// In-process topic bus: maps fully-resolved topic names to shared slots.
90pub struct TopicBus {
91    slots: HashMap<String, Arc<TopicSlot>>,
92    default_depth: usize,
93    reliability_policy: TopicReliabilityPolicy,
94}
95
96impl Default for TopicBus {
97    fn default() -> Self {
98        Self {
99            slots: HashMap::new(),
100            default_depth: 16,
101            reliability_policy: TopicReliabilityPolicy::default(),
102        }
103    }
104}
105
106// ─── Service Bus ──────────────────────────────────────────────────────────────
107
108/// Shared request/response channel between a [`BasicServiceClient`] and a
109/// [`BasicServiceServer`] for the same service name within a single process.
110pub struct ServiceChannel {
111    requests: Mutex<VecDeque<(u64, Box<dyn Any + Send>)>>,
112    responses: Mutex<HashMap<u64, Box<dyn Any + Send>>>,
113}
114
115impl Default for ServiceChannel {
116    fn default() -> Self {
117        Self {
118            requests: Mutex::new(VecDeque::new()),
119            responses: Mutex::new(HashMap::new()),
120        }
121    }
122}
123
124/// In-process service bus: maps service names to shared channels.
125#[derive(Default)]
126pub struct ServiceBus {
127    channels: HashMap<String, Arc<ServiceChannel>>,
128}