// parallel_processor/memory_fs/file/internal.rs
use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::flush::GlobalFlush;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use dashmap::DashMap;
6use filebuffer::FileBuffer;
7use once_cell::sync::Lazy;
8use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
9use parking_lot::{Mutex, RawRwLock, RwLock};
10use replace_with::replace_with_or_abort;
11use rustc_hash::FxHashMap;
12use std::cmp::min;
13use std::collections::BinaryHeap;
14use std::fs::remove_file;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, LazyLock, Weak};
19
20use super::handle::FileHandle;
21
22static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
23 Lazy::new(|| DashMap::new());
24
/// Priority queue of memory-preferred files that can be swapped to disk.
///
/// Removal is lazy: `remove_file` only deletes the `index` entry, leaving a
/// stale pair in `queue`; stale entries are detected and skipped when popped
/// by `get_next`.
#[derive(Default)]
pub struct SwappableFiles {
    // Live entries keyed by path; a missing key marks a queued pair as stale.
    index: FxHashMap<PathBuf, Weak<RwLock<MemoryFileInternal>>>,
    // Max-heap of (swap_priority, path); may contain stale paths (see above).
    queue: BinaryHeap<(usize, PathBuf)>,
}
30
31impl SwappableFiles {
32 pub fn add_file(
33 &mut self,
34 priority: usize,
35 path: PathBuf,
36 file: Weak<RwLock<MemoryFileInternal>>,
37 ) {
38 self.index.insert(path.clone(), file);
39 self.queue.push((priority, path));
40 }
41
42 pub fn remove_file(&mut self, path: &Path) {
43 self.index.remove(path);
44 }
45
46 pub fn get_next(&mut self) -> Option<Arc<RwLock<MemoryFileInternal>>> {
47 while let Some((_, file)) = self.queue.pop() {
48 if let Some(file_entry) = self.index.remove(&file) {
49 if let Some(file_entry) = file_entry.upgrade() {
50 let file_read = file_entry.read();
51 if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
52 drop(file_read);
53 return Some(file_entry);
54 } else {
55 file_read.on_swap_list.store(false, Ordering::Relaxed);
57 }
58 }
59 }
60 }
61
62 None
63 }
64}
65
/// Global list of memory-preferred files eligible for swapping to disk;
/// files register themselves here via `MemoryFileInternal::put_on_swappable_list`.
pub static SWAPPABLE_FILES: LazyLock<Mutex<SwappableFiles>> =
    LazyLock::new(|| Mutex::new(Default::default()));
68
/// Storage policy for a memory file, chosen at creation time.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MemoryFileMode {
    /// Contents are kept in memory and never flushed to disk.
    AlwaysMemory,
    /// Contents are kept in memory but may be swapped to disk under pressure.
    /// NOTE(review): `SWAPPABLE_FILES` uses a max-heap, so files with a
    /// *higher* `swap_priority` are swapped out first — confirm intended.
    PreferMemory { swap_priority: usize },
    /// Contents are written straight to disk.
    DiskOnly,
}
75
/// Current open state of a memory file; paired with a reference count in
/// `MemoryFileInternal::open_mode` so repeated opens of the same kind stack.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum OpenMode {
    None,
    Read,
    Write,
}
82
/// One piece of a file's contents: either a byte range already flushed to the
/// on-disk backing file, or a chunk still resident in memory.
pub enum FileChunk {
    /// Flushed data: `len` bytes starting at `offset` in the backing file.
    OnDisk { offset: u64, len: usize },
    /// Data still held in an allocator-owned memory chunk.
    OnMemory { chunk: AllocatedChunk },
}
113
114impl FileChunk {
115 pub fn get_length(&self) -> usize {
116 match self {
117 FileChunk::OnDisk { len, .. } => *len,
118 FileChunk::OnMemory { chunk } => chunk.len(),
119 }
120 }
121
122 #[inline(always)]
123 pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
124 unsafe {
125 match self {
126 FileChunk::OnDisk { offset, .. } => {
127 if let UnderlyingFile::ReadMode(file) = file {
128 let file = file.as_ref().unwrap();
129
130 if let Some(prefetch) = prefetch {
131 let remaining_length = file.len() - *offset as usize;
132 let prefetch_length = min(remaining_length, prefetch);
133 file.prefetch(*offset as usize, prefetch_length);
134 }
135
136 file.as_ptr().add(*offset as usize)
137 } else {
138 panic!("Error, wrong underlying file!");
139 }
140 }
141 FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
142 }
143 }
144 }
145}
146
/// Backing storage of a `MemoryFileInternal`, depending on open state/mode.
pub enum UnderlyingFile {
    /// Not opened yet.
    NotOpened,
    /// Data lives only in memory (always-memory mode, or any non-disk read).
    MemoryOnly,
    /// Data lives in memory but may be swapped to disk under pressure.
    MemoryPreferred,
    /// Opened for writing to disk.
    WriteMode {
        file: Arc<Mutex<FileHandle>>,
        /// Index into `MemoryFileInternal::memory` of the next chunk that
        /// still needs to be flushed.
        chunk_position: usize,
    },
    /// Opened for reading; `None` if opening the on-disk buffer failed.
    ReadMode(Option<FileBuffer>),
}
157
/// A file whose contents may live in memory, on disk, or both, made of an
/// ordered list of chunks. Registered globally in `MEMORY_MAPPED_FILES`.
pub struct MemoryFileInternal {
    // Path of the file; also the key in `MEMORY_MAPPED_FILES`.
    path: PathBuf,
    // Current backing storage (see `UnderlyingFile`).
    file: UnderlyingFile,
    // Storage policy chosen at creation.
    memory_mode: MemoryFileMode,
    // Current open mode plus an open-reference count.
    open_mode: (OpenMode, usize),
    // Ordered chunks composing the file contents.
    memory: Vec<Arc<RwLock<FileChunk>>>,
    // True while this file is queued in `SWAPPABLE_FILES`.
    on_swap_list: AtomicBool,
    // Whether chunks may still be flushed to disk (false once opened for
    // reading or when created from an existing filesystem file).
    can_flush: bool,
}
174
impl MemoryFileInternal {
    /// Creates a new, empty memory file with the given policy and registers
    /// it in `MEMORY_MAPPED_FILES` (replacing any previous entry for `path`).
    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode,
            open_mode: (OpenMode::None, 0),
            memory: Vec::new(),
            on_swap_list: AtomicBool::new(false),
            can_flush: true,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        new_file
    }

    /// Wraps an existing on-disk file as a disk-only memory file, represented
    /// by a single `OnDisk` chunk spanning the whole file.
    /// Returns `None` if `path` does not exist, is not a regular file, or its
    /// metadata cannot be read.
    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        if !path.as_ref().exists() || !path.as_ref().is_file() {
            return None;
        }
        let len = path.as_ref().metadata().ok()?.len() as usize;

        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode: MemoryFileMode::DiskOnly,
            open_mode: (OpenMode::None, 0),
            // The whole existing file is one on-disk chunk.
            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
            on_swap_list: AtomicBool::new(false),
            // Data already lives on disk; nothing to flush.
            can_flush: false,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        Some(new_file)
    }

    /// Logs every registered file with its chunk count (debug aid).
    pub fn debug_dump_files() {
        for file in MEMORY_MAPPED_FILES.iter() {
            let file = file.read();
            crate::log_info!(
                "File '{}' => chunks: {}",
                file.path.display(),
                file.memory.len()
            );
        }
    }

    /// Returns true if this file is in disk-only mode.
    pub fn is_on_disk(&self) -> bool {
        self.memory_mode == MemoryFileMode::DiskOnly
    }

    /// Returns true if this file prefers memory but may be swapped to disk.
    pub fn is_memory_preferred(&self) -> bool {
        if let MemoryFileMode::PreferMemory { .. } = self.memory_mode {
            true
        } else {
            false
        }
    }

    /// Returns a shared handle to the chunk at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
        self.memory[index].clone()
    }

    /// Returns the number of chunks composing this file.
    pub fn get_chunks_count(&self) -> usize {
        self.memory.len()
    }

    /// Looks up a registered file by path.
    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
    }

    /// Returns the number of files currently registered globally.
    pub fn active_files_count() -> usize {
        MEMORY_MAPPED_FILES.len()
    }

    /// Converts every registered file to disk-only mode and enqueues all of
    /// its pending chunks on the flush queue.
    /// NOTE(review): write locks are taken while iterating the global map;
    /// assumes no other thread holds a file lock while touching the map.
    pub fn flush_all_to_disk() {
        for file in MEMORY_MAPPED_FILES.iter() {
            let mut file = file.write();
            file.change_to_disk_only();
            // usize::MAX == no limit: queue every remaining chunk.
            file.flush_chunks(usize::MAX);
        }
    }

    /// Unregisters `path` and, if `remove_fs` is set, also removes its
    /// swap-list entry or deletes the on-disk file depending on mode.
    /// Returns false when the path was not registered.
    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
            // Account for the released in-memory bytes.
            stats::decrease_files_usage(file.1.read().len() as u64);
            if remove_fs {
                match file.1.read().memory_mode {
                    MemoryFileMode::AlwaysMemory => {}
                    MemoryFileMode::PreferMemory { .. } => {
                        SWAPPABLE_FILES.lock().remove_file(path.as_ref())
                    }
                    MemoryFileMode::DiskOnly => {
                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
                            stats::decrease_disk_usage(file_meta.len());
                        }
                        let _ = remove_file(path);
                    }
                }
            }
            // NOTE(review): with remove_fs == false a memory-preferred file
            // keeps its swap-list entry; it is skipped lazily once the Weak
            // fails to upgrade — confirm this is intended.
            true
        } else {
            false
        }
    }

    /// Deletes every registered file whose path starts with `dir`.
    /// Returns true only if all deletions succeeded.
    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
        let mut all_succeeded = true;
        // Collect keys first to avoid mutating the map while iterating it.
        let mut to_delete = vec![];
        for file in MEMORY_MAPPED_FILES.iter() {
            if file.key().starts_with(&dir) {
                to_delete.push(file.key().clone());
            }
        }

        for file in to_delete {
            all_succeeded &= Self::delete(&file, remove_fs);
        }

        all_succeeded
    }

    /// Builds a fresh write-mode backing file at `path`, removing any stale
    /// on-disk file first so appends start from offset 0.
    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
        let _ = remove_file(path);

        UnderlyingFile::WriteMode {
            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
            chunk_position: 0,
        }
    }

    /// Opens the file in the requested mode, setting up the backing storage.
    ///
    /// Re-opening in the same mode only bumps the reference count; mixing
    /// modes while already open is an error.
    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
        if self.open_mode.0 == mode {
            self.open_mode.1 += 1;
            return Ok(());
        }

        if self.open_mode.0 != OpenMode::None {
            return Err(format!("File {} is already opened!", self.path.display()));
        }

        {
            let mut error = None;
            // Swap the backing storage in place; the closure captures other
            // fields of self disjointly (2021-edition closure captures).
            replace_with_or_abort(&mut self.file, |file| {
                match mode {
                    OpenMode::None => UnderlyingFile::NotOpened,
                    OpenMode::Read => {
                        self.open_mode = (OpenMode::Read, 1);
                        // Once readable, no more flushing is allowed.
                        self.can_flush = false;

                        if self.memory_mode != MemoryFileMode::DiskOnly {
                            UnderlyingFile::MemoryOnly
                        } else {
                            // Taking (and dropping) each chunk's read lock
                            // blocks until in-flight flushes, which hold the
                            // write locks, have completed.
                            for chunk in self.memory.iter() {
                                drop(chunk.read());
                            }

                            if let UnderlyingFile::WriteMode { file, .. } = file {
                                file.lock().flush().unwrap();
                            }

                            UnderlyingFile::ReadMode(
                                FileBuffer::open(&self.path)
                                    .inspect_err(|e| {
                                        error = Some(format!(
                                            "Error while opening file {}: {}",
                                            self.path.display(),
                                            e
                                        ));
                                    })
                                    .ok(),
                            )
                        }
                    }
                    OpenMode::Write => {
                        self.open_mode = (OpenMode::Write, 1);
                        match self.memory_mode {
                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
                            MemoryFileMode::DiskOnly => {
                                Self::create_writing_underlying_file(&self.path)
                            }
                        }
                    }
                }
            });
            if let Some(error) = error {
                return Err(error);
            }
        }

        Ok(())
    }

    /// Drops one open reference; the last close flushes write-mode files and
    /// resets the mode to `None`.
    /// NOTE(review): calling close without a matching open underflows the
    /// counter — callers must keep open/close balanced.
    pub fn close(&mut self) {
        self.open_mode.1 -= 1;

        if self.open_mode.1 == 0 {
            self.open_mode.0 = OpenMode::None;
            match &self.file {
                UnderlyingFile::WriteMode { file, .. } => {
                    file.lock().flush().unwrap();
                }
                _ => {}
            }
        }
    }

    /// Registers the file on the global swap list (once) if it is in
    /// memory-preferred mode; `on_swap_list` guards against double insertion.
    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
                SWAPPABLE_FILES.lock().add_file(
                    swap_priority,
                    self_.path.clone(),
                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
                );
            }
        }
    }

    /// Reserves `size` bytes of writable space in whole multiples of
    /// `el_size`, spilling into freshly allocated chunks as needed.
    ///
    /// Filled chunks are appended to `self_.memory` and returned in
    /// `out_chunks` together with a read guard keeping them alive; the final,
    /// partially used chunk is returned (with `None` as its guard entry) for
    /// the caller to continue writing into.
    ///
    /// # Panics
    /// Panics if a chunk's maximum capacity is smaller than `el_size`.
    pub fn reserve_space(
        self_: &Arc<RwLock<Self>>,
        last_chunk: AllocatedChunk,
        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
        mut size: usize,
        el_size: usize,
    ) -> AllocatedChunk {
        let mut chunk = last_chunk;

        loop {
            // Take as many whole elements as fit in the current chunk.
            let rem_bytes = chunk.remaining_bytes();
            let rem_elements = rem_bytes / el_size;
            let el_bytes = min(size, rem_elements * el_size);

            assert!(chunk.max_len() >= el_size);

            let space = if el_bytes > 0 {
                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
            } else {
                None
            };

            size -= el_bytes;

            let mut self_ = self_.write_arc();

            if size > 0 {
                // Chunk exhausted: commit it to the file and grab a new one.
                self_
                    .memory
                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));

                if let Some(space) = space {
                    // Read guard keeps the committed chunk from being flushed
                    // while the caller still writes into `space`.
                    let chunk_guard = self_.memory.last().unwrap().read_arc();
                    out_chunks.push((Some(chunk_guard), space));
                }
                Self::put_on_swappable_list(&self_);

                // Release the file lock before the potentially blocking
                // chunk allocation.
                drop(self_);
                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
                    path: std::panic::Location::caller().to_string()
                }));
            } else {
                if let Some(space) = space {
                    out_chunks.push((None, space));
                }
                return chunk;
            }
        }
    }

    /// Returns the current backing storage.
    pub fn get_underlying_file(&self) -> &UnderlyingFile {
        &self.file
    }

    /// Appends an already-filled memory chunk to the file and, for
    /// memory-preferred files, puts the file on the swap list.
    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
        let mut self_ = self_.write_arc();
        self_
            .memory
            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
        Self::put_on_swappable_list(&self_);
    }

    /// Enqueues up to `limit` pending chunks on the global flush queue,
    /// returning how many were enqueued.
    ///
    /// Lazily creates the write-mode backing file on first use. Stops early
    /// when a chunk's write lock cannot be acquired without blocking (the
    /// chunk is in use elsewhere).
    pub fn flush_chunks(&mut self, limit: usize) -> usize {
        if !self.can_flush {
            return 0;
        }

        match &self.file {
            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
                self.file = Self::create_writing_underlying_file(&self.path);
            }
            _ => {}
        }

        if let UnderlyingFile::WriteMode {
            file,
            chunk_position,
        } = &mut self.file
        {
            {
                let mut flushed_count = 0;
                while flushed_count < limit {
                    if *chunk_position >= self.memory.len() {
                        return flushed_count;
                    }

                    // Non-blocking attempt: the write guard travels with the
                    // flush item and is released after the chunk is written.
                    if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
                        GlobalFlush::add_item_to_flush_queue(FlushableItem {
                            underlying_file: file.clone(),
                            mode: FileFlushMode::Append {
                                chunk: flushable_chunk,
                            },
                        });
                        *chunk_position += 1;
                        flushed_count += 1;
                    } else {
                        return flushed_count;
                    }
                }
                return flushed_count;
            }
        }

        return 0;
    }

    /// Returns how many chunks still await flushing to disk.
    pub fn flush_pending_chunks_count(&self) -> usize {
        match &self.file {
            UnderlyingFile::NotOpened
            | UnderlyingFile::MemoryOnly
            | UnderlyingFile::ReadMode(_) => 0,
            UnderlyingFile::WriteMode { chunk_position, .. } => {
                self.get_chunks_count() - *chunk_position
            }
            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
        }
    }

    /// Returns true if at least one chunk still awaits flushing.
    #[inline(always)]
    pub fn has_flush_pending_chunks(&self) -> bool {
        self.flush_pending_chunks_count() > 0
    }

    /// Demotes a memory-preferred file to disk-only mode, creating a fresh
    /// write-mode backing file. No-op for other modes.
    pub fn change_to_disk_only(&mut self) {
        if self.is_memory_preferred() {
            self.memory_mode = MemoryFileMode::DiskOnly;
            self.file = Self::create_writing_underlying_file(&self.path);
        }
    }

    /// Returns true if the file consists of exactly one chunk.
    #[inline(always)]
    pub fn has_only_one_chunk(&self) -> bool {
        self.memory.len() == 1
    }

    /// Returns the file's path.
    #[inline(always)]
    pub fn get_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the total length in bytes, summed over all chunks.
    /// Takes each chunk's read lock in turn, so it may briefly block on
    /// concurrent flushes.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.memory
            .iter()
            .map(|x| match x.read().deref() {
                FileChunk::OnDisk { len, .. } => *len,
                FileChunk::OnMemory { chunk } => chunk.len(),
            })
            .sum::<usize>()
    }
}
550
// SAFETY(review): these impls assert that `MemoryFileInternal` (whose fields
// include `FileBuffer` and allocator-backed chunks) is safe to share/move
// across threads because all access goes through the enclosing
// `Arc<RwLock<_>>` and chunk-level locks — TODO confirm none of the wrapped
// types hold thread-affine state that would make this unsound.
unsafe impl Sync for MemoryFileInternal {}
unsafe impl Send for MemoryFileInternal {}