reddb_server/storage/wal/writer.rs
use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;

/// User-space buffer size for the WAL writer.
///
/// Chosen so that ~5,000 small records (Begin/Commit ≈ 13 bytes,
/// small PageWrite ≈ 26 bytes) coalesce into a single `write` syscall
/// before the next `sync()` drains the buffer. Tunable; reflects the
/// Postgres XLOG block size (8 KiB), scaled up because we batch at the
/// record level rather than the page level.
const WAL_BUFFER_BYTES: usize = 64 * 1024;

/// Writer for the Write-Ahead Log.
///
/// Wraps the underlying file in a [`BufWriter`] so each `append` does
/// not pay a write syscall — bytes accumulate in a 64 KiB user-space
/// buffer until `sync()` (or `flush_until()`) drains them and then
/// calls `sync_all()` on the raw file. This is the same batching that
/// lets Postgres cut per-record append cost from ~500 ns to ~5 ns;
/// reddb's previous design called `write_all` directly on the file and
/// paid the syscall on every record.
///
/// **Critical contract:** every code path that calls `sync_all()` on
/// the underlying file *must* drain the [`BufWriter`] first via
/// `BufWriter::flush()`. Otherwise bytes sitting in user space never
/// reach the kernel before the fsync, and durability is silently
/// broken.
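///
/// # Example
///
/// A minimal usage sketch of the durability contract (marked `ignore`;
/// the path and record are illustrative, not part of a real setup):
///
/// ```ignore
/// let mut wal = WalWriter::open("data/reddb.wal")?;
/// let lsn = wal.append(&WalRecord::Begin { tx_id: 1 })?;
/// // The record has an LSN, but the bytes are still in the
/// // user-space buffer — a crash here loses them.
/// wal.sync()?; // drain the BufWriter, then fsync: now durable
/// assert!(wal.durable_lsn() >= lsn);
/// ```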
pub struct WalWriter {
    file: BufWriter<File>,
    /// Cloned file handle for calling `sync_all()` outside the writer
    /// mutex. Both this and `file`'s inner `File` refer to the same
    /// kernel inode; calling `sync_all()` on either flushes ALL
    /// pending bytes for that inode. This is the trick that lets
    /// the group-commit leader release the WAL writer lock during
    /// the expensive fsync — see [`WalWriter::drain_for_group_sync`].
    ///
    /// Without this clone, a leader holding the writer mutex during
    /// `sync_all()` blocks every other writer from appending,
    /// defeating the entire purpose of group commit.
    sync_handle: Arc<File>,
    /// Log Sequence Number — byte offset of the next record. Advances
    /// on every `append`; survives across restarts via `seek(End)`.
    current_lsn: u64,
    /// Highest LSN that has been `sync_all()`'d to disk. The WAL-first
    /// flush invariant relies on this: a page with `header.lsn = L` may
    /// only be written to its data file once `durable_lsn >= L`.
    /// See `src/storage/cache/README.md` § Invariant 2 and the Target 3
    /// section of `PLAN.md`.
    durable_lsn: u64,
}

impl WalWriter {
    /// Open a WAL file for writing. Creates it if it doesn't exist.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let exists = path.as_ref().exists();

        // Do all initial bookkeeping (write header, seek to EOF) on
        // the raw `File` BEFORE wrapping it in a BufWriter, so there
        // is no flush ordering to worry about during construction.
        let mut raw = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(path)?;

        let current_lsn = if !exists || raw.metadata()?.len() == 0 {
            // Write the header for a new file.
            // Format: Magic (4) + Version (1) + Reserved (3)
            let mut header = Vec::with_capacity(8);
            header.extend_from_slice(WAL_MAGIC);
            header.push(WAL_VERSION);
            header.extend_from_slice(&[0u8; 3]); // Reserved

            raw.write_all(&header)?;
            raw.sync_all()?;
            8
        } else {
            // Existing file: set the LSN to the current end. Append-mode
            // files ignore this seek for *writes*, but the returned
            // position seeds our LSN counter.
            raw.seek(SeekFrom::End(0))?
        };

        // Clone the file handle BEFORE wrapping in BufWriter. The
        // clone shares the same underlying open file, so sync_all()
        // on either handle flushes the whole inode. The BufWriter
        // owns the original; the Arc<File> is shared with the
        // group-commit leader.
        let sync_handle = Arc::new(raw.try_clone()?);
        let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);

        // On open, every byte already in the file is treated as durable:
        // any pre-crash unflushed tail was lost when the OS dropped its
        // page cache. Initialise `durable_lsn` to `current_lsn`.
        Ok(Self {
            file,
            sync_handle,
            current_lsn,
            durable_lsn: current_lsn,
        })
    }

    /// Append a record to the WAL.
    ///
    /// Bytes go into the BufWriter — they are NOT durable on disk
    /// after this call returns. Callers that need durability must
    /// follow up with [`WalWriter::sync`] or
    /// [`WalWriter::flush_until`].
    ///
    /// Returns the LSN (Log Sequence Number) of the record.
    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
        let bytes = record.encode();
        self.file.write_all(&bytes)?;

        let record_lsn = self.current_lsn;
        self.current_lsn += bytes.len() as u64;

        Ok(record_lsn)
    }

    /// Write already-encoded bytes and advance the LSN counter to
    /// match. Used by the lock-free append path: writers encode and
    /// atomically reserve an LSN range outside this writer, the
    /// group-commit coordinator drains the pending queue in LSN
    /// order, then calls `append_bytes` for each batch.
    ///
    /// The bytes MUST be a valid `WalRecord::encode()` payload (or a
    /// concatenation of such) — no structural validation happens
    /// here. The caller is responsible for keeping the on-disk
    /// byte offset synchronised with the externally tracked LSN
    /// counter; this method just appends and advances.
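    ///
    /// A sketch of the intended calling pattern — the `next_lsn`
    /// atomic, `pending` queue, and `drain_in_lsn_order` helper are
    /// hypothetical names for the coordinator's state, not this
    /// module's API:
    ///
    /// ```ignore
    /// // Writer side: encode and reserve an LSN range lock-free.
    /// let bytes = record.encode();
    /// let lsn = next_lsn.fetch_add(bytes.len() as u64, Ordering::SeqCst);
    /// pending.push((lsn, bytes));
    ///
    /// // Coordinator side: drain in LSN order under the writer mutex.
    /// let mut w = wal.lock().unwrap();
    /// for (lsn, bytes) in drain_in_lsn_order(&pending) {
    ///     debug_assert_eq!(w.current_lsn(), lsn);
    ///     w.append_bytes(&bytes)?;
    /// }
    /// w.set_current_lsn(next_lsn.load(Ordering::SeqCst));
    /// ```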
    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
        self.file.write_all(bytes)?;
        let record_lsn = self.current_lsn;
        self.current_lsn += bytes.len() as u64;
        Ok(record_lsn)
    }

    /// Reset the writer's LSN counter to a specific value. Used by
    /// the lock-free append path to resync the writer with the
    /// externally tracked `next_lsn` after a drain batch: the
    /// coordinator knows the exact byte offset it just wrote to
    /// and needs `current_lsn` to match so that subsequent direct
    /// callers of `append` stay consistent.
    pub fn set_current_lsn(&mut self, lsn: u64) {
        self.current_lsn = lsn;
    }

    /// Force a sync to disk.
    ///
    /// Drains the user-space [`BufWriter`] first, then calls
    /// `sync_all()` on the underlying file so every byte appended
    /// since the last sync is durable. Updates `durable_lsn` so
    /// subsequent `flush_until` calls become no-ops up to
    /// `current_lsn`.
    pub fn sync(&mut self) -> io::Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.durable_lsn = self.current_lsn;
        Ok(())
    }

    /// Ensure the WAL is durable on disk at least up to byte offset
    /// `target`. No-op when `target <= durable_lsn`.
    ///
    /// This is the Postgres `XLogFlush(LSN)` analogue. Pager flush
    /// paths call this with `max(dirty.header.lsn)` before writing
    /// any data page, so the WAL record describing the change is
    /// guaranteed to be on disk before the page itself.
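    ///
    /// A sketch of the pager-side call — `dirty` and `write_page` are
    /// hypothetical; only `flush_until` is this type's API:
    ///
    /// ```ignore
    /// // WAL-first: make every describing record durable, then write
    /// // the data pages themselves.
    /// let max_lsn = dirty.iter().map(|p| p.header.lsn).max().unwrap_or(0);
    /// wal.flush_until(max_lsn)?;
    /// for page in &dirty {
    ///     write_page(page)?; // safe: the WAL already covers this change
    /// }
    /// ```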
    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
        if self.durable_lsn >= target {
            return Ok(());
        }
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.durable_lsn = self.current_lsn;
        Ok(())
    }

    /// Highest byte offset that is durable on disk. Used by the pager
    /// to decide whether a `flush_until` call would actually need an
    /// `fsync`.
    pub fn durable_lsn(&self) -> u64 {
        self.durable_lsn
    }

    /// Current LSN (end-of-log byte offset).
    pub fn current_lsn(&self) -> u64 {
        self.current_lsn
    }

    /// Drain the BufWriter into the kernel and return the captured
    /// LSN plus a cloned file handle the caller can fsync
    /// **without holding the WAL writer mutex**.
    ///
    /// Used by the group-commit leader path. The flow is:
    ///
    /// 1. Take the WAL writer mutex.
    /// 2. Call this method — it drains the user-space buffer to the
    ///    kernel and captures `(target_lsn, sync_handle)`.
    /// 3. Release the WAL writer mutex.
    /// 4. Call `sync_handle.sync_all()` — this is the expensive
    ///    ~100 µs syscall, and other writers can keep appending
    ///    while it runs.
    /// 5. Take the WAL writer mutex briefly and call
    ///    [`WalWriter::mark_durable`] with `target_lsn` to publish
    ///    the new durable position.
    ///
    /// The cloned `sync_handle` shares the same kernel inode with
    /// the writer's `file`, so `sync_all()` on the clone flushes
    /// ALL bytes that have reached the kernel for that file —
    /// including bytes appended by other writers AFTER step 3.
    /// This coalescing window is what makes group commit win.
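    ///
    /// A sketch of the leader path, assuming the writer lives in an
    /// `Arc<Mutex<WalWriter>>` (the mutex and queueing are outside
    /// this type):
    ///
    /// ```ignore
    /// let (target_lsn, handle) = {
    ///     let mut w = wal.lock().unwrap(); // step 1
    ///     w.drain_for_group_sync()?        // step 2
    /// };                                   // step 3: lock released
    /// handle.sync_all()?;                  // step 4: fsync off-lock
    /// wal.lock().unwrap().mark_durable(target_lsn); // step 5
    /// ```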
    pub fn drain_for_group_sync(&mut self) -> io::Result<(u64, Arc<File>)> {
        // Drain the user-space buffer into the kernel.
        self.file.flush()?;
        Ok((self.current_lsn, Arc::clone(&self.sync_handle)))
    }

    /// Manually advance `durable_lsn` after a successful out-of-lock
    /// `sync_all()` performed via [`WalWriter::drain_for_group_sync`].
    ///
    /// Monotonic — never lowers `durable_lsn`. Safe to call with a
    /// stale `lsn`; it just becomes a no-op.
    pub fn mark_durable(&mut self, lsn: u64) {
        if lsn > self.durable_lsn {
            self.durable_lsn = lsn;
        }
    }

    /// Truncate the WAL (usually after a checkpoint).
    ///
    /// Drains the BufWriter first so no pending bytes hit the file
    /// after the truncate. Then resets the underlying file, rewrites
    /// the header through the buffered writer (the header is small;
    /// the follow-up `flush` + `sync_all` makes it durable), and
    /// resets the LSN bookkeeping.
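    ///
    /// A sketch of the checkpoint-side ordering — `pager` is
    /// hypothetical; the point is pages first, then the WAL reset:
    ///
    /// ```ignore
    /// pager.flush_all_dirty_pages()?; // every change is in the data files
    /// wal.truncate()?;                // WAL no longer needed for recovery
    /// ```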
    pub fn truncate(&mut self) -> io::Result<()> {
        // Drain any pending bytes BEFORE the truncate: flushing them
        // now means the following set_len(0) erases them, instead of
        // the BufWriter spilling stale records into the fresh file
        // later.
        self.file.flush()?;

        {
            let raw = self.file.get_mut();
            raw.set_len(0)?;
            raw.seek(SeekFrom::Start(0))?;
        }

        // Rewrite the header through the BufWriter, then drain.
        let mut header = Vec::with_capacity(8);
        header.extend_from_slice(WAL_MAGIC);
        header.push(WAL_VERSION);
        header.extend_from_slice(&[0u8; 3]);
        self.file.write_all(&header)?;
        self.file.flush()?;
        self.file.get_ref().sync_all()?;

        self.current_lsn = 8;
        self.durable_lsn = 8;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_writer_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    #[test]
    fn test_create_new_wal() {
        let (_guard, path) = temp_wal("create");
        let writer = WalWriter::open(&path).unwrap();

        // Should start at LSN 8 (after the 8-byte header).
        assert_eq!(writer.current_lsn(), 8);
        assert!(path.exists());
    }

    #[test]
    fn test_append_record() {
        let (_guard, path) = temp_wal("append");
        let mut writer = WalWriter::open(&path).unwrap();

        let record = WalRecord::Begin { tx_id: 42 };
        let lsn = writer.append(&record).unwrap();

        // The first record starts at LSN 8.
        assert_eq!(lsn, 8);

        // The next record starts after the encoded size.
        // Begin record: 1 (type) + 8 (tx_id) + 4 (checksum) = 13 bytes.
        assert_eq!(writer.current_lsn(), 8 + 13);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let mut writer = WalWriter::open(&path).unwrap();

        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        assert_eq!(lsn1, 8);
        assert_eq!(lsn2, 8 + 13);
        assert_eq!(lsn3, 8 + 13 + 13);
    }

    #[test]
    fn test_page_write_lsn() {
        let (_guard, path) = temp_wal("pagewrite");
        let mut writer = WalWriter::open(&path).unwrap();

        // First record.
        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        assert_eq!(lsn1, 8);

        // PageWrite record: 1 + 8 + 4 + 4 + data_len + 4 = 21 + data_len.
        let data = vec![1, 2, 3, 4, 5];
        let lsn2 = writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 100,
                data: data.clone(),
            })
            .unwrap();

        assert_eq!(lsn2, 8 + 13); // after Begin

        // Next LSN = lsn2 + (1 + 8 + 4 + 4 + 5 + 4) = lsn2 + 26.
        assert_eq!(writer.current_lsn(), 8 + 13 + 26);
    }

    #[test]
    fn test_sync() {
        let (_guard, path) = temp_wal("sync");
        let mut writer = WalWriter::open(&path).unwrap();

        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();

        // The file should be synced; just verify there was no error.
        assert!(path.exists());
    }

    #[test]
    fn test_truncate() {
        let (_guard, path) = temp_wal("truncate");
        let mut writer = WalWriter::open(&path).unwrap();

        // Write some records.
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
            .append(&WalRecord::PageWrite {
                tx_id: 1,
                page_id: 0,
                data: vec![0; 100],
            })
            .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        let lsn_before = writer.current_lsn();
        assert!(lsn_before > 8);

        // Truncate.
        writer.truncate().unwrap();

        // The LSN should be back to 8.
        assert_eq!(writer.current_lsn(), 8);

        // The file should be 8 bytes (just the header).
        let len = std::fs::metadata(&path).unwrap().len();
        assert_eq!(len, 8);
    }

    #[test]
    fn test_reopen_existing() {
        let (_guard, path) = temp_wal("reopen");

        // Create and write.
        let lsn_after_write;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            lsn_after_write = writer.current_lsn();
        }

        // Reopen.
        {
            let writer = WalWriter::open(&path).unwrap();
            // Should continue from where we left off.
            assert_eq!(writer.current_lsn(), lsn_after_write);
        }
    }

    #[test]
    fn test_checkpoint_record() {
        let (_guard, path) = temp_wal("checkpoint");
        let mut writer = WalWriter::open(&path).unwrap();

        // A Checkpoint record is the same size as Begin (1 + 8 + 4 = 13).
        let lsn = writer
            .append(&WalRecord::Checkpoint { lsn: 12345 })
            .unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + 13);
    }
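
    // Sketch test for the lock-free append helpers: `append_bytes`
    // must advance the LSN counter exactly like `append`, and
    // `set_current_lsn` resyncs it with an externally tracked offset.
    #[test]
    fn append_bytes_advances_lsn_like_append() {
        let (_guard, path) = temp_wal("append_bytes");
        let mut writer = WalWriter::open(&path).unwrap();

        let encoded = WalRecord::Begin { tx_id: 5 }.encode();
        let lsn = writer.append_bytes(&encoded).unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + encoded.len() as u64);

        // A coordinator would resync the counter after a drain batch;
        // here the value is already correct, so this is a no-op.
        writer.set_current_lsn(8 + encoded.len() as u64);
        assert_eq!(writer.current_lsn(), 8 + encoded.len() as u64);
    }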

    // -----------------------------------------------------------------
    // Target 3: durable_lsn / flush_until tests
    // -----------------------------------------------------------------

    #[test]
    fn fresh_wal_has_durable_lsn_at_header_end() {
        let (_guard, path) = temp_wal("durable_init");
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn flush_until_below_durable_is_noop() {
        let (_guard, path) = temp_wal("flush_noop");
        let mut writer = WalWriter::open(&path).unwrap();
        // After open, durable_lsn == 8.
        let before = writer.durable_lsn();
        writer.flush_until(0).unwrap();
        writer.flush_until(8).unwrap();
        assert_eq!(writer.durable_lsn(), before);
    }

    #[test]
    fn flush_until_advances_durable_to_current() {
        let (_guard, path) = temp_wal("flush_advance");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        let target = writer.current_lsn();
        // Before flush_until, durable is still at the header.
        assert_eq!(writer.durable_lsn(), 8);
        writer.flush_until(target).unwrap();
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn flush_until_is_monotonic() {
        let (_guard, path) = temp_wal("flush_monotonic");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lo = writer.current_lsn();
        writer.flush_until(lo).unwrap();
        let durable_after_lo = writer.durable_lsn();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let hi = writer.current_lsn();
        writer.flush_until(hi).unwrap();
        assert!(writer.durable_lsn() >= durable_after_lo);
        // Calling flush_until(lo) after flush_until(hi) is a no-op.
        writer.flush_until(lo).unwrap();
        assert_eq!(writer.durable_lsn(), hi);
    }

    #[test]
    fn sync_advances_durable_lsn_too() {
        let (_guard, path) = temp_wal("sync_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
        let before = writer.durable_lsn();
        let after_append = writer.current_lsn();
        assert!(after_append > before);
        writer.sync().unwrap();
        assert_eq!(writer.durable_lsn(), after_append);
    }

    #[test]
    fn truncate_resets_durable_lsn() {
        let (_guard, path) = temp_wal("truncate_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        assert!(writer.durable_lsn() > 8);
        writer.truncate().unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }

    #[test]
    fn reopen_initialises_durable_to_current() {
        let (_guard, path) = temp_wal("reopen_durable");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let writer = WalWriter::open(&path).unwrap();
        // After reopen, every byte on disk is treated as durable.
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }
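
    // Exercises the documented group-commit sequence end to end:
    // drain under the (conceptual) lock, fsync via the cloned handle,
    // then publish with mark_durable — which must also ignore stale
    // LSNs.
    #[test]
    fn drain_for_group_sync_then_mark_durable_publishes() {
        let (_guard, path) = temp_wal("group_sync");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();

        let (target, handle) = writer.drain_for_group_sync().unwrap();
        assert_eq!(target, writer.current_lsn());
        // In the real leader path the writer mutex is released here.
        handle.sync_all().unwrap();
        writer.mark_durable(target);
        assert_eq!(writer.durable_lsn(), target);

        // A stale LSN is a no-op — durable_lsn never moves backwards.
        writer.mark_durable(8);
        assert_eq!(writer.durable_lsn(), target);
    }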

    // -----------------------------------------------------------------
    // Perf 1.1: BufWriter coalesces small appends until sync
    // -----------------------------------------------------------------

    #[test]
    fn bufwriter_coalesces_until_sync() {
        // Append 100 small records but DO NOT sync. The on-disk file
        // size must still equal the header (8 bytes) because the
        // bytes are sitting in the BufWriter, not in the kernel.
        let (_guard, path) = temp_wal("bufwriter_coalesce");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..100u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // current_lsn reflects the in-buffer position.
        assert_eq!(writer.current_lsn(), 8 + 100 * 13);
        // But the file on disk only has the header.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
    }

    #[test]
    fn sync_drains_bufwriter_before_fsync() {
        // After sync(), the file size must equal current_lsn — the
        // BufWriter has been flushed and sync_all has hit the kernel.
        let (_guard, path) = temp_wal("sync_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, writer.current_lsn());
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }

    #[test]
    fn flush_until_drains_bufwriter_too() {
        // flush_until must drain the BufWriter before calling
        // sync_all on the underlying file — otherwise pending bytes
        // never become durable.
        let (_guard, path) = temp_wal("flush_until_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..30u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        let target = writer.current_lsn();
        writer.flush_until(target).unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, target);
        assert_eq!(writer.durable_lsn(), target);
    }

    #[test]
    fn truncate_drains_pending_bufwriter_bytes_first() {
        // If truncate did NOT drain the BufWriter first, the pending
        // bytes would either land in the post-truncate file (corrupting
        // it with stale records) or be lost. Verify the resulting file
        // contains only a fresh header.
        let (_guard, path) = temp_wal("truncate_drain");
        let mut writer = WalWriter::open(&path).unwrap();
        // Write enough small records to fill some of the 64 KiB buffer
        // but stay below the auto-flush threshold.
        for tx in 0..200u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        // Sanity check: the bytes are still buffered.
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);

        writer.truncate().unwrap();
        // After truncate the file is just the header again.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8);
        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(writer.durable_lsn(), 8);

        // And we can append again successfully.
        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
        writer.sync().unwrap();
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 13);
    }

    #[test]
    fn reopen_lsn_covers_at_least_synced_prefix() {
        // Records appended after the last sync() live in the BufWriter.
        // On drop, BufWriter's Drop impl flushes the buffer to the file
        // (errors silently ignored) but never calls sync_all, so those
        // bytes reach the OS without being fsync'd. The reopen LSN is
        // derived from the on-disk file length, so it may include that
        // flushed-but-unsynced tail. The contract we can assert across
        // platforms is that the reopened position covers at least the
        // synced prefix.
        let (_guard, path) = temp_wal("reopen_synced_only");
        let synced_lsn;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
            synced_lsn = writer.current_lsn();
            // Appended but never sync'd before the writer is dropped.
            for tx in 100..120u64 {
                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
            }
        }
        let writer = WalWriter::open(&path).unwrap();
        assert!(writer.durable_lsn() >= synced_lsn);
    }
}
638}