parallel_processor/memory_fs/file/
handle.rs1use std::{
2 collections::HashMap,
3 fs::{File, OpenOptions},
4 io::{Seek, SeekFrom, Write},
5 path::PathBuf,
6 sync::{Arc, Weak},
7};
8
9use once_cell::sync::Lazy;
10use parking_lot::{ArcMutexGuard, Mutex};
11
12static OPENED_FILES: Lazy<Mutex<HashMap<PathBuf, Arc<Mutex<File>>>>> =
13 Lazy::new(|| Mutex::new(HashMap::new()));
14static MAX_OPENED_FILES: Lazy<usize> = Lazy::new(|| {
15 const DEFAULT_MAX_OPENED_FILES: u32 = 1024;
16
17 let os_limit = match () {
18 #[cfg(target_os = "linux")]
19 () => limits_rs::get_own_limits()
20 .map(|l| {
21 l.max_open_files
22 .soft
23 .unwrap_or(l.max_open_files.hard.unwrap_or(DEFAULT_MAX_OPENED_FILES))
24 })
25 .unwrap_or(DEFAULT_MAX_OPENED_FILES) as usize,
26 #[cfg(not(target_os = "linux"))]
27 () => DEFAULT_MAX_OPENED_FILES as usize,
28 };
29 os_limit.saturating_sub(30).min(os_limit / 2).max(4)
30});
31
32pub struct FileHandle {
33 path: PathBuf,
34 file: Weak<Mutex<File>>,
35}
36
37impl FileHandle {
38 fn open_file(path: &PathBuf, create_new: bool) -> Arc<Mutex<File>> {
39 let mut opened_files = OPENED_FILES.lock();
40
41 if opened_files.len() >= *MAX_OPENED_FILES {
42 for open_file in opened_files.iter() {
44 if Arc::strong_count(open_file.1) == 1 {
45 let to_remove = open_file.0.clone();
46 opened_files.remove(&to_remove);
47 break;
49 }
50 }
51 }
52
53 let file = Arc::new(Mutex::new({
54 let mut file = OpenOptions::new()
55 .create(create_new)
56 .write(true)
57 .append(false)
58 .open(&path)
60 .map_err(|e| format!("Error while opening file {}: {}", path.display(), e))
61 .unwrap();
62 if !create_new {
63 file.seek(SeekFrom::End(0)).unwrap();
65 }
66 file
67 }));
68
69 opened_files.insert(path.clone(), file.clone());
70 file
71 }
72
73 pub fn new(path: PathBuf) -> Self {
74 let file = Self::open_file(&path, true);
75 Self {
76 path,
77 file: Arc::downgrade(&file),
78 }
79 }
80
81 pub fn flush(&self) -> std::io::Result<()> {
82 let Some(file) = self.file.upgrade() else {
84 return Ok(());
85 };
86 let mut file = file.lock();
87 file.flush()
88 }
89
90 pub fn get_path(&self) -> &PathBuf {
91 &self.path
92 }
93
94 pub fn get_file(&mut self) -> ArcMutexGuard<parking_lot::RawMutex, File> {
95 if let Some(file) = self.file.upgrade() {
96 file.lock_arc()
97 } else {
98 let file = Self::open_file(&self.path, false);
99 self.file = Arc::downgrade(&file);
100 file.lock_arc()
101 }
102 }
103}
104
105impl Drop for FileHandle {
106 fn drop(&mut self) {
107 OPENED_FILES.lock().remove(&self.path);
109 }
110}