1use std::{
2 collections::{HashMap, VecDeque},
3 fs::{File, OpenOptions},
4 io::{Read, Seek, SeekFrom, Write},
5 path::{Path, PathBuf},
6 sync::Arc,
7};
8
9use parking_lot::Mutex;
10
11use super::{FileManager, FileOp, ManagedFile, OpenableFile};
12use crate::{
13 error::Error,
14 io::{File as _, ManagedFileOpener, OperableFile, PathIds},
15};
16
/// A file on the local filesystem: an open [`std::fs::File`] bundled with the
/// path it was opened from and an optional numeric id.
#[derive(Debug)]
pub struct StdFile {
    /// The underlying operating-system file handle.
    file: File,
    /// The path that was passed in when this file was opened.
    path: PathBuf,
    /// The id supplied by whoever opened the file, if any.
    id: Option<u64>,
}
24
// Declares `StdFileManager` as the manager type associated with `StdFile`.
impl ManagedFile for StdFile {
    type Manager = StdFileManager;
}
28
29impl super::File for StdFile {
30 fn id(&self) -> Option<u64> {
31 self.id
32 }
33
34 fn path(&self) -> &Path {
35 &self.path
36 }
37
38 fn length(&self) -> Result<u64, Error> {
39 let metadata = self.file.metadata()?;
40 Ok(metadata.len())
41 }
42
43 fn close(mut self) -> Result<(), Error> {
44 self.flush().map_err(Error::from)
46 }
47}
48
/// A stateless opener for [`StdFile`]s; it carries no data and is used by
/// value wherever a file needs to be opened from a path.
pub struct StdFileOpener;
51
52impl ManagedFileOpener<StdFile> for StdFileOpener {
53 fn open_for_read(
54 &self,
55 path: impl AsRef<std::path::Path> + Send,
56 id: Option<u64>,
57 ) -> Result<StdFile, Error> {
58 let path = path.as_ref();
59 Ok(StdFile {
60 file: File::open(path)?,
61 path: path.to_path_buf(),
62 id,
63 })
64 }
65
66 fn open_for_append(
67 &self,
68 path: impl AsRef<std::path::Path> + Send,
69 id: Option<u64>,
70 ) -> Result<StdFile, Error> {
71 let path = path.as_ref();
72 Ok(StdFile {
73 file: OpenOptions::new()
74 .write(true)
75 .append(true)
76 .read(true)
77 .create(true)
78 .open(path)?,
79 path: path.to_path_buf(),
80 id,
81 })
82 }
83}
84
85impl Seek for StdFile {
86 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self,)))]
87 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
88 self.file.seek(pos)
89 }
90}
91
92impl Write for StdFile {
93 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
94 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95 self.file.write(buf)
96 }
97
98 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
99 fn flush(&mut self) -> std::io::Result<()> {
100 self.file.sync_all()
101 }
102}
103
104impl Read for StdFile {
105 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
106 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
107 self.file.read(buf)
108 }
109}
110
/// Hands out [`StdFile`]s and caches them by file id so they can be reused.
///
/// The two caches live behind `Arc`s, so clones of a manager share them.
#[derive(Debug, Default, Clone)]
pub struct StdFileManager {
    /// Maps paths to the numeric ids used as keys in the caches below.
    file_ids: PathIds,
    /// One slot per file id for the file opened for appending.
    open_files: Arc<Mutex<HashMap<u64, FileSlot>>>,
    /// A queue of idle read-only files per file id.
    reader_files: Arc<Mutex<HashMap<u64, VecDeque<StdFile>>>>,
}
118
/// The state of the append file cached for one file id.
#[derive(Debug)]
enum FileSlot {
    /// The file is idle in the cache and can be handed out immediately.
    Available(StdFile),
    /// The file is currently handed out and nobody has registered to wait
    /// for it.
    Taken,
    /// The file is handed out and a caller has registered this sender to
    /// receive it once the current handle is dropped.
    Waiting(flume::Sender<StdFile>),
}
125
126impl FileManager for StdFileManager {
127 type File = StdFile;
128 type FileHandle = OpenStdFile;
129 fn append(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
130 let path = path.as_ref();
131 let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
132 let mut open_files = self.open_files.lock();
133 if let Some(open_file) = open_files.get_mut(&file_id) {
134 let mut file = FileSlot::Taken;
135 std::mem::swap(&mut file, open_file);
136 let file = match file {
137 FileSlot::Available(file) => file,
138 other => {
139 let (file_sender, file_receiver) = flume::bounded(1);
140 *open_file = FileSlot::Waiting(file_sender);
141 drop(open_files);
142
143 if let Ok(file) = file_receiver.recv() {
144 if let FileSlot::Waiting(other_sender) = other {
147 let mut open_files = self.open_files.lock();
148 if let Some(open_file) = open_files.get_mut(&file_id) {
149 *open_file = FileSlot::Waiting(other_sender);
150 }
151 }
152 file
153 } else {
154 return self.append(path);
155 }
156 }
157 };
158 Ok(OpenStdFile {
159 file: Some(file),
160 reader: false,
161 manager: Some(self.clone()),
162 })
163 } else {
164 let file = self.open_for_append(path, Some(file_id))?;
165 open_files.insert(file_id, FileSlot::Taken);
166 Ok(OpenStdFile {
167 file: Some(file),
168 reader: false,
169 manager: Some(self.clone()),
170 })
171 }
172 }
173
174 fn read(&self, path: impl AsRef<Path>) -> Result<Self::FileHandle, Error> {
175 let path = path.as_ref();
176 let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
177
178 let mut reader_files = self.reader_files.lock();
179 let files = reader_files.entry(file_id).or_default();
180
181 if let Some(file) = files.pop_front() {
182 return Ok(OpenStdFile {
183 file: Some(file),
184 manager: Some(self.clone()),
185 reader: true,
186 });
187 }
188
189 let file = StdFileOpener.open_for_read(path, Some(file_id))?;
190 Ok(OpenStdFile {
191 file: Some(file),
192 manager: Some(self.clone()),
193 reader: true,
194 })
195 }
196
197 fn delete(&self, path: impl AsRef<Path>) -> Result<bool, Error> {
198 let path = path.as_ref();
199 let file_id = self.file_ids.remove_file_id_for_path(path);
200 if let Some(file_id) = file_id {
201 let mut open_files = self.open_files.lock();
202 let mut reader_files = self.reader_files.lock();
203 open_files.remove(&file_id);
204 reader_files.remove(&file_id);
205 }
206
207 if path.exists() {
208 std::fs::remove_file(path)?;
209 Ok(true)
210 } else {
211 Ok(false)
212 }
213 }
214
215 fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
216 let path = path.as_ref();
217 let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
218 let mut open_files = self.open_files.lock();
219 let mut reader_files = self.reader_files.lock();
220 for id in removed_ids {
221 open_files.remove(&id);
222 reader_files.remove(&id);
223 }
224
225 if path.exists() {
226 std::fs::remove_dir_all(path)?;
227 }
228
229 Ok(())
230 }
231
232 fn close_handles<F: FnOnce(u64)>(&self, path: impl AsRef<Path>, publish_callback: F) {
233 if let Some(result) = self.file_ids.recreate_file_id_for_path(path.as_ref()) {
234 let mut open_files = self.open_files.lock();
235 let mut reader_files = self.reader_files.lock();
236 open_files.remove(&result.previous_id);
237 reader_files.remove(&result.previous_id);
238 publish_callback(result.new_id);
239 }
240 }
241
242 fn exists(&self, path: impl AsRef<std::path::Path>) -> Result<bool, crate::Error> {
243 Ok(path.as_ref().exists())
244 }
245
246 fn file_length(&self, path: impl AsRef<Path>) -> Result<u64, Error> {
247 path.as_ref()
248 .metadata()
249 .map_err(Error::from)
250 .map(|metadata| metadata.len())
251 }
252}
253
254impl ManagedFileOpener<StdFile> for StdFileManager {
255 fn open_for_read(
256 &self,
257 path: impl AsRef<Path> + Send,
258 id: Option<u64>,
259 ) -> Result<StdFile, Error> {
260 StdFileOpener.open_for_read(path, id)
261 }
262
263 fn open_for_append(
264 &self,
265 path: impl AsRef<Path> + Send,
266 id: Option<u64>,
267 ) -> Result<StdFile, Error> {
268 StdFileOpener.open_for_append(path, id)
269 }
270}
271
/// A handle to a [`StdFile`] that was handed out by a [`StdFileManager`].
///
/// When the handle is dropped, the file is offered back to the manager's
/// caches.
#[derive(Debug)]
pub struct OpenStdFile {
    /// The wrapped file. It is taken out of the `Option` when the handle is
    /// dropped.
    file: Option<StdFile>,
    /// The manager to return the file to on drop; `None` means the file is
    /// simply dropped along with the handle.
    manager: Option<StdFileManager>,
    /// `true` when the file goes back to the reader pool on drop, `false`
    /// when it goes back to the append slot.
    reader: bool,
}
279
280impl OpenableFile<StdFile> for OpenStdFile {
281 fn id(&self) -> Option<u64> {
282 self.file.as_ref().and_then(StdFile::id)
283 }
284
285 fn replace_with<C: FnOnce(u64)>(
286 self,
287 replacement: StdFile,
288 manager: &StdFileManager,
289 publish_callback: C,
290 ) -> Result<Self, Error> {
291 let current_path = self.file.as_ref().unwrap().path.clone();
292 self.close()?;
293 let path = replacement.path.clone();
294 replacement.close()?;
295
296 std::fs::rename(path, ¤t_path)?;
297 manager.close_handles(¤t_path, publish_callback);
298 manager.append(current_path)
299 }
300
301 fn close(self) -> Result<(), Error> {
302 drop(self);
303 Ok(())
304 }
305}
306
307impl OperableFile<StdFile> for OpenStdFile {
308 fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
309 operator.execute(self.file.as_mut().unwrap())
310 }
311}
312
313impl Drop for OpenStdFile {
314 fn drop(&mut self) {
315 if let Some(manager) = &self.manager {
316 let file = self.file.take().unwrap();
317 if let Some(file_id) = file.id {
318 if self.reader {
319 let mut reader_files = manager.reader_files.lock();
320 if let Some(path_files) = reader_files.get_mut(&file_id) {
321 path_files.push_front(file);
322 }
323 } else {
324 let mut writer_files = manager.open_files.lock();
325 if let Some(writer_file) = writer_files.get_mut(&file_id) {
326 match writer_file {
327 FileSlot::Available(_) => unreachable!(),
328 FileSlot::Taken => {
329 *writer_file = FileSlot::Available(file);
330 }
331 FileSlot::Waiting(sender) => {
332 if let Err(flume::SendError(file)) = sender.send(file) {
333 *writer_file = FileSlot::Available(file);
334 }
335 }
336 }
337 }
338 }
339 }
340 }
341 }
342}