use std::any::Any;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use core_types::BackpressureSignal;
use super::{TopicBus, TopicLoadEntry, TopicSlot};
impl TopicSlot {
pub fn new(max_depth: usize) -> Self {
Self {
queue: Mutex::new(VecDeque::new()),
max_depth,
}
}
pub fn push(&self, msg: Box<dyn Any + Send>) -> bool {
let mut q = self.queue.lock().expect("topic slot lock poisoned");
if q.len() >= self.max_depth {
return true;
}
q.push_back(msg);
false
}
pub fn push_batch<I>(&self, msgs: I) -> usize
where
I: IntoIterator<Item = Box<dyn Any + Send>>,
{
let mut q = self.queue.lock().expect("topic slot lock poisoned");
let mut accepted = 0usize;
for msg in msgs {
if q.len() >= self.max_depth {
break;
}
q.push_back(msg);
accepted += 1;
}
accepted
}
pub fn pop(&self) -> Option<Box<dyn Any + Send>> {
self.queue
.lock()
.expect("topic slot lock poisoned")
.pop_front()
}
pub fn pop_batch(&self, max_items: usize) -> Vec<Box<dyn Any + Send>> {
if max_items == 0 {
return Vec::new();
}
let mut q = self.queue.lock().expect("topic slot lock poisoned");
let mut out = Vec::with_capacity(max_items.min(q.len()));
for _ in 0..max_items {
if let Some(msg) = q.pop_front() {
out.push(msg);
} else {
break;
}
}
out
}
pub fn pending_count(&self) -> usize {
self.queue.lock().expect("topic slot lock poisoned").len()
}
pub fn remaining_capacity(&self) -> usize {
self.max_depth.saturating_sub(self.pending_count())
}
pub fn backpressure_signal(&self) -> BackpressureSignal {
if self.max_depth == 0 {
return BackpressureSignal::Hard;
}
let pending = self.pending_count();
if pending >= self.max_depth {
return BackpressureSignal::Hard;
}
let utilization = pending as f64 / self.max_depth as f64;
if utilization >= 0.8 {
BackpressureSignal::Soft
} else {
BackpressureSignal::Clear
}
}
pub fn max_depth(&self) -> usize {
self.max_depth
}
}
impl TopicBus {
pub fn get_or_create(&mut self, topic: &str, depth: usize) -> Arc<TopicSlot> {
self.slots
.entry(topic.to_string())
.or_insert_with(|| Arc::new(TopicSlot::new(depth)))
.clone()
}
pub fn get_or_create_default(&mut self, topic: &str) -> Arc<TopicSlot> {
let depth = self.default_depth;
self.get_or_create(topic, depth)
}
pub fn load_entries(&self) -> Vec<TopicLoadEntry> {
let mut out = self
.slots
.iter()
.map(|(topic, slot)| TopicLoadEntry {
topic: topic.clone(),
pending: slot.pending_count(),
max_depth: slot.max_depth(),
})
.collect::<Vec<_>>();
out.sort_by(|a, b| a.topic.cmp(&b.topic));
out
}
}