openraft_sledstore/
lib.rs

1#![deny(unused_crate_dependencies)]
2#![deny(unused_qualifications)]
3
4#[cfg(test)]
5mod test;
6
7use std::collections::BTreeMap;
8use std::error::Error;
9use std::fmt::Debug;
10use std::io::Cursor;
11use std::ops::RangeBounds;
12use std::sync::Arc;
13
14use async_std::sync::RwLock;
15use byteorder::BigEndian;
16use byteorder::ByteOrder;
17use byteorder::ReadBytesExt;
18use openraft::storage::LogState;
19use openraft::storage::Snapshot;
20use openraft::AnyError;
21use openraft::BasicNode;
22use openraft::Entry;
23use openraft::EntryPayload;
24use openraft::LogId;
25use openraft::OptionalSend;
26use openraft::RaftLogId;
27use openraft::RaftLogReader;
28use openraft::RaftSnapshotBuilder;
29use openraft::RaftStorage;
30use openraft::RaftTypeConfig;
31use openraft::SnapshotMeta;
32use openraft::StorageError;
33use openraft::StorageIOError;
34use openraft::StoredMembership;
35use openraft::Vote;
36use serde::Deserialize;
37use serde::Serialize;
38use sled::Transactional;
39
/// Node identifier type used by this store; an alias for `u64`.
pub type ExampleNodeId = u64;
41
// Declares `TypeConfig` through openraft's helper macro. Only the request type `D`
// and the response type `R` are specified here.
openraft::declare_raft_types!(
    /// Declare the type configuration for example K/V store.
    pub TypeConfig:
        D = ExampleRequest,
        R = ExampleResponse,
);
48
/**
 * The requests an application can submit to the raft nodes.
 * In this example the only request is `Set`, which writes a key and its value to the
 * raft-replicated store.
 * Add a variant here for every request that must write data on all nodes.
 */
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ExampleRequest {
    Set { key: String, value: String },
}
59
/**
 * The response produced when an entry is applied to the state machine.
 * In this example it carries an optional value: the value written by
 * `ExampleRequest::Set`, or `None` for entries that carry no application data.
 *
 * TODO: Should we explain how to create multiple `AppDataResponse`?
 *
 */
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExampleResponse {
    pub value: Option<String>,
}
72
/// A snapshot as it is persisted in the `store` tree: its metadata plus the serialized
/// state machine.
#[derive(Serialize, Deserialize, Debug)]
pub struct ExampleSnapshot {
    /// Metadata describing the snapshot.
    pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,

    /// The data of the state machine at the time of this snapshot.
    pub data: Vec<u8>,
}
80
/**
 * A serializable copy of the raft state machine, i.e. of the data that is replicated
 * to each node. It is serialized with `serde`, so every field must implement the
 * serialization traits. In this example both keys and values are `String`, but any
 * serializable type would work.
 */
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SerializableExampleStateMachine {
    /// Id of the last log entry applied to the state machine, if any.
    pub last_applied_log: Option<LogId<ExampleNodeId>>,

    /// The last membership applied to the state machine.
    pub last_membership: StoredMembership<ExampleNodeId, BasicNode>,

    /// Application data.
    pub data: BTreeMap<String, String>,
}
96
/// The state machine backed by sled: a handle to the shared database.
#[derive(Debug, Clone)]
pub struct ExampleStateMachine {
    /// Application data.
    pub db: Arc<sled::Db>,
}
102
103impl From<&ExampleStateMachine> for SerializableExampleStateMachine {
104    fn from(state: &ExampleStateMachine) -> Self {
105        let mut data_tree = BTreeMap::new();
106        for entry_res in data(&state.db).iter() {
107            let entry = entry_res.expect("read db failed");
108
109            let key: &[u8] = &entry.0;
110            let value: &[u8] = &entry.1;
111            data_tree.insert(
112                String::from_utf8(key.to_vec()).expect("invalid key"),
113                String::from_utf8(value.to_vec()).expect("invalid data"),
114            );
115        }
116        Self {
117            last_applied_log: state.get_last_applied_log().expect("last_applied_log"),
118            last_membership: state.get_last_membership().expect("last_membership"),
119            data: data_tree,
120        }
121    }
122}
123
/// Wraps `e` with `StorageIOError::read_state_machine`.
fn read_sm_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_state_machine(&e)
}
/// Wraps `e` with `StorageIOError::write_state_machine`.
fn write_sm_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_state_machine(&e)
}
/// Wraps `e` with `StorageIOError::read`.
///
/// Despite the name this carries no snapshot-specific information; it builds the same
/// error as `read_err`.
fn read_snap_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read(&e)
}
/// Wraps `e` with `StorageIOError::write`.
///
/// Despite the name this carries no snapshot-specific information; it builds the same
/// error as `write_err`.
fn write_snap_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write(&e)
}
/// Wraps `e` with `StorageIOError::read_vote`.
fn read_vote_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_vote(&e)
}
/// Wraps `e` with `StorageIOError::write_vote`.
fn write_vote_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_vote(&e)
}
/// Wraps `e` with `StorageIOError::read_logs`.
fn read_logs_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_logs(&e)
}
/// Wraps `e` with `StorageIOError::write_logs`.
fn write_logs_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_logs(&e)
}
/// Wraps `e` with `StorageIOError::read`.
fn read_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read(&e)
}
/// Wraps `e` with `StorageIOError::write`.
fn write_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write(&e)
}
154
/// Converts `e` into a sled `ConflictableTransactionError::Abort` carrying an `AnyError`,
/// so it can be returned with `?` from inside a sled transaction closure.
fn conflictable_txn_err<E: Error + 'static>(e: E) -> sled::transaction::ConflictableTransactionError<AnyError> {
    sled::transaction::ConflictableTransactionError::Abort(AnyError::new(&e))
}
158
159impl ExampleStateMachine {
160    fn get_last_membership(&self) -> StorageResult<StoredMembership<ExampleNodeId, BasicNode>> {
161        let state_machine = state_machine(&self.db);
162        let ivec = state_machine.get(b"last_membership").map_err(read_sm_err)?;
163
164        let m = if let Some(ivec) = ivec {
165            serde_json::from_slice(&ivec).map_err(read_sm_err)?
166        } else {
167            StoredMembership::default()
168        };
169
170        Ok(m)
171    }
172    async fn set_last_membership(&self, membership: StoredMembership<ExampleNodeId, BasicNode>) -> StorageResult<()> {
173        let value = serde_json::to_vec(&membership).map_err(write_sm_err)?;
174        let state_machine = state_machine(&self.db);
175        state_machine.insert(b"last_membership", value).map_err(write_err)?;
176
177        state_machine.flush_async().await.map_err(write_err)?;
178        Ok(())
179    }
180    fn set_last_membership_tx(
181        &self,
182        tx_state_machine: &sled::transaction::TransactionalTree,
183        membership: StoredMembership<ExampleNodeId, BasicNode>,
184    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
185        let value = serde_json::to_vec(&membership).map_err(conflictable_txn_err)?;
186        tx_state_machine.insert(b"last_membership", value).map_err(conflictable_txn_err)?;
187        Ok(())
188    }
189    fn get_last_applied_log(&self) -> StorageResult<Option<LogId<ExampleNodeId>>> {
190        let state_machine = state_machine(&self.db);
191        let ivec = state_machine.get(b"last_applied_log").map_err(read_logs_err)?;
192        let last_applied = if let Some(ivec) = ivec {
193            serde_json::from_slice(&ivec).map_err(read_sm_err)?
194        } else {
195            None
196        };
197        Ok(last_applied)
198    }
199    async fn set_last_applied_log(&self, log_id: LogId<ExampleNodeId>) -> StorageResult<()> {
200        let value = serde_json::to_vec(&log_id).map_err(write_sm_err)?;
201        let state_machine = state_machine(&self.db);
202        state_machine.insert(b"last_applied_log", value).map_err(read_logs_err)?;
203
204        state_machine.flush_async().await.map_err(read_logs_err)?;
205        Ok(())
206    }
207    fn set_last_applied_log_tx(
208        &self,
209        tx_state_machine: &sled::transaction::TransactionalTree,
210        log_id: LogId<ExampleNodeId>,
211    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
212        let value = serde_json::to_vec(&log_id).map_err(conflictable_txn_err)?;
213        tx_state_machine.insert(b"last_applied_log", value).map_err(conflictable_txn_err)?;
214        Ok(())
215    }
216    async fn from_serializable(sm: SerializableExampleStateMachine, db: Arc<sled::Db>) -> StorageResult<Self> {
217        let data_tree = data(&db);
218        let mut batch = sled::Batch::default();
219        for (key, value) in sm.data {
220            batch.insert(key.as_bytes(), value.as_bytes())
221        }
222        data_tree.apply_batch(batch).map_err(write_sm_err)?;
223        data_tree.flush_async().await.map_err(write_snap_err)?;
224
225        let r = Self { db };
226        if let Some(log_id) = sm.last_applied_log {
227            r.set_last_applied_log(log_id).await?;
228        }
229        r.set_last_membership(sm.last_membership).await?;
230
231        Ok(r)
232    }
233
234    fn new(db: Arc<sled::Db>) -> ExampleStateMachine {
235        Self { db }
236    }
237    fn insert_tx(
238        &self,
239        tx_data_tree: &sled::transaction::TransactionalTree,
240        key: String,
241        value: String,
242    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
243        tx_data_tree.insert(key.as_bytes(), value.as_bytes()).map_err(conflictable_txn_err)?;
244        Ok(())
245    }
246    pub fn get(&self, key: &str) -> StorageResult<Option<String>> {
247        let key = key.as_bytes();
248        let data_tree = data(&self.db);
249        data_tree
250            .get(key)
251            .map(|value| value.map(|value| String::from_utf8(value.to_vec()).expect("invalid data")))
252            .map_err(|e| StorageIOError::read(&e).into())
253    }
254    pub fn get_all(&self) -> StorageResult<Vec<String>> {
255        let data_tree = data(&self.db);
256
257        let data = data_tree
258            .iter()
259            .filter_map(|entry_res| {
260                if let Ok(el) = entry_res {
261                    Some(String::from_utf8(el.1.to_vec()).expect("invalid data"))
262                } else {
263                    None
264                }
265            })
266            .collect::<Vec<_>>();
267        Ok(data)
268    }
269}
270
/// A raft store backed by a sled database.
#[derive(Debug)]
pub struct SledStore {
    /// Handle to the underlying sled database.
    db: Arc<sled::Db>,

    /// The Raft state machine.
    pub state_machine: RwLock<ExampleStateMachine>,
}
278
/// Shorthand for a `Result` whose error is a `StorageError` over `ExampleNodeId`.
type StorageResult<T> = Result<T, StorageError<ExampleNodeId>>;
280
281/// converts an id to a byte vector for storing in the database.
282/// Note that we're using big endian encoding to ensure correct sorting of keys
283/// with notes form: <https://github.com/spacejam/sled#a-note-on-lexicographic-ordering-and-endianness>
284fn id_to_bin(id: u64) -> [u8; 8] {
285    let mut buf: [u8; 8] = [0; 8];
286    BigEndian::write_u64(&mut buf, id);
287    buf
288}
289
290fn bin_to_id(buf: &[u8]) -> u64 {
291    (&buf[0..8]).read_u64::<BigEndian>().unwrap()
292}
293
294impl SledStore {
295    fn get_last_purged_(&self) -> StorageResult<Option<LogId<u64>>> {
296        let store_tree = store(&self.db);
297        let val = store_tree.get(b"last_purged_log_id").map_err(read_err)?;
298
299        if let Some(v) = val {
300            let val = serde_json::from_slice(&v).map_err(read_err)?;
301            Ok(Some(val))
302        } else {
303            Ok(None)
304        }
305    }
306
307    async fn set_last_purged_(&self, log_id: LogId<u64>) -> StorageResult<()> {
308        let store_tree = store(&self.db);
309        let val = serde_json::to_vec(&log_id).unwrap();
310        store_tree.insert(b"last_purged_log_id", val.as_slice()).map_err(write_snap_err)?;
311
312        store_tree.flush_async().await.map_err(write_snap_err)?;
313        Ok(())
314    }
315
316    fn get_snapshot_index_(&self) -> StorageResult<u64> {
317        let store_tree = store(&self.db);
318        let ivec = store_tree.get(b"snapshot_index").map_err(read_snap_err)?;
319
320        if let Some(v) = ivec {
321            let val = serde_json::from_slice(&v).map_err(read_err)?;
322            Ok(val)
323        } else {
324            Ok(0)
325        }
326    }
327
328    async fn set_snapshot_index_(&self, snapshot_index: u64) -> StorageResult<()> {
329        let store_tree = store(&self.db);
330        let val = serde_json::to_vec(&snapshot_index).unwrap();
331        store_tree.insert(b"snapshot_index", val.as_slice()).map_err(write_snap_err)?;
332
333        store_tree.flush_async().await.map_err(write_snap_err)?;
334        Ok(())
335    }
336
337    async fn set_vote_(&self, vote: &Vote<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
338        let store_tree = store(&self.db);
339        let val = serde_json::to_vec(vote).unwrap();
340        store_tree.insert(b"vote", val).map_err(write_vote_err)?;
341
342        store_tree.flush_async().await.map_err(write_vote_err)?;
343
344        Ok(())
345    }
346
347    fn get_vote_(&self) -> StorageResult<Option<Vote<ExampleNodeId>>> {
348        let store_tree = store(&self.db);
349        let val = store_tree.get(b"vote").map_err(read_vote_err)?;
350        let ivec = if let Some(t) = val {
351            t
352        } else {
353            return Ok(None);
354        };
355
356        let v = serde_json::from_slice(&ivec).map_err(read_vote_err)?;
357        Ok(Some(v))
358    }
359
360    fn get_current_snapshot_(&self) -> StorageResult<Option<ExampleSnapshot>> {
361        let store_tree = store(&self.db);
362        let ivec = store_tree.get(b"snapshot").map_err(read_snap_err)?;
363
364        if let Some(ivec) = ivec {
365            let snap = serde_json::from_slice(&ivec).map_err(read_snap_err)?;
366            Ok(Some(snap))
367        } else {
368            Ok(None)
369        }
370    }
371
372    async fn set_current_snapshot_(&self, snap: ExampleSnapshot) -> StorageResult<()> {
373        let store_tree = store(&self.db);
374        let val = serde_json::to_vec(&snap).unwrap();
375        let meta = snap.meta.clone();
376        store_tree.insert(b"snapshot", val.as_slice()).map_err(|e| StorageError::IO {
377            source: StorageIOError::write_snapshot(Some(snap.meta.signature()), &e),
378        })?;
379
380        store_tree
381            .flush_async()
382            .await
383            .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), &e).into())
384            .map(|_| ())
385    }
386}
387
388impl RaftLogReader<TypeConfig> for Arc<SledStore> {
389    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
390        &mut self,
391        range: RB,
392    ) -> StorageResult<Vec<Entry<TypeConfig>>> {
393        let start_bound = range.start_bound();
394        let start = match start_bound {
395            std::ops::Bound::Included(x) => id_to_bin(*x),
396            std::ops::Bound::Excluded(x) => id_to_bin(*x + 1),
397            std::ops::Bound::Unbounded => id_to_bin(0),
398        };
399        let logs_tree = logs(&self.db);
400        let logs = logs_tree
401            .range::<&[u8], _>(start.as_slice()..)
402            .map(|el_res| {
403                let el = el_res.expect("Failed read log entry");
404                let id = el.0;
405                let val = el.1;
406                let entry: StorageResult<Entry<_>> = serde_json::from_slice(&val).map_err(|e| StorageError::IO {
407                    source: StorageIOError::read_logs(&e),
408                });
409                let id = bin_to_id(&id);
410
411                assert_eq!(Ok(id), entry.as_ref().map(|e| e.log_id.index));
412                (id, entry)
413            })
414            .take_while(|(id, _)| range.contains(id))
415            .map(|x| x.1)
416            .collect();
417        logs
418    }
419}
420
421impl RaftSnapshotBuilder<TypeConfig> for Arc<SledStore> {
422    #[tracing::instrument(level = "trace", skip(self))]
423    async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<ExampleNodeId>> {
424        let data;
425        let last_applied_log;
426        let last_membership;
427
428        {
429            // Serialize the data of the state machine.
430            let state_machine = SerializableExampleStateMachine::from(&*self.state_machine.read().await);
431            data = serde_json::to_vec(&state_machine).map_err(|e| StorageIOError::read_state_machine(&e))?;
432
433            last_applied_log = state_machine.last_applied_log;
434            last_membership = state_machine.last_membership;
435        }
436
437        // TODO: we probably want this to be atomic.
438        let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
439        self.set_snapshot_index_(snapshot_idx).await?;
440
441        let snapshot_id = if let Some(last) = last_applied_log {
442            format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
443        } else {
444            format!("--{}", snapshot_idx)
445        };
446
447        let meta = SnapshotMeta {
448            last_log_id: last_applied_log,
449            last_membership,
450            snapshot_id,
451        };
452
453        let snapshot = ExampleSnapshot {
454            meta: meta.clone(),
455            data: data.clone(),
456        };
457
458        self.set_current_snapshot_(snapshot).await?;
459
460        Ok(Snapshot {
461            meta,
462            snapshot: Box::new(Cursor::new(data)),
463        })
464    }
465}
466
467impl RaftStorage<TypeConfig> for Arc<SledStore> {
468    type LogReader = Self;
469    type SnapshotBuilder = Self;
470
471    async fn get_log_state(&mut self) -> StorageResult<LogState<TypeConfig>> {
472        let last_purged = self.get_last_purged_()?;
473
474        let logs_tree = logs(&self.db);
475        let last_ivec_kv = logs_tree.last().map_err(read_logs_err)?;
476        let (_, ent_ivec) = if let Some(last) = last_ivec_kv {
477            last
478        } else {
479            return Ok(LogState {
480                last_purged_log_id: last_purged,
481                last_log_id: last_purged,
482            });
483        };
484
485        let last_ent = serde_json::from_slice::<Entry<TypeConfig>>(&ent_ivec).map_err(read_logs_err)?;
486        let last_log_id = Some(*last_ent.get_log_id());
487
488        let last_log_id = std::cmp::max(last_log_id, last_purged);
489        Ok(LogState {
490            last_purged_log_id: last_purged,
491            last_log_id,
492        })
493    }
494
495    #[tracing::instrument(level = "trace", skip(self))]
496    async fn save_vote(&mut self, vote: &Vote<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
497        self.set_vote_(vote).await
498    }
499
500    async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
501        self.get_vote_()
502    }
503
504    async fn get_log_reader(&mut self) -> Self::LogReader {
505        self.clone()
506    }
507
508    #[tracing::instrument(level = "trace", skip(self, entries))]
509    async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
510    where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
511        let logs_tree = logs(&self.db);
512        let mut batch = sled::Batch::default();
513        for entry in entries {
514            let id = id_to_bin(entry.log_id.index);
515            assert_eq!(bin_to_id(&id), entry.log_id.index);
516            let value = serde_json::to_vec(&entry).map_err(write_logs_err)?;
517            batch.insert(id.as_slice(), value);
518        }
519        logs_tree.apply_batch(batch).map_err(write_logs_err)?;
520
521        logs_tree.flush_async().await.map_err(write_logs_err)?;
522        Ok(())
523    }
524
525    #[tracing::instrument(level = "debug", skip(self))]
526    async fn delete_conflict_logs_since(&mut self, log_id: LogId<ExampleNodeId>) -> StorageResult<()> {
527        tracing::debug!("delete_log: [{:?}, +oo)", log_id);
528
529        let from = id_to_bin(log_id.index);
530        let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff);
531        let logs_tree = logs(&self.db);
532        let entries = logs_tree.range::<&[u8], _>(from.as_slice()..to.as_slice());
533        let mut batch_del = sled::Batch::default();
534        for entry_res in entries {
535            let entry = entry_res.expect("Read db entry failed");
536            batch_del.remove(entry.0);
537        }
538        logs_tree.apply_batch(batch_del).map_err(write_logs_err)?;
539        logs_tree.flush_async().await.map_err(write_logs_err)?;
540        Ok(())
541    }
542
543    #[tracing::instrument(level = "debug", skip(self))]
544    async fn purge_logs_upto(&mut self, log_id: LogId<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
545        tracing::debug!("delete_log: [0, {:?}]", log_id);
546
547        self.set_last_purged_(log_id).await?;
548        let from = id_to_bin(0);
549        let to = id_to_bin(log_id.index);
550        let logs_tree = logs(&self.db);
551        let entries = logs_tree.range::<&[u8], _>(from.as_slice()..=to.as_slice());
552        let mut batch_del = sled::Batch::default();
553        for entry_res in entries {
554            let entry = entry_res.expect("Read db entry failed");
555            batch_del.remove(entry.0);
556        }
557        logs_tree.apply_batch(batch_del).map_err(write_logs_err)?;
558
559        logs_tree.flush_async().await.map_err(write_logs_err)?;
560        Ok(())
561    }
562
563    async fn last_applied_state(
564        &mut self,
565    ) -> Result<(Option<LogId<ExampleNodeId>>, StoredMembership<ExampleNodeId, BasicNode>), StorageError<ExampleNodeId>>
566    {
567        let state_machine = self.state_machine.read().await;
568        Ok((
569            state_machine.get_last_applied_log()?,
570            state_machine.get_last_membership()?,
571        ))
572    }
573
574    #[tracing::instrument(level = "trace", skip(self, entries))]
575    async fn apply_to_state_machine(
576        &mut self,
577        entries: &[Entry<TypeConfig>],
578    ) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
579        let sm = self.state_machine.write().await;
580        let state_machine = state_machine(&self.db);
581        let data_tree = data(&self.db);
582        let trans_res = (&state_machine, &data_tree).transaction(|(tx_state_machine, tx_data_tree)| {
583            let mut res = Vec::with_capacity(entries.len());
584
585            for entry in entries {
586                tracing::debug!(%entry.log_id, "replicate to sm");
587
588                sm.set_last_applied_log_tx(tx_state_machine, entry.log_id)?;
589
590                match entry.payload {
591                    EntryPayload::Blank => res.push(ExampleResponse { value: None }),
592                    EntryPayload::Normal(ref req) => match req {
593                        ExampleRequest::Set { key, value } => {
594                            sm.insert_tx(tx_data_tree, key.clone(), value.clone())?;
595                            res.push(ExampleResponse {
596                                value: Some(value.clone()),
597                            })
598                        }
599                    },
600                    EntryPayload::Membership(ref mem) => {
601                        let membership = StoredMembership::new(Some(entry.log_id), mem.clone());
602                        sm.set_last_membership_tx(tx_state_machine, membership)?;
603                        res.push(ExampleResponse { value: None })
604                    }
605                };
606            }
607            Ok(res)
608        });
609        let result_vec = trans_res.map_err(|e| StorageIOError::write(&e))?;
610
611        self.db.flush_async().await.map_err(|e| StorageIOError::write_logs(&e))?;
612        Ok(result_vec)
613    }
614
615    async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
616        self.clone()
617    }
618
619    #[tracing::instrument(level = "trace", skip(self))]
620    async fn begin_receiving_snapshot(
621        &mut self,
622    ) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<ExampleNodeId>> {
623        Ok(Box::new(Cursor::new(Vec::new())))
624    }
625
626    #[tracing::instrument(level = "trace", skip(self, snapshot))]
627    async fn install_snapshot(
628        &mut self,
629        meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
630        snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
631    ) -> Result<(), StorageError<ExampleNodeId>> {
632        tracing::info!(
633            { snapshot_size = snapshot.get_ref().len() },
634            "decoding snapshot for installation"
635        );
636
637        let new_snapshot = ExampleSnapshot {
638            meta: meta.clone(),
639            data: snapshot.into_inner(),
640        };
641
642        // Update the state machine.
643        {
644            let updated_state_machine: SerializableExampleStateMachine = serde_json::from_slice(&new_snapshot.data)
645                .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
646            let mut state_machine = self.state_machine.write().await;
647            *state_machine = ExampleStateMachine::from_serializable(updated_state_machine, self.db.clone()).await?;
648        }
649
650        self.set_current_snapshot_(new_snapshot).await?;
651        Ok(())
652    }
653
654    #[tracing::instrument(level = "trace", skip(self))]
655    async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<ExampleNodeId>> {
656        match SledStore::get_current_snapshot_(self)? {
657            Some(snapshot) => {
658                let data = snapshot.data.clone();
659                Ok(Some(Snapshot {
660                    meta: snapshot.meta,
661                    snapshot: Box::new(Cursor::new(data)),
662                }))
663            }
664            None => Ok(None),
665        }
666    }
667}
668impl SledStore {
669    pub async fn new(db: Arc<sled::Db>) -> Arc<SledStore> {
670        let _store = store(&db);
671        let _state_machine = state_machine(&db);
672        let _data = data(&db);
673        let _logs = logs(&db);
674
675        let state_machine = RwLock::new(ExampleStateMachine::new(db.clone()));
676        Arc::new(SledStore { db, state_machine })
677    }
678}
679
/// Opens the sled tree named `store`.
///
/// Panics with "store open failed" if sled returns an error.
fn store(db: &sled::Db) -> sled::Tree {
    db.open_tree("store").expect("store open failed")
}
/// Opens the sled tree named `logs`.
///
/// Panics with "logs open failed" if sled returns an error.
fn logs(db: &sled::Db) -> sled::Tree {
    db.open_tree("logs").expect("logs open failed")
}
/// Opens the sled tree named `data`.
///
/// Panics with "data open failed" if sled returns an error.
fn data(db: &sled::Db) -> sled::Tree {
    db.open_tree("data").expect("data open failed")
}
/// Opens the sled tree named `state_machine`.
///
/// Panics with "state_machine open failed" if sled returns an error.
fn state_machine(db: &sled::Db) -> sled::Tree {
    db.open_tree("state_machine").expect("state_machine open failed")
}