Skip to main content

strontium_core/
fs.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::future::Future;
3use std::io::ErrorKind;
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8
9pub trait AsyncFileSystem: Clone + Send + Sync + 'static {
10    fn read(
11        &self,
12        path: &Path,
13    ) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<u8>>> + Send + 'static>>;
14
15    fn write(
16        &self,
17        path: &Path,
18        contents: Vec<u8>,
19    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
20
21    fn remove_file(
22        &self,
23        path: &Path,
24    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
25
26    fn create_dir_all(
27        &self,
28        path: &Path,
29    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
30
31    fn exists(&self, path: &Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>;
32}
33
34#[derive(Clone, Default)]
35pub struct Vfs {
36    state: Arc<Mutex<VfsState>>,
37}
38
39#[derive(Default)]
40struct VfsState {
41    files: BTreeMap<PathBuf, Vec<u8>>,
42    dirs: BTreeMap<PathBuf, ()>,
43    write_faults: VecDeque<WriteFault>,
44}
45
46#[derive(Debug, Clone)]
47pub enum WriteFault {
48    DiskFull,
49    PartialWrite(usize),
50    Latency(Duration),
51}
52
53impl Vfs {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn push_write_fault(&self, fault: WriteFault) {
59        self.state
60            .lock()
61            .expect("sim vfs")
62            .write_faults
63            .push_back(fault);
64    }
65
66    pub fn clear_write_faults(&self) {
67        self.state.lock().expect("sim vfs").write_faults.clear();
68    }
69}
70
71impl AsyncFileSystem for Vfs {
72    fn read(
73        &self,
74        path: &Path,
75    ) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<u8>>> + Send + 'static>> {
76        let state = self.state.lock().expect("sim vfs");
77        let res = state
78            .files
79            .get(path)
80            .cloned()
81            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"));
82        Box::pin(async move { res })
83    }
84
85    fn write(
86        &self,
87        path: &Path,
88        contents: Vec<u8>,
89    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
90        let mut state = self.state.lock().expect("sim vfs");
91        if let Some(parent) = path.parent() {
92            if !parent.as_os_str().is_empty() && !state.dirs.contains_key(parent) {
93                return Box::pin(async move {
94                    Err(std::io::Error::new(
95                        std::io::ErrorKind::NotFound,
96                        "parent directory not found",
97                    ))
98                });
99            }
100        }
101
102        match state.write_faults.pop_front() {
103            Some(WriteFault::DiskFull) => {
104                return Box::pin(async move {
105                    Err(std::io::Error::new(
106                        ErrorKind::StorageFull,
107                        "simulated disk full",
108                    ))
109                });
110            }
111            Some(WriteFault::PartialWrite(n)) => {
112                let clipped = n.min(contents.len());
113                state
114                    .files
115                    .insert(path.to_path_buf(), contents[..clipped].to_vec());
116                return Box::pin(async move {
117                    Err(std::io::Error::new(
118                        ErrorKind::WriteZero,
119                        "simulated partial write",
120                    ))
121                });
122            }
123            Some(WriteFault::Latency(delay)) => {
124                drop(state);
125                std::thread::sleep(delay);
126                let mut state = self.state.lock().expect("sim vfs");
127                state.files.insert(path.to_path_buf(), contents);
128                return Box::pin(async move { Ok(()) });
129            }
130            None => {}
131        }
132
133        state.files.insert(path.to_path_buf(), contents);
134        Box::pin(async move { Ok(()) })
135    }
136
137    fn remove_file(
138        &self,
139        path: &Path,
140    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
141        let mut state = self.state.lock().expect("sim vfs");
142        state.files.remove(path);
143        Box::pin(async move { Ok(()) })
144    }
145
146    fn create_dir_all(
147        &self,
148        path: &Path,
149    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
150        let mut state = self.state.lock().expect("sim vfs");
151        let mut curr = PathBuf::new();
152        for component in path.components() {
153            curr.push(component);
154            state.dirs.insert(curr.clone(), ());
155        }
156        Box::pin(async move { Ok(()) })
157    }
158
159    fn exists(&self, path: &Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
160        let state = self.state.lock().expect("sim vfs");
161        let res = state.files.contains_key(path) || state.dirs.contains_key(path);
162        Box::pin(async move { res })
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::{AsyncFileSystem, Vfs, WriteFault};
169    use std::future::Future;
170    use std::io::ErrorKind;
171    use std::path::Path;
172    use std::time::Duration;
173
174    fn block_on<T>(fut: impl Future<Output = T>) -> T {
175        let rt = tokio::runtime::Builder::new_current_thread()
176            .enable_all()
177            .build()
178            .expect("build current-thread runtime");
179        rt.block_on(fut)
180    }
181
182    #[test]
183    fn sim_vfs_partial_write_fault_persists_truncated_content() {
184        let fs = Vfs::new();
185        let dir = Path::new("/raft");
186        let file = Path::new("/raft/log-1");
187
188        block_on(fs.create_dir_all(dir)).expect("create_dir_all");
189        fs.push_write_fault(WriteFault::PartialWrite(4));
190        let err = block_on(fs.write(file, b"abcdef".to_vec())).expect_err("partial write fault");
191        assert_eq!(err.kind(), ErrorKind::WriteZero);
192
193        let got = block_on(fs.read(file)).expect("read file");
194        assert_eq!(got, b"abcd".to_vec());
195    }
196
197    #[test]
198    fn sim_vfs_disk_full_fault_leaves_existing_content_unchanged() {
199        let fs = Vfs::new();
200        let dir = Path::new("/raft");
201        let file = Path::new("/raft/snapshot");
202
203        block_on(fs.create_dir_all(dir)).expect("create_dir_all");
204        block_on(fs.write(file, b"v1".to_vec())).expect("initial write");
205
206        fs.push_write_fault(WriteFault::DiskFull);
207        let err = block_on(fs.write(file, b"v2".to_vec())).expect_err("disk full fault");
208        assert_eq!(err.kind(), ErrorKind::StorageFull);
209
210        let got = block_on(fs.read(file)).expect("read existing file");
211        assert_eq!(got, b"v1".to_vec());
212    }
213
214    #[test]
215    fn sim_vfs_latency_fault_delays_and_persists_write() {
216        let fs = Vfs::new();
217        let dir = Path::new("/raft");
218        let file = Path::new("/raft/latency-log");
219
220        block_on(fs.create_dir_all(dir)).expect("create_dir_all");
221        fs.push_write_fault(WriteFault::Latency(Duration::from_millis(20)));
222
223        let started = std::time::Instant::now();
224        block_on(fs.write(file, b"entry".to_vec())).expect("latency write");
225        let elapsed = started.elapsed();
226
227        assert!(elapsed >= Duration::from_millis(15));
228        let got = block_on(fs.read(file)).expect("read file");
229        assert_eq!(got, b"entry".to_vec());
230    }
231}