laminar_storage/per_core_wal/
manager.rs1use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use laminar_core::storage_io::IoCompletion;
8
9use super::entry::PerCoreWalEntry;
10use super::error::PerCoreWalError;
11use super::reader::PerCoreWalReader;
12use super::writer::CoreWalWriter;
13
14#[derive(Debug, Clone)]
16pub struct PerCoreWalConfig {
17 pub base_dir: PathBuf,
19 pub num_cores: usize,
21 pub segment_pattern: String,
23}
24
25impl PerCoreWalConfig {
26 #[must_use]
28 pub fn new(base_dir: &Path, num_cores: usize) -> Self {
29 Self {
30 base_dir: base_dir.to_path_buf(),
31 num_cores,
32 segment_pattern: "wal-{core_id}.log".to_string(),
33 }
34 }
35
36 #[must_use]
40 pub fn with_segment_pattern(mut self, pattern: &str) -> Self {
41 self.segment_pattern = pattern.to_string();
42 self
43 }
44
45 #[must_use]
47 pub fn segment_path(&self, core_id: usize) -> PathBuf {
48 let filename = self
49 .segment_pattern
50 .replace("{core_id}", &core_id.to_string());
51 self.base_dir.join(filename)
52 }
53}
54
55pub struct PerCoreWalManager {
60 config: PerCoreWalConfig,
62 writers: Vec<CoreWalWriter>,
64 global_epoch: Arc<AtomicU64>,
66}
67
68impl PerCoreWalManager {
69 pub fn new(config: PerCoreWalConfig) -> Result<Self, PerCoreWalError> {
77 std::fs::create_dir_all(&config.base_dir)?;
79
80 let mut writers = Vec::with_capacity(config.num_cores);
82 for core_id in 0..config.num_cores {
83 let path = config.segment_path(core_id);
84 let writer = CoreWalWriter::new(core_id, &path)?;
85 writers.push(writer);
86 }
87
88 Ok(Self {
89 config,
90 writers,
91 global_epoch: Arc::new(AtomicU64::new(0)),
92 })
93 }
94
95 pub fn open(config: PerCoreWalConfig) -> Result<Self, PerCoreWalError> {
103 let mut writers = Vec::with_capacity(config.num_cores);
104 let mut max_epoch = 0u64;
105
106 for core_id in 0..config.num_cores {
107 let path = config.segment_path(core_id);
108 if !path.exists() {
109 return Err(PerCoreWalError::SegmentNotFound { core_id, path });
110 }
111
112 let mut reader = PerCoreWalReader::open(core_id, &path)?;
115 let mut valid_end = 0u64;
116 loop {
117 let pos_before = reader.position();
118 match reader.read_next()? {
119 super::reader::WalReadResult::Entry(entry) => {
120 max_epoch = max_epoch.max(entry.epoch);
121 valid_end = reader.position();
122 }
123 super::reader::WalReadResult::Eof => break,
124 super::reader::WalReadResult::TornWrite { .. }
125 | super::reader::WalReadResult::ChecksumMismatch { .. }
126 | super::reader::WalReadResult::Corrupted { .. } => {
127 valid_end = pos_before;
128 break;
129 }
130 }
131 }
132
133 let writer = CoreWalWriter::open_at(core_id, &path, valid_end)?;
134 writers.push(writer);
135 }
136
137 Ok(Self {
138 config,
139 writers,
140 global_epoch: Arc::new(AtomicU64::new(max_epoch)),
141 })
142 }
143
144 #[must_use]
146 pub fn config(&self) -> &PerCoreWalConfig {
147 &self.config
148 }
149
150 #[must_use]
152 pub fn num_cores(&self) -> usize {
153 self.config.num_cores
154 }
155
156 #[must_use]
158 pub fn epoch(&self) -> u64 {
159 self.global_epoch.load(Ordering::Acquire)
160 }
161
162 #[must_use]
164 pub fn epoch_ref(&self) -> Arc<AtomicU64> {
165 Arc::clone(&self.global_epoch)
166 }
167
168 #[must_use]
174 pub fn writer(&mut self, core_id: usize) -> &mut CoreWalWriter {
175 &mut self.writers[core_id]
176 }
177
178 pub fn writer_checked(
184 &mut self,
185 core_id: usize,
186 ) -> Result<&mut CoreWalWriter, PerCoreWalError> {
187 if core_id >= self.config.num_cores {
188 return Err(PerCoreWalError::InvalidCoreId {
189 core_id,
190 max_core_id: self.config.num_cores - 1,
191 });
192 }
193 Ok(&mut self.writers[core_id])
194 }
195
196 #[must_use]
198 pub fn advance_epoch(&self) -> u64 {
199 self.global_epoch.fetch_add(1, Ordering::AcqRel) + 1
200 }
201
202 pub fn set_epoch_all(&mut self, epoch: u64) {
204 for writer in &mut self.writers {
205 writer.set_epoch(epoch);
206 }
207 }
208
209 pub fn sync_all(&mut self) -> Result<(), PerCoreWalError> {
215 for writer in &mut self.writers {
216 writer.sync()?;
217 }
218 Ok(())
219 }
220
221 #[must_use]
226 pub fn positions(&self) -> Vec<u64> {
227 self.writers.iter().map(CoreWalWriter::position).collect()
228 }
229
230 #[must_use]
235 pub fn synced_positions(&self) -> Vec<u64> {
236 self.writers
237 .iter()
238 .map(CoreWalWriter::synced_position)
239 .collect()
240 }
241
242 pub fn truncate_all(&mut self, positions: &[u64]) -> Result<(), PerCoreWalError> {
248 if positions.len() != self.config.num_cores {
249 return Err(PerCoreWalError::InvalidCoreId {
250 core_id: positions.len(),
251 max_core_id: self.config.num_cores - 1,
252 });
253 }
254
255 for (core_id, &position) in positions.iter().enumerate() {
256 self.writers[core_id].truncate(position)?;
257 }
258 Ok(())
259 }
260
261 pub fn reset_all(&mut self) -> Result<(), PerCoreWalError> {
269 for writer in &mut self.writers {
270 writer.reset()?;
271 }
272 Ok(())
273 }
274
275 pub fn merge_segments(&self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
281 let mut entries = Vec::new();
282
283 for core_id in 0..self.config.num_cores {
284 let path = self.config.segment_path(core_id);
285 if path.exists() {
286 let mut reader = PerCoreWalReader::open(core_id, &path)?;
287 entries.extend(reader.read_all()?);
288 }
289 }
290
291 entries.sort();
293
294 Ok(entries)
295 }
296
297 pub fn merge_segments_up_to_epoch(
303 &self,
304 max_epoch: u64,
305 ) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
306 let mut entries = Vec::new();
307
308 for core_id in 0..self.config.num_cores {
309 let path = self.config.segment_path(core_id);
310 if path.exists() {
311 let mut reader = PerCoreWalReader::open(core_id, &path)?;
312 entries.extend(reader.read_up_to_epoch(max_epoch)?);
313 }
314 }
315
316 entries.sort();
318
319 Ok(entries)
320 }
321
322 pub fn write_epoch_barrier_all(&mut self) -> Result<(), PerCoreWalError> {
330 for writer in &mut self.writers {
331 writer.append_epoch_barrier()?;
332 }
333 Ok(())
334 }
335
336 pub fn check_all_completions(
349 &mut self,
350 completions: &[IoCompletion],
351 ) -> Result<usize, PerCoreWalError> {
352 if completions.is_empty() {
353 return Ok(0);
354 }
355 let mut count = 0;
356 let mut first_err: Option<PerCoreWalError> = None;
357 for writer in &mut self.writers {
358 match writer.check_completions(completions) {
359 Ok(true) => count += 1,
360 Ok(false) => {}
361 Err(e) => {
362 if first_err.is_none() {
363 first_err = Some(e);
364 }
365 }
366 }
367 }
368 match first_err {
369 Some(e) => Err(e),
370 None => Ok(count),
371 }
372 }
373
374 #[must_use]
376 pub fn any_sync_pending(&self) -> bool {
377 self.writers.iter().any(CoreWalWriter::is_sync_pending)
378 }
379
380 #[must_use]
382 pub fn total_size(&self) -> u64 {
383 self.writers.iter().map(CoreWalWriter::position).sum()
384 }
385
386 #[must_use]
388 pub fn segment_path(&self, core_id: usize) -> PathBuf {
389 self.config.segment_path(core_id)
390 }
391}
392
393impl std::fmt::Debug for PerCoreWalManager {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 f.debug_struct("PerCoreWalManager")
396 .field("config", &self.config)
397 .field("num_cores", &self.config.num_cores)
398 .field("epoch", &self.epoch())
399 .field("total_size", &self.total_size())
400 .finish_non_exhaustive()
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407 use tempfile::TempDir;
408
409 fn setup_manager(num_cores: usize) -> (PerCoreWalManager, TempDir) {
410 let temp_dir = TempDir::new().unwrap();
411 let config = PerCoreWalConfig::new(temp_dir.path(), num_cores);
412 let manager = PerCoreWalManager::new(config).unwrap();
413 (manager, temp_dir)
414 }
415
416 #[test]
417 fn test_manager_creation() {
418 let (manager, _temp_dir) = setup_manager(4);
419 assert_eq!(manager.num_cores(), 4);
420 assert_eq!(manager.epoch(), 0);
421 }
422
423 #[test]
424 fn test_writer_access() {
425 let (mut manager, _temp_dir) = setup_manager(4);
426
427 let writer = manager.writer(0);
428 assert_eq!(writer.core_id(), 0);
429
430 let writer = manager.writer(3);
431 assert_eq!(writer.core_id(), 3);
432 }
433
434 #[test]
435 fn test_writer_checked_invalid() {
436 let (mut manager, _temp_dir) = setup_manager(4);
437
438 let result = manager.writer_checked(5);
439 assert!(matches!(result, Err(PerCoreWalError::InvalidCoreId { .. })));
440 }
441
442 #[test]
443 fn test_advance_epoch() {
444 let (manager, _temp_dir) = setup_manager(4);
445
446 assert_eq!(manager.epoch(), 0);
447 let new_epoch = manager.advance_epoch();
448 assert_eq!(new_epoch, 1);
449 assert_eq!(manager.epoch(), 1);
450
451 let new_epoch = manager.advance_epoch();
452 assert_eq!(new_epoch, 2);
453 }
454
455 #[test]
456 fn test_set_epoch_all() {
457 let (mut manager, _temp_dir) = setup_manager(4);
458
459 manager.set_epoch_all(5);
460
461 for core_id in 0..4 {
462 assert_eq!(manager.writer(core_id).epoch(), 5);
463 }
464 }
465
466 #[test]
467 fn test_parallel_writes() {
468 let (mut manager, _temp_dir) = setup_manager(4);
469 manager.set_epoch_all(1);
470
471 manager.writer(0).append_put(b"key0", b"value0").unwrap();
473 manager.writer(1).append_put(b"key1", b"value1").unwrap();
474 manager.writer(2).append_put(b"key2", b"value2").unwrap();
475 manager.writer(3).append_put(b"key3", b"value3").unwrap();
476
477 manager.sync_all().unwrap();
478
479 let positions = manager.positions();
481 assert!(positions[0] > 0);
482 assert!(positions[1] > 0);
483 assert!(positions[2] > 0);
484 assert!(positions[3] > 0);
485 }
486
487 #[test]
488 fn test_merge_segments() {
489 let (mut manager, _temp_dir) = setup_manager(2);
490
491 manager.set_epoch_all(1);
493 manager.writer(0).append_put(b"key0a", b"value0a").unwrap();
494 manager.writer(1).append_put(b"key1a", b"value1a").unwrap();
495
496 manager.set_epoch_all(2);
498 manager.writer(0).append_put(b"key0b", b"value0b").unwrap();
499 manager.writer(1).append_put(b"key1b", b"value1b").unwrap();
500
501 manager.sync_all().unwrap();
502
503 let entries = manager.merge_segments().unwrap();
505 assert_eq!(entries.len(), 4);
506
507 assert_eq!(entries[0].epoch, 1);
509 assert_eq!(entries[1].epoch, 1);
510 assert_eq!(entries[2].epoch, 2);
512 assert_eq!(entries[3].epoch, 2);
513 }
514
515 #[test]
516 fn test_merge_segments_up_to_epoch() {
517 let (mut manager, _temp_dir) = setup_manager(2);
518
519 manager.set_epoch_all(1);
520 manager.writer(0).append_put(b"key0a", b"value0a").unwrap();
521
522 manager.set_epoch_all(2);
523 manager.writer(0).append_put(b"key0b", b"value0b").unwrap();
524
525 manager.set_epoch_all(3);
526 manager.writer(0).append_put(b"key0c", b"value0c").unwrap();
527
528 manager.sync_all().unwrap();
529
530 let entries = manager.merge_segments_up_to_epoch(2).unwrap();
531 assert_eq!(entries.len(), 2); }
533
534 #[test]
535 fn test_reset_all() {
536 let (mut manager, _temp_dir) = setup_manager(2);
537
538 manager.writer(0).append_put(b"key0", b"value0").unwrap();
539 manager.writer(1).append_put(b"key1", b"value1").unwrap();
540 manager.sync_all().unwrap();
541
542 assert!(manager.total_size() > 0);
543
544 manager.reset_all().unwrap();
545
546 assert_eq!(manager.total_size(), 0);
547 assert_eq!(manager.positions(), vec![0, 0]);
548 }
549
550 #[test]
551 fn test_truncate_all() {
552 let (mut manager, _temp_dir) = setup_manager(2);
553
554 manager.writer(0).append_put(b"key0", b"value0").unwrap();
555 let pos0 = manager.writer(0).position();
556
557 manager.writer(0).append_put(b"key0b", b"value0b").unwrap();
558 manager.writer(1).append_put(b"key1", b"value1").unwrap();
559
560 manager.sync_all().unwrap();
561
562 manager.truncate_all(&[pos0, 0]).unwrap();
563
564 let positions = manager.positions();
565 assert_eq!(positions[0], pos0);
566 assert_eq!(positions[1], 0);
567 }
568
569 #[test]
570 fn test_write_epoch_barrier_all() {
571 let (mut manager, _temp_dir) = setup_manager(2);
572 manager.set_epoch_all(1);
573
574 manager.write_epoch_barrier_all().unwrap();
575 manager.sync_all().unwrap();
576
577 let entries = manager.merge_segments().unwrap();
578 assert_eq!(entries.len(), 2);
579 for entry in entries {
580 assert!(matches!(
581 entry.operation,
582 super::super::entry::WalOperation::EpochBarrier { .. }
583 ));
584 }
585 }
586
587 #[test]
588 fn test_open_existing() {
589 let temp_dir = TempDir::new().unwrap();
590 let config = PerCoreWalConfig::new(temp_dir.path(), 2);
591
592 {
594 let mut manager = PerCoreWalManager::new(config.clone()).unwrap();
595 manager.set_epoch_all(5);
596 manager.writer(0).append_put(b"key0", b"value0").unwrap();
597 manager.writer(1).append_put(b"key1", b"value1").unwrap();
598 manager.sync_all().unwrap();
599 }
600
601 let manager = PerCoreWalManager::open(config).unwrap();
603 assert_eq!(manager.epoch(), 5);
604
605 let entries = manager.merge_segments().unwrap();
606 assert_eq!(entries.len(), 2);
607 }
608
609 #[test]
610 fn test_segment_path() {
611 let config = PerCoreWalConfig::new(Path::new("/data/wal"), 4);
612 assert_eq!(config.segment_path(0), PathBuf::from("/data/wal/wal-0.log"));
613 assert_eq!(config.segment_path(3), PathBuf::from("/data/wal/wal-3.log"));
614
615 let custom = config.with_segment_pattern("segment-{core_id}.wal");
616 assert_eq!(
617 custom.segment_path(1),
618 PathBuf::from("/data/wal/segment-1.wal")
619 );
620 }
621
622 #[test]
623 fn test_debug_format() {
624 let (manager, _temp_dir) = setup_manager(4);
625 let debug_str = format!("{manager:?}");
626 assert!(debug_str.contains("PerCoreWalManager"));
627 assert!(debug_str.contains("num_cores"));
628 }
629}