Skip to main content

hermesmq_core/engine/
log_store.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::ops::{Bound, RangeBounds};
4use std::sync::{Arc, Mutex};
5
6use bytes::Bytes;
7use openraft::storage::{LogFlushed, RaftLogReader, RaftLogStorage};
8use openraft::{Entry, LogId, LogState, OptionalSend, StorageError, Vote};
9use tokio::sync::{mpsc, oneshot};
10
11use super::{dec, enc, sread, swrite};
12use crate::raft::TypeConfig;
13use crate::storage::Storage;
14use crate::types::NodeId;
15use crate::RedbStore;
16
/// Key under which `purge` stores the encoded id of the last purged log entry.
const KEY_PURGED: &str = "log:purged";
/// Once a flush batch holds at least this many entries, the flusher stops pulling more jobs
/// into it. The check runs after a whole job is added, so a batch may exceed this value.
const FLUSH_MAX_ENTRIES: usize = 512;
/// Once a flush batch holds at least this many payload bytes (8 MiB), the flusher stops
/// pulling more jobs into it. Like the entry limit, this is checked after a job is added.
const FLUSH_MAX_BYTES: usize = 8 * 1024 * 1024;
20
/// Unit of work sent from a `LogStore` handle to the background flusher thread.
enum FlushJob {
    /// Encoded log entries as `(index, bytes)` pairs, plus the callback that is completed
    /// once the flusher has attempted to write them.
    Append(Vec<(u64, Bytes)>, LogFlushed<TypeConfig>),
    /// Marker job: the sender is signalled after every job queued before it has been
    /// processed by the flusher.
    Barrier(oneshot::Sender<()>),
}
25
/// State shared between all `LogStore` handles and the flusher thread.
struct Shared<S> {
    /// Backing store that the flusher writes to and readers read from.
    db: Arc<S>,
    /// Encoded entries handed to `append`, keyed by log index. The flusher removes an entry
    /// after the batch containing it has been written successfully.
    pending: Mutex<BTreeMap<u64, Bytes>>,
}
30
/// Raft log store whose appends are written by a background flusher thread.
///
/// Cloning yields another handle onto the same shared state and job queue.
pub struct LogStore<S = RedbStore> {
    /// Backing store plus the map of not-yet-flushed entries.
    shared: Arc<Shared<S>>,
    /// Sending side of the flusher's job queue.
    jobs: mpsc::UnboundedSender<FlushJob>,
}
35
36impl<S> Clone for LogStore<S> {
37    fn clone(&self) -> Self {
38        Self {
39            shared: Arc::clone(&self.shared),
40            jobs: self.jobs.clone(),
41        }
42    }
43}
44
45impl<S: Storage> LogStore<S> {
46    pub fn new(db: Arc<S>) -> Self {
47        let shared = Arc::new(Shared {
48            db,
49            pending: Mutex::new(BTreeMap::new()),
50        });
51        let (jobs, rx) = mpsc::unbounded_channel();
52        let flusher = Arc::clone(&shared);
53        std::thread::spawn(move || run_flusher(flusher, rx));
54        Self { shared, jobs }
55    }
56
57    async fn barrier(&self) {
58        let (tx, rx) = oneshot::channel();
59        if self.jobs.send(FlushJob::Barrier(tx)).is_ok() {
60            let _ = rx.await;
61        }
62    }
63
64    fn pending_last(&self) -> Option<u64> {
65        self.shared
66            .pending
67            .lock()
68            .unwrap()
69            .keys()
70            .next_back()
71            .copied()
72    }
73}
74
75fn run_flusher<S: Storage>(shared: Arc<Shared<S>>, mut rx: mpsc::UnboundedReceiver<FlushJob>) {
76    while let Some(first) = rx.blocking_recv() {
77        let mut batch: Vec<(u64, Bytes)> = Vec::new();
78        let mut callbacks = Vec::new();
79        let mut barriers = Vec::new();
80        let mut bytes = 0usize;
81        let mut job = Some(first);
82        loop {
83            match job {
84                Some(FlushJob::Append(entries, callback)) => {
85                    bytes += entries.iter().map(|(_, b)| b.len()).sum::<usize>();
86                    batch.extend(entries);
87                    callbacks.push(callback);
88                }
89                Some(FlushJob::Barrier(done)) => {
90                    barriers.push(done);
91                    break;
92                }
93                None => break,
94            }
95            if batch.len() >= FLUSH_MAX_ENTRIES || bytes >= FLUSH_MAX_BYTES {
96                break;
97            }
98            job = rx.try_recv().ok();
99        }
100
101        let result = if batch.is_empty() {
102            Ok(())
103        } else {
104            shared.db.append_log(&batch)
105        };
106        match result {
107            Ok(()) => {
108                if !batch.is_empty() {
109                    let mut pending = shared.pending.lock().unwrap();
110                    for (index, _) in &batch {
111                        pending.remove(index);
112                    }
113                }
114                for callback in callbacks {
115                    callback.log_io_completed(Ok(()));
116                }
117            }
118            Err(e) => {
119                let msg = e.to_string();
120                for callback in callbacks {
121                    callback.log_io_completed(Err(std::io::Error::other(msg.clone())));
122                }
123            }
124        }
125        for done in barriers {
126            let _ = done.send(());
127        }
128    }
129}
130
131impl<S: Storage> RaftLogReader<TypeConfig> for LogStore<S> {
132    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
133        &mut self,
134        range: RB,
135    ) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
136        let start = match range.start_bound() {
137            Bound::Included(x) => *x,
138            Bound::Excluded(x) => *x + 1,
139            Bound::Unbounded => 0,
140        };
141        let end = match range.end_bound() {
142            Bound::Included(x) => *x + 1,
143            Bound::Excluded(x) => *x,
144            Bound::Unbounded => {
145                let db_last = self.shared.db.last_log_index().map_err(sread)?;
146                db_last
147                    .into_iter()
148                    .chain(self.pending_last())
149                    .max()
150                    .map(|i| i + 1)
151                    .unwrap_or(0)
152            }
153        };
154
155        let mut merged: BTreeMap<u64, Bytes> = BTreeMap::new();
156        for (index, bytes) in self.shared.db.read_log(start, end).map_err(sread)? {
157            merged.insert(index, Bytes::from(bytes));
158        }
159        {
160            let pending = self.shared.pending.lock().unwrap();
161            for (index, bytes) in pending.range(start..end) {
162                merged.insert(*index, bytes.clone());
163            }
164        }
165        let mut out = Vec::with_capacity(merged.len());
166        for bytes in merged.values() {
167            out.push(dec::<Entry<TypeConfig>>(bytes)?);
168        }
169        Ok(out)
170    }
171}
172
173impl<S: Storage> RaftLogStorage<TypeConfig> for LogStore<S> {
174    type LogReader = Self;
175
176    async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
177        let last_purged: Option<LogId<NodeId>> = match self.shared.db.get(KEY_PURGED).map_err(sread)? {
178            Some(b) => Some(dec(&b)?),
179            None => None,
180        };
181
182        let db_last = self.shared.db.last_log_index().map_err(sread)?;
183        let last_index = db_last.into_iter().chain(self.pending_last()).max();
184        let last_log_id = match last_index {
185            Some(index) => {
186                let bytes = {
187                    let pending = self.shared.pending.lock().unwrap();
188                    pending.get(&index).cloned()
189                };
190                let bytes = match bytes {
191                    Some(b) => Some(b),
192                    None => self
193                        .shared
194                        .db
195                        .read_log(index, index + 1)
196                        .map_err(sread)?
197                        .into_iter()
198                        .next()
199                        .map(|(_, b)| Bytes::from(b)),
200                };
201                match bytes {
202                    Some(b) => Some(dec::<Entry<TypeConfig>>(&b)?.log_id),
203                    None => last_purged,
204                }
205            }
206            None => last_purged,
207        };
208
209        Ok(LogState {
210            last_purged_log_id: last_purged,
211            last_log_id,
212        })
213    }
214
215    async fn get_log_reader(&mut self) -> Self::LogReader {
216        self.clone()
217    }
218
219    async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
220        let bytes = enc(vote)?;
221        self.shared.db.save_vote(&bytes).map_err(swrite)
222    }
223
224    async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
225        match self.shared.db.read_vote().map_err(sread)? {
226            Some(b) => Ok(Some(dec(&b)?)),
227            None => Ok(None),
228        }
229    }
230
231    async fn save_committed(
232        &mut self,
233        committed: Option<LogId<NodeId>>,
234    ) -> Result<(), StorageError<NodeId>> {
235        let bytes = enc(&committed)?;
236        self.shared.db.save_committed(&bytes).map_err(swrite)
237    }
238
239    async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
240        match self.shared.db.read_committed().map_err(sread)? {
241            Some(b) => dec::<Option<LogId<NodeId>>>(&b),
242            None => Ok(None),
243        }
244    }
245
246    async fn append<I>(
247        &mut self,
248        entries: I,
249        callback: LogFlushed<TypeConfig>,
250    ) -> Result<(), StorageError<NodeId>>
251    where
252        I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend,
253        I::IntoIter: OptionalSend,
254    {
255        let mut batch = Vec::new();
256        for entry in entries {
257            batch.push((entry.log_id.index, Bytes::from(enc(&entry)?)));
258        }
259        {
260            let mut pending = self.shared.pending.lock().unwrap();
261            for (index, bytes) in &batch {
262                pending.insert(*index, bytes.clone());
263            }
264        }
265        if self.jobs.send(FlushJob::Append(batch, callback)).is_err() {
266            return Err(swrite("log flusher thread is gone"));
267        }
268        Ok(())
269    }
270
271    async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
272        self.barrier().await;
273        self.shared
274            .pending
275            .lock()
276            .unwrap()
277            .split_off(&log_id.index);
278        self.shared
279            .db
280            .truncate_log_from(log_id.index)
281            .map_err(swrite)
282    }
283
284    async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
285        let bytes = enc(&log_id)?;
286        self.shared.db.put(KEY_PURGED, &bytes).map_err(swrite)?;
287        self.shared.db.purge_log_upto(log_id.index).map_err(swrite)
288    }
289}