1use std::fs;
20use std::path::{Path, PathBuf};
21
22use tracing::info;
23
24use crate::error::{Result, WalError};
25use crate::record::WalRecord;
26use crate::segment::{
27 DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
28 truncate_segments,
29};
30use crate::writer::{WalWriter, WalWriterConfig};
31
/// Configuration used by [`SegmentedWal::open`].
#[derive(Debug, Clone)]
pub struct SegmentedWalConfig {
    /// Directory holding the WAL segment files. `SegmentedWal::open` creates it
    /// if it does not exist yet.
    pub wal_dir: PathBuf,

    /// Threshold compared against the active writer's file offset before each
    /// append; once the offset has reached it, the active segment is sealed and
    /// a new one is started. Because the check happens before the append, a
    /// segment may exceed this value by up to one record.
    pub segment_target_size: u64,

    /// Settings handed to every [`WalWriter`] opened for a segment.
    pub writer_config: WalWriterConfig,
}
46
47impl SegmentedWalConfig {
48 pub fn new(wal_dir: PathBuf) -> Self {
50 Self {
51 wal_dir,
52 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
53 writer_config: WalWriterConfig::default(),
54 }
55 }
56
57 pub fn for_testing(wal_dir: PathBuf) -> Self {
59 Self {
60 wal_dir,
61 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
62 writer_config: WalWriterConfig {
63 use_direct_io: false,
64 ..Default::default()
65 },
66 }
67 }
68}
69
/// A write-ahead log stored as a sequence of segment files in one directory.
///
/// Records are appended to a single active segment. Before an append, if the
/// active writer's file offset has reached the configured target size, that
/// segment is sealed and a new segment, whose path is derived from the next
/// LSN, becomes active.
pub struct SegmentedWal {
    /// Directory containing every segment file.
    wal_dir: PathBuf,

    /// Writer for the active segment; all appends go through it.
    writer: WalWriter,

    /// First LSN of the active segment. Also passed to `truncate_segments`
    /// (presumably so the active segment is never deleted — confirm there).
    active_first_lsn: u64,

    /// Offset threshold that triggers a segment roll before the next append.
    segment_target_size: u64,

    /// Writer settings reused each time a new segment is opened.
    writer_config: WalWriterConfig,

    /// Key ring installed via `set_encryption_ring`; `None` until then. On each
    /// segment roll it is replaced by a ring holding a fresh-epoch key.
    encryption_ring: Option<crate::crypto::KeyRing>,
}
94
95impl SegmentedWal {
96 pub fn open(config: SegmentedWalConfig) -> Result<Self> {
102 fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
103
104 let segments = discover_segments(&config.wal_dir)?;
105
106 let (writer, active_first_lsn) = if segments.is_empty() {
107 let path = segment_path(&config.wal_dir, 1);
109 let writer = WalWriter::open(&path, config.writer_config.clone())?;
110 (writer, 1u64)
111 } else {
112 let last = &segments[segments.len() - 1];
114 let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
115 (writer, last.first_lsn)
116 };
117
118 info!(
119 wal_dir = %config.wal_dir.display(),
120 segments = segments.len().max(1),
121 active_first_lsn,
122 next_lsn = writer.next_lsn(),
123 "segmented WAL opened"
124 );
125
126 Ok(Self {
127 wal_dir: config.wal_dir,
128 writer,
129 active_first_lsn,
130 segment_target_size: config.segment_target_size,
131 writer_config: config.writer_config,
132 encryption_ring: None,
133 })
134 }
135
136 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
141 self.writer.set_encryption_ring(ring.clone())?;
142 self.encryption_ring = Some(ring);
143 Ok(())
144 }
145
146 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
148 self.encryption_ring.as_ref()
149 }
150
151 pub fn append(
156 &mut self,
157 record_type: u32,
158 tenant_id: u64,
159 vshard_id: u32,
160 payload: &[u8],
161 ) -> Result<u64> {
162 if self.writer.file_offset() >= self.segment_target_size {
164 self.roll_segment()?;
165 }
166
167 self.writer
168 .append(record_type, tenant_id, vshard_id, payload)
169 }
170
171 pub fn sync(&mut self) -> Result<()> {
173 self.writer.sync()
174 }
175
176 pub fn next_lsn(&self) -> u64 {
178 self.writer.next_lsn()
179 }
180
181 pub fn active_segment_first_lsn(&self) -> u64 {
183 self.active_first_lsn
184 }
185
186 pub fn wal_dir(&self) -> &Path {
188 &self.wal_dir
189 }
190
191 pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
198 truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
199 }
200
201 pub fn replay(&self) -> Result<Vec<WalRecord>> {
205 replay_all_segments(&self.wal_dir)
206 }
207
208 pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
213 let all = self.replay()?;
214 Ok(all
215 .into_iter()
216 .filter(|r| r.header.lsn >= from_lsn)
217 .collect())
218 }
219
220 pub fn replay_from_limit(
222 &self,
223 from_lsn: u64,
224 max_records: usize,
225 ) -> Result<(Vec<WalRecord>, bool)> {
226 replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
227 }
228
229 pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
231 discover_segments(&self.wal_dir)
232 }
233
234 pub fn total_size_bytes(&self) -> Result<u64> {
236 let segments = discover_segments(&self.wal_dir)?;
237 Ok(segments.iter().map(|s| s.file_size).sum())
238 }
239
240 fn roll_segment(&mut self) -> Result<()> {
242 self.writer.seal()?;
244
245 let new_first_lsn = self.writer.next_lsn();
247 let new_path = segment_path(&self.wal_dir, new_first_lsn);
248
249 let mut new_writer =
250 WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
251
252 if let Some(ref ring) = self.encryption_ring {
257 let fresh_key = ring.current().with_fresh_epoch()?;
258 let new_ring = crate::crypto::KeyRing::new(fresh_key);
259 new_writer.set_encryption_ring(new_ring.clone())?;
260 self.encryption_ring = Some(new_ring);
261 }
262
263 self.writer = new_writer;
264 self.active_first_lsn = new_first_lsn;
265
266 let _ = crate::segment::fsync_directory(&self.wal_dir);
270
271 info!(
272 segment = %new_path.display(),
273 first_lsn = new_first_lsn,
274 "rolled to new WAL segment"
275 );
276
277 Ok(())
278 }
279}
280
281pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
286 let segments = discover_segments(wal_dir)?;
287 let mut all_records = Vec::new();
288
289 for seg in &segments {
290 let reader = crate::reader::WalReader::open(&seg.path)?;
291 for record_result in reader.records() {
292 all_records.push(record_result?);
293 }
294 }
295
296 Ok(all_records)
297}
298
299pub fn replay_from_limit_dir(
307 wal_dir: &Path,
308 from_lsn: u64,
309 max_records: usize,
310) -> Result<(Vec<WalRecord>, bool)> {
311 let segments = discover_segments(wal_dir)?;
312 let mut records = Vec::with_capacity(max_records.min(4096));
313
314 for seg in &segments {
315 let reader = crate::reader::WalReader::open(&seg.path)?;
316 for record_result in reader.records() {
317 let record = record_result?;
318 if record.header.lsn >= from_lsn {
319 records.push(record);
320 if records.len() >= max_records {
321 return Ok((records, true));
322 }
323 }
324 }
325 }
326
327 Ok((records, false))
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Default-sized segments with direct I/O disabled.
    fn test_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig::for_testing(dir.to_path_buf())
    }

    /// A 100-byte segment target (direct I/O disabled), so that a handful of
    /// small appends forces several segment rolls.
    fn small_segment_config(wal_dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig {
            wal_dir: wal_dir.to_path_buf(),
            segment_target_size: 100,
            writer_config: WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        }
    }

    #[test]
    fn create_and_append() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let lsn1 = wal.append(RecordType::Put as u32, 1, 0, b"hello").unwrap();
        let lsn2 = wal.append(RecordType::Put as u32, 1, 0, b"world").unwrap();
        wal.sync().unwrap();

        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);
        assert_eq!(wal.next_lsn(), 3);
    }

    #[test]
    fn replay_after_close() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        {
            // Write three records, then drop the WAL to simulate a close.
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.append(RecordType::Put as u32, 1, 0, b"first").unwrap();
            wal.append(RecordType::Delete as u32, 2, 1, b"second")
                .unwrap();
            wal.append(RecordType::Put as u32, 1, 0, b"third").unwrap();
            wal.sync().unwrap();
        }

        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].payload, b"first");
        assert_eq!(records[1].payload, b"second");
        assert_eq!(records[2].payload, b"third");
        assert_eq!(wal.next_lsn(), 4);
    }

    #[test]
    fn automatic_segment_rollover() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();

        for i in 0..20u32 {
            let payload = format!("record-{i:04}");
            wal.append(RecordType::Put as u32, 1, 0, payload.as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }

        let segments = wal.list_segments().unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );

        // Replay must stitch the segments back together in LSN order.
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 20);
        for (i, record) in records.iter().enumerate() {
            assert_eq!(record.header.lsn, (i + 1) as u64);
            let expected = format!("record-{i:04}");
            assert_eq!(record.payload, expected.as_bytes());
        }
    }

    #[test]
    fn truncation_removes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();

        for i in 0..20u32 {
            let payload = format!("record-{i:04}");
            wal.append(RecordType::Put as u32, 1, 0, payload.as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }

        let segments_before = wal.list_segments().unwrap();
        assert!(segments_before.len() > 1);

        let result = wal.truncate_before(15).unwrap();
        assert!(result.segments_deleted > 0);
        assert!(result.bytes_reclaimed > 0);

        let segments_after = wal.list_segments().unwrap();
        assert!(segments_after.len() < segments_before.len());

        // Every record at or after the checkpoint must survive truncation,
        // not merely some record past it.
        let surviving: Vec<u64> = wal
            .replay()
            .unwrap()
            .iter()
            .map(|r| r.header.lsn)
            .collect();
        for lsn in 15..=20u64 {
            assert!(
                surviving.contains(&lsn),
                "record {lsn} lost by truncation; remaining LSNs: {surviving:?}"
            );
        }
    }

    #[test]
    fn replay_from_checkpoint_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        for i in 0..10u32 {
            wal.append(RecordType::Put as u32, 1, 0, format!("r{i}").as_bytes())
                .unwrap();
        }
        wal.sync().unwrap();

        let records = wal.replay_from(6).unwrap();
        assert_eq!(records.len(), 5);
        assert_eq!(records[0].header.lsn, 6);
        assert_eq!(records[4].header.lsn, 10);
    }

    #[test]
    fn total_size_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        wal.append(RecordType::Put as u32, 1, 0, b"data").unwrap();
        wal.sync().unwrap();

        let size = wal.total_size_bytes().unwrap();
        assert!(size > 0);
    }

    #[test]
    fn reopen_continues_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.append(RecordType::Put as u32, 1, 0, b"a").unwrap();
            wal.append(RecordType::Put as u32, 1, 0, b"b").unwrap();
            wal.sync().unwrap();
        }

        {
            // A reopened WAL must continue numbering where the last one stopped.
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            assert_eq!(wal.next_lsn(), 3);
            let lsn = wal.append(RecordType::Put as u32, 1, 0, b"c").unwrap();
            assert_eq!(lsn, 3);
            wal.sync().unwrap();
        }

        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 3);
    }

    #[test]
    fn encryption_persists_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [42u8; 32];

        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let ring = crate::crypto::KeyRing::new(key);
            let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();

            for i in 0..10u32 {
                wal.append(RecordType::Put as u32, 1, 0, format!("enc-{i}").as_bytes())
                    .unwrap();
                wal.sync().unwrap();
            }
            assert!(wal.list_segments().unwrap().len() > 1);
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments after rollover"
        );

        // Reading back only needs the original key bytes: each segment's
        // epoch is taken from that segment's own preamble.
        let key_for_read = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let ring_for_read = crate::crypto::KeyRing::new(key_for_read);

        let mut all_payloads = Vec::new();

        for seg in &segments {
            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
            let epoch = *reader
                .segment_preamble()
                .expect("encrypted segment must have a preamble")
                .epoch();
            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

            for record_result in reader.records() {
                let record = record_result.unwrap();
                assert!(record.is_encrypted(), "all records should be encrypted");
                let plaintext = record
                    .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_for_read))
                    .unwrap();
                all_payloads.push(plaintext);
            }
        }

        assert_eq!(all_payloads.len(), 10);
        for (i, payload) in all_payloads.iter().enumerate() {
            assert_eq!(payload, format!("enc-{i}").as_bytes());
        }
    }

    #[test]
    fn wal_encrypted_restart_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [0xABu8; 32];

        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let ring = crate::crypto::KeyRing::new(key);
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();

            for i in 0..5u32 {
                wal.append(
                    RecordType::Put as u32,
                    1,
                    0,
                    format!("restart-{i}").as_bytes(),
                )
                .unwrap();
            }
            wal.sync().unwrap();
        }

        // A key rebuilt from the same bytes after "restart" must decrypt.
        let key_restart = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let ring_restart = crate::crypto::KeyRing::new(key_restart);

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        let mut payloads = Vec::new();

        for seg in &segments {
            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
            let epoch = *reader
                .segment_preamble()
                .expect("segment must have preamble after encrypted write")
                .epoch();
            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

            for record_result in reader.records() {
                let record = record_result.unwrap();
                let pt = record
                    .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_restart))
                    .unwrap();
                payloads.push(pt);
            }
        }

        assert_eq!(payloads.len(), 5);
        for (i, pt) in payloads.iter().enumerate() {
            assert_eq!(pt, format!("restart-{i}").as_bytes());
        }
    }

    #[test]
    fn epoch_tamper_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [0x55u8; 32];

        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let ring = crate::crypto::KeyRing::new(key);
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(ring).unwrap();
            wal.append(RecordType::Put as u32, 1, 0, b"sensitive payload")
                .unwrap();
            wal.sync().unwrap();
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert_eq!(segments.len(), 1);
        let seg_path = &segments[0].path;

        let mut raw = std::fs::read(seg_path).unwrap();
        // NOTE(review): byte 9 is assumed to fall inside the preamble's epoch
        // field — confirm against the preamble layout if it changes.
        raw[9] ^= 0xFF;
        std::fs::write(seg_path, &raw).unwrap();

        let key_read = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let ring_read = crate::crypto::KeyRing::new(key_read);

        let reader = crate::reader::WalReader::open(seg_path).unwrap();
        let epoch = *reader.segment_preamble().unwrap().epoch();
        let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

        let record = reader.records().next().unwrap().unwrap();
        let result = record.decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&ring_read));
        assert!(
            result.is_err(),
            "decryption with tampered preamble epoch must fail"
        );
    }

    #[test]
    fn replay_from_limit_basic() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u32, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(1, 100).unwrap();
        assert_eq!(records.len(), 10);
        assert!(!has_more);

        let (records, has_more) = wal.replay_from_limit(1, 3).unwrap();
        assert_eq!(records.len(), 3);
        assert!(has_more);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[2].header.lsn, 3);
    }

    #[test]
    fn replay_from_limit_with_lsn_filter() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u32, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(6, 100).unwrap();
        assert_eq!(records.len(), 5);
        assert!(!has_more);
        assert_eq!(records[0].header.lsn, 6);

        let (records, has_more) = wal.replay_from_limit(6, 2).unwrap();
        assert_eq!(records.len(), 2);
        assert!(has_more);
    }

    #[test]
    fn replay_from_limit_empty() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        wal.append(RecordType::Put as u32, 1, 0, b"a").unwrap();
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(999, 100).unwrap();
        assert!(records.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn replay_from_limit_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u32, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();
        // Force a second segment without relying on the size threshold.
        wal.roll_segment().unwrap();

        for i in 10..20u8 {
            wal.append(RecordType::Put as u32, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let seg_count = wal.list_segments().unwrap().len();
        assert!(
            seg_count >= 2,
            "expected multiple segments, got {seg_count}"
        );

        let (records, has_more) = wal.replay_from_limit(1, 5).unwrap();
        assert_eq!(records.len(), 5);
        assert!(has_more);

        // Resume from just after the last returned LSN.
        let next_lsn = records.last().unwrap().header.lsn + 1;
        let (records2, _) = wal.replay_from_limit(next_lsn, 200).unwrap();
        assert_eq!(records2.len(), 15);
    }
}