// reddb_server/storage/wal/writer.rs

1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// User-space buffer size for the WAL writer (64 KiB).
///
/// Sized so that a burst of small records coalesces into a single `write`
/// syscall before the next `sync()` drains the buffer. It holds about 3 100
/// Begin/Commit records (≈ 21 bytes each) or about 1 900 small PageWrite
/// records (≈ 34 bytes each) before it fills and flushes on its own.
///
/// Tunable. It starts from the postgres XLOG block size (8 KiB) and scales
/// up because we batch record-level rather than page-level.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
/// Size of one pre-allocated WAL segment (16 MiB).
///
/// Disk blocks are kept reserved one segment ahead of the write frontier
/// using `fallocate(2)` with `FALLOC_FL_KEEP_SIZE`. Reserving in segment-sized
/// chunks lets the ever-growing WAL occupy contiguous extents instead of
/// fragmenting the data file's extents on ext4/XFS (issue #893, PRD #851).
/// The size matches postgres' default WAL segment size.
const WAL_SEGMENT_BYTES: u64 = 16 << 20;
24
25/// Next segment boundary strictly above `pos`.
26///
27/// `pos` already at a boundary still rounds *up* to the following one, so the
28/// reservation always stays at least one boundary ahead of the frontier.
29#[inline]
30fn next_wal_segment_boundary(pos: u64) -> u64 {
31    (pos / WAL_SEGMENT_BYTES + 1) * WAL_SEGMENT_BYTES
32}
33
34/// Reserve disk blocks for `[offset, offset + len)` **without** growing the
35/// file's logical length (`FALLOC_FL_KEEP_SIZE`).
36///
37/// Pinning `i_size` is the whole trick that makes preallocation invisible to
38/// crash recovery: the WAL's logical end stays equal to its real data length,
39/// so [`WalReader`](super::reader::WalReader)'s EOF scan never walks into a
40/// zero-filled reserved tail (a `0x00` type byte would otherwise decode to an
41/// "Invalid record type" error and abort recovery). This is why we cannot use
42/// `fs2::allocate` here — it calls `posix_fallocate`, which *extends* `i_size`.
43///
44/// Linux-only; other targets return [`io::ErrorKind::Unsupported`] so the
45/// caller disables the optimization silently.
46#[cfg(target_os = "linux")]
47fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
48    use std::os::unix::io::AsRawFd;
49    if len == 0 {
50        return Ok(());
51    }
52    // SAFETY: `file` owns a valid fd for the duration of the call; fallocate
53    // only mutates block reservations for that fd, never process memory.
54    let ret = unsafe {
55        libc::fallocate(
56            file.as_raw_fd(),
57            libc::FALLOC_FL_KEEP_SIZE,
58            offset as libc::off_t,
59            len as libc::off_t,
60        )
61    };
62    if ret == 0 {
63        Ok(())
64    } else {
65        Err(io::Error::last_os_error())
66    }
67}
68
/// Non-Linux stand-in for WAL block reservation.
///
/// There is no `FALLOC_FL_KEEP_SIZE` equivalent wired up here, so this always
/// reports [`io::ErrorKind::Unsupported`]. The caller takes that as a signal
/// to switch preallocation off for good.
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let message = "WAL preallocation is only implemented on linux";
    Err(io::Error::new(io::ErrorKind::Unsupported, message))
}
76
77/// Whether a `fallocate` failure means "this filesystem can't preallocate"
78/// (tmpfs, overlayfs, many network filesystems) rather than a real I/O error.
79/// Those are soft failures that flip the feature off; anything else is left to
80/// the normal write path to surface (e.g. a genuine `ENOSPC`).
81fn fallocate_unsupported(err: &io::Error) -> bool {
82    if err.kind() == io::ErrorKind::Unsupported {
83        return true;
84    }
85    #[cfg(target_os = "linux")]
86    {
87        matches!(
88            err.raw_os_error(),
89            Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
90        )
91    }
92    #[cfg(not(target_os = "linux"))]
93    {
94        false
95    }
96}
97
98/// Writer for the Write-Ahead Log
99///
100/// Wraps the underlying file in a [`BufWriter`] so each `append` does
101/// not pay a write syscall — bytes accumulate in a 64 KiB user-space
102/// buffer until `sync()` (or `flush_until()`) drains them and then
103/// calls `sync_all()` on the raw file. This is how postgres turns
104/// per-record append cost from ~500 ns down to ~5 ns; reddb's previous
105/// per-append `write_all` directly to the file paid the syscall on
106/// every record.
107///
108/// **Critical contract:** every code path that calls `sync_all()` on
109/// the underlying file *must* drain the [`BufWriter`] first via
110/// `BufWriter::flush()`. Otherwise the bytes in user-space never reach
111/// the kernel before fsync, and durability is silently broken.
112pub struct WalWriter {
113    file: BufWriter<File>,
114    /// Cloned file descriptor for `sync_all()` outside the writer
115    /// mutex. Both this and `file`'s inner `File` point at the same
116    /// kernel inode; calling `sync_all()` on either flushes ALL
117    /// pending bytes for that inode. This is the trick that lets
118    /// the group-commit leader release the WAL writer lock during
119    /// the expensive fsync — see [`WalWriter::drain_for_group_sync`].
120    ///
121    /// Without this clone, a leader holding the writer mutex during
122    /// `sync_all()` blocks every other writer from appending,
123    /// defeating the entire purpose of group commit.
124    sync_handle: Arc<File>,
125    /// Log Sequence Number — byte offset of the next record. Advances
126    /// every `append`; survives across restarts via `seek(End)`.
127    current_lsn: u64,
128    /// Highest LSN that has been `sync_all()`'d to disk. The WAL-first
129    /// flush invariant relies on this: a page with `header.lsn = L` may
130    /// only be written to its data file once `durable_lsn >= L`.
131    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
132    /// section of `PLAN.md`.
133    durable_lsn: u64,
134    /// Exclusive byte offset up to which disk blocks are pre-reserved via
135    /// `fallocate(FALLOC_FL_KEEP_SIZE)`. Advances one [`WAL_SEGMENT_BYTES`]
136    /// segment at a time as `current_lsn` approaches it (issue #893). Reset to
137    /// `0` on [`truncate`](Self::truncate) — which frees the blocks — and
138    /// immediately re-extended (the checkpoint re-extend path).
139    preallocated_to: u64,
140    /// Cleared the first time `fallocate` reports the backing filesystem can't
141    /// preallocate (tmpfs/overlay/NFS → `EOPNOTSUPP`/`ENOSYS`, or any non-Linux
142    /// target) so we stop issuing syscalls that will always fail. Preallocation
143    /// is a best-effort optimization; clearing this never affects correctness.
144    prealloc_supported: bool,
145}
146
147impl WalWriter {
148    /// Open a WAL file for writing. Creates it if it doesn't exist.
149    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
150        let exists = path.as_ref().exists();
151
152        // We do all initial bookkeeping (write header, seek to EOF) on
153        // the raw `File` BEFORE wrapping in a BufWriter so we don't
154        // have to worry about flush ordering during construction.
155        let mut raw = OpenOptions::new()
156            .read(true)
157            .create(true)
158            .append(true)
159            .open(path)?;
160
161        let current_lsn = if !exists || raw.metadata()?.len() == 0 {
162            // Write header for new file
163            // Format: Magic (4) + Version (1) + Reserved (3)
164            let mut header = Vec::with_capacity(8);
165            header.extend_from_slice(WAL_MAGIC);
166            header.push(WAL_VERSION);
167            header.extend_from_slice(&[0u8; 3]); // Reserved
168
169            raw.write_all(&header)?;
170            raw.sync_all()?;
171            8
172        } else {
173            // Existing file, set LSN to current end. Append-mode files
174            // ignore this seek for *writes*, but we use the returned
175            // position as our LSN counter.
176            raw.seek(SeekFrom::End(0))?
177        };
178
179        // Clone the file handle BEFORE wrapping in BufWriter. The
180        // clone shares the same kernel file description, so
181        // sync_all() on either descriptor flushes the whole inode.
182        // The BufWriter owns the original; the Arc<File> is shared
183        // with the group-commit leader.
184        let sync_handle = Arc::new(raw.try_clone()?);
185        let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
186
187        // On open, every byte already on disk is by definition durable
188        // (any pre-crash unflushed tail was lost when the OS dropped
189        // page cache). Initialise `durable_lsn` to `current_lsn`.
190        let mut writer = Self {
191            file,
192            sync_handle,
193            current_lsn,
194            durable_lsn: current_lsn,
195            preallocated_to: 0,
196            prealloc_supported: true,
197        };
198        // Reserve the first segment up front so the very first appends land in
199        // contiguous extents rather than growing the file page-by-page.
200        writer.ensure_preallocated()?;
201        Ok(writer)
202    }
203
204    /// Ensure disk blocks are reserved at least up to the next segment
205    /// boundary above the current write frontier (`current_lsn`).
206    ///
207    /// Cheap (pure arithmetic) until the frontier crosses a
208    /// [`WAL_SEGMENT_BYTES`] boundary, at which point it issues a single
209    /// `fallocate`. Best-effort: a filesystem that can't preallocate disables
210    /// the feature; a transient error is swallowed so a write never fails
211    /// because preallocation hiccuped (the write path surfaces a genuine
212    /// `ENOSPC` on its own). Never grows the file's logical length, so it is
213    /// invisible to crash recovery.
214    fn ensure_preallocated(&mut self) -> io::Result<()> {
215        if !self.prealloc_supported {
216            return Ok(());
217        }
218        let target = next_wal_segment_boundary(self.current_lsn);
219        if target <= self.preallocated_to {
220            return Ok(());
221        }
222        let from = self.preallocated_to;
223        match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
224            Ok(()) => self.preallocated_to = target,
225            Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
226            Err(_) => {
227                // Best-effort: leave `preallocated_to` as-is and retry at the
228                // next boundary. Never propagate.
229            }
230        }
231        Ok(())
232    }
233
234    /// Append a record to the WAL.
235    ///
236    /// Bytes go into the BufWriter — they are NOT durable on disk
237    /// after this call returns. Callers that need durability must
238    /// follow up with [`WalWriter::sync`] or
239    /// [`WalWriter::flush_until`].
240    ///
241    /// Returns the LSN (Log Sequence Number) of the record.
242    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
243        let bytes = record.encode();
244        self.file.write_all(&bytes)?;
245
246        let record_lsn = self.current_lsn;
247        self.current_lsn += bytes.len() as u64;
248
249        self.ensure_preallocated()?;
250        Ok(record_lsn)
251    }
252
253    /// Write already-encoded bytes and advance the LSN counter to
254    /// match. Used by the lock-free append path: writers encode +
255    /// atomically reserve an LSN range outside this writer, the
256    /// group-commit coordinator drains the pending queue in LSN
257    /// order, then calls `append_bytes` for each batch.
258    ///
259    /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
260    /// concatenation of such) — no structural validation happens
261    /// here. The caller is responsible for keeping the on-disk
262    /// byte offset synchronised with the externally-tracked LSN
263    /// counter; this method just appends and advances.
264    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
265        self.file.write_all(bytes)?;
266        let record_lsn = self.current_lsn;
267        self.current_lsn += bytes.len() as u64;
268        self.ensure_preallocated()?;
269        Ok(record_lsn)
270    }
271
272    /// Rewind the writer's LSN counter to a specific value. Used
273    /// by the lock-free append path to resync the writer with the
274    /// externally-tracked `next_lsn` after a drain batch; the
275    /// coordinator knows the exact byte offset it just wrote to
276    /// and needs `current_lsn` to match so subsequent direct
277    /// callers of `append` stay consistent.
278    pub fn set_current_lsn(&mut self, lsn: u64) {
279        self.current_lsn = lsn;
280    }
281
282    /// Force sync to disk.
283    ///
284    /// Drains the user-space [`BufWriter`] first, then calls
285    /// `sync_all()` on the underlying file so every byte appended
286    /// since the last sync is durable. Updates `durable_lsn` so
287    /// subsequent `flush_until` calls become no-ops up to
288    /// `current_lsn`.
289    pub fn sync(&mut self) -> io::Result<()> {
290        self.file.flush()?;
291        self.file.get_ref().sync_all()?;
292        self.durable_lsn = self.current_lsn;
293        Ok(())
294    }
295
296    /// Ensure the WAL is durable on disk at least up to byte offset
297    /// `target`. No-op when `target <= durable_lsn`.
298    ///
299    /// This is the postgres `XLogFlush(LSN)` analogue. Pager flush
300    /// paths call this with `max(dirty.header.lsn)` before writing
301    /// any data page so the WAL record describing the change is
302    /// guaranteed to be on disk before the page itself.
303    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
304        if self.durable_lsn >= target {
305            return Ok(());
306        }
307        self.file.flush()?;
308        self.file.get_ref().sync_all()?;
309        self.durable_lsn = self.current_lsn;
310        Ok(())
311    }
312
313    /// Highest byte offset that is durable on disk. Used by the pager
314    /// to decide whether a `flush_until` call would actually need a
315    /// `fsync`.
316    pub fn durable_lsn(&self) -> u64 {
317        self.durable_lsn
318    }
319
320    /// Get current LSN (end of file offset)
321    pub fn current_lsn(&self) -> u64 {
322        self.current_lsn
323    }
324
325    /// Drain the BufWriter into the kernel and return the captured
326    /// LSN plus a cloned file handle for the caller to fsync
327    /// **without holding the WAL writer mutex**.
328    ///
329    /// Used by the group-commit leader path. The flow is:
330    ///
331    /// 1. Take the WAL writer mutex.
332    /// 2. Call this method — drains user-space buffer to the kernel
333    ///    and captures `(target_lsn, sync_handle)`.
334    /// 3. Release the WAL writer mutex.
335    /// 4. Call `sync_handle.sync_all()` — this is the expensive
336    ///    ~100 µs syscall, and other writers can keep appending
337    ///    while it runs.
338    /// 5. Take the WAL writer mutex briefly and call
339    ///    [`WalWriter::mark_durable(target_lsn)`] to publish the
340    ///    new durable position.
341    ///
342    /// The cloned `sync_handle` shares the same kernel inode with
343    /// the writer's `file`, so `sync_all()` on the clone flushes
344    /// ALL bytes that have reached the kernel for that file —
345    /// including bytes appended by other writers AFTER step 3.
346    /// This is the coalescing window that makes group commit win.
347    pub fn drain_for_group_sync(&mut self) -> io::Result<(u64, Arc<File>)> {
348        // Drain user-space buffer into the kernel.
349        self.file.flush()?;
350        Ok((self.current_lsn, Arc::clone(&self.sync_handle)))
351    }
352
353    /// Manually advance `durable_lsn` after a successful out-of-lock
354    /// `sync_all()` performed via [`WalWriter::drain_for_group_sync`].
355    ///
356    /// Monotonic — never lowers `durable_lsn`. Safe to call with a
357    /// stale `lsn`; just becomes a no-op.
358    pub fn mark_durable(&mut self, lsn: u64) {
359        if lsn > self.durable_lsn {
360            self.durable_lsn = lsn;
361        }
362    }
363
364    /// Truncate the WAL (usually after checkpoint).
365    ///
366    /// Drains the BufWriter first so no pending bytes hit the file
367    /// after the truncate. Then resets the underlying file, rewrites
368    /// the header through the buffered writer (header is small; the
369    /// followup `flush + sync_all` makes it durable), and resets
370    /// LSN bookkeeping.
371    pub fn truncate(&mut self) -> io::Result<()> {
372        // Drop any pending bytes BEFORE the truncate; otherwise the
373        // BufWriter would flush them to a re-shrunken file in
374        // confused order.
375        self.file.flush()?;
376
377        {
378            let raw = self.file.get_mut();
379            raw.set_len(0)?;
380            raw.seek(SeekFrom::Start(0))?;
381        }
382
383        // Rewrite header through the BufWriter then drain.
384        let mut header = Vec::with_capacity(8);
385        header.extend_from_slice(WAL_MAGIC);
386        header.push(WAL_VERSION);
387        header.extend_from_slice(&[0u8; 3]);
388        self.file.write_all(&header)?;
389        self.file.flush()?;
390        self.file.get_ref().sync_all()?;
391
392        self.current_lsn = 8;
393        self.durable_lsn = 8;
394
395        // `set_len(0)` freed every reserved block, so the WAL would otherwise
396        // grow page-by-page again from here. Re-extend a fresh segment now —
397        // this is the "truncate/re-extend on checkpoint" half of issue #893.
398        self.preallocated_to = 0;
399        self.ensure_preallocated()?;
400        Ok(())
401    }
402}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use std::path::PathBuf;
408
409    struct FileGuard {
410        path: PathBuf,
411    }
412
413    impl Drop for FileGuard {
414        fn drop(&mut self) {
415            let _ = std::fs::remove_file(&self.path);
416        }
417    }
418
419    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
420        let path =
421            std::env::temp_dir().join(format!("rb_wal_writer_{}_{}.wal", name, std::process::id()));
422        let guard = FileGuard { path: path.clone() };
423        let _ = std::fs::remove_file(&path);
424        (guard, path)
425    }
426
427    #[test]
428    fn test_create_new_wal() {
429        let (_guard, path) = temp_wal("create");
430        let writer = WalWriter::open(&path).unwrap();
431
432        // Should start at LSN 8 (after 8-byte header)
433        assert_eq!(writer.current_lsn(), 8);
434        assert!(path.exists());
435    }
436
437    #[test]
438    fn test_append_record() {
439        let (_guard, path) = temp_wal("append");
440        let mut writer = WalWriter::open(&path).unwrap();
441
442        let record = WalRecord::Begin { tx_id: 42 };
443        let lsn = writer.append(&record).unwrap();
444
445        // First record starts at LSN 8
446        assert_eq!(lsn, 8);
447
448        // Next record should start after encoded size
449        // Begin record: 1 (type) + 8 (term) + 8 (tx_id) + 4 (checksum) = 21 bytes
450        assert_eq!(writer.current_lsn(), 8 + 21);
451    }
452
453    #[test]
454    fn test_append_multiple_records() {
455        let (_guard, path) = temp_wal("multi");
456        let mut writer = WalWriter::open(&path).unwrap();
457
458        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
459        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
460        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
461
462        assert_eq!(lsn1, 8);
463        assert_eq!(lsn2, 8 + 21);
464        assert_eq!(lsn3, 8 + 21 + 21);
465    }
466
467    #[test]
468    fn test_page_write_lsn() {
469        let (_guard, path) = temp_wal("pagewrite");
470        let mut writer = WalWriter::open(&path).unwrap();
471
472        // First record
473        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
474        assert_eq!(lsn1, 8);
475
476        // PageWrite record: 1 + 8 + 4 + 4 + data_len + 4 = 21 + data_len
477        let data = vec![1, 2, 3, 4, 5];
478        let lsn2 = writer
479            .append(&WalRecord::PageWrite {
480                tx_id: 1,
481                page_id: 100,
482                data: data.clone(),
483            })
484            .unwrap();
485
486        assert_eq!(lsn2, 8 + 21); // after Begin
487
488        // Next LSN = lsn2 + (1 + 8 + 8 + 4 + 4 + 5 + 4) = lsn2 + 34
489        assert_eq!(writer.current_lsn(), 8 + 21 + 34);
490    }
491
492    #[test]
493    fn test_sync() {
494        let (_guard, path) = temp_wal("sync");
495        let mut writer = WalWriter::open(&path).unwrap();
496
497        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
498        writer.sync().unwrap();
499
500        // File should be synced, just verify no error
501        assert!(path.exists());
502    }
503
504    #[test]
505    fn test_truncate() {
506        let (_guard, path) = temp_wal("truncate");
507        let mut writer = WalWriter::open(&path).unwrap();
508
509        // Write some records
510        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
511        writer
512            .append(&WalRecord::PageWrite {
513                tx_id: 1,
514                page_id: 0,
515                data: vec![0; 100],
516            })
517            .unwrap();
518        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
519
520        let lsn_before = writer.current_lsn();
521        assert!(lsn_before > 8);
522
523        // Truncate
524        writer.truncate().unwrap();
525
526        // LSN should be back to 8
527        assert_eq!(writer.current_lsn(), 8);
528
529        // File should be 8 bytes (just header)
530        let len = std::fs::metadata(&path).unwrap().len();
531        assert_eq!(len, 8);
532    }
533
534    #[test]
535    fn test_reopen_existing() {
536        let (_guard, path) = temp_wal("reopen");
537
538        // Create and write
539        let lsn_after_write;
540        {
541            let mut writer = WalWriter::open(&path).unwrap();
542            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
543            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
544            lsn_after_write = writer.current_lsn();
545        }
546
547        // Reopen
548        {
549            let writer = WalWriter::open(&path).unwrap();
550            // Should continue from where we left off
551            assert_eq!(writer.current_lsn(), lsn_after_write);
552        }
553    }
554
555    #[test]
556    fn test_checkpoint_record() {
557        let (_guard, path) = temp_wal("checkpoint");
558        let mut writer = WalWriter::open(&path).unwrap();
559
560        // Checkpoint is same size as Begin (1 + 8 + 8 + 4 = 21)
561        let lsn = writer
562            .append(&WalRecord::Checkpoint { lsn: 12345 })
563            .unwrap();
564        assert_eq!(lsn, 8);
565        assert_eq!(writer.current_lsn(), 8 + 21);
566    }
567
568    // -----------------------------------------------------------------
569    // Target 3: durable_lsn / flush_until tests
570    // -----------------------------------------------------------------
571
572    #[test]
573    fn fresh_wal_has_durable_lsn_at_header_end() {
574        let (_guard, path) = temp_wal("durable_init");
575        let writer = WalWriter::open(&path).unwrap();
576        assert_eq!(writer.durable_lsn(), 8);
577        assert_eq!(writer.current_lsn(), 8);
578    }
579
580    #[test]
581    fn flush_until_below_durable_is_noop() {
582        let (_guard, path) = temp_wal("flush_noop");
583        let mut writer = WalWriter::open(&path).unwrap();
584        // After open, durable_lsn == 8.
585        let before = writer.durable_lsn();
586        writer.flush_until(0).unwrap();
587        writer.flush_until(8).unwrap();
588        assert_eq!(writer.durable_lsn(), before);
589    }
590
591    #[test]
592    fn flush_until_advances_durable_to_current() {
593        let (_guard, path) = temp_wal("flush_advance");
594        let mut writer = WalWriter::open(&path).unwrap();
595        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
596        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
597        let target = writer.current_lsn();
598        // Before flush_until, durable still at the header.
599        assert_eq!(writer.durable_lsn(), 8);
600        writer.flush_until(target).unwrap();
601        assert_eq!(writer.durable_lsn(), target);
602    }
603
604    #[test]
605    fn flush_until_is_monotonic() {
606        let (_guard, path) = temp_wal("flush_monotonic");
607        let mut writer = WalWriter::open(&path).unwrap();
608        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
609        let lo = writer.current_lsn();
610        writer.flush_until(lo).unwrap();
611        let durable_after_lo = writer.durable_lsn();
612        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
613        let hi = writer.current_lsn();
614        writer.flush_until(hi).unwrap();
615        assert!(writer.durable_lsn() >= durable_after_lo);
616        // Calling flush_until(lo) after flush_until(hi) is a no-op.
617        writer.flush_until(lo).unwrap();
618        assert_eq!(writer.durable_lsn(), hi);
619    }
620
621    #[test]
622    fn sync_advances_durable_lsn_too() {
623        let (_guard, path) = temp_wal("sync_durable");
624        let mut writer = WalWriter::open(&path).unwrap();
625        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
626        let before = writer.durable_lsn();
627        let after_append = writer.current_lsn();
628        assert!(after_append > before);
629        writer.sync().unwrap();
630        assert_eq!(writer.durable_lsn(), after_append);
631    }
632
633    #[test]
634    fn truncate_resets_durable_lsn() {
635        let (_guard, path) = temp_wal("truncate_durable");
636        let mut writer = WalWriter::open(&path).unwrap();
637        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
638        writer.sync().unwrap();
639        assert!(writer.durable_lsn() > 8);
640        writer.truncate().unwrap();
641        assert_eq!(writer.durable_lsn(), 8);
642        assert_eq!(writer.current_lsn(), 8);
643    }
644
645    #[test]
646    fn reopen_initialises_durable_to_current() {
647        let (_guard, path) = temp_wal("reopen_durable");
648        {
649            let mut writer = WalWriter::open(&path).unwrap();
650            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
651            writer.sync().unwrap();
652        }
653        let writer = WalWriter::open(&path).unwrap();
654        // After reopen, every byte on disk is durable by definition.
655        assert_eq!(writer.durable_lsn(), writer.current_lsn());
656    }
657
658    // -----------------------------------------------------------------
659    // Perf 1.1: BufWriter coalesces small appends until sync
660    // -----------------------------------------------------------------
661
662    #[test]
663    fn bufwriter_coalesces_until_sync() {
664        // Append 100 small records but DO NOT sync. The on-disk file
665        // size must still equal the header (8 bytes) because the
666        // bytes are sitting in the BufWriter, not in the kernel.
667        let (_guard, path) = temp_wal("bufwriter_coalesce");
668        let mut writer = WalWriter::open(&path).unwrap();
669        for tx in 0..100u64 {
670            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
671        }
672        // current_lsn reflects the in-buffer position.
673        assert_eq!(writer.current_lsn(), 8 + 100 * 21);
674        // But the file on disk only has the header.
675        let on_disk = std::fs::metadata(&path).unwrap().len();
676        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
677    }
678
679    #[test]
680    fn sync_drains_bufwriter_before_fsync() {
681        // After sync(), the file size must equal current_lsn — the
682        // BufWriter has been flushed and sync_all has hit the kernel.
683        let (_guard, path) = temp_wal("sync_drains");
684        let mut writer = WalWriter::open(&path).unwrap();
685        for tx in 0..50u64 {
686            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
687        }
688        writer.sync().unwrap();
689        let on_disk = std::fs::metadata(&path).unwrap().len();
690        assert_eq!(on_disk, writer.current_lsn());
691        assert_eq!(writer.durable_lsn(), writer.current_lsn());
692    }
693
694    #[test]
695    fn flush_until_drains_bufwriter_too() {
696        // flush_until must drain the BufWriter before calling
697        // sync_all on the underlying file — otherwise pending bytes
698        // never become durable.
699        let (_guard, path) = temp_wal("flush_until_drains");
700        let mut writer = WalWriter::open(&path).unwrap();
701        for tx in 0..30u64 {
702            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
703        }
704        let target = writer.current_lsn();
705        writer.flush_until(target).unwrap();
706        let on_disk = std::fs::metadata(&path).unwrap().len();
707        assert_eq!(on_disk, target);
708        assert_eq!(writer.durable_lsn(), target);
709    }
710
711    #[test]
712    fn truncate_drains_pending_bufwriter_bytes_first() {
713        // If truncate did NOT drain BufWriter first, the pending bytes
714        // would either land in the post-truncate file (corrupting it
715        // with stale records) or be lost. Verify the resulting file
716        // contains only a fresh header.
717        let (_guard, path) = temp_wal("truncate_drain");
718        let mut writer = WalWriter::open(&path).unwrap();
719        // Write enough small records to fill some of the 64 KiB buffer
720        // but stay below the auto-flush threshold.
721        for tx in 0..200u64 {
722            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
723        }
724        // Sanity: bytes are buffered.
725        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
726
727        writer.truncate().unwrap();
728        // After truncate the file is just the header again.
729        let on_disk = std::fs::metadata(&path).unwrap().len();
730        assert_eq!(on_disk, 8);
731        assert_eq!(writer.current_lsn(), 8);
732        assert_eq!(writer.durable_lsn(), 8);
733
734        // And we can append again successfully.
735        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
736        writer.sync().unwrap();
737        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 21);
738    }
739
740    #[test]
741    fn reopen_sees_only_synced_records() {
742        // Records that were appended but never sync'd must NOT
743        // survive a reopen — they lived in the BufWriter, never made
744        // it to the kernel, and the previous WalWriter went out of
745        // scope. The new WalWriter reopens the file and reads from
746        // EOF, which reflects only the bytes that hit disk.
747        //
748        // We sync some records, then drop the writer mid-buffer, and
749        // assert the reopen LSN matches only the synced prefix.
750        let (_guard, path) = temp_wal("reopen_synced_only");
751        let synced_lsn;
752        {
753            let mut writer = WalWriter::open(&path).unwrap();
754            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
755            writer.sync().unwrap();
756            synced_lsn = writer.current_lsn();
757            // These records are never sync'd before drop. Drop runs
758            // BufWriter::flush which DOES write them — see note below.
759            for tx in 100..120u64 {
760                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
761            }
762            // Without a sync, the in-buffer bytes are still pending.
763            // BufWriter's Drop impl does flush to the file but does
764            // not call sync_all. For reopen-LSN purposes, on-disk
765            // bytes count regardless of fsync, so the reopened LSN
766            // will reflect the dropped writes too.
767        }
768        let writer = WalWriter::open(&path).unwrap();
769        // The reopen LSN reflects what's physically on disk after
770        // BufWriter::Drop flushes its buffer. That may or may not
771        // include the unsync'd records depending on platform; the
772        // contract we care about is that durable_lsn ≥ synced_lsn.
773        assert!(writer.durable_lsn() >= synced_lsn);
774    }
775
776    // -----------------------------------------------------------------
777    // Issue #893: fallocate-based WAL segment preallocation
778    // -----------------------------------------------------------------
779
780    /// On-disk blocks reserved by `fallocate`, in bytes. Returns the
781    /// allocated size (st_blocks × 512), independent of the logical length.
782    fn allocated_bytes(path: &std::path::Path) -> u64 {
783        use fs2::FileExt;
784        let f = std::fs::File::open(path).unwrap();
785        f.allocated_size().unwrap()
786    }
787
788    #[test]
789    fn segment_boundary_rounds_strictly_up() {
790        // Always lands one boundary ahead so the reservation stays in front
791        // of the write frontier.
792        assert_eq!(next_wal_segment_boundary(0), WAL_SEGMENT_BYTES);
793        assert_eq!(next_wal_segment_boundary(8), WAL_SEGMENT_BYTES);
794        assert_eq!(
795            next_wal_segment_boundary(WAL_SEGMENT_BYTES - 1),
796            WAL_SEGMENT_BYTES
797        );
798        // Exactly on a boundary still advances to the next one.
799        assert_eq!(
800            next_wal_segment_boundary(WAL_SEGMENT_BYTES),
801            2 * WAL_SEGMENT_BYTES
802        );
803        assert_eq!(
804            next_wal_segment_boundary(WAL_SEGMENT_BYTES + 1),
805            2 * WAL_SEGMENT_BYTES
806        );
807    }
808
809    #[test]
810    fn open_preallocates_first_segment() {
811        // A freshly opened WAL must reserve a whole segment up front instead
812        // of growing incrementally (acceptance #1).
813        let (_guard, path) = temp_wal("prealloc_open");
814        let writer = WalWriter::open(&path).unwrap();
815        if !writer.prealloc_supported {
816            return; // filesystem without fallocate — feature is a no-op.
817        }
818        assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
819        // The reservation is real on disk, yet the logical file is still just
820        // the 8-byte header.
821        assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
822        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
823    }
824
825    #[test]
826    fn preallocation_does_not_grow_logical_length() {
827        // The load-bearing invariant for crash recovery: appending records
828        // must NOT inflate the logical file size beyond the real data, or the
829        // EOF scan in WalReader would walk into the reserved tail. Holds on
830        // every filesystem (fallocate keeps i_size pinned; absent fallocate
831        // there is no reservation at all).
832        let (_guard, path) = temp_wal("prealloc_logical");
833        let mut writer = WalWriter::open(&path).unwrap();
834        for tx in 0..50u64 {
835            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
836        }
837        writer.sync().unwrap();
838        let logical = std::fs::metadata(&path).unwrap().len();
839        assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
840        assert_eq!(writer.current_lsn(), logical);
841    }
842
843    #[test]
844    fn truncate_re_extends_a_fresh_segment() {
845        // After checkpoint truncation the WAL must re-extend rather than grow
846        // unbounded page-by-page (acceptance #2).
847        let (_guard, path) = temp_wal("prealloc_truncate");
848        let mut writer = WalWriter::open(&path).unwrap();
849        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
850        writer.sync().unwrap();
851
852        writer.truncate().unwrap();
853
854        assert_eq!(writer.current_lsn(), 8);
855        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
856        if writer.prealloc_supported {
857            assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
858            assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
859        }
860    }
861
862    #[test]
863    fn preallocated_wal_recovers_records_without_trailing_garbage() {
864        // End-to-end: a preallocated WAL must read back exactly the records
865        // written — the reserved (unwritten) tail must be invisible to the
866        // reader, proving crash-recovery is unchanged (acceptance #3).
867        use super::super::reader::WalReader;
868        let (_guard, path) = temp_wal("prealloc_recover");
869        {
870            let mut writer = WalWriter::open(&path).unwrap();
871            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
872            writer
873                .append(&WalRecord::PageWrite {
874                    tx_id: 1,
875                    page_id: 7,
876                    data: vec![1, 2, 3, 4],
877                })
878                .unwrap();
879            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
880            writer.sync().unwrap();
881        }
882        let records: Vec<_> = WalReader::open(&path)
883            .unwrap()
884            .iter()
885            .collect::<Result<_, _>>()
886            .expect("reader must stop cleanly at real EOF, not in reserved tail");
887        assert_eq!(records.len(), 3);
888        assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
889        assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
890    }
891}