allsource_core/infrastructure/persistence/
wal.rs1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{
9 fs::{self, File, OpenOptions},
10 io::{BufRead, BufReader, BufWriter, Write},
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14
/// Append-only, line-oriented write-ahead log stored as a set of files in one
/// directory.
///
/// Entries are written as one JSON document per line to the current file;
/// the current file is swapped out by `rotate` and `truncate`.
pub struct WriteAheadLog {
    /// Directory in which WAL files are created, listed and deleted.
    wal_dir: PathBuf,

    /// File currently receiving appended entries.
    current_file: Arc<RwLock<WALFile>>,

    /// Settings supplied to `new`.
    config: WALConfig,

    /// Counters updated by append, rotation, cleanup and recovery.
    stats: Arc<RwLock<WALStats>>,

    /// Last sequence number handed out; incremented by `append`, set by
    /// `recover`, and reset to 0 by `truncate`.
    sequence: Arc<RwLock<u64>>,

    /// Optional broadcast sender; when set, `append` sends every written
    /// entry through it.
    replication_tx: parking_lot::Mutex<Option<tokio::sync::broadcast::Sender<WALEntry>>>,
}
37
/// Tunables for a `WriteAheadLog`.
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Size in bytes at which `append` triggers a rotation of the current file.
    pub max_file_size: usize,

    /// When `true`, every append flushes the buffer and fsyncs the file.
    pub sync_on_write: bool,

    /// Number of WAL files kept on disk; older files are removed after a
    /// rotation once this count is exceeded.
    pub max_wal_files: usize,

    /// Compression switch. NOTE(review): nothing in this file reads it.
    pub compress: bool,

    /// Optional fsync period in milliseconds, exposed through
    /// `WriteAheadLog::fsync_interval_ms`. Presumably consumed by a caller that
    /// invokes `sync` periodically -- confirm against the call site.
    pub fsync_interval_ms: Option<u64>,
}

impl Default for WALConfig {
    /// 64 MiB files, fsync on every write, ten files retained, no compression,
    /// no periodic fsync.
    fn default() -> Self {
        const DEFAULT_MAX_FILE_SIZE: usize = 64 * 1024 * 1024;
        const DEFAULT_MAX_WAL_FILES: usize = 10;

        WALConfig {
            max_file_size: DEFAULT_MAX_FILE_SIZE,
            sync_on_write: true,
            max_wal_files: DEFAULT_MAX_WAL_FILES,
            compress: false,
            fsync_interval_ms: None,
        }
    }
}
70
/// Counters describing WAL activity since this instance was created.
#[derive(Debug, Clone, Default, Serialize)]
pub struct WALStats {
    /// Number of entries successfully appended.
    pub total_entries: u64,
    /// Total bytes appended, including the trailing newline of each entry.
    pub total_bytes_written: u64,
    /// Size in bytes of the current file as of the last append or rotation.
    pub current_file_size: usize,
    /// Number of rotations performed.
    pub files_rotated: u64,
    /// Number of old WAL files deleted by cleanup.
    pub files_cleaned: u64,
    /// Number of times `recover` has run.
    pub recovery_count: u64,
}
80
/// One record in the WAL; serialized as a single JSON line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WALEntry {
    /// Sequence number assigned by the log at append time.
    pub sequence: u64,

    /// UTC time at which the entry was created.
    pub wal_timestamp: DateTime<Utc>,

    /// The event being logged.
    pub event: Event,

    /// CRC32 computed from the sequence, the timestamp and the event id.
    pub checksum: u32,
}
96
97impl WALEntry {
98 pub fn new(sequence: u64, event: Event) -> Self {
99 let mut entry = Self {
100 sequence,
101 wal_timestamp: Utc::now(),
102 event,
103 checksum: 0,
104 };
105 entry.checksum = entry.calculate_checksum();
106 entry
107 }
108
109 fn calculate_checksum(&self) -> u32 {
110 let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
112 crc32fast::hash(data.as_bytes())
113 }
114
115 pub fn verify(&self) -> bool {
116 self.checksum == self.calculate_checksum()
117 }
118}
119
/// An open WAL file together with its buffered writer and size bookkeeping.
struct WALFile {
    /// Location of the file on disk.
    path: PathBuf,
    /// Buffered writer over the file, which is opened in append mode.
    writer: BufWriter<File>,
    /// Bytes in the file: the on-disk length at open time plus every entry
    /// written through this handle.
    size: usize,
    /// Time at which this handle was opened (not the file's creation time).
    created_at: DateTime<Utc>,
}
127
128impl WALFile {
129 fn new(path: PathBuf) -> Result<Self> {
130 let file = OpenOptions::new()
131 .create(true)
132 .append(true)
133 .open(&path)
134 .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {e}")))?;
135
136 let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
137
138 Ok(Self {
139 path,
140 writer: BufWriter::new(file),
141 size,
142 created_at: Utc::now(),
143 })
144 }
145
146 fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
147 let json = serde_json::to_string(entry)?;
149
150 let line = format!("{json}\n");
151 let bytes_written = line.len();
152
153 self.writer
154 .write_all(line.as_bytes())
155 .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {e}")))?;
156
157 if sync {
158 self.writer
159 .flush()
160 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
161
162 self.writer
163 .get_ref()
164 .sync_all()
165 .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
166 }
167
168 self.size += bytes_written;
169 Ok(bytes_written)
170 }
171
172 fn flush(&mut self) -> Result<()> {
173 self.writer
174 .flush()
175 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
176 Ok(())
177 }
178}
179
180impl WriteAheadLog {
181 pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
183 let wal_dir = wal_dir.into();
184
185 fs::create_dir_all(&wal_dir).map_err(|e| {
187 AllSourceError::StorageError(format!("Failed to create WAL directory: {e}"))
188 })?;
189
190 let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
192 let current_file = WALFile::new(initial_file_path)?;
193
194 tracing::info!("โ
WAL initialized at: {}", wal_dir.display());
195
196 Ok(Self {
197 wal_dir,
198 current_file: Arc::new(RwLock::new(current_file)),
199 config,
200 stats: Arc::new(RwLock::new(WALStats::default())),
201 sequence: Arc::new(RwLock::new(0)),
202 replication_tx: parking_lot::Mutex::new(None),
203 })
204 }
205
/// Builds the path `<dir>/wal-<sequence>.log`, with the sequence rendered as
/// 16 zero-padded lowercase hex digits so names sort in numeric order.
fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
    let file_name = format!("wal-{sequence:016x}.log");
    let mut full_path = dir.to_path_buf();
    full_path.push(file_name);
    full_path
}
210
211 #[cfg_attr(feature = "hotpath", hotpath::measure)]
213 pub fn append(&self, event: Event) -> Result<u64> {
214 let mut seq = self.sequence.write();
216 *seq += 1;
217 let sequence = *seq;
218 drop(seq);
219
220 let entry = WALEntry::new(sequence, event);
222
223 let mut current = self.current_file.write();
225 let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
226
227 let mut stats = self.stats.write();
229 stats.total_entries += 1;
230 stats.total_bytes_written += bytes_written as u64;
231 stats.current_file_size = current.size;
232 drop(stats);
233
234 if let Some(ref tx) = *self.replication_tx.lock() {
237 let _ = tx.send(entry);
238 }
239
240 let should_rotate = current.size >= self.config.max_file_size;
242 drop(current);
243
244 if should_rotate {
245 self.rotate()?;
246 }
247
248 tracing::trace!("WAL entry written: sequence={}", sequence);
249
250 Ok(sequence)
251 }
252
253 #[cfg_attr(feature = "hotpath", hotpath::measure)]
255 fn rotate(&self) -> Result<()> {
256 let seq = *self.sequence.read();
257 let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
258
259 tracing::info!("๐ Rotating WAL to new file: {:?}", new_file_path);
260
261 let new_file = WALFile::new(new_file_path)?;
262
263 let mut current = self.current_file.write();
264 current.flush()?;
265 *current = new_file;
266
267 let mut stats = self.stats.write();
268 stats.files_rotated += 1;
269 stats.current_file_size = 0;
270 drop(stats);
271
272 self.cleanup_old_files()?;
274
275 Ok(())
276 }
277
278 #[cfg_attr(feature = "hotpath", hotpath::measure)]
280 fn cleanup_old_files(&self) -> Result<()> {
281 let mut wal_files = self.list_wal_files()?;
282 wal_files.sort();
283
284 if wal_files.len() > self.config.max_wal_files {
285 let to_remove = wal_files.len() - self.config.max_wal_files;
286 let files_to_delete = &wal_files[..to_remove];
287
288 for file_path in files_to_delete {
289 if let Err(e) = fs::remove_file(file_path) {
290 tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
291 } else {
292 tracing::debug!("๐๏ธ Removed old WAL file: {:?}", file_path);
293 let mut stats = self.stats.write();
294 stats.files_cleaned += 1;
295 }
296 }
297 }
298
299 Ok(())
300 }
301
302 fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
304 let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
305 AllSourceError::StorageError(format!("Failed to read WAL directory: {e}"))
306 })?;
307
308 let mut wal_files = Vec::new();
309 for entry in entries {
310 let entry = entry.map_err(|e| {
311 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
312 })?;
313
314 let path = entry.path();
315 if let Some(name) = path.file_name()
316 && name.to_string_lossy().starts_with("wal-")
317 && name.to_string_lossy().ends_with(".log")
318 {
319 wal_files.push(path);
320 }
321 }
322
323 Ok(wal_files)
324 }
325
326 #[cfg_attr(feature = "hotpath", hotpath::measure)]
328 pub fn recover(&self) -> Result<Vec<Event>> {
329 tracing::info!("๐ Starting WAL recovery...");
330
331 let mut wal_files = self.list_wal_files()?;
332 wal_files.sort();
333
334 let mut recovered_events = Vec::new();
335 let mut max_sequence = 0u64;
336 let mut corrupted_entries = 0;
337
338 for wal_file_path in &wal_files {
339 tracing::debug!("Reading WAL file: {:?}", wal_file_path);
340
341 let file = File::open(wal_file_path).map_err(|e| {
342 AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {e}"))
343 })?;
344
345 let reader = BufReader::new(file);
346
347 for (line_num, line) in reader.lines().enumerate() {
348 let line = match line {
349 Ok(l) => l,
350 Err(e) => {
351 tracing::warn!(
352 "I/O error reading WAL line at {:?}:{}: {}",
353 wal_file_path,
354 line_num + 1,
355 e
356 );
357 corrupted_entries += 1;
358 continue;
359 }
360 };
361
362 if line.trim().is_empty() {
363 continue;
364 }
365
366 match serde_json::from_str::<WALEntry>(&line) {
367 Ok(entry) => {
368 if !entry.verify() {
370 tracing::warn!(
371 "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
372 wal_file_path,
373 line_num + 1
374 );
375 corrupted_entries += 1;
376 continue;
377 }
378
379 max_sequence = max_sequence.max(entry.sequence);
380 recovered_events.push(entry.event);
381 }
382 Err(e) => {
383 tracing::warn!(
384 "Failed to parse WAL entry at {:?}:{}: {}",
385 wal_file_path,
386 line_num + 1,
387 e
388 );
389 corrupted_entries += 1;
390 }
391 }
392 }
393 }
394
395 let mut seq = self.sequence.write();
397 *seq = max_sequence;
398 drop(seq);
399
400 let mut stats = self.stats.write();
402 stats.recovery_count += 1;
403 drop(stats);
404
405 tracing::info!(
406 "โ
WAL recovery complete: {} events recovered, {} corrupted entries",
407 recovered_events.len(),
408 corrupted_entries
409 );
410
411 Ok(recovered_events)
412 }
413
414 #[cfg_attr(feature = "hotpath", hotpath::measure)]
416 pub fn flush(&self) -> Result<()> {
417 let mut current = self.current_file.write();
418 current.flush()?;
419 Ok(())
420 }
421
422 #[cfg_attr(feature = "hotpath", hotpath::measure)]
428 pub fn sync(&self) -> Result<()> {
429 let mut current = self.current_file.write();
430 current
431 .writer
432 .flush()
433 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
434 current
435 .writer
436 .get_ref()
437 .sync_all()
438 .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
439 Ok(())
440 }
441
442 pub fn fsync_interval_ms(&self) -> Option<u64> {
444 self.config.fsync_interval_ms
445 }
446
447 #[cfg_attr(feature = "hotpath", hotpath::measure)]
449 pub fn truncate(&self) -> Result<()> {
450 tracing::info!("๐งน Truncating WAL after checkpoint");
451
452 let mut current = self.current_file.write();
454 current.flush()?;
455
456 let wal_files = self.list_wal_files()?;
458 for file_path in wal_files {
459 fs::remove_file(&file_path).map_err(|e| {
460 AllSourceError::StorageError(format!("Failed to remove WAL file: {e}"))
461 })?;
462 tracing::debug!("Removed WAL file: {:?}", file_path);
463 }
464
465 let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
467 *current = WALFile::new(new_file_path)?;
468
469 let mut seq = self.sequence.write();
471 *seq = 0;
472
473 tracing::info!("โ
WAL truncated successfully");
474
475 Ok(())
476 }
477
478 pub fn stats(&self) -> WALStats {
480 (*self.stats.read()).clone()
481 }
482
483 pub fn current_sequence(&self) -> u64 {
485 *self.sequence.read()
486 }
487
488 pub fn oldest_sequence(&self) -> Option<u64> {
494 let Ok(mut wal_files) = self.list_wal_files() else {
495 return None;
496 };
497
498 if wal_files.is_empty() {
499 return None;
500 }
501
502 wal_files.sort();
503
504 for wal_file_path in &wal_files {
506 let Ok(file) = File::open(wal_file_path) else {
507 continue;
508 };
509 let reader = BufReader::new(file);
510 for line in reader.lines() {
511 let Ok(line) = line else {
512 continue;
513 };
514 if line.trim().is_empty() {
515 continue;
516 }
517 if let Ok(entry) = serde_json::from_str::<WALEntry>(&line) {
518 return Some(entry.sequence);
519 }
520 }
521 }
522
523 None
524 }
525
526 pub fn set_replication_tx(&self, tx: tokio::sync::broadcast::Sender<WALEntry>) {
531 *self.replication_tx.lock() = Some(tx);
532 }
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a throwaway event with a fresh random id.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    /// Appends `count` fresh test events, panicking on the first failure.
    fn append_events(wal: &WriteAheadLog, count: usize) {
        for _ in 0..count {
            wal.append(create_test_event()).unwrap();
        }
    }

    /// Opens a WAL with default settings in `dir`.
    fn open_default(dir: &TempDir) -> WriteAheadLog {
        WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap()
    }

    #[test]
    fn test_wal_creation() {
        let dir = TempDir::new().unwrap();
        let outcome = WriteAheadLog::new(dir.path(), WALConfig::default());
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let dir = TempDir::new().unwrap();
        let wal = open_default(&dir);

        let outcome = wal.append(create_test_event());
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 1);

        assert_eq!(wal.stats().total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let dir = TempDir::new().unwrap();
        let writer = open_default(&dir);

        append_events(&writer, 5);
        writer.flush().unwrap();

        // A second instance over the same directory must see all five.
        let reader = open_default(&dir);
        let recovered = reader.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_recovery_with_partial_write() {
        let dir = TempDir::new().unwrap();
        let writer = open_default(&dir);

        append_events(&writer, 3);
        writer.flush().unwrap();

        // Simulate a torn write by appending a truncated JSON line.
        let wal_file_path = dir.path().join("wal-0000000000000000.log");
        {
            use std::io::Write as _;
            let mut raw = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_file_path)
                .unwrap();
            raw.write_all(b"{\"partial\": true, \"seq\"\n").unwrap();
        }

        let reader = open_default(&dir);
        let recovered = reader.recover().unwrap();
        assert_eq!(
            recovered.len(),
            3,
            "Should recover only the 3 valid events, not the partial one"
        );
    }

    #[test]
    fn test_wal_rotation() {
        let dir = TempDir::new().unwrap();
        // A 1 KiB limit forces several rotations across 50 events.
        let small_files = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };

        let wal = WriteAheadLog::new(dir.path(), small_files).unwrap();
        append_events(&wal, 50);

        assert!(wal.stats().files_rotated > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let entry = WALEntry::new(1, create_test_event());
        assert!(entry.verify());

        // Zeroing the stored checksum must make verification fail.
        let mut corrupted = entry.clone();
        corrupted.checksum = 0;
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_fsync_interval_config() {
        let with_interval = WALConfig {
            fsync_interval_ms: Some(100),
            ..Default::default()
        };
        assert_eq!(with_interval.fsync_interval_ms, Some(100));
        assert_eq!(WALConfig::default().fsync_interval_ms, None);
    }

    #[test]
    fn test_wal_sync_method() {
        let dir = TempDir::new().unwrap();
        let no_auto_sync = WALConfig {
            sync_on_write: false,
            ..Default::default()
        };
        let writer = WriteAheadLog::new(dir.path(), no_auto_sync).unwrap();

        append_events(&writer, 5);

        // An explicit sync must make the buffered entries readable.
        writer.sync().unwrap();

        let reader = open_default(&dir);
        let recovered = reader.recover().unwrap();
        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_truncate() {
        let dir = TempDir::new().unwrap();
        let wal = open_default(&dir);

        append_events(&wal, 5);
        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0);

        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}