1use std::collections::{BTreeMap, VecDeque};
2use std::future::Future;
3use std::io::ErrorKind;
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8
9pub trait AsyncFileSystem: Clone + Send + Sync + 'static {
10 fn read(
11 &self,
12 path: &Path,
13 ) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<u8>>> + Send + 'static>>;
14
15 fn write(
16 &self,
17 path: &Path,
18 contents: Vec<u8>,
19 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
20
21 fn remove_file(
22 &self,
23 path: &Path,
24 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
25
26 fn create_dir_all(
27 &self,
28 path: &Path,
29 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>>;
30
31 fn exists(&self, path: &Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>;
32}
33
34#[derive(Clone, Default)]
35pub struct Vfs {
36 state: Arc<Mutex<VfsState>>,
37}
38
39#[derive(Default)]
40struct VfsState {
41 files: BTreeMap<PathBuf, Vec<u8>>,
42 dirs: BTreeMap<PathBuf, ()>,
43 write_faults: VecDeque<WriteFault>,
44}
45
46#[derive(Debug, Clone)]
47pub enum WriteFault {
48 DiskFull,
49 PartialWrite(usize),
50 Latency(Duration),
51}
52
53impl Vfs {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn push_write_fault(&self, fault: WriteFault) {
59 self.state
60 .lock()
61 .expect("sim vfs")
62 .write_faults
63 .push_back(fault);
64 }
65
66 pub fn clear_write_faults(&self) {
67 self.state.lock().expect("sim vfs").write_faults.clear();
68 }
69}
70
71impl AsyncFileSystem for Vfs {
72 fn read(
73 &self,
74 path: &Path,
75 ) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<u8>>> + Send + 'static>> {
76 let state = self.state.lock().expect("sim vfs");
77 let res = state
78 .files
79 .get(path)
80 .cloned()
81 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"));
82 Box::pin(async move { res })
83 }
84
85 fn write(
86 &self,
87 path: &Path,
88 contents: Vec<u8>,
89 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
90 let mut state = self.state.lock().expect("sim vfs");
91 if let Some(parent) = path.parent() {
92 if !parent.as_os_str().is_empty() && !state.dirs.contains_key(parent) {
93 return Box::pin(async move {
94 Err(std::io::Error::new(
95 std::io::ErrorKind::NotFound,
96 "parent directory not found",
97 ))
98 });
99 }
100 }
101
102 match state.write_faults.pop_front() {
103 Some(WriteFault::DiskFull) => {
104 return Box::pin(async move {
105 Err(std::io::Error::new(
106 ErrorKind::StorageFull,
107 "simulated disk full",
108 ))
109 });
110 }
111 Some(WriteFault::PartialWrite(n)) => {
112 let clipped = n.min(contents.len());
113 state
114 .files
115 .insert(path.to_path_buf(), contents[..clipped].to_vec());
116 return Box::pin(async move {
117 Err(std::io::Error::new(
118 ErrorKind::WriteZero,
119 "simulated partial write",
120 ))
121 });
122 }
123 Some(WriteFault::Latency(delay)) => {
124 drop(state);
125 std::thread::sleep(delay);
126 let mut state = self.state.lock().expect("sim vfs");
127 state.files.insert(path.to_path_buf(), contents);
128 return Box::pin(async move { Ok(()) });
129 }
130 None => {}
131 }
132
133 state.files.insert(path.to_path_buf(), contents);
134 Box::pin(async move { Ok(()) })
135 }
136
137 fn remove_file(
138 &self,
139 path: &Path,
140 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
141 let mut state = self.state.lock().expect("sim vfs");
142 state.files.remove(path);
143 Box::pin(async move { Ok(()) })
144 }
145
146 fn create_dir_all(
147 &self,
148 path: &Path,
149 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'static>> {
150 let mut state = self.state.lock().expect("sim vfs");
151 let mut curr = PathBuf::new();
152 for component in path.components() {
153 curr.push(component);
154 state.dirs.insert(curr.clone(), ());
155 }
156 Box::pin(async move { Ok(()) })
157 }
158
159 fn exists(&self, path: &Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
160 let state = self.state.lock().expect("sim vfs");
161 let res = state.files.contains_key(path) || state.dirs.contains_key(path);
162 Box::pin(async move { res })
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::{AsyncFileSystem, Vfs, WriteFault};
169 use std::future::Future;
170 use std::io::ErrorKind;
171 use std::path::Path;
172 use std::time::Duration;
173
174 fn block_on<T>(fut: impl Future<Output = T>) -> T {
175 let rt = tokio::runtime::Builder::new_current_thread()
176 .enable_all()
177 .build()
178 .expect("build current-thread runtime");
179 rt.block_on(fut)
180 }
181
182 #[test]
183 fn sim_vfs_partial_write_fault_persists_truncated_content() {
184 let fs = Vfs::new();
185 let dir = Path::new("/raft");
186 let file = Path::new("/raft/log-1");
187
188 block_on(fs.create_dir_all(dir)).expect("create_dir_all");
189 fs.push_write_fault(WriteFault::PartialWrite(4));
190 let err = block_on(fs.write(file, b"abcdef".to_vec())).expect_err("partial write fault");
191 assert_eq!(err.kind(), ErrorKind::WriteZero);
192
193 let got = block_on(fs.read(file)).expect("read file");
194 assert_eq!(got, b"abcd".to_vec());
195 }
196
197 #[test]
198 fn sim_vfs_disk_full_fault_leaves_existing_content_unchanged() {
199 let fs = Vfs::new();
200 let dir = Path::new("/raft");
201 let file = Path::new("/raft/snapshot");
202
203 block_on(fs.create_dir_all(dir)).expect("create_dir_all");
204 block_on(fs.write(file, b"v1".to_vec())).expect("initial write");
205
206 fs.push_write_fault(WriteFault::DiskFull);
207 let err = block_on(fs.write(file, b"v2".to_vec())).expect_err("disk full fault");
208 assert_eq!(err.kind(), ErrorKind::StorageFull);
209
210 let got = block_on(fs.read(file)).expect("read existing file");
211 assert_eq!(got, b"v1".to_vec());
212 }
213
214 #[test]
215 fn sim_vfs_latency_fault_delays_and_persists_write() {
216 let fs = Vfs::new();
217 let dir = Path::new("/raft");
218 let file = Path::new("/raft/latency-log");
219
220 block_on(fs.create_dir_all(dir)).expect("create_dir_all");
221 fs.push_write_fault(WriteFault::Latency(Duration::from_millis(20)));
222
223 let started = std::time::Instant::now();
224 block_on(fs.write(file, b"entry".to_vec())).expect("latency write");
225 let elapsed = started.elapsed();
226
227 assert!(elapsed >= Duration::from_millis(15));
228 let got = block_on(fs.read(file)).expect("read file");
229 assert_eq!(got, b"entry".to_vec());
230 }
231}