Skip to main content

elara_time/
network.rs

1//! Network model for passive jitter/latency estimation
2
3use std::collections::HashMap;
4
5use elara_core::NodeId;
6
/// Passive timing statistics tracked for a single peer.
///
/// Every call to [`PeerNetworkModel::update`] records `local_time - remote_time`
/// in a bounded window of recent samples. Once the window holds enough samples,
/// `offset` is the window median and `jitter_envelope` is the largest absolute
/// deviation of any windowed sample from that median.
#[derive(Clone, Debug)]
pub struct PeerNetworkModel {
    /// Estimated clock offset (local - remote): median of the sample window.
    pub offset: f64,
    /// Estimated clock skew (drift rate). Starts at 0.0; `update` never modifies it.
    pub skew: f64,
    /// Jitter envelope: max absolute deviation of a windowed sample from `offset`.
    pub jitter_envelope: f64,
    /// Recent `local_time - remote_time` samples, oldest first.
    samples: Vec<f64>,
    /// Maximum number of samples to keep in `samples`.
    max_samples: usize,
}

impl PeerNetworkModel {
    /// Window size used by [`PeerNetworkModel::new`].
    const DEFAULT_MAX_SAMPLES: usize = 100;
    /// Number of samples the window must hold before estimates are refreshed.
    const MIN_SAMPLES_FOR_ESTIMATE: usize = 5;

    /// Creates an empty model: all estimates are 0.0 and the window is empty.
    pub fn new() -> Self {
        PeerNetworkModel {
            offset: 0.0,
            skew: 0.0,
            jitter_envelope: 0.0,
            samples: Vec::new(),
            max_samples: Self::DEFAULT_MAX_SAMPLES,
        }
    }

    /// Update with a new timing sample.
    ///
    /// The sample is `local_time - remote_time`. Non-finite samples (NaN or
    /// infinite, e.g. from a NaN timestamp or `inf - inf`) are discarded: a
    /// single one would otherwise corrupt the median and the jitter envelope
    /// for as long as it stayed in the window.
    ///
    /// Estimates are only refreshed once the window holds at least five
    /// samples; before that `offset` and `jitter_envelope` keep their
    /// previous values.
    pub fn update(&mut self, local_time: f64, remote_time: f64) {
        let sample = local_time - remote_time;
        if !sample.is_finite() {
            return;
        }
        self.samples.push(sample);

        // Trim the oldest sample so the window never exceeds `max_samples`.
        if self.samples.len() > self.max_samples {
            self.samples.remove(0);
        }

        if self.samples.len() < Self::MIN_SAMPLES_FOR_ESTIMATE {
            return;
        }

        let offset = Self::median(&self.samples);
        self.offset = offset;
        self.jitter_envelope = self
            .samples
            .iter()
            .map(|s| (s - offset).abs())
            .fold(0.0, f64::max);
    }

    /// Returns the median of `values`, or 0.0 for an empty slice.
    ///
    /// For an even number of values the two middle elements are averaged.
    /// Sorting uses `f64::total_cmp`, a total order over all `f64` values,
    /// so the comparison can never panic.
    fn median(values: &[f64]) -> f64 {
        if values.is_empty() {
            return 0.0;
        }
        let mut sorted = values.to_vec();
        sorted.sort_unstable_by(f64::total_cmp);
        let mid = sorted.len() / 2;
        if sorted.len() % 2 == 1 {
            sorted[mid]
        } else {
            (sorted[mid - 1] + sorted[mid]) / 2.0
        }
    }
}

impl Default for PeerNetworkModel {
    /// Equivalent to [`PeerNetworkModel::new`].
    fn default() -> Self {
        Self::new()
    }
}
71
72/// Aggregate network model across all peers
73#[derive(Debug, Default)]
74pub struct NetworkModel {
75    /// Per-peer models
76    pub peers: HashMap<NodeId, PeerNetworkModel>,
77    /// Aggregate latency mean (seconds)
78    pub latency_mean: f64,
79    /// Aggregate jitter (seconds)
80    pub jitter: f64,
81    /// Estimated reorder depth
82    pub reorder_depth: u32,
83    /// Estimated loss rate (0.0 - 1.0)
84    pub loss_rate: f64,
85    /// Overall stability score (0.0 - 1.0)
86    pub stability_score: f64,
87}
88
89impl NetworkModel {
90    pub fn new() -> Self {
91        NetworkModel::default()
92    }
93
94    /// Update model from a received packet
95    pub fn update_from_packet(
96        &mut self,
97        peer: NodeId,
98        local_time: f64,
99        remote_time: f64,
100        _seq: u16,
101    ) {
102        let peer_model = self.peers.entry(peer).or_default();
103        peer_model.update(local_time, remote_time);
104
105        // Update aggregate statistics
106        self.update_aggregates();
107    }
108
109    /// Record a detected reorder
110    pub fn record_reorder(&mut self, depth: u32) {
111        self.reorder_depth = self.reorder_depth.max(depth);
112    }
113
114    /// Record packet loss
115    pub fn record_loss(&mut self, lost_count: u32, total_count: u32) {
116        if total_count > 0 {
117            let new_rate = lost_count as f64 / total_count as f64;
118            // Exponential moving average
119            self.loss_rate = self.loss_rate * 0.9 + new_rate * 0.1;
120        }
121    }
122
123    fn update_aggregates(&mut self) {
124        if self.peers.is_empty() {
125            return;
126        }
127
128        // Average jitter across peers
129        let total_jitter: f64 = self.peers.values().map(|p| p.jitter_envelope).sum();
130        self.jitter = total_jitter / self.peers.len() as f64;
131
132        // Compute stability score
133        let jitter_factor = 1.0 / (1.0 + self.jitter * 10.0);
134        let loss_factor = 1.0 - self.loss_rate;
135        let reorder_factor = 1.0 / (1.0 + self.reorder_depth as f64 * 0.1);
136
137        self.stability_score = (jitter_factor * loss_factor * reorder_factor).clamp(0.0, 1.0);
138    }
139
140    /// Get peer model
141    pub fn get_peer(&self, peer: NodeId) -> Option<&PeerNetworkModel> {
142        self.peers.get(&peer)
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// The per-peer median offset and jitter envelope track the injected pattern.
    #[test]
    fn test_peer_model_update() {
        let mut model = PeerNetworkModel::new();

        // Simulate samples with ~50ms offset and some jitter
        for i in 0..20 {
            let jitter = (i % 5) as f64 * 0.005; // 0-20ms jitter
            model.update(1.0 + i as f64 * 0.1, 0.95 + i as f64 * 0.1 + jitter);
        }

        // Offset should be approximately 0.05 (50ms)
        assert!((model.offset - 0.05).abs() < 0.02);

        // The injected jitter spans 20ms, so the envelope around the median
        // must be non-zero but cannot exceed that spread.
        assert!(model.jitter_envelope > 0.0);
        assert!(model.jitter_envelope <= 0.02 + 1e-9);
    }

    /// Recorded loss must lower the stability score of an otherwise good network.
    #[test]
    fn test_network_model_stability() {
        let mut model = NetworkModel::new();

        // Good network
        for i in 0..10 {
            model.update_from_packet(NodeId::new(1), i as f64, i as f64 - 0.05, i as u16);
        }

        assert!(model.stability_score > 0.8);
        let score_before_loss = model.stability_score;

        // Add significant loss (50%)
        model.record_loss(50, 100);
        model.update_aggregates();

        // One 50% report blended with weight 0.1 into a 0.0 average gives ~5%.
        assert!((model.loss_rate - 0.05).abs() < 1e-12);

        // Compare against the pre-loss score: a bare `< 1.0` check could pass
        // even if the loss had no effect at all.
        assert!(model.stability_score < score_before_loss);
    }
}