#![deny(unused_crate_dependencies)]
#![deny(unused_qualifications)]

#[cfg(test)]
mod test;

use std::collections::BTreeMap;
use std::error::Error;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::sync::Arc;

use async_std::sync::RwLock;
use byteorder::BigEndian;
use byteorder::ByteOrder;
use byteorder::ReadBytesExt;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
use sled::Transactional;

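/// The type used to identify a node in this example cluster.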
pub type ExampleNodeId = u64;

openraft::declare_raft_types!(
    pub TypeConfig:
        D = ExampleRequest,
        R = ExampleResponse,
);

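/// The application request type replicated through Raft and applied to the state machine.
///
/// `Set` stores `value` under `key`.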
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ExampleRequest {
    Set { key: String, value: String },
}

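/// The response returned after an entry has been applied to the state machine.
///
/// For `Set` this echoes the stored value; blank and membership entries yield `None`.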
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExampleResponse {
    pub value: Option<String>,
}

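/// A stored snapshot: its metadata plus the JSON-serialized
/// `SerializableExampleStateMachine` it was built from.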
#[derive(Serialize, Deserialize, Debug)]
pub struct ExampleSnapshot {
    pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,

    pub data: Vec<u8>,
}

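/// A serde-friendly copy of the state machine, used as the snapshot payload.
///
/// It is built from the live sled-backed `ExampleStateMachine` and written back
/// with `ExampleStateMachine::from_serializable` when a snapshot is installed.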
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SerializableExampleStateMachine {
    pub last_applied_log: Option<LogId<ExampleNodeId>>,

    pub last_membership: StoredMembership<ExampleNodeId, BasicNode>,

    pub data: BTreeMap<String, String>,
}

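/// The live state machine, backed by the `state_machine` and `data` trees of the shared sled `Db`.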
#[derive(Debug, Clone)]
pub struct ExampleStateMachine {
    pub db: Arc<sled::Db>,
}

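// Converts the sled-backed state machine into its serializable form by copying the
// `data` tree into a `BTreeMap` together with the last applied log id and membership.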
impl From<&ExampleStateMachine> for SerializableExampleStateMachine {
    fn from(state: &ExampleStateMachine) -> Self {
        let mut data_tree = BTreeMap::new();
        for entry_res in data(&state.db).iter() {
            let entry = entry_res.expect("read db failed");

            let key: &[u8] = &entry.0;
            let value: &[u8] = &entry.1;
            data_tree.insert(
                String::from_utf8(key.to_vec()).expect("invalid key"),
                String::from_utf8(value.to_vec()).expect("invalid data"),
            );
        }
        Self {
            last_applied_log: state.get_last_applied_log().expect("last_applied_log"),
            last_membership: state.get_last_membership().expect("last_membership"),
            data: data_tree,
        }
    }
}

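// Small helpers that wrap lower-level errors into the `StorageIOError` variant
// describing the subject (state machine, snapshot, vote, logs) and verb (read/write).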
fn read_sm_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_state_machine(&e)
}
fn write_sm_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_state_machine(&e)
}
fn read_snap_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read(&e)
}
fn write_snap_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write(&e)
}
fn read_vote_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_vote(&e)
}
fn write_vote_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_vote(&e)
}
fn read_logs_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read_logs(&e)
}
fn write_logs_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write_logs(&e)
}
fn read_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::read(&e)
}
fn write_err<E: Error + 'static>(e: E) -> StorageIOError<ExampleNodeId> {
    StorageIOError::write(&e)
}

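// Maps an error raised inside a sled transaction closure into an `Abort`, so the
// transaction is rolled back and the error is surfaced to the caller.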
fn conflictable_txn_err<E: Error + 'static>(e: E) -> sled::transaction::ConflictableTransactionError<AnyError> {
    sled::transaction::ConflictableTransactionError::Abort(AnyError::new(&e))
}

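// State-machine persistence: last membership, last applied log id, and the key/value
// data, each with a direct variant and a `*_tx` variant for use inside sled transactions.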
impl ExampleStateMachine {
    fn get_last_membership(&self) -> StorageResult<StoredMembership<ExampleNodeId, BasicNode>> {
        let state_machine = state_machine(&self.db);
        let ivec = state_machine.get(b"last_membership").map_err(read_sm_err)?;

        let m = if let Some(ivec) = ivec {
            serde_json::from_slice(&ivec).map_err(read_sm_err)?
        } else {
            StoredMembership::default()
        };

        Ok(m)
    }

    async fn set_last_membership(&self, membership: StoredMembership<ExampleNodeId, BasicNode>) -> StorageResult<()> {
        let value = serde_json::to_vec(&membership).map_err(write_sm_err)?;
        let state_machine = state_machine(&self.db);
        state_machine.insert(b"last_membership", value).map_err(write_err)?;

        state_machine.flush_async().await.map_err(write_err)?;
        Ok(())
    }

    fn set_last_membership_tx(
        &self,
        tx_state_machine: &sled::transaction::TransactionalTree,
        membership: StoredMembership<ExampleNodeId, BasicNode>,
    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
        let value = serde_json::to_vec(&membership).map_err(conflictable_txn_err)?;
        tx_state_machine.insert(b"last_membership", value).map_err(conflictable_txn_err)?;
        Ok(())
    }

    fn get_last_applied_log(&self) -> StorageResult<Option<LogId<ExampleNodeId>>> {
        let state_machine = state_machine(&self.db);
        let ivec = state_machine.get(b"last_applied_log").map_err(read_sm_err)?;
        let last_applied = if let Some(ivec) = ivec {
            serde_json::from_slice(&ivec).map_err(read_sm_err)?
        } else {
            None
        };
        Ok(last_applied)
    }

    async fn set_last_applied_log(&self, log_id: LogId<ExampleNodeId>) -> StorageResult<()> {
        let value = serde_json::to_vec(&log_id).map_err(write_sm_err)?;
        let state_machine = state_machine(&self.db);
        state_machine.insert(b"last_applied_log", value).map_err(write_sm_err)?;

        state_machine.flush_async().await.map_err(write_sm_err)?;
        Ok(())
    }

    fn set_last_applied_log_tx(
        &self,
        tx_state_machine: &sled::transaction::TransactionalTree,
        log_id: LogId<ExampleNodeId>,
    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
        let value = serde_json::to_vec(&log_id).map_err(conflictable_txn_err)?;
        tx_state_machine.insert(b"last_applied_log", value).map_err(conflictable_txn_err)?;
        Ok(())
    }

    async fn from_serializable(sm: SerializableExampleStateMachine, db: Arc<sled::Db>) -> StorageResult<Self> {
        let data_tree = data(&db);
        let mut batch = sled::Batch::default();
        for (key, value) in sm.data {
            batch.insert(key.as_bytes(), value.as_bytes());
        }
        data_tree.apply_batch(batch).map_err(write_sm_err)?;
        data_tree.flush_async().await.map_err(write_snap_err)?;

        let r = Self { db };
        if let Some(log_id) = sm.last_applied_log {
            r.set_last_applied_log(log_id).await?;
        }
        r.set_last_membership(sm.last_membership).await?;

        Ok(r)
    }

    fn new(db: Arc<sled::Db>) -> ExampleStateMachine {
        Self { db }
    }

    fn insert_tx(
        &self,
        tx_data_tree: &sled::transaction::TransactionalTree,
        key: String,
        value: String,
    ) -> Result<(), sled::transaction::ConflictableTransactionError<AnyError>> {
        tx_data_tree.insert(key.as_bytes(), value.as_bytes()).map_err(conflictable_txn_err)?;
        Ok(())
    }

    pub fn get(&self, key: &str) -> StorageResult<Option<String>> {
        let key = key.as_bytes();
        let data_tree = data(&self.db);
        data_tree
            .get(key)
            .map(|value| value.map(|value| String::from_utf8(value.to_vec()).expect("invalid data")))
            .map_err(|e| StorageIOError::read(&e).into())
    }

    pub fn get_all(&self) -> StorageResult<Vec<String>> {
        let data_tree = data(&self.db);

        let data = data_tree
            .iter()
            .filter_map(|entry_res| {
                if let Ok(el) = entry_res {
                    Some(String::from_utf8(el.1.to_vec()).expect("invalid data"))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        Ok(data)
    }
}

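/// The sled-backed Raft store.
///
/// A single `sled::Db` holds four trees: `store` (vote, snapshot, bookkeeping),
/// `logs` (the Raft log, keyed by big-endian index), `data` (the key/value
/// application data), and `state_machine` (applied-log and membership markers).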
#[derive(Debug)]
pub struct SledStore {
    db: Arc<sled::Db>,

    pub state_machine: RwLock<ExampleStateMachine>,
}

type StorageResult<T> = Result<T, StorageError<ExampleNodeId>>;

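/// Converts a log index into a fixed-width big-endian key so that the byte order of
/// keys in the `logs` tree matches the numeric order of log indexes.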
fn id_to_bin(id: u64) -> [u8; 8] {
    let mut buf: [u8; 8] = [0; 8];
    BigEndian::write_u64(&mut buf, id);
    buf
}

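/// The inverse of `id_to_bin`: reads a big-endian `u64` log index back out of a key.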
fn bin_to_id(buf: &[u8]) -> u64 {
    (&buf[0..8]).read_u64::<BigEndian>().unwrap()
}

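// Helpers over the `store` tree: last purged log id, snapshot index, vote, and the
// current snapshot. Writes flush the tree before returning so the data is durable.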
impl SledStore {
    fn get_last_purged_(&self) -> StorageResult<Option<LogId<u64>>> {
        let store_tree = store(&self.db);
        let val = store_tree.get(b"last_purged_log_id").map_err(read_err)?;

        if let Some(v) = val {
            let val = serde_json::from_slice(&v).map_err(read_err)?;
            Ok(Some(val))
        } else {
            Ok(None)
        }
    }

    async fn set_last_purged_(&self, log_id: LogId<u64>) -> StorageResult<()> {
        let store_tree = store(&self.db);
        let val = serde_json::to_vec(&log_id).unwrap();
        store_tree.insert(b"last_purged_log_id", val.as_slice()).map_err(write_snap_err)?;

        store_tree.flush_async().await.map_err(write_snap_err)?;
        Ok(())
    }

    fn get_snapshot_index_(&self) -> StorageResult<u64> {
        let store_tree = store(&self.db);
        let ivec = store_tree.get(b"snapshot_index").map_err(read_snap_err)?;

        if let Some(v) = ivec {
            let val = serde_json::from_slice(&v).map_err(read_err)?;
            Ok(val)
        } else {
            Ok(0)
        }
    }

    async fn set_snapshot_index_(&self, snapshot_index: u64) -> StorageResult<()> {
        let store_tree = store(&self.db);
        let val = serde_json::to_vec(&snapshot_index).unwrap();
        store_tree.insert(b"snapshot_index", val.as_slice()).map_err(write_snap_err)?;

        store_tree.flush_async().await.map_err(write_snap_err)?;
        Ok(())
    }

    async fn set_vote_(&self, vote: &Vote<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
        let store_tree = store(&self.db);
        let val = serde_json::to_vec(vote).unwrap();
        store_tree.insert(b"vote", val).map_err(write_vote_err)?;

        store_tree.flush_async().await.map_err(write_vote_err)?;

        Ok(())
    }

    fn get_vote_(&self) -> StorageResult<Option<Vote<ExampleNodeId>>> {
        let store_tree = store(&self.db);
        let val = store_tree.get(b"vote").map_err(read_vote_err)?;
        let ivec = if let Some(t) = val {
            t
        } else {
            return Ok(None);
        };

        let v = serde_json::from_slice(&ivec).map_err(read_vote_err)?;
        Ok(Some(v))
    }

    fn get_current_snapshot_(&self) -> StorageResult<Option<ExampleSnapshot>> {
        let store_tree = store(&self.db);
        let ivec = store_tree.get(b"snapshot").map_err(read_snap_err)?;

        if let Some(ivec) = ivec {
            let snap = serde_json::from_slice(&ivec).map_err(read_snap_err)?;
            Ok(Some(snap))
        } else {
            Ok(None)
        }
    }

    async fn set_current_snapshot_(&self, snap: ExampleSnapshot) -> StorageResult<()> {
        let store_tree = store(&self.db);
        let val = serde_json::to_vec(&snap).unwrap();
        let meta = snap.meta.clone();
        store_tree.insert(b"snapshot", val.as_slice()).map_err(|e| StorageError::IO {
            source: StorageIOError::write_snapshot(Some(snap.meta.signature()), &e),
        })?;

        store_tree
            .flush_async()
            .await
            .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), &e).into())
            .map(|_| ())
    }
}

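// Log reads: entries are stored in the `logs` tree under their big-endian index, so a
// range scan returns them in log order.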
impl RaftLogReader<TypeConfig> for Arc<SledStore> {
    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
        &mut self,
        range: RB,
    ) -> StorageResult<Vec<Entry<TypeConfig>>> {
        let start_bound = range.start_bound();
        let start = match start_bound {
            std::ops::Bound::Included(x) => id_to_bin(*x),
            std::ops::Bound::Excluded(x) => id_to_bin(*x + 1),
            std::ops::Bound::Unbounded => id_to_bin(0),
        };
        let logs_tree = logs(&self.db);
        let logs = logs_tree
            .range::<&[u8], _>(start.as_slice()..)
            .map(|el_res| {
                let el = el_res.expect("Failed read log entry");
                let id = el.0;
                let val = el.1;
                let entry: StorageResult<Entry<_>> = serde_json::from_slice(&val).map_err(|e| StorageError::IO {
                    source: StorageIOError::read_logs(&e),
                });
                let id = bin_to_id(&id);

                assert_eq!(Ok(id), entry.as_ref().map(|e| e.log_id.index));
                (id, entry)
            })
            .take_while(|(id, _)| range.contains(id))
            .map(|x| x.1)
            .collect();
        logs
    }
}

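// Snapshot building: serialize the current state machine to JSON, bump the snapshot
// index, persist the result in the `store` tree, and hand the bytes back to openraft.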
impl RaftSnapshotBuilder<TypeConfig> for Arc<SledStore> {
    #[tracing::instrument(level = "trace", skip(self))]
    async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<ExampleNodeId>> {
        let data;
        let last_applied_log;
        let last_membership;

        {
            let state_machine = SerializableExampleStateMachine::from(&*self.state_machine.read().await);
            data = serde_json::to_vec(&state_machine).map_err(|e| StorageIOError::read_state_machine(&e))?;

            last_applied_log = state_machine.last_applied_log;
            last_membership = state_machine.last_membership;
        }

        let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
        self.set_snapshot_index_(snapshot_idx).await?;

        let snapshot_id = if let Some(last) = last_applied_log {
            format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
        } else {
            format!("--{}", snapshot_idx)
        };

        let meta = SnapshotMeta {
            last_log_id: last_applied_log,
            last_membership,
            snapshot_id,
        };

        let snapshot = ExampleSnapshot {
            meta: meta.clone(),
            data: data.clone(),
        };

        self.set_current_snapshot_(snapshot).await?;

        Ok(Snapshot {
            meta,
            snapshot: Box::new(Cursor::new(data)),
        })
    }
}

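// The `RaftStorage` implementation ties everything together: vote and log handling on
// the sled trees, plus state-machine application and snapshot install/restore.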
impl RaftStorage<TypeConfig> for Arc<SledStore> {
    type LogReader = Self;
    type SnapshotBuilder = Self;

    async fn get_log_state(&mut self) -> StorageResult<LogState<TypeConfig>> {
        let last_purged = self.get_last_purged_()?;

        let logs_tree = logs(&self.db);
        let last_ivec_kv = logs_tree.last().map_err(read_logs_err)?;
        let (_, ent_ivec) = if let Some(last) = last_ivec_kv {
            last
        } else {
            return Ok(LogState {
                last_purged_log_id: last_purged,
                last_log_id: last_purged,
            });
        };

        let last_ent = serde_json::from_slice::<Entry<TypeConfig>>(&ent_ivec).map_err(read_logs_err)?;
        let last_log_id = Some(*last_ent.get_log_id());

        let last_log_id = std::cmp::max(last_log_id, last_purged);
        Ok(LogState {
            last_purged_log_id: last_purged,
            last_log_id,
        })
    }

    #[tracing::instrument(level = "trace", skip(self))]
    async fn save_vote(&mut self, vote: &Vote<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
        self.set_vote_(vote).await
    }

    async fn read_vote(&mut self) -> Result<Option<Vote<ExampleNodeId>>, StorageError<ExampleNodeId>> {
        self.get_vote_()
    }

    async fn get_log_reader(&mut self) -> Self::LogReader {
        self.clone()
    }

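    // Log writes go through a single sled batch and are flushed before returning, so an
    // entry acknowledged to openraft is durable on disk.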
    #[tracing::instrument(level = "trace", skip(self, entries))]
    async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
    where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
        let logs_tree = logs(&self.db);
        let mut batch = sled::Batch::default();
        for entry in entries {
            let id = id_to_bin(entry.log_id.index);
            assert_eq!(bin_to_id(&id), entry.log_id.index);
            let value = serde_json::to_vec(&entry).map_err(write_logs_err)?;
            batch.insert(id.as_slice(), value);
        }
        logs_tree.apply_batch(batch).map_err(write_logs_err)?;

        logs_tree.flush_async().await.map_err(write_logs_err)?;
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn delete_conflict_logs_since(&mut self, log_id: LogId<ExampleNodeId>) -> StorageResult<()> {
        tracing::debug!("delete_log: [{:?}, +oo)", log_id);

        let from = id_to_bin(log_id.index);
        let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff);
        let logs_tree = logs(&self.db);
        let entries = logs_tree.range::<&[u8], _>(from.as_slice()..to.as_slice());
        let mut batch_del = sled::Batch::default();
        for entry_res in entries {
            let entry = entry_res.expect("Read db entry failed");
            batch_del.remove(entry.0);
        }
        logs_tree.apply_batch(batch_del).map_err(write_logs_err)?;
        logs_tree.flush_async().await.map_err(write_logs_err)?;
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn purge_logs_upto(&mut self, log_id: LogId<ExampleNodeId>) -> Result<(), StorageError<ExampleNodeId>> {
        tracing::debug!("delete_log: [0, {:?}]", log_id);

        self.set_last_purged_(log_id).await?;
        let from = id_to_bin(0);
        let to = id_to_bin(log_id.index);
        let logs_tree = logs(&self.db);
        let entries = logs_tree.range::<&[u8], _>(from.as_slice()..=to.as_slice());
        let mut batch_del = sled::Batch::default();
        for entry_res in entries {
            let entry = entry_res.expect("Read db entry failed");
            batch_del.remove(entry.0);
        }
        logs_tree.apply_batch(batch_del).map_err(write_logs_err)?;

        logs_tree.flush_async().await.map_err(write_logs_err)?;
        Ok(())
    }

    async fn last_applied_state(
        &mut self,
    ) -> Result<(Option<LogId<ExampleNodeId>>, StoredMembership<ExampleNodeId, BasicNode>), StorageError<ExampleNodeId>>
    {
        let state_machine = self.state_machine.read().await;
        Ok((
            state_machine.get_last_applied_log()?,
            state_machine.get_last_membership()?,
        ))
    }

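    // Entries are applied inside a single sled transaction over the `state_machine` and
    // `data` trees, so the applied data and the `last_applied_log` marker stay consistent.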
    #[tracing::instrument(level = "trace", skip(self, entries))]
    async fn apply_to_state_machine(
        &mut self,
        entries: &[Entry<TypeConfig>],
    ) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
        let sm = self.state_machine.write().await;
        let state_machine = state_machine(&self.db);
        let data_tree = data(&self.db);
        let trans_res = (&state_machine, &data_tree).transaction(|(tx_state_machine, tx_data_tree)| {
            let mut res = Vec::with_capacity(entries.len());

            for entry in entries {
                tracing::debug!(%entry.log_id, "replicate to sm");

                sm.set_last_applied_log_tx(tx_state_machine, entry.log_id)?;

                match entry.payload {
                    EntryPayload::Blank => res.push(ExampleResponse { value: None }),
                    EntryPayload::Normal(ref req) => match req {
                        ExampleRequest::Set { key, value } => {
                            sm.insert_tx(tx_data_tree, key.clone(), value.clone())?;
                            res.push(ExampleResponse {
                                value: Some(value.clone()),
                            })
                        }
                    },
                    EntryPayload::Membership(ref mem) => {
                        let membership = StoredMembership::new(Some(entry.log_id), mem.clone());
                        sm.set_last_membership_tx(tx_state_machine, membership)?;
                        res.push(ExampleResponse { value: None })
                    }
                };
            }
            Ok(res)
        });
        let result_vec = trans_res.map_err(|e| StorageIOError::write(&e))?;

        self.db.flush_async().await.map_err(|e| StorageIOError::write_logs(&e))?;
        Ok(result_vec)
    }

    async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
        self.clone()
    }

    #[tracing::instrument(level = "trace", skip(self))]
    async fn begin_receiving_snapshot(
        &mut self,
    ) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<ExampleNodeId>> {
        Ok(Box::new(Cursor::new(Vec::new())))
    }

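    // Installing a snapshot deserializes the received bytes, rebuilds the sled-backed state
    // machine from them, and then records the snapshot itself in the `store` tree.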
    #[tracing::instrument(level = "trace", skip(self, snapshot))]
    async fn install_snapshot(
        &mut self,
        meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
        snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
    ) -> Result<(), StorageError<ExampleNodeId>> {
        tracing::info!(
            { snapshot_size = snapshot.get_ref().len() },
            "decoding snapshot for installation"
        );

        let new_snapshot = ExampleSnapshot {
            meta: meta.clone(),
            data: snapshot.into_inner(),
        };

        {
            let updated_state_machine: SerializableExampleStateMachine = serde_json::from_slice(&new_snapshot.data)
                .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
            let mut state_machine = self.state_machine.write().await;
            *state_machine = ExampleStateMachine::from_serializable(updated_state_machine, self.db.clone()).await?;
        }

        self.set_current_snapshot_(new_snapshot).await?;
        Ok(())
    }

    #[tracing::instrument(level = "trace", skip(self))]
    async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<ExampleNodeId>> {
        match SledStore::get_current_snapshot_(self)? {
            Some(snapshot) => {
                let data = snapshot.data.clone();
                Ok(Some(Snapshot {
                    meta: snapshot.meta,
                    snapshot: Box::new(Cursor::new(data)),
                }))
            }
            None => Ok(None),
        }
    }
}

impl SledStore {
    pub async fn new(db: Arc<sled::Db>) -> Arc<SledStore> {
        let _store = store(&db);
        let _state_machine = state_machine(&db);
        let _data = data(&db);
        let _logs = logs(&db);

        let state_machine = RwLock::new(ExampleStateMachine::new(db.clone()));
        Arc::new(SledStore { db, state_machine })
    }
}

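// Accessors for the four sled trees used by this store. `open_tree` creates a tree on
// first use and returns the existing one afterwards.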
fn store(db: &sled::Db) -> sled::Tree {
    db.open_tree("store").expect("store open failed")
}

fn logs(db: &sled::Db) -> sled::Tree {
    db.open_tree("logs").expect("logs open failed")
}

fn data(db: &sled::Db) -> sled::Tree {
    db.open_tree("data").expect("data open failed")
}

fn state_machine(db: &sled::Db) -> sled::Tree {
    db.open_tree("state_machine").expect("state_machine open failed")
}