1use super::record::WalRecord;
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// Capacity, in bytes, of the `BufWriter` that wraps the WAL file (64 KiB).
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
/// Which `File` sync call a WAL sync issues.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalSyncMethod {
    /// Sync with `File::sync_data`.
    Data,
    /// Sync with `File::sync_all`.
    All,
}
21
/// A pending sync handed out by `WalWriter::drain_for_group_sync`.
///
/// It carries its own shared file handle, so `sync` can be called without
/// access to the writer that produced it.
pub(crate) struct WalGroupSync {
    /// The writer's `current_lsn` at the moment this handle was created.
    target_lsn: u64,
    /// Shared handle to the WAL file that `sync` operates on.
    sync_handle: Arc<File>,
    /// Sync call chosen by the writer when this handle was created.
    method: WalSyncMethod,
}
27
28impl WalGroupSync {
29 pub(crate) fn target_lsn(&self) -> u64 {
30 self.target_lsn
31 }
32
33 pub(crate) fn sync(&self) -> io::Result<()> {
34 match self.method {
35 WalSyncMethod::Data => self.sync_handle.sync_data(),
36 WalSyncMethod::All => self.sync_handle.sync_all(),
37 }
38 }
39}
40
41#[cfg(target_os = "linux")]
54fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
55 use std::os::unix::io::AsRawFd;
56 if len == 0 {
57 return Ok(());
58 }
59 let ret = unsafe {
62 libc::fallocate(
63 file.as_raw_fd(),
64 libc::FALLOC_FL_KEEP_SIZE,
65 offset as libc::off_t,
66 len as libc::off_t,
67 )
68 };
69 if ret == 0 {
70 Ok(())
71 } else {
72 Err(io::Error::last_os_error())
73 }
74}
75
/// Stand-in for block reservation on targets other than Linux.
///
/// # Errors
///
/// Always fails with `io::ErrorKind::Unsupported`; the arguments are ignored.
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "WAL preallocation is only implemented on linux",
    );
    Err(unsupported)
}
83
84fn fallocate_unsupported(err: &io::Error) -> bool {
89 if err.kind() == io::ErrorKind::Unsupported {
90 return true;
91 }
92 #[cfg(target_os = "linux")]
93 {
94 matches!(
95 err.raw_os_error(),
96 Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
97 )
98 }
99 #[cfg(not(target_os = "linux"))]
100 {
101 false
102 }
103}
104
/// Buffered, append-only writer for a WAL file.
///
/// LSNs are byte offsets into the file: a record's LSN is the offset at which
/// its first byte is written.
pub struct WalWriter {
    /// Buffered handle that all appended bytes are written through.
    file: BufWriter<File>,
    /// Clone of the same file handle, shared with `WalGroupSync` handles.
    sync_handle: Arc<File>,
    /// Offset at which the next appended record will start.
    current_lsn: u64,
    /// Offset up to which the writer has recorded a completed sync.
    durable_lsn: u64,
    /// Size covered by the most recent `sync_all`; compared against
    /// `current_lsn` when choosing the next sync call.
    last_synced_size: u64,
    /// End offset of the range reserved so far (0 before any reservation).
    preallocated_to: u64,
    /// Cleared once a reservation attempt reports the operation as unsupported.
    prealloc_supported: bool,
    /// Set when a new reservation succeeds; cleared by the next `sync_all`.
    prealloc_metadata_dirty: bool,
    /// Test-only record of the sync call used most recently.
    #[cfg(test)]
    last_sync_method: Option<WalSyncMethod>,
}
165
166impl WalWriter {
167 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
169 let exists = path.as_ref().exists();
170
171 let mut raw = OpenOptions::new()
175 .read(true)
176 .create(true)
177 .append(true)
178 .open(path)?;
179
180 let current_lsn = if !exists || raw.metadata()?.len() == 0 {
181 raw.write_all(&reddb_file::encode_wal_file_header())?;
182 raw.sync_all()?;
183 reddb_file::WAL_FILE_HEADER_BYTES as u64
184 } else {
185 raw.seek(SeekFrom::End(0))?
189 };
190
191 let sync_handle = Arc::new(raw.try_clone()?);
197 let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
198
199 let mut writer = Self {
203 file,
204 sync_handle,
205 current_lsn,
206 durable_lsn: current_lsn,
207 last_synced_size: current_lsn,
208 preallocated_to: 0,
209 prealloc_supported: true,
210 prealloc_metadata_dirty: false,
211 #[cfg(test)]
212 last_sync_method: None,
213 };
214 writer.ensure_preallocated()?;
217 Ok(writer)
218 }
219
220 fn ensure_preallocated(&mut self) -> io::Result<()> {
231 if !self.prealloc_supported {
232 return Ok(());
233 }
234 let target = reddb_file::next_main_wal_segment_boundary(self.current_lsn);
235 if target <= self.preallocated_to {
236 return Ok(());
237 }
238 let from = self.preallocated_to;
239 match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
240 Ok(()) => {
241 self.preallocated_to = target;
242 self.prealloc_metadata_dirty = true;
243 }
244 Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
245 Err(_) => {
246 }
249 }
250 Ok(())
251 }
252
253 pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
262 let bytes = record.encode();
263 self.file.write_all(&bytes)?;
264
265 let record_lsn = self.current_lsn;
266 self.current_lsn += bytes.len() as u64;
267
268 self.ensure_preallocated()?;
269 Ok(record_lsn)
270 }
271
272 pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
284 self.file.write_all(bytes)?;
285 let record_lsn = self.current_lsn;
286 self.current_lsn += bytes.len() as u64;
287 self.ensure_preallocated()?;
288 Ok(record_lsn)
289 }
290
    /// Overwrites `current_lsn` with `lsn`.
    ///
    /// Only that one field changes: `durable_lsn`, the sync bookkeeping and
    /// the file itself are left untouched.
    pub fn set_current_lsn(&mut self, lsn: u64) {
        self.current_lsn = lsn;
    }
300
301 pub fn sync(&mut self) -> io::Result<()> {
309 self.file.flush()?;
310 self.sync_flushed_file()?;
311 self.durable_lsn = self.current_lsn;
312 Ok(())
313 }
314
315 pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
323 if self.durable_lsn >= target {
324 return Ok(());
325 }
326 self.file.flush()?;
327 self.sync_flushed_file()?;
328 self.durable_lsn = self.current_lsn;
329 Ok(())
330 }
331
332 fn sync_flushed_file(&mut self) -> io::Result<()> {
333 let method = self.next_sync_method();
334 match method {
335 WalSyncMethod::Data => self.file.get_ref().sync_data()?,
336 WalSyncMethod::All => self.file.get_ref().sync_all()?,
337 }
338 self.mark_sync_complete(method, self.current_lsn);
339 Ok(())
340 }
341
342 fn next_sync_method(&self) -> WalSyncMethod {
343 if !self.prealloc_metadata_dirty && self.current_lsn <= self.last_synced_size {
344 WalSyncMethod::Data
345 } else {
346 WalSyncMethod::All
347 }
348 }
349
350 fn mark_sync_complete(&mut self, method: WalSyncMethod, lsn: u64) {
351 match method {
352 WalSyncMethod::Data => {}
353 WalSyncMethod::All => {
354 self.last_synced_size = self.preallocated_to.max(lsn);
355 self.prealloc_metadata_dirty = false;
356 }
357 }
358 #[cfg(test)]
359 {
360 self.last_sync_method = Some(method);
361 }
362 }
363
    /// Offset up to which the writer has recorded a completed sync.
    pub fn durable_lsn(&self) -> u64 {
        self.durable_lsn
    }
370
    /// Offset at which the next appended record will start.
    pub fn current_lsn(&self) -> u64 {
        self.current_lsn
    }
375
376 pub(crate) fn drain_for_group_sync(&mut self) -> io::Result<WalGroupSync> {
396 self.file.flush()?;
398 Ok(WalGroupSync {
399 target_lsn: self.current_lsn,
400 sync_handle: Arc::clone(&self.sync_handle),
401 method: self.next_sync_method(),
402 })
403 }
404
405 pub(crate) fn mark_durable(&mut self, sync: &WalGroupSync) {
411 let lsn = sync.target_lsn;
412 if lsn > self.durable_lsn {
413 self.durable_lsn = lsn;
414 }
415 self.mark_sync_complete(sync.method, lsn);
416 }
417
418 pub fn truncate(&mut self) -> io::Result<()> {
426 self.file.flush()?;
430
431 {
432 let raw = self.file.get_mut();
433 raw.set_len(0)?;
434 raw.seek(SeekFrom::Start(0))?;
435 }
436
437 self.file.write_all(&reddb_file::encode_wal_file_header())?;
439 self.file.flush()?;
440 self.file.get_ref().sync_all()?;
441
442 self.current_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
443 self.durable_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
444 self.last_synced_size = reddb_file::WAL_FILE_HEADER_BYTES as u64;
445 self.prealloc_metadata_dirty = false;
446 #[cfg(test)]
447 {
448 self.last_sync_method = Some(WalSyncMethod::All);
449 }
450
451 self.preallocated_to = 0;
455 self.ensure_preallocated()?;
456 Ok(())
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use super::*;
463 use std::path::PathBuf;
464
465 struct FileGuard {
466 path: PathBuf,
467 }
468
469 impl Drop for FileGuard {
470 fn drop(&mut self) {
471 let _ = std::fs::remove_file(&self.path);
472 }
473 }
474
475 fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
476 let path = reddb_file::layout::wal_component_temp_path(
477 &std::env::temp_dir(),
478 "writer",
479 name,
480 std::process::id(),
481 );
482 let guard = FileGuard { path: path.clone() };
483 let _ = std::fs::remove_file(&path);
484 (guard, path)
485 }
486
487 #[test]
488 fn test_create_new_wal() {
489 let (_guard, path) = temp_wal("create");
490 let writer = WalWriter::open(&path).expect("open() should succeed");
491
492 assert_eq!(writer.current_lsn(), 8);
494 assert!(path.exists());
495 }
496
497 #[test]
498 fn test_append_record() {
499 let (_guard, path) = temp_wal("append");
500 let mut writer = WalWriter::open(&path).expect("open() should succeed");
501
502 let record = WalRecord::Begin { tx_id: 42 };
503 let lsn = writer.append(&record).expect("append() should succeed");
504
505 assert_eq!(lsn, 8);
507
508 assert_eq!(writer.current_lsn(), 8 + 21);
511 }
512
513 #[test]
514 fn test_append_multiple_records() {
515 let (_guard, path) = temp_wal("multi");
516 let mut writer = WalWriter::open(&path).expect("open() should succeed");
517
518 let lsn1 = writer
519 .append(&WalRecord::Begin { tx_id: 1 })
520 .expect("append() should succeed");
521 let lsn2 = writer
522 .append(&WalRecord::Begin { tx_id: 2 })
523 .expect("append() should succeed");
524 let lsn3 = writer
525 .append(&WalRecord::Commit { tx_id: 1 })
526 .expect("append() should succeed");
527
528 assert_eq!(lsn1, 8);
529 assert_eq!(lsn2, 8 + 21);
530 assert_eq!(lsn3, 8 + 21 + 21);
531 }
532
533 #[test]
534 fn test_page_write_lsn() {
535 let (_guard, path) = temp_wal("pagewrite");
536 let mut writer = WalWriter::open(&path).expect("open() should succeed");
537
538 let lsn1 = writer
540 .append(&WalRecord::Begin { tx_id: 1 })
541 .expect("append() should succeed");
542 assert_eq!(lsn1, 8);
543
544 let data = vec![1, 2, 3, 4, 5];
546 let lsn2 = writer
547 .append(&WalRecord::PageWrite {
548 tx_id: 1,
549 page_id: 100,
550 data: data.clone(),
551 })
552 .expect("value is present");
553
554 assert_eq!(lsn2, 8 + 21); assert_eq!(writer.current_lsn(), 8 + 21 + 34);
558 }
559
560 #[test]
561 fn test_sync() {
562 let (_guard, path) = temp_wal("sync");
563 let mut writer = WalWriter::open(&path).expect("open() should succeed");
564
565 writer
566 .append(&WalRecord::Begin { tx_id: 1 })
567 .expect("append() should succeed");
568 writer.sync().expect("sync() should succeed");
569
570 assert!(path.exists());
572 }
573
574 #[test]
575 fn test_truncate() {
576 let (_guard, path) = temp_wal("truncate");
577 let mut writer = WalWriter::open(&path).expect("open() should succeed");
578
579 writer
581 .append(&WalRecord::Begin { tx_id: 1 })
582 .expect("append() should succeed");
583 writer
584 .append(&WalRecord::PageWrite {
585 tx_id: 1,
586 page_id: 0,
587 data: vec![0; 100],
588 })
589 .expect("value is present");
590 writer
591 .append(&WalRecord::Commit { tx_id: 1 })
592 .expect("append() should succeed");
593
594 let lsn_before = writer.current_lsn();
595 assert!(lsn_before > 8);
596
597 writer.truncate().expect("truncate() should succeed");
599
600 assert_eq!(writer.current_lsn(), 8);
602
603 let len = std::fs::metadata(&path)
605 .expect("metadata() should succeed")
606 .len();
607 assert_eq!(len, 8);
608 }
609
610 #[test]
611 fn test_reopen_existing() {
612 let (_guard, path) = temp_wal("reopen");
613
614 let lsn_after_write;
616 {
617 let mut writer = WalWriter::open(&path).expect("open() should succeed");
618 writer
619 .append(&WalRecord::Begin { tx_id: 1 })
620 .expect("append() should succeed");
621 writer
622 .append(&WalRecord::Commit { tx_id: 1 })
623 .expect("append() should succeed");
624 lsn_after_write = writer.current_lsn();
625 }
626
627 {
629 let writer = WalWriter::open(&path).expect("open() should succeed");
630 assert_eq!(writer.current_lsn(), lsn_after_write);
632 }
633 }
634
635 #[test]
636 fn test_checkpoint_record() {
637 let (_guard, path) = temp_wal("checkpoint");
638 let mut writer = WalWriter::open(&path).expect("open() should succeed");
639
640 let lsn = writer
642 .append(&WalRecord::Checkpoint { lsn: 12345 })
643 .expect("value is present");
644 assert_eq!(lsn, 8);
645 assert_eq!(writer.current_lsn(), 8 + 21);
646 }
647
648 #[test]
653 fn fresh_wal_has_durable_lsn_at_header_end() {
654 let (_guard, path) = temp_wal("durable_init");
655 let writer = WalWriter::open(&path).expect("open() should succeed");
656 assert_eq!(writer.durable_lsn(), 8);
657 assert_eq!(writer.current_lsn(), 8);
658 }
659
660 #[test]
661 fn flush_until_below_durable_is_noop() {
662 let (_guard, path) = temp_wal("flush_noop");
663 let mut writer = WalWriter::open(&path).expect("open() should succeed");
664 let before = writer.durable_lsn();
666 writer.flush_until(0).expect("flush_until() should succeed");
667 writer.flush_until(8).expect("flush_until() should succeed");
668 assert_eq!(writer.durable_lsn(), before);
669 }
670
671 #[test]
672 fn flush_until_advances_durable_to_current() {
673 let (_guard, path) = temp_wal("flush_advance");
674 let mut writer = WalWriter::open(&path).expect("open() should succeed");
675 writer
676 .append(&WalRecord::Begin { tx_id: 7 })
677 .expect("append() should succeed");
678 writer
679 .append(&WalRecord::Commit { tx_id: 7 })
680 .expect("append() should succeed");
681 let target = writer.current_lsn();
682 assert_eq!(writer.durable_lsn(), 8);
684 writer
685 .flush_until(target)
686 .expect("flush_until() should succeed");
687 assert_eq!(writer.durable_lsn(), target);
688 }
689
690 #[test]
691 fn flush_until_is_monotonic() {
692 let (_guard, path) = temp_wal("flush_monotonic");
693 let mut writer = WalWriter::open(&path).expect("open() should succeed");
694 writer
695 .append(&WalRecord::Begin { tx_id: 1 })
696 .expect("append() should succeed");
697 let lo = writer.current_lsn();
698 writer
699 .flush_until(lo)
700 .expect("flush_until() should succeed");
701 let durable_after_lo = writer.durable_lsn();
702 writer
703 .append(&WalRecord::Commit { tx_id: 1 })
704 .expect("append() should succeed");
705 let hi = writer.current_lsn();
706 writer
707 .flush_until(hi)
708 .expect("flush_until() should succeed");
709 assert!(writer.durable_lsn() >= durable_after_lo);
710 writer
712 .flush_until(lo)
713 .expect("flush_until() should succeed");
714 assert_eq!(writer.durable_lsn(), hi);
715 }
716
717 #[test]
718 fn sync_advances_durable_lsn_too() {
719 let (_guard, path) = temp_wal("sync_durable");
720 let mut writer = WalWriter::open(&path).expect("open() should succeed");
721 writer
722 .append(&WalRecord::Begin { tx_id: 9 })
723 .expect("append() should succeed");
724 let before = writer.durable_lsn();
725 let after_append = writer.current_lsn();
726 assert!(after_append > before);
727 writer.sync().expect("sync() should succeed");
728 assert_eq!(writer.durable_lsn(), after_append);
729 }
730
731 #[test]
732 fn sync_all_is_used_when_wal_size_grew() {
733 let (_guard, path) = temp_wal("sync_all_grew");
734 let mut writer = WalWriter::open(&path).expect("open() should succeed");
735
736 writer
737 .append(&WalRecord::Begin { tx_id: 1 })
738 .expect("append() should succeed");
739 writer.sync().expect("sync() should succeed");
740
741 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
742 assert!(writer.last_synced_size >= writer.current_lsn());
743 assert!(!writer.prealloc_metadata_dirty);
744 }
745
746 #[test]
747 fn sync_all_is_used_for_metadata_only_preallocation() {
748 let (_guard, path) = temp_wal("sync_all_prealloc_metadata");
749 let mut writer = WalWriter::open(&path).expect("open() should succeed");
750 if !writer.prealloc_supported {
751 return;
752 }
753
754 assert_eq!(writer.current_lsn(), 8);
755 assert!(writer.prealloc_metadata_dirty);
756
757 writer.sync().expect("sync() should succeed");
758
759 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
760 assert_eq!(writer.last_synced_size, writer.preallocated_to);
761 assert!(!writer.prealloc_metadata_dirty);
762 }
763
764 #[test]
765 fn sync_data_is_used_when_wal_size_is_unchanged() {
766 let (_guard, path) = temp_wal("sync_data_unchanged");
767 let mut writer = WalWriter::open(&path).expect("open() should succeed");
768
769 writer
770 .append(&WalRecord::Begin { tx_id: 1 })
771 .expect("append() should succeed");
772 writer.sync().expect("sync() should succeed");
773 let synced_size = writer.last_synced_size;
774 writer.sync().expect("sync() should succeed");
775
776 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
777 assert_eq!(writer.last_synced_size, synced_size);
778 assert_eq!(writer.durable_lsn(), writer.current_lsn());
779 }
780
781 #[test]
782 fn sync_data_is_used_for_appends_within_synced_preallocation() {
783 let (_guard, path) = temp_wal("sync_data_preallocated_append");
784 let mut writer = WalWriter::open(&path).expect("open() should succeed");
785 if !writer.prealloc_supported {
786 return;
787 }
788
789 writer
790 .append(&WalRecord::Begin { tx_id: 1 })
791 .expect("append() should succeed");
792 writer.sync().expect("sync() should succeed");
793 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
794
795 writer
796 .append(&WalRecord::Commit { tx_id: 1 })
797 .expect("append() should succeed");
798 writer.sync().expect("sync() should succeed");
799
800 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
801 assert_eq!(writer.durable_lsn(), writer.current_lsn());
802 assert!(writer.current_lsn() <= writer.last_synced_size);
803 }
804
805 #[test]
806 fn group_sync_uses_sync_data_within_synced_preallocation() {
807 let (_guard, path) = temp_wal("group_sync_data_preallocated_append");
808 let mut writer = WalWriter::open(&path).expect("open() should succeed");
809 if !writer.prealloc_supported {
810 return;
811 }
812
813 writer
814 .append(&WalRecord::Begin { tx_id: 1 })
815 .expect("append() should succeed");
816 writer.sync().expect("sync() should succeed");
817 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::All));
818
819 writer
820 .append(&WalRecord::Commit { tx_id: 1 })
821 .expect("append() should succeed");
822 let sync = writer
823 .drain_for_group_sync()
824 .expect("drain_for_group_sync() should succeed");
825 assert_eq!(sync.method, WalSyncMethod::Data);
826 sync.sync().expect("sync() should succeed");
827 writer.mark_durable(&sync);
828
829 assert_eq!(writer.last_sync_method, Some(WalSyncMethod::Data));
830 assert_eq!(writer.durable_lsn(), writer.current_lsn());
831 }
832
833 #[test]
834 fn truncate_resets_durable_lsn() {
835 let (_guard, path) = temp_wal("truncate_durable");
836 let mut writer = WalWriter::open(&path).expect("open() should succeed");
837 writer
838 .append(&WalRecord::Begin { tx_id: 1 })
839 .expect("append() should succeed");
840 writer.sync().expect("sync() should succeed");
841 assert!(writer.durable_lsn() > 8);
842 writer.truncate().expect("truncate() should succeed");
843 assert_eq!(writer.durable_lsn(), 8);
844 assert_eq!(writer.current_lsn(), 8);
845 }
846
847 #[test]
848 fn reopen_initialises_durable_to_current() {
849 let (_guard, path) = temp_wal("reopen_durable");
850 {
851 let mut writer = WalWriter::open(&path).expect("open() should succeed");
852 writer
853 .append(&WalRecord::Begin { tx_id: 1 })
854 .expect("append() should succeed");
855 writer.sync().expect("sync() should succeed");
856 }
857 let writer = WalWriter::open(&path).expect("open() should succeed");
858 assert_eq!(writer.durable_lsn(), writer.current_lsn());
860 }
861
862 #[test]
867 fn bufwriter_coalesces_until_sync() {
868 let (_guard, path) = temp_wal("bufwriter_coalesce");
872 let mut writer = WalWriter::open(&path).expect("open() should succeed");
873 for tx in 0..100u64 {
874 writer
875 .append(&WalRecord::Begin { tx_id: tx })
876 .expect("append() should succeed");
877 }
878 assert_eq!(writer.current_lsn(), 8 + 100 * 21);
880 let on_disk = std::fs::metadata(&path)
882 .expect("metadata() should succeed")
883 .len();
884 assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
885 }
886
887 #[test]
888 fn sync_drains_bufwriter_before_fsync() {
889 let (_guard, path) = temp_wal("sync_drains");
892 let mut writer = WalWriter::open(&path).expect("open() should succeed");
893 for tx in 0..50u64 {
894 writer
895 .append(&WalRecord::Begin { tx_id: tx })
896 .expect("append() should succeed");
897 }
898 writer.sync().expect("sync() should succeed");
899 let on_disk = std::fs::metadata(&path)
900 .expect("metadata() should succeed")
901 .len();
902 assert_eq!(on_disk, writer.current_lsn());
903 assert_eq!(writer.durable_lsn(), writer.current_lsn());
904 }
905
906 #[test]
907 fn flush_until_drains_bufwriter_too() {
908 let (_guard, path) = temp_wal("flush_until_drains");
912 let mut writer = WalWriter::open(&path).expect("open() should succeed");
913 for tx in 0..30u64 {
914 writer
915 .append(&WalRecord::Begin { tx_id: tx })
916 .expect("append() should succeed");
917 }
918 let target = writer.current_lsn();
919 writer
920 .flush_until(target)
921 .expect("flush_until() should succeed");
922 let on_disk = std::fs::metadata(&path)
923 .expect("metadata() should succeed")
924 .len();
925 assert_eq!(on_disk, target);
926 assert_eq!(writer.durable_lsn(), target);
927 }
928
929 #[test]
930 fn truncate_drains_pending_bufwriter_bytes_first() {
931 let (_guard, path) = temp_wal("truncate_drain");
936 let mut writer = WalWriter::open(&path).expect("open() should succeed");
937 for tx in 0..200u64 {
940 writer
941 .append(&WalRecord::Begin { tx_id: tx })
942 .expect("append() should succeed");
943 }
944 assert_eq!(
946 std::fs::metadata(&path)
947 .expect("metadata() should succeed")
948 .len(),
949 8
950 );
951
952 writer.truncate().expect("truncate() should succeed");
953 let on_disk = std::fs::metadata(&path)
955 .expect("metadata() should succeed")
956 .len();
957 assert_eq!(on_disk, 8);
958 assert_eq!(writer.current_lsn(), 8);
959 assert_eq!(writer.durable_lsn(), 8);
960
961 writer
963 .append(&WalRecord::Begin { tx_id: 99 })
964 .expect("append() should succeed");
965 writer.sync().expect("sync() should succeed");
966 assert_eq!(
967 std::fs::metadata(&path)
968 .expect("metadata() should succeed")
969 .len(),
970 8 + 21
971 );
972 }
973
974 #[test]
975 fn reopen_sees_only_synced_records() {
976 let (_guard, path) = temp_wal("reopen_synced_only");
985 let synced_lsn;
986 {
987 let mut writer = WalWriter::open(&path).expect("open() should succeed");
988 writer
989 .append(&WalRecord::Begin { tx_id: 1 })
990 .expect("append() should succeed");
991 writer.sync().expect("sync() should succeed");
992 synced_lsn = writer.current_lsn();
993 for tx in 100..120u64 {
996 writer
997 .append(&WalRecord::Begin { tx_id: tx })
998 .expect("append() should succeed");
999 }
1000 }
1006 let writer = WalWriter::open(&path).expect("open() should succeed");
1007 assert!(writer.durable_lsn() >= synced_lsn);
1012 }
1013
1014 fn allocated_bytes(path: &std::path::Path) -> u64 {
1021 use fs2::FileExt;
1022 let f = std::fs::File::open(path).expect("open() should succeed");
1023 f.allocated_size().expect("allocated_size() should succeed")
1024 }
1025
1026 #[test]
1027 fn segment_boundary_rounds_strictly_up() {
1028 assert_eq!(
1031 reddb_file::next_main_wal_segment_boundary(0),
1032 reddb_file::MAIN_WAL_SEGMENT_BYTES
1033 );
1034 assert_eq!(
1035 reddb_file::next_main_wal_segment_boundary(8),
1036 reddb_file::MAIN_WAL_SEGMENT_BYTES
1037 );
1038 assert_eq!(
1039 reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES - 1),
1040 reddb_file::MAIN_WAL_SEGMENT_BYTES
1041 );
1042 assert_eq!(
1044 reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES),
1045 2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
1046 );
1047 assert_eq!(
1048 reddb_file::next_main_wal_segment_boundary(reddb_file::MAIN_WAL_SEGMENT_BYTES + 1),
1049 2 * reddb_file::MAIN_WAL_SEGMENT_BYTES
1050 );
1051 }
1052
1053 #[test]
1054 fn open_preallocates_first_segment() {
1055 let (_guard, path) = temp_wal("prealloc_open");
1058 let writer = WalWriter::open(&path).expect("open() should succeed");
1059 if !writer.prealloc_supported {
1060 return; }
1062 assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
1063 assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
1066 assert_eq!(
1067 std::fs::metadata(&path)
1068 .expect("metadata() should succeed")
1069 .len(),
1070 8
1071 );
1072 }
1073
1074 #[test]
1075 fn preallocation_does_not_grow_logical_length() {
1076 let (_guard, path) = temp_wal("prealloc_logical");
1082 let mut writer = WalWriter::open(&path).expect("open() should succeed");
1083 for tx in 0..50u64 {
1084 writer
1085 .append(&WalRecord::Begin { tx_id: tx })
1086 .expect("append() should succeed");
1087 }
1088 writer.sync().expect("sync() should succeed");
1089 let logical = std::fs::metadata(&path)
1090 .expect("metadata() should succeed")
1091 .len();
1092 assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
1093 assert_eq!(writer.current_lsn(), logical);
1094 }
1095
1096 #[test]
1097 fn truncate_re_extends_a_fresh_segment() {
1098 let (_guard, path) = temp_wal("prealloc_truncate");
1101 let mut writer = WalWriter::open(&path).expect("open() should succeed");
1102 writer
1103 .append(&WalRecord::Begin { tx_id: 1 })
1104 .expect("append() should succeed");
1105 writer.sync().expect("sync() should succeed");
1106
1107 writer.truncate().expect("truncate() should succeed");
1108
1109 assert_eq!(writer.current_lsn(), 8);
1110 assert_eq!(
1111 std::fs::metadata(&path)
1112 .expect("metadata() should succeed")
1113 .len(),
1114 8
1115 );
1116 if writer.prealloc_supported {
1117 assert_eq!(writer.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
1118 assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
1119 }
1120 }
1121
1122 #[test]
1123 fn preallocated_wal_recovers_records_without_trailing_garbage() {
1124 use super::super::reader::WalReader;
1128 let (_guard, path) = temp_wal("prealloc_recover");
1129 {
1130 let mut writer = WalWriter::open(&path).expect("open() should succeed");
1131 writer
1132 .append(&WalRecord::Begin { tx_id: 1 })
1133 .expect("append() should succeed");
1134 writer
1135 .append(&WalRecord::PageWrite {
1136 tx_id: 1,
1137 page_id: 7,
1138 data: vec![1, 2, 3, 4],
1139 })
1140 .expect("value is present");
1141 writer
1142 .append(&WalRecord::Commit { tx_id: 1 })
1143 .expect("append() should succeed");
1144 writer.sync().expect("sync() should succeed");
1145 }
1146 let records: Vec<_> = WalReader::open(&path)
1147 .expect("value is present")
1148 .iter()
1149 .collect::<Result<_, _>>()
1150 .expect("reader must stop cleanly at real EOF, not in reserved tail");
1151 assert_eq!(records.len(), 3);
1152 assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
1153 assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
1154 }
1155}