1use crate::snapshot::{BytesSnapshot, FileSnapshot};
2use crate::{Index, MakeSnapshot, RaftApp, SnapshotStream};
3use ::bytes::Bytes;
4use anyhow::Result;
5use async_trait::async_trait;
6
7#[async_trait]
9pub trait RaftAppSimple: Sync + Send + 'static {
10 async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>>;
11 async fn process_write(&self, request: &[u8]) -> Result<(Vec<u8>, Option<Vec<u8>>)>;
12 async fn install_snapshot(&self, snapshot: Option<&[u8]>) -> Result<()>;
13 async fn fold_snapshot(
14 &self,
15 old_snapshot: Option<&[u8]>,
16 requests: Vec<&[u8]>,
17 ) -> Result<Vec<u8>>;
18}
19
20#[async_trait]
22pub trait SnapshotRepository: Sync + Send + 'static {
23 async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()>;
24 async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream>;
25 async fn delete_snapshot(&self, index: Index) -> Result<()>;
26}
27
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::sync::RwLock;
31
32pub struct BytesRepository {
34 resources: Arc<RwLock<HashMap<Index, BytesSnapshot>>>,
35}
36impl BytesRepository {
37 pub fn new() -> Self {
38 Self {
39 resources: Arc::new(RwLock::new(HashMap::new())),
40 }
41 }
42}
43#[async_trait]
44impl SnapshotRepository for BytesRepository {
45 async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> {
46 let bin = BytesSnapshot::save_snapshot_stream(st).await.unwrap();
47 self.resources.write().unwrap().insert(snapshot_index, bin);
48 Ok(())
49 }
50 async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream> {
51 let bin = self.resources.read().unwrap().get(&index).unwrap().clone();
52 let st = bin.open_snapshot_stream().await;
53 Ok(st)
54 }
55 async fn delete_snapshot(&self, index: Index) -> Result<()> {
56 self.resources.write().unwrap().remove(&index);
57 Ok(())
58 }
59}
60
61use std::path::{Path, PathBuf};
62
63pub struct FileRepository {
65 root_dir: PathBuf,
66}
67impl FileRepository {
68 pub fn destroy(root_dir: &Path) -> Result<()> {
69 std::fs::remove_dir_all(root_dir).ok();
70 Ok(())
71 }
72 pub fn create(root_dir: &Path) -> Result<()> {
75 std::fs::create_dir(root_dir)?;
76 Ok(())
77 }
78 pub fn open(root_dir: &Path) -> Result<Self> {
79 Ok(Self {
80 root_dir: root_dir.to_owned(),
81 })
82 }
83 fn snapshot_path(&self, i: Index) -> PathBuf {
84 self.root_dir.join(format!("{}", i))
85 }
86}
87#[async_trait]
88impl SnapshotRepository for FileRepository {
89 async fn save_snapshot_stream(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> {
90 let path = self.snapshot_path(snapshot_index);
91 FileSnapshot::save_snapshot_stream(st, &path).await?;
92 Ok(())
93 }
94 async fn open_snapshot_stream(&self, index: Index) -> Result<SnapshotStream> {
95 let path = self.snapshot_path(index);
96 let snap = FileSnapshot { path };
97 snap.open_snapshot_stream().await
98 }
99 async fn delete_snapshot(&self, index: Index) -> Result<()> {
100 let path = self.snapshot_path(index);
101 tokio::fs::remove_file(&path).await.ok();
102 Ok(())
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// Recreates the repository directory from scratch, checks that it
    /// exists, and removes it again.
    #[test]
    #[serial]
    fn test_file_repository() {
        let path = Path::new("/tmp/lol-test-file-repo");
        // Start clean in case an earlier run left the directory behind.
        FileRepository::destroy(path).unwrap();
        FileRepository::create(path).unwrap();
        assert!(path.is_dir());
        // Leave nothing behind for later runs.
        FileRepository::destroy(path).unwrap();
        assert!(!path.exists());
    }
}
118
119pub struct ToRaftApp {
122 app: Box<dyn RaftAppSimple>,
123 repo: Box<dyn SnapshotRepository>,
124}
125impl ToRaftApp {
126 pub fn new(app: impl RaftAppSimple, store: impl SnapshotRepository) -> Self {
127 Self {
128 app: Box::new(app),
129 repo: Box::new(store),
130 }
131 }
132 async fn save_snapshot(&self, bin: Bytes, idx: Index) -> Result<()> {
133 let bin = BytesSnapshot { contents: bin };
134 let inp = bin.open_snapshot_stream().await;
135 self.repo.save_snapshot_stream(inp, idx).await
136 }
137 async fn get_snapshot(&self, idx: Index) -> Result<Bytes> {
138 let st = self.repo.open_snapshot_stream(idx).await?;
139 let bin = BytesSnapshot::save_snapshot_stream(st).await?;
140 Ok(bin.contents)
141 }
142}
143#[async_trait]
144impl RaftApp for ToRaftApp {
145 async fn process_read(&self, request: &[u8]) -> Result<Vec<u8>> {
146 self.app.process_read(request).await
147 }
148 async fn process_write(
149 &self,
150 request: &[u8],
151 entry_index: Index,
152 ) -> Result<(Vec<u8>, MakeSnapshot)> {
153 let (res, new_snapshot) = self.app.process_write(request).await?;
154 let make_snapshot = match new_snapshot {
155 Some(x) => {
156 let ok = self.save_snapshot(x.into(), entry_index).await.is_ok();
157 if ok {
158 MakeSnapshot::CopySnapshot
159 } else {
160 MakeSnapshot::None
161 }
162 }
163 None => MakeSnapshot::None,
164 };
165 Ok((res, make_snapshot))
166 }
167 async fn install_snapshot(&self, snapshot: Option<Index>) -> Result<()> {
168 let snapshot = match snapshot {
169 Some(idx) => Some(self.get_snapshot(idx).await?),
170 None => None,
171 };
172 self.app.install_snapshot(snapshot.as_deref()).await
173 }
174 async fn fold_snapshot(
175 &self,
176 old_snapshot: Option<Index>,
177 requests: Vec<&[u8]>,
178 snapshot_index: Index,
179 ) -> Result<()> {
180 let old_snapshot = match old_snapshot {
181 Some(idx) => Some(self.get_snapshot(idx).await?),
182 None => None,
183 };
184 let new_snapshot = self
185 .app
186 .fold_snapshot(old_snapshot.as_deref(), requests)
187 .await?;
188 self.save_snapshot(new_snapshot.into(), snapshot_index)
189 .await
190 }
191 async fn save_snapshot(&self, st: SnapshotStream, idx: Index) -> Result<()> {
192 let b = BytesSnapshot::save_snapshot_stream(st).await?;
193 self.save_snapshot(b.contents, idx).await
194 }
195 async fn open_snapshot(&self, x: Index) -> Result<SnapshotStream> {
196 self.repo.open_snapshot_stream(x).await
197 }
198 async fn delete_snapshot(&self, idx: Index) -> Result<()> {
199 self.repo.delete_snapshot(idx).await
200 }
201}