use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::TopicReliabilityPolicy;
mod service;
mod topic;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicLoadEntry {
pub topic: String,
pub pending: usize,
pub max_depth: usize,
pub dropped_messages: usize,
pub lag_messages: usize,
pub retry_inflight: usize,
pub replay_attempts: usize,
pub degraded_subscribers: usize,
}
impl TopicLoadEntry {
pub fn utilization_ratio(&self) -> f64 {
if self.max_depth == 0 {
return 0.0;
}
self.pending as f64 / self.max_depth as f64
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServiceLoadEntry {
pub service: String,
pub pending_requests: usize,
pub pending_responses: usize,
}
pub struct TopicSlot {
state: Mutex<TopicSlotState>,
max_depth: usize,
}
struct TopicSlotState {
queue: VecDeque<(u64, Box<dyn Any + Send>)>,
head_sequence: u64,
next_sequence: u64,
dropped_messages: usize,
reliability_policy: TopicReliabilityPolicy,
subscribers: HashMap<u64, TopicSubscriberState>,
named_subscribers: HashMap<String, u64>,
next_subscriber_id: u64,
}
impl TopicSlotState {
fn new(reliability_policy: TopicReliabilityPolicy) -> Self {
Self {
queue: VecDeque::new(),
head_sequence: 1,
next_sequence: 1,
dropped_messages: 0,
reliability_policy,
subscribers: HashMap::new(),
named_subscribers: HashMap::new(),
next_subscriber_id: 1,
}
}
}
#[derive(Clone, Debug)]
struct TopicSubscriberState {
next_sequence: u64,
reliable: bool,
degraded_by_policy: bool,
inflight: VecDeque<InflightDelivery>,
}
#[derive(Clone, Copy, Debug)]
struct InflightDelivery {
sequence: u64,
last_sent_at: Instant,
retry_count: u8,
}
pub struct TopicBus {
slots: HashMap<String, Arc<TopicSlot>>,
default_depth: usize,
reliability_policy: TopicReliabilityPolicy,
}
impl Default for TopicBus {
fn default() -> Self {
Self {
slots: HashMap::new(),
default_depth: 16,
reliability_policy: TopicReliabilityPolicy::default(),
}
}
}
pub struct ServiceChannel {
requests: Mutex<VecDeque<(u64, Box<dyn Any + Send>)>>,
responses: Mutex<HashMap<u64, Box<dyn Any + Send>>>,
}
impl Default for ServiceChannel {
fn default() -> Self {
Self {
requests: Mutex::new(VecDeque::new()),
responses: Mutex::new(HashMap::new()),
}
}
}
#[derive(Default)]
pub struct ServiceBus {
channels: HashMap<String, Arc<ServiceChannel>>,
}