epics-ca-rs 0.20.2

EPICS Channel Access protocol client and server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::{Duration, Instant};

use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;

use crate::protocol::*;

use super::CoordRequest;

/// Control messages sent INTO the beacon monitor by the coordinator
/// (currently only on TCP-circuit (re)connect). libca `bhe.cpp`
/// resets `averagePeriod` when a fresh client circuit comes up, on
/// the reasoning that an active TCP handshake is fresh evidence the
/// server is alive *now* and any prior beacon-cadence measurements
/// may be stale (the server may have restarted with its beacon
/// counter preserved, OR an older steady-state EMA may misclassify
/// the standard rsrv `online_notify_task` ramp-up as a
/// PeriodCollapse cascade — the cited symptom in archiver-rs's
/// reconnect logs).
///
/// Derives `Debug` / `Clone` / `PartialEq` / `Eq` so control messages
/// can be logged with `{:?}` and compared in tests, matching the
/// derive set style used by `BeaconAnomalyKind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BeaconControl {
    /// Clear `period_estimate` and `count` for `server_addr` so the
    /// EMA re-establishes from the next observed inter-beacon
    /// interval. `last_id` and `last_seen` are intentionally kept so
    /// the duplicate-detection and stale-prune paths still work.
    ResetServer { server_addr: SocketAddr },
}

/// Why the beacon monitor decided this beacon is "anomalous".
///
/// `FirstSighting` is benign from the *server's* point of view — the
/// IOC is fine, we just hadn't been listening before (or had pruned
/// its `BeaconState` after `BEACON_STALE_THRESHOLD`). It still
/// matters for the search engine: channels stuck in `Searching` /
/// `Disconnected` should re-search immediately because we now know
/// the server is alive. It does NOT justify probing the TCP circuit
/// of operational channels — by definition we already have a working
/// circuit, and an extra EchoProbe under load just risks tripping the
/// 5-s echo timeout in `transport.rs`.
///
/// `IdMismatch` is the sole real-restart signal and warrants the full
/// treatment (search wake-up + EchoProbe to operational circuits, so
/// a half-dead TCP gets surfaced fast).
///
/// `PeriodCollapse` is retired: see the `handle_beacon` classify
/// chain. In practice every site that would have produced it was the
/// IOC's `beacon_emitter` ramp-up cascade after some peer's TCP
/// accept, NOT a real restart. Real restarts reset beacon_id and trip
/// `IdMismatch`; circuits that dropped for the restart receive
/// `BeaconControl::ResetServer` from the coordinator before the
/// cascade arrives. The variant remains for the retained match-arm
/// shape in negative-assertion tests and in case a future
/// distinguishing signature lets us reintroduce a different
/// PeriodCollapse trigger; it is intentionally never produced today.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BeaconAnomalyKind {
    /// The server was not being tracked when this beacon arrived
    /// (never seen, or its state had been pruned). Should wake the
    /// search engine only; no circuit probing.
    FirstSighting,
    /// The sole real-restart signal: search wake-up plus an EchoProbe
    /// on operational circuits.
    IdMismatch,
    /// Retired variant, intentionally never constructed today; the
    /// `dead_code` allowance exists only because nothing produces it.
    #[allow(dead_code)]
    PeriodCollapse,
}

// ---------------------------------------------------------------------------
// Per-server beacon state
// ---------------------------------------------------------------------------

/// Per-server tracking record; the beacon monitor stores one of these
/// per announced server address in its `HashMap<SocketAddr, BeaconState>`.
struct BeaconState {
    /// Beacon id recorded for this server — presumably the id carried
    /// by the most recently accepted beacon, used for restart /
    /// duplicate detection; confirm against `handle_beacon`.
    /// Preserved across `apply_reset_server`.
    last_id: u32,
    /// Timestamp associated with the last beacon from this server —
    /// presumably what the stale-prune path compares against
    /// `BEACON_STALE_THRESHOLD`; confirm against `handle_beacon`.
    /// Preserved across `apply_reset_server`.
    last_seen: Instant,
    /// Estimated period between beacons (exponential moving average,
    /// alpha = 0.25). `None` until the second beacon arrives — at
    /// which point we adopt the first observed inter-beacon
    /// interval as the initial estimate. Mirrors libca `bhe.cpp:51`
    /// where `averagePeriod = -DBL_MAX` is the "no estimate yet"
    /// sentinel and `bhe.cpp:199` sets it to the first measured
    /// `currentPeriod`.
    ///
    /// Why this matters: hardcoding the initial estimate (we used
    /// `Duration::from_secs(15)`) made the EMA start from a value
    /// that was unrelated to the actual server's beacon cadence.
    /// During the standard rsrv `online_notify_task` ramp-up
    /// (server-side beacon emitter starts at 20 ms and doubles up
    /// to 15 s — which `epics-ca-rs/src/server/beacon.rs` also
    /// implements), the first 4-8 beacons all have intervals well
    /// below 15 s / 3, so the PeriodCollapse branch (which fires
    /// when `actual_interval < period_estimate / 3`) tripped on
    /// every one of them. That cascaded into the transport
    /// watchdog flag → echo probe → 5 s timeout → spurious
    /// disconnect → user-visible `get_with_metadata(timeout=2.0)`
    /// failures observed in the mini-beamline IOC against its own
    /// epics-ca-rs server.
    ///
    /// Reset to `None` by `apply_reset_server`.
    period_estimate: Option<Duration>,
    /// Beacon counter for this server — presumably the number of
    /// beacons folded into the estimate so far; confirm against
    /// `handle_beacon`. Zeroed by `apply_reset_server` together with
    /// `period_estimate`.
    count: u64,
}

/// Idle threshold after which a tracked server is forgotten. Mirrors
/// pvxs `beaconCleanInterval` (`client.cpp` 2 × 180 s default). When a
/// long-silent server resumes beacons, the next sighting becomes
/// `first_sighting = true` and naturally takes the anomaly path —
/// without this prune, in-sequence beacons after long silence would
/// keep `first_sighting = false` and miss the rescan kick. This
/// replaces the previous "soft poke on every beacon" mechanism, which
/// caused steady-state amplification (multi-IOC networks beaconing
/// within ~6 s aggregate kept the search engine in 200 ms fast-tick
/// mode indefinitely).
///
/// NOTE(review): the text above cites "2 × 180 s" while the constant
/// is a single 180 s — confirm which of the two is the intended
/// prune age.
const BEACON_STALE_THRESHOLD: Duration = Duration::from_secs(180);

// ---------------------------------------------------------------------------
// Beacon monitor task
// ---------------------------------------------------------------------------

/// Re-registration interval: if the monitor socket receives nothing
/// for this long, the monitor re-registers with the CA repeater in
/// case the repeater restarted.
const REREGISTER_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Beacon monitor task entry point.
///
/// Receives beacon messages from the CA repeater, detects anomalies
/// (IOC restart), and notifies the coordinator to rescan affected
/// channels.
///
/// * `coord_tx` — channel on which the monitor reports to the
///   coordinator.
/// * `control_rx` — coordinator-issued [`BeaconControl`] messages.
///
/// This is the no-verifier flavour: when the `cap-tokens` feature is
/// compiled in, the inner task is started with `None` as its verifier.
pub(crate) async fn run_beacon_monitor(
    coord_tx: mpsc::UnboundedSender<CoordRequest>,
    control_rx: mpsc::UnboundedReceiver<BeaconControl>,
) {
    let monitor = run_beacon_monitor_inner(
        coord_tx,
        control_rx,
        #[cfg(feature = "cap-tokens")]
        None,
    );
    monitor.await
}

/// Beacon monitor entry point with signed-beacon gating
/// (`cap-tokens` feature only).
///
/// Behaves like [`run_beacon_monitor`] but hands a
/// [`SignedBeaconVerifier`] to the inner task. With a verifier
/// present, a beacon is only forwarded to the search engine after a
/// valid companion datagram (cmmd=0xCAFE, see
/// [`crate::server::signed_beacon`]) has been received and verified
/// for the same (server, beacon_id) within the verifier's
/// `max_age_secs` window.
#[cfg(feature = "cap-tokens")]
#[allow(dead_code)]
pub(crate) async fn run_beacon_monitor_with_verifier(
    coord_tx: mpsc::UnboundedSender<CoordRequest>,
    control_rx: mpsc::UnboundedReceiver<BeaconControl>,
    verifier: std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
) {
    let gate = Some(verifier);
    run_beacon_monitor_inner(coord_tx, control_rx, gate).await
}

/// Shared implementation behind `run_beacon_monitor` and (with the
/// `cap-tokens` feature) `run_beacon_monitor_with_verifier`.
///
/// Flow:
///   1. Bind a UDP socket on `127.0.0.1`; if the bind fails the task
///      returns immediately (silently).
///   2. Try to register with the CA repeater, up to three attempts
///      with 200 ms / 400 ms pauses in between. A failed registration
///      is not fatal — the loop below still runs.
///   3. Loop forever, racing two sources:
///        * `control_rx` — a `BeaconControl::ResetServer` is applied
///          via `apply_reset_server`; once the channel closes the
///          branch is disabled.
///        * the socket — each datagram is walked frame by frame and
///          every `CA_PROTO_RSRV_IS_UP` frame is handed to
///          `handle_beacon`. If nothing is received for
///          `REREGISTER_INTERVAL`, the task re-registers with the
///          repeater.
///
/// With `cap-tokens` and a `Some` verifier, signed companion frames
/// populate `verified_tuples`, and beacons whose tuple is absent are
/// counted and — unless `EPICS_CA_BEACON_REQUIRE_SIGNED` opts out —
/// dropped.
///
/// After a successful bind there is no exit path from the outer loop,
/// so the task runs until it is cancelled from outside.
async fn run_beacon_monitor_inner(
    coord_tx: mpsc::UnboundedSender<CoordRequest>,
    mut control_rx: mpsc::UnboundedReceiver<BeaconControl>,
    #[cfg(feature = "cap-tokens")] verifier: Option<
        std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
    >,
) {
    // The CA repeater forwards every accepted beacon to its
    // registered clients over loopback only — there's no multi-NIC
    // routing here. Bind exclusively on `127.0.0.1` so we get the
    // SO_REUSEADDR-friendly per-NIC machinery for free without
    // wasting per-NIC sockets that would never see traffic.
    let socket = match AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false) {
        Ok(s) => s,
        Err(_) => return,
    };
    // pvxs `udp_collector.cpp` parity (commit a064677e3625): opt
    // the kernel into SO_RXQ_OVFL so a sustained beacon backlog
    // (slow main loop, undersized SO_RCVBUF, mass-restart storm)
    // surfaces as a debug log instead of silent loss. No-op on
    // non-Linux. Diagnostic-only failure is logged at trace.
    if let Err(e) = socket.enable_so_rxq_ovfl() {
        tracing::trace!(
            target: "epics_ca_rs::client::beacon_monitor",
            error = %e,
            "SO_RXQ_OVFL enable failed (non-fatal)"
        );
    }
    // Last drop counter reported by the socket; used so a given
    // counter value is logged once rather than on every datagram.
    let mut prev_drops_beacon: u32 = 0;

    // Initial registration with retry: up to three attempts, sleeping
    // 200 ms after the first failure and 400 ms after the second.
    for attempt in 0..3u32 {
        if register_with_repeater(&socket).await.is_ok() {
            break;
        }
        if attempt < 2 {
            tokio::time::sleep(Duration::from_millis(200 * (1 << attempt))).await;
        }
    }

    // When `verifier` is set, this map remembers which
    // (server_ip, server_port, beacon_id) tuples have been
    // authenticated by a recent companion datagram. Beacons whose
    // tuple isn't here within `max_age_secs` get dropped (or merely
    // counted, when `require_signed` is false).
    #[cfg(feature = "cap-tokens")]
    let mut verified_tuples: HashMap<(u32, u16, u32), std::time::Instant> = HashMap::new();
    // Signing is required unless the env var is explicitly one of the
    // listed "off" spellings; unset or any other value means required.
    #[cfg(feature = "cap-tokens")]
    let require_signed = !matches!(
        epics_base_rs::runtime::env::get("EPICS_CA_BEACON_REQUIRE_SIGNED").as_deref(),
        Some("NO" | "no" | "0" | "false" | "FALSE")
    );
    let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
    // EPICS_RS_CLIENT_IGNORE snapshot — Rust-only client-side IP
    // quarantine (NOT C `EPICS_IOC_IGNORE_SERVERS`, which is
    // server-side; see super::epics_rs_client_ignore docstring).
    // Captured at task start so the beacon hot path stays env-read-
    // free; admins restart the IOC to apply a new ignore list.
    let ignored_servers: std::collections::HashSet<Ipv4Addr> =
        super::epics_rs_client_ignore().into_iter().collect();
    // Beacons are 16 B but the repeater may concatenate VERSION + RSRV_IS_UP
    // and forward client-noop traffic. Use 4 KB so chained datagrams are
    // received intact.
    let mut buf = [0u8; 4096];
    // Set to false once the control channel's last sender drops
    // (CaClient shutdown). After that we stop polling that branch so
    // we don't busy-loop on Ready(None); UDP / re-register continue.
    let mut control_rx_open = true;

    loop {
        // libca bhe-on-connect parity: a coordinator-issued
        // ResetServer (sent on TransportEvent::ServerConnected) clears
        // the per-server EMA so the next beacon reseeds
        // `period_estimate` from the live cadence. Without this, an
        // archiver that reconnects to a server whose `online_notify`
        // ramp-up is in progress sees a PeriodCollapse cascade against
        // its stale steady-state estimate.
        //
        // Note the receive timeout is rebuilt on every loop iteration,
        // so any control message or datagram restarts the
        // REREGISTER_INTERVAL countdown.
        let recv_fut = tokio::time::timeout(
            REREGISTER_INTERVAL,
            socket.recv_with_meta_with_drops(&mut buf),
        );
        let (meta, drops) = tokio::select! {
            ctrl = control_rx.recv(), if control_rx_open => {
                match ctrl {
                    Some(BeaconControl::ResetServer { server_addr }) => {
                        apply_reset_server(&mut servers, server_addr);
                    }
                    None => {
                        control_rx_open = false;
                    }
                }
                continue;
            }
            recv = recv_fut => {
                match recv {
                    Ok(Ok(v)) => v,
                    // Receive error: skip this round and try again.
                    Ok(Err(_)) => continue,
                    Err(_) => {
                        // No beacons for 5 minutes — repeater may have restarted
                        let _ = register_with_repeater(&socket).await;
                        continue;
                    }
                }
            }
        };
        // Log only when the reported drop counter is non-zero and
        // differs from the previously seen value.
        if drops != 0 && drops != prev_drops_beacon {
            tracing::debug!(
                target: "epics_ca_rs::client::beacon_monitor",
                prev = prev_drops_beacon,
                drops,
                "CA beacon RX socket buffer overflow"
            );
        }
        prev_drops_beacon = drops;
        let len = meta.n;
        // Too short to hold even one CA header — ignore the datagram.
        if len < CaHeader::SIZE {
            continue;
        }

        // Walk every CA frame in the datagram so chained beacons aren't
        // dropped when the repeater coalesces them.
        let mut offset = 0;
        while offset + CaHeader::SIZE <= len {
            let Ok(hdr) = CaHeader::from_bytes(&buf[offset..len]) else {
                break;
            };
            // C `rsrv/camessage.c:2452` rejects misaligned m_postsize.
            // UDP path drops silently. Without this check, the
            // round-up below would advance into the next message's
            // header bytes.
            if (hdr.postsize as usize) & 0x7 != 0 {
                break;
            }
            let payload_padded = hdr.postsize as usize;
            let frame_len = (CaHeader::SIZE + payload_padded).max(CaHeader::SIZE);
            // Bail out before advancing if the announced frame
            // length runs past the datagram. Otherwise the
            // post-advance slice clamp would silently hand the
            // verifier a truncated body and the parser would
            // continue from a misaligned offset.
            if offset.saturating_add(frame_len) > len {
                break;
            }
            // Used by the cap-tokens companion-frame slice below; the
            // attribute keeps the unused-variable lint quiet when the
            // feature is off.
            #[cfg_attr(not(feature = "cap-tokens"), allow(unused_variables))]
            let frame_start = offset;
            // Advance first so every `continue` below moves on to the
            // next frame rather than re-reading this one.
            offset += frame_len;

            // Signed-beacon companion (cmmd=0xCAFE, cap-tokens
            // feature). Verify the signature and stash the tuple as
            // "authenticated" so the matching beacon is acceptable.
            #[cfg(feature = "cap-tokens")]
            if hdr.cmmd == crate::server::signed_beacon::CA_PROTO_RSRV_BEACON_SIG {
                if let Some(ref v) = verifier {
                    let frame = &buf[frame_start..frame_start + frame_len];
                    // G3: bind the signed payload's announced server_ip
                    // to the UDP source IP — but only when the datagram
                    // didn't transit a repeater. The CA repeater
                    // (`repeater.cpp:626-630`, our `repeater.rs:224-228`)
                    // forwards the companion verbatim while the kernel
                    // rewrites the L3 source to the repeater's local
                    // socket address (typically 127.0.0.1). Under the
                    // standard production topology the client beacon
                    // socket is bound to LOCALHOST (see the
                    // `bind_single` call at the top of this function)
                    // and ONLY receives via the repeater, so `meta.src`
                    // is always loopback and a strict G3 binding would
                    // reject every legitimate companion.
                    //
                    // Replay protection without G3 here rests on (a)
                    // the cryptographic signature over (server_ip,
                    // server_port, beacon_id, ts) — an attacker can't
                    // mint a fresh tuple without the signing key —
                    // and (b) the `ts` freshness window enforced in
                    // SignedBeaconVerifier::verify. G3 still fires
                    // when a non-loopback path is observed (e.g. a
                    // future direct-LAN deployment without a
                    // repeater).
                    let src_ip = match meta.src.ip() {
                        std::net::IpAddr::V4(v) => v,
                        std::net::IpAddr::V6(_) => {
                            metrics::counter!("ca_client_signed_beacon_failures_total")
                                .increment(1);
                            continue;
                        }
                    };
                    let via_repeater = src_ip.is_loopback();
                    match v.verify(frame) {
                        Ok((ip, port, beacon_id))
                            if !via_repeater && Ipv4Addr::from(ip) != src_ip =>
                        {
                            tracing::debug!(
                                announced = %Ipv4Addr::from(ip),
                                actual = %src_ip,
                                port, beacon_id,
                                "signed beacon source-IP mismatch (G3)"
                            );
                            metrics::counter!("ca_client_signed_beacon_source_ip_mismatch_total")
                                .increment(1);
                        }
                        Ok((ip, port, beacon_id)) => {
                            // G2: cap verified_tuples on the companion-
                            // only path. The unsigned-beacon path GC's
                            // it via the `retain()` in the RSRV_IS_UP
                            // verifier-policy block further down, but a
                            // peer sending only signed companions would
                            // otherwise grow it linearly.
                            const MAX_VERIFIED_TUPLES: usize = 8192;
                            if verified_tuples.len() >= MAX_VERIFIED_TUPLES {
                                let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
                                let now = std::time::Instant::now();
                                verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
                            }
                            verified_tuples
                                .insert((ip, port, beacon_id), std::time::Instant::now());
                            metrics::counter!("ca_client_signed_beacon_verified_total")
                                .increment(1);
                        }
                        Err(e) => {
                            tracing::debug!(error = ?e,
                                "signed beacon companion failed verification");
                            metrics::counter!("ca_client_signed_beacon_failures_total")
                                .increment(1);
                        }
                    }
                }
                // Companion frames are never treated as beacons; with
                // no verifier configured they are simply skipped.
                continue;
            }

            // Only RSRV_IS_UP frames are beacons; skip everything else.
            if hdr.cmmd != CA_PROTO_RSRV_IS_UP {
                continue;
            }

            // Verifier policy: by default, drop unauthenticated
            // beacons when a verifier is configured. The companion
            // signed-beacon datagram can arrive ~simultaneously; we
            // check against the verified-tuple set populated above and
            // GC stale entries every iteration to keep the map bounded.
            //
            // EPICS_CA_BEACON_REQUIRE_SIGNED=NO opts out — unsigned
            // beacons are accepted (with a counter increment) so
            // operators can run mixed deployments where some servers
            // have rolled out signing and some haven't yet.
            #[cfg(feature = "cap-tokens")]
            if let Some(ref v) = verifier {
                let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
                let now = std::time::Instant::now();
                verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
                // Anchor the lookup on `hdr.available` so the
                // key matches the companion-side insert under the
                // standard production topology (client receives via
                // the CA repeater on LOCALHOST). The CA server emits
                // `m_available = 0` per C `online_notify.c:69`, and
                // the repeater rewrites the field to the original
                // server's source IP before forwarding (C
                // `repeater.cpp:626-630`; our `repeater.rs:224-228`).
                // The companion datagram carries `server_ip` in its
                // signed payload (`signed_beacon.rs::build_packet`
                // bytes 12..16), which `verify()` returns as the same
                // BE-bytes-as-u32 coordinate. The two coordinates now
                // line up regardless of whether the matching beacon
                // arrives directly or via a repeater.
                //
                // The earlier `meta.src.ip()` keying was correct in
                // synthetic direct-LAN tests but broke production: a
                // loopback-bound monitor socket only ever sees
                // `meta.src = 127.0.0.1:<repeater_port>` from the
                // repeater, so the lookup key was always
                // `(127.0.0.1, port, beacon_id)` while the insert key
                // was `(real_server_ip, port, beacon_id)` — every
                // legitimate signed beacon was rejected with
                // `EPICS_CA_BEACON_REQUIRE_SIGNED=YES`. See
                // `verified_tuple_key_matches_via_repeater` for the
                // regression case.
                //
                // Fall back to `meta.src.ip()` when `hdr.available` is
                // zero (e.g. a malformed or non-rewritten beacon).
                // That key won't hit under the loopback topology, but
                // it lets the direct-LAN path keep working.
                let lookup_ip_u32 = if hdr.available != 0 {
                    hdr.available
                } else {
                    match meta.src.ip() {
                        std::net::IpAddr::V4(v) => u32::from_be_bytes(v.octets()),
                        std::net::IpAddr::V6(_) => 0,
                    }
                };
                // Key layout mirrors the companion insert above:
                // (ip, port, beacon_id) taken from `hdr.count` and
                // `hdr.cid` on the beacon side.
                let key = (lookup_ip_u32, hdr.count, hdr.cid);
                if !verified_tuples.contains_key(&key) {
                    metrics::counter!("ca_client_unsigned_beacon_drops_total").increment(1);
                    if require_signed {
                        continue;
                    }
                }
            }

            handle_beacon(hdr, &mut servers, &coord_tx, &ignored_servers);
        }
    }
}

/// libca `bhe.cpp` "new client connect" parity. Clears the EMA so the
/// next beacon reseeds `period_estimate` from the live cadence;
/// preserves `last_id` and `last_seen` so duplicate-detection and
/// stale-prune still work across the reset.
///
/// `circuit_addr` is the TCP `server_addr` of the freshly-connected
/// circuit. The `BeaconState` map is keyed by the beacon's *announced*
/// address — per `handle_beacon`'s comment "new servers always set
/// available=INADDR_ANY (0)", so the dominant key is `0.0.0.0:port`,
/// NOT the TCP address. Multi-homed IOCs are a second case where the
/// announced IP is one NIC and the circuit reaches the server via a
/// different NIC.
///
/// We resolve the right entry conservatively. A naive port-only
/// sweep would silently blind unrelated IOCs that share the default
/// port 5064 across the network: post-reset `count=0` gates
/// `PeriodCollapse` for the next 4 beacons, so a same-port IOC that
/// restarts with a preserved beacon counter inside that window
/// would go undetected — a real correctness regression, not just a
/// noise issue.
///
/// Resolution order — each step is **terminal** (early-return on
/// hit). The terminality matters: a single CA server announces
/// consistently with one IP per process. `server/beacon.rs` computes
/// `server_ip` once at task start and reuses it for every beacon, so
/// a given IOC produces *one* beacon-state key (real IP or INADDR_ANY,
/// never both). If both an exact-match entry and a `0.0.0.0:port`
/// entry exist simultaneously, they represent DIFFERENT IOCs sharing
/// the port. Falling through past a hit would silently blind the
/// other IOC's PeriodCollapse for the next ~4 beacons.
///
///   1. **Exact match** `circuit_addr` — works when the IOC announced
///      its real IP (rare for new IOCs, common for older / pvxs
///      servers). On hit: reset and return.
///   2. **`0.0.0.0:port`** (INADDR_ANY) — the dominant case for
///      modern IOCs. Only consulted when (1) missed. On hit: reset
///      and return.
///   3. **Single unambiguous `*:port` entry** — only when both (1)
///      and (2) missed AND exactly one other `*:port` entry exists.
///      Catches the multi-homed IOC case (announced via NIC A,
///      circuit via NIC B) without touching unrelated same-port
///      IOCs.
///
/// When (1)/(2) miss and there are multiple ambiguous `*:port`
/// entries, we deliberately do NOT reset — a multi-homed IOC sharing
/// port 5064 with unrelated IOCs reverts to the original
/// post-reconnect cascade behaviour. Acceptable trade-off: the
/// alternative was silent restart-detection failure for every other
/// IOC on that port.
fn apply_reset_server(servers: &mut HashMap<SocketAddr, BeaconState>, circuit_addr: SocketAddr) {
    let port = circuit_addr.port();
    let inaddr_any = SocketAddr::V4(SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port));

    if let Some(s) = servers.get_mut(&circuit_addr) {
        s.period_estimate = None;
        s.count = 0;
        return;
    }
    if let Some(s) = servers.get_mut(&inaddr_any) {
        s.period_estimate = None;
        s.count = 0;
        return;
    }

    // Snapshot the matching keys to release the borrow before
    // mutating; HashMap::iter_mut filtered to a single key gets
    // gnarly fast.
    let port_keys: Vec<SocketAddr> = servers
        .keys()
        .filter(|k| k.port() == port)
        .copied()
        .collect();
    if port_keys.len() == 1 {
        if let Some(s) = servers.get_mut(&port_keys[0]) {
            s.period_estimate = None;
            s.count = 0;
        }
    }
    // else: ambiguous (zero or multiple non-INADDR_ANY *:port
    // entries). Skip — see doc comment for rationale.
}

/// Process one received server beacon header.
///
/// Resolves the announced server address from `hdr.available` (IP) and
/// `hdr.count` (port), updates that server's entry in `servers`,
/// classifies the beacon, and forwards the outcome to the coordinator
/// over `coord_tx`:
///
/// * `CoordRequest::ForceRescanServer` — only when the beacon is
///   classified as an anomaly (`FirstSighting` or `IdMismatch`).
/// * `CoordRequest::BeaconArrival { anomaly }` — for every accepted
///   beacon except a first sighting.
///
/// The function returns early, without sending anything, when:
///
/// * the announced IP is concrete and listed in `ignored_servers`;
/// * the beacon repeats the previous id for a known server;
/// * the id jumps forward by 2 or 3, or backwards by 1 to 4 (in that
///   case `last_id` is still updated).
///
/// Send failures on `coord_tx` are ignored.
fn handle_beacon(
    hdr: CaHeader,
    servers: &mut HashMap<SocketAddr, BeaconState>,
    coord_tx: &mpsc::UnboundedSender<CoordRequest>,
    ignored_servers: &std::collections::HashSet<Ipv4Addr>,
) {
    // count = server TCP port (CA v4.1+), data_type = protocol version.
    //
    // C `udpiiu::beaconAction` (`modules/ca/src/client/udpiiu.cpp:
    // 770-779`): when `msg.m_count == 0` (old V<4.1 server with
    // no port in the beacon), the client uses `this->serverPort` —
    // which is set from the EPICS_CA_SERVER_PORT env var at
    // udpiiu construction (`udpiiu.cpp:155-156`,
    // `envGetInetPortConfigParam`). Pre-fix Rust hardcoded
    // CA_SERVER_PORT (= 5064), ignoring the env override. A site
    // that runs its IOCs on a non-default port (via
    // EPICS_CA_SERVER_PORT) would see old-style beacons routed to
    // 5064 — effectively dropped because no listener is there.
    //
    // An unset or unparsable EPICS_CA_SERVER_PORT falls back to
    // CA_SERVER_PORT.
    let server_port = if hdr.count != 0 {
        hdr.count
    } else {
        epics_base_rs::runtime::env::get("EPICS_CA_SERVER_PORT")
            .and_then(|s| s.parse::<u16>().ok())
            .unwrap_or(CA_SERVER_PORT)
    };
    let beacon_id = hdr.cid;

    // New servers always set available=INADDR_ANY (0).  Use 0.0.0.0
    // as-is for beacon tracking — each IOC still has a unique port,
    // matching the approach used by the C CA client (libca).
    let server_ip = Ipv4Addr::from(hdr.available.to_be_bytes());
    // EPICS_RS_CLIENT_IGNORE: silently drop beacons announcing a
    // quarantined server so the anomaly-poke path doesn't keep
    // waking the search engine for a quarantined IOC. Rust-only
    // extension; NOT the C EPICS_IOC_IGNORE_SERVERS (server-side
    // name list, different semantics — see
    // client::epics_rs_client_ignore docstring).
    //
    // The filter applies only when the announced IP is concrete —
    // INADDR_ANY (0) means "I'm an IOC announcing myself, use the
    // UDP source," which the search engine resolves separately.
    if !server_ip.is_unspecified() && ignored_servers.contains(&server_ip) {
        return;
    }
    let server_addr = SocketAddr::V4(SocketAddrV4::new(server_ip, server_port));
    let now = Instant::now();

    // Drop entries idle past `BEACON_STALE_THRESHOLD` so a long-silent
    // server's revival lands on the `first_sighting = true` path and
    // triggers the anomaly poke naturally (pvxs `tickBeaconClean`
    // parity). This is what protects the search engine from staying in
    // 200 ms fast-tick mode forever in a steady-state network.
    servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);

    // G1: cap the per-server BeaconState map. With
    // EPICS_CA_BEACON_REQUIRE_SIGNED=NO an attacker can spoof
    // beacons with arbitrary `available`/`count` to grow the map.
    // When a previously unseen server arrives while the map already
    // holds MAX_BEACON_SERVERS entries, reap every entry idle for at
    // least 75 s (the fixed 15 s × 5 cutoff below — it does not
    // consult each entry's own `period_estimate`).
    //
    // NOTE(review): this is a soft cap. The new entry is inserted
    // below even when the reap frees nothing, so the map can exceed
    // MAX_BEACON_SERVERS under a sustained flood — confirm that is
    // intended.
    const MAX_BEACON_SERVERS: usize = 4096;
    let first_sighting = !servers.contains_key(&server_addr);
    if first_sighting && servers.len() >= MAX_BEACON_SERVERS {
        let cutoff_threshold = Duration::from_secs(15 * 5);
        servers.retain(|_, s| now.duration_since(s.last_seen) < cutoff_threshold);
    }
    // A fresh entry is seeded with `last_id = beacon_id - 1` so the
    // sequence arithmetic below treats this very beacon as the
    // expected next id.
    let entry = servers.entry(server_addr).or_insert_with(|| BeaconState {
        last_id: beacon_id.wrapping_sub(1),
        last_seen: now,
        period_estimate: None,
        count: 0,
    });

    let actual_interval = now.duration_since(entry.last_seen);
    let expected_next_id = entry.last_id.wrapping_add(1);

    // Multi-NIC / repeater duplicate detection: the SAME beacon (same
    // id, arriving microseconds apart through different paths) used to
    // trip the period-collapse branch below and fire a spurious
    // anomaly on every duplicate. Drop the second copy outright so
    // the search engine isn't woken twice for one beacon. Without
    // soft-poke-on-every-beacon (removed earlier this round) this
    // misclassification was masked by the throttle; the prune-only
    // design surfaces it.
    //
    // We deliberately do NOT refresh `last_seen` here: a server stuck
    // emitting only same-id duplicates (frozen / wedged) will be
    // pruned at `BEACON_STALE_THRESHOLD` and its next real (fresh-id)
    // beacon will land on the `first_sighting = true` path — the
    // desired anomaly behaviour for a recovered server.
    if !first_sighting && beacon_id == entry.last_id {
        return;
    }

    // libca `bhe.cpp:159-182` parity (narrowed): drop beacons
    // whose sequence number jumps FORWARD by 2 or 3 (likely a
    // duplicate route that's slightly ahead of us, or a brief
    // input-queue overrun) or BACKWARDS by 1-4 (a redundant route
    // delivering an older copy). Without this, those cases hit
    // the `IdMismatch` branch below and flag the transport
    // watchdog for ~30 s on what is in reality a healthy IOC.
    //
    // We deliberately narrow libca's backwards window from 256 to
    // 4. libca conflates "duplicate route" with "id reset to a
    // small number" because it relies on the period-collapse
    // check to detect restarts. Our `IdMismatch` branch detects
    // restart-to-1 directly via the id sequence and catches
    // sub-50 ms restarts that period-collapse misses. The wider
    // libca window would swallow those into the dedup path.
    //
    // Update `last_id` to the new value so the next genuine
    // beacon computes its advance from the most recent
    // observation (also matches libca, where
    // `lastBeaconNumber = beaconNumber` runs before the discard
    // checks). `last_seen`, `count`, and `period_estimate` are
    // left untouched — the drop-only-dups path keeps a server
    // stuck emitting nothing-but-dups on the
    // BEACON_STALE_THRESHOLD prune trajectory.
    const BACKWARDS_DUP_WINDOW: u32 = 4;
    if !first_sighting {
        // `advance` is the wrapping distance from the previous id, so
        // a step backwards by k shows up as `u32::MAX - k + 1`.
        let advance = beacon_id.wrapping_sub(entry.last_id);
        let backwards_dup = advance > u32::MAX - BACKWARDS_DUP_WINDOW;
        let small_forward_dup = advance == 2 || advance == 3;
        if backwards_dup || small_forward_dup {
            entry.last_id = beacon_id;
            return;
        }
    }

    // Anomaly: beacon_id not monotonically increasing (IOC restarted
    // with a fresh sequence), OR period suddenly dropped below 1/3 of
    // the estimated steady-state period (IOC restarted and is in its
    // fast-beacon initial phase). Also: first time we've seen this
    // server — libca treats unknown-server beacons as a hint to
    // re-search immediately so channels still in `Searching` wake up
    // on the new IOC instead of waiting their full bucket cycle.
    //
    // Floor the period-collapse check at 50 ms — multi-NIC duplicate
    // beacons that happen to use the next sequence id (rare but
    // possible if the network reorders) would otherwise still
    // satisfy `actual_interval < period_estimate / 3` for any
    // nonzero period. 50 ms safely separates "duplicate" from
    // "legitimate fast-beacon initial phase" (real IOCs send
    // every 100-500 ms during startup).
    const MIN_PERIOD_COLLAPSE_INTERVAL: Duration = Duration::from_millis(50);
    // Classify in priority order: FirstSighting wins because there's
    // no prior `last_id` / `period_estimate` to make the other two
    // checks meaningful. IdMismatch beats the period-collapse branch
    // because a real restart (id reset to 1) is the dispositive
    // signal even if the inter-beacon interval also happens to be
    // sub-period.
    //
    // The period-collapse branch (id monotonic + interval suddenly
    // dropped below `period_estimate / 3`) does NOT fire
    // `PeriodCollapse` any more. That signature in practice
    // identifies the IOC's `rsrv online_notify_task` ramp-up restart
    // (`server/beacon.rs:124`, `tcp.rs:450`: `beacon_reset.notify_one`
    // on every TCP accept/disconnect), NOT a real server restart.
    // Real restarts reset beacon_id to 0 and trip the `IdMismatch`
    // branch; any client whose own circuit broke for the restart also
    // gets a `BeaconControl::ResetServer` from the coordinator
    // (`apply_reset_server`) which clears the EMA pre-emptively. The
    // remaining cases the period-collapse heuristic used to catch
    // were ALL false positives: another client on the network
    // connected to the same IOC and our beacon_monitor saw the
    // resulting ramp-up cascade against our mature ~15 s EMA. That
    // produced a stream of `tracing::warn!("IOC may have restarted")`
    // + transport-watchdog sticky flags + reconnect cascades for
    // healthy circuits.
    //
    // Self-reset path: clear `period_estimate` and `count` so the
    // ramp-up cascade reseeds the EMA from the live cadence (same
    // post-condition as `apply_reset_server`). The state-update
    // block below runs unchanged.
    let anomaly_kind = if first_sighting {
        Some(BeaconAnomalyKind::FirstSighting)
    } else if beacon_id != expected_next_id {
        Some(BeaconAnomalyKind::IdMismatch)
    } else if entry.count > 3
        && actual_interval > MIN_PERIOD_COLLAPSE_INTERVAL
        && entry
            .period_estimate
            .is_some_and(|est| actual_interval < est / 3)
    {
        // Period collapse with a monotonic id: self-reset, no anomaly.
        entry.period_estimate = None;
        entry.count = 0;
        None
    } else {
        None
    };

    // Update state.
    entry.last_id = beacon_id;
    entry.last_seen = now;
    entry.count += 1;

    if entry.count > 1 {
        // First observed inter-beacon interval defines the initial
        // estimate; subsequent samples blend in via the EMA. Mirrors
        // libca `bhe.cpp:199` (`this->averagePeriod = currentPeriod`
        // on the second beacon, after the `averagePeriod < 0.0`
        // sentinel guard). See `BeaconState::period_estimate` doc
        // for why a hardcoded 15 s placeholder caused a false
        // PeriodCollapse cascade against ramp-up beacon emitters.
        match entry.period_estimate {
            None => {
                entry.period_estimate = Some(actual_interval);
            }
            Some(prev) => {
                // EMA: 75 % previous estimate, 25 % newest interval.
                let alpha = 0.25;
                let new_estimate = Duration::from_secs_f64(
                    prev.as_secs_f64() * (1.0 - alpha) + actual_interval.as_secs_f64() * alpha,
                );
                entry.period_estimate = Some(new_estimate);
            }
        }
    }

    // Search-engine wake-up (libca `udpiiu::beaconAnomalyNotify`):
    // ONLY on a classified anomaly. The earlier "soft poke on every
    // beacon" code amplified normal beacon traffic into a permanent
    // fast-tick search storm whenever multiple IOCs beaconed within
    // the engine's revolution window — keep that path lean.
    if let Some(kind) = anomaly_kind {
        let _ = coord_tx.send(CoordRequest::ForceRescanServer { server_addr, kind });
    }

    // Transport-watchdog notification (libca `tcpRecvWatchdog::
    // beaconArrivalNotify` / `beaconAnomalyNotify`). Routed via the
    // coordinator to the per-circuit read loop, where it either
    // pushes the deadline forward (healthy beacon) or sets a sticky
    // anomaly flag (id-mismatch / period-collapse) that suppresses
    // subsequent healthy-beacon refreshes until the next data
    // arrival or echo response.
    //
    // FirstSighting is intentionally skipped — and this is a
    // deliberate divergence from libca, worth being honest about.
    // libca's `bhe.cpp:137` path (BHE freshly created via the
    // TCP-connect search-reply route, then first beacon arrives)
    // calls `beaconAnomalyNotify` as a precaution, setting the
    // tcpRecvWatchdog flag. We don't, on the reasoning that:
    //   * the next healthy beacon (≤ one beacon period) will
    //     refresh the deadline naturally, and
    //   * if the server actually restarted in that one-period
    //     gap, the existing 30 s idle-timeout echo handles it.
    // This keeps the FirstSighting path purely a search-engine
    // concern and avoids per-CaClient false flags on startup,
    // which was the original disconnect-storm trigger.
    //
    // The classifier in this function no longer yields
    // `PeriodCollapse`; the arm below still maps that variant to an
    // anomaly arrival should it ever be produced.
    let arrival_anomaly = match anomaly_kind {
        None => Some(false),
        Some(BeaconAnomalyKind::IdMismatch | BeaconAnomalyKind::PeriodCollapse) => Some(true),
        Some(BeaconAnomalyKind::FirstSighting) => None,
    };
    if let Some(anomaly) = arrival_anomaly {
        let _ = coord_tx.send(CoordRequest::BeaconArrival {
            server_addr,
            anomaly,
        });
    }
}

// ---------------------------------------------------------------------------
// Repeater registration
// ---------------------------------------------------------------------------

/// Register `socket` with the CA repeater on localhost and wait for
/// its confirmation.
///
/// Sends a `CA_PROTO_REPEATER_REGISTER` header whose `available` field
/// carries our local IPv4 address to `127.0.0.1:repeater_port()`, then
/// waits up to 500 ms for a `CA_PROTO_REPEATER_CONFIRM` reply.
///
/// Returns `Ok(())` once the confirmation is seen, and `Err(())` when
/// the send fails, a receive fails, or the wait times out.
async fn register_with_repeater(socket: &AsyncUdpV4) -> Result<(), ()> {
    // The socket is bound to a single loopback NIC, so the first IPv4
    // entry of `local_addrs()` is the address to announce; fall back
    // to 127.0.0.1 when none is reported.
    let announced_ip = socket
        .local_addrs()
        .into_iter()
        .find_map(|sa| match sa {
            SocketAddr::V4(v4) => Some(*v4.ip()),
            _ => None,
        })
        .unwrap_or(Ipv4Addr::LOCALHOST);

    let mut request = CaHeader::new(CA_PROTO_REPEATER_REGISTER);
    request.available = u32::from_be_bytes(announced_ip.octets());

    // `repeater_port()` honours `EPICS_CA_REPEATER_PORT`, keeping the
    // beacon monitor and the repeater daemon in agreement when
    // operators override the default (libca `udpiiu.cpp:168` resolves
    // the same variable). Without it, the periodic re-registration
    // would silently miss a repeater on a non-default port.
    let target = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, repeater_port()));
    if socket.send_to(&request.to_bytes(), target).await.is_err() {
        return Err(());
    }

    // Read datagrams until a REPEATER_CONFIRM shows up; anything
    // shorter than a header, unparsable, or carrying another command
    // is skipped.
    let mut scratch = [0u8; 64];
    let await_confirm = async {
        loop {
            // `recv_with_meta_with_drops` is used for consistency with
            // the long-running monitor loop on the same socket; the
            // drop counter is ignored here because that loop already
            // tracks it.
            let Ok((meta, _drops)) = socket.recv_with_meta_with_drops(&mut scratch).await else {
                return Err(());
            };
            let received = meta.n;
            if received < CaHeader::SIZE {
                continue;
            }
            let Ok(reply) = CaHeader::from_bytes(&scratch[..received]) else {
                continue;
            };
            if reply.cmmd == CA_PROTO_REPEATER_CONFIRM {
                return Ok::<(), ()>(());
            }
        }
    };

    // Both a timeout and an inner receive error collapse to `Err(())`.
    match tokio::time::timeout(Duration::from_millis(500), await_confirm).await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(())) | Err(_) => Err(()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Pins `BEACON_STALE_THRESHOLD` at exactly 180 s (mirrors the
    /// pvxs `tickBeaconClean` default). The stale prune driven by this
    /// threshold is what routes a long-silent server's revival onto
    /// the `first_sighting = true` anomaly path; without it an
    /// in-sequence beacon after a long silence would not kick the
    /// search engine out of slow-cadence retry.
    #[test]
    fn beacon_stale_threshold_is_180s() {
        let three_minutes = Duration::from_secs(180);
        assert_eq!(BEACON_STALE_THRESHOLD, three_minutes);
    }

    /// Duplicate suppression for multi-NIC / repeater setups: the same
    /// beacon (identical `cid`) delivered twice in quick succession
    /// must reach the coordinator only once. Before the duplicate
    /// guard existed, the second copy landed on the period-collapse
    /// branch (interval ≈ 0) and rescheduled every pending search a
    /// second time, which shows up as permanent fast-tick spam in
    /// dual-NIC environments.
    #[test]
    fn duplicate_beacon_does_not_double_fire_anomaly() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let ignored = std::collections::HashSet::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 100;

        // First copy: a first sighting, so exactly one
        // `ForceRescanServer { FirstSighting }` is queued and nothing
        // else.
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        let first = rx.try_recv();
        assert!(matches!(
            first,
            Ok(CoordRequest::ForceRescanServer {
                kind: BeaconAnomalyKind::FirstSighting,
                ..
            })
        ));
        assert!(rx.try_recv().is_err());

        // Second copy with the same cid: must be dropped silently.
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        let after_duplicate = rx.try_recv();
        assert!(
            after_duplicate.is_err(),
            "duplicate same-cid beacon must not fire ForceRescanServer"
        );
    }

    /// An IOC restart shows up as a beacon sequence that starts over.
    /// The `beacon_id != expected_next_id` branch must classify that
    /// as an anomaly even when the restart beacon arrives less than
    /// `MIN_PERIOD_COLLAPSE_INTERVAL` (50 ms) after the previous one:
    /// the floor only guards the period-collapse branch against
    /// duplicates, it must not mask an id mismatch.
    #[test]
    fn sub_50ms_restart_via_id_mismatch_still_fires_anomaly() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let ignored = std::collections::HashSet::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 100;

        // Establish the server; the first sighting queues one message.
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        assert!(rx.try_recv().is_ok());

        // Immediately afterwards the id restarts at 1 instead of the
        // expected 101. No period estimate has been seeded yet and the
        // interval is far below 50 ms, so only the id-sequence check
        // can catch this.
        beacon.cid = 1;
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        let next = rx.try_recv();
        assert!(
            matches!(
                next,
                Ok(CoordRequest::ForceRescanServer {
                    kind: BeaconAnomalyKind::IdMismatch,
                    ..
                })
            ),
            "id-mismatch restart must fire IdMismatch anomaly even when interval < 50ms"
        );
    }

    /// A monotonic id combined with an interval far below the EMA is
    /// the signature of the IOC's `rsrv online_notify_task`
    /// `beacon_reset` being notified on a TCP accept/disconnect — not
    /// of a real restart — and must no longer fire `PeriodCollapse`.
    /// The monitor instead self-resets `period_estimate` and `count`
    /// so the following ramp-up beacons reseed the EMA from the live
    /// cadence, the same post-condition `apply_reset_server` gives
    /// when the coordinator routes a `BeaconControl::ResetServer` for
    /// our own circuit.
    ///
    /// For an id-preserving real restart that broke our circuit, the
    /// transport-event path issues ResetServer pre-emptively (see
    /// `reset_on_connect_breaks_period_collapse_cascade_after_reconnect`).
    /// What remained — another client connecting and tripping OUR
    /// monitor's period-collapse check against a stale EMA — is
    /// absorbed silently here.
    #[test]
    fn monotonic_id_sub_period_clears_ema_no_anomaly() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let ignored = std::collections::HashSet::new();
        let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();

        // Steady-state entry: 15 s estimate, ten beacons seen, and the
        // last one 200 ms ago — above the 50 ms floor but well under
        // period_estimate / 3 = 5 s.
        let mature = BeaconState {
            last_id: 99,
            last_seen: Instant::now() - Duration::from_millis(200),
            period_estimate: Some(Duration::from_secs(15)),
            count: 10,
        };
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        servers.insert(server, mature);

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 100; // next in sequence, so IdMismatch is ruled out
        handle_beacon(beacon, &mut servers, &tx, &ignored);

        // A `BeaconArrival { anomaly: false }` (healthy refresh) is
        // expected and fine; any `ForceRescanServer` is a failure.
        for msg in std::iter::from_fn(|| rx.try_recv().ok()) {
            if let CoordRequest::ForceRescanServer { kind, .. } = msg {
                panic!(
                    "monotonic-id, sub-period interval must NOT fire \
                     ForceRescanServer ({kind:?}) — it is the IOC's \
                     `beacon_reset` ramp-up cascade triggered by some \
                     peer's TCP accept, not a real restart"
                );
            }
        }

        // The estimate and counter were cleared (then the counter was
        // bumped for this beacon), and the beacon itself was accepted.
        let state = servers.get(&server).expect("entry");
        assert!(
            state.period_estimate.is_none(),
            "self-reset must clear period_estimate"
        );
        assert_eq!(
            state.count, 1,
            "self-reset zeros count, then +1 for this beacon"
        );
        assert_eq!(
            state.last_id, 100,
            "last_id advanced normally — the beacon was accepted"
        );
    }

    /// Regression guard: rsrv `online_notify_task` ramp-up beacons
    /// (20 ms doubling up to 15 s — the same pattern epics-ca-rs's own
    /// `server/beacon.rs` emits) must NOT produce `PeriodCollapse`
    /// anomalies for a freshly-started IOC. Before the fix, each new
    /// server started with a `period_estimate = 15 s` placeholder, so
    /// every ramp-up beacon past the 50 ms floor (the 4th onwards)
    /// satisfied `actual_interval < 15 s / 3 = 5 s` and tripped
    /// `PeriodCollapse`. Mini-beamline IOC users saw that as 5-s
    /// `get_with_metadata(timeout=2.0)` failures driven by the
    /// transport watchdog flag → echo probe → reconnect cascade. The
    /// fix mirrors libca `bhe.cpp:51,199`, where
    /// `averagePeriod = -DBL_MAX` until the first measured
    /// `currentPeriod` defines it.
    ///
    /// The test replays the standard ramp-up — 20, 40, 80, 160, 320,
    /// 640 ms, 1.28, 2.56, 5.12, 10.24 s — after an initial beacon.
    /// The initial beacon must fire `FirstSighting`; no later
    /// `ForceRescanServer` may carry `PeriodCollapse`.
    #[test]
    fn rsrv_rampup_beacons_do_not_fire_period_collapse() {
        // `handle_beacon` reads `Instant::now()` itself, so instead of
        // virtual time each interval is simulated by back-dating the
        // entry's `last_seen` right before the call.
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let ignored = std::collections::HashSet::new();
        let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 0;

        // Beacon #1: drain everything it queued and remember whether a
        // FirstSighting rescan was among it.
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        let first_sighting_seen =
            std::iter::from_fn(|| rx.try_recv().ok()).fold(false, |seen, msg| {
                seen || matches!(
                    msg,
                    CoordRequest::ForceRescanServer {
                        kind: BeaconAnomalyKind::FirstSighting,
                        ..
                    }
                )
            });
        assert!(first_sighting_seen, "first beacon must fire FirstSighting");

        // Beacons #2..: one per ramp-up interval, ids 1, 2, 3, ...
        let ramp_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
        for (step, &gap_ms) in ramp_ms.iter().enumerate() {
            let backdated = std::time::Instant::now() - Duration::from_millis(gap_ms);
            servers.get_mut(&server).expect("entry").last_seen = backdated;
            beacon.cid = (step as u32) + 1;
            handle_beacon(beacon, &mut servers, &tx, &ignored);

            // A misclassification would surface as a
            // `ForceRescanServer` whose kind is `PeriodCollapse`.
            for msg in std::iter::from_fn(|| rx.try_recv().ok()) {
                if let CoordRequest::ForceRescanServer { kind, .. } = msg {
                    assert_ne!(
                        kind,
                        BeaconAnomalyKind::PeriodCollapse,
                        "ramp-up beacon #{} (interval={} ms) must not classify \
                         as PeriodCollapse — see BeaconState::period_estimate doc",
                        step + 2,
                        gap_ms
                    );
                }
            }
        }
    }

    /// Rapid beacons with monotonically increasing ids are healthy
    /// traffic: only the very first one (the first sighting) may wake
    /// the search engine, and none may flag the transport watchdog.
    /// Guards against the period-collapse check or its 50 ms floor
    /// firing spurious anomalies on a fast but well-ordered cadence.
    #[test]
    fn fast_cadence_monotonic_ids_does_not_fire_spurious_anomaly() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let ignored = std::collections::HashSet::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;

        // Ids 100, 101, 102, 103, 104 back to back. The first is a
        // first sighting (one `ForceRescanServer`); the other four are
        // healthy beacons that each emit
        // `BeaconArrival { anomaly: false }`, the libca-style watchdog
        // refresh.
        for seq in 100..105 {
            beacon.cid = seq;
            handle_beacon(beacon, &mut servers, &tx, &ignored);
        }

        let mut search_wakes = 0;
        let mut healthy_arrivals = 0;
        let mut anomaly_arrivals = 0;
        for msg in std::iter::from_fn(|| rx.try_recv().ok()) {
            match msg {
                CoordRequest::ForceRescanServer { .. } => search_wakes += 1,
                CoordRequest::BeaconArrival { anomaly, .. } => {
                    if anomaly {
                        anomaly_arrivals += 1;
                    } else {
                        healthy_arrivals += 1;
                    }
                }
                _ => {}
            }
        }

        assert_eq!(
            search_wakes, 1,
            "monotonic fast-cadence beacons must wake searches only on first sighting"
        );
        assert_eq!(
            anomaly_arrivals, 0,
            "monotonic fast-cadence beacons must not flag the watchdog"
        );
        assert_eq!(
            healthy_arrivals, 4,
            "each post-first-sighting healthy beacon must refresh the transport watchdog"
        );
    }

    /// Parity with libca `tcpRecvWatchdog::beaconAnomalyNotify`: a
    /// beacon classified as a real restart (`IdMismatch` in this test)
    /// must produce a `BeaconArrival { anomaly: true }` in addition to
    /// the search-wake `ForceRescanServer`. The transport uses the
    /// former to set its sticky flag without sending an immediate
    /// echo, so the receive watchdog then expires on schedule.
    #[test]
    fn id_mismatch_emits_anomaly_beacon_arrival() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let ignored = std::collections::HashSet::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 100;

        // Create the entry so the next beacon is not a first sighting,
        // then throw away whatever the first sighting queued.
        handle_beacon(beacon, &mut servers, &tx, &ignored);
        std::iter::from_fn(|| rx.try_recv().ok()).for_each(drop);

        // Simulated restart: the sequence starts over at 1.
        beacon.cid = 1;
        handle_beacon(beacon, &mut servers, &tx, &ignored);

        let mut saw_search_wake = false;
        let mut saw_anomaly_arrival = false;
        for msg in std::iter::from_fn(|| rx.try_recv().ok()) {
            match msg {
                CoordRequest::ForceRescanServer {
                    kind: BeaconAnomalyKind::IdMismatch,
                    ..
                } => saw_search_wake = true,
                CoordRequest::BeaconArrival { anomaly: true, .. } => saw_anomaly_arrival = true,
                _ => {}
            }
        }
        assert!(saw_search_wake, "IdMismatch must wake searches");
        assert!(
            saw_anomaly_arrival,
            "IdMismatch must flag the transport watchdog"
        );
    }

    /// A `FirstSighting` is bookkeeping local to this client: either
    /// no circuit exists yet, or the BHE of an existing circuit was
    /// just pruned. Neither case may flag the watchdog — emitting
    /// `BeaconArrival { anomaly: true }` on a first sighting was the
    /// original cause of the disconnect storms. The test therefore
    /// requires the search wake and rejects any `BeaconArrival`.
    #[test]
    fn first_sighting_does_not_emit_beacon_arrival() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();

        // One beacon against an empty server table.
        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;
        beacon.cid = 100;
        handle_beacon(beacon, &mut servers, &tx, &std::collections::HashSet::new());

        let mut woke_as_first_sighting = false;
        let mut touched_watchdog = false;
        while let Ok(msg) = rx.try_recv() {
            woke_as_first_sighting |= matches!(
                msg,
                CoordRequest::ForceRescanServer {
                    kind: BeaconAnomalyKind::FirstSighting,
                    ..
                }
            );
            touched_watchdog |= matches!(msg, CoordRequest::BeaconArrival { .. });
        }

        assert!(woke_as_first_sighting, "first sighting must wake searches");
        assert!(
            !touched_watchdog,
            "first sighting must not touch the transport watchdog"
        );
    }

    /// Parity with libca `bhe.cpp:179`: when the beacon sequence
    /// jumps forward by 2 or 3, the beacon is treated as a
    /// duplicate-route artifact rather than an anomaly. This matters
    /// with lazy-echo, because an `IdMismatch` classification would
    /// set the transport watchdog flag and suppress healthy-beacon
    /// refreshes for roughly the next 30 s on an IOC that is in fact
    /// perfectly healthy. The drop-only path still has to record
    /// `last_id`, so that the next genuine beacon measures its
    /// advance against the most recent observation.
    #[test]
    fn small_forward_advance_is_dropped_not_classified() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let empty_set = std::collections::HashSet::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;

        // Steady state: ids 100, 101 and 102, leaving last_id = 102.
        for id in 100..=102 {
            beacon.cid = id;
            handle_beacon(beacon, &mut servers, &tx, &empty_set);
        }
        loop {
            if rx.try_recv().is_err() {
                break;
            }
        }

        // 102 -> 104 is an advance of 2; 104 -> 107 is an advance of 3
        // measured from the id the previous drop just recorded. Both
        // must leave the channel empty.
        let dropped_steps = [
            (
                104,
                "advance=2 must be silently dropped, not classified as anomaly",
            ),
            (107, "advance=3 must be silently dropped"),
        ];
        for (id, why) in dropped_steps {
            beacon.cid = id;
            handle_beacon(beacon, &mut servers, &tx, &empty_set);
            assert!(rx.try_recv().is_err(), "{why}");
        }

        // With last_id at 107 (the drop path updates it), id 108 is
        // an ordinary advance of 1 and must be reported as healthy.
        beacon.cid = 108;
        handle_beacon(beacon, &mut servers, &tx, &empty_set);

        let emitted: Vec<CoordRequest> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
        let healthy_seen = emitted
            .iter()
            .any(|m| matches!(m, CoordRequest::BeaconArrival { anomaly: false, .. }));
        let anomaly_seen = emitted.iter().any(|m| {
            matches!(
                m,
                CoordRequest::BeaconArrival { anomaly: true, .. }
                    | CoordRequest::ForceRescanServer { .. }
            )
        });

        assert!(
            healthy_seen,
            "after dropped dups, advance=1 must classify as healthy"
        );
        assert!(
            !anomaly_seen,
            "monotonic recovery from drop sequence must not fire anomaly"
        );
    }

    /// A backwards step that stays inside libca's 256-id window is
    /// dropped as well. The reasoning matches the small-forward
    /// case, except that here the duplicate arrives late through a
    /// slower NIC path.
    #[test]
    fn small_backwards_advance_is_dropped() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;

        // Steady state with ids 100, 101, 102; discard what it emitted.
        (100..103).for_each(|id| {
            beacon.cid = id;
            handle_beacon(beacon, &mut servers, &tx, &std::collections::HashSet::new());
        });
        loop {
            if rx.try_recv().is_err() {
                break;
            }
        }

        // last_id is 102 and a late copy carries id=101. The
        // wrapping_sub yields u32::MAX, i.e. the advance is treated
        // as 0xFFFFFFFF, which is > MAX-256.
        beacon.cid = 101;
        handle_beacon(beacon, &mut servers, &tx, &std::collections::HashSet::new());
        let after_late_copy = rx.try_recv();
        assert!(
            after_late_copy.is_err(),
            "backwards-by-1 (within 256) must drop"
        );
    }

    /// The stale-entry prune keeps servers whose `last_seen` is less
    /// than `BEACON_STALE_THRESHOLD` before `now` and removes the
    /// rest.
    ///
    /// The test does not call `handle_beacon`; it applies the same
    /// `retain` expression to a hand-built table holding one entry
    /// seen 10 s ago and one seen 300 s ago.
    #[test]
    fn stale_prune_drops_idle_entries_only() {
        // Anchor `now` a fixed span ahead of a real `base` so the back-dated
        // `last_seen` values below subtract without underflowing Instant on
        // Windows (QPC-since-boot, where uptime may be shorter than the span).
        let base = Instant::now();
        let now = base + Duration::from_secs(300);
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let fresh: SocketAddr = "127.0.0.1:5064".parse().unwrap();
        let stale: SocketAddr = "127.0.0.1:5065".parse().unwrap();
        servers.insert(
            fresh,
            BeaconState {
                last_id: 0,
                last_seen: now - Duration::from_secs(10),
                period_estimate: Some(Duration::from_secs(15)),
                count: 5,
            },
        );
        // Back-dated by the full 300 s span, i.e. exactly `base`.
        servers.insert(
            stale,
            BeaconState {
                last_id: 0,
                last_seen: now - Duration::from_secs(300),
                period_estimate: Some(Duration::from_secs(15)),
                count: 5,
            },
        );
        // The prune logic in handle_beacon: same retain expression.
        servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
        assert!(
            servers.contains_key(&fresh),
            "fresh entry must survive prune"
        );
        // The message names the idle time this test actually sets up
        // (300 s), not a different figure.
        assert!(
            !servers.contains_key(&stale),
            "300-s-idle entry must be pruned"
        );
    }

    /// Regression test for the archiver-rs reconnect noise. A
    /// long-lived CA client has built up a steady-state EMA (e.g.
    /// 15 s) for a server, then loses and re-establishes its TCP
    /// circuit while that server is in its `online_notify_task`
    /// ramp-up. It must NOT log a run of `PeriodCollapse` warnings
    /// measured against the stale estimate.
    /// `BeaconControl::ResetServer` — issued by the coordinator on
    /// `TransportEvent::ServerConnected`, for parity with libca
    /// `bhe.cpp` "new client connect" — clears the EMA so that the
    /// next beacon reseeds `period_estimate` from the live cadence.
    #[test]
    fn reset_on_connect_breaks_period_collapse_cascade_after_reconnect() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let server = SocketAddr::from(([127, 0, 0, 1], 5064));

        // A long-running archiver just before the server's TCP
        // circuit drops: 1000 beacons seen, last_id=999, 15-s EMA.
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([(
            server,
            BeaconState {
                period_estimate: Some(Duration::from_secs(15)),
                last_seen: Instant::now(),
                last_id: 999,
                count: 1000,
            },
        )]);

        // The coordinator announces the new circuit; the EMA goes away.
        apply_reset_server(&mut servers, server);
        let after_reset = servers.get(&server).expect("entry survives reset");
        assert!(
            after_reset.period_estimate.is_none(),
            "ResetServer must clear period_estimate"
        );
        assert_eq!(after_reset.count, 0, "ResetServer must zero count");
        assert_eq!(
            after_reset.last_id, 999,
            "ResetServer must preserve last_id (dedup still works)"
        );

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;

        // Standard rsrv ramp-up intervals, the same pattern as the
        // `rsrv_rampup_beacons_do_not_fire_period_collapse` test, but
        // landing on an entry that already existed.
        //
        // The server kept its beacon counter across the restart, so
        // ids carry on monotonically from 1000. That is the case
        // PeriodCollapse was supposed to catch; without the reset,
        // every ramp-up beacon past the 50 ms floor would be flagged
        // against the stale 15-s EMA.
        let ramp_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
        for (step, (&gap_ms, id)) in ramp_ms.iter().zip(1000u32..).enumerate() {
            // Back-date the previous sighting so this beacon appears
            // to arrive `gap_ms` after it.
            servers.get_mut(&server).expect("entry").last_seen =
                Instant::now() - Duration::from_millis(gap_ms);
            beacon.cid = id;
            handle_beacon(beacon, &mut servers, &tx, &std::collections::HashSet::new());

            let rescan_kinds = std::iter::from_fn(|| rx.try_recv().ok()).filter_map(|m| match m {
                CoordRequest::ForceRescanServer { kind, .. } => Some(kind),
                _ => None,
            });
            for kind in rescan_kinds {
                assert_ne!(
                    kind,
                    BeaconAnomalyKind::PeriodCollapse,
                    "ramp-up beacon #{} (interval={} ms) after \
                     ResetServer must not classify as PeriodCollapse \
                     — the cascade is the archiver-rs reconnect noise \
                     this fix targets",
                    step + 1,
                    gap_ms
                );
            }
        }
    }

    /// Cascade triggered by a peer client. An existing CA client
    /// with a mature steady-state EMA (~15 s) must NOT emit a run of
    /// `PeriodCollapse` warnings when a DIFFERENT client on the
    /// network connects to the same IOC. The peer's TCP accept fires
    /// the IOC's `beacon_reset` notify (`server/tcp.rs:450`), which
    /// restarts the `beacon_emitter` ramp-up cycle. Our own circuit
    /// never went down, so the coordinator does NOT send
    /// `BeaconControl::ResetServer` on our behalf.
    ///
    /// Before the `handle_beacon` self-reset fix, every ramp-up
    /// beacon past the 50 ms floor satisfied
    /// `actual_interval < 15 s / 3 = 5 s` and caused a WARN log, the
    /// transport-watchdog sticky flag and a search rescan — the
    /// symptom the user reported. With the fix, the monitor
    /// recognises the signature (monotonic id plus an interval
    /// suddenly far below the EMA) as a server-side reset cascade,
    /// clears its own EMA, and stays silent.
    #[test]
    fn peer_connect_ramp_up_does_not_fire_period_collapse() {
        let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
        let server = SocketAddr::from(([127, 0, 0, 1], 5064));
        let empty_set = std::collections::HashSet::new();

        // Long-lived client whose circuit is up (so no ResetServer
        // was ever issued for it): 1000 beacons, EMA 15 s,
        // last_id=999.
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        let mature = BeaconState {
            count: 1000,
            last_id: 999,
            period_estimate: Some(Duration::from_secs(15)),
            last_seen: Instant::now(),
        };
        servers.insert(server, mature);

        let mut beacon = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        beacon.available = u32::from_be_bytes([127, 0, 0, 1]);
        beacon.count = 5064;

        // A peer connects to the IOC: the `beacon_emitter` interval
        // resets to 20 ms and doubles up to 10240 ms before settling
        // at 15 s. The IOC did not restart, so beacon_id keeps
        // counting monotonically.
        let ramp_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
        for (nth, (id, gap_ms)) in (1000u32..).zip(ramp_ms).enumerate() {
            // Back-date the previous sighting by this step's interval.
            let entry = servers.get_mut(&server).expect("entry");
            entry.last_seen = Instant::now() - Duration::from_millis(gap_ms);
            beacon.cid = id;
            handle_beacon(beacon, &mut servers, &tx, &empty_set);

            std::iter::from_fn(|| rx.try_recv().ok())
                .filter_map(|m| match m {
                    CoordRequest::ForceRescanServer { kind, .. } => Some(kind),
                    _ => None,
                })
                .for_each(|kind| {
                    assert_ne!(
                        kind,
                        BeaconAnomalyKind::PeriodCollapse,
                        "peer-connect ramp-up beacon #{} (interval={} ms) \
                         must NOT classify as PeriodCollapse — \
                         the self-reset path in handle_beacon absorbs it",
                        nth + 1,
                        gap_ms
                    );
                });
        }

        // Once the cascade is over the EMA has been reseeded from the
        // ramp-up and last_id reflects the latest beacon. The exact
        // estimate depends on alpha=0.25 over the doubling sequence,
        // so only structural facts are asserted: the id was tracked
        // and an estimate is present.
        let settled = servers.get(&server).expect("entry");
        assert_eq!(settled.last_id, 1009, "last_id must track ramp-up ids");
        assert!(
            settled.period_estimate.is_some(),
            "EMA must be reseeded after the cascade"
        );
    }

    /// Calling `apply_reset_server` for a server we have no entry
    /// for does nothing. The coordinator issues ResetServer on every
    /// fresh circuit, and that includes first-ever connects where no
    /// `BeaconState` may exist yet (e.g. the server had not beaconed
    /// before our search reached it via name resolution).
    #[test]
    fn reset_unknown_server_is_noop() {
        let never_seen = SocketAddr::from(([127, 0, 0, 1], 5064));
        let mut table: HashMap<SocketAddr, BeaconState> = HashMap::new();

        apply_reset_server(&mut table, never_seen);

        // No entry may be created as a side effect of the reset.
        assert!(table.is_empty());
    }

    /// Beacon state is keyed by the address the beacon *announces*.
    /// Modern IOCs announce INADDR_ANY (`available=0`), which gives
    /// a `0.0.0.0:port` key, whereas the TCP circuit's `server_addr`
    /// carries the real IP:port. A lookup by exact key would miss
    /// that entry and turn the reset into a no-op, so the reset has
    /// to mirror the port-fallback policy of
    /// `beacon_arrival_targets` for the EMA to actually be cleared.
    #[test]
    fn reset_matches_inaddr_any_announced_entry() {
        let announced = SocketAddr::from(([0, 0, 0, 0], 5064));
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([(
            announced,
            BeaconState {
                period_estimate: Some(Duration::from_secs(15)),
                last_seen: Instant::now(),
                last_id: 999,
                count: 1000,
            },
        )]);

        // What the coordinator forwards is the circuit's real
        // address, NOT the INADDR_ANY key the beacon state sits under.
        let circuit = SocketAddr::from(([10, 0, 0, 5], 5064));
        apply_reset_server(&mut servers, circuit);

        let entry = servers.get(&announced).expect("entry preserved");
        assert!(
            entry.period_estimate.is_none(),
            "INADDR_ANY-keyed entry must be reset by port-match"
        );
        assert_eq!(entry.count, 0);
        assert_eq!(entry.last_id, 999, "last_id preserved across reset");
    }

    /// Multi-homed IOC. The beacon arrives carrying NIC A's IP while
    /// our search reply came in through NIC B, so the circuit talks
    /// to `B:port` and the beacon state is keyed `A:port`. The same
    /// port-fallback applies here.
    #[test]
    fn reset_matches_multihomed_announced_entry() {
        let beacon_side = SocketAddr::from(([10, 0, 0, 1], 5064));
        let circuit_side = SocketAddr::from(([10, 0, 0, 2], 5064));

        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([(
            beacon_side,
            BeaconState {
                period_estimate: Some(Duration::from_secs(15)),
                last_seen: Instant::now(),
                last_id: 42,
                count: 100,
            },
        )]);

        // Reset by the circuit address; the lone same-port entry
        // under NIC A's address is the one expected to be cleared.
        apply_reset_server(&mut servers, circuit_side);

        let entry = servers.get(&beacon_side).expect("entry preserved");
        assert!(entry.period_estimate.is_none());
        assert_eq!(entry.count, 0);
    }

    /// Regression: an exact match is terminal. One IOC announces a
    /// single IP per process (server/beacon.rs:46-58 computes
    /// `server_ip` once at task start and reuses it for every
    /// beacon), so it yields *one* beacon-state key — the real IP OR
    /// INADDR_ANY, never both. When `10.0.0.5:5064` and
    /// `0.0.0.0:5064` are both in the map they therefore belong to
    /// DIFFERENT IOCs. If the reset went on to clear `0.0.0.0:5064`
    /// after the exact hit, it would silently blind the OTHER IOC's
    /// PeriodCollapse for roughly the next 4 beacons — the same
    /// correctness regression as the rejected port-wide sweep, only
    /// narrower.
    #[test]
    fn reset_exact_does_not_cascade_to_inaddr_any_other_ioc() {
        // Both entries start with a 15-s EMA and differ only in the
        // id and count they carry.
        let steady = |last_id, count| BeaconState {
            last_id,
            last_seen: Instant::now(),
            period_estimate: Some(Duration::from_secs(15)),
            count,
        };
        let target = SocketAddr::from(([10, 0, 0, 5], 5064));
        let inaddr_any = SocketAddr::from(([0, 0, 0, 0], 5064));

        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        servers.insert(target, steady(1, 1000));
        servers.insert(inaddr_any, steady(2, 500));

        apply_reset_server(&mut servers, target);

        // The exact hit is cleared ...
        let hit = servers.get(&target).expect("target preserved");
        assert!(hit.period_estimate.is_none(), "exact-match target reset");
        assert_eq!(hit.count, 0);

        // ... and the INADDR_ANY entry keeps both EMA and count.
        let bystander = servers.get(&inaddr_any).expect("inaddr-any preserved");
        assert_eq!(
            bystander.period_estimate,
            Some(Duration::from_secs(15)),
            "INADDR_ANY entry from a different IOC must not be touched \
             after an exact-match hit"
        );
        assert_eq!(bystander.count, 500);
    }

    /// Regression against cross-IOC blinding. On real CA networks
    /// many unrelated IOCs share the default port 5064. A naive
    /// port-only sweep would wipe `count` and `period_estimate` of
    /// every `*:5064` entry on each reconnect, silently switching
    /// off PeriodCollapse (gated on `count > 3`) for roughly the
    /// next 4 beacons of each unrelated IOC. A neighbouring IOC that
    /// restarts with a preserved beacon counter inside that window
    /// would go unnoticed.
    ///
    /// Under the narrowed policy an exact-match reset touches ONLY
    /// the matched entry and leaves other same-port entries intact.
    #[test]
    fn reset_does_not_blind_other_same_port_ioc() {
        let target = SocketAddr::from(([10, 0, 0, 5], 5064));
        let neighbour = SocketAddr::from(([10, 0, 0, 7], 5064));

        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([
            (
                target,
                BeaconState {
                    period_estimate: Some(Duration::from_secs(15)),
                    last_seen: Instant::now(),
                    last_id: 1000,
                    count: 1000,
                },
            ),
            (
                neighbour,
                BeaconState {
                    period_estimate: Some(Duration::from_secs(15)),
                    last_seen: Instant::now(),
                    last_id: 2000,
                    count: 5000,
                },
            ),
        ]);

        apply_reset_server(&mut servers, target);

        // Only the addressed entry loses its EMA and count.
        let hit = servers.get(&target).expect("target preserved");
        assert!(hit.period_estimate.is_none(), "exact-match target reset");
        assert_eq!(hit.count, 0);

        let other = servers.get(&neighbour).expect("neighbour preserved");
        assert_eq!(
            other.period_estimate,
            Some(Duration::from_secs(15)),
            "unrelated same-port IOC must NOT have its EMA cleared — \
             that would disable PeriodCollapse on its next restart"
        );
        assert_eq!(other.count, 5000, "neighbour count untouched");
    }

    /// Ambiguous fallback. With no exact hit, no INADDR_ANY hit and
    /// more than one `*:port` entry, there is no safe way to pick
    /// one, so the reset is skipped altogether. The post-reconnect
    /// cascade comes back for the multi-homed-IOC + collision case,
    /// but no unrelated IOC gets blinded.
    #[test]
    fn reset_skips_when_ambiguous_no_exact_no_inaddr_any() {
        let steady = |last_id, count| BeaconState {
            last_id,
            last_seen: Instant::now(),
            period_estimate: Some(Duration::from_secs(15)),
            count,
        };
        let first = SocketAddr::from(([10, 0, 0, 7], 5064));
        let second = SocketAddr::from(([10, 0, 0, 9], 5064));

        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
        servers.insert(first, steady(1, 100));
        servers.insert(second, steady(2, 200));

        // The circuit address equals neither key.
        let circuit = SocketAddr::from(([10, 0, 0, 5], 5064));
        apply_reset_server(&mut servers, circuit);

        // Both candidates must still carry their 15-s estimate.
        [first, second].iter().for_each(|key| {
            let entry = servers.get(key).expect("entry preserved");
            assert_eq!(
                entry.period_estimate,
                Some(Duration::from_secs(15)),
                "ambiguous fallback must not blind {key}"
            );
        });
    }

    /// A hit via INADDR_ANY must NOT widen into a port-wide sweep.
    /// Unrelated real-IP `*:port` entries stay as they were even
    /// though the reset resolved successfully through the INADDR_ANY
    /// entry.
    #[test]
    fn reset_via_inaddr_any_does_not_touch_unrelated_real_ip_entries() {
        let inaddr_any = SocketAddr::from(([0, 0, 0, 0], 5064));
        let unrelated = SocketAddr::from(([10, 0, 0, 7], 5064));

        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([
            (
                inaddr_any,
                BeaconState {
                    period_estimate: Some(Duration::from_secs(15)),
                    last_seen: Instant::now(),
                    last_id: 1,
                    count: 100,
                },
            ),
            (
                unrelated,
                BeaconState {
                    period_estimate: Some(Duration::from_secs(15)),
                    last_seen: Instant::now(),
                    last_id: 2,
                    count: 200,
                },
            ),
        ]);

        // The circuit address matches neither key exactly, so the
        // reset is expected to resolve through the INADDR_ANY entry.
        let circuit = SocketAddr::from(([10, 0, 0, 5], 5064));
        apply_reset_server(&mut servers, circuit);

        let cleared = servers.get(&inaddr_any).expect("inaddr-any preserved");
        assert!(cleared.period_estimate.is_none(), "INADDR_ANY entry reset");

        let kept = servers.get(&unrelated).expect("unrelated preserved");
        assert_eq!(
            kept.period_estimate,
            Some(Duration::from_secs(15)),
            "unrelated same-port real-IP entry must not be touched"
        );
        assert_eq!(kept.count, 200);
    }

    /// A different port means a different IOC. The reset must NOT
    /// touch entries on unrelated ports: the *only* axis the
    /// port-fallback is fuzzy on is the IP, never the port.
    #[test]
    fn reset_leaves_other_port_entries_alone() {
        let other_port = SocketAddr::from(([0, 0, 0, 0], 5065));
        let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::from([(
            other_port,
            BeaconState {
                period_estimate: Some(Duration::from_secs(15)),
                last_seen: Instant::now(),
                last_id: 7,
                count: 50,
            },
        )]);

        // Circuit on 5064; the only entry lives on 5065.
        let circuit = SocketAddr::from(([10, 0, 0, 5], 5064));
        apply_reset_server(&mut servers, circuit);

        let entry = servers.get(&other_port).expect("entry preserved");
        assert_eq!(
            entry.period_estimate,
            Some(Duration::from_secs(15)),
            "different-port entry must not be touched"
        );
        assert_eq!(entry.count, 50);
    }

    /// Regression test for the standard production topology, where
    /// the client receives beacons through the CA repeater on
    /// LOCALHOST (see the bind in `run_beacon_monitor_inner`, line
    /// 160). On the regular `CA_PROTO_RSRV_IS_UP` beacon the
    /// repeater rewrites `m_available` to the original sender's
    /// source IP (`repeater.cpp: 626-630`, our
    /// `repeater.rs:224-228`). The 0xCAFE companion is forwarded
    /// verbatim, and the kernel rewrites its L3 source IP to the
    /// repeater's loopback. The verified-tuple lookup key (post-fix:
    /// `(hdr.available, hdr.count, hdr.cid)`) therefore equals the
    /// key inserted on the companion side
    /// (`(signed_ip, signed_port, signed_beacon_id)`) without the L3
    /// source IP having to equal the announced server IP.
    ///
    /// An earlier version keyed the lookup on `meta.src.ip()`, which
    /// is `127.0.0.1` under this topology, so every legitimate
    /// signed beacon was dropped
    /// (`EPICS_CA_BEACON_REQUIRE_SIGNED=YES`, default). This test
    /// pins that failure mode so it cannot return.
    #[cfg(feature = "cap-tokens")]
    #[test]
    fn verified_tuple_key_matches_via_repeater() {
        use crate::server::signed_beacon::{SignedBeaconEmitter, SignedBeaconVerifier};
        use ed25519_dalek::SigningKey;
        use rand_core::OsRng;
        use std::net::Ipv4Addr;
        use std::time::SystemTime;

        // Server side: build the signed-beacon companion exactly as
        // the server would emit it.
        let signing_key = SigningKey::generate(&mut OsRng);
        let bound = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async { tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap() });
        let socket = std::sync::Arc::new(bound);

        let server_ip_u32 = u32::from_be_bytes([10, 0, 0, 5]);
        let server_port: u16 = 5064;
        let beacon_id: u32 = 42;
        let ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let packet = SignedBeaconEmitter::new(signing_key.clone(), socket, vec![]).build_packet(
            server_ip_u32,
            server_port,
            beacon_id,
            ts,
        );

        // Companion side: verify the packet with the matching key.
        let mut verifier = SignedBeaconVerifier::new();
        verifier.trust(signing_key.verifying_key());
        let (verified_ip, verified_port, verified_bid) =
            verifier.verify(&packet).expect("signature verifies");

        // G3 source-IP binding: under the repeater topology meta.src
        // is 127.0.0.1, so the binding is relaxed on purpose. The
        // companion-frame insert goes ahead because
        // `via_repeater = src_ip.is_loopback()` short-circuits the
        // strict equality check at line 319.
        let meta_src_via_repeater = Ipv4Addr::LOCALHOST;
        assert!(
            meta_src_via_repeater.is_loopback(),
            "topology precondition: client beacon socket binds to LOCALHOST"
        );

        // Insert the way the companion path does: under the verifier
        // policy the key comes from the SIGNED payload's announced ip.
        let mut verified_tuples: HashMap<(u32, u16, u32), Instant> = HashMap::new();
        verified_tuples.insert((verified_ip, verified_port, verified_bid), Instant::now());

        // Regular-beacon header as it looks after the repeater
        // rewrote `available` to the original server's source IP.
        let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        hdr.data_type = 0;
        hdr.count = server_port;
        hdr.cid = beacon_id;
        hdr.available = server_ip_u32;

        // Post-fix lookup: a non-zero `hdr.available` is the key IP
        // (equal to `verified_ip` here); only a zero value falls back
        // to the packet's source address.
        let lookup_ip_u32 = match hdr.available {
            0 => u32::from_be_bytes(meta_src_via_repeater.octets()),
            announced => announced,
        };
        assert!(
            verified_tuples.contains_key(&(lookup_ip_u32, hdr.count, hdr.cid)),
            "regression: regular beacon with hdr.available rewritten by \
             the repeater must hit the companion-inserted tuple"
        );

        // Sanity check: the earlier key shape
        // (meta.src.ip(), count, cid) misses under this topology.
        let legacy_key = (
            u32::from_be_bytes(meta_src_via_repeater.octets()),
            hdr.count,
            hdr.cid,
        );
        assert!(
            !verified_tuples.contains_key(&legacy_key),
            "documents the earlier failure mode: meta.src=127.0.0.1 key never matches"
        );
    }

    /// Direct-LAN fallback. When no repeater rewrites
    /// `hdr.available`, the lookup uses `meta.src.ip()` instead, so
    /// the key still lines up with the companion-side insert. This
    /// is the original failure scenario, kept for the case where a
    /// future caller binds the monitor socket to a non-loopback NIC.
    #[cfg(feature = "cap-tokens")]
    #[test]
    fn verified_tuple_key_falls_back_to_src_for_direct_lan() {
        use crate::server::signed_beacon::{SignedBeaconEmitter, SignedBeaconVerifier};
        use ed25519_dalek::SigningKey;
        use rand_core::OsRng;
        use std::net::Ipv4Addr;
        use std::time::SystemTime;

        // Announced identity of the simulated server.
        let server_ip = Ipv4Addr::new(10, 0, 0, 5);
        let server_ip_u32 = u32::from(server_ip);
        let server_port: u16 = 5064;
        let beacon_id: u32 = 99;

        // Server side: sign a companion packet for that identity.
        let signing_key = SigningKey::generate(&mut OsRng);
        let bound = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async { tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap() });
        let ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let emitter = SignedBeaconEmitter::new(signing_key.clone(), std::sync::Arc::new(bound), vec![]);
        let packet = emitter.build_packet(server_ip_u32, server_port, beacon_id, ts);

        // Companion side: verify, then record the verified tuple.
        let mut verifier = SignedBeaconVerifier::new();
        verifier.trust(signing_key.verifying_key());
        let verified_key = verifier.verify(&packet).expect("signature verifies");

        let mut verified_tuples: HashMap<(u32, u16, u32), Instant> = HashMap::new();
        verified_tuples.insert(verified_key, Instant::now());

        // Direct LAN: the server emits hdr.available=0 and nothing
        // rewrites it; meta.src.ip() is the real server IP.
        let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        hdr.data_type = 0;
        hdr.count = server_port;
        hdr.cid = beacon_id;
        hdr.available = 0;
        let meta_src = server_ip;

        // Zero `available` selects the source-address branch.
        let lookup_ip_u32 = match hdr.available {
            0 => u32::from(meta_src),
            announced => announced,
        };
        assert!(
            verified_tuples.contains_key(&(lookup_ip_u32, hdr.count, hdr.cid)),
            "direct-LAN fallback: meta.src.ip() lookup must hit when \
             hdr.available is zero"
        );
    }
}