net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
//! `HeatCounter` — per-chain read-rate with exponential decay.
//!
//! Pure data structure. The runtime layer calls
//! [`HeatCounter::bump`] on every authorized read and consults
//! [`HeatCounter::rate`] before the throttled emission check
//! (`should_emit_heat`).
//!
//! Decay function: `rate := rate × 0.5^((now - last_update) / half_life)`.
//! Equivalent to: each call's contribution decays geometrically
//! with the elapsed time since the previous bump.
//!
//! The "rate" units are deliberately abstract — the counter
//! measures bumps-per-half-life, not bumps-per-hour or bumps-
//! per-second. Operators interpret heat values comparatively
//! within a deployment, not absolutely; the wire-form tag
//! (`heat:<hex>=<rate>`) carries the same raw value the counter
//! holds.

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Per-chain heat counter.
///
/// Holds an exponentially decayed read count: every [`bump`](Self::bump)
/// adds `1.0`, and elapsed time scales the running value by
/// `0.5^(elapsed / half_life)`. The counter also remembers the value
/// carried by the most recently recorded emission so the throttling
/// check can compare against it.
#[derive(Debug, Clone)]
pub struct HeatCounter {
    /// Current decayed rate. Bumps add `1.0`; decay scales by
    /// `0.5^((now - last_update) / half_life)`.
    rate: f64,
    /// Latest `Instant` the rate was brought up to date (via `bump`
    /// or `decay_to`). Never moves backwards: an observation with an
    /// older `now` leaves it unchanged. Drives the decay multiplier
    /// on the next observation.
    last_update: Instant,
    /// The rate at the last `record_emission` call (`0.0` after a
    /// `record_withdrawal`), or `None` if no emission has happened
    /// yet. Drives the `should_emit_heat` threshold check.
    last_emitted: Option<f64>,
    /// Exponential-decay half-life. Tied to the channel's
    /// `DataGravityPolicy::decay_half_life`. A zero half-life
    /// disables decay entirely.
    half_life: Duration,
}

impl HeatCounter {
    /// Fresh counter — zero rate, no prior emission, clock anchored
    /// at `now`.
    pub fn new(half_life: Duration, now: Instant) -> Self {
        Self {
            rate: 0.0,
            last_update: now,
            last_emitted: None,
            half_life,
        }
    }

    /// Apply decay through `now` without bumping. Use when
    /// inspecting the current rate without observing a new
    /// event.
    ///
    /// A `now` earlier than the last update (e.g. a timestamp
    /// captured before another update landed) is treated as zero
    /// elapsed time and does **not** rewind the clock. Rewinding
    /// would make the next observation decay over an interval that
    /// was already accounted for (over-decaying the rate), and would
    /// make a freshly bumped counter look stale to LRU eviction.
    ///
    /// With a zero half-life, or when the rate is already zero, only
    /// the clock is advanced.
    pub fn decay_to(&mut self, now: Instant) {
        if !self.half_life.is_zero() && self.rate != 0.0 {
            let elapsed = now.saturating_duration_since(self.last_update);
            let half_lives = elapsed.as_secs_f64() / self.half_life.as_secs_f64();
            // Defensive clamp — very long elapses push the multiplier
            // into the subnormal range. Past 64 half-lives the ratio is
            // 2^-64 ≈ 5.4e-20, so treat it as zero outright.
            if half_lives > 64.0 {
                self.rate = 0.0;
            } else {
                self.rate *= 0.5_f64.powf(half_lives);
                // Snap tiny residues to exactly zero so pruning
                // (`rate <= 0.0`) can fire.
                if self.rate < f64::EPSILON {
                    self.rate = 0.0;
                }
            }
        }
        // Monotonic: never move the clock backwards.
        self.last_update = self.last_update.max(now);
    }

    /// Observe a read at `now`. Decays the prior rate, then
    /// adds `1.0`.
    pub fn bump(&mut self, now: Instant) {
        self.decay_to(now);
        self.rate += 1.0;
    }

    /// Read the stored rate without applying any decay.
    /// Useful in tests; production callers should `decay_to(now)`
    /// first to reflect time elapsed since the last update.
    pub fn rate(&self) -> f64 {
        self.rate
    }

    /// The value carried in the last recorded emission, if any.
    pub fn last_emitted(&self) -> Option<f64> {
        self.last_emitted
    }

    /// Record that an emission with `rate` just landed. Future
    /// `should_emit_heat` calls compare against this snapshot.
    pub fn record_emission(&mut self, rate: f64) {
        self.last_emitted = Some(rate);
    }

    /// Record a withdrawal (heat=0 emitted). Equivalent to
    /// `record_emission(0.0)`.
    pub fn record_withdrawal(&mut self) {
        self.record_emission(0.0);
    }

    /// Half-life this counter was constructed with. Read-only —
    /// reconfigure by replacing the counter.
    pub fn half_life(&self) -> Duration {
        self.half_life
    }

    /// Latest `Instant` the rate was updated (via `bump` or
    /// `decay_to`); never moves backwards. Used by the registry's
    /// LRU eviction to pick the least-recently-touched counter
    /// under cap pressure.
    pub fn last_update(&self) -> Instant {
        self.last_update
    }
}

/// What a registry tick decided for one tracked key.
///
/// [`HeatRegistry::tick`] hands these back so the runtime can map
/// each one straight onto a wire action, without evaluating the
/// emission policy a second time.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum HeatEmission {
    /// Nothing to send for this key.
    Suppress,
    /// Send a `heat:<origin_hash_hex>=<rate>` tag.
    Emit {
        /// The decayed read-rate that goes on the wire.
        rate: f64,
    },
    /// Send a withdrawal, i.e. `heat:<origin_hash_hex>=0`.
    Withdraw,
}

/// Default cap on tracked heat counters. Sized to comfortably
/// hold a busy node's working set without growing unboundedly
/// under churn. Each counter is a few dozen bytes (an `f64`, an
/// `Instant`, an `Option<f64>` and a `Duration`; the exact size is
/// platform-dependent), plus the key and hashmap slack, so 8K
/// entries stay on the order of a megabyte. Operators with
/// workloads beyond this raise the bound via
/// [`HeatRegistry::with_cap`].
pub const DEFAULT_HEAT_REGISTRY_CAP: usize = 8 * 1024;

/// Heat counters for every tracked chain, keyed by the chain's `u64`
/// `origin_hash`. This is the same identifier that the substrate's
/// reserved `causal:<hex>` and `heat:<hex>=<rate>` tags put on the wire.
///
/// Concurrency: the registry itself holds no lock. Callers mutate it
/// behind an outer mutex. Per read, the hot path (`bump`) holds that
/// lock only for the decay arithmetic: two `as_secs_f64` calls and one
/// `powf`. That is cheap enough for today's read paths. If telemetry
/// ever shows contention, sharding by channel hash is the escape hatch.
///
/// Memory: the entry count is bounded on its own, independently of
/// the greedy cache. That cache normally drives eviction, but it may
/// be absent or sized too generously to ever evict. Once the cap is
/// reached, an `entry_mut` insert of a new key first drops the counter
/// with the oldest `last_update` (LRU-style replacement). The cap
/// defaults to [`DEFAULT_HEAT_REGISTRY_CAP`] and is tuned through
/// [`HeatRegistry::with_cap`].
#[derive(Debug)]
pub struct HeatRegistry {
    /// Per-chain counters, keyed by `origin_hash`.
    counters: HashMap<u64, HeatCounter>,
    /// Maximum number of tracked chains; `0` means unbounded.
    cap: usize,
}

impl Default for HeatRegistry {
    fn default() -> Self {
        Self {
            counters: HashMap::new(),
            cap: DEFAULT_HEAT_REGISTRY_CAP,
        }
    }
}

impl HeatRegistry {
    /// Empty registry with the default cap
    /// ([`DEFAULT_HEAT_REGISTRY_CAP`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Empty registry with an explicit cap. `cap == 0` disables
    /// the bound (use only when an external loop guarantees
    /// bounded entries — typically the greedy cache wiring).
    ///
    /// **Footgun**: passing `0` silently turns off memory
    /// bounding on the registry. Prefer
    /// [`Self::with_cap_unbounded`] when that is the intent so
    /// the reader of the call site can tell unbounded-by-design
    /// apart from "operator typo dropped the cap to zero."
    pub fn with_cap(cap: usize) -> Self {
        Self {
            counters: HashMap::new(),
            cap,
        }
    }

    /// Empty registry with the cap explicitly disabled. Use when
    /// an external loop (typically the greedy cache wiring)
    /// guarantees bounded entries and the registry just rides
    /// along.
    pub fn with_cap_unbounded() -> Self {
        Self::with_cap(0)
    }

    /// Configured cap. `0` means unbounded.
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// Number of tracked channels.
    pub fn len(&self) -> usize {
        self.counters.len()
    }

    /// True iff zero channels tracked.
    pub fn is_empty(&self) -> bool {
        self.counters.is_empty()
    }

    /// Get-or-create the counter for `channel`. Returns a
    /// mutable reference so the caller can `bump` / `decay_to`
    /// / `record_emission` in one borrow.
    ///
    /// `half_life` and `now` are only used when a new counter is
    /// created; an existing counter keeps its own half-life and
    /// clock (call `bump` / `decay_to` to advance it).
    ///
    /// When `len() == cap` and the inserted key is new, the entry
    /// with the oldest `last_update` is evicted first (LRU-style
    /// replacement). `cap == 0` disables the bound.
    pub fn entry_mut(
        &mut self,
        channel: u64,
        half_life: Duration,
        now: Instant,
    ) -> &mut HeatCounter {
        // Membership is checked up front: eviction needs
        // `&mut self.counters`, which can't coexist with the borrow
        // held by an `entry()` handle.
        let is_new = !self.counters.contains_key(&channel);
        if is_new && self.cap > 0 && self.counters.len() >= self.cap {
            self.evict_lru();
        }
        self.counters
            .entry(channel)
            .or_insert_with(|| HeatCounter::new(half_life, now))
    }

    /// Evict the counter whose `last_update` is oldest. O(n) over
    /// tracked counters; runs at most once per `entry_mut` past
    /// the cap, so amortized cost stays bounded under steady-state
    /// churn. No-op when empty.
    fn evict_lru(&mut self) {
        let victim = self
            .counters
            .iter()
            .min_by_key(|(_, c)| c.last_update())
            .map(|(k, _)| *k);
        if let Some(key) = victim {
            self.counters.remove(&key);
        }
    }

    /// Read-only access to the counter for `channel`.
    pub fn get(&self, channel: &u64) -> Option<&HeatCounter> {
        self.counters.get(channel)
    }

    /// Remove the counter for `channel`. Used on channel close /
    /// cache eviction.
    pub fn remove(&mut self, channel: &u64) {
        self.counters.remove(channel);
    }

    /// Iterate `(channel, counter)` pairs. Read-only.
    pub fn iter(&self) -> impl Iterator<Item = (&u64, &HeatCounter)> {
        self.counters.iter()
    }

    /// Walk every tracked channel, applying decay through `now`
    /// and asking [`super::should_emit_heat`] whether to emit.
    /// Returns the `(channel, emission)` candidates the runtime
    /// should send; `Suppress` decisions are omitted.
    ///
    /// This is the *candidate* half of a candidate/commit split.
    /// `tick` only decays rates: it does **not** update
    /// `last_emitted` and does **not** prune. Both happen in
    /// [`Self::commit_emissions`], which the caller invokes once
    /// the sink has confirmed delivery. If the sink fails, the
    /// caller simply skips the commit. State is then untouched,
    /// and the next `tick` recomputes the candidates and re-issues
    /// them.
    ///
    /// Nothing marks returned candidates as pending. A second
    /// `tick` before the commit can therefore return candidates for
    /// the same channels again.
    pub fn tick(
        &mut self,
        policy: &super::DataGravityPolicy,
        now: Instant,
    ) -> Vec<(u64, HeatEmission)> {
        let mut out = Vec::new();
        for (channel, counter) in self.counters.iter_mut() {
            counter.decay_to(now);
            // D-17: candidate/commit split. Pre-fix `tick` mutated
            // `last_emitted` inline, before the async sink had
            // confirmed the announcement. One transient sink error
            // would then leave `last_emitted ≈ rate`, and every later
            // `should_emit_heat` returned `Suppress`, so the chain's
            // heat update was silently stranded. The mutation now
            // lives in `commit_emissions`, which the caller invokes
            // only on `Ok(())` from the sink.
            let emission =
                match super::should_emit_heat(counter.rate, counter.last_emitted, policy) {
                    super::EmissionDecision::Suppress => continue,
                    super::EmissionDecision::Emit { rate } => HeatEmission::Emit { rate },
                    super::EmissionDecision::Withdraw => HeatEmission::Withdraw,
                };
            out.push((*channel, emission));
        }
        // Pruning is deferred to commit so a sink failure doesn't
        // evict counters the next tick should be retrying against.
        out
    }

    /// Commit the emissions returned by [`Self::tick`] after the
    /// async sink has confirmed delivery.
    ///
    /// For each committed channel, it updates the counter's
    /// `last_emitted`. The next tick then computes `should_emit_heat`
    /// against the durable state. Afterwards, it prunes entries that
    /// are fully decayed and already withdrawn. Channels no longer
    /// tracked (evicted or removed meanwhile) are skipped.
    ///
    /// Callers that observed a sink error skip this call. The
    /// candidates then stay pending, and the next tick reissues them
    /// naturally because no state mutation happened.
    pub fn commit_emissions(&mut self, emissions: &[(u64, HeatEmission)]) {
        for (channel, emission) in emissions {
            if let Some(counter) = self.counters.get_mut(channel) {
                match emission {
                    HeatEmission::Emit { rate } => counter.record_emission(*rate),
                    HeatEmission::Withdraw => counter.record_withdrawal(),
                    HeatEmission::Suppress => {}
                }
            }
        }
        // Prune entries that are fully decayed AND whose last
        // recorded emission was a withdrawal (`0.0`). No state
        // transition is possible for them until a new bump, which
        // re-creates the counter via `entry_mut`; the LRU cap bounds
        // such re-entries. `<= 0.0` rather than `== 0.0` is purely
        // defensive: it also covers a negative value, while `-0.0`
        // already compares equal to `0.0` under IEEE-754.
        self.counters
            .retain(|_, c| !(c.rate <= 0.0 && c.last_emitted.is_some_and(|v| v <= 0.0)));
    }
}

/// Blob-level counterpart of [`HeatRegistry`].
///
/// It is keyed by a chunk's full 32-byte BLAKE3 hash instead of a
/// chain's `u64` `origin_hash`. Operators reading a `BlobRef` already
/// hold the hash, and projecting it down to a `u64` would collapse it
/// for no benefit. Everything else carries over unchanged: the
/// cap-plus-LRU bound, per-counter half-life decay, and tick
/// semantics. (PR-5j-a foundation for the gravity migration
/// controller.)
#[derive(Debug)]
pub struct BlobHeatRegistry {
    /// Per-chunk counters, keyed by BLAKE3 hash.
    counters: HashMap<[u8; 32], HeatCounter>,
    /// Maximum number of tracked hashes; `0` means unbounded.
    cap: usize,
    /// Hashes whose emission has been handed to the sink but not
    /// yet committed.
    ///
    /// Between `tick` and `commit_emissions`, the mesh-blob driver
    /// drops the registry mutex so the async sink call does not
    /// hold a `!Send` parking_lot guard across `.await`. A second
    /// `tick` in that gap would see the still-unchanged
    /// `last_emitted`. It would then reach the same
    /// `should_emit_heat` verdict and send duplicates downstream.
    /// Skipping hashes listed here prevents that. `commit_emissions`
    /// drops the marker once the sink confirms.
    in_flight: std::collections::HashSet<[u8; 32]>,
}

impl Default for BlobHeatRegistry {
    fn default() -> Self {
        Self {
            counters: HashMap::new(),
            cap: DEFAULT_HEAT_REGISTRY_CAP,
            in_flight: std::collections::HashSet::new(),
        }
    }
}

impl BlobHeatRegistry {
    /// Empty registry with the default cap
    /// ([`DEFAULT_HEAT_REGISTRY_CAP`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Empty registry with an explicit cap. `cap == 0` disables
    /// the bound — only safe when an external loop guarantees
    /// bounded entries (e.g. an adapter that prunes on chunk
    /// delete).
    ///
    /// **Footgun**: passing `0` silently turns off memory
    /// bounding. Prefer [`Self::with_cap_unbounded`] when that
    /// is the intent so a call-site reader can tell intentional
    /// unboundedness apart from an operator typo.
    pub fn with_cap(cap: usize) -> Self {
        Self {
            counters: HashMap::new(),
            cap,
            in_flight: std::collections::HashSet::new(),
        }
    }

    /// Empty registry with the cap explicitly disabled. Use when
    /// an external loop guarantees bounded entries (e.g. an
    /// adapter that prunes on chunk delete) and the registry is
    /// just riding along.
    pub fn with_cap_unbounded() -> Self {
        Self::with_cap(0)
    }

    /// Configured cap. `0` means unbounded.
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// Number of tracked blob hashes.
    pub fn len(&self) -> usize {
        self.counters.len()
    }

    /// True iff zero hashes tracked.
    pub fn is_empty(&self) -> bool {
        self.counters.is_empty()
    }

    /// Get-or-create the counter for `hash`. Returns a mutable
    /// reference so the caller can `bump` / `decay_to` /
    /// `record_emission` in one borrow.
    ///
    /// `half_life` and `now` are only used when a new counter is
    /// created; an existing counter keeps its own half-life and
    /// clock.
    ///
    /// When `len() == cap` and the inserted key is new, the entry
    /// with the oldest `last_update` is evicted first (LRU-style
    /// replacement). `cap == 0` disables the bound.
    pub fn entry_mut(
        &mut self,
        hash: [u8; 32],
        half_life: Duration,
        now: Instant,
    ) -> &mut HeatCounter {
        if !self.counters.contains_key(&hash) && self.cap > 0 && self.counters.len() >= self.cap {
            self.evict_lru();
        }
        self.counters
            .entry(hash)
            .or_insert_with(|| HeatCounter::new(half_life, now))
    }

    /// Evict the counter whose `last_update` is oldest, together
    /// with any in-flight marker for it. O(n) over tracked
    /// counters; no-op when empty.
    fn evict_lru(&mut self) {
        let victim = self
            .counters
            .iter()
            .min_by_key(|(_, c)| c.last_update())
            .map(|(k, _)| *k);
        if let Some(key) = victim {
            self.counters.remove(&key);
            // Clear any stale `in_flight` marker for the evicted
            // hash. Otherwise a later reintroduction via `entry_mut`
            // creates a fresh counter that `tick` suppresses forever,
            // because `in_flight.contains(&key)` is still true. The
            // emission was issued for the prior counter, and eviction
            // wiped the registry entry before it was ever committed.
            self.in_flight.remove(&key);
        }
    }

    /// Read-only access to the counter for `hash`.
    pub fn get(&self, hash: &[u8; 32]) -> Option<&HeatCounter> {
        self.counters.get(hash)
    }

    /// Remove the counter for `hash`. Used on chunk delete /
    /// GC sweep.
    pub fn remove(&mut self, hash: &[u8; 32]) {
        self.counters.remove(hash);
        // Same in-flight cleanup as `evict_lru` — a removed hash
        // that re-enters later via `entry_mut` must start with a
        // clean `in_flight` slot, otherwise `tick` suppresses
        // every emission for the new counter forever.
        self.in_flight.remove(hash);
    }

    /// Iterate `(hash, counter)` pairs. Read-only.
    pub fn iter(&self) -> impl Iterator<Item = (&[u8; 32], &HeatCounter)> {
        self.counters.iter()
    }

    /// Walk every tracked hash, applying decay through `now` and
    /// asking `should_emit_heat` whether to emit. Returns the
    /// `(hash, emission)` candidates the runtime should send;
    /// `Suppress` decisions are omitted.
    ///
    /// Mirrors [`HeatRegistry::tick`]: this is the *candidate* half
    /// of the candidate/commit split. It does **not** update
    /// `last_emitted` and does **not** prune. Both happen in
    /// [`Self::commit_emissions`].
    ///
    /// Unlike `HeatRegistry`, every returned hash is marked in
    /// flight, and later ticks skip it until the marker is cleared.
    /// Markers are cleared by [`Self::commit_emissions`],
    /// [`Self::rollback_emission`], [`Self::remove`], LRU eviction,
    /// or pruning. A caller therefore has to either commit or roll
    /// back every hash it received. A dropped candidate list leaves
    /// those hashes suppressed.
    pub fn tick(
        &mut self,
        policy: &super::DataGravityPolicy,
        now: Instant,
    ) -> Vec<([u8; 32], HeatEmission)> {
        let mut out = Vec::new();
        // `counters` (mutably) and `in_flight` (read-only) are
        // disjoint fields, so both can be borrowed inside the loop.
        for (hash, counter) in self.counters.iter_mut() {
            counter.decay_to(now);
            if self.in_flight.contains(hash) {
                // A prior `tick` already emitted this hash and the
                // caller hasn't yet committed or rolled it back.
                // Don't re-emit. Once `commit_emissions` updates the
                // `last_emitted` snapshot, a future tick computes
                // cleanly against the durable state.
                continue;
            }
            // D-17 candidate/commit split: see `HeatRegistry::tick`
            // for the rationale. `last_emitted` mutates only after
            // the sink confirms via `commit_emissions`.
            let emission =
                match super::should_emit_heat(counter.rate, counter.last_emitted, policy) {
                    super::EmissionDecision::Suppress => continue,
                    super::EmissionDecision::Emit { rate } => HeatEmission::Emit { rate },
                    super::EmissionDecision::Withdraw => HeatEmission::Withdraw,
                };
            out.push((*hash, emission));
        }
        self.in_flight.extend(out.iter().map(|(hash, _)| *hash));
        out
    }

    /// Commit the emissions from [`Self::tick`] after the sink
    /// confirms. See [`HeatRegistry::commit_emissions`] for the
    /// candidate/commit rationale.
    ///
    /// Also clears each committed hash's in-flight marker, so a
    /// subsequent `tick` can re-evaluate it. Afterwards, it prunes
    /// fully-decayed, already-withdrawn entries and their markers.
    pub fn commit_emissions(&mut self, emissions: &[([u8; 32], HeatEmission)]) {
        for (hash, emission) in emissions {
            if let Some(counter) = self.counters.get_mut(hash) {
                match emission {
                    HeatEmission::Emit { rate } => counter.record_emission(*rate),
                    HeatEmission::Withdraw => counter.record_withdrawal(),
                    HeatEmission::Suppress => {}
                }
            }
            self.in_flight.remove(hash);
        }
        // Prune fully-decayed + already-withdrawn entries. The
        // `<= 0.0` rationale is the same as in
        // `HeatRegistry::commit_emissions`. Also clear any `in_flight`
        // marker for a pruned hash, so a future reintroduction starts
        // clean. Pre-fix, the prune dropped the counter but left
        // `in_flight` dangling, which suppressed the new counter's
        // emissions until a manual `rollback_emission` cleared the
        // stale marker.
        let mut pruned: Vec<[u8; 32]> = Vec::new();
        self.counters.retain(|hash, c| {
            let keep = !(c.rate <= 0.0 && c.last_emitted.is_some_and(|v| v <= 0.0));
            if !keep {
                pruned.push(*hash);
            }
            keep
        });
        for hash in &pruned {
            self.in_flight.remove(hash);
        }
    }

    /// Clear the in-flight marker for `hash` without committing
    /// the emission. The sink-failure path uses this so the next
    /// tick reissues the same emission.
    ///
    /// Previously a caller could only commit (applying the
    /// mutation) or do nothing (leaking the in-flight marker).
    /// Neither gave the "retry on failure" behaviour the audit
    /// asked for.
    pub fn rollback_emission(&mut self, hash: &[u8; 32]) {
        self.in_flight.remove(hash);
    }
}

#[cfg(test)]
mod tests {
    #![allow(
        clippy::disallowed_methods,
        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
    )]
    use super::*;

    fn channel(seed: u64) -> u64 {
        // Tests just need a stable distinct identifier per
        // "channel"; the production wire uses the chain's
        // origin_hash here.
        0xCAFE_BABE_0000_0000 | seed
    }

    fn t0() -> Instant {
        Instant::now()
    }

    #[test]
    fn fresh_counter_is_zero() {
        let c = HeatCounter::new(Duration::from_secs(60), t0());
        assert_eq!(c.rate(), 0.0);
        assert_eq!(c.last_emitted(), None);
    }

    #[test]
    fn bump_adds_one_when_no_decay() {
        let base = t0();
        let mut c = HeatCounter::new(Duration::from_secs(60), base);
        c.bump(base);
        assert!((c.rate() - 1.0).abs() < 1e-9);
        c.bump(base);
        assert!((c.rate() - 2.0).abs() < 1e-9);
    }

    #[test]
    fn one_half_life_decays_rate_by_half() {
        let base = t0();
        let half = Duration::from_secs(60);
        let mut c = HeatCounter::new(half, base);
        c.bump(base);
        c.bump(base);
        c.bump(base);
        c.bump(base);
        // rate ≈ 4.0 at base.
        c.decay_to(base + half);
        assert!(
            (c.rate() - 2.0).abs() < 1e-6,
            "rate after half-life ≈ 2.0; got {}",
            c.rate()
        );
        c.decay_to(base + half * 2);
        assert!((c.rate() - 1.0).abs() < 1e-6);
        c.decay_to(base + half * 3);
        assert!((c.rate() - 0.5).abs() < 1e-6);
    }

    #[test]
    fn long_elapse_clamps_to_zero() {
        let base = t0();
        let half = Duration::from_secs(60);
        let mut c = HeatCounter::new(half, base);
        c.bump(base);
        // 100 half-lives — past the clamp threshold (64).
        c.decay_to(base + half * 100);
        assert_eq!(c.rate(), 0.0);
    }

    #[test]
    fn bump_decays_then_adds() {
        let base = t0();
        let half = Duration::from_secs(60);
        let mut c = HeatCounter::new(half, base);
        c.bump(base);
        c.bump(base);
        // rate = 2.0 at base.
        c.bump(base + half);
        // decay 2.0 → 1.0, then +1.0 → 2.0.
        assert!((c.rate() - 2.0).abs() < 1e-6);
    }

    #[test]
    fn record_emission_tracks_last() {
        let base = t0();
        let mut c = HeatCounter::new(Duration::from_secs(60), base);
        c.bump(base);
        c.record_emission(1.5);
        assert_eq!(c.last_emitted(), Some(1.5));
        c.record_withdrawal();
        assert_eq!(c.last_emitted(), Some(0.0));
    }

    // ---- HeatRegistry ----

    #[test]
    fn new_registry_is_empty() {
        let r = HeatRegistry::new();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn entry_mut_creates_on_first_access() {
        let mut r = HeatRegistry::new();
        let half = Duration::from_secs(60);
        let counter = r.entry_mut(channel(0xA), half, t0());
        counter.bump(t0());
        assert!((counter.rate() - 1.0).abs() < 1e-9);
        assert_eq!(r.len(), 1);
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = HeatRegistry::new();
        let half = Duration::from_secs(60);
        let _ = r.entry_mut(channel(0xA), half, t0());
        r.remove(&channel(0xA));
        assert!(r.is_empty());
    }

    /// With a cap of two, inserting a third distinct channel evicts the
    /// least-recently-updated one. A is inserted first and B second.
    /// B is then bumped again, so A is the LRU when C arrives.
    #[test]
    fn entry_mut_at_cap_evicts_lru_on_new_insert() {
        let start = t0();
        let at = |secs: u64| start + Duration::from_secs(secs);
        let half_life = Duration::from_secs(60);
        let mut registry = HeatRegistry::with_cap(2);

        let _ = registry.entry_mut(channel(0xA), half_life, start);
        let _ = registry.entry_mut(channel(0xB), half_life, at(1));
        assert_eq!(registry.len(), 2);

        // Touch B again so its last update is strictly later than A's.
        registry
            .entry_mut(channel(0xB), half_life, at(2))
            .bump(at(2));
        // C is a new key at cap: A (the LRU) must be evicted.
        let _ = registry.entry_mut(channel(0xC), half_life, at(3));

        assert_eq!(registry.len(), 2);
        assert!(registry.get(&channel(0xA)).is_none(), "LRU entry A evicted");
        assert!(registry.get(&channel(0xB)).is_some());
        assert!(registry.get(&channel(0xC)).is_some());
    }

    /// A cap of zero disables eviction: every distinct channel is kept.
    #[test]
    fn entry_mut_cap_zero_is_unbounded() {
        let start = t0();
        let half_life = Duration::from_secs(60);
        let mut registry = HeatRegistry::with_cap(0);
        (0..100u64).for_each(|id| {
            let _ = registry.entry_mut(channel(id), half_life, start);
        });
        assert_eq!(registry.len(), 100);
    }

    /// Once a withdrawal has been committed and the rate has fully
    /// decayed, the entry is bookkeeping noise. `tick` prunes it so later
    /// ticks stay O(active chains) rather than O(historical chains).
    #[test]
    fn tick_prunes_fully_decayed_withdrawn_entries() {
        let start = t0();
        let mut registry = HeatRegistry::new();
        let policy = super::super::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;

        // Phase 1: heat the chain and commit its first emission.
        // D-17: callers commit only after the sink confirms; the test
        // commits inline to mirror that contract.
        registry
            .entry_mut(channel(0xA), half_life, start)
            .bump(start);
        let initial = registry.tick(&policy, start);
        registry.commit_emissions(&initial);
        assert_eq!(registry.len(), 1);

        // Phase 2: after 100 half-lives the rate clamps to zero, so this
        // tick must produce a withdrawal.
        let decayed_at = start + half_life * 100;
        let withdrawal = registry.tick(&policy, decayed_at);
        let has_withdraw = withdrawal
            .iter()
            .any(|(_, emission)| matches!(emission, HeatEmission::Withdraw));
        assert!(has_withdraw);
        registry.commit_emissions(&withdrawal);

        // Phase 3: with the withdrawal committed, a further tick emits
        // nothing, and the quiescent entry must be gone.
        let quiet = registry.tick(&policy, decayed_at + Duration::from_secs(1));
        assert!(quiet.is_empty(), "no further emissions");
        registry.commit_emissions(&quiet);
        assert_eq!(registry.len(), 0, "fully-decayed withdrawn entry pruned");
    }

    /// The first tick after a counter receives heat emits that heat.
    /// Once the emission is committed, an immediate re-tick with an
    /// unchanged rate is suppressed.
    #[test]
    fn tick_emits_first_observation() {
        let start = t0();
        let mut registry = HeatRegistry::new();
        let policy = super::super::DataGravityPolicy::default();
        registry
            .entry_mut(channel(0xA), policy.decay_half_life, start)
            .bump(start);

        let first = registry.tick(&policy, start);
        assert_eq!(first.len(), 1);
        let rate = match first[0].1 {
            HeatEmission::Emit { rate } => rate,
            other => panic!("expected Emit, got {other:?}"),
        };
        assert!(rate > 0.0);

        // D-17: committing records `last_emitted ≈ rate`, which is what
        // lets `should_emit_heat` suppress the next tick.
        registry.commit_emissions(&first);
        let repeat = registry.tick(&policy, start);
        assert!(repeat.is_empty());
    }

    /// After a committed emission, letting the rate decay to zero makes
    /// the next tick produce exactly one `Withdraw`.
    #[test]
    fn tick_emits_withdrawal_after_decay() {
        let start = t0();
        let mut registry = HeatRegistry::new();
        let policy = super::super::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        registry.entry_mut(channel(0xA), half_life, start).bump(start);

        // D-17: commit the first emission so the next tick compares
        // against the post-emit `last_emitted`.
        let initial = registry.tick(&policy, start);
        registry.commit_emissions(&initial);

        // 100 half-lives later the rate has decayed to zero.
        let withdrawn = registry.tick(&policy, start + half_life * 100);
        assert_eq!(withdrawn.len(), 1);
        assert_eq!(withdrawn[0].1, HeatEmission::Withdraw);
    }

    /// After an emission at rate ≈ 1.0 is committed, pushing the rate to
    /// ≈ 4.0 (more than double) triggers a fresh `Emit`.
    #[test]
    fn tick_doubled_rate_re_emits() {
        let start = t0();
        let mut registry = HeatRegistry::new();
        let policy = super::super::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        registry.entry_mut(channel(0xA), half_life, start).bump(start);

        // The first tick emits at ≈ 1.0. Commit it so `should_emit_heat`
        // sees `last_emitted ≈ 1.0` on the next tick.
        let initial = registry.tick(&policy, start);
        assert_eq!(initial.len(), 1);
        registry.commit_emissions(&initial);

        // Three more bumps push the rate to ≈ 4.0.
        (0..3).for_each(|_| {
            registry.entry_mut(channel(0xA), half_life, start).bump(start);
        });

        let reissued = registry.tick(&policy, start);
        assert_eq!(reissued.len(), 1);
        match reissued[0].1 {
            HeatEmission::Emit { rate } => assert!(rate >= 4.0 * 0.99),
            other => panic!("expected Emit, got {other:?}"),
        }
    }

    /// D-17 regression: when the sink fails, the caller skips
    /// `commit_emissions`, and the counter must not be treated as
    /// already emitted. The following tick has to reissue the same
    /// candidate, so a transient sink error cannot permanently strand
    /// the chain's heat advertisement.
    #[test]
    fn tick_without_commit_reissues_on_next_tick() {
        let start = t0();
        let mut registry = HeatRegistry::new();
        let policy = super::super::DataGravityPolicy::default();
        registry
            .entry_mut(channel(0xA), policy.decay_half_life, start)
            .bump(start);

        let is_emit = |e: &HeatEmission| matches!(e, HeatEmission::Emit { .. });

        // Tick once and deliberately skip the commit, standing in for a
        // sink failure.
        let uncommitted = registry.tick(&policy, start);
        assert_eq!(uncommitted.len(), 1);
        assert!(is_emit(&uncommitted[0].1));

        // `last_emitted` is still `None`, so this tick must re-emit.
        // Before D-17, `tick` called `record_emission` inline and this
        // second tick came back empty (Suppress).
        let retried = registry.tick(&policy, start);
        assert_eq!(
            retried.len(),
            1,
            "transient sink failure (no commit) must not silence the next tick"
        );
        assert!(is_emit(&retried[0].1));
    }

    // --- BlobHeatRegistry coverage (PR-5j-a) ---

    /// Builds a 32-byte test key: byte 0 is `seed`, all others are zero.
    fn hash(seed: u8) -> [u8; 32] {
        std::array::from_fn(|idx| if idx == 0 { seed } else { 0 })
    }

    /// A default-constructed `BlobHeatRegistry` is empty and uses
    /// `DEFAULT_HEAT_REGISTRY_CAP` as its cap.
    #[test]
    fn blob_heat_registry_is_empty_by_default() {
        let registry = BlobHeatRegistry::new();
        assert_eq!(registry.cap(), DEFAULT_HEAT_REGISTRY_CAP);
        assert!(registry.is_empty());
    }

    /// `entry_mut` on an unseen hash creates a counter. The counter is
    /// retained and reports a positive rate after one bump.
    #[test]
    fn blob_heat_entry_mut_creates_then_bumps() {
        let key = hash(0x01);
        let mut registry = BlobHeatRegistry::new();
        registry
            .entry_mut(key, Duration::from_secs(60), t0())
            .bump(t0());
        let rate = registry
            .get(&key)
            .expect("entry should exist")
            .rate();
        assert!(rate > 0.0);
    }

    /// With a cap of two, inserting a third hash at a later instant
    /// evicts the oldest one.
    #[test]
    fn blob_heat_entry_mut_at_cap_evicts_lru() {
        let start = t0();
        let half_life = Duration::from_secs(60);
        let mut registry = BlobHeatRegistry::with_cap(2);
        // Each seed is touched 10 ms after the previous one, so 0x01 is
        // the LRU entry when 0x03 pushes the registry past its cap.
        for (step, &seed) in [0x01u8, 0x02, 0x03].iter().enumerate() {
            let at = start + Duration::from_millis(10 * step as u64);
            registry.entry_mut(hash(seed), half_life, at).bump(at);
        }
        assert!(registry.get(&hash(0x01)).is_none());
        assert!(registry.get(&hash(0x02)).is_some());
        assert!(registry.get(&hash(0x03)).is_some());
    }

    /// Several back-to-back bumps heat a hash enough that the next tick
    /// emits a positive rate for it.
    #[test]
    fn blob_heat_tick_emits_above_threshold() {
        let mut registry = BlobHeatRegistry::new();
        let policy = super::super::policy::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        let key = hash(0x42);
        let now = t0();
        (0..8).for_each(|_| {
            registry.entry_mut(key, half_life, now).bump(now);
        });

        let emissions = registry.tick(&policy, now);
        let emitted_positive = emissions.iter().any(|(k, e)| {
            *k == key && matches!(e, HeatEmission::Emit { rate } if *rate > 0.0)
        });
        assert!(
            emitted_positive,
            "tick must emit for a heated hash; got {emissions:?}"
        );
    }

    /// Regression: `tick_blob_heat` drops the registry mutex between
    /// `tick` and `commit_emissions` so the async sink call does not hold
    /// a `!Send` parking_lot guard across `.await`. A second `tick` that
    /// lands inside that window must not re-emit the same candidates.
    /// `tick` keeps emitted hashes in an in-flight set until
    /// `commit_emissions` (or `rollback_emission`) clears them.
    #[test]
    fn blob_heat_concurrent_tick_skips_in_flight_candidates() {
        let mut registry = BlobHeatRegistry::new();
        let policy = super::super::policy::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        let key = hash(0x42);
        let now = t0();
        (0..8).for_each(|_| {
            registry.entry_mut(key, half_life, now).bump(now);
        });

        // Tick #1 emits the candidate and marks it in flight.
        let in_flight = registry.tick(&policy, now);
        assert!(in_flight.iter().any(|(k, _)| *k == key));

        // Tick #2 lands before the commit: the in-flight set must keep
        // the hash out of this batch.
        let duplicate_window = registry.tick(&policy, now);
        assert!(
            duplicate_window.iter().all(|(k, _)| *k != key),
            "concurrent tick in the pre-commit window must skip in-flight hashes; \
             pre-fix this would re-emit and the sink would receive duplicates"
        );

        // Committing clears the in-flight marker. Doubling the heat
        // afterwards re-enters the normal rate-doubling emission path.
        registry.commit_emissions(&in_flight);
        (0..8).for_each(|_| {
            registry.entry_mut(key, half_life, now).bump(now);
        });
        let after_commit = registry.tick(&policy, now);
        assert!(
            after_commit.iter().any(|(k, _)| *k == key),
            "post-commit + further heat must re-enter emission",
        );
    }

    /// Regression: on sink failure the mesh-blob driver calls
    /// `rollback_emission`, which must clear the in-flight marker so the
    /// next tick reissues the emission. Without that, one transient sink
    /// failure would pin the hash in flight forever, and its heat updates
    /// would silently stop reaching the sink.
    #[test]
    fn blob_heat_rollback_emission_lets_next_tick_reissue() {
        let mut registry = BlobHeatRegistry::new();
        let policy = super::super::policy::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        let key = hash(0x55);
        let now = t0();
        (0..8).for_each(|_| {
            registry.entry_mut(key, half_life, now).bump(now);
        });

        let first = registry.tick(&policy, now);
        assert!(first.iter().any(|(k, _)| *k == key));

        // The sink "failed": roll back instead of committing.
        registry.rollback_emission(&key);

        let retry = registry.tick(&policy, now);
        let reissued = retry.iter().any(|(k, _)| *k == key);
        assert!(
            reissued,
            "rollback_emission must re-enable emission on the next tick",
        );
    }

    /// Review regression: `remove` on a hash that still has an
    /// outstanding in-flight marker must clear that marker. (The marker
    /// is outstanding when `tick` has fired but the caller has not yet
    /// called `commit_emissions`.)
    ///
    /// Re-bumping the hash later creates a fresh counter through
    /// `entry_mut`. Before the fix, `in_flight.contains(hash)` was still
    /// true from the old counter's lifetime, so `tick` suppressed every
    /// emission for the new counter.
    #[test]
    fn blob_heat_remove_clears_in_flight_so_reintroduced_hash_emits() {
        let mut registry = BlobHeatRegistry::new();
        let policy = super::super::policy::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;
        let key = hash(0x77);
        let now = t0();
        let heat = |reg: &mut BlobHeatRegistry| {
            for _ in 0..8 {
                reg.entry_mut(key, half_life, now).bump(now);
            }
        };

        // Heat + tick: the hash is now in flight.
        heat(&mut registry);
        let _ = registry.tick(&policy, now);

        // Remove before any commit, as a GC sweep or chunk delete racing
        // the caller would. Pre-fix, the marker leaked here.
        registry.remove(&key);
        assert!(registry.is_empty());

        // Reintroduce the hash with fresh heat; the new counter must be
        // able to emit.
        heat(&mut registry);
        let emissions = registry.tick(&policy, now);
        assert!(
            emissions.iter().any(|(k, _)| *k == key),
            "reintroduced hash must emit again; pre-fix in_flight leak suppressed it forever"
        );
    }

    /// Regression for the LRU-eviction path: a counter evicted at the
    /// cap while it still has an outstanding `in_flight` marker must not
    /// leak that marker.
    ///
    /// Each bump group uses a strictly later instant, so `evict_lru`'s
    /// `min_by_key(last_update)` has a unique minimum. An earlier version
    /// used a single `now` for everything. That left the tie to `HashMap`
    /// iteration order (i.e. the hashbrown random seed) and made the test
    /// flaky (~50% failures under parallel execution).
    #[test]
    fn blob_heat_evict_clears_in_flight() {
        let mut registry = BlobHeatRegistry::with_cap(2);
        let policy = super::super::policy::DataGravityPolicy::default();
        let half_life = policy.decay_half_life;

        let start = t0();
        let at = |ms: u64| start + Duration::from_millis(ms);
        let (t_a, t_b, t_c, t_d) = (start, at(10), at(20), at(30));
        let (key_a, key_b, key_c) = (hash(0xA1), hash(0xB2), hash(0xC3));

        // A is heated and ticked, leaving it in flight.
        (0..8).for_each(|_| {
            registry.entry_mut(key_a, half_life, t_a).bump(t_a);
        });
        let _ = registry.tick(&policy, t_a);

        // B and then C arrive at strictly later instants. When C pushes
        // the registry past cap = 2, A is the unambiguous LRU.
        (0..8).for_each(|_| {
            registry.entry_mut(key_b, half_life, t_b).bump(t_b);
        });
        (0..8).for_each(|_| {
            registry.entry_mut(key_c, half_life, t_c).bump(t_c);
        });
        assert!(
            registry.get(&key_a).is_none(),
            "h_a must have been evicted (LRU by last_update)"
        );

        // Bring A back. Because eviction cleared its marker, the new
        // counter emits on the next tick.
        (0..8).for_each(|_| {
            registry.entry_mut(key_a, half_life, t_d).bump(t_d);
        });
        let emissions = registry.tick(&policy, t_d);
        assert!(
            emissions.iter().any(|(k, _)| *k == key_a),
            "reintroduced-after-eviction hash must emit; pre-fix the eviction leak \
             pinned the new counter as in-flight forever"
        );
    }

    /// `remove` deletes a previously bumped hash from the registry.
    #[test]
    fn blob_heat_remove_drops_entry() {
        let key = hash(0x42);
        let mut registry = BlobHeatRegistry::new();
        registry
            .entry_mut(key, Duration::from_secs(60), t0())
            .bump(t0());
        registry.remove(&key);
        assert!(registry.is_empty());
    }

    // ========================================================================
    // Concurrency stress (multi-thread bump / tick races)
    //
    // Internally the registries are plain `HashMap`s, designed to live
    // behind an outer `Arc<Mutex<...>>` (the production wiring on
    // `MeshBlobAdapter`). These tests wrap the registry the same way
    // and assert the higher-level invariants: no panic under concurrent
    // bumps from N threads, tick-during-bump remains safe, and LRU
    // eviction keeps `len()` within the configured cap.
    // ========================================================================

    /// N threads each bump the same chunk hash on a shared
    /// `Arc<Mutex<BlobHeatRegistry>>`. Afterwards the rate must equal
    /// `threads × per_thread`, up to a negligible decay over the test's
    /// millisecond window. This pins that outer-mutex serialization keeps
    /// concurrent fetch-path heat updates from being lost or
    /// double-counted.
    #[test]
    fn blob_heat_concurrent_bump_accumulates_under_outer_mutex() {
        use std::sync::{Arc, Barrier, Mutex};
        use std::thread;

        let registry = Arc::new(Mutex::new(BlobHeatRegistry::new()));
        let half = Duration::from_secs(60 * 60); // 1 h — negligible decay over the test
        let target = hash(0xAB);
        let threads = 8usize;
        let per_thread = 1_000usize;
        let start = Arc::new(Barrier::new(threads));
        let mut handles = Vec::with_capacity(threads);

        for _ in 0..threads {
            let registry = Arc::clone(&registry);
            let start = Arc::clone(&start);
            handles.push(thread::spawn(move || {
                start.wait();
                for _ in 0..per_thread {
                    let mut guard = registry.lock().unwrap();
                    // Sample the clock while holding the lock, so the
                    // instants reach the registry in lock-acquisition
                    // order. Sampling before `lock()` lets a thread that
                    // lost the race hand the counter an instant older
                    // than its last update.
                    let now = Instant::now();
                    guard.entry_mut(target, half, now).bump(now);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }

        let guard = registry.lock().unwrap();
        let counter = guard.get(&target).expect("entry must exist");
        let expected = (threads * per_thread) as f64;
        // Decay is negligible (1 h half-life over a ms window), but we
        // allow a generous 1 % slop for the f64 math so the test is
        // not brittle.
        let rate = counter.rate();
        assert!(
            rate > expected * 0.99,
            "expected rate ≈ {} ({} × {} bumps); got {} (lower bound failed)",
            expected,
            threads,
            per_thread,
            rate,
        );
        assert!(
            rate <= expected,
            "expected rate ≤ {} (no double-counting); got {}",
            expected,
            rate,
        );
    }

    /// Background `bump` storm on a single hash while a foreground
    /// thread runs `tick(policy, now)` repeatedly. Pins the
    /// iter-mut-during-bump safety: neither thread may panic, and
    /// the mutex must stay live across the storm.
    ///
    /// Emission count is intentionally *not* asserted. Under mutex
    /// contention the ticker can starve the bumper enough that no tick
    /// observes a rate above the emission threshold. Deterministic
    /// emission semantics are covered by
    /// `blob_heat_tick_emits_above_threshold`; this test pins the
    /// concurrency-safety half of the contract.
    ///
    /// The ticker is joined first, and the main thread then raises
    /// `stop` itself. A panicking ticker never reaches its own
    /// `stop.store`; this ordering makes it fail the test instead of
    /// leaving the bumper spinning and hanging `join` forever.
    #[test]
    fn blob_heat_tick_concurrent_with_bumps_is_panic_free() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::{Arc, Barrier, Mutex};
        use std::thread;

        let registry = Arc::new(Mutex::new(BlobHeatRegistry::new()));
        let policy = super::super::policy::DataGravityPolicy::default();
        let half = policy.decay_half_life;
        let target = hash(0xCD);
        let stop = Arc::new(AtomicBool::new(false));
        let start = Arc::new(Barrier::new(2));

        // Pre-seed the target. Under mutex contention the ticker can
        // sweep all 200 iterations before the bumper ever grabs the
        // lock. Without a pre-seed, the post-storm `get(&target)` would
        // observe a never-inserted hash. Pre-seeding makes the assertion
        // test the storm's effect on a live entry, not whether the
        // bumper got scheduled at all.
        {
            let mut guard = registry.lock().unwrap();
            let now = Instant::now();
            guard.entry_mut(target, half, now).bump(now);
        }

        // Bump storm. The clock is sampled under the lock so instants
        // reach the registry in lock-acquisition order.
        let bumper = {
            let registry = Arc::clone(&registry);
            let stop = Arc::clone(&stop);
            let start = Arc::clone(&start);
            thread::spawn(move || {
                start.wait();
                while !stop.load(Ordering::Relaxed) {
                    let mut guard = registry.lock().unwrap();
                    let now = Instant::now();
                    guard.entry_mut(target, half, now).bump(now);
                }
            })
        };

        // Tick loop.
        let ticker = {
            let registry = Arc::clone(&registry);
            let stop = Arc::clone(&stop);
            let start = Arc::clone(&start);
            thread::spawn(move || {
                start.wait();
                for _ in 0..200 {
                    let mut guard = registry.lock().unwrap();
                    let now = Instant::now();
                    let _ = guard.tick(&policy, now);
                }
                stop.store(true, Ordering::Relaxed);
            })
        };

        // Join the bounded ticker first. Then force `stop` from here so the
        // unbounded bumper always terminates, even if the ticker died
        // before its own `stop.store`. Report the ticker's outcome first,
        // because a ticker panic poisons the mutex and takes the bumper
        // down with it.
        let ticker_outcome = ticker.join();
        stop.store(true, Ordering::Relaxed);
        let bumper_outcome = bumper.join();
        ticker_outcome.expect("ticker panicked");
        bumper_outcome.expect("bumper panicked");

        // The registry must still be usable after the storm. This pins
        // that the iter-mut-during-bump path didn't leave the underlying
        // HashMap in a torn state.
        let guard = registry.lock().unwrap();
        assert!(
            guard.get(&target).is_some(),
            "target entry should survive the storm"
        );
    }

    /// LRU eviction under a tight cap with concurrent inserts from
    /// N threads. `entry_mut`'s len-check + `evict_lru` pair is not
    /// transactional on its own. The outer mutex, however, serializes
    /// each whole `entry_mut` call, so no overshoot is tolerated:
    /// `len()` must never exceed `cap`. Far more distinct keys than `cap`
    /// are inserted, so the registry must also end up exactly full. This
    /// matches the sequential `entry_mut_at_cap_*` tests, where an
    /// over-cap insert leaves `len() == cap`.
    #[test]
    fn blob_heat_lru_cap_holds_under_concurrent_inserts() {
        use std::sync::{Arc, Barrier, Mutex};
        use std::thread;

        let cap = 16usize;
        let registry = Arc::new(Mutex::new(BlobHeatRegistry::with_cap(cap)));
        let half = Duration::from_secs(60);
        let threads = 4usize;
        // Each thread inserts a distinct range of keys past the
        // cap, so eviction must fire repeatedly.
        let inserts_per_thread = 64u8;
        // Keys come from a single seed byte (`hash(u8)`). The whole key
        // space must therefore fit in 0..=255, or the
        // `tid * inserts_per_thread + i` arithmetic below would overflow.
        let total_keys = threads * usize::from(inserts_per_thread);
        assert!(
            total_keys <= usize::from(u8::MAX) + 1,
            "key space must fit in a single seed byte"
        );
        assert!(total_keys > cap, "test must insert more keys than the cap");
        let start = Arc::new(Barrier::new(threads));
        let mut handles = Vec::with_capacity(threads);

        for tid in 0..threads as u8 {
            let registry = Arc::clone(&registry);
            let start = Arc::clone(&start);
            handles.push(thread::spawn(move || {
                start.wait();
                for i in 0..inserts_per_thread {
                    let k = hash(tid * inserts_per_thread + i);
                    let mut guard = registry.lock().unwrap();
                    // Sample the clock under the lock so LRU timestamps
                    // follow the actual insertion order.
                    let now = Instant::now();
                    guard.entry_mut(k, half, now);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }

        let guard = registry.lock().unwrap();
        let len = guard.len();
        // The cap is enforced under the outer mutex on the
        // entry_mut hot path; len() must never exceed it.
        assert!(
            len <= cap,
            "len() {} exceeded cap {} after concurrent inserts; LRU eviction is broken",
            len,
            cap,
        );
        // Guard against a vacuous pass: an over-evicting or empty
        // registry would also satisfy `len <= cap`.
        assert_eq!(
            len, cap,
            "registry should sit exactly at cap after {} distinct inserts",
            total_keys,
        );
    }
}