1use crate::{InterestLevel, InterestMap, PropagationTopology};
6use elara_core::{NodeId, StateTime};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
10pub enum PropagationPriority {
11 Background = 0,
13 Normal = 1,
15 High = 2,
17 Urgent = 3,
19}
20
21#[derive(Debug, Clone)]
23pub struct StateUpdate {
24 pub state_id: u64,
26 pub source: NodeId,
28 pub sequence: u64,
30 pub timestamp: StateTime,
32 pub priority: PropagationPriority,
34 pub size: usize,
36 pub is_keyframe: bool,
38}
39
40impl StateUpdate {
41 pub fn new(state_id: u64, source: NodeId, sequence: u64, timestamp: StateTime) -> Self {
42 Self {
43 state_id,
44 source,
45 sequence,
46 timestamp,
47 priority: PropagationPriority::Normal,
48 size: 0,
49 is_keyframe: false,
50 }
51 }
52
53 pub fn with_priority(mut self, priority: PropagationPriority) -> Self {
54 self.priority = priority;
55 self
56 }
57
58 pub fn with_size(mut self, size: usize) -> Self {
59 self.size = size;
60 self
61 }
62
63 pub fn keyframe(mut self) -> Self {
64 self.is_keyframe = true;
65 self
66 }
67}
68
69#[derive(Debug, Clone)]
71pub struct PropagationDecision {
72 pub target: NodeId,
74 pub should_send: bool,
76 pub priority: PropagationPriority,
78 pub delay_ms: u32,
80 pub quality_level: u8,
82}
83
84#[derive(Debug)]
86pub struct PropagationScheduler {
87 interests: InterestMap,
89 topology: PropagationTopology,
91 bandwidth_budget: u32,
93 bandwidth_usage: std::collections::HashMap<NodeId, u32>,
95}
96
97impl PropagationScheduler {
98 pub fn new(interests: InterestMap, topology: PropagationTopology) -> Self {
100 Self {
101 interests,
102 topology,
103 bandwidth_budget: 1_000_000, bandwidth_usage: std::collections::HashMap::new(),
105 }
106 }
107
108 pub fn set_bandwidth_budget(&mut self, bytes_per_second: u32) {
110 self.bandwidth_budget = bytes_per_second;
111 }
112
113 pub fn schedule(&self, update: &StateUpdate) -> Vec<PropagationDecision> {
115 let mut decisions = Vec::new();
116
117 let interested = self.interests.interested_nodes(update.state_id);
119
120 for (node, interest_level) in interested {
121 if node == update.source {
123 continue;
124 }
125
126 if !self.topology.has_node(node) {
128 continue;
129 }
130
131 let priority = match interest_level {
133 InterestLevel::Critical => PropagationPriority::Urgent,
134 InterestLevel::High => PropagationPriority::High,
135 InterestLevel::Medium => PropagationPriority::Normal,
136 InterestLevel::Low => PropagationPriority::Background,
137 InterestLevel::None => continue,
138 };
139
140 let quality_level = match interest_level {
142 InterestLevel::Critical | InterestLevel::High => 0, InterestLevel::Medium => 1, InterestLevel::Low => 2, InterestLevel::None => continue,
146 };
147
148 let delay_ms = match priority {
150 PropagationPriority::Urgent => 0,
151 PropagationPriority::High => 10,
152 PropagationPriority::Normal => 50,
153 PropagationPriority::Background => 200,
154 };
155
156 decisions.push(PropagationDecision {
157 target: node,
158 should_send: true,
159 priority,
160 delay_ms,
161 quality_level,
162 });
163 }
164
165 decisions.sort_by(|a, b| b.priority.cmp(&a.priority));
167
168 decisions
169 }
170
171 pub fn record_send(&mut self, target: NodeId, bytes: u32) {
173 *self.bandwidth_usage.entry(target).or_insert(0) += bytes;
174 }
175
176 pub fn reset_bandwidth(&mut self) {
178 self.bandwidth_usage.clear();
179 }
180
181 pub fn has_bandwidth(&self, target: NodeId, bytes: u32) -> bool {
183 let used = self.bandwidth_usage.get(&target).copied().unwrap_or(0);
184 used + bytes <= self.bandwidth_budget
185 }
186}
187
188#[derive(Debug, Clone, Default)]
190pub struct PropagationStats {
191 pub updates_sent: u64,
193 pub bytes_sent: u64,
195 pub updates_dropped: u64,
197 pub avg_latency_ms: f32,
199 pub peak_latency_ms: u32,
201}
202
203impl PropagationStats {
204 pub fn new() -> Self {
205 Self::default()
206 }
207
208 pub fn record_send(&mut self, bytes: u64, latency_ms: u32) {
209 self.updates_sent += 1;
210 self.bytes_sent += bytes;
211
212 let n = self.updates_sent as f32;
214 self.avg_latency_ms = ((n - 1.0) * self.avg_latency_ms + latency_ms as f32) / n;
215
216 if latency_ms > self.peak_latency_ms {
217 self.peak_latency_ms = latency_ms;
218 }
219 }
220
221 pub fn record_drop(&mut self) {
222 self.updates_dropped += 1;
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229 use crate::InterestDeclaration;
230
231 #[test]
232 fn test_propagation_scheduler() {
233 let mut interests = InterestMap::new();
234 let source = NodeId::new(1);
235 let viewer1 = NodeId::new(2);
236 let viewer2 = NodeId::new(3);
237
238 interests.register(InterestDeclaration::new(viewer1, 100, InterestLevel::High));
239 interests.register(InterestDeclaration::new(viewer2, 100, InterestLevel::Low));
240
241 let mut topology = PropagationTopology::new();
242 topology.add_node(source);
243 topology.add_node(viewer1);
244 topology.add_node(viewer2);
245
246 let scheduler = PropagationScheduler::new(interests, topology);
247
248 let update = StateUpdate::new(100, source, 1, StateTime::from_millis(0));
249 let decisions = scheduler.schedule(&update);
250
251 assert_eq!(decisions.len(), 2);
252 assert_eq!(decisions[0].target, viewer1);
254 assert_eq!(decisions[0].priority, PropagationPriority::High);
255 }
256
257 #[test]
258 fn test_propagation_stats() {
259 let mut stats = PropagationStats::new();
260
261 stats.record_send(1000, 50);
262 stats.record_send(1000, 100);
263 stats.record_send(1000, 75);
264
265 assert_eq!(stats.updates_sent, 3);
266 assert_eq!(stats.bytes_sent, 3000);
267 assert_eq!(stats.peak_latency_ms, 100);
268 }
269}