reddb_server/storage/wal/writer.rs
1use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// User-space buffer size for the WAL writer.
///
/// Sized so that roughly 3 100 small records (Begin/Commit ≈ 21 bytes) or
/// roughly 1 900 small PageWrites (≈ 34 bytes) coalesce into a single
/// `write` syscall, either when the buffer fills or when the next `sync()`
/// drains it. Tunable. It is the postgres XLOG block size (8 KiB) scaled up,
/// because we batch at record level rather than page level.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
/// Granularity, in bytes, of WAL disk-block preallocation (16 MiB, the same
/// as postgres' default WAL segment size).
///
/// The writer keeps blocks reserved one segment ahead of its write frontier
/// using `fallocate(2)` with `FALLOC_FL_KEEP_SIZE`. The ever-growing WAL
/// therefore occupies contiguous extents on ext4/XFS instead of interleaving
/// with, and fragmenting, the data file's extents (issue #893, PRD #851).
const WAL_SEGMENT_BYTES: u64 = 16 << 20;
24
25/// Next segment boundary strictly above `pos`.
26///
27/// `pos` already at a boundary still rounds *up* to the following one, so the
28/// reservation always stays at least one boundary ahead of the frontier.
29#[inline]
30fn next_wal_segment_boundary(pos: u64) -> u64 {
31 (pos / WAL_SEGMENT_BYTES + 1) * WAL_SEGMENT_BYTES
32}
33
34/// Reserve disk blocks for `[offset, offset + len)` **without** growing the
35/// file's logical length (`FALLOC_FL_KEEP_SIZE`).
36///
37/// Pinning `i_size` is the whole trick that makes preallocation invisible to
38/// crash recovery: the WAL's logical end stays equal to its real data length,
39/// so [`WalReader`](super::reader::WalReader)'s EOF scan never walks into a
40/// zero-filled reserved tail (a `0x00` type byte would otherwise decode to an
41/// "Invalid record type" error and abort recovery). This is why we cannot use
42/// `fs2::allocate` here — it calls `posix_fallocate`, which *extends* `i_size`.
43///
44/// Linux-only; other targets return [`io::ErrorKind::Unsupported`] so the
45/// caller disables the optimization silently.
46#[cfg(target_os = "linux")]
47fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
48 use std::os::unix::io::AsRawFd;
49 if len == 0 {
50 return Ok(());
51 }
52 // SAFETY: `file` owns a valid fd for the duration of the call; fallocate
53 // only mutates block reservations for that fd, never process memory.
54 let ret = unsafe {
55 libc::fallocate(
56 file.as_raw_fd(),
57 libc::FALLOC_FL_KEEP_SIZE,
58 offset as libc::off_t,
59 len as libc::off_t,
60 )
61 };
62 if ret == 0 {
63 Ok(())
64 } else {
65 Err(io::Error::last_os_error())
66 }
67}
68
/// Non-Linux fallback for `reserve_wal_blocks`.
///
/// There is no `FALLOC_FL_KEEP_SIZE` equivalent wired up here, so this always
/// reports [`io::ErrorKind::Unsupported`]. `fallocate_unsupported` treats
/// that kind as a soft failure, so the writer simply turns preallocation off.
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let reason = "WAL preallocation is only implemented on linux";
    Err(io::Error::new(io::ErrorKind::Unsupported, reason))
}
76
77/// Whether a `fallocate` failure means "this filesystem can't preallocate"
78/// (tmpfs, overlayfs, many network filesystems) rather than a real I/O error.
79/// Those are soft failures that flip the feature off; anything else is left to
80/// the normal write path to surface (e.g. a genuine `ENOSPC`).
81fn fallocate_unsupported(err: &io::Error) -> bool {
82 if err.kind() == io::ErrorKind::Unsupported {
83 return true;
84 }
85 #[cfg(target_os = "linux")]
86 {
87 matches!(
88 err.raw_os_error(),
89 Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
90 )
91 }
92 #[cfg(not(target_os = "linux"))]
93 {
94 false
95 }
96}
97
/// Writer for the Write-Ahead Log.
///
/// Wraps the underlying file in a [`BufWriter`] so an `append` does not pay
/// a write syscall per record. Bytes accumulate in a 64 KiB user-space buffer
/// ([`WAL_BUFFER_BYTES`]). They reach the kernel when the buffer fills, or
/// when `sync()` / `flush_until()` drains it before calling `sync_all()` on
/// the raw file. This is the same idea as postgres' in-memory WAL buffers;
/// reddb's previous implementation paid a direct `write_all` syscall on the
/// file for every record.
///
/// **Critical contract:** every code path that calls `sync_all()` on
/// the underlying file *must* drain the [`BufWriter`] first via
/// `BufWriter::flush()`. Otherwise the bytes in user space never reach
/// the kernel before fsync, and durability is silently broken.
pub struct WalWriter {
    file: BufWriter<File>,
    /// Cloned file descriptor used to call `sync_all()` outside the writer
    /// mutex. Both this and `file`'s inner `File` refer to the same kernel
    /// inode, so `sync_all()` on either flushes ALL pending bytes for that
    /// inode. This lets the group-commit leader release the WAL writer lock
    /// during the expensive fsync — see [`WalWriter::drain_for_group_sync`].
    ///
    /// Without this clone, a leader holding the writer mutex during
    /// `sync_all()` blocks every other writer from appending, defeating the
    /// entire purpose of group commit.
    sync_handle: Arc<File>,
    /// Log Sequence Number: byte offset of the next record. Advances on every
    /// append; survives across restarts by resuming at the file's end.
    current_lsn: u64,
    /// Highest LSN that has been `sync_all()`'d to disk. The WAL-first flush
    /// invariant relies on this: a page with `header.lsn = L` may only be
    /// written to its data file once `durable_lsn >= L`.
    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
    /// section of `PLAN.md`.
    durable_lsn: u64,
    /// Exclusive byte offset up to which disk blocks are pre-reserved via
    /// `fallocate(FALLOC_FL_KEEP_SIZE)`. Advances one [`WAL_SEGMENT_BYTES`]
    /// segment at a time as `current_lsn` approaches it (issue #893). Reset to
    /// `0` on [`truncate`](Self::truncate), which frees the blocks, and then
    /// immediately re-extended (the checkpoint re-extend path).
    preallocated_to: u64,
    /// Cleared the first time `fallocate` reports that the backing filesystem
    /// can't preallocate. On Linux that means `EOPNOTSUPP`, `ENOSYS` or
    /// `EINVAL` (tmpfs/overlay/NFS); any non-Linux target reports
    /// `Unsupported`. After that we stop issuing syscalls that will always
    /// fail. Preallocation is a best-effort optimization; clearing this never
    /// affects correctness.
    prealloc_supported: bool,
}
146
147impl WalWriter {
148 /// Open a WAL file for writing. Creates it if it doesn't exist.
149 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
150 let exists = path.as_ref().exists();
151
152 // We do all initial bookkeeping (write header, seek to EOF) on
153 // the raw `File` BEFORE wrapping in a BufWriter so we don't
154 // have to worry about flush ordering during construction.
155 let mut raw = OpenOptions::new()
156 .read(true)
157 .create(true)
158 .append(true)
159 .open(path)?;
160
161 let current_lsn = if !exists || raw.metadata()?.len() == 0 {
162 // Write header for new file
163 // Format: Magic (4) + Version (1) + Reserved (3)
164 let mut header = Vec::with_capacity(8);
165 header.extend_from_slice(WAL_MAGIC);
166 header.push(WAL_VERSION);
167 header.extend_from_slice(&[0u8; 3]); // Reserved
168
169 raw.write_all(&header)?;
170 raw.sync_all()?;
171 8
172 } else {
173 // Existing file, set LSN to current end. Append-mode files
174 // ignore this seek for *writes*, but we use the returned
175 // position as our LSN counter.
176 raw.seek(SeekFrom::End(0))?
177 };
178
179 // Clone the file handle BEFORE wrapping in BufWriter. The
180 // clone shares the same kernel file description, so
181 // sync_all() on either descriptor flushes the whole inode.
182 // The BufWriter owns the original; the Arc<File> is shared
183 // with the group-commit leader.
184 let sync_handle = Arc::new(raw.try_clone()?);
185 let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
186
187 // On open, every byte already on disk is by definition durable
188 // (any pre-crash unflushed tail was lost when the OS dropped
189 // page cache). Initialise `durable_lsn` to `current_lsn`.
190 let mut writer = Self {
191 file,
192 sync_handle,
193 current_lsn,
194 durable_lsn: current_lsn,
195 preallocated_to: 0,
196 prealloc_supported: true,
197 };
198 // Reserve the first segment up front so the very first appends land in
199 // contiguous extents rather than growing the file page-by-page.
200 writer.ensure_preallocated()?;
201 Ok(writer)
202 }
203
204 /// Ensure disk blocks are reserved at least up to the next segment
205 /// boundary above the current write frontier (`current_lsn`).
206 ///
207 /// Cheap (pure arithmetic) until the frontier crosses a
208 /// [`WAL_SEGMENT_BYTES`] boundary, at which point it issues a single
209 /// `fallocate`. Best-effort: a filesystem that can't preallocate disables
210 /// the feature; a transient error is swallowed so a write never fails
211 /// because preallocation hiccuped (the write path surfaces a genuine
212 /// `ENOSPC` on its own). Never grows the file's logical length, so it is
213 /// invisible to crash recovery.
214 fn ensure_preallocated(&mut self) -> io::Result<()> {
215 if !self.prealloc_supported {
216 return Ok(());
217 }
218 let target = next_wal_segment_boundary(self.current_lsn);
219 if target <= self.preallocated_to {
220 return Ok(());
221 }
222 let from = self.preallocated_to;
223 match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
224 Ok(()) => self.preallocated_to = target,
225 Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
226 Err(_) => {
227 // Best-effort: leave `preallocated_to` as-is and retry at the
228 // next boundary. Never propagate.
229 }
230 }
231 Ok(())
232 }
233
234 /// Append a record to the WAL.
235 ///
236 /// Bytes go into the BufWriter — they are NOT durable on disk
237 /// after this call returns. Callers that need durability must
238 /// follow up with [`WalWriter::sync`] or
239 /// [`WalWriter::flush_until`].
240 ///
241 /// Returns the LSN (Log Sequence Number) of the record.
242 pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
243 let bytes = record.encode();
244 self.file.write_all(&bytes)?;
245
246 let record_lsn = self.current_lsn;
247 self.current_lsn += bytes.len() as u64;
248
249 self.ensure_preallocated()?;
250 Ok(record_lsn)
251 }
252
253 /// Write already-encoded bytes and advance the LSN counter to
254 /// match. Used by the lock-free append path: writers encode +
255 /// atomically reserve an LSN range outside this writer, the
256 /// group-commit coordinator drains the pending queue in LSN
257 /// order, then calls `append_bytes` for each batch.
258 ///
259 /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
260 /// concatenation of such) — no structural validation happens
261 /// here. The caller is responsible for keeping the on-disk
262 /// byte offset synchronised with the externally-tracked LSN
263 /// counter; this method just appends and advances.
264 pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
265 self.file.write_all(bytes)?;
266 let record_lsn = self.current_lsn;
267 self.current_lsn += bytes.len() as u64;
268 self.ensure_preallocated()?;
269 Ok(record_lsn)
270 }
271
272 /// Rewind the writer's LSN counter to a specific value. Used
273 /// by the lock-free append path to resync the writer with the
274 /// externally-tracked `next_lsn` after a drain batch; the
275 /// coordinator knows the exact byte offset it just wrote to
276 /// and needs `current_lsn` to match so subsequent direct
277 /// callers of `append` stay consistent.
278 pub fn set_current_lsn(&mut self, lsn: u64) {
279 self.current_lsn = lsn;
280 }
281
282 /// Force sync to disk.
283 ///
284 /// Drains the user-space [`BufWriter`] first, then calls
285 /// `sync_all()` on the underlying file so every byte appended
286 /// since the last sync is durable. Updates `durable_lsn` so
287 /// subsequent `flush_until` calls become no-ops up to
288 /// `current_lsn`.
289 pub fn sync(&mut self) -> io::Result<()> {
290 self.file.flush()?;
291 self.file.get_ref().sync_all()?;
292 self.durable_lsn = self.current_lsn;
293 Ok(())
294 }
295
296 /// Ensure the WAL is durable on disk at least up to byte offset
297 /// `target`. No-op when `target <= durable_lsn`.
298 ///
299 /// This is the postgres `XLogFlush(LSN)` analogue. Pager flush
300 /// paths call this with `max(dirty.header.lsn)` before writing
301 /// any data page so the WAL record describing the change is
302 /// guaranteed to be on disk before the page itself.
303 pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
304 if self.durable_lsn >= target {
305 return Ok(());
306 }
307 self.file.flush()?;
308 self.file.get_ref().sync_all()?;
309 self.durable_lsn = self.current_lsn;
310 Ok(())
311 }
312
313 /// Highest byte offset that is durable on disk. Used by the pager
314 /// to decide whether a `flush_until` call would actually need a
315 /// `fsync`.
316 pub fn durable_lsn(&self) -> u64 {
317 self.durable_lsn
318 }
319
320 /// Get current LSN (end of file offset)
321 pub fn current_lsn(&self) -> u64 {
322 self.current_lsn
323 }
324
325 /// Drain the BufWriter into the kernel and return the captured
326 /// LSN plus a cloned file handle for the caller to fsync
327 /// **without holding the WAL writer mutex**.
328 ///
329 /// Used by the group-commit leader path. The flow is:
330 ///
331 /// 1. Take the WAL writer mutex.
332 /// 2. Call this method — drains user-space buffer to the kernel
333 /// and captures `(target_lsn, sync_handle)`.
334 /// 3. Release the WAL writer mutex.
335 /// 4. Call `sync_handle.sync_all()` — this is the expensive
336 /// ~100 µs syscall, and other writers can keep appending
337 /// while it runs.
338 /// 5. Take the WAL writer mutex briefly and call
339 /// [`WalWriter::mark_durable(target_lsn)`] to publish the
340 /// new durable position.
341 ///
342 /// The cloned `sync_handle` shares the same kernel inode with
343 /// the writer's `file`, so `sync_all()` on the clone flushes
344 /// ALL bytes that have reached the kernel for that file —
345 /// including bytes appended by other writers AFTER step 3.
346 /// This is the coalescing window that makes group commit win.
347 pub fn drain_for_group_sync(&mut self) -> io::Result<(u64, Arc<File>)> {
348 // Drain user-space buffer into the kernel.
349 self.file.flush()?;
350 Ok((self.current_lsn, Arc::clone(&self.sync_handle)))
351 }
352
353 /// Manually advance `durable_lsn` after a successful out-of-lock
354 /// `sync_all()` performed via [`WalWriter::drain_for_group_sync`].
355 ///
356 /// Monotonic — never lowers `durable_lsn`. Safe to call with a
357 /// stale `lsn`; just becomes a no-op.
358 pub fn mark_durable(&mut self, lsn: u64) {
359 if lsn > self.durable_lsn {
360 self.durable_lsn = lsn;
361 }
362 }
363
364 /// Truncate the WAL (usually after checkpoint).
365 ///
366 /// Drains the BufWriter first so no pending bytes hit the file
367 /// after the truncate. Then resets the underlying file, rewrites
368 /// the header through the buffered writer (header is small; the
369 /// followup `flush + sync_all` makes it durable), and resets
370 /// LSN bookkeeping.
371 pub fn truncate(&mut self) -> io::Result<()> {
372 // Drop any pending bytes BEFORE the truncate; otherwise the
373 // BufWriter would flush them to a re-shrunken file in
374 // confused order.
375 self.file.flush()?;
376
377 {
378 let raw = self.file.get_mut();
379 raw.set_len(0)?;
380 raw.seek(SeekFrom::Start(0))?;
381 }
382
383 // Rewrite header through the BufWriter then drain.
384 let mut header = Vec::with_capacity(8);
385 header.extend_from_slice(WAL_MAGIC);
386 header.push(WAL_VERSION);
387 header.extend_from_slice(&[0u8; 3]);
388 self.file.write_all(&header)?;
389 self.file.flush()?;
390 self.file.get_ref().sync_all()?;
391
392 self.current_lsn = 8;
393 self.durable_lsn = 8;
394
395 // `set_len(0)` freed every reserved block, so the WAL would otherwise
396 // grow page-by-page again from here. Re-extend a fresh segment now —
397 // this is the "truncate/re-extend on checkpoint" half of issue #893.
398 self.preallocated_to = 0;
399 self.ensure_preallocated()?;
400 Ok(())
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Deletes the per-test WAL file when dropped, whether the test passes
    /// or panics.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Build a fresh WAL path in the temp dir, unique per test `name` and
    /// process id, and remove any stale file left there. Returns it with a
    /// guard that cleans it up.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_writer_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    #[test]
    fn test_create_new_wal() {
        let (_guard, path) = temp_wal("create");
        let writer = WalWriter::open(&path).unwrap();

        // Should start at LSN 8 (after 8-byte header)
        assert_eq!(writer.current_lsn(), 8);
        assert!(path.exists());
    }

    #[test]
    fn test_append_record() {
        let (_guard, path) = temp_wal("append");
        let mut writer = WalWriter::open(&path).unwrap();

        let record = WalRecord::Begin { tx_id: 42 };
        let lsn = writer.append(&record).unwrap();

        // First record starts at LSN 8
        assert_eq!(lsn, 8);

        // Next record should start after encoded size
        // Begin record: 1 (type) + 8 (term) + 8 (tx_id) + 4 (checksum) = 21 bytes
        assert_eq!(writer.current_lsn(), 8 + 21);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let mut writer = WalWriter::open(&path).unwrap();

        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        assert_eq!(lsn1, 8);
        assert_eq!(lsn2, 8 + 21);
        assert_eq!(lsn3, 8 + 21 + 21);
    }

    #[test]
    fn test_page_write_lsn() {
        let (_guard, path) = temp_wal("pagewrite");
        let mut writer = WalWriter::open(&path).unwrap();

        // First record
        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        assert_eq!(lsn1, 8);

        // PageWrite record: 1 + 8 + 8 + 4 + 4 + data_len + 4 = 29 + data_len
        let data = vec![1, 2, 3, 4, 5];
        let lsn2 = writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 100,
                data: data.clone(),
            })
            .unwrap();

        assert_eq!(lsn2, 8 + 21); // after Begin

        // Next LSN = lsn2 + (1 + 8 + 8 + 4 + 4 + 5 + 4) = lsn2 + 34
        assert_eq!(writer.current_lsn(), 8 + 21 + 34);
    }

    #[test]
    fn test_sync() {
        let (_guard, path) = temp_wal("sync");
        let mut writer = WalWriter::open(&path).unwrap();

        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();

        // File should be synced, just verify no error
        assert!(path.exists());
    }

    #[test]
    fn test_truncate() {
        let (_guard, path) = temp_wal("truncate");
        let mut writer = WalWriter::open(&path).unwrap();

        // Write some records
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: vec![0; 100],
            })
            .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        let lsn_before = writer.current_lsn();
        assert!(lsn_before > 8);

        // Truncate
        writer.truncate().unwrap();

        // LSN should be back to 8
        assert_eq!(writer.current_lsn(), 8);

        // File should be 8 bytes (just header)
        let len = std::fs::metadata(&path).unwrap().len();
        assert_eq!(len, 8);
    }

    #[test]
    fn test_reopen_existing() {
        let (_guard, path) = temp_wal("reopen");

        // Create and write
        let lsn_after_write;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            lsn_after_write = writer.current_lsn();
        }

        // Reopen
        {
            let writer = WalWriter::open(&path).unwrap();
            // Should continue from where we left off
            assert_eq!(writer.current_lsn(), lsn_after_write);
        }
    }

    #[test]
    fn test_checkpoint_record() {
        let (_guard, path) = temp_wal("checkpoint");
        let mut writer = WalWriter::open(&path).unwrap();

        // Checkpoint is same size as Begin (1 + 8 + 8 + 4 = 21)
        let lsn = writer
            .append(&WalRecord::Checkpoint { lsn: 12345 })
            .unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + 21);
    }

    // -----------------------------------------------------------------
    // Target 3: durable_lsn / flush_until tests
    // -----------------------------------------------------------------

    #[test]
    fn fresh_wal_has_durable_lsn_at_header_end() {
        let (_guard, path) = temp_wal("durable_init");
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn flush_until_below_durable_is_noop() {
        let (_guard, path) = temp_wal("flush_noop");
        let mut writer = WalWriter::open(&path).unwrap();
        // After open, durable_lsn == 8.
        let before = writer.durable_lsn();
        writer.flush_until(0).unwrap();
        writer.flush_until(8).unwrap();
        assert_eq!(writer.durable_lsn(), before);
    }

    #[test]
    fn flush_until_advances_durable_to_current() {
        let (_guard, path) = temp_wal("flush_advance");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        let target = writer.current_lsn();
        // Before flush_until, durable still at the header.
        assert_eq!(writer.durable_lsn(), 8);
        writer.flush_until(target).unwrap();
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn flush_until_is_monotonic() {
        let (_guard, path) = temp_wal("flush_monotonic");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lo = writer.current_lsn();
        writer.flush_until(lo).unwrap();
        let durable_after_lo = writer.durable_lsn();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let hi = writer.current_lsn();
        writer.flush_until(hi).unwrap();
        assert!(writer.durable_lsn() >= durable_after_lo);
        // Calling flush_until(lo) after flush_until(hi) is a no-op.
        writer.flush_until(lo).unwrap();
        assert_eq!(writer.durable_lsn(), hi);
    }

    #[test]
    fn sync_advances_durable_lsn_too() {
        let (_guard, path) = temp_wal("sync_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
        let before = writer.durable_lsn();
        let after_append = writer.current_lsn();
        assert!(after_append > before);
        writer.sync().unwrap();
        assert_eq!(writer.durable_lsn(), after_append);
    }

    #[test]
    fn truncate_resets_durable_lsn() {
        let (_guard, path) = temp_wal("truncate_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        assert!(writer.durable_lsn() > 8);
        writer.truncate().unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn reopen_initialises_durable_to_current() {
        let (_guard, path) = temp_wal("reopen_durable");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let writer = WalWriter::open(&path).unwrap();
        // After reopen, every byte on disk is treated as durable.
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    // -----------------------------------------------------------------
    // Perf 1.1: BufWriter coalesces small appends until sync
    // -----------------------------------------------------------------

    #[test]
    fn bufwriter_coalesces_until_sync() {
        // Append 100 small records but DO NOT sync. The on-disk file
        // size must still equal the header (8 bytes) because the
        // bytes are sitting in the BufWriter, not in the kernel.
        let (_guard, path) = temp_wal("bufwriter_coalesce");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..100u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // current_lsn reflects the in-buffer position.
        assert_eq!(writer.current_lsn(), 8 + 100 * 21);
        // But the file on disk only has the header.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
    }

    #[test]
    fn sync_drains_bufwriter_before_fsync() {
        // After sync(), the file size must equal current_lsn: the
        // BufWriter has been flushed and sync_all has hit the kernel.
        let (_guard, path) = temp_wal("sync_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, writer.current_lsn());
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    #[test]
    fn flush_until_drains_bufwriter_too() {
        // flush_until must drain the BufWriter before calling
        // sync_all on the underlying file; otherwise pending bytes
        // never become durable.
        let (_guard, path) = temp_wal("flush_until_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..30u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        let target = writer.current_lsn();
        writer.flush_until(target).unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, target);
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn truncate_drains_pending_bufwriter_bytes_first() {
        // If truncate did NOT drain the BufWriter first, the pending bytes
        // would either land in the post-truncate file (corrupting it
        // with stale records) or be lost. Verify the resulting file
        // contains only a fresh header.
        let (_guard, path) = temp_wal("truncate_drain");
        let mut writer = WalWriter::open(&path).unwrap();
        // Write enough small records to fill part of the 64 KiB buffer
        // (200 * 21 = 4 200 bytes) but stay below the auto-flush threshold.
        for tx in 0..200u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // Sanity: bytes are buffered.
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);

        writer.truncate().unwrap();
        // After truncate the file is just the header again.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8);
        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(writer.durable_lsn(), 8);

        // And we can append again successfully.
        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
        writer.sync().unwrap();
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 21);
    }

    #[test]
    fn reopen_sees_synced_prefix_and_drop_flushed_tail() {
        // Records that were sync'd must survive a reopen. Records appended
        // afterwards but never sync'd ALSO end up in the file. When the
        // writer is dropped, `BufWriter`'s Drop impl writes its buffer to the
        // file; it just never fsyncs. That flush-on-drop is deterministic,
        // not platform dependent. So the reopened LSN resumes after the
        // dropped records, and everything on disk is treated as durable.
        let (_guard, path) = temp_wal("reopen_synced_only");
        let synced_lsn;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
            synced_lsn = writer.current_lsn();
            // 20 more Begin records (21 bytes each), never sync'd before drop.
            for tx in 100..120u64 {
                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
            }
        }
        let writer = WalWriter::open(&path).unwrap();
        // The synced prefix is never lost...
        assert!(writer.durable_lsn() >= synced_lsn);
        // ...and the tail written by BufWriter's Drop is present as well.
        assert_eq!(writer.current_lsn(), synced_lsn + 20 * 21);
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    // -----------------------------------------------------------------
    // Issue #893: fallocate-based WAL segment preallocation
    // -----------------------------------------------------------------

    /// On-disk blocks reserved for the file, in bytes, as reported by
    /// `fs2::FileExt::allocated_size`. This is independent of the logical
    /// length.
    fn allocated_bytes(path: &std::path::Path) -> u64 {
        use fs2::FileExt;
        let f = std::fs::File::open(path).unwrap();
        f.allocated_size().unwrap()
    }

    #[test]
    fn segment_boundary_rounds_strictly_up() {
        // Always lands one boundary ahead so the reservation stays in front
        // of the write frontier.
        assert_eq!(next_wal_segment_boundary(0), WAL_SEGMENT_BYTES);
        assert_eq!(next_wal_segment_boundary(8), WAL_SEGMENT_BYTES);
        assert_eq!(
            next_wal_segment_boundary(WAL_SEGMENT_BYTES - 1),
            WAL_SEGMENT_BYTES
        );
        // Exactly on a boundary still advances to the next one.
        assert_eq!(
            next_wal_segment_boundary(WAL_SEGMENT_BYTES),
            2 * WAL_SEGMENT_BYTES
        );
        assert_eq!(
            next_wal_segment_boundary(WAL_SEGMENT_BYTES + 1),
            2 * WAL_SEGMENT_BYTES
        );
    }

    #[test]
    fn open_preallocates_first_segment() {
        // A freshly opened WAL must reserve a whole segment up front instead
        // of growing incrementally (acceptance #1).
        let (_guard, path) = temp_wal("prealloc_open");
        let writer = WalWriter::open(&path).unwrap();
        if !writer.prealloc_supported {
            return; // filesystem without fallocate: feature is a no-op.
        }
        assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
        // The reservation is real on disk, yet the logical file is still just
        // the 8-byte header.
        assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
    }

    #[test]
    fn preallocation_does_not_grow_logical_length() {
        // The load-bearing invariant for crash recovery: appending records
        // must NOT inflate the logical file size beyond the real data, or the
        // EOF scan in WalReader would walk into the reserved tail. This holds
        // on every filesystem: fallocate keeps i_size pinned, and without
        // fallocate there is no reservation at all.
        let (_guard, path) = temp_wal("prealloc_logical");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let logical = std::fs::metadata(&path).unwrap().len();
        assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
        assert_eq!(writer.current_lsn(), logical);
    }

    #[test]
    fn truncate_re_extends_a_fresh_segment() {
        // After checkpoint truncation the WAL must re-extend rather than grow
        // unbounded page-by-page (acceptance #2).
        let (_guard, path) = temp_wal("prealloc_truncate");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();

        writer.truncate().unwrap();

        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
        if writer.prealloc_supported {
            assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
            assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
        }
    }

    #[test]
    fn preallocated_wal_recovers_records_without_trailing_garbage() {
        // End-to-end: a preallocated WAL must read back exactly the records
        // written. The reserved (unwritten) tail must be invisible to the
        // reader, proving crash recovery is unchanged (acceptance #3).
        use super::super::reader::WalReader;
        let (_guard, path) = temp_wal("prealloc_recover");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 7,
                    data: vec![1, 2, 3, 4],
                })
                .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let records: Vec<_> = WalReader::open(&path)
            .unwrap()
            .iter()
            .collect::<Result<_, _>>()
            .expect("reader must stop cleanly at real EOF, not in reserved tail");
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
        assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
    }
}
891}