Skip to main content

hiqlite_wal/
log_store_impl.rs

1use crate::{LogStore, LogStoreReader, reader, writer};
2use bincode::config::{Configuration, Fixint, LittleEndian};
3use bincode::error::{DecodeError, EncodeError};
4use openraft::storage::{LogFlushed, RaftLogStorage};
5use openraft::{
6    AnyError, ErrorSubject, ErrorVerb, LogId, OptionalSend, RaftLogId, RaftLogReader,
7    RaftTypeConfig, StorageError, StorageIOError, Vote,
8};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use std::collections::Bound;
12use std::fmt::Debug;
13use std::ops::RangeBounds;
14use tokio::sync::oneshot;
15use tracing::debug;
16
17const BINCODE_CONFIG: Configuration<LittleEndian, Fixint> = bincode::config::legacy();
18
19#[inline(always)]
20pub fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, EncodeError> {
21    // We are using the legacy config on purpose here. It uses fixed-width integer fields, which
22    // uses a bit more space, but is faster.
23    bincode::serde::encode_to_vec(value, BINCODE_CONFIG)
24}
25
26#[inline(always)]
27pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, DecodeError> {
28    bincode::serde::decode_from_slice::<T, _>(bytes, BINCODE_CONFIG).map(|(res, _)| res)
29}
30
31impl<T> RaftLogReader<T> for LogStore<T>
32where
33    T: RaftTypeConfig,
34{
35    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
36        &mut self,
37        range: RB,
38    ) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
39        try_get_log_entries::<T, _>(&self.reader, range).await
40    }
41}
42
43impl<T> RaftLogReader<T> for LogStoreReader<T>
44where
45    T: RaftTypeConfig,
46{
47    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
48        &mut self,
49        range: RB,
50    ) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
51        try_get_log_entries::<T, _>(&self.tx, range).await
52    }
53}
54
55#[tracing::instrument(skip_all)]
56#[inline(always)]
57async fn try_get_log_entries<
58    T: RaftTypeConfig,
59    RB: RangeBounds<u64> + Clone + Debug + OptionalSend,
60>(
61    tx: &flume::Sender<reader::Action>,
62    range: RB,
63) -> Result<Vec<T::Entry>, StorageError<T::NodeId>> {
64    let from = match range.start_bound() {
65        Bound::Included(i) => *i,
66        Bound::Excluded(i) => *i + 1,
67        Bound::Unbounded => 0,
68    };
69    let until = match range.end_bound() {
70        Bound::Included(i) => *i,
71        Bound::Excluded(i) => *i - 1,
72        Bound::Unbounded => unreachable!(),
73    };
74    debug!("Entering try_get_log_entries() from {from} until {until}");
75
76    let mut res: Vec<T::Entry> = Vec::with_capacity((until - from) as usize + 1);
77
78    let (ack, rx) = flume::bounded(1);
79    tx.send_async(reader::Action::Logs { from, until, ack })
80        .await
81        .expect("LogsReader to always be listening");
82
83    while let Some(data_res) = rx.recv_async().await.unwrap() {
84        let data = data_res.map_err(|err| StorageError::IO {
85            source: StorageIOError::read_logs(&err),
86        })?;
87        let entry = deserialize::<T::Entry>(&data).map_err(|err| StorageError::IO {
88            source: StorageIOError::<T::NodeId>::read_logs(&err),
89        })?;
90        res.push(entry);
91    }
92
93    Ok(res)
94}
95
96impl<T> RaftLogStorage<T> for LogStore<T>
97where
98    T: RaftTypeConfig,
99{
100    type LogReader = LogStoreReader<T>;
101
102    #[tracing::instrument(skip_all)]
103    async fn get_log_state(&mut self) -> Result<openraft::LogState<T>, StorageError<T::NodeId>> {
104        debug!("Entering get_log_state()");
105
106        let (ack, rx) = oneshot::channel();
107        self.reader
108            .send_async(reader::Action::LogState(ack))
109            .await
110            .map_err(|err| {
111                StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
112            })?;
113
114        let log_state = rx.await.unwrap().map_err(|err| {
115            StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
116        })?;
117
118        let last_purged_log_id = if let Some(bytes) = log_state.last_purged_log_id {
119            Some(deserialize(&bytes).map_err(|err| {
120                StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
121            })?)
122        } else {
123            None
124        };
125        let last_log_id = if let Some(bytes) = log_state.last_log {
126            Some(deserialize(&bytes).map_err(|err| {
127                StorageIOError::new(ErrorSubject::Logs, ErrorVerb::Read, AnyError::new(&err))
128            })?)
129        } else {
130            None
131        };
132
133        Ok(openraft::LogState {
134            last_purged_log_id,
135            last_log_id,
136        })
137    }
138
139    #[tracing::instrument(level = "debug", skip_all)]
140    async fn get_log_reader(&mut self) -> Self::LogReader {
141        debug!("Entering get_log_reader()");
142
143        self.spawn_reader()
144            .expect("Error spawning additional LogStoreReader")
145    }
146
147    #[tracing::instrument(level = "debug", skip_all)]
148    async fn save_vote(&mut self, vote: &Vote<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
149        debug!("Entering save_vote(): {:?}", vote);
150
151        let (ack, rx) = oneshot::channel();
152        self.writer
153            .send_async(writer::Action::Vote {
154                value: serialize(vote).unwrap(),
155                ack,
156            })
157            .await
158            .expect("Writer to always be running");
159
160        rx.await.unwrap().map_err(|err| StorageError::IO {
161            source: StorageIOError::write_vote(&err),
162        })?;
163
164        Ok(())
165    }
166
167    #[tracing::instrument(level = "debug", skip_all)]
168    async fn read_vote(&mut self) -> Result<Option<Vote<T::NodeId>>, StorageError<T::NodeId>> {
169        debug!("Entering read_vote()");
170
171        let (ack, rx) = oneshot::channel();
172
173        self.reader
174            .send_async(reader::Action::Vote(ack))
175            .await
176            .map_err(|err| StorageError::IO {
177                source: StorageIOError::read_vote(&err),
178            })?;
179
180        let vote = rx
181            .await
182            .unwrap()
183            .map_err(|err| StorageError::IO {
184                source: StorageIOError::read_vote(&err),
185            })?
186            .map(|b| deserialize(&b).unwrap());
187
188        Ok(vote)
189    }
190
191    #[tracing::instrument(level = "debug", skip_all)]
192    async fn append<I>(
193        &mut self,
194        entries: I,
195        callback: LogFlushed<T>,
196    ) -> Result<(), StorageError<T::NodeId>>
197    where
198        I: IntoIterator<Item = T::Entry> + Send,
199        I::IntoIter: Send,
200    {
201        debug!("Entering append()");
202
203        let (tx, rx) = flume::bounded(1);
204        let (ack, ack_rx) = oneshot::channel();
205
206        let callback = Box::new(move || callback.log_io_completed(Ok(())));
207        self.writer
208            .send_async(writer::Action::Append { rx, callback, ack })
209            .await
210            .map_err(|err| StorageIOError::write_logs(&err))?;
211
212        for entry in entries {
213            let data = serialize(&entry).unwrap();
214            tx.send_async(Some((entry.get_log_id().index, data)))
215                .await
216                .map_err(|err| StorageIOError::write_logs(&err))?;
217        }
218        tx.send_async(None)
219            .await
220            .map_err(|err| StorageIOError::write_logs(&err))?;
221
222        ack_rx
223            .await
224            .unwrap()
225            .map_err(|err| StorageIOError::write_logs(&err))?;
226
227        Ok(())
228    }
229
230    #[tracing::instrument(level = "debug", skip_all)]
231    async fn truncate(&mut self, log_id: LogId<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
232        debug!("truncate(): [{:?}, +oo)", log_id);
233
234        let (ack, rx) = oneshot::channel();
235        self.writer
236            .send_async(writer::Action::Remove {
237                from: log_id.index,
238                until: u64::MAX,
239                last_log: None,
240                ack,
241            })
242            .await
243            .map_err(|err| StorageError::IO {
244                source: StorageIOError::write_logs(&err),
245            })?;
246
247        rx.await.unwrap().map_err(|err| StorageError::IO {
248            source: StorageIOError::write_logs(&err),
249        })
250    }
251
252    #[tracing::instrument(level = "debug", skip_all)]
253    async fn purge(&mut self, log_id: LogId<T::NodeId>) -> Result<(), StorageError<T::NodeId>> {
254        debug!("purge(): [0, {:?}]", log_id);
255
256        let last_log = Some(serialize(&log_id).unwrap());
257        let (ack, rx) = oneshot::channel();
258        self.writer
259            .send_async(writer::Action::Remove {
260                from: 0,
261                until: log_id.index,
262                last_log,
263                ack,
264            })
265            .await
266            .map_err(|err| StorageError::IO {
267                source: StorageIOError::write_logs(&err),
268            })?;
269
270        rx.await.unwrap().map_err(|err| StorageError::IO {
271            source: StorageIOError::write_logs(&err),
272        })
273    }
274}