1use std::fs;
18use std::path::{Path, PathBuf};
19
20use tracing::info;
21
22use crate::error::{Result, WalError};
23use crate::record::WalRecord;
24use crate::segment::{
25 DEFAULT_SEGMENT_TARGET_SIZE, SegmentMeta, TruncateResult, discover_segments, segment_path,
26 truncate_segments,
27};
28use crate::writer::{WalWriter, WalWriterConfig};
29
/// Configuration for opening a [`SegmentedWal`].
#[derive(Debug, Clone)]
pub struct SegmentedWalConfig {
    /// Directory holding the WAL segment files; `SegmentedWal::open` creates it
    /// if it does not exist.
    pub wal_dir: PathBuf,

    /// Size threshold (compared against the active writer's file offset) at
    /// which `SegmentedWal::append` rolls over to a new segment.
    pub segment_target_size: u64,

    /// Writer settings, cloned into the writer of every segment (the initial
    /// one and each segment created by a roll).
    pub writer_config: WalWriterConfig,
}
44
45impl SegmentedWalConfig {
46 pub fn new(wal_dir: PathBuf) -> Self {
48 Self {
49 wal_dir,
50 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
51 writer_config: WalWriterConfig::default(),
52 }
53 }
54
55 pub fn for_testing(wal_dir: PathBuf) -> Self {
57 Self {
58 wal_dir,
59 segment_target_size: DEFAULT_SEGMENT_TARGET_SIZE,
60 writer_config: WalWriterConfig {
61 use_direct_io: false,
62 ..Default::default()
63 },
64 }
65 }
66}
67
/// A write-ahead log split across multiple segment files in one directory.
///
/// Appends always go to the active (newest) segment. Once that segment grows
/// past `segment_target_size`, it is sealed and a new segment is started at
/// the next LSN.
pub struct SegmentedWal {
    /// Directory that contains all segment files.
    wal_dir: PathBuf,

    /// Writer for the active segment.
    writer: WalWriter,

    /// First LSN of the active segment. It is passed to `truncate_segments`,
    /// presumably so that the active segment is never deleted — TODO confirm.
    active_first_lsn: u64,

    /// Writer file offset at which `append` rolls to a new segment.
    segment_target_size: u64,

    /// Configuration cloned into the writer of each newly rolled segment.
    writer_config: WalWriterConfig,

    /// Key ring applied to the active writer and to every segment created by a
    /// roll. It is `None` after `open`; it is not restored across reopen.
    encryption_ring: Option<crate::crypto::KeyRing>,
}
92
93impl SegmentedWal {
94 pub fn open(config: SegmentedWalConfig) -> Result<Self> {
100 fs::create_dir_all(&config.wal_dir).map_err(WalError::Io)?;
101
102 let segments = discover_segments(&config.wal_dir)?;
103
104 let (writer, active_first_lsn) = if segments.is_empty() {
105 let path = segment_path(&config.wal_dir, 1);
107 let writer = WalWriter::open(&path, config.writer_config.clone())?;
108 (writer, 1u64)
109 } else {
110 let last = &segments[segments.len() - 1];
112 let writer = WalWriter::open(&last.path, config.writer_config.clone())?;
113 (writer, last.first_lsn)
114 };
115
116 info!(
117 wal_dir = %config.wal_dir.display(),
118 segments = segments.len().max(1),
119 active_first_lsn,
120 next_lsn = writer.next_lsn(),
121 "segmented WAL opened"
122 );
123
124 Ok(Self {
125 wal_dir: config.wal_dir,
126 writer,
127 active_first_lsn,
128 segment_target_size: config.segment_target_size,
129 writer_config: config.writer_config,
130 encryption_ring: None,
131 })
132 }
133
134 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
136 self.writer.set_encryption_ring(ring.clone());
137 self.encryption_ring = Some(ring);
138 }
139
140 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
142 self.encryption_ring.as_ref()
143 }
144
145 pub fn append(
150 &mut self,
151 record_type: u16,
152 tenant_id: u32,
153 vshard_id: u16,
154 payload: &[u8],
155 ) -> Result<u64> {
156 if self.writer.file_offset() >= self.segment_target_size {
158 self.roll_segment()?;
159 }
160
161 self.writer
162 .append(record_type, tenant_id, vshard_id, payload)
163 }
164
165 pub fn sync(&mut self) -> Result<()> {
167 self.writer.sync()
168 }
169
170 pub fn next_lsn(&self) -> u64 {
172 self.writer.next_lsn()
173 }
174
175 pub fn active_segment_first_lsn(&self) -> u64 {
177 self.active_first_lsn
178 }
179
180 pub fn wal_dir(&self) -> &Path {
182 &self.wal_dir
183 }
184
185 pub fn truncate_before(&self, checkpoint_lsn: u64) -> Result<TruncateResult> {
192 truncate_segments(&self.wal_dir, checkpoint_lsn, self.active_first_lsn)
193 }
194
195 pub fn replay(&self) -> Result<Vec<WalRecord>> {
199 replay_all_segments(&self.wal_dir)
200 }
201
202 pub fn replay_from(&self, from_lsn: u64) -> Result<Vec<WalRecord>> {
207 let all = self.replay()?;
208 Ok(all
209 .into_iter()
210 .filter(|r| r.header.lsn >= from_lsn)
211 .collect())
212 }
213
214 pub fn replay_from_limit(
216 &self,
217 from_lsn: u64,
218 max_records: usize,
219 ) -> Result<(Vec<WalRecord>, bool)> {
220 replay_from_limit_dir(&self.wal_dir, from_lsn, max_records)
221 }
222
223 pub fn list_segments(&self) -> Result<Vec<SegmentMeta>> {
225 discover_segments(&self.wal_dir)
226 }
227
228 pub fn total_size_bytes(&self) -> Result<u64> {
230 let segments = discover_segments(&self.wal_dir)?;
231 Ok(segments.iter().map(|s| s.file_size).sum())
232 }
233
234 fn roll_segment(&mut self) -> Result<()> {
236 self.writer.seal()?;
238
239 let new_first_lsn = self.writer.next_lsn();
241 let new_path = segment_path(&self.wal_dir, new_first_lsn);
242
243 let mut new_writer =
244 WalWriter::open_with_start_lsn(&new_path, self.writer_config.clone(), new_first_lsn)?;
245
246 if let Some(ref ring) = self.encryption_ring {
248 new_writer.set_encryption_ring(ring.clone());
249 }
250
251 self.writer = new_writer;
252 self.active_first_lsn = new_first_lsn;
253
254 let _ = crate::segment::fsync_directory(&self.wal_dir);
258
259 info!(
260 segment = %new_path.display(),
261 first_lsn = new_first_lsn,
262 "rolled to new WAL segment"
263 );
264
265 Ok(())
266 }
267}
268
269pub fn replay_all_segments(wal_dir: &Path) -> Result<Vec<WalRecord>> {
274 let segments = discover_segments(wal_dir)?;
275 let mut all_records = Vec::new();
276
277 for seg in &segments {
278 let reader = crate::reader::WalReader::open(&seg.path)?;
279 for record_result in reader.records() {
280 all_records.push(record_result?);
281 }
282 }
283
284 Ok(all_records)
285}
286
287pub fn replay_from_limit_dir(
295 wal_dir: &Path,
296 from_lsn: u64,
297 max_records: usize,
298) -> Result<(Vec<WalRecord>, bool)> {
299 let segments = discover_segments(wal_dir)?;
300 let mut records = Vec::with_capacity(max_records.min(4096));
301
302 for seg in &segments {
303 let reader = crate::reader::WalReader::open(&seg.path)?;
304 for record_result in reader.records() {
305 let record = record_result?;
306 if record.header.lsn >= from_lsn {
307 records.push(record);
308 if records.len() >= max_records {
309 return Ok((records, true));
310 }
311 }
312 }
313 }
314
315 Ok((records, false))
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Default test config: production segment size, direct I/O disabled.
    fn test_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig::for_testing(dir.to_path_buf())
    }

    /// Test config with a 100-byte segment target, so that a handful of
    /// appends forces several segment rolls.
    fn small_segment_config(dir: &Path) -> SegmentedWalConfig {
        SegmentedWalConfig {
            segment_target_size: 100,
            ..test_config(dir)
        }
    }

    #[test]
    fn create_and_append() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let lsn1 = wal.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
        let lsn2 = wal.append(RecordType::Put as u16, 1, 0, b"world").unwrap();
        wal.sync().unwrap();

        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);
        assert_eq!(wal.next_lsn(), 3);
    }

    #[test]
    fn replay_after_close() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.append(RecordType::Put as u16, 1, 0, b"first").unwrap();
            wal.append(RecordType::Delete as u16, 2, 1, b"second")
                .unwrap();
            wal.append(RecordType::Put as u16, 1, 0, b"third").unwrap();
            wal.sync().unwrap();
        }

        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].payload, b"first");
        assert_eq!(records[1].payload, b"second");
        assert_eq!(records[2].payload, b"third");
        assert_eq!(wal.next_lsn(), 4);
    }

    #[test]
    fn automatic_segment_rollover() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();

        for i in 0..20u32 {
            let payload = format!("record-{i:04}");
            wal.append(RecordType::Put as u16, 1, 0, payload.as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }

        let segments = wal.list_segments().unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );

        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 20);
        for (i, record) in records.iter().enumerate() {
            assert_eq!(record.header.lsn, (i + 1) as u64);
            let expected = format!("record-{i:04}");
            assert_eq!(record.payload, expected.as_bytes());
        }
    }

    #[test]
    fn reopen_after_rollover_continues_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        {
            let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();
            for i in 0..20u32 {
                let payload = format!("record-{i:04}");
                wal.append(RecordType::Put as u16, 1, 0, payload.as_bytes())
                    .unwrap();
                wal.sync().unwrap();
            }
            assert!(wal.list_segments().unwrap().len() > 1);
        }

        // Reopening must resume after the last LSN in the newest segment,
        // not restart numbering.
        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();
        assert_eq!(wal.next_lsn(), 21);
        let lsn = wal
            .append(RecordType::Put as u16, 1, 0, b"after-reopen")
            .unwrap();
        assert_eq!(lsn, 21);
        wal.sync().unwrap();

        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 21);
        assert_eq!(records.last().unwrap().header.lsn, 21);
    }

    #[test]
    fn truncation_removes_old_segments() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();

        for i in 0..20u32 {
            let payload = format!("record-{i:04}");
            wal.append(RecordType::Put as u16, 1, 0, payload.as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }

        let segments_before = wal.list_segments().unwrap();
        assert!(segments_before.len() > 1);

        let result = wal.truncate_before(15).unwrap();
        assert!(result.segments_deleted > 0);
        assert!(result.bytes_reclaimed > 0);

        let segments_after = wal.list_segments().unwrap();
        assert!(segments_after.len() < segments_before.len());

        // Records at or after the checkpoint must survive truncation.
        let records = wal.replay().unwrap();
        assert!(records.iter().any(|r| r.header.lsn >= 15));
    }

    #[test]
    fn replay_from_checkpoint_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        for i in 0..10u32 {
            wal.append(RecordType::Put as u16, 1, 0, format!("r{i}").as_bytes())
                .unwrap();
        }
        wal.sync().unwrap();

        let records = wal.replay_from(6).unwrap();
        assert_eq!(records.len(), 5);
        assert_eq!(records[0].header.lsn, 6);
        assert_eq!(records[4].header.lsn, 10);
    }

    #[test]
    fn total_size_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        wal.append(RecordType::Put as u16, 1, 0, b"data").unwrap();
        wal.sync().unwrap();

        let size = wal.total_size_bytes().unwrap();
        assert!(size > 0);
    }

    #[test]
    fn reopen_continues_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
            wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
            wal.sync().unwrap();
        }

        {
            let mut wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
            assert_eq!(wal.next_lsn(), 3);
            let lsn = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
            assert_eq!(lsn, 3);
            wal.sync().unwrap();
        }

        let wal = SegmentedWal::open(test_config(&wal_dir)).unwrap();
        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 3);
    }

    #[test]
    fn encryption_persists_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");

        let key = crate::crypto::WalEncryptionKey::from_bytes(&[42u8; 32]);
        let ring = crate::crypto::KeyRing::new(key);

        let mut wal = SegmentedWal::open(small_segment_config(&wal_dir)).unwrap();
        wal.set_encryption_ring(ring);

        for i in 0..10u32 {
            wal.append(RecordType::Put as u16, 1, 0, format!("enc-{i}").as_bytes())
                .unwrap();
            wal.sync().unwrap();
        }

        // Rolled segments must inherit the ring, so every record is encrypted.
        assert!(wal.list_segments().unwrap().len() > 1);

        let records = wal.replay().unwrap();
        assert_eq!(records.len(), 10);
        assert!(records.iter().all(|r| r.is_encrypted()));
    }

    #[test]
    fn replay_from_limit_basic() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u16, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(1, 100).unwrap();
        assert_eq!(records.len(), 10);
        assert!(!has_more);

        let (records, has_more) = wal.replay_from_limit(1, 3).unwrap();
        assert_eq!(records.len(), 3);
        assert!(has_more);
        assert_eq!(records[0].header.lsn, 1);
        assert_eq!(records[2].header.lsn, 3);
    }

    #[test]
    fn replay_from_limit_with_lsn_filter() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u16, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(6, 100).unwrap();
        assert_eq!(records.len(), 5);
        assert!(!has_more);
        assert_eq!(records[0].header.lsn, 6);

        let (records, has_more) = wal.replay_from_limit(6, 2).unwrap();
        assert_eq!(records.len(), 2);
        assert!(has_more);
    }

    #[test]
    fn replay_from_limit_empty() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        wal.sync().unwrap();

        let (records, has_more) = wal.replay_from_limit(999, 100).unwrap();
        assert!(records.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn replay_from_limit_across_segments() {
        let dir = tempfile::tempdir().unwrap();
        let config = test_config(dir.path());
        let mut wal = SegmentedWal::open(config).unwrap();

        for i in 0..10u8 {
            wal.append(RecordType::Put as u16, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();
        // Force a roll explicitly; the default segment size is far too large
        // for 10 tiny records to trigger one.
        wal.roll_segment().unwrap();

        for i in 10..20u8 {
            wal.append(RecordType::Put as u16, 1, 0, &[i]).unwrap();
        }
        wal.sync().unwrap();

        let seg_count = wal.list_segments().unwrap().len();
        assert!(
            seg_count >= 2,
            "expected multiple segments, got {seg_count}"
        );

        let (records, has_more) = wal.replay_from_limit(1, 5).unwrap();
        assert_eq!(records.len(), 5);
        assert!(has_more);

        let next_lsn = records.last().unwrap().header.lsn + 1;
        let (records2, _) = wal.replay_from_limit(next_lsn, 200).unwrap();
        assert_eq!(records2.len(), 15);
    }
}