nebari/io/
memory.rs

1use std::{
2    collections::HashMap,
3    io::{self, SeekFrom},
4    ops::Neg,
5    path::{Path, PathBuf},
6    sync::{Arc, Weak},
7};
8
9use once_cell::sync::Lazy;
10use parking_lot::{Mutex, RwLock};
11
12use super::{FileManager, FileOp, ManagedFile, OpenableFile};
13use crate::{
14    error::Error,
15    io::{File, ManagedFileOpener, OperableFile, PathIds},
16    ErrorKind,
17};
18
19type FileBuffer = Arc<RwLock<Vec<u8>>>;
20
21/// A fake "file" represented by an in-memory buffer. This should only be used
22/// in testing, as this database format is not optimized for memory efficiency.
23#[derive(Clone)]
24pub struct MemoryFile {
25    id: Option<u64>,
26    path: PathBuf,
27    buffer: FileBuffer,
28    position: usize,
29}
30
31impl std::fmt::Debug for MemoryFile {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        let buffer = self.buffer.read();
34        f.debug_struct("MemoryFile")
35            .field("id", &self.id)
36            .field("path", &self.path)
37            .field("buffer", &buffer.len())
38            .field("position", &self.position)
39            .finish()
40    }
41}
42
43type OpenBuffers = Arc<Mutex<HashMap<PathBuf, Weak<RwLock<Vec<u8>>>>>>;
44static OPEN_BUFFERS: Lazy<OpenBuffers> = Lazy::new(Arc::default);
45
46#[allow(clippy::needless_pass_by_value)]
47fn lookup_buffer(
48    path: impl AsRef<std::path::Path> + Send,
49    create_if_not_found: bool,
50) -> Option<Arc<RwLock<Vec<u8>>>> {
51    let mut open_buffers = OPEN_BUFFERS.lock();
52    if let Some(existing_buffer) = open_buffers.get(path.as_ref()).and_then(Weak::upgrade) {
53        Some(existing_buffer)
54    } else if create_if_not_found {
55        let new_buffer = Arc::default();
56        open_buffers.insert(path.as_ref().to_path_buf(), Arc::downgrade(&new_buffer));
57        Some(new_buffer)
58    } else {
59        None
60    }
61}
62
63impl ManagedFile for MemoryFile {
64    type Manager = MemoryFileManager;
65}
66
67#[allow(clippy::cast_possible_truncation)]
68impl super::File for MemoryFile {
69    fn id(&self) -> Option<u64> {
70        self.id
71    }
72
73    fn path(&self) -> &Path {
74        &self.path
75    }
76
77    fn length(&self) -> Result<u64, Error> {
78        let file_buffer = self.buffer.read();
79        Ok(file_buffer.len() as u64)
80    }
81
82    fn close(self) -> Result<(), Error> {
83        Ok(())
84    }
85}
86
/// A [`ManagedFileOpener`] implementation that produces [`MemoryFile`]s.
///
/// This is a stateless unit type, so it is cheap to copy, can be
/// default-constructed, and can be printed for diagnostics.
#[derive(Debug, Default, Clone, Copy)]
pub struct MemoryFileOpener;
89
90impl ManagedFileOpener<MemoryFile> for MemoryFileOpener {
91    fn open_for_read(
92        &self,
93        path: impl AsRef<std::path::Path> + Send,
94        id: Option<u64>,
95    ) -> Result<MemoryFile, Error> {
96        let path = path.as_ref();
97        Ok(MemoryFile {
98            id,
99            path: path.to_path_buf(),
100            buffer: lookup_buffer(path, true).unwrap(),
101            position: 0,
102        })
103    }
104
105    fn open_for_append(
106        &self,
107        path: impl AsRef<std::path::Path> + Send,
108        id: Option<u64>,
109    ) -> Result<MemoryFile, Error> {
110        let path = path.as_ref();
111        let buffer = lookup_buffer(path, true).unwrap();
112        let position = {
113            let buffer = buffer.read();
114            buffer.len()
115        };
116        Ok(MemoryFile {
117            id,
118            buffer,
119            position,
120            path: path.to_path_buf(),
121        })
122    }
123}
124
125impl std::io::Seek for MemoryFile {
126    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
127        match pos {
128            SeekFrom::Start(position) => self.position = usize::try_from(position).unwrap(),
129            SeekFrom::End(from_end) => {
130                let buffer = self.buffer.read();
131                self.position = if from_end.is_positive() {
132                    buffer.len()
133                } else {
134                    buffer.len() - usize::try_from(from_end.neg()).unwrap()
135                };
136            }
137            SeekFrom::Current(relative) => {
138                self.position = if relative.is_positive() {
139                    self.position + usize::try_from(relative).unwrap()
140                } else {
141                    self.position - usize::try_from(relative.neg()).unwrap()
142                }
143            }
144        }
145        Ok(self.position as u64)
146    }
147}
148
149impl std::io::Read for MemoryFile {
150    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
151        let file_buffer = self.buffer.read();
152
153        let read_end = self.position as usize + buffer.len();
154        if read_end > file_buffer.len() {
155            return Err(io::Error::new(
156                io::ErrorKind::UnexpectedEof,
157                ErrorKind::message("read requested more bytes than available"),
158            ));
159        }
160
161        buffer.copy_from_slice(&file_buffer[self.position..read_end]);
162        self.position = read_end;
163
164        Ok(buffer.len())
165    }
166}
167
168impl std::io::Write for MemoryFile {
169    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
170        let mut file_buffer = self.buffer.write();
171
172        file_buffer.extend_from_slice(buf);
173        self.position += buf.len();
174
175        Ok(buf.len())
176    }
177
178    fn flush(&mut self) -> io::Result<()> {
179        Ok(())
180    }
181}
182
183/// The [`FileManager`] implementation for [`MemoryFile`]. Simulates a
184/// persistent in-memory filesystem.
185#[derive(Debug, Default, Clone)]
186pub struct MemoryFileManager {
187    file_ids: PathIds,
188    open_files: Arc<Mutex<HashMap<u64, FileBuffer>>>,
189}
190
191impl MemoryFileManager {
192    fn lookup_file(
193        &self,
194        path: impl AsRef<Path>,
195        create_if_needed: bool,
196        id: Option<u64>,
197    ) -> Result<Option<MemoryFile>, Error> {
198        let path = path.as_ref();
199        let id = match id {
200            Some(id) => id,
201            None => match self.file_ids.file_id_for_path(path, create_if_needed) {
202                Some(id) => id,
203                None => return Ok(None),
204            },
205        };
206        let mut open_files = self.open_files.lock();
207        if let Some(open_file) = open_files.get(&id) {
208            Ok(Some(MemoryFile {
209                id: Some(id),
210                buffer: open_file.clone(),
211                path: path.to_path_buf(),
212                position: 0,
213            }))
214        } else if create_if_needed {
215            let file = MemoryFileOpener.open_for_append(path, Some(id))?;
216            open_files.insert(id, file.buffer.clone());
217            Ok(Some(file))
218        } else {
219            Ok(None)
220        }
221    }
222
223    fn forget_file(&self, path: &Path) -> bool {
224        if let Some(id) = self.file_ids.remove_file_id_for_path(path) {
225            let mut open_files = self.open_files.lock();
226            open_files.remove(&id).is_some()
227        } else {
228            false
229        }
230    }
231}
232
233impl FileManager for MemoryFileManager {
234    type File = MemoryFile;
235    type FileHandle = OpenMemoryFile;
236
237    fn append(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
238        let path = path.as_ref();
239        let id = self.file_ids.file_id_for_path(path, true);
240        self.lookup_file(path, true, id).map(|file| OpenMemoryFile {
241            file: file.unwrap(),
242            manager: self.clone(),
243        })
244    }
245
246    fn read(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
247        self.append(path)
248    }
249
250    fn file_length(&self, path: impl AsRef<Path>) -> Result<u64, Error> {
251        let file = self.lookup_file(path, false, None)?.ok_or_else(|| {
252            ErrorKind::Io(io::Error::new(
253                io::ErrorKind::NotFound,
254                ErrorKind::message("not found"),
255            ))
256        })?;
257        file.length()
258    }
259
260    fn exists(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
261        Ok(self.lookup_file(path, false, None)?.is_some())
262    }
263
264    fn delete(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
265        let path = path.as_ref();
266        {
267            let mut open_buffers = OPEN_BUFFERS.lock();
268            open_buffers.remove(path);
269        }
270        Ok(self.forget_file(path))
271    }
272
273    fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
274        let path = path.as_ref();
275        let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
276        let mut open_files = self.open_files.lock();
277        for id in removed_ids {
278            open_files.remove(&id);
279        }
280
281        Ok(())
282    }
283
284    fn close_handles<F: FnOnce(u64)>(&self, path: impl AsRef<Path>, publish_callback: F) {
285        let path = path.as_ref();
286        self.forget_file(path);
287        let new_id = self.file_ids.file_id_for_path(path, true).unwrap();
288        publish_callback(new_id);
289    }
290}
291
292impl ManagedFileOpener<MemoryFile> for MemoryFileManager {
293    fn open_for_read(
294        &self,
295        path: impl AsRef<Path> + Send,
296        id: Option<u64>,
297    ) -> Result<MemoryFile, Error> {
298        MemoryFileOpener.open_for_read(path, id)
299    }
300
301    fn open_for_append(
302        &self,
303        path: impl AsRef<Path> + Send,
304        id: Option<u64>,
305    ) -> Result<MemoryFile, Error> {
306        MemoryFileOpener.open_for_append(path, id)
307    }
308}
309
310/// An open [`MemoryFile`] that is owned by a [`MemoryFileManager`].
311#[derive(Debug)]
312pub struct OpenMemoryFile {
313    file: MemoryFile,
314    manager: MemoryFileManager,
315}
316
317impl OpenableFile<MemoryFile> for OpenMemoryFile {
318    fn id(&self) -> Option<u64> {
319        self.file.id
320    }
321
322    fn replace_with<C: FnOnce(u64)>(
323        self,
324        replacement: MemoryFile,
325        manager: &MemoryFileManager,
326        publish_callback: C,
327    ) -> Result<Self, Error> {
328        let weak_buffer = Arc::downgrade(&replacement.buffer);
329        drop(self.manager.delete(replacement.path()));
330        {
331            let mut open_buffers = OPEN_BUFFERS.lock();
332            open_buffers.insert(self.file.path.clone(), weak_buffer);
333        }
334
335        manager.close_handles(&self.file.path, publish_callback);
336
337        let new_file = manager.append(&self.file.path)?;
338        {
339            assert!(Arc::ptr_eq(&new_file.file.buffer, &replacement.buffer));
340        }
341        Ok(new_file)
342    }
343
344    fn close(self) -> Result<(), Error> {
345        drop(self);
346        Ok(())
347    }
348}
349
350impl OperableFile<MemoryFile> for OpenMemoryFile {
351    fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
352        operator.execute(&mut self.file)
353    }
354}