net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
//! `DataGravityPolicy` — per-channel tuning for the heat-counter
//! emission cycle + the pure emission-decision function the
//! runtime hooks call after every read.
//!
//! Decision shape:
//!
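//! - **Emit a new tag** on the first non-zero rate, or when the
//!   rate has moved by at least `emit_threshold_ratio` (default
//!   `2.0`) in either direction since the last emission
//!   (`current_rate / last_emitted` or its inverse) — a change
//!   large enough to surface to peers.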
//! - **Withdraw** (emit rate=0) when the rate decays to zero —
//!   peers drop the heat annotation.
//! - **Otherwise** suppress emission. Re-emission floods the
//!   capability-announcement bus; the throttle keeps wire
//!   traffic bounded.
//!
//! Locked defaults from `DATAFORTS_PLAN.md` § Phase 4.

use std::time::Duration;

/// Default emission threshold ratio. `2.0` means a heat tag
/// re-emits only when the read rate has doubled (or halved)
/// since the last emission. Chosen to keep
/// capability-announcement traffic bounded under steady-state
/// read patterns: a rate that climbs monotonically from
/// `baseline` to `peak` emits roughly `log2(peak / baseline)`
/// tags on the way up.
pub const DEFAULT_EMIT_THRESHOLD_RATIO: f32 = 2.0;

/// Default decay half-life, in seconds. `30 min` — fast enough
/// that read patterns flowing over operator-relevant timescales
/// (minutes, not hours) surface as heat changes, slow enough
/// that transient bursts don't churn the emission path.
pub const DEFAULT_DECAY_HALF_LIFE_SECS: u64 = 30 * 60;

/// Minimum emission ratio. `1.01` lets operators bias toward
/// "emit aggressively" without permitting 1.0 (which would
/// re-emit on every bump — pathological). Smaller values are
/// rejected by [`DataGravityPolicy::validate`]; the builder
/// methods themselves do not check.
pub const MIN_EMIT_THRESHOLD_RATIO: f32 = 1.01;

/// Maximum emission ratio. `10.0` is a sanity ceiling — above
/// this the policy approaches "never emit," which is equivalent
/// to disabling the feature. Larger values are rejected by
/// [`DataGravityPolicy::validate`].
pub const MAX_EMIT_THRESHOLD_RATIO: f32 = 10.0;

/// Default reference rate for wire normalization. The substrate
/// maps unbounded local read-rate to the wire's `[0.0, 1.0]`
/// encoding via `ln_1p(rate) / ln_1p(reference)` so the rate
/// distribution stretches across the wire range instead of
/// saturating at the top end the way `rate / (rate + 1)` does.
///
/// `1000.0` means "rate=1000 maps to 1.0 on the wire." A typical
/// hot-chain rate (10-100 reads/window) sits at ~0.35-0.67 under
/// this scale — useful dynamic range — while anything at or
/// above the reference saturates at 1.0.
pub const DEFAULT_NORMALIZATION_REFERENCE_RATE: f32 = 1000.0;

/// Floor on the normalization reference rate. Every rate at or
/// above the reference saturates to 1.0 on the wire, so a tiny
/// reference leaves no dynamic range, and as the reference
/// approaches zero the `ln_1p(reference)` divisor approaches
/// zero as well. 1.5 is a defensive lower bound:
/// [`DataGravityPolicy::validate`] rejects anything below it, and
/// [`DataGravityPolicy::normalize_rate_for_wire`] clamps the
/// configured reference up to it before dividing.
pub const MIN_NORMALIZATION_REFERENCE_RATE: f32 = 1.5;

/// Per-channel configuration for the data-gravity heat-counter
/// emission cycle. Installed at runtime via
/// [`super::super::super::redex::Redex::enable_gravity_for_greedy`]
/// against a running mesh — no `RedexFileConfig` field; gravity is
/// a runtime policy toggle (see `DATAFORTS_PLAN.md` § Runtime
/// toggles vs. compile-time flags).
///
/// Validation rules (enforced by [`Self::validate`]):
///
/// - `emit_threshold_ratio` is finite,
///   `>= MIN_EMIT_THRESHOLD_RATIO`, `<= MAX_EMIT_THRESHOLD_RATIO`.
/// - `decay_half_life` is non-zero.
/// - `normalization_reference_rate` is finite and
///   `>= MIN_NORMALIZATION_REFERENCE_RATE`.
///
/// Fields are public and the `with_*` builders assign without
/// checking, so a policy is only known to satisfy these rules
/// after [`Self::validate`] has returned `Ok(())`.
#[derive(Debug, Clone)]
pub struct DataGravityPolicy {
    /// Whether the counter + emission cycle is active for the
    /// channel. `false` keeps the policy carried through config
    /// surfaces without spinning up the per-channel state; in
    /// this module it makes [`should_emit_heat`] always suppress.
    pub enabled: bool,
    /// Re-emission threshold: the factor by which the rate must
    /// rise or fall relative to the last emitted value before a
    /// new tag goes out. Default [`DEFAULT_EMIT_THRESHOLD_RATIO`].
    pub emit_threshold_ratio: f32,
    /// Exponential-decay half-life for the read rate. Default
    /// [`DEFAULT_DECAY_HALF_LIFE_SECS`] (expressed in seconds).
    pub decay_half_life: Duration,
    /// Reference rate the wire normalization uses to map raw
    /// read-rate to `[0.0, 1.0]`; a rate equal to the reference
    /// maps to 1.0. See [`DEFAULT_NORMALIZATION_REFERENCE_RATE`].
    pub normalization_reference_rate: f32,
}

impl Default for DataGravityPolicy {
    fn default() -> Self {
        Self {
            enabled: true,
            emit_threshold_ratio: DEFAULT_EMIT_THRESHOLD_RATIO,
            decay_half_life: Duration::from_secs(DEFAULT_DECAY_HALF_LIFE_SECS),
            normalization_reference_rate: DEFAULT_NORMALIZATION_REFERENCE_RATE,
        }
    }
}

impl DataGravityPolicy {
    /// Construct with the locked Phase-4 defaults (same value as
    /// [`Default::default`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: set the enabled flag.
    ///
    /// Consumes `self` and returns the updated policy; dropping
    /// the return value discards the change.
    #[must_use = "builder methods return the updated policy instead of mutating in place"]
    pub fn with_enabled(mut self, enabled: bool) -> Self {
        self.enabled = enabled;
        self
    }

    /// Builder: set the emission threshold ratio.
    ///
    /// The value is stored as-is; range checking happens in
    /// [`Self::validate`].
    #[must_use = "builder methods return the updated policy instead of mutating in place"]
    pub fn with_emit_threshold_ratio(mut self, ratio: f32) -> Self {
        self.emit_threshold_ratio = ratio;
        self
    }

    /// Builder: set the decay half-life.
    ///
    /// The value is stored as-is; a zero duration is only
    /// rejected later by [`Self::validate`].
    #[must_use = "builder methods return the updated policy instead of mutating in place"]
    pub fn with_decay_half_life(mut self, half_life: Duration) -> Self {
        self.decay_half_life = half_life;
        self
    }

    /// Builder: set the wire normalization reference rate. See
    /// [`DEFAULT_NORMALIZATION_REFERENCE_RATE`] for the meaning
    /// and rate-to-wire calibration.
    ///
    /// The value is stored as-is; the floor is enforced by
    /// [`Self::validate`] and, independently, clamped inside
    /// [`Self::normalize_rate_for_wire`].
    #[must_use = "builder methods return the updated policy instead of mutating in place"]
    pub fn with_normalization_reference_rate(mut self, reference: f32) -> Self {
        self.normalization_reference_rate = reference;
        self
    }

    /// Map a raw unbounded read-rate into the wire-side `[0.0, 1.0]`
    /// range using log-scale normalization against
    /// [`Self::normalization_reference_rate`]:
    /// `ln_1p(rate) / ln_1p(reference)`, clamped to `[0.0, 1.0]`.
    ///
    /// Returns `0.0` for non-finite, zero, or negative rates and
    /// saturates at `1.0` for rates at or beyond the reference;
    /// never returns NaN.
    #[must_use]
    pub fn normalize_rate_for_wire(&self, rate: f64) -> f64 {
        if !rate.is_finite() || rate <= 0.0 {
            return 0.0;
        }
        // Clamp the reference up to the floor so an unvalidated
        // policy (too-small or NaN reference) still yields a sane
        // divisor; `f32::max` returns the non-NaN operand.
        let reference = self
            .normalization_reference_rate
            .max(MIN_NORMALIZATION_REFERENCE_RATE) as f64;
        let denom = reference.ln_1p();
        // Defensive: with the floor applied above the divisor is
        // strictly positive, but keep the guard so the no-NaN
        // promise survives any future retuning of the floor.
        if denom <= 0.0 {
            return 0.0;
        }
        (rate.ln_1p() / denom).clamp(0.0, 1.0)
    }

    /// Validate the locked invariants. Returns a typed error
    /// naming the offending field so binding-layer callers can
    /// surface operator-friendly diagnostics.
    ///
    /// # Errors
    ///
    /// Checks run in this order and the first failure is returned:
    ///
    /// - [`DataGravityPolicyError::EmitThresholdOutOfRange`] when
    ///   `emit_threshold_ratio` is non-finite or outside
    ///   `[MIN_EMIT_THRESHOLD_RATIO, MAX_EMIT_THRESHOLD_RATIO]`.
    /// - [`DataGravityPolicyError::DecayHalfLifeZero`] when
    ///   `decay_half_life` is zero.
    /// - [`DataGravityPolicyError::NormalizationReferenceTooLow`]
    ///   when `normalization_reference_rate` is non-finite or
    ///   below [`MIN_NORMALIZATION_REFERENCE_RATE`].
    pub fn validate(&self) -> Result<(), DataGravityPolicyError> {
        // NaN fails every ordered comparison, so it needs the
        // explicit `is_finite` check to be rejected.
        if !self.emit_threshold_ratio.is_finite()
            || self.emit_threshold_ratio < MIN_EMIT_THRESHOLD_RATIO
            || self.emit_threshold_ratio > MAX_EMIT_THRESHOLD_RATIO
        {
            return Err(DataGravityPolicyError::EmitThresholdOutOfRange {
                got: self.emit_threshold_ratio,
                min: MIN_EMIT_THRESHOLD_RATIO,
                max: MAX_EMIT_THRESHOLD_RATIO,
            });
        }
        if self.decay_half_life.is_zero() {
            return Err(DataGravityPolicyError::DecayHalfLifeZero);
        }
        // No upper bound on the reference, but +inf is rejected
        // alongside NaN by the finiteness check.
        if !self.normalization_reference_rate.is_finite()
            || self.normalization_reference_rate < MIN_NORMALIZATION_REFERENCE_RATE
        {
            return Err(DataGravityPolicyError::NormalizationReferenceTooLow {
                got: self.normalization_reference_rate,
                min: MIN_NORMALIZATION_REFERENCE_RATE,
            });
        }
        Ok(())
    }
}

/// Outcome of the pure emission-decision function
/// [`should_emit_heat`]. Names the path so the runtime can route
/// the right metric bump.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EmissionDecision {
    /// No emission — suppress. Rate hasn't crossed the
    /// threshold; the last-emitted value is still
    /// representative. Also returned for a disabled policy and
    /// for non-finite or negative input rates.
    Suppress,
    /// Emit a new heat tag with the supplied rate.
    Emit {
        /// Decayed read-rate to announce. `should_emit_heat`
        /// passes its `current_rate` argument through unchanged,
        /// so this is the raw, un-normalized value; mapping it to
        /// the wire's `[0.0, 1.0]` range is left to the emitter
        /// (presumably via
        /// `DataGravityPolicy::normalize_rate_for_wire` — confirm
        /// at the call site).
        rate: f64,
    },
    /// Withdraw — emit `heat:<chain>=0`. Peers drop the
    /// annotation. Returned when the rate is zero and the last
    /// emitted value was positive.
    Withdraw,
}

/// Pure emission gate for the heat-counter cycle.
///
/// Given the current (decayed) read rate, the value carried by
/// the most recent emission (`None` if nothing has been emitted
/// yet) and the channel's policy, pick the [`EmissionDecision`]
/// the runtime should act on. No state is read or written.
///
/// Decision table, evaluated top to bottom:
///
/// - policy disabled, or `current_rate` non-finite / negative →
///   suppress.
/// - no prior emission → emit when `current_rate > 0`, otherwise
///   suppress.
/// - `current_rate == 0` → withdraw when the prior value was
///   positive, otherwise suppress.
/// - either side is not a normal float strictly above
///   `f64::EPSILON` → emit (the ratio would be meaningless, so
///   the value is announced afresh).
/// - `current_rate / prev >= ratio` or
///   `prev / current_rate >= ratio` → emit.
/// - anything else → suppress.
pub fn should_emit_heat(
    current_rate: f64,
    last_emitted: Option<f64>,
    policy: &DataGravityPolicy,
) -> EmissionDecision {
    // The decay loop should never hand us NaN, ±inf or a negative
    // rate, but if it does the emission path must stay quiet
    // rather than announce garbage.
    let rate_is_sane = current_rate.is_finite() && current_rate >= 0.0;
    if !policy.enabled || !rate_is_sane {
        return EmissionDecision::Suppress;
    }

    // First announcement: any positive heat goes out, silence
    // stays silent.
    let previous = match last_emitted {
        Some(value) => value,
        None => {
            return if current_rate > 0.0 {
                EmissionDecision::Emit { rate: current_rate }
            } else {
                EmissionDecision::Suppress
            };
        }
    };

    // Rate has decayed to zero: tell peers to drop the annotation
    // once, then stay quiet while it remains at zero.
    if current_rate == 0.0 {
        return if previous > 0.0 {
            EmissionDecision::Withdraw
        } else {
            EmissionDecision::Suppress
        };
    }

    // A divisor that is zero, subnormal, NaN, infinite, negative
    // or at rounding-noise scale (<= f64::EPSILON) would push the
    // ratio toward inf/NaN. Skip the ratio test for those and
    // announce the current value as if bootstrapping.
    let usable = |value: f64| value.is_normal() && value > f64::EPSILON;
    if !usable(previous) || !usable(current_rate) {
        return EmissionDecision::Emit { rate: current_rate };
    }

    // Symmetric threshold: a rise by `threshold`× or a fall to
    // 1/`threshold` of the last emitted value both count.
    let threshold = f64::from(policy.emit_threshold_ratio);
    let rose = current_rate / previous >= threshold;
    let fell = previous / current_rate >= threshold;
    if rose || fell {
        EmissionDecision::Emit { rate: current_rate }
    } else {
        EmissionDecision::Suppress
    }
}

/// Validation errors for [`DataGravityPolicy`].
///
/// [`DataGravityPolicy::validate`] produces every variant except
/// [`Self::GreedyNotEnabled`], which describes an install-order
/// problem rather than a bad field value.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DataGravityPolicyError {
    /// `emit_threshold_ratio` outside `[1.01, 10.0]` or non-finite.
    #[error("data-gravity emit_threshold_ratio {got} outside [{min}, {max}] or non-finite")]
    EmitThresholdOutOfRange {
        /// Configured value.
        got: f32,
        /// Minimum permitted value.
        min: f32,
        /// Maximum permitted value.
        max: f32,
    },
    /// `decay_half_life` is zero. A zero half-life decays
    /// instantly and produces a flapping emission cycle.
    #[error("data-gravity decay_half_life must be non-zero")]
    DecayHalfLifeZero,
    /// `enable_gravity_for_greedy` was called before greedy
    /// itself was installed. Operators must call
    /// `enable_greedy_dataforts` first. Not returned by
    /// `DataGravityPolicy::validate`.
    #[error("data-gravity requires greedy to be enabled first")]
    GreedyNotEnabled,
    /// `normalization_reference_rate` below
    /// [`MIN_NORMALIZATION_REFERENCE_RATE`] or non-finite.
    /// Too small a reference saturates the log-scale wire
    /// encoding for ordinary rates, leaving no dynamic range.
    #[error("data-gravity normalization_reference_rate {got} below floor {min} or non-finite")]
    NormalizationReferenceTooLow {
        /// Configured value.
        got: f32,
        /// Minimum permitted value.
        min: f32,
    },
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: the locked default policy
    /// (enabled, ratio 2.0, reference rate 1000).
    fn policy() -> DataGravityPolicy {
        DataGravityPolicy::default()
    }

    // ---- DataGravityPolicy::validate ----

    #[test]
    fn default_policy_validates() {
        DataGravityPolicy::default()
            .validate()
            .expect("defaults must validate");
    }

    #[test]
    fn emit_threshold_below_min_rejected() {
        let p = DataGravityPolicy::default().with_emit_threshold_ratio(1.0);
        let err = p.validate().expect_err("ratio 1.0 must reject");
        assert!(matches!(
            err,
            DataGravityPolicyError::EmitThresholdOutOfRange { .. }
        ));
    }

    #[test]
    fn emit_threshold_above_max_rejected() {
        let p = DataGravityPolicy::default().with_emit_threshold_ratio(20.0);
        let err = p.validate().expect_err("ratio 20.0 must reject");
        assert!(matches!(
            err,
            DataGravityPolicyError::EmitThresholdOutOfRange { .. }
        ));
    }

    #[test]
    fn emit_threshold_nan_rejected() {
        let p = DataGravityPolicy::default().with_emit_threshold_ratio(f32::NAN);
        let err = p.validate().expect_err("NaN ratio must reject");
        assert!(matches!(
            err,
            DataGravityPolicyError::EmitThresholdOutOfRange { .. }
        ));
    }

    #[test]
    fn decay_half_life_zero_rejected() {
        let p = DataGravityPolicy::default().with_decay_half_life(Duration::ZERO);
        let err = p.validate().expect_err("zero half-life must reject");
        assert!(matches!(err, DataGravityPolicyError::DecayHalfLifeZero));
    }

    #[test]
    fn normalization_reference_too_low_rejected() {
        let p = DataGravityPolicy::default().with_normalization_reference_rate(1.0);
        let err = p.validate().expect_err("reference 1.0 must reject");
        assert!(matches!(
            err,
            DataGravityPolicyError::NormalizationReferenceTooLow { .. }
        ));
    }

    #[test]
    fn normalization_reference_nan_rejected() {
        // NaN fails every ordered comparison, so only the explicit
        // finiteness check can reject it.
        let p = DataGravityPolicy::default().with_normalization_reference_rate(f32::NAN);
        let err = p.validate().expect_err("NaN reference must reject");
        assert!(matches!(
            err,
            DataGravityPolicyError::NormalizationReferenceTooLow { .. }
        ));
    }

    // ---- DataGravityPolicy::normalize_rate_for_wire ----

    #[test]
    fn normalize_rate_for_wire_log_scale_has_dynamic_range() {
        // The old `rate / (rate + 1)` formula collapses everything
        // above ~rate=10 into the 0.9x band. Log scale must give
        // visibly different wire values for rates that differ by
        // an order of magnitude in the operating range.
        let p = DataGravityPolicy::default();
        let v_1 = p.normalize_rate_for_wire(1.0);
        let v_10 = p.normalize_rate_for_wire(10.0);
        let v_100 = p.normalize_rate_for_wire(100.0);
        let v_1000 = p.normalize_rate_for_wire(1000.0);

        // Strictly increasing across orders of magnitude.
        assert!(v_1 < v_10);
        assert!(v_10 < v_100);
        assert!(v_100 < v_1000);
        // Saturates near 1.0 at the reference rate, not far below it.
        assert!(v_1000 >= 0.99);
        // Pin a useful dynamic range: rate=10 vs rate=100 differ
        // by > 0.2 on the wire (vs ~0.08 under the old formula).
        assert!(v_100 - v_10 > 0.20);
        // No NaN / negative for edge inputs.
        assert_eq!(p.normalize_rate_for_wire(0.0), 0.0);
        assert_eq!(p.normalize_rate_for_wire(-5.0), 0.0);
        assert_eq!(p.normalize_rate_for_wire(f64::NAN), 0.0);
        // Far above the reference saturates at 1.0.
        assert_eq!(p.normalize_rate_for_wire(1.0e9), 1.0);
    }

    #[test]
    fn normalize_rate_for_wire_clamps_reference_to_floor() {
        // An unvalidated policy with a too-small reference must
        // behave exactly like one configured at the floor instead
        // of saturating every rate at 1.0.
        let too_low = DataGravityPolicy::default().with_normalization_reference_rate(0.5);
        let at_floor = DataGravityPolicy::default()
            .with_normalization_reference_rate(MIN_NORMALIZATION_REFERENCE_RATE);
        let wire = too_low.normalize_rate_for_wire(1.0);
        assert_eq!(wire, at_floor.normalize_rate_for_wire(1.0));
        assert!(wire > 0.0 && wire < 1.0);
    }

    // ---- should_emit_heat ----

    #[test]
    fn should_emit_heat_handles_subnormal_prev() {
        // A subnormal `prev` used to be a hazard: `r / prev`
        // overflows to `inf` and the ratio test emits for the
        // wrong reason. The bootstrap branch should catch it via
        // `is_normal()` and re-route to emit-fresh. Pin the
        // behavior so a future refactor can't lose it.
        let p = policy();

        // `f64::MIN_POSITIVE` is the smallest positive *normal*
        // value (~2.2e-308), so build a genuine subnormal from the
        // smallest non-zero bit pattern (~5e-324) instead.
        let subnormal = f64::from_bits(1);
        assert!(subnormal > 0.0 && !subnormal.is_normal());

        // Subnormal prev, normal r.
        let decision = should_emit_heat(1.0, Some(subnormal), &p);
        match decision {
            EmissionDecision::Emit { rate } => assert!((rate - 1.0).abs() < 1e-9),
            other => panic!("subnormal prev must route to Emit, got {other:?}"),
        }
        // Subnormal r, normal prev.
        let decision = should_emit_heat(subnormal, Some(1.0), &p);
        assert!(matches!(decision, EmissionDecision::Emit { .. }));

        // Smallest normal value: passes `is_normal()` but sits far
        // below `f64::EPSILON`, so it must bootstrap as well —
        // in either position.
        assert!(f64::MIN_POSITIVE.is_normal());
        let decision = should_emit_heat(1.0, Some(f64::MIN_POSITIVE), &p);
        assert!(matches!(decision, EmissionDecision::Emit { .. }));
        let decision = should_emit_heat(f64::MIN_POSITIVE, Some(1.0), &p);
        assert!(matches!(decision, EmissionDecision::Emit { .. }));

        // EPSILON-boundary: prev at EPSILON should also route to
        // bootstrap (avoid noise-level signals on the ratio test).
        let decision = should_emit_heat(1.0, Some(f64::EPSILON), &p);
        assert!(matches!(decision, EmissionDecision::Emit { .. }));
    }

    #[test]
    fn disabled_policy_always_suppresses() {
        let p = policy().with_enabled(false);
        assert_eq!(
            should_emit_heat(10.0, Some(1.0), &p),
            EmissionDecision::Suppress
        );
    }

    #[test]
    fn first_emission_fires_when_heat_present() {
        let p = policy();
        match should_emit_heat(5.0, None, &p) {
            EmissionDecision::Emit { rate } => assert_eq!(rate, 5.0),
            other => panic!("expected Emit, got {other:?}"),
        }
    }

    #[test]
    fn first_emission_suppressed_with_zero_rate() {
        let p = policy();
        assert_eq!(should_emit_heat(0.0, None, &p), EmissionDecision::Suppress);
    }

    #[test]
    fn doubled_rate_emits() {
        let p = policy();
        match should_emit_heat(20.0, Some(10.0), &p) {
            EmissionDecision::Emit { rate } => assert_eq!(rate, 20.0),
            other => panic!("expected Emit, got {other:?}"),
        }
    }

    #[test]
    fn halved_rate_emits() {
        let p = policy();
        match should_emit_heat(5.0, Some(10.0), &p) {
            EmissionDecision::Emit { rate } => assert_eq!(rate, 5.0),
            other => panic!("expected Emit, got {other:?}"),
        }
    }

    #[test]
    fn sub_threshold_change_suppresses() {
        let p = policy();
        // Rate moved from 10 → 15 (ratio 1.5 < 2.0 default).
        assert_eq!(
            should_emit_heat(15.0, Some(10.0), &p),
            EmissionDecision::Suppress
        );
    }

    #[test]
    fn decay_to_zero_emits_withdrawal() {
        let p = policy();
        assert_eq!(
            should_emit_heat(0.0, Some(5.0), &p),
            EmissionDecision::Withdraw
        );
    }

    #[test]
    fn already_withdrawn_suppresses() {
        let p = policy();
        // Last emission was 0.0; current is 0.0 — no change.
        assert_eq!(
            should_emit_heat(0.0, Some(0.0), &p),
            EmissionDecision::Suppress
        );
    }

    #[test]
    fn heat_after_withdrawal_re_announces() {
        let p = policy();
        // Last emission was the 0.0 withdrawal; any new heat has
        // no meaningful ratio against zero and must be announced.
        assert_eq!(
            should_emit_heat(3.0, Some(0.0), &p),
            EmissionDecision::Emit { rate: 3.0 }
        );
    }

    #[test]
    fn negative_rate_suppresses_defensively() {
        let p = policy();
        assert_eq!(
            should_emit_heat(-1.0, Some(1.0), &p),
            EmissionDecision::Suppress
        );
    }

    #[test]
    fn non_finite_rate_suppresses_defensively() {
        let p = policy();
        assert_eq!(
            should_emit_heat(f64::NAN, Some(1.0), &p),
            EmissionDecision::Suppress
        );
        assert_eq!(
            should_emit_heat(f64::INFINITY, Some(1.0), &p),
            EmissionDecision::Suppress
        );
    }

    #[test]
    fn higher_threshold_suppresses_doubled_rate() {
        // ratio = 3.0: doubling (2×) is no longer enough.
        let p = policy().with_emit_threshold_ratio(3.0);
        assert_eq!(
            should_emit_heat(20.0, Some(10.0), &p),
            EmissionDecision::Suppress
        );
        // tripling DOES fire.
        match should_emit_heat(30.0, Some(10.0), &p) {
            EmissionDecision::Emit { rate } => assert_eq!(rate, 30.0),
            other => panic!("expected Emit, got {other:?}"),
        }
    }
}