// rivven_cluster/storage/redb_store.rs

//! redb-based Raft log storage
//!
//! Pure Rust implementation using redb - zero C dependencies, compiles
//! cleanly for all targets including musl.

// Suppress clippy warning for large error types from openraft crate
#![allow(clippy::result_large_err)]

use crate::error::{ClusterError, Result};
use openraft::storage::{LogState, RaftLogReader, RaftLogStorage};
use openraft::{StorageError, StorageIOError};
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::ops::RangeBounds;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};

use crate::raft::{NodeId, RaftEntry, RaftLogId, RaftVote, TypeConfig};
23
/// Table for Raft log entries (key: u64 index, value: serialized entry)
const LOGS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("raft_logs");

/// Table for Raft state (key: string, value: serialized data)
const STATE_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("raft_state");

/// State keys used inside `STATE_TABLE`
const KEY_VOTE: &str = "vote";
const KEY_LAST_PURGED: &str = "last_purged";
const KEY_COMMITTED: &str = "committed";
34
/// redb-backed Raft log storage
///
/// This is a pure Rust implementation that:
/// - Has zero C/C++ dependencies
/// - Compiles for any target (including musl)
/// - Provides ACID transactions
/// - Uses B-tree for efficient ordered access
///
/// # Performance Characteristics
///
/// - **Write**: O(log n) per entry, batched for throughput
/// - **Read**: O(log n) point lookup, O(k) range scan for k entries
/// - **Space**: ~1.5x raw data size (B-tree overhead)
///
/// For Raft workloads (append-heavy with sequential reads), this provides
/// excellent performance with the benefit of pure Rust simplicity.
pub struct RedbLogStore {
    /// redb database instance (shared with reader clones via `Arc`)
    db: Arc<Database>,
    /// Cached vote (also persisted under `KEY_VOTE`)
    vote: RwLock<Option<RaftVote>>,
    /// Last purged log ID (also persisted under `KEY_LAST_PURGED`)
    last_purged: RwLock<Option<RaftLogId>>,
    /// Committed log ID (also persisted under `KEY_COMMITTED`)
    committed: RwLock<Option<RaftLogId>>,
    /// Monotonic write-version counter.
    ///
    /// Incremented on every mutation (save_vote, save_committed, append, truncate,
    /// purge). Log readers snapshot this value at clone time and can detect
    /// staleness by comparing against the shared counter.
    write_version: Arc<AtomicU64>,
    /// The write-version observed when this instance was cloned (for readers).
    /// `None` on the primary store (always authoritative).
    snapshot_version: Option<u64>,
}
70
impl RedbLogStore {
    /// Create new redb log storage at the given path
    ///
    /// Creates the directory if it doesn't exist, opens (or creates) the
    /// database file, ensures both tables exist, and loads persisted
    /// vote/purge/commit markers into the in-memory caches.
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();

        // Create directory if needed
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| ClusterError::RaftStorage(format!("Failed to create dir: {}", e)))?;
        }

        // Open redb database (Database::create also opens an existing file)
        let db = Database::create(path)
            .map_err(|e| ClusterError::RaftStorage(format!("Failed to open redb: {}", e)))?;

        let db = Arc::new(db);

        // Initialize tables
        {
            let write_txn = db
                .begin_write()
                .map_err(|e| ClusterError::RaftStorage(e.to_string()))?;
            {
                // Create tables if they don't exist; results are deliberately
                // ignored — opening an already-existing table is harmless.
                let _ = write_txn.open_table(LOGS_TABLE);
                let _ = write_txn.open_table(STATE_TABLE);
            }
            write_txn
                .commit()
                .map_err(|e| ClusterError::RaftStorage(e.to_string()))?;
        }

        // Load persisted state into the caches
        let vote = Self::load_state_static(&db, KEY_VOTE);
        let last_purged = Self::load_state_static(&db, KEY_LAST_PURGED);
        let committed = Self::load_state_static(&db, KEY_COMMITTED);

        info!(
            ?vote,
            ?last_purged,
            ?committed,
            "Opened redb Raft log storage"
        );

        Ok(Self {
            db,
            vote: RwLock::new(vote),
            last_purged: RwLock::new(last_purged),
            committed: RwLock::new(committed),
            // Fresh counter each open; readers snapshot it at clone time.
            write_version: Arc::new(AtomicU64::new(0)),
            snapshot_version: None,
        })
    }

    /// Load state from database
    ///
    /// Returns `None` on any failure — a missing key, a read error, or a
    /// decode error are all treated as "no persisted state".
    fn load_state_static<T: for<'de> Deserialize<'de>>(db: &Database, key: &str) -> Option<T> {
        let read_txn = db.begin_read().ok()?;
        let table = read_txn.open_table(STATE_TABLE).ok()?;
        // Double `?`: first for the redb Result, then for the Option value.
        let value = table.get(key).ok()??;
        postcard::from_bytes(value.value()).ok()
    }

    /// Save state to database (single key convenience wrapper)
    ///
    /// Serializes `value` with postcard and delegates to `save_states`.
    fn save_state<T: Serialize>(
        &self,
        key: &str,
        value: &T,
    ) -> std::result::Result<(), StorageError<NodeId>> {
        let bytes = postcard::to_allocvec(value).map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        self.save_states(&[(key, &bytes)])
    }

    /// Save multiple state entries in a single write transaction.
    ///
    /// Consolidates multiple key-value writes into one ACID transaction,
    /// avoiding the overhead and inconsistency risk of separate transactions.
    fn save_states(
        &self,
        entries: &[(&str, &[u8])],
    ) -> std::result::Result<(), StorageError<NodeId>> {
        let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        {
            let mut table = write_txn
                .open_table(STATE_TABLE)
                .map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;
            for (key, bytes) in entries {
                table.insert(*key, *bytes).map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;
            }
        }
        // Nothing is durable until the commit succeeds.
        write_txn.commit().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        Ok(())
    }

    /// Get the last log entry
    ///
    /// Returns `None` when the log table is empty (e.g. freshly created
    /// or fully purged).
    fn last_log(&self) -> std::result::Result<Option<RaftEntry>, StorageError<NodeId>> {
        let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
            source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
        })?;
        let table = read_txn
            .open_table(LOGS_TABLE)
            .map_err(|e| StorageError::IO {
                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
            })?;

        // Get the last entry using reverse iteration (keys are ordered, so
        // the last B-tree key is the highest log index).
        let mut iter = table.iter().map_err(|e| StorageError::IO {
            source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
        })?;

        if let Some(result) = iter.next_back() {
            let (_, value) = result.map_err(|e| StorageError::IO {
                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
            })?;
            let entry: RaftEntry =
                postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
                    source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
                })?;
            return Ok(Some(entry));
        }
        Ok(None)
    }

    /// Get log entry by index
    ///
    /// Point lookup kept for completeness; bulk reads should go through
    /// `try_get_log_entries`, which amortizes the transaction over a range.
    #[allow(dead_code)]
    fn get_log(&self, index: u64) -> std::result::Result<Option<RaftEntry>, StorageError<NodeId>> {
        let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
            source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
        })?;
        let table = read_txn
            .open_table(LOGS_TABLE)
            .map_err(|e| StorageError::IO {
                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
            })?;

        match table.get(index) {
            Ok(Some(value)) => {
                let entry: RaftEntry =
                    postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
                        source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
                    })?;
                Ok(Some(entry))
            }
            Ok(None) => Ok(None),
            Err(e) => Err(StorageError::IO {
                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
            }),
        }
    }

    /// Append a batch of log entries (single transaction for efficiency)
    ///
    /// Entries are keyed by their `log_id.index`; an existing entry at the
    /// same index is overwritten (redb `insert` replaces).
    fn append_logs(&self, entries: &[RaftEntry]) -> std::result::Result<(), StorageError<NodeId>> {
        if entries.is_empty() {
            return Ok(());
        }

        let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        {
            let mut table = write_txn
                .open_table(LOGS_TABLE)
                .map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;

            for entry in entries {
                let value = postcard::to_allocvec(entry).map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;
                table
                    .insert(entry.log_id.index, value.as_slice())
                    .map_err(|e| StorageError::IO {
                        source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                    })?;
            }
        }
        write_txn.commit().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        Ok(())
    }

    /// Bump the write-version counter to invalidate reader caches.
    fn bump_version(&self) {
        // Release pairs with the Acquire load in `is_cache_stale`.
        self.write_version.fetch_add(1, Ordering::Release);
    }

    /// Check whether this instance's cached metadata is still current.
    ///
    /// Returns `true` when the shared write-version counter has advanced
    /// past the version observed at clone-time, indicating that the
    /// parent store has been mutated (vote, committed, purge, etc.).
    /// The primary store (snapshot_version == None) is always authoritative.
    pub fn is_cache_stale(&self) -> bool {
        match self.snapshot_version {
            Some(v) => self.write_version.load(Ordering::Acquire) != v,
            None => false,
        }
    }

    /// Reload cached metadata (vote, last_purged, committed) from the
    /// database if the cache is stale. No-op on the primary store.
    ///
    /// NOTE(review): `snapshot_version` is not updated here, so the reader
    /// stays marked stale until re-cloned — presumably intentional
    /// (refresh-on-every-check); confirm with callers.
    pub async fn refresh_cache_if_stale(&self) {
        if !self.is_cache_stale() {
            return;
        }
        let vote: Option<RaftVote> = Self::load_state_static(&self.db, KEY_VOTE);
        let last_purged: Option<RaftLogId> = Self::load_state_static(&self.db, KEY_LAST_PURGED);
        let committed: Option<RaftLogId> = Self::load_state_static(&self.db, KEY_COMMITTED);
        *self.vote.write().await = vote;
        *self.last_purged.write().await = last_purged;
        *self.committed.write().await = committed;
        debug!("Refreshed stale reader cache from DB");
    }

    /// Delete log entries in range [start, end)
    ///
    /// Uses redb's `retain_in()` for efficient batch deletion instead
    /// of per-key `remove()` calls. This performs a single B-tree
    /// traversal over the range.
    fn delete_logs_range(
        &self,
        start: u64,
        end: u64,
    ) -> std::result::Result<(), StorageError<NodeId>> {
        let write_txn = self.db.begin_write().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        {
            let mut table = write_txn
                .open_table(LOGS_TABLE)
                .map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;

            // Batch-remove all entries in [start, end) with a single B-tree pass;
            // the `|_, _| false` predicate keeps nothing in the range.
            table
                .retain_in(start..end, |_, _| false)
                .map_err(|e| StorageError::IO {
                    source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
                })?;
        }
        write_txn.commit().map_err(|e| StorageError::IO {
            source: StorageIOError::write_logs(openraft::AnyError::new(&e)),
        })?;
        Ok(())
    }
}
331
332// Implement RaftLogReader for RedbLogStore
333impl RaftLogReader<TypeConfig> for RedbLogStore {
334    /// Use a single read transaction with range iteration instead
335    /// of opening a separate transaction per entry via `get_log()`.
336    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
337        &mut self,
338        range: RB,
339    ) -> std::result::Result<Vec<RaftEntry>, StorageError<NodeId>> {
340        let start = match range.start_bound() {
341            std::ops::Bound::Included(&n) => n,
342            std::ops::Bound::Excluded(&n) => n + 1,
343            std::ops::Bound::Unbounded => 0,
344        };
345        let end = match range.end_bound() {
346            std::ops::Bound::Included(&n) => n + 1,
347            std::ops::Bound::Excluded(&n) => n,
348            std::ops::Bound::Unbounded => u64::MAX,
349        };
350
351        // Single read transaction for the entire range scan
352        let read_txn = self.db.begin_read().map_err(|e| StorageError::IO {
353            source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
354        })?;
355        let table = read_txn
356            .open_table(LOGS_TABLE)
357            .map_err(|e| StorageError::IO {
358                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
359            })?;
360
361        let mut entries = Vec::new();
362        let iter = table.range(start..end).map_err(|e| StorageError::IO {
363            source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
364        })?;
365
366        for item in iter {
367            let (_, value) = item.map_err(|e| StorageError::IO {
368                source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
369            })?;
370            let entry: RaftEntry =
371                postcard::from_bytes(value.value()).map_err(|e| StorageError::IO {
372                    source: StorageIOError::read_logs(openraft::AnyError::new(&e)),
373                })?;
374            entries.push(entry);
375        }
376
377        Ok(entries)
378    }
379}
380
381// Implement RaftLogStorage for RedbLogStore
382impl RaftLogStorage<TypeConfig> for RedbLogStore {
383    type LogReader = Self;
384
385    async fn get_log_state(
386        &mut self,
387    ) -> std::result::Result<LogState<TypeConfig>, StorageError<NodeId>> {
388        let last_purged = *self.last_purged.read().await;
389        let last_log = self.last_log()?;
390
391        let last_log_id = last_log.map(|e| e.log_id).or(last_purged);
392
393        Ok(LogState {
394            last_purged_log_id: last_purged,
395            last_log_id,
396        })
397    }
398
399    async fn get_log_reader(&mut self) -> Self::LogReader {
400        // Return a new instance sharing the same DB and write-version counter.
401        //
402        // The reader records the current write-version at
403        // clone time. Consumers can call `is_cache_stale()` to detect when
404        // the parent store has been mutated, and `refresh_cache_if_stale()`
405        // to reload cached metadata from DB. The shared `Arc<AtomicU64>`
406        // counter is incremented on every mutation (save_vote, purge, etc.)
407        // providing a lightweight invalidation signal without locking.
408        let current_version = self.write_version.load(Ordering::Acquire);
409        Self {
410            db: self.db.clone(),
411            vote: RwLock::new(*self.vote.read().await),
412            last_purged: RwLock::new(*self.last_purged.read().await),
413            committed: RwLock::new(*self.committed.read().await),
414            write_version: self.write_version.clone(),
415            snapshot_version: Some(current_version),
416        }
417    }
418
419    async fn save_vote(
420        &mut self,
421        vote: &RaftVote,
422    ) -> std::result::Result<(), StorageError<NodeId>> {
423        self.save_state(KEY_VOTE, vote)?;
424        *self.vote.write().await = Some(*vote);
425        self.bump_version();
426        debug!(?vote, "Saved vote");
427        Ok(())
428    }
429
430    async fn read_vote(&mut self) -> std::result::Result<Option<RaftVote>, StorageError<NodeId>> {
431        Ok(*self.vote.read().await)
432    }
433
434    async fn save_committed(
435        &mut self,
436        committed: Option<RaftLogId>,
437    ) -> std::result::Result<(), StorageError<NodeId>> {
438        if let Some(ref c) = committed {
439            self.save_state(KEY_COMMITTED, c)?;
440        }
441        *self.committed.write().await = committed;
442        self.bump_version();
443        Ok(())
444    }
445
446    async fn read_committed(
447        &mut self,
448    ) -> std::result::Result<Option<RaftLogId>, StorageError<NodeId>> {
449        Ok(*self.committed.read().await)
450    }
451
452    async fn append<I>(
453        &mut self,
454        entries: I,
455        callback: openraft::storage::LogFlushed<TypeConfig>,
456    ) -> std::result::Result<(), StorageError<NodeId>>
457    where
458        I: IntoIterator<Item = RaftEntry> + Send,
459        I::IntoIter: Send,
460    {
461        // Collect entries for batch write
462        let entries: Vec<_> = entries.into_iter().collect();
463        self.append_logs(&entries)?;
464        self.bump_version();
465
466        // Callback after successful write
467        callback.log_io_completed(Ok(()));
468        Ok(())
469    }
470
471    async fn truncate(
472        &mut self,
473        log_id: RaftLogId,
474    ) -> std::result::Result<(), StorageError<NodeId>> {
475        // Delete all logs after log_id.index
476        let start = log_id.index + 1;
477        let log_state = RaftLogStorage::get_log_state(self).await?;
478        if let Some(last) = log_state.last_log_id {
479            self.delete_logs_range(start, last.index + 1)?;
480        }
481        self.bump_version();
482        debug!(?log_id, "Truncated logs");
483        Ok(())
484    }
485
486    async fn purge(&mut self, log_id: RaftLogId) -> std::result::Result<(), StorageError<NodeId>> {
487        // Delete logs up to and including log_id.index
488        let current_purged = *self.last_purged.read().await;
489        let start = current_purged.map(|l| l.index + 1).unwrap_or(0);
490
491        self.delete_logs_range(start, log_id.index + 1)?;
492
493        // Update and persist last_purged
494        self.save_state(KEY_LAST_PURGED, &log_id)?;
495        *self.last_purged.write().await = Some(log_id);
496        self.bump_version();
497        debug!(?log_id, "Purged logs");
498        Ok(())
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raft::{RaftEntry, RaftLogId, RaftVote};
    use openraft::{Entry, EntryPayload, LogId, Vote};
    use tempfile::TempDir;

    /// Build a blank log entry at the given index/term with leader node 1.
    fn create_entry(index: u64, term: u64) -> RaftEntry {
        let log_id = LogId {
            leader_id: openraft::LeaderId::new(term, 1),
            index,
        };
        Entry {
            log_id,
            payload: EntryPayload::Blank,
        }
    }

    #[tokio::test]
    async fn test_redb_store_basic() {
        let temp_dir = TempDir::new().unwrap();
        let mut store = RedbLogStore::new(temp_dir.path().join("raft.redb")).unwrap();

        // A fresh store reports no logs at all.
        assert!(store.get_log_state().await.unwrap().last_log_id.is_none());

        // Write three entries through the internal batch API.
        let entries: Vec<_> = (1..=3).map(|i| create_entry(i, 1)).collect();
        store.append_logs(&entries).unwrap();

        // The log state now reflects the highest appended index.
        let state = store.get_log_state().await.unwrap();
        assert_eq!(state.last_log_id.unwrap().index, 3);

        // And the full range reads back intact.
        let read_back = store.try_get_log_entries(1..=3).await.unwrap();
        assert_eq!(read_back.len(), 3);
    }

    #[tokio::test]
    async fn test_redb_store_vote() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join("raft.redb");
        let mut store = RedbLogStore::new(&path).unwrap();

        // Nothing saved yet.
        assert!(store.read_vote().await.unwrap().is_none());

        // Persist a vote and read it straight back.
        let test_vote: RaftVote = Vote::new(1, 1);
        store.save_vote(&test_vote).await.unwrap();
        assert_eq!(store.read_vote().await.unwrap(), Some(test_vote));

        // The vote must survive a close/reopen cycle.
        drop(store);
        let mut reopened = RedbLogStore::new(&path).unwrap();
        assert_eq!(reopened.read_vote().await.unwrap(), Some(test_vote));
    }

    #[tokio::test]
    async fn test_redb_store_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let mut store = RedbLogStore::new(temp_dir.path().join("raft.redb")).unwrap();

        // Seed indices 1..=5.
        let entries: Vec<_> = (1..=5).map(|i| create_entry(i, 1)).collect();
        store.append_logs(&entries).unwrap();

        // Truncating at index 3 drops entries 4 and 5.
        let log_id: RaftLogId = LogId {
            leader_id: openraft::LeaderId::new(1, 1),
            index: 3,
        };
        store.truncate(log_id).await.unwrap();

        let state = store.get_log_state().await.unwrap();
        assert_eq!(state.last_log_id.unwrap().index, 3);
    }

    #[tokio::test]
    async fn test_redb_store_purge() {
        let temp_dir = TempDir::new().unwrap();
        let mut store = RedbLogStore::new(temp_dir.path().join("raft.redb")).unwrap();

        // Seed indices 1..=5.
        let entries: Vec<_> = (1..=5).map(|i| create_entry(i, 1)).collect();
        store.append_logs(&entries).unwrap();

        // Purge everything up to and including index 3.
        let log_id: RaftLogId = LogId {
            leader_id: openraft::LeaderId::new(1, 1),
            index: 3,
        };
        store.purge(log_id).await.unwrap();

        // The purge watermark is recorded.
        let state = store.get_log_state().await.unwrap();
        assert_eq!(state.last_purged_log_id.unwrap().index, 3);

        // Entries 1-3 are gone; 4-5 remain.
        assert!(store.try_get_log_entries(1..=3).await.unwrap().is_empty());
        assert_eq!(store.try_get_log_entries(4..=5).await.unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_redb_store_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join("raft.redb");

        // First session: write two entries and a vote, then drop the store.
        {
            let mut store = RedbLogStore::new(&path).unwrap();
            store
                .append_logs(&[create_entry(1, 1), create_entry(2, 1)])
                .unwrap();

            let vote: RaftVote = Vote::new(2, 1);
            store.save_vote(&vote).await.unwrap();
        }

        // Second session: everything must still be there.
        {
            let mut store = RedbLogStore::new(&path).unwrap();

            let state = store.get_log_state().await.unwrap();
            assert_eq!(state.last_log_id.unwrap().index, 2);

            let vote = store.read_vote().await.unwrap();
            assert_eq!(vote.unwrap().leader_id().term, 2);
        }
    }
}