Skip to main content

raft_wal/
lib.rs

1//! # raft-wal
2//!
3//! A minimal append-only WAL (Write-Ahead Log) optimized for Raft consensus.
4//!
5//! - **Segment-based storage** — log is split into segment files; `compact()`
6//!   deletes old segments without rewriting.
7//! - **CRC32C checksums** — HW-accelerated integrity checks on every entry.
8//! - **Raft-correct durability** — metadata (term/vote) is always fsynced;
9//!   log entries are buffered with opt-in [`RaftWal::sync`].
10//! - **Parallel recovery** — segments are read and verified concurrently.
11//! - **openraft integration** — enable `openraft-storage` feature for
12//!   [`RaftLogStorage`](openraft::storage::RaftLogStorage) implementation.
13//!
14//! ## Usage
15//!
16//! ```rust
17//! use raft_wal::RaftWal;
18//!
19//! # let dir = tempfile::tempdir().unwrap();
20//! let mut wal = RaftWal::open(dir.path()).unwrap();
21//!
22//! wal.append(1, b"entry-1").unwrap();
23//! wal.append(2, b"entry-2").unwrap();
24//!
25//! let entries: Vec<_> = wal.iter().collect();
26//! assert_eq!(entries.len(), 2);
27//!
28//! wal.set_meta("vote", b"node-1").unwrap();
29//! assert_eq!(wal.get_meta("vote"), Some(b"node-1".as_slice()));
30//! ```
31
32#![cfg_attr(not(feature = "std"), no_std)]
33#![deny(missing_docs)]
34#![deny(clippy::unwrap_used)]
35
36#[cfg(not(feature = "std"))]
37extern crate alloc;
38
39/// CRC32C functions (hw-accelerated with `std`, software fallback in `no_std`).
40pub mod crc;
41/// Wire format serialization and parsing (no_std compatible).
42pub mod wire;
43
44#[macro_use]
45pub(crate) mod macros;
46pub(crate) mod core;
47
48#[cfg(feature = "std")]
49pub(crate) mod segment;
50pub(crate) mod state;
51
52mod storage;
53pub use storage::WalStorage;
54
55mod generic;
56pub use generic::GenericRaftWal;
57
58#[cfg(feature = "std")]
59mod std_storage;
60#[cfg(feature = "std")]
61pub use std_storage::StdStorage;
62
63#[cfg(feature = "tokio")]
64mod tokio;
65#[cfg(feature = "tokio")]
66pub use self::tokio::AsyncRaftWal;
67
68#[cfg(feature = "io-uring")]
69mod uring;
70#[cfg(feature = "io-uring")]
71pub use uring::UringRaftWal;
72
73#[cfg(feature = "io-uring-bridge")]
74mod bridge;
75#[cfg(feature = "io-uring-bridge")]
76pub use bridge::BridgedUringWal;
77
78#[cfg(feature = "std")]
79pub mod impls;
80
81#[cfg(feature = "openraft-storage")]
82pub use impls::openraft::OpenRaftLogStorage;
83
/// Errors returned by WAL operations.
///
/// The only failure mode is an underlying I/O error. It is rendered by
/// `Display` and also exposed through [`std::error::Error::source`].
#[cfg(feature = "std")]
#[derive(Debug)]
pub enum WalError {
    /// An I/O error.
    Io(std::io::Error),
}

#[cfg(feature = "std")]
impl std::fmt::Display for WalError {
    /// Renders the error as `WAL I/O: <inner error>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::Io(inner) = self;
        write!(f, "WAL I/O: {inner}")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for WalError {
    /// Hands back the wrapped I/O error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self::Io(inner) = self;
        Some(inner)
    }
}

#[cfg(feature = "std")]
impl From<std::io::Error> for WalError {
    /// Wraps an I/O error in the `Io` variant.
    ///
    /// This conversion is what lets `?` lift `std::io::Result` values into
    /// this crate's `Result`.
    fn from(source: std::io::Error) -> Self {
        WalError::Io(source)
    }
}

/// Result type for WAL operations.
#[cfg(feature = "std")]
pub type Result<T> = std::result::Result<T, WalError>;
120
/// A borrowed entry yielded by iterators over WAL entries.
///
/// `data` borrows for the lifetime `'a` of the iterator's source.
/// The struct holds only an index and a borrowed slice, so it is cheap to
/// copy. It derives the common comparison, hashing and `Debug` traits so that
/// entries can be printed, compared and stored in collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Entry<'a> {
    /// The Raft log index.
    pub index: u64,
    /// The entry payload.
    pub data: &'a [u8],
}
128
/// An append-only WAL optimized for Raft, backed by the local filesystem.
///
/// This is a type alias for [`GenericRaftWal`] with [`StdStorage`].
/// See [`GenericRaftWal`] for the full API documentation.
#[cfg(feature = "std")]
pub type RaftWal = GenericRaftWal<StdStorage>;

#[cfg(feature = "std")]
impl RaftWal {
    /// Opens or creates a WAL in the given directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or existing
    /// segments cannot be recovered.
    pub fn open(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
        let storage = StdStorage::new(data_dir)?;
        GenericRaftWal::new(storage).map_err(WalError::Io)
    }

    /// Returns the entry at the given index, or `None` if it is not present.
    ///
    /// Returns `Cow::Borrowed` for cached (in-memory) entries and
    /// `Cow::Owned` for evicted entries read from disk.
    ///
    /// An error while reading an evicted entry from disk is also reported as
    /// `None`, which makes a failed read indistinguishable from a missing
    /// index. Use `try_get` when the caller must tell the two apart.
    #[must_use]
    pub fn get(&self, index: u64) -> Option<std::borrow::Cow<'_, [u8]>> {
        // Deliberately best-effort: any read error collapses to `None`.
        self.try_get(index).ok().flatten()
    }

    /// Returns the entry at the given index, propagating disk-read errors.
    ///
    /// Cached (in-memory) entries are returned as `Cow::Borrowed`, and
    /// evicted entries read back from disk as `Cow::Owned`. `Ok(None)`
    /// means the index is not present.
    ///
    /// # Errors
    ///
    /// Returns an error if reading an evicted entry from disk fails.
    pub fn try_get(&self, index: u64) -> Result<Option<std::borrow::Cow<'_, [u8]>>> {
        // Fast path: serve from the in-memory cache without copying.
        if let Some(data) = self.get_cached(index) {
            return Ok(Some(std::borrow::Cow::Borrowed(data)));
        }
        Ok(self.get_or_read(index)?.map(std::borrow::Cow::Owned))
    }
}
161
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// Creates a throwaway directory that is deleted when the guard drops.
    ///
    /// Each test keeps the guard alive for as long as its WAL needs the
    /// directory.
    fn scratch() -> tempfile::TempDir {
        tempfile::tempdir().expect("tempdir")
    }

    /// Appends `payload(i)` at every index in `indices`.
    ///
    /// Panics with `msg` on the first append that fails.
    fn fill<P: AsRef<[u8]>>(
        log: &mut RaftWal,
        indices: std::ops::RangeInclusive<u64>,
        msg: &str,
        payload: impl Fn(u64) -> P,
    ) {
        for i in indices {
            log.append(i, payload(i).as_ref()).expect(msg);
        }
    }

    #[test]
    fn open_append_get() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.append(1, b"hello").expect("append");
        assert_eq!(log.get(1).as_deref(), Some(b"hello".as_slice()));
        assert_eq!(log.len(), 1);
    }

    #[test]
    fn read_range_works() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=10, "append", |i| format!("e{i}"));
        let hits = log.read_range(3..=7);
        assert_eq!(hits.len(), 5);
        assert_eq!(hits[0].0, 3);
    }

    #[test]
    fn iter_range_borrowed() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=5, "append", |i| format!("e{i}"));
        let seen: Vec<_> = log.iter_range(2..=4).collect();
        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0].index, 2);
        assert_eq!(seen[0].data, b"e2");
        assert_eq!(seen[2].index, 4);
    }

    #[test]
    fn iter_all() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=3, "append", |i| format!("e{i}"));
        let seen: Vec<_> = log.iter().collect();
        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0].index, 1);
        assert_eq!(seen[2].data, b"e3");
    }

    #[test]
    fn recovery() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        writer.append(1, b"a").expect("a");
        writer.append(2, b"b").expect("b");
        writer.set_meta("vote", b"v1").expect("meta");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.len(), 2);
        assert_eq!(reader.get(2).as_deref(), Some(b"b".as_slice()));
        assert_eq!(reader.get_meta("vote"), Some(b"v1".as_slice()));
    }

    #[test]
    fn compact_works() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=5, "a", |_| b"x");
        log.compact(3).expect("compact");
        assert_eq!(log.len(), 2);
        assert_eq!(log.first_index(), Some(4));
    }

    #[test]
    fn truncate_works() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=5, "a", |_| b"x");
        log.truncate(3).expect("truncate");
        assert_eq!(log.len(), 2);
        assert_eq!(log.last_index(), Some(2));
    }

    #[test]
    fn batch_append_borrowed() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.append_batch(&[(1, b"a" as &[u8]), (2, b"b"), (3, b"c")])
            .expect("batch");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(2).as_deref(), Some(b"b".as_slice()));
    }

    #[test]
    fn batch_append_owned() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        let owned = vec![(1u64, vec![1u8, 2, 3]), (2, vec![4, 5, 6])];
        log.append_batch(&owned).expect("batch owned");
        assert_eq!(log.len(), 2);
        assert_eq!(log.get(1).as_deref(), Some([1u8, 2, 3].as_slice()));
    }

    #[test]
    fn meta_operations() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        assert!(log.get_meta("k").is_none());
        log.set_meta("k", b"v").expect("set");
        assert_eq!(log.get_meta("k"), Some(b"v".as_slice()));
        log.remove_meta("k").expect("rm");
        assert!(log.get_meta("k").is_none());
    }

    #[test]
    fn empty_wal() {
        let tmp = scratch();
        let log = RaftWal::open(tmp.path()).expect("open");
        assert!(log.is_empty());
        assert_eq!(log.first_index(), None);
        assert_eq!(log.last_index(), None);
    }

    #[test]
    fn recovery_after_compact() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        fill(&mut writer, 1..=5, "a", |i| format!("e{i}"));
        writer.compact(3).expect("compact");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.len(), 2);
        assert_eq!(reader.first_index(), Some(4));
        assert_eq!(reader.get(4).as_deref(), Some(b"e4".as_slice()));
    }

    #[test]
    fn recovery_after_truncate() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        fill(&mut writer, 1..=5, "a", |i| format!("e{i}"));
        writer.truncate(4).expect("truncate");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.len(), 3);
        assert_eq!(reader.last_index(), Some(3));
        assert!(reader.get(4).is_none());
    }

    #[test]
    fn append_after_compact() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=5, "a", |_| b"x");
        log.compact(3).expect("compact");
        log.append(6, b"new").expect("append after compact");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(6).as_deref(), Some(b"new".as_slice()));
        assert_eq!(log.first_index(), Some(4));
    }

    #[test]
    fn append_after_truncate() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=5, "a", |_| b"old");
        log.truncate(3).expect("truncate");
        log.append(3, b"new").expect("append replacement");
        assert_eq!(log.len(), 3);
        assert_eq!(log.get(3).as_deref(), Some(b"new".as_slice()));
    }

    #[test]
    fn compact_all() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=3, "a", |_| b"x");
        log.compact(3).expect("compact all");
        assert!(log.is_empty());
        assert_eq!(log.first_index(), None);
    }

    #[test]
    fn truncate_all() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 1..=3, "a", |_| b"x");
        log.truncate(1).expect("truncate all");
        assert!(log.is_empty());
        assert_eq!(log.last_index(), None);
    }

    #[test]
    fn truncate_noop_on_empty() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.compact(10).expect("noop");
        log.truncate(1).expect("noop");
        assert!(log.is_empty());
    }

    #[test]
    fn truncate_out_of_range() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 5..=10, "a", |_| b"x");
        log.compact(2).expect("below range");
        assert_eq!(log.len(), 6);
        log.truncate(20).expect("above range");
        assert_eq!(log.len(), 6);
    }

    #[test]
    fn get_out_of_range() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.append(5, b"x").expect("a");
        assert!(log.get(0).is_none());
        assert!(log.get(4).is_none());
        assert!(log.get(6).is_none());
        assert!(log.get(u64::MAX).is_none());
    }

    #[test]
    fn read_range_empty_result() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 5..=10, "a", |_| b"x");
        assert!(log.read_range(1..=4).is_empty());
        assert!(log.read_range(11..=20).is_empty());
        assert!(log.read_range(8..8).is_empty());
    }

    #[test]
    fn read_range_partial_overlap() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        fill(&mut log, 5..=10, "a", |i| format!("e{i}"));
        let hits = log.read_range(3..=7);
        assert_eq!(hits.len(), 3);
        assert_eq!(hits[0].0, 5);
        assert_eq!(hits[2].0, 7);
    }

    #[test]
    fn large_entry() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();
        let big = vec![0xABu8; 1024 * 1024];

        let mut writer = RaftWal::open(&root).expect("open");
        writer.append(1, &big).expect("append big");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.get(1).expect("get").as_ref(), big.as_slice());
    }

    #[test]
    fn flush_persists() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        writer.append(1, b"buffered").expect("a");
        writer.flush().expect("flush");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.get(1).as_deref(), Some(b"buffered".as_slice()));
    }

    #[test]
    fn sync_persists() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        writer.append(1, b"durable").expect("a");
        writer.sync().expect("sync");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.get(1).as_deref(), Some(b"durable".as_slice()));
    }

    #[test]
    fn meta_survives_crash() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        writer.set_meta("term", b"5").expect("set term");
        writer.set_meta("vote", b"node-2").expect("set vote");
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.get_meta("term"), Some(b"5".as_slice()));
        assert_eq!(reader.get_meta("vote"), Some(b"node-2".as_slice()));
    }

    #[test]
    fn error_source_chain() {
        let io = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
        let err = WalError::Io(io);
        assert!(std::error::Error::source(&err).is_some());
    }

    #[test]
    fn estimated_memory_increases() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        let baseline = log.estimated_memory();
        fill(&mut log, 1..=100, "a", |_| [0u8; 256]);
        assert!(log.estimated_memory() > baseline);
    }

    #[test]
    fn segment_rotation() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.set_max_segment_size(200);

        fill(&mut log, 1..=100, "a", |_| [0u8; 32]);

        // Every entry must remain readable across segment boundaries.
        for i in 1..=100 {
            assert!(log.get(i).is_some(), "missing index {i}");
        }
    }

    #[test]
    fn segment_rotation_recovery() {
        let tmp = scratch();
        let root = tmp.path().to_path_buf();

        let mut writer = RaftWal::open(&root).expect("open");
        writer.set_max_segment_size(200);
        fill(&mut writer, 1..=100, "a", |i| format!("e{i}"));
        drop(writer);

        let reader = RaftWal::open(&root).expect("reopen");
        assert_eq!(reader.len(), 100);
        assert_eq!(reader.get(1).as_deref(), Some(b"e1".as_slice()));
        assert_eq!(reader.get(100).as_deref(), Some(b"e100".as_slice()));
    }

    #[test]
    fn compact_deletes_old_segments() {
        let tmp = scratch();
        let mut log = RaftWal::open(tmp.path()).expect("open");
        log.set_max_segment_size(200);

        fill(&mut log, 1..=100, "a", |_| [0u8; 32]);
        let segments_before = segment::list_segments(tmp.path()).len();

        log.compact(80).expect("compact");
        let segments_after = segment::list_segments(tmp.path()).len();

        assert!(segments_after < segments_before);
        assert_eq!(log.len(), 20);
        assert_eq!(log.first_index(), Some(81));
    }
}