Skip to main content

asteroid_mq/protocol/node/raft/
log_storage.rs

1use super::TypeConfig;
2use openraft::{storage::RaftLogStorage, LogId, RaftLogReader, RaftTypeConfig, Vote};
3use openraft::{LogState, RaftLogId, StorageError};
4use std::fmt::Debug;
5use std::ops::RangeBounds;
6use std::{collections::BTreeMap, sync::Arc};
7
8#[derive(Clone)]
9pub struct LogStorage<C: RaftTypeConfig = TypeConfig> {
10    inner: Arc<tokio::sync::Mutex<LogStorageInner<C>>>,
11}
12
13impl Default for LogStorage<TypeConfig> {
14    fn default() -> Self {
15        Self {
16            inner: Arc::new(tokio::sync::Mutex::new(LogStorageInner::default())),
17        }
18    }
19}
20#[derive(Debug, Default)]
21pub struct LogStorageInner<C: RaftTypeConfig> {
22    /// The last purged log id.
23    last_purged_log_id: Option<LogId<C::NodeId>>,
24
25    /// The Raft log.
26    log: BTreeMap<u64, C::Entry>,
27
28    /// The commit log id.
29    committed: Option<LogId<C::NodeId>>,
30
31    /// The current granted vote.
32    vote: Option<Vote<C::NodeId>>,
33}
34
35impl<C: RaftTypeConfig> LogStorageInner<C> {
36    pub fn new() -> Self {
37        Self {
38            last_purged_log_id: None,
39            log: BTreeMap::new(),
40            committed: None,
41            vote: None,
42        }
43    }
44    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
45        &mut self,
46        range: RB,
47    ) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>
48    where
49        C::Entry: Clone,
50    {
51        let response = self
52            .log
53            .range(range.clone())
54            .map(|(_, val)| val.clone())
55            .collect::<Vec<_>>();
56        Ok(response)
57    }
58
59    async fn get_log_state(&self) -> Result<openraft::LogState<C>, StorageError<C::NodeId>> {
60        let last = self
61            .log
62            .iter()
63            .next_back()
64            .map(|(_, ent)| ent.get_log_id().clone());
65
66        let last_purged = self.last_purged_log_id.clone();
67
68        let last = match last {
69            None => last_purged.clone(),
70            Some(x) => Some(x),
71        };
72
73        Ok(LogState {
74            last_purged_log_id: last_purged,
75            last_log_id: last,
76        })
77    }
78
79    async fn save_committed(
80        &mut self,
81        committed: Option<LogId<C::NodeId>>,
82    ) -> Result<(), StorageError<C::NodeId>> {
83        self.committed = committed;
84        Ok(())
85    }
86    async fn read_committed(
87        &mut self,
88    ) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
89        Ok(self.committed.clone())
90    }
91
92    async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
93        self.vote = Some(vote.clone());
94        Ok(())
95    }
96
97    async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
98        Ok(self.vote.clone())
99    }
100
101    async fn append<I>(
102        &mut self,
103        entries: I,
104        callback: openraft::storage::LogFlushed<C>,
105    ) -> Result<(), StorageError<C::NodeId>>
106    where
107        I: IntoIterator<Item = C::Entry>,
108    {
109        // Simple implementation that calls the flush-before-return `append_to_log`.
110        for entry in entries {
111            self.log.insert(entry.get_log_id().index, entry);
112        }
113        callback.log_io_completed(Ok(()));
114
115        Ok(())
116    }
117
118    async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
119        let keys = self
120            .log
121            .range(log_id.index..)
122            .map(|(k, _v)| *k)
123            .collect::<Vec<_>>();
124        for key in keys {
125            self.log.remove(&key);
126        }
127
128        Ok(())
129    }
130
131    async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
132        {
133            let ld = &mut self.last_purged_log_id;
134            assert!(*ld <= Some(log_id.clone()));
135            *ld = Some(log_id.clone());
136        }
137
138        {
139            let keys = self
140                .log
141                .range(..=log_id.index)
142                .map(|(k, _v)| *k)
143                .collect::<Vec<_>>();
144            for key in keys {
145                self.log.remove(&key);
146            }
147        }
148
149        Ok(())
150    }
151}
152
153impl RaftLogReader<TypeConfig> for LogStorage<TypeConfig> {
154    async fn try_get_log_entries<
155        RB: std::ops::RangeBounds<u64> + Clone + std::fmt::Debug + openraft::OptionalSend,
156    >(
157        &mut self,
158        range: RB,
159    ) -> Result<
160        Vec<<TypeConfig as openraft::RaftTypeConfig>::Entry>,
161        openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
162    > {
163        self.inner.lock().await.try_get_log_entries(range).await
164    }
165}
166
167impl RaftLogStorage<TypeConfig> for LogStorage<TypeConfig> {
168    type LogReader = Self;
169    async fn append<I>(
170        &mut self,
171        entries: I,
172        callback: openraft::storage::LogFlushed<TypeConfig>,
173    ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>
174    where
175        I: IntoIterator<Item = <TypeConfig as openraft::RaftTypeConfig>::Entry>
176            + openraft::OptionalSend,
177        I::IntoIter: openraft::OptionalSend,
178    {
179        self.inner.lock().await.append(entries, callback).await
180    }
181    async fn get_log_reader(&mut self) -> Self::LogReader {
182        self.clone()
183    }
184    async fn get_log_state(
185        &mut self,
186    ) -> Result<
187        openraft::LogState<TypeConfig>,
188        openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
189    > {
190        self.inner.lock().await.get_log_state().await
191    }
192    async fn purge(
193        &mut self,
194        log_id: openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
195    ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
196        self.inner.lock().await.purge(log_id).await
197    }
198    async fn read_committed(
199        &mut self,
200    ) -> Result<
201        Option<openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
202        openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
203    > {
204        self.inner.lock().await.read_committed().await
205    }
206    async fn save_committed(
207        &mut self,
208        committed: Option<openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
209    ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
210        self.inner.lock().await.save_committed(committed).await
211    }
212    async fn read_vote(
213        &mut self,
214    ) -> Result<
215        Option<openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>>,
216        openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
217    > {
218        self.inner.lock().await.read_vote().await
219    }
220    async fn save_vote(
221        &mut self,
222        vote: &openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
223    ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
224        self.inner.lock().await.save_vote(vote).await
225    }
226    async fn truncate(
227        &mut self,
228        log_id: openraft::LogId<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
229    ) -> Result<(), openraft::StorageError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>> {
230        self.inner.lock().await.truncate(log_id).await
231    }
232}