parallel_processor/memory_fs/mod.rs

1use rustc_hash::FxHashMap;
2use std::fs::File;
3
4use std::path::{Path, PathBuf};
5
6use std::sync::Arc;
7
8use crate::memory_fs::file::flush::*;
9
10use parking_lot::Mutex;
11
12use crate::memory_data_size::MemoryDataSize;
13use crate::memory_fs::allocator::{MemoryAllocationLimits, CHUNKS_ALLOCATOR};
14use crate::memory_fs::file::internal::{MemoryFileInternal, SWAPPABLE_FILES};
15
/// The Linux `O_DIRECT` open flag (bypass the page cache).
///
/// Presumably passed as a raw open flag elsewhere in the crate. The numeric value of
/// `O_DIRECT` is architecture-specific on Linux:
/// - `0x4000` is only the asm-generic value (x86, x86_64, riscv64, s390x, ...).
/// - ARM/AArch64, PowerPC and MIPS define it differently.
///
/// On ARM, `0x4000` is `O_DIRECTORY`, so using the generic value there would make opening a
/// regular file fail. Non-Linux targets keep the historical value. `O_DIRECT` is not a
/// portable flag there. NOTE(review): confirm it is unused or ignored on those platforms.
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "arm", target_arch = "aarch64")
))]
pub const O_DIRECT: i32 = 0x10000;

/// The Linux `O_DIRECT` open flag (PowerPC value, octal `0400000`).
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "powerpc", target_arch = "powerpc64")
))]
pub const O_DIRECT: i32 = 0x20000;

/// The Linux `O_DIRECT` open flag (MIPS value).
#[cfg(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "mips", target_arch = "mips64")
))]
pub const O_DIRECT: i32 = 0x8000;

/// The Linux `O_DIRECT` open flag (asm-generic value, octal `040000`).
#[cfg(not(all(
    any(target_os = "linux", target_os = "android"),
    any(
        target_arch = "arm",
        target_arch = "aarch64",
        target_arch = "powerpc",
        target_arch = "powerpc64",
        target_arch = "mips",
        target_arch = "mips64"
    )
)))]
pub const O_DIRECT: i32 = 0x4000;
17
18#[macro_use]
19pub mod allocator;
20pub mod file;
21pub mod flushable_buffer;
22pub(crate) mod stats;
23
24static FILES_FLUSH_HASH_MAP: Mutex<Option<FxHashMap<PathBuf, Vec<Arc<(PathBuf, Mutex<File>)>>>>> =
25    Mutex::new(None);
26
/// Unit type whose associated functions expose the global memory-filesystem operations.
pub struct MemoryFs;

/// Selects what `MemoryFs::remove_file` does with a file.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum RemoveFileMode {
    /// Leave the file untouched.
    Keep,
    /// Delete the file.
    ///
    /// `remove_fs` is forwarded to the internal delete routine. It presumably also removes
    /// the on-disk copy. TODO confirm.
    Remove { remove_fs: bool },
}
34
35impl MemoryFs {
36    pub fn init(
37        memory_size: MemoryDataSize,
38        flush_queue_size: usize,
39        threads_count: usize,
40        min_chunks_count: usize,
41        alloc_limits: Option<MemoryAllocationLimits>,
42    ) {
43        let chunk_size = (memory_size / (min_chunks_count as f64)).as_bytes();
44
45        let mut suggested_chunk_size_log = 1;
46
47        while (1 << (suggested_chunk_size_log + 1)) <= chunk_size {
48            suggested_chunk_size_log += 1;
49        }
50
51        CHUNKS_ALLOCATOR.initialize(
52            memory_size,
53            suggested_chunk_size_log,
54            min_chunks_count,
55            alloc_limits,
56        );
57        *FILES_FLUSH_HASH_MAP.lock() = Some(FxHashMap::with_capacity_and_hasher(
58            8192,
59            Default::default(),
60        ));
61        GlobalFlush::init(flush_queue_size, threads_count);
62    }
63
64    pub fn remove_file(file: impl AsRef<Path>, remove_mode: RemoveFileMode) -> Result<(), ()> {
65        match remove_mode {
66            RemoveFileMode::Keep => {
67                // Do nothing
68                Ok(())
69            }
70            RemoveFileMode::Remove { remove_fs } => {
71                if MemoryFileInternal::delete(file, remove_fs) {
72                    Ok(())
73                } else {
74                    Err(())
75                }
76            }
77        }
78    }
79
80    pub fn get_file_size(file: impl AsRef<Path>) -> Option<usize> {
81        MemoryFileInternal::retrieve_reference(&file)
82            .map(|f| f.read().len())
83            .or_else(|| std::fs::metadata(&file).map(|m| m.len() as usize).ok())
84    }
85
86    pub fn ensure_flushed(file: impl AsRef<Path>) {
87        FILES_FLUSH_HASH_MAP
88            .lock()
89            .as_mut()
90            .unwrap()
91            .remove(&file.as_ref().to_path_buf());
92    }
93
94    pub fn remove_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
95        MemoryFileInternal::delete_directory(dir, remove_fs)
96    }
97
98    pub fn flush_to_disk(flush_all: bool) {
99        if flush_all {
100            MemoryFileInternal::flush_all_to_disk();
101        }
102        GlobalFlush::flush_to_disk();
103    }
104
105    pub fn free_memory() {
106        CHUNKS_ALLOCATOR.giveback_free_memory()
107    }
108
109    pub fn terminate() {
110        GlobalFlush::terminate();
111        CHUNKS_ALLOCATOR.deinitialize();
112    }
113
114    pub fn get_stats() -> stats::MemoryFsStats {
115        stats::get_stats()
116    }
117
118    pub fn stats_reset() {
119        stats::reset();
120    }
121
122    pub fn reduce_pressure() -> bool {
123        // crate::log_info!("Reducing pressure!");
124        let (current, max_size) = GlobalFlush::global_queue_occupation();
125        if current * 3 < max_size {
126            let mut map_lock = SWAPPABLE_FILES.lock();
127            if let Some(file) = map_lock.get_next() {
128                drop(map_lock);
129                let mut file = file.write();
130                file.change_to_disk_only();
131                file.flush_chunks(usize::MAX);
132                return true;
133            }
134        }
135
136        return !GlobalFlush::is_queue_empty();
137    }
138}
139
#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::file::flush::GlobalFlush;
    use crate::memory_fs::file::internal::MemoryFileMode;
    use crate::memory_fs::file::reader::FileReader;
    use crate::memory_fs::file::writer::FileWriter;
    use crate::memory_fs::MemoryFs;
    use rayon::prelude::*;
    use std::io::{Read, Seek, SeekFrom, Write};

    /// Stress test for the memory filesystem.
    ///
    /// The test proceeds in three phases:
    /// 1. In parallel, write 400 files, each made of `1024 * 64` copies of a 3337-byte
    ///    pattern, and spot-check one offset of each file right after writing it.
    /// 2. Flush everything to disk.
    /// 3. Read every file back in full and check it ends exactly where expected.
    ///
    /// The test is ignored by default because it moves tens of gigabytes of data.
    #[test]
    #[ignore]
    pub fn memory_fs_test() {
        // Use a portable scratch directory rather than a developer-specific absolute path.
        let base_dir = std::env::temp_dir().join("memory_fs_test");
        std::fs::create_dir_all(&base_dir).expect("failed to create the test directory");
        let file_path = |i: u32| format!("{}/{}.tmp", base_dir.display(), i);

        // `init` takes `alloc_limits` as its fifth argument. A non-zero `min_chunks_count`
        // keeps the derived chunk size finite, because it is used as a divisor.
        MemoryFs::init(
            MemoryDataSize::from_mebioctets(100 * 1024),
            1024,
            3,
            1024,
            None,
        );
        let data = (0..3337).map(|x| (x % 256) as u8).collect::<Vec<u8>>();

        (0..400).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Writing file {}", i);
            let mut file = FileWriter::create(
                file_path(i),
                MemoryFileMode::PreferMemory { swap_priority: 3 },
            );
            for _ in 0..(1024 * 64) {
                // `write` may accept only part of the buffer; the checks below need all of it.
                file.write_all(data.as_slice()).unwrap();
            }
            drop(file);
            let mut file2 = FileReader::open(file_path(i), None).unwrap();

            file2.seek(SeekFrom::Start(17 + 3337 * 12374)).unwrap();
            let mut buffer = [0; 4];
            file2.read_exact(&mut buffer).unwrap();
            assert_eq!(&buffer, &data[17..21]);
        });

        GlobalFlush::flush_to_disk();

        (0..400).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Reading file {}", i);
            let mut datar = vec![0; 3337];
            let mut file = FileReader::open(file_path(i), None).unwrap();
            for _ in 0..(1024 * 64) {
                file.read_exact(datar.as_mut_slice()).unwrap();
                assert_eq!(datar, data);
            }
            assert_eq!(file.read(datar.as_mut_slice()).unwrap(), 0);
            crate::log_info!("Read file {}", i);
        });

        MemoryFs::terminate();

        // Best-effort cleanup of the on-disk test files.
        let _ = std::fs::remove_dir_all(&base_dir);
    }
}