parallel_processor/memory_fs/file/
handle.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::{
    collections::HashMap,
    fs::{File, OpenOptions},
    io::{Seek, SeekFrom, Write},
    path::PathBuf,
    sync::{Arc, Weak},
};

use once_cell::sync::Lazy;
use parking_lot::{ArcMutexGuard, Mutex};

static OPENED_FILES: Lazy<Mutex<HashMap<PathBuf, Arc<Mutex<File>>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
static MAX_OPENED_FILES: Lazy<usize> = Lazy::new(|| {
    const DEFAULT_MAX_OPENED_FILES: u32 = 1024;
    let os_limit = limits_rs::get_own_limits()
        .map(|l| {
            l.max_open_files
                .soft
                .unwrap_or(l.max_open_files.hard.unwrap_or(DEFAULT_MAX_OPENED_FILES))
        })
        .unwrap_or(DEFAULT_MAX_OPENED_FILES) as usize;
    os_limit.saturating_sub(30).min(os_limit / 2).max(4)
});

pub struct FileHandle {
    path: PathBuf,
    file: Weak<Mutex<File>>,
}

impl FileHandle {
    fn open_file(path: &PathBuf, create_new: bool) -> Arc<Mutex<File>> {
        let mut opened_files = OPENED_FILES.lock();

        if opened_files.len() >= *MAX_OPENED_FILES {
            // Check which file to close
            for open_file in opened_files.iter() {
                if Arc::strong_count(open_file.1) == 1 {
                    let to_remove = open_file.0.clone();
                    opened_files.remove(&to_remove);
                    // println!("Closing file {}", to_remove.display());
                    break;
                }
            }
        }

        let file = Arc::new(Mutex::new({
            let mut file = OpenOptions::new()
                .create(create_new)
                .write(true)
                .append(false)
                // .custom_flags(O_DIRECT)
                .open(&path)
                .map_err(|e| format!("Error while opening file {}: {}", path.display(), e))
                .unwrap();
            if !create_new {
                // Seek to the end of the file if the file is reopened
                file.seek(SeekFrom::End(0)).unwrap();
            }
            file
        }));

        opened_files.insert(path.clone(), file.clone());
        file
    }

    pub fn new(path: PathBuf) -> Self {
        let file = Self::open_file(&path, true);
        Self {
            path,
            file: Arc::downgrade(&file),
        }
    }

    pub fn flush(&self) -> std::io::Result<()> {
        // Do not flush if the file is closed
        let Some(file) = self.file.upgrade() else {
            return Ok(());
        };
        let mut file = file.lock();
        file.flush()
    }

    pub fn get_path(&self) -> &PathBuf {
        &self.path
    }

    pub fn get_file(&mut self) -> ArcMutexGuard<parking_lot::RawMutex, File> {
        if let Some(file) = self.file.upgrade() {
            file.lock_arc()
        } else {
            let file = Self::open_file(&self.path, false);
            self.file = Arc::downgrade(&file);
            file.lock_arc()
        }
    }
}