orca_control/raft/
log_store.rs1use std::collections::BTreeMap;
4use std::fmt::Debug;
5use std::ops::RangeBounds;
6use std::sync::Arc;
7
8use openraft::storage::{LogFlushed, LogState, RaftLogStorage};
9use openraft::{Entry, LogId, RaftLogReader, StorageError, Vote};
10use tokio::sync::Mutex;
11
12use super::type_config::OrcaTypeConfig;
13
14type C = OrcaTypeConfig;
15
16struct Inner {
18 last_purged: Option<LogId<u64>>,
19 log: BTreeMap<u64, Entry<C>>,
20 vote: Option<Vote<u64>>,
21}
22
23#[derive(Clone)]
29pub struct LogStore {
30 inner: Arc<Mutex<Inner>>,
31}
32
33impl Default for LogStore {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl LogStore {
40 pub fn new() -> Self {
41 Self {
42 inner: Arc::new(Mutex::new(Inner {
43 last_purged: None,
44 log: BTreeMap::new(),
45 vote: None,
46 })),
47 }
48 }
49}
50
51impl RaftLogReader<C> for LogStore {
52 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
53 &mut self,
54 range: RB,
55 ) -> Result<Vec<Entry<C>>, StorageError<u64>> {
56 let inner = self.inner.lock().await;
57 let entries = inner.log.range(range).map(|(_, v)| v.clone()).collect();
58 Ok(entries)
59 }
60}
61
62impl RaftLogStorage<C> for LogStore {
63 type LogReader = Self;
64
65 async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<u64>> {
66 let inner = self.inner.lock().await;
67 let last_log_id = inner
68 .log
69 .last_key_value()
70 .map(|(_, e)| e.log_id)
71 .or(inner.last_purged);
72 Ok(LogState {
73 last_purged_log_id: inner.last_purged,
74 last_log_id,
75 })
76 }
77
78 async fn get_log_reader(&mut self) -> Self::LogReader {
79 self.clone()
80 }
81
82 async fn save_vote(&mut self, vote: &Vote<u64>) -> Result<(), StorageError<u64>> {
83 let mut inner = self.inner.lock().await;
84 inner.vote = Some(*vote);
85 Ok(())
86 }
87
88 async fn read_vote(&mut self) -> Result<Option<Vote<u64>>, StorageError<u64>> {
89 let inner = self.inner.lock().await;
90 Ok(inner.vote)
91 }
92
93 async fn append<I>(
94 &mut self,
95 entries: I,
96 callback: LogFlushed<C>,
97 ) -> Result<(), StorageError<u64>>
98 where
99 I: IntoIterator<Item = Entry<C>> + Send,
100 I::IntoIter: Send,
101 {
102 let mut inner = self.inner.lock().await;
103 for entry in entries {
104 let idx = entry.log_id.index;
105 inner.log.insert(idx, entry);
106 }
107 callback.log_io_completed(Ok(()));
109 Ok(())
110 }
111
112 async fn truncate(&mut self, log_id: LogId<u64>) -> Result<(), StorageError<u64>> {
113 let mut inner = self.inner.lock().await;
114 let to_remove: Vec<u64> = inner.log.range(log_id.index..).map(|(k, _)| *k).collect();
115 for k in to_remove {
116 inner.log.remove(&k);
117 }
118 Ok(())
119 }
120
121 async fn purge(&mut self, log_id: LogId<u64>) -> Result<(), StorageError<u64>> {
122 let mut inner = self.inner.lock().await;
123 let to_remove: Vec<u64> = inner.log.range(..=log_id.index).map(|(k, _)| *k).collect();
124 for k in to_remove {
125 inner.log.remove(&k);
126 }
127 inner.last_purged = Some(log_id);
128 Ok(())
129 }
130}