1use crate::{LogStore, LogStoreReader, reader, writer};
2use bincode::config::{Configuration, Fixint, LittleEndian};
3use bincode::error::{DecodeError, EncodeError};
4use openraft::storage::{LogFlushed, RaftLogStorage};
5use openraft::{
6 AnyError, ErrorSubject, ErrorVerb, LogId, OptionalSend, RaftLogId, RaftLogReader,
7 RaftTypeConfig, StorageError, StorageIOError, Vote,
8};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use std::collections::Bound;
12use std::fmt::Debug;
13use std::ops::RangeBounds;
14use tokio::sync::oneshot;
15use tracing::debug;
16
17const BINCODE_CONFIG: Configuration<LittleEndian, Fixint> = bincode::config::legacy();
18
19#[inline(always)]
20pub fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, EncodeError> {
21 bincode::serde::encode_to_vec(value, BINCODE_CONFIG)
24}
25
26#[inline(always)]
27pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, DecodeError> {
28 bincode::serde::decode_from_slice::<T, _>(bytes, BINCODE_CONFIG).map(|(res, _)| res)
29}
30
31impl<T> RaftLogReader<T> for LogStore<T>
32where
33 T: RaftTypeConfig,
34{
35 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
36 &mut self,
37 range: RB,
38 ) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
39 try_get_log_entries::<T, _>(&self.reader, range).await
40 }
41}
42
43impl<T> RaftLogReader<T> for LogStoreReader<T>
44where
45 T: RaftTypeConfig,
46{
47 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
48 &mut self,
49 range: RB,
50 ) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
51 try_get_log_entries::<T, _>(&self.tx, range).await
52 }
53}
54
55#[tracing::instrument(skip_all)]
56#[inline(always)]
57async fn try_get_log_entries<
58 T: RaftTypeConfig,
59 RB: RangeBounds<u64> + Clone + Debug + OptionalSend,
60>(
61 tx: &flume::Sender<reader::Action>,
62 range: RB,
63) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
64 let from = match range.start_bound() {
65 Bound::Included(i) => *i,
66 Bound::Excluded(i) => *i + 1,
67 Bound::Unbounded => 0,
68 };
69 let until = match range.end_bound() {
70 Bound::Included(i) => *i,
71 Bound::Excluded(i) => *i - 1,
72 Bound::Unbounded => unreachable!(),
73 };
74 debug!("Entering try_get_log_entries() from {from} until {until}");
75
76 let mut res: Vec<T::Entry> = Vec::with_capacity((until - from) as usize + 1);
77
78 let (ack, rx) = flume::bounded(1);
79 tx.send_async(reader::Action::Logs { from, until, ack })
80 .await
81 .expect("LogsReader to always be listening");
82
83 while let Some(data_res) = rx.recv_async().await.unwrap() {
84 let data = data_res.map_err(|err| StorageError::IO {
85 source: StorageIOError::read_logs(&err),
86 })?;
87 let entry = deserialize::<T::Entry>(&data).map_err(|err| StorageError::IO {
88 source: StorageIOError::<T::NodeId>::read_logs(&err),
89 })?;
90 res.push(entry);
91 }
92
93 Ok(res)
94}
95
96impl<T> RaftLogStorage<T> for LogStore<T>
97where
98 T: RaftTypeConfig,
99{
100 type LogReader = LogStoreReader<T>;
101
102 #[tracing::instrument(skip_all)]
103 async fn get_log_state(&mut self) -> Result<openraft::LogState<T>, StorageError<T::NodeId>> {
104 debug!("Entering get_log_state()");
105
106 let (ack, rx) = oneshot::channel();
107 self.reader
108 .send_async(reader::Action::LogState(ack))
109 .await
110 .map_err(|err| {
111 StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
112 })?;
113
114 let log_state = rx.await.unwrap().map_err(|err| {
115 StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
116 })?;
117
118 let last_purged_log_id = if let Some(bytes) = log_state.last_purged_log_id {
119 Some(deserialize(&bytes).map_err(|err| {
120 StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
121 })?)
122 } else {
123 None
124 };
125 let last_log_id = if let Some(bytes) = log_state.last_log {
126 Some(deserialize(&bytes).map_err(|err| {
127 StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
128 })?)
129 } else {
130 None
131 };
132
133 Ok(openraft::LogState {
134 last_purged_log_id,
135 last_log_id,
136 })
137 }
138
139 #[tracing::instrument(level = "debug", skip_all)]
140 async fn get_log_reader(&mut self) -> Self::LogReader {
141 debug!("Entering get_log_reader()");
142
143 self.spawn_reader()
144 .expect("Error spawning additional LogStoreReader")
145 }
146
147 #[tracing::instrument(level = "debug", skip_all)]
148 async fn save_vote(&mut self, vote: &Vote<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
149 debug!("Entering save_vote(): {:?}", vote);
150
151 let (ack, rx) = oneshot::channel();
152 self.writer
153 .send_async(writer::Action::Vote {
154 value: serialize(vote).unwrap(),
155 ack,
156 })
157 .await
158 .expect("Writer to always be running");
159
160 rx.await.unwrap().map_err(|err| StorageError::IO {
161 source: StorageIOError::write_vote(&err),
162 })?;
163
164 Ok(())
165 }
166
167 #[tracing::instrument(level = "debug", skip_all)]
168 async fn read_vote(&mut self) -> Result<Option<Vote<T::NodeId>>, StorageError<T::NodeId>> {
169 debug!("Entering read_vote()");
170
171 let (ack, rx) = oneshot::channel();
172
173 self.reader
174 .send_async(reader::Action::Vote(ack))
175 .await
176 .map_err(|err| StorageError::IO {
177 source: StorageIOError::read_vote(&err),
178 })?;
179
180 let vote = rx
181 .await
182 .unwrap()
183 .map_err(|err| StorageError::IO {
184 source: StorageIOError::read_vote(&err),
185 })?
186 .map(|b| deserialize(&b).unwrap());
187
188 Ok(vote)
189 }
190
191 #[tracing::instrument(level = "debug", skip_all)]
192 async fn append<I>(
193 &mut self,
194 entries: I,
195 callback: LogFlushed<T>,
196 ) -> Result<(), StorageError<T::NodeId>>
197 where
198 I: IntoIterator<Item = T::Entry> + Send,
199 I::IntoIter: Send,
200 {
201 debug!("Entering append()");
202
203 let (tx, rx) = flume::bounded(1);
204 let (ack, ack_rx) = oneshot::channel();
205
206 let callback = Box::new(move || callback.log_io_completed(Ok(())));
207 self.writer
208 .send_async(writer::Action::Append { rx, callback, ack })
209 .await
210 .map_err(|err| StorageIOError::write_logs(&err))?;
211
212 for entry in entries {
213 let data = serialize(&entry).unwrap();
214 tx.send_async(Some((entry.get_log_id().index, data)))
215 .await
216 .map_err(|err| StorageIOError::write_logs(&err))?;
217 }
218 tx.send_async(None)
219 .await
220 .map_err(|err| StorageIOError::write_logs(&err))?;
221
222 ack_rx
223 .await
224 .unwrap()
225 .map_err(|err| StorageIOError::write_logs(&err))?;
226
227 Ok(())
228 }
229
230 #[tracing::instrument(level = "debug", skip_all)]
231 async fn truncate(&mut self, log_id: LogId<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
232 debug!("truncate(): [{:?}, +oo)", log_id);
233
234 let (ack, rx) = oneshot::channel();
235 self.writer
236 .send_async(writer::Action::Remove {
237 from: log_id.index,
238 until: u64::MAX,
239 last_log: None,
240 ack,
241 })
242 .await
243 .map_err(|err| StorageError::IO {
244 source: StorageIOError::write_logs(&err),
245 })?;
246
247 rx.await.unwrap().map_err(|err| StorageError::IO {
248 source: StorageIOError::write_logs(&err),
249 })
250 }
251
252 #[tracing::instrument(level = "debug", skip_all)]
253 async fn purge(&mut self, log_id: LogId<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
254 debug!("purge(): [0, {:?}]", log_id);
255
256 let last_log = Some(serialize(&log_id).unwrap());
257 let (ack, rx) = oneshot::channel();
258 self.writer
259 .send_async(writer::Action::Remove {
260 from: 0,
261 until: log_id.index,
262 last_log,
263 ack,
264 })
265 .await
266 .map_err(|err| StorageError::IO {
267 source: StorageIOError::write_logs(&err),
268 })?;
269
270 rx.await.unwrap().map_err(|err| StorageError::IO {
271 source: StorageIOError::write_logs(&err),
272 })
273 }
274}