parallel_processor/memory_fs/file/internal.rs

use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
use crate::memory_fs::file::flush::GlobalFlush;
use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
use crate::memory_fs::stats;
use dashmap::DashMap;
use filebuffer::FileBuffer;
use once_cell::sync::Lazy;
use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
use parking_lot::{Mutex, RawRwLock, RwLock};
use replace_with::replace_with_or_abort;
use rustc_hash::FxHashMap;
use std::cmp::min;
use std::collections::BinaryHeap;
use std::fs::remove_file;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock, Weak};

use super::handle::FileHandle;

static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
    Lazy::new(|| DashMap::new());

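/// Tracks files that prefer to stay in memory but can be swapped to disk
/// under memory pressure. Entries are indexed by path and ordered by swap
/// priority in a max-heap; heap entries whose path was removed from the
/// index are skipped lazily when popped.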
#[derive(Default)]
pub struct SwappableFiles {
    index: FxHashMap<PathBuf, Weak<RwLock<MemoryFileInternal>>>,
    queue: BinaryHeap<(usize, PathBuf)>,
}

impl SwappableFiles {
    pub fn add_file(
        &mut self,
        priority: usize,
        path: PathBuf,
        file: Weak<RwLock<MemoryFileInternal>>,
    ) {
        self.index.insert(path.clone(), file);
        self.queue.push((priority, path));
    }

    pub fn remove_file(&mut self, path: &Path) {
        self.index.remove(path);
    }

    pub fn get_next(&mut self) -> Option<Arc<RwLock<MemoryFileInternal>>> {
        while let Some((_, path)) = self.queue.pop() {
            if let Some(file_entry) = self.index.remove(&path) {
                if let Some(file_entry) = file_entry.upgrade() {
                    let file_read = file_entry.read();
                    if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
                        drop(file_read);
                        return Some(file_entry);
                    } else {
                        file_read.on_swap_list.store(false, Ordering::Relaxed);
                    }
                }
            }
        }

        None
    }
}

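/// Global list of memory-preferred files that currently have chunks eligible
/// for swapping to disk.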
pub static SWAPPABLE_FILES: LazyLock<Mutex<SwappableFiles>> =
    LazyLock::new(|| Mutex::new(Default::default()));

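/// Storage policy for a file: always kept in memory, kept in memory but
/// swappable to disk under memory pressure (entries with a higher
/// `swap_priority` are popped first by `SwappableFiles::get_next`), or
/// written directly to disk.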
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MemoryFileMode {
    AlwaysMemory,
    PreferMemory { swap_priority: usize },
    DiskOnly,
}

#[derive(Copy, Clone, Eq, PartialEq)]
pub enum OpenMode {
    None,
    Read,
    Write,
}

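/// A single chunk of file data, either already flushed to disk (referenced by
/// offset and length within the backing file) or still resident in an
/// allocated memory chunk.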
pub enum FileChunk {
    OnDisk { offset: u64, len: usize },
    OnMemory { chunk: AllocatedChunk },
}

impl FileChunk {
    pub fn get_length(&self) -> usize {
        match self {
            FileChunk::OnDisk { len, .. } => *len,
            FileChunk::OnMemory { chunk } => chunk.len(),
        }
    }

    #[inline(always)]
    pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
        unsafe {
            match self {
                FileChunk::OnDisk { offset, .. } => {
                    if let UnderlyingFile::ReadMode(file) = file {
                        let file = file.as_ref().unwrap();

                        if let Some(prefetch) = prefetch {
                            let remaining_length = file.len() - *offset as usize;
                            let prefetch_length = min(remaining_length, prefetch);
                            file.prefetch(*offset as usize, prefetch_length);
                        }

                        file.as_ptr().add(*offset as usize)
                    } else {
                        panic!("Error, wrong underlying file!");
                    }
                }
                FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
            }
        }
    }
}

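/// Backing storage behind a `MemoryFileInternal`, tracking how the file is
/// currently opened: not opened yet, memory-only, memory-preferred, open for
/// writing (with the index of the next chunk to flush), or memory-mapped for
/// reading.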
pub enum UnderlyingFile {
    NotOpened,
    MemoryOnly,
    MemoryPreferred,
    WriteMode {
        file: Arc<Mutex<FileHandle>>,
        chunk_position: usize,
    },
    ReadMode(Option<FileBuffer>),
}

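/// An in-memory file made of a sequence of chunks, optionally backed by a
/// file on disk depending on its `MemoryFileMode`. Every file is registered
/// in the global `MEMORY_MAPPED_FILES` map and can be retrieved by path.
///
/// A minimal usage sketch (illustrative only, not a doctest; it assumes an
/// `AllocatedChunk` obtained elsewhere from `CHUNKS_ALLOCATOR`):
///
/// ```ignore
/// let file = MemoryFileInternal::create_new(
///     "example.tmp",
///     MemoryFileMode::PreferMemory { swap_priority: 4 },
/// );
/// file.write().open(OpenMode::Write).unwrap();
/// MemoryFileInternal::add_chunk(&file, chunk); // `chunk: AllocatedChunk`
/// file.write().close();
/// assert!(MemoryFileInternal::retrieve_reference("example.tmp").is_some());
/// MemoryFileInternal::delete("example.tmp", true);
/// ```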
pub struct MemoryFileInternal {
    /// Path of the file
    path: PathBuf,
    /// Underlying storage (not opened, memory-only, write handle or read mapping)
    file: UnderlyingFile,
    /// Storage policy for this file
    memory_mode: MemoryFileMode,
    /// Current open mode and the number of times it was opened in that mode
    open_mode: (OpenMode, usize),
    /// Chunks composing the file contents
    memory: Vec<Arc<RwLock<FileChunk>>>,
    /// True if the file is already queued in `SWAPPABLE_FILES`
    on_swap_list: AtomicBool,
    /// False when the file must not be flushed to disk (e.g. opened for reading)
    can_flush: bool,
}

impl MemoryFileInternal {
    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode,
            open_mode: (OpenMode::None, 0),
            memory: Vec::new(),
            on_swap_list: AtomicBool::new(false),
            can_flush: true,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        new_file
    }

    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        if !path.as_ref().exists() || !path.as_ref().is_file() {
            return None;
        }
        let len = path.as_ref().metadata().ok()?.len() as usize;

        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode: MemoryFileMode::DiskOnly,
            open_mode: (OpenMode::None, 0),
            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
            on_swap_list: AtomicBool::new(false),
            can_flush: false,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        Some(new_file)
    }

    pub fn debug_dump_files() {
        for file in MEMORY_MAPPED_FILES.iter() {
            let file = file.read();
            crate::log_info!(
                "File '{}' => chunks: {}",
                file.path.display(),
                file.memory.len()
            );
        }
    }

    pub fn is_on_disk(&self) -> bool {
        self.memory_mode == MemoryFileMode::DiskOnly
    }

    pub fn is_memory_preferred(&self) -> bool {
        if let MemoryFileMode::PreferMemory { .. } = self.memory_mode {
            true
        } else {
            false
        }
    }

    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
        self.memory[index].clone()
    }

    pub fn get_chunks_count(&self) -> usize {
        self.memory.len()
    }

    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
    }

    pub fn active_files_count() -> usize {
        MEMORY_MAPPED_FILES.len()
    }

    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
            stats::decrease_files_usage(file.1.read().len() as u64);
            if remove_fs {
                match file.1.read().memory_mode {
                    MemoryFileMode::AlwaysMemory => {}
                    MemoryFileMode::PreferMemory { .. } => {
                        SWAPPABLE_FILES.lock().remove_file(path.as_ref())
                    }
                    MemoryFileMode::DiskOnly => {
                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
                            stats::decrease_disk_usage(file_meta.len());
                        }
                        let _ = remove_file(path);
                    }
                }
            }
            true
        } else {
            false
        }
    }

    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
        let mut all_succeeded = true;
        let mut to_delete = vec![];
        for file in MEMORY_MAPPED_FILES.iter() {
            if file.key().starts_with(&dir) {
                to_delete.push(file.key().clone());
            }
        }

        for file in to_delete {
            all_succeeded &= Self::delete(&file, remove_fs);
        }

        all_succeeded
    }

    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
        // Remove any stale copy of the file left on disk before opening a new handle
        let _ = remove_file(path);

        UnderlyingFile::WriteMode {
            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
            chunk_position: 0,
        }
    }

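    /// Opens the file in the requested mode, initializing the underlying
    /// storage accordingly. Reopening with the same mode only increments the
    /// open counter; mixing read and write opens is an error.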
    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
        if self.open_mode.0 == mode {
            self.open_mode.1 += 1;
            return Ok(());
        }

        if self.open_mode.0 != OpenMode::None {
            return Err(format!("File {} is already opened!", self.path.display()));
        }

        {
            let mut error = None;
            replace_with_or_abort(&mut self.file, |file| {
                match mode {
                    OpenMode::None => UnderlyingFile::NotOpened,
                    OpenMode::Read => {
                        self.open_mode = (OpenMode::Read, 1);
                        self.can_flush = false;

                        if self.memory_mode != MemoryFileMode::DiskOnly {
                            UnderlyingFile::MemoryOnly
                        } else {
                            // Wait for any in-progress writes on the chunks to complete
                            for chunk in self.memory.iter() {
                                drop(chunk.read());
                            }

                            if let UnderlyingFile::WriteMode { file, .. } = file {
                                file.lock().flush().unwrap();
                            }

                            UnderlyingFile::ReadMode(
                                FileBuffer::open(&self.path)
                                    .inspect_err(|e| {
                                        error = Some(format!(
                                            "Error while opening file {}: {}",
                                            self.path.display(),
                                            e
                                        ));
                                    })
                                    .ok(),
                            )
                        }
                    }
                    OpenMode::Write => {
                        self.open_mode = (OpenMode::Write, 1);
                        match self.memory_mode {
                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
                            MemoryFileMode::DiskOnly => {
                                Self::create_writing_underlying_file(&self.path)
                            }
                        }
                    }
                }
            });
            if let Some(error) = error {
                return Err(error);
            }
        }

        Ok(())
    }

    pub fn close(&mut self) {
        self.open_mode.1 -= 1;

        if self.open_mode.1 == 0 {
            self.open_mode.0 = OpenMode::None;
            match &self.file {
                UnderlyingFile::WriteMode { file, .. } => {
                    file.lock().flush().unwrap();
                }
                _ => {}
            }
        }
    }

    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
                SWAPPABLE_FILES.lock().add_file(
                    swap_priority,
                    self_.path.clone(),
                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
                );
            }
        }
    }

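    /// Reserves `size` bytes for writing, split at element boundaries of
    /// `el_size`, starting from `last_chunk`. Chunks that get filled are
    /// appended to the file (and the file is put on the swappable list when
    /// applicable), the writable slices are pushed to `out_chunks`, and the
    /// chunk that still has free space is returned.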
    pub fn reserve_space(
        self_: &Arc<RwLock<Self>>,
        last_chunk: AllocatedChunk,
        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
        mut size: usize,
        el_size: usize,
    ) -> AllocatedChunk {
        let mut chunk = last_chunk;

        loop {
            let rem_bytes = chunk.remaining_bytes();
            let rem_elements = rem_bytes / el_size;
            let el_bytes = min(size, rem_elements * el_size);

            assert!(chunk.max_len() >= el_size);

            let space = if el_bytes > 0 {
                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
            } else {
                None
            };

            size -= el_bytes;

            let mut self_ = self_.write_arc();

            if size > 0 {
                self_
                    .memory
                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));

                if let Some(space) = space {
                    let chunk_guard = self_.memory.last().unwrap().read_arc();
                    out_chunks.push((Some(chunk_guard), space));
                }
                Self::put_on_swappable_list(&self_);

                drop(self_);
                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
                    path: std::panic::Location::caller().to_string()
                }));
            } else {
                if let Some(space) = space {
                    out_chunks.push((None, space));
                }
                return chunk;
            }
        }
    }

    pub fn get_underlying_file(&self) -> &UnderlyingFile {
        &self.file
    }

    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
        let mut self_ = self_.write_arc();
        self_
            .memory
            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
        Self::put_on_swappable_list(&self_);
    }

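    /// Queues up to `limit` pending in-memory chunks onto the global flush
    /// queue, switching the underlying storage to write mode if needed.
    /// Returns the number of chunks queued; a chunk whose lock is currently
    /// contended stops the scan early.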
    pub fn flush_chunks(&mut self, limit: usize) -> usize {
        if !self.can_flush {
            return 0;
        }

        match &self.file {
            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
                self.file = Self::create_writing_underlying_file(&self.path);
            }
            _ => {}
        }

        if let UnderlyingFile::WriteMode {
            file,
            chunk_position,
        } = &mut self.file
        {
            let mut flushed_count = 0;
            while flushed_count < limit {
                if *chunk_position >= self.memory.len() {
                    return flushed_count;
                }

                if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
                    GlobalFlush::add_item_to_flush_queue(FlushableItem {
                        underlying_file: file.clone(),
                        mode: FileFlushMode::Append {
                            chunk: flushable_chunk,
                        },
                    });
                    *chunk_position += 1;
                    flushed_count += 1;
                } else {
                    return flushed_count;
                }
            }
            return flushed_count;
        }

        0
    }

    pub fn flush_pending_chunks_count(&self) -> usize {
        match &self.file {
            UnderlyingFile::NotOpened
            | UnderlyingFile::MemoryOnly
            | UnderlyingFile::ReadMode(_) => 0,
            UnderlyingFile::WriteMode { chunk_position, .. } => {
                self.get_chunks_count() - *chunk_position
            }
            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
        }
    }

    #[inline(always)]
    pub fn has_flush_pending_chunks(&self) -> bool {
        self.flush_pending_chunks_count() > 0
    }

    pub fn change_to_disk_only(&mut self) {
        if self.is_memory_preferred() {
            self.memory_mode = MemoryFileMode::DiskOnly;
            self.file = Self::create_writing_underlying_file(&self.path);
        }
    }

    #[inline(always)]
    pub fn has_only_one_chunk(&self) -> bool {
        self.memory.len() == 1
    }

    #[inline(always)]
    pub fn get_path(&self) -> &Path {
        self.path.as_ref()
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.memory
            .iter()
            .map(|x| match x.read().deref() {
                FileChunk::OnDisk { len, .. } => *len,
                FileChunk::OnMemory { chunk } => chunk.len(),
            })
            .sum::<usize>()
    }
}

unsafe impl Sync for MemoryFileInternal {}
unsafe impl Send for MemoryFileInternal {}
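
// A minimal sanity-check sketch of the lifecycle above (create, open, close,
// retrieve, delete). This is an illustrative addition, not part of the
// original module: it assumes the `stats` counters and the global maps need
// no extra initialization for a purely in-memory file.
#[cfg(test)]
mod lifecycle_sketch {
    use super::*;

    #[test]
    fn memory_only_file_lifecycle() {
        let path = PathBuf::from("lifecycle_sketch_example.tmp");

        // Create an always-in-memory file and register it in the global map.
        let file = MemoryFileInternal::create_new(&path, MemoryFileMode::AlwaysMemory);
        assert!(MemoryFileInternal::retrieve_reference(&path).is_some());

        // Open for writing: an AlwaysMemory file never touches the disk.
        file.write().open(OpenMode::Write).unwrap();
        assert_eq!(file.read().len(), 0);
        file.write().close();

        // Delete removes it from the registry; no filesystem entry was created.
        assert!(MemoryFileInternal::delete(&path, true));
        assert!(MemoryFileInternal::retrieve_reference(&path).is_none());
    }
}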