1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
//! `CortexAdapter<State>` — one RedEX file, one fold, one materialized
//! state.
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use futures::{Stream, StreamExt};
use parking_lot::RwLock;
use tokio::sync::{broadcast, Notify, Semaphore};
use tokio_stream::wrappers::BroadcastStream;
use serde::de::DeserializeOwned;
use serde::Serialize;
use super::super::channel::ChannelName;
use super::super::redex::{
Redex, RedexError, RedexEvent, RedexFile, RedexFileConfig, RedexFold, WriteToken,
};
use super::config::{CortexAdapterConfig, FoldErrorPolicy, StartPosition};
use super::envelope::IntoRedexPayload;
use super::error::CortexAdapterError;
use super::meta::EVENT_META_SIZE;
/// Process-wide cap on in-flight RYW waits across every
/// `CortexAdapter` in the process. `CortexAdapterConfig::ryw_inflight_cap`
/// bounds each adapter individually; this semaphore bounds the
/// total. Operators running thousands of adapters need the
/// process-wide bound so the cumulative permit count doesn't dwarf
/// the memory budget.
///
/// Default: no global cap. While the cell is empty,
/// `wait_for_token` skips the global tier entirely and only the
/// per-adapter cap (if any) applies. Operators install a cap at
/// startup via [`set_global_ryw_inflight_cap`]. The install is
/// one-shot (`OnceLock`), so subsequent calls return `false` and
/// leave the previous cap in place.
static GLOBAL_RYW_CAP: OnceLock<Arc<Semaphore>> = OnceLock::new();
/// Install a process-wide cap on concurrent `wait_for_token`
/// permits across every `CortexAdapter` instance. Call once at
/// process startup before any RYW traffic.
///
/// `cap` is clamped into `1..=Semaphore::MAX_PERMITS`:
/// - A zero cap would make every `try_acquire` fail, rejecting all
///   waits with `QueueFull`.
/// - A cap above tokio's `Semaphore::MAX_PERMITS` would make
///   `Semaphore::new` panic.
///
/// Passing `usize::MAX` is therefore a safe way to ask for an
/// "effectively unbounded" global cap.
///
/// Returns `true` if the cap was installed by this call, `false`
/// if a prior call already installed one (the earlier cap stays in
/// effect and this call's value is discarded).
pub fn set_global_ryw_inflight_cap(cap: usize) -> bool {
    // Clamp both ends: `Semaphore::new` panics above MAX_PERMITS.
    let permits = cap.clamp(1, Semaphore::MAX_PERMITS);
    GLOBAL_RYW_CAP
        .set(Arc::new(Semaphore::new(permits)))
        .is_ok()
}
/// Hand out a shared handle to the process-wide RYW semaphore.
///
/// Yields `None` while [`set_global_ryw_inflight_cap`] has never
/// installed a cap (the default state).
fn current_global_ryw_semaphore() -> Option<Arc<Semaphore>> {
    GLOBAL_RYW_CAP.get().map(Arc::clone)
}
/// One-file CortEX adapter: projects envelopes into RedEX payloads,
/// tails the same file, drives a [`RedexFold`] implementation, and
/// exposes the materialized state as a read handle.
///
/// The handle is a thin wrapper around a shared `Arc` of the adapter
/// internals. The fold task and every reader of [`Self::state`]
/// operate on that shared state.
///
/// Created via [`Self::open`].
pub struct CortexAdapter<State> {
    /// Shared internals: file handle, materialized state,
    /// watermarks, notifiers, and RYW bookkeeping.
    inner: Arc<AdapterInner<State>>,
}
/// Capacity of the post-fold change-notification broadcast channel.
/// A slow subscriber that falls more than this many events behind
/// gets a `Lagged` signal and should re-read state fresh.
///
/// The public docs on `CortexAdapter::changes` and
/// `CortexAdapter::changes_with_lag` quote this value (64) literally.
/// Update them together if it changes.
const CHANGES_BROADCAST_CAP: usize = 64;
/// Item type yielded by [`CortexAdapter::changes_with_lag`].
///
/// The plain [`CortexAdapter::changes`] stream discards every
/// broadcast receive error via `r.ok()` inside its `filter_map`.
/// That silently drops `Lagged(n)` notifications, so downstream
/// telemetry consumers have no way to surface "you missed N
/// changes." This enum exposes both shapes.
///
/// Subscribers that need only the latest sequence can stay on
/// [`CortexAdapter::changes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeEvent {
    /// A successful fold apply produced this RedEX sequence.
    Seq(u64),
    /// The subscriber fell `n` events behind the broadcast channel
    /// and `n` change notifications were dropped. By the time the
    /// subscriber sees this, `state()` already reflects past those
    /// events — the lag value is purely observability.
    Lagged(u64),
}
/// Shared internals behind every [`CortexAdapter`] handle.
struct AdapterInner<State> {
    /// RedEX file that `ingest` / `ingest_with_token` append to and
    /// that the fold task tails.
    file: RedexFile,
    /// Materialized state. Readers obtain it via
    /// `CortexAdapter::state`; the fold task writes through the same
    /// `RwLock`.
    state: Arc<RwLock<State>>,
    /// Highest RedEX seq the fold task has *processed*: applied
    /// successfully OR skipped via `recoverable_decode` under
    /// `Stop` policy. The skip-and-continue path advances this
    /// watermark so live consumers waiting via `wait_for_seq`
    /// don't deadlock on a single permanently-bad event (the
    /// DoS-resistance contract documented on
    /// `RedexError::is_recoverable_decode`).
    ///
    /// **Use `applied_through_seq` instead** if you need
    /// "state actually reflects this seq" semantics. That
    /// watermark only advances on `Ok(())` folds, and it is what
    /// `snapshot` persists, so restore tails from a position
    /// consistent with the in-memory state.
    ///
    /// `u64::MAX` is the "nothing folded yet" sentinel. This is safe
    /// because the `open_from_snapshot` overflow guard rejects
    /// `last_seq == u64::MAX`, so no real event can ever occupy
    /// that slot.
    folded_through_seq: AtomicU64,
    /// Highest RedEX seq K such that **every** seq in
    /// `start_seq..=K` was applied to state via a successful
    /// `RedexFold::apply`.
    ///
    /// This is a *strict-prefix* watermark. Any skip
    /// (`recoverable_decode` under `Stop`, or any error under
    /// `LogAndContinue`) breaks the prefix at the skipped seq, and
    /// `applied_through_seq` cannot advance past it even when later
    /// seqs apply successfully. It is distinct from
    /// `folded_through_seq`, which is the highest *processed* seq and
    /// advances over skips so live consumers don't deadlock.
    ///
    /// Snapshot persists this value, so restore tails from
    /// `applied_through_seq + 1` with the guarantee that **every
    /// prior seq is reflected in state**.
    ///
    /// Without strict-prefix semantics, a skip at seq M followed by
    /// successful applies at M+1 and M+2 would advance `applied` to
    /// M+2. Snapshot would persist M+2, restore would tail from M+3,
    /// and seq M would never be re-attempted. Its mutations would be
    /// permanently lost from durable state even though the log still
    /// carries the event. That is the "violates log is the source of
    /// truth" failure audited as #6+#7.
    ///
    /// Uses the same `u64::MAX` "nothing applied yet" sentinel as
    /// `folded_through_seq`. `start_seq == 0` is the only
    /// configuration that initializes to the sentinel. An
    /// `open_from_snapshot` with `last_seq = Some(K)` initializes to
    /// `K`, because the snapshot's contract is that `start_seq..=K`
    /// is already in the rehydrated state.
    applied_through_seq: AtomicU64,
    /// First RedEX seq this adapter began folding from. Any seq
    /// strictly below this is conceptually behind us, so
    /// `wait_for_seq` short-circuits without blocking on the
    /// watermark, even when `start_seq == 0` puts the watermark at
    /// the `u64::MAX` sentinel. It is stored on inner so the wait
    /// predicate doesn't need to reach back into the open-time
    /// `start_seq` local.
    start_seq: u64,
    /// Cumulative fold-error count, read by
    /// `CortexAdapter::fold_errors`.
    fold_errors: AtomicU64,
    /// `true` while the fold task is live. `wait_for_seq` and
    /// `wait_for_applied_seq` return `Err` once this is `false` and
    /// their target seq has not been reached.
    running: AtomicBool,
    /// Set once by `CortexAdapter::close`. After that, `ingest` and
    /// `ingest_with_token` return `CortexAdapterError::Closed`.
    closed: AtomicBool,
    /// Wake-up channel for watermark waiters. Waiters register via
    /// `notified()` + `enable()` before re-checking a watermark.
    /// NOTE(review): presumably the fold task notifies all waiters
    /// after each watermark advance and when it stops — confirm in
    /// the fold loop.
    notify: Notify,
    /// Shutdown signal for the fold task. `close` uses `notify_one`
    /// so the signal is retained as a permit if the task has not
    /// yet polled `notified()`.
    shutdown: Notify,
    /// Broadcast of RedEX seqs after each successful (or LogAndContinue-skipped)
    /// fold apply. Subscribers: see [`CortexAdapter::changes`].
    changes_tx: broadcast::Sender<u64>,
    /// Per-adapter cap on concurrent `wait_for_token` callers.
    /// `None` when `CortexAdapterConfig::ryw_inflight_cap == 0`
    /// (unbounded). Otherwise it is sized to `ryw_inflight_cap`
    /// permits, and each pending wait holds one permit for its
    /// duration. Exceeding the cap returns
    /// `WaitForTokenError::QueueFull` immediately.
    ryw_inflight: Option<Arc<Semaphore>>,
    /// Per-adapter RYW counters. See [`RywMetricsSnapshot`].
    ryw_metrics: RywMetricsAtomic,
}
/// Atomic counters backing [`CortexAdapter::ryw_metrics`]. One
/// instance lives on each adapter. Each adapter is already
/// per-channel, so channel-labeling falls out of the adapter
/// identity.
///
/// Every field is read and written with `Ordering::Relaxed`. They
/// are independent monotonic counters and do not order any other
/// memory.
#[derive(Debug, Default)]
struct RywMetricsAtomic {
    /// Waits admitted past both `QueueFull` checks.
    waits_total: AtomicU64,
    /// Admitted waits whose deadline elapsed.
    timeouts_total: AtomicU64,
    /// Waits rejected because a global or per-adapter permit was
    /// unavailable.
    queue_full_total: AtomicU64,
    /// Bumped only by origin-bound wrappers via `note_wrong_origin`.
    wrong_origin_total: AtomicU64,
    /// Nanoseconds spent in admitted waits. Each sample is clamped
    /// to `u64::MAX` before being added; the running sum wraps on
    /// overflow (`fetch_add` semantics).
    wait_duration_nanos_sum: AtomicU64,
}
/// Snapshot of the RYW counters on a [`CortexAdapter`].
///
/// All counters are monotonic. Computing a rate is the caller's job:
/// divide deltas between two snapshots by the sampling interval.
/// `wait_duration_nanos_sum / waits_total` approximates the mean
/// wait time without a full histogram.
///
/// Fields are loaded independently, so a snapshot taken while waits
/// are in flight may be momentarily inconsistent across fields.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RywMetricsSnapshot {
    /// Cumulative `wait_for_token` calls that were admitted, i.e.
    /// were not rejected up-front with `QueueFull` / `WrongOrigin`.
    /// Admitted calls include those that later timed out or saw the
    /// fold task stop.
    pub waits_total: u64,
    /// Cumulative waits that returned `Timeout`.
    pub timeouts_total: u64,
    /// Cumulative waits rejected with `QueueFull`.
    pub queue_full_total: u64,
    /// Cumulative waits rejected with `WrongOrigin` at the bound-
    /// adapter layer. The generic `CortexAdapter::wait_for_token`
    /// never increments this. Only `TasksAdapter::wait_for_token`
    /// and `MemoriesAdapter::wait_for_token` do, when their guard
    /// fires.
    pub wrong_origin_total: u64,
    /// Sum of nanoseconds spent inside `wait_for_token` past the
    /// permit acquisition. Divide by `waits_total` for the mean.
    pub wait_duration_nanos_sum: u64,
}
impl<State> CortexAdapter<State> {
/// Read-only access to the materialized state. The returned `Arc`
/// is cheap to clone; all readers and the fold task share the
/// same `RwLock`.
pub fn state(&self) -> Arc<RwLock<State>> {
self.inner.state.clone()
}
/// Highest RedEX sequence the fold task has *processed* —
/// applied successfully OR skipped via `recoverable_decode`
/// under `Stop` policy.
///
/// Use [`Self::applied_through_seq`] if you need "state
/// actually reflects this seq" semantics; the difference
/// matters under `Stop`+recoverable-decode where the
/// skip-and-continue path advances *this* watermark (so
/// `wait_for_seq` doesn't deadlock on a bad event) but
/// leaves `applied_through_seq` behind.
///
/// `None` if no event has been folded yet since open.
pub fn folded_through_seq(&self) -> Option<u64> {
let v = self.inner.folded_through_seq.load(Ordering::Acquire);
if v == u64::MAX {
None
} else {
Some(v)
}
}
/// Highest RedEX sequence K such that *every* seq in
/// `start_seq..=K` was successfully applied to state via
/// `Ok(()) RedexFold::apply`. A *strict-prefix* watermark:
/// any skip (`recoverable_decode` under `Stop`, or any error
/// under `LogAndContinue`) at seq M permanently caps this
/// watermark at M-1, even if subsequent seqs apply
/// successfully.
///
/// `snapshot` persists this value, so a restore tails from
/// `applied_through_seq + 1` and re-attempts every seq from
/// the first skip onwards — preserving "log is the source
/// of truth" even across the skip-and-continue path.
///
/// Distinct from [`Self::folded_through_seq`], which is the
/// highest *processed* seq and advances over skips so live
/// consumers don't deadlock.
///
/// `None` if no event has been applied yet since open
/// (start_seq=0 with no successful applies, or start_seq=0
/// with the very first event having been skipped).
pub fn applied_through_seq(&self) -> Option<u64> {
let v = self.inner.applied_through_seq.load(Ordering::Acquire);
if v == u64::MAX {
None
} else {
Some(v)
}
}
/// Cumulative count of fold errors (only ever increases under
/// [`FoldErrorPolicy::LogAndContinue`]; under `Stop` it is 0 or
/// 1, with the task exiting after the first error).
pub fn fold_errors(&self) -> u64 {
self.inner.fold_errors.load(Ordering::Acquire)
}
/// True if the fold task is currently running (has not observed
/// shutdown, an error under `Stop`, or a tail-end signal).
pub fn is_running(&self) -> bool {
self.inner.running.load(Ordering::Acquire)
}
/// Block until the fold task has *processed* every event up
/// through `seq` (applied successfully OR skipped via
/// `recoverable_decode` under `Stop` policy), or until the
/// fold task stops (e.g. close, non-recoverable fold error
/// under `Stop`).
///
/// Returns `Ok(())` when the folded watermark reaches `seq`,
/// or `Err(folded)` where `folded` is the highest seq processed
/// before the fold task stopped (`None` if it stopped without
/// processing anything). Pre-fix this returned `()` for both
/// outcomes, so a caller waiting on a seq the fold task never
/// reached (close, Stop-policy halt, retention-evicted tail
/// lag) silently observed stale state. Mirrors
/// [`Self::wait_for_applied_seq`]'s shape.
///
/// Use pattern:
/// ```ignore
/// let seq = adapter.ingest(envelope)?;
/// adapter.wait_for_seq(seq).await?;
/// let state = adapter.state().read();
/// // state reflects the ingest, UNLESS the event at `seq`
/// // was skipped via recoverable_decode under `Stop`.
/// ```
///
/// **Subtle point.** This method waits on
/// [`Self::folded_through_seq`], which advances over
/// events the fold task processed but skipped via
/// `RedexError::is_recoverable_decode`. The skip-and-
/// continue path is the documented DoS-resistance contract
/// (a single corrupt event must not wedge the task forever).
/// If you need to confirm `state` actually reflects the
/// ingest at `seq`, follow up with
/// `adapter.applied_through_seq() >= Some(seq)`.
pub async fn wait_for_seq(&self, seq: u64) -> Result<(), Option<u64>> {
// Any seq strictly below `start_seq` is conceptually behind
// us — those events were applied before we opened the
// adapter (or are explicitly past the LiveOnly cutoff).
// Short-circuit returning immediately so a caller that
// passes a stale seq cannot hang. This also covers the
// `start_seq == 0 && seq == 0` blocked-forever-on-empty-
// file case for adapters opened with `FromBeginning` on a
// freshly-created log: `seq < start_seq` is false, but the
// sentinel check below correctly waits for the first event.
if seq < self.inner.start_seq {
return Ok(());
}
loop {
let notified = self.inner.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
let watermark = self.inner.folded_through_seq.load(Ordering::Acquire);
// `u64::MAX` is the "nothing folded yet" sentinel — any
// other value is a real applied seq, so `watermark >= seq`
// after the sentinel check returns exactly when seq has
// been applied.
if watermark != u64::MAX && watermark >= seq {
return Ok(());
}
if !self.inner.running.load(Ordering::Acquire) {
return Err(self.folded_through_seq());
}
notified.await;
}
}
/// RYW-strength wait. Resolves when the **applied** watermark
/// (events that actually ran through the fold body, not
/// recoverable-skipped) catches up to `seq`, OR when the fold
/// task stops before reaching `seq`.
///
/// Returns `Ok(())` on a real apply-through, `Err(applied)`
/// where `applied` is the last successfully-applied seq on
/// stop. Differs from [`Self::wait_for_seq`] which resolves
/// on the folded watermark (including skipped events). RYW
/// requires applied, not folded — otherwise a producer whose
/// write hit a recoverable-decode skip would observe
/// `Ok(())` and then read state that doesn't reflect the
/// write.
pub async fn wait_for_applied_seq(&self, seq: u64) -> Result<(), Option<u64>> {
if seq < self.inner.start_seq {
return Ok(());
}
loop {
let notified = self.inner.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
let watermark = self.inner.applied_through_seq.load(Ordering::Acquire);
if watermark != u64::MAX && watermark >= seq {
return Ok(());
}
if !self.inner.running.load(Ordering::Acquire) {
// Stopped without ever reaching seq. Surface the
// last-applied watermark so the caller can build
// a typed error.
let applied = self.applied_through_seq();
return Err(applied);
}
notified.await;
}
}
/// Close the adapter. Stops the fold task (after it finishes any
/// in-progress apply), leaves the RedEX file open so other
/// adapters / callers can continue using it, and leaves the
/// state handle readable. Idempotent.
pub fn close(&self) -> Result<(), CortexAdapterError> {
if self.inner.closed.swap(true, Ordering::AcqRel) {
return Ok(());
}
// `notify_one()` stores a permit if the fold task hasn't yet
// reached its `shutdown.notified()` poll, so a close that
// races the spawn → first-select window is still observed.
// `notify_waiters()` would drop the signal in that window.
self.inner.shutdown.notify_one();
Ok(())
}
/// Stream of RedEX sequences, one per successful (or
/// `LogAndContinue`-skipped) fold application. Used by reactive
/// queries: on each emission, the caller re-reads
/// [`Self::state`] to compute its current view.
///
/// Lag semantics: if a subscriber falls more than 64 events
/// behind (the internal broadcast channel capacity), the channel
/// drops intermediate events. This implementation filters lag
/// errors out silently — by the time the subscriber catches up,
/// `state()` reflects the latest applied events regardless of
/// how many signals were missed. Subscribers that need to
/// observe lag (e.g. for telemetry or reactive-backpressure)
/// should use [`Self::changes_with_lag`] instead.
///
/// The stream ends when all adapter handles have been dropped
/// and the fold task has exited.
pub fn changes(&self) -> impl Stream<Item = u64> + Send + 'static {
BroadcastStream::new(self.inner.changes_tx.subscribe())
.filter_map(|r| async move { r.ok() })
}
/// Stream of changes that surfaces broadcast-channel lag as a
/// `Lagged(n)` event interleaved with the sequence emissions.
///
/// The yielded items are [`ChangeEvent`]s — `Seq(u64)` for a
/// successful fold-apply notification, and `Lagged(n)` when the
/// subscriber fell `n` events behind the broadcast channel
/// (capacity 64). Pre-fix [`Self::changes`] silently dropped
/// `Lagged` errors via `filter_map(|r| r.ok())`; downstream
/// telemetry consumers had no way to surface "you missed N
/// changes." This method is the lossless counterpart — by the
/// time a subscriber sees `Lagged(n)`, `state()` already
/// reflects past those n events, so the subscriber can react
/// (re-read state, log lag, apply backpressure) without
/// missing data.
pub fn changes_with_lag(&self) -> impl Stream<Item = ChangeEvent> + Send + 'static {
use futures::StreamExt;
BroadcastStream::new(self.inner.changes_tx.subscribe()).map(|r| match r {
Ok(seq) => ChangeEvent::Seq(seq),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
ChangeEvent::Lagged(n)
}
})
}
/// Append an envelope. Projects to `(EventMeta, tail)`, builds the
/// concatenated payload, calls [`RedexFile::append`], and returns
/// the assigned RedEX sequence.
pub fn ingest<E: IntoRedexPayload>(&self, envelope: E) -> Result<u64, CortexAdapterError> {
if self.inner.closed.load(Ordering::Acquire) {
return Err(CortexAdapterError::Closed);
}
let (meta, tail) = envelope.into_redex_payload();
let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
buf.extend_from_slice(&meta.to_bytes());
buf.extend_from_slice(&tail);
Ok(self.inner.file.append(&buf)?)
}
/// Append an envelope and return a [`WriteToken`] addressing
/// the resulting write. The token is the typed handle the
/// read-your-writes API consumes via
/// [`Self::wait_for_token`]; equivalent to calling [`Self::ingest`]
/// and pairing the returned seq with the envelope's
/// `meta.origin_hash`, but does both in one shot so the binding
/// surface can round-trip a single value.
pub fn ingest_with_token<E: IntoRedexPayload>(
&self,
envelope: E,
) -> Result<WriteToken, CortexAdapterError> {
if self.inner.closed.load(Ordering::Acquire) {
return Err(CortexAdapterError::Closed);
}
let (meta, tail) = envelope.into_redex_payload();
let origin_hash = meta.origin_hash;
let mut buf = Vec::with_capacity(EVENT_META_SIZE + tail.len());
buf.extend_from_slice(&meta.to_bytes());
buf.extend_from_slice(&tail);
let seq = self.inner.file.append(&buf)?;
Ok(WriteToken::new(origin_hash, seq))
}
/// Block until the fold task has processed every event up
/// through `token.seq`, or `deadline` elapses. Returns
/// `Err(WaitForTokenError::Timeout)` on deadline; `Ok(())`
/// once the watermark catches up (or the fold task stops —
/// see [`Self::wait_for_seq`] for the same caveat).
///
/// The token's `origin_hash` is informational at this layer
/// — the generic [`CortexAdapter`] folds every event in its
/// RedEX file regardless of origin. Origin-bound adapters
/// (e.g. [`super::tasks::TasksAdapter`],
/// [`super::memories::MemoriesAdapter`]) layer their own
/// origin assertion on top.
pub async fn wait_for_token(
    &self,
    token: WriteToken,
    deadline: Duration,
) -> Result<(), WaitForTokenError> {
    let metrics = &self.inner.ryw_metrics;
    // Shared rejection path for both admission tiers: record the
    // saturation, then report it to the caller as backpressure.
    let queue_full = || {
        metrics.queue_full_total.fetch_add(1, Ordering::Relaxed);
        WaitForTokenError::QueueFull
    };

    // Admission happens before the deadline timer is armed, so a
    // saturated queue is reported as `QueueFull` instead of being
    // masked by a `Timeout`. The process-wide tier is tried first,
    // then this adapter's own cap; both must grant a permit. Both
    // bindings live until the function returns, so the permits are
    // held for the entire wait (saturation tests depend on this).
    let _global_permit = current_global_ryw_semaphore()
        .map(|sem| sem.try_acquire_owned())
        .transpose()
        .map_err(|_| queue_full())?;
    let _permit = self
        .inner
        .ryw_inflight
        .as_ref()
        .map(|sem| sem.clone().try_acquire_owned())
        .transpose()
        .map_err(|_| queue_full())?;

    metrics.waits_total.fetch_add(1, Ordering::Relaxed);
    let started = tokio::time::Instant::now();

    // Wait on the *applied* (strict-prefix) watermark rather than
    // the folded one: an event skipped via recoverable_decode under
    // FoldErrorPolicy::Stop advances folded but not applied, and a
    // producer whose write was skipped must not be told Ok(()) —
    // the state would not reflect their write.
    let waited = tokio::time::timeout(deadline, self.wait_for_applied_seq(token.seq)).await;
    let outcome = match waited {
        Err(_elapsed) => {
            metrics.timeouts_total.fetch_add(1, Ordering::Relaxed);
            Err(WaitForTokenError::Timeout)
        }
        Ok(applied) => applied.map_err(|applied_through_seq| {
            WaitForTokenError::FoldStopped {
                applied_through_seq,
            }
        }),
    };

    let elapsed_nanos = u64::try_from(started.elapsed().as_nanos()).unwrap_or(u64::MAX);
    metrics
        .wait_duration_nanos_sum
        .fetch_add(elapsed_nanos, Ordering::Relaxed);
    outcome
}
/// Snapshot the RYW counters for this adapter.
///
/// Cheap: performs five independent `Relaxed` loads (waits,
/// timeouts, queue-full rejections, wrong-origin rejections, and
/// the summed wait duration). Each counter is read separately, so
/// the snapshot is not an atomic cut across fields. A wait that is
/// in flight may already be counted in `waits_total` but not yet in
/// `timeouts_total` or `wait_duration_nanos_sum`.
pub fn ryw_metrics(&self) -> RywMetricsSnapshot {
    let m = &self.inner.ryw_metrics;
    RywMetricsSnapshot {
        waits_total: m.waits_total.load(Ordering::Relaxed),
        timeouts_total: m.timeouts_total.load(Ordering::Relaxed),
        queue_full_total: m.queue_full_total.load(Ordering::Relaxed),
        wrong_origin_total: m.wrong_origin_total.load(Ordering::Relaxed),
        wait_duration_nanos_sum: m.wait_duration_nanos_sum.load(Ordering::Relaxed),
    }
}
/// Record one origin-mismatch rejection in the RYW metrics
/// (`wrong_origin_total`, incremented with `Relaxed` ordering).
///
/// Intended for the origin-bound adapter wrappers, which call it
/// when their origin guard rejects a token. It is not part of the
/// generic adapter's own happy path.
pub(super) fn note_wrong_origin(&self) {
    let counter = &self.inner.ryw_metrics.wrong_origin_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
}
/// Errors surfaced by [`CortexAdapter::wait_for_token`] and the
/// origin-bound adapters that wrap it.
///
/// Every variant carries only `Copy` data, so the error is cheap to
/// copy, compare, and hash (e.g. for aggregating failures in a set).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WaitForTokenError {
    /// Deadline elapsed before the *applied* (strict-prefix) fold
    /// watermark advanced to the token's seq. The write may still
    /// land later; the caller can retry with a fresh deadline or
    /// accept the stale read.
    Timeout,
    /// Token belongs to a different origin than this adapter
    /// folds. Origin-bound adapters surface this to catch the
    /// caller-side aliasing where a token from chain A is waited
    /// on against an adapter bound to chain B — the wait would
    /// otherwise hang on a seq that can never land here.
    WrongOrigin {
        /// origin the token was issued for.
        token_origin: u64,
        /// origin this adapter is bound to.
        adapter_origin: u64,
    },
    /// An in-flight cap is saturated: either the process-wide
    /// global RYW cap or this adapter's per-channel cap. It is
    /// returned immediately, before any deadline is armed, as
    /// back-pressure for callers who can shed load instead of
    /// stacking unbounded pending waits. See
    /// [`super::config::CortexAdapterConfig::with_ryw_inflight_cap`].
    QueueFull,
    /// Fold task stopped before the token's seq was reached.
    /// Under `FoldErrorPolicy::Stop` an unrecoverable fold error
    /// halts the task, and any pending RYW wait observing
    /// `running == false` surfaces this variant rather than a
    /// silent `Ok(())` — otherwise a producer cannot distinguish
    /// "your write is visible" from "the adapter is dead and
    /// never will reach your write."
    FoldStopped {
        /// Last seq the fold task fully applied before stopping.
        /// `None` if the task stopped before applying any event.
        applied_through_seq: Option<u64>,
    },
}
/// Human-readable rendering of each failure mode; origins are
/// printed as zero-padded 16-digit hex.
impl std::fmt::Display for WaitForTokenError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Timeout => f.write_str("read-your-writes wait timed out"),
            Self::QueueFull => f.write_str("read-your-writes wait-queue saturated; retry later"),
            Self::WrongOrigin {
                token_origin,
                adapter_origin,
            } => write!(
                f,
                "token origin {:016x} != adapter origin {:016x}",
                token_origin, adapter_origin
            ),
            Self::FoldStopped {
                applied_through_seq: Some(seq),
            } => write!(
                f,
                "fold task stopped before reaching token seq (applied through {})",
                seq
            ),
            Self::FoldStopped {
                applied_through_seq: None,
            } => f.write_str("fold task stopped before applying any event"),
        }
    }
}
/// Marker impl: this error wraps no underlying source.
impl std::error::Error for WaitForTokenError {}
impl<State: Send + Sync + 'static> CortexAdapter<State> {
    /// Open an adapter against a RedEX file.
    ///
    /// Opens (or reuses) `<redex>/<name>` via
    /// [`Redex::open_file`](super::super::redex::Redex::open_file),
    /// spawns a background task that tails the file and drives
    /// `fold`, and returns the handle. The task is started with
    /// `tokio::spawn`, so this must be called from within a Tokio
    /// runtime.
    ///
    /// Only `StartPosition::FromBeginning` (or the equivalent
    /// `FromSeq(0)`) is accepted here. Positions that skip a
    /// non-empty event prefix need externally rehydrated state; use
    /// `open_from_snapshot` for those.
    ///
    /// # Errors
    ///
    /// - [`CortexAdapterError::InvalidStartPosition`] for
    ///   `StartPosition::LiveOnly` or `StartPosition::FromSeq(n)`
    ///   with `n > 0`.
    /// - Any error from opening the underlying RedEX file.
    pub fn open<F>(
        redex: &Redex,
        name: &ChannelName,
        redex_config: RedexFileConfig,
        adapter_config: CortexAdapterConfig,
        fold: F,
        initial_state: State,
    ) -> Result<Self, CortexAdapterError>
    where
        F: RedexFold<State> + Send + 'static,
    {
        // Positions that skip a non-empty event prefix require
        // externally-rehydrated state — the watermark would
        // otherwise advance past events the adapter never saw,
        // making `wait_for_seq(k)` return immediately for skipped
        // k while `state` has never observed those events.
        // Callers using these positions must use
        // `open_from_snapshot` (which carries the matching
        // `last_seq` + serialized state) and routes through
        // `open_unchecked` below.
        match adapter_config.start {
            StartPosition::FromBeginning => {}
            StartPosition::LiveOnly => {
                return Err(CortexAdapterError::InvalidStartPosition("LiveOnly"));
            }
            StartPosition::FromSeq(n) if n > 0 => {
                return Err(CortexAdapterError::InvalidStartPosition("FromSeq(n>0)"));
            }
            StartPosition::FromSeq(_) => {} // FromSeq(0) is equivalent to FromBeginning
        }
        Self::open_unchecked(
            redex,
            name,
            redex_config,
            adapter_config,
            fold,
            initial_state,
        )
    }

    /// Internal open path that bypasses the start-position guard.
    ///
    /// Used by `open_from_snapshot`, where the externally
    /// rehydrated state is the legitimate reason to skip the event
    /// prefix.
    ///
    /// Opens the RedEX file, seeds both watermarks from the start
    /// position, and spawns the fold task. The task exits in any of
    /// these cases:
    /// - shutdown is signalled;
    /// - the tail stream ends;
    /// - a terminal stream error occurs;
    /// - `handle_event` asks it to halt;
    /// - tail-lag recovery cannot read the missed events back.
    ///
    /// On exit it clears `running` and wakes every waiter.
    fn open_unchecked<F>(
        redex: &Redex,
        name: &ChannelName,
        redex_config: RedexFileConfig,
        adapter_config: CortexAdapterConfig,
        mut fold: F,
        initial_state: State,
    ) -> Result<Self, CortexAdapterError>
    where
        F: RedexFold<State> + Send + 'static,
    {
        let file = redex.open_file(name, redex_config)?;
        let start_seq = match adapter_config.start {
            StartPosition::FromBeginning => 0,
            StartPosition::LiveOnly => file.next_seq(),
            StartPosition::FromSeq(n) => n,
        };
        let state = Arc::new(RwLock::new(initial_state));
        // Initial watermark encodes "applied through start_seq-1", so
        // `wait_for_seq(start_seq-1)` returns immediately after open
        // (those seqs are conceptually behind us) while
        // `wait_for_seq(start_seq)` blocks until the first event
        // actually folds. `start_seq == 0` encodes the "nothing
        // folded yet" state with the `u64::MAX` sentinel.
        let initial_watermark: u64 = if start_seq == 0 {
            u64::MAX
        } else {
            start_seq - 1
        };
        let (changes_tx, _) = broadcast::channel(CHANGES_BROADCAST_CAP);
        // A cap of 0 disables the per-adapter RYW admission limit.
        let ryw_inflight = if adapter_config.ryw_inflight_cap == 0 {
            None
        } else {
            Some(Arc::new(Semaphore::new(adapter_config.ryw_inflight_cap)))
        };
        let inner = Arc::new(AdapterInner {
            // `file` and `state` are not used again in this function,
            // so they move straight into the shared inner.
            file,
            state,
            folded_through_seq: AtomicU64::new(initial_watermark),
            // Mirror folded's initial watermark: the snapshot/restore
            // contract treats the sentinel as "nothing applied yet"
            // and the (start_seq - 1) seed as "applied through the
            // pre-snapshot prefix" — same semantics, applied to the
            // strict watermark.
            applied_through_seq: AtomicU64::new(initial_watermark),
            start_seq,
            fold_errors: AtomicU64::new(0),
            running: AtomicBool::new(true),
            closed: AtomicBool::new(false),
            notify: Notify::new(),
            shutdown: Notify::new(),
            changes_tx,
            ryw_inflight,
            ryw_metrics: RywMetricsAtomic::default(),
        });
        let policy = adapter_config.on_fold_error;
        let inner_task = inner.clone();
        tokio::spawn(async move {
            // Registration and consumption share a task. Pre-fix
            // `file.tail(start_seq)` ran on the caller's task and the
            // `tokio::spawn` then queued; concurrent appends in that
            // window could saturate the bounded tail channel before
            // this task polled even once, evicting the watcher with
            // `Lagged` and killing the fold loop on its first item.
            // Doing both inside the spawn pins them adjacent in
            // scheduler order.
            let mut stream = Box::pin(inner_task.file.tail(start_seq));
            'outer: loop {
                tokio::select! {
                    biased;
                    _ = inner_task.shutdown.notified() => {
                        break 'outer;
                    }
                    next = stream.next() => {
                        match next {
                            None => break 'outer,
                            Some(Err(RedexError::Lagged)) => {
                                // Subscriber fell behind the bounded
                                // tail buffer. Pre-fix this broke the
                                // fold loop permanently and a watcher
                                // sitting on `wait_for_seq` would
                                // silently observe stale state past
                                // the lag point.
                                //
                                // Recover by catching up the gap via
                                // direct reads from the in-memory
                                // index, then resubscribing live.
                                // A naive `file.tail(folded+1)`
                                // resubscribe would deadlock when the
                                // gap exceeds `tail_buffer_size`:
                                // backfill pre-flight would signal
                                // `Lagged` again and we'd loop on the
                                // same signal indefinitely.
                                let resume_head = 'catchup: loop {
                                    let folded = inner_task
                                        .folded_through_seq
                                        .load(Ordering::Acquire);
                                    let resume = if folded == u64::MAX {
                                        start_seq
                                    } else {
                                        folded.saturating_add(1)
                                    };
                                    let head = inner_task.file.next_seq();
                                    if resume >= head {
                                        break 'catchup Some(head);
                                    }
                                    let events = inner_task
                                        .file
                                        .read_range(resume, head);
                                    // The gap must be readable from its
                                    // very first seq. An empty result
                                    // means retention evicted all of
                                    // it. A first seq past `resume`
                                    // means retention evicted its
                                    // oldest part, and folding the
                                    // surviving suffix would silently
                                    // skip ahead. The fold cannot
                                    // recover what is no longer
                                    // durable, so both cases halt.
                                    let first_available =
                                        events.first().map(|ev| ev.entry.seq);
                                    if first_available != Some(resume) {
                                        tracing::error!(
                                            gap_start = resume,
                                            gap_end = head,
                                            first_available = ?first_available,
                                            "cortex fold task cannot recover \
                                             from tail lag: events evicted by \
                                             retention"
                                        );
                                        break 'catchup None;
                                    }
                                    let mut halted = false;
                                    for event in events {
                                        if handle_event(
                                            &inner_task,
                                            &mut fold,
                                            &event,
                                            policy,
                                        ) {
                                            halted = true;
                                            break;
                                        }
                                    }
                                    if halted {
                                        break 'catchup None;
                                    }
                                };
                                match resume_head {
                                    Some(head) => {
                                        stream = Box::pin(
                                            inner_task.file.tail(head),
                                        );
                                        tracing::warn!(
                                            "cortex fold task recovered \
                                             from tail lag"
                                        );
                                    }
                                    None => break 'outer,
                                }
                            }
                            Some(Err(_)) => {
                                // Closed / Io / other terminal error.
                                break 'outer;
                            }
                            Some(Ok(event)) => {
                                if handle_event(
                                    &inner_task,
                                    &mut fold,
                                    &event,
                                    policy,
                                ) {
                                    break 'outer;
                                }
                            }
                        }
                    }
                }
            }
            inner_task.running.store(false, Ordering::Release);
            inner_task.notify.notify_waiters();
        });
        Ok(Self { inner })
    }
}
impl<State> CortexAdapter<State>
where
    State: Serialize + Send + Sync + 'static,
{
    /// Capture a point-in-time snapshot of the materialized state.
    ///
    /// Returns `(state_bytes, last_seq)`:
    /// - `state_bytes` is the postcard encoding of the current state.
    /// - `last_seq` is the highest RedEX seq *applied* to that state
    ///   as an unbroken prefix (what [`Self::applied_through_seq`]
    ///   reports), or `None` if nothing has been applied since open.
    ///
    /// The pair is consistent. The state read lock is held while
    /// both halves are taken, and the fold task only advances the
    /// watermark while holding the state write lock. Persist the two
    /// together and restore them with [`Self::open_from_snapshot`],
    /// which additionally requires `State: DeserializeOwned`.
    ///
    /// **Skipped events.** Restore resumes the tail at
    /// `last_seq + 1`. Because `last_seq` is the strict-prefix
    /// (applied) watermark rather than the highest *processed* one,
    /// an event skipped via `recoverable_decode` before the snapshot
    /// is attempted again after restore instead of being lost for
    /// good. (Pre-fix, `snapshot` read `folded_through_seq`, which
    /// advances over skipped events, so restore tailed past them and
    /// the gap became permanent.)
    ///
    /// **Re-apply double-counting.** `state_bytes` also contains the
    /// effects of every successful apply *after* such a skip. Those
    /// seqs lie beyond `last_seq`, so restore feeds them to the fold
    /// a second time, and a fold that is not idempotent under
    /// re-application will diverge. Mitigations:
    /// 1. Make the fold idempotent (the standard event-sourcing
    ///    advice).
    /// 2. Only snapshot when `applied_through_seq() ==
    ///    folded_through_seq()`: no gap, so nothing is re-applied.
    /// 3. Accept best-effort restore for adapters that have ever
    ///    skipped an event.
    ///
    /// A snapshot with `last_seq == None` is still meaningful: it
    /// captures the initial state. Callers usually wait until
    /// [`Self::wait_for_seq`] has returned and
    /// [`Self::applied_through_seq`] has advanced before taking one.
    ///
    /// # Errors
    ///
    /// `CortexAdapterError::Redex(RedexError::Encode(..))` if the
    /// state fails to serialize.
    pub fn snapshot(&self) -> Result<(Vec<u8>, Option<u64>), CortexAdapterError> {
        // The guard stays alive until the function returns, so no
        // fold can land between encoding the state and reading the
        // watermark below.
        let guard = self.inner.state.read();
        let encoded = postcard::to_allocvec(&*guard).map_err(|err| {
            CortexAdapterError::Redex(RedexError::Encode(format!("snapshot serialize: {}", err)))
        })?;
        // `u64::MAX` is the "nothing applied yet" sentinel.
        let last_seq = Some(self.inner.applied_through_seq.load(Ordering::Acquire))
            .filter(|&seq| seq != u64::MAX);
        Ok((encoded, last_seq))
    }
}
impl<State> CortexAdapter<State>
where
    State: DeserializeOwned + Send + Sync + 'static,
{
    /// Open an adapter from a snapshot produced by [`Self::snapshot`],
    /// skipping the replay of `[0, last_seq]`.
    ///
    /// `state_bytes` is postcard-decoded into the initial state, and
    /// the fold task tails the file from `last_seq + 1`. With
    /// `last_seq == None` (nothing had been applied when the
    /// snapshot was taken), the tail starts at seq 0. That is
    /// equivalent to `StartPosition::FromBeginning` seeded with the
    /// deserialized state.
    ///
    /// The `start` field of `adapter_config` is ignored, because the
    /// start position is always derived from `last_seq`. Every other
    /// config field is honored.
    ///
    /// # Errors
    ///
    /// - `CortexAdapterError::Redex(RedexError::Encode(..))` in two
    ///   cases: `state_bytes` fails to deserialize (checked first),
    ///   or `last_seq` is `Some(u64::MAX)`, which leaves no seq to
    ///   resume from.
    /// - Any error from opening the RedEX file.
    pub fn open_from_snapshot<F>(
        redex: &Redex,
        name: &ChannelName,
        redex_config: RedexFileConfig,
        adapter_config: CortexAdapterConfig,
        fold: F,
        state_bytes: &[u8],
        last_seq: Option<u64>,
    ) -> Result<Self, CortexAdapterError>
    where
        F: RedexFold<State> + Send + 'static,
    {
        let initial_state: State = postcard::from_bytes(state_bytes).map_err(|err| {
            CortexAdapterError::Redex(RedexError::Encode(format!("deserialize snapshot: {}", err)))
        })?;
        let start = match last_seq {
            None => StartPosition::FromBeginning,
            Some(applied) => {
                let resume_at = applied.checked_add(1).ok_or_else(|| {
                    CortexAdapterError::Redex(RedexError::Encode(
                        "snapshot last_seq at u64::MAX; cannot resume".to_string(),
                    ))
                })?;
                StartPosition::FromSeq(resume_at)
            }
        };
        // Only the start position is overridden; every other setting
        // is carried over from the caller's config.
        let config = CortexAdapterConfig {
            start,
            ..adapter_config
        };
        // The rehydrated state is what legitimizes skipping the event
        // prefix, so bypass `open`'s start-position guard.
        Self::open_unchecked(redex, name, redex_config, config, fold, initial_state)
    }
}
impl<State> Clone for CortexAdapter<State> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Debug output shows the adapter's progress counters and lifecycle
/// flags; the materialized state itself is not printed, so `State`
/// need not be `Debug`.
impl<State> std::fmt::Debug for CortexAdapter<State> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let folded = self.folded_through_seq();
        let applied = self.applied_through_seq();
        let errors = self.fold_errors();
        let running = self.is_running();
        let closed = self.inner.closed.load(Ordering::Acquire);
        f.debug_struct("CortexAdapter")
            .field("folded_through_seq", &folded)
            .field("applied_through_seq", &applied)
            .field("fold_errors", &errors)
            .field("running", &running)
            .field("closed", &closed)
            .finish()
    }
}
/// Apply one event to the shared state and advance the watermarks.
///
/// The fold runs with the state write lock held, and both
/// watermarks are stored before that lock is released.
///
/// Returns `true` when the fold task should exit. That happens in
/// two cases:
/// - under `FoldErrorPolicy::Stop`, on an error that is not a
///   recoverable decode error;
/// - when `seq == u64::MAX`, which would collide with the "nothing
///   folded yet" sentinel.
///
/// Returns `false` on success or on an error that is counted and
/// skipped.
fn handle_event<State, F>(
    inner: &Arc<AdapterInner<State>>,
    fold: &mut F,
    event: &RedexEvent,
    policy: FoldErrorPolicy,
) -> bool
where
    F: RedexFold<State>,
{
    let seq = event.entry.seq;
    // Guard the `u64::MAX` sentinel. `applied_through_seq` and
    // `folded_through_seq` both use `u64::MAX` to encode "nothing
    // folded yet". Storing a real seq of `u64::MAX` would make it
    // indistinguishable from the sentinel: `snapshot` would persist
    // `None`, and restore would re-replay the entire chain.
    //
    // Such a seq is only reachable after the file has assigned the
    // full 2^64 seq range. The strict-prefix advance below
    // (`prev.wrapping_add(1)` from `prev == u64::MAX - 1`) would
    // otherwise accept it. `watermark.rs::Watermark` guards the
    // analogous `seq_or_ts` case the same way. Halt rather than
    // overflow into the sentinel.
    if seq == u64::MAX {
        tracing::error!(
            "cortex fold halting: redex seq reached u64::MAX (sentinel \
             collision); this means the file has assigned every possible \
             seq value, an effectively impossible-without-bug condition"
        );
        return true;
    }
    // Hold the write lock across both the fold and the watermark
    // updates, so that a `snapshot()` holding `state.read()` observes
    // state and watermarks as a consistent pair. `snapshot` persists
    // `applied_through_seq`. Without the lock, the state could
    // reflect seq N while the watermark still reads N-1, and restore
    // would double-apply event N.
    let result = {
        let mut state = inner.state.write();
        let r = fold.apply(event, &mut state);
        // Under `Stop` policy, a per-event recoverable decode
        // failure (postcard error, EventMeta shape mismatch —
        // anything `RedexError::is_recoverable_decode` flags) is
        // treated as skip-and-continue rather than halting. Halting
        // on every such failure would let a single bad event (disk
        // corruption past the 32-bit checksum, or a
        // deliberately-crafted matching-collision tail) wedge the
        // fold task permanently — a DoS vector against multi-tenant
        // cortex instances via one bad event. Stream-level errors
        // (`Io`, `Closed`, `Lagged`) and authorization /
        // configuration errors still halt under `Stop` as
        // documented.
        //
        // Two watermarks; both written under the state write lock
        // so a concurrent `snapshot` reading either one observes a
        // value consistent with the state it sees:
        // - `applied_through_seq` is a STRICT-PREFIX watermark:
        //   it advances by exactly one only when this seq is
        //   the immediate successor of the current value (or
        //   equals `start_seq` from the sentinel state). Any
        //   skip — `recoverable_decode` under `Stop`, OR any
        //   error under `LogAndContinue` — breaks the prefix
        //   at the skipped seq; further successful applies
        //   cannot heal it. `snapshot` persists this watermark
        //   so restore tails from a position guaranteed to
        //   have every prior seq reflected in state.
        // - `folded_through_seq` is the HIGHEST PROCESSED
        //   watermark: advances on `Ok(())` AND on every skip
        //   path. Live consumers waiting via `wait_for_seq` use
        //   this watermark so a single bad event doesn't
        //   deadlock them — the DoS-resistance contract.
        //
        // Why two watermarks rather than a single strict-prefix one
        // that live consumers also wait on: collapsing them
        // re-introduces the deadlock. A consumer waiting for the
        // skipped seq would block forever even though the fold task
        // has moved on.
        //
        // Order: `applied` is stored first so any reader that
        // observes the `folded` Release also synchronizes-with
        // the `applied` Release that preceded it — `applied <=
        // folded` is therefore an invariant at every observable
        // point.
        let applied = matches!(&r, Ok(()));
        let recoverable_decode = matches!(&r, Err(e) if e.is_recoverable_decode());
        // `advance` is false only for `Stop` + a non-recoverable
        // error: the one case where the task halts.
        let advance = applied
            || matches!((&r, policy), (Err(_), FoldErrorPolicy::LogAndContinue))
            || recoverable_decode;
        if applied {
            // Strict-prefix advance: bump by exactly one if this
            // seq is the immediate successor of the current
            // applied watermark (or this is the very first event
            // and matches `start_seq`). Otherwise the prefix is
            // broken by a prior skip; leave the watermark alone
            // and let restore re-attempt from the gap.
            let prev = inner.applied_through_seq.load(Ordering::Acquire);
            let advance_applied = if prev == u64::MAX {
                seq == inner.start_seq
            } else {
                seq == prev.wrapping_add(1)
            };
            if advance_applied {
                inner.applied_through_seq.store(seq, Ordering::Release);
            }
        }
        if advance {
            inner.folded_through_seq.store(seq, Ordering::Release);
        }
        r
    };
    match result {
        Ok(()) => {
            inner.notify.notify_waiters();
            // A send error only means there are no subscribers right now.
            let _ = inner.changes_tx.send(seq);
            false
        }
        Err(err) => {
            inner.fold_errors.fetch_add(1, Ordering::AcqRel);
            tracing::warn!(seq = seq, error = %err, "cortex fold error");
            // Per-event decode errors always skip-and-continue;
            // only stream-level / configuration errors halt under
            // `Stop`.
            let recoverable_decode = err.is_recoverable_decode();
            match policy {
                FoldErrorPolicy::Stop if !recoverable_decode => {
                    // Wake subscribers via `notify_waiters` so
                    // anything parked on `inner.notify` unblocks
                    // and can observe the halt via
                    // `is_running()`. Do NOT broadcast `seq` on
                    // `changes_tx`: this branch did not advance
                    // `folded_through_seq` (the `advance` gate
                    // above is false for `Stop + non-recoverable`),
                    // and `changes_tx` is documented as carrying
                    // *successful fold-apply* notifications. A
                    // `ChangeEvent::Seq(seq)` for an unapplied
                    // sequence would mislead consumers into
                    // thinking the watermark advanced past the
                    // failure — the very mis-routing the broadcast
                    // contract was designed to avoid.
                    //
                    // The trade-off: subscribers using
                    // `changes_with_lag()` won't see a terminal
                    // event in the stream on halt; they need to
                    // poll `is_running()` separately (or rely on
                    // the broadcast channel ending when all adapter
                    // handles are dropped). That's the documented
                    // failure mode for non-recoverable halts —
                    // surfacing a phantom seq was not.
                    inner.notify.notify_waiters();
                    true
                }
                FoldErrorPolicy::Stop | FoldErrorPolicy::LogAndContinue => {
                    // `folded_through_seq` was already advanced inside
                    // the lock above (the skip path); just notify
                    // waiters.
                    //
                    // NOTE(review): this broadcasts `seq` even though
                    // the fold returned `Err` and state was not
                    // changed. That matches the folded watermark, but
                    // not the "successful fold-apply" wording used in
                    // the branch above. Confirm which contract
                    // `changes_tx` subscribers rely on.
                    inner.notify.notify_waiters();
                    let _ = inner.changes_tx.send(seq);
                    false
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::super::super::channel::ChannelName;
use super::super::super::redex::{RedexError, RedexFold};
use super::super::envelope::EventEnvelope;
use super::super::meta::EventMeta;
use super::*;
use bytes::Bytes;
/// Test shorthand: build a `ChannelName` from a literal that is
/// known to be valid, panicking otherwise.
fn cn(s: &str) -> ChannelName {
    let parsed = ChannelName::new(s);
    parsed.unwrap()
}
/// Fold that ignores each event's payload and counts how many events
/// were applied.
struct CountFold;
impl RedexFold<u64> for CountFold {
    fn apply(&mut self, _event: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
        *state += 1;
        Ok(())
    }
}
/// Pin: `wait_for_seq(seq)` returns without blocking for every
/// `seq < start_seq`. Those events were applied before the adapter
/// opened; here they are folded into the snapshot handed to
/// `open_from_snapshot`.
///
/// Pre-fix, only the `watermark >= seq` comparison was used. Until
/// the restored adapter folded its first event, the `u64::MAX`
/// "nothing folded yet" sentinel kept that comparison false, so a
/// wait on an already-covered seq blocked forever.
#[tokio::test]
async fn wait_for_seq_short_circuits_below_start_seq() {
    let redex = Redex::new();
    let channel = cn("cortex/short-circuit");

    // Seed the file with 5 events through a throwaway FromBeginning
    // adapter, snapshot it, then close it.
    let (bytes, last_seq) = {
        let seeder = CortexAdapter::<u64>::open(
            &redex,
            &channel,
            RedexFileConfig::default(),
            CortexAdapterConfig::default(),
            CountFold,
            0u64,
        )
        .unwrap();
        for i in 0..5u64 {
            let envelope =
                EventEnvelope::new(EventMeta::new(1, 0, 1, i, 0), Bytes::from_static(b""));
            let seq = seeder.ingest(envelope).unwrap();
            seeder.wait_for_seq(seq).await.unwrap();
        }
        let snapshot = seeder.snapshot().unwrap();
        seeder.close().unwrap();
        snapshot
    };

    // The snapshot already absorbed seqs 0..=4, so the restored
    // adapter's `start_seq` is `last_seq + 1 = 5` and every seq below
    // it is already behind us.
    let adapter = CortexAdapter::<u64>::open_from_snapshot(
        &redex,
        &channel,
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        &bytes,
        last_seq,
    )
    .unwrap();

    // Bound each wait: the regression was an indefinite block.
    let bound = std::time::Duration::from_secs(2);
    for seq in 0..5u64 {
        match tokio::time::timeout(bound, adapter.wait_for_seq(seq)).await {
            Ok(result) => {
                result.expect("wait_for_seq must Ok-return when seq < start_seq");
            }
            Err(_) => panic!(
                "wait_for_seq({}) blocked past start_seq=5 — \
                 short-circuit regressed",
                seq
            ),
        }
    }
}
/// Happy path: every ingested event is folded into the counter, no
/// fold error is recorded, and the fold task keeps running.
#[tokio::test]
async fn test_open_ingest_wait_query() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/counts"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    for i in 0..10u64 {
        let envelope = EventEnvelope::new(EventMeta::new(1, 0, 1, i, 0), Bytes::from_static(b""));
        let assigned = adapter.ingest(envelope).unwrap();
        adapter.wait_for_seq(assigned).await.unwrap();
    }
    let folded_count = *adapter.state().read();
    assert_eq!(folded_count, 10);
    assert_eq!(adapter.fold_errors(), 0);
    assert!(adapter.is_running());
}
/// `ingest_with_token` returns a token carrying the envelope's
/// origin and the seq RedEX assigned. Waiting on that token then
/// observes the fold.
#[tokio::test]
async fn ingest_with_token_carries_envelope_origin_and_assigned_seq() {
    const ORIGIN: u64 = 0xCAFE_F00D_DEAD_BEEF;
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/ryw-token"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    let envelope = EventEnvelope::new(EventMeta::new(1, 0, ORIGIN, 0, 0), Bytes::from_static(b""));
    let token = adapter.ingest_with_token(envelope).unwrap();
    assert_eq!(token.origin_hash, ORIGIN);
    assert_eq!(token.seq, 0);
    adapter
        .wait_for_token(token, Duration::from_secs(2))
        .await
        .unwrap();
    assert_eq!(*adapter.state().read(), 1);
}
/// With `ryw_inflight_cap(2)` and both permits taken, a third waiter
/// is turned away with `QueueFull` at once instead of waiting out its
/// deadline.
#[tokio::test]
async fn wait_for_token_returns_queue_full_above_cap() {
    let redex = Redex::new();
    let adapter = Arc::new(
        CortexAdapter::<u64>::open(
            &redex,
            &cn("cortex/ryw-queue"),
            RedexFileConfig::default(),
            CortexAdapterConfig::default().with_ryw_inflight_cap(2),
            CountFold,
            0u64,
        )
        .unwrap(),
    );
    // Seq 999 never lands, so each holder keeps its permit until its
    // own deadline elapses.
    let token = WriteToken::new(0xABCD_EF01, 999);
    let holders: Vec<_> = (0..2)
        .map(|_| {
            let handle = Arc::clone(&adapter);
            tokio::spawn(async move { handle.wait_for_token(token, Duration::from_secs(5)).await })
        })
        .collect();
    // Let both holders claim their permits.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let started = tokio::time::Instant::now();
    let err = adapter
        .wait_for_token(token, Duration::from_secs(5))
        .await
        .unwrap_err();
    assert_eq!(err, WaitForTokenError::QueueFull);
    assert!(
        started.elapsed() < Duration::from_millis(100),
        "QueueFull must return immediately, not wait for deadline"
    );
    // Detach the holders; their tasks finish on their own schedule.
    drop(holders);
}
/// The RYW counters track admitted waits, timeouts, queue-full
/// rejections, and the total time spent waiting.
#[tokio::test]
async fn ryw_metrics_track_waits_timeouts_and_queue_full() {
    let redex = Redex::new();
    let cfg = CortexAdapterConfig::default().with_ryw_inflight_cap(1);
    let adapter = Arc::new(
        CortexAdapter::<u64>::open(
            &redex,
            &cn("cortex/ryw-metrics"),
            RedexFileConfig::default(),
            cfg,
            CountFold,
            0u64,
        )
        .unwrap(),
    );
    // No waits yet → all zeros.
    let s0 = adapter.ryw_metrics();
    assert_eq!(s0, RywMetricsSnapshot::default());
    // Pin a long waiter to hold the only permit, then attempt a
    // second wait — it must hit QueueFull and bump that counter.
    let holder_deadline = Duration::from_secs(2);
    let token = WriteToken::new(0xABCD_EF01, 999);
    let a = adapter.clone();
    let holder = tokio::spawn(async move {
        let _ = a.wait_for_token(token, holder_deadline).await;
    });
    tokio::time::sleep(Duration::from_millis(50)).await;
    let err = adapter
        .wait_for_token(token, Duration::from_secs(1))
        .await
        .unwrap_err();
    assert_eq!(err, WaitForTokenError::QueueFull);
    // Wait for the holder to time out, then check the counters.
    holder.await.unwrap();
    let s1 = adapter.ryw_metrics();
    assert_eq!(s1.waits_total, 1, "holder takes one permit + one wait slot");
    assert_eq!(s1.timeouts_total, 1, "holder's deadline elapsed");
    assert_eq!(s1.queue_full_total, 1, "saturating attempt was rejected");
    assert_eq!(s1.wrong_origin_total, 0);
    // The rejected attempt records no duration, so the sum is the
    // holder's wait alone. `tokio::time::timeout` never fires before
    // its deadline, so that wait is at least the full deadline.
    assert!(
        u128::from(s1.wait_duration_nanos_sum) >= holder_deadline.as_nanos(),
        "holder waited at least its 2s deadline, observed {}ns",
        s1.wait_duration_nanos_sum
    );
}
/// `ryw_inflight_cap(0)` leaves the per-adapter semaphore unset
/// (`None`), so the wait goes straight to its deadline and reports
/// `Timeout`.
#[tokio::test]
async fn wait_for_token_with_zero_cap_skips_queue_check() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/ryw-uncapped"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default().with_ryw_inflight_cap(0),
        CountFold,
        0u64,
    )
    .unwrap();
    let never_lands = WriteToken::new(0xABCD_EF01, 999);
    let outcome = adapter
        .wait_for_token(never_lands, Duration::from_millis(20))
        .await;
    assert_eq!(outcome, Err(WaitForTokenError::Timeout));
}
/// A token whose seq is never written reports `Timeout` once its
/// deadline elapses.
#[tokio::test]
async fn wait_for_token_times_out_when_seq_never_lands() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/ryw-timeout"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    let outcome = adapter
        .wait_for_token(WriteToken::new(0xDEAD_BEEF, 999), Duration::from_millis(50))
        .await;
    assert_eq!(outcome, Err(WaitForTokenError::Timeout));
}
/// `close` is idempotent, later `ingest` calls fail with `Closed`,
/// and the state handle stays readable afterwards.
#[tokio::test]
async fn test_close_stops_fold_task() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/close"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    // Closing twice must succeed both times.
    for _ in 0..2 {
        adapter.close().unwrap();
    }
    let envelope = EventEnvelope::new(EventMeta::new(0, 0, 0, 0, 0), Bytes::from_static(b""));
    let rejected = adapter.ingest(envelope);
    assert!(matches!(rejected, Err(CortexAdapterError::Closed)));
    assert_eq!(*adapter.state().read(), 0);
}
/// Fold that returns `RedexError::Encode` at exactly one seq (the
/// wrapped value) and counts every other event.
struct FailAtSeq(u64);
impl RedexFold<u64> for FailAtSeq {
    fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
        let seq = ev.entry.seq;
        if seq != self.0 {
            *state += 1;
            return Ok(());
        }
        Err(RedexError::Encode(format!(
            "deliberate failure at seq {}",
            seq
        )))
    }
}
/// Fold that returns `RedexError::Decode` (a recoverable decode
/// error) for every seq in `skip` and counts every other event.
/// `attempts` is bumped on every `apply` call, including skipped
/// seqs, so tests can assert that skipped seqs are re-attempted
/// after a restore.
struct FailDecodeAtSeqs {
    skip: std::collections::HashSet<u64>,
    attempts: Arc<AtomicU64>,
}
impl RedexFold<u64> for FailDecodeAtSeqs {
    fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
        self.attempts.fetch_add(1, Ordering::AcqRel);
        let seq = ev.entry.seq;
        if !self.skip.contains(&seq) {
            *state += 1;
            return Ok(());
        }
        Err(RedexError::Decode(format!(
            "deliberate decode skip at seq {}",
            seq
        )))
    }
}
/// Under the default `Stop` policy the fold task halts at the first
/// non-recoverable error, and `wait_for_seq` reports the halt.
#[tokio::test]
async fn test_stop_policy_halts_on_first_error() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/stop"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(), // Stop is default
        FailAtSeq(3),
        0u64,
    )
    .unwrap();
    for i in 0..10u64 {
        adapter
            .ingest(EventEnvelope::new(
                EventMeta::new(0, 0, 0, i, 0),
                Bytes::from_static(b""),
            ))
            .unwrap();
    }
    // The halt surfaces as Err(Some(folded_through)). Pre-fix,
    // wait_for_seq returned silently, so the caller could not tell a
    // halt from a successful fold-through.
    let folded = adapter
        .wait_for_seq(10)
        .await
        .expect_err("Stop policy must surface the halt to wait_for_seq");
    // Under Stop + non-recoverable the watermark caps at 2, so the
    // reported value is the successful prefix.
    assert_eq!(folded, Some(2));
    assert!(!adapter.is_running());
    assert_eq!(adapter.fold_errors(), 1);
    // Seqs 0..=2 folded; seq 3 errored; seqs 4..=9 never folded.
    assert_eq!(*adapter.state().read(), 3);
}
/// `changes_tx` backs `changes_with_lag`, which surfaces
/// `ChangeEvent::Seq(u64)` and is documented as a "successful
/// fold-apply notification". On the Stop+non-recoverable halt path
/// the watermark (`folded_through_seq`) does NOT advance. Emitting
/// the failing seq would therefore present an unapplied event as
/// folded, and subscribers that trust the contract would advance
/// their own state machines past the failure.
///
/// Pin: after a Stop-policy halt, the broadcast carries exactly the
/// prefix that applied (seqs 0..=2). It must carry neither the
/// failing seq (3) nor any later one.
#[tokio::test]
async fn stop_policy_does_not_broadcast_failing_seq() {
    use futures::StreamExt;
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/stop-no-phantom-seq"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(), // Stop is default
        FailAtSeq(3),
        0u64,
    )
    .unwrap();
    // Subscribe before any ingest so no seq is missed.
    let mut changes = adapter.changes_with_lag();
    for i in 0..10u64 {
        adapter
            .ingest(EventEnvelope::new(
                EventMeta::new(0, 0, 0, i, 0),
                Bytes::from_static(b""),
            ))
            .unwrap();
    }
    // The halt now surfaces as `Err(Some(folded))` rather than a
    // silent return.
    let _ = adapter
        .wait_for_seq(10)
        .await
        .expect_err("Stop policy halt");
    assert!(!adapter.is_running(), "Stop policy must halt the task");
    assert_eq!(adapter.fold_errors(), 1);
    // Drain with a short per-item bound so a regression that
    // re-emits the phantom seq cannot hang the test. Lag markers are
    // ignored; the loop stops on stream end or an idle timeout.
    let idle = std::time::Duration::from_millis(50);
    let mut received: Vec<u64> = Vec::new();
    while let Ok(Some(change)) = tokio::time::timeout(idle, changes.next()).await {
        if let ChangeEvent::Seq(s) = change {
            received.push(s);
        }
    }
    assert_eq!(
        received,
        vec![0, 1, 2],
        "broadcast must carry only successfully-folded seqs; \
         pre-fix this would include 3 (the failing seq) as a \
         phantom Seq(3) event, mis-routing subscribers' state"
    );
}
// ========================================================================
// applied_through_seq vs folded_through_seq
// ========================================================================
/// Under the default `Stop` policy, a recoverable decode failure on
/// one event must have two effects:
/// - `folded_through_seq` advances past it, so live consumers
///   waiting on that seq are not deadlocked by a single bad event.
/// - `applied_through_seq` does NOT advance past it, because the
///   event's state mutation never happened.
///
/// When both watermarks were one atomic, `wait_for_seq(seq)`
/// returned as if "state reflects seq" for events whose mutation
/// never landed.
#[tokio::test]
async fn recoverable_decode_skip_advances_folded_but_not_applied() {
    let redex = Redex::new();
    let attempts = Arc::new(AtomicU64::new(0));
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-6-skip"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(), // Stop is default
        FailDecodeAtSeqs {
            skip: [3u64].into_iter().collect(),
            attempts: Arc::clone(&attempts),
        },
        0u64,
    )
    .unwrap();
    (0..6u64)
        .map(|seq| EventEnvelope::new(EventMeta::new(0, 0, 0, seq, 0), Bytes::from_static(b"")))
        .for_each(|envelope| {
            adapter.ingest(envelope).unwrap();
        });
    adapter.wait_for_seq(5).await.unwrap();

    // No deadlock: the folded watermark walked over the skip to the tail.
    assert_eq!(
        adapter.folded_through_seq(),
        Some(5),
        "folded must advance over the recoverable_decode skip",
    );
    // Strict prefix: seq 3 broke the run of successes, so applied
    // stops at 2 even though seqs 4 and 5 applied fine afterwards.
    assert_eq!(
        adapter.applied_through_seq(),
        Some(2),
        "strict-prefix watermark caps at the last consecutive Ok before the skip",
    );
    // The fold itself still mutated state for every non-skipped seq
    // (0, 1, 2, 4, 5); only the watermark is strict-prefix.
    assert_eq!(*adapter.state().read(), 5);
    assert_eq!(adapter.fold_errors(), 1);
    // A recoverable decode error does not halt the task under Stop.
    assert!(adapter.is_running());
    // One fold invocation per event, including the skipped one: the
    // skip is decided inside the fold, not filtered out upstream.
    assert_eq!(attempts.load(Ordering::Acquire), 6);
}
/// Suppose every event processed so far was skipped through
/// recoverable_decode. Then `applied_through_seq` must still report
/// the "nothing applied yet" sentinel (`None`), while
/// `folded_through_seq` already reports the highest skipped seq.
/// This is the strict shape for a freshly opened adapter: it has
/// applied nothing until a subsequent fold succeeds.
#[tokio::test]
async fn applied_stays_at_sentinel_when_only_skipped_events_processed() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-6-only-skips"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        FailDecodeAtSeqs {
            skip: [0u64, 1, 2].into_iter().collect(),
            attempts: Arc::new(AtomicU64::new(0)),
        },
        0u64,
    )
    .unwrap();
    for seq in 0..3u64 {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, seq, 0), Bytes::from_static(b""));
        adapter.ingest(envelope).unwrap();
    }
    adapter.wait_for_seq(2).await.unwrap();

    assert_eq!(adapter.folded_through_seq(), Some(2));
    assert_eq!(
        adapter.applied_through_seq(),
        None,
        "no successful apply ⇒ applied_through_seq stays at sentinel",
    );
    // Nothing applied, so the initial state is untouched.
    assert_eq!(*adapter.state().read(), 0);
}
/// `snapshot()` must record `applied_through_seq`, not
/// `folded_through_seq`, as its `last_seq`. With a single shared
/// watermark, snapshot persisted the highest folded seq, skipped
/// events included. A restore then tailed from beyond the skip, and
/// the skipped seq vanished from durable state — even though the
/// in-memory state captured by the snapshot never reflected it.
#[tokio::test]
async fn snapshot_last_seq_reflects_applied_not_folded_after_skip() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-7-snapshot"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        FailDecodeAtSeqs {
            skip: [5u64].into_iter().collect(),
            attempts: Arc::new(AtomicU64::new(0)),
        },
        0u64,
    )
    .unwrap();
    // Eight events; the fold skips seq 5 via recoverable_decode.
    for seq in 0..8u64 {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, seq, 0), Bytes::from_static(b""));
        adapter.ingest(envelope).unwrap();
    }
    adapter.wait_for_seq(7).await.unwrap();

    let (folded, applied) = (adapter.folded_through_seq(), adapter.applied_through_seq());
    assert_eq!(folded, Some(7), "folded includes the skipped seq's slot");
    assert_eq!(
        applied,
        Some(4),
        "strict-prefix applied caps at the last consecutive Ok (seq 4) before the gap at seq 5",
    );
    // The snapshot watermark follows `applied` (strict prefix).
    // Following `folded` (Some(7)) would make a restore tail from
    // seq 8, and seq 5 would never be re-fed.
    let (_bytes, last_seq) = adapter.snapshot().unwrap();
    assert_eq!(last_seq, applied);
    assert_eq!(last_seq, Some(4));
}
/// Strict shape at the tail: when the most recently processed event
/// was skipped, the snapshot's `last_seq` must stay at the highest
/// *applied* seq rather than moving onto the skipped one. Without
/// the applied/folded split, snapshot reported the skipped seq and
/// restore tailed from beyond it. The skipped event was then lost.
#[tokio::test]
async fn snapshot_last_seq_does_not_advance_to_a_skipped_tail() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-7-tail-skip"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        FailDecodeAtSeqs {
            // Skipping the final event leaves applied < folded at
            // snapshot time.
            skip: [4u64].into_iter().collect(),
            attempts: Arc::new(AtomicU64::new(0)),
        },
        0u64,
    )
    .unwrap();
    for seq in 0..5u64 {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, seq, 0), Bytes::from_static(b""));
        adapter.ingest(envelope).unwrap();
    }
    adapter.wait_for_seq(4).await.unwrap();

    assert_eq!(adapter.folded_through_seq(), Some(4));
    assert_eq!(
        adapter.applied_through_seq(),
        Some(3),
        "tail apply was skipped ⇒ applied stays at the prior successful seq",
    );
    let last_seq = adapter.snapshot().unwrap().1;
    assert_eq!(
        last_seq,
        Some(3),
        "pre-fix this would be Some(4) and a restore would tail from 5, \
        dropping seq 4 permanently from durable state",
    );
}
/// End-to-end test:
/// 1. An adapter's fold skips seq N via recoverable_decode.
/// 2. The adapter is snapshotted and closed.
/// 3. It is reopened through `open_from_snapshot` with a fold that
///    NOW handles seq N (e.g. a software upgrade fixed the decoder).
///
/// On restore, seq N must be re-fed to the fold. Without the
/// strict-prefix watermark, snapshot's `last_seq` would be the
/// highest *folded* seq, which advances over skips. Restore would
/// then tail from past the skipped seq, and the previously skipped
/// event would be permanently lost.
///
/// Pins three things:
/// - the strict-prefix `last_seq`;
/// - the number of events re-fed on restore;
/// - the healed `applied_through_seq`.
///
/// The post-restore state count is also asserted, but only to
/// document the double counting a non-idempotent fold incurs on
/// re-apply (see the `snapshot` doc). It is not a correctness claim.
#[tokio::test]
async fn restore_after_skip_re_attempts_skipped_event() {
    let redex = Redex::new();
    let pre_attempts = Arc::new(AtomicU64::new(0));
    let pre_fold = FailDecodeAtSeqs {
        skip: [2u64].into_iter().collect(),
        attempts: pre_attempts.clone(),
    };
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-6-7-restore-reattempt"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        pre_fold,
        0u64,
    )
    .unwrap();
    for i in 0..5u64 {
        let meta = EventMeta::new(0, 0, 0, i, 0);
        let env = EventEnvelope::new(meta, Bytes::from_static(b""));
        adapter.ingest(env).unwrap();
    }
    adapter.wait_for_seq(4).await.unwrap();
    // Pre-snapshot: strict-prefix applied caps at 1, because seq 2
    // was skipped and the prefix breaks there. Folded reached 4.
    // The fold still wrote state for every Ok (0, 1, 3, 4), so the
    // count is 4.
    assert_eq!(adapter.applied_through_seq(), Some(1));
    assert_eq!(adapter.folded_through_seq(), Some(4));
    assert_eq!(*adapter.state().read(), 4);
    let (bytes, last_seq) = adapter.snapshot().unwrap();
    // The snapshot must reflect applied (strict-prefix), so restore
    // tails from seq 2 and re-attempts the skipped event plus
    // everything after it.
    assert_eq!(last_seq, Some(1));
    adapter.close().unwrap();
    // Reopen with a fold that handles seq 2 successfully this time
    // (empty `skip`). The previously skipped event must be re-fed,
    // and so must seqs 3 and 4: their earlier application is not
    // covered by the snapshot's strict-prefix watermark. Only seqs
    // 0..=1 are guaranteed to be reflected in the rehydrated state.
    let post_attempts = Arc::new(AtomicU64::new(0));
    let post_fold = FailDecodeAtSeqs {
        skip: std::collections::HashSet::new(),
        attempts: post_attempts.clone(),
    };
    let restored = CortexAdapter::<u64>::open_from_snapshot(
        &redex,
        &cn("cortex/audit-6-7-restore-reattempt"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        post_fold,
        &bytes,
        last_seq,
    )
    .unwrap();
    restored.wait_for_seq(4).await.unwrap();
    // Pre-fix this assertion would FAIL: snapshot would have
    // returned Some(4) (folded), restore would tail from seq 5, the
    // post_fold would never see seq 2, and the skipped event would
    // be permanently lost.
    let post_invocations = post_attempts.load(Ordering::Acquire);
    assert_eq!(
        post_invocations, 3,
        "post-restore fold must be re-fed seqs 2, 3, 4 (everything past the \
        snapshot's strict-prefix watermark)",
    );
    // After restore, the strict-prefix watermark heals all the way
    // to seq 4, because every event from start_seq = 2 onwards
    // applied successfully on the second pass.
    assert_eq!(restored.applied_through_seq(), Some(4));
    // The post-restore state count depends on whether the fold is
    // idempotent. `FailDecodeAtSeqs` is not: each successful apply
    // increments the counter, so seqs 3 and 4 are counted twice.
    //   - Pre-restore in-memory state was 4 (seqs 0, 1, 3, 4 applied).
    //   - The snapshot bytes serialized that 4.
    //   - Restore deserializes 4, then applies seqs 2, 3, 4 → 7.
    // The fix does not claim "state equals what a from-scratch
    // re-fold would produce". That needs an idempotent fold, or
    // snapshotting only when there is no gap (see the `snapshot`
    // doc). It claims "the skipped seq is re-fed instead of being
    // lost from durable state", which `post_invocations == 3` pins
    // above.
    assert_eq!(
        *restored.state().read(),
        7,
        "non-idempotent FailDecodeAtSeqs fold double-counts seqs past the gap; this is the \
        documented re-apply trade-off, not a bug",
    );
}
/// Pin: `wait_for_seq(seq)` returns promptly even when `seq` was
/// skipped via recoverable_decode. The DoS-resistance contract (one
/// bad event must not deadlock live consumers) survives the
/// `applied`/`folded` split because `wait_for_seq` still waits on
/// `folded`, which advances over skips.
#[tokio::test]
async fn wait_for_seq_returns_after_recoverable_decode_skip() {
    let redex = Redex::new();
    let attempts = Arc::new(AtomicU64::new(0));
    let fold = FailDecodeAtSeqs {
        skip: [0u64].into_iter().collect(),
        attempts: attempts.clone(),
    };
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/audit-6-no-deadlock"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        fold,
        0u64,
    )
    .unwrap();
    let meta = EventMeta::new(0, 0, 0, 0, 0);
    let env = EventEnvelope::new(meta, Bytes::from_static(b""));
    let seq = adapter.ingest(env).unwrap();
    // wait_for_seq must NOT block on the skipped seq. The single-
    // watermark shape never had this hazard, and the split must
    // preserve that. If `wait_for_seq` ever switched to waiting on
    // `applied_through_seq`, it would block forever on this seq. The
    // 2 s timeout turns that hang into a test failure instead of a
    // stalled suite.
    tokio::time::timeout(std::time::Duration::from_secs(2), adapter.wait_for_seq(seq))
        .await
        .expect("wait_for_seq must return promptly even for a recoverable-decode-skipped seq")
        .expect("recoverable-decode skip still advances the folded watermark");
    // What was observable at return: folded reached seq, applied did not.
    assert_eq!(adapter.folded_through_seq(), Some(0));
    assert_eq!(adapter.applied_through_seq(), None);
}
// ========================================================================
// open must reject FromSeq(n>0) / LiveOnly
// ========================================================================
/// `open` must refuse `StartPosition::FromSeq(n)` with `n > 0`.
/// Starting mid-file would let the watermark claim events the
/// adapter never folded, so `wait_for_seq` would misreport applied
/// state. A caller that deliberately skips a prefix should use
/// `open_from_snapshot` instead.
#[test]
fn open_rejects_from_seq_n_greater_than_zero() {
    let redex = Redex::new();
    let outcome = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/from-seq-guard"),
        RedexFileConfig::default(),
        CortexAdapterConfig::new().with_start(StartPosition::FromSeq(5)),
        CountFold,
        0u64,
    );
    let rejected = matches!(outcome, Err(CortexAdapterError::InvalidStartPosition(_)));
    assert!(
        rejected,
        "open must reject FromSeq(n>0), got {:?}",
        outcome.map(|_| "Ok"),
    );
}
/// `open` must also refuse `StartPosition::LiveOnly`, for the same
/// reason. The start seq would be `file.next_seq()`, so every
/// earlier event would be left unfolded while the watermark
/// advanced past it.
#[test]
fn open_rejects_live_only_start_position() {
    let redex = Redex::new();
    let outcome = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/live-only-guard"),
        RedexFileConfig::default(),
        CortexAdapterConfig::new().with_start(StartPosition::LiveOnly),
        CountFold,
        0u64,
    );
    let rejected = matches!(outcome, Err(CortexAdapterError::InvalidStartPosition(_)));
    assert!(
        rejected,
        "open must reject LiveOnly, got {:?}",
        outcome.map(|_| "Ok"),
    );
}
// ========================================================================
// Stop policy must skip-and-continue on per-event Decode errors
// ========================================================================
/// Test fold that returns `RedexError::Decode` for exactly one seq
/// (the wrapped value) and increments `state` for every other event.
struct FailDecodeAtSeq(u64);

impl RedexFold<u64> for FailDecodeAtSeq {
    fn apply(&mut self, ev: &RedexEvent, state: &mut u64) -> Result<(), RedexError> {
        if ev.entry.seq != self.0 {
            *state += 1;
            return Ok(());
        }
        // Decode-class failure. It stands in for a corrupt postcard
        // tail, an EventMeta shape mismatch, or a checksum miss —
        // the kinds of problems cortex fold paths report as
        // `RedexError::Decode`.
        Err(RedexError::Decode(format!(
            "deliberate decode failure at seq {}",
            ev.entry.seq
        )))
    }
}
/// Under `Stop` policy, `RedexError::Decode` must NOT halt the fold
/// task: it is a per-event recoverable failure (e.g. a corrupt
/// payload that got past the checksum, or a crafted collision).
/// Before the fix, the first such event halted the task, so a single
/// payload could DoS the cortex. Now the bad event is skipped and
/// the watermark advances, so later events keep folding.
#[tokio::test]
async fn stop_policy_skips_recoverable_decode_error() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/decode-skip"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(), // Stop is default
        FailDecodeAtSeq(3),
        0u64,
    )
    .unwrap();
    // Ingest one event at a time and wait for each to be folded (or
    // skipped) before sending the next.
    for n in 0..10u64 {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, n, 0), Bytes::from_static(b""));
        let seq = adapter.ingest(envelope).unwrap();
        adapter.wait_for_seq(seq).await.unwrap();
    }
    // Still running: the decode error was counted, not fatal.
    assert!(
        adapter.is_running(),
        "Stop policy must NOT halt on RedexError::Decode"
    );
    assert_eq!(adapter.fold_errors(), 1);
    // Nine of ten events folded; only seq 3 was skipped.
    assert_eq!(*adapter.state().read(), 9);
}
/// `is_recoverable_decode` must be true for `RedexError::Decode` and
/// false for every other variant listed here. This pins the boundary
/// of the Stop-policy carve-out, so that a future widening of the
/// classification (for example, accidentally including `Encode`) is
/// caught. The halting behavior itself for `Encode` is exercised by
/// the pre-existing `test_stop_policy_halts_on_first_error`.
#[test]
fn redex_error_recoverable_decode_classification_is_decode_only() {
    // The only variant allowed to be recoverable.
    assert!(RedexError::Decode("x".into()).is_recoverable_decode());
    // Every other variant must classify as non-recoverable.
    let non_recoverable = [
        RedexError::Encode("x".into()),
        RedexError::Closed,
        RedexError::Io("x".into()),
        RedexError::Lagged,
        RedexError::Unauthorized,
    ];
    for err in non_recoverable {
        assert!(!err.is_recoverable_decode());
    }
}
/// `FromSeq(0)` skips nothing, so it is the same start point as
/// `FromBeginning`, and `open` must keep accepting it. Pins the
/// lower edge of the start-position guard so it never locks out
/// this degenerate-but-valid form.
#[tokio::test]
async fn open_accepts_from_seq_zero() {
    let redex = Redex::new();
    let opened = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/from-seq-zero"),
        RedexFileConfig::default(),
        CortexAdapterConfig::new().with_start(StartPosition::FromSeq(0)),
        CountFold,
        0u64,
    );
    let _ = opened.expect("FromSeq(0) is equivalent to FromBeginning and must be accepted");
}
// ========================================================================
// changes_with_lag must surface BroadcastStream::Lagged
// ========================================================================
/// `changes_with_lag` must yield `ChangeEvent::Lagged(n)` when a
/// subscriber falls further behind than the broadcast channel can
/// buffer. Before the fix, `changes()` dropped those signals via
/// `filter_map(|r| r.ok())`, leaving downstream telemetry no way to
/// notice or count missed change notifications.
///
/// Setup: subscribe, then ingest more than `CHANGES_BROADCAST_CAP`
/// (64) events without polling the stream. The channel discards the
/// oldest entries, and the next poll surfaces `Lagged(n)` for the
/// discarded count.
#[tokio::test]
async fn changes_with_lag_yields_lagged_when_subscriber_falls_behind() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/lag"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    let stream = adapter.changes_with_lag();
    tokio::pin!(stream);
    // Overflow the channel by about 16 without polling. The exact
    // number of dropped entries depends on broadcast semantics; the
    // test only needs at least one Lagged emission.
    let total = (CHANGES_BROADCAST_CAP + 16) as u64;
    for n in 0..total {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, n, 0), Bytes::from_static(b""));
        let seq = adapter.ingest(envelope).unwrap();
        adapter.wait_for_seq(seq).await.unwrap();
    }
    // Drain with a bounded number of polls, each with a short
    // timeout, and record which event kinds showed up.
    let max_polls = total as usize + 4;
    let (mut saw_lagged, mut saw_seq) = (false, false);
    for _ in 0..max_polls {
        let polled =
            tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await;
        let event = match polled {
            Ok(Some(event)) => event,
            Ok(None) | Err(_) => break,
        };
        match event {
            ChangeEvent::Lagged(n) => {
                saw_lagged = true;
                assert!(n > 0, "Lagged count must be positive");
            }
            ChangeEvent::Seq(_) => saw_seq = true,
        }
    }
    assert!(
        saw_lagged,
        "subscriber that fell behind {} events must observe Lagged",
        CHANGES_BROADCAST_CAP + 16,
    );
    assert!(
        saw_seq,
        "the stream should still emit Seq events after the lag",
    );
}
/// Pin: when the tail stream reports `RedexError::Lagged`, the fold
/// task must catch up on the gap by reading the in-memory index
/// directly and then resubscribe live, instead of quietly exiting.
///
/// Before the fix, that match arm broke out of the fold loop the
/// first time the subscriber fell behind `tail_buffer_size`. State
/// stopped advancing at the lag point, and `wait_for_seq` returned
/// immediately via the `running == false` branch with no sign of
/// trouble.
///
/// `Lagged` is triggered deterministically through the backfill
/// pre-flight. 50 indexed events against `tail_buffer_size = 4`
/// overflow the buffer, so `RedexFile::tail` emits `Lagged` as its
/// very first item.
#[tokio::test]
async fn fold_task_recovers_from_tail_lagged() {
    let redex = Redex::new();
    let name = cn("cortex/tail-lag-recovery");
    // Stage the backlog directly in the file's in-memory index with
    // no adapter attached yet. With no watchers there is no lag
    // pressure while staging. `CountFold` ignores payloads, so empty
    // bytes suffice.
    let file = redex
        .open_file(&name, RedexFileConfig::default().with_tail_buffer_size(4))
        .unwrap();
    for _ in 0..50u64 {
        file.append(b"").unwrap();
    }
    // Open a FromBeginning adapter on the same file. tail()'s
    // backfill pre-flight sees 50 retained events vs. a buffer of 4
    // and emits Lagged immediately. With the fix, the recovery loop
    // reads the gap from the index and then tails live. Without it,
    // the fold task exits and state stays at 0.
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &name,
        RedexFileConfig::default(), // ignored on reopen
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    let waited =
        tokio::time::timeout(std::time::Duration::from_secs(5), adapter.wait_for_seq(49)).await;
    waited
        .expect("wait_for_seq(49) timed out — tail-Lagged recovery regressed")
        .expect("fold task must remain alive through Lagged recovery");
    assert_eq!(*adapter.state().read(), 50);
    assert!(
        adapter.is_running(),
        "fold task must stay alive after recovering from tail lag",
    );
    assert_eq!(adapter.fold_errors(), 0);
}
/// `changes()` keeps its documented best-effort behavior of silently
/// discarding lag. Pinning it stops a future refactor from leaking
/// `Lagged` into the simple stream and breaking consumers that never
/// asked for it.
#[tokio::test]
async fn changes_filters_out_lag_silently() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/lag-silent"),
        RedexFileConfig::default(),
        CortexAdapterConfig::default(),
        CountFold,
        0u64,
    )
    .unwrap();
    let stream = adapter.changes();
    tokio::pin!(stream);
    // Overflow the broadcast channel without polling, exactly as the
    // `changes_with_lag` lag test does.
    let total = (CHANGES_BROADCAST_CAP + 16) as u64;
    for n in 0..total {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, n, 0), Bytes::from_static(b""));
        let seq = adapter.ingest(envelope).unwrap();
        adapter.wait_for_seq(seq).await.unwrap();
    }
    // The item type is a bare `u64` (no Result), so Lagged cannot be
    // observed in any form here. Just confirm that seqs still arrive.
    let max_polls = total as usize + 4;
    let mut got_seq = false;
    for _ in 0..max_polls {
        let polled =
            tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await;
        match polled {
            Ok(Some(_)) => got_seq = true,
            Ok(None) | Err(_) => break,
        }
    }
    assert!(got_seq, "changes() must still emit seqs after lag");
}
/// Under `FoldErrorPolicy::LogAndContinue`, the event that fails to
/// fold is counted in `fold_errors` and skipped. The task keeps
/// running and folds the rest.
#[tokio::test]
async fn test_log_and_continue_skips_errors() {
    let redex = Redex::new();
    let adapter = CortexAdapter::<u64>::open(
        &redex,
        &cn("cortex/lc"),
        RedexFileConfig::default(),
        CortexAdapterConfig::new().with_fold_error_policy(FoldErrorPolicy::LogAndContinue),
        FailAtSeq(3),
        0u64,
    )
    .unwrap();
    for n in 0..10u64 {
        let envelope =
            EventEnvelope::new(EventMeta::new(0, 0, 0, n, 0), Bytes::from_static(b""));
        let seq = adapter.ingest(envelope).unwrap();
        adapter.wait_for_seq(seq).await.unwrap();
    }
    assert!(adapter.is_running());
    assert_eq!(adapter.fold_errors(), 1);
    // Nine of the ten seqs folded; seq 3 failed and was skipped.
    assert_eq!(*adapter.state().read(), 9);
}
}