1use std::collections::BTreeMap;
38use std::fs::{self, File, OpenOptions};
39use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
40use std::path::{Path, PathBuf};
41use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
42use std::sync::Arc;
43use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
44
45use parking_lot::{Mutex, RwLock};
46
/// Default value for [`SegmentConfig::max_size`]: 64 MiB.
pub const DEFAULT_SEGMENT_MAX_SIZE: u64 = 64 << 20;

/// Default value for [`SegmentConfig::rotation_interval`]: five minutes.
pub const DEFAULT_ROTATION_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Default value for [`SegmentConfig::checkpoint_interval`]: one minute.
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Magic number stored (little-endian) in the first four bytes of a segment header.
const SEGMENT_MAGIC: u32 = 0x574C_5347;

/// Format version written into the header of every newly created segment.
const SEGMENT_VERSION: u16 = 1;

/// Size in bytes of an encoded segment header.
const SEGMENT_HEADER_SIZE: usize = 32;
64
/// Magic number stored (little-endian) in the first four bytes of an encoded checkpoint record.
const CHECKPOINT_MAGIC: u32 = 0x4348_4B50;

/// Tunables for the WAL segment manager.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Write-offset threshold in bytes at which the active segment is rotated;
    /// also the length new segment files are extended to when `preallocate` is set.
    pub max_size: u64,
    /// Age at which the active segment is rotated regardless of its size.
    pub rotation_interval: Duration,
    /// Interval between checkpoints.
    /// NOTE(review): nothing in this file reads this field — presumably a caller
    /// schedules checkpoints with it; confirm.
    pub checkpoint_interval: Duration,
    /// Directory that holds the segment files and the checkpoint file.
    pub wal_dir: PathBuf,
    /// When `true`, every append pushes its record out of the in-process write
    /// buffer before returning.
    pub sync_on_write: bool,
    /// When `true`, new segment files are extended to `max_size` on creation.
    pub preallocate: bool,
}
84
85impl Default for SegmentConfig {
86 fn default() -> Self {
87 Self {
88 max_size: DEFAULT_SEGMENT_MAX_SIZE,
89 rotation_interval: DEFAULT_ROTATION_INTERVAL,
90 checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
91 wal_dir: PathBuf::from("wal"),
92 sync_on_write: true,
93 preallocate: true,
94 }
95 }
96}
97
98impl SegmentConfig {
99 pub fn with_wal_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
100 self.wal_dir = dir.as_ref().to_path_buf();
101 self
102 }
103
104 pub fn with_max_size(mut self, size: u64) -> Self {
105 self.max_size = size;
106 self
107 }
108}
109
/// Fixed-size header found at the start of every segment file.
#[derive(Debug, Clone)]
pub struct SegmentHeader {
    /// Magic number identifying a segment file.
    pub magic: u32,
    /// Format version of the segment.
    pub version: u16,
    /// Flag bits; new headers are created with `0`.
    pub flags: u16,
    /// Sequence number of the segment.
    pub sequence: u64,
    /// LSN counter value at the moment the segment was created.
    pub first_lsn: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Reserved bytes; kept in memory only and always zero after decoding.
    pub reserved: [u8; 8],
}
128
129impl SegmentHeader {
130 fn new(sequence: u64, first_lsn: u64) -> Self {
131 let now = SystemTime::now()
132 .duration_since(UNIX_EPOCH)
133 .map(|d| d.as_millis() as u64)
134 .unwrap_or(0);
135
136 Self {
137 magic: SEGMENT_MAGIC,
138 version: SEGMENT_VERSION,
139 flags: 0,
140 sequence,
141 first_lsn,
142 created_at: now,
143 reserved: [0; 8],
144 }
145 }
146
147 fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
148 let mut buf = [0u8; SEGMENT_HEADER_SIZE];
149 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
150 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
151 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
152 buf[8..16].copy_from_slice(&self.sequence.to_le_bytes());
153 buf[16..24].copy_from_slice(&self.first_lsn.to_le_bytes());
154 buf[24..32].copy_from_slice(&self.created_at.to_le_bytes());
155 buf
156 }
157
158 fn decode(buf: &[u8]) -> Option<Self> {
159 if buf.len() < SEGMENT_HEADER_SIZE {
160 return None;
161 }
162
163 let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
164 if magic != SEGMENT_MAGIC {
165 return None;
166 }
167
168 Some(Self {
169 magic,
170 version: u16::from_le_bytes([buf[4], buf[5]]),
171 flags: u16::from_le_bytes([buf[6], buf[7]]),
172 sequence: u64::from_le_bytes([buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]]),
173 first_lsn: u64::from_le_bytes([buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23]]),
174 created_at: u64::from_le_bytes([buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31]]),
175 reserved: [0; 8],
176 })
177 }
178}
179
180struct ActiveSegment {
182 file: BufWriter<File>,
184 path: PathBuf,
186 header: SegmentHeader,
188 offset: u64,
190 created_at: Instant,
192}
193
/// Contents of the on-disk checkpoint file.
#[derive(Debug, Clone)]
pub struct CheckpointRecord {
    /// Value of the manager's LSN counter when the checkpoint was taken.
    pub lsn: u64,
    /// Highest segment sequence that cleanup may delete up to (inclusive);
    /// `0` when no segment qualified at checkpoint time.
    pub last_segment: u64,
    /// Wall-clock time of the checkpoint in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Caller-supplied checksum, stored and returned verbatim.
    pub memtable_checksum: u64,
    /// Caller-supplied entry count, stored and returned verbatim.
    pub entry_count: u64,
}
208
209impl CheckpointRecord {
210 fn encode(&self) -> Vec<u8> {
211 let mut buf = Vec::with_capacity(48);
212 buf.extend_from_slice(&CHECKPOINT_MAGIC.to_le_bytes());
213 buf.extend_from_slice(&self.lsn.to_le_bytes());
214 buf.extend_from_slice(&self.last_segment.to_le_bytes());
215 buf.extend_from_slice(&self.timestamp.to_le_bytes());
216 buf.extend_from_slice(&self.memtable_checksum.to_le_bytes());
217 buf.extend_from_slice(&self.entry_count.to_le_bytes());
218 let checksum = crc32fast::hash(&buf);
220 buf.extend_from_slice(&checksum.to_le_bytes());
221 buf
222 }
223
224 fn decode(buf: &[u8]) -> Option<Self> {
225 if buf.len() < 48 {
226 return None;
227 }
228
229 let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
230 if magic != CHECKPOINT_MAGIC {
231 return None;
232 }
233
234 let stored_checksum = u32::from_le_bytes([buf[44], buf[45], buf[46], buf[47]]);
236 let computed_checksum = crc32fast::hash(&buf[0..44]);
237 if stored_checksum != computed_checksum {
238 return None;
239 }
240
241 Some(Self {
242 lsn: u64::from_le_bytes([buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11]]),
243 last_segment: u64::from_le_bytes([buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18], buf[19]]),
244 timestamp: u64::from_le_bytes([buf[20], buf[21], buf[22], buf[23], buf[24], buf[25], buf[26], buf[27]]),
245 memtable_checksum: u64::from_le_bytes([buf[28], buf[29], buf[30], buf[31], buf[32], buf[33], buf[34], buf[35]]),
246 entry_count: u64::from_le_bytes([buf[36], buf[37], buf[38], buf[39], buf[40], buf[41], buf[42], buf[43]]),
247 })
248 }
249}
250
251pub struct WalSegmentManager {
255 config: SegmentConfig,
257 active: Mutex<Option<ActiveSegment>>,
259 current_lsn: AtomicU64,
261 segment_sequence: AtomicU64,
263 segments: RwLock<BTreeMap<u64, SegmentMetadata>>,
265 last_checkpoint: RwLock<Option<CheckpointRecord>>,
267 shutdown: AtomicBool,
269}
270
/// In-memory bookkeeping for one segment file.
#[derive(Debug, Clone)]
pub struct SegmentMetadata {
    /// Sequence number of the segment (also its key in the manager's map).
    pub sequence: u64,
    /// `first_lsn` taken from the segment's header.
    pub first_lsn: u64,
    /// Last LSN recorded for the segment when it is sealed; `None` until then.
    pub last_lsn: Option<u64>,
    /// Location of the segment file.
    pub path: PathBuf,
    /// Size in bytes as last recorded: the file length when discovered at
    /// startup, the header size at creation, the write offset once sealed.
    pub size: u64,
    /// `true` while the segment is the one being appended to.
    pub is_active: bool,
}
287
288impl WalSegmentManager {
289 pub fn new(config: SegmentConfig) -> std::io::Result<Self> {
291 fs::create_dir_all(&config.wal_dir)?;
293
294 let manager = Self {
295 config,
296 active: Mutex::new(None),
297 current_lsn: AtomicU64::new(0),
298 segment_sequence: AtomicU64::new(0),
299 segments: RwLock::new(BTreeMap::new()),
300 last_checkpoint: RwLock::new(None),
301 shutdown: AtomicBool::new(false),
302 };
303
304 manager.recover()?;
306
307 Ok(manager)
308 }
309
310 fn recover(&self) -> std::io::Result<()> {
312 let checkpoint_path = self.config.wal_dir.join("checkpoint");
314 if checkpoint_path.exists() {
315 let mut file = File::open(&checkpoint_path)?;
316 let mut buf = Vec::new();
317 file.read_to_end(&mut buf)?;
318 if let Some(record) = CheckpointRecord::decode(&buf) {
319 *self.last_checkpoint.write() = Some(record);
320 }
321 }
322
323 let entries = fs::read_dir(&self.config.wal_dir)?;
325 let mut max_sequence = 0u64;
326 let mut max_lsn = 0u64;
327
328 for entry in entries {
329 let entry = entry?;
330 let path = entry.path();
331
332 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
333 if name.starts_with("segment_") && name.ends_with(".wal") {
334 let mut file = File::open(&path)?;
336 let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
337 if file.read_exact(&mut header_buf).is_ok() {
338 if let Some(header) = SegmentHeader::decode(&header_buf) {
339 max_sequence = max_sequence.max(header.sequence);
340 max_lsn = max_lsn.max(header.first_lsn);
341
342 let metadata = file.metadata()?;
343 self.segments.write().insert(header.sequence, SegmentMetadata {
344 sequence: header.sequence,
345 first_lsn: header.first_lsn,
346 last_lsn: None,
347 path: path.clone(),
348 size: metadata.len(),
349 is_active: false,
350 });
351 }
352 }
353 }
354 }
355 }
356
357 self.segment_sequence.store(max_sequence + 1, Ordering::SeqCst);
359 self.current_lsn.store(max_lsn, Ordering::SeqCst);
360
361 Ok(())
362 }
363
364 pub fn append(&self, data: &[u8]) -> std::io::Result<u64> {
366 let mut active = self.active.lock();
367
368 if self.needs_rotation(&active) {
370 self.rotate_segment(&mut active)?;
371 }
372
373 if active.is_none() {
375 self.create_new_segment(&mut active)?;
376 }
377
378 let segment = active.as_mut().unwrap();
379
380 let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
382
383 let record_len = 4 + 8 + data.len() + 4;
385 let mut record = Vec::with_capacity(record_len);
386 record.extend_from_slice(&(data.len() as u32).to_le_bytes());
387 record.extend_from_slice(&lsn.to_le_bytes());
388 record.extend_from_slice(data);
389 let checksum = crc32fast::hash(&record);
390 record.extend_from_slice(&checksum.to_le_bytes());
391
392 segment.file.write_all(&record)?;
393 segment.offset += record_len as u64;
394
395 if self.config.sync_on_write {
396 segment.file.flush()?;
397 }
398
399 Ok(lsn)
400 }
401
402 fn needs_rotation(&self, active: &Option<ActiveSegment>) -> bool {
404 match active {
405 Some(segment) => {
406 segment.offset >= self.config.max_size
407 || segment.created_at.elapsed() >= self.config.rotation_interval
408 }
409 None => false,
410 }
411 }
412
413 fn rotate_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
415 if let Some(mut segment) = active.take() {
416 segment.file.flush()?;
418 segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
419
420 let current_lsn = self.current_lsn.load(Ordering::SeqCst);
422 if let Some(meta) = self.segments.write().get_mut(&segment.header.sequence) {
423 meta.is_active = false;
424 meta.last_lsn = Some(current_lsn);
425 meta.size = segment.offset;
426 }
427 }
428
429 Ok(())
430 }
431
432 fn create_new_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
434 let sequence = self.segment_sequence.fetch_add(1, Ordering::SeqCst);
435 let first_lsn = self.current_lsn.load(Ordering::SeqCst);
436
437 let path = self.config.wal_dir.join(format!("segment_{:016x}.wal", sequence));
438
439 let file = OpenOptions::new()
440 .create(true)
441 .write(true)
442 .truncate(true)
443 .open(&path)?;
444
445 if self.config.preallocate {
447 file.set_len(self.config.max_size)?;
448 }
449
450 let mut writer = BufWriter::new(file);
451
452 let header = SegmentHeader::new(sequence, first_lsn);
454 writer.write_all(&header.encode())?;
455
456 let segment = ActiveSegment {
457 file: writer,
458 path: path.clone(),
459 header: header.clone(),
460 offset: SEGMENT_HEADER_SIZE as u64,
461 created_at: Instant::now(),
462 };
463
464 self.segments.write().insert(sequence, SegmentMetadata {
466 sequence,
467 first_lsn,
468 last_lsn: None,
469 path,
470 size: SEGMENT_HEADER_SIZE as u64,
471 is_active: true,
472 });
473
474 *active = Some(segment);
475
476 Ok(())
477 }
478
479 pub fn create_checkpoint(
483 &self,
484 memtable_checksum: u64,
485 entry_count: u64,
486 ) -> std::io::Result<CheckpointRecord> {
487 let lsn = self.current_lsn.load(Ordering::SeqCst);
488
489 let segments = self.segments.read();
491 let last_segment = segments
492 .values()
493 .filter(|s| s.last_lsn.map(|l| l < lsn).unwrap_or(false))
494 .map(|s| s.sequence)
495 .max()
496 .unwrap_or(0);
497
498 let now = SystemTime::now()
499 .duration_since(UNIX_EPOCH)
500 .map(|d| d.as_millis() as u64)
501 .unwrap_or(0);
502
503 let record = CheckpointRecord {
504 lsn,
505 last_segment,
506 timestamp: now,
507 memtable_checksum,
508 entry_count,
509 };
510
511 let checkpoint_path = self.config.wal_dir.join("checkpoint");
513 let temp_path = self.config.wal_dir.join("checkpoint.tmp");
514
515 let mut file = File::create(&temp_path)?;
516 file.write_all(&record.encode())?;
517 file.sync_all()?;
518
519 fs::rename(&temp_path, &checkpoint_path)?;
520
521 *self.last_checkpoint.write() = Some(record.clone());
522
523 Ok(record)
524 }
525
526 pub fn cleanup_old_segments(&self) -> std::io::Result<usize> {
528 let checkpoint = self.last_checkpoint.read().clone();
529
530 let last_safe_segment = match checkpoint {
531 Some(cp) => cp.last_segment,
532 None => return Ok(0),
533 };
534
535 let mut segments = self.segments.write();
536 let old_segments: Vec<u64> = segments
537 .keys()
538 .filter(|&&seq| seq <= last_safe_segment)
539 .copied()
540 .collect();
541
542 let mut cleaned = 0;
543 for sequence in old_segments {
544 if let Some(meta) = segments.remove(&sequence) {
545 if meta.path.exists() {
546 fs::remove_file(&meta.path)?;
547 cleaned += 1;
548 }
549 }
550 }
551
552 Ok(cleaned)
553 }
554
555 pub fn stats(&self) -> SegmentStats {
557 let segments = self.segments.read();
558 let total_size: u64 = segments.values().map(|s| s.size).sum();
559 let checkpoint = self.last_checkpoint.read().clone();
560
561 SegmentStats {
562 segment_count: segments.len(),
563 total_size,
564 current_lsn: self.current_lsn.load(Ordering::SeqCst),
565 current_sequence: self.segment_sequence.load(Ordering::SeqCst),
566 last_checkpoint_lsn: checkpoint.as_ref().map(|c| c.lsn),
567 }
568 }
569
570 pub fn recovery_iterator(&self, from_lsn: u64) -> RecoveryIterator {
572 RecoveryIterator::new(self, from_lsn)
573 }
574
575 pub fn flush(&self) -> std::io::Result<()> {
577 let mut active = self.active.lock();
578 if let Some(ref mut segment) = *active {
579 segment.file.flush()?;
580 }
581 Ok(())
582 }
583
584 pub fn shutdown(&self) -> std::io::Result<()> {
586 self.shutdown.store(true, Ordering::SeqCst);
587
588 let mut active = self.active.lock();
589 if let Some(mut segment) = active.take() {
590 segment.file.flush()?;
591 segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
592 }
593
594 Ok(())
595 }
596}
597
/// Point-in-time counters reported by `WalSegmentManager::stats`.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Number of segments the manager is tracking.
    pub segment_count: usize,
    /// Sum of the recorded sizes of all tracked segments, in bytes.
    pub total_size: u64,
    /// Value of the LSN counter, i.e. the LSN the next append will receive.
    pub current_lsn: u64,
    /// Value of the sequence counter, i.e. the sequence the next segment will receive.
    pub current_sequence: u64,
    /// LSN of the most recent checkpoint, if one exists.
    pub last_checkpoint_lsn: Option<u64>,
}
612
613pub struct RecoveryIterator<'a> {
615 manager: &'a WalSegmentManager,
616 current_segment_idx: usize,
617 segment_sequences: Vec<u64>,
618 current_reader: Option<BufReader<File>>,
619 current_offset: u64,
620 from_lsn: u64,
621}
622
623impl<'a> RecoveryIterator<'a> {
624 fn new(manager: &'a WalSegmentManager, from_lsn: u64) -> Self {
625 let segments = manager.segments.read();
626 let mut sequences: Vec<u64> = segments
627 .values()
628 .filter(|s| s.first_lsn >= from_lsn || s.last_lsn.map(|l| l >= from_lsn).unwrap_or(true))
629 .map(|s| s.sequence)
630 .collect();
631 sequences.sort();
632
633 Self {
634 manager,
635 current_segment_idx: 0,
636 segment_sequences: sequences,
637 current_reader: None,
638 current_offset: SEGMENT_HEADER_SIZE as u64,
639 from_lsn,
640 }
641 }
642
643 pub fn next_entry(&mut self) -> std::io::Result<Option<WalEntry>> {
645 loop {
646 if self.current_reader.is_none() {
648 if self.current_segment_idx >= self.segment_sequences.len() {
649 return Ok(None);
650 }
651
652 let sequence = self.segment_sequences[self.current_segment_idx];
653 let segments = self.manager.segments.read();
654 if let Some(meta) = segments.get(&sequence) {
655 let file = File::open(&meta.path)?;
656 let mut reader = BufReader::new(file);
657 reader.seek(SeekFrom::Start(SEGMENT_HEADER_SIZE as u64))?;
658 self.current_reader = Some(reader);
659 self.current_offset = SEGMENT_HEADER_SIZE as u64;
660 } else {
661 self.current_segment_idx += 1;
662 continue;
663 }
664 }
665
666 let reader = self.current_reader.as_mut().unwrap();
667
668 let mut len_buf = [0u8; 4];
670 match reader.read_exact(&mut len_buf) {
671 Ok(_) => {}
672 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
673 self.current_reader = None;
675 self.current_segment_idx += 1;
676 continue;
677 }
678 Err(e) => return Err(e),
679 }
680
681 let data_len = u32::from_le_bytes(len_buf) as usize;
682 if data_len == 0 || data_len > 100 * 1024 * 1024 {
683 self.current_reader = None;
685 self.current_segment_idx += 1;
686 continue;
687 }
688
689 let mut lsn_buf = [0u8; 8];
691 reader.read_exact(&mut lsn_buf)?;
692 let lsn = u64::from_le_bytes(lsn_buf);
693
694 let mut data = vec![0u8; data_len];
696 reader.read_exact(&mut data)?;
697
698 let mut checksum_buf = [0u8; 4];
700 reader.read_exact(&mut checksum_buf)?;
701 let stored_checksum = u32::from_le_bytes(checksum_buf);
702
703 let mut verify_buf = Vec::with_capacity(4 + 8 + data_len);
704 verify_buf.extend_from_slice(&len_buf);
705 verify_buf.extend_from_slice(&lsn_buf);
706 verify_buf.extend_from_slice(&data);
707 let computed_checksum = crc32fast::hash(&verify_buf);
708
709 if stored_checksum != computed_checksum {
710 return Err(std::io::Error::new(
711 std::io::ErrorKind::InvalidData,
712 "WAL entry checksum mismatch",
713 ));
714 }
715
716 self.current_offset += (4 + 8 + data_len + 4) as u64;
717
718 if lsn < self.from_lsn {
720 continue;
721 }
722
723 return Ok(Some(WalEntry { lsn, data }));
724 }
725 }
726}
727
/// One record read back from the log.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// LSN stored with the record.
    pub lsn: u64,
    /// Payload bytes exactly as they were appended.
    pub data: Vec<u8>,
}
736
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Appends hand out consecutive LSNs starting at zero in a fresh directory.
    #[test]
    fn test_segment_manager_basic() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(1024);

        let manager = WalSegmentManager::new(config).unwrap();

        for i in 0..100 {
            let data = format!("entry_{}", i);
            let lsn = manager.append(data.as_bytes()).unwrap();
            assert_eq!(lsn, i as u64);
        }

        let stats = manager.stats();
        assert!(stats.segment_count > 0);
        assert_eq!(stats.current_lsn, 100);

        manager.shutdown().unwrap();
    }

    /// A checkpoint taken after several rotations lets cleanup delete the
    /// sealed segments.
    #[test]
    fn test_checkpoint_and_cleanup() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        let manager = WalSegmentManager::new(config).unwrap();

        for i in 0..50 {
            let data = format!("entry_{:04}", i);
            manager.append(data.as_bytes()).unwrap();
        }

        manager.flush().unwrap();

        let checkpoint = manager.create_checkpoint(12345, 50).unwrap();
        assert!(checkpoint.lsn > 0);

        // Fifty 26-byte records against a 256-byte limit force several
        // rotations, so at least one sealed segment must have been removed.
        let cleaned = manager.cleanup_old_segments().unwrap();
        assert!(cleaned > 0);

        manager.shutdown().unwrap();
    }

    /// Entries written by one manager are readable by a second manager opened
    /// on the same directory.
    #[test]
    fn test_recovery() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default().with_wal_dir(dir.path());

        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..10 {
                let data = format!("data_{}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            manager.shutdown().unwrap();
        }

        {
            let manager = WalSegmentManager::new(config).unwrap();
            let mut iter = manager.recovery_iterator(0);
            let mut count = 0;

            while let Some(entry) = iter.next_entry().unwrap() {
                let data = String::from_utf8_lossy(&entry.data);
                assert!(data.starts_with("data_"));
                count += 1;
            }

            assert_eq!(count, 10);
        }
    }

    /// A header survives an encode/decode round trip.
    #[test]
    fn test_segment_header_encoding() {
        let header = SegmentHeader::new(42, 12345);
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.magic, SEGMENT_MAGIC);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.first_lsn, 12345);
    }

    /// A checkpoint record survives an encode/decode round trip.
    #[test]
    fn test_checkpoint_record_encoding() {
        let record = CheckpointRecord {
            lsn: 1000,
            last_segment: 5,
            timestamp: 123456789,
            memtable_checksum: 0xDEADBEEF,
            entry_count: 500,
        };

        let encoded = record.encode();
        let decoded = CheckpointRecord::decode(&encoded).unwrap();

        assert_eq!(decoded.lsn, 1000);
        assert_eq!(decoded.last_segment, 5);
        assert_eq!(decoded.memtable_checksum, 0xDEADBEEF);
        assert_eq!(decoded.entry_count, 500);
    }
}