Skip to main content

elara_diffusion/
propagation.rs

1//! Propagation - How state flows through the network
2//!
3//! State propagation rules and scheduling.
4
5use crate::{InterestLevel, InterestMap, PropagationTopology};
6use elara_core::{NodeId, StateTime};
7
/// Relative urgency with which a state update is delivered to a target.
///
/// Variants are declared from least to most urgent with explicit
/// discriminants, so the derived ordering ranks
/// `Background < Normal < High < Urgent`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum PropagationPriority {
    /// Lowest urgency: send when bandwidth is available.
    Background = 0,
    /// Regular priority.
    Normal = 1,
    /// Takes precedence over `Normal` traffic.
    High = 2,
    /// Highest urgency: send immediately.
    Urgent = 3,
}
20
21/// State update to propagate
22#[derive(Debug, Clone)]
23pub struct StateUpdate {
24    /// State ID
25    pub state_id: u64,
26    /// Source node (authority)
27    pub source: NodeId,
28    /// Sequence number
29    pub sequence: u64,
30    /// Timestamp
31    pub timestamp: StateTime,
32    /// Priority
33    pub priority: PropagationPriority,
34    /// Payload size in bytes
35    pub size: usize,
36    /// Is this a keyframe?
37    pub is_keyframe: bool,
38}
39
40impl StateUpdate {
41    pub fn new(state_id: u64, source: NodeId, sequence: u64, timestamp: StateTime) -> Self {
42        Self {
43            state_id,
44            source,
45            sequence,
46            timestamp,
47            priority: PropagationPriority::Normal,
48            size: 0,
49            is_keyframe: false,
50        }
51    }
52
53    pub fn with_priority(mut self, priority: PropagationPriority) -> Self {
54        self.priority = priority;
55        self
56    }
57
58    pub fn with_size(mut self, size: usize) -> Self {
59        self.size = size;
60        self
61    }
62
63    pub fn keyframe(mut self) -> Self {
64        self.is_keyframe = true;
65        self
66    }
67}
68
69/// Propagation decision for a specific node
70#[derive(Debug, Clone)]
71pub struct PropagationDecision {
72    /// Target node
73    pub target: NodeId,
74    /// Should we send to this node?
75    pub should_send: bool,
76    /// Priority for this target
77    pub priority: PropagationPriority,
78    /// Delay before sending (for rate limiting)
79    pub delay_ms: u32,
80    /// Quality level (for degradation)
81    pub quality_level: u8,
82}
83
84/// Propagation scheduler
85#[derive(Debug)]
86pub struct PropagationScheduler {
87    /// Interest map
88    interests: InterestMap,
89    /// Topology
90    topology: PropagationTopology,
91    /// Bandwidth budget per node (bytes per second)
92    bandwidth_budget: u32,
93    /// Current bandwidth usage per node
94    bandwidth_usage: std::collections::HashMap<NodeId, u32>,
95}
96
97impl PropagationScheduler {
98    /// Create a new scheduler
99    pub fn new(interests: InterestMap, topology: PropagationTopology) -> Self {
100        Self {
101            interests,
102            topology,
103            bandwidth_budget: 1_000_000, // 1 MB/s default
104            bandwidth_usage: std::collections::HashMap::new(),
105        }
106    }
107
108    /// Set bandwidth budget
109    pub fn set_bandwidth_budget(&mut self, bytes_per_second: u32) {
110        self.bandwidth_budget = bytes_per_second;
111    }
112
113    /// Decide how to propagate an update
114    pub fn schedule(&self, update: &StateUpdate) -> Vec<PropagationDecision> {
115        let mut decisions = Vec::new();
116
117        // Get all interested nodes
118        let interested = self.interests.interested_nodes(update.state_id);
119
120        for (node, interest_level) in interested {
121            // Skip the source
122            if node == update.source {
123                continue;
124            }
125
126            // Check if node is reachable in topology
127            if !self.topology.has_node(node) {
128                continue;
129            }
130
131            // Determine priority based on interest level
132            let priority = match interest_level {
133                InterestLevel::Critical => PropagationPriority::Urgent,
134                InterestLevel::High => PropagationPriority::High,
135                InterestLevel::Medium => PropagationPriority::Normal,
136                InterestLevel::Low => PropagationPriority::Background,
137                InterestLevel::None => continue,
138            };
139
140            // Determine quality level based on interest
141            let quality_level = match interest_level {
142                InterestLevel::Critical | InterestLevel::High => 0, // Full quality
143                InterestLevel::Medium => 1,                         // Slight reduction
144                InterestLevel::Low => 2,                            // Significant reduction
145                InterestLevel::None => continue,
146            };
147
148            // Calculate delay based on priority
149            let delay_ms = match priority {
150                PropagationPriority::Urgent => 0,
151                PropagationPriority::High => 10,
152                PropagationPriority::Normal => 50,
153                PropagationPriority::Background => 200,
154            };
155
156            decisions.push(PropagationDecision {
157                target: node,
158                should_send: true,
159                priority,
160                delay_ms,
161                quality_level,
162            });
163        }
164
165        // Sort by priority (highest first)
166        decisions.sort_by(|a, b| b.priority.cmp(&a.priority));
167
168        decisions
169    }
170
171    /// Update bandwidth usage
172    pub fn record_send(&mut self, target: NodeId, bytes: u32) {
173        *self.bandwidth_usage.entry(target).or_insert(0) += bytes;
174    }
175
176    /// Reset bandwidth usage (call periodically)
177    pub fn reset_bandwidth(&mut self) {
178        self.bandwidth_usage.clear();
179    }
180
181    /// Check if we have bandwidth for a send
182    pub fn has_bandwidth(&self, target: NodeId, bytes: u32) -> bool {
183        let used = self.bandwidth_usage.get(&target).copied().unwrap_or(0);
184        used + bytes <= self.bandwidth_budget
185    }
186}
187
/// Running counters and latency figures for propagated updates.
///
/// The derived `Default` yields all-zero statistics.
#[derive(Clone, Debug, Default)]
pub struct PropagationStats {
    /// Number of updates sent so far.
    pub updates_sent: u64,
    /// Number of bytes sent so far.
    pub bytes_sent: u64,
    /// Number of updates dropped (bandwidth limit).
    pub updates_dropped: u64,
    /// Mean latency over all sent updates, in milliseconds.
    pub avg_latency_ms: f32,
    /// Largest latency seen for any sent update, in milliseconds.
    pub peak_latency_ms: u32,
}
202
203impl PropagationStats {
204    pub fn new() -> Self {
205        Self::default()
206    }
207
208    pub fn record_send(&mut self, bytes: u64, latency_ms: u32) {
209        self.updates_sent += 1;
210        self.bytes_sent += bytes;
211
212        // Update average latency
213        let n = self.updates_sent as f32;
214        self.avg_latency_ms = ((n - 1.0) * self.avg_latency_ms + latency_ms as f32) / n;
215
216        if latency_ms > self.peak_latency_ms {
217            self.peak_latency_ms = latency_ms;
218        }
219    }
220
221    pub fn record_drop(&mut self) {
222        self.updates_dropped += 1;
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::InterestDeclaration;

    /// Two viewers with different interest levels must both receive the
    /// update, ordered by priority, with the delay and quality that the
    /// scheduler maps from their interest level.
    #[test]
    fn test_propagation_scheduler() {
        let mut interests = InterestMap::new();
        let source = NodeId::new(1);
        let viewer1 = NodeId::new(2);
        let viewer2 = NodeId::new(3);

        interests.register(InterestDeclaration::new(viewer1, 100, InterestLevel::High));
        interests.register(InterestDeclaration::new(viewer2, 100, InterestLevel::Low));

        let mut topology = PropagationTopology::new();
        topology.add_node(source);
        topology.add_node(viewer1);
        topology.add_node(viewer2);

        let scheduler = PropagationScheduler::new(interests, topology);

        let update = StateUpdate::new(100, source, 1, StateTime::from_millis(0));
        let decisions = scheduler.schedule(&update);

        assert_eq!(decisions.len(), 2);
        // High interest should be first
        assert_eq!(decisions[0].target, viewer1);
        assert_eq!(decisions[0].priority, PropagationPriority::High);
        assert_eq!(decisions[0].delay_ms, 10);
        assert_eq!(decisions[0].quality_level, 0);
        // Low interest comes last, as background traffic at reduced quality
        assert_eq!(decisions[1].target, viewer2);
        assert_eq!(decisions[1].priority, PropagationPriority::Background);
        assert_eq!(decisions[1].delay_ms, 200);
        assert_eq!(decisions[1].quality_level, 2);
        assert!(decisions.iter().all(|decision| decision.should_send));
    }

    /// Usage is tracked per target, compared against the budget inclusively,
    /// and cleared by a reset.
    #[test]
    fn test_bandwidth_accounting() {
        let viewer = NodeId::new(2);
        let other = NodeId::new(3);
        let mut scheduler =
            PropagationScheduler::new(InterestMap::new(), PropagationTopology::new());
        scheduler.set_bandwidth_budget(100);

        assert!(scheduler.has_bandwidth(viewer, 100));
        assert!(!scheduler.has_bandwidth(viewer, 101));

        scheduler.record_send(viewer, 60);
        assert!(scheduler.has_bandwidth(viewer, 40));
        assert!(!scheduler.has_bandwidth(viewer, 41));
        // Another target's allowance is unaffected
        assert!(scheduler.has_bandwidth(other, 100));

        scheduler.reset_bandwidth();
        assert!(scheduler.has_bandwidth(viewer, 100));
    }

    /// Counters, average latency and peak latency must all reflect the
    /// recorded sends; drops are counted separately.
    #[test]
    fn test_propagation_stats() {
        let mut stats = PropagationStats::new();

        stats.record_send(1000, 50);
        stats.record_send(1000, 100);
        stats.record_send(1000, 75);

        assert_eq!(stats.updates_sent, 3);
        assert_eq!(stats.bytes_sent, 3000);
        assert_eq!(stats.peak_latency_ms, 100);
        // (50 + 100 + 75) / 3 == 75
        assert!((stats.avg_latency_ms - 75.0).abs() < 1e-3);

        assert_eq!(stats.updates_dropped, 0);
        stats.record_drop();
        assert_eq!(stats.updates_dropped, 1);
        assert_eq!(stats.updates_sent, 3);
    }
}