parallel_processor/memory_fs/file/
internal.rs

1use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::flush::GlobalFlush;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use dashmap::DashMap;
6use filebuffer::FileBuffer;
7use once_cell::sync::Lazy;
8use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
9use parking_lot::{Mutex, RawRwLock, RwLock};
10use replace_with::replace_with_or_abort;
11use rustc_hash::FxHashMap;
12use std::cmp::min;
13use std::collections::BinaryHeap;
14use std::fs::remove_file;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, LazyLock, Weak};
19
20use super::handle::FileHandle;
21
22static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
23    Lazy::new(|| DashMap::new());
24
25#[derive(Default)]
26pub struct SwappableFiles {
27    index: FxHashMap<PathBuf, Weak<RwLock<MemoryFileInternal>>>,
28    queue: BinaryHeap<(usize, PathBuf)>,
29}
30
31impl SwappableFiles {
32    pub fn add_file(
33        &mut self,
34        priority: usize,
35        path: PathBuf,
36        file: Weak<RwLock<MemoryFileInternal>>,
37    ) {
38        self.index.insert(path.clone(), file);
39        self.queue.push((priority, path));
40    }
41
42    pub fn remove_file(&mut self, path: &Path) {
43        self.index.remove(path);
44    }
45
46    pub fn get_next(&mut self) -> Option<Arc<RwLock<MemoryFileInternal>>> {
47        while let Some((_, file)) = self.queue.pop() {
48            if let Some(file_entry) = self.index.remove(&file) {
49                if let Some(file_entry) = file_entry.upgrade() {
50                    let file_read = file_entry.read();
51                    if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
52                        drop(file_read);
53                        return Some(file_entry);
54                    } else {
55                        // Remove the file if it cannot be swapped
56                        file_read.on_swap_list.store(false, Ordering::Relaxed);
57                    }
58                }
59            }
60        }
61
62        None
63    }
64}
65
66pub static SWAPPABLE_FILES: LazyLock<Mutex<SwappableFiles>> =
67    LazyLock::new(|| Mutex::new(Default::default()));
68
/// Storage policy of a memory file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryFileMode {
    /// The file content is kept in memory chunks only.
    AlwaysMemory,
    /// The file content starts in memory but the file may be registered as a
    /// swap candidate; `swap_priority` is the key used to order candidates.
    PreferMemory { swap_priority: usize },
    /// The file content is backed by a file on disk.
    DiskOnly,
}
75
/// Current access mode of a memory file.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// The file is not opened.
    None,
    /// The file is opened for reading.
    Read,
    /// The file is opened for writing.
    Write,
}
82
83// struct RwLockIterator<'a, A, B, I: Iterator<Item = B>> {
84//     _lock: RwLockReadGuard<'a, A>,
85//     iterator: I,
86// }
87
88// impl<'a, A, B, I: Iterator<Item = B>> Iterator for RwLockIterator<'a, A, B, I> {
89//     type Item = B;
90
91//     fn next(&mut self) -> Option<Self::Item> {
92//         self.iterator.next()
93//     }
94// }
95
96// pub struct MemoryFileChunksIterator<'a, I: Iterator<Item = &'a mut [u8]>> {
97//     iter: I,
98// }
99
100// impl<'a, I: Iterator<Item = &'a mut [u8]>> Iterator for MemoryFileChunksIterator<'a, I> {
101//     type Item = &'a mut [u8];
102
103//     #[inline(always)]
104//     fn next(&mut self) -> Option<Self::Item> {
105//         self.iter.next()
106//     }
107// }
108
109pub enum FileChunk {
110    OnDisk { offset: u64, len: usize },
111    OnMemory { chunk: AllocatedChunk },
112}
113
114impl FileChunk {
115    pub fn get_length(&self) -> usize {
116        match self {
117            FileChunk::OnDisk { len, .. } => *len,
118            FileChunk::OnMemory { chunk } => chunk.len(),
119        }
120    }
121
122    #[inline(always)]
123    pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
124        unsafe {
125            match self {
126                FileChunk::OnDisk { offset, .. } => {
127                    if let UnderlyingFile::ReadMode(file) = file {
128                        let file = file.as_ref().unwrap();
129
130                        if let Some(prefetch) = prefetch {
131                            let remaining_length = file.len() - *offset as usize;
132                            let prefetch_length = min(remaining_length, prefetch);
133                            file.prefetch(*offset as usize, prefetch_length);
134                        }
135
136                        file.as_ptr().add(*offset as usize)
137                    } else {
138                        panic!("Error, wrong underlying file!");
139                    }
140                }
141                FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
142            }
143        }
144    }
145}
146
147pub enum UnderlyingFile {
148    NotOpened,
149    MemoryOnly,
150    MemoryPreferred,
151    WriteMode {
152        file: Arc<Mutex<FileHandle>>,
153        chunk_position: usize,
154    },
155    ReadMode(Option<FileBuffer>),
156}
157
158pub struct MemoryFileInternal {
159    /// Path associated with the current file
160    path: PathBuf,
161    /// Disk read/write structure
162    file: UnderlyingFile,
163    /// Memory mode
164    memory_mode: MemoryFileMode,
165    /// Read or write mode
166    open_mode: (OpenMode, usize),
167    /// Actual memory mapping
168    memory: Vec<Arc<RwLock<FileChunk>>>,
169    /// True if it's in the swap list
170    on_swap_list: AtomicBool,
171    /// True if more chunks can be flushed
172    can_flush: bool,
173}
174
175impl MemoryFileInternal {
176    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
177        let new_file = Arc::new(RwLock::new(Self {
178            path: path.as_ref().into(),
179            file: UnderlyingFile::NotOpened,
180            memory_mode,
181            open_mode: (OpenMode::None, 0),
182            memory: Vec::new(),
183            on_swap_list: AtomicBool::new(false),
184            can_flush: true,
185        }));
186
187        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());
188
189        new_file
190    }
191
192    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
193        if !path.as_ref().exists() || !path.as_ref().is_file() {
194            return None;
195        }
196        let len = path.as_ref().metadata().ok()?.len() as usize;
197
198        let new_file = Arc::new(RwLock::new(Self {
199            path: path.as_ref().into(),
200            file: UnderlyingFile::NotOpened,
201            memory_mode: MemoryFileMode::DiskOnly,
202            open_mode: (OpenMode::None, 0),
203            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
204            on_swap_list: AtomicBool::new(false),
205            can_flush: false,
206        }));
207
208        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());
209
210        Some(new_file)
211    }
212
213    pub fn debug_dump_files() {
214        for file in MEMORY_MAPPED_FILES.iter() {
215            let file = file.read();
216            crate::log_info!(
217                "File '{}' => chunks: {}",
218                file.path.display(),
219                file.memory.len()
220            );
221        }
222    }
223
224    pub fn is_on_disk(&self) -> bool {
225        self.memory_mode == MemoryFileMode::DiskOnly
226    }
227
228    pub fn is_memory_preferred(&self) -> bool {
229        if let MemoryFileMode::PreferMemory { .. } = self.memory_mode {
230            true
231        } else {
232            false
233        }
234    }
235
236    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
237        self.memory[index].clone()
238    }
239
240    pub fn get_chunks_count(&self) -> usize {
241        self.memory.len()
242    }
243
244    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
245        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
246    }
247
248    pub fn active_files_count() -> usize {
249        MEMORY_MAPPED_FILES.len()
250    }
251
252    pub fn flush_all_to_disk() {
253        for file in MEMORY_MAPPED_FILES.iter() {
254            let mut file = file.write();
255            file.change_to_disk_only();
256            file.flush_chunks(usize::MAX);
257        }
258    }
259
260    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
261        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
262            stats::decrease_files_usage(file.1.read().len() as u64);
263            if remove_fs {
264                match file.1.read().memory_mode {
265                    MemoryFileMode::AlwaysMemory => {}
266                    MemoryFileMode::PreferMemory { .. } => {
267                        SWAPPABLE_FILES.lock().remove_file(path.as_ref())
268                    }
269                    MemoryFileMode::DiskOnly => {
270                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
271                            stats::decrease_disk_usage(file_meta.len());
272                        }
273                        let _ = remove_file(path);
274                    }
275                }
276            }
277            true
278        } else {
279            false
280        }
281    }
282
283    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
284        let mut all_succeeded = true;
285        let mut to_delete = vec![];
286        for file in MEMORY_MAPPED_FILES.iter() {
287            if file.key().starts_with(&dir) {
288                to_delete.push(file.key().clone());
289            }
290        }
291
292        for file in to_delete {
293            all_succeeded &= Self::delete(&file, remove_fs);
294        }
295
296        all_succeeded
297    }
298
299    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
300        // Remove the file if it existed from a previous run
301        let _ = remove_file(path);
302
303        UnderlyingFile::WriteMode {
304            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
305            chunk_position: 0,
306        }
307    }
308
309    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
310        if self.open_mode.0 == mode {
311            self.open_mode.1 += 1;
312            return Ok(());
313        }
314
315        if self.open_mode.0 != OpenMode::None {
316            return Err(format!("File {} is already opened!", self.path.display()));
317        }
318
319        {
320            let mut error = None;
321            replace_with_or_abort(&mut self.file, |file| {
322                match mode {
323                    OpenMode::None => UnderlyingFile::NotOpened,
324                    OpenMode::Read => {
325                        self.open_mode = (OpenMode::Read, 1);
326                        self.can_flush = false;
327
328                        if self.memory_mode != MemoryFileMode::DiskOnly {
329                            UnderlyingFile::MemoryOnly
330                        } else {
331                            // Ensure that all chunks are not pending
332                            for chunk in self.memory.iter() {
333                                drop(chunk.read());
334                            }
335
336                            if let UnderlyingFile::WriteMode { file, .. } = file {
337                                file.lock().flush().unwrap();
338                            }
339
340                            UnderlyingFile::ReadMode(
341                                FileBuffer::open(&self.path)
342                                    .inspect_err(|e| {
343                                        error = Some(format!(
344                                            "Error while opening file {}: {}",
345                                            self.path.display(),
346                                            e
347                                        ));
348                                    })
349                                    .ok(),
350                            )
351                        }
352                    }
353                    OpenMode::Write => {
354                        self.open_mode = (OpenMode::Write, 1);
355                        match self.memory_mode {
356                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
357                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
358                            MemoryFileMode::DiskOnly => {
359                                Self::create_writing_underlying_file(&self.path)
360                            }
361                        }
362                    }
363                }
364            });
365            if let Some(error) = error {
366                return Err(error);
367            }
368        }
369
370        Ok(())
371    }
372
373    pub fn close(&mut self) {
374        self.open_mode.1 -= 1;
375
376        if self.open_mode.1 == 0 {
377            self.open_mode.0 = OpenMode::None;
378            match &self.file {
379                UnderlyingFile::WriteMode { file, .. } => {
380                    file.lock().flush().unwrap();
381                }
382                _ => {}
383            }
384        }
385    }
386
387    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
388        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
389            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
390                SWAPPABLE_FILES.lock().add_file(
391                    swap_priority,
392                    self_.path.clone(),
393                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
394                );
395            }
396        }
397    }
398
399    pub fn reserve_space(
400        self_: &Arc<RwLock<Self>>,
401        last_chunk: AllocatedChunk,
402        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
403        mut size: usize,
404        el_size: usize,
405    ) -> AllocatedChunk {
406        let mut chunk = last_chunk;
407
408        loop {
409            let rem_bytes = chunk.remaining_bytes();
410            let rem_elements = rem_bytes / el_size;
411            let el_bytes = min(size, rem_elements * el_size);
412
413            assert!(chunk.max_len() >= el_size);
414
415            let space = if el_bytes > 0 {
416                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
417            } else {
418                None
419            };
420
421            size -= el_bytes;
422
423            let mut self_ = self_.write_arc();
424
425            if size > 0 {
426                self_
427                    .memory
428                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
429
430                if let Some(space) = space {
431                    let chunk_guard = self_.memory.last().unwrap().read_arc();
432                    out_chunks.push((Some(chunk_guard), space));
433                }
434                Self::put_on_swappable_list(&self_);
435
436                drop(self_);
437                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
438                    path: std::panic::Location::caller().to_string()
439                }));
440            } else {
441                if let Some(space) = space {
442                    out_chunks.push((None, space));
443                }
444                return chunk;
445            }
446        }
447    }
448
449    pub fn get_underlying_file(&self) -> &UnderlyingFile {
450        &self.file
451    }
452
453    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
454        let mut self_ = self_.write_arc();
455        self_
456            .memory
457            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
458        Self::put_on_swappable_list(&self_);
459    }
460
461    pub fn flush_chunks(&mut self, limit: usize) -> usize {
462        if !self.can_flush {
463            return 0;
464        }
465
466        match &self.file {
467            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
468                self.file = Self::create_writing_underlying_file(&self.path);
469            }
470            _ => {}
471        }
472
473        if let UnderlyingFile::WriteMode {
474            file,
475            chunk_position,
476        } = &mut self.file
477        {
478            {
479                let mut flushed_count = 0;
480                while flushed_count < limit {
481                    if *chunk_position >= self.memory.len() {
482                        return flushed_count;
483                    }
484
485                    if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
486                        GlobalFlush::add_item_to_flush_queue(FlushableItem {
487                            underlying_file: file.clone(),
488                            mode: FileFlushMode::Append {
489                                chunk: flushable_chunk,
490                            },
491                        });
492                        *chunk_position += 1;
493                        flushed_count += 1;
494                    } else {
495                        return flushed_count;
496                    }
497                }
498                return flushed_count;
499            }
500        }
501
502        return 0;
503    }
504
505    pub fn flush_pending_chunks_count(&self) -> usize {
506        match &self.file {
507            UnderlyingFile::NotOpened
508            | UnderlyingFile::MemoryOnly
509            | UnderlyingFile::ReadMode(_) => 0,
510            UnderlyingFile::WriteMode { chunk_position, .. } => {
511                self.get_chunks_count() - *chunk_position
512            }
513            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
514        }
515    }
516
517    #[inline(always)]
518    pub fn has_flush_pending_chunks(&self) -> bool {
519        self.flush_pending_chunks_count() > 0
520    }
521
522    pub fn change_to_disk_only(&mut self) {
523        if self.is_memory_preferred() {
524            self.memory_mode = MemoryFileMode::DiskOnly;
525            self.file = Self::create_writing_underlying_file(&self.path);
526        }
527    }
528
529    #[inline(always)]
530    pub fn has_only_one_chunk(&self) -> bool {
531        self.memory.len() == 1
532    }
533
534    #[inline(always)]
535    pub fn get_path(&self) -> &Path {
536        self.path.as_ref()
537    }
538
539    #[inline(always)]
540    pub fn len(&self) -> usize {
541        self.memory
542            .iter()
543            .map(|x| match x.read().deref() {
544                FileChunk::OnDisk { len, .. } => *len,
545                FileChunk::OnMemory { chunk } => chunk.len(),
546            })
547            .sum::<usize>()
548    }
549}
550
551unsafe impl Sync for MemoryFileInternal {}
552unsafe impl Send for MemoryFileInternal {}