parallel_processor/memory_fs/file/
handle.rs

1use std::{
2    collections::HashMap,
3    fs::{File, OpenOptions},
4    io::{Seek, SeekFrom, Write},
5    path::PathBuf,
6    sync::{Arc, Weak},
7};
8
9use once_cell::sync::Lazy;
10use parking_lot::{ArcMutexGuard, Mutex};
11
12static OPENED_FILES: Lazy<Mutex<HashMap<PathBuf, Arc<Mutex<File>>>>> =
13    Lazy::new(|| Mutex::new(HashMap::new()));
14static MAX_OPENED_FILES: Lazy<usize> = Lazy::new(|| {
15    const DEFAULT_MAX_OPENED_FILES: u32 = 1024;
16
17    let os_limit = match () {
18        #[cfg(target_os = "linux")]
19        () => limits_rs::get_own_limits()
20            .map(|l| {
21                l.max_open_files
22                    .soft
23                    .unwrap_or(l.max_open_files.hard.unwrap_or(DEFAULT_MAX_OPENED_FILES))
24            })
25            .unwrap_or(DEFAULT_MAX_OPENED_FILES) as usize,
26        #[cfg(not(target_os = "linux"))]
27        () => DEFAULT_MAX_OPENED_FILES as usize,
28    };
29    os_limit.saturating_sub(30).min(os_limit / 2).max(4)
30});
31
32pub struct FileHandle {
33    path: PathBuf,
34    file: Weak<Mutex<File>>,
35}
36
37impl FileHandle {
38    fn open_file(path: &PathBuf, create_new: bool) -> Arc<Mutex<File>> {
39        let mut opened_files = OPENED_FILES.lock();
40
41        if opened_files.len() >= *MAX_OPENED_FILES {
42            // Check which file to close
43            for open_file in opened_files.iter() {
44                if Arc::strong_count(open_file.1) == 1 {
45                    let to_remove = open_file.0.clone();
46                    opened_files.remove(&to_remove);
47                    // println!("Closing file {}", to_remove.display());
48                    break;
49                }
50            }
51        }
52
53        let file = Arc::new(Mutex::new({
54            let mut file = OpenOptions::new()
55                .create(create_new)
56                .write(true)
57                .append(false)
58                // .custom_flags(O_DIRECT)
59                .open(&path)
60                .map_err(|e| format!("Error while opening file {}: {}", path.display(), e))
61                .unwrap();
62            if !create_new {
63                // Seek to the end of the file if the file is reopened
64                file.seek(SeekFrom::End(0)).unwrap();
65            }
66            file
67        }));
68
69        opened_files.insert(path.clone(), file.clone());
70        file
71    }
72
73    pub fn new(path: PathBuf) -> Self {
74        let file = Self::open_file(&path, true);
75        Self {
76            path,
77            file: Arc::downgrade(&file),
78        }
79    }
80
81    pub fn flush(&self) -> std::io::Result<()> {
82        // Do not flush if the file is closed
83        let Some(file) = self.file.upgrade() else {
84            return Ok(());
85        };
86        let mut file = file.lock();
87        file.flush()
88    }
89
90    pub fn get_path(&self) -> &PathBuf {
91        &self.path
92    }
93
94    pub fn get_file(&mut self) -> ArcMutexGuard<parking_lot::RawMutex, File> {
95        if let Some(file) = self.file.upgrade() {
96            file.lock_arc()
97        } else {
98            let file = Self::open_file(&self.path, false);
99            self.file = Arc::downgrade(&file);
100            file.lock_arc()
101        }
102    }
103}
104
105impl Drop for FileHandle {
106    fn drop(&mut self) {
107        // Close the file writer once the handle is dropped
108        OPENED_FILES.lock().remove(&self.path);
109    }
110}