1use std::collections::BTreeMap;
41use std::fs::{self, File, OpenOptions};
42use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
45use std::sync::Arc;
46use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
47
48use parking_lot::{Mutex, RwLock};
49
/// Default value for [`SegmentConfig::max_size`]: 64 MiB.
pub const DEFAULT_SEGMENT_MAX_SIZE: u64 = 64 << 20;

/// Default value for [`SegmentConfig::rotation_interval`]: five minutes.
pub const DEFAULT_ROTATION_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Default value for [`SegmentConfig::checkpoint_interval`]: one minute.
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Magic number stored in every segment header (the ASCII bytes `WLSG` read
/// as a big-endian integer, i.e. `0x574C5347`).
const SEGMENT_MAGIC: u32 = u32::from_be_bytes(*b"WLSG");

/// Format version written into the header of newly created segments.
const SEGMENT_VERSION: u16 = 1;

/// Size in bytes of an encoded segment header.
const SEGMENT_HEADER_SIZE: usize = 32;
67
/// Magic number that opens an encoded checkpoint record (the ASCII bytes
/// `CHKP` read as a big-endian integer, i.e. `0x43484B50`).
const CHECKPOINT_MAGIC: u32 = u32::from_be_bytes(*b"CHKP");

/// Settings that control how WAL segments are created, rotated and written.
#[derive(Debug, Clone)]
pub struct SegmentConfig {
    /// Size threshold in bytes. The active segment is rotated once its write
    /// offset reaches this value, and preallocated segments are created with
    /// this length.
    pub max_size: u64,
    /// Age at which the active segment is rotated regardless of its size.
    pub rotation_interval: Duration,
    /// Desired interval between checkpoints. The code visible in this file
    /// stores the value but does not act on it.
    pub checkpoint_interval: Duration,
    /// Directory holding the segment files and the checkpoint file.
    pub wal_dir: PathBuf,
    /// When `true`, each appended record is pushed out of the in-process
    /// write buffer before `append` returns.
    pub sync_on_write: bool,
    /// When `true`, new segment files are extended to `max_size` bytes as
    /// soon as they are created.
    pub preallocate: bool,
}
87
88impl Default for SegmentConfig {
89 fn default() -> Self {
90 Self {
91 max_size: DEFAULT_SEGMENT_MAX_SIZE,
92 rotation_interval: DEFAULT_ROTATION_INTERVAL,
93 checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
94 wal_dir: PathBuf::from("wal"),
95 sync_on_write: true,
96 preallocate: true,
97 }
98 }
99}
100
101impl SegmentConfig {
102 pub fn with_wal_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
103 self.wal_dir = dir.as_ref().to_path_buf();
104 self
105 }
106
107 pub fn with_max_size(mut self, size: u64) -> Self {
108 self.max_size = size;
109 self
110 }
111}
112
/// Fixed-size header found at the start of every segment file.
#[derive(Debug, Clone)]
pub struct SegmentHeader {
    /// Magic number identifying the file as a WAL segment.
    pub magic: u32,
    /// Format version of the segment.
    pub version: u16,
    /// Flag bits; new headers are created with all bits clear.
    pub flags: u16,
    /// Sequence number of the segment.
    pub sequence: u64,
    /// LSN counter value at the moment the segment was created.
    pub first_lsn: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Reserved bytes. They are not part of the 32-byte on-disk encoding and
    /// are always zero after decoding.
    pub reserved: [u8; 8],
}
131
132impl SegmentHeader {
133 fn new(sequence: u64, first_lsn: u64) -> Self {
134 let now = SystemTime::now()
135 .duration_since(UNIX_EPOCH)
136 .map(|d| d.as_millis() as u64)
137 .unwrap_or(0);
138
139 Self {
140 magic: SEGMENT_MAGIC,
141 version: SEGMENT_VERSION,
142 flags: 0,
143 sequence,
144 first_lsn,
145 created_at: now,
146 reserved: [0; 8],
147 }
148 }
149
150 fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
151 let mut buf = [0u8; SEGMENT_HEADER_SIZE];
152 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
153 buf[4..6].copy_from_slice(&self.version.to_le_bytes());
154 buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
155 buf[8..16].copy_from_slice(&self.sequence.to_le_bytes());
156 buf[16..24].copy_from_slice(&self.first_lsn.to_le_bytes());
157 buf[24..32].copy_from_slice(&self.created_at.to_le_bytes());
158 buf
159 }
160
161 fn decode(buf: &[u8]) -> Option<Self> {
162 if buf.len() < SEGMENT_HEADER_SIZE {
163 return None;
164 }
165
166 let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
167 if magic != SEGMENT_MAGIC {
168 return None;
169 }
170
171 Some(Self {
172 magic,
173 version: u16::from_le_bytes([buf[4], buf[5]]),
174 flags: u16::from_le_bytes([buf[6], buf[7]]),
175 sequence: u64::from_le_bytes([buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15]]),
176 first_lsn: u64::from_le_bytes([buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23]]),
177 created_at: u64::from_le_bytes([buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31]]),
178 reserved: [0; 8],
179 })
180 }
181}
182
183struct ActiveSegment {
185 file: BufWriter<File>,
187 path: PathBuf,
189 header: SegmentHeader,
191 offset: u64,
193 created_at: Instant,
195}
196
/// Snapshot marker persisted to the checkpoint file.
#[derive(Debug, Clone)]
pub struct CheckpointRecord {
    /// Value of the LSN counter when the checkpoint was taken.
    pub lsn: u64,
    /// Highest segment sequence considered fully covered by the checkpoint;
    /// `0` when no segment qualified.
    pub last_segment: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Caller-supplied checksum, stored verbatim.
    pub memtable_checksum: u64,
    /// Caller-supplied entry count, stored verbatim.
    pub entry_count: u64,
}
211
212impl CheckpointRecord {
213 fn encode(&self) -> Vec<u8> {
214 let mut buf = Vec::with_capacity(48);
215 buf.extend_from_slice(&CHECKPOINT_MAGIC.to_le_bytes());
216 buf.extend_from_slice(&self.lsn.to_le_bytes());
217 buf.extend_from_slice(&self.last_segment.to_le_bytes());
218 buf.extend_from_slice(&self.timestamp.to_le_bytes());
219 buf.extend_from_slice(&self.memtable_checksum.to_le_bytes());
220 buf.extend_from_slice(&self.entry_count.to_le_bytes());
221 let checksum = crc32fast::hash(&buf);
223 buf.extend_from_slice(&checksum.to_le_bytes());
224 buf
225 }
226
227 fn decode(buf: &[u8]) -> Option<Self> {
228 if buf.len() < 48 {
229 return None;
230 }
231
232 let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
233 if magic != CHECKPOINT_MAGIC {
234 return None;
235 }
236
237 let stored_checksum = u32::from_le_bytes([buf[44], buf[45], buf[46], buf[47]]);
239 let computed_checksum = crc32fast::hash(&buf[0..44]);
240 if stored_checksum != computed_checksum {
241 return None;
242 }
243
244 Some(Self {
245 lsn: u64::from_le_bytes([buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11]]),
246 last_segment: u64::from_le_bytes([buf[12], buf[13], buf[14], buf[15], buf[16], buf[17], buf[18], buf[19]]),
247 timestamp: u64::from_le_bytes([buf[20], buf[21], buf[22], buf[23], buf[24], buf[25], buf[26], buf[27]]),
248 memtable_checksum: u64::from_le_bytes([buf[28], buf[29], buf[30], buf[31], buf[32], buf[33], buf[34], buf[35]]),
249 entry_count: u64::from_le_bytes([buf[36], buf[37], buf[38], buf[39], buf[40], buf[41], buf[42], buf[43]]),
250 })
251 }
252}
253
254pub struct WalSegmentManager {
258 config: SegmentConfig,
260 active: Mutex<Option<ActiveSegment>>,
262 current_lsn: AtomicU64,
264 segment_sequence: AtomicU64,
266 segments: RwLock<BTreeMap<u64, SegmentMetadata>>,
268 last_checkpoint: RwLock<Option<CheckpointRecord>>,
270 shutdown: AtomicBool,
272}
273
/// In-memory bookkeeping for one segment file.
#[derive(Debug, Clone)]
pub struct SegmentMetadata {
    /// Sequence number of the segment.
    pub sequence: u64,
    /// LSN counter value when the segment was created.
    pub first_lsn: u64,
    /// Upper LSN bound recorded once the segment has been sealed; `None`
    /// while it is not known.
    pub last_lsn: Option<u64>,
    /// Location of the segment file.
    pub path: PathBuf,
    /// Size in bytes as last recorded for this segment.
    pub size: u64,
    /// Whether this is the segment currently open for writing.
    pub is_active: bool,
}
290
291impl WalSegmentManager {
292 pub fn new(config: SegmentConfig) -> std::io::Result<Self> {
294 fs::create_dir_all(&config.wal_dir)?;
296
297 let manager = Self {
298 config,
299 active: Mutex::new(None),
300 current_lsn: AtomicU64::new(0),
301 segment_sequence: AtomicU64::new(0),
302 segments: RwLock::new(BTreeMap::new()),
303 last_checkpoint: RwLock::new(None),
304 shutdown: AtomicBool::new(false),
305 };
306
307 manager.recover()?;
309
310 Ok(manager)
311 }
312
313 fn recover(&self) -> std::io::Result<()> {
315 let checkpoint_path = self.config.wal_dir.join("checkpoint");
317 if checkpoint_path.exists() {
318 let mut file = File::open(&checkpoint_path)?;
319 let mut buf = Vec::new();
320 file.read_to_end(&mut buf)?;
321 if let Some(record) = CheckpointRecord::decode(&buf) {
322 *self.last_checkpoint.write() = Some(record);
323 }
324 }
325
326 let entries = fs::read_dir(&self.config.wal_dir)?;
328 let mut max_sequence = 0u64;
329 let mut max_lsn = 0u64;
330
331 for entry in entries {
332 let entry = entry?;
333 let path = entry.path();
334
335 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
336 if name.starts_with("segment_") && name.ends_with(".wal") {
337 let mut file = File::open(&path)?;
339 let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
340 if file.read_exact(&mut header_buf).is_ok() {
341 if let Some(header) = SegmentHeader::decode(&header_buf) {
342 max_sequence = max_sequence.max(header.sequence);
343 max_lsn = max_lsn.max(header.first_lsn);
344
345 let metadata = file.metadata()?;
346 self.segments.write().insert(header.sequence, SegmentMetadata {
347 sequence: header.sequence,
348 first_lsn: header.first_lsn,
349 last_lsn: None,
350 path: path.clone(),
351 size: metadata.len(),
352 is_active: false,
353 });
354 }
355 }
356 }
357 }
358 }
359
360 self.segment_sequence.store(max_sequence + 1, Ordering::SeqCst);
362 self.current_lsn.store(max_lsn, Ordering::SeqCst);
363
364 Ok(())
365 }
366
367 pub fn append(&self, data: &[u8]) -> std::io::Result<u64> {
369 let mut active = self.active.lock();
370
371 if self.needs_rotation(&active) {
373 self.rotate_segment(&mut active)?;
374 }
375
376 if active.is_none() {
378 self.create_new_segment(&mut active)?;
379 }
380
381 let segment = active.as_mut().unwrap();
382
383 let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
385
386 let record_len = 4 + 8 + data.len() + 4;
388 let mut record = Vec::with_capacity(record_len);
389 record.extend_from_slice(&(data.len() as u32).to_le_bytes());
390 record.extend_from_slice(&lsn.to_le_bytes());
391 record.extend_from_slice(data);
392 let checksum = crc32fast::hash(&record);
393 record.extend_from_slice(&checksum.to_le_bytes());
394
395 segment.file.write_all(&record)?;
396 segment.offset += record_len as u64;
397
398 if self.config.sync_on_write {
399 segment.file.flush()?;
400 }
401
402 Ok(lsn)
403 }
404
405 fn needs_rotation(&self, active: &Option<ActiveSegment>) -> bool {
407 match active {
408 Some(segment) => {
409 segment.offset >= self.config.max_size
410 || segment.created_at.elapsed() >= self.config.rotation_interval
411 }
412 None => false,
413 }
414 }
415
416 fn rotate_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
418 if let Some(mut segment) = active.take() {
419 segment.file.flush()?;
421 segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
422
423 let current_lsn = self.current_lsn.load(Ordering::SeqCst);
425 if let Some(meta) = self.segments.write().get_mut(&segment.header.sequence) {
426 meta.is_active = false;
427 meta.last_lsn = Some(current_lsn);
428 meta.size = segment.offset;
429 }
430 }
431
432 Ok(())
433 }
434
435 fn create_new_segment(&self, active: &mut Option<ActiveSegment>) -> std::io::Result<()> {
437 let sequence = self.segment_sequence.fetch_add(1, Ordering::SeqCst);
438 let first_lsn = self.current_lsn.load(Ordering::SeqCst);
439
440 let path = self.config.wal_dir.join(format!("segment_{:016x}.wal", sequence));
441
442 let file = OpenOptions::new()
443 .create(true)
444 .write(true)
445 .truncate(true)
446 .open(&path)?;
447
448 if self.config.preallocate {
450 file.set_len(self.config.max_size)?;
451 }
452
453 let mut writer = BufWriter::new(file);
454
455 let header = SegmentHeader::new(sequence, first_lsn);
457 writer.write_all(&header.encode())?;
458
459 let segment = ActiveSegment {
460 file: writer,
461 path: path.clone(),
462 header: header.clone(),
463 offset: SEGMENT_HEADER_SIZE as u64,
464 created_at: Instant::now(),
465 };
466
467 self.segments.write().insert(sequence, SegmentMetadata {
469 sequence,
470 first_lsn,
471 last_lsn: None,
472 path,
473 size: SEGMENT_HEADER_SIZE as u64,
474 is_active: true,
475 });
476
477 *active = Some(segment);
478
479 Ok(())
480 }
481
482 pub fn create_checkpoint(
486 &self,
487 memtable_checksum: u64,
488 entry_count: u64,
489 ) -> std::io::Result<CheckpointRecord> {
490 let lsn = self.current_lsn.load(Ordering::SeqCst);
491
492 let segments = self.segments.read();
494 let last_segment = segments
495 .values()
496 .filter(|s| s.last_lsn.map(|l| l < lsn).unwrap_or(false))
497 .map(|s| s.sequence)
498 .max()
499 .unwrap_or(0);
500
501 let now = SystemTime::now()
502 .duration_since(UNIX_EPOCH)
503 .map(|d| d.as_millis() as u64)
504 .unwrap_or(0);
505
506 let record = CheckpointRecord {
507 lsn,
508 last_segment,
509 timestamp: now,
510 memtable_checksum,
511 entry_count,
512 };
513
514 let checkpoint_path = self.config.wal_dir.join("checkpoint");
516 let temp_path = self.config.wal_dir.join("checkpoint.tmp");
517
518 let mut file = File::create(&temp_path)?;
519 file.write_all(&record.encode())?;
520 file.sync_all()?;
521
522 fs::rename(&temp_path, &checkpoint_path)?;
523
524 *self.last_checkpoint.write() = Some(record.clone());
525
526 Ok(record)
527 }
528
529 pub fn cleanup_old_segments(&self) -> std::io::Result<usize> {
531 let checkpoint = self.last_checkpoint.read().clone();
532
533 let last_safe_segment = match checkpoint {
534 Some(cp) => cp.last_segment,
535 None => return Ok(0),
536 };
537
538 let mut segments = self.segments.write();
539 let old_segments: Vec<u64> = segments
540 .keys()
541 .filter(|&&seq| seq <= last_safe_segment)
542 .copied()
543 .collect();
544
545 let mut cleaned = 0;
546 for sequence in old_segments {
547 if let Some(meta) = segments.remove(&sequence) {
548 if meta.path.exists() {
549 fs::remove_file(&meta.path)?;
550 cleaned += 1;
551 }
552 }
553 }
554
555 Ok(cleaned)
556 }
557
558 pub fn stats(&self) -> SegmentStats {
560 let segments = self.segments.read();
561 let total_size: u64 = segments.values().map(|s| s.size).sum();
562 let checkpoint = self.last_checkpoint.read().clone();
563
564 SegmentStats {
565 segment_count: segments.len(),
566 total_size,
567 current_lsn: self.current_lsn.load(Ordering::SeqCst),
568 current_sequence: self.segment_sequence.load(Ordering::SeqCst),
569 last_checkpoint_lsn: checkpoint.as_ref().map(|c| c.lsn),
570 }
571 }
572
573 pub fn recovery_iterator(&self, from_lsn: u64) -> RecoveryIterator {
575 RecoveryIterator::new(self, from_lsn)
576 }
577
578 pub fn flush(&self) -> std::io::Result<()> {
580 let mut active = self.active.lock();
581 if let Some(ref mut segment) = *active {
582 segment.file.flush()?;
583 }
584 Ok(())
585 }
586
587 pub fn shutdown(&self) -> std::io::Result<()> {
589 self.shutdown.store(true, Ordering::SeqCst);
590
591 let mut active = self.active.lock();
592 if let Some(mut segment) = active.take() {
593 segment.file.flush()?;
594 segment.file.into_inner().map_err(|e| e.into_error())?.sync_all()?;
595 }
596
597 Ok(())
598 }
599}
600
/// Point-in-time summary of the manager's state.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Number of segments currently tracked.
    pub segment_count: usize,
    /// Sum of the recorded sizes of all tracked segments, in bytes.
    pub total_size: u64,
    /// Value of the LSN counter.
    pub current_lsn: u64,
    /// Value of the segment sequence counter.
    pub current_sequence: u64,
    /// LSN of the last known checkpoint, if there is one.
    pub last_checkpoint_lsn: Option<u64>,
}
615
616pub struct RecoveryIterator<'a> {
618 manager: &'a WalSegmentManager,
619 current_segment_idx: usize,
620 segment_sequences: Vec<u64>,
621 current_reader: Option<BufReader<File>>,
622 current_offset: u64,
623 from_lsn: u64,
624}
625
626impl<'a> RecoveryIterator<'a> {
627 fn new(manager: &'a WalSegmentManager, from_lsn: u64) -> Self {
628 let segments = manager.segments.read();
629 let mut sequences: Vec<u64> = segments
630 .values()
631 .filter(|s| s.first_lsn >= from_lsn || s.last_lsn.map(|l| l >= from_lsn).unwrap_or(true))
632 .map(|s| s.sequence)
633 .collect();
634 sequences.sort();
635
636 Self {
637 manager,
638 current_segment_idx: 0,
639 segment_sequences: sequences,
640 current_reader: None,
641 current_offset: SEGMENT_HEADER_SIZE as u64,
642 from_lsn,
643 }
644 }
645
646 pub fn next_entry(&mut self) -> std::io::Result<Option<WalEntry>> {
648 loop {
649 if self.current_reader.is_none() {
651 if self.current_segment_idx >= self.segment_sequences.len() {
652 return Ok(None);
653 }
654
655 let sequence = self.segment_sequences[self.current_segment_idx];
656 let segments = self.manager.segments.read();
657 if let Some(meta) = segments.get(&sequence) {
658 let file = File::open(&meta.path)?;
659 let mut reader = BufReader::new(file);
660 reader.seek(SeekFrom::Start(SEGMENT_HEADER_SIZE as u64))?;
661 self.current_reader = Some(reader);
662 self.current_offset = SEGMENT_HEADER_SIZE as u64;
663 } else {
664 self.current_segment_idx += 1;
665 continue;
666 }
667 }
668
669 let reader = self.current_reader.as_mut().unwrap();
670
671 let mut len_buf = [0u8; 4];
673 match reader.read_exact(&mut len_buf) {
674 Ok(_) => {}
675 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
676 self.current_reader = None;
678 self.current_segment_idx += 1;
679 continue;
680 }
681 Err(e) => return Err(e),
682 }
683
684 let data_len = u32::from_le_bytes(len_buf) as usize;
685 if data_len == 0 || data_len > 100 * 1024 * 1024 {
686 self.current_reader = None;
688 self.current_segment_idx += 1;
689 continue;
690 }
691
692 let mut lsn_buf = [0u8; 8];
694 reader.read_exact(&mut lsn_buf)?;
695 let lsn = u64::from_le_bytes(lsn_buf);
696
697 let mut data = vec![0u8; data_len];
699 reader.read_exact(&mut data)?;
700
701 let mut checksum_buf = [0u8; 4];
703 reader.read_exact(&mut checksum_buf)?;
704 let stored_checksum = u32::from_le_bytes(checksum_buf);
705
706 let mut verify_buf = Vec::with_capacity(4 + 8 + data_len);
707 verify_buf.extend_from_slice(&len_buf);
708 verify_buf.extend_from_slice(&lsn_buf);
709 verify_buf.extend_from_slice(&data);
710 let computed_checksum = crc32fast::hash(&verify_buf);
711
712 if stored_checksum != computed_checksum {
713 return Err(std::io::Error::new(
714 std::io::ErrorKind::InvalidData,
715 "WAL entry checksum mismatch",
716 ));
717 }
718
719 self.current_offset += (4 + 8 + data_len + 4) as u64;
720
721 if lsn < self.from_lsn {
723 continue;
724 }
725
726 return Ok(Some(WalEntry { lsn, data }));
727 }
728 }
729}
730
/// One record read back from the WAL.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Log sequence number stored with the record.
    pub lsn: u64,
    /// Payload bytes of the record.
    pub data: Vec<u8>,
}
739
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Appends receive consecutive LSNs starting at zero, and the counters
    /// reported by `stats` reflect them.
    #[test]
    fn test_segment_manager_basic() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(1024);

        let manager = WalSegmentManager::new(config).unwrap();

        for i in 0..100u64 {
            let data = format!("entry_{}", i);
            let lsn = manager.append(data.as_bytes()).unwrap();
            assert_eq!(lsn, i);
        }

        let stats = manager.stats();
        assert!(stats.segment_count > 0);
        assert_eq!(stats.current_lsn, 100);

        manager.shutdown().unwrap();
    }

    /// A checkpoint taken after several rotations lets cleanup delete the
    /// rotated segments and forget them.
    #[test]
    fn test_checkpoint_and_cleanup() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default()
            .with_wal_dir(dir.path())
            .with_max_size(256);

        let manager = WalSegmentManager::new(config).unwrap();

        // 50 records of 26 bytes each overflow a 256-byte segment several times.
        for i in 0..50 {
            let data = format!("entry_{:04}", i);
            manager.append(data.as_bytes()).unwrap();
        }

        manager.flush().unwrap();

        let checkpoint = manager.create_checkpoint(12345, 50).unwrap();
        assert!(checkpoint.lsn > 0);

        let segments_before = manager.stats().segment_count;
        let cleaned = manager.cleanup_old_segments().unwrap();
        assert!(cleaned > 0, "rotated segments should have been removed");
        assert_eq!(manager.stats().segment_count, segments_before - cleaned);

        manager.shutdown().unwrap();
    }

    /// Entries written by one manager are read back, in order, by a second
    /// manager opened on the same directory.
    #[test]
    fn test_recovery() {
        let dir = tempdir().unwrap();
        let config = SegmentConfig::default().with_wal_dir(dir.path());

        {
            let manager = WalSegmentManager::new(config.clone()).unwrap();
            for i in 0..10 {
                let data = format!("data_{}", i);
                manager.append(data.as_bytes()).unwrap();
            }
            manager.shutdown().unwrap();
        }

        {
            let manager = WalSegmentManager::new(config).unwrap();
            let mut iter = manager.recovery_iterator(0);
            let mut count = 0u64;

            while let Some(entry) = iter.next_entry().unwrap() {
                assert_eq!(entry.lsn, count);
                assert_eq!(entry.data, format!("data_{}", count).into_bytes());
                count += 1;
            }

            assert_eq!(count, 10);
        }
    }

    /// Headers survive an encode/decode round trip; short or foreign buffers
    /// are rejected.
    #[test]
    fn test_segment_header_encoding() {
        let header = SegmentHeader::new(42, 12345);
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.magic, SEGMENT_MAGIC);
        assert_eq!(decoded.version, SEGMENT_VERSION);
        assert_eq!(decoded.sequence, 42);
        assert_eq!(decoded.first_lsn, 12345);
        assert_eq!(decoded.created_at, header.created_at);

        assert!(SegmentHeader::decode(&encoded[..SEGMENT_HEADER_SIZE - 1]).is_none());

        let mut wrong_magic = encoded;
        wrong_magic[0] ^= 0xFF;
        assert!(SegmentHeader::decode(&wrong_magic).is_none());
    }

    /// Checkpoint records survive an encode/decode round trip; truncated or
    /// corrupted buffers are rejected.
    #[test]
    fn test_checkpoint_record_encoding() {
        let record = CheckpointRecord {
            lsn: 1000,
            last_segment: 5,
            timestamp: 123456789,
            memtable_checksum: 0xDEADBEEF,
            entry_count: 500,
        };

        let encoded = record.encode();
        assert_eq!(encoded.len(), 48);

        let decoded = CheckpointRecord::decode(&encoded).unwrap();
        assert_eq!(decoded.lsn, 1000);
        assert_eq!(decoded.last_segment, 5);
        assert_eq!(decoded.timestamp, 123456789);
        assert_eq!(decoded.memtable_checksum, 0xDEADBEEF);
        assert_eq!(decoded.entry_count, 500);

        assert!(CheckpointRecord::decode(&encoded[..47]).is_none());

        let mut corrupted = encoded.clone();
        corrupted[10] ^= 0x01;
        assert!(CheckpointRecord::decode(&corrupted).is_none());
    }
}