// parallel_processor/memory_fs/mod.rs

1use rustc_hash::FxHashMap;
2use std::fs::File;
3
4use std::path::{Path, PathBuf};
5
6use std::sync::Arc;
7
8use crate::memory_fs::file::flush::*;
9
10use parking_lot::Mutex;
11
12use crate::memory_data_size::MemoryDataSize;
13use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
14use crate::memory_fs::file::internal::{MemoryFileInternal, SWAPPABLE_FILES};
15use nightly_quirks::utils::NightlyUtils;
16
/// The Linux `O_DIRECT` open flag for the target architecture.
///
/// The numeric value of `O_DIRECT` is architecture-specific in the Linux UAPI headers.
/// It is `0o40000` (`0x4000`) in the generic layout used by x86/x86_64/riscv.
/// On arm/aarch64 it is `0o200000`, and there `0o40000` is `O_DIRECTORY`.
/// It is `0o400000` on powerpc and `0x8000` on mips.
/// All other targets keep the generic value.
///
/// NOTE(review): presumably passed to `OpenOptionsExt::custom_flags` elsewhere in the
/// crate — confirm.
pub const O_DIRECT: i32 = if cfg!(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "arm", target_arch = "aarch64")
)) {
    0o200000
} else if cfg!(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "powerpc", target_arch = "powerpc64")
)) {
    0o400000
} else if cfg!(all(
    any(target_os = "linux", target_os = "android"),
    any(target_arch = "mips", target_arch = "mips64")
)) {
    0x8000
} else {
    0x4000
};
18
19#[macro_use]
20pub mod allocator;
21pub mod file;
22pub mod flushable_buffer;
23pub(crate) mod stats;
24
25static FILES_FLUSH_HASH_MAP: Mutex<Option<FxHashMap<PathBuf, Vec<Arc<(PathBuf, Mutex<File>)>>>>> =
26    Mutex::new(None);
27
/// Entry point for the in-memory file system.
///
/// This is a stateless unit struct. Every operation is an associated function that acts
/// on global state: the chunk allocator, the flush machinery and the file registries.
pub struct MemoryFs;
29
/// Tells `MemoryFs::remove_file` what to do with a file.
#[derive(Clone, Copy, Debug)]
pub enum RemoveFileMode {
    /// Leave the file untouched; removal is a successful no-op.
    Keep,
    /// Delete the file via `MemoryFileInternal::delete`.
    Remove {
        /// Forwarded as the `remove_fs` argument of `MemoryFileInternal::delete`.
        /// NOTE(review): presumably also deletes the on-disk copy when `true` — confirm.
        remove_fs: bool,
    },
}
35
36impl MemoryFs {
37    pub fn init(
38        memory_size: MemoryDataSize,
39        flush_queue_size: usize,
40        threads_count: usize,
41        min_chunks_count: usize,
42    ) {
43        let chunk_size = (memory_size / (min_chunks_count as f64)).as_bytes();
44
45        let mut suggested_chunk_size_log = 1;
46
47        while (1 << (suggested_chunk_size_log + 1)) <= chunk_size {
48            suggested_chunk_size_log += 1;
49        }
50
51        CHUNKS_ALLOCATOR.initialize(memory_size, suggested_chunk_size_log, min_chunks_count);
52        *FILES_FLUSH_HASH_MAP.lock() = Some(FxHashMap::with_capacity_and_hasher(
53            8192,
54            Default::default(),
55        ));
56        GlobalFlush::init(flush_queue_size, threads_count);
57    }
58
59    pub fn remove_file(file: impl AsRef<Path>, remove_mode: RemoveFileMode) -> Result<(), ()> {
60        match remove_mode {
61            RemoveFileMode::Keep => {
62                // Do nothing
63                Ok(())
64            }
65            RemoveFileMode::Remove { remove_fs } => {
66                if MemoryFileInternal::delete(file, remove_fs) {
67                    Ok(())
68                } else {
69                    Err(())
70                }
71            }
72        }
73    }
74
75    pub fn get_file_size(file: impl AsRef<Path>) -> Option<usize> {
76        MemoryFileInternal::retrieve_reference(&file)
77            .map(|f| f.read().len())
78            .or_else(|| std::fs::metadata(&file).map(|m| m.len() as usize).ok())
79    }
80
81    pub fn ensure_flushed(file: impl AsRef<Path>) {
82        FILES_FLUSH_HASH_MAP
83            .lock()
84            .as_mut()
85            .unwrap()
86            .remove(&file.as_ref().to_path_buf());
87    }
88
89    pub fn remove_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
90        MemoryFileInternal::delete_directory(dir, remove_fs)
91    }
92
93    pub fn flush_all_to_disk() {
94        GlobalFlush::flush_to_disk();
95    }
96
97    pub fn free_memory() {
98        CHUNKS_ALLOCATOR.giveback_free_memory()
99    }
100
101    pub fn terminate() {
102        GlobalFlush::terminate();
103        CHUNKS_ALLOCATOR.deinitialize();
104    }
105
106    pub fn get_stats() -> stats::MemoryFsStats {
107        stats::get_stats()
108    }
109
110    pub fn stats_reset() {
111        stats::reset();
112    }
113
114    pub fn reduce_pressure() -> bool {
115        // crate::log_info!("Reducing pressure!");
116        let (current, max_size) = GlobalFlush::global_queue_occupation();
117        if current * 3 < max_size {
118            let mut map_lock = SWAPPABLE_FILES.lock();
119            let map_lock_mut =
120                NightlyUtils::mutex_get_or_init(&mut map_lock, || FxHashMap::default());
121
122            let mut file_ref = None;
123
124            for (key, file) in map_lock_mut.iter() {
125                if let Some(file) = file.upgrade() {
126                    let file_read = file.read();
127                    if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
128                        drop(file_read);
129                        file_ref = Some((key.clone(), file));
130                        break;
131                    }
132                }
133            }
134
135            if let Some((key, file)) = file_ref {
136                map_lock_mut.remove(&key);
137                drop(map_lock);
138                let mut file = file.write();
139                file.change_to_disk_only();
140                file.flush_chunks(usize::MAX);
141                return true;
142            }
143        }
144
145        return !GlobalFlush::is_queue_empty();
146    }
147}
148
#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::file::flush::GlobalFlush;
    use crate::memory_fs::file::internal::MemoryFileMode;
    use crate::memory_fs::file::reader::FileReader;
    use crate::memory_fs::file::writer::FileWriter;
    use crate::memory_fs::MemoryFs;
    use rayon::prelude::*;
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::path::{Path, PathBuf};

    /// Number of files written and read back concurrently.
    const FILES_COUNT: u32 = 400;
    /// Number of times the data pattern is written to each file.
    const REPEATS: usize = 1024 * 64;

    /// Path of the `i`-th test file inside `dir`.
    fn test_file_path(dir: &Path, i: u32) -> PathBuf {
        dir.join(format!("{}.tmp", i))
    }

    /// Writes many files through the memory FS and reads them back, before and after a
    /// global flush.
    #[test]
    #[ignore]
    pub fn memory_fs_test() {
        // Use the system temp directory instead of a hard-coded home path so the test can
        // run on any machine. The directory must exist before files are flushed into it.
        let dir = std::env::temp_dir().join("memory_fs_test");
        std::fs::create_dir_all(&dir).unwrap();

        // NOTE(review): `min_chunks_count == 0` makes `MemoryFs::init` divide by zero —
        // confirm this is intended.
        MemoryFs::init(MemoryDataSize::from_mebioctets(100 * 1024), 1024, 3, 0);
        let data = (0..3337).map(|x| (x % 256) as u8).collect::<Vec<u8>>();

        (0..FILES_COUNT).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Writing file {}", i);
            let mut file = FileWriter::create(
                test_file_path(&dir, i),
                MemoryFileMode::PreferMemory { swap_priority: 3 },
            );
            for _ in 0..REPEATS {
                // `write` may perform a short write; `write_all` writes the whole slice.
                file.write_all(data.as_slice()).unwrap();
            }
            drop(file);
            let mut file2 = FileReader::open(test_file_path(&dir, i), None).unwrap();

            // Byte 17 of the 12374-th repetition of the pattern.
            file2.seek(SeekFrom::Start(17 + 3337 * 12374)).unwrap();
            let mut buffer = [0; 4];
            file2.read_exact(&mut buffer).unwrap();
            assert_eq!(&buffer, &data[17..21]);
        });

        GlobalFlush::flush_to_disk();

        (0..FILES_COUNT).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Reading file {}", i);
            let mut datar = vec![0; 3337];
            let mut file = FileReader::open(test_file_path(&dir, i), None).unwrap();
            for _ in 0..REPEATS {
                file.read_exact(datar.as_mut_slice()).unwrap();
                assert_eq!(datar, data);
            }
            assert_eq!(file.read(datar.as_mut_slice()).unwrap(), 0);
            crate::log_info!("Read file {}", i);
        });

        MemoryFs::terminate();

        // Best-effort cleanup of the on-disk copies; a leftover directory is harmless.
        let _ = std::fs::remove_dir_all(&dir);
    }
}