Skip to main content

raft_wal/
lib.rs

1//! # raft-wal
2//!
3//! A minimal append-only WAL (Write-Ahead Log) optimized for Raft consensus.
4//!
5//! - **Segment-based storage** — log is split into segment files; `compact()`
6//!   deletes old segments without rewriting.
7//! - **CRC32C checksums** — HW-accelerated integrity checks on every entry.
8//! - **Raft-correct durability** — metadata (term/vote) is always fsynced;
9//!   log entries are buffered with opt-in [`RaftWal::sync`].
10//! - **Parallel recovery** — segments are read and verified concurrently.
11//! - **openraft integration** — enable `openraft-storage` feature for
12//!   [`RaftLogStorage`](openraft::storage::RaftLogStorage) implementation.
13//!
14//! ## Usage
15//!
16//! ```rust
17//! use raft_wal::RaftWal;
18//!
19//! # let dir = tempfile::tempdir().unwrap();
20//! let mut wal = RaftWal::open(dir.path()).unwrap();
21//!
22//! wal.append(1, b"entry-1").unwrap();
23//! wal.append(2, b"entry-2").unwrap();
24//!
25//! let entries: Vec<_> = wal.iter().collect();
26//! assert_eq!(entries.len(), 2);
27//!
28//! wal.set_meta("vote", b"node-1").unwrap();
29//! assert_eq!(wal.get_meta("vote"), Some(b"node-1".as_slice()));
30//! ```
31
32#![cfg_attr(not(feature = "std"), no_std)]
33#![deny(missing_docs)]
34#![deny(clippy::unwrap_used)]
35
36#[cfg(not(feature = "std"))]
37extern crate alloc;
38
39/// CRC32C functions (hw-accelerated with `std`, software fallback in `no_std`).
40pub mod crc;
41/// Wire format serialization and parsing (no_std compatible).
42pub mod wire;
43
44#[macro_use]
45pub(crate) mod macros;
46pub(crate) mod core;
47
48#[cfg(feature = "std")]
49pub(crate) mod segment;
50pub(crate) mod state;
51
52mod storage;
53pub use storage::WalStorage;
54
55mod generic;
56pub use generic::GenericRaftWal;
57
58#[cfg(feature = "std")]
59mod std_storage;
60#[cfg(feature = "std")]
61pub use std_storage::StdStorage;
62
63#[cfg(feature = "tokio")]
64mod tokio;
65#[cfg(feature = "tokio")]
66pub use self::tokio::AsyncRaftWal;
67
68#[cfg(feature = "io-uring")]
69mod uring;
70#[cfg(feature = "io-uring")]
71pub use uring::UringRaftWal;
72
73#[cfg(feature = "io-uring-bridge")]
74mod bridge;
75#[cfg(feature = "io-uring-bridge")]
76pub use bridge::BridgedUringWal;
77
78#[cfg(feature = "std")]
79pub mod impls;
80
81#[cfg(feature = "openraft-storage")]
82pub use impls::openraft::OpenRaftLogStorage;
83
/// Errors returned by WAL operations.
///
/// Every fallible operation of the std-backed WAL reports failures through
/// this type; the underlying cause is reachable via
/// [`std::error::Error::source`].
#[cfg(feature = "std")]
#[derive(Debug)]
pub enum WalError {
    /// An I/O error.
    Io(std::io::Error),
}

#[cfg(feature = "std")]
impl std::fmt::Display for WalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: the pattern is irrefutable, and adding a variant
        // later turns this into a compile error, just like an exhaustive match.
        let Self::Io(inner) = self;
        write!(f, "WAL I/O: {inner}")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for WalError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self::Io(inner) = self;
        Some(inner)
    }
}

#[cfg(feature = "std")]
impl From<std::io::Error> for WalError {
    fn from(source: std::io::Error) -> Self {
        WalError::Io(source)
    }
}

/// Result type for WAL operations, with [`WalError`] as the error.
#[cfg(feature = "std")]
pub type Result<T> = std::result::Result<T, WalError>;
120
/// A borrowed entry yielded by iterators over WAL entries.
///
/// `data` is borrowed for the lifetime `'a`, so an `Entry` is only an index
/// plus a slice reference. It is therefore cheap to copy and can be compared
/// and debug-printed directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Entry<'a> {
    /// The Raft log index.
    pub index: u64,
    /// The entry payload.
    pub data: &'a [u8],
}
128
/// An append-only WAL optimized for Raft, backed by the local filesystem.
///
/// This is a type alias for [`GenericRaftWal`] with [`StdStorage`].
/// See [`GenericRaftWal`] for the full API documentation.
#[cfg(feature = "std")]
pub type RaftWal = GenericRaftWal<StdStorage>;

#[cfg(feature = "std")]
impl RaftWal {
    /// Opens or creates a WAL in the given directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or existing
    /// segments cannot be recovered.
    pub fn open(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
        let storage = StdStorage::new(data_dir)?;
        GenericRaftWal::new(storage).map_err(WalError::Io)
    }

    /// Returns the entry at the given index, or `None` if it is not present.
    ///
    /// Returns `Cow::Borrowed` for cached (in-memory) entries and
    /// `Cow::Owned` for evicted entries read from disk.
    ///
    /// An I/O failure while reading an evicted entry back from disk is also
    /// reported as `None`, so it is indistinguishable from a missing index.
    /// Use [`try_get`](Self::try_get) when the caller must tell the two apart.
    #[must_use]
    pub fn get(&self, index: u64) -> Option<std::borrow::Cow<'_, [u8]>> {
        self.try_get(index).ok().flatten()
    }

    /// Returns the entry at the given index, surfacing disk read errors.
    ///
    /// Like [`get`](Self::get), this yields `Cow::Borrowed` for cached
    /// (in-memory) entries and `Cow::Owned` for evicted entries read from
    /// disk. `Ok(None)` means the index is not present.
    ///
    /// # Errors
    ///
    /// Returns an error if the entry is not cached and reading it back from
    /// storage fails.
    pub fn try_get(&self, index: u64) -> Result<Option<std::borrow::Cow<'_, [u8]>>> {
        if let Some(data) = self.get_cached(index) {
            return Ok(Some(std::borrow::Cow::Borrowed(data)));
        }
        let from_disk = self.get_or_read(index)?;
        Ok(from_disk.map(std::borrow::Cow::Owned))
    }
}
164
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// Distinguishable payload for index `i` (`"e1"`, `"e2"`, ...).
    fn label(i: u64) -> String {
        format!("e{i}")
    }

    /// Appends `label(idx)` at every index in `indices`, panicking with `why` on error.
    fn append_labels(log: &mut RaftWal, indices: std::ops::RangeInclusive<u64>, why: &str) {
        for idx in indices {
            log.append(idx, label(idx).as_bytes()).expect(why);
        }
    }

    /// Appends the same `payload` at every index in `indices`, panicking with `why` on error.
    fn append_fixed(
        log: &mut RaftWal,
        indices: std::ops::RangeInclusive<u64>,
        payload: &[u8],
        why: &str,
    ) {
        indices.for_each(|idx| {
            log.append(idx, payload).expect(why);
        });
    }

    #[test]
    fn open_append_get() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.append(1, b"hello").expect("append");
        assert_eq!(log.get(1).as_deref(), Some(&b"hello"[..]));
        assert_eq!(log.len(), 1);
    }

    #[test]
    fn read_range_works() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_labels(&mut log, 1..=10, "append");
        let hits = log.read_range(3..=7);
        assert_eq!(hits.len(), 5);
        assert_eq!(hits[0].0, 3);
    }

    #[test]
    fn iter_range_borrowed() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_labels(&mut log, 1..=5, "append");
        let entries = log.iter_range(2..=4).collect::<Vec<_>>();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].index, 2);
        assert_eq!(entries[0].data, b"e2");
        assert_eq!(entries[2].index, 4);
    }

    #[test]
    fn iter_all() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_labels(&mut log, 1..=3, "append");
        let entries = log.iter().collect::<Vec<_>>();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].index, 1);
        assert_eq!(entries[2].data, b"e3");
    }

    #[test]
    fn recovery() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.append(1, b"a").expect("a");
            log.append(2, b"b").expect("b");
            log.set_meta("vote", b"v1").expect("meta");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.len(), 2);
        assert_eq!(log.get(2).as_deref(), Some(&b"b"[..]));
        assert_eq!(log.get_meta("vote"), Some(&b"v1"[..]));
    }

    #[test]
    fn compact_works() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=5, b"x", "a");
        log.compact(3).expect("compact");
        assert_eq!(log.len(), 2);
        assert_eq!(log.first_index(), Some(4));
    }

    #[test]
    fn truncate_works() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=5, b"x", "a");
        log.truncate(3).expect("truncate");
        assert_eq!(log.len(), 2);
        assert_eq!(log.last_index(), Some(2));
    }

    #[test]
    fn batch_append_borrowed() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        let batch: [(u64, &[u8]); 3] = [(1, b"a"), (2, b"b"), (3, b"c")];
        log.append_batch(&batch).expect("batch");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(2).as_deref(), Some(&b"b"[..]));
    }

    #[test]
    fn batch_append_owned() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        let owned: Vec<(u64, Vec<u8>)> = vec![(1, vec![1, 2, 3]), (2, vec![4, 5, 6])];
        log.append_batch(&owned).expect("batch owned");
        assert_eq!(log.len(), 2);
        assert_eq!(log.get(1).as_deref(), Some(&[1u8, 2, 3][..]));
    }

    #[test]
    fn meta_operations() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        assert!(log.get_meta("k").is_none());
        log.set_meta("k", b"v").expect("set");
        assert_eq!(log.get_meta("k"), Some(&b"v"[..]));
        log.remove_meta("k").expect("rm");
        assert!(log.get_meta("k").is_none());
    }

    #[test]
    fn empty_wal() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let log = RaftWal::open(tmp.path()).expect("open");
        assert!(log.is_empty());
        assert_eq!(log.first_index(), None);
        assert_eq!(log.last_index(), None);
    }

    #[test]
    fn recovery_after_compact() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            append_labels(&mut log, 1..=5, "a");
            log.compact(3).expect("compact");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.len(), 2);
        assert_eq!(log.first_index(), Some(4));
        assert_eq!(log.get(4).as_deref(), Some(&b"e4"[..]));
    }

    #[test]
    fn recovery_after_truncate() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            append_labels(&mut log, 1..=5, "a");
            log.truncate(4).expect("truncate");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.len(), 3);
        assert_eq!(log.last_index(), Some(3));
        assert!(log.get(4).is_none());
    }

    #[test]
    fn append_after_compact() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=5, b"x", "a");
        log.compact(3).expect("compact");
        log.append(6, b"new").expect("append after compact");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(6).as_deref(), Some(&b"new"[..]));
        assert_eq!(log.first_index(), Some(4));
    }

    #[test]
    fn append_after_truncate() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=5, b"old", "a");
        log.truncate(3).expect("truncate");
        log.append(3, b"new").expect("append replacement");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(3).as_deref(), Some(&b"new"[..]));
    }

    #[test]
    fn compact_all() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=3, b"x", "a");
        log.compact(3).expect("compact all");
        assert!(log.is_empty());
        assert_eq!(log.first_index(), None);
    }

    #[test]
    fn truncate_all() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 1..=3, b"x", "a");
        log.truncate(1).expect("truncate all");
        assert!(log.is_empty());
        assert_eq!(log.last_index(), None);
    }

    #[test]
    fn truncate_noop_on_empty() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.compact(10).expect("noop");
        log.truncate(1).expect("noop");
        assert!(log.is_empty());
    }

    #[test]
    fn truncate_out_of_range() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 5..=10, b"x", "a");
        log.compact(2).expect("below range");
        assert_eq!(log.len(), 6);
        log.truncate(20).expect("above range");
        assert_eq!(log.len(), 6);
    }

    #[test]
    fn get_out_of_range() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.append(5, b"x").expect("a");
        for probe in [0, 4, 6, u64::MAX] {
            assert!(log.get(probe).is_none());
        }
    }

    #[test]
    fn read_range_empty_result() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_fixed(&mut log, 5..=10, b"x", "a");
        assert!(log.read_range(1..=4).is_empty());
        assert!(log.read_range(11..=20).is_empty());
        assert!(log.read_range(8..8).is_empty());
    }

    #[test]
    fn read_range_partial_overlap() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        append_labels(&mut log, 5..=10, "a");
        let hits = log.read_range(3..=7);
        assert_eq!(hits.len(), 3);
        assert_eq!(hits[0].0, 5);
        assert_eq!(hits[2].0, 7);
    }

    #[test]
    fn large_entry() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let big = vec![0xAB_u8; 1024 * 1024];
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.append(1, &big).expect("append big");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.get(1).expect("get").as_ref(), big.as_slice());
    }

    #[test]
    fn flush_persists() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.append(1, b"buffered").expect("a");
            log.flush().expect("flush");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.get(1).as_deref(), Some(&b"buffered"[..]));
    }

    #[test]
    fn sync_persists() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.append(1, b"durable").expect("a");
            log.sync().expect("sync");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.get(1).as_deref(), Some(&b"durable"[..]));
    }

    #[test]
    fn meta_survives_crash() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.set_meta("term", b"5").expect("set term");
            log.set_meta("vote", b"node-2").expect("set vote");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.get_meta("term"), Some(&b"5"[..]));
        assert_eq!(log.get_meta("vote"), Some(&b"node-2"[..]));
    }

    #[test]
    fn error_source_chain() {
        use std::error::Error as _;
        let err = WalError::Io(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
        assert!(err.source().is_some());
    }

    #[test]
    fn estimated_memory_increases() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        let before = log.estimated_memory();
        append_fixed(&mut log, 1..=100, &[0u8; 256], "a");
        assert!(log.estimated_memory() > before);
    }

    #[test]
    fn segment_rotation() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.set_max_segment_size(200);
        append_fixed(&mut log, 1..=100, &[0u8; 32], "a");

        // Every index must still resolve after rolling over many segments.
        for i in 1..=100 {
            assert!(log.get(i).is_some(), "missing index {i}");
        }
    }

    #[test]
    fn segment_rotation_recovery() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        {
            let mut log = RaftWal::open(&root).expect("open");
            log.set_max_segment_size(200);
            append_labels(&mut log, 1..=100, "a");
        }
        let log = RaftWal::open(&root).expect("reopen");
        assert_eq!(log.len(), 100);
        assert_eq!(log.get(1).as_deref(), Some(&b"e1"[..]));
        assert_eq!(log.get(100).as_deref(), Some(&b"e100"[..]));
    }

    #[test]
    fn compact_deletes_old_segments() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.set_max_segment_size(200);
        append_fixed(&mut log, 1..=100, &[0u8; 32], "a");
        let segments_before = segment::list_segments(tmp.path()).len();

        log.compact(80).expect("compact");
        let segments_after = segment::list_segments(tmp.path()).len();

        assert!(segments_after < segments_before);
        assert_eq!(log.len(), 20);
        assert_eq!(log.first_index(), Some(81));
    }
}