lol_core/
simple.rs

1use crate::snapshot::{BytesSnapshot, FileSnapshot};
2use crate::{Index, MakeSnapshot, RaftApp, SnapshotStream};
3use ::bytes::Bytes;
4use anyhow::Result;
5use async_trait::async_trait;
6
7/// Restricted RaftApp where snapshot is an explicit byte sequence.
8#[async_trait]
9pub trait RaftAppSimple: Sync + Send + 'static {
10    async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>>;
11    async fn process_write(&self, request: &[u8]) -> Result<(Vec<u8>, Option<Vec<u8>>)>;
12    async fn install_snapshot(&self, snapshot: Option<&[u8]>) -> Result<()>;
13    async fn fold_snapshot(
14        &self,
15        old_snapshot: Option<&[u8]>,
16        requests: Vec<&[u8]>,
17    ) -> Result<Vec<u8>>;
18}
19
20/// The repository of the snapshot resources.
21#[async_trait]
22pub trait SnapshotRepository: Sync + Send + 'static {
23    async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()>;
24    async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream>;
25    async fn delete_snapshot(&self, index: Index) -> Result<()>;
26}
27
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::sync::RwLock;
31
32/// In-memory implementation of `SnapshotRepository`.
33pub struct BytesRepository {
34    resources: Arc<RwLock<HashMap<Index, BytesSnapshot>>>,
35}
36impl BytesRepository {
37    pub fn new() -> Self {
38        Self {
39            resources: Arc::new(RwLock::new(HashMap::new())),
40        }
41    }
42}
43#[async_trait]
44impl SnapshotRepository for BytesRepository {
45    async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> {
46        let bin = BytesSnapshot::save_snapshot_stream(st).await.unwrap();
47        self.resources.write().unwrap().insert(snapshot_index, bin);
48        Ok(())
49    }
50    async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream> {
51        let bin = self.resources.read().unwrap().get(&index).unwrap().clone();
52        let st = bin.open_snapshot_stream().await;
53        Ok(st)
54    }
55    async fn delete_snapshot(&self, index: Index) -> Result<()> {
56        self.resources.write().unwrap().remove(&index);
57        Ok(())
58    }
59}
60
61use std::path::{Path, PathBuf};
62
/// Persistent implementation of `SnapshotRepository`.
///
/// Each snapshot is a file inside `root_dir`, named after its index.
pub struct FileRepository {
    /// Directory that contains the snapshot files.
    root_dir: PathBuf,
}
67impl FileRepository {
68    pub fn destroy(root_dir: &Path) -> Result<()> {
69        std::fs::remove_dir_all(root_dir).ok();
70        Ok(())
71    }
72    /// Create the initial state.
73    /// You should call `destory` before calling this function.
74    pub fn create(root_dir: &Path) -> Result<()> {
75        std::fs::create_dir(root_dir)?;
76        Ok(())
77    }
78    pub fn open(root_dir: &Path) -> Result<Self> {
79        Ok(Self {
80            root_dir: root_dir.to_owned(),
81        })
82    }
83    fn snapshot_path(&self, i: Index) -> PathBuf {
84        self.root_dir.join(format!("{}", i))
85    }
86}
87#[async_trait]
88impl SnapshotRepository for FileRepository {
89    async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> {
90        let path = self.snapshot_path(snapshot_index);
91        FileSnapshot::save_snapshot_stream(st, &path).await?;
92        Ok(())
93    }
94    async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream> {
95        let path = self.snapshot_path(index);
96        let snap = FileSnapshot { path };
97        snap.open_snapshot_stream().await
98    }
99    async fn delete_snapshot(&self, index: Index) -> Result<()> {
100        let path = self.snapshot_path(index);
101        tokio::fs::remove_file(&path).await.ok();
102        Ok(())
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// Resets the repository root and creates it again; either step failing
    /// panics the test.
    #[test]
    #[serial]
    fn test_file_repository() {
        let root = Path::new("/tmp/lol-test-file-repo");
        FileRepository::destroy(root).unwrap();
        FileRepository::create(root).unwrap();
    }
}
118
119/// ToRaftApp turns an instance of RaftAppSimple into
120/// a RaftApp instance.
121pub struct ToRaftApp {
122    app: Box<dyn RaftAppSimple>,
123    repo: Box<dyn SnapshotRepository>,
124}
125impl ToRaftApp {
126    pub fn new(app: impl RaftAppSimple, store: impl SnapshotRepository) -> Self {
127        Self {
128            app: Box::new(app),
129            repo: Box::new(store),
130        }
131    }
132    async fn save_snapshot(&self, bin: Bytes, idx: Index) -> Result<()> {
133        let bin = BytesSnapshot { contents: bin };
134        let inp = bin.open_snapshot_stream().await;
135        self.repo.save_snapshot_stream(inp, idx).await
136    }
137    async fn get_snapshot(&self, idx: Index) -> Result<Bytes> {
138        let st = self.repo.open_snapshot_stream(idx).await?;
139        let bin = BytesSnapshot::save_snapshot_stream(st).await?;
140        Ok(bin.contents)
141    }
142}
143#[async_trait]
144impl RaftApp for ToRaftApp {
145    async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>> {
146        self.app.process_read(request).await
147    }
148    async fn process_write(
149        &self,
150        request: &[u8],
151        entry_index: Index,
152    ) -> Result<(Vec<u8>, MakeSnapshot)> {
153        let (res, new_snapshot) = self.app.process_write(request).await?;
154        let make_snapshot = match new_snapshot {
155            Some(x) => {
156                let ok = self.save_snapshot(x.into(), entry_index).await.is_ok();
157                if ok {
158                    MakeSnapshot::CopySnapshot
159                } else {
160                    MakeSnapshot::None
161                }
162            }
163            None => MakeSnapshot::None,
164        };
165        Ok((res, make_snapshot))
166    }
167    async fn install_snapshot(&self, snapshot: Option<Index>) -> Result<()> {
168        let snapshot = match snapshot {
169            Some(idx) => Some(self.get_snapshot(idx).await?),
170            None => None,
171        };
172        self.app.install_snapshot(snapshot.as_deref()).await
173    }
174    async fn fold_snapshot(
175        &self,
176        old_snapshot: Option<Index>,
177        requests: Vec<&[u8]>,
178        snapshot_index: Index,
179    ) -> Result<()> {
180        let old_snapshot = match old_snapshot {
181            Some(idx) => Some(self.get_snapshot(idx).await?),
182            None => None,
183        };
184        let new_snapshot = self
185            .app
186            .fold_snapshot(old_snapshot.as_deref(), requests)
187            .await?;
188        self.save_snapshot(new_snapshot.into(), snapshot_index)
189            .await
190    }
191    async fn save_snapshot(&self, st: SnapshotStream, idx: Index) -> Result<()> {
192        let b = BytesSnapshot::save_snapshot_stream(st).await?;
193        self.save_snapshot(b.contents, idx).await
194    }
195    async fn open_snapshot(&self, x: Index) -> Result<SnapshotStream> {
196        self.repo.open_snapshot_stream(x).await
197    }
198    async fn delete_snapshot(&self, idx: Index) -> Result<()> {
199        self.repo.delete_snapshot(idx).await
200    }
201}