nebari/io/
fs.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fs::{File, OpenOptions},
4    io::{Read, Seek, SeekFrom, Write},
5    path::{Path, PathBuf},
6    sync::Arc,
7};
8
9use parking_lot::Mutex;
10
11use super::{FileManager, FileOp, ManagedFile, OpenableFile};
12use crate::{
13    error::Error,
14    io::{File as _, ManagedFileOpener, OperableFile, PathIds},
15};
16
/// An open file that uses [`std::fs`].
///
/// Bundles the operating-system handle with the path it was opened from and
/// the identifier (if any) that was supplied when it was opened.
#[derive(Debug)]
pub struct StdFile {
    /// The underlying [`std::fs::File`] handle all I/O is forwarded to.
    file: File,
    /// The path that was used to open `file`.
    path: PathBuf,
    /// The identifier passed in by whoever opened the file, or `None` when
    /// it was opened without one.
    id: Option<u64>,
}
24
25impl ManagedFile for StdFile {
26    type Manager = StdFileManager;
27}
28
29impl super::File for StdFile {
30    fn id(&self) -> Option<u64> {
31        self.id
32    }
33
34    fn path(&self) -> &Path {
35        &self.path
36    }
37
38    fn length(&self) -> Result<u64, Error> {
39        let metadata = self.file.metadata()?;
40        Ok(metadata.len())
41    }
42
43    fn close(mut self) -> Result<(), Error> {
44        // Closing is done by just dropping it
45        self.flush().map_err(Error::from)
46    }
47}
48
/// A [`ManagedFileOpener`] implementation that produces [`StdFile`]s.
///
/// This is a stateless unit type: it can be constructed in place wherever an
/// opener is needed.
pub struct StdFileOpener;
51
52impl ManagedFileOpener<StdFile> for StdFileOpener {
53    fn open_for_read(
54        &self,
55        path: impl AsRef<std::path::Path> + Send,
56        id: Option<u64>,
57    ) -> Result<StdFile, Error> {
58        let path = path.as_ref();
59        Ok(StdFile {
60            file: File::open(path)?,
61            path: path.to_path_buf(),
62            id,
63        })
64    }
65
66    fn open_for_append(
67        &self,
68        path: impl AsRef<std::path::Path> + Send,
69        id: Option<u64>,
70    ) -> Result<StdFile, Error> {
71        let path = path.as_ref();
72        Ok(StdFile {
73            file: OpenOptions::new()
74                .write(true)
75                .append(true)
76                .read(true)
77                .create(true)
78                .open(path)?,
79            path: path.to_path_buf(),
80            id,
81        })
82    }
83}
84
85impl Seek for StdFile {
86    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self,)))]
87    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
88        self.file.seek(pos)
89    }
90}
91
92impl Write for StdFile {
93    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
94    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95        self.file.write(buf)
96    }
97
98    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
99    fn flush(&mut self) -> std::io::Result<()> {
100        self.file.sync_all()
101    }
102}
103
104impl Read for StdFile {
105    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
106    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
107        self.file.read(buf)
108    }
109}
110
111/// The [`FileManager`] for [`StdFile`].
112#[derive(Debug, Default, Clone)]
113pub struct StdFileManager {
114    file_ids: PathIds,
115    open_files: Arc<Mutex<HashMap<u64, FileSlot>>>,
116    reader_files: Arc<Mutex<HashMap<u64, VecDeque<StdFile>>>>,
117}
118
119#[derive(Debug)]
120enum FileSlot {
121    Available(StdFile),
122    Taken,
123    Waiting(flume::Sender<StdFile>),
124}
125
126impl FileManager for StdFileManager {
127    type File = StdFile;
128    type FileHandle = OpenStdFile;
129    fn append(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
130        let path = path.as_ref();
131        let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
132        let mut open_files = self.open_files.lock();
133        if let Some(open_file) = open_files.get_mut(&file_id) {
134            let mut file = FileSlot::Taken;
135            std::mem::swap(&mut file, open_file);
136            let file = match file {
137                FileSlot::Available(file) => file,
138                other => {
139                    let (file_sender, file_receiver) = flume::bounded(1);
140                    *open_file = FileSlot::Waiting(file_sender);
141                    drop(open_files);
142
143                    if let Ok(file) = file_receiver.recv() {
144                        // If we stole the slot from another waiter (shouldn't
145                        // happen in real usage), we need to reinstall it.
146                        if let FileSlot::Waiting(other_sender) = other {
147                            let mut open_files = self.open_files.lock();
148                            if let Some(open_file) = open_files.get_mut(&file_id) {
149                                *open_file = FileSlot::Waiting(other_sender);
150                            }
151                        }
152                        file
153                    } else {
154                        return self.append(path);
155                    }
156                }
157            };
158            Ok(OpenStdFile {
159                file: Some(file),
160                reader: false,
161                manager: Some(self.clone()),
162            })
163        } else {
164            let file = self.open_for_append(path, Some(file_id))?;
165            open_files.insert(file_id, FileSlot::Taken);
166            Ok(OpenStdFile {
167                file: Some(file),
168                reader: false,
169                manager: Some(self.clone()),
170            })
171        }
172    }
173
174    fn read(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
175        let path = path.as_ref();
176        let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
177
178        let mut reader_files = self.reader_files.lock();
179        let files = reader_files.entry(file_id).or_default();
180
181        if let Some(file) = files.pop_front() {
182            return Ok(OpenStdFile {
183                file: Some(file),
184                manager: Some(self.clone()),
185                reader: true,
186            });
187        }
188
189        let file = StdFileOpener.open_for_read(path, Some(file_id))?;
190        Ok(OpenStdFile {
191            file: Some(file),
192            manager: Some(self.clone()),
193            reader: true,
194        })
195    }
196
197    fn delete(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
198        let path = path.as_ref();
199        let file_id = self.file_ids.remove_file_id_for_path(path);
200        if let Some(file_id) = file_id {
201            let mut open_files = self.open_files.lock();
202            let mut reader_files = self.reader_files.lock();
203            open_files.remove(&file_id);
204            reader_files.remove(&file_id);
205        }
206
207        if path.exists() {
208            std::fs::remove_file(path)?;
209            Ok(true)
210        } else {
211            Ok(false)
212        }
213    }
214
215    fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
216        let path = path.as_ref();
217        let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
218        let mut open_files = self.open_files.lock();
219        let mut reader_files = self.reader_files.lock();
220        for id in removed_ids {
221            open_files.remove(&id);
222            reader_files.remove(&id);
223        }
224
225        if path.exists() {
226            std::fs::remove_dir_all(path)?;
227        }
228
229        Ok(())
230    }
231
232    fn close_handles<F: FnOnce(u64)>(&self, path: impl AsRef<Path>, publish_callback: F) {
233        if let Some(result) = self.file_ids.recreate_file_id_for_path(path.as_ref()) {
234            let mut open_files = self.open_files.lock();
235            let mut reader_files = self.reader_files.lock();
236            open_files.remove(&result.previous_id);
237            reader_files.remove(&result.previous_id);
238            publish_callback(result.new_id);
239        }
240    }
241
242    fn exists(&self, path: impl AsRef<std::path::Path>) -> Result<bool, crate::Error> {
243        Ok(path.as_ref().exists())
244    }
245
246    fn file_length(&self, path: impl AsRef<Path>) -> Result<u64, Error> {
247        path.as_ref()
248            .metadata()
249            .map_err(Error::from)
250            .map(|metadata| metadata.len())
251    }
252}
253
254impl ManagedFileOpener<StdFile> for StdFileManager {
255    fn open_for_read(
256        &self,
257        path: impl AsRef<Path> + Send,
258        id: Option<u64>,
259    ) -> Result<StdFile, Error> {
260        StdFileOpener.open_for_read(path, id)
261    }
262
263    fn open_for_append(
264        &self,
265        path: impl AsRef<Path> + Send,
266        id: Option<u64>,
267    ) -> Result<StdFile, Error> {
268        StdFileOpener.open_for_append(path, id)
269    }
270}
271
272/// An open [`StdFile`] that belongs to a [`StdFileManager`].
273#[derive(Debug)]
274pub struct OpenStdFile {
275    file: Option<StdFile>,
276    manager: Option<StdFileManager>,
277    reader: bool,
278}
279
280impl OpenableFile<StdFile> for OpenStdFile {
281    fn id(&self) -> Option<u64> {
282        self.file.as_ref().and_then(StdFile::id)
283    }
284
285    fn replace_with<C: FnOnce(u64)>(
286        self,
287        replacement: StdFile,
288        manager: &StdFileManager,
289        publish_callback: C,
290    ) -> Result<Self, Error> {
291        let current_path = self.file.as_ref().unwrap().path.clone();
292        self.close()?;
293        let path = replacement.path.clone();
294        replacement.close()?;
295
296        std::fs::rename(path, &current_path)?;
297        manager.close_handles(&current_path, publish_callback);
298        manager.append(current_path)
299    }
300
301    fn close(self) -> Result<(), Error> {
302        drop(self);
303        Ok(())
304    }
305}
306
307impl OperableFile<StdFile> for OpenStdFile {
308    fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
309        operator.execute(self.file.as_mut().unwrap())
310    }
311}
312
313impl Drop for OpenStdFile {
314    fn drop(&mut self) {
315        if let Some(manager) = &self.manager {
316            let file = self.file.take().unwrap();
317            if let Some(file_id) = file.id {
318                if self.reader {
319                    let mut reader_files = manager.reader_files.lock();
320                    if let Some(path_files) = reader_files.get_mut(&file_id) {
321                        path_files.push_front(file);
322                    }
323                } else {
324                    let mut writer_files = manager.open_files.lock();
325                    if let Some(writer_file) = writer_files.get_mut(&file_id) {
326                        match writer_file {
327                            FileSlot::Available(_) => unreachable!(),
328                            FileSlot::Taken => {
329                                *writer_file = FileSlot::Available(file);
330                            }
331                            FileSlot::Waiting(sender) => {
332                                if let Err(flume::SendError(file)) = sender.send(file) {
333                                    *writer_file = FileSlot::Available(file);
334                                }
335                            }
336                        }
337                    }
338                }
339            }
340        }
341    }
342}