parallel_processor/memory_fs/file/
internal.rs1use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::flush::GlobalFlush;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use dashmap::DashMap;
6use filebuffer::FileBuffer;
7use nightly_quirks::utils::NightlyUtils;
8use once_cell::sync::Lazy;
9use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard, RawMutex};
10use parking_lot::{Mutex, RawRwLock, RwLock};
11use replace_with::replace_with_or_abort;
12use rustc_hash::FxHashMap;
13use std::cmp::min;
14use std::fs::remove_file;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Weak};
19
20use super::handle::FileHandle;
21
22static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
23 Lazy::new(|| DashMap::new());
24
25pub static SWAPPABLE_FILES: Mutex<
26 Option<FxHashMap<(usize, PathBuf), Weak<RwLock<MemoryFileInternal>>>>,
27> = Mutex::const_new(parking_lot::RawMutex::INIT, None);
28
/// Storage policy for a file managed by the in-memory filesystem.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MemoryFileMode {
    /// Keep every chunk in memory; never flushed or swapped to disk.
    AlwaysMemory,
    /// Keep chunks in memory but allow them to be swapped to disk under
    /// memory pressure; `swap_priority` is part of the key used in the
    /// `SWAPPABLE_FILES` registry.
    PreferMemory { swap_priority: usize },
    /// Data always lives on disk.
    DiskOnly,
}
35
/// Current open state of a file; paired with a reference count in
/// `MemoryFileInternal::open_mode`.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum OpenMode {
    None,
    Read,
    Write,
}
42
/// One chunk of a file's contents, resident either on disk (as a byte range
/// of the backing file) or in an allocator-provided memory block.
pub enum FileChunk {
    OnDisk { offset: u64, len: usize },
    OnMemory { chunk: AllocatedChunk },
}
73
impl FileChunk {
    /// Length in bytes of the data held by this chunk.
    pub fn get_length(&self) -> usize {
        match self {
            FileChunk::OnDisk { len, .. } => *len,
            FileChunk::OnMemory { chunk } => chunk.len(),
        }
    }

    /// Returns a raw pointer to this chunk's data.
    ///
    /// For an `OnDisk` chunk, `file` must be `UnderlyingFile::ReadMode`
    /// holding an open `FileBuffer`; `prefetch` optionally hints the mapping
    /// to prefetch up to that many bytes (clamped to the remaining file
    /// length). For an `OnMemory` chunk, `file` is ignored.
    ///
    /// # Panics
    /// Panics if the chunk is on disk and `file` is not in read mode, or the
    /// read-mode buffer is `None` (the file failed to open).
    #[inline(always)]
    pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
        unsafe {
            match self {
                FileChunk::OnDisk { offset, .. } => {
                    if let UnderlyingFile::ReadMode(file) = file {
                        let file = file.as_ref().unwrap();

                        if let Some(prefetch) = prefetch {
                            // Clamp the prefetch window to the end of the file.
                            let remaining_length = file.len() - *offset as usize;
                            let prefetch_length = min(remaining_length, prefetch);
                            file.prefetch(*offset as usize, prefetch_length);
                        }

                        // Pointer into the memory-mapped file at this chunk's offset.
                        file.as_ptr().add(*offset as usize)
                    } else {
                        panic!("Error, wrong underlying file!");
                    }
                }
                FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
            }
        }
    }
}
106
/// Backing storage state of a `MemoryFileInternal`.
pub enum UnderlyingFile {
    /// No backing resource opened yet.
    NotOpened,
    /// Data lives purely in memory; no disk handle involved.
    MemoryOnly,
    /// Data kept in memory but may later be moved to a write handle
    /// (see `flush_chunks`).
    MemoryPreferred,
    /// Open for writing: chunks before `chunk_position` have already been
    /// queued for flushing to `file`.
    WriteMode {
        file: Arc<Mutex<FileHandle>>,
        chunk_position: usize,
    },
    /// Open for reading via a memory-mapped buffer; `None` if opening failed.
    ReadMode(Option<FileBuffer>),
}
117
/// In-memory representation of one file of the virtual memory filesystem.
pub struct MemoryFileInternal {
    // Path of the file.
    path: PathBuf,
    // Backing storage state (not opened, memory only, write handle, read mapping).
    file: UnderlyingFile,
    // Storage policy chosen at creation time.
    memory_mode: MemoryFileMode,
    // Current open mode plus how many times it was opened in that mode.
    open_mode: (OpenMode, usize),
    // Ordered chunks composing the file's contents.
    memory: Vec<Arc<RwLock<FileChunk>>>,
    // True while this file is registered in `SWAPPABLE_FILES`.
    on_swap_list: AtomicBool,
    // False when flushing is disallowed (read-opened or created from disk).
    can_flush: bool,
}
134
impl MemoryFileInternal {
    /// Creates a new empty file with the given storage mode and registers it
    /// in the global `MEMORY_MAPPED_FILES` registry.
    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode,
            open_mode: (OpenMode::None, 0),
            memory: Vec::new(),
            on_swap_list: AtomicBool::new(false),
            can_flush: true,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        new_file
    }

    /// Wraps an existing on-disk file as a `DiskOnly` memory file made of a
    /// single `FileChunk::OnDisk` chunk covering the whole file, and registers
    /// it. Returns `None` if the path does not exist, is not a regular file,
    /// or its metadata cannot be read.
    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        if !path.as_ref().exists() || !path.as_ref().is_file() {
            return None;
        }
        let len = path.as_ref().metadata().ok()?.len() as usize;

        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode: MemoryFileMode::DiskOnly,
            open_mode: (OpenMode::None, 0),
            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
            on_swap_list: AtomicBool::new(false),
            // Data already lives on disk; there is nothing to flush.
            can_flush: false,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        Some(new_file)
    }

    /// Logs every registered file together with its chunk count.
    pub fn debug_dump_files() {
        for file in MEMORY_MAPPED_FILES.iter() {
            let file = file.read();
            crate::log_info!(
                "File '{}' => chunks: {}",
                file.path.display(),
                file.memory.len()
            );
        }
    }

    /// True if this file stores its data exclusively on disk.
    pub fn is_on_disk(&self) -> bool {
        self.memory_mode == MemoryFileMode::DiskOnly
    }

    /// True if this file prefers memory storage but can be swapped to disk.
    pub fn is_memory_preferred(&self) -> bool {
        if let MemoryFileMode::PreferMemory { .. } = self.memory_mode {
            true
        } else {
            false
        }
    }

    /// Returns a shared handle to the chunk at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
        self.memory[index].clone()
    }

    /// Number of chunks currently composing this file.
    pub fn get_chunks_count(&self) -> usize {
        self.memory.len()
    }

    /// Looks up a registered file by path.
    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
    }

    /// Number of files currently registered in the global registry.
    pub fn active_files_count() -> usize {
        MEMORY_MAPPED_FILES.len()
    }

    /// Removes a file from the registry, returning `false` if it was not
    /// registered. When `remove_fs` is true, it also removes the entry from
    /// the swappable list (`PreferMemory`) or deletes the file from the
    /// filesystem (`DiskOnly`); `AlwaysMemory` needs no extra cleanup.
    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
            stats::decrease_files_usage(file.1.read().len() as u64);
            if remove_fs {
                match file.1.read().memory_mode {
                    MemoryFileMode::AlwaysMemory => {}
                    MemoryFileMode::PreferMemory { swap_priority } => {
                        NightlyUtils::mutex_get_or_init(&mut SWAPPABLE_FILES.lock(), || {
                            FxHashMap::default()
                        })
                        .remove(&(swap_priority, path.as_ref().to_path_buf()));
                    }
                    MemoryFileMode::DiskOnly => {
                        // Track the freed disk space before removing the file.
                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
                            stats::decrease_disk_usage(file_meta.len());
                        }
                        let _ = remove_file(path);
                    }
                }
            }
            true
        } else {
            false
        }
    }

    /// Deletes every registered file whose path starts with `dir`.
    /// Returns true only if all deletions succeeded.
    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
        let mut all_succeeded = true;
        // Collect the matching paths first to avoid removing entries from the
        // map while iterating it.
        let mut to_delete = vec![];
        for file in MEMORY_MAPPED_FILES.iter() {
            if file.key().starts_with(&dir) {
                to_delete.push(file.key().clone());
            }
        }

        for file in to_delete {
            all_succeeded &= Self::delete(&file, remove_fs);
        }

        all_succeeded
    }

    /// Builds a fresh `WriteMode` underlying file for `path`, removing any
    /// stale file with the same name from disk first.
    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
        // Ignore errors: the file may simply not exist yet.
        let _ = remove_file(path);

        UnderlyingFile::WriteMode {
            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
            chunk_position: 0,
        }
    }

    /// Opens the file in the requested mode, setting up the underlying
    /// storage. Re-opening with the same mode only bumps the reference count;
    /// opening in a different mode while already open is an error.
    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
        if self.open_mode.0 == mode {
            self.open_mode.1 += 1;
            return Ok(());
        }

        if self.open_mode.0 != OpenMode::None {
            return Err(format!("File {} is already opened!", self.path.display()));
        }

        {
            let mut error = None;
            // `replace_with_or_abort` lets us consume the old `UnderlyingFile`
            // by value while building its replacement in place.
            replace_with_or_abort(&mut self.file, |file| {
                match mode {
                    OpenMode::None => UnderlyingFile::NotOpened,
                    OpenMode::Read => {
                        self.open_mode = (OpenMode::Read, 1);
                        // Once opened for reading, chunks may no longer be flushed.
                        self.can_flush = false;

                        if self.memory_mode != MemoryFileMode::DiskOnly {
                            UnderlyingFile::MemoryOnly
                        } else {
                            // Acquire and immediately release each chunk's read
                            // lock. NOTE(review): presumably this waits for any
                            // in-flight writers on the chunks to finish before
                            // the file is memory-mapped — confirm.
                            for chunk in self.memory.iter() {
                                drop(chunk.read());
                            }

                            // Flush buffered writes so the mapping sees all data.
                            if let UnderlyingFile::WriteMode { file, .. } = file {
                                file.lock().flush().unwrap();
                            }

                            UnderlyingFile::ReadMode(
                                FileBuffer::open(&self.path)
                                    .inspect_err(|e| {
                                        // Can't return from inside the closure;
                                        // stash the error for the caller.
                                        error = Some(format!(
                                            "Error while opening file {}: {}",
                                            self.path.display(),
                                            e
                                        ));
                                    })
                                    .ok(),
                            )
                        }
                    }
                    OpenMode::Write => {
                        self.open_mode = (OpenMode::Write, 1);
                        match self.memory_mode {
                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
                            MemoryFileMode::DiskOnly => {
                                Self::create_writing_underlying_file(&self.path)
                            }
                        }
                    }
                }
            });
            if let Some(error) = error {
                return Err(error);
            }
        }

        Ok(())
    }

    /// Decrements the open reference count; when it reaches zero, resets the
    /// open mode and flushes the write handle (if any).
    pub fn close(&mut self) {
        self.open_mode.1 -= 1;

        if self.open_mode.1 == 0 {
            self.open_mode.0 = OpenMode::None;
            match &self.file {
                UnderlyingFile::WriteMode { file, .. } => {
                    file.lock().flush().unwrap();
                }
                _ => {}
            }
        }
    }

    /// Registers the file in `SWAPPABLE_FILES` if it is `PreferMemory` and
    /// not already on the list.
    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
            // `swap` returns the previous flag value, so only the first caller
            // performs the insertion.
            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
                NightlyUtils::mutex_get_or_init(&mut SWAPPABLE_FILES.lock(), || {
                    FxHashMap::default()
                })
                .insert(
                    (swap_priority, self_.path.clone()),
                    // Weak reference: the registry must not keep the file alive.
                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
                );
            }
        }
    }

    /// Reserves `size` bytes of writable space in whole multiples of
    /// `el_size`, starting from `last_chunk` and requesting new chunks from
    /// the allocator as needed. Filled chunks are appended to the file and
    /// their writable sub-slices (with read guards pinning them) are pushed to
    /// `out_chunks`; the chunk that still has free space is returned so the
    /// caller can reuse it as the next `last_chunk`.
    pub fn reserve_space(
        self_: &Arc<RwLock<Self>>,
        last_chunk: AllocatedChunk,
        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
        mut size: usize,
        el_size: usize,
    ) -> AllocatedChunk {
        let mut chunk = last_chunk;

        loop {
            // Take as many whole elements as fit in the current chunk.
            let rem_bytes = chunk.remaining_bytes();
            let rem_elements = rem_bytes / el_size;
            let el_bytes = min(size, rem_elements * el_size);

            // A chunk must be large enough to hold at least one element.
            assert!(chunk.max_len() >= el_size);

            let space = if el_bytes > 0 {
                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
            } else {
                None
            };

            size -= el_bytes;

            let mut self_ = self_.write_arc();

            if size > 0 {
                // Current chunk exhausted but more space is needed: store it
                // in the file and request a fresh chunk from the allocator.
                self_
                    .memory
                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));

                if let Some(space) = space {
                    // Keep a read guard on the stored chunk so it cannot be
                    // swapped/flushed while the caller writes into `space`.
                    let chunk_guard = self_.memory.last().unwrap().read_arc();
                    out_chunks.push((Some(chunk_guard), space));
                }
                Self::put_on_swappable_list(&self_);

                // Release the file lock before the (possibly blocking)
                // allocator request.
                drop(self_);
                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
                    path: std::panic::Location::caller().to_string()
                }));
            } else {
                if let Some(space) = space {
                    // The last chunk is not stored yet, so no guard is needed.
                    out_chunks.push((None, space));
                }
                return chunk;
            }
        }
    }

    /// Direct access to the underlying storage state.
    pub fn get_underlying_file(&self) -> &UnderlyingFile {
        &self.file
    }

    /// Appends a fully-built memory chunk to the file and (for `PreferMemory`
    /// files) ensures the file is on the swappable list.
    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
        let mut self_ = self_.write_arc();
        self_
            .memory
            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
        Self::put_on_swappable_list(&self_);
    }

    /// Queues up to `limit` pending chunks on the global flush queue and
    /// returns how many were queued. Lazily switches the underlying file to
    /// write mode if needed. Stops early at the first chunk whose write lock
    /// cannot be taken immediately (someone is still using it).
    pub fn flush_chunks(&mut self, limit: usize) -> usize {
        if !self.can_flush {
            return 0;
        }

        match &self.file {
            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
                self.file = Self::create_writing_underlying_file(&self.path);
            }
            _ => {}
        }

        if let UnderlyingFile::WriteMode {
            file,
            chunk_position,
        } = &mut self.file
        {
            {
                let mut flushed_count = 0;
                while flushed_count < limit {
                    // All chunks already queued: nothing left to do.
                    if *chunk_position >= self.memory.len() {
                        return flushed_count;
                    }

                    // Non-blocking: give up on the first contended chunk.
                    if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
                        GlobalFlush::add_item_to_flush_queue(FlushableItem {
                            underlying_file: file.clone(),
                            mode: FileFlushMode::Append {
                                chunk: flushable_chunk,
                            },
                        });
                        *chunk_position += 1;
                        flushed_count += 1;
                    } else {
                        return flushed_count;
                    }
                }
                return flushed_count;
            }
        }

        // Underlying file is memory-only or opened for reading.
        return 0;
    }

    /// Number of chunks not yet queued for flushing.
    pub fn flush_pending_chunks_count(&self) -> usize {
        match &self.file {
            UnderlyingFile::NotOpened
            | UnderlyingFile::MemoryOnly
            | UnderlyingFile::ReadMode(_) => 0,
            UnderlyingFile::WriteMode { chunk_position, .. } => {
                self.get_chunks_count() - *chunk_position
            }
            // Nothing queued yet: every chunk is still pending.
            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
        }
    }

    /// True if at least one chunk still needs flushing.
    #[inline(always)]
    pub fn has_flush_pending_chunks(&self) -> bool {
        self.flush_pending_chunks_count() > 0
    }

    /// Demotes a `PreferMemory` file to `DiskOnly` and opens a write handle
    /// so its chunks can be flushed; no-op for other modes.
    pub fn change_to_disk_only(&mut self) {
        if self.is_memory_preferred() {
            self.memory_mode = MemoryFileMode::DiskOnly;
            self.file = Self::create_writing_underlying_file(&self.path);
        }
    }

    /// True if the file consists of exactly one chunk.
    #[inline(always)]
    pub fn has_only_one_chunk(&self) -> bool {
        self.memory.len() == 1
    }

    /// Path of this file.
    #[inline(always)]
    pub fn get_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Total length in bytes, summed over all chunks.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.memory
            .iter()
            .map(|x| match x.read().deref() {
                FileChunk::OnDisk { len, .. } => *len,
                FileChunk::OnMemory { chunk } => chunk.len(),
            })
            .sum::<usize>()
    }
}
507
// SAFETY: NOTE(review): these impls assert that `MemoryFileInternal` may be
// shared and sent across threads. The fields visible in this file are guarded
// by `RwLock`/`Mutex`/atomics or owned (`PathBuf`, `Vec<Arc<...>>`), but the
// auto-trait opt-out presumably comes from `UnderlyingFile` contents
// (`FileBuffer` mapping / `FileHandle`) — confirm those are actually safe to
// move and share between threads.
unsafe impl Sync for MemoryFileInternal {}
unsafe impl Send for MemoryFileInternal {}