1use super::record::WalRecord;
2use std::fs::{File, OpenOptions};
3use std::io::{self, BufWriter, Seek, SeekFrom, Write};
4use std::path::Path;
5use std::sync::Arc;
6
/// Capacity in bytes (64 KiB) of the `BufWriter` that `WalWriter::open`
/// wraps around the WAL file.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
15
/// Which `File` sync call a WAL sync should issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalSyncMethod {
    /// Sync through `File::sync_data`.
    Data,
    /// Sync through `File::sync_all`.
    All,
}
21
/// Ticket for a sync that is performed through a shared file handle.
///
/// It bundles the LSN the sync covers, the handle to sync, and the sync
/// method that was selected when the ticket was created.
pub(crate) struct WalGroupSync {
    /// LSN that is covered once `sync` has returned successfully.
    target_lsn: u64,
    /// Shared handle on which the sync call is issued.
    sync_handle: Arc<File>,
    /// Sync call (`sync_data` or `sync_all`) to issue on `sync_handle`.
    method: WalSyncMethod,
}
27
28impl WalGroupSync {
29 pub(crate) fn target_lsn(&self) -> u64 {
30 self.target_lsn
31 }
32
33 pub(crate) fn sync(&self) -> io::Result<()> {
34 match self.method {
35 WalSyncMethod::Data => self.sync_handle.sync_data(),
36 WalSyncMethod::All => self.sync_handle.sync_all(),
37 }
38 }
39}
40
41#[cfg(target_os = "linux")]
54fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
55 use std::os::unix::io::AsRawFd;
56 if len == 0 {
57 return Ok(());
58 }
59 let ret = unsafe {
62 libc::fallocate(
63 file.as_raw_fd(),
64 libc::FALLOC_FL_KEEP_SIZE,
65 offset as libc::off_t,
66 len as libc::off_t,
67 )
68 };
69 if ret == 0 {
70 Ok(())
71 } else {
72 Err(io::Error::last_os_error())
73 }
74}
75
/// Fallback for targets other than Linux: block reservation is not
/// implemented, so every call fails with `ErrorKind::Unsupported`.
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "WAL preallocation is only implemented on linux",
    );
    Err(unsupported)
}
83
84fn fallocate_unsupported(err: &io::Error) -> bool {
89 if err.kind() == io::ErrorKind::Unsupported {
90 return true;
91 }
92 #[cfg(target_os = "linux")]
93 {
94 matches!(
95 err.raw_os_error(),
96 Some(libc::EOPNOTSUPP) | Some(libc::ENOSYS) | Some(libc::EINVAL)
97 )
98 }
99 #[cfg(not(target_os = "linux"))]
100 {
101 false
102 }
103}
104
/// Buffered appender for a single WAL file that tracks how much of the log
/// has been appended and how much has been synced.
pub struct WalWriter {
    /// Buffered writer (capacity `WAL_BUFFER_BYTES`) over the WAL file.
    file: BufWriter<File>,
    /// Clone of the WAL file handle (`File::try_clone`), shared with
    /// `WalGroupSync` tickets.
    sync_handle: Arc<File>,
    /// Offset at which the next record will be placed; advanced by the
    /// encoded length of every appended record.
    current_lsn: u64,
    /// Highest LSN recorded as covered by a completed sync.
    durable_lsn: u64,
    /// Upper bound recorded after the last `sync_all`; a sync whose
    /// `current_lsn` does not exceed it may use `sync_data`.
    last_synced_size: u64,
    /// End offset of the block range reserved so far (0 = nothing reserved).
    preallocated_to: u64,
    /// Cleared once a reservation attempt reports that it is unsupported.
    prealloc_supported: bool,
    /// Set when a new reservation succeeded; cleared by the next `sync_all`.
    prealloc_metadata_dirty: bool,
    /// Sync method used by the most recent sync (test builds only).
    #[cfg(test)]
    last_sync_method: Option<WalSyncMethod>,
}
165
166impl WalWriter {
167 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
169 let exists = path.as_ref().exists();
170
171 let mut raw = OpenOptions::new()
175 .read(true)
176 .create(true)
177 .append(true)
178 .open(path)?;
179
180 let current_lsn = if !exists || raw.metadata()?.len() == 0 {
181 raw.write_all(&reddb_file::encode_wal_file_header())?;
182 raw.sync_all()?;
183 reddb_file::WAL_FILE_HEADER_BYTES as u64
184 } else {
185 raw.seek(SeekFrom::End(0))?
189 };
190
191 let sync_handle = Arc::new(raw.try_clone()?);
197 let file = BufWriter::with_capacity(WAL_BUFFER_BYTES, raw);
198
199 let mut writer = Self {
203 file,
204 sync_handle,
205 current_lsn,
206 durable_lsn: current_lsn,
207 last_synced_size: current_lsn,
208 preallocated_to: 0,
209 prealloc_supported: true,
210 prealloc_metadata_dirty: false,
211 #[cfg(test)]
212 last_sync_method: None,
213 };
214 writer.ensure_preallocated()?;
217 Ok(writer)
218 }
219
220 fn ensure_preallocated(&mut self) -> io::Result<()> {
231 if !self.prealloc_supported {
232 return Ok(());
233 }
234 let target = reddb_file::next_main_wal_segment_boundary(self.current_lsn);
235 if target <= self.preallocated_to {
236 return Ok(());
237 }
238 let from = self.preallocated_to;
239 match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
240 Ok(()) => {
241 self.preallocated_to = target;
242 self.prealloc_metadata_dirty = true;
243 }
244 Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
245 Err(_) => {
246 }
249 }
250 Ok(())
251 }
252
253 pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
262 let bytes = record.encode();
263 self.file.write_all(&bytes)?;
264
265 let record_lsn = self.current_lsn;
266 self.current_lsn += bytes.len() as u64;
267
268 self.ensure_preallocated()?;
269 Ok(record_lsn)
270 }
271
272 pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
284 self.file.write_all(bytes)?;
285 let record_lsn = self.current_lsn;
286 self.current_lsn += bytes.len() as u64;
287 self.ensure_preallocated()?;
288 Ok(record_lsn)
289 }
290
291 pub fn set_current_lsn(&mut self, lsn: u64) {
298 self.current_lsn = lsn;
299 }
300
301 pub fn sync(&mut self) -> io::Result<()> {
309 self.file.flush()?;
310 self.sync_flushed_file()?;
311 self.durable_lsn = self.current_lsn;
312 Ok(())
313 }
314
315 pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
323 if self.durable_lsn >= target {
324 return Ok(());
325 }
326 self.file.flush()?;
327 self.sync_flushed_file()?;
328 self.durable_lsn = self.current_lsn;
329 Ok(())
330 }
331
332 fn sync_flushed_file(&mut self) -> io::Result<()> {
333 let method = self.next_sync_method();
334 match method {
335 WalSyncMethod::Data => self.file.get_ref().sync_data()?,
336 WalSyncMethod::All => self.file.get_ref().sync_all()?,
337 }
338 self.mark_sync_complete(method, self.current_lsn);
339 Ok(())
340 }
341
342 fn next_sync_method(&self) -> WalSyncMethod {
343 if !self.prealloc_metadata_dirty && self.current_lsn <= self.last_synced_size {
344 WalSyncMethod::Data
345 } else {
346 WalSyncMethod::All
347 }
348 }
349
350 fn mark_sync_complete(&mut self, method: WalSyncMethod, lsn: u64) {
351 match method {
352 WalSyncMethod::Data => {}
353 WalSyncMethod::All => {
354 self.last_synced_size = self.preallocated_to.max(lsn);
355 self.prealloc_metadata_dirty = false;
356 }
357 }
358 #[cfg(test)]
359 {
360 self.last_sync_method = Some(method);
361 }
362 }
363
364 pub fn durable_lsn(&self) -> u64 {
368 self.durable_lsn
369 }
370
371 pub fn current_lsn(&self) -> u64 {
373 self.current_lsn
374 }
375
376 pub(crate) fn drain_for_group_sync(&mut self) -> io::Result<WalGroupSync> {
396 self.file.flush()?;
398 Ok(WalGroupSync {
399 target_lsn: self.current_lsn,
400 sync_handle: Arc::clone(&self.sync_handle),
401 method: self.next_sync_method(),
402 })
403 }
404
405 pub(crate) fn mark_durable(&mut self, sync: &WalGroupSync) {
411 let lsn = sync.target_lsn;
412 if lsn > self.durable_lsn {
413 self.durable_lsn = lsn;
414 }
415 self.mark_sync_complete(sync.method, lsn);
416 }
417
418 pub fn truncate(&mut self) -> io::Result<()> {
426 self.file.flush()?;
430
431 {
432 let raw = self.file.get_mut();
433 raw.set_len(0)?;
434 raw.seek(SeekFrom::Start(0))?;
435 }
436
437 self.file.write_all(&reddb_file::encode_wal_file_header())?;
439 self.file.flush()?;
440 self.file.get_ref().sync_all()?;
441
442 self.current_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
443 self.durable_lsn = reddb_file::WAL_FILE_HEADER_BYTES as u64;
444 self.last_synced_size = reddb_file::WAL_FILE_HEADER_BYTES as u64;
445 self.prealloc_metadata_dirty = false;
446 #[cfg(test)]
447 {
448 self.last_sync_method = Some(WalSyncMethod::All);
449 }
450
451 self.preallocated_to = 0;
455 self.ensure_preallocated()?;
456 Ok(())
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Header size in bytes that these tests expect at the start of a WAL file.
    const HEADER_BYTES: u64 = 8;
    /// Encoded size in bytes that these tests expect for a `WalRecord::Begin`.
    const BEGIN_BYTES: u64 = 21;

    /// Removes the temporary WAL file when the test ends.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-process temp path for `name`, removes any stale file at
    /// that path and returns a cleanup guard together with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let tmp_dir = std::env::temp_dir();
        let pid = std::process::id();
        let path = reddb_file::layout::wal_component_temp_path(&tmp_dir, "writer", name, pid);
        let _ = std::fs::remove_file(&path);
        let guard = FileGuard { path: path.clone() };
        (guard, path)
    }

    /// Appends one `Begin` record per transaction id in `tx_ids`.
    fn append_begins(writer: &mut WalWriter, tx_ids: std::ops::Range<u64>) {
        for tx_id in tx_ids {
            writer.append(&WalRecord::Begin { tx_id }).unwrap();
        }
    }

    /// Length of the file at `path` as reported by its metadata.
    fn file_len(path: &std::path::Path) -> u64 {
        std::fs::metadata(path).unwrap().len()
    }

    #[test]
    fn test_create_new_wal() {
        let (_guard, path) = temp_wal("create");
        let wal = WalWriter::open(&path).unwrap();

        assert_eq!(wal.current_lsn(), HEADER_BYTES);
        assert!(path.exists());
    }

    #[test]
    fn test_append_record() {
        let (_guard, path) = temp_wal("append");
        let mut wal = WalWriter::open(&path).unwrap();

        let begin = WalRecord::Begin { tx_id: 42 };
        let start = wal.append(&begin).unwrap();

        assert_eq!(start, HEADER_BYTES);
        assert_eq!(wal.current_lsn(), HEADER_BYTES + BEGIN_BYTES);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let mut wal = WalWriter::open(&path).unwrap();

        let first = wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let second = wal.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
        let third = wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        assert_eq!(first, HEADER_BYTES);
        assert_eq!(second, HEADER_BYTES + BEGIN_BYTES);
        assert_eq!(third, HEADER_BYTES + BEGIN_BYTES + BEGIN_BYTES);
    }

    #[test]
    fn test_page_write_lsn() {
        let (_guard, path) = temp_wal("pagewrite");
        let mut wal = WalWriter::open(&path).unwrap();

        let begin_lsn = wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        assert_eq!(begin_lsn, HEADER_BYTES);

        let page_write = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 100,
            data: vec![1, 2, 3, 4, 5],
        };
        let page_lsn = wal.append(&page_write).unwrap();

        assert_eq!(page_lsn, HEADER_BYTES + BEGIN_BYTES);
        // The five-byte page write is expected to occupy 34 bytes.
        assert_eq!(wal.current_lsn(), HEADER_BYTES + BEGIN_BYTES + 34);
    }

    #[test]
    fn test_sync() {
        let (_guard, path) = temp_wal("sync");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();

        assert!(path.exists());
    }

    #[test]
    fn test_truncate() {
        let (_guard, path) = temp_wal("truncate");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let page_write = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: vec![0; 100],
        };
        wal.append(&page_write).unwrap();
        wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();

        let lsn_before = wal.current_lsn();
        assert!(lsn_before > HEADER_BYTES);

        wal.truncate().unwrap();

        assert_eq!(wal.current_lsn(), HEADER_BYTES);
        assert_eq!(file_len(&path), HEADER_BYTES);
    }

    #[test]
    fn test_reopen_existing() {
        let (_guard, path) = temp_wal("reopen");

        // First writer is dropped at the end of the block.
        let lsn_after_write = {
            let mut wal = WalWriter::open(&path).unwrap();
            wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal.current_lsn()
        };

        let reopened = WalWriter::open(&path).unwrap();
        assert_eq!(reopened.current_lsn(), lsn_after_write);
    }

    #[test]
    fn test_checkpoint_record() {
        let (_guard, path) = temp_wal("checkpoint");
        let mut wal = WalWriter::open(&path).unwrap();

        let checkpoint = WalRecord::Checkpoint { lsn: 12345 };
        let start = wal.append(&checkpoint).unwrap();

        assert_eq!(start, HEADER_BYTES);
        assert_eq!(wal.current_lsn(), HEADER_BYTES + 21);
    }

    #[test]
    fn fresh_wal_has_durable_lsn_at_header_end() {
        let (_guard, path) = temp_wal("durable_init");
        let wal = WalWriter::open(&path).unwrap();

        assert_eq!(wal.durable_lsn(), HEADER_BYTES);
        assert_eq!(wal.current_lsn(), HEADER_BYTES);
    }

    #[test]
    fn flush_until_below_durable_is_noop() {
        let (_guard, path) = temp_wal("flush_noop");
        let mut wal = WalWriter::open(&path).unwrap();
        let durable_before = wal.durable_lsn();

        for target in [0, HEADER_BYTES] {
            wal.flush_until(target).unwrap();
        }

        assert_eq!(wal.durable_lsn(), durable_before);
    }

    #[test]
    fn flush_until_advances_durable_to_current() {
        let (_guard, path) = temp_wal("flush_advance");
        let mut wal = WalWriter::open(&path).unwrap();
        wal.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
        wal.append(&WalRecord::Commit { tx_id: 7 }).unwrap();

        let target = wal.current_lsn();
        assert_eq!(wal.durable_lsn(), HEADER_BYTES);

        wal.flush_until(target).unwrap();
        assert_eq!(wal.durable_lsn(), target);
    }

    #[test]
    fn flush_until_is_monotonic() {
        let (_guard, path) = temp_wal("flush_monotonic");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lo = wal.current_lsn();
        wal.flush_until(lo).unwrap();
        let durable_after_lo = wal.durable_lsn();

        wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let hi = wal.current_lsn();
        wal.flush_until(hi).unwrap();
        assert!(wal.durable_lsn() >= durable_after_lo);

        // Asking for an older target must not move durability backwards.
        wal.flush_until(lo).unwrap();
        assert_eq!(wal.durable_lsn(), hi);
    }

    #[test]
    fn sync_advances_durable_lsn_too() {
        let (_guard, path) = temp_wal("sync_durable");
        let mut wal = WalWriter::open(&path).unwrap();
        wal.append(&WalRecord::Begin { tx_id: 9 }).unwrap();

        let durable_before = wal.durable_lsn();
        let appended_to = wal.current_lsn();
        assert!(appended_to > durable_before);

        wal.sync().unwrap();
        assert_eq!(wal.durable_lsn(), appended_to);
    }

    #[test]
    fn sync_all_is_used_when_wal_size_grew() {
        let (_guard, path) = temp_wal("sync_all_grew");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();

        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::All));
        assert!(wal.last_synced_size >= wal.current_lsn());
        assert!(!wal.prealloc_metadata_dirty);
    }

    #[test]
    fn sync_all_is_used_for_metadata_only_preallocation() {
        let (_guard, path) = temp_wal("sync_all_prealloc_metadata");
        let mut wal = WalWriter::open(&path).unwrap();
        if !wal.prealloc_supported {
            return;
        }

        assert_eq!(wal.current_lsn(), HEADER_BYTES);
        assert!(wal.prealloc_metadata_dirty);

        wal.sync().unwrap();

        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::All));
        assert_eq!(wal.last_synced_size, wal.preallocated_to);
        assert!(!wal.prealloc_metadata_dirty);
    }

    #[test]
    fn sync_data_is_used_when_wal_size_is_unchanged() {
        let (_guard, path) = temp_wal("sync_data_unchanged");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();
        let size_after_first_sync = wal.last_synced_size;
        wal.sync().unwrap();

        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::Data));
        assert_eq!(wal.last_synced_size, size_after_first_sync);
        assert_eq!(wal.durable_lsn(), wal.current_lsn());
    }

    #[test]
    fn sync_data_is_used_for_appends_within_synced_preallocation() {
        let (_guard, path) = temp_wal("sync_data_preallocated_append");
        let mut wal = WalWriter::open(&path).unwrap();
        if !wal.prealloc_supported {
            return;
        }

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();
        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::All));

        wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        wal.sync().unwrap();

        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::Data));
        assert_eq!(wal.durable_lsn(), wal.current_lsn());
        assert!(wal.current_lsn() <= wal.last_synced_size);
    }

    #[test]
    fn group_sync_uses_sync_data_within_synced_preallocation() {
        let (_guard, path) = temp_wal("group_sync_data_preallocated_append");
        let mut wal = WalWriter::open(&path).unwrap();
        if !wal.prealloc_supported {
            return;
        }

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();
        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::All));

        wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let ticket = wal.drain_for_group_sync().unwrap();
        assert_eq!(ticket.method, WalSyncMethod::Data);
        ticket.sync().unwrap();
        wal.mark_durable(&ticket);

        assert_eq!(wal.last_sync_method, Some(WalSyncMethod::Data));
        assert_eq!(wal.durable_lsn(), wal.current_lsn());
    }

    #[test]
    fn truncate_resets_durable_lsn() {
        let (_guard, path) = temp_wal("truncate_durable");
        let mut wal = WalWriter::open(&path).unwrap();

        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();
        assert!(wal.durable_lsn() > HEADER_BYTES);

        wal.truncate().unwrap();
        assert_eq!(wal.durable_lsn(), HEADER_BYTES);
        assert_eq!(wal.current_lsn(), HEADER_BYTES);
    }

    #[test]
    fn reopen_initialises_durable_to_current() {
        let (_guard, path) = temp_wal("reopen_durable");
        {
            let mut first = WalWriter::open(&path).unwrap();
            first.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            first.sync().unwrap();
        }

        let reopened = WalWriter::open(&path).unwrap();
        assert_eq!(reopened.durable_lsn(), reopened.current_lsn());
    }

    #[test]
    fn bufwriter_coalesces_until_sync() {
        let (_guard, path) = temp_wal("bufwriter_coalesce");
        let mut wal = WalWriter::open(&path).unwrap();

        append_begins(&mut wal, 0..100);
        assert_eq!(wal.current_lsn(), HEADER_BYTES + 100 * BEGIN_BYTES);

        // Nothing was synced or flushed, so only the header is in the file.
        let on_disk = file_len(&path);
        assert_eq!(
            on_disk, HEADER_BYTES,
            "BufWriter leaked bytes to disk before sync"
        );
    }

    #[test]
    fn sync_drains_bufwriter_before_fsync() {
        let (_guard, path) = temp_wal("sync_drains");
        let mut wal = WalWriter::open(&path).unwrap();

        append_begins(&mut wal, 0..50);
        wal.sync().unwrap();

        assert_eq!(file_len(&path), wal.current_lsn());
        assert_eq!(wal.durable_lsn(), wal.current_lsn());
    }

    #[test]
    fn flush_until_drains_bufwriter_too() {
        let (_guard, path) = temp_wal("flush_until_drains");
        let mut wal = WalWriter::open(&path).unwrap();

        append_begins(&mut wal, 0..30);
        let target = wal.current_lsn();
        wal.flush_until(target).unwrap();

        assert_eq!(file_len(&path), target);
        assert_eq!(wal.durable_lsn(), target);
    }

    #[test]
    fn truncate_drains_pending_bufwriter_bytes_first() {
        let (_guard, path) = temp_wal("truncate_drain");
        let mut wal = WalWriter::open(&path).unwrap();

        append_begins(&mut wal, 0..200);
        assert_eq!(file_len(&path), HEADER_BYTES);

        wal.truncate().unwrap();
        assert_eq!(file_len(&path), HEADER_BYTES);
        assert_eq!(wal.current_lsn(), HEADER_BYTES);
        assert_eq!(wal.durable_lsn(), HEADER_BYTES);

        // A record appended after the truncate must land right after the header.
        wal.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
        wal.sync().unwrap();
        assert_eq!(file_len(&path), HEADER_BYTES + BEGIN_BYTES);
    }

    #[test]
    fn reopen_sees_only_synced_records() {
        let (_guard, path) = temp_wal("reopen_synced_only");

        let synced_lsn = {
            let mut wal = WalWriter::open(&path).unwrap();
            wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            wal.sync().unwrap();
            let synced = wal.current_lsn();
            // These records are appended but never explicitly synced.
            append_begins(&mut wal, 100..120);
            synced
        };

        let reopened = WalWriter::open(&path).unwrap();
        assert!(reopened.durable_lsn() >= synced_lsn);
    }

    /// Bytes actually allocated on disk for the file at `path`.
    fn allocated_bytes(path: &std::path::Path) -> u64 {
        use fs2::FileExt;
        let handle = std::fs::File::open(path).unwrap();
        handle.allocated_size().unwrap()
    }

    #[test]
    fn segment_boundary_rounds_strictly_up() {
        let segment = reddb_file::MAIN_WAL_SEGMENT_BYTES;
        // (input LSN, expected boundary)
        let cases = [
            (0, segment),
            (8, segment),
            (segment - 1, segment),
            (segment, 2 * segment),
            (segment + 1, 2 * segment),
        ];
        for (lsn, expected) in cases {
            assert_eq!(reddb_file::next_main_wal_segment_boundary(lsn), expected);
        }
    }

    #[test]
    fn open_preallocates_first_segment() {
        let (_guard, path) = temp_wal("prealloc_open");
        let wal = WalWriter::open(&path).unwrap();
        if !wal.prealloc_supported {
            return;
        }

        assert_eq!(wal.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
        assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
        assert_eq!(file_len(&path), HEADER_BYTES);
    }

    #[test]
    fn preallocation_does_not_grow_logical_length() {
        let (_guard, path) = temp_wal("prealloc_logical");
        let mut wal = WalWriter::open(&path).unwrap();

        append_begins(&mut wal, 0..50);
        wal.sync().unwrap();

        let logical = file_len(&path);
        assert_eq!(
            logical,
            HEADER_BYTES + 50 * BEGIN_BYTES,
            "preallocation inflated i_size"
        );
        assert_eq!(wal.current_lsn(), logical);
    }

    #[test]
    fn truncate_re_extends_a_fresh_segment() {
        let (_guard, path) = temp_wal("prealloc_truncate");
        let mut wal = WalWriter::open(&path).unwrap();
        wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        wal.sync().unwrap();

        wal.truncate().unwrap();

        assert_eq!(wal.current_lsn(), HEADER_BYTES);
        assert_eq!(file_len(&path), HEADER_BYTES);
        if wal.prealloc_supported {
            assert_eq!(wal.preallocated_to, reddb_file::MAIN_WAL_SEGMENT_BYTES);
            assert!(allocated_bytes(&path) >= reddb_file::MAIN_WAL_SEGMENT_BYTES);
        }
    }

    #[test]
    fn preallocated_wal_recovers_records_without_trailing_garbage() {
        use super::super::reader::WalReader;

        let (_guard, path) = temp_wal("prealloc_recover");
        {
            let mut wal = WalWriter::open(&path).unwrap();
            wal.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            let page_write = WalRecord::PageWrite {
                tx_id: 1,
                page_id: 7,
                data: vec![1, 2, 3, 4],
            };
            wal.append(&page_write).unwrap();
            wal.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            wal.sync().unwrap();
        }

        let recovered: Vec<_> = WalReader::open(&path)
            .unwrap()
            .iter()
            .collect::<Result<_, _>>()
            .expect("reader must stop cleanly at real EOF, not in reserved tail");

        assert_eq!(recovered.len(), 3);
        assert_eq!(recovered[0].1, WalRecord::Begin { tx_id: 1 });
        assert_eq!(recovered[2].1, WalRecord::Commit { tx_id: 1 });
    }
}