simperby_network/
storage.rs1use async_trait::async_trait;
2use fs2::FileExt;
3use futures::stream::*;
4use tokio::{fs, io::AsyncWriteExt, task::spawn_blocking};
5
/// Error type returned by every [`Storage`] operation.
///
/// This is a plain alias for [`std::io::Error`], so callers can inspect
/// [`std::io::Error::kind`] and use `?` in functions that return `std::io::Result`.
pub type StorageError = std::io::Error;
7
8#[async_trait]
10pub trait Storage: Send + Sync + 'static {
11 async fn create(storage_directory: &str) -> Result<(), StorageError>;
14
15 async fn open(storage_directory: &str) -> Result<Self, StorageError>
17 where
18 Self: Sized;
19
20 async fn list_files(&self) -> Result<Vec<String>, StorageError>;
22
23 async fn add_or_overwrite_file(
25 &mut self,
26 name: &str,
27 content: String,
28 ) -> Result<(), StorageError>;
29
30 async fn read_file(&self, name: &str) -> Result<String, StorageError>;
32
33 async fn remove_file(&mut self, name: &str) -> Result<(), StorageError>;
35
36 async fn remove_all_files(&mut self) -> Result<(), StorageError>;
38}
39
/// File-system backed [`Storage`] that keeps each file directly inside one directory.
pub struct StorageImpl {
    /// Handle to the `lock` file inside the storage directory.
    ///
    /// Wrapped in `Option` so that `Drop` can move the handle out of `&mut self`.
    lock_file: Option<std::fs::File>,
    /// Storage directory, exactly as it was passed to `open`.
    path: String,
}
44
45#[async_trait]
46impl Storage for StorageImpl {
47 async fn create(storage_directory: &str) -> Result<(), StorageError> {
48 let _ = fs::remove_dir_all(storage_directory).await;
49 fs::create_dir_all(storage_directory).await?;
50 fs::File::create(format!("{storage_directory}/lock")).await?;
51 Ok(())
52 }
53
54 async fn open(storage_directory: &str) -> Result<Self, StorageError>
55 where
56 Self: Sized,
57 {
58 let storage_directory_ = storage_directory.to_owned();
59 let file =
60 spawn_blocking(move || std::fs::File::open(format!("{storage_directory_}/lock")))
61 .await??;
62 let file = spawn_blocking(move || {
63 let result = file.lock_exclusive();
64 result.map(|_| file)
65 })
66 .await??;
67 Ok(Self {
68 lock_file: Some(file),
69 path: storage_directory.to_owned(),
70 })
71 }
72
73 async fn list_files(&self) -> Result<Vec<String>, StorageError> {
74 let dir = tokio_stream::wrappers::ReadDirStream::new(fs::read_dir(&self.path).await?);
75 let files = dir
76 .collect::<Vec<_>>()
77 .await
78 .into_iter()
79 .collect::<Result<Vec<_>, _>>()?;
80 Ok(files
81 .into_iter()
82 .map(|file| file.file_name().into_string().unwrap())
83 .filter(|file| file != "lock")
84 .collect())
85 }
86
87 async fn add_or_overwrite_file(
88 &mut self,
89 name: &str,
90 content: String,
91 ) -> Result<(), StorageError> {
92 let mut file = fs::File::create(format!("{}/{}", self.path, name)).await?;
93 file.write_all(content.as_bytes()).await?;
94 file.flush().await?;
96 Ok(())
97 }
98
99 async fn read_file(&self, name: &str) -> Result<String, StorageError> {
100 fs::read_to_string(format!("{}/{}", self.path, name)).await
101 }
102
103 async fn remove_file(&mut self, name: &str) -> Result<(), StorageError> {
104 fs::remove_file(format!("{}/{}", self.path, name)).await
105 }
106
107 async fn remove_all_files(&mut self) -> Result<(), StorageError> {
108 let files = self.list_files().await?;
109 for file in files {
110 self.remove_file(&file).await?;
111 }
112 Ok(())
113 }
114}
115
116impl Drop for StorageImpl {
117 fn drop(&mut self) {
118 let lock_file = self.lock_file.take().unwrap();
119 spawn_blocking(move || {
120 if let Err(e) = lock_file.unlock() {
121 log::error!("failed to unlock storage: {}", e);
122 }
123 });
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use rand::prelude::*;
    use simperby_core::*;

    /// Returns a 16-character string cut from the hash of two random `u128` values.
    fn generate_random_string() -> String {
        let mut rng = rand::thread_rng();
        let s1: u128 = rng.gen();
        let s2: u128 = rng.gen();
        let digest = Hash256::hash(format!("{s1}{s2}").as_bytes()).to_string();
        digest[0..16].to_owned()
    }

    /// Returns a randomly named path directly under the system temporary directory.
    fn generate_random_storage_directory() -> String {
        let base = std::env::temp_dir();
        let leaf = generate_random_string();
        format!("{}/{}", base.to_str().unwrap(), leaf)
    }

    /// Creates a storage in a fresh random directory and opens it.
    async fn create_and_open() -> (String, StorageImpl) {
        let dir = generate_random_storage_directory();
        StorageImpl::create(&dir).await.unwrap();
        let storage = StorageImpl::open(&dir).await.unwrap();
        (dir, storage)
    }

    /// Every file written can be read back with identical content.
    #[tokio::test]
    async fn simple1() {
        let (_dir, mut storage) = create_and_open().await;
        for _ in 0..10 {
            let name = generate_random_string();
            let content = generate_random_string();
            storage
                .add_or_overwrite_file(&name, content.clone())
                .await
                .unwrap();
            let read_back = storage.read_file(&name).await.unwrap();
            assert_eq!(read_back, content);
        }
    }

    /// While one handle is open, other openers must wait and cannot touch the files.
    #[tokio::test]
    async fn never_interrupted() {
        let (dir, mut storage) = create_and_open().await;
        for _ in 0..10 {
            let name = generate_random_string();
            let content = generate_random_string();
            storage
                .add_or_overwrite_file(&name, content)
                .await
                .unwrap();
        }

        // Each task tries to open the same storage and wipe it.
        let tasks: Vec<_> = (0..100)
            .map(|_| {
                let dir_ = dir.clone();
                tokio::spawn(async move {
                    let mut contender = StorageImpl::open(&dir_).await.unwrap();
                    contender.remove_all_files().await.unwrap();
                })
            })
            .collect();

        // Give the contenders ample time; the files must still all be present.
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
        assert_eq!(storage.list_files().await.unwrap().len(), 10);

        // Dropping the first handle lets the contenders proceed.
        drop(storage);
        for outcome in futures::future::join_all(tasks).await {
            outcome.unwrap();
        }

        let reopened = StorageImpl::open(&dir).await.unwrap();
        assert_eq!(reopened.list_files().await.unwrap().len(), 0);
    }
}