1#![cfg_attr(not(feature = "std"), no_std)]
33#![deny(missing_docs)]
34#![deny(clippy::unwrap_used)]
35
36#[cfg(not(feature = "std"))]
37extern crate alloc;
38
39pub mod crc;
41pub mod wire;
43
44#[macro_use]
45pub(crate) mod macros;
46pub(crate) mod core;
47
48#[cfg(feature = "std")]
49pub(crate) mod segment;
50pub(crate) mod state;
51
52mod storage;
53pub use storage::WalStorage;
54
55mod generic;
56pub use generic::GenericRaftWal;
57
58#[cfg(feature = "std")]
59mod std_storage;
60#[cfg(feature = "std")]
61pub use std_storage::StdStorage;
62
63#[cfg(feature = "tokio")]
64mod tokio;
65#[cfg(feature = "tokio")]
66pub use self::tokio::AsyncRaftWal;
67
68#[cfg(feature = "io-uring")]
69mod uring;
70#[cfg(feature = "io-uring")]
71pub use uring::UringRaftWal;
72
73#[cfg(feature = "io-uring-bridge")]
74mod bridge;
75#[cfg(feature = "io-uring-bridge")]
76pub use bridge::BridgedUringWal;
77
78#[cfg(feature = "std")]
79pub mod impls;
80
81#[cfg(feature = "openraft-storage")]
82pub use impls::openraft::OpenRaftLogStorage;
83
/// Error type produced by the `std`-backed WAL API.
///
/// The only failure category is an underlying I/O error. The wrapped
/// [`std::io::Error`] is also reachable through
/// [`std::error::Error::source`].
#[cfg(feature = "std")]
#[derive(Debug)]
pub enum WalError {
    /// Wraps the [`std::io::Error`] reported by the storage layer.
    Io(std::io::Error),
}
91
#[cfg(feature = "std")]
impl std::fmt::Display for WalError {
    /// Renders the error as `WAL I/O: ` followed by the wrapped I/O error's
    /// own `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Io(ref cause) => f.write_fmt(format_args!("WAL I/O: {cause}")),
        }
    }
}
100
#[cfg(feature = "std")]
impl std::error::Error for WalError {
    /// Exposes the wrapped I/O error as the underlying cause of this error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::Io(io_err) => io_err,
        };
        Some(cause)
    }
}
109
#[cfg(feature = "std")]
impl From<std::io::Error> for WalError {
    /// Wraps an I/O error in [`WalError::Io`], so `?` can convert
    /// `std::io::Error` values into [`WalError`] automatically.
    fn from(e: std::io::Error) -> Self {
        WalError::Io(e)
    }
}
116
/// Convenience alias for [`std::result::Result`] with [`WalError`] as the
/// error type.
#[cfg(feature = "std")]
pub type Result<T> = std::result::Result<T, WalError>;
120
/// A log entry: an index paired with a borrowed payload.
///
/// The payload is not owned; it is a slice borrowed for the lifetime `'a`.
/// Both fields are cheap to copy, so the type is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Entry<'a> {
    /// Index of this entry in the log.
    pub index: u64,
    /// Payload bytes of this entry, borrowed for `'a`.
    pub data: &'a [u8],
}
128
/// [`GenericRaftWal`] specialised to the [`StdStorage`] backend.
///
/// Only available when the `std` feature is enabled.
#[cfg(feature = "std")]
pub type RaftWal = GenericRaftWal<StdStorage>;
135
#[cfg(feature = "std")]
impl RaftWal {
    /// Opens a WAL backed by [`StdStorage`] rooted at `data_dir`.
    ///
    /// # Errors
    ///
    /// Returns a [`WalError`] if [`StdStorage::new`] fails for `data_dir`
    /// (its error is converted with `?`), or [`WalError::Io`] if
    /// [`GenericRaftWal::new`] reports an I/O error for the new storage.
    pub fn open(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
        let storage = StdStorage::new(data_dir)?;
        GenericRaftWal::new(storage).map_err(WalError::Io)
    }

    /// Returns the payload stored under `index`, if one can be obtained.
    ///
    /// A hit in `get_cached` is returned as a borrowed slice. Otherwise the
    /// lookup falls back to `get_or_read` and the bytes are returned owned.
    ///
    /// Because the return type is an `Option`, an error from `get_or_read`
    /// cannot be reported: it is discarded and surfaces as `None`, exactly
    /// like a missing entry.
    #[must_use]
    pub fn get(&self, index: u64) -> Option<std::borrow::Cow<'_, [u8]>> {
        use std::borrow::Cow;

        if let Some(data) = self.get_cached(index) {
            return Some(Cow::Borrowed(data));
        }
        // `.ok()` deliberately drops the read error; see the doc comment above.
        self.get_or_read(index).ok().flatten().map(Cow::Owned)
    }
}
164
/// Behavioural tests for the `std`-backed [`RaftWal`].
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// Creates a temporary directory and opens a brand-new WAL inside it.
    ///
    /// The directory guard is returned together with the WAL so that the
    /// caller keeps the directory alive for as long as the test needs it.
    fn scratch_wal() -> (tempfile::TempDir, RaftWal) {
        let dir = tempfile::tempdir().expect("tempdir");
        let wal = RaftWal::open(dir.path()).expect("open");
        (dir, wal)
    }

    /// Opens the WAL stored in `dir` a second time.
    fn reopen(dir: &tempfile::TempDir) -> RaftWal {
        RaftWal::open(dir.path()).expect("reopen")
    }

    /// A single appended entry can be read back and is counted.
    #[test]
    fn open_append_get() {
        let (_dir, mut wal) = scratch_wal();
        wal.append(1, b"hello").expect("append");
        assert_eq!(wal.get(1).as_deref(), Some(&b"hello"[..]));
        assert_eq!(wal.len(), 1);
    }

    /// `read_range` returns exactly the requested, fully populated window.
    #[test]
    fn read_range_works() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=10 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("append");
        }
        let r = wal.read_range(3..=7);
        assert_eq!(r.len(), 5);
        assert_eq!(r[0].0, 3);
    }

    /// `iter_range` yields entries carrying the expected index and payload.
    #[test]
    fn iter_range_borrowed() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("append");
        }
        let window: Vec<_> = wal.iter_range(2..=4).collect();
        assert_eq!(window.len(), 3);
        assert_eq!(window[0].index, 2);
        assert_eq!(window[0].data, b"e2");
        assert_eq!(window[2].index, 4);
    }

    /// `iter` walks every stored entry in order.
    #[test]
    fn iter_all() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=3 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("append");
        }
        let all: Vec<_> = wal.iter().collect();
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].index, 1);
        assert_eq!(all[2].data, b"e3");
    }

    /// Entries and metadata are visible again after dropping and reopening.
    #[test]
    fn recovery() {
        let (dir, mut wal) = scratch_wal();
        wal.append(1, b"a").expect("a");
        wal.append(2, b"b").expect("b");
        wal.set_meta("vote", b"v1").expect("meta");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.len(), 2);
        assert_eq!(wal.get(2).as_deref(), Some(&b"b"[..]));
        assert_eq!(wal.get_meta("vote"), Some(&b"v1"[..]));
    }

    /// Compacting through index 3 leaves entries 4 and 5.
    #[test]
    fn compact_works() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, b"x").expect("a");
        }
        wal.compact(3).expect("compact");
        assert_eq!(wal.len(), 2);
        assert_eq!(wal.first_index(), Some(4));
    }

    /// Truncating at index 3 leaves entries 1 and 2.
    #[test]
    fn truncate_works() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, b"x").expect("a");
        }
        wal.truncate(3).expect("truncate");
        assert_eq!(wal.len(), 2);
        assert_eq!(wal.last_index(), Some(2));
    }

    /// `append_batch` accepts borrowed payload slices.
    #[test]
    fn batch_append_borrowed() {
        let (_dir, mut wal) = scratch_wal();
        wal.append_batch(&[(1, b"a" as &[u8]), (2, b"b"), (3, b"c")])
            .expect("batch");
        assert_eq!(wal.len(), 3);
        assert_eq!(wal.get(2).as_deref(), Some(&b"b"[..]));
    }

    /// `append_batch` accepts owned `Vec<u8>` payloads.
    #[test]
    fn batch_append_owned() {
        let (_dir, mut wal) = scratch_wal();
        let entries = vec![(1u64, vec![1u8, 2, 3]), (2, vec![4, 5, 6])];
        wal.append_batch(&entries).expect("batch owned");
        assert_eq!(wal.len(), 2);
        assert_eq!(wal.get(1).as_deref(), Some(&[1u8, 2, 3][..]));
    }

    /// Metadata keys can be set, read back and removed.
    #[test]
    fn meta_operations() {
        let (_dir, mut wal) = scratch_wal();
        assert!(wal.get_meta("k").is_none());
        wal.set_meta("k", b"v").expect("set");
        assert_eq!(wal.get_meta("k"), Some(&b"v"[..]));
        wal.remove_meta("k").expect("rm");
        assert!(wal.get_meta("k").is_none());
    }

    /// A freshly opened WAL is empty and has no first or last index.
    #[test]
    fn empty_wal() {
        let (_dir, wal) = scratch_wal();
        assert!(wal.is_empty());
        assert_eq!(wal.first_index(), None);
        assert_eq!(wal.last_index(), None);
    }

    /// The effect of a compaction is still visible after reopening.
    #[test]
    fn recovery_after_compact() {
        let (dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("a");
        }
        wal.compact(3).expect("compact");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.len(), 2);
        assert_eq!(wal.first_index(), Some(4));
        assert_eq!(wal.get(4).as_deref(), Some(&b"e4"[..]));
    }

    /// The effect of a truncation is still visible after reopening.
    #[test]
    fn recovery_after_truncate() {
        let (dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("a");
        }
        wal.truncate(4).expect("truncate");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.len(), 3);
        assert_eq!(wal.last_index(), Some(3));
        assert!(wal.get(4).is_none());
    }

    /// Appending continues to work after a compaction.
    #[test]
    fn append_after_compact() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, b"x").expect("a");
        }
        wal.compact(3).expect("compact");
        wal.append(6, b"new").expect("append after compact");
        assert_eq!(wal.len(), 3);
        assert_eq!(wal.get(6).as_deref(), Some(&b"new"[..]));
        assert_eq!(wal.first_index(), Some(4));
    }

    /// A truncated index can be appended again with new contents.
    #[test]
    fn append_after_truncate() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=5 {
            wal.append(idx, b"old").expect("a");
        }
        wal.truncate(3).expect("truncate");
        wal.append(3, b"new").expect("append replacement");
        assert_eq!(wal.len(), 3);
        assert_eq!(wal.get(3).as_deref(), Some(&b"new"[..]));
    }

    /// Compacting through the last index empties the WAL.
    #[test]
    fn compact_all() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=3 {
            wal.append(idx, b"x").expect("a");
        }
        wal.compact(3).expect("compact all");
        assert!(wal.is_empty());
        assert_eq!(wal.first_index(), None);
    }

    /// Truncating at the first index empties the WAL.
    #[test]
    fn truncate_all() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 1..=3 {
            wal.append(idx, b"x").expect("a");
        }
        wal.truncate(1).expect("truncate all");
        assert!(wal.is_empty());
        assert_eq!(wal.last_index(), None);
    }

    /// `compact` and `truncate` succeed on an empty WAL and leave it empty.
    #[test]
    fn truncate_noop_on_empty() {
        let (_dir, mut wal) = scratch_wal();
        wal.compact(10).expect("noop");
        wal.truncate(1).expect("noop");
        assert!(wal.is_empty());
    }

    /// Compacting below and truncating above the stored range change nothing.
    #[test]
    fn truncate_out_of_range() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 5..=10 {
            wal.append(idx, b"x").expect("a");
        }
        wal.compact(2).expect("below range");
        assert_eq!(wal.len(), 6);
        wal.truncate(20).expect("above range");
        assert_eq!(wal.len(), 6);
    }

    /// Lookups outside the stored range return `None`.
    #[test]
    fn get_out_of_range() {
        let (_dir, mut wal) = scratch_wal();
        wal.append(5, b"x").expect("a");
        assert!(wal.get(0).is_none());
        assert!(wal.get(4).is_none());
        assert!(wal.get(6).is_none());
        assert!(wal.get(u64::MAX).is_none());
    }

    /// Ranges that miss the stored entries, or are empty, yield nothing.
    #[test]
    fn read_range_empty_result() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 5..=10 {
            wal.append(idx, b"x").expect("a");
        }
        assert!(wal.read_range(1..=4).is_empty());
        assert!(wal.read_range(11..=20).is_empty());
        assert!(wal.read_range(8..8).is_empty());
    }

    /// A range that only partly overlaps the log is clipped to stored entries.
    #[test]
    fn read_range_partial_overlap() {
        let (_dir, mut wal) = scratch_wal();
        for idx in 5..=10 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("a");
        }
        let r = wal.read_range(3..=7);
        assert_eq!(r.len(), 3);
        assert_eq!(r[0].0, 5);
        assert_eq!(r[2].0, 7);
    }

    /// A 1 MiB payload round-trips through a drop and reopen.
    #[test]
    fn large_entry() {
        let (dir, mut wal) = scratch_wal();
        let big = vec![0xABu8; 1024 * 1024];
        wal.append(1, &big).expect("append big");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.get(1).expect("get").as_ref(), big.as_slice());
    }

    /// An entry followed by `flush` is readable after reopening.
    #[test]
    fn flush_persists() {
        let (dir, mut wal) = scratch_wal();
        wal.append(1, b"buffered").expect("a");
        wal.flush().expect("flush");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.get(1).as_deref(), Some(&b"buffered"[..]));
    }

    /// An entry followed by `sync` is readable after reopening.
    #[test]
    fn sync_persists() {
        let (dir, mut wal) = scratch_wal();
        wal.append(1, b"durable").expect("a");
        wal.sync().expect("sync");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.get(1).as_deref(), Some(&b"durable"[..]));
    }

    /// Metadata written before a drop is readable after reopening.
    #[test]
    fn meta_survives_crash() {
        let (dir, mut wal) = scratch_wal();
        wal.set_meta("term", b"5").expect("set term");
        wal.set_meta("vote", b"node-2").expect("set vote");
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.get_meta("term"), Some(&b"5"[..]));
        assert_eq!(wal.get_meta("vote"), Some(&b"node-2"[..]));
    }

    /// `WalError::Io` reports the wrapped error as its source.
    #[test]
    fn error_source_chain() {
        let inner = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
        let err = WalError::Io(inner);
        assert!(std::error::Error::source(&err).is_some());
    }

    /// The memory estimate grows once entries have been appended.
    #[test]
    fn estimated_memory_increases() {
        let (_dir, mut wal) = scratch_wal();
        let before = wal.estimated_memory();
        for idx in 1..=100 {
            wal.append(idx, &[0u8; 256]).expect("a");
        }
        assert!(wal.estimated_memory() > before);
    }

    /// With a small maximum segment size, every appended index stays readable.
    #[test]
    fn segment_rotation() {
        let (_dir, mut wal) = scratch_wal();
        wal.set_max_segment_size(200);

        for idx in 1..=100 {
            wal.append(idx, &[0u8; 32]).expect("a");
        }

        // The small limit is intended to spread the entries over several
        // segments; none of them may become unreachable because of that.
        for i in 1..=100 {
            assert!(wal.get(i).is_some(), "missing index {i}");
        }
    }

    /// Entries written with a small segment size are all found after reopening.
    #[test]
    fn segment_rotation_recovery() {
        let (dir, mut wal) = scratch_wal();
        wal.set_max_segment_size(200);
        for idx in 1..=100 {
            wal.append(idx, format!("e{idx}").as_bytes()).expect("a");
        }
        drop(wal);

        let wal = reopen(&dir);
        assert_eq!(wal.len(), 100);
        assert_eq!(wal.get(1).as_deref(), Some(&b"e1"[..]));
        assert_eq!(wal.get(100).as_deref(), Some(&b"e100"[..]));
    }

    /// Compaction reduces the number of segments reported for the directory.
    #[test]
    fn compact_deletes_old_segments() {
        let (dir, mut wal) = scratch_wal();
        wal.set_max_segment_size(200);

        for idx in 1..=100 {
            wal.append(idx, &[0u8; 32]).expect("a");
        }
        let seg_count_before = segment::list_segments(dir.path()).len();

        wal.compact(80).expect("compact");
        let seg_count_after = segment::list_segments(dir.path()).len();

        assert!(seg_count_after < seg_count_before);
        assert_eq!(wal.len(), 20);
        assert_eq!(wal.first_index(), Some(81));
    }
}