simperby_network/
storage.rs

1use async_trait::async_trait;
2use fs2::FileExt;
3use futures::stream::*;
4use tokio::{fs, io::AsyncWriteExt, task::spawn_blocking};
5
6pub type StorageError = std::io::Error;
7
8/// An abstraction of the synchronized storage backed by the host file system.
9#[async_trait]
10pub trait Storage: Send + Sync + 'static {
11    /// Creates a new and empty directory.
12    /// If the directory already exists, removes and re-creates the directory.
13    async fn create(storage_directory: &str) -> Result<(), StorageError>;
14
15    /// Opens an existing directory, locking it.
16    async fn open(storage_directory: &str) -> Result<Self, StorageError>
17    where
18        Self: Sized;
19
20    /// Shows the list of files.
21    async fn list_files(&self) -> Result<Vec<String>, StorageError>;
22
23    /// Adds the given file to the storage.
24    async fn add_or_overwrite_file(
25        &mut self,
26        name: &str,
27        content: String,
28    ) -> Result<(), StorageError>;
29
30    /// Reads the given file.
31    async fn read_file(&self, name: &str) -> Result<String, StorageError>;
32
33    /// Removes the given file.
34    async fn remove_file(&mut self, name: &str) -> Result<(), StorageError>;
35
36    /// Removes all files.
37    async fn remove_all_files(&mut self) -> Result<(), StorageError>;
38}
39
40pub struct StorageImpl {
41    lock_file: Option<std::fs::File>,
42    path: String,
43}
44
45#[async_trait]
46impl Storage for StorageImpl {
47    async fn create(storage_directory: &str) -> Result<(), StorageError> {
48        let _ = fs::remove_dir_all(storage_directory).await;
49        fs::create_dir_all(storage_directory).await?;
50        fs::File::create(format!("{storage_directory}/lock")).await?;
51        Ok(())
52    }
53
54    async fn open(storage_directory: &str) -> Result<Self, StorageError>
55    where
56        Self: Sized,
57    {
58        let storage_directory_ = storage_directory.to_owned();
59        let file =
60            spawn_blocking(move || std::fs::File::open(format!("{storage_directory_}/lock")))
61                .await??;
62        let file = spawn_blocking(move || {
63            let result = file.lock_exclusive();
64            result.map(|_| file)
65        })
66        .await??;
67        Ok(Self {
68            lock_file: Some(file),
69            path: storage_directory.to_owned(),
70        })
71    }
72
73    async fn list_files(&self) -> Result<Vec<String>, StorageError> {
74        let dir = tokio_stream::wrappers::ReadDirStream::new(fs::read_dir(&self.path).await?);
75        let files = dir
76            .collect::<Vec<_>>()
77            .await
78            .into_iter()
79            .collect::<Result<Vec<_>, _>>()?;
80        Ok(files
81            .into_iter()
82            .map(|file| file.file_name().into_string().unwrap())
83            .filter(|file| file != "lock")
84            .collect())
85    }
86
87    async fn add_or_overwrite_file(
88        &mut self,
89        name: &str,
90        content: String,
91    ) -> Result<(), StorageError> {
92        let mut file = fs::File::create(format!("{}/{}", self.path, name)).await?;
93        file.write_all(content.as_bytes()).await?;
94        // IMPORTANT!
95        file.flush().await?;
96        Ok(())
97    }
98
99    async fn read_file(&self, name: &str) -> Result<String, StorageError> {
100        fs::read_to_string(format!("{}/{}", self.path, name)).await
101    }
102
103    async fn remove_file(&mut self, name: &str) -> Result<(), StorageError> {
104        fs::remove_file(format!("{}/{}", self.path, name)).await
105    }
106
107    async fn remove_all_files(&mut self) -> Result<(), StorageError> {
108        let files = self.list_files().await?;
109        for file in files {
110            self.remove_file(&file).await?;
111        }
112        Ok(())
113    }
114}
115
116impl Drop for StorageImpl {
117    fn drop(&mut self) {
118        let lock_file = self.lock_file.take().unwrap();
119        spawn_blocking(move || {
120            if let Err(e) = lock_file.unlock() {
121                log::error!("failed to unlock storage: {}", e);
122            }
123        });
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130    use rand::prelude::*;
131    use simperby_core::*;
132
133    fn generate_random_string() -> String {
134        let mut rng = rand::thread_rng();
135        let s1: u128 = rng.gen();
136        let s2: u128 = rng.gen();
137        Hash256::hash(format!("{s1}{s2}").as_bytes()).to_string()[0..16].to_owned()
138    }
139
140    fn gerenate_random_storage_directory() -> String {
141        let temp_dir = std::env::temp_dir();
142        format!(
143            "{}/{}",
144            temp_dir.to_str().unwrap(),
145            generate_random_string()
146        )
147    }
148
149    #[tokio::test]
150    async fn simple1() {
151        let dir = gerenate_random_storage_directory();
152        StorageImpl::create(&dir).await.unwrap();
153        let mut storage = StorageImpl::open(&dir).await.unwrap();
154        for _ in 0..10 {
155            let name = generate_random_string();
156            let content = generate_random_string();
157            storage
158                .add_or_overwrite_file(&name, content.clone())
159                .await
160                .unwrap();
161            assert_eq!(storage.read_file(&name).await.unwrap(), content);
162        }
163    }
164
165    #[tokio::test]
166    async fn never_interrupted() {
167        let dir = gerenate_random_storage_directory();
168        StorageImpl::create(&dir).await.unwrap();
169        let mut storage = StorageImpl::open(&dir).await.unwrap();
170        let mut tasks = Vec::new();
171        for _ in 0..10 {
172            let name = generate_random_string();
173            let content = generate_random_string();
174            storage
175                .add_or_overwrite_file(&name, content.clone())
176                .await
177                .unwrap();
178        }
179        for _ in 0..100 {
180            let dir_ = dir.clone();
181            tasks.push(tokio::spawn(async move {
182                let mut storage = StorageImpl::open(&dir_).await.unwrap();
183                storage.remove_all_files().await.unwrap();
184            }))
185        }
186        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
187        // assert that files are not yet removed
188        assert_eq!(storage.list_files().await.unwrap().len(), 10);
189        drop(storage);
190        futures::future::join_all(tasks)
191            .await
192            .into_iter()
193            .collect::<Result<Vec<_>, _>>()
194            .unwrap();
195        let storage = StorageImpl::open(&dir).await.unwrap();
196        // assert that files are removed
197        assert_eq!(storage.list_files().await.unwrap().len(), 0);
198    }
199}