use std::any::Any;
use std::sync::Arc;
use std::time::Instant;
use core_types::BackpressureSignal;
use crate::{ReplayDegradeStrategy, TopicReliabilityPolicy};
use super::{InflightDelivery, TopicBus, TopicLoadEntry, TopicSlot, TopicSubscriberState};
impl TopicSlot {
/// Creates a slot bounded to `max_depth` queued messages that uses
/// `TopicReliabilityPolicy::default()` as its reliability policy.
pub fn new(max_depth: usize) -> Self {
    let default_policy = TopicReliabilityPolicy::default();
    Self::new_with_reliability_policy(max_depth, default_policy)
}
/// Creates a slot bounded to `max_depth` queued messages whose internal
/// state is initialised with the supplied `reliability_policy`.
pub fn new_with_reliability_policy(
    max_depth: usize,
    reliability_policy: TopicReliabilityPolicy,
) -> Self {
    let initial_state = super::TopicSlotState::new(reliability_policy);
    Self {
        state: std::sync::Mutex::new(initial_state),
        max_depth,
    }
}
/// Replaces the slot's reliability policy, then re-applies replay governance
/// and garbage-collects the queue under the new policy.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn set_reliability_policy(&self, reliability_policy: TopicReliabilityPolicy) {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    slot.reliability_policy = reliability_policy;
    Self::apply_virtual_replay_governance(slot);
    Self::garbage_collect(slot);
}
/// Appends `msg` to the queue unless the queue already holds `max_depth`
/// messages.
///
/// Returns `true` when the message was rejected because the queue is full and
/// `false` when it was enqueued.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn push(&self, msg: Box<dyn Any + Send>) -> bool {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    if slot.queue.len() >= self.max_depth {
        return true;
    }
    let assigned = slot.next_sequence;
    slot.next_sequence = assigned.saturating_add(1);
    // When the queue was empty, the new message becomes the head.
    let was_empty = slot.queue.is_empty();
    slot.queue.push_back((assigned, msg));
    if was_empty {
        slot.head_sequence = assigned;
    }
    false
}
/// Appends `msg`, evicting the oldest queued message first when the queue is
/// already at `max_depth`.
///
/// Each eviction increments the slot's dropped-message counter. Returns
/// `true` only when `max_depth` is zero (nothing can be stored), otherwise
/// `false`.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn push_best_effort(&self, msg: Box<dyn Any + Send>) -> bool {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    if self.max_depth == 0 {
        return true;
    }
    let slot = &mut *guard;
    let at_capacity = slot.queue.len() >= self.max_depth;
    if at_capacity {
        slot.queue.pop_front();
        slot.dropped_messages = slot.dropped_messages.saturating_add(1);
        Self::advance_head(slot);
    }
    let assigned = slot.next_sequence;
    slot.next_sequence = assigned.saturating_add(1);
    slot.queue.push_back((assigned, msg));
    if slot.queue.len() == 1 {
        slot.head_sequence = assigned;
    }
    false
}
/// Enqueues messages from `msgs` in order under a single lock acquisition,
/// stopping at the first message that would exceed `max_depth`.
///
/// Returns the number of messages that were enqueued.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn push_batch<I>(&self, msgs: I) -> usize
where
    I: IntoIterator<Item = Box<dyn Any + Send>>,
{
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    let mut enqueued = 0usize;
    for payload in msgs {
        if slot.queue.len() >= self.max_depth {
            break;
        }
        let assigned = slot.next_sequence;
        slot.next_sequence = assigned.saturating_add(1);
        // When the queue was empty, the new message becomes the head.
        let was_empty = slot.queue.is_empty();
        slot.queue.push_back((assigned, payload));
        if was_empty {
            slot.head_sequence = assigned;
        }
        enqueued += 1;
    }
    enqueued
}
/// Enqueues every message from `msgs` under a single lock acquisition,
/// evicting the oldest queued message whenever the queue is at `max_depth`.
///
/// Each eviction increments the slot's dropped-message counter. Returns the
/// number of messages enqueued, which is `0` without consuming `msgs` when
/// `max_depth` is zero.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn push_batch_best_effort<I>(&self, msgs: I) -> usize
where
    I: IntoIterator<Item = Box<dyn Any + Send>>,
{
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    if self.max_depth == 0 {
        return 0;
    }
    let slot = &mut *guard;
    let mut enqueued = 0usize;
    for payload in msgs {
        if slot.queue.len() >= self.max_depth {
            slot.queue.pop_front();
            slot.dropped_messages = slot.dropped_messages.saturating_add(1);
        }
        let assigned = slot.next_sequence;
        slot.next_sequence = assigned.saturating_add(1);
        let was_empty = slot.queue.is_empty();
        slot.queue.push_back((assigned, payload));
        if was_empty {
            slot.head_sequence = assigned;
        }
        enqueued += 1;
    }
    // Evictions inside the loop leave the head stale; fix it up once at the end.
    Self::advance_head(slot);
    enqueued
}
/// Registers a non-reliable subscriber and returns its identifier.
pub fn register_subscriber(&self) -> u64 {
    let reliable = false;
    self.register_subscriber_with_policy(reliable)
}
/// Registers a subscriber whose cursor starts at the next sequence number to
/// be assigned, so it only observes messages pushed after registration.
///
/// `reliable` selects whether deliveries to this subscriber are tracked as
/// in-flight. Returns the new subscriber's identifier.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn register_subscriber_with_policy(&self, reliable: bool) -> u64 {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    let subscriber_id = slot.next_subscriber_id;
    slot.next_subscriber_id = subscriber_id.saturating_add(1);
    let cursor = TopicSubscriberState {
        next_sequence: slot.next_sequence,
        reliable,
        degraded_by_policy: false,
        inflight: std::collections::VecDeque::new(),
    };
    slot.subscribers.insert(subscriber_id, cursor);
    subscriber_id
}
/// Removes the subscriber `subscriber_id` (a no-op if it is unknown) and then
/// garbage-collects the queue.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn unregister_subscriber(&self, subscriber_id: u64) {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    slot.subscribers.remove(&subscriber_id);
    Self::garbage_collect(slot);
}
/// Reconciles the set of named ("virtual") reliable subscribers with `desired`.
///
/// `desired` yields `(name, acked_seq)` pairs, where `acked_seq` is an optional
/// sequence number already acknowledged for that name.
///
/// * A name that is already registered is marked reliable, has its in-flight
///   list cleared, and has its cursor moved forward (never backward) to
///   `acked_seq + 1`, clamped to the `[head_sequence, next_sequence]` range.
/// * A name that is not yet registered gets a fresh subscriber id whose cursor
///   starts at the clamped `acked_seq + 1`, or at the slot's `next_sequence`
///   when no acknowledged sequence is given.
/// * A registered name that is absent from `desired` is removed together with
///   its subscriber state.
///
/// Replay governance and garbage collection run afterwards.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn reconcile_virtual_reliable_subscribers<I, S>(&self, desired: I)
where
I: IntoIterator<Item = (S, Option<u64>)>,
S: Into<String>,
{
let mut state = self.state.lock().expect("topic slot lock poisoned");
// Materialise the desired set once so it can be iterated and queried by name.
let desired = desired
.into_iter()
.map(|(name, acked_seq)| (name.into(), acked_seq))
.collect::<std::collections::HashMap<String, Option<u64>>>();
for (name, acked_seq) in &desired {
// Existing named subscriber: update it in place.
if let Some(subscriber_id) = state.named_subscribers.get(name).copied() {
let target_next = acked_seq.map(|seq| {
Self::clamp_next_sequence(
seq.saturating_add(1),
state.head_sequence,
state.next_sequence,
)
});
if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) {
subscriber.reliable = true;
subscriber.inflight.clear();
// Only ever move the cursor forward.
if let Some(target_next) = target_next
&& target_next > subscriber.next_sequence
{
subscriber.next_sequence = target_next;
}
}
continue;
}
// Unknown name: allocate a new subscriber id and cursor.
let subscriber_id = state.next_subscriber_id;
state.next_subscriber_id = state.next_subscriber_id.saturating_add(1);
let mut next_sequence = state.next_sequence;
if let Some(acked_seq) = acked_seq {
next_sequence = Self::clamp_next_sequence(
acked_seq.saturating_add(1),
state.head_sequence,
state.next_sequence,
);
}
state.subscribers.insert(
subscriber_id,
TopicSubscriberState {
next_sequence,
reliable: true,
degraded_by_policy: false,
inflight: std::collections::VecDeque::new(),
},
);
state.named_subscribers.insert(name.clone(), subscriber_id);
}
// Collect the stale names first; the map cannot be mutated while its keys are borrowed.
let stale_keys = state
.named_subscribers
.keys()
.filter(|name| !desired.contains_key(*name))
.cloned()
.collect::<Vec<_>>();
for name in stale_keys {
if let Some(subscriber_id) = state.named_subscribers.remove(&name) {
state.subscribers.remove(&subscriber_id);
}
}
Self::apply_virtual_replay_governance(&mut state);
Self::garbage_collect(&mut state);
}
/// Delivers the next message for `subscriber_id` as a clone of type `T`.
///
/// The steps run in this order while holding the slot lock:
///
/// 1. In-flight entries at the front of the subscriber's list that have used
///    up their retries and whose retry timeout has elapsed are evicted and
///    counted as dropped.
/// 2. For a reliable subscriber whose in-flight list has reached
///    `max_inflight_per_subscriber`, only a due retry may be delivered;
///    otherwise `None` is returned.
/// 3. For a reliable subscriber with a due retry, that retry is delivered.
/// 4. Otherwise the message at the subscriber's cursor (clamped up to the
///    queue head) is delivered, the cursor advances, and reliable subscribers
///    get a new in-flight record.
///
/// Returns `None` when the subscriber is unknown, nothing is available at the
/// cursor, or the stored message is not a `T`; the cursor is left unchanged in
/// those cases.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn pop_for<T: Any + Clone + Send + 'static>(&self, subscriber_id: u64) -> Option<T> {
let mut state = self.state.lock().expect("topic slot lock poisoned");
let now = Instant::now();
// Copy the policy values out so `state` can be mutably borrowed below.
let retry_timeout = state.reliability_policy.retry_timeout;
let max_retry = state.reliability_policy.max_retry;
let max_inflight_per_subscriber = state.reliability_policy.max_inflight_per_subscriber;
let evicted = Self::evict_exhausted_inflight(
&mut state,
subscriber_id,
now,
retry_timeout,
max_retry,
);
if evicted > 0 {
state.dropped_messages = state.dropped_messages.saturating_add(evicted);
Self::garbage_collect(&mut state);
}
let subscriber = state.subscribers.get(&subscriber_id)?;
let reliable = subscriber.reliable;
let next_sequence = subscriber.next_sequence;
let inflight_len = subscriber.inflight.len();
// In-flight window is full: only a retry may go out, never a new message.
if reliable && inflight_len >= max_inflight_per_subscriber {
if let Some(retry_seq) = Self::next_retry_sequence(
subscriber,
now,
retry_timeout,
max_retry,
) {
return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
}
return None;
}
// A due retry takes priority over delivering a new message.
if reliable
&& let Some(retry_seq) = Self::next_retry_sequence(
subscriber,
now,
retry_timeout,
max_retry,
)
{
return Self::deliver_retry::<T>(&mut state, subscriber_id, retry_seq);
}
// A cursor that fell behind the queue head is moved up to the head.
let mut target = next_sequence;
if target < state.head_sequence {
target = state.head_sequence;
}
let offset = target.saturating_sub(state.head_sequence) as usize;
let (seq, boxed) = state.queue.get(offset)?;
if *seq != target {
return None;
}
let value = boxed.downcast_ref::<T>()?.clone();
if let Some(cursor) = state.subscribers.get_mut(&subscriber_id) {
cursor.next_sequence = target.saturating_add(1);
if cursor.reliable {
cursor.inflight.push_back(InflightDelivery {
sequence: target,
last_sent_at: Instant::now(),
retry_count: 0,
});
}
}
Self::garbage_collect(&mut state);
Some(value)
}
/// Acknowledges the oldest in-flight delivery of a reliable subscriber.
///
/// Returns the acknowledged sequence number, or `None` when the subscriber is
/// unknown, is not reliable, or has nothing in flight. A successful
/// acknowledgement triggers garbage collection of the queue.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn ack_reliable_for(&self, subscriber_id: u64) -> Option<u64> {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    let subscriber = slot.subscribers.get_mut(&subscriber_id)?;
    if !subscriber.reliable {
        return None;
    }
    let oldest = subscriber.inflight.pop_front()?;
    Self::garbage_collect(slot);
    Some(oldest.sequence)
}
/// Acknowledges the oldest in-flight delivery for `subscriber_id`.
///
/// Returns `false` for an unknown subscriber, `true` for a non-reliable
/// subscriber (nothing to acknowledge), and otherwise whether an in-flight
/// delivery was actually removed. A successful removal triggers garbage
/// collection of the queue.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn ack_for(&self, subscriber_id: u64) -> bool {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    let removed = match slot.subscribers.get_mut(&subscriber_id) {
        None => return false,
        Some(subscriber) if !subscriber.reliable => return true,
        Some(subscriber) => subscriber.inflight.pop_front().is_some(),
    };
    if removed {
        Self::garbage_collect(slot);
    }
    removed
}
/// Delivers up to `max_items` messages to `subscriber_id` by calling
/// [`Self::pop_for`] repeatedly, stopping at the first `None`.
///
/// The slot lock is taken separately for every delivered item, so other
/// callers may interleave between items.
///
/// The output buffer is pre-sized to at most the current queue length, not to
/// `max_items`: callers may pass a very large limit (for example
/// `usize::MAX`) to mean "everything available", and reserving that much up
/// front would panic with a capacity overflow or attempt a huge allocation.
pub fn pop_batch_for<T: Any + Clone + Send + 'static>(
    &self,
    subscriber_id: u64,
    max_items: usize,
) -> Vec<T> {
    if max_items == 0 {
        return Vec::new();
    }
    // Capacity is only a hint; the vector grows if retries push the count higher.
    let mut out = Vec::with_capacity(max_items.min(self.pending_count()));
    while out.len() < max_items {
        let Some(item) = self.pop_for::<T>(subscriber_id) else {
            break;
        };
        out.push(item);
    }
    out
}
/// Returns how many sequence numbers lie between the subscriber's retained
/// position (its oldest in-flight delivery or, failing that, its cursor),
/// clamped up to the queue head, and the slot's next sequence number.
///
/// Returns `0` for an unknown subscriber.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn pending_count_for(&self, subscriber_id: u64) -> usize {
    let guard = self.state.lock().expect("topic slot lock poisoned");
    let pending = match guard.subscribers.get(&subscriber_id) {
        Some(cursor) => {
            let effective_next = cursor.retain_from_sequence().max(guard.head_sequence);
            guard.next_sequence.saturating_sub(effective_next) as usize
        }
        None => 0,
    };
    pending
}
/// Removes and returns the oldest queued message.
///
/// Only usable while no subscribers are registered; returns `None` if any
/// subscriber exists or the queue is empty.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn pop(&self) -> Option<Box<dyn Any + Send>> {
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    if !slot.subscribers.is_empty() {
        return None;
    }
    let front = slot.queue.pop_front();
    Self::advance_head(slot);
    front.map(|(_, payload)| payload)
}
/// Removes and returns up to `max_items` of the oldest queued messages, in
/// queue order.
///
/// Only usable while no subscribers are registered; returns an empty vector
/// if any subscriber exists or `max_items` is zero.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn pop_batch(&self, max_items: usize) -> Vec<Box<dyn Any + Send>> {
    if max_items == 0 {
        return Vec::new();
    }
    let mut guard = self.state.lock().expect("topic slot lock poisoned");
    let slot = &mut *guard;
    if !slot.subscribers.is_empty() {
        return Vec::new();
    }
    let take = max_items.min(slot.queue.len());
    let drained: Vec<Box<dyn Any + Send>> = slot
        .queue
        .drain(..take)
        .map(|(_, payload)| payload)
        .collect();
    Self::advance_head(slot);
    drained
}
/// Returns the number of messages currently held in the queue.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn pending_count(&self) -> usize {
    let guard = self.state.lock().expect("topic slot lock poisoned");
    guard.queue.len()
}
/// Returns how many more messages fit before the queue reaches `max_depth`,
/// or `0` when the queue is already at or above that depth.
pub fn remaining_capacity(&self) -> usize {
    let used = self.pending_count();
    self.max_depth.saturating_sub(used)
}
/// Classifies the queue's fill level as a backpressure signal.
///
/// * `Hard` — `max_depth` is zero or the queue is full.
/// * `Soft` — the queue is at least 80% full.
/// * `Clear` — otherwise.
pub fn backpressure_signal(&self) -> BackpressureSignal {
    let capacity = self.max_depth;
    if capacity == 0 {
        return BackpressureSignal::Hard;
    }
    match self.pending_count() {
        pending if pending >= capacity => BackpressureSignal::Hard,
        pending if (pending as f64 / capacity as f64) >= 0.8 => BackpressureSignal::Soft,
        _ => BackpressureSignal::Clear,
    }
}
pub fn max_depth(&self) -> usize {
self.max_depth
}
/// Builds a load snapshot for this slot, labelled with `topic`.
///
/// Only reliable subscribers contribute to the subscriber-derived figures:
///
/// * `lag_messages` — the largest lag among them, measured from each
///   subscriber's retained position (clamped to the queue head) up to the
///   slot's next sequence number;
/// * `retry_inflight` — the total number of in-flight deliveries;
/// * `replay_attempts` — the sum of retry counts over all in-flight deliveries;
/// * `degraded_subscribers` — how many have at least one retried delivery,
///   have a full in-flight window, or are flagged as degraded by policy.
///
/// # Panics
///
/// Panics if the slot mutex is poisoned.
pub fn load_entry(&self, topic: &str) -> TopicLoadEntry {
    let guard = self.state.lock().expect("topic slot lock poisoned");
    let inflight_limit = guard.reliability_policy.max_inflight_per_subscriber;
    let (mut lag_messages, mut retry_inflight) = (0usize, 0usize);
    let (mut replay_attempts, mut degraded_subscribers) = (0usize, 0usize);
    for subscriber in guard.subscribers.values().filter(|candidate| candidate.reliable) {
        let retained_from = subscriber.retain_from_sequence().max(guard.head_sequence);
        let lag = guard.next_sequence.saturating_sub(retained_from) as usize;
        lag_messages = lag_messages.max(lag);

        let inflight_len = subscriber.inflight.len();
        retry_inflight = retry_inflight.saturating_add(inflight_len);

        let retries = subscriber
            .inflight
            .iter()
            .map(|item| item.retry_count as usize)
            .sum::<usize>();
        replay_attempts = replay_attempts.saturating_add(retries);

        let replaying = subscriber.inflight.iter().any(|item| item.retry_count > 0);
        let window_full = inflight_len >= inflight_limit;
        if replaying || window_full || subscriber.degraded_by_policy {
            degraded_subscribers = degraded_subscribers.saturating_add(1);
        }
    }
    TopicLoadEntry {
        topic: topic.to_string(),
        pending: guard.queue.len(),
        max_depth: self.max_depth,
        dropped_messages: guard.dropped_messages,
        lag_messages,
        retry_inflight,
        replay_attempts,
        degraded_subscribers,
    }
}
/// Drops queued messages that no subscriber still needs and refreshes the
/// head sequence.
///
/// A message is needed while its sequence is at or above the smallest
/// retained position across all subscribers. With no subscribers registered,
/// nothing is dropped and only the head sequence is refreshed.
fn garbage_collect(state: &mut super::TopicSlotState) {
    let oldest_needed = state
        .subscribers
        .values()
        .map(TopicSubscriberState::retain_from_sequence)
        .min();
    if let Some(min_next) = oldest_needed {
        while state
            .queue
            .front()
            .is_some_and(|(seq, _)| *seq < min_next)
        {
            state.queue.pop_front();
        }
    }
    Self::advance_head(state);
}
/// Sets `head_sequence` to the sequence of the front queue entry, or to
/// `next_sequence` when the queue is empty.
fn advance_head(state: &mut super::TopicSlotState) {
    state.head_sequence = match state.queue.front() {
        Some((front_seq, _)) => *front_seq,
        None => state.next_sequence,
    };
}
/// Returns the sequence of the first in-flight delivery, in in-flight order,
/// that still has retries left (`retry_count < max_retry`) and was last sent
/// at least `retry_timeout` before `now`.
fn next_retry_sequence(
    subscriber: &TopicSubscriberState,
    now: Instant,
    retry_timeout: std::time::Duration,
    max_retry: u8,
) -> Option<u64> {
    for delivery in subscriber.inflight.iter() {
        let retries_left = delivery.retry_count < max_retry;
        if retries_left && now.duration_since(delivery.last_sent_at) >= retry_timeout {
            return Some(delivery.sequence);
        }
    }
    None
}
/// Redelivers the in-flight message `retry_seq` to `subscriber_id`.
///
/// On success the matching in-flight record has its retry count incremented
/// and its send time refreshed, and a clone of the message is returned.
///
/// Returns `None` when the stored message is not a `T` (state is left
/// unchanged), or when the message is no longer in the queue. The latter
/// happens when the queue front was evicted while the delivery was still in
/// flight (for example by a best-effort push). The stale in-flight record is
/// then discarded, since the message can never be redelivered; indexing with a
/// saturated offset would instead hand the subscriber whatever message
/// currently sits at the queue head.
fn deliver_retry<T: Any + Clone + Send + 'static>(
    state: &mut super::TopicSlotState,
    subscriber_id: u64,
    retry_seq: u64,
) -> Option<T> {
    // `checked_sub` rejects sequences that fell behind the head, and the
    // sequence comparison guards against any other offset mismatch.
    let located = retry_seq
        .checked_sub(state.head_sequence)
        .and_then(|offset| usize::try_from(offset).ok())
        .and_then(|offset| state.queue.get(offset))
        .filter(|(seq, _)| *seq == retry_seq);
    let Some((_, boxed)) = located else {
        if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) {
            subscriber
                .inflight
                .retain(|item| item.sequence != retry_seq);
        }
        return None;
    };
    let value = boxed.downcast_ref::<T>()?.clone();
    if let Some(subscriber) = state.subscribers.get_mut(&subscriber_id)
        && let Some(item) = subscriber
            .inflight
            .iter_mut()
            .find(|item| item.sequence == retry_seq)
    {
        item.retry_count = item.retry_count.saturating_add(1);
        item.last_sent_at = Instant::now();
    }
    Some(value)
}
/// Removes in-flight deliveries from the front of a reliable subscriber's
/// list for as long as the front entry has used up its retries
/// (`retry_count >= max_retry`) and was last sent at least `retry_timeout`
/// before `now`.
///
/// Returns the number of entries removed; `0` for an unknown or non-reliable
/// subscriber.
fn evict_exhausted_inflight(
    state: &mut super::TopicSlotState,
    subscriber_id: u64,
    now: Instant,
    retry_timeout: std::time::Duration,
    max_retry: u8,
) -> usize {
    let subscriber = match state.subscribers.get_mut(&subscriber_id) {
        Some(found) if found.reliable => found,
        _ => return 0,
    };
    let mut evicted = 0usize;
    loop {
        let front_expired = subscriber.inflight.front().is_some_and(|front| {
            front.retry_count >= max_retry
                && now.duration_since(front.last_sent_at) >= retry_timeout
        });
        if !front_expired {
            break;
        }
        subscriber.inflight.pop_front();
        evicted = evicted.saturating_add(1);
    }
    evicted
}
/// Raises `next` to at least `head_sequence`, then caps the result at
/// `max_sequence`. The cap wins if `head_sequence` exceeds `max_sequence`.
fn clamp_next_sequence(next: u64, head_sequence: u64, max_sequence: u64) -> u64 {
    let floored = if next < head_sequence {
        head_sequence
    } else {
        next
    };
    if floored > max_sequence {
        max_sequence
    } else {
        floored
    }
}
/// Applies the policy's replay window to every named subscriber.
///
/// With no replay window configured, every named subscriber's
/// `degraded_by_policy` flag is cleared. Otherwise a subscriber whose cursor
/// lies below `next_sequence - window` is flagged as degraded. Under the
/// `DropOldest` strategy its cursor is additionally moved up to that floor,
/// its in-flight list is cleared, and the skipped range is added to the
/// slot's dropped-message counter.
fn apply_virtual_replay_governance(state: &mut super::TopicSlotState) {
    let floor = match state.reliability_policy.replay_window {
        Some(window) => state.next_sequence.saturating_sub(window as u64),
        None => {
            for subscriber_id in state.named_subscribers.values() {
                if let Some(subscriber) = state.subscribers.get_mut(subscriber_id) {
                    subscriber.degraded_by_policy = false;
                }
            }
            return;
        }
    };
    let mut dropped_total = 0usize;
    for subscriber_id in state.named_subscribers.values() {
        let Some(subscriber) = state.subscribers.get_mut(subscriber_id) else {
            continue;
        };
        let behind_floor = subscriber.next_sequence < floor;
        subscriber.degraded_by_policy = behind_floor;
        if !behind_floor {
            continue;
        }
        let drop_oldest =
            state.reliability_policy.replay_degrade_strategy == ReplayDegradeStrategy::DropOldest;
        if !drop_oldest {
            continue;
        }
        let skipped = floor.saturating_sub(subscriber.next_sequence) as usize;
        if skipped > 0 {
            dropped_total = dropped_total.saturating_add(skipped);
            subscriber.next_sequence = floor;
            subscriber.inflight.clear();
        }
    }
    // Adding zero is a no-op, so the counter only changes when something was skipped.
    state.dropped_messages = state.dropped_messages.saturating_add(dropped_total);
}
}
impl TopicSubscriberState {
    /// Returns the lowest sequence this subscriber still needs retained: the
    /// sequence of its oldest in-flight delivery, or its cursor when nothing
    /// is in flight.
    fn retain_from_sequence(&self) -> u64 {
        match self.inflight.front() {
            Some(oldest) => oldest.sequence,
            None => self.next_sequence,
        }
    }
}
impl TopicBus {
    /// Returns the slot registered for `topic`, creating it with queue depth
    /// `depth` and the bus's current reliability policy if it does not exist.
    ///
    /// `depth` is ignored when the topic already has a slot.
    pub fn get_or_create(&mut self, topic: &str, depth: usize) -> Arc<TopicSlot> {
        let policy = self.reliability_policy;
        let slot = self.slots.entry(topic.to_string()).or_insert_with(|| {
            Arc::new(TopicSlot::new_with_reliability_policy(depth, policy))
        });
        Arc::clone(&*slot)
    }

    /// Same as [`Self::get_or_create`], using the bus's default depth.
    pub fn get_or_create_default(&mut self, topic: &str) -> Arc<TopicSlot> {
        let default_depth = self.default_depth;
        self.get_or_create(topic, default_depth)
    }

    /// Stores `reliability_policy` as the policy for future slots and applies
    /// it to every existing slot.
    pub fn set_reliability_policy(&mut self, reliability_policy: TopicReliabilityPolicy) {
        self.reliability_policy = reliability_policy;
        self.slots
            .values()
            .for_each(|slot| slot.set_reliability_policy(reliability_policy));
    }

    /// Returns one load snapshot per topic, sorted by topic name.
    pub fn load_entries(&self) -> Vec<TopicLoadEntry> {
        let mut entries = Vec::new();
        for (topic, slot) in self.slots.iter() {
            entries.push(slot.load_entry(topic));
        }
        entries.sort_by(|lhs, rhs| lhs.topic.cmp(&rhs.topic));
        entries
    }
}