batpak 0.3.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
// Intentional impossible-feature guard: exponential backoff belongs in the product supervisor, not the library.
// If anyone enables a hypothetical "exponential-backoff" feature, the build fails with a
// pointer to the spec rather than silently growing a third RestartPolicy variant.
// exponential-backoff is not a declared feature — suppress cfg warning for this guard
// (rustc's unexpected_cfgs lint would otherwise fire on the unknown feature name).
#[allow(unexpected_cfgs)]
#[cfg(feature = "exponential-backoff")]
compile_error!(
    "Red flag: only Once and Bounded restart policies. \
     Exponential backoff belongs in the product's supervisor, not here. \
     See: SPEC.md ## RED FLAGS."
);

use crate::coordinate::{Coordinate, DagPosition};
use crate::event::{Event, EventHeader, EventKind, HashChain, StoredEvent};
use crate::store::contracts::{BatchAppendItem, CausationRef};
use crate::store::index::{DiskPos, IndexEntry, StoreIndex};
use crate::store::segment::{self, Active, FramePayloadRef, Segment};
use crate::store::{AppendReceipt, BatchStage, StoreConfig, StoreError};
use flume::{Receiver, Sender, TrySendError};
use parking_lot::Mutex;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info, trace};

/// Entity name for batch system markers (BEGIN/COMMIT). Not user-visible.
/// Paired with [`BATCH_MARKER_SCOPE`] to form the coordinate for marker events.
const BATCH_MARKER_ENTITY: &str = "_batch";
/// Scope name for batch system markers (BEGIN/COMMIT). Not user-visible.
const BATCH_MARKER_SCOPE: &str = "_system";

/// WriterCommand: messages sent to the background writer thread via flume.
/// All respond channels: flume::Sender — sync send from writer, async recv from caller.
/// [SPEC:src/store/writer.rs]
pub(crate) enum WriterCommand {
    /// Append a single event. The payload arrives pre-serialized; CAS and
    /// idempotency checks described by `guards` run on the writer thread.
    Append {
        coord: Coordinate,
        event: Box<Event<Vec<u8>>>, // pre-serialized payload as msgpack bytes
        kind: EventKind,
        guards: AppendGuards,
        respond: Sender<Result<AppendReceipt, StoreError>>,
    },
    /// Append several items as one atomic unit; one receipt per item on success.
    AppendBatch {
        items: Vec<BatchAppendItem>,
        respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
    },
    /// Force an fsync of the active segment using the configured sync mode.
    Sync {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Drain pending work (up to a configured limit), sync, and exit the loop.
    Shutdown {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Test-only: trigger a panic in the writer thread to exercise restart_policy.
    #[cfg(feature = "dangerous-test-hooks")]
    #[doc(hidden)]
    PanicForTest {
        respond: Sender<Result<(), StoreError>>,
    },
}

/// WriterHandle: owned by Store. Communicates with the background thread.
pub(crate) struct WriterHandle {
    /// Command channel into the writer thread. pub(crate) so Store::append()
    /// can send WriterCommands directly (see the NOTE in the impl below).
    pub tx: Sender<WriterCommand>,
    /// Lightweight notification fanout shared with the writer thread.
    pub subscribers: Arc<SubscriberList>,
    /// Rich committed-event fanout for internal reactor consumers.
    pub reactor_subscribers: Arc<ReactorSubscriberList>,
    /// Join handle kept alive for the writer thread; `None` in test handles
    /// built via `from_parts_for_test`.
    _thread: Option<std::thread::JoinHandle<()>>,
}

/// SubscriberList: push-based notification fanout via flume channels.
/// [SPEC:src/store/writer.rs — try_send pattern]
pub(crate) struct SubscriberList {
    /// One bounded sender per live subscriber; pruned on disconnect during broadcast.
    senders: Mutex<Vec<Sender<Notification>>>,
}

/// Private richer event envelope used by internal reactor consumers so they do
/// not need to re-read the just-committed event from disk.
#[derive(Clone, Debug)]
pub(crate) struct CommittedEventEnvelope {
    /// The same lightweight summary external subscribers receive.
    pub notification: Notification,
    /// The full stored event, already deserialized to JSON for reactor use.
    pub stored: StoredEvent<serde_json::Value>,
}

/// ReactorSubscriberList: push-based fanout of full committed-event envelopes
/// to internal reactor consumers. Mirrors `SubscriberList`'s try_send pattern.
pub(crate) struct ReactorSubscriberList {
    /// One bounded sender per live reactor consumer; pruned on disconnect.
    senders: Mutex<Vec<Sender<CommittedEventEnvelope>>>,
}

/// Notification: lightweight event summary pushed to subscribers.
/// Must derive Clone (used in try_send broadcast loop — one clone per subscriber).
/// [SPEC:src/store/writer.rs — Notification struct]
#[derive(Clone, Debug)]
pub struct Notification {
    /// Unique ID of the event that was appended.
    pub event_id: u128,
    /// Correlation ID linking this event to a causal chain.
    pub correlation_id: u128,
    /// ID of the event that caused this one; `None` for root-cause events.
    pub causation_id: Option<u128>,
    /// Entity and scope coordinates for the event.
    pub coord: Coordinate,
    /// Event kind (type discriminant).
    pub kind: EventKind,
    /// Global sequence number assigned to this event at commit time.
    pub sequence: u64,
}

/// RestartPolicy: how the writer recovers from panics.
/// [SPEC:src/store/writer.rs — RestartPolicy]
/// EXACTLY two variants. Adding a third violates the RED FLAGS
/// (see the `exponential-backoff` compile_error guard at the top of this file).
/// `#[non_exhaustive]` keeps downstream matches forward-compatible without
/// inviting new variants here.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub enum RestartPolicy {
    /// Allow at most one automatic restart after a writer panic.
    #[default]
    Once,
    /// Allow up to `max_restarts` automatic restarts within a rolling `within_ms` millisecond window.
    Bounded {
        /// Maximum number of restarts permitted within the time window.
        max_restarts: u32,
        /// Time window in milliseconds over which `max_restarts` is enforced.
        within_ms: u64,
    },
}

impl SubscriberList {
    pub(crate) fn new() -> Self {
        Self {
            senders: Mutex::new(Vec::new()),
        }
    }

    /// Subscribe: create a new bounded channel, store the sender, return the receiver.
    pub(crate) fn subscribe(&self, capacity: usize) -> Receiver<Notification> {
        let (tx, rx) = flume::bounded(capacity);
        self.senders.lock().push(tx);
        rx
    }

    /// Broadcast: try_send to all, retain on Ok or Full, prune on Disconnected.
    /// NEVER use blocking send() — one slow subscriber must not block the writer.
    /// [DEP:flume::Sender::try_send] → Result<(), TrySendError<T>>
    /// [DEP:flume::TrySendError::Full] / [DEP:flume::TrySendError::Disconnected]
    pub(crate) fn broadcast(&self, notif: &Notification) {
        let mut guard = self.senders.lock();
        guard.retain(|tx| match tx.try_send(notif.clone()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => true,
            Err(TrySendError::Disconnected(_)) => false,
        });
    }
}

impl ReactorSubscriberList {
    pub(crate) fn new() -> Self {
        Self {
            senders: Mutex::new(Vec::new()),
        }
    }

    pub(crate) fn subscribe(&self, capacity: usize) -> Receiver<CommittedEventEnvelope> {
        let (tx, rx) = flume::bounded(capacity);
        self.senders.lock().push(tx);
        rx
    }

    pub(crate) fn broadcast(&self, envelope: &CommittedEventEnvelope) {
        let mut guard = self.senders.lock();
        guard.retain(|tx| match tx.try_send(envelope.clone()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => true,
            Err(TrySendError::Disconnected(_)) => false,
        });
    }
}

impl WriterHandle {
    /// Spawn the background writer thread.
    /// [SPEC:src/store/writer.rs — "batpak-writer-{hash}" thread]
    ///
    /// Init order is load-bearing: the data dir and initial segment are
    /// created HERE (before spawning) so that any failure is returned to the
    /// `Store::open()` caller as a `StoreError` instead of killing a thread.
    pub(crate) fn spawn(
        config: &Arc<StoreConfig>,
        index: &Arc<StoreIndex>,
        subscribers: &Arc<SubscriberList>,
        reactor_subscribers: &Arc<ReactorSubscriberList>,
        reader: &Arc<crate::store::reader::Reader>,
    ) -> Result<Self, StoreError> {
        // Fallible init — propagate errors to Store::open() caller
        std::fs::create_dir_all(&config.data_dir).map_err(StoreError::Io)?;
        // Start one past the highest existing segment id (or 1 on a fresh dir)
        // so an existing segment is never reopened for writing.
        let initial_segment_id = find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        let initial_segment = Segment::<Active>::create(&config.data_dir, initial_segment_id)?;

        let (tx, rx) = flume::bounded::<WriterCommand>(config.writer.channel_capacity);
        let subs = Arc::clone(subscribers);
        let reactor_subs = Arc::clone(reactor_subscribers);
        let cfg = Arc::clone(config);
        let idx = Arc::clone(index);
        let rdr = Arc::clone(reader);

        // Thread name embeds an FNV-1a hash of the data dir so multiple open
        // stores in one process are distinguishable in thread listings.
        let thread_name = format!("batpak-writer-{:08x}", {
            let mut h: u64 = 0xcbf29ce484222325; // FNV-1a basis
            for b in config.data_dir.to_string_lossy().bytes() {
                h ^= b as u64;
                h = h.wrapping_mul(0x100000001b3); // FNV-1a prime
            }
            h
        });

        let mut builder = std::thread::Builder::new().name(thread_name);
        if let Some(stack_size) = config.writer.stack_size {
            builder = builder.stack_size(stack_size);
        }
        let thread = builder
            .spawn(move || {
                writer_thread_main(
                    WriterRuntime {
                        rx: &rx,
                        config: &cfg,
                        index: &idx,
                        subscribers: &subs,
                        reactor_subscribers: &reactor_subs,
                        reader: &rdr,
                    },
                    initial_segment,
                    initial_segment_id,
                );
            })
            .map_err(StoreError::Io)?;

        Ok(Self {
            tx,
            subscribers: Arc::clone(subscribers),
            reactor_subscribers: Arc::clone(reactor_subscribers),
            _thread: Some(thread),
        })
    }

    /// Test-only constructor: build a handle around an externally-created
    /// channel with no backing thread (`_thread: None`).
    #[cfg(test)]
    pub(crate) fn from_parts_for_test(
        tx: Sender<WriterCommand>,
        subscribers: Arc<SubscriberList>,
    ) -> Self {
        Self {
            tx,
            subscribers,
            reactor_subscribers: Arc::new(ReactorSubscriberList::new()),
            _thread: None,
        }
    }

    // NOTE: No send_append() method here. Store::append() and Store::append_reaction()
    // in store/mod.rs create one-shot flume channels and send WriterCommands directly
    // via self.writer.tx.send(). This avoids an unnecessary abstraction layer.
    // WriterHandle.tx is pub(crate) for direct access. [SPEC:INVARIANTS item 4]
}

/// Writer's mutable runtime state, grouped to reduce handle_append param count.
struct WriterState<'a> {
    /// Shared in-memory index; entries are staged here as events commit.
    index: &'a StoreIndex,
    /// The segment currently receiving frames; swapped out on rotation.
    active_segment: &'a mut Segment<Active>,
    /// Id of the active segment; advanced in lock-step with rotation.
    segment_id: &'a mut u64,
    config: &'a StoreConfig,
    /// External notification fanout (lightweight summaries).
    subscribers: &'a SubscriberList,
    /// Internal reactor fanout (full committed-event envelopes).
    reactor_subscribers: &'a ReactorSubscriberList,
    /// Reader handle — updated on segment rotation so mmap dispatch is correct.
    reader: Arc<crate::store::reader::Reader>,
    /// Accumulates SIDX entries for the current active segment.
    /// Flushed as a footer on segment rotation and shutdown.
    sidx_collector: crate::store::sidx::SidxEntryCollector,
}

/// Borrowed, copyable bundle of everything the writer loop needs.
/// `Copy` lets `writer_thread_main` re-lend the same runtime to each
/// restart iteration without moving it into the catch_unwind closure.
#[derive(Clone, Copy)]
struct WriterRuntime<'a> {
    /// Command receiver; lives outside catch_unwind so it survives restarts.
    rx: &'a Receiver<WriterCommand>,
    config: &'a StoreConfig,
    index: &'a StoreIndex,
    subscribers: &'a SubscriberList,
    reactor_subscribers: &'a ReactorSubscriberList,
    reader: &'a Arc<crate::store::reader::Reader>,
}

/// Writer thread entry point with panic recovery and restart logic.
/// Wraps writer_loop() in catch_unwind, implementing RestartPolicy.
/// The rx (command receiver) survives across restarts because it lives
/// outside catch_unwind. Segments are re-created on restart since the
/// previous one is dropped during unwind.
/// [SPEC:src/store/writer.rs — RestartPolicy enforcement]
fn writer_thread_main(
    runtime: WriterRuntime<'_>,
    initial_segment: Segment<Active>,
    initial_segment_id: u64,
) {
    let mut segment = initial_segment;
    let mut seg_id = initial_segment_id;
    // Restart budget accounting: `restarts` counts automatic restarts so far;
    // `window_start` anchors the rolling window used by RestartPolicy::Bounded.
    let mut restarts: u32 = 0;
    let mut window_start = Instant::now();

    loop {
        let rdr = Arc::clone(runtime.reader);
        // AssertUnwindSafe: the segment moved into the closure is abandoned on
        // panic (dropped during unwind) and re-created below, so no state is
        // observed in a broken condition after an unwind.
        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
            writer_loop(
                WriterRuntime {
                    rx: runtime.rx,
                    config: runtime.config,
                    index: runtime.index,
                    subscribers: runtime.subscribers,
                    reactor_subscribers: runtime.reactor_subscribers,
                    reader: &rdr,
                },
                segment,
                seg_id,
            );
        }));

        match result {
            Ok(()) => return, // clean shutdown via WriterCommand::Shutdown
            Err(panic_info) => {
                // Best-effort extraction of the panic message: panic payloads
                // are `&str` or `String` in practice; anything else is opaque.
                let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
                    (*s).to_string()
                } else if let Some(s) = panic_info.downcast_ref::<String>() {
                    s.clone()
                } else {
                    "unknown panic".to_string()
                };

                // Decide whether the restart budget permits another attempt.
                let budget_ok = match &runtime.config.writer.restart_policy {
                    RestartPolicy::Once => {
                        if restarts >= 1 {
                            false
                        } else {
                            restarts += 1;
                            true
                        }
                    }
                    RestartPolicy::Bounded {
                        max_restarts,
                        within_ms,
                    } => {
                        // Reset counter if window has elapsed
                        if window_start.elapsed() > std::time::Duration::from_millis(*within_ms) {
                            restarts = 0;
                            window_start = Instant::now();
                        }
                        if restarts >= *max_restarts {
                            false
                        } else {
                            restarts += 1;
                            true
                        }
                    }
                };

                if !budget_ok {
                    tracing::error!(
                        "writer restart budget exhausted — thread exiting. \
                         Last panic: {panic_msg}. Policy: {:?}",
                        runtime.config.writer.restart_policy
                    );
                    return;
                }

                tracing::warn!(
                    "writer panic — restarting ({restarts}/{max}). Panic: {panic_msg}",
                    max = match &runtime.config.writer.restart_policy {
                        RestartPolicy::Once => 1,
                        RestartPolicy::Bounded { max_restarts, .. } => *max_restarts,
                    }
                );

                // Re-create segment: the previous one was dropped during unwind.
                seg_id = find_latest_segment_id(&runtime.config.data_dir).unwrap_or(seg_id) + 1;
                segment = match Segment::<Active>::create(&runtime.config.data_dir, seg_id) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::error!(
                            "writer restart failed — cannot create segment: {e}. Thread exiting."
                        );
                        return;
                    }
                };
            }
        }
    }
}

/// The writer's main loop. Runs on the background thread.
/// The spawn closure owns the Arcs; this function borrows them.
///
/// Returns normally only on `WriterCommand::Shutdown` (clean exit) or when
/// every command sender has been dropped (the `rx.iter()` loop ends).
fn writer_loop(
    runtime: WriterRuntime<'_>,
    mut active_segment: Segment<Active>,
    mut segment_id: u64,
) {
    // Events appended since the last fsync; drives the every_n_events policy.
    let mut events_since_sync: u32 = 0;

    let mut state = WriterState {
        index: runtime.index,
        active_segment: &mut active_segment,
        segment_id: &mut segment_id,
        config: runtime.config,
        subscribers: runtime.subscribers,
        reactor_subscribers: runtime.reactor_subscribers,
        reader: Arc::clone(runtime.reader),
        sidx_collector: crate::store::sidx::SidxEntryCollector::new(),
    };

    // Main loop: recv commands, dispatch.
    for cmd in runtime.rx.iter() {
        match cmd {
            WriterCommand::Append {
                coord,
                event,
                kind,
                guards,
                respond,
            } => {
                // Process first command in batch.
                let result = state.handle_append(&coord, *event, kind, &guards);
                let _ = respond.send(result);
                events_since_sync += 1;

                // Group commit: drain additional pending Append commands before syncing.
                // group_commit_max_batch == 0 means unbounded drain (drain all pending).
                // group_commit_max_batch == 1 means no draining (backward compat, per-event).
                // group_commit_max_batch > 1 means drain up to (batch - 1) more.
                let extra_budget = if runtime.config.batch.group_commit_max_batch == 0 {
                    u32::MAX
                } else if runtime.config.batch.group_commit_max_batch == 1 {
                    0u32
                } else {
                    runtime
                        .config
                        .batch
                        .group_commit_max_batch
                        .saturating_sub(1)
                };
                let mut drained = 0u32;
                while drained < extra_budget {
                    // try_recv never blocks: an empty channel ends the batch.
                    match runtime.rx.try_recv() {
                        Ok(WriterCommand::Append {
                            coord: c2,
                            event: ev2,
                            kind: k2,
                            guards: g2,
                            respond: r2,
                        }) => {
                            // Receipts are sent BEFORE the group fsync — durability
                            // for drained events arrives with the batch sync below.
                            let res2 = state.handle_append(&c2, *ev2, k2, &g2);
                            let _ = r2.send(res2);
                            events_since_sync += 1;
                            drained += 1;
                        }
                        Ok(WriterCommand::AppendBatch { items, respond: r }) => {
                            // Batches are atomic — drain them as a single unit.
                            let res = state.handle_append_batch(&items);
                            let _ = r.send(res);
                            events_since_sync += 1;
                            drained += 1;
                        }
                        Ok(WriterCommand::Sync { respond: r }) => {
                            // Sync mid-batch: honor immediately, stop draining.
                            let sr = state
                                .active_segment
                                .sync_with_mode(&runtime.config.sync.mode);
                            let _ = r.send(sr);
                            events_since_sync = 0;
                            break;
                        }
                        Ok(WriterCommand::Shutdown { respond: r }) => {
                            // Shutdown mid-batch: sync current batch, then exit.
                            // Propagate sync errors honestly — lifecycle honesty invariant.
                            let shutdown_result = if events_since_sync > 0 {
                                let sr = state
                                    .active_segment
                                    .sync_with_mode(&runtime.config.sync.mode);
                                if let Err(ref e) = sr {
                                    tracing::error!("group commit pre-shutdown sync: {e}");
                                }
                                sr
                            } else {
                                Ok(())
                            };
                            let _ = r.send(shutdown_result);
                            return;
                        }
                        #[cfg(feature = "dangerous-test-hooks")]
                        Ok(WriterCommand::PanicForTest { respond: r }) => {
                            // Don't panic mid-drain — acknowledge and stop draining.
                            // The test should send PanicForTest as a standalone command
                            // (through the main loop) not mid-batch. Panicking mid-drain
                            // would leave the batch partially synced with some callers
                            // never receiving their receipt.
                            let _ = r.send(Ok(()));
                            break;
                        }
                        Err(_) => break, // channel empty — batch complete
                    }
                }

                // Single fsync for the entire batch.
                if events_since_sync >= runtime.config.sync.every_n_events {
                    if let Err(e) = state
                        .active_segment
                        .sync_with_mode(&runtime.config.sync.mode)
                    {
                        tracing::error!("periodic sync failed: {e}");
                    }
                    events_since_sync = 0;
                }
            }
            WriterCommand::AppendBatch { items, respond } => {
                let result = state.handle_append_batch(&items);
                let _ = respond.send(result);
                events_since_sync += 1; // Batch counts as one sync event

                // Sync after batch if needed.
                if events_since_sync >= runtime.config.sync.every_n_events {
                    if let Err(e) = state
                        .active_segment
                        .sync_with_mode(&runtime.config.sync.mode)
                    {
                        tracing::error!("post-batch sync failed: {e}");
                    }
                    events_since_sync = 0;
                }
            }
            WriterCommand::Sync { respond } => {
                // Explicit sync request: fsync now and reset the counter.
                let result = state
                    .active_segment
                    .sync_with_mode(&runtime.config.sync.mode);
                let _ = respond.send(result);
                events_since_sync = 0;
            }
            WriterCommand::Shutdown { respond } => {
                // Drain up to shutdown_drain_limit queued commands.
                // [SPEC:src/store/writer.rs — Shutdown drain semantics]
                let mut drained = 0;
                while drained < runtime.config.writer.shutdown_drain_limit {
                    match runtime.rx.try_recv() {
                        Ok(WriterCommand::Append {
                            coord,
                            event,
                            kind,
                            guards,
                            respond: r,
                        }) => {
                            let result = state.handle_append(&coord, *event, kind, &guards);
                            let _ = r.send(result);
                            drained += 1;
                        }
                        Ok(WriterCommand::Shutdown { respond: r }) => {
                            // Duplicate shutdown: acknowledge without counting
                            // against the drain budget.
                            let _ = r.send(Ok(()));
                        }
                        Ok(WriterCommand::Sync { respond: r }) => {
                            let _ = r.send(
                                state
                                    .active_segment
                                    .sync_with_mode(&runtime.config.sync.mode),
                            );
                        }
                        Ok(WriterCommand::AppendBatch { items, respond: r }) => {
                            // Drain batches during shutdown.
                            let res = state.handle_append_batch(&items);
                            let _ = r.send(res);
                            drained += 1;
                        }
                        // test-only: discard PanicForTest during shutdown drain
                        #[cfg(feature = "dangerous-test-hooks")]
                        Ok(WriterCommand::PanicForTest { respond: r }) => {
                            let _ = r.send(Ok(())); // discard during drain
                        }
                        Err(_) => break, // channel empty
                    }
                }
                // Write SIDX footer on active segment before shutdown sync.
                if let Err(e) = state
                    .active_segment
                    .write_sidx_footer(&state.sidx_collector)
                {
                    tracing::warn!("shutdown SIDX footer write failed (non-fatal): {e}");
                }
                // Final sync result is what the shutdown caller receives —
                // lifecycle honesty: errors are reported, not swallowed.
                let sync_result = state
                    .active_segment
                    .sync_with_mode(&runtime.config.sync.mode);
                if let Err(ref e) = sync_result {
                    tracing::error!("shutdown sync failed: {e}");
                }
                let _ = respond.send(sync_result);
                return; // exit writer loop
            }
            // test-only: intentional panic to exercise restart_policy
            #[cfg(feature = "dangerous-test-hooks")]
            // intentional: this panic IS the test - it exercises catch_unwind in writer_thread_main
            #[allow(clippy::panic)]
            // intentional: this panic IS the test — it exercises catch_unwind in writer_thread_main
            WriterCommand::PanicForTest { respond } => {
                // Acknowledge receipt before panicking so the test knows the command was processed.
                let _ = respond.send(Ok(()));
                panic!("PanicForTest: intentional writer panic for restart_policy testing");
            }
        }
    }
}

/// Options and guards for an append operation, passed through the channel.
/// CAS + idempotency checks execute on the single writer thread, so there
/// is no producer/producer race to guard against.
pub(crate) struct AppendGuards {
    /// Correlation ID linking the new event to a causal chain.
    pub correlation_id: u128,
    /// ID of the causing event; `None` for root-cause events.
    pub causation_id: Option<u128>,
    /// CAS guard: expected sequence the append must match, when set.
    /// NOTE(review): u32 here vs u64 `Notification.sequence` — presumably this
    /// is a per-entity clock rather than the global sequence; confirm against
    /// handle_append.
    pub expected_sequence: Option<u32>,
    /// Idempotency guard: when set, a repeat append with the same key is
    /// deduplicated by the writer.
    pub idempotency_key: Option<u128>,
}

/// Pre-computed per-item batch state shared between the precompute, write,
/// stage, and broadcast phases of `handle_append_batch`.
///
/// Every field is derived in `precompute_batch_items` BEFORE any frame is
/// written. The downstream phases (`write_batch_event_frames`,
/// `stage_batch_index_entries`) consume these values verbatim. This is
/// load-bearing: an earlier version computed `event_hash` only inside the
/// frame-write phase and reconstructed it from a shared scratch map in the
/// stage phase, which silently corrupted hash chains for batches with two or
/// more items on the same entity (the second item's `prev_hash` came from a
/// post-precompute `[0u8; 32]` placeholder, and every staged `event_hash`
/// collapsed to the entity's LAST item's hash because the map insert
/// overwrote per item). Capturing all per-item material here makes those
/// invariants impossible to break — there is no scratch map.
struct BatchItemComputed {
    /// Global sequence number assigned to this item at commit time.
    global_seq: u64,
    /// Per-entity logical clock value for this item — presumably the same
    /// counter `AppendGuards::expected_sequence` guards; verify in
    /// precompute_batch_items.
    clock: u32,
    /// Per-item monotonic wall_ms used by BOTH the on-disk header position AND
    /// the in-memory `IndexEntry.wall_ms`. Computed once with `max(last_ms)`
    /// clamping per entity (matches the single-append path) so a regressing
    /// injected/system clock cannot reorder `stream()` results.
    wall_ms: u64,
    /// Microsecond timestamp captured once at the top of
    /// `precompute_batch_items` and reused as the header `timestamp_us` for
    /// every item in the batch. Single-batch / single-`now_us()` semantics
    /// keep the on-disk and in-memory views byte-equivalent.
    wall_us: i64,
    /// Hash of the previous event on the same entity's chain.
    prev_hash: [u8; 32],
    /// Blake3 of the payload, computed in `precompute_batch_items` so the
    /// next same-entity item can read it as its `prev_hash`. Without this,
    /// hash chains for multi-item same-entity batches break.
    event_hash: [u8; 32],
    /// Unique ID assigned to this item's event.
    event_id: u128,
    /// Causation link carried through from the batch item, if any.
    causation_id: Option<u128>,
}

impl WriterState<'_> {
    /// Seal and replace the active segment when it has grown past
    /// `segment_max_bytes`.
    ///
    /// Returns `Ok(false)` when no rotation was needed. Otherwise: the SIDX
    /// footer is written (best-effort), the collector is reset, the outgoing
    /// segment is synced and sealed, `segment_id` advances, and the reader
    /// is pointed at the new active segment; returns `Ok(true)`.
    ///
    /// Callers needing batch-specific error context should wrap with
    /// `.map_err(|e| StoreError::BatchFailed { ... })`.
    fn maybe_rotate_segment(&mut self) -> Result<bool, StoreError> {
        let rotation_due = self
            .active_segment
            .needs_rotation(self.config.segment_max_bytes);
        if !rotation_due {
            return Ok(false);
        }
        // Best-effort SIDX footer before sealing: append_frames_from_segment
        // strips SIDX data via detect_sidx_boundary, so this is safe, and a
        // failed write is logged rather than aborting rotation.
        if let Err(e) = self.active_segment.write_sidx_footer(&self.sidx_collector) {
            tracing::warn!("SIDX footer write failed (non-fatal): {e}");
        }
        self.sidx_collector = crate::store::sidx::SidxEntryCollector::new();
        // Durability point for the outgoing segment before it is sealed.
        self.active_segment.sync_with_mode(&self.config.sync.mode)?;
        let next_id = *self.segment_id + 1;
        let fresh = Segment::<Active>::create(&self.config.data_dir, next_id)?;
        let retired = std::mem::replace(self.active_segment, fresh);
        let _sealed = retired.seal();
        *self.segment_id = next_id;
        // Keep reader mmap dispatch pointed at the live segment.
        self.reader.set_active_segment(next_id);
        Ok(true)
    }

    /// STEPs 1-2: Validate batch size, total bytes, and reject CAS in batches.
    /// Returns the first offending item index inside `BatchFailed`.
    fn validate_batch(&self, items: &[BatchAppendItem]) -> Result<(), StoreError> {
        // Item-count ceiling.
        if items.len() > self.config.batch.max_size as usize {
            return Err(StoreError::BatchFailed {
                item_index: 0,
                stage: BatchStage::Validation,
                source: Box::new(StoreError::Configuration(format!(
                    "batch size {} exceeds max {}",
                    items.len(),
                    self.config.batch.max_size
                ))),
            });
        }
        // Aggregate payload ceiling across all items.
        let payload_total: usize = items.iter().map(|i| i.payload_bytes.len()).sum();
        if payload_total > self.config.batch.max_bytes as usize {
            return Err(StoreError::BatchFailed {
                item_index: 0,
                stage: BatchStage::Validation,
                source: Box::new(StoreError::Configuration(format!(
                    "batch bytes {} exceeds max {}",
                    payload_total, self.config.batch.max_bytes
                ))),
            });
        }
        // CAS is a single-append feature; reject the first item carrying one.
        if let Some(bad_idx) = items
            .iter()
            .position(|item| item.options.expected_sequence.is_some())
        {
            return Err(StoreError::BatchFailed {
                item_index: bad_idx,
                stage: BatchStage::Validation,
                source: Box::new(StoreError::Configuration(
                    "CAS not supported in batch append (v1)".into(),
                )),
            });
        }
        Ok(())
    }

    /// STEP 3: Preflight idempotency check.
    /// Returns `Ok(Some(receipts))` if every item is already committed (full
    /// replay), `Ok(None)` to proceed with the batch write, or `Err` when
    /// only SOME items were previously committed (partial-replay ambiguity).
    fn preflight_batch_idempotency(
        &self,
        items: &[BatchAppendItem],
    ) -> Result<Option<Vec<AppendReceipt>>, StoreError> {
        // One slot per item: Some(receipt) when its idempotency key is
        // already present in the index, None otherwise.
        let cached: Vec<Option<AppendReceipt>> = items
            .iter()
            .map(|item| {
                item.options.idempotency_key.and_then(|key| {
                    self.index.get_by_id(key).map(|entry| AppendReceipt {
                        event_id: entry.event_id,
                        sequence: entry.global_sequence,
                        disk_pos: entry.disk_pos,
                    })
                })
            })
            .collect();
        let hits = cached.iter().filter(|r| r.is_some()).count();

        // Full replay (including the empty-batch case): hand back the
        // original receipts without writing anything.
        if hits == items.len() {
            return Ok(Some(
                cached
                    .into_iter()
                    .map(|r| r.expect("full replay: all cached_receipts verified as Some"))
                    .collect(),
            ));
        }
        // Mixed state is ambiguous — refuse rather than double-commit.
        if hits > 0 {
            return Err(StoreError::BatchFailed {
                item_index: cached.iter().position(|r| r.is_none()).unwrap_or(0),
                stage: BatchStage::Validation,
                source: Box::new(StoreError::Configuration(
                    "partial batch replay: some items already committed, some not".into(),
                )),
            });
        }
        // No hits at all: clean batch, proceed with the write path.
        Ok(None)
    }

    /// Pre-compute per-item global sequences, clocks, wall_ms, prev_hashes,
    /// event_hashes, event_ids, and causation. Builds intra-batch per-entity
    /// chains for clock, wall_ms, and hash so multi-item same-entity batches
    /// produce a continuous sequence and a continuous hash chain on disk.
    ///
    /// **Single timestamp invariant.** A single `now_us()` is captured at the
    /// top and reused for every item's `wall_us`. The corresponding `wall_ms`
    /// is `max(now_ms, entity_last_ms)` per entity to mirror the single-append
    /// monotonicity guard at `handle_append::STEP 4` — without this clamp, a
    /// regressing clock (mocked test clock, NTP slew) could reorder
    /// `stream()` results within a batch.
    ///
    /// **Eager hash invariant.** `event_hash` is computed here (not deferred
    /// to the frame-write phase) so the next same-entity item can read it as
    /// its `prev_hash`. Without this, the on-disk frame chain and the
    /// in-memory IndexEntry chain diverge — see `BatchItemComputed` for the
    /// historical incident this guards against.
    ///
    /// # Errors
    /// `BatchFailed` (stage `Validation`) when a `CausationRef::PriorItem`
    /// references the current or a later item.
    fn precompute_batch_items(
        &self,
        items: &[BatchAppendItem],
        first_seq: u64,
    ) -> Result<Vec<BatchItemComputed>, StoreError> {
        let mut computed: Vec<BatchItemComputed> = Vec::with_capacity(items.len());
        // Per-entity chain state accumulated in batch order; these maps are
        // what make later same-entity items chain off earlier ones instead
        // of the (stale) index view.
        let mut entity_prev_hashes: std::collections::HashMap<Arc<str>, [u8; 32]> =
            std::collections::HashMap::new();
        let mut entity_batch_clocks: std::collections::HashMap<Arc<str>, u32> =
            std::collections::HashMap::new();
        let mut entity_batch_wall_ms: std::collections::HashMap<Arc<str>, u64> =
            std::collections::HashMap::new();

        // Single timestamp for the entire batch (see Single timestamp invariant
        // above). Header `timestamp_us` and the IndexEntry `wall_ms` are both
        // derived from this one capture.
        let now_us = self.config.now_us();
        #[allow(clippy::cast_sign_loss)] // timestamp_us is always positive (from SystemTime)
        let now_ms = (now_us / 1000) as u64;

        for (idx, item) in items.iter().enumerate() {
            let entity: Arc<str> = Arc::from(item.coord.entity());

            // prev_hash: previous batch item if same entity, else the index's
            // latest entry for the entity, else genesis [0; 32].
            let prev_hash = if let Some(&hash) = entity_prev_hashes.get(&entity) {
                hash
            } else {
                self.index
                    .get_latest(&entity)
                    .map(|e| e.hash_chain.event_hash)
                    .unwrap_or([0u8; 32])
            };

            // clock: monotonic per entity, +1 from prior batch item or index.
            let clock = if let Some(&last_clock) = entity_batch_clocks.get(&entity) {
                last_clock + 1
            } else {
                self.index
                    .get_latest(&entity)
                    .map(|e| e.clock + 1)
                    .unwrap_or(0)
            };
            entity_batch_clocks.insert(Arc::clone(&entity), clock);

            // wall_ms: monotonic per entity. The clamp source must consider
            // BOTH the index AND prior batch items on the same entity — a
            // batch-internal clock regression would otherwise reorder
            // BTreeMap entries in `StoreIndex::streams`.
            let last_ms = entity_batch_wall_ms
                .get(&entity)
                .copied()
                .unwrap_or_else(|| {
                    self.index
                        .get_latest(&entity)
                        .map(|e| e.wall_ms)
                        .unwrap_or(0)
                });
            let wall_ms = now_ms.max(last_ms);
            entity_batch_wall_ms.insert(Arc::clone(&entity), wall_ms);

            // Fresh UUIDv7 per item, generated here so later items can
            // reference it via `CausationRef::PriorItem`.
            let event_id = uuid::Uuid::now_v7().as_u128();

            let causation_id = match item.causation {
                CausationRef::None => item.options.causation_id,
                CausationRef::Absolute(id) => Some(id),
                CausationRef::PriorItem(prior_idx) => {
                    if prior_idx >= idx {
                        return Err(StoreError::BatchFailed {
                            item_index: idx,
                            stage: BatchStage::Validation,
                            source: Box::new(StoreError::Configuration(
                                "PriorItem causation must reference earlier batch item".into(),
                            )),
                        });
                    }
                    // In-bounds: prior_idx < idx was just validated and
                    // `computed` holds exactly `idx` earlier entries.
                    Some(computed[prior_idx].event_id)
                }
            };

            // Compute event_hash NOW (eager hash invariant — see fn doc).
            #[cfg(feature = "blake3")]
            let event_hash = crate::event::hash::compute_hash(&item.payload_bytes);
            #[cfg(not(feature = "blake3"))]
            let event_hash = [0u8; 32];

            // Populate the prev_hash source for the next same-entity item
            // with the ACTUAL hash (was a `[0u8; 32]` placeholder before,
            // which broke the chain).
            entity_prev_hashes.insert(entity, event_hash);

            let global_seq = first_seq + idx as u64;
            computed.push(BatchItemComputed {
                global_seq,
                clock,
                wall_ms,
                wall_us: now_us,
                prev_hash,
                event_hash,
                event_id,
                causation_id,
            });
        }
        Ok(computed)
    }

    /// Encode and write a single batch marker frame (BEGIN or COMMIT) for
    /// `batch_id`, rotating the segment first if needed.
    ///
    /// The marker reuses `batch_id` as both event id and correlation id and
    /// carries the caller-supplied `payload_size` in the header (the batch
    /// item count for BEGIN, 0 for COMMIT). Returns the frame's byte offset
    /// in the active segment; failures are wrapped as `BatchFailed` with
    /// `item_index_for_error` and the stage they occurred in.
    fn write_batch_marker_frame(
        &mut self,
        batch_id: u64,
        kind: EventKind,
        payload_size: u32,
        item_index_for_error: usize,
    ) -> Result<u64, StoreError> {
        let now_us = self.config.now_us();
        #[allow(clippy::cast_sign_loss)] // timestamp_us is always positive (from SystemTime)
        let now_ms = (now_us / 1000) as u64;
        let header = EventHeader::new(
            batch_id as u128,
            batch_id as u128,
            None,
            now_us,
            DagPosition::child_at(0, now_ms, 0),
            payload_size,
            kind,
        );
        // Markers have an empty payload; the interesting bits live in the header.
        let marker = Event::new(header, Vec::<u8>::new());
        let frame_payload = crate::store::segment::FramePayloadRef {
            event: &marker,
            entity: BATCH_MARKER_ENTITY,
            scope: BATCH_MARKER_SCOPE,
        };
        let encoded =
            segment::frame_encode(&frame_payload).map_err(|e| StoreError::BatchFailed {
                item_index: item_index_for_error,
                stage: BatchStage::Encoding,
                source: Box::new(e),
            })?;

        // Rotate first if the segment is full, mirroring the item-frame path.
        self.maybe_rotate_segment()
            .map_err(|e| StoreError::BatchFailed {
                item_index: item_index_for_error,
                stage: BatchStage::Syncing,
                source: Box::new(e),
            })?;

        self.active_segment
            .write_frame(&encoded)
            .map_err(|e| StoreError::BatchFailed {
                item_index: item_index_for_error,
                stage: BatchStage::Writing,
                source: Box::new(e),
            })
    }

    /// The 10-step commit protocol.
    /// [SPEC:src/store/writer.rs — handle_append]
    ///
    /// Runs entirely on the single writer thread, so the CAS (STEP 1a) and
    /// idempotency (STEP 1b) reads cannot race another producer. On success
    /// the frame is in the active segment, the index entry is inserted and
    /// published, a SIDX entry is recorded for the segment footer, and
    /// subscribers have been notified.
    ///
    /// # Errors
    /// `SequenceMismatch` when the CAS guard fails; otherwise propagates
    /// encoding, rotation, and segment-write errors.
    fn handle_append(
        &mut self,
        coord: &Coordinate,
        mut event: Event<Vec<u8>>,
        kind: EventKind,
        guards: &AppendGuards,
    ) -> Result<AppendReceipt, StoreError> {
        let correlation_id = guards.correlation_id;
        let causation_id = guards.causation_id;
        let entity = coord.entity();
        let scope = coord.scope();

        // STEP 1: Read latest entry. No lock needed: this function runs on the
        // single writer thread, which is the only writer of the index. There
        // is no producer/producer race to guard against.
        let latest = self.index.get_latest(entity);

        // STEP 1a: CAS check.
        if let Some(expected) = guards.expected_sequence {
            let actual = latest.as_ref().map(|entry| entry.clock).unwrap_or(0);
            if actual != expected {
                return Err(StoreError::SequenceMismatch {
                    entity: entity.to_string(),
                    expected,
                    actual,
                });
            }
        }

        // STEP 1b: Idempotency check.
        if let Some(key) = guards.idempotency_key {
            if let Some(entry) = self.index.get_by_id(key) {
                // Replay: the key was already committed — hand back the
                // original receipt without writing anything.
                return Ok(AppendReceipt {
                    event_id: entry.event_id,
                    sequence: entry.global_sequence,
                    disk_pos: entry.disk_pos,
                });
            }
        }

        // STEP 2: Get prev_hash from index (or [0u8;32] for genesis).
        // Clone the value out of the DashMap Ref immediately.
        let prev_hash = latest
            .as_ref()
            .map(|entry| entry.hash_chain.event_hash)
            .unwrap_or([0u8; 32]);

        // STEP 3: Compute sequence (latest.clock + 1, or 0).
        let clock = latest.as_ref().map(|entry| entry.clock + 1).unwrap_or(0);

        // STEP 4: Set event header position with HLC wall clock.
        // Ensure wall_ms is monotonically non-decreasing per entity to prevent
        // BTreeMap reordering on clock regression.
        #[allow(clippy::cast_sign_loss)] // timestamp_us is always positive (from SystemTime)
        let raw_ms = (event.header.timestamp_us / 1000) as u64;
        let last_ms = latest.as_ref().map(|entry| entry.wall_ms).unwrap_or(0);
        let now_ms = raw_ms.max(last_ms);
        let position = DagPosition::child_at(clock, now_ms, 0);
        event.header.position = position;
        event.header.event_kind = kind;
        event.header.correlation_id = correlation_id;
        event.header.causation_id = causation_id;

        // STEP 5: Compute blake3 hash, set hash chain (or skip if feature off).
        // [SPEC:INVARIANTS item 5 — blake3 only]
        #[cfg(feature = "blake3")]
        let event_hash = crate::event::hash::compute_hash(&event.payload);
        #[cfg(not(feature = "blake3"))]
        let event_hash = [0u8; 32];

        event.hash_chain = Some(HashChain {
            prev_hash,
            event_hash,
        });
        // Set content_hash on header for projection cache auto-invalidation.
        event.header.content_hash = event_hash;

        // STEP 6: Serialize to MessagePack + CRC32 frame.
        // [SPEC:WIRE FORMAT DECISIONS — rmp_serde::to_vec_named() ALWAYS]
        let frame_payload = FramePayloadRef {
            event: &event,
            entity,
            scope,
        };
        let frame = segment::frame_encode(&frame_payload)?;

        // STEP 7: Check segment rotation.
        if self.maybe_rotate_segment()? {
            info!(segment_id = *self.segment_id, "segment rotated");
        }

        // STEP 8: Write frame to segment file.
        let offset = self.active_segment.write_frame(&frame)?;
        trace!(offset = offset, len = frame.len(), "frame written");

        // STEP 9: Update index.
        let global_seq = self.index.global_sequence();
        let disk_pos = DiskPos {
            segment_id: *self.segment_id,
            offset,
            #[allow(clippy::cast_possible_truncation)] // checked_payload_len already verified < u32::MAX
            length: frame.len() as u32,
        };
        let entity_id = self.index.interner.intern(entity);
        let scope_id = self.index.interner.intern(scope);
        let entry = IndexEntry {
            event_id: event.header.event_id,
            correlation_id,
            causation_id,
            coord: coord.clone(),
            entity_id,
            scope_id,
            kind,
            wall_ms: now_ms,
            clock,
            hash_chain: event.hash_chain.clone().unwrap_or_default(),
            disk_pos,
            global_sequence: global_seq,
        };
        self.index.insert(entry);

        // Publish: make this entry visible to concurrent readers.
        // Explicit boundary: the entry has global_sequence == global_seq,
        // so visible_sequence must advance to global_seq + 1.
        self.index.publish(global_seq + 1);

        // Record SIDX entry for the segment footer (written on rotation/shutdown).
        let hash_chain_ref = event.hash_chain.as_ref();
        let sidx_entry = crate::store::sidx::SidxEntry {
            event_id: event.header.event_id,
            entity_idx: 0, // filled by collector.record()
            scope_idx: 0,  // filled by collector.record()
            kind: crate::store::sidx::kind_to_raw(kind),
            wall_ms: now_ms,
            clock,
            prev_hash: hash_chain_ref.map(|h| h.prev_hash).unwrap_or([0u8; 32]),
            event_hash: hash_chain_ref.map(|h| h.event_hash).unwrap_or([0u8; 32]),
            frame_offset: offset,
            #[allow(clippy::cast_possible_truncation)] // frame.len() checked by checked_payload_len
            frame_length: frame.len() as u32,
            global_sequence: global_seq,
            correlation_id,
            causation_id: causation_id.unwrap_or(0),
        };
        self.sidx_collector.record(sidx_entry, entity, scope);

        debug!(event_id = %event.header.event_id, clock = clock, "append committed");

        // STEP 10: Broadcast notification to subscribers.
        self.subscribers.broadcast(&Notification {
            event_id: event.header.event_id,
            correlation_id,
            causation_id,
            coord: coord.clone(),
            kind,
            sequence: global_seq,
        });
        // NOTE(review): envelope build errors are silently dropped here —
        // reactor fan-out is best-effort; confirm that is intentional.
        if let Ok(envelope) = self.single_event_envelope(coord.clone(), &event, global_seq) {
            self.reactor_subscribers.broadcast(&envelope);
        }

        Ok(AppendReceipt {
            event_id: event.header.event_id,
            sequence: global_seq,
            disk_pos,
        })
    }

    /// Batch append protocol: atomic multi-event commit with SYSTEM_BATCH_BEGIN envelope.
    /// [SPEC:src/store/writer.rs — handle_append_batch]
    ///
    /// On-disk shape: BEGIN marker → item frames → COMMIT marker → fsync.
    /// In-memory visibility comes last: index entries are inserted and then
    /// published in one step (STEP 13), so concurrent readers never observe
    /// a partially committed batch. If the process dies before the COMMIT
    /// marker + fsync land, recovery discards the incomplete batch.
    ///
    /// # Errors
    /// Helper failures surface as `StoreError::BatchFailed` carrying the
    /// offending item index and `BatchStage`; injected test faults propagate
    /// as returned by `fault::maybe_inject`.
    fn handle_append_batch(
        &mut self,
        items: &[BatchAppendItem],
    ) -> Result<Vec<AppendReceipt>, StoreError> {
        // STEPs 1-2: Validate size, bytes, and reject CAS.
        self.validate_batch(items)?;

        // STEP 3: Preflight idempotency. Full replay returns cached receipts;
        // partial replay errors out; clean batch proceeds.
        if let Some(cached) = self.preflight_batch_idempotency(items)? {
            return Ok(cached);
        }

        // STEPs 4-5: (no locks needed) — single writer thread owns all
        // index mutation. The previous per-entity Mutex was a leftover from
        // a multi-writer design and added overhead with no concurrency benefit.

        // STEP 6: Generate batch_id and reserve global sequences.
        let batch_id = self.index.global_sequence();
        let first_seq = self.index.reserve_sequences(items.len() as u64);

        // FAULT INJECTION: Batch start
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchStart {
                batch_id,
                item_count: items.len(),
            },
            &self.config.fault_injector,
        )?;

        // STEP 7: Pre-compute per-item global sequences, clocks, prev_hashes,
        // event_ids, and intra-batch causation chains.
        let computed = self.precompute_batch_items(items, first_seq)?;

        // STEP 8: Write SYSTEM_BATCH_BEGIN marker. Stores batch count in payload_size.
        // batch_max_size validation guarantees items.len() <= u32::MAX.
        #[allow(clippy::cast_possible_truncation)]
        let batch_count = items.len() as u32;
        let marker_offset =
            self.write_batch_marker_frame(batch_id, EventKind::SYSTEM_BATCH_BEGIN, batch_count, 0)?;
        trace!(batch_id, offset = marker_offset, "batch marker written");

        // FAULT INJECTION: After BEGIN marker written
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchBeginWritten {
                batch_id,
                item_count: items.len(),
            },
            &self.config.fault_injector,
        )?;

        // STEP 9: Write all event frames. Returns offsets and receipts;
        // every per-item value the stage step needs (`prev_hash`,
        // `event_hash`, `wall_ms`, `clock`) was already locked in by
        // `precompute_batch_items`.
        let (frame_offsets, receipts) =
            self.write_batch_event_frames(items, &computed, batch_id)?;

        // FAULT INJECTION: All batch items complete
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchItemsComplete {
                batch_id,
                item_count: items.len(),
            },
            &self.config.fault_injector,
        )?;

        // STEP 10: Write SYSTEM_BATCH_COMMIT marker (two-phase commit).
        // `items.len() - 1` cannot underflow: an empty batch short-circuits
        // at STEP 3 (zero cached items of zero counts as a full replay).
        let _commit_offset = self.write_batch_marker_frame(
            batch_id,
            EventKind::SYSTEM_BATCH_COMMIT,
            0,
            items.len() - 1,
        )?;
        trace!(batch_id, "batch commit marker written");

        // FAULT INJECTION: After COMMIT written, before fsync
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchCommitWritten { batch_id },
            &self.config.fault_injector,
        )?;

        // STEP 11: Sync to disk (atomic durability point).
        // If this fails, the batch may be partially on disk but without the
        // commit marker. Recovery will discard incomplete batches.

        // FAULT INJECTION: During fsync
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchFsync { batch_id },
            &self.config.fault_injector,
        )?;

        self.active_segment
            .sync_with_mode(&self.config.sync.mode)
            .map_err(|e| StoreError::BatchFailed {
                item_index: items.len() - 1,
                stage: BatchStage::Syncing,
                source: Box::new(e),
            })?;

        // STEP 12: Build index entries from the precomputed data + frame offsets.
        let staged_entries =
            self.stage_batch_index_entries(items, &computed, &frame_offsets, &receipts)?;

        // FAULT INJECTION: Before atomic publish to index
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::BatchPrePublish {
                batch_id,
                item_count: items.len(),
            },
            &self.config.fault_injector,
        )?;

        // STEP 13: Insert all entries into the in-memory index, then publish
        // atomically. Entries occupy [first_seq, first_seq + items.len()).
        self.index.insert_batch(staged_entries);
        #[allow(clippy::cast_possible_truncation)] // items.len() bounded by batch_max_size (u32)
        self.index.publish(first_seq + items.len() as u64);

        // STEP 14: Broadcast notifications. A subscriber that reacts by calling
        // query/get will now see the full batch (publish happened first).
        self.broadcast_batch_notifications(items, &computed)?;

        debug!(batch_id, count = items.len(), "batch committed");
        Ok(receipts)
    }

    /// STEP 9: Write all event frames for the batch. Returns frame offsets and
    /// per-item receipts. All per-item state (`prev_hash`, `event_hash`,
    /// `wall_us`, etc.) is taken verbatim from the precomputed
    /// `BatchItemComputed` slice — this function does NOT recompute hashes,
    /// timestamps, or chain links. See the `BatchItemComputed` doc for the
    /// historical incident behind this discipline.
    ///
    /// # Errors
    /// Encoding, rotation, and write failures are wrapped in
    /// `StoreError::BatchFailed` with the failing item's index and the
    /// corresponding `BatchStage`.
    fn write_batch_event_frames(
        &mut self,
        items: &[BatchAppendItem],
        computed: &[BatchItemComputed],
        batch_id: u64,
    ) -> Result<(Vec<u64>, Vec<AppendReceipt>), StoreError> {
        let mut frame_offsets: Vec<u64> = Vec::with_capacity(items.len());
        let mut receipts: Vec<AppendReceipt> = Vec::with_capacity(items.len());

        for (idx, item) in items.iter().enumerate() {
            // Unpack the precomputed, immutable per-item material.
            let c = &computed[idx];
            let global_seq = c.global_seq;
            let clock = c.clock;
            let prev_hash = c.prev_hash;
            let event_hash = c.event_hash;
            let event_id = c.event_id;
            let causation_id = c.causation_id;
            let wall_us = c.wall_us;
            let wall_ms = c.wall_ms;

            // Build event header. wall_us / wall_ms come from the single
            // batch-level capture in precompute, with per-entity monotonicity
            // already applied to wall_ms.
            let position = DagPosition::child_at(clock, wall_ms, 0);
            let correlation_id = item.options.correlation_id.unwrap_or(event_id);

            let header = EventHeader::new(
                event_id,
                correlation_id,
                causation_id,
                wall_us,
                position,
                // Payload sizes bounded by batch_max_bytes validation
                #[allow(clippy::cast_possible_truncation)]
                {
                    item.payload_bytes.len() as u32
                },
                item.kind,
            );

            // Build event using the precomputed event_hash. The frame's
            // prev_hash MUST be the precomputed value so multi-item
            // same-entity batches produce a continuous on-disk hash chain.
            let mut event = Event::new(header, item.payload_bytes.clone());
            event.hash_chain = Some(HashChain {
                prev_hash,
                event_hash,
            });
            event.header.content_hash = event_hash;

            // Encode frame.
            let frame_payload = FramePayloadRef {
                event: &event,
                entity: item.coord.entity(),
                scope: item.coord.scope(),
            };
            let frame =
                segment::frame_encode(&frame_payload).map_err(|e| StoreError::BatchFailed {
                    item_index: idx,
                    stage: BatchStage::Encoding,
                    source: Box::new(e),
                })?;

            // Check segment rotation.
            self.maybe_rotate_segment()
                .map_err(|e| StoreError::BatchFailed {
                    item_index: idx,
                    stage: BatchStage::Syncing,
                    source: Box::new(e),
                })?;

            // Write frame.
            let offset =
                self.active_segment
                    .write_frame(&frame)
                    .map_err(|e| StoreError::BatchFailed {
                        item_index: idx,
                        stage: BatchStage::Writing,
                        source: Box::new(e),
                    })?;
            frame_offsets.push(offset);

            // Build receipt (index update happens after all writes succeed).
            // `*self.segment_id` reflects any mid-batch rotation performed
            // just above, so the receipt points at the segment this frame
            // actually landed in.
            let disk_pos = DiskPos {
                segment_id: *self.segment_id,
                offset,
                #[allow(clippy::cast_possible_truncation)] // frame size bounded by segment_max_bytes
                length: frame.len() as u32,
            };
            receipts.push(AppendReceipt {
                event_id,
                sequence: global_seq,
                disk_pos,
            });

            // FAULT INJECTION: After each batch item written
            #[cfg(feature = "dangerous-test-hooks")]
            crate::store::fault::maybe_inject(
                crate::store::fault::InjectionPoint::BatchItemWritten {
                    batch_id,
                    item_index: idx,
                    total_items: items.len(),
                },
                &self.config.fault_injector,
            )?;
        }
        // Suppress unused warning when dangerous-test-hooks is disabled.
        let _ = batch_id;

        Ok((frame_offsets, receipts))
    }

    /// STEP 12: Build IndexEntry vec from precomputed data + write outputs.
    /// Also records SIDX entries for the segment footer. Per-item state
    /// (`prev_hash`, `event_hash`, `wall_ms`, `clock`) is taken verbatim from
    /// `BatchItemComputed`. The map-based hash lookup that lived here
    /// previously was wrong: it returned the entity's LAST item's hash for
    /// every staged entry on that entity, silently corrupting in-memory and
    /// SIDX hash chains for multi-item same-entity batches.
    fn stage_batch_index_entries(
        &mut self,
        items: &[BatchAppendItem],
        computed: &[BatchItemComputed],
        frame_offsets: &[u64],
        receipts: &[AppendReceipt],
    ) -> Result<Vec<IndexEntry>, StoreError> {
        let mut staged_entries: Vec<IndexEntry> = Vec::with_capacity(items.len());
        for (idx, item) in items.iter().enumerate() {
            let c = &computed[idx];
            // Borrow entity/scope straight from the item's coordinate. They
            // are only ever used as `&str` below, so the two per-item
            // `Arc<str>` allocations this code previously made were waste.
            let entity = item.coord.entity();
            let scope = item.coord.scope();

            // Use disk_pos captured at write time — if rotation happened mid-batch,
            // earlier items live on a prior segment.
            let disk_pos = receipts[idx].disk_pos;
            let coord = Coordinate::new(entity, scope).map_err(StoreError::Coordinate)?;
            let entity_id = self.index.interner.intern(entity);
            let scope_id = self.index.interner.intern(scope);

            staged_entries.push(IndexEntry {
                event_id: c.event_id,
                correlation_id: item.options.correlation_id.unwrap_or(c.event_id),
                causation_id: c.causation_id,
                // `coord` is not used again; move it instead of cloning.
                coord,
                entity_id,
                scope_id,
                kind: item.kind,
                wall_ms: c.wall_ms,
                clock: c.clock,
                hash_chain: HashChain {
                    prev_hash: c.prev_hash,
                    event_hash: c.event_hash,
                },
                disk_pos,
                global_sequence: c.global_seq,
            });

            // Record the matching SIDX entry for the segment footer.
            // entity_idx/scope_idx are written as 0 here; presumably the
            // collector resolves real indices from the entity/scope strings
            // passed to `record` — TODO confirm against sidx module.
            let sidx_entry = crate::store::sidx::SidxEntry {
                event_id: c.event_id,
                entity_idx: 0,
                scope_idx: 0,
                kind: crate::store::sidx::kind_to_raw(item.kind),
                wall_ms: c.wall_ms,
                clock: c.clock,
                prev_hash: c.prev_hash,
                event_hash: c.event_hash,
                // NOTE(review): SIDX keeps the raw frame offset while the
                // IndexEntry keeps the receipt's disk_pos (segment-aware);
                // this mirrors the previous behavior exactly.
                frame_offset: frame_offsets[idx],
                frame_length: disk_pos.length,
                global_sequence: c.global_seq,
                correlation_id: item.options.correlation_id.unwrap_or(c.event_id),
                causation_id: c.causation_id.unwrap_or(0),
            };
            self.sidx_collector.record(sidx_entry, entity, scope);
        }
        Ok(staged_entries)
    }

    /// STEP 14: Broadcast a Notification for each item in the committed batch.
    ///
    /// Also fans a decoded `CommittedEventEnvelope` out to reactor
    /// subscribers. Envelope construction errors are swallowed (`if let Ok`)
    /// so a payload that fails to decode does not block plain notification
    /// delivery for the rest of the batch.
    fn broadcast_batch_notifications(
        &self,
        items: &[BatchAppendItem],
        computed: &[BatchItemComputed],
    ) -> Result<(), StoreError> {
        for (idx, item) in items.iter().enumerate() {
            let c = &computed[idx];
            // Borrow entity/scope directly from the item's coordinate rather
            // than allocating throwaway `Arc<str>` copies (the previous code
            // made two heap allocations per item just to call this).
            let coord = Coordinate::new(item.coord.entity(), item.coord.scope())
                .map_err(StoreError::Coordinate)?;

            self.subscribers.broadcast(&Notification {
                event_id: c.event_id,
                correlation_id: item.options.correlation_id.unwrap_or(c.event_id),
                causation_id: c.causation_id,
                coord,
                kind: item.kind,
                sequence: c.global_seq,
            });
            if let Ok(envelope) = self.batch_event_envelope(item, c, c.global_seq) {
                self.reactor_subscribers.broadcast(&envelope);
            }
        }
        Ok(())
    }

    /// Build a `CommittedEventEnvelope` for one committed event: decode its
    /// MessagePack payload into a `serde_json::Value` and pair the decoded
    /// event with a `Notification` derived from its header.
    ///
    /// # Errors
    /// Returns `StoreError::Serialization` if the payload is not valid
    /// MessagePack for a JSON value.
    fn single_event_envelope(
        &self,
        coord: Coordinate,
        event: &Event<Vec<u8>>,
        sequence: u64,
    ) -> Result<CommittedEventEnvelope, StoreError> {
        // Decode first so a bad payload short-circuits before any cloning.
        let decoded = rmp_serde::from_slice::<serde_json::Value>(&event.payload)
            .map_err(|e| StoreError::Serialization(Box::new(e)))?;
        let header = &event.header;
        Ok(CommittedEventEnvelope {
            notification: Notification {
                event_id: header.event_id,
                correlation_id: header.correlation_id,
                causation_id: header.causation_id,
                coord: coord.clone(),
                kind: header.event_kind,
                sequence,
            },
            stored: StoredEvent {
                coordinate: coord,
                event: Event {
                    header: header.clone(),
                    payload: decoded,
                    hash_chain: event.hash_chain.clone(),
                },
            },
        })
    }

    /// Reconstruct a `CommittedEventEnvelope` for one batch item from its
    /// precomputed commit state (`BatchItemComputed`): rebuild the header,
    /// decode the MessagePack payload to JSON, and attach the hash chain.
    ///
    /// # Errors
    /// Returns an error when the payload length exceeds `u32` or when the
    /// payload fails MessagePack→JSON decoding.
    fn batch_event_envelope(
        &self,
        item: &BatchAppendItem,
        computed: &BatchItemComputed,
        sequence: u64,
    ) -> Result<CommittedEventEnvelope, StoreError> {
        let event_id = computed.event_id;
        let correlation_id = item.options.correlation_id.unwrap_or(event_id);

        // Length check runs before decoding, matching the original
        // evaluation order (header args evaluated before Event::new args).
        let payload_len = u32::try_from(item.payload_bytes.len())
            .map_err(|_| StoreError::ser_msg("batch payload exceeds u32"))?;
        let decoded = rmp_serde::from_slice::<serde_json::Value>(&item.payload_bytes)
            .map_err(|e| StoreError::Serialization(Box::new(e)))?;

        let header = EventHeader::new(
            event_id,
            correlation_id,
            computed.causation_id,
            computed.wall_us,
            DagPosition::child_at(computed.clock, computed.wall_ms, 0),
            payload_len,
            item.kind,
        )
        .with_flags(item.options.flags);

        let mut event = Event::new(header, decoded);
        event.hash_chain = Some(HashChain {
            prev_hash: computed.prev_hash,
            event_hash: computed.event_hash,
        });
        event.header.content_hash = computed.event_hash;

        let notification = Notification {
            event_id,
            correlation_id,
            causation_id: computed.causation_id,
            coord: item.coord.clone(),
            kind: item.kind,
            sequence,
        };
        Ok(CommittedEventEnvelope {
            notification,
            stored: StoredEvent {
                coordinate: item.coord.clone(),
                event,
            },
        })
    }
}

/// Find the latest segment ID by scanning data_dir for .fbat files.
///
/// Returns `None` if the directory cannot be read or contains no entry whose
/// file name is `<u64>.fbat`. Non-UTF-8 names and non-numeric stems are
/// skipped.
pub(crate) fn find_latest_segment_id(dir: &std::path::Path) -> Option<u64> {
    std::fs::read_dir(dir)
        .ok()?
        .filter_map(|e| e.ok())
        .filter_map(|e| {
            let name = e.file_name();
            let name = name.to_str()?;
            // `strip_suffix` removes exactly one trailing ".fbat".
            // The previous `trim_end_matches(".fbat")` stripped it
            // repeatedly, so a stray "99.fbat.fbat" was miscounted as
            // segment 99; now it simply fails to parse and is skipped.
            name.strip_suffix(".fbat")?.parse::<u64>().ok()
        })
        .max()
}