Skip to main content

orca_control/raft/
log_store.rs

1//! In-memory Raft log storage for Orca.
2
3use std::collections::BTreeMap;
4use std::fmt::Debug;
5use std::ops::RangeBounds;
6use std::sync::Arc;
7
8use openraft::storage::{LogFlushed, LogState, RaftLogStorage};
9use openraft::{Entry, LogId, RaftLogReader, StorageError, Vote};
10use tokio::sync::Mutex;
11
12use super::type_config::OrcaTypeConfig;
13
14type C = OrcaTypeConfig;
15
16/// Shared inner state of the log store, guarded by a mutex.
17struct Inner {
18    last_purged: Option<LogId<u64>>,
19    log: BTreeMap<u64, Entry<C>>,
20    vote: Option<Vote<u64>>,
21}
22
23/// In-memory Raft log store.
24///
25/// Entries live in a `BTreeMap<u64, Entry>` behind an `Arc<Mutex<>>`.
26/// Sufficient for single-node and small clusters; for durable setups,
27/// swap in a redb-backed implementation.
28#[derive(Clone)]
29pub struct LogStore {
30    inner: Arc<Mutex<Inner>>,
31}
32
33impl Default for LogStore {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl LogStore {
40    pub fn new() -> Self {
41        Self {
42            inner: Arc::new(Mutex::new(Inner {
43                last_purged: None,
44                log: BTreeMap::new(),
45                vote: None,
46            })),
47        }
48    }
49}
50
51impl RaftLogReader<C> for LogStore {
52    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
53        &mut self,
54        range: RB,
55    ) -> Result<Vec<Entry<C>>, StorageError<u64>> {
56        let inner = self.inner.lock().await;
57        let entries = inner.log.range(range).map(|(_, v)| v.clone()).collect();
58        Ok(entries)
59    }
60}
61
62impl RaftLogStorage<C> for LogStore {
63    type LogReader = Self;
64
65    async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<u64>> {
66        let inner = self.inner.lock().await;
67        let last_log_id = inner
68            .log
69            .last_key_value()
70            .map(|(_, e)| e.log_id)
71            .or(inner.last_purged);
72        Ok(LogState {
73            last_purged_log_id: inner.last_purged,
74            last_log_id,
75        })
76    }
77
78    async fn get_log_reader(&mut self) -> Self::LogReader {
79        self.clone()
80    }
81
82    async fn save_vote(&mut self, vote: &Vote<u64>) -> Result<(), StorageError<u64>> {
83        let mut inner = self.inner.lock().await;
84        inner.vote = Some(*vote);
85        Ok(())
86    }
87
88    async fn read_vote(&mut self) -> Result<Option<Vote<u64>>, StorageError<u64>> {
89        let inner = self.inner.lock().await;
90        Ok(inner.vote)
91    }
92
93    async fn append<I>(
94        &mut self,
95        entries: I,
96        callback: LogFlushed<C>,
97    ) -> Result<(), StorageError<u64>>
98    where
99        I: IntoIterator<Item = Entry<C>> + Send,
100        I::IntoIter: Send,
101    {
102        let mut inner = self.inner.lock().await;
103        for entry in entries {
104            let idx = entry.log_id.index;
105            inner.log.insert(idx, entry);
106        }
107        // In-memory store is immediately "flushed".
108        callback.log_io_completed(Ok(()));
109        Ok(())
110    }
111
112    async fn truncate(&mut self, log_id: LogId<u64>) -> Result<(), StorageError<u64>> {
113        let mut inner = self.inner.lock().await;
114        let to_remove: Vec<u64> = inner.log.range(log_id.index..).map(|(k, _)| *k).collect();
115        for k in to_remove {
116            inner.log.remove(&k);
117        }
118        Ok(())
119    }
120
121    async fn purge(&mut self, log_id: LogId<u64>) -> Result<(), StorageError<u64>> {
122        let mut inner = self.inner.lock().await;
123        let to_remove: Vec<u64> = inner.log.range(..=log_id.index).map(|(k, _)| *k).collect();
124        for k in to_remove {
125            inner.log.remove(&k);
126        }
127        inner.last_purged = Some(log_id);
128        Ok(())
129    }
130}