reddb_server/storage/wal/writer.rs

use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;

/// User-space buffer size for the WAL writer.
///
/// Chosen so that ~5,000 small records (Begin/Commit ≈ 13 bytes,
/// small PageWrite ≈ 26 bytes) coalesce into a single `write` syscall
/// before the next `sync()` drains the buffer. Tunable; it reflects
/// the postgres XLOG block size (8 KiB) scaled up, because we batch
/// at record level rather than page level.
const WAL_BUFFER_BYTES: usize = 64 * 1024;

/// Writer for the Write-Ahead Log.
///
/// Wraps the underlying file in a [`BufWriter`] so each `append` does
/// not pay a write syscall: bytes accumulate in a 64 KiB user-space
/// buffer until `sync()` (or `flush_until()`) drains them and then
/// calls `sync_all()` on the raw file. This is the same trick postgres
/// uses to cut per-record append cost from ~500 ns (a syscall) to
/// ~5 ns (a memcpy into the buffer); reddb previously called
/// `write_all` directly on the file and paid the syscall on every
/// record.
///
/// **Critical contract:** every code path that calls `sync_all()` on
/// the underlying file *must* drain the [`BufWriter`] first via
/// `BufWriter::flush()`. Otherwise bytes sitting in user space never
/// reach the kernel before the fsync, and durability is silently
/// broken.
pub struct WalWriter {
    file: BufWriter<File>,
    /// Cloned file descriptor for running `sync_all()` outside the
    /// writer mutex. This and `file`'s inner `File` are handles to
    /// the same open file description, so calling `sync_all()` on
    /// either flushes ALL kernel-buffered bytes for the underlying
    /// inode. This is the trick that lets the group-commit leader
    /// release the WAL writer lock during the expensive fsync; see
    /// [`WalWriter::drain_for_group_sync`].
    ///
    /// Without this clone, a leader holding the writer mutex during
    /// `sync_all()` would block every other writer from appending,
    /// defeating the entire purpose of group commit.
    sync_handle: Arc<File>,
    /// Log Sequence Number: byte offset of the next record. Advances
    /// on every `append`; survives across restarts via `seek(End)`.
    current_lsn: u64,
    /// Highest LSN that has been `sync_all()`'d to disk. The WAL-first
    /// flush invariant relies on this: a page with `header.lsn = L` may
    /// only be written to its data file once `durable_lsn >= L`.
    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
    /// section of `PLAN.md`.
    durable_lsn: u64,
}

impl WalWriter {
    /// Open a WAL file for writing. Creates it if it doesn't exist.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let exists = path.as_ref().exists();

        // We do all initial bookkeeping (write header, seek to EOF) on
        // the raw `File` BEFORE wrapping in a BufWriter so we don't
        // have to worry about flush ordering during construction.
        let mut raw = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(path)?;

        let current_lsn = if !exists || raw.metadata()?.len() == 0 {
            // Write the header for a new file.
            // Format: Magic (4) + Version (1) + Reserved (3)
            let mut header = Vec::with_capacity(8);
            header.extend_from_slice(WAL_MAGIC);
            header.push(WAL_VERSION);
            header.extend_from_slice(&[0u8; 3]); // Reserved

            raw.write_all(&header)?;
            raw.sync_all()?;
            8
        } else {
            // Existing file: set the LSN to the current end. Append-mode
            // files ignore this seek for *writes*, but we use the
            // returned position as our LSN counter.
            raw.seek(SeekFrom::End(0))?
        };

        // Clone the file handle BEFORE wrapping in BufWriter. The
        // clone shares the same kernel file description, so
        // sync_all() on either descriptor flushes the whole inode.
        // The BufWriter owns the original; the Arc<File> is shared
        // with the group-commit leader.
        let sync_handle = Arc::new(raw.try_clone()?);
        let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);

        // On open, every byte already on disk is by definition durable
        // (any pre-crash unflushed tail was lost when the OS dropped
        // its page cache). Initialise `durable_lsn` to `current_lsn`.
        Ok(Self {
            file,
            sync_handle,
            current_lsn,
            durable_lsn: current_lsn,
        })
    }

    /// Append a record to the WAL.
    ///
    /// Bytes go into the BufWriter — they are NOT durable on disk
    /// after this call returns. Callers that need durability must
    /// follow up with [`WalWriter::sync`] or
    /// [`WalWriter::flush_until`].
    ///
    /// Returns the LSN (Log Sequence Number) of the record.
    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
        let bytes = record.encode();
        self.file.write_all(&bytes)?;

        let record_lsn = self.current_lsn;
        self.current_lsn += bytes.len() as u64;

        Ok(record_lsn)
    }

    /// Write already-encoded bytes and advance the LSN counter to
    /// match. Used by the lock-free append path: writers encode and
    /// atomically reserve an LSN range outside this writer, the
    /// group-commit coordinator drains the pending queue in LSN
    /// order, then calls `append_bytes` for each batch.
    ///
    /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
    /// concatenation of such); no structural validation happens
    /// here. The caller is responsible for keeping the on-disk
    /// byte offset synchronised with the externally-tracked LSN
    /// counter; this method just appends and advances.
    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
        self.file.write_all(bytes)?;
        let record_lsn = self.current_lsn;
        self.current_lsn += bytes.len() as u64;
        Ok(record_lsn)
    }

    /// Reset the writer's LSN counter to a specific value. Used by
    /// the lock-free append path to resync the writer with the
    /// externally-tracked `next_lsn` after a drain batch: the
    /// coordinator knows the exact byte offset it just wrote to and
    /// needs `current_lsn` to match so that subsequent direct
    /// callers of `append` stay consistent.
    pub fn set_current_lsn(&mut self, lsn: u64) {
        self.current_lsn = lsn;
    }

    /// Force a sync to disk.
    ///
    /// Drains the user-space [`BufWriter`] first, then calls
    /// `sync_all()` on the underlying file so every byte appended
    /// since the last sync is durable. Updates `durable_lsn` so
    /// subsequent `flush_until` calls become no-ops up to
    /// `current_lsn`.
    pub fn sync(&mut self) -> io::Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.durable_lsn = self.current_lsn;
        Ok(())
    }

    /// Ensure the WAL is durable on disk at least up to byte offset
    /// `target`. No-op when `target <= durable_lsn`.
    ///
    /// This is the postgres `XLogFlush(LSN)` analogue. Pager flush
    /// paths call this with `max(dirty.header.lsn)` before writing
    /// any data page, so the WAL record describing the change is
    /// guaranteed to be on disk before the page itself.
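    ///
    /// Illustrative call pattern from a pager flush path (the names
    /// `wal`, `page`, and `write_page_to_data_file` are hypothetical):
    ///
    /// ```ignore
    /// // WAL-first: the record describing the page change must be
    /// // durable before the page itself reaches the data file.
    /// wal.flush_until(page.header.lsn)?;
    /// write_page_to_data_file(&page)?;
    /// ```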
    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
        if self.durable_lsn >= target {
            return Ok(());
        }
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.durable_lsn = self.current_lsn;
        Ok(())
    }

    /// Highest byte offset that is durable on disk. Used by the pager
    /// to decide whether a `flush_until` call would actually need an
    /// `fsync`.
    pub fn durable_lsn(&self) -> u64 {
        self.durable_lsn
    }

    /// Current LSN (end-of-file byte offset).
    pub fn current_lsn(&self) -> u64 {
        self.current_lsn
    }

    /// Drain the BufWriter into the kernel and return the captured
    /// LSN plus a cloned file handle the caller can fsync
    /// **without holding the WAL writer mutex**.
    ///
    /// Used by the group-commit leader path. The flow is:
    ///
    /// 1. Take the WAL writer mutex.
    /// 2. Call this method; it drains the user-space buffer to the
    ///    kernel and captures `(target_lsn, sync_handle)`.
    /// 3. Release the WAL writer mutex.
    /// 4. Call `sync_handle.sync_all()`. This is the expensive
    ///    ~100 µs syscall, and other writers can keep appending
    ///    while it runs.
    /// 5. Take the WAL writer mutex briefly and call
    ///    [`WalWriter::mark_durable`] with `target_lsn` to publish
    ///    the new durable position.
    ///
    /// The cloned `sync_handle` shares the same open file description
    /// as the writer's `file`, so `sync_all()` on the clone flushes
    /// ALL bytes that have reached the kernel for that file,
    /// including bytes appended by other writers AFTER step 3.
    /// This is the coalescing window that makes group commit win.
    /// A sketch of the full flow follows.
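    ///
    /// A minimal sketch of the leader's side, assuming the writer
    /// lives in a `std::sync::Mutex` (the surrounding group-commit
    /// machinery is elided; marked `ignore` so it is not compiled):
    ///
    /// ```ignore
    /// use std::sync::Mutex;
    ///
    /// fn leader_sync(wal: &Mutex<WalWriter>) -> std::io::Result<()> {
    ///     // Steps 1-2: drain under the lock, capture LSN + handle.
    ///     let (target_lsn, handle) = wal.lock().unwrap().drain_for_group_sync()?;
    ///     // Steps 3-4: fsync with the lock RELEASED, so other
    ///     // writers keep appending into the BufWriter meanwhile.
    ///     handle.sync_all()?;
    ///     // Step 5: briefly retake the lock to publish durability.
    ///     wal.lock().unwrap().mark_durable(target_lsn);
    ///     Ok(())
    /// }
    /// ```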
    pub fn drain_for_group_sync(&mut self) -> io::Result<(u64, Arc<File>)> {
        // Drain the user-space buffer into the kernel.
        self.file.flush()?;
        Ok((self.current_lsn, Arc::clone(&self.sync_handle)))
    }

    /// Manually advance `durable_lsn` after a successful out-of-lock
    /// `sync_all()` performed via [`WalWriter::drain_for_group_sync`].
    ///
    /// Monotonic: never lowers `durable_lsn`. Safe to call with a
    /// stale `lsn`; it just becomes a no-op.
    pub fn mark_durable(&mut self, lsn: u64) {
        if lsn > self.durable_lsn {
            self.durable_lsn = lsn;
        }
    }

    /// Truncate the WAL (usually after a checkpoint).
    ///
    /// Drains the BufWriter first so no pending bytes hit the file
    /// after the truncate. Then resets the underlying file, rewrites
    /// the header through the buffered writer (the header is small;
    /// the follow-up `flush + sync_all` makes it durable), and resets
    /// the LSN bookkeeping.
    pub fn truncate(&mut self) -> io::Result<()> {
        // Drain any pending bytes into the file BEFORE the truncate;
        // `set_len(0)` then discards them along with everything else.
        // Skipping this would leave stale record bytes sitting in the
        // BufWriter, to be flushed into the freshly-truncated file
        // later and corrupt it.
        self.file.flush()?;

        {
            let raw = self.file.get_mut();
            raw.set_len(0)?;
            raw.seek(SeekFrom::Start(0))?;
        }

        // Rewrite the header through the BufWriter, then drain.
        let mut header = Vec::with_capacity(8);
        header.extend_from_slice(WAL_MAGIC);
        header.push(WAL_VERSION);
        header.extend_from_slice(&[0u8; 3]);
        self.file.write_all(&header)?;
        self.file.flush()?;
        self.file.get_ref().sync_all()?;

        self.current_lsn = 8;
        self.durable_lsn = 8;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_writer_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    #[test]
    fn test_create_new_wal() {
        let (_guard, path) = temp_wal("create");
        let writer = WalWriter::open(&path).unwrap();

        // Should start at LSN 8 (after the 8-byte header).
        assert_eq!(writer.current_lsn(), 8);
        assert!(path.exists());
    }
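
    // Sketch verifying the on-disk header layout documented in
    // `open`: Magic (4) + Version (1) + Reserved (3) = 8 bytes.
    #[test]
    fn new_wal_header_layout() {
        let (_guard, path) = temp_wal("header_layout");
        let _writer = WalWriter::open(&path).unwrap();

        // `open` writes and syncs the header, so the file holds
        // exactly 8 bytes before any record is appended.
        let bytes = std::fs::read(&path).unwrap();
        assert_eq!(bytes.len(), 8);
        assert_eq!(&bytes[..4], &WAL_MAGIC[..]);
        assert_eq!(bytes[4], WAL_VERSION);
        assert_eq!(&bytes[5..8], &[0u8, 0, 0][..]);
    }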

    #[test]
    fn test_append_record() {
        let (_guard, path) = temp_wal("append");
        let mut writer = WalWriter::open(&path).unwrap();

        let record = WalRecord::Begin { tx_id: 42 };
        let lsn = writer.append(&record).unwrap();

        // The first record starts at LSN 8.
        assert_eq!(lsn, 8);

        // The next record should start after the encoded size.
        // Begin record: 1 (type) + 8 (tx_id) + 4 (checksum) = 13 bytes.
        assert_eq!(writer.current_lsn(), 8 + 13);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let mut writer = WalWriter::open(&path).unwrap();

        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        assert_eq!(lsn1, 8);
        assert_eq!(lsn2, 8 + 13);
        assert_eq!(lsn3, 8 + 13 + 13);
    }

    #[test]
    fn test_page_write_lsn() {
        let (_guard, path) = temp_wal("pagewrite");
        let mut writer = WalWriter::open(&path).unwrap();

        // First record.
        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        assert_eq!(lsn1, 8);

        // PageWrite record: 1 + 8 + 4 + 4 + data_len + 4 = 21 + data_len.
        let data = vec![1, 2, 3, 4, 5];
        let lsn2 = writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 100,
                data: data.clone(),
            })
            .unwrap();

        assert_eq!(lsn2, 8 + 13); // after Begin

        // Next LSN = lsn2 + (1 + 8 + 4 + 4 + 5 + 4) = lsn2 + 26.
        assert_eq!(writer.current_lsn(), 8 + 13 + 26);
    }

    #[test]
    fn test_sync() {
        let (_guard, path) = temp_wal("sync");
        let mut writer = WalWriter::open(&path).unwrap();

        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();

        // The file should be synced; just verify no error occurred.
        assert!(path.exists());
    }

    #[test]
    fn test_truncate() {
        let (_guard, path) = temp_wal("truncate");
        let mut writer = WalWriter::open(&path).unwrap();

        // Write some records.
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: vec![0; 100],
            })
            .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        let lsn_before = writer.current_lsn();
        assert!(lsn_before > 8);

        // Truncate.
        writer.truncate().unwrap();

        // The LSN should be back to 8.
        assert_eq!(writer.current_lsn(), 8);

        // The file should be 8 bytes (just the header).
        let len = std::fs::metadata(&path).unwrap().len();
        assert_eq!(len, 8);
    }

    #[test]
    fn test_reopen_existing() {
        let (_guard, path) = temp_wal("reopen");

        // Create and write.
        let lsn_after_write;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            lsn_after_write = writer.current_lsn();
        }

        // Reopen.
        {
            let writer = WalWriter::open(&path).unwrap();
            // Should continue from where we left off.
            assert_eq!(writer.current_lsn(), lsn_after_write);
        }
    }

    #[test]
    fn test_checkpoint_record() {
        let (_guard, path) = temp_wal("checkpoint");
        let mut writer = WalWriter::open(&path).unwrap();

        // A Checkpoint record is the same size as Begin (1 + 8 + 4 = 13).
        let lsn = writer
            .append(&WalRecord::Checkpoint { lsn: 12345 })
            .unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + 13);
    }
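
    // Illustrative coverage for the lock-free append helpers
    // (`append_bytes` / `set_current_lsn`): a sketch of how the
    // group-commit coordinator keeps the writer's LSN counter in
    // step with pre-encoded batches.
    #[test]
    fn append_bytes_advances_lsn_like_append() {
        let (_guard, path) = temp_wal("append_bytes");
        let mut writer = WalWriter::open(&path).unwrap();

        // Pre-encode a record outside the writer, as the lock-free
        // path would, then hand the raw bytes to append_bytes.
        let bytes = WalRecord::Begin { tx_id: 5 }.encode();
        let lsn = writer.append_bytes(&bytes).unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + bytes.len() as u64);

        // The coordinator can resync the counter to an externally
        // tracked offset; subsequent appends continue from there.
        writer.set_current_lsn(8 + bytes.len() as u64);
        let next = writer.append(&WalRecord::Commit { tx_id: 5 }).unwrap();
        assert_eq!(next, 8 + bytes.len() as u64);
    }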

    // -----------------------------------------------------------------
    // Target 3: durable_lsn / flush_until tests
    // -----------------------------------------------------------------

    #[test]
    fn fresh_wal_has_durable_lsn_at_header_end() {
        let (_guard, path) = temp_wal("durable_init");
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn flush_until_below_durable_is_noop() {
        let (_guard, path) = temp_wal("flush_noop");
        let mut writer = WalWriter::open(&path).unwrap();
        // After open, durable_lsn == 8.
        let before = writer.durable_lsn();
        writer.flush_until(0).unwrap();
        writer.flush_until(8).unwrap();
        assert_eq!(writer.durable_lsn(), before);
    }

    #[test]
    fn flush_until_advances_durable_to_current() {
        let (_guard, path) = temp_wal("flush_advance");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        let target = writer.current_lsn();
        // Before flush_until, durable is still at the header.
        assert_eq!(writer.durable_lsn(), 8);
        writer.flush_until(target).unwrap();
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn flush_until_is_monotonic() {
        let (_guard, path) = temp_wal("flush_monotonic");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lo = writer.current_lsn();
        writer.flush_until(lo).unwrap();
        let durable_after_lo = writer.durable_lsn();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let hi = writer.current_lsn();
        writer.flush_until(hi).unwrap();
        assert!(writer.durable_lsn() >= durable_after_lo);
        // Calling flush_until(lo) after flush_until(hi) is a no-op.
        writer.flush_until(lo).unwrap();
        assert_eq!(writer.durable_lsn(), hi);
    }

    #[test]
    fn sync_advances_durable_lsn_too() {
        let (_guard, path) = temp_wal("sync_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
        let before = writer.durable_lsn();
        let after_append = writer.current_lsn();
        assert!(after_append > before);
        writer.sync().unwrap();
        assert_eq!(writer.durable_lsn(), after_append);
    }

    #[test]
    fn truncate_resets_durable_lsn() {
        let (_guard, path) = temp_wal("truncate_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        assert!(writer.durable_lsn() > 8);
        writer.truncate().unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn reopen_initialises_durable_to_current() {
        let (_guard, path) = temp_wal("reopen_durable");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let writer = WalWriter::open(&path).unwrap();
        // After reopen, every byte on disk is durable by definition.
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    // -----------------------------------------------------------------
    // Perf 1.1: BufWriter coalesces small appends until sync
    // -----------------------------------------------------------------

    #[test]
    fn bufwriter_coalesces_until_sync() {
        // Append 100 small records but DO NOT sync. The on-disk file
        // size must still equal the header (8 bytes) because the
        // bytes are sitting in the BufWriter, not in the kernel.
        let (_guard, path) = temp_wal("bufwriter_coalesce");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..100u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // current_lsn reflects the in-buffer position.
        assert_eq!(writer.current_lsn(), 8 + 100 * 13);
        // But the file on disk only has the header.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
    }

    #[test]
    fn sync_drains_bufwriter_before_fsync() {
        // After sync(), the file size must equal current_lsn: the
        // BufWriter has been flushed and sync_all has hit the kernel.
        let (_guard, path) = temp_wal("sync_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, writer.current_lsn());
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    #[test]
    fn flush_until_drains_bufwriter_too() {
        // flush_until must drain the BufWriter before calling
        // sync_all on the underlying file; otherwise pending bytes
        // never become durable.
        let (_guard, path) = temp_wal("flush_until_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..30u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        let target = writer.current_lsn();
        writer.flush_until(target).unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, target);
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn truncate_drains_pending_bufwriter_bytes_first() {
        // If truncate did NOT drain the BufWriter first, the pending
        // bytes would either land in the post-truncate file (corrupting
        // it with stale records) or be lost. Verify the resulting file
        // contains only a fresh header.
        let (_guard, path) = temp_wal("truncate_drain");
        let mut writer = WalWriter::open(&path).unwrap();
        // Write enough small records to fill some of the 64 KiB buffer
        // but stay below the auto-flush threshold.
        for tx in 0..200u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // Sanity: the bytes are buffered.
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);

        writer.truncate().unwrap();
        // After truncate the file is just the header again.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8);
        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(writer.durable_lsn(), 8);

        // And we can append again successfully.
        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
        writer.sync().unwrap();
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 13);
    }

    #[test]
    fn reopen_sees_only_synced_records() {
        // Records appended but never sync'd carry no durability
        // guarantee across a reopen: they sit in the BufWriter, and
        // only an explicit sync pushes them through the kernel to
        // disk. BufWriter's Drop impl does flush them to the file
        // (without calling sync_all), so on a clean drop they usually
        // reach disk anyway; after a crash they would be lost. The
        // reopened writer seeks to EOF, which reflects whatever bytes
        // physically made it into the file.
        //
        // We sync some records, then drop the writer mid-buffer, and
        // assert only the weak contract that holds either way: the
        // reopened durable_lsn covers at least the synced prefix.
        let (_guard, path) = temp_wal("reopen_synced_only");
        let synced_lsn;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
            synced_lsn = writer.current_lsn();
            // These records are never sync'd before the drop;
            // BufWriter::Drop flushes them to the file but does not
            // fsync, so the reopened LSN will reflect them too.
            for tx in 100..120u64 {
                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
            }
        }
        let writer = WalWriter::open(&path).unwrap();
        // The reopen LSN reflects what is physically in the file after
        // BufWriter::Drop flushed its buffer; whether those bytes were
        // ever durable is a separate question. The contract we care
        // about is durable_lsn >= synced_lsn.
        assert!(writer.durable_lsn() >= synced_lsn);
    }
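
    // A sketch exercising the group-commit primitives
    // (`drain_for_group_sync` / `mark_durable`) single-threaded; the
    // real leader performs step 4 with the writer mutex released.
    #[test]
    fn drain_for_group_sync_then_mark_durable() {
        let (_guard, path) = temp_wal("group_sync");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();

        // Steps 1-2: drain the BufWriter, capture (lsn, handle).
        let (target_lsn, handle) = writer.drain_for_group_sync().unwrap();
        assert_eq!(target_lsn, writer.current_lsn());
        // Drained to the kernel, but not yet published as durable.
        assert!(writer.durable_lsn() < target_lsn);

        // Steps 3-4: fsync via the cloned handle (out of lock in the
        // real flow).
        handle.sync_all().unwrap();

        // Step 5: publish the durable position. mark_durable is
        // monotonic, so a stale value is a no-op.
        writer.mark_durable(target_lsn);
        assert_eq!(writer.durable_lsn(), target_lsn);
        writer.mark_durable(target_lsn - 1);
        assert_eq!(writer.durable_lsn(), target_lsn);
    }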
}