1#![cfg_attr(not(feature = "std"), no_std)]
33#![deny(missing_docs)]
34#![deny(clippy::unwrap_used)]
35
36#[cfg(not(feature = "std"))]
37extern crate alloc;
38
39pub mod crc;
41pub mod wire;
43
44#[macro_use]
45pub(crate) mod macros;
46pub(crate) mod core;
47
48#[cfg(feature = "std")]
49pub(crate) mod segment;
50pub(crate) mod state;
51
52mod storage;
53pub use storage::WalStorage;
54
55mod generic;
56pub use generic::GenericRaftWal;
57
58#[cfg(feature = "std")]
59mod std_storage;
60#[cfg(feature = "std")]
61pub use std_storage::StdStorage;
62
63#[cfg(feature = "tokio")]
64mod tokio;
65#[cfg(feature = "tokio")]
66pub use self::tokio::AsyncRaftWal;
67
68#[cfg(feature = "io-uring")]
69mod uring;
70#[cfg(feature = "io-uring")]
71pub use uring::UringRaftWal;
72
73#[cfg(feature = "io-uring-bridge")]
74mod bridge;
75#[cfg(feature = "io-uring-bridge")]
76pub use bridge::BridgedUringWal;
77
78#[cfg(feature = "std")]
79pub mod impls;
80
81#[cfg(feature = "openraft-storage")]
82pub use impls::openraft::OpenRaftLogStorage;
83
84#[cfg(feature = "std")]
86#[derive(Debug)]
87pub enum WalError {
88 Io(std::io::Error),
90}
91
92#[cfg(feature = "std")]
93impl std::fmt::Display for WalError {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 Self::Io(e) => write!(f, "WAL I/O: {e}"),
97 }
98 }
99}
100
101#[cfg(feature = "std")]
102impl std::error::Error for WalError {
103 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
104 match self {
105 Self::Io(e) => Some(e),
106 }
107 }
108}
109
110#[cfg(feature = "std")]
111impl From<std::io::Error> for WalError {
112 fn from(e: std::io::Error) -> Self {
113 Self::Io(e)
114 }
115}
116
117#[cfg(feature = "std")]
119pub type Result<T> = std::result::Result<T, WalError>;
120
121pub struct Entry<'a> {
123 pub index: u64,
125 pub data: &'a [u8],
127}
128
129#[cfg(feature = "std")]
134pub type RaftWal = GenericRaftWal<StdStorage>;
135
136#[cfg(feature = "std")]
137impl RaftWal {
138 pub fn open(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
145 let storage = StdStorage::new(data_dir)?;
146 GenericRaftWal::new(storage).map_err(WalError::Io)
147 }
148
149 #[must_use]
154 pub fn get(&self, index: u64) -> Option<std::borrow::Cow<'_, [u8]>> {
155 if let Some(data) = self.get_cached(index) {
156 return Some(std::borrow::Cow::Borrowed(data));
157 }
158 self.get_or_read(index).ok().flatten().map(std::borrow::Cow::Owned)
159 }
160}
161
162#[cfg(all(test, feature = "std"))]
163mod tests {
164 use super::*;
165
166 #[test]
167 fn open_append_get() {
168 let dir = tempfile::tempdir().expect("tempdir");
169 let mut wal = RaftWal::open(dir.path()).expect("open");
170 wal.append(1, b"hello").expect("append");
171 assert_eq!(wal.get(1).as_deref(), Some(b"hello".as_slice()));
172 assert_eq!(wal.len(), 1);
173 }
174
175 #[test]
176 fn read_range_works() {
177 let dir = tempfile::tempdir().expect("tempdir");
178 let mut wal = RaftWal::open(dir.path()).expect("open");
179 for i in 1..=10 {
180 wal.append(i, format!("e{i}").as_bytes()).expect("append");
181 }
182 let r = wal.read_range(3..=7);
183 assert_eq!(r.len(), 5);
184 assert_eq!(r[0].0, 3);
185 }
186
187 #[test]
188 fn iter_range_borrowed() {
189 let dir = tempfile::tempdir().expect("tempdir");
190 let mut wal = RaftWal::open(dir.path()).expect("open");
191 for i in 1..=5 {
192 wal.append(i, format!("e{i}").as_bytes()).expect("append");
193 }
194 let entries: Vec<_> = wal.iter_range(2..=4).collect();
195 assert_eq!(entries.len(), 3);
196 assert_eq!(entries[0].index, 2);
197 assert_eq!(entries[0].data, b"e2");
198 assert_eq!(entries[2].index, 4);
199 }
200
201 #[test]
202 fn iter_all() {
203 let dir = tempfile::tempdir().expect("tempdir");
204 let mut wal = RaftWal::open(dir.path()).expect("open");
205 for i in 1..=3 {
206 wal.append(i, format!("e{i}").as_bytes()).expect("append");
207 }
208 let entries: Vec<_> = wal.iter().collect();
209 assert_eq!(entries.len(), 3);
210 assert_eq!(entries[0].index, 1);
211 assert_eq!(entries[2].data, b"e3");
212 }
213
214 #[test]
215 fn recovery() {
216 let dir = tempfile::tempdir().expect("tempdir");
217 let path = dir.path().to_path_buf();
218 {
219 let mut wal = RaftWal::open(&path).expect("open");
220 wal.append(1, b"a").expect("a");
221 wal.append(2, b"b").expect("b");
222 wal.set_meta("vote", b"v1").expect("meta");
223 }
224 {
225 let wal = RaftWal::open(&path).expect("reopen");
226 assert_eq!(wal.len(), 2);
227 assert_eq!(wal.get(2).as_deref(), Some(b"b".as_slice()));
228 assert_eq!(wal.get_meta("vote"), Some(b"v1".as_slice()));
229 }
230 }
231
232 #[test]
233 fn compact_works() {
234 let dir = tempfile::tempdir().expect("tempdir");
235 let mut wal = RaftWal::open(dir.path()).expect("open");
236 for i in 1..=5 {
237 wal.append(i, b"x").expect("a");
238 }
239 wal.compact(3).expect("compact");
240 assert_eq!(wal.len(), 2);
241 assert_eq!(wal.first_index(), Some(4));
242 }
243
244 #[test]
245 fn truncate_works() {
246 let dir = tempfile::tempdir().expect("tempdir");
247 let mut wal = RaftWal::open(dir.path()).expect("open");
248 for i in 1..=5 {
249 wal.append(i, b"x").expect("a");
250 }
251 wal.truncate(3).expect("truncate");
252 assert_eq!(wal.len(), 2);
253 assert_eq!(wal.last_index(), Some(2));
254 }
255
256 #[test]
257 fn batch_append_borrowed() {
258 let dir = tempfile::tempdir().expect("tempdir");
259 let mut wal = RaftWal::open(dir.path()).expect("open");
260 wal.append_batch(&[(1, b"a" as &[u8]), (2, b"b"), (3, b"c")])
261 .expect("batch");
262 assert_eq!(wal.len(), 3);
263 assert_eq!(wal.get(2).as_deref(), Some(b"b".as_slice()));
264 }
265
266 #[test]
267 fn batch_append_owned() {
268 let dir = tempfile::tempdir().expect("tempdir");
269 let mut wal = RaftWal::open(dir.path()).expect("open");
270 let entries = vec![(1u64, vec![1u8, 2, 3]), (2, vec![4, 5, 6])];
271 wal.append_batch(&entries).expect("batch owned");
272 assert_eq!(wal.len(), 2);
273 assert_eq!(wal.get(1).as_deref(), Some([1u8, 2, 3].as_slice()));
274 }
275
276 #[test]
277 fn meta_operations() {
278 let dir = tempfile::tempdir().expect("tempdir");
279 let mut wal = RaftWal::open(dir.path()).expect("open");
280 assert!(wal.get_meta("k").is_none());
281 wal.set_meta("k", b"v").expect("set");
282 assert_eq!(wal.get_meta("k"), Some(b"v".as_slice()));
283 wal.remove_meta("k").expect("rm");
284 assert!(wal.get_meta("k").is_none());
285 }
286
287 #[test]
288 fn empty_wal() {
289 let dir = tempfile::tempdir().expect("tempdir");
290 let wal = RaftWal::open(dir.path()).expect("open");
291 assert!(wal.is_empty());
292 assert_eq!(wal.first_index(), None);
293 assert_eq!(wal.last_index(), None);
294 }
295
296 #[test]
297 fn recovery_after_compact() {
298 let dir = tempfile::tempdir().expect("tempdir");
299 let path = dir.path().to_path_buf();
300 {
301 let mut wal = RaftWal::open(&path).expect("open");
302 for i in 1..=5 {
303 wal.append(i, format!("e{i}").as_bytes()).expect("a");
304 }
305 wal.compact(3).expect("compact");
306 }
307 {
308 let wal = RaftWal::open(&path).expect("reopen");
309 assert_eq!(wal.len(), 2);
310 assert_eq!(wal.first_index(), Some(4));
311 assert_eq!(wal.get(4).as_deref(), Some(b"e4".as_slice()));
312 }
313 }
314
315 #[test]
316 fn recovery_after_truncate() {
317 let dir = tempfile::tempdir().expect("tempdir");
318 let path = dir.path().to_path_buf();
319 {
320 let mut wal = RaftWal::open(&path).expect("open");
321 for i in 1..=5 {
322 wal.append(i, format!("e{i}").as_bytes()).expect("a");
323 }
324 wal.truncate(4).expect("truncate");
325 }
326 {
327 let wal = RaftWal::open(&path).expect("reopen");
328 assert_eq!(wal.len(), 3);
329 assert_eq!(wal.last_index(), Some(3));
330 assert!(wal.get(4).is_none());
331 }
332 }
333
334 #[test]
335 fn append_after_compact() {
336 let dir = tempfile::tempdir().expect("tempdir");
337 let mut wal = RaftWal::open(dir.path()).expect("open");
338 for i in 1..=5 {
339 wal.append(i, b"x").expect("a");
340 }
341 wal.compact(3).expect("compact");
342 wal.append(6, b"new").expect("append after compact");
343 assert_eq!(wal.len(), 3);
344 assert_eq!(wal.get(6).as_deref(), Some(b"new".as_slice()));
345 assert_eq!(wal.first_index(), Some(4));
346 }
347
348 #[test]
349 fn append_after_truncate() {
350 let dir = tempfile::tempdir().expect("tempdir");
351 let mut wal = RaftWal::open(dir.path()).expect("open");
352 for i in 1..=5 {
353 wal.append(i, b"old").expect("a");
354 }
355 wal.truncate(3).expect("truncate");
356 wal.append(3, b"new").expect("append replacement");
357 assert_eq!(wal.len(), 3);
358 assert_eq!(wal.get(3).as_deref(), Some(b"new".as_slice()));
359 }
360
361 #[test]
362 fn compact_all() {
363 let dir = tempfile::tempdir().expect("tempdir");
364 let mut wal = RaftWal::open(dir.path()).expect("open");
365 for i in 1..=3 {
366 wal.append(i, b"x").expect("a");
367 }
368 wal.compact(3).expect("compact all");
369 assert!(wal.is_empty());
370 assert_eq!(wal.first_index(), None);
371 }
372
373 #[test]
374 fn truncate_all() {
375 let dir = tempfile::tempdir().expect("tempdir");
376 let mut wal = RaftWal::open(dir.path()).expect("open");
377 for i in 1..=3 {
378 wal.append(i, b"x").expect("a");
379 }
380 wal.truncate(1).expect("truncate all");
381 assert!(wal.is_empty());
382 assert_eq!(wal.last_index(), None);
383 }
384
385 #[test]
386 fn truncate_noop_on_empty() {
387 let dir = tempfile::tempdir().expect("tempdir");
388 let mut wal = RaftWal::open(dir.path()).expect("open");
389 wal.compact(10).expect("noop");
390 wal.truncate(1).expect("noop");
391 assert!(wal.is_empty());
392 }
393
394 #[test]
395 fn truncate_out_of_range() {
396 let dir = tempfile::tempdir().expect("tempdir");
397 let mut wal = RaftWal::open(dir.path()).expect("open");
398 for i in 5..=10 {
399 wal.append(i, b"x").expect("a");
400 }
401 wal.compact(2).expect("below range");
402 assert_eq!(wal.len(), 6);
403 wal.truncate(20).expect("above range");
404 assert_eq!(wal.len(), 6);
405 }
406
407 #[test]
408 fn get_out_of_range() {
409 let dir = tempfile::tempdir().expect("tempdir");
410 let mut wal = RaftWal::open(dir.path()).expect("open");
411 wal.append(5, b"x").expect("a");
412 assert!(wal.get(0).is_none());
413 assert!(wal.get(4).is_none());
414 assert!(wal.get(6).is_none());
415 assert!(wal.get(u64::MAX).is_none());
416 }
417
418 #[test]
419 fn read_range_empty_result() {
420 let dir = tempfile::tempdir().expect("tempdir");
421 let mut wal = RaftWal::open(dir.path()).expect("open");
422 for i in 5..=10 {
423 wal.append(i, b"x").expect("a");
424 }
425 assert!(wal.read_range(1..=4).is_empty());
426 assert!(wal.read_range(11..=20).is_empty());
427 assert!(wal.read_range(8..8).is_empty());
428 }
429
430 #[test]
431 fn read_range_partial_overlap() {
432 let dir = tempfile::tempdir().expect("tempdir");
433 let mut wal = RaftWal::open(dir.path()).expect("open");
434 for i in 5..=10 {
435 wal.append(i, format!("e{i}").as_bytes()).expect("a");
436 }
437 let r = wal.read_range(3..=7);
438 assert_eq!(r.len(), 3);
439 assert_eq!(r[0].0, 5);
440 assert_eq!(r[2].0, 7);
441 }
442
443 #[test]
444 fn large_entry() {
445 let dir = tempfile::tempdir().expect("tempdir");
446 let path = dir.path().to_path_buf();
447 let big = vec![0xABu8; 1024 * 1024];
448 {
449 let mut wal = RaftWal::open(&path).expect("open");
450 wal.append(1, &big).expect("append big");
451 }
452 {
453 let wal = RaftWal::open(&path).expect("reopen");
454 assert_eq!(wal.get(1).expect("get").as_ref(), big.as_slice());
455 }
456 }
457
458 #[test]
459 fn flush_persists() {
460 let dir = tempfile::tempdir().expect("tempdir");
461 let path = dir.path().to_path_buf();
462 {
463 let mut wal = RaftWal::open(&path).expect("open");
464 wal.append(1, b"buffered").expect("a");
465 wal.flush().expect("flush");
466 }
467 {
468 let wal = RaftWal::open(&path).expect("reopen");
469 assert_eq!(wal.get(1).as_deref(), Some(b"buffered".as_slice()));
470 }
471 }
472
473 #[test]
474 fn sync_persists() {
475 let dir = tempfile::tempdir().expect("tempdir");
476 let path = dir.path().to_path_buf();
477 {
478 let mut wal = RaftWal::open(&path).expect("open");
479 wal.append(1, b"durable").expect("a");
480 wal.sync().expect("sync");
481 }
482 {
483 let wal = RaftWal::open(&path).expect("reopen");
484 assert_eq!(wal.get(1).as_deref(), Some(b"durable".as_slice()));
485 }
486 }
487
488 #[test]
489 fn meta_survives_crash() {
490 let dir = tempfile::tempdir().expect("tempdir");
491 let path = dir.path().to_path_buf();
492 {
493 let mut wal = RaftWal::open(&path).expect("open");
494 wal.set_meta("term", b"5").expect("set term");
495 wal.set_meta("vote", b"node-2").expect("set vote");
496 }
497 {
498 let wal = RaftWal::open(&path).expect("reopen");
499 assert_eq!(wal.get_meta("term"), Some(b"5".as_slice()));
500 assert_eq!(wal.get_meta("vote"), Some(b"node-2".as_slice()));
501 }
502 }
503
504 #[test]
505 fn error_source_chain() {
506 let err = WalError::Io(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
507 assert!(std::error::Error::source(&err).is_some());
508 }
509
510 #[test]
511 fn estimated_memory_increases() {
512 let dir = tempfile::tempdir().expect("tempdir");
513 let mut wal = RaftWal::open(dir.path()).expect("open");
514 let before = wal.estimated_memory();
515 for i in 1..=100 {
516 wal.append(i, &[0u8; 256]).expect("a");
517 }
518 assert!(wal.estimated_memory() > before);
519 }
520
521 #[test]
522 fn segment_rotation() {
523 let dir = tempfile::tempdir().expect("tempdir");
524 let mut wal = RaftWal::open(dir.path()).expect("open");
525 wal.set_max_segment_size(200);
526
527 for i in 1..=100 {
528 wal.append(i, &[0u8; 32]).expect("a");
529 }
530
531 for i in 1..=100 {
533 assert!(wal.get(i).is_some(), "missing index {i}");
534 }
535 }
536
537 #[test]
538 fn segment_rotation_recovery() {
539 let dir = tempfile::tempdir().expect("tempdir");
540 let path = dir.path().to_path_buf();
541 {
542 let mut wal = RaftWal::open(&path).expect("open");
543 wal.set_max_segment_size(200);
544 for i in 1..=100 {
545 wal.append(i, format!("e{i}").as_bytes()).expect("a");
546 }
547 }
548 {
549 let wal = RaftWal::open(&path).expect("reopen");
550 assert_eq!(wal.len(), 100);
551 assert_eq!(wal.get(1).as_deref(), Some(b"e1".as_slice()));
552 assert_eq!(wal.get(100).as_deref(), Some(b"e100".as_slice()));
553 }
554 }
555
556 #[test]
557 fn compact_deletes_old_segments() {
558 let dir = tempfile::tempdir().expect("tempdir");
559 let mut wal = RaftWal::open(dir.path()).expect("open");
560 wal.set_max_segment_size(200);
561
562 for i in 1..=100 {
563 wal.append(i, &[0u8; 32]).expect("a");
564 }
565 let seg_count_before = segment::list_segments(dir.path()).len();
566
567 wal.compact(80).expect("compact");
568 let seg_count_after = segment::list_segments(dir.path()).len();
569
570 assert!(seg_count_after < seg_count_before);
571 assert_eq!(wal.len(), 20);
572 assert_eq!(wal.first_index(), Some(81));
573 }
574}