parallel_processor/memory_fs/
mod.rs1use rustc_hash::FxHashMap;
2use std::fs::File;
3
4use std::path::{Path, PathBuf};
5
6use std::sync::Arc;
7
8use crate::memory_fs::file::flush::*;
9
10use parking_lot::Mutex;
11
12use crate::memory_data_size::MemoryDataSize;
13use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
14use crate::memory_fs::file::internal::{MemoryFileInternal, SWAPPABLE_FILES};
15
/// Value of the Linux `O_DIRECT` open flag.
///
/// The numeric value is architecture-specific in the Linux UAPI headers:
/// `0o40000` (`0x4000`) in the generic layout used by x86/x86_64, but `0o200000`
/// on ARM/AArch64, where `0o40000` means `O_DIRECTORY` instead (so using the
/// generic value there would make opens fail on regular files).
///
/// NOTE(review): other architectures (e.g. PowerPC, MIPS, SPARC) use yet other
/// values; consider `libc::O_DIRECT` if the crate can depend on `libc`.
#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
pub const O_DIRECT: i32 = 0o200000;

/// Value of the Linux `O_DIRECT` open flag (generic / x86 layout: `0o40000`).
///
/// NOTE(review): architectures other than x86/x86_64 may use a different value;
/// consider `libc::O_DIRECT` if the crate can depend on `libc`.
#[cfg(not(any(target_arch = "arm", target_arch = "aarch64")))]
pub const O_DIRECT: i32 = 0x4000;
17
18#[macro_use]
19pub mod allocator;
20pub mod file;
21pub mod flushable_buffer;
22pub(crate) mod stats;
23
24static FILES_FLUSH_HASH_MAP: Mutex<Option<FxHashMap<PathBuf, Vec<Arc<(PathBuf, Mutex<File>)>>>>> =
25 Mutex::new(None);
26
/// Entry point of the memory file system.
///
/// A field-less marker type: every operation is an associated function (no
/// `self`) that acts on global state such as the chunk allocator and flush queue.
#[derive(Debug)]
pub struct MemoryFs;
28
/// How `MemoryFs::remove_file` should treat a file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoveFileMode {
    /// Leave the file untouched; `remove_file` returns `Ok(())` without doing anything.
    Keep,
    /// Delete the file from the memory file system.
    Remove {
        /// Forwarded to `MemoryFileInternal::delete`; presumably controls whether the
        /// backing file on the real file system is removed as well — confirm.
        remove_fs: bool,
    },
}
34
35impl MemoryFs {
36 pub fn init(
37 memory_size: MemoryDataSize,
38 flush_queue_size: usize,
39 threads_count: usize,
40 min_chunks_count: usize,
41 ) {
42 let chunk_size = (memory_size / (min_chunks_count as f64)).as_bytes();
43
44 let mut suggested_chunk_size_log = 1;
45
46 while (1 << (suggested_chunk_size_log + 1)) <= chunk_size {
47 suggested_chunk_size_log += 1;
48 }
49
50 CHUNKS_ALLOCATOR.initialize(memory_size, suggested_chunk_size_log, min_chunks_count);
51 *FILES_FLUSH_HASH_MAP.lock() = Some(FxHashMap::with_capacity_and_hasher(
52 8192,
53 Default::default(),
54 ));
55 GlobalFlush::init(flush_queue_size, threads_count);
56 }
57
58 pub fn remove_file(file: impl AsRef<Path>, remove_mode: RemoveFileMode) -> Result<(), ()> {
59 match remove_mode {
60 RemoveFileMode::Keep => {
61 Ok(())
63 }
64 RemoveFileMode::Remove { remove_fs } => {
65 if MemoryFileInternal::delete(file, remove_fs) {
66 Ok(())
67 } else {
68 Err(())
69 }
70 }
71 }
72 }
73
74 pub fn get_file_size(file: impl AsRef<Path>) -> Option<usize> {
75 MemoryFileInternal::retrieve_reference(&file)
76 .map(|f| f.read().len())
77 .or_else(|| std::fs::metadata(&file).map(|m| m.len() as usize).ok())
78 }
79
80 pub fn ensure_flushed(file: impl AsRef<Path>) {
81 FILES_FLUSH_HASH_MAP
82 .lock()
83 .as_mut()
84 .unwrap()
85 .remove(&file.as_ref().to_path_buf());
86 }
87
88 pub fn remove_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
89 MemoryFileInternal::delete_directory(dir, remove_fs)
90 }
91
92 pub fn flush_to_disk(flush_all: bool) {
93 if flush_all {
94 MemoryFileInternal::flush_all_to_disk();
95 }
96 GlobalFlush::flush_to_disk();
97 }
98
99 pub fn free_memory() {
100 CHUNKS_ALLOCATOR.giveback_free_memory()
101 }
102
103 pub fn terminate() {
104 GlobalFlush::terminate();
105 CHUNKS_ALLOCATOR.deinitialize();
106 }
107
108 pub fn get_stats() -> stats::MemoryFsStats {
109 stats::get_stats()
110 }
111
112 pub fn stats_reset() {
113 stats::reset();
114 }
115
116 pub fn reduce_pressure() -> bool {
117 let (current, max_size) = GlobalFlush::global_queue_occupation();
119 if current * 3 < max_size {
120 let mut map_lock = SWAPPABLE_FILES.lock();
121 if let Some(file) = map_lock.get_next() {
122 drop(map_lock);
123 let mut file = file.write();
124 file.change_to_disk_only();
125 file.flush_chunks(usize::MAX);
126 return true;
127 }
128 }
129
130 return !GlobalFlush::is_queue_empty();
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use crate::memory_data_size::MemoryDataSize;
137 use crate::memory_fs::file::flush::GlobalFlush;
138 use crate::memory_fs::file::internal::MemoryFileMode;
139 use crate::memory_fs::file::reader::FileReader;
140 use crate::memory_fs::file::writer::FileWriter;
141 use crate::memory_fs::MemoryFs;
142 use rayon::prelude::*;
143 use std::io::{Read, Seek, SeekFrom, Write};
144
145 #[test]
146 #[ignore]
147 pub fn memory_fs_test() {
148 MemoryFs::init(MemoryDataSize::from_mebioctets(100 * 1024), 1024, 3, 0);
149 let data = (0..3337).map(|x| (x % 256) as u8).collect::<Vec<u8>>();
150
151 (0..400).into_par_iter().for_each(|i: u32| {
152 crate::log_info!("Writing file {}", i);
153 let mut file = FileWriter::create(
154 format!("/home/andrea/genome-assembly/test1234/{}.tmp", i),
155 MemoryFileMode::PreferMemory { swap_priority: 3 },
156 );
157 for _ in 0..(1024 * 64) {
158 file.write(data.as_slice()).unwrap();
159 }
160 drop(file);
161 let mut file2 = FileReader::open(
162 format!("/home/andrea/genome-assembly/test1234/{}.tmp", i),
163 None,
164 )
165 .unwrap();
166
167 file2.seek(SeekFrom::Start(17 + 3337 * 12374)).unwrap();
168 let mut buffer = [0; 4];
169 file2.read_exact(&mut buffer).unwrap();
170 assert_eq!(&buffer, &data[17..21]);
171 });
172
173 GlobalFlush::flush_to_disk();
174
175 (0..400).into_par_iter().for_each(|i: u32| {
176 crate::log_info!("Reading file {}", i);
177 let mut datar = vec![0; 3337];
178 let mut file = FileReader::open(
179 format!("/home/andrea/genome-assembly/test1234/{}.tmp", i),
180 None,
181 )
182 .unwrap();
183 for _ in 0..(1024 * 64) {
184 file.read_exact(datar.as_mut_slice()).unwrap();
185 assert_eq!(datar, data);
186 }
187 assert_eq!(file.read(datar.as_mut_slice()).unwrap(), 0);
188 crate::log_info!("Read file {}", i);
189 });
190
191 MemoryFs::terminate();
192 }
193}