1use std::fs;
30#[cfg(test)]
31use std::fs::OpenOptions;
32use std::path::Path;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::sync::{Arc, Mutex, Weak};
35use std::thread::{self, JoinHandle};
36use std::time::Duration;
37
38use lora_store::MutationEvent;
39
40use crate::config::SyncMode;
41use crate::dir::{SegmentDir, SegmentId};
42use crate::error::WalError;
43use crate::lock::DirLock;
44use crate::lsn::Lsn;
45use crate::record::WalRecord;
46use crate::replay::{replay_segments, ReplayOutcome};
47use crate::segment::SegmentWriter;
48
/// Mutable WAL state, guarded by `Wal::state`.
struct WalState {
    // Next LSN to hand out; monotonically increasing, 1-based.
    next_lsn: Lsn,
    // Highest LSN known durable under the configured sync mode.
    durable_lsn: Lsn,
    // Id of the segment currently receiving appends.
    active_segment_id: SegmentId,
    // Writer over the active segment's file.
    active_writer: SegmentWriter,
    // Lowest segment id still present on disk; advanced by `truncate_up_to`.
    oldest_segment_id: SegmentId,
}
60
61type BgFailure = Mutex<Option<String>>;
69
/// How `Wal::flush_inner` should persist buffered records.
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Honor the configured `SyncMode`: fsync for `PerCommit`,
    /// buffer-only flush for `Group` and `None`.
    PerConfiguredMode,
    /// Always fsync, regardless of the configured mode.
    ForceFsync,
}
83
/// Append-only write-ahead log over a directory of segment files.
pub struct Wal {
    // Segment directory layout helper (paths, listing, dir fsync).
    segments: SegmentDir,
    // Durability policy chosen at open time.
    sync_mode: SyncMode,
    // Rotation threshold: a new segment is started once the active one
    // reaches this many bytes (checked at `begin()` boundaries).
    segment_target_bytes: u64,
    // All mutable state behind one lock; appends are serialized through it.
    state: Mutex<WalState>,
    // First background-flush failure; once set, the WAL is poisoned.
    // Shared with the flusher thread via `Arc`.
    bg_failure: Arc<BgFailure>,
    // Group-mode background flusher handle; joined when the WAL drops.
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    // Held for the WAL's lifetime to keep the directory exclusively ours.
    _dir_lock: DirLock,
}
110
111impl Wal {
112 pub fn open(
123 dir: impl Into<std::path::PathBuf>,
124 sync_mode: SyncMode,
125 segment_target_bytes: u64,
126 checkpoint_lsn: Lsn,
127 ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
128 let segments = SegmentDir::new(dir);
129 fs::create_dir_all(segments.root())?;
130 let dir_lock = DirLock::acquire(segments.root())?;
131
132 let entries = segments.list()?;
133 let (active_id, active_writer, replay) = if entries.is_empty() {
134 Self::open_fresh(&segments)?
135 } else {
136 Self::open_existing(&segments, &entries, checkpoint_lsn)?
137 };
138
139 let next_lsn = if replay.max_lsn.is_zero() {
140 Lsn::new(1)
141 } else {
142 replay.max_lsn.next()
143 };
144 let durable_lsn = replay.max_lsn;
150
151 let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);
152
153 let state = WalState {
154 next_lsn,
155 durable_lsn,
156 active_segment_id: active_id,
157 active_writer,
158 oldest_segment_id,
159 };
160
161 let wal = Arc::new(Self {
162 segments,
163 sync_mode,
164 segment_target_bytes,
165 state: Mutex::new(state),
166 bg_failure: Arc::new(Mutex::new(None)),
167 _flusher: Mutex::new(None),
168 _dir_lock: dir_lock,
169 });
170
171 if let SyncMode::Group { interval_ms } = sync_mode {
176 let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
177 let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
178 *wal._flusher.lock().unwrap() = Some(handle);
179 }
180
181 Ok((wal, replay.committed_events))
182 }
183
184 fn open_fresh(
187 segments: &SegmentDir,
188 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
189 let id = SegmentId::FIRST;
190 let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
191 segments.sync_dir()?;
192 let replay = ReplayOutcome {
193 committed_events: Vec::new(),
194 max_lsn: Lsn::ZERO,
195 torn_tail: None,
196 checkpoint_lsn_observed: None,
197 };
198 Ok((id, writer, replay))
199 }
200
201 fn open_existing(
205 segments: &SegmentDir,
206 entries: &[crate::dir::SegmentEntry],
207 checkpoint_lsn: Lsn,
208 ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
209 let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
210 let replay = replay_segments(&paths, checkpoint_lsn)?;
211
212 let active = entries.last().expect("entries non-empty in open_existing");
216 let (mut writer, _torn_from_writer) =
217 SegmentWriter::open_for_append(segments.path_for(active.id))?;
218
219 if let Some(t) = &replay.torn_tail {
223 if t.segment_path == active.path {
224 writer.truncate_to(t.last_good_offset)?;
225 } else {
226 return Err(WalError::Malformed(format!(
227 "torn tail found in sealed segment {}",
228 t.segment_path.display()
229 )));
230 }
231 }
232
233 Ok((active.id, writer, replay))
234 }
235
236 pub fn dir(&self) -> &Path {
237 self.segments.root()
238 }
239
240 pub fn sync_mode(&self) -> SyncMode {
241 self.sync_mode
242 }
243
244 pub fn durable_lsn(&self) -> Lsn {
245 self.state.lock().unwrap().durable_lsn
246 }
247
248 pub fn bg_failure(&self) -> Option<String> {
255 self.bg_failure.lock().unwrap().clone()
256 }
257
258 fn check_healthy(&self) -> Result<(), WalError> {
259 if self.bg_failure.lock().unwrap().is_some() {
260 return Err(WalError::Poisoned);
261 }
262 Ok(())
263 }
264
265 pub fn next_lsn(&self) -> Lsn {
269 self.state.lock().unwrap().next_lsn
270 }
271
272 pub fn oldest_segment_id(&self) -> u64 {
273 self.state.lock().unwrap().oldest_segment_id.raw()
274 }
275
276 pub fn active_segment_id(&self) -> u64 {
277 self.state.lock().unwrap().active_segment_id.raw()
278 }
279
280 pub fn begin(&self) -> Result<Lsn, WalError> {
290 self.check_healthy()?;
291 let mut state = self.state.lock().unwrap();
292 self.maybe_rotate(&mut state)?;
293 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
294 }
295
296 pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
299 self.check_healthy()?;
300 let mut state = self.state.lock().unwrap();
301 Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
302 lsn,
303 tx_begin_lsn,
304 event: event.clone(),
305 })
306 }
307
308 pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
312 self.check_healthy()?;
313 let mut state = self.state.lock().unwrap();
314 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
315 }
316
317 pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
320 self.check_healthy()?;
321 let mut state = self.state.lock().unwrap();
322 Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
323 }
324
325 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
329 self.check_healthy()?;
330 let mut state = self.state.lock().unwrap();
331 Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
332 lsn,
333 snapshot_lsn,
334 })
335 }
336
337 #[inline]
343 fn alloc_and_append(
344 state: &mut WalState,
345 build: impl FnOnce(Lsn) -> WalRecord,
346 ) -> Result<Lsn, WalError> {
347 let lsn = state.next_lsn;
348 state.next_lsn = lsn.next();
349 state.active_writer.append(&build(lsn))?;
350 Ok(lsn)
351 }
352
353 pub fn flush(&self) -> Result<(), WalError> {
368 self.check_healthy()?;
369 self.flush_inner(FlushKind::PerConfiguredMode)
370 }
371
372 pub fn force_fsync(&self) -> Result<(), WalError> {
378 self.check_healthy()?;
379 self.flush_inner(FlushKind::ForceFsync)
380 }
381
382 fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
386 let mut state = self.state.lock().unwrap();
387 let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));
388
389 let do_fsync = matches!(
394 (kind, self.sync_mode),
395 (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
396 );
397 let advance_durable = matches!(
398 (kind, self.sync_mode),
399 (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
400 );
401
402 if do_fsync {
403 state.active_writer.flush_and_sync()?;
404 } else {
405 state.active_writer.flush_buffer()?;
406 }
407 if advance_durable {
408 state.durable_lsn = written_lsn;
409 }
410 Ok(())
411 }
412
413 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
422 let mut state = self.state.lock().unwrap();
423 let active_id = state.active_segment_id;
424 let entries = self.segments.list()?;
425
426 let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
427 for (i, entry) in entries.iter().enumerate() {
428 if entry.id >= active_id.saturating_prev() {
431 break;
432 }
433 let next = match entries.get(i + 1) {
436 Some(n) => n,
437 None => break,
438 };
439 let next_base = SegmentDir::base_lsn(&next.path)?;
440 if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
441 to_drop.push(entry.clone());
442 }
443 }
444
445 for entry in to_drop {
446 fs::remove_file(&entry.path)?;
447 if entry.id >= state.oldest_segment_id {
448 state.oldest_segment_id = entry.id.next();
449 }
450 }
451 if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
452 self.segments.sync_dir()?;
453 }
454 Ok(())
455 }
456
457 fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
461 if state.active_writer.bytes_written() < self.segment_target_bytes {
462 return Ok(());
463 }
464 state.active_writer.flush_and_sync()?;
468 state.active_writer.seal()?;
469
470 let next_id = state.active_segment_id.next();
471 let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
472 self.segments.sync_dir()?;
473 state.active_writer = writer;
474 state.active_segment_id = next_id;
475 Ok(())
476 }
477}
478
479impl Drop for Wal {
480 fn drop(&mut self) {
481 if let Ok(slot) = self._flusher.get_mut() {
485 let _ = slot.take();
486 }
487 }
488}
489
/// Owns the group-mode background flusher thread.
///
/// Dropping the handle requests shutdown and joins the thread.
struct GroupFlusherHandle {
    // Flag the flusher loop polls; set to true to request exit.
    shutdown: Arc<AtomicBool>,
    // Join handle for the flusher thread; `None` once joined.
    handle: Option<JoinHandle<()>>,
}

impl Drop for GroupFlusherHandle {
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        if let Some(h) = self.handle.take() {
            // The flusher thread holds only a Weak<Wal>, but it upgrades it
            // each tick. If the application drops its last Arc<Wal> during
            // that window, the flusher thread becomes the final owner, so
            // Wal::drop — and therefore this drop — runs *on the flusher
            // thread itself*. Joining our own thread would block forever,
            // so skip the join in that case; the thread is already on its
            // way out because `shutdown` is set.
            if h.thread().id() != thread::current().id() {
                let _ = h.join();
            }
        }
    }
}
515
516fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
517 let shutdown = Arc::new(AtomicBool::new(false));
518 let shutdown_clone = Arc::clone(&shutdown);
519 let handle = thread::spawn(move || {
520 while !shutdown_clone.load(Ordering::Acquire) {
525 let slice = Duration::from_millis(50).min(interval);
530 let mut elapsed = Duration::ZERO;
531 while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
532 thread::sleep(slice);
533 elapsed += slice;
534 }
535 if shutdown_clone.load(Ordering::Acquire) {
536 break;
537 }
538 match weak.upgrade() {
539 Some(wal) => {
540 if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
549 let mut slot = wal.bg_failure.lock().unwrap();
550 if slot.is_none() {
551 *slot = Some(format!("bg fsync failed: {err}"));
552 }
553 break;
554 }
555 }
556 None => break,
557 }
558 }
559 });
560 GroupFlusherHandle {
561 shutdown,
562 handle: Some(handle),
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};

    use crate::testing::TmpDir;

    // Builds a distinct CreateNode mutation keyed by `id` so replayed
    // events can be compared by value.
    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    // Opens a WAL with PerCommit sync, a large (no-rotation) segment
    // target, and no prior checkpoint.
    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }

    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        // The directory should contain exactly the lock file plus segments.
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);

        // Second open must fail while the first handle holds the dir lock.
        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }

        // Dropping the first handle releases the lock; reopen succeeds.
        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }

    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");

        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();

            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
        }

        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // 7 records written (begin, 2 mutations, commit, begin, mutation,
        // commit), so the next LSN after replay is 8.
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Explicitly aborted: its mutation must not survive replay.
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Never committed (simulates a crash mid-transaction).
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");

        // Tiny 256-byte target forces rotation quickly.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();

        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        // Rotation only happens at begin(), so we're still on segment 1.
        assert_eq!(wal.active_segment_id(), 1);

        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );

        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);

        // All committed events survive replay across both segments.
        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);

        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();

        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        // Opening with checkpoint at commit_a replays only the later tx.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }

    #[test]
    fn replay_rejects_commit_without_begin() {
        let dir = TmpDir::new("commit-without-begin");

        {
            // Write a commit referencing a begin LSN that never existed.
            let (wal, _) = open_default(&dir.path);
            wal.commit(Lsn::new(99)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn replay_rejects_mutation_without_begin() {
        let dir = TmpDir::new("mutation-without-begin");

        {
            // Write a mutation referencing a begin LSN that never existed.
            let (wal, _) = open_default(&dir.path);
            wal.append(Lsn::new(99), &ev(1)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = TmpDir::new("torn");

        {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            wal.commit(b).unwrap();
            wal.flush().unwrap();
        }

        // Simulate a torn write: append garbage to the active segment.
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        let active = &segments.last().unwrap().path;
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(active).unwrap();
            f.write_all(&[0xff; 32]).unwrap();
            f.sync_all().unwrap();
        }

        // Open truncates the garbage and replays the intact prefix.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);

        // The WAL remains writable after truncation.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(2)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1), ev(2)]);
    }

    #[test]
    fn checkpoint_marker_is_recorded_and_observed() {
        let dir = TmpDir::new("ckpt-marker");

        let snapshot_lsn = {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            let commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
            wal.checkpoint_marker(commit).unwrap();
            wal.flush().unwrap();
            commit
        };

        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(
            outcome.checkpoint_lsn_observed,
            Some(snapshot_lsn),
            "checkpoint marker should be surfaced by replay"
        );
    }

    #[test]
    fn group_mode_durable_lsn_advances_via_bg_flusher() {
        let dir = TmpDir::new("group");
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group { interval_ms: 25 },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        // In Group mode, flush() only writes the buffer; durability is the
        // background flusher's job.
        wal.flush().unwrap();
        assert_eq!(
            wal.durable_lsn(),
            Lsn::ZERO,
            "Group flush() must not advance durable_lsn"
        );

        // Poll until the background flusher (25 ms interval) catches up.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        loop {
            if wal.durable_lsn() > Lsn::ZERO {
                break;
            }
            if std::time::Instant::now() >= deadline {
                panic!(
                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
                    wal.durable_lsn()
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
        drop(wal);
    }

    #[test]
    fn none_mode_advances_durable_lsn_on_flush() {
        let dir = TmpDir::new("none");
        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();

        // SyncMode::None advances durable_lsn without an fsync by design.
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn force_fsync_always_advances_durable_lsn() {
        let dir = TmpDir::new("force-fsync");
        // Huge group interval: the bg flusher effectively never runs.
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group {
                interval_ms: 60_000,
            },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        assert_eq!(wal.durable_lsn(), Lsn::ZERO);

        // force_fsync bypasses the mode and makes everything durable now.
        wal.force_fsync().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn truncate_up_to_drops_old_sealed_segments() {
        let dir = TmpDir::new("truncate");

        // 64-byte target rotates on nearly every transaction.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();

        let mut last_commit = Lsn::ZERO;
        for i in 0..5 {
            let b = wal.begin().unwrap();
            wal.append(b, &ev(i)).unwrap();
            last_commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        assert!(
            wal.active_segment_id() >= 4,
            "expected several rotations, got {}",
            wal.active_segment_id()
        );

        let segments = SegmentDir::new(&dir.path);
        let before = segments.list().unwrap().len();
        wal.truncate_up_to(last_commit).unwrap();
        let after = segments.list().unwrap().len();

        assert!(
            after < before,
            "truncate_up_to should have dropped at least one segment ({} → {})",
            before,
            after
        );
        assert!(
            after >= 2,
            "active and the segment preceding it must be kept"
        );

        // Everything up to last_commit was checkpointed, so reopening with
        // that fence replays nothing.
        drop(wal);
        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
        assert!(replay.is_empty());
    }
}
1002}