datawal 0.1.5

Core record store for datawal: append-only framed records (CRC32C), valid-prefix recovery, bytes-based KV projection with tombstones, manual compaction, and JSONL export. v0.1-pre.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
//! Append-only framed record log: the durable substrate of datawal.
//!
//! See [`crate::format`] for the wire format and [`crate::segment`] for the
//! on-disk segment naming convention.
//!
//! v0.1-pre semantics:
//! - Single writer per directory, enforced by an OS-level advisory lock
//!   on `.lock` (see [`crate::lock`]).
//! - One active segment file at a time. `rotate()` closes the current one
//!   and opens the next id.
//! - **Durability boundary.** `append` / `append_record` write a framed,
//!   CRC-protected record to the active segment's file. The record is
//!   immediately *recoverable* (a subsequent `scan()` will return it) but
//!   is **not yet durable** across a host crash or power loss. Durability
//!   is established by a successful call to `fsync()`, which `sync_all`s
//!   the active segment file and fsyncs the containing directory. This
//!   crate never silently fsyncs on every append.
//! - `scan` reads every segment in order and returns every CRC-valid record.
//!   Tail truncation on the **last** segment is treated as recoverable; any
//!   structural error (bad magic, unknown version/type, oversize) and any
//!   mid-stream corruption are hard errors.

use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};

use anyhow::{anyhow, bail, Context, Result};

use crate::format::{
    decode_next, encode_record, DecodeError, DecodeOutcome, RecordType, HEADER_LEN, MAX_KEY_LEN,
    MAX_PAYLOAD_LEN,
};
use crate::lock::DirLock;
use crate::segment::{
    active_segment_id, list_segment_ids, next_segment_id, segment_path, segment_size,
};

/// Reference to a record's location on disk.
///
/// Returned by [`RecordLog::append`] and [`RecordLog::append_record`]. The
/// `offset` is taken from the writer's cached size of the active segment
/// at the moment of the append; it is not read back from the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RecordRef {
    /// Segment id (matches the on-disk filename).
    pub segment: u32,
    /// Byte offset of the record header within that segment.
    pub offset: u64,
    /// Total wire size of the record (header + key + payload + crc).
    pub len: u32,
}

/// A decoded record returned by [`RecordLog::scan`].
///
/// Carries both the decoded content (`record_type`, `txid`, `key`,
/// `payload`) and the position the frame was decoded from (`segment`,
/// `offset`, `len`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    /// Record type as decoded from the frame.
    pub record_type: RecordType,
    /// Transaction id stored in the frame.
    pub txid: u64,
    /// Key bytes. `RecordLog::append` writes an empty key.
    pub key: Vec<u8>,
    /// Payload bytes.
    pub payload: Vec<u8>,
    /// Id of the segment the record was read from.
    pub segment: u32,
    /// Byte offset within that segment at which decoding of this record
    /// started.
    pub offset: u64,
    /// Number of bytes the decoder consumed for this record.
    pub len: u32,
}

/// Summary of the last `scan()` over a log: how many records were valid,
/// how many bytes (if any) of trailing garbage were ignored at the tail of
/// the last segment, and so on.
///
/// Built from the crate-private scan state; `mid_stream_errors` and
/// `unsupported_versions` are filled with a constant 0 by that conversion.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Total segment files inspected.
    pub files_scanned: u32,
    /// Total CRC-valid records returned.
    pub records_replayed: u64,
    /// Whether the last segment had a non-fatal truncated/CRC-bad tail.
    /// Counted in segments, not records.
    pub tail_truncated: u32,
    /// Bytes of trailing garbage in the last segment that were skipped
    /// (file length minus the offset at which decoding stopped).
    pub tail_bytes_discarded: u64,
    /// Number of mid-stream errors detected. v0.1-pre aborts on the first
    /// one, so this is always 0 on success and >0 only if a future variant
    /// switches to lenient mode.
    pub mid_stream_errors: u32,
    /// Always 0 in v0.1-pre because unknown versions are a hard error.
    pub unsupported_versions: u32,
    /// Highest txid observed across all replayed records, or 0 if none.
    pub last_txid_seen: u64,
}

/// Append-only framed record log.
///
/// # Failure model
///
/// Mutating operations (`append`, `append_record`, `fsync`, `rotate`) may
/// fail in the middle of an I/O operation, after the kernel has accepted
/// **part** of a frame but before the whole frame is on disk (`ENOSPC`,
/// a broken disk, a torn write). When that happens the log handle enters
/// a **poisoned** state:
///
/// - Every subsequent mutating call returns a deterministic error whose
///   message starts with `datawal: writer poisoned:` and ends with
///   `; drop handle and reopen`. The error is intentionally a plain
///   `anyhow::Error` in 0.1.x; promotion to a typed error variant is
///   tracked for a future minor release.
/// - Read-only operations (`scan`, `scan_iter`, `recovery_report`,
///   `active_segment`, `dir`) remain available so the caller can inspect
///   state before dropping the handle.
/// - The caller **must** drop the handle and re-open the directory with
///   [`RecordLog::open`]. Reopen uses the standard longest-valid-prefix
///   recovery (see invariant 2 in `AGENTS.md`) and will discard any
///   partial tail bytes left behind by the failed write.
///
/// The crate intentionally does not try to truncate the partial tail or
/// resync `active_size` on the live handle. Both are forms of mutating
/// state after a write failure, which expands rather than contains the
/// blast radius.
#[derive(Debug)]
pub struct RecordLog {
    /// Directory that holds the segment files and the `.lock` sentinel.
    dir: PathBuf,
    /// Directory lock guard acquired in `open`. Never read; it is kept
    /// only so the lock lives exactly as long as this handle.
    _lock: DirLock,
    /// Id of the segment that currently receives appends.
    active_id: u32,
    /// Open file handle on the active segment, opened in append mode.
    active_file: File,
    /// Cached size of the active segment in bytes, used to compute offsets
    /// for `RecordRef` without an extra `metadata()` call per append.
    /// `open` seeds it from the scan's logical end of the active segment;
    /// `rotate` resets it to 0.
    active_size: u64,
    /// Next txid to assign on append.
    next_txid: u64,
    /// Last scan report, lazily refreshed by `recovery_report()`.
    last_report: Option<RecoveryReport>,
    /// Set on any mutating I/O failure (`append_record`, `fsync`,
    /// `rotate`). Once set, subsequent mutating calls return a
    /// deterministic error; read-only calls remain available. See the
    /// type-level "Failure model" docs for the full contract.
    poisoned: Option<&'static str>,
}

impl RecordLog {
    /// Open the record log stored in `dir`, creating the directory and an
    /// empty first segment when nothing exists yet.
    ///
    /// Sequence:
    /// 1. Create `dir` (and any missing parents).
    /// 2. Acquire the directory lock on `<dir>/.lock` (see
    ///    [`crate::lock`]); the guard is stored in the returned handle and
    ///    lives as long as it does.
    /// 3. List the existing segments; if there are none, create segment
    ///    id 1 and fsync the directory.
    /// 4. Resolve the active segment id.
    /// 5. Scan every listed segment once. The scan yields the recovery
    ///    report and the next txid (`last_txid_seen + 1`, or 1 if that
    ///    addition overflows).
    /// 6. Open the active segment for read + append.
    ///
    /// # Errors
    ///
    /// Any failure of the steps above is returned, including a lock that
    /// cannot be acquired and any hard error reported by the scan.
    ///
    /// # Panics
    ///
    /// Panics if no active segment id can be resolved after step 3 has
    /// ensured that at least one segment exists.
    ///
    /// NOTE(review): `active_size` is seeded with the *logical* end found
    /// by the scan, while the file is opened in append mode and is never
    /// truncated here. If the active segment carries trailing garbage, the
    /// offsets handed out by later appends will not match the physical
    /// position of those frames, and the frames will sit behind bytes at
    /// which the scanner stops. Confirm whether this is acceptable or
    /// whether the tail should be trimmed on open.
    pub fn open(dir: &Path) -> Result<Self> {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("datawal: create_dir_all {}", dir.display()))?;

        let dir_lock = DirLock::acquire(dir)?;

        let mut segment_ids = list_segment_ids(dir)?;
        if segment_ids.is_empty() {
            // Fresh directory: bootstrap with an empty segment 1 and make
            // its directory entry durable before continuing.
            let first = segment_path(dir, 1);
            File::create(&first)
                .with_context(|| format!("datawal: create initial segment {}", first.display()))?;
            safeatomic_rs::fsync_dir(dir)
                .with_context(|| format!("datawal: fsync_dir {}", dir.display()))?;
            segment_ids.push(1);
        }

        let active_id = active_segment_id(dir)?.expect("just ensured at least one segment");

        // One pass over the whole log gives both the recovery report and
        // the highest txid already in use.
        let recovered = scan_all(dir, &segment_ids)?;
        let next_txid = recovered.last_txid_seen.checked_add(1).unwrap_or(1);

        // Logical end of the active segment as seen by the scan. Trailing
        // garbage beyond it is deliberately left on disk in v0.1-pre:
        // removing bytes is never done without an explicit user request.
        let active_size = recovered
            .last_segment_logical_size_for(active_id)
            .unwrap_or(0);
        // The physical size is still queried (so a failing stat surfaces
        // here) but its value is intentionally unused.
        let _active_size_on_disk = segment_size(dir, active_id)?;

        let active_path = segment_path(dir, active_id);
        let active_file = OpenOptions::new()
            .read(true)
            .append(true)
            .create(false)
            .open(&active_path)
            .with_context(|| {
                format!("datawal: open active segment {}", active_path.display())
            })?;

        Ok(Self {
            dir: dir.to_path_buf(),
            _lock: dir_lock,
            active_id,
            active_file,
            active_size,
            next_txid,
            last_report: Some(recovered.into_public()),
            poisoned: None,
        })
    }

    /// Returns `true` if the writer is poisoned by a prior I/O failure.
    ///
    /// A poisoned log refuses all further mutating operations. Read-only
    /// operations remain available so the caller can inspect state before
    /// dropping the handle. See the type-level "Failure model" docs.
    ///
    /// This only inspects the in-memory poison marker; it performs no I/O.
    pub fn is_poisoned(&self) -> bool {
        self.poisoned.is_some()
    }

    /// Internal: produce the stable poison error.
    ///
    /// The message format is part of the public contract documented on
    /// `RecordLog` and is covered by tests; do not change it without
    /// updating both the docs and `tests/poison_writer.rs`.
    fn poison_error(reason: &'static str) -> anyhow::Error {
        anyhow!(
            "datawal: writer poisoned: {}; drop handle and reopen",
            reason
        )
    }

    /// Internal: gate for mutating operations.
    ///
    /// Returns `Ok(())` while the writer is healthy and the stable poison
    /// error (carrying the recorded reason) once it has been poisoned.
    fn check_poisoned(&self) -> Result<()> {
        match self.poisoned {
            Some(reason) => Err(Self::poison_error(reason)),
            None => Ok(()),
        }
    }

    /// Test-only: synthetically poison the writer. Reached from
    /// integration tests via the `crate::testing` module. Not part
    /// of the public API.
    ///
    /// Overwrites any previously recorded poison reason with `reason`;
    /// no I/O is performed.
    #[doc(hidden)]
    pub fn __set_poisoned_for_test(&mut self, reason: &'static str) {
        self.poisoned = Some(reason);
    }

    /// Path of the directory this log was opened on.
    pub fn dir(&self) -> &Path {
        self.dir.as_path()
    }

    /// Id of the active segment, i.e. the one that currently receives
    /// appends. Changes only when `rotate()` succeeds.
    pub fn active_segment(&self) -> u32 {
        self.active_id
    }

    /// Last recovery report computed by `open()` or `scan()`.
    pub fn recovery_report(&self) -> Result<RecoveryReport> {
        Ok(self.last_report.clone().unwrap_or_default())
    }

    /// Append `payload` as a `Raw` record with an empty key.
    ///
    /// This is a thin wrapper around [`RecordLog::append_record`] and
    /// shares its error and poisoning behaviour.
    ///
    /// **Durability boundary.** The framed, CRC-protected record is written
    /// to the active segment's file with `write_all`; neither the file nor
    /// the directory is fsynced here. The record is *recoverable* (a later
    /// `scan()` returns it) as long as the OS does not lose the buffered
    /// write, but it is **not yet durable** across a power failure or hard
    /// host crash until `fsync()` has returned successfully.
    ///
    /// Pattern for "this must survive a crash":
    /// ```ignore
    /// log.append(payload)?;
    /// log.fsync()?;
    /// ```
    pub fn append(&mut self, payload: &[u8]) -> Result<RecordRef> {
        const NO_KEY: &[u8] = b"";
        self.append_record(RecordType::Raw, NO_KEY, payload)
    }

    /// Append a typed record with a key and a payload.
    ///
    /// Used by [`crate::DataWal`] for `Put` / `Delete`. Length limits are
    /// validated by the encoder before allocation.
    ///
    /// Same durability semantics as [`RecordLog::append`]: framed and
    /// recoverable on return, but only durable after a successful
    /// [`RecordLog::fsync`].
    pub fn append_record(
        &mut self,
        record_type: RecordType,
        key: &[u8],
        payload: &[u8],
    ) -> Result<RecordRef> {
        self.check_poisoned()?;

        let txid = self.next_txid;
        // Encoding errors (over-limit key/payload, txid overflow inside the
        // encoder) happen before any I/O and therefore cannot leave a
        // partial frame on disk. Do not poison the writer in that case.
        let bytes = encode_record(record_type, txid, key, payload)?;
        let len = bytes.len() as u32;
        let offset = self.active_size;

        // OpenOptions::append guarantees writes go to the end on POSIX.
        // A failure here may have written a prefix of `bytes` to the
        // segment file. The longest-valid-prefix recovery on reopen will
        // discard the partial tail, but the live handle's `active_size`
        // and `next_txid` would now be out of sync with the file. Poison
        // the writer so subsequent mutating calls fail loudly.
        if let Err(e) = self.active_file.write_all(&bytes) {
            self.poisoned = Some("append_record write_all failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: write_all to segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }
        // The two checked_add calls below cannot leave a partial frame on
        // disk (the write already succeeded). They are integer-overflow
        // guards. Treat overflow as poisoning anyway, because the live
        // handle's offset bookkeeping is now meaningless.
        self.active_size = match self.active_size.checked_add(len as u64) {
            Some(v) => v,
            None => {
                self.poisoned = Some("active segment size overflow");
                return Err(anyhow!("datawal: active segment size overflow"));
            }
        };
        self.next_txid = match txid.checked_add(1) {
            Some(v) => v,
            None => {
                self.poisoned = Some("txid overflow");
                return Err(anyhow!("datawal: txid overflow at {}", txid));
            }
        };

        Ok(RecordRef {
            segment: self.active_id,
            offset,
            len,
        })
    }

    /// Scan every segment in order and return every valid record.
    ///
    /// Materialises every record into a `Vec<Record>`. For logs with many
    /// records or large payloads, prefer [`RecordLog::scan_iter`] which
    /// yields one record at a time without materialising the whole log.
    ///
    /// Also refreshes `recovery_report()` and the internal `next_txid`
    /// (`last_txid_seen + 1`, or 1 if that addition overflows).
    ///
    /// # Errors
    ///
    /// Returns the error from listing the segments or from the scan
    /// itself; in that case neither the stored report nor `next_txid` is
    /// modified.
    pub fn scan(&mut self) -> Result<Vec<Record>> {
        let ids = list_segment_ids(&self.dir)?;
        let mut internal = scan_all(&self.dir, &ids)?;
        self.next_txid = internal.last_txid_seen.checked_add(1).unwrap_or(1);
        // Move the records out before building the report. The report only
        // needs the counters, so cloning the whole scan state (every key
        // and payload) just to produce it would be wasted work.
        let records = std::mem::take(&mut internal.records);
        self.last_report = Some(internal.into_public());
        Ok(records)
    }

    /// Returns a lazy, record-at-a-time iterator over the log.
    ///
    /// Callers can pull one record at a time without materialising the
    /// whole log into a `Vec<Record>`. It is **not** a chunked or
    /// zero-copy scanner — v0.1 loads one segment at a time into memory
    /// before yielding records from it, so peak memory is bounded by the
    /// size of the **largest segment**, not by the total log size.
    ///
    /// The list of segment ids is captured when this method is called and
    /// handed to the iterator.
    ///
    /// Recovery semantics match [`RecordLog::scan`]:
    ///
    /// - A truncated or CRC-bad tail on the **last** segment is tolerated
    ///   and ends iteration cleanly. The amount of trailing garbage
    ///   discarded is reflected in
    ///   [`RecordIter::recovery_report`].
    /// - Any structural decode error, or any CRC/truncation problem in a
    ///   sealed (non-last) segment, is yielded as an `Err` item; iteration
    ///   ends after that error, and the underlying error is the same
    ///   `anyhow` error that [`RecordLog::scan`] would have returned.
    ///
    /// This method takes `&self`. It does not refresh the log's own
    /// `recovery_report()` or `next_txid` — only [`RecordLog::scan`] does
    /// that.
    ///
    /// Dropping the iterator before it is exhausted is supported and has
    /// no on-disk side effects.
    ///
    /// # Errors
    ///
    /// Fails only if the segment ids cannot be listed.
    pub fn scan_iter(&self) -> Result<RecordIter<'_>> {
        let root = self.dir.as_path();
        let snapshot = list_segment_ids(root)?;
        Ok(RecordIter::new(root, snapshot))
    }

    /// Force durability of all records appended so far.
    ///
    /// On successful return, every record passed to `append` /
    /// `append_record` since this `RecordLog` was opened (or since the last
    /// `fsync` returned) is durable: it will survive a process crash,
    /// kernel panic or power loss on the underlying disk, modulo the
    /// usual filesystem caveats (working `fsync` syscall, no lying disk
    /// cache).
    ///
    /// Internally this calls `File::sync_all` on the active segment **and**
    /// `fsync` on the containing directory, so that segment creations and
    /// rotations are also durable.
    ///
    /// `fsync` may be called as often as desired; on a log with no new
    /// appends since the last fsync it is effectively a no-op at the
    /// kernel level, but it is always safe.
    pub fn fsync(&mut self) -> Result<()> {
        self.check_poisoned()?;

        // `sync_all` can fail (`EIO`, journal flush refused, etc.). On
        // Linux, fsync errors are not always re-reported on subsequent
        // calls, so we treat any fsync failure as a fatal event for this
        // handle. The on-disk state is whatever the kernel made of the
        // dirty pages; the caller must drop + reopen and let
        // longest-valid-prefix recovery settle it.
        if let Err(e) = self.active_file.sync_all() {
            self.poisoned = Some("fsync sync_all failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: sync_all on segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }
        // Also fsync the directory so the directory entry of the active
        // segment (and any prior `rotate()` rename) is durable.
        if let Err(e) = safeatomic_rs::fsync_dir(&self.dir) {
            self.poisoned = Some("fsync fsync_dir failed");
            return Err(e.context(format!("datawal: fsync_dir {}", self.dir.display())));
        }
        Ok(())
    }

    /// Rotate to the next segment. The current segment is closed and
    /// fsynced; the new segment is created empty and becomes active.
    pub fn rotate(&mut self) -> Result<()> {
        self.check_poisoned()?;

        // Make the current segment durable before moving on. A failure
        // here means the previous segment's tail durability is in doubt,
        // so we poison just like in `fsync`.
        if let Err(e) = self.active_file.sync_all() {
            self.poisoned = Some("rotate sync_all on previous segment failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: sync_all on rotate, segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }

        let ids = list_segment_ids(&self.dir)?;
        let new_id = next_segment_id(&ids)?;
        if new_id <= self.active_id {
            // Defensive: not reachable from a clean log because
            // `next_segment_id` returns `max(ids) + 1` and `active_id`
            // is in `ids`. Poison anyway because the on-disk segment
            // sequence is now in an unexpected state.
            self.poisoned = Some("rotate computed non-increasing segment id");
            bail!(
                "datawal: rotate computed non-increasing segment id (current={}, computed={})",
                self.active_id,
                new_id
            );
        }
        let new_path = segment_path(&self.dir, new_id);
        // A failure between creating the new segment file and opening it
        // for append would leave a zero-byte segment on disk that the
        // next reopen would treat as the active segment. That is the
        // textbook poison case.
        if let Err(e) = File::create(&new_path) {
            self.poisoned = Some("rotate create new segment failed");
            return Err(anyhow::Error::new(e)
                .context(format!("datawal: create segment {}", new_path.display())));
        }
        if let Err(e) = safeatomic_rs::fsync_dir(&self.dir) {
            self.poisoned = Some("rotate fsync_dir after segment create failed");
            return Err(e.context(format!("datawal: fsync_dir {}", self.dir.display())));
        }

        let new_file = match OpenOptions::new().read(true).append(true).open(&new_path) {
            Ok(f) => f,
            Err(e) => {
                self.poisoned = Some("rotate open new active segment failed");
                return Err(anyhow::Error::new(e).context(format!(
                    "datawal: open new active segment {}",
                    new_path.display()
                )));
            }
        };
        self.active_file = new_file;
        self.active_id = new_id;
        self.active_size = 0;
        Ok(())
    }

    /// Consume the handle, releasing the directory lock.
    ///
    /// Nothing is flushed or fsynced here; call [`RecordLog::fsync`] first
    /// if the latest appends must be durable. The current implementation
    /// never returns `Err`.
    pub fn close(self) -> Result<()> {
        // Dropping the handle drops the `DirLock` guard it owns, which is
        // what releases the lock. The `.lock` sentinel file stays on disk;
        // the file itself is not the lock.
        drop(self);
        Ok(())
    }
}

/// Internal scan state: accumulates records (for the eager `scan_all` path)
/// or just counts them (for the streaming `RecordIter` path). Kept private
/// so future field changes do not break the public API.
#[derive(Debug, Clone)]
struct ScanInternal {
    /// Records collected eagerly. Empty in the streaming path.
    records: Vec<Record>,
    /// Number of records replayed so far. Drives `RecoveryReport.records_replayed`.
    /// In the eager path this equals `records.len()`; in the streaming
    /// path `records` stays empty and only this counter advances.
    records_replayed: u64,
    /// Number of segment files visited so far.
    files_scanned: u32,
    /// Highest txid seen in any decoded record; 0 until one is decoded.
    last_txid_seen: u64,
    /// Incremented once when the last segment ends in a tolerated
    /// truncated or CRC-bad tail.
    tail_truncated: u32,
    /// Bytes between the point where decoding stopped and the end of the
    /// file, accumulated for tolerated tail damage.
    tail_bytes_discarded: u64,
    /// `(segment id, offset)` at which decoding of the most recently
    /// completed segment stopped: the end of the file for a clean segment,
    /// or the start of the damaged tail.
    last_segment_logical_end: Option<(u32, u64)>,
}

impl ScanInternal {
    /// Logical end offset recorded for `segment`, or `None` when the
    /// recorded logical end belongs to a different segment (or nothing
    /// has been recorded yet).
    fn last_segment_logical_size_for(&self, segment: u32) -> Option<u64> {
        match self.last_segment_logical_end {
            Some((id, end)) if id == segment => Some(end),
            _ => None,
        }
    }

    /// Convert the private scan state into the public [`RecoveryReport`].
    ///
    /// Only the counters are carried over; collected records are dropped.
    /// `mid_stream_errors` and `unsupported_versions` are always 0.
    fn into_public(self) -> RecoveryReport {
        let Self {
            records_replayed,
            files_scanned,
            last_txid_seen,
            tail_truncated,
            tail_bytes_discarded,
            ..
        } = self;
        RecoveryReport {
            files_scanned,
            records_replayed,
            tail_truncated,
            tail_bytes_discarded,
            mid_stream_errors: 0,
            unsupported_versions: 0,
            last_txid_seen,
        }
    }
}

/// Load one segment file fully into memory and decode it frame by frame,
/// appending every decoded record to `out`.
///
/// `is_last_segment` selects the policy for a truncated or CRC-bad frame:
/// on the last segment it is recorded as tail damage and decoding stops;
/// on any other segment it is a hard error. Structural decode errors are
/// hard errors on every segment.
///
/// On success `out.last_segment_logical_end` is set to `(id, offset)`,
/// where `offset` is the position at which decoding stopped (the end of
/// the file, or the start of the damaged tail). On error it is left as it
/// was.
fn scan_segment(dir: &Path, id: u32, is_last_segment: bool, out: &mut ScanInternal) -> Result<()> {
    /// Book-keeping for a tolerated damaged tail of `discarded` bytes.
    fn note_damaged_tail(out: &mut ScanInternal, discarded: u64) {
        out.tail_truncated += 1;
        out.tail_bytes_discarded += discarded;
    }

    let path = segment_path(dir, id);
    let mut file =
        File::open(&path).with_context(|| format!("datawal: open segment {}", path.display()))?;
    let mut bytes = Vec::new();
    file.read_to_end(&mut bytes)
        .with_context(|| format!("datawal: read_to_end {}", path.display()))?;
    let total = bytes.len() as u64;
    let mut cursor: u64 = 0;

    while cursor != total {
        let outcome = match decode_next(&bytes, cursor) {
            Ok(outcome) => outcome,
            Err(err) => {
                // Structural / hard errors are never tolerated, not even
                // on the tail segment.
                let err: DecodeError = err;
                bail!(
                    "datawal: structural decode error at offset {} of segment {} ({}): {}",
                    cursor,
                    id,
                    path.display(),
                    err
                );
            }
        };

        match outcome {
            DecodeOutcome::Ok {
                record_type,
                txid,
                key,
                payload,
                bytes_consumed,
            } => {
                out.records.push(Record {
                    record_type,
                    txid,
                    key,
                    payload,
                    segment: id,
                    offset: cursor,
                    len: bytes_consumed,
                });
                out.records_replayed += 1;
                out.last_txid_seen = out.last_txid_seen.max(txid);
                cursor += bytes_consumed as u64;
            }
            DecodeOutcome::Truncated { .. } => {
                if !is_last_segment {
                    bail!(
                        "datawal: truncated record at offset {} of non-tail segment {} ({}); refusing to silently drop data",
                        cursor,
                        id,
                        path.display()
                    );
                }
                note_damaged_tail(out, total - cursor);
                break;
            }
            DecodeOutcome::CrcMismatch { .. } => {
                if !is_last_segment {
                    bail!(
                        "datawal: CRC mismatch at offset {} of non-tail segment {} ({})",
                        cursor,
                        id,
                        path.display()
                    );
                }
                // Same treatment as a truncated tail: stop here.
                note_damaged_tail(out, total - cursor);
                break;
            }
        }
    }

    // Reached both after a clean end of file and after tolerated tail
    // damage; in either case `cursor` marks the end of the valid prefix.
    out.last_segment_logical_end = Some((id, cursor));
    Ok(())
}

/// Scan the segments listed in `ids` in the given order (the caller must
/// pass them sorted ascending) and return the accumulated scan state.
///
/// Only the final entry of `ids` is scanned with tail tolerance; every
/// earlier segment must decode cleanly to its end. An empty `ids` yields
/// an all-zero state. The first failing segment aborts the scan and its
/// error is returned.
fn scan_all(dir: &Path, ids: &[u32]) -> Result<ScanInternal> {
    let mut acc = ScanInternal {
        records: Vec::new(),
        records_replayed: 0,
        files_scanned: 0,
        last_txid_seen: 0,
        tail_truncated: 0,
        tail_bytes_discarded: 0,
        last_segment_logical_end: None,
    };

    if let Some((&tail_id, sealed)) = ids.split_last() {
        for &sealed_id in sealed {
            acc.files_scanned += 1;
            scan_segment(dir, sealed_id, false, &mut acc)?;
        }
        acc.files_scanned += 1;
        scan_segment(dir, tail_id, true, &mut acc)?;
    }

    Ok(acc)
}

/// Record-level lazy iterator over a [`RecordLog`].
///
/// Yielded by [`RecordLog::scan_iter`]. The iterator is lazy at the
/// record level: each call to `next()` decodes one frame. It is **not**
/// zero-copy and it is **not** chunked I/O — one whole segment file is
/// resident in memory at a time. Peak memory is bounded by the largest
/// segment, not by the total log size.
///
/// The list of segment ids is snapshotted when [`RecordLog::scan_iter`]
/// is called; segments rotated in by the writer after that point are
/// **not** observed by this iterator (this matches [`RecordLog::scan`]).
///
/// The borrow on `'log` only scopes the iterator to the lifetime of the
/// [`RecordLog`]; the iterator does not hold the directory lock itself.
pub struct RecordIter<'log> {
    /// Owned copy of the log directory path, used to locate segment files.
    dir: PathBuf,
    /// Snapshot of segment ids at the time `scan_iter` was called, sorted
    /// ascending.
    ids: Vec<u32>,
    /// Index into `ids` of the segment currently being decoded.
    cur_idx: usize,
    /// Bytes of the current segment, fully loaded into memory.
    cur_buf: Vec<u8>,
    /// Logical decode cursor within `cur_buf`.
    cur_offset: u64,
    /// Id of the current segment (mirrors `ids[cur_idx]`, cached for
    /// `Record::segment` without re-indexing).
    cur_id: u32,
    /// Whether the current segment has been loaded yet (cleared whenever
    /// we advance to a new segment).
    cur_loaded: bool,
    /// Accumulated recovery state for [`RecordIter::report`].
    report: ScanInternal,
    /// Set to `true` once iteration has yielded `None` or a hard error.
    /// Subsequent calls to `next()` always return `None`.
    done: bool,
    /// Borrow tag tying this iterator to its parent log. Zero-sized; it
    /// carries the `'log` lifetime only.
    _log: std::marker::PhantomData<&'log RecordLog>,
}

impl<'log> RecordIter<'log> {
    /// Build an iterator over a snapshot of segment ids (crate-internal).
    ///
    /// Both [`RecordLog::scan_iter`] and
    /// [`crate::RecordLogReader::scan_iter`] construct their iterators
    /// through here. `'log` is whatever borrow the caller chooses to tie
    /// the iterator to: the parent log in the first case, the reader handle
    /// in the second.
    ///
    /// `ids` is expected to be sorted ascending; the final id is treated as
    /// the tail segment. No file is touched by this constructor.
    pub(crate) fn new(dir: &Path, ids: Vec<u32>) -> Self {
        let empty_report = ScanInternal {
            records: Vec::new(),
            records_replayed: 0,
            files_scanned: 0,
            last_txid_seen: 0,
            tail_truncated: 0,
            tail_bytes_discarded: 0,
            last_segment_logical_end: None,
        };
        Self {
            dir: dir.to_path_buf(),
            ids,
            cur_idx: 0,
            cur_id: 0,
            cur_offset: 0,
            cur_loaded: false,
            cur_buf: Vec::new(),
            report: empty_report,
            done: false,
            _log: std::marker::PhantomData,
        }
    }

    /// Snapshot the recovery statistics gathered so far.
    ///
    /// The report is **final only once the iterator has been drained**,
    /// i.e. after `next()` has returned `None`. Before that point the
    /// values are partial:
    ///
    /// - `records_replayed` reflects only the records already yielded
    ///   successfully.
    /// - `tail_truncated` and `tail_bytes_discarded` stay at zero until
    ///   the last segment has been reached and processed.
    /// - `files_scanned` grows by one each time a segment is loaded, so a
    ///   segment that was loaded but only partly iterated is still
    ///   counted.
    pub fn recovery_report(&self) -> RecoveryReport {
        let snapshot = self.report.clone();
        snapshot.into_public()
    }

    /// Make sure the bytes of `ids[cur_idx]` are present in `cur_buf`.
    ///
    /// Does nothing when the current segment is already loaded. Otherwise
    /// the segment file is read in full, the decode cursor is reset to the
    /// start of the buffer, and `files_scanned` is bumped.
    ///
    /// # Errors
    ///
    /// Returns an error (with the segment path as context) if the file
    /// cannot be opened or read.
    fn ensure_loaded(&mut self) -> Result<()> {
        if !self.cur_loaded {
            let seg = self.ids[self.cur_idx];
            let seg_path = segment_path(&self.dir, seg);
            let mut file = File::open(&seg_path)
                .with_context(|| format!("datawal: open segment {}", seg_path.display()))?;
            // Drop the previous segment's bytes before reading the new ones;
            // `read_to_end` appends to whatever is already in the buffer.
            self.cur_buf.clear();
            file.read_to_end(&mut self.cur_buf)
                .with_context(|| format!("datawal: read_to_end {}", seg_path.display()))?;
            self.cur_offset = 0;
            self.cur_id = seg;
            self.cur_loaded = true;
            self.report.files_scanned += 1;
        }
        Ok(())
    }

    /// Decode at most one record from the currently loaded segment.
    ///
    /// Outcomes:
    /// - `Ok(Some(record))` — a record was decoded; more may follow.
    /// - `Ok(None)` — nothing further will be read from this segment,
    ///   either because the cursor reached the end of the buffer or
    ///   because a damaged tail was found on the last segment. The caller
    ///   should advance to the next segment.
    /// - `Err(_)` — hard failure; the iterator must stop.
    ///
    /// # Errors
    ///
    /// Fails on a truncated frame or CRC mismatch in any segment other
    /// than the last one, and on a structural decode error in any segment
    /// (including the last).
    fn try_next_in_segment(&mut self) -> Result<Option<Record>> {
        let seg = self.cur_id;
        let start = self.cur_offset;
        let seg_len = self.cur_buf.len() as u64;

        // Cursor sits exactly at end-of-buffer: the segment ended cleanly.
        if start == seg_len {
            self.report.last_segment_logical_end = Some((seg, start));
            return Ok(None);
        }

        // Decode one frame. A good frame (or a structural error) returns
        // straight from the match; a damaged frame falls through carrying
        // only which kind of damage was seen, because both kinds share the
        // same tail-recovery handling below.
        let crc_failed = match decode_next(&self.cur_buf, start) {
            Ok(DecodeOutcome::Ok {
                record_type,
                txid,
                key,
                payload,
                bytes_consumed,
            }) => {
                self.cur_offset += bytes_consumed as u64;
                self.report.last_txid_seen = self.report.last_txid_seen.max(txid);
                // `records_replayed` is bumped by the caller per yielded
                // record; the record itself is not stored in
                // `self.report.records` (only the eager `scan_all` path
                // fills that vector).
                return Ok(Some(Record {
                    record_type,
                    txid,
                    key,
                    payload,
                    segment: seg,
                    offset: start,
                    len: bytes_consumed,
                }));
            }
            Ok(DecodeOutcome::Truncated { .. }) => false,
            Ok(DecodeOutcome::CrcMismatch { .. }) => true,
            Err(err) => {
                // Compile-time check that the error is a `DecodeError`.
                let _: DecodeError = err;
                let path = segment_path(&self.dir, seg);
                return Err(anyhow!(
                    "datawal: structural decode error at offset {} of segment {} ({}): {}",
                    start,
                    seg,
                    path.display(),
                    err
                ));
            }
        };

        // Damage on the last segment is tolerated: everything from the
        // cursor to the end of the buffer is counted as discarded and the
        // segment is reported as finished at the cursor.
        let on_tail_segment = self.ids.len() == self.cur_idx + 1;
        if on_tail_segment {
            self.report.tail_truncated += 1;
            self.report.tail_bytes_discarded += seg_len - start;
            self.report.last_segment_logical_end = Some((seg, start));
            return Ok(None);
        }

        // The same damage in an earlier segment is a hard error.
        let path = segment_path(&self.dir, seg);
        let failure = if crc_failed {
            anyhow!(
                "datawal: CRC mismatch at offset {} of non-tail segment {} ({})",
                start,
                seg,
                path.display()
            )
        } else {
            anyhow!(
                "datawal: truncated record at offset {} of non-tail segment {} ({}); refusing to silently drop data",
                start,
                seg,
                path.display()
            )
        };
        Err(failure)
    }
}

impl Iterator for RecordIter<'_> {
    type Item = Result<Record>;

    /// Yield the next record, moving on to later segments as needed.
    ///
    /// Returns `Some(Ok(record))` for each decoded record, `Some(Err(_))`
    /// exactly once on a load or decode failure, and `None` when every
    /// segment in the snapshot has been processed. After an error or the
    /// first `None`, every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }

        while self.cur_idx < self.ids.len() {
            // Load the segment if necessary, then attempt one decode. A
            // failure in either step ends the iteration.
            let step = self
                .ensure_loaded()
                .and_then(|()| self.try_next_in_segment());

            match step {
                Ok(Some(record)) => {
                    self.report.records_replayed += 1;
                    return Some(Ok(record));
                }
                Err(failure) => {
                    self.done = true;
                    return Some(Err(failure));
                }
                Ok(None) => {
                    // This segment is finished (clean end, or a tolerated
                    // damaged tail on the last segment): release its bytes
                    // and move on to the next id.
                    self.cur_buf.clear();
                    self.cur_loaded = false;
                    self.cur_idx += 1;
                }
            }
        }

        // Ran past the final segment id.
        self.done = true;
        None
    }
}

// `allow(dead_code)`: this constant is never read anywhere; it exists only
// for the name references inside its initializer.
#[allow(dead_code)]
const _ASSERT_HEADER: () = {
    // Reference each wire-format constant at compile time so that renaming
    // or removing one breaks the build here. No values are checked.
    let _ = (HEADER_LEN, MAX_KEY_LEN, MAX_PAYLOAD_LEN);
};