parallel_processor/memory_fs/
mod.rs1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3
4use std::path::{Path, PathBuf};
5
6use std::sync::Arc;
7
8use crate::memory_fs::file::flush::*;
9
10use parking_lot::Mutex;
11
12use crate::memory_data_size::MemoryDataSize;
13use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
14use crate::memory_fs::file::internal::{MemoryFileInternal, SWAPPABLE_FILES};
15use nightly_quirks::utils::NightlyUtils;
16
/// Raw value of the `O_DIRECT` open flag (octal `040000`, i.e. `0x4000`).
///
/// NOTE(review): this matches Linux on x86/x86_64; other architectures (e.g. aarch64) use a
/// different numeric value for `O_DIRECT`, so confirm before relying on it on other targets.
pub const O_DIRECT: i32 = 0o40000;
18
/// Chunk allocator backing the in-memory files (provides `CHUNKS_ALLOCATOR`).
///
/// `#[macro_use]` exposes this module's macros only to items declared *after* it, so keep it
/// ahead of any sibling module that may use them.
#[macro_use]
pub mod allocator;
/// File-level machinery: flushing, internal file state, readers and writers.
pub mod file;
pub mod flushable_buffer;
/// Statistics collection, exposed through `MemoryFs::get_stats` / `MemoryFs::stats_reset`.
pub(crate) mod stats;
24
25static mut FILES_FLUSH_HASH_MAP: Option<Mutex<HashMap<PathBuf, Vec<Arc<(PathBuf, Mutex<File>)>>>>> =
26 None;
27
/// Namespace type for the memory-filesystem API.
///
/// It carries no data: every operation is an associated function working on module-level
/// global state (the chunk allocator, the global flush machinery and the flush map).
pub struct MemoryFs;
29
/// What [`MemoryFs::remove_file`] should do with a file.
///
/// Derives `PartialEq`/`Eq` so callers can compare or assert on the chosen mode directly
/// instead of pattern-matching.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoveFileMode {
    /// Leave the file alone; `remove_file` returns `Ok(())` without deleting anything.
    Keep,
    /// Delete the file through the memory filesystem.
    Remove {
        /// Forwarded unchanged to the internal delete routine. NOTE(review): presumably also
        /// removes the on-disk copy when `true` — confirm in `MemoryFileInternal::delete`.
        remove_fs: bool,
    },
}
35
36impl MemoryFs {
37 pub fn init(
38 memory_size: MemoryDataSize,
39 flush_queue_size: usize,
40 threads_count: usize,
41 min_chunks_count: usize,
42 ) {
43 unsafe {
44 let chunk_size = (memory_size / (min_chunks_count as f64)).as_bytes();
45
46 let mut suggested_chunk_size_log = 1;
47
48 while (1 << (suggested_chunk_size_log + 1)) <= chunk_size {
49 suggested_chunk_size_log += 1;
50 }
51
52 CHUNKS_ALLOCATOR.initialize(memory_size, suggested_chunk_size_log, min_chunks_count);
53 FILES_FLUSH_HASH_MAP = Some(Mutex::new(HashMap::with_capacity(8192)));
54 GlobalFlush::init(flush_queue_size, threads_count);
55 }
56 }
57
58 pub fn remove_file(file: impl AsRef<Path>, remove_mode: RemoveFileMode) -> Result<(), ()> {
59 match remove_mode {
60 RemoveFileMode::Keep => {
61 Ok(())
63 }
64 RemoveFileMode::Remove { remove_fs } => {
65 if MemoryFileInternal::delete(file, remove_fs) {
66 Ok(())
67 } else {
68 Err(())
69 }
70 }
71 }
72 }
73
74 pub fn get_file_size(file: impl AsRef<Path>) -> Option<usize> {
75 MemoryFileInternal::retrieve_reference(&file)
76 .map(|f| f.read().len())
77 .or_else(|| std::fs::metadata(&file).map(|m| m.len() as usize).ok())
78 }
79
80 pub fn ensure_flushed(file: impl AsRef<Path>) {
81 unsafe {
82 FILES_FLUSH_HASH_MAP
83 .as_mut()
84 .unwrap()
85 .lock()
86 .remove(&file.as_ref().to_path_buf());
87 }
88 }
89
90 pub fn remove_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
91 MemoryFileInternal::delete_directory(dir, remove_fs)
92 }
93
94 pub fn flush_all_to_disk() {
95 GlobalFlush::flush_to_disk();
96 }
97
98 pub fn free_memory() {
99 CHUNKS_ALLOCATOR.giveback_free_memory()
100 }
101
102 pub fn terminate() {
103 GlobalFlush::terminate();
104 CHUNKS_ALLOCATOR.deinitialize();
105 }
106
107 pub fn get_stats() -> stats::MemoryFsStats {
108 stats::get_stats()
109 }
110
111 pub fn stats_reset() {
112 stats::reset();
113 }
114
115 pub fn reduce_pressure() -> bool {
116 let (current, max_size) = GlobalFlush::global_queue_occupation();
118 if current * 3 < max_size {
119 let mut map_lock = SWAPPABLE_FILES.lock();
120 let map_lock_mut = NightlyUtils::mutex_get_or_init(&mut map_lock, || BTreeMap::new());
121
122 let mut file_ref = None;
123
124 for (key, file) in map_lock_mut.iter() {
125 if let Some(file) = file.upgrade() {
126 let file_read = file.read();
127 if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
128 drop(file_read);
129 file_ref = Some((key.clone(), file));
130 break;
131 }
132 }
133 }
134
135 if let Some((key, file)) = file_ref {
136 map_lock_mut.remove(&key);
137 drop(map_lock);
138 let mut file = file.write();
139 file.change_to_disk_only();
140 file.flush_chunks(usize::MAX);
141 return true;
142 }
143 }
144
145 return !GlobalFlush::is_queue_empty();
146 }
147}
148
#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::file::flush::GlobalFlush;
    use crate::memory_fs::file::internal::MemoryFileMode;
    use crate::memory_fs::file::reader::FileReader;
    use crate::memory_fs::file::writer::FileWriter;
    use crate::memory_fs::MemoryFs;
    use rayon::prelude::*;
    use std::io::{Read, Seek, SeekFrom, Write};

    /// Number of files written and read back in parallel.
    const FILES_COUNT: u32 = 400;
    /// Length of the repeating byte pattern written to every file.
    const PATTERN_LEN: usize = 3337;
    /// How many times the pattern is written to each file.
    const PATTERN_REPEATS: usize = 1024 * 64;

    /// Builds the path of the `index`-th test file inside `dir`.
    fn test_file_path(dir: &std::path::Path, index: u32) -> String {
        format!("{}/{}.tmp", dir.display(), index)
    }

    /// Stress test for the memory filesystem.
    ///
    /// It writes many memory-preferred files in parallel and spot-checks each one right after
    /// writing. It then flushes everything to disk and re-reads every file in full.
    ///
    /// Presumably `#[ignore]`d because it uses a 100 GiB memory budget and writes roughly
    /// 87 GB of data.
    #[test]
    #[ignore]
    pub fn memory_fs_test() {
        // Use the system temp directory instead of a developer-specific absolute path.
        let dir = std::env::temp_dir().join("parallel_processor_memory_fs_test");
        std::fs::create_dir_all(&dir).unwrap();

        MemoryFs::init(MemoryDataSize::from_mebioctets(100 * 1024), 1024, 3, 0);
        let data = (0..PATTERN_LEN)
            .map(|x| (x % 256) as u8)
            .collect::<Vec<u8>>();

        (0..FILES_COUNT).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Writing file {}", i);
            let mut file = FileWriter::create(
                test_file_path(&dir, i),
                MemoryFileMode::PreferMemory { swap_priority: 3 },
            );
            for _ in 0..PATTERN_REPEATS {
                // `write` may legally write only part of the buffer; `write_all` cannot.
                file.write_all(data.as_slice()).unwrap();
            }
            drop(file);

            let mut file2 = FileReader::open(test_file_path(&dir, i), None).unwrap();
            // Jump to offset 17 of pattern block 12374; those bytes must equal data[17..21].
            file2
                .seek(SeekFrom::Start((17 + PATTERN_LEN * 12374) as u64))
                .unwrap();
            let mut buffer = [0; 4];
            file2.read_exact(&mut buffer).unwrap();
            assert_eq!(&buffer, &data[17..21]);
        });

        GlobalFlush::flush_to_disk();

        (0..FILES_COUNT).into_par_iter().for_each(|i: u32| {
            crate::log_info!("Reading file {}", i);
            let mut datar = vec![0; PATTERN_LEN];
            let mut file = FileReader::open(test_file_path(&dir, i), None).unwrap();
            for _ in 0..PATTERN_REPEATS {
                file.read_exact(datar.as_mut_slice()).unwrap();
                assert_eq!(datar, data);
            }
            // The file must end exactly after the last pattern block.
            assert_eq!(file.read(datar.as_mut_slice()).unwrap(), 0);
            crate::log_info!("Read file {}", i);
        });

        MemoryFs::terminate();

        // Best-effort cleanup: a leftover temp directory must not fail the test.
        let _ = std::fs::remove_dir_all(&dir);
    }
}