Skip to main content

fips_core/mmp/
mod.rs

1//! Metrics Measurement Protocol (MMP) — link-layer instantiation.
2//!
3//! Measures link quality between adjacent peers: RTT, loss, jitter,
4//! throughput, one-way delay trend, and ETX. Operates on the per-frame
5//! hooks (counter, timestamp, flags) introduced by the FMP wire format
6//! revision.
7//!
8//! Three operating modes trade measurement fidelity for overhead:
9//! - **Full**: sender + receiver reports at RTT-adaptive intervals
10//! - **Lightweight**: receiver reports only (infer loss from counters)
11//! - **Minimal**: spin bit + CE echo only, no reports
12
13use serde::{Deserialize, Serialize};
14use std::fmt::{self, Debug};
15use std::time::{Duration, Instant};
16
17// Sub-modules
18pub mod algorithms;
19pub mod metrics;
20pub mod receiver;
21pub mod report;
22pub mod sender;
23
24// Re-exports
25pub use algorithms::{
26    DualEwma, JitterEstimator, OwdTrendDetector, SpinBitState, SrttEstimator, compute_etx,
27};
28pub use metrics::MmpMetrics;
29pub use receiver::ReceiverState;
30pub use report::{ReceiverReport, SenderReport};
31pub use sender::SenderState;
32
// Session-layer types
// `MmpSessionState` and `PathMtuState` are defined directly in this file, so no re-export is needed.
35
36// ============================================================================
37// Constants
38// ============================================================================
39
/// Size in bytes of a SenderReport body (everything after the msg_type byte):
/// 3 reserved bytes followed by a 44-byte payload, 47 in total.
pub const SENDER_REPORT_BODY_SIZE: usize = 3 + 44;

/// Size in bytes of a ReceiverReport body (everything after the msg_type byte):
/// 3 reserved bytes followed by a 64-byte payload, 67 in total.
pub const RECEIVER_REPORT_BODY_SIZE: usize = 3 + 64;

/// Total wire size of a SenderReport: 5 bytes of inner header plus the body (52).
pub const SENDER_REPORT_WIRE_SIZE: usize = 5 + SENDER_REPORT_BODY_SIZE;

/// Total wire size of a ReceiverReport: 5 bytes of inner header plus the body (72).
pub const RECEIVER_REPORT_WIRE_SIZE: usize = 5 + RECEIVER_REPORT_BODY_SIZE;
51
52// --- EWMA parameters (as shift amounts for integer arithmetic) ---
53
/// Right-shift implementing the jitter EWMA gain α = 1/16 (RFC 3550 §6.4.1).
pub const JITTER_ALPHA_SHIFT: u32 = 4;

/// Right-shift implementing the SRTT gain α = 1/8 (Jacobson, RFC 6298).
pub const SRTT_ALPHA_SHIFT: u32 = 3;

/// Right-shift implementing the RTTVAR gain β = 1/4 (Jacobson, RFC 6298).
pub const RTTVAR_BETA_SHIFT: u32 = 2;

/// Gain of the short-term half of the dual EWMA: α = 1/4.
pub const EWMA_SHORT_ALPHA: f64 = 1.0 / 4.0;

/// Gain of the long-term half of the dual EWMA: α = 1/32.
pub const EWMA_LONG_ALPHA: f64 = 1.0 / 32.0;
68
69// --- Timing defaults (milliseconds) ---
70
/// Report interval (ms) used during cold start, before an SRTT estimate exists.
pub const DEFAULT_COLD_START_INTERVAL_MS: u64 = 200;

/// Floor (ms) applied when clamping the SRTT-derived report interval.
///
/// Raised from 100 ms to 1000 ms: parent re-evaluation runs every 60 s, so
/// 60 samples per cycle is more than enough for EWMA convergence (~10 samples).
/// During cold start `DEFAULT_COLD_START_INTERVAL_MS` (200 ms) is used instead
/// so SRTT converges quickly before this floor takes over.
pub const MIN_REPORT_INTERVAL_MS: u64 = 1_000;

/// Ceiling (ms) applied when clamping the SRTT-derived report interval.
pub const MAX_REPORT_INTERVAL_MS: u64 = 5 * 1_000;

/// How many SRTT samples are collected before leaving cold start.
///
/// While in cold start the report-interval floor is
/// `DEFAULT_COLD_START_INTERVAL_MS`, so samples accumulate quickly; once this
/// many updates have been seen the floor becomes `MIN_REPORT_INTERVAL_MS`.
pub const COLD_START_SAMPLES: u32 = 5;

/// Default capacity of the one-way-delay (OWD) ring buffer.
pub const DEFAULT_OWD_WINDOW_SIZE: usize = 32;

/// Default interval, in seconds, between periodic operator log lines.
pub const DEFAULT_LOG_INTERVAL_SECS: u64 = 30;
97
98// --- Session-layer timing defaults ---
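// Session reports are routed end-to-end (bandwidth cost on every transit link),
// so the cold-start interval and the ceiling are higher than their link-layer
// counterparts. Note that the 500 ms floor is currently *below* the link-layer
// floor (`MIN_REPORT_INTERVAL_MS`, 1000 ms).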
101
/// Floor (ms) for the session-layer report interval.
pub const MIN_SESSION_REPORT_INTERVAL_MS: u64 = 500;

/// Ceiling (ms) for the session-layer report interval.
pub const MAX_SESSION_REPORT_INTERVAL_MS: u64 = 10 * 1_000;

/// Session-layer report interval (ms) used during cold start, before an SRTT
/// estimate is available.
pub const SESSION_COLD_START_INTERVAL_MS: u64 = 1_000;
110
111// ============================================================================
112// Operating Mode
113// ============================================================================
114
/// MMP operating mode.
///
/// The three modes trade measurement fidelity for overhead. With serde the
/// mode is represented by its lowercase variant name (`full`, `lightweight`,
/// `minimal`); `Full` is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MmpMode {
    /// Sender + receiver reports at RTT-adaptive intervals. Maximum fidelity.
    #[default]
    Full,
    /// Receiver reports only. Loss inferred from counter gaps.
    Lightweight,
    /// Spin bit + CE echo only. No reports exchanged.
    Minimal,
}
127
128impl fmt::Display for MmpMode {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        match self {
131            MmpMode::Full => write!(f, "full"),
132            MmpMode::Lightweight => write!(f, "lightweight"),
133            MmpMode::Minimal => write!(f, "minimal"),
134        }
135    }
136}
137
138// ============================================================================
139// Configuration
140// ============================================================================
141
/// MMP configuration (`node.mmp.*`).
///
/// Every field carries a serde default, so a partially specified (or empty)
/// mapping deserializes successfully: missing keys fall back to
/// `MmpMode::default()`, `DEFAULT_LOG_INTERVAL_SECS`, and
/// `DEFAULT_OWD_WINDOW_SIZE` respectively.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MmpConfig {
    /// Operating mode (`node.mmp.mode`).
    #[serde(default)]
    pub mode: MmpMode,

    /// Periodic operator log interval in seconds (`node.mmp.log_interval_secs`).
    #[serde(default = "MmpConfig::default_log_interval_secs")]
    pub log_interval_secs: u64,

    /// OWD trend ring buffer size (`node.mmp.owd_window_size`).
    #[serde(default = "MmpConfig::default_owd_window_size")]
    pub owd_window_size: usize,
}
157
158impl Default for MmpConfig {
159    fn default() -> Self {
160        Self {
161            mode: MmpMode::default(),
162            log_interval_secs: DEFAULT_LOG_INTERVAL_SECS,
163            owd_window_size: DEFAULT_OWD_WINDOW_SIZE,
164        }
165    }
166}
167
impl MmpConfig {
    /// Serde default for `log_interval_secs`: returns `DEFAULT_LOG_INTERVAL_SECS`.
    fn default_log_interval_secs() -> u64 {
        DEFAULT_LOG_INTERVAL_SECS
    }
    /// Serde default for `owd_window_size`: returns `DEFAULT_OWD_WINDOW_SIZE`.
    fn default_owd_window_size() -> usize {
        DEFAULT_OWD_WINDOW_SIZE
    }
}
176
177// ============================================================================
178// Per-Peer MMP State
179// ============================================================================
180
/// Combined MMP state for a single peer link.
///
/// Wraps sender, receiver, metrics, and spin bit state. One instance
/// per `ActivePeer`.
pub struct MmpPeerState {
    /// Sender-side MMP state for this link.
    pub sender: SenderState,
    /// Receiver-side MMP state for this link (sized by the configured OWD window).
    pub receiver: ReceiverState,
    /// Derived link metrics.
    pub metrics: MmpMetrics,
    /// Spin bit state; its role depends on whether this node was the initiator.
    pub spin_bit: SpinBitState,
    /// Operating mode copied from the configuration at construction.
    mode: MmpMode,
    /// Minimum time between periodic metrics logs.
    log_interval: Duration,
    /// When the last periodic log was recorded; `None` until the first one.
    last_log_time: Option<Instant>,
}
194
195impl MmpPeerState {
196    /// Create MMP state for a new peer link.
197    ///
198    /// `is_initiator`: true if this node initiated the Noise handshake
199    /// (determines spin bit role).
200    pub fn new(config: &MmpConfig, is_initiator: bool) -> Self {
201        Self {
202            sender: SenderState::new(),
203            receiver: ReceiverState::new(config.owd_window_size),
204            metrics: MmpMetrics::new(),
205            spin_bit: SpinBitState::new(is_initiator),
206            mode: config.mode,
207            log_interval: Duration::from_secs(config.log_interval_secs),
208            last_log_time: None,
209        }
210    }
211
212    /// Reset counter-dependent state for rekey cutover.
213    pub fn reset_for_rekey(&mut self, now: Instant) {
214        self.receiver.reset_for_rekey(now);
215        self.metrics.reset_for_rekey();
216    }
217
218    /// Current operating mode.
219    pub fn mode(&self) -> MmpMode {
220        self.mode
221    }
222
223    /// Check if it's time to emit a periodic metrics log.
224    pub fn should_log(&self, now: Instant) -> bool {
225        match self.last_log_time {
226            None => true,
227            Some(last) => now.duration_since(last) >= self.log_interval,
228        }
229    }
230
231    /// Mark that a periodic log was emitted.
232    pub fn mark_logged(&mut self, now: Instant) {
233        self.last_log_time = Some(now);
234    }
235}
236
237// ============================================================================
238// Per-Session MMP State (session-layer instantiation)
239// ============================================================================
240
/// Combined MMP state for a single end-to-end session.
///
/// Wraps sender, receiver, metrics, spin bit, and path MTU state.
/// One instance per established `SessionEntry`.
pub struct MmpSessionState {
    /// Sender-side MMP state for this session.
    pub sender: SenderState,
    /// Receiver-side MMP state for this session.
    pub receiver: ReceiverState,
    /// Derived session metrics.
    pub metrics: MmpMetrics,
    /// Spin bit state; its role depends on whether this node was the initiator.
    pub spin_bit: SpinBitState,
    /// Operating mode copied from the session configuration at construction.
    mode: MmpMode,
    /// Minimum time between periodic metrics logs.
    log_interval: Duration,
    /// When the last periodic log was recorded; `None` until the first one.
    last_log_time: Option<Instant>,
    /// Path MTU tracking for this session.
    pub path_mtu: PathMtuState,
}
255
256impl MmpSessionState {
257    /// Create MMP state for a new session.
258    ///
259    /// `is_initiator`: true if this node initiated the Noise handshake
260    /// (determines spin bit role).
261    pub fn new(config: &crate::config::SessionMmpConfig, is_initiator: bool) -> Self {
262        Self {
263            sender: SenderState::new_with_cold_start(SESSION_COLD_START_INTERVAL_MS),
264            receiver: ReceiverState::new_with_cold_start(
265                config.owd_window_size,
266                SESSION_COLD_START_INTERVAL_MS,
267            ),
268            metrics: MmpMetrics::new(),
269            spin_bit: SpinBitState::new(is_initiator),
270            mode: config.mode,
271            log_interval: Duration::from_secs(config.log_interval_secs),
272            last_log_time: None,
273            path_mtu: PathMtuState::new(),
274        }
275    }
276
277    /// Reset counter-dependent state for rekey cutover.
278    pub fn reset_for_rekey(&mut self, now: Instant) {
279        self.receiver.reset_for_rekey(now);
280        self.metrics.reset_for_rekey();
281    }
282
283    /// Current operating mode.
284    pub fn mode(&self) -> MmpMode {
285        self.mode
286    }
287
288    /// Check if it's time to emit a periodic metrics log.
289    pub fn should_log(&self, now: Instant) -> bool {
290        match self.last_log_time {
291            None => true,
292            Some(last) => now.duration_since(last) >= self.log_interval,
293        }
294    }
295
296    /// Mark that a periodic log was emitted.
297    pub fn mark_logged(&mut self, now: Instant) {
298        self.last_log_time = Some(now);
299    }
300}
301
302impl Debug for MmpSessionState {
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        f.debug_struct("MmpSessionState")
305            .field("mode", &self.mode)
306            .field("path_mtu", &self.path_mtu.current_mtu())
307            .finish_non_exhaustive()
308    }
309}
310
311// ============================================================================
312// Path MTU State (session-layer only)
313// ============================================================================
314
/// Path MTU tracking for a single session.
///
/// Destination side: observes `path_mtu` from incoming SessionDatagram envelopes
/// and generates PathMtuNotification messages back to the source.
///
/// Source side: applies received PathMtuNotification to limit outbound datagram
/// size. Decrease is immediate; increase requires 3 consecutive notifications.
pub struct PathMtuState {
    /// Current effective path MTU (what we use for sending).
    current_mtu: u16,
    /// Last observed path MTU from incoming datagrams (destination-side).
    /// `u16::MAX` means "no measurement yet".
    last_observed_mtu: u16,
    /// Whether the observed MTU has changed since the last notification.
    observed_changed: bool,
    /// Last time a PathMtuNotification was sent.
    last_notification_time: Option<Instant>,
    /// Notification interval: max(10s, 5 * SRTT). Default 10s.
    notification_interval: Duration,
    /// For source-side increase tracking: consecutive higher-value notifications.
    /// Zero means no increase sequence is in progress.
    consecutive_increase_count: u8,
    /// Time of the first notification in the current increase sequence.
    first_increase_time: Option<Instant>,
    /// The MTU value being proposed for increase (0 when no sequence is active).
    pending_increase_mtu: u16,
}

impl PathMtuState {
    /// Number of matching notifications required before an increase is accepted.
    const INCREASE_CONFIRMATIONS: u8 = 3;

    /// Create path MTU state with no initial measurement.
    ///
    /// Both the effective and the observed MTU start at `u16::MAX`, and the
    /// notification interval starts at 10 seconds.
    pub fn new() -> Self {
        Self {
            current_mtu: u16::MAX,
            last_observed_mtu: u16::MAX,
            observed_changed: false,
            last_notification_time: None,
            notification_interval: Duration::from_secs(10),
            consecutive_increase_count: 0,
            first_increase_time: None,
            pending_increase_mtu: 0,
        }
    }

    /// Current effective path MTU (source-side, for sending).
    pub fn current_mtu(&self) -> u16 {
        self.current_mtu
    }

    /// Last observed incoming path MTU (destination-side).
    pub fn last_observed_mtu(&self) -> u16 {
        self.last_observed_mtu
    }

    /// Update notification interval from SRTT: max(10s, 5 * SRTT).
    ///
    /// `srtt_ms` is in milliseconds. The float-to-integer cast saturates, so a
    /// negative or NaN SRTT contributes 0 ms and the 10 s floor applies.
    pub fn update_interval_from_srtt(&mut self, srtt_ms: f64) {
        let five_srtt = Duration::from_millis((srtt_ms * 5.0) as u64);
        self.notification_interval = five_srtt.max(Duration::from_secs(10));
    }

    /// Seed source-side current_mtu from outbound transport MTU.
    ///
    /// Called on each send. Only decreases (never increases) the current_mtu
    /// so the destination's PathMtuNotification can still raise it later.
    /// Ensures current_mtu doesn't stay at u16::MAX before any notification
    /// arrives from the destination.
    pub fn seed_source_mtu(&mut self, outbound_mtu: u16) {
        self.current_mtu = self.current_mtu.min(outbound_mtu);
    }

    // --- Destination side ---

    /// Observe the path_mtu from an incoming SessionDatagram envelope.
    ///
    /// Called on the destination (receiver) side for every session message.
    /// Records the value and flags a change when it differs from the previous
    /// observation.
    pub fn observe_incoming_mtu(&mut self, path_mtu: u16) {
        if path_mtu != self.last_observed_mtu {
            self.observed_changed = true;
            self.last_observed_mtu = path_mtu;
        }
    }

    /// Check if a PathMtuNotification should be sent.
    ///
    /// Send on first measurement, on decrease (immediate), or periodic
    /// confirmation at the notification interval. Returns `false` while no
    /// MTU has been observed yet.
    pub fn should_send_notification(&self, now: Instant) -> bool {
        if self.last_observed_mtu == u16::MAX {
            return false; // No measurement yet
        }
        match self.last_notification_time {
            None => true, // First measurement
            Some(last) => {
                // Immediate on decrease.
                // NOTE(review): "decrease" is judged against `current_mtu` (the
                // source-side send limit), not against the previously notified
                // value — confirm this is intended.
                if self.observed_changed && self.last_observed_mtu < self.current_mtu {
                    return true;
                }
                // Periodic confirmation
                now.duration_since(last) >= self.notification_interval
            }
        }
    }

    /// Build a PathMtuNotification from current state.
    ///
    /// Returns the path_mtu value to send, or `None` if nothing has been
    /// observed yet. Caller handles encoding. On success the notification time
    /// is set to `now` and the "observed changed" flag is cleared.
    pub fn build_notification(&mut self, now: Instant) -> Option<u16> {
        if self.last_observed_mtu == u16::MAX {
            return None;
        }
        self.last_notification_time = Some(now);
        self.observed_changed = false;
        Some(self.last_observed_mtu)
    }

    // --- Source side ---

    /// Apply a received PathMtuNotification.
    ///
    /// - Decrease: immediate (take the lower value).
    /// - Increase: require 3 consecutive notifications with the same higher
    ///   value, spanning at least 2 * notification_interval.
    /// - Equal: no change.
    ///
    /// A decrease or an accepted increase discards any increase sequence in
    /// progress, including the pending value, so a later report of that same
    /// value starts a fresh, correctly timestamped sequence.
    ///
    /// Returns `true` if the effective MTU changed.
    pub fn apply_notification(&mut self, reported_mtu: u16, now: Instant) -> bool {
        if reported_mtu < self.current_mtu {
            // Decrease: immediate
            self.current_mtu = reported_mtu;
            self.clear_increase_tracking();
            return true;
        }
        if reported_mtu == self.current_mtu {
            return false;
        }

        // Increase: continue the running sequence only if one is really active
        // (non-zero count and a recorded start time) and proposes this value.
        let continues_sequence =
            self.consecutive_increase_count > 0 && reported_mtu == self.pending_increase_mtu;
        let sequence_start = match self.first_increase_time {
            Some(start) if continues_sequence => {
                // Saturate: the count may keep growing while the time
                // requirement is unmet and must not overflow the u8.
                let count = self.consecutive_increase_count.saturating_add(1);
                self.consecutive_increase_count = count;
                start
            }
            _ => {
                // Different value, or no live sequence: start over.
                self.pending_increase_mtu = reported_mtu;
                self.consecutive_increase_count = 1;
                self.first_increase_time = Some(now);
                now
            }
        };

        // Accept increase after 3 consecutive spanning 2 * interval
        let confirmed = self.consecutive_increase_count >= Self::INCREASE_CONFIRMATIONS
            && now.duration_since(sequence_start) >= self.notification_interval * 2;
        if confirmed {
            self.current_mtu = reported_mtu;
            self.clear_increase_tracking();
        }
        confirmed
    }

    /// Forget any increase sequence in progress (count, start time, and value).
    fn clear_increase_tracking(&mut self) {
        self.consecutive_increase_count = 0;
        self.first_increase_time = None;
        self.pending_increase_mtu = 0;
    }
}
476
477impl Default for PathMtuState {
478    fn default() -> Self {
479        Self::new()
480    }
481}
482
483impl Debug for MmpPeerState {
484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
485        f.debug_struct("MmpPeerState")
486            .field("mode", &self.mode)
487            .finish_non_exhaustive()
488    }
489}
490
491// ============================================================================
492// Tests
493// ============================================================================
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Every mode paired with its lowercase textual name.
    const MODE_NAMES: [(MmpMode, &str); 3] = [
        (MmpMode::Full, "full"),
        (MmpMode::Lightweight, "lightweight"),
        (MmpMode::Minimal, "minimal"),
    ];

    /// `Full` is the default operating mode.
    #[test]
    fn test_mode_default() {
        assert_eq!(MmpMode::default(), MmpMode::Full);
    }

    /// `Display` prints the lowercase mode name.
    #[test]
    fn test_mode_display() {
        for (mode, name) in MODE_NAMES {
            assert_eq!(mode.to_string(), name);
        }
    }

    /// Each mode parses from its lowercase name and survives a
    /// serialize → deserialize round trip.
    #[test]
    fn test_mode_serde_roundtrip() {
        for (mode, name) in MODE_NAMES {
            let parsed: MmpMode = serde_yaml::from_str(name).unwrap();
            assert_eq!(parsed, mode);

            let encoded = serde_yaml::to_string(&mode).unwrap();
            let decoded: MmpMode = serde_yaml::from_str(&encoded).unwrap();
            assert_eq!(decoded, mode);
        }
    }

    /// `MmpConfig::default()` uses the documented built-in values.
    #[test]
    fn test_config_default() {
        let config = MmpConfig::default();
        assert_eq!(config.mode, MmpMode::Full);
        assert_eq!(config.log_interval_secs, 30);
        assert_eq!(config.owd_window_size, 32);
    }

    /// A fully specified YAML mapping overrides every field.
    #[test]
    fn test_config_yaml_parse() {
        let yaml = r#"
mode: lightweight
log_interval_secs: 60
owd_window_size: 48
"#;
        let config: MmpConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.mode, MmpMode::Lightweight);
        assert_eq!(config.log_interval_secs, 60);
        assert_eq!(config.owd_window_size, 48);
    }

    /// Keys missing from the YAML fall back to the built-in defaults.
    #[test]
    fn test_config_yaml_partial() {
        let yaml = "mode: minimal";
        let config: MmpConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.mode, MmpMode::Minimal);
        assert_eq!(config.log_interval_secs, DEFAULT_LOG_INTERVAL_SECS);
        assert_eq!(config.owd_window_size, DEFAULT_OWD_WINDOW_SIZE);
    }
}