allsource_core/infrastructure/persistence/
wal.rs1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{
9 fs::{self, File, OpenOptions},
10 io::{BufRead, BufReader, BufWriter, Write},
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14
15pub struct WriteAheadLog {
17 wal_dir: PathBuf,
19
20 current_file: Arc<RwLock<WALFile>>,
22
23 config: WALConfig,
25
26 stats: Arc<RwLock<WALStats>>,
28
29 sequence: Arc<RwLock<u64>>,
31
32 replication_tx: parking_lot::Mutex<Option<tokio::sync::broadcast::Sender<WALEntry>>>,
36}
37
/// Tuning knobs for [`WriteAheadLog`].
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Segment size in bytes at which the active file is rotated (`size >= max_file_size`).
    pub max_file_size: usize,

    /// When `true`, every append flushes the buffer and calls `sync_all` before returning.
    pub sync_on_write: bool,

    /// Number of segment files kept after a rotation; older ones are deleted.
    pub max_wal_files: usize,

    /// Compression toggle.
    /// NOTE(review): not read anywhere in this module — entries are currently written as
    /// plain JSON regardless of this flag.
    pub compress: bool,
}
52
53impl Default for WALConfig {
54 fn default() -> Self {
55 Self {
56 max_file_size: 64 * 1024 * 1024, sync_on_write: true,
58 max_wal_files: 10,
59 compress: false,
60 }
61 }
62}
63
64#[derive(Debug, Clone, Default, Serialize)]
65pub struct WALStats {
66 pub total_entries: u64,
67 pub total_bytes_written: u64,
68 pub current_file_size: usize,
69 pub files_rotated: u64,
70 pub files_cleaned: u64,
71 pub recovery_count: u64,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct WALEntry {
77 pub sequence: u64,
79
80 pub wal_timestamp: DateTime<Utc>,
82
83 pub event: Event,
85
86 pub checksum: u32,
88}
89
90impl WALEntry {
91 pub fn new(sequence: u64, event: Event) -> Self {
92 let mut entry = Self {
93 sequence,
94 wal_timestamp: Utc::now(),
95 event,
96 checksum: 0,
97 };
98 entry.checksum = entry.calculate_checksum();
99 entry
100 }
101
102 fn calculate_checksum(&self) -> u32 {
103 let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
105 crc32fast::hash(data.as_bytes())
106 }
107
108 pub fn verify(&self) -> bool {
109 self.checksum == self.calculate_checksum()
110 }
111}
112
113struct WALFile {
115 path: PathBuf,
116 writer: BufWriter<File>,
117 size: usize,
118 created_at: DateTime<Utc>,
119}
120
121impl WALFile {
122 fn new(path: PathBuf) -> Result<Self> {
123 let file = OpenOptions::new()
124 .create(true)
125 .append(true)
126 .open(&path)
127 .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {e}")))?;
128
129 let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
130
131 Ok(Self {
132 path,
133 writer: BufWriter::new(file),
134 size,
135 created_at: Utc::now(),
136 })
137 }
138
139 fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
140 let json = serde_json::to_string(entry)?;
142
143 let line = format!("{}\n", json);
144 let bytes_written = line.len();
145
146 self.writer
147 .write_all(line.as_bytes())
148 .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {e}")))?;
149
150 if sync {
151 self.writer
152 .flush()
153 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
154
155 self.writer
156 .get_ref()
157 .sync_all()
158 .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
159 }
160
161 self.size += bytes_written;
162 Ok(bytes_written)
163 }
164
165 fn flush(&mut self) -> Result<()> {
166 self.writer
167 .flush()
168 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
169 Ok(())
170 }
171}
172
173impl WriteAheadLog {
174 pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
176 let wal_dir = wal_dir.into();
177
178 fs::create_dir_all(&wal_dir).map_err(|e| {
180 AllSourceError::StorageError(format!("Failed to create WAL directory: {e}"))
181 })?;
182
183 let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
185 let current_file = WALFile::new(initial_file_path)?;
186
187 tracing::info!("โ
WAL initialized at: {}", wal_dir.display());
188
189 Ok(Self {
190 wal_dir,
191 current_file: Arc::new(RwLock::new(current_file)),
192 config,
193 stats: Arc::new(RwLock::new(WALStats::default())),
194 sequence: Arc::new(RwLock::new(0)),
195 replication_tx: parking_lot::Mutex::new(None),
196 })
197 }
198
199 fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
201 dir.join(format!("wal-{:016x}.log", sequence))
202 }
203
204 pub fn append(&self, event: Event) -> Result<u64> {
206 let mut seq = self.sequence.write();
208 *seq += 1;
209 let sequence = *seq;
210 drop(seq);
211
212 let entry = WALEntry::new(sequence, event);
214
215 let mut current = self.current_file.write();
217 let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
218
219 let mut stats = self.stats.write();
221 stats.total_entries += 1;
222 stats.total_bytes_written += bytes_written as u64;
223 stats.current_file_size = current.size;
224 drop(stats);
225
226 if let Some(ref tx) = *self.replication_tx.lock() {
229 let _ = tx.send(entry);
230 }
231
232 let should_rotate = current.size >= self.config.max_file_size;
234 drop(current);
235
236 if should_rotate {
237 self.rotate()?;
238 }
239
240 tracing::trace!("WAL entry written: sequence={}", sequence);
241
242 Ok(sequence)
243 }
244
245 fn rotate(&self) -> Result<()> {
247 let seq = *self.sequence.read();
248 let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
249
250 tracing::info!("๐ Rotating WAL to new file: {:?}", new_file_path);
251
252 let new_file = WALFile::new(new_file_path)?;
253
254 let mut current = self.current_file.write();
255 current.flush()?;
256 *current = new_file;
257
258 let mut stats = self.stats.write();
259 stats.files_rotated += 1;
260 stats.current_file_size = 0;
261 drop(stats);
262
263 self.cleanup_old_files()?;
265
266 Ok(())
267 }
268
269 fn cleanup_old_files(&self) -> Result<()> {
271 let mut wal_files = self.list_wal_files()?;
272 wal_files.sort();
273
274 if wal_files.len() > self.config.max_wal_files {
275 let to_remove = wal_files.len() - self.config.max_wal_files;
276 let files_to_delete = &wal_files[..to_remove];
277
278 for file_path in files_to_delete {
279 if let Err(e) = fs::remove_file(file_path) {
280 tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
281 } else {
282 tracing::debug!("๐๏ธ Removed old WAL file: {:?}", file_path);
283 let mut stats = self.stats.write();
284 stats.files_cleaned += 1;
285 }
286 }
287 }
288
289 Ok(())
290 }
291
292 fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
294 let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
295 AllSourceError::StorageError(format!("Failed to read WAL directory: {e}"))
296 })?;
297
298 let mut wal_files = Vec::new();
299 for entry in entries {
300 let entry = entry.map_err(|e| {
301 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
302 })?;
303
304 let path = entry.path();
305 if let Some(name) = path.file_name()
306 && name.to_string_lossy().starts_with("wal-")
307 && name.to_string_lossy().ends_with(".log")
308 {
309 wal_files.push(path);
310 }
311 }
312
313 Ok(wal_files)
314 }
315
316 pub fn recover(&self) -> Result<Vec<Event>> {
318 tracing::info!("๐ Starting WAL recovery...");
319
320 let mut wal_files = self.list_wal_files()?;
321 wal_files.sort();
322
323 let mut recovered_events = Vec::new();
324 let mut max_sequence = 0u64;
325 let mut corrupted_entries = 0;
326
327 for wal_file_path in &wal_files {
328 tracing::debug!("Reading WAL file: {:?}", wal_file_path);
329
330 let file = File::open(wal_file_path).map_err(|e| {
331 AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {e}"))
332 })?;
333
334 let reader = BufReader::new(file);
335
336 for (line_num, line) in reader.lines().enumerate() {
337 let line = line.map_err(|e| {
338 AllSourceError::StorageError(format!("Failed to read WAL line: {e}"))
339 })?;
340
341 if line.trim().is_empty() {
342 continue;
343 }
344
345 match serde_json::from_str::<WALEntry>(&line) {
346 Ok(entry) => {
347 if !entry.verify() {
349 tracing::warn!(
350 "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
351 wal_file_path,
352 line_num + 1
353 );
354 corrupted_entries += 1;
355 continue;
356 }
357
358 max_sequence = max_sequence.max(entry.sequence);
359 recovered_events.push(entry.event);
360 }
361 Err(e) => {
362 tracing::warn!(
363 "Failed to parse WAL entry at {:?}:{}: {}",
364 wal_file_path,
365 line_num + 1,
366 e
367 );
368 corrupted_entries += 1;
369 }
370 }
371 }
372 }
373
374 let mut seq = self.sequence.write();
376 *seq = max_sequence;
377 drop(seq);
378
379 let mut stats = self.stats.write();
381 stats.recovery_count += 1;
382 drop(stats);
383
384 tracing::info!(
385 "โ
WAL recovery complete: {} events recovered, {} corrupted entries",
386 recovered_events.len(),
387 corrupted_entries
388 );
389
390 Ok(recovered_events)
391 }
392
393 pub fn flush(&self) -> Result<()> {
395 let mut current = self.current_file.write();
396 current.flush()?;
397 Ok(())
398 }
399
400 pub fn truncate(&self) -> Result<()> {
402 tracing::info!("๐งน Truncating WAL after checkpoint");
403
404 let mut current = self.current_file.write();
406 current.flush()?;
407
408 let wal_files = self.list_wal_files()?;
410 for file_path in wal_files {
411 fs::remove_file(&file_path).map_err(|e| {
412 AllSourceError::StorageError(format!("Failed to remove WAL file: {e}"))
413 })?;
414 tracing::debug!("Removed WAL file: {:?}", file_path);
415 }
416
417 let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
419 *current = WALFile::new(new_file_path)?;
420
421 let mut seq = self.sequence.write();
423 *seq = 0;
424
425 tracing::info!("โ
WAL truncated successfully");
426
427 Ok(())
428 }
429
430 pub fn stats(&self) -> WALStats {
432 (*self.stats.read()).clone()
433 }
434
435 pub fn current_sequence(&self) -> u64 {
437 *self.sequence.read()
438 }
439
440 pub fn oldest_sequence(&self) -> Option<u64> {
446 let mut wal_files = match self.list_wal_files() {
447 Ok(files) => files,
448 Err(_) => return None,
449 };
450
451 if wal_files.is_empty() {
452 return None;
453 }
454
455 wal_files.sort();
456
457 for wal_file_path in &wal_files {
459 let file = match File::open(wal_file_path) {
460 Ok(f) => f,
461 Err(_) => continue,
462 };
463 let reader = BufReader::new(file);
464 for line in reader.lines() {
465 let line = match line {
466 Ok(l) => l,
467 Err(_) => continue,
468 };
469 if line.trim().is_empty() {
470 continue;
471 }
472 if let Ok(entry) = serde_json::from_str::<WALEntry>(&line) {
473 return Some(entry.sequence);
474 }
475 }
476 }
477
478 None
479 }
480
481 pub fn set_replication_tx(&self, tx: tokio::sync::broadcast::Sender<WALEntry>) {
486 *self.replication_tx.lock() = Some(tx);
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::{fs::OpenOptions, io::Write};
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a minimal event with a fresh id and the current timestamp.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_wal_creation() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default());
        assert!(wal.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        let event = create_test_event();
        let seq = wal.append(event);
        assert!(seq.is_ok());
        assert_eq!(seq.unwrap(), 1);

        let stats = wal.stats();
        assert_eq!(stats.total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        wal.flush().unwrap();

        let wal2 = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        let recovered = wal2.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_recovery_skips_malformed_lines() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..3 {
            wal.append(create_test_event()).unwrap();
        }
        wal.flush().unwrap();

        // Append a line that is not a valid entry after the good ones.
        let segment = temp_dir.path().join("wal-0000000000000000.log");
        let mut file = OpenOptions::new().append(true).open(&segment).unwrap();
        writeln!(file, "{{not valid json").unwrap();
        drop(file);

        let wal2 = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        let recovered = wal2.recover().unwrap();

        assert_eq!(recovered.len(), 3);
        assert_eq!(wal2.current_sequence(), 3);
    }

    #[test]
    fn test_wal_oldest_sequence() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        assert_eq!(wal.oldest_sequence(), None);

        for _ in 0..3 {
            wal.append(create_test_event()).unwrap();
        }
        wal.flush().unwrap();

        assert_eq!(wal.oldest_sequence(), Some(1));
    }

    #[test]
    fn test_wal_rotation() {
        let temp_dir = TempDir::new().unwrap();
        let config = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };

        let wal = WriteAheadLog::new(temp_dir.path(), config).unwrap();

        for _ in 0..50 {
            wal.append(create_test_event()).unwrap();
        }

        let stats = wal.stats();
        assert!(stats.files_rotated > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let event = create_test_event();
        let entry = WALEntry::new(1, event);

        assert!(entry.verify());

        let mut corrupted = entry.clone();
        corrupted.checksum = 0;
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0);

        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}