1use std::any::Any;
2use std::sync::Arc;
3use std::time::Instant;
4
5use core_types::BackpressureSignal;
6
7use crate::{ReplayDegradeStrategy, TopicReliabilityPolicy};
8
9use super::{InflightDelivery, TopicBus, TopicLoadEntry, TopicSlot, TopicSubscriberState};
10
11impl TopicSlot {
12 pub fn new(max_depth: usize) -> Self {
13 Self::new_with_reliability_policy(max_depth, TopicReliabilityPolicy::default())
14 }
15
16 pub fn new_with_reliability_policy(
17 max_depth: usize,
18 reliability_policy: TopicReliabilityPolicy,
19 ) -> Self {
20 Self {
21 state: std::sync::Mutex::new(super::TopicSlotState::new(reliability_policy)),
22 max_depth,
23 }
24 }
25
26 pub fn set_reliability_policy(&self, reliability_policy: TopicReliabilityPolicy) {
27 let mut state = self.state.lock().expect("topic slot lock poisoned");
28 state.reliability_policy = reliability_policy;
29 Self::apply_virtual_replay_governance(&mut state);
30 Self::garbage_collect(&mut state);
31 }
32
33 pub fn push(&self, msg: Box<dyn Any + Send>) -> bool {
36 let mut state = self.state.lock().expect("topic slot lock poisoned");
37 if state.queue.len() >= self.max_depth {
38 return true;
39 }
40
41 let seq = state.next_sequence;
42 state.next_sequence = state.next_sequence.saturating_add(1);
43 state.queue.push_back((seq, msg));
44 if state.queue.len() == 1 {
45 state.head_sequence = seq;
46 }
47 false
48 }
49
50 pub fn push_best_effort(&self, msg: Box<dyn Any + Send>) -> bool {
55 let mut state = self.state.lock().expect("topic slot lock poisoned");
56 if self.max_depth == 0 {
57 return true;
58 }
59
60 if state.queue.len() >= self.max_depth {
61 state.queue.pop_front();
62 state.dropped_messages = state.dropped_messages.saturating_add(1);
63 Self::advance_head(&mut state);
64 }
65
66 let seq = state.next_sequence;
67 state.next_sequence = state.next_sequence.saturating_add(1);
68 state.queue.push_back((seq, msg));
69 if state.queue.len() == 1 {
70 state.head_sequence = seq;
71 }
72 false
73 }
74
75 pub fn push_batch<I>(&self, msgs: I) -> usize
78 where
79 I: IntoIterator<Item = Box<dyn Any + Send>>,
80 {
81 let mut state = self.state.lock().expect("topic slot lock poisoned");
82 let mut accepted = 0usize;
83 for msg in msgs {
84 if state.queue.len() >= self.max_depth {
85 break;
86 }
87
88 let seq = state.next_sequence;
89 state.next_sequence = state.next_sequence.saturating_add(1);
90 state.queue.push_back((seq, msg));
91 if state.queue.len() == 1 {
92 state.head_sequence = seq;
93 }
94 accepted += 1;
95 }
96 accepted
97 }
98
99 pub fn push_batch_best_effort<I>(&self, msgs: I) -> usize
103 where
104 I: IntoIterator<Item = Box<dyn Any + Send>>,
105 {
106 let mut state = self.state.lock().expect("topic slot lock poisoned");
107 if self.max_depth == 0 {
108 return 0;
109 }
110
111 let mut accepted = 0usize;
112 for msg in msgs {
113 if state.queue.len() >= self.max_depth {
114 state.queue.pop_front();
115 state.dropped_messages = state.dropped_messages.saturating_add(1);
116 }
117
118 let seq = state.next_sequence;
119 state.next_sequence = state.next_sequence.saturating_add(1);
120 state.queue.push_back((seq, msg));
121 if state.queue.len() == 1 {
122 state.head_sequence = seq;
123 }
124 accepted += 1;
125 }
126 Self::advance_head(&mut state);
127 accepted
128 }
129
130 pub fn register_subscriber(&self) -> u64 {
131 self.register_subscriber_with_policy(false)
132 }
133
134 pub fn register_subscriber_with_policy(&self, reliable: bool) -> u64 {
135 let mut state = self.state.lock().expect("topic slot lock poisoned");
136 let id = state.next_subscriber_id;
137 state.next_subscriber_id = state.next_subscriber_id.saturating_add(1);
138 let start_sequence = state.next_sequence;
139 state.subscribers.insert(
140 id,
141 TopicSubscriberState {
142 next_sequence: start_sequence,
143 reliable,
144 degraded_by_policy: false,
145 inflight: std::collections::VecDeque::new(),
146 },
147 );
148 id
149 }
150
151 pub fn unregister_subscriber(&self, subscriber_id: u64) {
152 let mut state = self.state.lock().expect("topic slot lock poisoned");
153 state.subscribers.remove(&subscriber_id);
154 Self::garbage_collect(&mut state);
155 }
156
157 pub fn reconcile_virtual_reliable_subscribers<I, S>(&self, desired: I)
158 where
159 I: IntoIterator<Item = (S, Option<u64>)>,
160 S: Into<String>,
161 {
162 let mut state = self.state.lock().expect("topic slot lock poisoned");
163 let desired = desired
164 .into_iter()
165 .map(|(name, acked_seq)| (name.into(), acked_seq))
166 .collect::<std::collections::HashMap<String, Option<u64>>>();
167
168 for (name, acked_seq) in &desired {
169 if let Some(subscriber_id) = state.named_subscribers.get(name).copied() {
170 let target_next = acked_seq.map(|seq| {
171 Self::clamp_next_sequence(
172 seq.saturating_add(1),
173 state.head_sequence,
174 state.next_sequence,
175 )
176 });
177 if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) {
178 subscriber.reliable = true;
179 subscriber.inflight.clear();
180 if let Some(target_next) = target_next
181 && target_next > subscriber.next_sequence
182 {
183 subscriber.next_sequence = target_next;
184 }
185 }
186 continue;
187 }
188
189 let subscriber_id = state.next_subscriber_id;
190 state.next_subscriber_id = state.next_subscriber_id.saturating_add(1);
191 let mut next_sequence = state.next_sequence;
192 if let Some(acked_seq) = acked_seq {
193 next_sequence = Self::clamp_next_sequence(
194 acked_seq.saturating_add(1),
195 state.head_sequence,
196 state.next_sequence,
197 );
198 }
199
200 state.subscribers.insert(
201 subscriber_id,
202 TopicSubscriberState {
203 next_sequence,
204 reliable: true,
205 degraded_by_policy: false,
206 inflight: std::collections::VecDeque::new(),
207 },
208 );
209 state.named_subscribers.insert(name.clone(), subscriber_id);
210 }
211
212 let stale_keys = state
213 .named_subscribers
214 .keys()
215 .filter(|name| !desired.contains_key(*name))
216 .cloned()
217 .collect::<Vec<_>>();
218
219 for name in stale_keys {
220 if let Some(subscriber_id) = state.named_subscribers.remove(&name) {
221 state.subscribers.remove(&subscriber_id);
222 }
223 }
224
225 Self::apply_virtual_replay_governance(&mut state);
226 Self::garbage_collect(&mut state);
227 }
228
229 pub fn pop_for<T: Any + Clone + Send + 'static>(&self, subscriber_id: u64) -> Option<T> {
230 let mut state = self.state.lock().expect("topic slot lock poisoned");
231 let now = Instant::now();
232 let retry_timeout = state.reliability_policy.retry_timeout;
233 let max_retry = state.reliability_policy.max_retry;
234 let max_inflight_per_subscriber = state.reliability_policy.max_inflight_per_subscriber;
235 let evicted = Self::evict_exhausted_inflight(
236 &mut state,
237 subscriber_id,
238 now,
239 retry_timeout,
240 max_retry,
241 );
242 if evicted > 0 {
243 state.dropped_messages = state.dropped_messages.saturating_add(evicted);
244 Self::garbage_collect(&mut state);
245 }
246
247 let subscriber = state.subscribers.get(&subscriber_id)?;
248 let reliable = subscriber.reliable;
249 let next_sequence = subscriber.next_sequence;
250 let inflight_len = subscriber.inflight.len();
251
252 if reliable && inflight_len >= max_inflight_per_subscriber {
253 if let Some(retry_seq) = Self::next_retry_sequence(
254 subscriber,
255 now,
256 retry_timeout,
257 max_retry,
258 ) {
259 return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
260 }
261 return None;
262 }
263
264 if reliable
265 && let Some(retry_seq) = Self::next_retry_sequence(
266 subscriber,
267 now,
268 retry_timeout,
269 max_retry,
270 )
271 {
272 return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
273 }
274
275 let mut target = next_sequence;
276 if target < state.head_sequence {
277 target = state.head_sequence;
278 }
279
280 let offset = target.saturating_sub(state.head_sequence) as usize;
281 let (seq, boxed) = state.queue.get(offset)?;
282 if *seq != target {
283 return None;
284 }
285
286 let value = boxed.downcast_ref::<T>()?.clone();
287 if let Some(cursor) = state.subscribers.get_mut(&subscriber_id) {
288 cursor.next_sequence = target.saturating_add(1);
289 if cursor.reliable {
290 cursor.inflight.push_back(InflightDelivery {
291 sequence: target,
292 last_sent_at: Instant::now(),
293 retry_count: 0,
294 });
295 }
296 }
297 Self::garbage_collect(&mut state);
298 Some(value)
299 }
300
301 pub fn ack_reliable_for(&self, subscriber_id: u64) -> Option<u64> {
302 let mut state = self.state.lock().expect("topic slot lock poisoned");
303 let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
304 return None;
305 };
306
307 if !subscriber.reliable {
308 return None;
309 }
310
311 let acknowledged = subscriber.inflight.pop_front().map(|delivery| delivery.sequence);
312 if acknowledged.is_some() {
313 Self::garbage_collect(&mut state);
314 }
315 acknowledged
316 }
317
318 pub fn ack_for(&self, subscriber_id: u64) -> bool {
319 let mut state = self.state.lock().expect("topic slot lock poisoned");
320 let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
321 return false;
322 };
323
324 if !subscriber.reliable {
325 return true;
326 }
327
328 let acknowledged = subscriber.inflight.pop_front().is_some();
329 if acknowledged {
330 Self::garbage_collect(&mut state);
331 }
332 acknowledged
333 }
334
335 pub fn pop_batch_for<T: Any + Clone + Send + 'static>(
336 &self,
337 subscriber_id: u64,
338 max_items: usize,
339 ) -> Vec<T> {
340 if max_items == 0 {
341 return Vec::new();
342 }
343
344 let mut out = Vec::with_capacity(max_items);
345 for _ in 0..max_items {
346 if let Some(item) = self.pop_for::<T>(subscriber_id) {
347 out.push(item);
348 } else {
349 break;
350 }
351 }
352 out
353 }
354
355 pub fn pending_count_for(&self, subscriber_id: u64) -> usize {
356 let state = self.state.lock().expect("topic slot lock poisoned");
357 let Some(cursor) = state.subscribers.get(&subscriber_id) else {
358 return 0;
359 };
360
361 let effective_next = cursor.retain_from_sequence().max(state.head_sequence);
362 if effective_next >= state.next_sequence {
363 0
364 } else {
365 (state.next_sequence - effective_next) as usize
366 }
367 }
368
369 pub fn pop(&self) -> Option<Box<dyn Any + Send>> {
371 let mut state = self.state.lock().expect("topic slot lock poisoned");
372 if !state.subscribers.is_empty() {
373 return None;
374 }
375
376 let popped = state.queue.pop_front().map(|(_, msg)| msg);
377 Self::advance_head(&mut state);
378 popped
379 }
380
381 pub fn pop_batch(&self, max_items: usize) -> Vec<Box<dyn Any + Send>> {
383 if max_items == 0 {
384 return Vec::new();
385 }
386
387 let mut state = self.state.lock().expect("topic slot lock poisoned");
388 if !state.subscribers.is_empty() {
389 return Vec::new();
390 }
391
392 let mut out = Vec::with_capacity(max_items.min(state.queue.len()));
393 for _ in 0..max_items {
394 if let Some((_, msg)) = state.queue.pop_front() {
395 out.push(msg);
396 } else {
397 break;
398 }
399 }
400 Self::advance_head(&mut state);
401 out
402 }
403
404 pub fn pending_count(&self) -> usize {
406 self.state
407 .lock()
408 .expect("topic slot lock poisoned")
409 .queue
410 .len()
411 }
412
413 pub fn remaining_capacity(&self) -> usize {
415 self.max_depth.saturating_sub(self.pending_count())
416 }
417
418 pub fn backpressure_signal(&self) -> BackpressureSignal {
423 if self.max_depth == 0 {
424 return BackpressureSignal::Hard;
425 }
426
427 let pending = self.pending_count();
428 if pending >= self.max_depth {
429 return BackpressureSignal::Hard;
430 }
431
432 let utilization = pending as f64 / self.max_depth as f64;
433 if utilization >= 0.8 {
434 BackpressureSignal::Soft
435 } else {
436 BackpressureSignal::Clear
437 }
438 }
439
440 pub fn max_depth(&self) -> usize {
441 self.max_depth
442 }
443
444 pub fn load_entry(&self, topic: &str) -> TopicLoadEntry {
445 let state = self.state.lock().expect("topic slot lock poisoned");
446
447 let mut lag_messages = 0usize;
448 let mut retry_inflight = 0usize;
449 let mut replay_attempts = 0usize;
450 let mut degraded_subscribers = 0usize;
451 let max_inflight_per_subscriber = state.reliability_policy.max_inflight_per_subscriber;
452
453 for subscriber in state.subscribers.values() {
454 if !subscriber.reliable {
455 continue;
456 }
457
458 let effective_next = subscriber.retain_from_sequence().max(state.head_sequence);
459 let lag = if effective_next >= state.next_sequence {
460 0
461 } else {
462 (state.next_sequence - effective_next) as usize
463 };
464 lag_messages = lag_messages.max(lag);
465
466 retry_inflight = retry_inflight.saturating_add(subscriber.inflight.len());
467 replay_attempts = replay_attempts.saturating_add(
468 subscriber
469 .inflight
470 .iter()
471 .map(|item| item.retry_count as usize)
472 .sum::<usize>(),
473 );
474
475 let has_replay = subscriber.inflight.iter().any(|item| item.retry_count > 0);
476 let stalled = subscriber.inflight.len() >= max_inflight_per_subscriber;
477 if has_replay || stalled || subscriber.degraded_by_policy {
478 degraded_subscribers = degraded_subscribers.saturating_add(1);
479 }
480 }
481
482 TopicLoadEntry {
483 topic: topic.to_string(),
484 pending: state.queue.len(),
485 max_depth: self.max_depth,
486 dropped_messages: state.dropped_messages,
487 lag_messages,
488 retry_inflight,
489 replay_attempts,
490 degraded_subscribers,
491 }
492 }
493
494 fn garbage_collect(state: &mut super::TopicSlotState) {
495 if state.subscribers.is_empty() {
496 Self::advance_head(state);
497 return;
498 }
499
500 let min_next = state
501 .subscribers
502 .values()
503 .map(TopicSubscriberState::retain_from_sequence)
504 .min()
505 .unwrap_or(state.next_sequence);
506
507 while let Some((seq, _)) = state.queue.front() {
508 if *seq < min_next {
509 state.queue.pop_front();
510 } else {
511 break;
512 }
513 }
514
515 Self::advance_head(state);
516 }
517
518 fn advance_head(state: &mut super::TopicSlotState) {
519 if let Some((seq, _)) = state.queue.front() {
520 state.head_sequence = *seq;
521 } else {
522 state.head_sequence = state.next_sequence;
523 }
524 }
525
526 fn next_retry_sequence(
527 subscriber: &TopicSubscriberState,
528 now: Instant,
529 retry_timeout: std::time::Duration,
530 max_retry: u8,
531 ) -> Option<u64> {
532 subscriber
533 .inflight
534 .iter()
535 .find(|item| {
536 item.retry_count < max_retry
537 && now.duration_since(item.last_sent_at) >= retry_timeout
538 })
539 .map(|item| item.sequence)
540 }
541
542 fn deliver_retry<T: Any + Clone + Send + 'static>(
543 state: &mut super::TopicSlotState,
544 subscriber_id: u64,
545 retry_seq: u64,
546 ) -> Option<T> {
547 let offset = retry_seq.saturating_sub(state.head_sequence) as usize;
548 let (_, boxed) = state.queue.get(offset)?;
549 let value = boxed.downcast_ref::<T>()?.clone();
550
551 if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id)
552 && let Some(item) = subscriber
553 .inflight
554 .iter_mut()
555 .find(|item| item.sequence == retry_seq)
556 {
557 item.retry_count = item.retry_count.saturating_add(1);
558 item.last_sent_at = Instant::now();
559 }
560 Some(value)
561 }
562
563 fn evict_exhausted_inflight(
564 state: &mut super::TopicSlotState,
565 subscriber_id: u64,
566 now: Instant,
567 retry_timeout: std::time::Duration,
568 max_retry: u8,
569 ) -> usize {
570 let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
571 return 0;
572 };
573
574 if !subscriber.reliable {
575 return 0;
576 }
577
578 let mut evicted_count = 0usize;
579 while let Some(front) = subscriber.inflight.front() {
580 let exhausted = front.retry_count >= max_retry;
581 let retry_due = now.duration_since(front.last_sent_at) >= retry_timeout;
582 if exhausted && retry_due {
583 subscriber.inflight.pop_front();
584 evicted_count = evicted_count.saturating_add(1);
585 } else {
586 break;
587 }
588 }
589
590 evicted_count
591 }
592
593 fn clamp_next_sequence(next: u64, head_sequence: u64, max_sequence: u64) -> u64 {
594 next.max(head_sequence).min(max_sequence)
595 }
596
597 fn apply_virtual_replay_governance(state: &mut super::TopicSlotState) {
598 let Some(window) = state.reliability_policy.replay_window else {
599 for subscriber_id in state.named_subscribers.values() {
600 if let Some(subscriber) = state.subscribers.get_mut(subscriber_id) {
601 subscriber.degraded_by_policy = false;
602 }
603 }
604 return;
605 };
606
607 let floor = state.next_sequence.saturating_sub(window as u64);
608 let mut dropped_total = 0usize;
609 for subscriber_id in state.named_subscribers.values() {
610 let Some(subscriber) = state.subscribers.get_mut(subscriber_id) else {
611 continue;
612 };
613
614 let over_window = subscriber.next_sequence < floor;
615 subscriber.degraded_by_policy = over_window;
616
617 if !over_window {
618 continue;
619 }
620
621 if state.reliability_policy.replay_degrade_strategy == ReplayDegradeStrategy::DropOldest {
622 let dropped = floor.saturating_sub(subscriber.next_sequence) as usize;
623 if dropped > 0 {
624 dropped_total = dropped_total.saturating_add(dropped);
625 subscriber.next_sequence = floor;
626 subscriber.inflight.clear();
627 }
628 }
629 }
630
631 if dropped_total > 0 {
632 state.dropped_messages = state.dropped_messages.saturating_add(dropped_total);
633 }
634 }
635}
636
637impl TopicSubscriberState {
638 fn retain_from_sequence(&self) -> u64 {
639 self.inflight
640 .front()
641 .map(|item| item.sequence)
642 .unwrap_or(self.next_sequence)
643 }
644}
645
646impl TopicBus {
647 pub fn get_or_create(&mut self, topic: &str, depth: usize) -> Arc<TopicSlot> {
650 let reliability_policy = self.reliability_policy;
651 self.slots
652 .entry(topic.to_string())
653 .or_insert_with(|| {
654 Arc::new(TopicSlot::new_with_reliability_policy(
655 depth,
656 reliability_policy,
657 ))
658 })
659 .clone()
660 }
661
662 pub fn get_or_create_default(&mut self, topic: &str) -> Arc<TopicSlot> {
664 let depth = self.default_depth;
665 self.get_or_create(topic, depth)
666 }
667
668 pub fn set_reliability_policy(&mut self, reliability_policy: TopicReliabilityPolicy) {
669 self.reliability_policy = reliability_policy;
670 for slot in self.slots.values() {
671 slot.set_reliability_policy(reliability_policy);
672 }
673 }
674
675 pub fn load_entries(&self) -> Vec<TopicLoadEntry> {
676 let mut out = self
677 .slots
678 .iter()
679 .map(|(topic, slot)| slot.load_entry(topic))
680 .collect::<Vec<_>>();
681 out.sort_by(|a, b| a.topic.cmp(&b.topic));
682 out
683 }
684}