// parallel_processor/memory_fs/file/internal.rs

1use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::flush::GlobalFlush;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use dashmap::DashMap;
6use filebuffer::FileBuffer;
7use nightly_quirks::utils::NightlyUtils;
8use once_cell::sync::Lazy;
9use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard, RawMutex};
10use parking_lot::{Mutex, RawRwLock, RwLock};
11use replace_with::replace_with_or_abort;
12use rustc_hash::FxHashMap;
13use std::cmp::min;
14use std::fs::remove_file;
15use std::ops::Deref;
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Weak};
19
20use super::handle::FileHandle;
21
22static MEMORY_MAPPED_FILES: Lazy<DashMap<PathBuf, Arc<RwLock<MemoryFileInternal>>>> =
23    Lazy::new(|| DashMap::new());
24
25pub static SWAPPABLE_FILES: Mutex<
26    Option<FxHashMap<(usize, PathBuf), Weak<RwLock<MemoryFileInternal>>>>,
27> = Mutex::const_new(parking_lot::RawMutex::INIT, None);
28
/// Storage policy of a memory-filesystem file.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum MemoryFileMode {
    /// Opened as memory-only storage for both reading and writing.
    AlwaysMemory,
    /// Kept in memory while writing, but registered as swappable and convertible to
    /// `DiskOnly` via `MemoryFileInternal::change_to_disk_only`.
    PreferMemory {
        /// First component of the file's key in `SWAPPABLE_FILES`.
        swap_priority: usize,
    },
    /// Written to, and read back from, a file on disk at the file's path.
    DiskOnly,
}
35
/// How a memory-filesystem file is currently opened.
///
/// `Debug` is derived (like `MemoryFileMode`) so that open-state mismatches can be logged
/// and asserted on.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Not opened.
    None,
    /// Opened for reading.
    Read,
    /// Opened for writing.
    Write,
}
42
43// struct RwLockIterator<'a, A, B, I: Iterator<Item = B>> {
44//     _lock: RwLockReadGuard<'a, A>,
45//     iterator: I,
46// }
47
48// impl<'a, A, B, I: Iterator<Item = B>> Iterator for RwLockIterator<'a, A, B, I> {
49//     type Item = B;
50
51//     fn next(&mut self) -> Option<Self::Item> {
52//         self.iterator.next()
53//     }
54// }
55
56// pub struct MemoryFileChunksIterator<'a, I: Iterator<Item = &'a mut [u8]>> {
57//     iter: I,
58// }
59
60// impl<'a, I: Iterator<Item = &'a mut [u8]>> Iterator for MemoryFileChunksIterator<'a, I> {
61//     type Item = &'a mut [u8];
62
63//     #[inline(always)]
64//     fn next(&mut self) -> Option<Self::Item> {
65//         self.iter.next()
66//     }
67// }
68
69pub enum FileChunk {
70    OnDisk { offset: u64, len: usize },
71    OnMemory { chunk: AllocatedChunk },
72}
73
74impl FileChunk {
75    pub fn get_length(&self) -> usize {
76        match self {
77            FileChunk::OnDisk { len, .. } => *len,
78            FileChunk::OnMemory { chunk } => chunk.len(),
79        }
80    }
81
82    #[inline(always)]
83    pub fn get_ptr(&self, file: &UnderlyingFile, prefetch: Option<usize>) -> *const u8 {
84        unsafe {
85            match self {
86                FileChunk::OnDisk { offset, .. } => {
87                    if let UnderlyingFile::ReadMode(file) = file {
88                        let file = file.as_ref().unwrap();
89
90                        if let Some(prefetch) = prefetch {
91                            let remaining_length = file.len() - *offset as usize;
92                            let prefetch_length = min(remaining_length, prefetch);
93                            file.prefetch(*offset as usize, prefetch_length);
94                        }
95
96                        file.as_ptr().add(*offset as usize)
97                    } else {
98                        panic!("Error, wrong underlying file!");
99                    }
100                }
101                FileChunk::OnMemory { chunk } => chunk.get_mut_ptr() as *const u8,
102            }
103        }
104    }
105}
106
107pub enum UnderlyingFile {
108    NotOpened,
109    MemoryOnly,
110    MemoryPreferred,
111    WriteMode {
112        file: Arc<Mutex<FileHandle>>,
113        chunk_position: usize,
114    },
115    ReadMode(Option<FileBuffer>),
116}
117
118pub struct MemoryFileInternal {
119    /// Path associated with the current file
120    path: PathBuf,
121    /// Disk read/write structure
122    file: UnderlyingFile,
123    /// Memory mode
124    memory_mode: MemoryFileMode,
125    /// Read or write mode
126    open_mode: (OpenMode, usize),
127    /// Actual memory mapping
128    memory: Vec<Arc<RwLock<FileChunk>>>,
129    /// True if it's in the swap list
130    on_swap_list: AtomicBool,
131    /// True if more chunks can be flushed
132    can_flush: bool,
133}
134
135impl MemoryFileInternal {
136    pub fn create_new(path: impl AsRef<Path>, memory_mode: MemoryFileMode) -> Arc<RwLock<Self>> {
137        let new_file = Arc::new(RwLock::new(Self {
138            path: path.as_ref().into(),
139            file: UnderlyingFile::NotOpened,
140            memory_mode,
141            open_mode: (OpenMode::None, 0),
142            memory: Vec::new(),
143            on_swap_list: AtomicBool::new(false),
144            can_flush: true,
145        }));
146
147        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());
148
149        new_file
150    }
151
152    pub fn create_from_fs(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
153        if !path.as_ref().exists() || !path.as_ref().is_file() {
154            return None;
155        }
156        let len = path.as_ref().metadata().ok()?.len() as usize;
157
158        let new_file = Arc::new(RwLock::new(Self {
159            path: path.as_ref().into(),
160            file: UnderlyingFile::NotOpened,
161            memory_mode: MemoryFileMode::DiskOnly,
162            open_mode: (OpenMode::None, 0),
163            memory: vec![Arc::new(RwLock::new(FileChunk::OnDisk { offset: 0, len }))],
164            on_swap_list: AtomicBool::new(false),
165            can_flush: false,
166        }));
167
168        MEMORY_MAPPED_FILES.insert(path.as_ref().into(), new_file.clone());
169
170        Some(new_file)
171    }
172
173    pub fn debug_dump_files() {
174        for file in MEMORY_MAPPED_FILES.iter() {
175            let file = file.read();
176            crate::log_info!(
177                "File '{}' => chunks: {}",
178                file.path.display(),
179                file.memory.len()
180            );
181        }
182    }
183
184    pub fn is_on_disk(&self) -> bool {
185        self.memory_mode == MemoryFileMode::DiskOnly
186    }
187
188    pub fn is_memory_preferred(&self) -> bool {
189        if let MemoryFileMode::PreferMemory { .. } = self.memory_mode {
190            true
191        } else {
192            false
193        }
194    }
195
196    pub fn get_chunk(&self, index: usize) -> Arc<RwLock<FileChunk>> {
197        self.memory[index].clone()
198    }
199
200    pub fn get_chunks_count(&self) -> usize {
201        self.memory.len()
202    }
203
204    pub fn retrieve_reference(path: impl AsRef<Path>) -> Option<Arc<RwLock<Self>>> {
205        MEMORY_MAPPED_FILES.get(path.as_ref()).map(|f| f.clone())
206    }
207
208    pub fn active_files_count() -> usize {
209        MEMORY_MAPPED_FILES.len()
210    }
211
212    pub fn delete(path: impl AsRef<Path>, remove_fs: bool) -> bool {
213        if let Some(file) = MEMORY_MAPPED_FILES.remove(path.as_ref()) {
214            stats::decrease_files_usage(file.1.read().len() as u64);
215            if remove_fs {
216                match file.1.read().memory_mode {
217                    MemoryFileMode::AlwaysMemory => {}
218                    MemoryFileMode::PreferMemory { swap_priority } => {
219                        NightlyUtils::mutex_get_or_init(&mut SWAPPABLE_FILES.lock(), || {
220                            FxHashMap::default()
221                        })
222                        .remove(&(swap_priority, path.as_ref().to_path_buf()));
223                    }
224                    MemoryFileMode::DiskOnly => {
225                        if let Ok(file_meta) = std::fs::metadata(path.as_ref()) {
226                            stats::decrease_disk_usage(file_meta.len());
227                        }
228                        let _ = remove_file(path);
229                    }
230                }
231            }
232            true
233        } else {
234            false
235        }
236    }
237
238    pub fn delete_directory(dir: impl AsRef<Path>, remove_fs: bool) -> bool {
239        let mut all_succeeded = true;
240        let mut to_delete = vec![];
241        for file in MEMORY_MAPPED_FILES.iter() {
242            if file.key().starts_with(&dir) {
243                to_delete.push(file.key().clone());
244            }
245        }
246
247        for file in to_delete {
248            all_succeeded &= Self::delete(&file, remove_fs);
249        }
250
251        all_succeeded
252    }
253
254    fn create_writing_underlying_file(path: &Path) -> UnderlyingFile {
255        // Remove the file if it existed from a previous run
256        let _ = remove_file(path);
257
258        UnderlyingFile::WriteMode {
259            file: Arc::new(Mutex::new(FileHandle::new(path.to_path_buf()))),
260            chunk_position: 0,
261        }
262    }
263
264    pub fn open(&mut self, mode: OpenMode) -> Result<(), String> {
265        if self.open_mode.0 == mode {
266            self.open_mode.1 += 1;
267            return Ok(());
268        }
269
270        if self.open_mode.0 != OpenMode::None {
271            return Err(format!("File {} is already opened!", self.path.display()));
272        }
273
274        {
275            let mut error = None;
276            replace_with_or_abort(&mut self.file, |file| {
277                match mode {
278                    OpenMode::None => UnderlyingFile::NotOpened,
279                    OpenMode::Read => {
280                        self.open_mode = (OpenMode::Read, 1);
281                        self.can_flush = false;
282
283                        if self.memory_mode != MemoryFileMode::DiskOnly {
284                            UnderlyingFile::MemoryOnly
285                        } else {
286                            // Ensure that all chunks are not pending
287                            for chunk in self.memory.iter() {
288                                drop(chunk.read());
289                            }
290
291                            if let UnderlyingFile::WriteMode { file, .. } = file {
292                                file.lock().flush().unwrap();
293                            }
294
295                            UnderlyingFile::ReadMode(
296                                FileBuffer::open(&self.path)
297                                    .inspect_err(|e| {
298                                        error = Some(format!(
299                                            "Error while opening file {}: {}",
300                                            self.path.display(),
301                                            e
302                                        ));
303                                    })
304                                    .ok(),
305                            )
306                        }
307                    }
308                    OpenMode::Write => {
309                        self.open_mode = (OpenMode::Write, 1);
310                        match self.memory_mode {
311                            MemoryFileMode::AlwaysMemory => UnderlyingFile::MemoryOnly,
312                            MemoryFileMode::PreferMemory { .. } => UnderlyingFile::MemoryPreferred,
313                            MemoryFileMode::DiskOnly => {
314                                Self::create_writing_underlying_file(&self.path)
315                            }
316                        }
317                    }
318                }
319            });
320            if let Some(error) = error {
321                return Err(error);
322            }
323        }
324
325        Ok(())
326    }
327
328    pub fn close(&mut self) {
329        self.open_mode.1 -= 1;
330
331        if self.open_mode.1 == 0 {
332            self.open_mode.0 = OpenMode::None;
333            match &self.file {
334                UnderlyingFile::WriteMode { file, .. } => {
335                    file.lock().flush().unwrap();
336                }
337                _ => {}
338            }
339        }
340    }
341
342    fn put_on_swappable_list(self_: &ArcRwLockWriteGuard<RawRwLock, Self>) {
343        if let MemoryFileMode::PreferMemory { swap_priority } = self_.memory_mode {
344            if !self_.on_swap_list.swap(true, Ordering::Relaxed) {
345                NightlyUtils::mutex_get_or_init(&mut SWAPPABLE_FILES.lock(), || {
346                    FxHashMap::default()
347                })
348                .insert(
349                    (swap_priority, self_.path.clone()),
350                    Arc::downgrade(ArcRwLockWriteGuard::rwlock(self_)),
351                );
352            }
353        }
354    }
355
356    pub fn reserve_space(
357        self_: &Arc<RwLock<Self>>,
358        last_chunk: AllocatedChunk,
359        out_chunks: &mut Vec<(Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>, &mut [u8])>,
360        mut size: usize,
361        el_size: usize,
362    ) -> AllocatedChunk {
363        let mut chunk = last_chunk;
364
365        loop {
366            let rem_bytes = chunk.remaining_bytes();
367            let rem_elements = rem_bytes / el_size;
368            let el_bytes = min(size, rem_elements * el_size);
369
370            assert!(chunk.max_len() >= el_size);
371
372            let space = if el_bytes > 0 {
373                Some(unsafe { chunk.prealloc_bytes_single_thread(el_bytes) })
374            } else {
375                None
376            };
377
378            size -= el_bytes;
379
380            let mut self_ = self_.write_arc();
381
382            if size > 0 {
383                self_
384                    .memory
385                    .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
386
387                if let Some(space) = space {
388                    let chunk_guard = self_.memory.last().unwrap().read_arc();
389                    out_chunks.push((Some(chunk_guard), space));
390                }
391                Self::put_on_swappable_list(&self_);
392
393                drop(self_);
394                chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(FileBuffer {
395                    path: std::panic::Location::caller().to_string()
396                }));
397            } else {
398                if let Some(space) = space {
399                    out_chunks.push((None, space));
400                }
401                return chunk;
402            }
403        }
404    }
405
406    pub fn get_underlying_file(&self) -> &UnderlyingFile {
407        &self.file
408    }
409
410    pub fn add_chunk(self_: &Arc<RwLock<Self>>, chunk: AllocatedChunk) {
411        let mut self_ = self_.write_arc();
412        self_
413            .memory
414            .push(Arc::new(RwLock::new(FileChunk::OnMemory { chunk })));
415        Self::put_on_swappable_list(&self_);
416    }
417
418    pub fn flush_chunks(&mut self, limit: usize) -> usize {
419        if !self.can_flush {
420            return 0;
421        }
422
423        match &self.file {
424            UnderlyingFile::NotOpened | UnderlyingFile::MemoryPreferred => {
425                self.file = Self::create_writing_underlying_file(&self.path);
426            }
427            _ => {}
428        }
429
430        if let UnderlyingFile::WriteMode {
431            file,
432            chunk_position,
433        } = &mut self.file
434        {
435            {
436                let mut flushed_count = 0;
437                while flushed_count < limit {
438                    if *chunk_position >= self.memory.len() {
439                        return flushed_count;
440                    }
441
442                    if let Some(flushable_chunk) = self.memory[*chunk_position].try_write_arc() {
443                        GlobalFlush::add_item_to_flush_queue(FlushableItem {
444                            underlying_file: file.clone(),
445                            mode: FileFlushMode::Append {
446                                chunk: flushable_chunk,
447                            },
448                        });
449                        *chunk_position += 1;
450                        flushed_count += 1;
451                    } else {
452                        return flushed_count;
453                    }
454                }
455                return flushed_count;
456            }
457        }
458
459        return 0;
460    }
461
462    pub fn flush_pending_chunks_count(&self) -> usize {
463        match &self.file {
464            UnderlyingFile::NotOpened
465            | UnderlyingFile::MemoryOnly
466            | UnderlyingFile::ReadMode(_) => 0,
467            UnderlyingFile::WriteMode { chunk_position, .. } => {
468                self.get_chunks_count() - *chunk_position
469            }
470            UnderlyingFile::MemoryPreferred => self.get_chunks_count(),
471        }
472    }
473
474    #[inline(always)]
475    pub fn has_flush_pending_chunks(&self) -> bool {
476        self.flush_pending_chunks_count() > 0
477    }
478
479    pub fn change_to_disk_only(&mut self) {
480        if self.is_memory_preferred() {
481            self.memory_mode = MemoryFileMode::DiskOnly;
482            self.file = Self::create_writing_underlying_file(&self.path);
483        }
484    }
485
486    #[inline(always)]
487    pub fn has_only_one_chunk(&self) -> bool {
488        self.memory.len() == 1
489    }
490
491    #[inline(always)]
492    pub fn get_path(&self) -> &Path {
493        self.path.as_ref()
494    }
495
496    #[inline(always)]
497    pub fn len(&self) -> usize {
498        self.memory
499            .iter()
500            .map(|x| match x.read().deref() {
501                FileChunk::OnDisk { len, .. } => *len,
502                FileChunk::OnMemory { chunk } => chunk.len(),
503            })
504            .sum::<usize>()
505    }
506}
507
508unsafe impl Sync for MemoryFileInternal {}
509unsafe impl Send for MemoryFileInternal {}