Skip to main content

middleware_core/bus/
topic.rs

1use std::any::Any;
2use std::sync::Arc;
3use std::time::Instant;
4
5use core_types::BackpressureSignal;
6
7use crate::{ReplayDegradeStrategy, TopicReliabilityPolicy};
8
9use super::{InflightDelivery, TopicBus, TopicLoadEntry, TopicSlot, TopicSubscriberState};
10
11impl TopicSlot {
12    pub fn new(max_depth: usize) -> Self {
13        Self::new_with_reliability_policy(max_depth, TopicReliabilityPolicy::default())
14    }
15
16    pub fn new_with_reliability_policy(
17        max_depth: usize,
18        reliability_policy: TopicReliabilityPolicy,
19    ) -> Self {
20        Self {
21            state: std::sync::Mutex::new(super::TopicSlotState::new(reliability_policy)),
22            max_depth,
23        }
24    }
25
26    pub fn set_reliability_policy(&self, reliability_policy: TopicReliabilityPolicy) {
27        let mut state = self.state.lock().expect("topic slot lock poisoned");
28        state.reliability_policy = reliability_policy;
29        Self::apply_virtual_replay_governance(&mut state);
30        Self::garbage_collect(&mut state);
31    }
32
33    /// Push a message into the slot. Returns `true` when the slot was already
34    /// at capacity (backpressure) and the message is not enqueued.
35    pub fn push(&self, msg: Box<dyn Any + Send>) -> bool {
36        let mut state = self.state.lock().expect("topic slot lock poisoned");
37        if state.queue.len() >= self.max_depth {
38            return true;
39        }
40
41        let seq = state.next_sequence;
42        state.next_sequence = state.next_sequence.saturating_add(1);
43        state.queue.push_back((seq, msg));
44        if state.queue.len() == 1 {
45            state.head_sequence = seq;
46        }
47        false
48    }
49
50    /// Push a message in best-effort mode.
51    ///
52    /// When full, the oldest queued item is dropped to make room so publishers
53    /// are not globally blocked by queue capacity.
54    pub fn push_best_effort(&self, msg: Box<dyn Any + Send>) -> bool {
55        let mut state = self.state.lock().expect("topic slot lock poisoned");
56        if self.max_depth == 0 {
57            return true;
58        }
59
60        if state.queue.len() >= self.max_depth {
61            state.queue.pop_front();
62            state.dropped_messages = state.dropped_messages.saturating_add(1);
63            Self::advance_head(&mut state);
64        }
65
66        let seq = state.next_sequence;
67        state.next_sequence = state.next_sequence.saturating_add(1);
68        state.queue.push_back((seq, msg));
69        if state.queue.len() == 1 {
70            state.head_sequence = seq;
71        }
72        false
73    }
74
75    /// Push a batch of messages and return the number accepted before
76    /// reaching capacity.
77    pub fn push_batch<I>(&self, msgs: I) -> usize
78    where
79        I: IntoIterator<Item = Box<dyn Any + Send>>,
80    {
81        let mut state = self.state.lock().expect("topic slot lock poisoned");
82        let mut accepted = 0usize;
83        for msg in msgs {
84            if state.queue.len() >= self.max_depth {
85                break;
86            }
87
88            let seq = state.next_sequence;
89            state.next_sequence = state.next_sequence.saturating_add(1);
90            state.queue.push_back((seq, msg));
91            if state.queue.len() == 1 {
92                state.head_sequence = seq;
93            }
94            accepted += 1;
95        }
96        accepted
97    }
98
99    /// Push a batch in best-effort mode.
100    ///
101    /// When full, the oldest queued item is dropped for each incoming item.
102    pub fn push_batch_best_effort<I>(&self, msgs: I) -> usize
103    where
104        I: IntoIterator<Item = Box<dyn Any + Send>>,
105    {
106        let mut state = self.state.lock().expect("topic slot lock poisoned");
107        if self.max_depth == 0 {
108            return 0;
109        }
110
111        let mut accepted = 0usize;
112        for msg in msgs {
113            if state.queue.len() >= self.max_depth {
114                state.queue.pop_front();
115                state.dropped_messages = state.dropped_messages.saturating_add(1);
116            }
117
118            let seq = state.next_sequence;
119            state.next_sequence = state.next_sequence.saturating_add(1);
120            state.queue.push_back((seq, msg));
121            if state.queue.len() == 1 {
122                state.head_sequence = seq;
123            }
124            accepted += 1;
125        }
126        Self::advance_head(&mut state);
127        accepted
128    }
129
130    pub fn register_subscriber(&self) -> u64 {
131        self.register_subscriber_with_policy(false)
132    }
133
134    pub fn register_subscriber_with_policy(&self, reliable: bool) -> u64 {
135        let mut state = self.state.lock().expect("topic slot lock poisoned");
136        let id = state.next_subscriber_id;
137        state.next_subscriber_id = state.next_subscriber_id.saturating_add(1);
138        let start_sequence = state.next_sequence;
139        state.subscribers.insert(
140            id,
141            TopicSubscriberState {
142                next_sequence: start_sequence,
143                reliable,
144                degraded_by_policy: false,
145                inflight: std::collections::VecDeque::new(),
146            },
147        );
148        id
149    }
150
151    pub fn unregister_subscriber(&self, subscriber_id: u64) {
152        let mut state = self.state.lock().expect("topic slot lock poisoned");
153        state.subscribers.remove(&subscriber_id);
154        Self::garbage_collect(&mut state);
155    }
156
157    pub fn reconcile_virtual_reliable_subscribers<I, S>(&self, desired: I)
158    where
159        I: IntoIterator<Item = (S, Option<u64>)>,
160        S: Into<String>,
161    {
162        let mut state = self.state.lock().expect("topic slot lock poisoned");
163        let desired = desired
164            .into_iter()
165            .map(|(name, acked_seq)| (name.into(), acked_seq))
166            .collect::<std::collections::HashMap<String, Option<u64>>>();
167
168        for (name, acked_seq) in &desired {
169            if let Some(subscriber_id) = state.named_subscribers.get(name).copied() {
170                let target_next = acked_seq.map(|seq| {
171                    Self::clamp_next_sequence(
172                        seq.saturating_add(1),
173                        state.head_sequence,
174                        state.next_sequence,
175                    )
176                });
177                if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) {
178                    subscriber.reliable = true;
179                    subscriber.inflight.clear();
180                    if let Some(target_next) = target_next
181                        && target_next > subscriber.next_sequence
182                    {
183                        subscriber.next_sequence = target_next;
184                    }
185                }
186                continue;
187            }
188
189            let subscriber_id = state.next_subscriber_id;
190            state.next_subscriber_id = state.next_subscriber_id.saturating_add(1);
191            let mut next_sequence = state.next_sequence;
192            if let Some(acked_seq) = acked_seq {
193                next_sequence = Self::clamp_next_sequence(
194                    acked_seq.saturating_add(1),
195                    state.head_sequence,
196                    state.next_sequence,
197                );
198            }
199
200            state.subscribers.insert(
201                subscriber_id,
202                TopicSubscriberState {
203                    next_sequence,
204                    reliable: true,
205                    degraded_by_policy: false,
206                    inflight: std::collections::VecDeque::new(),
207                },
208            );
209            state.named_subscribers.insert(name.clone(), subscriber_id);
210        }
211
212        let stale_keys = state
213            .named_subscribers
214            .keys()
215            .filter(|name| !desired.contains_key(*name))
216            .cloned()
217            .collect::<Vec<_>>();
218
219        for name in stale_keys {
220            if let Some(subscriber_id) = state.named_subscribers.remove(&name) {
221                state.subscribers.remove(&subscriber_id);
222            }
223        }
224
225        Self::apply_virtual_replay_governance(&mut state);
226        Self::garbage_collect(&mut state);
227    }
228
229    pub fn pop_for<T: Any + Clone + Send + 'static>(&self, subscriber_id: u64) -> Option<T> {
230        let mut state = self.state.lock().expect("topic slot lock poisoned");
231        let now = Instant::now();
232        let retry_timeout = state.reliability_policy.retry_timeout;
233        let max_retry = state.reliability_policy.max_retry;
234        let max_inflight_per_subscriber = state.reliability_policy.max_inflight_per_subscriber;
235        let evicted = Self::evict_exhausted_inflight(
236            &mut state,
237            subscriber_id,
238            now,
239            retry_timeout,
240            max_retry,
241        );
242        if evicted > 0 {
243            state.dropped_messages = state.dropped_messages.saturating_add(evicted);
244            Self::garbage_collect(&mut state);
245        }
246
247        let subscriber = state.subscribers.get(&subscriber_id)?;
248        let reliable = subscriber.reliable;
249        let next_sequence = subscriber.next_sequence;
250        let inflight_len = subscriber.inflight.len();
251
252        if reliable && inflight_len >= max_inflight_per_subscriber {
253            if let Some(retry_seq) = Self::next_retry_sequence(
254                subscriber,
255                now,
256                retry_timeout,
257                max_retry,
258            ) {
259                return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
260            }
261            return None;
262        }
263
264        if reliable
265            && let Some(retry_seq) = Self::next_retry_sequence(
266                subscriber,
267                now,
268                retry_timeout,
269                max_retry,
270            )
271        {
272            return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
273        }
274
275        let mut target = next_sequence;
276        if target < state.head_sequence {
277            target = state.head_sequence;
278        }
279
280        let offset = target.saturating_sub(state.head_sequence) as usize;
281        let (seq, boxed) = state.queue.get(offset)?;
282        if *seq != target {
283            return None;
284        }
285
286        let value = boxed.downcast_ref::<T>()?.clone();
287        if let Some(cursor) = state.subscribers.get_mut(&subscriber_id) {
288            cursor.next_sequence = target.saturating_add(1);
289            if cursor.reliable {
290                cursor.inflight.push_back(InflightDelivery {
291                    sequence: target,
292                    last_sent_at: Instant::now(),
293                    retry_count: 0,
294                });
295            }
296        }
297        Self::garbage_collect(&mut state);
298        Some(value)
299    }
300
301    pub fn ack_reliable_for(&self, subscriber_id: u64) -> Option<u64> {
302        let mut state = self.state.lock().expect("topic slot lock poisoned");
303        let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
304            return None;
305        };
306
307        if !subscriber.reliable {
308            return None;
309        }
310
311        let acknowledged = subscriber.inflight.pop_front().map(|delivery| delivery.sequence);
312        if acknowledged.is_some() {
313            Self::garbage_collect(&mut state);
314        }
315        acknowledged
316    }
317
318    pub fn ack_for(&self, subscriber_id: u64) -> bool {
319        let mut state = self.state.lock().expect("topic slot lock poisoned");
320        let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
321            return false;
322        };
323
324        if !subscriber.reliable {
325            return true;
326        }
327
328        let acknowledged = subscriber.inflight.pop_front().is_some();
329        if acknowledged {
330            Self::garbage_collect(&mut state);
331        }
332        acknowledged
333    }
334
335    pub fn pop_batch_for<T: Any + Clone + Send + 'static>(
336        &self,
337        subscriber_id: u64,
338        max_items: usize,
339    ) -> Vec<T> {
340        if max_items == 0 {
341            return Vec::new();
342        }
343
344        let mut out = Vec::with_capacity(max_items);
345        for _ in 0..max_items {
346            if let Some(item) = self.pop_for::<T>(subscriber_id) {
347                out.push(item);
348            } else {
349                break;
350            }
351        }
352        out
353    }
354
355    pub fn pending_count_for(&self, subscriber_id: u64) -> usize {
356        let state = self.state.lock().expect("topic slot lock poisoned");
357        let Some(cursor) = state.subscribers.get(&subscriber_id) else {
358            return 0;
359        };
360
361        let effective_next = cursor.retain_from_sequence().max(state.head_sequence);
362        if effective_next >= state.next_sequence {
363            0
364        } else {
365            (state.next_sequence - effective_next) as usize
366        }
367    }
368
369    /// Pop the oldest message, if any.
370    pub fn pop(&self) -> Option<Box<dyn Any + Send>> {
371        let mut state = self.state.lock().expect("topic slot lock poisoned");
372        if !state.subscribers.is_empty() {
373            return None;
374        }
375
376        let popped = state.queue.pop_front().map(|(_, msg)| msg);
377        Self::advance_head(&mut state);
378        popped
379    }
380
381    /// Pop up to `max_items` messages, preserving FIFO order.
382    pub fn pop_batch(&self, max_items: usize) -> Vec<Box<dyn Any + Send>> {
383        if max_items == 0 {
384            return Vec::new();
385        }
386
387        let mut state = self.state.lock().expect("topic slot lock poisoned");
388        if !state.subscribers.is_empty() {
389            return Vec::new();
390        }
391
392        let mut out = Vec::with_capacity(max_items.min(state.queue.len()));
393        for _ in 0..max_items {
394            if let Some((_, msg)) = state.queue.pop_front() {
395                out.push(msg);
396            } else {
397                break;
398            }
399        }
400        Self::advance_head(&mut state);
401        out
402    }
403
404    /// Number of messages currently waiting in the slot.
405    pub fn pending_count(&self) -> usize {
406        self.state
407            .lock()
408            .expect("topic slot lock poisoned")
409            .queue
410            .len()
411    }
412
413    /// Remaining capacity before hitting queue depth limit.
414    pub fn remaining_capacity(&self) -> usize {
415        self.max_depth.saturating_sub(self.pending_count())
416    }
417
418    /// Backpressure state derived from queue utilization.
419    /// - Hard: queue full
420    /// - Soft: queue utilization >= 80%
421    /// - Clear: otherwise
422    pub fn backpressure_signal(&self) -> BackpressureSignal {
423        if self.max_depth == 0 {
424            return BackpressureSignal::Hard;
425        }
426
427        let pending = self.pending_count();
428        if pending >= self.max_depth {
429            return BackpressureSignal::Hard;
430        }
431
432        let utilization = pending as f64 / self.max_depth as f64;
433        if utilization >= 0.8 {
434            BackpressureSignal::Soft
435        } else {
436            BackpressureSignal::Clear
437        }
438    }
439
440    pub fn max_depth(&self) -> usize {
441        self.max_depth
442    }
443
444    pub fn load_entry(&self, topic: &str) -> TopicLoadEntry {
445        let state = self.state.lock().expect("topic slot lock poisoned");
446
447        let mut lag_messages = 0usize;
448        let mut retry_inflight = 0usize;
449        let mut replay_attempts = 0usize;
450        let mut degraded_subscribers = 0usize;
451        let max_inflight_per_subscriber = state.reliability_policy.max_inflight_per_subscriber;
452
453        for subscriber in state.subscribers.values() {
454            if !subscriber.reliable {
455                continue;
456            }
457
458            let effective_next = subscriber.retain_from_sequence().max(state.head_sequence);
459            let lag = if effective_next >= state.next_sequence {
460                0
461            } else {
462                (state.next_sequence - effective_next) as usize
463            };
464            lag_messages = lag_messages.max(lag);
465
466            retry_inflight = retry_inflight.saturating_add(subscriber.inflight.len());
467            replay_attempts = replay_attempts.saturating_add(
468                subscriber
469                    .inflight
470                    .iter()
471                    .map(|item| item.retry_count as usize)
472                    .sum::<usize>(),
473            );
474
475            let has_replay = subscriber.inflight.iter().any(|item| item.retry_count > 0);
476            let stalled = subscriber.inflight.len() >= max_inflight_per_subscriber;
477            if has_replay || stalled || subscriber.degraded_by_policy {
478                degraded_subscribers = degraded_subscribers.saturating_add(1);
479            }
480        }
481
482        TopicLoadEntry {
483            topic: topic.to_string(),
484            pending: state.queue.len(),
485            max_depth: self.max_depth,
486            dropped_messages: state.dropped_messages,
487            lag_messages,
488            retry_inflight,
489            replay_attempts,
490            degraded_subscribers,
491        }
492    }
493
494    fn garbage_collect(state: &mut super::TopicSlotState) {
495        if state.subscribers.is_empty() {
496            Self::advance_head(state);
497            return;
498        }
499
500        let min_next = state
501            .subscribers
502            .values()
503            .map(TopicSubscriberState::retain_from_sequence)
504            .min()
505            .unwrap_or(state.next_sequence);
506
507        while let Some((seq, _)) = state.queue.front() {
508            if *seq < min_next {
509                state.queue.pop_front();
510            } else {
511                break;
512            }
513        }
514
515        Self::advance_head(state);
516    }
517
518    fn advance_head(state: &mut super::TopicSlotState) {
519        if let Some((seq, _)) = state.queue.front() {
520            state.head_sequence = *seq;
521        } else {
522            state.head_sequence = state.next_sequence;
523        }
524    }
525
526    fn next_retry_sequence(
527        subscriber: &TopicSubscriberState,
528        now: Instant,
529        retry_timeout: std::time::Duration,
530        max_retry: u8,
531    ) -> Option<u64> {
532        subscriber
533            .inflight
534            .iter()
535            .find(|item| {
536                item.retry_count < max_retry
537                    && now.duration_since(item.last_sent_at) >= retry_timeout
538            })
539            .map(|item| item.sequence)
540    }
541
542    fn deliver_retry<T: Any + Clone + Send + 'static>(
543        state: &mut super::TopicSlotState,
544        subscriber_id: u64,
545        retry_seq: u64,
546    ) -> Option<T> {
547        let offset = retry_seq.saturating_sub(state.head_sequence) as usize;
548        let (_, boxed) = state.queue.get(offset)?;
549        let value = boxed.downcast_ref::<T>()?.clone();
550
551        if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id)
552            && let Some(item) = subscriber
553                .inflight
554                .iter_mut()
555                .find(|item| item.sequence == retry_seq)
556        {
557            item.retry_count = item.retry_count.saturating_add(1);
558            item.last_sent_at = Instant::now();
559        }
560        Some(value)
561    }
562
563    fn evict_exhausted_inflight(
564        state: &mut super::TopicSlotState,
565        subscriber_id: u64,
566        now: Instant,
567        retry_timeout: std::time::Duration,
568        max_retry: u8,
569    ) -> usize {
570        let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
571            return 0;
572        };
573
574        if !subscriber.reliable {
575            return 0;
576        }
577
578        let mut evicted_count = 0usize;
579        while let Some(front) = subscriber.inflight.front() {
580            let exhausted = front.retry_count >= max_retry;
581            let retry_due = now.duration_since(front.last_sent_at) >= retry_timeout;
582            if exhausted && retry_due {
583                subscriber.inflight.pop_front();
584                evicted_count = evicted_count.saturating_add(1);
585            } else {
586                break;
587            }
588        }
589
590        evicted_count
591    }
592
593    fn clamp_next_sequence(next: u64, head_sequence: u64, max_sequence: u64) -> u64 {
594        next.max(head_sequence).min(max_sequence)
595    }
596
597    fn apply_virtual_replay_governance(state: &mut super::TopicSlotState) {
598        let Some(window) = state.reliability_policy.replay_window else {
599            for subscriber_id in state.named_subscribers.values() {
600                if let Some(subscriber) = state.subscribers.get_mut(subscriber_id) {
601                    subscriber.degraded_by_policy = false;
602                }
603            }
604            return;
605        };
606
607        let floor = state.next_sequence.saturating_sub(window as u64);
608        let mut dropped_total = 0usize;
609        for subscriber_id in state.named_subscribers.values() {
610            let Some(subscriber) = state.subscribers.get_mut(subscriber_id) else {
611                continue;
612            };
613
614            let over_window = subscriber.next_sequence < floor;
615            subscriber.degraded_by_policy = over_window;
616
617            if !over_window {
618                continue;
619            }
620
621            if state.reliability_policy.replay_degrade_strategy == ReplayDegradeStrategy::DropOldest {
622                let dropped = floor.saturating_sub(subscriber.next_sequence) as usize;
623                if dropped > 0 {
624                    dropped_total = dropped_total.saturating_add(dropped);
625                    subscriber.next_sequence = floor;
626                    subscriber.inflight.clear();
627                }
628            }
629        }
630
631        if dropped_total > 0 {
632            state.dropped_messages = state.dropped_messages.saturating_add(dropped_total);
633        }
634    }
635}
636
637impl TopicSubscriberState {
638    fn retain_from_sequence(&self) -> u64 {
639        self.inflight
640            .front()
641            .map(|item| item.sequence)
642            .unwrap_or(self.next_sequence)
643    }
644}
645
646impl TopicBus {
647    /// Get or create the slot for `topic` using the caller-supplied `depth`.
648    /// Once a slot is created its depth is fixed for the lifetime of the bus.
649    pub fn get_or_create(&mut self, topic: &str, depth: usize) -> Arc<TopicSlot> {
650        let reliability_policy = self.reliability_policy;
651        self.slots
652            .entry(topic.to_string())
653            .or_insert_with(|| {
654                Arc::new(TopicSlot::new_with_reliability_policy(
655                    depth,
656                    reliability_policy,
657                ))
658            })
659            .clone()
660    }
661
662    /// Get or create with the bus-wide default depth.
663    pub fn get_or_create_default(&mut self, topic: &str) -> Arc<TopicSlot> {
664        let depth = self.default_depth;
665        self.get_or_create(topic, depth)
666    }
667
668    pub fn set_reliability_policy(&mut self, reliability_policy: TopicReliabilityPolicy) {
669        self.reliability_policy = reliability_policy;
670        for slot in self.slots.values() {
671            slot.set_reliability_policy(reliability_policy);
672        }
673    }
674
675    pub fn load_entries(&self) -> Vec<TopicLoadEntry> {
676        let mut out = self
677            .slots
678            .iter()
679            .map(|(topic, slot)| slot.load_entry(topic))
680            .collect::<Vec<_>>();
681        out.sort_by(|a, b| a.topic.cmp(&b.topic));
682        out
683    }
684}