1use std::fs;
20use std::path::{Path, PathBuf};
21
22use tracing::info;
23
24use crate::error::{Result, WalError};
25use crate::record::WalRecord;
26use crate::segment::{
27 DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
28 truncate_segments,
29};
30use crate::writer::{WalWriter, WalWriterConfig};
31
/// Configuration used by `SegmentedWal::open`.
#[derive(Debug, Clone)]
pub struct SegmentedWalConfig {
    /// Directory holding the WAL segment files; `SegmentedWal::open` creates it
    /// (including parents) if it does not exist.
    pub wal_dir: PathBuf,

    /// Target segment size in bytes. `SegmentedWal::append` compares the active
    /// writer's file offset against this value to decide when to roll over to a
    /// new segment.
    pub segment_target_size: u64,

    /// Writer settings; a clone is passed to every segment writer the WAL opens.
    pub writer_config: WalWriterConfig,
}
46
47impl SegmentedWalConfig {
48 pub fn new(wal_dir: PathBuf) -> Self {
50 Self {
51 wal_dir,
52 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
53 writer_config: WalWriterConfig::default(),
54 }
55 }
56
57 pub fn for_testing(wal_dir: PathBuf) -> Self {
59 Self {
60 wal_dir,
61 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
62 writer_config: WalWriterConfig {
63 use_direct_io: false,
64 ..Default::default()
65 },
66 }
67 }
68}
69
/// Write-ahead log split across multiple segment files in a single directory.
///
/// Appends go to one active segment. Once that segment's file offset reaches the
/// configured target size, it is sealed and a new segment is started; the new
/// segment's path comes from `segment_path` with the next LSN.
pub struct SegmentedWal {
    /// Directory containing all segment files.
    wal_dir: PathBuf,

    /// Writer for the active (most recent) segment.
    writer: WalWriter,

    /// First LSN of the active segment.
    active_first_lsn: u64,

    /// Roll-over threshold in bytes, copied from the config.
    segment_target_size: u64,

    /// Writer settings; a clone is used whenever a new segment is opened.
    writer_config: WalWriterConfig,

    /// Key ring installed via `set_encryption_ring`, if any. On each segment
    /// roll it is replaced by a ring built from a fresh-epoch copy of its
    /// current key.
    encryption_ring: Option<crate::crypto::KeyRing>,
}
94
95impl SegmentedWal {
96 pub fn open(config: SegmentedWalConfig) -> Result<Self> {
102 fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
103
104 let segments = discover_segments(&config.wal_dir)?;
105
106 let (writer, active_first_lsn) = if segments.is_empty() {
107 let path = segment_path(&config.wal_dir, 1);
109 let writer = WalWriter::open(&path, config.writer_config.clone())?;
110 (writer, 1u64)
111 } else {
112 let last = &segments[segments.len() - 1];
114 let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
115 (writer, last.first_lsn)
116 };
117
118 info!(
119 wal_dir = %config.wal_dir.display(),
120 segments = segments.len().max(1),
121 active_first_lsn,
122 next_lsn = writer.next_lsn(),
123 "segmented WAL opened"
124 );
125
126 Ok(Self {
127 wal_dir: config.wal_dir,
128 writer,
129 active_first_lsn,
130 segment_target_size: config.segment_target_size,
131 writer_config: config.writer_config,
132 encryption_ring: None,
133 })
134 }
135
136 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
141 self.writer.set_encryption_ring(ring.clone())?;
142 self.encryption_ring = Some(ring);
143 Ok(())
144 }
145
146 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
148 self.encryption_ring.as_ref()
149 }
150
151 pub fn append(
159 &mut self,
160 record_type: u32,
161 tenant_id: u64,
162 vshard_id: u32,
163 database_id: u64,
164 payload: &[u8],
165 ) -> Result<u64> {
166 if self.writer.file_offset() >= self.segment_target_size {
168 self.roll_segment()?;
169 }
170
171 self.writer
172 .append(record_type, tenant_id, vshard_id, database_id, payload)
173 }
174
175 pub fn sync(&mut self) -> Result<()> {
177 self.writer.sync()
178 }
179
180 pub fn next_lsn(&self) -> u64 {
182 self.writer.next_lsn()
183 }
184
185 pub fn active_segment_first_lsn(&self) -> u64 {
187 self.active_first_lsn
188 }
189
190 pub fn wal_dir(&self) -> &Path {
192 &self.wal_dir
193 }
194
195 pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
202 truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
203 }
204
205 pub fn replay(&self) -> Result<Vec<WalRecord>> {
209 replay_all_segments(&self.wal_dir)
210 }
211
212 pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
217 let all = self.replay()?;
218 Ok(all
219 .into_iter()
220 .filter(|r| r.header.lsn >= from_lsn)
221 .collect())
222 }
223
224 pub fn replay_from_limit(
226 &self,
227 from_lsn: u64,
228 max_records: usize,
229 ) -> Result<(Vec<WalRecord>, bool)> {
230 replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
231 }
232
233 pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
235 discover_segments(&self.wal_dir)
236 }
237
238 pub fn total_size_bytes(&self) -> Result<u64> {
240 let segments = discover_segments(&self.wal_dir)?;
241 Ok(segments.iter().map(|s| s.file_size).sum())
242 }
243
244 fn roll_segment(&mut self) -> Result<()> {
246 self.writer.seal()?;
248
249 let new_first_lsn = self.writer.next_lsn();
251 let new_path = segment_path(&self.wal_dir, new_first_lsn);
252
253 let mut new_writer =
254 WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
255
256 if let Some(ref ring) = self.encryption_ring {
261 let fresh_key = ring.current().with_fresh_epoch()?;
262 let new_ring = crate::crypto::KeyRing::new(fresh_key);
263 new_writer.set_encryption_ring(new_ring.clone())?;
264 self.encryption_ring = Some(new_ring);
265 }
266
267 self.writer = new_writer;
268 self.active_first_lsn = new_first_lsn;
269
270 let _ = crate::segment::fsync_directory(&self.wal_dir);
274
275 info!(
276 segment = %new_path.display(),
277 first_lsn = new_first_lsn,
278 "rolled to new WAL segment"
279 );
280
281 Ok(())
282 }
283}
284
285pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
290 let segments = discover_segments(wal_dir)?;
291 let mut all_records = Vec::new();
292
293 for seg in &segments {
294 let reader = crate::reader::WalReader::open(&seg.path)?;
295 for record_result in reader.records() {
296 all_records.push(record_result?);
297 }
298 }
299
300 Ok(all_records)
301}
302
303pub fn replay_from_limit_dir(
311 wal_dir: &Path,
312 from_lsn: u64,
313 max_records: usize,
314) -> Result<(Vec<WalRecord>, bool)> {
315 let segments = discover_segments(wal_dir)?;
316 let mut records = Vec::with_capacity(max_records.min(4096));
317
318 for seg in &segments {
319 let reader = crate::reader::WalReader::open(&seg.path)?;
320 for record_result in reader.records() {
321 let record = record_result?;
322 if record.header.lsn >= from_lsn {
323 records.push(record);
324 if records.len() >= max_records {
325 return Ok((records, true));
326 }
327 }
328 }
329 }
330
331 Ok((records, false))
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Standard test configuration (direct I/O off, default segment size) rooted at `dir`.
    fn test_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig::for_testing(dir.to_path_buf())
    }

    /// Test configuration with a 100-byte segment target, so that a handful of
    /// small appends forces several rollovers.
    fn small_segment_config(wal_dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig {
            wal_dir: wal_dir.to_path_buf(),
            segment_target_size: 100,
            writer_config: WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        }
    }

    /// Appends `count` tenant-1 `Put` records with payloads `record-0000`,
    /// `record-0001`, ..., syncing after every append.
    fn append_numbered_and_sync_each(wal: &mut SegmentedWal, count: u32) {
        for i in 0..count {
            let payload = format!("record-{i:04}");
            wal.append(RecordType::Put as u32, 1, 0, 0, payload.as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }
    }

    /// Appends one tenant-1 `Put` record per value in `values`, with that single
    /// byte as the payload. Does not sync.
    fn append_single_bytes(wal: &mut SegmentedWal, values: std::ops::Range<u8>) {
        for value in values {
            wal.append(RecordType::Put as u32, 1, 0, 0, &[value]).unwrap();
        }
    }

    /// Checks that the encrypted `segments` decrypt to `expected`.
    ///
    /// Each segment's records are decrypted with `ring`, using the epoch and
    /// bytes of that segment's preamble. The plaintexts must equal `expected`,
    /// in order. `preamble_msg` is the panic message used when a segment has no
    /// preamble. When `require_encrypted_flag` is set, every record must also
    /// report `is_encrypted()`.
    fn assert_segments_decrypt_to(
        segments: &[SegmentMeta],
        ring: &crate::crypto::KeyRing,
        preamble_msg: &str,
        require_encrypted_flag: bool,
        expected: &[String],
    ) {
        let mut plaintexts = Vec::new();

        for seg in segments {
            let reader = crate::reader::WalReader::open(&seg.path).unwrap();
            let epoch = *reader.segment_preamble().expect(preamble_msg).epoch();
            let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

            for record_result in reader.records() {
                let record = record_result.unwrap();
                if require_encrypted_flag {
                    assert!(record.is_encrypted(), "all records should be encrypted");
                }
                let plaintext = record
                    .decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(ring))
                    .unwrap();
                plaintexts.push(plaintext);
            }
        }

        assert_eq!(plaintexts.len(), expected.len());
        for (plaintext, want) in plaintexts.iter().zip(expected) {
            assert_eq!(plaintext, want.as_bytes());
        }
    }

    #[test]
    fn create_and_append() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(&dir.path().join("wal"))).unwrap();

        let first = wal
            .append(RecordType::Put as u32, 1, 0, 0, b"hello")
            .unwrap();
        let second = wal
            .append(RecordType::Put as u32, 1, 0, 0, b"world")
            .unwrap();
        wal.sync().unwrap();

        assert_eq!((first, second), (1, 2));
        assert_eq!(wal.next_lsn(), 3);
    }

    #[test]
    fn replay_after_close() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        // Write three records, then drop the WAL to close it.
        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.append(RecordType::Put as u32, 1, 0, 0, b"first")
                .unwrap();
            wal.append(RecordType::Delete as u32, 2, 1, 0, b"second")
                .unwrap();
            wal.append(RecordType::Put as u32, 1, 0, 0, b"third")
                .unwrap();
            wal.sync().unwrap();
        }

        let reopened = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = reopened.replay().unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].payload, b"first");
        assert_eq!(records[1].payload, b"second");
        assert_eq!(records[2].payload, b"third");
        assert_eq!(reopened.next_lsn(), 4);
    }

    #[test]
    fn automatic_segment_rollover() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal =
            SegmentedWal::open(small_segment_config(&dir.path().join("wal"))).unwrap();
        append_numbered_and_sync_each(&mut wal, 20);

        let segments = wal.list_segments().unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );

        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 20);
        for (i, record) in records.iter().enumerate() {
            assert_eq!(record.header.lsn, (i + 1) as u64);
            let expected = format!("record-{i:04}");
            assert_eq!(record.payload, expected.as_bytes());
        }
    }

    #[test]
    fn truncation_removes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal =
            SegmentedWal::open(small_segment_config(&dir.path().join("wal"))).unwrap();
        append_numbered_and_sync_each(&mut wal, 20);

        let before = wal.list_segments().unwrap().len();
        assert!(before > 1);

        let result = wal.truncate_before(15).unwrap();
        assert!(result.segments_deleted > 0);
        assert!(result.bytes_reclaimed > 0);

        let after = wal.list_segments().unwrap().len();
        assert!(after < before);

        // Records at or after the checkpoint must survive truncation.
        let records = wal.replay().unwrap();
        assert!(records.iter().any(|r| r.header.lsn >= 15));
    }

    #[test]
    fn replay_from_checkpoint_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(&dir.path().join("wal"))).unwrap();
        for i in 0..10u32 {
            let payload = format!("r{i}");
            wal.append(RecordType::Put as u32, 1, 0, 0, payload.as_bytes())
                .unwrap();
        }
        wal.sync().unwrap();

        let records = wal.replay_from(6).unwrap();
        assert_eq!(records.len(), 5);
        assert_eq!(records.first().unwrap().header.lsn, 6);
        assert_eq!(records.last().unwrap().header.lsn, 10);
    }

    #[test]
    fn total_size_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(&dir.path().join("wal"))).unwrap();
        wal.append(RecordType::Put as u32, 1, 0, 0, b"data")
            .unwrap();
        wal.sync().unwrap();

        assert!(wal.total_size_bytes().unwrap() > 0);
    }

    #[test]
    fn reopen_continues_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let open = || SegmentedWal::open(test_config(&wal_dir)).unwrap();

        // First session writes LSNs 1 and 2.
        {
            let mut wal = open();
            wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
            wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
            wal.sync().unwrap();
        }

        // Second session resumes at LSN 3.
        {
            let mut wal = open();
            assert_eq!(wal.next_lsn(), 3);
            let lsn = wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
            assert_eq!(lsn, 3);
            wal.sync().unwrap();
        }

        assert_eq!(open().replay().unwrap().len(), 3);
    }

    #[test]
    fn encryption_persists_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [42u8; 32];

        // Write ten encrypted records with a tiny segment size to force rollovers.
        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(crate::crypto::KeyRing::new(key))
                .unwrap();

            for i in 0..10u32 {
                wal.append(
                    RecordType::Put as u32,
                    1,
                    0,
                    0,
                    format!("enc-{i}").as_bytes(),
                )
                .unwrap();
                wal.sync().unwrap();
            }
            assert!(wal.list_segments().unwrap().len() > 1);
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments after rollover"
        );

        // A ring rebuilt from the same key bytes must decrypt every segment,
        // using the epoch recorded in each segment's preamble.
        let read_key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let expected: Vec<String> = (0..10).map(|i| format!("enc-{i}")).collect();
        assert_segments_decrypt_to(
            &segments,
            &crate::crypto::KeyRing::new(read_key),
            "encrypted segment must have a preamble",
            true,
            &expected,
        );
    }

    #[test]
    fn wal_encrypted_restart_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [0xABu8; 32];

        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(crate::crypto::KeyRing::new(key))
                .unwrap();

            for i in 0..5u32 {
                wal.append(
                    RecordType::Put as u32,
                    1,
                    0,
                    0,
                    format!("restart-{i}").as_bytes(),
                )
                .unwrap();
            }
            wal.sync().unwrap();
        }

        // Simulate a restart: a brand-new ring built from the same key bytes.
        let restart_key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        let expected: Vec<String> = (0..5).map(|i| format!("restart-{i}")).collect();
        assert_segments_decrypt_to(
            &segments,
            &crate::crypto::KeyRing::new(restart_key),
            "segment must have preamble after encrypted write",
            false,
            &expected,
        );
    }

    #[test]
    fn epoch_tamper_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        let key_bytes = [0x55u8; 32];

        {
            let key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.set_encryption_ring(crate::crypto::KeyRing::new(key))
                .unwrap();
            wal.append(RecordType::Put as u32, 1, 0, 0, b"sensitive payload")
                .unwrap();
            wal.sync().unwrap();
        }

        let segments = crate::segment::discover_segments(&wal_dir).unwrap();
        assert_eq!(segments.len(), 1);
        let seg_path = &segments[0].path;

        // Flip one on-disk byte. Offset 9 is assumed to fall inside the
        // preamble's epoch field — TODO confirm against the preamble layout.
        let mut raw = std::fs::read(seg_path).unwrap();
        raw[9] ^= 0xFF;
        std::fs::write(seg_path, &raw).unwrap();

        let read_key = crate::crypto::WalEncryptionKey::from_bytes(&key_bytes).unwrap();
        let read_ring = crate::crypto::KeyRing::new(read_key);

        let reader = crate::reader::WalReader::open(seg_path).unwrap();
        let epoch = *reader.segment_preamble().unwrap().epoch();
        let preamble_bytes = reader.segment_preamble().unwrap().to_bytes();

        let record = reader.records().next().unwrap().unwrap();
        let outcome =
            record.decrypt_payload_ring(&epoch, Some(&preamble_bytes), Some(&read_ring));
        assert!(
            outcome.is_err(),
            "decryption with tampered preamble epoch must fail"
        );
    }

    #[test]
    fn replay_from_limit_basic() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(dir.path())).unwrap();
        append_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();

        let (all, more) = wal.replay_from_limit(1, 100).unwrap();
        assert_eq!(all.len(), 10);
        assert!(!more);

        let (page, more) = wal.replay_from_limit(1, 3).unwrap();
        assert_eq!(page.len(), 3);
        assert!(more);
        assert_eq!(page[0].header.lsn, 1);
        assert_eq!(page[2].header.lsn, 3);
    }

    #[test]
    fn replay_from_limit_with_lsn_filter() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(dir.path())).unwrap();
        append_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();

        let (tail, more) = wal.replay_from_limit(6, 100).unwrap();
        assert_eq!(tail.len(), 5);
        assert!(!more);
        assert_eq!(tail[0].header.lsn, 6);

        let (page, more) = wal.replay_from_limit(6, 2).unwrap();
        assert_eq!(page.len(), 2);
        assert!(more);
    }

    #[test]
    fn replay_from_limit_empty() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(dir.path())).unwrap();
        wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
        wal.sync().unwrap();

        let (records, more) = wal.replay_from_limit(999, 100).unwrap();
        assert!(records.is_empty());
        assert!(!more);
    }

    #[test]
    fn replay_from_limit_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = SegmentedWal::open(test_config(dir.path())).unwrap();

        // LSNs 1..=10 in the first segment, then a forced roll, then 11..=20.
        append_single_bytes(&mut wal, 0..10);
        wal.sync().unwrap();
        wal.roll_segment().unwrap();
        append_single_bytes(&mut wal, 10..20);
        wal.sync().unwrap();

        let seg_count = wal.list_segments().unwrap().len();
        assert!(
            seg_count >= 2,
            "expected multiple segments, got {seg_count}"
        );

        let (first_page, more) = wal.replay_from_limit(1, 5).unwrap();
        assert_eq!(first_page.len(), 5);
        assert!(more);

        // Resume right after the last LSN of the first page.
        let resume_lsn = first_page.last().unwrap().header.lsn + 1;
        let (rest, _) = wal.replay_from_limit(resume_lsn, 200).unwrap();
        assert_eq!(rest.len(), 15);
    }
}