Skip to main content

reddb_server/storage/wal/
writer.rs

1use super::record::WalRecord;
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// User-space buffer size for the WAL writer (64 KiB).
///
/// Large enough that roughly 3 100 Begin/Commit records (21 bytes each), or
/// roughly 1 900 small PageWrites (~34 bytes each), coalesce into a single
/// `write` syscall before the buffer fills or the next `sync()` drains it.
/// Tunable; it is the postgres XLOG block size (8 KiB) scaled up because we
/// batch at record granularity rather than page granularity.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17enum WalSyncMethod {
18    Data,
19    All,
20}
21
22pub(crate) struct WalGroupSync {
23    target_lsn: u64,
24    sync_handle: Arc<File>,
25    method: WalSyncMethod,
26}
27
28impl WalGroupSync {
29    pub(crate) fn target_lsn(&self) -> u64 {
30        self.target_lsn
31    }
32
33    pub(crate) fn sync(&self) -> io::Result<()> {
34        match self.method {
35            WalSyncMethod::Data => self.sync_handle.sync_data(),
36            WalSyncMethod::All => self.sync_handle.sync_all(),
37        }
38    }
39}
40
41/// Reserve disk blocks for `[offset, offset + len)` **without** growing the
42/// file's logical length (`FALLOC_FL_KEEP_SIZE`).
43///
44/// Pinning `i_size` is the whole trick that makes preallocation invisible to
45/// crash recovery: the WAL's logical end stays equal to its real data length,
46/// so [`WalReader`](super::reader::WalReader)'s EOF scan never walks into a
47/// zero-filled reserved tail (a `0x00` type byte would otherwise decode to an
48/// "Invalid record type" error and abort recovery). This is why we cannot use
49/// `fs2::allocate` here — it calls `posix_fallocate`, which *extends* `i_size`.
50///
51/// Linux-only; other targets return [`io::ErrorKind::Unsupported`] so the
52/// caller disables the optimization silently.
53#[cfg(target_os = "linux")]
54fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
55    use std::os::unix::io::AsRawFd;
56    if len == 0 {
57        return Ok(());
58    }
59    // SAFETY: `file` owns a valid fd for the duration of the call; fallocate
60    // only mutates block reservations for that fd, never process memory.
61    let ret = unsafe {
62        libc::fallocate(
63            file.as_raw_fd(),
64            libc::FALLOC_FL_KEEP_SIZE,
65            offset as libc::off_t,
66            len as libc::off_t,
67        )
68    };
69    if ret == 0 {
70        Ok(())
71    } else {
72        Err(io::Error::last_os_error())
73    }
74}
75
76#[cfg(not(target_os = "linux"))]
77fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
78    Err(io::Error::new(
79        io::ErrorKind::Unsupported,
80        "WAL preallocation is only implemented on linux",
81    ))
82}
83
84/// Whether a `fallocate` failure means "this filesystem can't preallocate"
85/// (tmpfs, overlayfs, many network filesystems) rather than a real I/O error.
86/// Those are soft failures that flip the feature off; anything else is left to
87/// the normal write path to surface (e.g. a genuine `ENOSPC`).
88fn fallocate_unsupported(err: &io::Error) -> bool {
89    if err.kind() == io::ErrorKind::Unsupported {
90        return true;
91    }
92    #[cfg(target_os = "linux")]
93    {
94        matches!(
95            err.raw_os_error(),
96            Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
97        )
98    }
99    #[cfg(not(target_os = "linux"))]
100    {
101        false
102    }
103}
104
105/// Writer for the Write-Ahead Log
106///
107/// Wraps the underlying file in a [`BufWriter`] so each `append` does
108/// not pay a write syscall — bytes accumulate in a 64 KiB user-space
109/// buffer until `sync()` (or `flush_until()`) drains them and then
110/// calls `sync_data()`/`sync_all()` on the raw file. This is how postgres turns
111/// per-record append cost from ~500 ns down to ~5 ns; reddb's previous
112/// per-append `write_all` directly to the file paid the syscall on
113/// every record.
114///
115/// **Critical contract:** every code path that syncs the underlying
116/// file *must* drain the [`BufWriter`] first via
117/// `BufWriter::flush()`. Otherwise the bytes in user-space never reach
118/// the kernel before fsync, and durability is silently broken.
119pub struct WalWriter {
120    file: BufWriter<File>,
121    /// Cloned file descriptor for `sync_all()` outside the writer
122    /// mutex. Both this and `file`'s inner `File` point at the same
123    /// kernel inode; calling `sync_all()` on either flushes ALL
124    /// pending bytes for that inode. This is the trick that lets
125    /// the group-commit leader release the WAL writer lock during
126    /// the expensive fsync — see [`WalWriter::drain_for_group_sync`].
127    ///
128    /// Without this clone, a leader holding the writer mutex during
129    /// `sync_all()` blocks every other writer from appending,
130    /// defeating the entire purpose of group commit.
131    sync_handle: Arc<File>,
132    /// Log Sequence Number — byte offset of the next record. Advances
133    /// every `append`; survives across restarts via `seek(End)`.
134    current_lsn: u64,
135    /// Highest LSN that has been `sync_all()`'d to disk. The WAL-first
136    /// flush invariant relies on this: a page with `header.lsn = L` may
137    /// only be written to its data file once `durable_lsn >= L`.
138    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
139    /// section of `PLAN.md`.
140    durable_lsn: u64,
141    /// WAL byte frontier covered by the last full file sync. Appends that stay
142    /// inside this synced preallocation range can use `sync_data()`; crossing
143    /// it, or syncing after fresh preallocation metadata, falls back to
144    /// `sync_all()`.
145    last_synced_size: u64,
146    /// Exclusive byte offset up to which disk blocks are pre-reserved via
147    /// `fallocate(FALLOC_FL_KEEP_SIZE)`. Advances one
148    /// [`reddb_file::MAIN_WAL_SEGMENT_BYTES`]
149    /// segment at a time as `current_lsn` approaches it (issue #893). Reset to
150    /// `0` on [`truncate`](Self::truncate) — which frees the blocks — and
151    /// immediately re-extended (the checkpoint re-extend path).
152    preallocated_to: u64,
153    /// Cleared the first time `fallocate` reports the backing filesystem can't
154    /// preallocate (tmpfs/overlay/NFS → `EOPNOTSUPP`/`ENOSYS`, or any non-Linux
155    /// target) so we stop issuing syscalls that will always fail. Preallocation
156    /// is a best-effort optimization; clearing this never affects correctness.
157    prealloc_supported: bool,
158    /// Set when `fallocate(FALLOC_FL_KEEP_SIZE)` successfully reserved a new
159    /// range and that allocation metadata has not yet been covered by a full
160    /// sync.
161    prealloc_metadata_dirty: bool,
162    #[cfg(test)]
163    last_sync_method: Option<WalSyncMethod>,
164}
165
166impl WalWriter {
167    /// Open a WAL file for writing. Creates it if it doesn't exist.
168    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
169        let exists = path.as_ref().exists();
170
171        // We do all initial bookkeeping (write header, seek to EOF) on
172        // the raw `File` BEFORE wrapping in a BufWriter so we don't
173        // have to worry about flush ordering during construction.
174        let mut raw = OpenOptions::new()
175            .read(true)
176            .create(true)
177            .append(true)
178            .open(path)?;
179
180        let current_lsn = if !exists || raw.metadata()?.len() == 0 {
181            raw.write_all(&reddb_file::encode_wal_file_header())?;
182            raw.sync_all()?;
183            reddb_file::WAL_FILE_HEADER_BYTES as u64
184        } else {
185            // Existing file, set LSN to current end. Append-mode files
186            // ignore this seek for *writes*, but we use the returned
187            // position as our LSN counter.
188            raw.seek(SeekFrom::End(0))?
189        };
190
191        // Clone the file handle BEFORE wrapping in BufWriter. The
192        // clone shares the same kernel file description, so
193        // sync_all() on either descriptor flushes the whole inode.
194        // The BufWriter owns the original; the Arc<File> is shared
195        // with the group-commit leader.
196        let sync_handle = Arc::new(raw.try_clone()?);
197        let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
198
199        // On open, every byte already on disk is by definition durable
200        // (any pre-crash unflushed tail was lost when the OS dropped
201        // page cache). Initialise `durable_lsn` to `current_lsn`.
202        let mut writer = Self {
203            file,
204            sync_handle,
205            current_lsn,
206            durable_lsn: current_lsn,
207            last_synced_size: current_lsn,
208            preallocated_to: 0,
209            prealloc_supported: true,
210            prealloc_metadata_dirty: false,
211            #[cfg(test)]
212            last_sync_method: None,
213        };
214        // Reserve the first segment up front so the very first appends land in
215        // contiguous extents rather than growing the file page-by-page.
216        writer.ensure_preallocated()?;
217        Ok(writer)
218    }
219
220    /// Ensure disk blocks are reserved at least up to the next segment
221    /// boundary above the current write frontier (`current_lsn`).
222    ///
223    /// Cheap (pure arithmetic) until the frontier crosses a
224    /// [`reddb_file::MAIN_WAL_SEGMENT_BYTES`] boundary, at which point it issues a single
225    /// `fallocate`. Best-effort: a filesystem that can't preallocate disables
226    /// the feature; a transient error is swallowed so a write never fails
227    /// because preallocation hiccuped (the write path surfaces a genuine
228    /// `ENOSPC` on its own). Never grows the file's logical length, so it is
229    /// invisible to crash recovery.
230    fn ensure_preallocated(&mut self) -> io::Result<()> {
231        if !self.prealloc_supported {
232            return Ok(());
233        }
234        let target = reddb_file::next_main_wal_segment_boundary(self.current_lsn);
235        if target <= self.preallocated_to {
236            return Ok(());
237        }
238        let from = self.preallocated_to;
239        match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
240            Ok(()) => {
241                self.preallocated_to = target;
242                self.prealloc_metadata_dirty = true;
243            }
244            Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
245            Err(_) => {
246                // Best-effort: leave `preallocated_to` as-is and retry at the
247                // next boundary. Never propagate.
248            }
249        }
250        Ok(())
251    }
252
253    /// Append a record to the WAL.
254    ///
255    /// Bytes go into the BufWriter — they are NOT durable on disk
256    /// after this call returns. Callers that need durability must
257    /// follow up with [`WalWriter::sync`] or
258    /// [`WalWriter::flush_until`].
259    ///
260    /// Returns the LSN (Log Sequence Number) of the record.
261    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
262        let bytes = record.encode();
263        self.file.write_all(&bytes)?;
264
265        let record_lsn = self.current_lsn;
266        self.current_lsn += bytes.len() as u64;
267
268        self.ensure_preallocated()?;
269        Ok(record_lsn)
270    }
271
272    /// Write already-encoded bytes and advance the LSN counter to
273    /// match. Used by the lock-free append path: writers encode +
274    /// atomically reserve an LSN range outside this writer, the
275    /// group-commit coordinator drains the pending queue in LSN
276    /// order, then calls `append_bytes` for each batch.
277    ///
278    /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
279    /// concatenation of such) — no structural validation happens
280    /// here. The caller is responsible for keeping the on-disk
281    /// byte offset synchronised with the externally-tracked LSN
282    /// counter; this method just appends and advances.
283    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
284        self.file.write_all(bytes)?;
285        let record_lsn = self.current_lsn;
286        self.current_lsn += bytes.len() as u64;
287        self.ensure_preallocated()?;
288        Ok(record_lsn)
289    }
290
291    /// Rewind the writer's LSN counter to a specific value. Used
292    /// by the lock-free append path to resync the writer with the
293    /// externally-tracked `next_lsn` after a drain batch; the
294    /// coordinator knows the exact byte offset it just wrote to
295    /// and needs `current_lsn` to match so subsequent direct
296    /// callers of `append` stay consistent.
297    pub fn set_current_lsn(&mut self, lsn: u64) {
298        self.current_lsn = lsn;
299    }
300
301    /// Force sync to disk.
302    ///
303    /// Drains the user-space [`BufWriter`] first, then calls
304    /// `sync_all()` on the underlying file so every byte appended
305    /// since the last sync is durable. Updates `durable_lsn` so
306    /// subsequent `flush_until` calls become no-ops up to
307    /// `current_lsn`.
308    pub fn sync(&mut self) -> io::Result<()> {
309        self.file.flush()?;
310        self.sync_flushed_file()?;
311        self.durable_lsn = self.current_lsn;
312        Ok(())
313    }
314
315    /// Ensure the WAL is durable on disk at least up to byte offset
316    /// `target`. No-op when `target <= durable_lsn`.
317    ///
318    /// This is the postgres `XLogFlush(LSN)` analogue. Pager flush
319    /// paths call this with `max(dirty.header.lsn)` before writing
320    /// any data page so the WAL record describing the change is
321    /// guaranteed to be on disk before the page itself.
322    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
323        if self.durable_lsn >= target {
324            return Ok(());
325        }
326        self.file.flush()?;
327        self.sync_flushed_file()?;
328        self.durable_lsn = self.current_lsn;
329        Ok(())
330    }
331
332    fn sync_flushed_file(&mut self) -> io::Result<()> {
333        let method = self.next_sync_method();
334        match method {
335            WalSyncMethod::Data => self.file.get_ref().sync_data()?,
336            WalSyncMethod::All => self.file.get_ref().sync_all()?,
337        }
338        self.mark_sync_complete(method, self.current_lsn);
339        Ok(())
340    }
341
342    fn next_sync_method(&self) -> WalSyncMethod {
343        if !self.prealloc_metadata_dirty && self.current_lsn <= self.last_synced_size {
344            WalSyncMethod::Data
345        } else {
346            WalSyncMethod::All
347        }
348    }
349
350    fn mark_sync_complete(&mut self, method: WalSyncMethod, lsn: u64) {
351        match method {
352            WalSyncMethod::Data => {}
353            WalSyncMethod::All => {
354                self.last_synced_size = self.preallocated_to.max(lsn);
355                self.prealloc_metadata_dirty = false;
356            }
357        }
358        #[cfg(test)]
359        {
360            self.last_sync_method = Some(method);
361        }
362    }
363
364    /// Highest byte offset that is durable on disk. Used by the pager
365    /// to decide whether a `flush_until` call would actually need a
366    /// `fsync`.
367    pub fn durable_lsn(&self) -> u64 {
368        self.durable_lsn
369    }
370
371    /// Get current LSN (end of file offset)
372    pub fn current_lsn(&self) -> u64 {
373        self.current_lsn
374    }
375
376    /// Drain the BufWriter into the kernel and return the captured
377    /// LSN plus a cloned file handle and sync method for the caller
378    /// **without holding the WAL writer mutex**.
379    ///
380    /// Used by the group-commit leader path. The flow is:
381    ///
382    /// 1. Take the WAL writer mutex.
383    /// 2. Call this method — drains user-space buffer to the kernel
384    ///    and captures a size-aware sync plan.
385    /// 3. Release the WAL writer mutex.
386    /// 4. Execute the sync plan — this is the expensive ~100 µs syscall,
387    ///    and other writers can keep appending while it runs.
388    /// 5. Take the WAL writer mutex briefly and call
389    ///    [`WalWriter::mark_durable`] to publish the new durable position.
390    ///
391    /// The cloned `sync_handle` shares the same kernel inode with
392    /// the writer's `file`, so syncing the clone flushes bytes that
393    /// have reached the kernel for that file.
394    /// This is the coalescing window that makes group commit win.
395    pub(crate) fn drain_for_group_sync(&mut self) -> io::Result<WalGroupSync> {
396        // Drain user-space buffer into the kernel.
397        self.file.flush()?;
398        Ok(WalGroupSync {
399            target_lsn: self.current_lsn,
400            sync_handle: Arc::clone(&self.sync_handle),
401            method: self.next_sync_method(),
402        })
403    }
404
405    /// Manually advance `durable_lsn` after a successful out-of-lock
406    /// sync performed via [`WalWriter::drain_for_group_sync`].
407    ///
408    /// Monotonic — never lowers `durable_lsn`. Safe to call with a
409    /// stale `lsn`; just becomes a no-op.
410    pub(crate) fn mark_durable(&mut self, sync: &WalGroupSync) {
411        let lsn = sync.target_lsn;
412        if lsn > self.durable_lsn {
413            self.durable_lsn = lsn;
414        }
415        self.mark_sync_complete(sync.method, lsn);
416    }
417
418    /// Truncate the WAL (usually after checkpoint).
419    ///
420    /// Drains the BufWriter first so no pending bytes hit the file
421    /// after the truncate. Then resets the underlying file, rewrites
422    /// the header through the buffered writer (header is small; the
423    /// followup `flush + sync_all` makes it durable), and resets
424    /// LSN bookkeeping.
425    pub fn truncate(&mut self) -> io::Result<()> {
426        // Drop any pending bytes BEFORE the truncate; otherwise the
427        // BufWriter would flush them to a re-shrunken file in
428        // confused order.
429        self.file.flush()?;
430
431        {
432            let raw = self.file.get_mut();
433            raw.set_len(0)?;
434            raw.seek(SeekFrom::Start(0))?;
435        }
436
437        // Rewrite header through the BufWriter then drain.
438        self.file.write_all(&reddb_file::encode_wal_file_header())?;
439        self.file.flush()?;
440        self.file.get_ref().sync_all()?;
441
442        self.current_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
443        self.durable_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
444        self.last_synced_size = reddb_file::WAL_FILE_HEADER_BYTES as u64;
445        self.prealloc_metadata_dirty = false;
446        #[cfg(test)]
447        {
448            self.last_sync_method = Some(WalSyncMethod::All);
449        }
450
451        // `set_len(0)` freed every reserved block, so the WAL would otherwise
452        // grow page-by-page again from here. Re-extend a fresh segment now —
453        // this is the "truncate/re-extend on checkpoint" half of issue #893.
454        self.preallocated_to = 0;
455        self.ensure_preallocated()?;
456        Ok(())
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use super::*;
463    use std::path::PathBuf;
464
465    struct FileGuard {
466        path: PathBuf,
467    }
468
469    impl Drop for FileGuard {
470        fn drop(&mut self) {
471            let _ = std::fs::remove_file(&self.path);
472        }
473    }
474
475    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
476        let path = reddb_file::layout::wal_component_temp_path(
477            &std::env::temp_dir(),
478            "writer",
479            name,
480            std::process::id(),
481        );
482        let guard = FileGuard { path: path.clone() };
483        let _ = std::fs::remove_file(&path);
484        (guard, path)
485    }
486
487    #[test]
488    fn test_create_new_wal() {
489        let (_guard, path) = temp_wal("create");
490        let writer = WalWriter::open(&path).expect("open() should succeed");
491
492        // Should start at LSN 8 (after 8-byte header)
493        assert_eq!(writer.current_lsn(), 8);
494        assert!(path.exists());
495    }
496
497    #[test]
498    fn test_append_record() {
499        let (_guard, path) = temp_wal("append");
500        let mut writer = WalWriter::open(&path).expect("open() should succeed");
501
502        let record = WalRecord::Begin { tx_id: 42 };
503        let lsn = writer.append(&record).expect("append() should succeed");
504
505        // First record starts at LSN 8
506        assert_eq!(lsn, 8);
507
508        // Next record should start after encoded size
509        // Begin record: 1 (type) + 8 (term) + 8 (tx_id) + 4 (checksum) = 21 bytes
510        assert_eq!(writer.current_lsn(), 8 + 21);
511    }
512
513    #[test]
514    fn test_append_multiple_records() {
515        let (_guard, path) = temp_wal("multi");
516        let mut writer = WalWriter::open(&path).expect("open() should succeed");
517
518        let lsn1 = writer
519            .append(&WalRecord::Begin { tx_id: 1 })
520            .expect("append() should succeed");
521        let lsn2 = writer
522            .append(&WalRecord::Begin { tx_id: 2 })
523            .expect("append() should succeed");
524        let lsn3 = writer
525            .append(&WalRecord::Commit { tx_id: 1 })
526            .expect("append() should succeed");
527
528        assert_eq!(lsn1, 8);
529        assert_eq!(lsn2, 8 + 21);
530        assert_eq!(lsn3, 8 + 21 + 21);
531    }
532
533    #[test]
534    fn test_page_write_lsn() {
535        let (_guard, path) = temp_wal("pagewrite");
536        let mut writer = WalWriter::open(&path).expect("open() should succeed");
537
538        // First record
539        let lsn1 = writer
540            .append(&WalRecord::Begin { tx_id: 1 })
541            .expect("append() should succeed");
542        assert_eq!(lsn1, 8);
543
544        // PageWrite record: 1 + 8 + 4 + 4 + data_len + 4 = 21 + data_len
545        let data = vec![1, 2, 3, 4, 5];
546        let lsn2 = writer
547            .append(&WalRecord::PageWrite {
548                tx_id: 1,
549                page_id: 100,
550                data: data.clone(),
551            })
552            .expect("value is present");
553
554        assert_eq!(lsn2, 8 + 21); // after Begin
555
556        // Next LSN = lsn2 + (1 + 8 + 8 + 4 + 4 + 5 + 4) = lsn2 + 34
557        assert_eq!(writer.current_lsn(), 8 + 21 + 34);
558    }
559
560    #[test]
561    fn test_sync() {
562        let (_guard, path) = temp_wal("sync");
563        let mut writer = WalWriter::open(&path).expect("open() should succeed");
564
565        writer
566            .append(&WalRecord::Begin { tx_id: 1 })
567            .expect("append() should succeed");
568        writer.sync().expect("sync() should succeed");
569
570        // File should be synced, just verify no error
571        assert!(path.exists());
572    }
573
574    #[test]
575    fn test_truncate() {
576        let (_guard, path) = temp_wal("truncate");
577        let mut writer = WalWriter::open(&path).expect("open() should succeed");
578
579        // Write some records
580        writer
581            .append(&WalRecord::Begin { tx_id: 1 })
582            .expect("append() should succeed");
583        writer
584            .append(&WalRecord::PageWrite {
585                tx_id: 1,
586                page_id: 0,
587                data: vec![0; 100],
588            })
589            .expect("value is present");
590        writer
591            .append(&WalRecord::Commit { tx_id: 1 })
592            .expect("append() should succeed");
593
594        let lsn_before = writer.current_lsn();
595        assert!(lsn_before > 8);
596
597        // Truncate
598        writer.truncate().expect("truncate() should succeed");
599
600        // LSN should be back to 8
601        assert_eq!(writer.current_lsn(), 8);
602
603        // File should be 8 bytes (just header)
604        let len = std::fs::metadata(&path)
605            .expect("metadata() should succeed")
606            .len();
607        assert_eq!(len, 8);
608    }
609
610    #[test]
611    fn test_reopen_existing() {
612        let (_guard, path) = temp_wal("reopen");
613
614        // Create and write
615        let lsn_after_write;
616        {
617            let mut writer = WalWriter::open(&path).expect("open() should succeed");
618            writer
619                .append(&WalRecord::Begin { tx_id: 1 })
620                .expect("append() should succeed");
621            writer
622                .append(&WalRecord::Commit { tx_id: 1 })
623                .expect("append() should succeed");
624            lsn_after_write = writer.current_lsn();
625        }
626
627        // Reopen
628        {
629            let writer = WalWriter::open(&path).expect("open() should succeed");
630            // Should continue from where we left off
631            assert_eq!(writer.current_lsn(), lsn_after_write);
632        }
633    }
634
635    #[test]
636    fn test_checkpoint_record() {
637        let (_guard, path) = temp_wal("checkpoint");
638        let mut writer = WalWriter::open(&path).expect("open() should succeed");
639
640        // Checkpoint is same size as Begin (1 + 8 + 8 + 4 = 21)
641        let lsn = writer
642            .append(&WalRecord::Checkpoint { lsn: 12345 })
643            .expect("value is present");
644        assert_eq!(lsn, 8);
645        assert_eq!(writer.current_lsn(), 8 + 21);
646    }
647
648    // -----------------------------------------------------------------
649    // Target 3: durable_lsn / flush_until tests
650    // -----------------------------------------------------------------
651
652    #[test]
653    fn fresh_wal_has_durable_lsn_at_header_end() {
654        let (_guard, path) = temp_wal("durable_init");
655        let writer = WalWriter::open(&path).expect("open() should succeed");
656        assert_eq!(writer.durable_lsn(), 8);
657        assert_eq!(writer.current_lsn(), 8);
658    }
659
660    #[test]
661    fn flush_until_below_durable_is_noop() {
662        let (_guard, path) = temp_wal("flush_noop");
663        let mut writer = WalWriter::open(&path).expect("open() should succeed");
664        // After open, durable_lsn == 8.
665        let before = writer.durable_lsn();
666        writer.flush_until(0).expect("flush_until() should succeed");
667        writer.flush_until(8).expect("flush_until() should succeed");
668        assert_eq!(writer.durable_lsn(), before);
669    }
670
671    #[test]
672    fn flush_until_advances_durable_to_current() {
673        let (_guard, path) = temp_wal("flush_advance");
674        let mut writer = WalWriter::open(&path).expect("open() should succeed");
675        writer
676            .append(&WalRecord::Begin { tx_id: 7 })
677            .expect("append() should succeed");
678        writer
679            .append(&WalRecord::Commit { tx_id: 7 })
680            .expect("append() should succeed");
681        let target = writer.current_lsn();
682        // Before flush_until, durable still at the header.
683        assert_eq!(writer.durable_lsn(), 8);
684        writer
685            .flush_until(target)
686            .expect("flush_until() should succeed");
687        assert_eq!(writer.durable_lsn(), target);
688    }
689
690    #[test]
691    fn flush_until_is_monotonic() {
692        let (_guard, path) = temp_wal("flush_monotonic");
693        let mut writer = WalWriter::open(&path).expect("open() should succeed");
694        writer
695            .append(&WalRecord::Begin { tx_id: 1 })
696            .expect("append() should succeed");
697        let lo = writer.current_lsn();
698        writer
699            .flush_until(lo)
700            .expect("flush_until() should succeed");
701        let durable_after_lo = writer.durable_lsn();
702        writer
703            .append(&WalRecord::Commit { tx_id: 1 })
704            .expect("append() should succeed");
705        let hi = writer.current_lsn();
706        writer
707            .flush_until(hi)
708            .expect("flush_until() should succeed");
709        assert!(writer.durable_lsn() >= durable_after_lo);
710        // Calling flush_until(lo) after flush_until(hi) is a no-op.
711        writer
712            .flush_until(lo)
713            .expect("flush_until() should succeed");
714        assert_eq!(writer.durable_lsn(), hi);
715    }
716
717    #[test]
718    fn sync_advances_durable_lsn_too() {
719        let (_guard, path) = temp_wal("sync_durable");
720        let mut writer = WalWriter::open(&path).expect("open() should succeed");
721        writer
722            .append(&WalRecord::Begin { tx_id: 9 })
723            .expect("append() should succeed");
724        let before = writer.durable_lsn();
725        let after_append = writer.current_lsn();
726        assert!(after_append > before);
727        writer.sync().expect("sync() should succeed");
728        assert_eq!(writer.durable_lsn(), after_append);
729    }
730
731    #[test]
732    fn sync_all_is_used_when_wal_size_grew() {
733        let (_guard, path) = temp_wal("sync_all_grew");
734        let mut writer = WalWriter::open(&path).expect("open() should succeed");
735
736        writer
737            .append(&WalRecord::Begin { tx_id: 1 })
738            .expect("append() should succeed");
739        writer.sync().expect("sync() should succeed");
740
741        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
742        assert!(writer.last_synced_size >= writer.current_lsn());
743        assert!(!writer.prealloc_metadata_dirty);
744    }
745
746    #[test]
747    fn sync_all_is_used_for_metadata_only_preallocation() {
748        let (_guard, path) = temp_wal("sync_all_prealloc_metadata");
749        let mut writer = WalWriter::open(&path).expect("open() should succeed");
750        if !writer.prealloc_supported {
751            return;
752        }
753
754        assert_eq!(writer.current_lsn(), 8);
755        assert!(writer.prealloc_metadata_dirty);
756
757        writer.sync().expect("sync() should succeed");
758
759        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
760        assert_eq!(writer.last_synced_size, writer.preallocated_to);
761        assert!(!writer.prealloc_metadata_dirty);
762    }
763
764    #[test]
765    fn sync_data_is_used_when_wal_size_is_unchanged() {
766        let (_guard, path) = temp_wal("sync_data_unchanged");
767        let mut writer = WalWriter::open(&path).expect("open() should succeed");
768
769        writer
770            .append(&WalRecord::Begin { tx_id: 1 })
771            .expect("append() should succeed");
772        writer.sync().expect("sync() should succeed");
773        let synced_size = writer.last_synced_size;
774        writer.sync().expect("sync() should succeed");
775
776        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
777        assert_eq!(writer.last_synced_size, synced_size);
778        assert_eq!(writer.durable_lsn(), writer.current_lsn());
779    }
780
781    #[test]
782    fn sync_data_is_used_for_appends_within_synced_preallocation() {
783        let (_guard, path) = temp_wal("sync_data_preallocated_append");
784        let mut writer = WalWriter::open(&path).expect("open() should succeed");
785        if !writer.prealloc_supported {
786            return;
787        }
788
789        writer
790            .append(&WalRecord::Begin { tx_id: 1 })
791            .expect("append() should succeed");
792        writer.sync().expect("sync() should succeed");
793        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
794
795        writer
796            .append(&WalRecord::Commit { tx_id: 1 })
797            .expect("append() should succeed");
798        writer.sync().expect("sync() should succeed");
799
800        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
801        assert_eq!(writer.durable_lsn(), writer.current_lsn());
802        assert!(writer.current_lsn() <= writer.last_synced_size);
803    }
804
805    #[test]
806    fn group_sync_uses_sync_data_within_synced_preallocation() {
807        let (_guard, path) = temp_wal("group_sync_data_preallocated_append");
808        let mut writer = WalWriter::open(&path).expect("open() should succeed");
809        if !writer.prealloc_supported {
810            return;
811        }
812
813        writer
814            .append(&WalRecord::Begin { tx_id: 1 })
815            .expect("append() should succeed");
816        writer.sync().expect("sync() should succeed");
817        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
818
819        writer
820            .append(&WalRecord::Commit { tx_id: 1 })
821            .expect("append() should succeed");
822        let sync = writer
823            .drain_for_group_sync()
824            .expect("drain_for_group_sync() should succeed");
825        assert_eq!(sync.method, WalSyncMethod::Data);
826        sync.sync().expect("sync() should succeed");
827        writer.mark_durable(&sync);
828
829        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
830        assert_eq!(writer.durable_lsn(), writer.current_lsn());
831    }
832
833    #[test]
834    fn truncate_resets_durable_lsn() {
835        let (_guard, path) = temp_wal("truncate_durable");
836        let mut writer = WalWriter::open(&path).expect("open() should succeed");
837        writer
838            .append(&WalRecord::Begin { tx_id: 1 })
839            .expect("append() should succeed");
840        writer.sync().expect("sync() should succeed");
841        assert!(writer.durable_lsn() > 8);
842        writer.truncate().expect("truncate() should succeed");
843        assert_eq!(writer.durable_lsn(), 8);
844        assert_eq!(writer.current_lsn(), 8);
845    }
846
847    #[test]
848    fn reopen_initialises_durable_to_current() {
849        let (_guard, path) = temp_wal("reopen_durable");
850        {
851            let mut writer = WalWriter::open(&path).expect("open() should succeed");
852            writer
853                .append(&WalRecord::Begin { tx_id: 1 })
854                .expect("append() should succeed");
855            writer.sync().expect("sync() should succeed");
856        }
857        let writer = WalWriter::open(&path).expect("open() should succeed");
858        // After reopen, every byte on disk is durable by definition.
859        assert_eq!(writer.durable_lsn(), writer.current_lsn());
860    }
861
862    // -----------------------------------------------------------------
863    // Perf 1.1: BufWriter coalesces small appends until sync
864    // -----------------------------------------------------------------
865
866    #[test]
867    fn bufwriter_coalesces_until_sync() {
868        // Append 100 small records but DO NOT sync. The on-disk file
869        // size must still equal the header (8 bytes) because the
870        // bytes are sitting in the BufWriter, not in the kernel.
871        let (_guard, path) = temp_wal("bufwriter_coalesce");
872        let mut writer = WalWriter::open(&path).expect("open() should succeed");
873        for tx in 0..100u64 {
874            writer
875                .append(&WalRecord::Begin { tx_id: tx })
876                .expect("append() should succeed");
877        }
878        // current_lsn reflects the in-buffer position.
879        assert_eq!(writer.current_lsn(), 8 + 100 * 21);
880        // But the file on disk only has the header.
881        let on_disk = std::fs::metadata(&path)
882            .expect("metadata() should succeed")
883            .len();
884        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
885    }
886
887    #[test]
888    fn sync_drains_bufwriter_before_fsync() {
889        // After sync(), the file size must equal current_lsn — the
890        // BufWriter has been flushed and sync_all has hit the kernel.
891        let (_guard, path) = temp_wal("sync_drains");
892        let mut writer = WalWriter::open(&path).expect("open() should succeed");
893        for tx in 0..50u64 {
894            writer
895                .append(&WalRecord::Begin { tx_id: tx })
896                .expect("append() should succeed");
897        }
898        writer.sync().expect("sync() should succeed");
899        let on_disk = std::fs::metadata(&path)
900            .expect("metadata() should succeed")
901            .len();
902        assert_eq!(on_disk, writer.current_lsn());
903        assert_eq!(writer.durable_lsn(), writer.current_lsn());
904    }
905
906    #[test]
907    fn flush_until_drains_bufwriter_too() {
908        // flush_until must drain the BufWriter before calling
909        // sync_all on the underlying file — otherwise pending bytes
910        // never become durable.
911        let (_guard, path) = temp_wal("flush_until_drains");
912        let mut writer = WalWriter::open(&path).expect("open() should succeed");
913        for tx in 0..30u64 {
914            writer
915                .append(&WalRecord::Begin { tx_id: tx })
916                .expect("append() should succeed");
917        }
918        let target = writer.current_lsn();
919        writer
920            .flush_until(target)
921            .expect("flush_until() should succeed");
922        let on_disk = std::fs::metadata(&path)
923            .expect("metadata() should succeed")
924            .len();
925        assert_eq!(on_disk, target);
926        assert_eq!(writer.durable_lsn(), target);
927    }
928
929    #[test]
930    fn truncate_drains_pending_bufwriter_bytes_first() {
931        // If truncate did NOT drain BufWriter first, the pending bytes
932        // would either land in the post-truncate file (corrupting it
933        // with stale records) or be lost. Verify the resulting file
934        // contains only a fresh header.
935        let (_guard, path) = temp_wal("truncate_drain");
936        let mut writer = WalWriter::open(&path).expect("open() should succeed");
937        // Write enough small records to fill some of the 64 KiB buffer
938        // but stay below the auto-flush threshold.
939        for tx in 0..200u64 {
940            writer
941                .append(&WalRecord::Begin { tx_id: tx })
942                .expect("append() should succeed");
943        }
944        // Sanity: bytes are buffered.
945        assert_eq!(
946            std::fs::metadata(&path)
947                .expect("metadata() should succeed")
948                .len(),
949            8
950        );
951
952        writer.truncate().expect("truncate() should succeed");
953        // After truncate the file is just the header again.
954        let on_disk = std::fs::metadata(&path)
955            .expect("metadata() should succeed")
956            .len();
957        assert_eq!(on_disk, 8);
958        assert_eq!(writer.current_lsn(), 8);
959        assert_eq!(writer.durable_lsn(), 8);
960
961        // And we can append again successfully.
962        writer
963            .append(&WalRecord::Begin { tx_id: 99 })
964            .expect("append() should succeed");
965        writer.sync().expect("sync() should succeed");
966        assert_eq!(
967            std::fs::metadata(&path)
968                .expect("metadata() should succeed")
969                .len(),
970            8 + 21
971        );
972    }
973
974    #[test]
975    fn reopen_sees_only_synced_records() {
976        // Records that were appended but never sync'd must NOT
977        // survive a reopen — they lived in the BufWriter, never made
978        // it to the kernel, and the previous WalWriter went out of
979        // scope. The new WalWriter reopens the file and reads from
980        // EOF, which reflects only the bytes that hit disk.
981        //
982        // We sync some records, then drop the writer mid-buffer, and
983        // assert the reopen LSN matches only the synced prefix.
984        let (_guard, path) = temp_wal("reopen_synced_only");
985        let synced_lsn;
986        {
987            let mut writer = WalWriter::open(&path).expect("open() should succeed");
988            writer
989                .append(&WalRecord::Begin { tx_id: 1 })
990                .expect("append() should succeed");
991            writer.sync().expect("sync() should succeed");
992            synced_lsn = writer.current_lsn();
993            // These records are never sync'd before drop. Drop runs
994            // BufWriter::flush which DOES write them — see note below.
995            for tx in 100..120u64 {
996                writer
997                    .append(&WalRecord::Begin { tx_id: tx })
998                    .expect("append() should succeed");
999            }
1000            // Without a sync, the in-buffer bytes are still pending.
1001            // BufWriter's Drop impl does flush to the file but does
1002            // not call sync_all. For reopen-LSN purposes, on-disk
1003            // bytes count regardless of fsync, so the reopened LSN
1004            // will reflect the dropped writes too.
1005        }
1006        let writer = WalWriter::open(&path).expect("open() should succeed");
1007        // The reopen LSN reflects what's physically on disk after
1008        // BufWriter::Drop flushes its buffer. That may or may not
1009        // include the unsync'd records depending on platform; the
1010        // contract we care about is that durable_lsn ≥ synced_lsn.
1011        assert!(writer.durable_lsn() >= synced_lsn);
1012    }
1013
1014    // -----------------------------------------------------------------
1015    // Issue #893: fallocate-based WAL segment preallocation
1016    // -----------------------------------------------------------------
1017
1018    /// On-disk blocks reserved by `fallocate`, in bytes. Returns the
1019    /// allocated size (st_blocks × 512), independent of the logical length.
1020    fn allocated_bytes(path: &std::path::Path) -> u64 {
1021        use fs2::FileExt;
1022        let f = std::fs::File::open(path).expect("open() should succeed");
1023        f.allocated_size().expect("allocated_size() should succeed")
1024    }
1025
1026    #[test]
1027    fn segment_boundary_rounds_strictly_up() {
1028        // Always lands one boundary ahead so the reservation stays in front
1029        // of the write frontier.
1030        assert_eq!(
1031            reddb_file::next_main_wal_segment_boundary(0),
1032            reddb_file::MAIN_WAL_SEGMENT_BYTES
1033        );
1034        assert_eq!(
1035            reddb_file::next_main_wal_segment_boundary(8),
1036            reddb_file::MAIN_WAL_SEGMENT_BYTES
1037        );
1038        assert_eq!(
1039            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES - 1),
1040            reddb_file::MAIN_WAL_SEGMENT_BYTES
1041        );
1042        // Exactly on a boundary still advances to the next one.
1043        assert_eq!(
1044            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES),
1045            2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
1046        );
1047        assert_eq!(
1048            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES + 1),
1049            2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
1050        );
1051    }
1052
1053    #[test]
1054    fn open_preallocates_first_segment() {
1055        // A freshly opened WAL must reserve a whole segment up front instead
1056        // of growing incrementally (acceptance #1).
1057        let (_guard, path) = temp_wal("prealloc_open");
1058        let writer = WalWriter::open(&path).expect("open() should succeed");
1059        if !writer.prealloc_supported {
1060            return; // filesystem without fallocate — feature is a no-op.
1061        }
1062        assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
1063        // The reservation is real on disk, yet the logical file is still just
1064        // the 8-byte header.
1065        assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
1066        assert_eq!(
1067            std::fs::metadata(&path)
1068                .expect("metadata() should succeed")
1069                .len(),
1070            8
1071        );
1072    }
1073
1074    #[test]
1075    fn preallocation_does_not_grow_logical_length() {
1076        // The load-bearing invariant for crash recovery: appending records
1077        // must NOT inflate the logical file size beyond the real data, or the
1078        // EOF scan in WalReader would walk into the reserved tail. Holds on
1079        // every filesystem (fallocate keeps i_size pinned; absent fallocate
1080        // there is no reservation at all).
1081        let (_guard, path) = temp_wal("prealloc_logical");
1082        let mut writer = WalWriter::open(&path).expect("open() should succeed");
1083        for tx in 0..50u64 {
1084            writer
1085                .append(&WalRecord::Begin { tx_id: tx })
1086                .expect("append() should succeed");
1087        }
1088        writer.sync().expect("sync() should succeed");
1089        let logical = std::fs::metadata(&path)
1090            .expect("metadata() should succeed")
1091            .len();
1092        assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
1093        assert_eq!(writer.current_lsn(), logical);
1094    }
1095
1096    #[test]
1097    fn truncate_re_extends_a_fresh_segment() {
1098        // After checkpoint truncation the WAL must re-extend rather than grow
1099        // unbounded page-by-page (acceptance #2).
1100        let (_guard, path) = temp_wal("prealloc_truncate");
1101        let mut writer = WalWriter::open(&path).expect("open() should succeed");
1102        writer
1103            .append(&WalRecord::Begin { tx_id: 1 })
1104            .expect("append() should succeed");
1105        writer.sync().expect("sync() should succeed");
1106
1107        writer.truncate().expect("truncate() should succeed");
1108
1109        assert_eq!(writer.current_lsn(), 8);
1110        assert_eq!(
1111            std::fs::metadata(&path)
1112                .expect("metadata() should succeed")
1113                .len(),
1114            8
1115        );
1116        if writer.prealloc_supported {
1117            assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
1118            assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
1119        }
1120    }
1121
1122    #[test]
1123    fn preallocated_wal_recovers_records_without_trailing_garbage() {
1124        // End-to-end: a preallocated WAL must read back exactly the records
1125        // written — the reserved (unwritten) tail must be invisible to the
1126        // reader, proving crash-recovery is unchanged (acceptance #3).
1127        use super::super::reader::WalReader;
1128        let (_guard, path) = temp_wal("prealloc_recover");
1129        {
1130            let mut writer = WalWriter::open(&path).expect("open() should succeed");
1131            writer
1132                .append(&WalRecord::Begin { tx_id: 1 })
1133                .expect("append() should succeed");
1134            writer
1135                .append(&WalRecord::PageWrite {
1136                    tx_id: 1,
1137                    page_id: 7,
1138                    data: vec![1, 2, 3, 4],
1139                })
1140                .expect("value is present");
1141            writer
1142                .append(&WalRecord::Commit { tx_id: 1 })
1143                .expect("append() should succeed");
1144            writer.sync().expect("sync() should succeed");
1145        }
1146        let records: Vec<_> = WalReader::open(&path)
1147            .expect("value is present")
1148            .iter()
1149            .collect::<Result<_, _>>()
1150            .expect("reader must stop cleanly at real EOF, not in reserved tail");
1151        assert_eq!(records.len(), 3);
1152        assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
1153        assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
1154    }
1155}