1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::ops::{Bound, RangeBounds};
4use std::sync::{Arc, Mutex};
5
6use bytes::Bytes;
7use openraft::storage::{LogFlushed, RaftLogReader, RaftLogStorage};
8use openraft::{Entry, LogId, LogState, OptionalSend, StorageError, Vote};
9use tokio::sync::{mpsc, oneshot};
10
11use super::{dec, enc, sread, swrite};
12use crate::raft::TypeConfig;
13use crate::storage::Storage;
14use crate::types::NodeId;
15use crate::RedbStore;
16
/// Metadata key under which the encoded last-purged `LogId` is stored.
const KEY_PURGED: &str = "log:purged";
/// Soft cap on batch size: the flusher stops gathering further jobs into a batch
/// once it holds at least this many entries.
const FLUSH_MAX_ENTRIES: usize = 512;
/// Soft cap on batch size: the flusher stops gathering further jobs into a batch
/// once the encoded entries total at least this many bytes (8 MiB).
const FLUSH_MAX_BYTES: usize = 8 << 20;
20
/// Unit of work sent from `LogStore` handles to the background flusher thread.
enum FlushJob {
    /// Encoded entries as `(log index, bytes)` pairs to write to the database,
    /// plus the callback the flusher notifies once the write succeeded or failed.
    Append(Vec<(u64, Bytes)>, LogFlushed<TypeConfig>),
    /// Sent by `LogStore::barrier`; the flusher signals it only after every job
    /// queued ahead of it has been processed.
    Barrier(oneshot::Sender<()>),
}
25
/// State shared between every `LogStore` handle and the flusher thread.
struct Shared<S> {
    /// Backing storage that batches of entries are written to.
    db: Arc<S>,
    /// Encoded entries accepted by `append` but not yet removed by the flusher,
    /// keyed by log index. The flusher removes an index only after its batch was
    /// written successfully; readers merge this map with `db` so freshly appended
    /// entries are visible before they are flushed.
    pending: Mutex<BTreeMap<u64, Bytes>>,
}
30
/// Raft log storage whose `append` returns after queuing: entries are kept in
/// memory (`Shared::pending`) and written to the database in batches by a
/// dedicated flusher thread, which reports completion through the
/// `LogFlushed` callback. Cloning yields another handle to the same store.
pub struct LogStore<S = RedbStore> {
    shared: Arc<Shared<S>>,
    /// Sending side of the queue consumed by the flusher thread.
    jobs: mpsc::UnboundedSender<FlushJob>,
}
35
36impl<S> Clone for LogStore<S> {
37 fn clone(&self) -> Self {
38 Self {
39 shared: Arc::clone(&self.shared),
40 jobs: self.jobs.clone(),
41 }
42 }
43}
44
45impl<S: Storage> LogStore<S> {
46 pub fn new(db: Arc<S>) -> Self {
47 let shared = Arc::new(Shared {
48 db,
49 pending: Mutex::new(BTreeMap::new()),
50 });
51 let (jobs, rx) = mpsc::unbounded_channel();
52 let flusher = Arc::clone(&shared);
53 std::thread::spawn(move || run_flusher(flusher, rx));
54 Self { shared, jobs }
55 }
56
57 async fn barrier(&self) {
58 let (tx, rx) = oneshot::channel();
59 if self.jobs.send(FlushJob::Barrier(tx)).is_ok() {
60 let _ = rx.await;
61 }
62 }
63
64 fn pending_last(&self) -> Option<u64> {
65 self.shared
66 .pending
67 .lock()
68 .unwrap()
69 .keys()
70 .next_back()
71 .copied()
72 }
73}
74
75fn run_flusher<S: Storage>(shared: Arc<Shared<S>>, mut rx: mpsc::UnboundedReceiver<FlushJob>) {
76 while let Some(first) = rx.blocking_recv() {
77 let mut batch: Vec<(u64, Bytes)> = Vec::new();
78 let mut callbacks = Vec::new();
79 let mut barriers = Vec::new();
80 let mut bytes = 0usize;
81 let mut job = Some(first);
82 loop {
83 match job {
84 Some(FlushJob::Append(entries, callback)) => {
85 bytes += entries.iter().map(|(_, b)| b.len()).sum::<usize>();
86 batch.extend(entries);
87 callbacks.push(callback);
88 }
89 Some(FlushJob::Barrier(done)) => {
90 barriers.push(done);
91 break;
92 }
93 None => break,
94 }
95 if batch.len() >= FLUSH_MAX_ENTRIES || bytes >= FLUSH_MAX_BYTES {
96 break;
97 }
98 job = rx.try_recv().ok();
99 }
100
101 let result = if batch.is_empty() {
102 Ok(())
103 } else {
104 shared.db.append_log(&batch)
105 };
106 match result {
107 Ok(()) => {
108 if !batch.is_empty() {
109 let mut pending = shared.pending.lock().unwrap();
110 for (index, _) in &batch {
111 pending.remove(index);
112 }
113 }
114 for callback in callbacks {
115 callback.log_io_completed(Ok(()));
116 }
117 }
118 Err(e) => {
119 let msg = e.to_string();
120 for callback in callbacks {
121 callback.log_io_completed(Err(std::io::Error::other(msg.clone())));
122 }
123 }
124 }
125 for done in barriers {
126 let _ = done.send(());
127 }
128 }
129}
130
131impl<S: Storage> RaftLogReader<TypeConfig> for LogStore<S> {
132 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
133 &mut self,
134 range: RB,
135 ) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
136 let start = match range.start_bound() {
137 Bound::Included(x) => *x,
138 Bound::Excluded(x) => *x + 1,
139 Bound::Unbounded => 0,
140 };
141 let end = match range.end_bound() {
142 Bound::Included(x) => *x + 1,
143 Bound::Excluded(x) => *x,
144 Bound::Unbounded => {
145 let db_last = self.shared.db.last_log_index().map_err(sread)?;
146 db_last
147 .into_iter()
148 .chain(self.pending_last())
149 .max()
150 .map(|i| i + 1)
151 .unwrap_or(0)
152 }
153 };
154
155 let mut merged: BTreeMap<u64, Bytes> = BTreeMap::new();
156 for (index, bytes) in self.shared.db.read_log(start, end).map_err(sread)? {
157 merged.insert(index, Bytes::from(bytes));
158 }
159 {
160 let pending = self.shared.pending.lock().unwrap();
161 for (index, bytes) in pending.range(start..end) {
162 merged.insert(*index, bytes.clone());
163 }
164 }
165 let mut out = Vec::with_capacity(merged.len());
166 for bytes in merged.values() {
167 out.push(dec::<Entry<TypeConfig>>(bytes)?);
168 }
169 Ok(out)
170 }
171}
172
173impl<S: Storage> RaftLogStorage<TypeConfig> for LogStore<S> {
174 type LogReader = Self;
175
176 async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
177 let last_purged: Option<LogId<NodeId>> = match self.shared.db.get(KEY_PURGED).map_err(sread)? {
178 Some(b) => Some(dec(&b)?),
179 None => None,
180 };
181
182 let db_last = self.shared.db.last_log_index().map_err(sread)?;
183 let last_index = db_last.into_iter().chain(self.pending_last()).max();
184 let last_log_id = match last_index {
185 Some(index) => {
186 let bytes = {
187 let pending = self.shared.pending.lock().unwrap();
188 pending.get(&index).cloned()
189 };
190 let bytes = match bytes {
191 Some(b) => Some(b),
192 None => self
193 .shared
194 .db
195 .read_log(index, index + 1)
196 .map_err(sread)?
197 .into_iter()
198 .next()
199 .map(|(_, b)| Bytes::from(b)),
200 };
201 match bytes {
202 Some(b) => Some(dec::<Entry<TypeConfig>>(&b)?.log_id),
203 None => last_purged,
204 }
205 }
206 None => last_purged,
207 };
208
209 Ok(LogState {
210 last_purged_log_id: last_purged,
211 last_log_id,
212 })
213 }
214
215 async fn get_log_reader(&mut self) -> Self::LogReader {
216 self.clone()
217 }
218
219 async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
220 let bytes = enc(vote)?;
221 self.shared.db.save_vote(&bytes).map_err(swrite)
222 }
223
224 async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
225 match self.shared.db.read_vote().map_err(sread)? {
226 Some(b) => Ok(Some(dec(&b)?)),
227 None => Ok(None),
228 }
229 }
230
231 async fn save_committed(
232 &mut self,
233 committed: Option<LogId<NodeId>>,
234 ) -> Result<(), StorageError<NodeId>> {
235 let bytes = enc(&committed)?;
236 self.shared.db.save_committed(&bytes).map_err(swrite)
237 }
238
239 async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
240 match self.shared.db.read_committed().map_err(sread)? {
241 Some(b) => dec::<Option<LogId<NodeId>>>(&b),
242 None => Ok(None),
243 }
244 }
245
246 async fn append<I>(
247 &mut self,
248 entries: I,
249 callback: LogFlushed<TypeConfig>,
250 ) -> Result<(), StorageError<NodeId>>
251 where
252 I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend,
253 I::IntoIter: OptionalSend,
254 {
255 let mut batch = Vec::new();
256 for entry in entries {
257 batch.push((entry.log_id.index, Bytes::from(enc(&entry)?)));
258 }
259 {
260 let mut pending = self.shared.pending.lock().unwrap();
261 for (index, bytes) in &batch {
262 pending.insert(*index, bytes.clone());
263 }
264 }
265 if self.jobs.send(FlushJob::Append(batch, callback)).is_err() {
266 return Err(swrite("log flusher thread is gone"));
267 }
268 Ok(())
269 }
270
271 async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
272 self.barrier().await;
273 self.shared
274 .pending
275 .lock()
276 .unwrap()
277 .split_off(&log_id.index);
278 self.shared
279 .db
280 .truncate_log_from(log_id.index)
281 .map_err(swrite)
282 }
283
284 async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
285 let bytes = enc(&log_id)?;
286 self.shared.db.put(KEY_PURGED, &bytes).map_err(swrite)?;
287 self.shared.db.purge_log_upto(log_id.index).map_err(swrite)
288 }
289}