// batpak 0.9.0
//
// Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
// Intentional impossible-feature guard: exponential backoff belongs in the
// product supervisor, not the library (ADR-0006: only Once and Bounded restart
// policies). The `exponential-backoff` feature is deliberately undeclared in
// Cargo.toml; build.rs registers the cfg via `cargo::rustc-check-cfg`, so the
// `cfg` below compiles warning-free and this `compile_error!` tripwire fires
// only if someone adds the feature.
//
// NOTE(review): this comment cites ADR-0006 while the message below points
// readers at 08_CIRCUITS.md — confirm both references are current.
#[cfg(feature = "exponential-backoff")]
compile_error!(
    "Red flag: only Once and Bounded restart policies. \
     Exponential backoff belongs in the product's supervisor, not here. \
     See 08_CIRCUITS.md."
);

pub use super::fanout::Notification;
use super::fanout::{ReactorSubscriberList, SubscriberList};
use super::staging::{StagedCommitMeta, StagedCommitTiming, StagedCommittedEvent};
use crate::coordinate::{Coordinate, DagPosition};
use crate::event::{Event, EventHeader, EventKind, HashChain};
use crate::store::append::BatchAppendItem;
use crate::store::config::ValidatedStoreConfig;
use crate::store::index::{DiskPos, StoreIndex};
use crate::store::segment::sidx::kind_to_raw;
use crate::store::segment::{self, Active, FramePayloadRef, Segment};
#[cfg(test)]
use crate::store::SystemClock;
use crate::store::{AppendReceipt, StoreConfig, StoreError};
use flume::{Receiver, Sender};
use std::sync::Arc;
mod append;
mod batch;
#[cfg(all(test, feature = "dangerous-test-hooks", feature = "payload-encryption"))]
mod batch_fence_crash_tests;
#[cfg(feature = "payload-encryption")]
mod encrypt;
mod fence_runtime;
mod publish;
mod runtime;
mod watermark;

pub(crate) use self::append::AppendGuards;
use self::fence_runtime::{CommandResult, DeferredReply, FenceLedger};
pub(super) use self::runtime::checked_next_clock;
pub(crate) use self::runtime::find_latest_segment_id;
#[cfg(feature = "dangerous-test-hooks")]
use self::runtime::DriveStep;
use self::runtime::{writer_thread_main, writer_thread_name, WriterRuntime};
pub(crate) use self::watermark::{WatermarkAdvanceHandle, WatermarkState};

/// Discard the outcome of a reply `send`.
///
/// A failed send hands the unsent value back inside `flume::SendError`; nobody
/// is left to receive it, so it is dropped here. A successful send needs no
/// further action.
pub(super) fn ignore_closed_response_channel<T>(result: Result<(), flume::SendError<T>>) {
    if let Err(flume::SendError(unsent_reply)) = result {
        drop(unsent_reply);
    }
}

/// WriterCommand: messages sent to the writer over a flume channel.
///
/// Every variant carries a `respond` sender (`flume::Sender`) on which the
/// writer posts the command's outcome; the caller keeps the paired receiver.
/// Variants carrying a `token` refer to a visibility fence by that token.
pub(crate) enum WriterCommand {
    /// Open the visibility fence identified by `token`.
    BeginVisibilityFence {
        token: u64,
        respond: Sender<Result<(), StoreError>>,
    },
    /// Append a single event at `coord`; the reply is the append receipt or
    /// the error.
    Append {
        coord: Coordinate,
        event: Box<Event<Vec<u8>>>, // pre-serialized payload as msgpack bytes
        kind: EventKind,
        guards: AppendGuards,
        respond: Sender<Result<AppendReceipt, StoreError>>,
    },
    /// Append a single event on behalf of the fence identified by `token`.
    /// Same payload fields as [`WriterCommand::Append`].
    FenceAppend {
        token: u64,
        coord: Coordinate,
        event: Box<Event<Vec<u8>>>,
        kind: EventKind,
        guards: AppendGuards,
        respond: Sender<Result<AppendReceipt, StoreError>>,
    },
    /// Append a batch of items; the reply carries the receipts for the batch
    /// or a single error.
    AppendBatch {
        items: Vec<BatchAppendItem>,
        respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
    },
    /// Batch append on behalf of the fence identified by `token`.
    FenceAppendBatch {
        token: u64,
        items: Vec<BatchAppendItem>,
        respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
    },
    /// Commit the fence identified by `token` (handler not shown in this
    /// module file — presumably publishes the fenced appends; confirm).
    CommitVisibilityFence {
        token: u64,
        respond: Sender<Result<(), StoreError>>,
    },
    /// Cancel the fence identified by `token` (handler not shown in this
    /// module file — confirm the exact effect on already-fenced appends).
    CancelVisibilityFence {
        token: u64,
        respond: Sender<Result<(), StoreError>>,
    },
    /// Ask the writer to sync and report the outcome.
    Sync {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Ask the writer to shut down and report the outcome.
    Shutdown {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Test-only: trigger a panic in the writer thread to exercise restart_policy.
    #[cfg(feature = "dangerous-test-hooks")]
    #[doc(hidden)]
    PanicForTest {
        respond: Sender<Result<(), StoreError>>,
    },
}

/// WriterHandle: owned by Store. Communicates with the writer, which runs
/// either on a spawned thread or (under `dangerous-test-hooks`) inline via the
/// cooperative pump — see [`WriterDrive`].
pub(crate) struct WriterHandle {
    /// Sending side of the writer's command queue.
    pub tx: Sender<WriterCommand>,
    /// Subscriber list shared with the writer (the same `Arc` is handed to it
    /// at construction).
    pub subscribers: Arc<SubscriberList>,
    /// Reactor subscriber list shared with the writer in the same way.
    pub reactor_subscribers: Arc<ReactorSubscriberList>,
    /// Watermark handle shared with the writer; marked crashed when the writer
    /// thread is found finished or its join fails.
    watermark_handle: WatermarkAdvanceHandle,
    /// How the writer is driven (threaded or cooperative).
    drive: WriterDrive,
}

/// Writer state owned by the cooperative pump: the same [`WriterCore`] the
/// spawned writer thread owns, plus the `events_since_sync` counter the threaded
/// path keeps as a `writer_loop` local. Bundled so the
/// [`Mutex`](std::sync::Mutex) guards a single unit and the pump can
/// split-borrow both halves through one lock.
#[cfg(feature = "dangerous-test-hooks")]
struct CoopState {
    /// The writer's mutable state.
    core: WriterCore,
    /// Counter passed by `&mut` to every driven command; starts at 0.
    events_since_sync: u32,
}

/// Cheap-clone handle that drives the writer inline by draining the command
/// queue on the calling thread. There is NO writer thread in cooperative mode.
///
/// Single-threaded by construction: the only caller is the reply-await funnel,
/// which pumps before it blocks on a receive. The `Mutex` exists purely for the
/// `Send + Sync` soundness the handle needs (it is stored in the `WriterHandle`
/// and reached from `Store` methods), mirroring `SimScheduler`; it is never
/// actually contended.
#[cfg(feature = "dangerous-test-hooks")]
#[derive(Clone)]
pub(crate) struct CooperativePump {
    /// Writer state shared by every clone of the pump.
    state: std::sync::Arc<std::sync::Mutex<CoopState>>,
    // flume `Receiver` is `Clone` (a shared consumer), so cloning the pump shares
    // the one command queue rather than forking it.
    rx: Receiver<WriterCommand>,
    /// Validated configuration passed to every driven command.
    validated_cfg: Arc<ValidatedStoreConfig>,
    /// Store configuration passed to every driven command.
    config: Arc<StoreConfig>,
}

#[cfg(feature = "dangerous-test-hooks")]
impl CooperativePump {
    /// Run every command that is queued right now, inline on the caller's
    /// thread.
    ///
    /// Commands are taken with `try_recv`, so the call never waits for new
    /// work: it returns once the queue is empty or as soon as the drive reports
    /// [`DriveStep::Exit`]. A poisoned lock is recovered with
    /// `PoisonError::into_inner` (the same lint-clean pattern used in
    /// `sim/scheduler.rs`) instead of being propagated.
    pub(crate) fn pump(&self) {
        let mut locked = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Reborrow through the guard once so `core` and `events_since_sync`
        // can be borrowed mutably side by side.
        let state = &mut *locked;
        while let Ok(command) = self.rx.try_recv() {
            let step = state.core.drive_command(
                &self.rx,
                &self.validated_cfg,
                &self.config,
                &mut state.events_since_sync,
                command,
            );
            if matches!(step, DriveStep::Exit) {
                break;
            }
        }
    }
}

/// How the writer is driven. The threaded path is the production path; the
/// cooperative path (only present under `dangerous-test-hooks`) drives the
/// writer inline with no thread.
pub(crate) enum WriterDrive {
    /// Production: the writer runs on a spawned thread (OS thread, or a
    /// SimScheduler task). `thread` is `None` for the test-only
    /// `from_parts_for_test` handle that has no live writer, and after
    /// `join` / `close_channel_and_join` have taken the handle out.
    Threaded {
        thread: Option<Box<dyn crate::store::platform::spawn::JobHandle>>,
    },
    /// Single-threaded: no writer thread; the queue is pumped inline.
    #[cfg(feature = "dangerous-test-hooks")]
    Cooperative { pump: CooperativePump },
}

impl WriterDrive {
    /// Pump the cooperative queue if cooperative; a no-op on the threaded path.
    fn pump(&self) {
        match self {
            WriterDrive::Threaded { .. } => {}
            #[cfg(feature = "dangerous-test-hooks")]
            WriterDrive::Cooperative { pump } => pump.pump(),
        }
    }
}

/// RestartPolicy: how the writer recovers from panics.
/// Keep this surface intentionally small: exactly two variants. The enum is
/// deliberately exhaustive (not `#[non_exhaustive]`) so every match over it is
/// total without a forward-compat wildcard arm.
///
/// The derived `Default` is [`RestartPolicy::Once`].
#[derive(Clone, Debug, Default)]
pub enum RestartPolicy {
    /// Allow at most one automatic restart after a writer panic.
    #[default]
    Once,
    /// Allow up to `max_restarts` automatic restarts within a rolling `within_ms` millisecond window.
    Bounded {
        /// Maximum number of restarts permitted within the time window.
        max_restarts: u32,
        /// Time window in milliseconds over which `max_restarts` is enforced.
        within_ms: u64,
    },
}

impl WriterHandle {
    /// Start the writer through `config.spawner()` and return the handle that
    /// talks to it.
    ///
    /// The data directory and the first active segment are created on the
    /// calling thread, before anything is spawned, so those failures reach the
    /// caller directly.
    ///
    /// # Errors
    /// Returns `StoreError::Io` when the data directory cannot be created, any
    /// error from locating the latest segment id or creating the new segment,
    /// and the spawner's error converted through `StoreError::from`.
    pub(crate) fn spawn(
        config: &Arc<StoreConfig>,
        runtime: &Arc<ValidatedStoreConfig>,
        index: &Arc<StoreIndex>,
        subscribers: &Arc<SubscriberList>,
        reactor_subscribers: &Arc<ReactorSubscriberList>,
        reader: &Arc<crate::store::segment::scan::Reader>,
    ) -> Result<Self, StoreError> {
        // Everything fallible happens up front so `Store::open()` sees it.
        config
            .fs()
            .create_dir_all(&config.data_dir)
            .map_err(StoreError::Io)?;
        // The new active segment takes the id after the latest one found
        // (id 1 when none is found).
        let latest_found = find_latest_segment_id(&config.data_dir)?;
        let first_segment_id = latest_found.unwrap_or(0) + 1;
        let first_segment = Segment::<Active>::create_with_created_ns_on(
            &config.data_dir,
            first_segment_id,
            runtime.now_wall_ns(),
            config.fs(),
        )?;

        let (tx, rx) = flume::bounded::<WriterCommand>(config.writer.channel_capacity);
        let watermark_handle = WatermarkState::handle(runtime.clock_arc());

        // Owned clones that move into the writer body.
        let thread_config = Arc::clone(config);
        let thread_validated = Arc::clone(runtime);
        let thread_index = Arc::clone(index);
        let thread_subscribers = Arc::clone(subscribers);
        let thread_reactor_subscribers = Arc::clone(reactor_subscribers);
        let thread_reader = Arc::clone(reader);
        let thread_watermark = watermark_handle.clone();

        // The receiver is owned by the closure; the runtime only borrows it.
        let writer_body = move || {
            writer_thread_main(
                &WriterRuntime {
                    rx: &rx,
                    config: thread_config,
                    validated_cfg: thread_validated,
                    index: thread_index,
                    subscribers: thread_subscribers,
                    reactor_subscribers: thread_reactor_subscribers,
                    reader: thread_reader,
                    watermark_handle: thread_watermark,
                },
                first_segment,
                first_segment_id,
            );
        };

        let thread = config
            .spawner()
            .spawn(
                writer_thread_name(&config.data_dir),
                config.writer.stack_size,
                Box::new(writer_body),
            )
            .map_err(StoreError::from)?;

        Ok(Self {
            tx,
            subscribers: Arc::clone(subscribers),
            reactor_subscribers: Arc::clone(reactor_subscribers),
            watermark_handle,
            drive: WriterDrive::Threaded {
                thread: Some(thread),
            },
        })
    }

    /// Build a writer handle that is driven inline instead of by a thread.
    ///
    /// Performs the same fallible setup as [`spawn`] (data directory, first
    /// active segment, command channel, watermark handle), then places a
    /// [`WriterCore`] straight into a shared [`CooperativePump`]. Nothing is
    /// spawned and `config.spawner()` is never called.
    ///
    /// A panic inside a pumped command unwinds into whoever is pumping; there is
    /// no `catch_unwind` restart loop in this mode, which is acceptable for the
    /// deterministic simulation it serves.
    ///
    /// # Errors
    /// Returns `StoreError::Io` when the data directory cannot be created, and
    /// any error from locating the latest segment id or creating the new
    /// segment.
    ///
    /// [`spawn`]: Self::spawn
    #[cfg(feature = "dangerous-test-hooks")]
    pub(crate) fn cooperative(
        config: &Arc<StoreConfig>,
        runtime: &Arc<ValidatedStoreConfig>,
        index: &Arc<StoreIndex>,
        subscribers: &Arc<SubscriberList>,
        reactor_subscribers: &Arc<ReactorSubscriberList>,
        reader: &Arc<crate::store::segment::scan::Reader>,
    ) -> Result<Self, StoreError> {
        // Same up-front fallible setup as `spawn`.
        config
            .fs()
            .create_dir_all(&config.data_dir)
            .map_err(StoreError::Io)?;
        let latest_found = find_latest_segment_id(&config.data_dir)?;
        let first_segment_id = latest_found.unwrap_or(0) + 1;
        let first_segment = Segment::<Active>::create_with_created_ns_on(
            &config.data_dir,
            first_segment_id,
            runtime.now_wall_ns(),
            config.fs(),
        )?;

        let (tx, rx) = flume::bounded::<WriterCommand>(config.writer.channel_capacity);
        let watermark_handle = WatermarkState::handle(runtime.clock_arc());

        // The core starts with an empty SIDX collector, no open fence and a
        // zeroed sync counter; it is owned by the pump rather than a thread.
        let coop_state = CoopState {
            core: WriterCore {
                index: Arc::clone(index),
                active_segment: first_segment,
                segment_id: first_segment_id,
                config: Arc::clone(config),
                runtime: Arc::clone(runtime),
                subscribers: Arc::clone(subscribers),
                reactor_subscribers: Arc::clone(reactor_subscribers),
                reader: Arc::clone(reader),
                watermark_handle: watermark_handle.clone(),
                sidx_collector: crate::store::segment::sidx::SidxEntryCollector::new(),
                fence_ledger: None,
            },
            events_since_sync: 0,
        };
        let pump = CooperativePump {
            state: Arc::new(std::sync::Mutex::new(coop_state)),
            rx,
            validated_cfg: Arc::clone(runtime),
            config: Arc::clone(config),
        };

        Ok(Self {
            tx,
            subscribers: Arc::clone(subscribers),
            reactor_subscribers: Arc::clone(reactor_subscribers),
            watermark_handle,
            drive: WriterDrive::Cooperative { pump },
        })
    }

    /// Test-only: wrap a caller-supplied sender and subscriber list in a handle
    /// that has no live writer (`thread` is `None`). The reactor list is empty
    /// and the watermark handle is built on a fresh `SystemClock`.
    #[cfg(test)]
    pub(crate) fn from_parts_for_test(
        tx: Sender<WriterCommand>,
        subscribers: Arc<SubscriberList>,
    ) -> Self {
        let reactor_subscribers = Arc::new(ReactorSubscriberList::new());
        let watermark_handle = WatermarkState::handle(Arc::new(SystemClock::new()));
        let drive = WriterDrive::Threaded { thread: None };
        Self {
            tx,
            subscribers,
            reactor_subscribers,
            watermark_handle,
            drive,
        }
    }

    pub(crate) fn watermark_handle(&self) -> WatermarkAdvanceHandle {
        self.watermark_handle.clone()
    }

    /// Report whether the writer thread has already finished.
    ///
    /// # Errors
    /// Returns `StoreError::WriterCrashed` — after marking the watermark handle
    /// as crashed — when a thread handle is present and reports finished. A
    /// handle with no thread, or a cooperative writer, always yields `Ok(())`.
    pub(crate) fn fail_if_exited(&self) -> Result<(), StoreError> {
        match &self.drive {
            WriterDrive::Threaded { thread } => {
                let Some(writer) = thread else {
                    return Ok(());
                };
                if !writer.is_finished() {
                    return Ok(());
                }
                self.watermark_handle.mark_writer_crashed();
                Err(StoreError::WriterCrashed)
            }
            // Cooperative mode has no thread that could exit on its own: the
            // writer runs inline, and a panicking command unwinds the pumping
            // caller instead of leaving a dead writer behind.
            #[cfg(feature = "dangerous-test-hooks")]
            WriterDrive::Cooperative { .. } => Ok(()),
        }
    }

    /// Wait for the writer to finish.
    ///
    /// On the threaded path the thread handle is taken out (so a second call
    /// finds nothing to join) and joined. On the cooperative path the queue is
    /// pumped once instead.
    ///
    /// # Errors
    /// Returns `StoreError::WriterCrashed` — after marking the watermark handle
    /// as crashed — when joining the thread fails.
    pub(crate) fn join(&mut self) -> Result<(), StoreError> {
        match &mut self.drive {
            WriterDrive::Threaded { thread } => {
                let Some(writer) = thread.take() else {
                    return Ok(());
                };
                if let Err(_join_failure) = writer.join() {
                    self.watermark_handle.mark_writer_crashed();
                    return Err(StoreError::WriterCrashed);
                }
                Ok(())
            }
            // Nothing to join: one pump drains whatever was queued behind the
            // Shutdown so the inline writer reaches quiescence.
            #[cfg(feature = "dangerous-test-hooks")]
            WriterDrive::Cooperative { pump } => {
                pump.pump();
                Ok(())
            }
        }
    }

    /// Pump the writer's command queue. No-op on the threaded path (the writer
    /// thread is already draining the queue); on the cooperative path it drives
    /// every currently-queued command inline before the caller awaits a reply.
    pub(crate) fn pump(&self) {
        self.drive.pump();
    }

    /// A clone of the cooperative pump, or `None` when the writer is threaded.
    ///
    /// Lets [`super::control`] tickets carry the pump so `Ticket::wait` can
    /// drain the queue before it blocks.
    #[cfg(feature = "dangerous-test-hooks")]
    pub(crate) fn cooperative_pump(&self) -> Option<CooperativePump> {
        match &self.drive {
            WriterDrive::Cooperative { pump } => Some(CooperativePump::clone(pump)),
            WriterDrive::Threaded { .. } => None,
        }
    }

    /// Test-only: abandon the writer the way a power loss would.
    ///
    /// No `Shutdown` is sent, so the writer gets no `Shutdown`-triggered drain,
    /// footer, or final sync. Instead the handle's command sender is swapped
    /// for a throwaway one and dropped, which ends the writer's `rx.iter()`
    /// once no sender remains; the thread is then joined and its join result
    /// ignored. The handle is taken by `&mut` because the `Open` typestate owns
    /// it in `Store::state.0` and it cannot be moved out past the `Drop` impl.
    /// Used by [`super::super::Store::abandon_without_shutdown`].
    #[cfg(feature = "dangerous-test-hooks")]
    pub(crate) fn close_channel_and_join(&mut self) {
        // `_dead_rx` stays bound until this function returns; from then on the
        // replacement sender left in `self.tx` has no receiver.
        let (dead_tx, _dead_rx) = flume::bounded::<WriterCommand>(0);
        // Dropping the sender the handle held is what closes the command queue.
        drop(std::mem::replace(&mut self.tx, dead_tx));

        match &mut self.drive {
            // Nothing to join: pump so already-queued commands are driven to
            // quiescence. With the sender gone no new command can arrive.
            WriterDrive::Cooperative { pump } => pump.pump(),
            WriterDrive::Threaded { thread } => {
                if let Some(writer) = thread.take() {
                    let _join_outcome = writer.join();
                }
            }
        }
    }

    // NOTE: No send_append() method here. Store::append() and Store::append_reaction()
    // in store/mod.rs create one-shot flume channels and send WriterCommands directly
    // via self.writer.tx.send(). This avoids an unnecessary abstraction layer.
    // `WriterHandle::tx` is `pub(crate)` so store control paths can talk to the writer directly.
}

/// Writer's mutable runtime state, grouped to reduce handle_append param count.
struct WriterCore {
    /// Store index shared with the rest of the store.
    index: Arc<StoreIndex>,
    /// The segment currently open for appends.
    active_segment: Segment<Active>,
    /// Id of `active_segment`; the first one is the latest id found in the
    /// data directory plus one.
    segment_id: u64,
    /// Store configuration.
    config: Arc<StoreConfig>,
    /// Validated configuration (also the source of wall-clock time and the
    /// clock used by the watermark handle at construction).
    runtime: Arc<ValidatedStoreConfig>,
    /// Subscriber list shared with the `WriterHandle`.
    subscribers: Arc<SubscriberList>,
    /// Reactor subscriber list shared with the `WriterHandle`.
    reactor_subscribers: Arc<ReactorSubscriberList>,
    /// Reader handle — updated on segment rotation so mmap dispatch is correct.
    reader: Arc<crate::store::segment::scan::Reader>,
    /// Shared frontier state for coherent watermark snapshots.
    watermark_handle: WatermarkAdvanceHandle,
    /// Accumulates SIDX entries for the current active segment.
    /// Flushed as a footer on segment rotation and shutdown.
    sidx_collector: crate::store::segment::sidx::SidxEntryCollector,
    /// Currently active public visibility fence, if any.
    fence_ledger: Option<FenceLedger>,
}

#[cfg(test)]
mod tests {
    use super::watermark::elapsed_ms_since;
    use super::{
        checked_next_clock, ReactorSubscriberList, SubscriberList, WatermarkState, WriterCommand,
        WriterDrive, WriterHandle,
    };
    use crate::store::stats::HlcPoint;
    use crate::store::{StoreError, SystemClock};
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    /// `checked_next_clock` starts at 0, increments by one, and reports an
    /// overflow error naming the entity instead of wrapping.
    #[test]
    fn checked_next_clock_advances_and_overflow_fails_closed() {
        // No previous clock value: the first one handed out is 0.
        let genesis = checked_next_clock(None, "entity:test").expect("genesis clock");
        assert_eq!(genesis, 0);

        let successor = checked_next_clock(Some(7), "entity:test").expect("increment clock");
        assert_eq!(successor, 8);

        // At the top of the range the call must fail, carrying the entity name.
        let err = checked_next_clock(Some(u32::MAX), "entity:overflow")
            .expect_err("entity clock overflow must fail closed");
        assert!(matches!(
            err,
            StoreError::EntityClockOverflow { ref entity } if entity == "entity:overflow"
        ));
    }

    /// Re-accepting a point that is already durable must not make the snapshot
    /// report a pending write again.
    #[test]
    fn duplicate_accepted_advance_does_not_restart_pending_write_age() {
        let frontier = HlcPoint {
            wall_ms: 10,
            global_sequence: 1,
        };
        let mut watermarks = WatermarkState::default();

        // Accept the point and make it durable: nothing is pending afterwards.
        watermarks.advance_accepted_on_lane(0, frontier);
        watermarks.advance_durable(frontier);
        let age_once_durable = watermarks.snapshot().oldest_pending_write_age_ms;
        assert_eq!(
            age_once_durable, None,
            "PROPERTY: durability to accepted clears pending write age"
        );

        // Accepting the very same point again must leave that unchanged.
        watermarks.advance_accepted_on_lane(0, frontier);
        let age_after_duplicate = watermarks.snapshot().oldest_pending_write_age_ms;
        assert_eq!(
            age_after_duplicate, None,
            "PROPERTY: duplicate accepted advance must not reopen pending write age"
        );
    }

    /// `elapsed_ms_since` converts a nanosecond difference to whole
    /// milliseconds and yields zero when the first sample is the smaller one.
    #[test]
    fn pending_write_age_reports_elapsed_milliseconds_not_nanoseconds_or_products() {
        // 3.5 ms against 1.0 ms: 2.5 ms apart, floored to 2.
        let forward = elapsed_ms_since(3_500_000, 1_000_000);
        assert_eq!(
            forward, 2,
            "PROPERTY: frontier pending-write age is floor(elapsed_ns / 1_000_000)"
        );

        // Same samples swapped: the result saturates at zero.
        let backwards = elapsed_ms_since(1_000_000, 3_500_000);
        assert_eq!(
            backwards, 0,
            "PROPERTY: backwards monotonic samples saturate to zero"
        );
    }

    /// A writer thread that panics must turn into `WriterCrashed` from `join`,
    /// and the shared watermark handle must report the crash to waiters.
    #[test]
    fn writer_handle_join_surfaces_thread_panic_and_poisons_watermarks() {
        // `_rx` is kept bound so the command channel stays open for the test.
        let (tx, _rx) = flume::bounded::<WriterCommand>(1);
        let watermark_handle = WatermarkState::handle(Arc::new(SystemClock::new()));

        // The thread body unwinds deterministically. `black_box` hides the
        // `None` from the literal-unwrap lint, and `expect` is the permitted
        // in-test panic shape (not the `panic!` macro).
        let panicking_body = Box::new(|| {
            std::hint::black_box(Option::<()>::None)
                .expect("intentional writer join panic proof");
        });
        let panicking_writer = crate::store::platform::spawn::Spawn::spawn(
            &crate::store::platform::spawn::ThreadSpawn,
            String::from("writer-join-panic-proof"),
            None,
            panicking_body,
        )
        .expect("spawn panic proof thread");

        let drive = WriterDrive::Threaded {
            thread: Some(panicking_writer),
        };
        let mut handle = WriterHandle {
            tx,
            subscribers: Arc::new(SubscriberList::new()),
            reactor_subscribers: Arc::new(ReactorSubscriberList::new()),
            watermark_handle: watermark_handle.clone(),
            drive,
        };

        let err = handle
            .join()
            .expect_err("PROPERTY: writer thread panic must surface through join");
        assert!(matches!(err, StoreError::WriterCrashed));

        // The test's own clone of the handle must now fail waiters as well.
        let poisoned =
            watermark_handle.wait_for_durable(HlcPoint::ORIGIN, Duration::from_millis(1));
        assert!(
            matches!(poisoned, Err(StoreError::WriterCrashed)),
            "PROPERTY: join panic must poison frontier waiters"
        );
    }

    /// Proves that `dangerous_notify_all` wakes a thread blocked in
    /// `dangerous_wait_for_notification` before that wait's own 2 s timeout.
    #[test]
    fn dangerous_notify_all_wakes_condvar_waiters() {
        let handle = WatermarkState::handle(std::sync::Arc::new(crate::store::SystemClock::new()));
        let waiter_handle = handle.clone();
        // `ready` tells the main thread the waiter has started; `done` carries
        // the waiter's result back.
        let (ready_tx, ready_rx) = mpsc::channel();
        let (done_tx, done_rx) = mpsc::channel();

        let waiter = std::thread::Builder::new()
            .name("watermark-dangerous-notify-proof".to_string())
            .spawn(move || {
                // Readiness is signalled BEFORE the wait begins, so the main
                // thread may notify before this thread is actually waiting.
                ready_tx.send(()).expect("signal waiter readiness");
                // Presumably `true` means the wait timed out — the final
                // assertion below relies on that reading.
                let timed_out =
                    waiter_handle.dangerous_wait_for_notification(Duration::from_secs(2));
                done_tx.send(timed_out).expect("signal waiter outcome");
            })
            .expect("spawn condvar waiter");

        ready_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("waiter reached condvar wait setup");
        // Notify repeatedly for up to 1 s: a single notification could land
        // before the waiter has started waiting and be missed.
        let deadline = Instant::now() + Duration::from_secs(1);
        let timed_out = loop {
            handle.dangerous_notify_all();
            match done_rx.recv_timeout(Duration::from_millis(10)) {
                Ok(timed_out) => break timed_out,
                // No outcome yet and still inside the deadline: notify again.
                Err(mpsc::RecvTimeoutError::Timeout) if Instant::now() < deadline => {}
                // Deadline passed, or the waiter dropped its sender without
                // reporting: treat as a timeout so the assertion fails.
                Err(mpsc::RecvTimeoutError::Timeout | mpsc::RecvTimeoutError::Disconnected) => {
                    break true;
                }
            }
        };

        waiter.join().expect("condvar waiter joins");
        assert!(
            !timed_out,
            "PROPERTY: dangerous_notify_all must wake frontier waiters before their timeout"
        );
    }
}

/// Which part of the writer loop a command is being executed in. Passed to
/// `WriterCore::execute_command`, which varies some replies and follow-up
/// steps by phase.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WriterLoopPhase {
    /// Ordinary command processing. An `Append` handled here asks to enter the
    /// group-commit drain afterwards.
    Main,
    /// Draining further commands as part of a group commit. A
    /// `BeginVisibilityFence` seen here has its reply deferred (with a sync)
    /// and ends the drain after the reply.
    GroupCommitDrain,
    /// Draining commands during shutdown. `BeginVisibilityFence` is answered
    /// immediately, as in `Main`.
    ShutdownDrain,
}

impl WriterCore {
    /// Executes one `WriterCommand` and reports, through the returned
    /// `CommandResult`, what the writer loop should do next.
    ///
    /// `phase` is the loop state the command was received in. Depending on the
    /// command and the phase, the reply is handled in one of two ways:
    ///
    /// - it is sent right away on the command's `respond` channel (a failed
    ///   send is passed to `ignore_closed_response_channel`), or
    /// - it is attached to the returned result as a `DeferredReply` via
    ///   `with_sync(..)` and left for the caller to deliver (presumably once
    ///   the sync has run; confirm against the caller).
    ///
    /// The result may additionally request a phase change
    /// (`enter_group_commit_drain`, `enter_shutdown_drain`), a break out of the
    /// current drain (`break_after_reply`, `break_after_reply_if`), or writer
    /// exit (`exit_writer`).
    ///
    /// The argument to `CommandResult::immediate` is `1` for append-style
    /// commands and `0` for everything else; presumably a count of handled
    /// appends (confirm against `CommandResult`).
    fn execute_command(&mut self, phase: WriterLoopPhase, cmd: WriterCommand) -> CommandResult {
        match cmd {
            // Opening a fence is answered inline in `Main` and `ShutdownDrain`.
            // During a group-commit drain the reply is deferred and the drain is
            // asked to stop after it.
            WriterCommand::BeginVisibilityFence { token, respond } => match phase {
                WriterLoopPhase::Main | WriterLoopPhase::ShutdownDrain => {
                    ignore_closed_response_channel(
                        respond.send(self.begin_visibility_fence(token)),
                    );
                    CommandResult::immediate(0)
                }
                WriterLoopPhase::GroupCommitDrain => CommandResult::immediate(0)
                    .with_sync(DeferredReply::BeginVisibilityFence { token, respond })
                    .break_after_reply(),
            },
            // A plain append is performed and answered immediately, whatever its
            // outcome. Only in `Main` does it ask the loop to start a
            // group-commit drain.
            WriterCommand::Append {
                coord,
                event,
                kind,
                guards,
                respond,
            } => {
                let result = self.handle_append(&coord, *event, kind, &guards, None);
                ignore_closed_response_channel(respond.send(result));
                let base = CommandResult::immediate(1);
                if matches!(phase, WriterLoopPhase::Main) {
                    base.enter_group_commit_drain()
                } else {
                    base
                }
            }
            // The fenced handler receives a clone of `respond`; on success
            // nothing is sent from here (presumably the handler keeps the clone
            // and replies later; confirm). On failure the error is sent on the
            // original sender.
            // NOTE(review): this arm reports `immediate(0)` on failure, whereas
            // `Append` reports `immediate(1)` even when its result is an error.
            // Confirm the asymmetry is intended.
            WriterCommand::FenceAppend {
                token,
                coord,
                event,
                kind,
                guards,
                respond,
            } => {
                if let Err(error) = self.handle_fence_append_command(
                    token,
                    &coord,
                    *event,
                    kind,
                    &guards,
                    respond.clone(),
                ) {
                    ignore_closed_response_channel(respond.send(Err(error)));
                    CommandResult::immediate(0)
                } else {
                    CommandResult::immediate(1)
                }
            }
            // A batch append is answered immediately and, unlike `Append`, never
            // requests a group-commit drain.
            WriterCommand::AppendBatch { items, respond } => {
                let result = self.handle_append_batch(items, None);
                ignore_closed_response_channel(respond.send(result));
                CommandResult::immediate(1)
            }
            // Same shape as `FenceAppend`: the handler gets a clone of `respond`,
            // and only a failure is answered from here.
            WriterCommand::FenceAppendBatch {
                token,
                items,
                respond,
            } => {
                if let Err(error) =
                    self.handle_fence_append_batch_command(token, items, respond.clone())
                {
                    ignore_closed_response_channel(respond.send(Err(error)));
                    CommandResult::immediate(0)
                } else {
                    CommandResult::immediate(1)
                }
            }
            // Outside shutdown the commit reply is deferred, and a break is
            // requested only when the phase is `GroupCommitDrain`. In
            // `ShutdownDrain` the fence is committed and answered inline.
            WriterCommand::CommitVisibilityFence { token, respond } => match phase {
                WriterLoopPhase::Main | WriterLoopPhase::GroupCommitDrain => {
                    CommandResult::immediate(0)
                        .with_sync(DeferredReply::CommitVisibilityFence { token, respond })
                        .break_after_reply_if(matches!(phase, WriterLoopPhase::GroupCommitDrain))
                }
                WriterLoopPhase::ShutdownDrain => {
                    ignore_closed_response_channel(
                        respond.send(self.commit_visibility_fence(token)),
                    );
                    CommandResult::immediate(0)
                }
            },
            // Cancelling a fence is always answered inline; in a group-commit
            // drain it additionally asks the drain to stop.
            WriterCommand::CancelVisibilityFence { token, respond } => {
                ignore_closed_response_channel(respond.send(self.cancel_visibility_fence(token)));
                let base = CommandResult::immediate(0);
                if matches!(phase, WriterLoopPhase::GroupCommitDrain) {
                    base.break_after_reply()
                } else {
                    base
                }
            }
            // An explicit sync follows the same pattern as a fence commit:
            // deferred outside shutdown (breaking only out of a group-commit
            // drain), performed and answered inline in `ShutdownDrain`.
            WriterCommand::Sync { respond } => match phase {
                WriterLoopPhase::Main | WriterLoopPhase::GroupCommitDrain => {
                    CommandResult::immediate(0)
                        .with_sync(DeferredReply::Sync { respond })
                        .break_after_reply_if(matches!(phase, WriterLoopPhase::GroupCommitDrain))
                }
                WriterLoopPhase::ShutdownDrain => {
                    ignore_closed_response_channel(respond.send(self.sync_active_segment()));
                    CommandResult::immediate(0)
                }
            },
            // `Main`: hand `respond` over and switch to the shutdown drain.
            // `GroupCommitDrain`: defer the reply, stop the drain, and request
            // writer exit. `ShutdownDrain`: a shutdown that arrives while already
            // draining for shutdown is simply acknowledged with `Ok(())`.
            WriterCommand::Shutdown { respond } => match phase {
                WriterLoopPhase::Main => CommandResult::immediate(0).enter_shutdown_drain(respond),
                WriterLoopPhase::GroupCommitDrain => CommandResult::immediate(0)
                    .with_sync(DeferredReply::Shutdown { respond })
                    .break_after_reply()
                    .exit_writer(),
                WriterLoopPhase::ShutdownDrain => {
                    ignore_closed_response_channel(respond.send(Ok(())));
                    CommandResult::immediate(0)
                }
            },
            // Test hook. In `Main` the command is acknowledged first and the
            // writer then unwinds with a fixed payload (`resume_unwind` starts
            // unwinding without invoking the panic hook). In either drain phase
            // it is acknowledged and only requests a break; no panic is raised.
            #[cfg(feature = "dangerous-test-hooks")]
            WriterCommand::PanicForTest { respond } => match phase {
                WriterLoopPhase::Main => {
                    ignore_closed_response_channel(respond.send(Ok(())));
                    std::panic::resume_unwind(Box::new(
                        "PanicForTest: intentional writer panic for restart_policy testing",
                    ));
                }
                WriterLoopPhase::GroupCommitDrain | WriterLoopPhase::ShutdownDrain => {
                    ignore_closed_response_channel(respond.send(Ok(())));
                    CommandResult::immediate(0).break_after_reply()
                }
            },
        }
    }

    // NOTE: `sync_active_segment` — the single durability choke point that
    // fsyncs the active segment, advances the durable frontier on success, and
    // poisons the writer fail-closed on failure (fsyncgate) — lives in
    // `runtime.rs` next to `drive_command`'s poison gate.

    /// Check whether the active segment needs rotation, and if so, create the
    /// next segment, sync the old one, write its SIDX footer, seal it, and make
    /// the new segment the active one.
    ///
    /// Returns `Ok(true)` if a rotation occurred, `Ok(false)` if no rotation
    /// was needed. On rotation, the SIDX collector is reset, the old segment
    /// is sealed, segment_id is advanced, and the reader is notified.
    ///
    /// Callers needing batch-specific error context should wrap errors with
    /// the writer-local `batch_failed(...)` helper.
    ///
    /// # Errors
    ///
    /// Propagates the error from creating the new segment or from
    /// `sync_active_segment`. Both happen before the SIDX footer is written,
    /// the collector is reset, or the active segment is swapped. A failure to
    /// write or flush the SIDX footer is only logged and never returned.
    ///
    /// With the `dangerous-test-hooks` feature, either fault-injection point can
    /// also return an error. The second one (`SegmentRotation`) fires after the
    /// in-memory rotation has completed, so in that case `Err` is returned even
    /// though the writer has already switched to the new segment.
    fn maybe_rotate_segment(&mut self) -> Result<bool, StoreError> {
        // Fast path: the active segment is still within `segment_max_bytes`.
        if !self
            .active_segment
            .needs_rotation(self.config.segment_max_bytes)
        {
            return Ok(false);
        }
        // Captured up front for the fault-injection points below, before
        // `segment_id` is advanced.
        #[cfg(feature = "dangerous-test-hooks")]
        let old_segment = self.segment_id;
        #[cfg(feature = "dangerous-test-hooks")]
        let new_segment = self.segment_id + 1;
        // Create + fsync the NEW segment FIRST, before touching the old segment
        // or the collector. `create_with_created_ns` performs the file create
        // plus the file/dir fsync (Batch F/C4), and is the only step here that
        // both can fail AND has rollback-requiring side effects. If it fails we
        // return `?` with the old segment and collector FULLY INTACT: rotation
        // simply did not happen. The triggering append errors cleanly and the
        // next append retries against the unchanged old segment — no SIDX footer
        // has been written to the old segment, the collector still holds every
        // old entry, so there is no "frame bytes after footer bytes" corruption
        // and no lost SIDX coverage. (Previously the footer write + collector
        // reset + old-segment sync happened BEFORE this fallible create, so a
        // create/fsync failure left the writer running on a half-sealed old
        // segment with a wiped collector — the silent-corruption P1.)
        //
        // Fault injection point models the create/fsync FAILING here, while the
        // old segment + collector are still pristine. Firing it before the real
        // create (rather than making the create itself fallible-by-injection)
        // exercises the exact same rollback path: `?` returns with nothing
        // mutated, so rotation cleanly did not happen.
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::SegmentRotationCreate {
                old_segment,
                new_segment,
            },
            &self.config.fault_injector,
        )?;
        // The new segment takes the id `segment_id + 1`; `self.segment_id` itself
        // is not advanced until the swap further down.
        let new_active = Segment::<Active>::create_with_created_ns_on(
            &self.config.data_dir,
            self.segment_id + 1,
            self.runtime.now_wall_ns(),
            self.config.fs(),
        )?;
        // New segment is durably present. Now flush the OLD segment's committed
        // frames while it is still pristine — before writing the footer or
        // resetting the collector. This is the second (and last) fallible step
        // that touches the old segment: if it fails we return `?` with the old
        // segment and collector still fully intact (no footer written,
        // collector unchanged), so rotation cleanly did not happen. Unlike the
        // create failure above (a clean retry — no fsync was attempted), a
        // FAILED flush here poisons the writer fail-closed inside
        // `sync_active_segment` (fsyncgate): the next append surfaces
        // `WriterCrashed` rather than retrying over pages the kernel may have
        // silently dropped.
        self.sync_active_segment()?;
        // From here on every step is infallible or non-fatal, so the rotation
        // completes atomically with respect to its fallible side effects.
        //
        // Write SIDX footer before sealing. append_frames_from_segment now
        // strips SIDX data via detect_sidx_boundary, so this is safe. The footer
        // is a cold-start fast-rebuild optimization: if writing OR its best-effort
        // durability flush fails, recovery falls back to a full frame scan, so
        // both are treated as non-fatal and never abort the rotation (aborting
        // here would reintroduce the half-rotated state this reorder fixes — the
        // footer would be partially written and the collector about to be reset).
        if let Err(e) = self.active_segment.write_sidx_footer(&self.sidx_collector) {
            tracing::warn!("SIDX footer write failed (non-fatal): {e}");
        } else if let Err(e) = self.active_segment.sync_with_mode(&self.config.sync.mode) {
            tracing::warn!("SIDX footer durability flush failed (non-fatal): {e}");
        }
        // Start the new segment with an empty collector, then swap it in and
        // seal the segment that was just replaced.
        self.sidx_collector = crate::store::segment::sidx::SidxEntryCollector::new();
        let old = std::mem::replace(&mut self.active_segment, new_active);
        // `_sealed` is a named binding, so the sealed value lives until this
        // function returns instead of being dropped on this line.
        let _sealed = old.seal();
        self.segment_id += 1;
        // Notify the reader of the new active segment so mmap dispatch is correct.
        self.reader.set_active_segment(self.segment_id);
        // Inject a crash during rotation AFTER in-memory state is fully rolled
        // forward (active segment swapped, id incremented, reader advanced), so a
        // returned injected error leaves writer state CONSISTENT — a real crash
        // discards in-memory state, it does not leave active_segment pointing at
        // the new file while segment_id still names the old one (which would make
        // the next append publish DiskPos values that read the wrong segment).
        // The on-disk recovery scenario is unchanged: old segment sealed (with
        // footer), new empty active file present, triggering append not yet
        // written — exactly what cold-start recovery must handle.
        #[cfg(feature = "dangerous-test-hooks")]
        crate::store::fault::maybe_inject(
            crate::store::fault::InjectionPoint::SegmentRotation {
                old_segment,
                new_segment,
            },
            &self.config.fault_injector,
        )?;
        Ok(true)
    }
}

// Test-only child module. `#[path]` makes the compiler load its body from
// `writer_mutation_tests.rs` rather than from a file named after the module.
#[cfg(test)]
#[path = "writer_mutation_tests.rs"]
mod mutation_kill_tests;