parallel_processor/memory_fs/file/internal.rs

use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
use crate::memory_fs::file::flush::GlobalFlush;
use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
use crate::memory_fs::stats;
use dashmap::DashMap;
use filebuffer::FileBuffer;
use once_cell::sync::Lazy;
use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
use parking_lot::{Mutex, RawRwLock, RwLock};
use replace_with::replace_with_or_abort;
use rustc_hash::FxHashMap;
use std::cmp::min;
use std::collections::BinaryHeap;
use std::fs::remove_file;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock, Weak};

use super::handle::FileHandle;

static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
    Lazy::new(|| DashMap::new());

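/// Index of memory-preferred files that may be swapped out to disk,
/// ordered by their swap priority.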
#[derive(Default)]
pub struct SwappableFiles {
    index: FxHashMap<PathBuf, Weak<RwLock<MemoryFileInternal>>>,
    queue: BinaryHeap<(usize, PathBuf)>,
}

impl SwappableFiles {
    pub fn add_file(
        &mut self,
        priority: usize,
        path: PathBuf,
        file: Weak<RwLock<MemoryFileInternal>>,
    ) {
        self.index.insert(path.clone(), file);
        self.queue.push((priority, path));
    }

    pub fn remove_file(&mut self, path: &Path) {
        self.index.remove(path);
    }

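    /// Returns the highest-priority file that is still memory-preferred and has chunks
    /// pending flush, skipping (and unmarking) entries that can no longer be swapped.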
    pub fn get_next(&mut self) -> Option<Arc<RwLock<MemoryFileInternal>>> {
        while let Some((_, file)) = self.queue.pop() {
            if let Some(file_entry) = self.index.remove(&file) {
                if let Some(file_entry) = file_entry.upgrade() {
                    let file_read = file_entry.read();
                    if file_read.is_memory_preferred() && file_read.has_flush_pending_chunks() {
                        drop(file_read);
                        return Some(file_entry);
                    } else {
                        // The file cannot be swapped anymore, mark it as no longer on the swap list
                        file_read.on_swap_list.store(false, Ordering::Relaxed);
                    }
                }
            }
        }

        None
    }
}

pub static SWAPPABLE_FILES: LazyLock<Mutex<SwappableFiles>> =
    LazyLock::new(|| Mutex::new(Default::default()));

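/// Storage policy of a file: always kept in memory, kept in memory but swappable
/// to disk under pressure, or written directly to disk.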
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MemoryFileMode {
    AlwaysMemory,
    PreferMemory { swap_priority: usize },
    DiskOnly,
}

#[derive(Copy, Clone, Eq, PartialEq)]
pub enum OpenMode {
    None,
    Read,
    Write,
}

// struct RwLockIterator<'a, A, B, I: Iterator<Item = B>> {
//     _lock: RwLockReadGuard<'a, A>,
//     iterator: I,
// }

// impl<'a, A, B, I: Iterator<Item = B>> Iterator for RwLockIterator<'a, A, B, I> {
//     type Item = B;

//     fn next(&mut self) -> Option<Self::Item> {
//         self.iterator.next()
//     }
// }

// pub struct MemoryFileChunksIterator<'a, I: Iterator<Item = &'a mut [u8]>> {
//     iter: I,
// }

// impl<'a, I: Iterator<Item = &'a mut [u8]>> Iterator for MemoryFileChunksIterator<'a, I> {
//     type Item = &'a mut [u8];

//     #[inline(always)]
//     fn next(&mut self) -> Option<Self::Item> {
//         self.iter.next()
//     }
// }

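/// A single chunk of a file, either already written to disk (offset + length)
/// or still resident in an allocated memory chunk.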
pub enum FileChunk {
    OnDisk { offset: u64, len: usize },
    OnMemory { chunk: AllocatedChunk },
}

impl FileChunk {
    pub fn get_length(&self) -> usize {
        match self {
            FileChunk::OnDisk { len, .. } => *len,
            FileChunk::OnMemory { chunk } => chunk.len(),
        }
    }

    #[inline(always)]
    pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
        unsafe {
            match self {
                FileChunk::OnDisk { offset, .. } => {
                    if let UnderlyingFile::ReadMode(file) = file {
                        let file = file.as_ref().unwrap();

                        if let Some(prefetch) = prefetch {
                            let remaining_length = file.len() - *offset as usize;
                            let prefetch_length = min(remaining_length, prefetch);
                            file.prefetch(*offset as usize, prefetch_length);
                        }

                        file.as_ptr().add(*offset as usize)
                    } else {
                        panic!("Error, wrong underlying file!");
                    }
                }
                FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
            }
        }
    }
}

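/// Backing storage of a file, depending on its memory mode and how it was opened.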
pub enum UnderlyingFile {
    NotOpened,
    MemoryOnly,
    MemoryPreferred,
    WriteMode {
        file: Arc<Mutex<FileHandle>>,
        chunk_position: usize,
    },
    ReadMode(Option<FileBuffer>),
}

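/// In-memory representation of a file managed by the memory filesystem.
///
/// Typical write flow (a sketch, assuming the global chunk allocator and the flush
/// machinery are already initialized elsewhere in the crate; the path below is a
/// placeholder):
///
/// ```ignore
/// let file = MemoryFileInternal::create_new(
///     "example.tmp",
///     MemoryFileMode::PreferMemory { swap_priority: 0 },
/// );
/// file.write().open(OpenMode::Write).unwrap();
/// let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
///     path: "example.tmp".to_string()
/// }));
/// MemoryFileInternal::add_chunk(&file, chunk);
/// file.write().close();
/// ```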
pub struct MemoryFileInternal {
    /// Path associated with the current file
    path: PathBuf,
    /// Disk read/write structure
    file: UnderlyingFile,
    /// Memory mode
    memory_mode: MemoryFileMode,
    /// Read or write mode
    open_mode: (OpenMode, usize),
    /// Actual memory mapping
    memory: Vec<Arc<RwLock<FileChunk>>>,
    /// True if it's in the swap list
    on_swap_list: AtomicBool,
    /// True if more chunks can be flushed
    can_flush: bool,
}

impl MemoryFileInternal {
    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode,
            open_mode: (OpenMode::None, 0),
            memory: Vec::new(),
            on_swap_list: AtomicBool::new(false),
            can_flush: true,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        new_file
    }

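    /// Registers an already existing on-disk file as a single disk-backed chunk.
    /// Returns `None` if the path does not exist or is not a regular file.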
    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        if !path.as_ref().exists() || !path.as_ref().is_file() {
            return None;
        }
        let len = path.as_ref().metadata().ok()?.len() as usize;

        let new_file = Arc::new(RwLock::new(Self {
            path: path.as_ref().into(),
            file: UnderlyingFile::NotOpened,
            memory_mode: MemoryFileMode::DiskOnly,
            open_mode: (OpenMode::None, 0),
            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
            on_swap_list: AtomicBool::new(false),
            can_flush: false,
        }));

        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());

        Some(new_file)
    }

    pub fn debug_dump_files() {
        for file in MEMORY_MAPPED_FILES.iter() {
            let file = file.read();
            crate::log_info!(
                "File '{}' => chunks: {}",
                file.path.display(),
                file.memory.len()
            );
        }
    }

    pub fn is_on_disk(&self) -> bool {
        self.memory_mode == MemoryFileMode::DiskOnly
    }

    pub fn is_memory_preferred(&self) -> bool {
        matches!(self.memory_mode, MemoryFileMode::PreferMemory { .. })
    }

    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
        self.memory[index].clone()
    }

    pub fn get_chunks_count(&self) -> usize {
        self.memory.len()
    }

    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
    }

    pub fn active_files_count() -> usize {
        MEMORY_MAPPED_FILES.len()
    }

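    /// Removes the file from the global registry, updating usage statistics.
    /// If `remove_fs` is true, the on-disk copy (if any) is deleted as well.
    /// Returns `false` if the file was not registered.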
    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
            stats::decrease_files_usage(file.1.read().len() as u64);
            if remove_fs {
                match file.1.read().memory_mode {
                    MemoryFileMode::AlwaysMemory => {}
                    MemoryFileMode::PreferMemory { .. } => {
                        SWAPPABLE_FILES.lock().remove_file(path.as_ref())
                    }
                    MemoryFileMode::DiskOnly => {
                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
                            stats::decrease_disk_usage(file_meta.len());
                        }
                        let _ = remove_file(path);
                    }
                }
            }
            true
        } else {
            false
        }
    }

    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
        let mut all_succeeded = true;
        let mut to_delete = vec![];
        for file in MEMORY_MAPPED_FILES.iter() {
            if file.key().starts_with(&dir) {
                to_delete.push(file.key().clone());
            }
        }

        for file in to_delete {
            all_succeeded &= Self::delete(&file, remove_fs);
        }

        all_succeeded
    }

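    /// Builds a write-mode underlying file handle for `path`, starting at chunk position 0.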
    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
        // Remove the file if it existed from a previous run
        let _ = remove_file(path);

        UnderlyingFile::WriteMode {
            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
            chunk_position: 0,
        }
    }

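    /// Opens the file in the requested mode, incrementing the open counter if it is
    /// already open in the same mode. Returns an error if the file is currently open
    /// in a different mode or if the on-disk file cannot be opened for reading.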
    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
        if self.open_mode.0 == mode {
            self.open_mode.1 += 1;
            return Ok(());
        }

        if self.open_mode.0 != OpenMode::None {
            return Err(format!("File {} is already opened!", self.path.display()));
        }

        {
            let mut error = None;
            replace_with_or_abort(&mut self.file, |file| {
                match mode {
                    OpenMode::None => UnderlyingFile::NotOpened,
                    OpenMode::Read => {
                        self.open_mode = (OpenMode::Read, 1);
                        self.can_flush = false;

                        if self.memory_mode != MemoryFileMode::DiskOnly {
                            UnderlyingFile::MemoryOnly
                        } else {
                            // Ensure that no chunk is still pending (being written)
                            // by acquiring and releasing its read lock
                            for chunk in self.memory.iter() {
                                drop(chunk.read());
                            }

                            if let UnderlyingFile::WriteMode { file, .. } = file {
                                file.lock().flush().unwrap();
                            }

                            UnderlyingFile::ReadMode(
                                FileBuffer::open(&self.path)
                                    .inspect_err(|e| {
                                        error = Some(format!(
                                            "Error while opening file {}: {}",
                                            self.path.display(),
                                            e
                                        ));
                                    })
                                    .ok(),
                            )
                        }
                    }
                    OpenMode::Write => {
                        self.open_mode = (OpenMode::Write, 1);
                        match self.memory_mode {
                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
                            MemoryFileMode::DiskOnly => {
                                Self::create_writing_underlying_file(&self.path)
                            }
                        }
                    }
                }
            });
            if let Some(error) = error {
                return Err(error);
            }
        }

        Ok(())
    }

    pub fn close(&mut self) {
        self.open_mode.1 -= 1;

        if self.open_mode.1 == 0 {
            self.open_mode.0 = OpenMode::None;
            if let UnderlyingFile::WriteMode { file, .. } = &self.file {
                file.lock().flush().unwrap();
            }
        }
    }

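    /// Adds the file to the global swap list (only once) if it is in `PreferMemory` mode.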
    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
                SWAPPABLE_FILES.lock().add_file(
                    swap_priority,
                    self_.path.clone(),
                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
                );
            }
        }
    }

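    /// Reserves `size` bytes of space for elements of `el_size` bytes, filling `last_chunk`
    /// first and requesting new chunks from the allocator as needed. The reserved slices are
    /// pushed into `out_chunks` (with read guards for chunks that became part of the file);
    /// the last, still-unregistered chunk is returned.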
    pub fn reserve_space(
        self_: &Arc<RwLock<Self>>,
        last_chunk: AllocatedChunk,
        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
        mut size: usize,
        el_size: usize,
    ) -> AllocatedChunk {
        let mut chunk = last_chunk;

        loop {
            let rem_bytes = chunk.remaining_bytes();
            let rem_elements = rem_bytes / el_size;
            let el_bytes = min(size, rem_elements * el_size);

            assert!(chunk.max_len() >= el_size);

            let space = if el_bytes > 0 {
                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
            } else {
                None
            };

            size -= el_bytes;

            let mut self_ = self_.write_arc();

            if size > 0 {
                self_
                    .memory
                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));

                if let Some(space) = space {
                    let chunk_guard = self_.memory.last().unwrap().read_arc();
                    out_chunks.push((Some(chunk_guard), space));
                }
                Self::put_on_swappable_list(&self_);

                drop(self_);
                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
                    path: std::panic::Location::caller().to_string()
                }));
            } else {
                if let Some(space) = space {
                    out_chunks.push((None, space));
                }
                return chunk;
            }
        }
    }

    pub fn get_underlying_file(&self) -> &UnderlyingFile {
        &self.file
    }

    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
        let mut self_ = self_.write_arc();
        self_
            .memory
            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
        Self::put_on_swappable_list(&self_);
    }

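    /// Queues up to `limit` memory chunks for flushing to disk, creating the write-mode
    /// underlying file if needed. Returns the number of chunks actually queued; a chunk
    /// whose lock cannot be acquired immediately stops the scan.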
    pub fn flush_chunks(&mut self, limit: usize) -> usize {
        if !self.can_flush {
            return 0;
        }

        match &self.file {
            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
                self.file = Self::create_writing_underlying_file(&self.path);
            }
            _ => {}
        }

        if let UnderlyingFile::WriteMode {
            file,
            chunk_position,
        } = &mut self.file
        {
            let mut flushed_count = 0;
            while flushed_count < limit {
                if *chunk_position >= self.memory.len() {
                    return flushed_count;
                }

                if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
                    GlobalFlush::add_item_to_flush_queue(FlushableItem {
                        underlying_file: file.clone(),
                        mode: FileFlushMode::Append {
                            chunk: flushable_chunk,
                        },
                    });
                    *chunk_position += 1;
                    flushed_count += 1;
                } else {
                    return flushed_count;
                }
            }
            return flushed_count;
        }

        0
    }

    pub fn flush_pending_chunks_count(&self) -> usize {
        match &self.file {
            UnderlyingFile::NotOpened
            | UnderlyingFile::MemoryOnly
            | UnderlyingFile::ReadMode(_) => 0,
            UnderlyingFile::WriteMode { chunk_position, .. } => {
                self.get_chunks_count() - *chunk_position
            }
            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
        }
    }

    #[inline(always)]
    pub fn has_flush_pending_chunks(&self) -> bool {
        self.flush_pending_chunks_count() > 0
    }

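    /// Demotes a memory-preferred file to disk-only mode, switching its underlying
    /// file to write mode so that its chunks can be flushed.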
    pub fn change_to_disk_only(&mut self) {
        if self.is_memory_preferred() {
            self.memory_mode = MemoryFileMode::DiskOnly;
            self.file = Self::create_writing_underlying_file(&self.path);
        }
    }

    #[inline(always)]
    pub fn has_only_one_chunk(&self) -> bool {
        self.memory.len() == 1
    }

    #[inline(always)]
    pub fn get_path(&self) -> &Path {
        self.path.as_ref()
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.memory
            .iter()
            .map(|x| match x.read().deref() {
                FileChunk::OnDisk { len, .. } => *len,
                FileChunk::OnMemory { chunk } => chunk.len(),
            })
            .sum::<usize>()
    }
}

unsafe impl Sync for MemoryFileInternal {}
unsafe impl Send for MemoryFileInternal {}