use crate::{InterestLevel, InterestMap, PropagationTopology};
use elara_core::{NodeId, StateTime};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PropagationPriority {
Background = 0,
Normal = 1,
High = 2,
Urgent = 3,
}
#[derive(Debug, Clone)]
pub struct StateUpdate {
pub state_id: u64,
pub source: NodeId,
pub sequence: u64,
pub timestamp: StateTime,
pub priority: PropagationPriority,
pub size: usize,
pub is_keyframe: bool,
}
impl StateUpdate {
pub fn new(state_id: u64, source: NodeId, sequence: u64, timestamp: StateTime) -> Self {
Self {
state_id,
source,
sequence,
timestamp,
priority: PropagationPriority::Normal,
size: 0,
is_keyframe: false,
}
}
pub fn with_priority(mut self, priority: PropagationPriority) -> Self {
self.priority = priority;
self
}
pub fn with_size(mut self, size: usize) -> Self {
self.size = size;
self
}
pub fn keyframe(mut self) -> Self {
self.is_keyframe = true;
self
}
}
#[derive(Debug, Clone)]
pub struct PropagationDecision {
pub target: NodeId,
pub should_send: bool,
pub priority: PropagationPriority,
pub delay_ms: u32,
pub quality_level: u8,
}
#[derive(Debug)]
pub struct PropagationScheduler {
interests: InterestMap,
topology: PropagationTopology,
bandwidth_budget: u32,
bandwidth_usage: std::collections::HashMap<NodeId, u32>,
}
impl PropagationScheduler {
pub fn new(interests: InterestMap, topology: PropagationTopology) -> Self {
Self {
interests,
topology,
bandwidth_budget: 1_000_000, bandwidth_usage: std::collections::HashMap::new(),
}
}
pub fn set_bandwidth_budget(&mut self, bytes_per_second: u32) {
self.bandwidth_budget = bytes_per_second;
}
pub fn schedule(&self, update: &StateUpdate) -> Vec<PropagationDecision> {
let mut decisions = Vec::new();
let interested = self.interests.interested_nodes(update.state_id);
for (node, interest_level) in interested {
if node == update.source {
continue;
}
if !self.topology.has_node(node) {
continue;
}
let priority = match interest_level {
InterestLevel::Critical => PropagationPriority::Urgent,
InterestLevel::High => PropagationPriority::High,
InterestLevel::Medium => PropagationPriority::Normal,
InterestLevel::Low => PropagationPriority::Background,
InterestLevel::None => continue,
};
let quality_level = match interest_level {
InterestLevel::Critical | InterestLevel::High => 0, InterestLevel::Medium => 1, InterestLevel::Low => 2, InterestLevel::None => continue,
};
let delay_ms = match priority {
PropagationPriority::Urgent => 0,
PropagationPriority::High => 10,
PropagationPriority::Normal => 50,
PropagationPriority::Background => 200,
};
decisions.push(PropagationDecision {
target: node,
should_send: true,
priority,
delay_ms,
quality_level,
});
}
decisions.sort_by(|a, b| b.priority.cmp(&a.priority));
decisions
}
pub fn record_send(&mut self, target: NodeId, bytes: u32) {
*self.bandwidth_usage.entry(target).or_insert(0) += bytes;
}
pub fn reset_bandwidth(&mut self) {
self.bandwidth_usage.clear();
}
pub fn has_bandwidth(&self, target: NodeId, bytes: u32) -> bool {
let used = self.bandwidth_usage.get(&target).copied().unwrap_or(0);
used + bytes <= self.bandwidth_budget
}
}
#[derive(Debug, Clone, Default)]
pub struct PropagationStats {
pub updates_sent: u64,
pub bytes_sent: u64,
pub updates_dropped: u64,
pub avg_latency_ms: f32,
pub peak_latency_ms: u32,
}
impl PropagationStats {
pub fn new() -> Self {
Self::default()
}
pub fn record_send(&mut self, bytes: u64, latency_ms: u32) {
self.updates_sent += 1;
self.bytes_sent += bytes;
let n = self.updates_sent as f32;
self.avg_latency_ms = ((n - 1.0) * self.avg_latency_ms + latency_ms as f32) / n;
if latency_ms > self.peak_latency_ms {
self.peak_latency_ms = latency_ms;
}
}
pub fn record_drop(&mut self) {
self.updates_dropped += 1;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::InterestDeclaration;
#[test]
fn test_propagation_scheduler() {
let mut interests = InterestMap::new();
let source = NodeId::new(1);
let viewer1 = NodeId::new(2);
let viewer2 = NodeId::new(3);
interests.register(InterestDeclaration::new(viewer1, 100, InterestLevel::High));
interests.register(InterestDeclaration::new(viewer2, 100, InterestLevel::Low));
let mut topology = PropagationTopology::new();
topology.add_node(source);
topology.add_node(viewer1);
topology.add_node(viewer2);
let scheduler = PropagationScheduler::new(interests, topology);
let update = StateUpdate::new(100, source, 1, StateTime::from_millis(0));
let decisions = scheduler.schedule(&update);
assert_eq!(decisions.len(), 2);
assert_eq!(decisions[0].target, viewer1);
assert_eq!(decisions[0].priority, PropagationPriority::High);
}
#[test]
fn test_propagation_stats() {
let mut stats = PropagationStats::new();
stats.record_send(1000, 50);
stats.record_send(1000, 100);
stats.record_send(1000, 75);
assert_eq!(stats.updates_sent, 3);
assert_eq!(stats.bytes_sent, 3000);
assert_eq!(stats.peak_latency_ms, 100);
}
}