1use std::any::Any;
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use crate::TopicReliabilityPolicy;
7
8mod service;
9mod topic;
10
/// Load counters reported for a single topic.
///
/// This is a plain value type: nothing here validates the counters or relates
/// them to one another. Their precise meaning is defined by whatever code
/// populates the entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicLoadEntry {
    /// Name of the topic this entry reports on.
    pub topic: String,
    /// Pending count; the numerator of [`TopicLoadEntry::utilization_ratio`].
    pub pending: usize,
    /// Maximum depth; the denominator of [`TopicLoadEntry::utilization_ratio`].
    pub max_depth: usize,
    /// Dropped-message counter as reported by the producer of this entry.
    pub dropped_messages: usize,
    /// Lag counter, in messages, as reported by the producer of this entry.
    pub lag_messages: usize,
    /// Retry-in-flight counter as reported by the producer of this entry.
    pub retry_inflight: usize,
    /// Replay-attempt counter as reported by the producer of this entry.
    pub replay_attempts: usize,
    /// Degraded-subscriber counter as reported by the producer of this entry.
    pub degraded_subscribers: usize,
}

impl TopicLoadEntry {
    /// Returns `pending / max_depth` as a floating-point ratio.
    ///
    /// A `max_depth` of zero yields `0.0` instead of dividing by zero. The
    /// result is not clamped, so it is greater than `1.0` whenever `pending`
    /// exceeds `max_depth`.
    pub fn utilization_ratio(&self) -> f64 {
        match self.max_depth {
            0 => 0.0,
            capacity => self.pending as f64 / capacity as f64,
        }
    }
}
31
/// Load counters reported for a single service.
///
/// A plain value type; the counters are supplied by whatever code builds the
/// entry and are not validated here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceLoadEntry {
    /// Name of the service this entry reports on.
    pub service: String,
    /// Pending-request counter as reported by the producer of this entry.
    pub pending_requests: usize,
    /// Pending-response counter as reported by the producer of this entry.
    pub pending_responses: usize,
}
38
39pub struct TopicSlot {
44 state: Mutex<TopicSlotState>,
45 max_depth: usize,
46}
47
48struct TopicSlotState {
49 queue: VecDeque<(u64, Box<dyn Any + Send>)>,
50 head_sequence: u64,
51 next_sequence: u64,
52 dropped_messages: usize,
53 reliability_policy: TopicReliabilityPolicy,
54 subscribers: HashMap<u64, TopicSubscriberState>,
55 named_subscribers: HashMap<String, u64>,
56 next_subscriber_id: u64,
57}
58
59impl TopicSlotState {
60 fn new(reliability_policy: TopicReliabilityPolicy) -> Self {
61 Self {
62 queue: VecDeque::new(),
63 head_sequence: 1,
64 next_sequence: 1,
65 dropped_messages: 0,
66 reliability_policy,
67 subscribers: HashMap::new(),
68 named_subscribers: HashMap::new(),
69 next_subscriber_id: 1,
70 }
71 }
72}
73
74#[derive(Clone, Debug)]
75struct TopicSubscriberState {
76 next_sequence: u64,
77 reliable: bool,
78 degraded_by_policy: bool,
79 inflight: VecDeque<InflightDelivery>,
80}
81
/// Record of a single delivery that is tracked as in flight.
///
/// Small and `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy)]
struct InflightDelivery {
    /// Sequence number associated with this delivery.
    sequence: u64,
    /// Timestamp of the most recent send.
    last_sent_at: Instant,
    /// Retry counter; a `u8`, so it can represent at most 255.
    retry_count: u8,
}
88
89pub struct TopicBus {
91 slots: HashMap<String, Arc<TopicSlot>>,
92 default_depth: usize,
93 reliability_policy: TopicReliabilityPolicy,
94}
95
96impl Default for TopicBus {
97 fn default() -> Self {
98 Self {
99 slots: HashMap::new(),
100 default_depth: 16,
101 reliability_policy: TopicReliabilityPolicy::default(),
102 }
103 }
104}
105
/// Request queue and response table for one service, each behind its own mutex.
///
/// `Default` is derived: a default channel has an empty request queue and an
/// empty response table, exactly what the previous hand-written impl built.
#[derive(Default)]
pub struct ServiceChannel {
    /// Queued requests, each a `u64` paired with a type-erased payload.
    // NOTE(review): the u64 is presumably the request id — confirm.
    requests: Mutex<VecDeque<(u64, Box<dyn Any + Send>)>>,
    /// Type-erased responses keyed by a `u64`.
    // NOTE(review): the key is presumably the id of the matching request — confirm.
    responses: Mutex<HashMap<u64, Box<dyn Any + Send>>>,
}
123
124#[derive(Default)]
126pub struct ServiceBus {
127 channels: HashMap<String, Arc<ServiceChannel>>,
128}