1use std::{
2 collections::HashMap,
3 io::{self, SeekFrom},
4 ops::Neg,
5 path::{Path, PathBuf},
6 sync::{Arc, Weak},
7};
8
9use once_cell::sync::Lazy;
10use parking_lot::{Mutex, RwLock};
11
12use super::{FileManager, FileOp, ManagedFile, OpenableFile};
13use crate::{
14 error::Error,
15 io::{File, ManagedFileOpener, OperableFile, PathIds},
16 ErrorKind,
17};
18
19type FileBuffer = Arc<RwLock<Vec<u8>>>;
20
21#[derive(Clone)]
24pub struct MemoryFile {
25 id: Option<u64>,
26 path: PathBuf,
27 buffer: FileBuffer,
28 position: usize,
29}
30
31impl std::fmt::Debug for MemoryFile {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 let buffer = self.buffer.read();
34 f.debug_struct("MemoryFile")
35 .field("id", &self.id)
36 .field("path", &self.path)
37 .field("buffer", &buffer.len())
38 .field("position", &self.position)
39 .finish()
40 }
41}
42
43type OpenBuffers = Arc<Mutex<HashMap<PathBuf, Weak<RwLock<Vec<u8>>>>>>;
44static OPEN_BUFFERS: Lazy<OpenBuffers> = Lazy::new(Arc::default);
45
46#[allow(clippy::needless_pass_by_value)]
47fn lookup_buffer(
48 path: impl AsRef<std::path::Path> + Send,
49 create_if_not_found: bool,
50) -> Option<Arc<RwLock<Vec<u8>>>> {
51 let mut open_buffers = OPEN_BUFFERS.lock();
52 if let Some(existing_buffer) = open_buffers.get(path.as_ref()).and_then(Weak::upgrade) {
53 Some(existing_buffer)
54 } else if create_if_not_found {
55 let new_buffer = Arc::default();
56 open_buffers.insert(path.as_ref().to_path_buf(), Arc::downgrade(&new_buffer));
57 Some(new_buffer)
58 } else {
59 None
60 }
61}
62
63impl ManagedFile for MemoryFile {
64 type Manager = MemoryFileManager;
65}
66
67#[allow(clippy::cast_possible_truncation)]
68impl super::File for MemoryFile {
69 fn id(&self) -> Option<u64> {
70 self.id
71 }
72
73 fn path(&self) -> &Path {
74 &self.path
75 }
76
77 fn length(&self) -> Result<u64, Error> {
78 let file_buffer = self.buffer.read();
79 Ok(file_buffer.len() as u64)
80 }
81
82 fn close(self) -> Result<(), Error> {
83 Ok(())
84 }
85}
86
/// Stateless opener that creates [`MemoryFile`] handles.
pub struct MemoryFileOpener;
89
90impl ManagedFileOpener<MemoryFile> for MemoryFileOpener {
91 fn open_for_read(
92 &self,
93 path: impl AsRef<std::path::Path> + Send,
94 id: Option<u64>,
95 ) -> Result<MemoryFile, Error> {
96 let path = path.as_ref();
97 Ok(MemoryFile {
98 id,
99 path: path.to_path_buf(),
100 buffer: lookup_buffer(path, true).unwrap(),
101 position: 0,
102 })
103 }
104
105 fn open_for_append(
106 &self,
107 path: impl AsRef<std::path::Path> + Send,
108 id: Option<u64>,
109 ) -> Result<MemoryFile, Error> {
110 let path = path.as_ref();
111 let buffer = lookup_buffer(path, true).unwrap();
112 let position = {
113 let buffer = buffer.read();
114 buffer.len()
115 };
116 Ok(MemoryFile {
117 id,
118 buffer,
119 position,
120 path: path.to_path_buf(),
121 })
122 }
123}
124
125impl std::io::Seek for MemoryFile {
126 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
127 match pos {
128 SeekFrom::Start(position) => self.position = usize::try_from(position).unwrap(),
129 SeekFrom::End(from_end) => {
130 let buffer = self.buffer.read();
131 self.position = if from_end.is_positive() {
132 buffer.len()
133 } else {
134 buffer.len() - usize::try_from(from_end.neg()).unwrap()
135 };
136 }
137 SeekFrom::Current(relative) => {
138 self.position = if relative.is_positive() {
139 self.position + usize::try_from(relative).unwrap()
140 } else {
141 self.position - usize::try_from(relative.neg()).unwrap()
142 }
143 }
144 }
145 Ok(self.position as u64)
146 }
147}
148
149impl std::io::Read for MemoryFile {
150 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
151 let file_buffer = self.buffer.read();
152
153 let read_end = self.position as usize + buffer.len();
154 if read_end > file_buffer.len() {
155 return Err(io::Error::new(
156 io::ErrorKind::UnexpectedEof,
157 ErrorKind::message("read requested more bytes than available"),
158 ));
159 }
160
161 buffer.copy_from_slice(&file_buffer[self.position..read_end]);
162 self.position = read_end;
163
164 Ok(buffer.len())
165 }
166}
167
168impl std::io::Write for MemoryFile {
169 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
170 let mut file_buffer = self.buffer.write();
171
172 file_buffer.extend_from_slice(buf);
173 self.position += buf.len();
174
175 Ok(buf.len())
176 }
177
178 fn flush(&mut self) -> io::Result<()> {
179 Ok(())
180 }
181}
182
183#[derive(Debug, Default, Clone)]
186pub struct MemoryFileManager {
187 file_ids: PathIds,
188 open_files: Arc<Mutex<HashMap<u64, FileBuffer>>>,
189}
190
191impl MemoryFileManager {
192 fn lookup_file(
193 &self,
194 path: impl AsRef<Path>,
195 create_if_needed: bool,
196 id: Option<u64>,
197 ) -> Result<Option<MemoryFile>, Error> {
198 let path = path.as_ref();
199 let id = match id {
200 Some(id) => id,
201 None => match self.file_ids.file_id_for_path(path, create_if_needed) {
202 Some(id) => id,
203 None => return Ok(None),
204 },
205 };
206 let mut open_files = self.open_files.lock();
207 if let Some(open_file) = open_files.get(&id) {
208 Ok(Some(MemoryFile {
209 id: Some(id),
210 buffer: open_file.clone(),
211 path: path.to_path_buf(),
212 position: 0,
213 }))
214 } else if create_if_needed {
215 let file = MemoryFileOpener.open_for_append(path, Some(id))?;
216 open_files.insert(id, file.buffer.clone());
217 Ok(Some(file))
218 } else {
219 Ok(None)
220 }
221 }
222
223 fn forget_file(&self, path: &Path) -> bool {
224 if let Some(id) = self.file_ids.remove_file_id_for_path(path) {
225 let mut open_files = self.open_files.lock();
226 open_files.remove(&id).is_some()
227 } else {
228 false
229 }
230 }
231}
232
233impl FileManager for MemoryFileManager {
234 type File = MemoryFile;
235 type FileHandle = OpenMemoryFile;
236
237 fn append(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
238 let path = path.as_ref();
239 let id = self.file_ids.file_id_for_path(path, true);
240 self.lookup_file(path, true, id).map(|file| OpenMemoryFile {
241 file: file.unwrap(),
242 manager: self.clone(),
243 })
244 }
245
246 fn read(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
247 self.append(path)
248 }
249
250 fn file_length(&self, path: impl AsRef<Path>) -> Result<u64, Error> {
251 let file = self.lookup_file(path, false, None)?.ok_or_else(|| {
252 ErrorKind::Io(io::Error::new(
253 io::ErrorKind::NotFound,
254 ErrorKind::message("not found"),
255 ))
256 })?;
257 file.length()
258 }
259
260 fn exists(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
261 Ok(self.lookup_file(path, false, None)?.is_some())
262 }
263
264 fn delete(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
265 let path = path.as_ref();
266 {
267 let mut open_buffers = OPEN_BUFFERS.lock();
268 open_buffers.remove(path);
269 }
270 Ok(self.forget_file(path))
271 }
272
273 fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
274 let path = path.as_ref();
275 let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
276 let mut open_files = self.open_files.lock();
277 for id in removed_ids {
278 open_files.remove(&id);
279 }
280
281 Ok(())
282 }
283
284 fn close_handles<F: FnOnce(u64)>(&self, path: impl AsRef<Path>, publish_callback: F) {
285 let path = path.as_ref();
286 self.forget_file(path);
287 let new_id = self.file_ids.file_id_for_path(path, true).unwrap();
288 publish_callback(new_id);
289 }
290}
291
292impl ManagedFileOpener<MemoryFile> for MemoryFileManager {
293 fn open_for_read(
294 &self,
295 path: impl AsRef<Path> + Send,
296 id: Option<u64>,
297 ) -> Result<MemoryFile, Error> {
298 MemoryFileOpener.open_for_read(path, id)
299 }
300
301 fn open_for_append(
302 &self,
303 path: impl AsRef<Path> + Send,
304 id: Option<u64>,
305 ) -> Result<MemoryFile, Error> {
306 MemoryFileOpener.open_for_append(path, id)
307 }
308}
309
310#[derive(Debug)]
312pub struct OpenMemoryFile {
313 file: MemoryFile,
314 manager: MemoryFileManager,
315}
316
317impl OpenableFile<MemoryFile> for OpenMemoryFile {
318 fn id(&self) -> Option<u64> {
319 self.file.id
320 }
321
322 fn replace_with<C: FnOnce(u64)>(
323 self,
324 replacement: MemoryFile,
325 manager: &MemoryFileManager,
326 publish_callback: C,
327 ) -> Result<Self, Error> {
328 let weak_buffer = Arc::downgrade(&replacement.buffer);
329 drop(self.manager.delete(replacement.path()));
330 {
331 let mut open_buffers = OPEN_BUFFERS.lock();
332 open_buffers.insert(self.file.path.clone(), weak_buffer);
333 }
334
335 manager.close_handles(&self.file.path, publish_callback);
336
337 let new_file = manager.append(&self.file.path)?;
338 {
339 assert!(Arc::ptr_eq(&new_file.file.buffer, &replacement.buffer));
340 }
341 Ok(new_file)
342 }
343
344 fn close(self) -> Result<(), Error> {
345 drop(self);
346 Ok(())
347 }
348}
349
350impl OperableFile<MemoryFile> for OpenMemoryFile {
351 fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
352 operator.execute(&mut self.file)
353 }
354}