Skip to main content

reddb_server/storage/wal/
writer.rs

1use super::record::WalRecord;
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// User-space buffer size for the WAL writer.
///
/// 64 KiB holds roughly 3 100 Begin/Commit records (≈ 21 bytes each) or
/// roughly 1 900 small PageWrite records (≈ 34 bytes each), so that many
/// appends coalesce into a single `write` syscall before the next `sync()`
/// drains the buffer. Tunable; reflects the postgres XLOG block size
/// (8 KiB) scaled up because we batch record-level rather than page-level.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
/// Which `File` sync call a WAL sync issues.
///
/// Picked by `WalWriter::next_sync_method`: `Data` when no preallocation
/// metadata is pending and the write frontier is still within the size
/// covered by the last full sync, `All` otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalSyncMethod {
    /// Sync with `File::sync_data()`.
    Data,
    /// Sync with `File::sync_all()`.
    All,
}
21
/// Sync plan handed out by `WalWriter::drain_for_group_sync` so the caller
/// can run the sync without holding the writer, then report back through
/// `WalWriter::mark_durable`.
pub(crate) struct WalGroupSync {
    /// The writer's `current_lsn` at the moment the plan was captured.
    target_lsn: u64,
    /// Shared clone of the WAL file handle that the sync is issued on.
    sync_handle: Arc<File>,
    /// Sync call chosen when the plan was captured.
    method: WalSyncMethod,
}
27
28impl WalGroupSync {
29    pub(crate) fn target_lsn(&self) -> u64 {
30        self.target_lsn
31    }
32
33    pub(crate) fn sync(&self) -> io::Result<()> {
34        match self.method {
35            WalSyncMethod::Data => self.sync_handle.sync_data(),
36            WalSyncMethod::All => self.sync_handle.sync_all(),
37        }
38    }
39}
40
41/// Reserve disk blocks for `[offset, offset + len)` **without** growing the
42/// file's logical length (`FALLOC_FL_KEEP_SIZE`).
43///
44/// Pinning `i_size` is the whole trick that makes preallocation invisible to
45/// crash recovery: the WAL's logical end stays equal to its real data length,
46/// so [`WalReader`](super::reader::WalReader)'s EOF scan never walks into a
47/// zero-filled reserved tail (a `0x00` type byte would otherwise decode to an
48/// "Invalid record type" error and abort recovery). This is why we cannot use
49/// `fs2::allocate` here — it calls `posix_fallocate`, which *extends* `i_size`.
50///
51/// Linux-only; other targets return [`io::ErrorKind::Unsupported`] so the
52/// caller disables the optimization silently.
53#[cfg(target_os = "linux")]
54fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
55    use std::os::unix::io::AsRawFd;
56    if len == 0 {
57        return Ok(());
58    }
59    // SAFETY: `file` owns a valid fd for the duration of the call; fallocate
60    // only mutates block reservations for that fd, never process memory.
61    let ret = unsafe {
62        libc::fallocate(
63            file.as_raw_fd(),
64            libc::FALLOC_FL_KEEP_SIZE,
65            offset as libc::off_t,
66            len as libc::off_t,
67        )
68    };
69    if ret == 0 {
70        Ok(())
71    } else {
72        Err(io::Error::last_os_error())
73    }
74}
75
/// Stand-in for targets other than Linux: never reserves anything and always
/// reports [`io::ErrorKind::Unsupported`].
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "WAL preallocation is only implemented on linux",
    );
    Err(unsupported)
}
83
84/// Whether a `fallocate` failure means "this filesystem can't preallocate"
85/// (tmpfs, overlayfs, many network filesystems) rather than a real I/O error.
86/// Those are soft failures that flip the feature off; anything else is left to
87/// the normal write path to surface (e.g. a genuine `ENOSPC`).
88fn fallocate_unsupported(err: &io::Error) -> bool {
89    if err.kind() == io::ErrorKind::Unsupported {
90        return true;
91    }
92    #[cfg(target_os = "linux")]
93    {
94        matches!(
95            err.raw_os_error(),
96            Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
97        )
98    }
99    #[cfg(not(target_os = "linux"))]
100    {
101        false
102    }
103}
104
/// Writer for the Write-Ahead Log.
///
/// Wraps the underlying file in a [`BufWriter`] so each `append` does
/// not pay a write syscall — bytes accumulate in a 64 KiB user-space
/// buffer until `sync()` (or `flush_until()`) drains them and then
/// calls `sync_data()`/`sync_all()` on the raw file. This is how postgres turns
/// per-record append cost from ~500 ns down to ~5 ns; reddb's previous
/// per-append `write_all` directly to the file paid the syscall on
/// every record.
///
/// **Critical contract:** every code path that syncs the underlying
/// file *must* drain the [`BufWriter`] first via
/// `BufWriter::flush()`. Otherwise the bytes in user-space never reach
/// the kernel before fsync, and durability is silently broken.
pub struct WalWriter {
    /// Buffered handle that every append is written through.
    file: BufWriter<File>,
    /// Cloned file handle for syncing outside the writer mutex. Both
    /// this and `file`'s inner `File` point at the same kernel inode;
    /// syncing either one flushes ALL pending bytes for that inode.
    /// This is the trick that lets the group-commit leader release the
    /// WAL writer lock during the expensive fsync — see
    /// [`WalWriter::drain_for_group_sync`].
    ///
    /// Without this clone, a leader holding the writer mutex during
    /// the sync blocks every other writer from appending,
    /// defeating the entire purpose of group commit.
    sync_handle: Arc<File>,
    /// Log Sequence Number — byte offset of the next record. Advances
    /// every `append`; survives across restarts via `seek(End)`.
    current_lsn: u64,
    /// Highest LSN that has been synced to disk (`sync_data()` or
    /// `sync_all()`). The WAL-first flush invariant relies on this: a page
    /// with `header.lsn = L` may only be written to its data file once
    /// `durable_lsn >= L`.
    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
    /// section of `PLAN.md`.
    durable_lsn: u64,
    /// WAL byte frontier covered by the last full file sync. Appends that stay
    /// inside this synced preallocation range can use `sync_data()`; crossing
    /// it, or syncing after fresh preallocation metadata, falls back to
    /// `sync_all()`.
    last_synced_size: u64,
    /// Exclusive byte offset up to which disk blocks are pre-reserved via
    /// `fallocate(FALLOC_FL_KEEP_SIZE)`. Advances one
    /// [`reddb_file::MAIN_WAL_SEGMENT_BYTES`]
    /// segment at a time as `current_lsn` approaches it (issue #893). Reset to
    /// `0` on [`truncate`](Self::truncate) — which frees the blocks — and
    /// immediately re-extended (the checkpoint re-extend path).
    preallocated_to: u64,
    /// Cleared the first time `fallocate` reports the backing filesystem can't
    /// preallocate (tmpfs/overlay/NFS → `EOPNOTSUPP`/`ENOSYS`/`EINVAL`, or any
    /// non-Linux target) so we stop issuing syscalls that will always fail.
    /// Preallocation is a best-effort optimization; clearing this never
    /// affects correctness.
    prealloc_supported: bool,
    /// Set when `fallocate(FALLOC_FL_KEEP_SIZE)` successfully reserved a new
    /// range and that allocation metadata has not yet been covered by a full
    /// sync.
    prealloc_metadata_dirty: bool,
    /// Test-only record of which sync call the most recent sync used.
    #[cfg(test)]
    last_sync_method: Option<WalSyncMethod>,
}
165
166impl WalWriter {
167    /// Open a WAL file for writing. Creates it if it doesn't exist.
168    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
169        let exists = path.as_ref().exists();
170
171        // We do all initial bookkeeping (write header, seek to EOF) on
172        // the raw `File` BEFORE wrapping in a BufWriter so we don't
173        // have to worry about flush ordering during construction.
174        let mut raw = OpenOptions::new()
175            .read(true)
176            .create(true)
177            .append(true)
178            .open(path)?;
179
180        let current_lsn = if !exists || raw.metadata()?.len() == 0 {
181            raw.write_all(&reddb_file::encode_wal_file_header())?;
182            raw.sync_all()?;
183            reddb_file::WAL_FILE_HEADER_BYTES as u64
184        } else {
185            // Existing file, set LSN to current end. Append-mode files
186            // ignore this seek for *writes*, but we use the returned
187            // position as our LSN counter.
188            raw.seek(SeekFrom::End(0))?
189        };
190
191        // Clone the file handle BEFORE wrapping in BufWriter. The
192        // clone shares the same kernel file description, so
193        // sync_all() on either descriptor flushes the whole inode.
194        // The BufWriter owns the original; the Arc<File> is shared
195        // with the group-commit leader.
196        let sync_handle = Arc::new(raw.try_clone()?);
197        let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
198
199        // On open, every byte already on disk is by definition durable
200        // (any pre-crash unflushed tail was lost when the OS dropped
201        // page cache). Initialise `durable_lsn` to `current_lsn`.
202        let mut writer = Self {
203            file,
204            sync_handle,
205            current_lsn,
206            durable_lsn: current_lsn,
207            last_synced_size: current_lsn,
208            preallocated_to: 0,
209            prealloc_supported: true,
210            prealloc_metadata_dirty: false,
211            #[cfg(test)]
212            last_sync_method: None,
213        };
214        // Reserve the first segment up front so the very first appends land in
215        // contiguous extents rather than growing the file page-by-page.
216        writer.ensure_preallocated()?;
217        Ok(writer)
218    }
219
220    /// Ensure disk blocks are reserved at least up to the next segment
221    /// boundary above the current write frontier (`current_lsn`).
222    ///
223    /// Cheap (pure arithmetic) until the frontier crosses a
224    /// [`reddb_file::MAIN_WAL_SEGMENT_BYTES`] boundary, at which point it issues a single
225    /// `fallocate`. Best-effort: a filesystem that can't preallocate disables
226    /// the feature; a transient error is swallowed so a write never fails
227    /// because preallocation hiccuped (the write path surfaces a genuine
228    /// `ENOSPC` on its own). Never grows the file's logical length, so it is
229    /// invisible to crash recovery.
230    fn ensure_preallocated(&mut self) -> io::Result<()> {
231        if !self.prealloc_supported {
232            return Ok(());
233        }
234        let target = reddb_file::next_main_wal_segment_boundary(self.current_lsn);
235        if target <= self.preallocated_to {
236            return Ok(());
237        }
238        let from = self.preallocated_to;
239        match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
240            Ok(()) => {
241                self.preallocated_to = target;
242                self.prealloc_metadata_dirty = true;
243            }
244            Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
245            Err(_) => {
246                // Best-effort: leave `preallocated_to` as-is and retry at the
247                // next boundary. Never propagate.
248            }
249        }
250        Ok(())
251    }
252
253    /// Append a record to the WAL.
254    ///
255    /// Bytes go into the BufWriter — they are NOT durable on disk
256    /// after this call returns. Callers that need durability must
257    /// follow up with [`WalWriter::sync`] or
258    /// [`WalWriter::flush_until`].
259    ///
260    /// Returns the LSN (Log Sequence Number) of the record.
261    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
262        let bytes = record.encode();
263        self.file.write_all(&bytes)?;
264
265        let record_lsn = self.current_lsn;
266        self.current_lsn += bytes.len() as u64;
267
268        self.ensure_preallocated()?;
269        Ok(record_lsn)
270    }
271
272    /// Write already-encoded bytes and advance the LSN counter to
273    /// match. Used by the lock-free append path: writers encode +
274    /// atomically reserve an LSN range outside this writer, the
275    /// group-commit coordinator drains the pending queue in LSN
276    /// order, then calls `append_bytes` for each batch.
277    ///
278    /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
279    /// concatenation of such) — no structural validation happens
280    /// here. The caller is responsible for keeping the on-disk
281    /// byte offset synchronised with the externally-tracked LSN
282    /// counter; this method just appends and advances.
283    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
284        self.file.write_all(bytes)?;
285        let record_lsn = self.current_lsn;
286        self.current_lsn += bytes.len() as u64;
287        self.ensure_preallocated()?;
288        Ok(record_lsn)
289    }
290
291    /// Rewind the writer's LSN counter to a specific value. Used
292    /// by the lock-free append path to resync the writer with the
293    /// externally-tracked `next_lsn` after a drain batch; the
294    /// coordinator knows the exact byte offset it just wrote to
295    /// and needs `current_lsn` to match so subsequent direct
296    /// callers of `append` stay consistent.
297    pub fn set_current_lsn(&mut self, lsn: u64) {
298        self.current_lsn = lsn;
299    }
300
301    /// Force sync to disk.
302    ///
303    /// Drains the user-space [`BufWriter`] first, then calls
304    /// `sync_all()` on the underlying file so every byte appended
305    /// since the last sync is durable. Updates `durable_lsn` so
306    /// subsequent `flush_until` calls become no-ops up to
307    /// `current_lsn`.
308    pub fn sync(&mut self) -> io::Result<()> {
309        self.file.flush()?;
310        self.sync_flushed_file()?;
311        self.durable_lsn = self.current_lsn;
312        Ok(())
313    }
314
315    /// Ensure the WAL is durable on disk at least up to byte offset
316    /// `target`. No-op when `target <= durable_lsn`.
317    ///
318    /// This is the postgres `XLogFlush(LSN)` analogue. Pager flush
319    /// paths call this with `max(dirty.header.lsn)` before writing
320    /// any data page so the WAL record describing the change is
321    /// guaranteed to be on disk before the page itself.
322    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
323        if self.durable_lsn >= target {
324            return Ok(());
325        }
326        self.file.flush()?;
327        self.sync_flushed_file()?;
328        self.durable_lsn = self.current_lsn;
329        Ok(())
330    }
331
332    fn sync_flushed_file(&mut self) -> io::Result<()> {
333        let method = self.next_sync_method();
334        match method {
335            WalSyncMethod::Data => self.file.get_ref().sync_data()?,
336            WalSyncMethod::All => self.file.get_ref().sync_all()?,
337        }
338        self.mark_sync_complete(method, self.current_lsn);
339        Ok(())
340    }
341
342    fn next_sync_method(&self) -> WalSyncMethod {
343        if !self.prealloc_metadata_dirty && self.current_lsn <= self.last_synced_size {
344            WalSyncMethod::Data
345        } else {
346            WalSyncMethod::All
347        }
348    }
349
350    fn mark_sync_complete(&mut self, method: WalSyncMethod, lsn: u64) {
351        match method {
352            WalSyncMethod::Data => {}
353            WalSyncMethod::All => {
354                self.last_synced_size = self.preallocated_to.max(lsn);
355                self.prealloc_metadata_dirty = false;
356            }
357        }
358        #[cfg(test)]
359        {
360            self.last_sync_method = Some(method);
361        }
362    }
363
364    /// Highest byte offset that is durable on disk. Used by the pager
365    /// to decide whether a `flush_until` call would actually need a
366    /// `fsync`.
367    pub fn durable_lsn(&self) -> u64 {
368        self.durable_lsn
369    }
370
371    /// Get current LSN (end of file offset)
372    pub fn current_lsn(&self) -> u64 {
373        self.current_lsn
374    }
375
376    /// Drain the BufWriter into the kernel and return the captured
377    /// LSN plus a cloned file handle and sync method for the caller
378    /// **without holding the WAL writer mutex**.
379    ///
380    /// Used by the group-commit leader path. The flow is:
381    ///
382    /// 1. Take the WAL writer mutex.
383    /// 2. Call this method — drains user-space buffer to the kernel
384    ///    and captures a size-aware sync plan.
385    /// 3. Release the WAL writer mutex.
386    /// 4. Execute the sync plan — this is the expensive ~100 µs syscall,
387    ///    and other writers can keep appending while it runs.
388    /// 5. Take the WAL writer mutex briefly and call
389    ///    [`WalWriter::mark_durable`] to publish the new durable position.
390    ///
391    /// The cloned `sync_handle` shares the same kernel inode with
392    /// the writer's `file`, so syncing the clone flushes bytes that
393    /// have reached the kernel for that file.
394    /// This is the coalescing window that makes group commit win.
395    pub(crate) fn drain_for_group_sync(&mut self) -> io::Result<WalGroupSync> {
396        // Drain user-space buffer into the kernel.
397        self.file.flush()?;
398        Ok(WalGroupSync {
399            target_lsn: self.current_lsn,
400            sync_handle: Arc::clone(&self.sync_handle),
401            method: self.next_sync_method(),
402        })
403    }
404
405    /// Manually advance `durable_lsn` after a successful out-of-lock
406    /// sync performed via [`WalWriter::drain_for_group_sync`].
407    ///
408    /// Monotonic — never lowers `durable_lsn`. Safe to call with a
409    /// stale `lsn`; just becomes a no-op.
410    pub(crate) fn mark_durable(&mut self, sync: &WalGroupSync) {
411        let lsn = sync.target_lsn;
412        if lsn > self.durable_lsn {
413            self.durable_lsn = lsn;
414        }
415        self.mark_sync_complete(sync.method, lsn);
416    }
417
418    /// Truncate the WAL (usually after checkpoint).
419    ///
420    /// Drains the BufWriter first so no pending bytes hit the file
421    /// after the truncate. Then resets the underlying file, rewrites
422    /// the header through the buffered writer (header is small; the
423    /// followup `flush + sync_all` makes it durable), and resets
424    /// LSN bookkeeping.
425    pub fn truncate(&mut self) -> io::Result<()> {
426        // Drop any pending bytes BEFORE the truncate; otherwise the
427        // BufWriter would flush them to a re-shrunken file in
428        // confused order.
429        self.file.flush()?;
430
431        {
432            let raw = self.file.get_mut();
433            raw.set_len(0)?;
434            raw.seek(SeekFrom::Start(0))?;
435        }
436
437        // Rewrite header through the BufWriter then drain.
438        self.file.write_all(&reddb_file::encode_wal_file_header())?;
439        self.file.flush()?;
440        self.file.get_ref().sync_all()?;
441
442        self.current_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
443        self.durable_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
444        self.last_synced_size = reddb_file::WAL_FILE_HEADER_BYTES as u64;
445        self.prealloc_metadata_dirty = false;
446        #[cfg(test)]
447        {
448            self.last_sync_method = Some(WalSyncMethod::All);
449        }
450
451        // `set_len(0)` freed every reserved block, so the WAL would otherwise
452        // grow page-by-page again from here. Re-extend a fresh segment now —
453        // this is the "truncate/re-extend on checkpoint" half of issue #893.
454        self.preallocated_to = 0;
455        self.ensure_preallocated()?;
456        Ok(())
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use super::*;
463    use std::path::PathBuf;
464
    /// Test helper that removes the file at `path` when it goes out of
    /// scope, so each test cleans up its temporary WAL.
    struct FileGuard {
        /// File to delete on drop.
        path: PathBuf,
    }
468
469    impl Drop for FileGuard {
470        fn drop(&mut self) {
471            let _ = std::fs::remove_file(&self.path);
472        }
473    }
474
475    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
476        let path = reddb_file::layout::wal_component_temp_path(
477            &std::env::temp_dir(),
478            "writer",
479            name,
480            std::process::id(),
481        );
482        let guard = FileGuard { path: path.clone() };
483        let _ = std::fs::remove_file(&path);
484        (guard, path)
485    }
486
487    #[test]
488    fn test_create_new_wal() {
489        let (_guard, path) = temp_wal("create");
490        let writer = WalWriter::open(&path).unwrap();
491
492        // Should start at LSN 8 (after 8-byte header)
493        assert_eq!(writer.current_lsn(), 8);
494        assert!(path.exists());
495    }
496
497    #[test]
498    fn test_append_record() {
499        let (_guard, path) = temp_wal("append");
500        let mut writer = WalWriter::open(&path).unwrap();
501
502        let record = WalRecord::Begin { tx_id: 42 };
503        let lsn = writer.append(&record).unwrap();
504
505        // First record starts at LSN 8
506        assert_eq!(lsn, 8);
507
508        // Next record should start after encoded size
509        // Begin record: 1 (type) + 8 (term) + 8 (tx_id) + 4 (checksum) = 21 bytes
510        assert_eq!(writer.current_lsn(), 8 + 21);
511    }
512
513    #[test]
514    fn test_append_multiple_records() {
515        let (_guard, path) = temp_wal("multi");
516        let mut writer = WalWriter::open(&path).unwrap();
517
518        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
519        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
520        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
521
522        assert_eq!(lsn1, 8);
523        assert_eq!(lsn2, 8 + 21);
524        assert_eq!(lsn3, 8 + 21 + 21);
525    }
526
527    #[test]
528    fn test_page_write_lsn() {
529        let (_guard, path) = temp_wal("pagewrite");
530        let mut writer = WalWriter::open(&path).unwrap();
531
532        // First record
533        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
534        assert_eq!(lsn1, 8);
535
536        // PageWrite record: 1 + 8 + 4 + 4 + data_len + 4 = 21 + data_len
537        let data = vec![1, 2, 3, 4, 5];
538        let lsn2 = writer
539            .append(&WalRecord::PageWrite {
540                tx_id: 1,
541                page_id: 100,
542                data: data.clone(),
543            })
544            .unwrap();
545
546        assert_eq!(lsn2, 8 + 21); // after Begin
547
548        // Next LSN = lsn2 + (1 + 8 + 8 + 4 + 4 + 5 + 4) = lsn2 + 34
549        assert_eq!(writer.current_lsn(), 8 + 21 + 34);
550    }
551
552    #[test]
553    fn test_sync() {
554        let (_guard, path) = temp_wal("sync");
555        let mut writer = WalWriter::open(&path).unwrap();
556
557        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
558        writer.sync().unwrap();
559
560        // File should be synced, just verify no error
561        assert!(path.exists());
562    }
563
564    #[test]
565    fn test_truncate() {
566        let (_guard, path) = temp_wal("truncate");
567        let mut writer = WalWriter::open(&path).unwrap();
568
569        // Write some records
570        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
571        writer
572            .append(&WalRecord::PageWrite {
573                tx_id: 1,
574                page_id: 0,
575                data: vec![0; 100],
576            })
577            .unwrap();
578        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
579
580        let lsn_before = writer.current_lsn();
581        assert!(lsn_before > 8);
582
583        // Truncate
584        writer.truncate().unwrap();
585
586        // LSN should be back to 8
587        assert_eq!(writer.current_lsn(), 8);
588
589        // File should be 8 bytes (just header)
590        let len = std::fs::metadata(&path).unwrap().len();
591        assert_eq!(len, 8);
592    }
593
594    #[test]
595    fn test_reopen_existing() {
596        let (_guard, path) = temp_wal("reopen");
597
598        // Create and write
599        let lsn_after_write;
600        {
601            let mut writer = WalWriter::open(&path).unwrap();
602            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
603            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
604            lsn_after_write = writer.current_lsn();
605        }
606
607        // Reopen
608        {
609            let writer = WalWriter::open(&path).unwrap();
610            // Should continue from where we left off
611            assert_eq!(writer.current_lsn(), lsn_after_write);
612        }
613    }
614
615    #[test]
616    fn test_checkpoint_record() {
617        let (_guard, path) = temp_wal("checkpoint");
618        let mut writer = WalWriter::open(&path).unwrap();
619
620        // Checkpoint is same size as Begin (1 + 8 + 8 + 4 = 21)
621        let lsn = writer
622            .append(&WalRecord::Checkpoint { lsn: 12345 })
623            .unwrap();
624        assert_eq!(lsn, 8);
625        assert_eq!(writer.current_lsn(), 8 + 21);
626    }
627
628    // -----------------------------------------------------------------
629    // Target 3: durable_lsn / flush_until tests
630    // -----------------------------------------------------------------
631
632    #[test]
633    fn fresh_wal_has_durable_lsn_at_header_end() {
634        let (_guard, path) = temp_wal("durable_init");
635        let writer = WalWriter::open(&path).unwrap();
636        assert_eq!(writer.durable_lsn(), 8);
637        assert_eq!(writer.current_lsn(), 8);
638    }
639
640    #[test]
641    fn flush_until_below_durable_is_noop() {
642        let (_guard, path) = temp_wal("flush_noop");
643        let mut writer = WalWriter::open(&path).unwrap();
644        // After open, durable_lsn == 8.
645        let before = writer.durable_lsn();
646        writer.flush_until(0).unwrap();
647        writer.flush_until(8).unwrap();
648        assert_eq!(writer.durable_lsn(), before);
649    }
650
651    #[test]
652    fn flush_until_advances_durable_to_current() {
653        let (_guard, path) = temp_wal("flush_advance");
654        let mut writer = WalWriter::open(&path).unwrap();
655        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
656        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
657        let target = writer.current_lsn();
658        // Before flush_until, durable still at the header.
659        assert_eq!(writer.durable_lsn(), 8);
660        writer.flush_until(target).unwrap();
661        assert_eq!(writer.durable_lsn(), target);
662    }
663
664    #[test]
665    fn flush_until_is_monotonic() {
666        let (_guard, path) = temp_wal("flush_monotonic");
667        let mut writer = WalWriter::open(&path).unwrap();
668        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
669        let lo = writer.current_lsn();
670        writer.flush_until(lo).unwrap();
671        let durable_after_lo = writer.durable_lsn();
672        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
673        let hi = writer.current_lsn();
674        writer.flush_until(hi).unwrap();
675        assert!(writer.durable_lsn() >= durable_after_lo);
676        // Calling flush_until(lo) after flush_until(hi) is a no-op.
677        writer.flush_until(lo).unwrap();
678        assert_eq!(writer.durable_lsn(), hi);
679    }
680
681    #[test]
682    fn sync_advances_durable_lsn_too() {
683        let (_guard, path) = temp_wal("sync_durable");
684        let mut writer = WalWriter::open(&path).unwrap();
685        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
686        let before = writer.durable_lsn();
687        let after_append = writer.current_lsn();
688        assert!(after_append > before);
689        writer.sync().unwrap();
690        assert_eq!(writer.durable_lsn(), after_append);
691    }
692
693    #[test]
694    fn sync_all_is_used_when_wal_size_grew() {
695        let (_guard, path) = temp_wal("sync_all_grew");
696        let mut writer = WalWriter::open(&path).unwrap();
697
698        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
699        writer.sync().unwrap();
700
701        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
702        assert!(writer.last_synced_size >= writer.current_lsn());
703        assert!(!writer.prealloc_metadata_dirty);
704    }
705
706    #[test]
707    fn sync_all_is_used_for_metadata_only_preallocation() {
708        let (_guard, path) = temp_wal("sync_all_prealloc_metadata");
709        let mut writer = WalWriter::open(&path).unwrap();
710        if !writer.prealloc_supported {
711            return;
712        }
713
714        assert_eq!(writer.current_lsn(), 8);
715        assert!(writer.prealloc_metadata_dirty);
716
717        writer.sync().unwrap();
718
719        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
720        assert_eq!(writer.last_synced_size, writer.preallocated_to);
721        assert!(!writer.prealloc_metadata_dirty);
722    }
723
724    #[test]
725    fn sync_data_is_used_when_wal_size_is_unchanged() {
726        let (_guard, path) = temp_wal("sync_data_unchanged");
727        let mut writer = WalWriter::open(&path).unwrap();
728
729        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
730        writer.sync().unwrap();
731        let synced_size = writer.last_synced_size;
732        writer.sync().unwrap();
733
734        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
735        assert_eq!(writer.last_synced_size, synced_size);
736        assert_eq!(writer.durable_lsn(), writer.current_lsn());
737    }
738
739    #[test]
740    fn sync_data_is_used_for_appends_within_synced_preallocation() {
741        let (_guard, path) = temp_wal("sync_data_preallocated_append");
742        let mut writer = WalWriter::open(&path).unwrap();
743        if !writer.prealloc_supported {
744            return;
745        }
746
747        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
748        writer.sync().unwrap();
749        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
750
751        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
752        writer.sync().unwrap();
753
754        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
755        assert_eq!(writer.durable_lsn(), writer.current_lsn());
756        assert!(writer.current_lsn() <= writer.last_synced_size);
757    }
758
759    #[test]
760    fn group_sync_uses_sync_data_within_synced_preallocation() {
761        let (_guard, path) = temp_wal("group_sync_data_preallocated_append");
762        let mut writer = WalWriter::open(&path).unwrap();
763        if !writer.prealloc_supported {
764            return;
765        }
766
767        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
768        writer.sync().unwrap();
769        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
770
771        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
772        let sync = writer.drain_for_group_sync().unwrap();
773        assert_eq!(sync.method, WalSyncMethod::Data);
774        sync.sync().unwrap();
775        writer.mark_durable(&sync);
776
777        assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
778        assert_eq!(writer.durable_lsn(), writer.current_lsn());
779    }
780
781    #[test]
782    fn truncate_resets_durable_lsn() {
783        let (_guard, path) = temp_wal("truncate_durable");
784        let mut writer = WalWriter::open(&path).unwrap();
785        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
786        writer.sync().unwrap();
787        assert!(writer.durable_lsn() > 8);
788        writer.truncate().unwrap();
789        assert_eq!(writer.durable_lsn(), 8);
790        assert_eq!(writer.current_lsn(), 8);
791    }
792
793    #[test]
794    fn reopen_initialises_durable_to_current() {
795        let (_guard, path) = temp_wal("reopen_durable");
796        {
797            let mut writer = WalWriter::open(&path).unwrap();
798            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
799            writer.sync().unwrap();
800        }
801        let writer = WalWriter::open(&path).unwrap();
802        // After reopen, every byte on disk is durable by definition.
803        assert_eq!(writer.durable_lsn(), writer.current_lsn());
804    }
805
806    // -----------------------------------------------------------------
807    // Perf 1.1: BufWriter coalesces small appends until sync
808    // -----------------------------------------------------------------
809
810    #[test]
811    fn bufwriter_coalesces_until_sync() {
812        // Append 100 small records but DO NOT sync. The on-disk file
813        // size must still equal the header (8 bytes) because the
814        // bytes are sitting in the BufWriter, not in the kernel.
815        let (_guard, path) = temp_wal("bufwriter_coalesce");
816        let mut writer = WalWriter::open(&path).unwrap();
817        for tx in 0..100u64 {
818            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
819        }
820        // current_lsn reflects the in-buffer position.
821        assert_eq!(writer.current_lsn(), 8 + 100 * 21);
822        // But the file on disk only has the header.
823        let on_disk = std::fs::metadata(&path).unwrap().len();
824        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
825    }
826
827    #[test]
828    fn sync_drains_bufwriter_before_fsync() {
829        // After sync(), the file size must equal current_lsn — the
830        // BufWriter has been flushed and sync_all has hit the kernel.
831        let (_guard, path) = temp_wal("sync_drains");
832        let mut writer = WalWriter::open(&path).unwrap();
833        for tx in 0..50u64 {
834            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
835        }
836        writer.sync().unwrap();
837        let on_disk = std::fs::metadata(&path).unwrap().len();
838        assert_eq!(on_disk, writer.current_lsn());
839        assert_eq!(writer.durable_lsn(), writer.current_lsn());
840    }
841
842    #[test]
843    fn flush_until_drains_bufwriter_too() {
844        // flush_until must drain the BufWriter before calling
845        // sync_all on the underlying file — otherwise pending bytes
846        // never become durable.
847        let (_guard, path) = temp_wal("flush_until_drains");
848        let mut writer = WalWriter::open(&path).unwrap();
849        for tx in 0..30u64 {
850            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
851        }
852        let target = writer.current_lsn();
853        writer.flush_until(target).unwrap();
854        let on_disk = std::fs::metadata(&path).unwrap().len();
855        assert_eq!(on_disk, target);
856        assert_eq!(writer.durable_lsn(), target);
857    }
858
859    #[test]
860    fn truncate_drains_pending_bufwriter_bytes_first() {
861        // If truncate did NOT drain BufWriter first, the pending bytes
862        // would either land in the post-truncate file (corrupting it
863        // with stale records) or be lost. Verify the resulting file
864        // contains only a fresh header.
865        let (_guard, path) = temp_wal("truncate_drain");
866        let mut writer = WalWriter::open(&path).unwrap();
867        // Write enough small records to fill some of the 64 KiB buffer
868        // but stay below the auto-flush threshold.
869        for tx in 0..200u64 {
870            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
871        }
872        // Sanity: bytes are buffered.
873        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
874
875        writer.truncate().unwrap();
876        // After truncate the file is just the header again.
877        let on_disk = std::fs::metadata(&path).unwrap().len();
878        assert_eq!(on_disk, 8);
879        assert_eq!(writer.current_lsn(), 8);
880        assert_eq!(writer.durable_lsn(), 8);
881
882        // And we can append again successfully.
883        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
884        writer.sync().unwrap();
885        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 21);
886    }
887
888    #[test]
889    fn reopen_sees_only_synced_records() {
890        // Records that were appended but never sync'd must NOT
891        // survive a reopen — they lived in the BufWriter, never made
892        // it to the kernel, and the previous WalWriter went out of
893        // scope. The new WalWriter reopens the file and reads from
894        // EOF, which reflects only the bytes that hit disk.
895        //
896        // We sync some records, then drop the writer mid-buffer, and
897        // assert the reopen LSN matches only the synced prefix.
898        let (_guard, path) = temp_wal("reopen_synced_only");
899        let synced_lsn;
900        {
901            let mut writer = WalWriter::open(&path).unwrap();
902            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
903            writer.sync().unwrap();
904            synced_lsn = writer.current_lsn();
905            // These records are never sync'd before drop. Drop runs
906            // BufWriter::flush which DOES write them — see note below.
907            for tx in 100..120u64 {
908                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
909            }
910            // Without a sync, the in-buffer bytes are still pending.
911            // BufWriter's Drop impl does flush to the file but does
912            // not call sync_all. For reopen-LSN purposes, on-disk
913            // bytes count regardless of fsync, so the reopened LSN
914            // will reflect the dropped writes too.
915        }
916        let writer = WalWriter::open(&path).unwrap();
917        // The reopen LSN reflects what's physically on disk after
918        // BufWriter::Drop flushes its buffer. That may or may not
919        // include the unsync'd records depending on platform; the
920        // contract we care about is that durable_lsn ≥ synced_lsn.
921        assert!(writer.durable_lsn() >= synced_lsn);
922    }
923
924    // -----------------------------------------------------------------
925    // Issue #893: fallocate-based WAL segment preallocation
926    // -----------------------------------------------------------------
927
928    /// On-disk blocks reserved by `fallocate`, in bytes. Returns the
929    /// allocated size (st_blocks × 512), independent of the logical length.
930    fn allocated_bytes(path: &std::path::Path) -> u64 {
931        use fs2::FileExt;
932        let f = std::fs::File::open(path).unwrap();
933        f.allocated_size().unwrap()
934    }
935
936    #[test]
937    fn segment_boundary_rounds_strictly_up() {
938        // Always lands one boundary ahead so the reservation stays in front
939        // of the write frontier.
940        assert_eq!(
941            reddb_file::next_main_wal_segment_boundary(0),
942            reddb_file::MAIN_WAL_SEGMENT_BYTES
943        );
944        assert_eq!(
945            reddb_file::next_main_wal_segment_boundary(8),
946            reddb_file::MAIN_WAL_SEGMENT_BYTES
947        );
948        assert_eq!(
949            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES - 1),
950            reddb_file::MAIN_WAL_SEGMENT_BYTES
951        );
952        // Exactly on a boundary still advances to the next one.
953        assert_eq!(
954            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES),
955            2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
956        );
957        assert_eq!(
958            reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES + 1),
959            2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
960        );
961    }
962
963    #[test]
964    fn open_preallocates_first_segment() {
965        // A freshly opened WAL must reserve a whole segment up front instead
966        // of growing incrementally (acceptance #1).
967        let (_guard, path) = temp_wal("prealloc_open");
968        let writer = WalWriter::open(&path).unwrap();
969        if !writer.prealloc_supported {
970            return; // filesystem without fallocate — feature is a no-op.
971        }
972        assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
973        // The reservation is real on disk, yet the logical file is still just
974        // the 8-byte header.
975        assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
976        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
977    }
978
979    #[test]
980    fn preallocation_does_not_grow_logical_length() {
981        // The load-bearing invariant for crash recovery: appending records
982        // must NOT inflate the logical file size beyond the real data, or the
983        // EOF scan in WalReader would walk into the reserved tail. Holds on
984        // every filesystem (fallocate keeps i_size pinned; absent fallocate
985        // there is no reservation at all).
986        let (_guard, path) = temp_wal("prealloc_logical");
987        let mut writer = WalWriter::open(&path).unwrap();
988        for tx in 0..50u64 {
989            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
990        }
991        writer.sync().unwrap();
992        let logical = std::fs::metadata(&path).unwrap().len();
993        assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
994        assert_eq!(writer.current_lsn(), logical);
995    }
996
997    #[test]
998    fn truncate_re_extends_a_fresh_segment() {
999        // After checkpoint truncation the WAL must re-extend rather than grow
1000        // unbounded page-by-page (acceptance #2).
1001        let (_guard, path) = temp_wal("prealloc_truncate");
1002        let mut writer = WalWriter::open(&path).unwrap();
1003        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
1004        writer.sync().unwrap();
1005
1006        writer.truncate().unwrap();
1007
1008        assert_eq!(writer.current_lsn(), 8);
1009        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
1010        if writer.prealloc_supported {
1011            assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
1012            assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
1013        }
1014    }
1015
1016    #[test]
1017    fn preallocated_wal_recovers_records_without_trailing_garbage() {
1018        // End-to-end: a preallocated WAL must read back exactly the records
1019        // written — the reserved (unwritten) tail must be invisible to the
1020        // reader, proving crash-recovery is unchanged (acceptance #3).
1021        use super::super::reader::WalReader;
1022        let (_guard, path) = temp_wal("prealloc_recover");
1023        {
1024            let mut writer = WalWriter::open(&path).unwrap();
1025            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
1026            writer
1027                .append(&WalRecord::PageWrite {
1028                    tx_id: 1,
1029                    page_id: 7,
1030                    data: vec![1, 2, 3, 4],
1031                })
1032                .unwrap();
1033            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
1034            writer.sync().unwrap();
1035        }
1036        let records: Vec<_> = WalReader::open(&path)
1037            .unwrap()
1038            .iter()
1039            .collect::<Result<_, _>>()
1040            .expect("reader must stop cleanly at real EOF, not in reserved tail");
1041        assert_eq!(records.len(), 3);
1042        assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
1043        assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
1044    }
1045}