1use std::fs;
29#[cfg(test)]
30use std::fs::OpenOptions;
31use std::path::Path;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex, Weak};
34use std::thread::{self, JoinHandle};
35use std::time::Duration;
36
37use lora_store::MutationEvent;
38
39use crate::config::SyncMode;
40use crate::dir::{SegmentDir, SegmentId};
41use crate::error::WalError;
42use crate::lock::DirLock;
43use crate::lsn::Lsn;
44use crate::record::WalRecord;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
/// Mutable WAL state; guarded by the `state` mutex in [`Wal`].
struct WalState {
    // Next LSN to hand out; strictly monotonically increasing.
    next_lsn: Lsn,
    // Highest LSN considered durable under the configured sync mode.
    durable_lsn: Lsn,
    // Id of the segment currently receiving appends.
    active_segment_id: SegmentId,
    // Open writer for the active segment.
    active_writer: SegmentWriter,
    // Smallest segment id still present on disk (advanced by truncation).
    oldest_segment_id: SegmentId,
}
59
/// Slot holding the first background-flusher failure message, if any.
/// A set value permanently poisons the WAL (see `Wal::check_healthy`).
type BgFailure = Mutex<Option<String>>;
68
/// How `flush_inner` should interpret the configured sync mode.
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Honor the configured `SyncMode` (fsync only for `PerCommit`).
    PerConfiguredMode,
    /// Always fsync and advance `durable_lsn`, regardless of mode.
    ForceFsync,
}
82
/// Write-ahead log over a directory of append-only segment files.
pub struct Wal {
    // Layout helper for the segment directory (paths, listing, dir fsync).
    segments: SegmentDir,
    // Durability policy chosen at open time.
    sync_mode: SyncMode,
    // Rotate to a new segment once the active one reaches this many bytes.
    segment_target_bytes: u64,
    // All mutable WAL state behind a single mutex (one writer at a time).
    state: Mutex<WalState>,
    // First background-flusher error; shared with the flusher thread.
    bg_failure: Arc<BgFailure>,
    // Group-commit flusher handle; `None` outside `SyncMode::Group`.
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    // Exclusive directory lock held for the lifetime of this handle.
    _dir_lock: DirLock,
}
109
impl Wal {
    /// Open (or create) the WAL in `dir` and replay any existing segments.
    ///
    /// Returns the shared WAL handle plus the mutation events of committed
    /// transactions recovered past `checkpoint_lsn`, in log order. Acquires
    /// an exclusive directory lock, so a second open on the same directory
    /// fails until this handle is dropped. In `SyncMode::Group` a background
    /// flusher thread is started that force-fsyncs about every `interval_ms`
    /// milliseconds.
    pub fn open(
        dir: impl Into<std::path::PathBuf>,
        sync_mode: SyncMode,
        segment_target_bytes: u64,
        checkpoint_lsn: Lsn,
    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
        let segments = SegmentDir::new(dir);
        fs::create_dir_all(segments.root())?;
        let dir_lock = DirLock::acquire(segments.root())?;

        let entries = segments.list()?;
        let (active_id, active_writer, replay) = if entries.is_empty() {
            Self::open_fresh(&segments)?
        } else {
            Self::open_existing(&segments, &entries, checkpoint_lsn)?
        };

        // LSN numbering starts at 1 on a fresh/empty log.
        let next_lsn = if replay.max_lsn.is_zero() {
            Lsn::new(1)
        } else {
            replay.max_lsn.next()
        };
        // Everything that survived replay is already on disk.
        let durable_lsn = replay.max_lsn;

        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);

        let state = WalState {
            next_lsn,
            durable_lsn,
            active_segment_id: active_id,
            active_writer,
            oldest_segment_id,
        };

        let wal = Arc::new(Self {
            segments,
            sync_mode,
            segment_target_bytes,
            state: Mutex::new(state),
            bg_failure: Arc::new(Mutex::new(None)),
            _flusher: Mutex::new(None),
            _dir_lock: dir_lock,
        });

        if let SyncMode::Group { interval_ms } = sync_mode {
            // Clamp to at least 1 ms so the flusher never busy-loops.
            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
            // Weak reference: the flusher must not keep the WAL alive.
            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
            *wal._flusher.lock().unwrap() = Some(handle);
        }

        Ok((wal, replay.committed_events))
    }

    // Create the very first segment in an empty directory.
    fn open_fresh(
        segments: &SegmentDir,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let id = SegmentId::FIRST;
        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
        // Persist the new directory entry before relying on the segment.
        segments.sync_dir()?;
        let replay = ReplayOutcome {
            committed_events: Vec::new(),
            max_lsn: Lsn::ZERO,
            torn_tail: None,
            checkpoint_lsn_observed: None,
        };
        Ok((id, writer, replay))
    }

    // Replay existing segments and reopen the last one for appending.
    //
    // A torn (partially written) tail is truncated away, but only when it
    // lies in the active (last) segment; a torn record inside a sealed
    // segment indicates real corruption and fails the open.
    fn open_existing(
        segments: &SegmentDir,
        entries: &[crate::dir::SegmentEntry],
        checkpoint_lsn: Lsn,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
        let replay = replay_segments(&paths, checkpoint_lsn)?;

        let active = entries.last().expect("entries non-empty in open_existing");
        let (mut writer, _torn_from_writer) =
            SegmentWriter::open_for_append(segments.path_for(active.id))?;

        if let Some(t) = &replay.torn_tail {
            if t.segment_path == active.path {
                writer.truncate_to(t.last_good_offset)?;
            } else {
                return Err(WalError::Malformed(format!(
                    "torn tail found in sealed segment {}",
                    t.segment_path.display()
                )));
            }
        }

        Ok((active.id, writer, replay))
    }

    /// Root directory holding the segment files.
    pub fn dir(&self) -> &Path {
        self.segments.root()
    }

    /// Durability policy this WAL was opened with.
    pub fn sync_mode(&self) -> SyncMode {
        self.sync_mode
    }

    /// Highest LSN currently considered durable.
    pub fn durable_lsn(&self) -> Lsn {
        self.state.lock().unwrap().durable_lsn
    }

    /// First background-flusher failure message, if one occurred.
    pub fn bg_failure(&self) -> Option<String> {
        self.bg_failure.lock().unwrap().clone()
    }

    // Refuse further work once the background flusher has recorded a
    // failure: the WAL is permanently poisoned at that point.
    fn check_healthy(&self) -> Result<(), WalError> {
        if self.bg_failure.lock().unwrap().is_some() {
            return Err(WalError::Poisoned);
        }
        Ok(())
    }

    /// LSN the next appended record will receive.
    pub fn next_lsn(&self) -> Lsn {
        self.state.lock().unwrap().next_lsn
    }

    /// Smallest segment id still present on disk.
    pub fn oldest_segment_id(&self) -> u64 {
        self.state.lock().unwrap().oldest_segment_id.raw()
    }

    /// Id of the segment currently receiving appends.
    pub fn active_segment_id(&self) -> u64 {
        self.state.lock().unwrap().active_segment_id.raw()
    }

    /// Start a transaction: append a `TxBegin` record and return its LSN.
    ///
    /// Segment rotation is checked only here, so a transaction's records
    /// are never split by a rotation triggered mid-transaction.
    pub fn begin(&self) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        self.maybe_rotate(&mut state)?;
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
    }

    /// Append one mutation belonging to the transaction at `tx_begin_lsn`.
    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
            lsn,
            tx_begin_lsn,
            event: event.clone(),
        })
    }

    /// Append several mutations as a single record.
    ///
    /// # Errors
    /// Returns `WalError::Encode` when `events` is empty.
    pub fn append_batch(
        &self,
        tx_begin_lsn: Lsn,
        events: Vec<MutationEvent>,
    ) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        if events.is_empty() {
            return Err(WalError::Encode(
                "mutation batch must contain at least one event".into(),
            ));
        }
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
            lsn,
            tx_begin_lsn,
            events,
        })
    }

    /// Append `TxCommit` for the transaction started at `tx_begin_lsn`.
    ///
    /// This only appends — call [`Wal::flush`] to make the commit durable.
    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
    }

    /// Append `TxAbort`; the transaction's mutations are dropped on replay.
    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
    }

    /// Append a checkpoint marker pointing at `snapshot_lsn`.
    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
            lsn,
            snapshot_lsn,
        })
    }

    // Allocate the next LSN, build the record with it, and append it to the
    // active segment. Write is buffered only; durability comes from
    // flush/fsync.
    #[inline]
    fn alloc_and_append(
        state: &mut WalState,
        build: impl FnOnce(Lsn) -> WalRecord,
    ) -> Result<Lsn, WalError> {
        let lsn = state.next_lsn;
        state.next_lsn = lsn.next();
        state.active_writer.append(&build(lsn))?;
        Ok(lsn)
    }

    /// Flush according to the configured sync mode:
    /// - `PerCommit`: fsync and advance `durable_lsn`.
    /// - `None`: flush the user-space buffer only, yet still advance
    ///   `durable_lsn` — explicitly lossy mode.
    /// - `Group`: flush the buffer; `durable_lsn` is advanced only by the
    ///   background fsync thread (or `force_fsync`).
    pub fn flush(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::PerConfiguredMode)
    }

    /// Fsync immediately and advance `durable_lsn`, regardless of mode.
    pub fn force_fsync(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::ForceFsync)
    }

    fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        // Highest LSN handed out so far (0 when nothing was written yet).
        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));

        let do_fsync = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
        );
        // SyncMode::None advances durable_lsn without an fsync.
        let advance_durable = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
        );

        if do_fsync {
            state.active_writer.flush_and_sync()?;
        } else {
            state.active_writer.flush_buffer()?;
        }
        if advance_durable {
            state.durable_lsn = written_lsn;
        }
        Ok(())
    }

    /// Delete sealed segments whose records all lie at or below `fence_lsn`.
    ///
    /// The active segment and the one immediately preceding it are always
    /// kept. A segment's upper LSN bound is inferred from the base LSN of
    /// the segment that follows it (next base minus one).
    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        let active_id = state.active_segment_id;
        let entries = self.segments.list()?;

        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
        for (i, entry) in entries.iter().enumerate() {
            // Never drop the active segment or its predecessor.
            if entry.id >= active_id.saturating_prev() {
                break;
            }
            // Need the following segment to bound this one's max LSN.
            let next = match entries.get(i + 1) {
                Some(n) => n,
                None => break,
            };
            let next_base = SegmentDir::base_lsn(&next.path)?;
            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
                to_drop.push(entry.clone());
            }
        }

        for entry in to_drop {
            fs::remove_file(&entry.path)?;
            if entry.id >= state.oldest_segment_id {
                state.oldest_segment_id = entry.id.next();
            }
        }
        // Pay for a directory fsync only if the set of segments changed.
        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
            self.segments.sync_dir()?;
        }
        Ok(())
    }

    // Seal the active segment and start a fresh one once it has reached the
    // configured target size. Invoked from begin() only.
    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
        if state.active_writer.bytes_written() < self.segment_target_bytes {
            return Ok(());
        }
        // Make the outgoing segment fully durable before sealing it.
        state.active_writer.flush_and_sync()?;
        state.active_writer.seal()?;

        let next_id = state.active_segment_id.next();
        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
        self.segments.sync_dir()?;
        state.active_writer = writer;
        state.active_segment_id = next_id;
        Ok(())
    }
}
498
499impl Drop for Wal {
500 fn drop(&mut self) {
501 if matches!(self.sync_mode, SyncMode::Group { .. }) {
502 let _ = self.flush_inner(FlushKind::ForceFsync);
503 }
504 if let Ok(slot) = self._flusher.get_mut() {
508 let _ = slot.take();
509 }
510 }
511}
512
/// Owns the group-commit flusher thread; dropping it stops and joins it.
struct GroupFlusherHandle {
    // Flag the thread polls; set to true to request exit.
    shutdown: Arc<AtomicBool>,
    // Join handle, taken (and joined) on drop.
    handle: Option<JoinHandle<()>>,
}
525
526impl Drop for GroupFlusherHandle {
527 fn drop(&mut self) {
528 self.shutdown.store(true, Ordering::Release);
529 if let Some(h) = self.handle.take() {
530 let _ = h.join();
535 }
536 }
537}
538
539fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
540 let shutdown = Arc::new(AtomicBool::new(false));
541 let shutdown_clone = Arc::clone(&shutdown);
542 let handle = thread::spawn(move || {
543 while !shutdown_clone.load(Ordering::Acquire) {
548 let slice = Duration::from_millis(50).min(interval);
553 let mut elapsed = Duration::ZERO;
554 while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
555 thread::sleep(slice);
556 elapsed += slice;
557 }
558 if shutdown_clone.load(Ordering::Acquire) {
559 break;
560 }
561 match weak.upgrade() {
562 Some(wal) => {
563 if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
572 let mut slot = wal.bg_failure.lock().unwrap();
573 if slot.is_none() {
574 *slot = Some(format!("bg fsync failed: {err}"));
575 }
576 break;
577 }
578 }
579 None => break,
580 }
581 }
582 });
583 GroupFlusherHandle {
584 shutdown,
585 handle: Some(handle),
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};

    use crate::testing::TmpDir;

    // Small CreateNode event whose payload varies with `id`.
    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    // PerCommit WAL with a large segment target (no rotation in most tests).
    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }

    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        // Directory should contain exactly the lock file plus segments.
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);

        // Second open must hit the directory lock.
        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }

        // Dropping the first handle releases the lock.
        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }

    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");

        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();

            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
        }

        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // 7 records were written (begin+2 muts+commit, begin+mut+commit),
        // so the next LSN after recovery is 8.
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Aborted transaction: its mutation must not survive replay.
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Transaction left open (no commit/abort) before "crash".
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");

        // Tiny 256-byte target so a single transaction overflows it.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();

        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        // Rotation never happens mid-transaction.
        assert_eq!(wal.active_segment_id(), 1);

        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );

        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);

        // Replay must cross the segment boundary seamlessly.
        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);

        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();

        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        // Opening with checkpoint at commit_a replays only the later tx.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }

    #[test]
    fn replay_rejects_commit_without_begin() {
        let dir = TmpDir::new("commit-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            // Commit referencing a begin LSN that was never written.
            wal.commit(Lsn::new(99)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn replay_rejects_mutation_without_begin() {
        let dir = TmpDir::new("mutation-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            // Mutation referencing a begin LSN that was never written.
            wal.append(Lsn::new(99), &ev(1)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = TmpDir::new("torn");

        {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            wal.commit(b).unwrap();
            wal.flush().unwrap();
        }

        // Simulate a torn write by appending garbage to the active segment.
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        let active = &segments.last().unwrap().path;
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(active).unwrap();
            f.write_all(&[0xff; 32]).unwrap();
            f.sync_all().unwrap();
        }

        // Open truncates the garbage; committed data survives.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);

        // The truncated WAL must accept and persist new appends.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(2)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1), ev(2)]);
    }

    #[test]
    fn checkpoint_marker_is_recorded_and_observed() {
        let dir = TmpDir::new("ckpt-marker");

        let snapshot_lsn = {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            let commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
            wal.checkpoint_marker(commit).unwrap();
            wal.flush().unwrap();
            commit
        };

        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(
            outcome.checkpoint_lsn_observed,
            Some(snapshot_lsn),
            "checkpoint marker should be surfaced by replay"
        );
    }

    #[test]
    fn group_mode_durable_lsn_advances_via_bg_flusher() {
        let dir = TmpDir::new("group");
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group { interval_ms: 25 },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        // In Group mode, flush() only drains the buffer.
        wal.flush().unwrap(); assert_eq!(
            wal.durable_lsn(),
            Lsn::ZERO,
            "Group flush() must not advance durable_lsn"
        );

        // Poll until the 25 ms background flusher fsyncs (bounded wait).
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        loop {
            if wal.durable_lsn() > Lsn::ZERO {
                break;
            }
            if std::time::Instant::now() >= deadline {
                panic!(
                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
                    wal.durable_lsn()
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
        drop(wal);
    }

    #[test]
    fn none_mode_advances_durable_lsn_on_flush() {
        let dir = TmpDir::new("none");
        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();

        // SyncMode::None claims durability without fsync.
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn force_fsync_always_advances_durable_lsn() {
        let dir = TmpDir::new("force-fsync");
        // Huge group interval so the bg flusher never fires during the test.
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group {
                interval_ms: 60_000,
            },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap(); assert_eq!(wal.durable_lsn(), Lsn::ZERO);

        // force_fsync bypasses the Group-mode deferral.
        wal.force_fsync().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn truncate_up_to_drops_old_sealed_segments() {
        let dir = TmpDir::new("truncate");

        // 64-byte target forces a rotation on nearly every begin().
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();

        let mut last_commit = Lsn::ZERO;
        for i in 0..5 {
            let b = wal.begin().unwrap();
            wal.append(b, &ev(i)).unwrap();
            last_commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        assert!(
            wal.active_segment_id() >= 4,
            "expected several rotations, got {}",
            wal.active_segment_id()
        );

        let segments = SegmentDir::new(&dir.path);
        let before = segments.list().unwrap().len();
        wal.truncate_up_to(last_commit).unwrap();
        let after = segments.list().unwrap().len();

        assert!(
            after < before,
            "truncate_up_to should have dropped at least one segment ({} → {})",
            before,
            after
        );
        assert!(
            after >= 2,
            "active and the segment preceding it must be kept"
        );

        // Everything up to last_commit was checkpointed, so replay from the
        // surviving segments yields nothing new.
        drop(wal);
        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
        assert!(replay.is_empty());
    }
}