1#![allow(clippy::result_large_err)]
8
9use crate::error::{ClusterError, Result};
10use openraft::storage::{LogState, RaftLogReader, RaftLogStorage};
11use openraft::{StorageError, StorageIOError};
12use redb::{Database, ReadableTable, TableDefinition};
13use serde::{Deserialize, Serialize};
14use std::fmt::Debug;
15use std::ops::RangeBounds;
16use std::path::Path;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use tracing::{debug, info};
21
22use crate::raft::{NodeId, RaftEntry, RaftLogId, RaftVote, TypeConfig};
23
/// redb table holding serialized Raft log entries, keyed by log index.
const LOGS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("raft_logs");

/// redb table holding serialized Raft metadata, keyed by the `KEY_*` names below.
const STATE_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("raft_state");

/// `STATE_TABLE` key under which the last saved vote is stored.
const KEY_VOTE: &str = "vote";
/// `STATE_TABLE` key under which the last purged log id is stored.
const KEY_LAST_PURGED: &str = "last_purged";
/// `STATE_TABLE` key under which the last committed log id is stored.
const KEY_COMMITTED: &str = "committed";
34
/// Raft log store backed by a redb database.
///
/// Log entries live in `LOGS_TABLE`; the vote, last-purged log id and committed
/// log id live in `STATE_TABLE` and are mirrored in memory so reads do not have
/// to open a transaction.
pub struct RedbLogStore {
    /// Shared handle to the underlying redb database.
    db: Arc<Database>,
    /// In-memory copy of the last saved vote.
    vote: RwLock<Option<RaftVote>>,
    /// In-memory copy of the last purged log id.
    last_purged: RwLock<Option<RaftLogId>>,
    /// In-memory copy of the last committed log id.
    committed: RwLock<Option<RaftLogId>>,
    /// Counter shared (via `Arc`) with every reader created by `get_log_reader`;
    /// incremented by `bump_version`.
    write_version: Arc<AtomicU64>,
    /// Value of `write_version` captured when this instance was created by
    /// `get_log_reader`; `None` for a store built by `new`.
    snapshot_version: Option<u64>,
}
70
71impl RedbLogStore {
72 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
76 let path = path.as_ref();
77
78 if let Some(parent) = path.parent() {
80 std::fs::create_dir_all(parent)
81 .map_err(|e| ClusterError::RaftStorage(format!("Failed to create dir: {}", e)))?;
82 }
83
84 let db = Database::create(path)
86 .map_err(|e| ClusterError::RaftStorage(format!("Failed to open redb: {}", e)))?;
87
88 let db = Arc::new(db);
89
90 {
92 let write_txn = db
93 .begin_write()
94 .map_err(|e| ClusterError::RaftStorage(e.to_string()))?;
95 {
96 let _ = write_txn.open_table(LOGS_TABLE);
98 let _ = write_txn.open_table(STATE_TABLE);
99 }
100 write_txn
101 .commit()
102 .map_err(|e| ClusterError::RaftStorage(e.to_string()))?;
103 }
104
105 let vote = Self::load_state_static(&db, KEY_VOTE);
107 let last_purged = Self::load_state_static(&db, KEY_LAST_PURGED);
108 let committed = Self::load_state_static(&db, KEY_COMMITTED);
109
110 info!(
111 ?vote,
112 ?last_purged,
113 ?committed,
114 "Opened redb Raft log storage"
115 );
116
117 Ok(Self {
118 db,
119 vote: RwLock::new(vote),
120 last_purged: RwLock::new(last_purged),
121 committed: RwLock::new(committed),
122 write_version: Arc::new(AtomicU64::new(0)),
123 snapshot_version: None,
124 })
125 }
126
127 fn load_state_static<T: for<'de> Deserialize<'de>>(db: &Database, key: &str) -> Option<T> {
129 let read_txn = db.begin_read().ok()?;
130 let table = read_txn.open_table(STATE_TABLE).ok()?;
131 let value = table.get(key).ok()??;
132 postcard::from_bytes(value.value()).ok()
133 }
134
135 fn save_state<T: Serialize>(
137 &self,
138 key: &str,
139 value: &T,
140 ) -> std::result::Result<(), StorageError<NodeId>> {
141 let bytes = postcard::to_allocvec(value).map_err(|e| StorageError::IO {
142 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
143 })?;
144 self.save_states(&[(key, &bytes)])
145 }
146
147 fn save_states(
152 &self,
153 entries: &[(&str, &[u8])],
154 ) -> std::result::Result<(), StorageError<NodeId>> {
155 let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
156 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
157 })?;
158 {
159 let mut table = write_txn
160 .open_table(STATE_TABLE)
161 .map_err(|e| StorageError::IO {
162 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
163 })?;
164 for (key, bytes) in entries {
165 table.insert(*key, *bytes).map_err(|e| StorageError::IO {
166 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
167 })?;
168 }
169 }
170 write_txn.commit().map_err(|e| StorageError::IO {
171 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
172 })?;
173 Ok(())
174 }
175
176 fn last_log(&self) -> std::result::Result<Option<RaftEntry>, StorageError<NodeId>> {
178 let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
179 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
180 })?;
181 let table = read_txn
182 .open_table(LOGS_TABLE)
183 .map_err(|e| StorageError::IO {
184 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
185 })?;
186
187 let mut iter = table.iter().map_err(|e| StorageError::IO {
189 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
190 })?;
191
192 if let Some(result) = iter.next_back() {
193 let (_, value) = result.map_err(|e| StorageError::IO {
194 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
195 })?;
196 let entry: RaftEntry =
197 postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
198 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
199 })?;
200 return Ok(Some(entry));
201 }
202 Ok(None)
203 }
204
205 #[allow(dead_code)]
207 fn get_log(&self, index: u64) -> std::result::Result<Option<RaftEntry>, StorageError<NodeId>> {
208 let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
209 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
210 })?;
211 let table = read_txn
212 .open_table(LOGS_TABLE)
213 .map_err(|e| StorageError::IO {
214 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
215 })?;
216
217 match table.get(index) {
218 Ok(Some(value)) => {
219 let entry: RaftEntry =
220 postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
221 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
222 })?;
223 Ok(Some(entry))
224 }
225 Ok(None) => Ok(None),
226 Err(e) => Err(StorageError::IO {
227 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
228 }),
229 }
230 }
231
232 fn append_logs(&self, entries: &[RaftEntry]) -> std::result::Result<(), StorageError<NodeId>> {
234 if entries.is_empty() {
235 return Ok(());
236 }
237
238 let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
239 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
240 })?;
241 {
242 let mut table = write_txn
243 .open_table(LOGS_TABLE)
244 .map_err(|e| StorageError::IO {
245 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
246 })?;
247
248 for entry in entries {
249 let value = postcard::to_allocvec(entry).map_err(|e| StorageError::IO {
250 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
251 })?;
252 table
253 .insert(entry.log_id.index, value.as_slice())
254 .map_err(|e| StorageError::IO {
255 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
256 })?;
257 }
258 }
259 write_txn.commit().map_err(|e| StorageError::IO {
260 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
261 })?;
262 Ok(())
263 }
264
265 fn bump_version(&self) {
267 self.write_version.fetch_add(1, Ordering::Release);
268 }
269
270 pub fn is_cache_stale(&self) -> bool {
277 match self.snapshot_version {
278 Some(v) => self.write_version.load(Ordering::Acquire) != v,
279 None => false,
280 }
281 }
282
283 pub async fn refresh_cache_if_stale(&self) {
286 if !self.is_cache_stale() {
287 return;
288 }
289 let vote: Option<RaftVote> = Self::load_state_static(&self.db, KEY_VOTE);
290 let last_purged: Option<RaftLogId> = Self::load_state_static(&self.db, KEY_LAST_PURGED);
291 let committed: Option<RaftLogId> = Self::load_state_static(&self.db, KEY_COMMITTED);
292 *self.vote.write().await = vote;
293 *self.last_purged.write().await = last_purged;
294 *self.committed.write().await = committed;
295 debug!("Refreshed stale reader cache from DB");
296 }
297
298 fn delete_logs_range(
304 &self,
305 start: u64,
306 end: u64,
307 ) -> std::result::Result<(), StorageError<NodeId>> {
308 let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
309 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
310 })?;
311 {
312 let mut table = write_txn
313 .open_table(LOGS_TABLE)
314 .map_err(|e| StorageError::IO {
315 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
316 })?;
317
318 table
320 .retain_in(start..end, |_, _| false)
321 .map_err(|e| StorageError::IO {
322 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
323 })?;
324 }
325 write_txn.commit().map_err(|e| StorageError::IO {
326 source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
327 })?;
328 Ok(())
329 }
330}
331
332impl RaftLogReader<TypeConfig> for RedbLogStore {
334 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
337 &mut self,
338 range: RB,
339 ) -> std::result::Result<Vec<RaftEntry>, StorageError<NodeId>> {
340 let start = match range.start_bound() {
341 std::ops::Bound::Included(&n) => n,
342 std::ops::Bound::Excluded(&n) => n + 1,
343 std::ops::Bound::Unbounded => 0,
344 };
345 let end = match range.end_bound() {
346 std::ops::Bound::Included(&n) => n + 1,
347 std::ops::Bound::Excluded(&n) => n,
348 std::ops::Bound::Unbounded => u64::MAX,
349 };
350
351 let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
353 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
354 })?;
355 let table = read_txn
356 .open_table(LOGS_TABLE)
357 .map_err(|e| StorageError::IO {
358 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
359 })?;
360
361 let mut entries = Vec::new();
362 let iter = table.range(start..end).map_err(|e| StorageError::IO {
363 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
364 })?;
365
366 for item in iter {
367 let (_, value) = item.map_err(|e| StorageError::IO {
368 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
369 })?;
370 let entry: RaftEntry =
371 postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
372 source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
373 })?;
374 entries.push(entry);
375 }
376
377 Ok(entries)
378 }
379}
380
381impl RaftLogStorage<TypeConfig> for RedbLogStore {
383 type LogReader = Self;
384
385 async fn get_log_state(
386 &mut self,
387 ) -> std::result::Result<LogState<TypeConfig>, StorageError<NodeId>> {
388 let last_purged = *self.last_purged.read().await;
389 let last_log = self.last_log()?;
390
391 let last_log_id = last_log.map(|e| e.log_id).or(last_purged);
392
393 Ok(LogState {
394 last_purged_log_id: last_purged,
395 last_log_id,
396 })
397 }
398
399 async fn get_log_reader(&mut self) -> Self::LogReader {
400 let current_version = self.write_version.load(Ordering::Acquire);
409 Self {
410 db: self.db.clone(),
411 vote: RwLock::new(*self.vote.read().await),
412 last_purged: RwLock::new(*self.last_purged.read().await),
413 committed: RwLock::new(*self.committed.read().await),
414 write_version: self.write_version.clone(),
415 snapshot_version: Some(current_version),
416 }
417 }
418
419 async fn save_vote(
420 &mut self,
421 vote: &RaftVote,
422 ) -> std::result::Result<(), StorageError<NodeId>> {
423 self.save_state(KEY_VOTE, vote)?;
424 *self.vote.write().await = Some(*vote);
425 self.bump_version();
426 debug!(?vote, "Saved vote");
427 Ok(())
428 }
429
430 async fn read_vote(&mut self) -> std::result::Result<Option<RaftVote>, StorageError<NodeId>> {
431 Ok(*self.vote.read().await)
432 }
433
434 async fn save_committed(
435 &mut self,
436 committed: Option<RaftLogId>,
437 ) -> std::result::Result<(), StorageError<NodeId>> {
438 if let Some(ref c) = committed {
439 self.save_state(KEY_COMMITTED, c)?;
440 }
441 *self.committed.write().await = committed;
442 self.bump_version();
443 Ok(())
444 }
445
446 async fn read_committed(
447 &mut self,
448 ) -> std::result::Result<Option<RaftLogId>, StorageError<NodeId>> {
449 Ok(*self.committed.read().await)
450 }
451
452 async fn append<I>(
453 &mut self,
454 entries: I,
455 callback: openraft::storage::LogFlushed<TypeConfig>,
456 ) -> std::result::Result<(), StorageError<NodeId>>
457 where
458 I: IntoIterator<Item = RaftEntry> + Send,
459 I::IntoIter: Send,
460 {
461 let entries: Vec<_> = entries.into_iter().collect();
463 self.append_logs(&entries)?;
464 self.bump_version();
465
466 callback.log_io_completed(Ok(()));
468 Ok(())
469 }
470
471 async fn truncate(
472 &mut self,
473 log_id: RaftLogId,
474 ) -> std::result::Result<(), StorageError<NodeId>> {
475 let start = log_id.index + 1;
477 let log_state = RaftLogStorage::get_log_state(self).await?;
478 if let Some(last) = log_state.last_log_id {
479 self.delete_logs_range(start, last.index + 1)?;
480 }
481 self.bump_version();
482 debug!(?log_id, "Truncated logs");
483 Ok(())
484 }
485
486 async fn purge(&mut self, log_id: RaftLogId) -> std::result::Result<(), StorageError<NodeId>> {
487 let current_purged = *self.last_purged.read().await;
489 let start = current_purged.map(|l| l.index + 1).unwrap_or(0);
490
491 self.delete_logs_range(start, log_id.index + 1)?;
492
493 self.save_state(KEY_LAST_PURGED, &log_id)?;
495 *self.last_purged.write().await = Some(log_id);
496 self.bump_version();
497 debug!(?log_id, "Purged logs");
498 Ok(())
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use crate::raft::{RaftEntry, RaftLogId, RaftVote};
506 use openraft::{Entry, EntryPayload, LogId, Vote};
507 use tempfile::TempDir;
508
509 fn create_entry(index: u64, term: u64) -> RaftEntry {
510 Entry {
511 log_id: LogId {
512 leader_id: openraft::LeaderId::new(term, 1),
513 index,
514 },
515 payload: EntryPayload::Blank,
516 }
517 }
518
519 #[tokio::test]
520 async fn test_redb_store_basic() {
521 let temp_dir = TempDir::new().unwrap();
522 let path = temp_dir.path().join("raft.redb");
523
524 let mut store = RedbLogStore::new(&path).unwrap();
525
526 let state = store.get_log_state().await.unwrap();
528 assert!(state.last_log_id.is_none());
529
530 let entries = vec![create_entry(1, 1), create_entry(2, 1), create_entry(3, 1)];
532 store.append_logs(&entries).unwrap();
533
534 let state = store.get_log_state().await.unwrap();
536 assert_eq!(state.last_log_id.unwrap().index, 3);
537
538 let read_entries = store.try_get_log_entries(1..=3).await.unwrap();
540 assert_eq!(read_entries.len(), 3);
541 }
542
543 #[tokio::test]
544 async fn test_redb_store_vote() {
545 let temp_dir = TempDir::new().unwrap();
546 let path = temp_dir.path().join("raft.redb");
547
548 let mut store = RedbLogStore::new(&path).unwrap();
549
550 let vote = store.read_vote().await.unwrap();
552 assert!(vote.is_none());
553
554 let test_vote: RaftVote = Vote::new(1, 1);
556 store.save_vote(&test_vote).await.unwrap();
557
558 let vote = store.read_vote().await.unwrap();
560 assert_eq!(vote, Some(test_vote));
561
562 drop(store);
564 let mut store = RedbLogStore::new(&path).unwrap();
565 let vote = store.read_vote().await.unwrap();
566 assert_eq!(vote, Some(test_vote));
567 }
568
569 #[tokio::test]
570 async fn test_redb_store_truncate() {
571 let temp_dir = TempDir::new().unwrap();
572 let path = temp_dir.path().join("raft.redb");
573
574 let mut store = RedbLogStore::new(&path).unwrap();
575
576 let entries = vec![
578 create_entry(1, 1),
579 create_entry(2, 1),
580 create_entry(3, 1),
581 create_entry(4, 1),
582 create_entry(5, 1),
583 ];
584 store.append_logs(&entries).unwrap();
585
586 let log_id: RaftLogId = LogId {
588 leader_id: openraft::LeaderId::new(1, 1),
589 index: 3,
590 };
591 store.truncate(log_id).await.unwrap();
592
593 let state = store.get_log_state().await.unwrap();
595 assert_eq!(state.last_log_id.unwrap().index, 3);
596 }
597
598 #[tokio::test]
599 async fn test_redb_store_purge() {
600 let temp_dir = TempDir::new().unwrap();
601 let path = temp_dir.path().join("raft.redb");
602
603 let mut store = RedbLogStore::new(&path).unwrap();
604
605 let entries = vec![
607 create_entry(1, 1),
608 create_entry(2, 1),
609 create_entry(3, 1),
610 create_entry(4, 1),
611 create_entry(5, 1),
612 ];
613 store.append_logs(&entries).unwrap();
614
615 let log_id: RaftLogId = LogId {
617 leader_id: openraft::LeaderId::new(1, 1),
618 index: 3,
619 };
620 store.purge(log_id).await.unwrap();
621
622 let state = store.get_log_state().await.unwrap();
624 assert_eq!(state.last_purged_log_id.unwrap().index, 3);
625
626 let entries = store.try_get_log_entries(1..=3).await.unwrap();
628 assert!(entries.is_empty());
629
630 let entries = store.try_get_log_entries(4..=5).await.unwrap();
632 assert_eq!(entries.len(), 2);
633 }
634
635 #[tokio::test]
636 async fn test_redb_store_persistence() {
637 let temp_dir = TempDir::new().unwrap();
638 let path = temp_dir.path().join("raft.redb");
639
640 {
642 let mut store = RedbLogStore::new(&path).unwrap();
643 let entries = vec![create_entry(1, 1), create_entry(2, 1)];
644 store.append_logs(&entries).unwrap();
645
646 let vote: RaftVote = Vote::new(2, 1);
647 store.save_vote(&vote).await.unwrap();
648 }
649
650 {
652 let mut store = RedbLogStore::new(&path).unwrap();
653
654 let state = store.get_log_state().await.unwrap();
656 assert_eq!(state.last_log_id.unwrap().index, 2);
657
658 let vote = store.read_vote().await.unwrap();
660 assert_eq!(vote.unwrap().leader_id().term, 2);
661 }
662 }
663}