1use std::fs;
29#[cfg(test)]
30use std::fs::OpenOptions;
31use std::path::Path;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex, Weak};
34use std::thread::{self, JoinHandle};
35use std::time::Duration;
36
37use lora_store::MutationEvent;
38
39use crate::config::SyncMode;
40use crate::dir::{SegmentDir, SegmentId};
41use crate::error::WalError;
42use crate::lock::DirLock;
43use crate::lsn::Lsn;
44use crate::record::WalRecord;
45use crate::replay::{replay_segments, ReplayOutcome};
46use crate::segment::SegmentWriter;
47
/// Mutable WAL state; always accessed through the `Wal::state` mutex.
struct WalState {
    // LSN that will be handed to the next appended record.
    next_lsn: Lsn,
    // Highest LSN considered durable under the configured sync mode
    // (advanced by `flush_inner`).
    durable_lsn: Lsn,
    // Id of the segment currently receiving appends.
    active_segment_id: SegmentId,
    // Writer for the active segment.
    active_writer: SegmentWriter,
    // Id of the oldest segment still present on disk (advanced by
    // `truncate_up_to`).
    oldest_segment_id: SegmentId,
}
59
/// Slot holding the first background-flusher failure message, if any.
/// A populated slot poisons the WAL: see `Wal::check_healthy`.
type BgFailure = Mutex<Option<String>>;
68
/// How `Wal::flush_inner` should interpret the configured sync mode.
#[derive(Debug, Clone, Copy)]
enum FlushKind {
    /// Flush according to the WAL's configured `SyncMode`.
    PerConfiguredMode,
    /// Always flush and fsync, regardless of the configured mode.
    ForceFsync,
}
82
/// Write-ahead log over a directory of append-only segment files.
pub struct Wal {
    // Segment-file layout and naming for the WAL directory.
    segments: SegmentDir,
    // Durability policy applied by `flush`.
    sync_mode: SyncMode,
    // Rotate the active segment once it has written at least this many bytes.
    segment_target_bytes: u64,
    // All mutable log state behind a single mutex.
    state: Mutex<WalState>,
    // First background-flush error, if any; shared with the flusher thread.
    bg_failure: Arc<BgFailure>,
    // Handle for the group-mode background flusher (`None` in other modes).
    _flusher: Mutex<Option<GroupFlusherHandle>>,
    // Exclusive directory lock held for the lifetime of this handle.
    _dir_lock: DirLock,
}
109
impl Wal {
    /// Opens (or creates) the WAL in `dir` and replays it.
    ///
    /// Creates the directory if needed, takes an exclusive directory lock,
    /// then either initializes a fresh first segment or replays the existing
    /// ones, skipping committed events at or below `checkpoint_lsn`. Returns
    /// the WAL handle together with the committed events recovered by replay.
    /// In `SyncMode::Group` a background flusher thread is also spawned that
    /// force-fsyncs every `interval_ms` (clamped to at least 1 ms).
    pub fn open(
        dir: impl Into<std::path::PathBuf>,
        sync_mode: SyncMode,
        segment_target_bytes: u64,
        checkpoint_lsn: Lsn,
    ) -> Result<(Arc<Self>, Vec<MutationEvent>), WalError> {
        let segments = SegmentDir::new(dir);
        fs::create_dir_all(segments.root())?;
        let dir_lock = DirLock::acquire(segments.root())?;

        let entries = segments.list()?;
        let (active_id, active_writer, replay) = if entries.is_empty() {
            Self::open_fresh(&segments)?
        } else {
            Self::open_existing(&segments, &entries, checkpoint_lsn)?
        };

        // LSNs start at 1; an empty log replays with max_lsn == 0 and must
        // therefore hand out 1 first.
        let next_lsn = if replay.max_lsn.is_zero() {
            Lsn::new(1)
        } else {
            replay.max_lsn.next()
        };
        // Everything that survived replay is already on disk.
        let durable_lsn = replay.max_lsn;

        let oldest_segment_id = entries.first().map(|e| e.id).unwrap_or(active_id);

        let state = WalState {
            next_lsn,
            durable_lsn,
            active_segment_id: active_id,
            active_writer,
            oldest_segment_id,
        };

        let wal = Arc::new(Self {
            segments,
            sync_mode,
            segment_target_bytes,
            state: Mutex::new(state),
            bg_failure: Arc::new(Mutex::new(None)),
            _flusher: Mutex::new(None),
            _dir_lock: dir_lock,
        });

        if let SyncMode::Group { interval_ms } = sync_mode {
            // The flusher only gets a Weak reference, so it can never keep
            // the Wal alive on its own.
            let interval = Duration::from_millis(u64::from(interval_ms.max(1)));
            let handle = spawn_group_flusher(Arc::downgrade(&wal), interval);
            *wal._flusher.lock().unwrap() = Some(handle);
        }

        Ok((wal, replay.committed_events))
    }

    /// Creates the first segment of a brand-new WAL and returns an empty
    /// replay outcome.
    fn open_fresh(
        segments: &SegmentDir,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let id = SegmentId::FIRST;
        let writer = SegmentWriter::create(segments.path_for(id), Lsn::new(1))?;
        // Persist the new directory entry before reporting success.
        segments.sync_dir()?;
        let replay = ReplayOutcome {
            committed_events: Vec::new(),
            max_lsn: Lsn::ZERO,
            torn_tail: None,
            checkpoint_lsn_observed: None,
        };
        Ok((id, writer, replay))
    }

    /// Replays the existing segments and reopens the last one for appending.
    ///
    /// A torn tail (partial final record from a crash) is tolerated only in
    /// the active (last) segment, where it is truncated away; a torn tail in
    /// a sealed segment means the log is malformed.
    fn open_existing(
        segments: &SegmentDir,
        entries: &[crate::dir::SegmentEntry],
        checkpoint_lsn: Lsn,
    ) -> Result<(SegmentId, SegmentWriter, ReplayOutcome), WalError> {
        let paths: Vec<_> = entries.iter().map(|e| e.path.clone()).collect();
        let replay = replay_segments(&paths, checkpoint_lsn)?;

        let active = entries.last().expect("entries non-empty in open_existing");
        let (mut writer, _torn_from_writer) =
            SegmentWriter::open_for_append(segments.path_for(active.id))?;

        if let Some(t) = &replay.torn_tail {
            if t.segment_path == active.path {
                // Cut the partial record off so new appends start clean.
                writer.truncate_to(t.last_good_offset)?;
            } else {
                return Err(WalError::Malformed(format!(
                    "torn tail found in sealed segment {}",
                    t.segment_path.display()
                )));
            }
        }

        Ok((active.id, writer, replay))
    }

    /// Root directory containing the WAL's segment files.
    pub fn dir(&self) -> &Path {
        self.segments.root()
    }

    /// Sync mode this WAL was opened with.
    pub fn sync_mode(&self) -> SyncMode {
        self.sync_mode
    }

    /// Highest LSN currently considered durable.
    pub fn durable_lsn(&self) -> Lsn {
        self.state.lock().unwrap().durable_lsn
    }

    /// First background-flush failure message, if any.
    pub fn bg_failure(&self) -> Option<String> {
        self.bg_failure.lock().unwrap().clone()
    }

    /// Fails with `WalError::Poisoned` once a background flush has failed.
    /// All mutating operations call this first.
    fn check_healthy(&self) -> Result<(), WalError> {
        if self.bg_failure.lock().unwrap().is_some() {
            return Err(WalError::Poisoned);
        }
        Ok(())
    }

    /// LSN the next appended record will receive.
    pub fn next_lsn(&self) -> Lsn {
        self.state.lock().unwrap().next_lsn
    }

    /// Raw id of the oldest segment still on disk.
    pub fn oldest_segment_id(&self) -> u64 {
        self.state.lock().unwrap().oldest_segment_id.raw()
    }

    /// Raw id of the segment currently receiving appends.
    pub fn active_segment_id(&self) -> u64 {
        self.state.lock().unwrap().active_segment_id.raw()
    }

    /// Starts a transaction: appends a `TxBegin` record and returns its LSN.
    ///
    /// Segment rotation is only attempted here (at the transaction
    /// boundary), never inside `append`/`commit`.
    pub fn begin(&self) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        self.maybe_rotate(&mut state)?;
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxBegin { lsn })
    }

    /// Appends one mutation belonging to the transaction started at
    /// `tx_begin_lsn`; returns the record's LSN.
    pub fn append(&self, tx_begin_lsn: Lsn, event: &MutationEvent) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Mutation {
            lsn,
            tx_begin_lsn,
            event: event.clone(),
        })
    }

    /// Appends several mutations as a single `MutationBatch` record.
    ///
    /// The batch must be non-empty; an empty batch is rejected with an
    /// `Encode` error. Returns the record's LSN.
    pub fn append_batch(
        &self,
        tx_begin_lsn: Lsn,
        events: Vec<MutationEvent>,
    ) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        if events.is_empty() {
            return Err(WalError::Encode(
                "mutation batch must contain at least one event".into(),
            ));
        }
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::MutationBatch {
            lsn,
            tx_begin_lsn,
            events,
        })
    }

    /// Commits the transaction started at `tx_begin_lsn`; returns the
    /// commit record's LSN.
    pub fn commit(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxCommit { lsn, tx_begin_lsn })
    }

    /// Aborts the transaction started at `tx_begin_lsn`; its mutations are
    /// dropped on replay. Returns the abort record's LSN.
    pub fn abort(&self, tx_begin_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::TxAbort { lsn, tx_begin_lsn })
    }

    /// Appends a checkpoint marker recording `snapshot_lsn`; returns the
    /// marker record's LSN.
    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
        self.check_healthy()?;
        let mut state = self.state.lock().unwrap();
        Self::alloc_and_append(&mut state, |lsn| WalRecord::Checkpoint {
            lsn,
            snapshot_lsn,
        })
    }

    /// Allocates the next LSN, builds the record with it, and appends it to
    /// the active segment. The caller must already hold the state lock.
    #[inline]
    fn alloc_and_append(
        state: &mut WalState,
        build: impl FnOnce(Lsn) -> WalRecord,
    ) -> Result<Lsn, WalError> {
        let lsn = state.next_lsn;
        state.next_lsn = lsn.next();
        state.active_writer.append(&build(lsn))?;
        Ok(lsn)
    }

    /// Flushes according to the configured sync mode (see `flush_inner`).
    pub fn flush(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::PerConfiguredMode)
    }

    /// Flushes and fsyncs unconditionally, whatever mode is configured.
    pub fn force_fsync(&self) -> Result<(), WalError> {
        self.check_healthy()?;
        self.flush_inner(FlushKind::ForceFsync)
    }

    /// Core flush logic. Behavior matrix (kind × sync_mode):
    ///
    /// - `ForceFsync`, any mode: flush + fsync, advance `durable_lsn`.
    /// - `PerConfiguredMode` + `PerCommit`: flush + fsync, advance.
    /// - `PerConfiguredMode` + `None`: flush the userspace buffer only, but
    ///   still advance `durable_lsn` (mode `None` trades safety for speed).
    /// - `PerConfiguredMode` + `Group`: deliberate no-op; only the
    ///   background flusher makes data durable in group mode.
    fn flush_inner(&self, kind: FlushKind) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        // Highest LSN handed out so far: next_lsn - 1, saturating at 0.
        let written_lsn = Lsn::new(state.next_lsn.raw().saturating_sub(1));

        let do_fsync = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit)
        );
        let advance_durable = matches!(
            (kind, self.sync_mode),
            (FlushKind::ForceFsync, _) | (_, SyncMode::PerCommit) | (_, SyncMode::None)
        );

        if matches!(
            (kind, self.sync_mode),
            (FlushKind::PerConfiguredMode, SyncMode::Group { .. })
        ) {
            // Group mode: a plain flush() intentionally does nothing.
        } else if do_fsync {
            state.active_writer.flush_and_sync()?;
        } else {
            state.active_writer.flush_buffer()?;
        }
        if advance_durable {
            state.durable_lsn = written_lsn;
        }
        Ok(())
    }

    /// Deletes sealed segments whose entire contents are at or below
    /// `fence_lsn`.
    ///
    /// The active segment and its immediate predecessor are always kept.
    /// A segment is droppable when the following segment's base LSN shows
    /// that every record it holds (<= next_base - 1) is covered by the
    /// fence. The directory is fsynced only when something was removed.
    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        let active_id = state.active_segment_id;
        let entries = self.segments.list()?;

        let mut to_drop: Vec<crate::dir::SegmentEntry> = Vec::new();
        for (i, entry) in entries.iter().enumerate() {
            // Never touch the active segment or the one right before it.
            if entry.id >= active_id.saturating_prev() {
                break;
            }
            // The next segment's base LSN tells us where this one ends.
            let next = match entries.get(i + 1) {
                Some(n) => n,
                None => break,
            };
            let next_base = SegmentDir::base_lsn(&next.path)?;
            if next_base.raw().saturating_sub(1) <= fence_lsn.raw() {
                to_drop.push(entry.clone());
            }
        }

        for entry in to_drop {
            fs::remove_file(&entry.path)?;
            if entry.id >= state.oldest_segment_id {
                state.oldest_segment_id = entry.id.next();
            }
        }
        // `entries` still reflects the pre-delete listing, so a difference
        // here indicates at least one segment was removed.
        if state.oldest_segment_id != entries.first().map(|e| e.id).unwrap_or(active_id) {
            self.segments.sync_dir()?;
        }
        Ok(())
    }

    /// Rotates to a new segment once the active one has reached the target
    /// size: fsync + seal the old segment, create the next one (base LSN =
    /// next_lsn), fsync the directory, then switch over.
    fn maybe_rotate(&self, state: &mut WalState) -> Result<(), WalError> {
        if state.active_writer.bytes_written() < self.segment_target_bytes {
            return Ok(());
        }
        state.active_writer.flush_and_sync()?;
        state.active_writer.seal()?;

        let next_id = state.active_segment_id.next();
        let writer = SegmentWriter::create(self.segments.path_for(next_id), state.next_lsn)?;
        self.segments.sync_dir()?;
        state.active_writer = writer;
        state.active_segment_id = next_id;
        Ok(())
    }
}
505
506impl Drop for Wal {
507 fn drop(&mut self) {
508 if matches!(self.sync_mode, SyncMode::Group { .. }) {
509 let _ = self.flush_inner(FlushKind::ForceFsync);
510 }
511 if let Ok(slot) = self._flusher.get_mut() {
515 let _ = slot.take();
516 }
517 }
518}
519
/// Owns the background flusher thread; dropping the handle requests
/// shutdown and joins the thread.
struct GroupFlusherHandle {
    // Set to `true` to ask the flusher thread to exit.
    shutdown: Arc<AtomicBool>,
    // Join handle for the thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
}
532
533impl Drop for GroupFlusherHandle {
534 fn drop(&mut self) {
535 self.shutdown.store(true, Ordering::Release);
536 if let Some(h) = self.handle.take() {
537 let _ = h.join();
542 }
543 }
544}
545
546fn spawn_group_flusher(weak: Weak<Wal>, interval: Duration) -> GroupFlusherHandle {
547 let shutdown = Arc::new(AtomicBool::new(false));
548 let shutdown_clone = Arc::clone(&shutdown);
549 let handle = thread::spawn(move || {
550 while !shutdown_clone.load(Ordering::Acquire) {
555 let slice = Duration::from_millis(50).min(interval);
560 let mut elapsed = Duration::ZERO;
561 while elapsed < interval && !shutdown_clone.load(Ordering::Acquire) {
562 thread::sleep(slice);
563 elapsed += slice;
564 }
565 if shutdown_clone.load(Ordering::Acquire) {
566 break;
567 }
568 match weak.upgrade() {
569 Some(wal) => {
570 if let Err(err) = wal.flush_inner(FlushKind::ForceFsync) {
579 let mut slot = wal.bg_failure.lock().unwrap();
580 if slot.is_none() {
581 *slot = Some(format!("bg fsync failed: {err}"));
582 }
583 break;
584 }
585 }
586 None => break,
587 }
588 }
589 });
590 GroupFlusherHandle {
591 shutdown,
592 handle: Some(handle),
593 }
594}
595
#[cfg(test)]
mod tests {
    use super::*;
    use lora_store::{MutationEvent, Properties, PropertyValue};

    use crate::testing::TmpDir;

    // Builds a small CreateNode mutation with `id` and one int property.
    fn ev(id: u64) -> MutationEvent {
        let mut p = Properties::new();
        p.insert("v".into(), PropertyValue::Int(id as i64));
        MutationEvent::CreateNode {
            id,
            labels: vec!["N".into()],
            properties: p,
        }
    }

    // Opens a WAL with PerCommit sync, an 8 MiB segment target, no checkpoint.
    fn open_default(dir: &Path) -> (Arc<Wal>, Vec<MutationEvent>) {
        Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap()
    }

    #[test]
    fn fresh_open_creates_first_segment() {
        let dir = TmpDir::new("fresh");
        let (wal, replay) = open_default(&dir.path);
        assert!(replay.is_empty());
        assert_eq!(wal.next_lsn(), Lsn::new(1));
        assert_eq!(wal.active_segment_id(), 1);
        // Inspect the raw directory contents to verify the on-disk layout.
        let entries: Vec<_> = std::fs::read_dir(&dir.path)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries.iter().any(|n| n == ".lora-wal.lock"),
            "WAL dir should contain the live directory lock, found: {entries:?}"
        );
        assert!(
            entries
                .iter()
                .filter(|n| n.as_str() != ".lora-wal.lock")
                .all(|n| n.ends_with(".wal")),
            "WAL dir should contain only segment files plus the lock, found: {entries:?}"
        );
    }

    #[test]
    fn opening_same_directory_twice_fails_until_first_handle_drops() {
        let dir = TmpDir::new("exclusive");
        let (wal, _) = open_default(&dir.path);

        match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Err(WalError::AlreadyOpen { dir: locked_dir }) => {
                assert_eq!(locked_dir, dir.path);
            }
            Err(err) => panic!("expected AlreadyOpen, got {err:?}"),
            Ok(_) => panic!("second WAL open on same directory should fail"),
        }

        // Dropping releases the directory lock, so reopening must succeed.
        drop(wal);
        let (reopened, _) = open_default(&dir.path);
        drop(reopened);
    }

    #[test]
    fn begin_append_commit_round_trip_through_replay() {
        let dir = TmpDir::new("commit");

        // Write two committed transactions, then drop the WAL handle.
        {
            let (wal, _) = open_default(&dir.path);
            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(1)).unwrap();
            wal.append(begin, &ev(2)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();

            let begin = wal.begin().unwrap();
            wal.append(begin, &ev(3)).unwrap();
            wal.commit(begin).unwrap();
            wal.flush().unwrap();
        }

        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 3);
        assert_eq!(replay[0], ev(1));
        assert_eq!(replay[1], ev(2));
        assert_eq!(replay[2], ev(3));
        // 7 records were written (2 begins + 3 mutations + 2 commits), so
        // LSNs 1..=7 are used and the next one handed out is 8.
        assert_eq!(wal.next_lsn(), Lsn::new(8));
    }

    #[test]
    fn aborted_transaction_is_dropped_on_replay() {
        let dir = TmpDir::new("abort");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.abort(b2).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn uncommitted_transaction_at_end_of_log_is_discarded() {
        let dir = TmpDir::new("uncommitted");

        {
            let (wal, _) = open_default(&dir.path);
            let b1 = wal.begin().unwrap();
            wal.append(b1, &ev(1)).unwrap();
            wal.commit(b1).unwrap();
            wal.flush().unwrap();

            // Second transaction is begun and written but never committed.
            let b2 = wal.begin().unwrap();
            wal.append(b2, &ev(99)).unwrap();
            wal.flush().unwrap();
        }

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);
    }

    #[test]
    fn segment_rotation_at_begin_boundary() {
        let dir = TmpDir::new("rotate");

        // Tiny 256-byte target so a handful of records triggers rotation.
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 256, Lsn::ZERO).unwrap();

        // Fill past the target within one transaction; no rotation happens
        // mid-transaction because only begin() checks for it.
        let b1 = wal.begin().unwrap();
        for i in 0..5 {
            wal.append(b1, &ev(i)).unwrap();
        }
        wal.commit(b1).unwrap();
        wal.flush().unwrap();
        assert_eq!(wal.active_segment_id(), 1);

        // The next begin() sees the oversized segment and rotates.
        let b2 = wal.begin().unwrap();
        wal.append(b2, &ev(100)).unwrap();
        wal.commit(b2).unwrap();
        wal.flush().unwrap();
        assert_eq!(
            wal.active_segment_id(),
            2,
            "begin() should have rotated to segment 2"
        );

        let segments = SegmentDir::new(&dir.path).list().unwrap();
        assert_eq!(segments.len(), 2);

        drop(wal);
        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay.len(), 6);
    }

    #[test]
    fn checkpoint_lsn_skips_already_checkpointed_events() {
        let dir = TmpDir::new("ckpt-skip");
        let (wal, _) = open_default(&dir.path);

        // First transaction: committed before the checkpoint.
        let a = wal.begin().unwrap();
        wal.append(a, &ev(1)).unwrap();
        wal.append(a, &ev(2)).unwrap();
        let commit_a = wal.commit(a).unwrap();
        wal.flush().unwrap();

        // Second transaction: committed after the checkpoint.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(3)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        // Reopening with checkpoint = commit_a must replay only tx b.
        let (_, replay) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, commit_a).unwrap();
        assert_eq!(replay, vec![ev(3)]);
    }

    #[test]
    fn replay_rejects_commit_without_begin() {
        let dir = TmpDir::new("commit-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            wal.commit(Lsn::new(99)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn replay_rejects_mutation_without_begin() {
        let dir = TmpDir::new("mutation-without-begin");

        {
            let (wal, _) = open_default(&dir.path);
            wal.append(Lsn::new(99), &ev(1)).unwrap();
            wal.flush().unwrap();
        }

        let err = match Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO) {
            Ok(_) => panic!("malformed WAL should not open"),
            Err(err) => err,
        };
        assert!(
            matches!(err, WalError::Malformed(ref msg) if msg.contains("missing tx begin")),
            "expected malformed missing-begin error, got {err:?}"
        );
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = TmpDir::new("torn");

        {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            wal.commit(b).unwrap();
            wal.flush().unwrap();
        }

        // Simulate a crash mid-write: append garbage to the active segment.
        let segments = SegmentDir::new(&dir.path).list().unwrap();
        let active = &segments.last().unwrap().path;
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(active).unwrap();
            f.write_all(&[0xff; 32]).unwrap();
            f.sync_all().unwrap();
        }

        // Reopen: the torn tail is cut off and replay still sees tx 1.
        let (wal, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1)]);

        // The truncated WAL must accept fresh appends.
        let b = wal.begin().unwrap();
        wal.append(b, &ev(2)).unwrap();
        wal.commit(b).unwrap();
        wal.flush().unwrap();
        drop(wal);

        let (_, replay) = open_default(&dir.path);
        assert_eq!(replay, vec![ev(1), ev(2)]);
    }

    #[test]
    fn checkpoint_marker_is_recorded_and_observed() {
        let dir = TmpDir::new("ckpt-marker");

        let snapshot_lsn = {
            let (wal, _) = open_default(&dir.path);
            let b = wal.begin().unwrap();
            wal.append(b, &ev(1)).unwrap();
            let commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
            wal.checkpoint_marker(commit).unwrap();
            wal.flush().unwrap();
            commit
        };

        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(
            outcome.checkpoint_lsn_observed,
            Some(snapshot_lsn),
            "checkpoint marker should be surfaced by replay"
        );
    }

    #[test]
    fn group_mode_durable_lsn_advances_via_bg_flusher() {
        let dir = TmpDir::new("group");
        // Short interval so the background flusher fires quickly.
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group { interval_ms: 25 },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        // In group mode a plain flush() is a durability no-op.
        assert_eq!(
            wal.durable_lsn(),
            Lsn::ZERO,
            "Group flush() must not advance durable_lsn"
        );

        // Poll until the background flusher advances durable_lsn.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        loop {
            if wal.durable_lsn() > Lsn::ZERO {
                break;
            }
            if std::time::Instant::now() >= deadline {
                panic!(
                    "bg flusher did not advance durable_lsn within 500 ms (still at {})",
                    wal.durable_lsn()
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
        // Drop joins the background flusher thread.
        drop(wal);
    }

    #[test]
    fn none_mode_advances_durable_lsn_on_flush() {
        let dir = TmpDir::new("none");
        let (wal, _) = Wal::open(&dir.path, SyncMode::None, 8 * 1024 * 1024, Lsn::ZERO).unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();

        // SyncMode::None: flush() advances durable_lsn without fsync.
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn force_fsync_always_advances_durable_lsn() {
        let dir = TmpDir::new("force-fsync");
        let (wal, _) = Wal::open(
            &dir.path,
            SyncMode::Group {
                interval_ms: 60_000,
            },
            8 * 1024 * 1024,
            Lsn::ZERO,
        )
        .unwrap();

        let begin = wal.begin().unwrap();
        wal.append(begin, &ev(1)).unwrap();
        wal.commit(begin).unwrap();
        wal.flush().unwrap();
        // Group flush() is a no-op for durability…
        assert_eq!(wal.durable_lsn(), Lsn::ZERO);

        // …but force_fsync() must advance durable_lsn regardless of mode.
        wal.force_fsync().unwrap();
        assert_eq!(wal.durable_lsn().raw(), wal.next_lsn().raw() - 1);
    }

    #[test]
    fn truncate_up_to_drops_old_sealed_segments() {
        let dir = TmpDir::new("truncate");

        // 64-byte target: every transaction forces a rotation on begin().
        let (wal, _) = Wal::open(&dir.path, SyncMode::PerCommit, 64, Lsn::ZERO).unwrap();

        let mut last_commit = Lsn::ZERO;
        for i in 0..5 {
            let b = wal.begin().unwrap();
            wal.append(b, &ev(i)).unwrap();
            last_commit = wal.commit(b).unwrap();
            wal.flush().unwrap();
        }
        assert!(
            wal.active_segment_id() >= 4,
            "expected several rotations, got {}",
            wal.active_segment_id()
        );

        let segments = SegmentDir::new(&dir.path);
        let before = segments.list().unwrap().len();
        wal.truncate_up_to(last_commit).unwrap();
        let after = segments.list().unwrap().len();

        assert!(
            after < before,
            "truncate_up_to should have dropped at least one segment ({} → {})",
            before,
            after
        );
        assert!(
            after >= 2,
            "active and the segment preceding it must be kept"
        );

        // Everything up to last_commit was truncated, so reopening with
        // that checkpoint replays nothing.
        drop(wal);
        let (_, replay) = Wal::open(&dir.path, SyncMode::PerCommit, 64, last_commit).unwrap();
        assert!(replay.is_empty());
    }
}