parallel_processor/memory_fs/file/
writer.rs

1use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::internal::{
3    FileChunk, MemoryFileInternal, MemoryFileMode, OpenMode, UnderlyingFile,
4};
5use crate::memory_fs::stats;
6use parking_lot::{RwLock, RwLockWriteGuard};
7use std::io::{Seek, SeekFrom, Write};
8use std::ops::{Deref, DerefMut};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12
13pub struct FileWriter {
14    path: PathBuf,
15    current_buffer: RwLock<AllocatedChunk>,
16    file_length: AtomicU64,
17    file: Arc<RwLock<MemoryFileInternal>>,
18}
19
20impl FileWriter {
21    /// Creates a new file with the specified mode
22    pub fn create(path: impl AsRef<Path>, mode: MemoryFileMode) -> Self {
23        Self {
24            path: PathBuf::from(path.as_ref()),
25            current_buffer: RwLock::new(
26                CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)),
27            ),
28            file_length: AtomicU64::new(0),
29            file: {
30                let file = MemoryFileInternal::create_new(path, mode);
31                file.write().open(OpenMode::Write).unwrap();
32                file
33            },
34        }
35    }
36
37    /// Returns the total length of the file (slow method)
38    pub fn len(&self) -> usize {
39        self.file.read().len() + self.current_buffer.read().len()
40    }
41
42    /// Overwrites bytes at the start of the file, the data field should not be longer than 128 bytes
43    pub fn write_at_start(&mut self, data: &[u8]) -> Result<(), ()> {
44        if data.len() > 128 {
45            return Err(());
46        }
47
48        unsafe {
49            let current_buffer_lock = self.current_buffer.read();
50
51            let file_read = self.file.read();
52
53            if file_read.get_chunks_count() > 0 {
54                drop(current_buffer_lock);
55                match file_read.get_chunk(0).read().deref() {
56                    FileChunk::OnDisk { .. } => {
57                        if let UnderlyingFile::WriteMode { file, .. } =
58                            file_read.get_underlying_file()
59                        {
60                            let mut disk_file_lock = file.lock();
61                            let mut disk_file = disk_file_lock.get_file();
62                            let position = disk_file.stream_position().unwrap();
63                            disk_file.seek(SeekFrom::Start(0)).unwrap();
64                            disk_file.write_all(data).unwrap();
65                            disk_file.seek(SeekFrom::Start(position)).unwrap();
66                            Ok(())
67                        } else {
68                            Err(())
69                        }
70                    }
71                    FileChunk::OnMemory { chunk } => {
72                        std::ptr::copy_nonoverlapping(
73                            data.as_ptr(),
74                            chunk.get_mut_ptr(),
75                            data.len(),
76                        );
77                        Ok(())
78                    }
79                }
80            } else {
81                std::ptr::copy_nonoverlapping(
82                    data.as_ptr(),
83                    current_buffer_lock.get_mut_ptr(),
84                    data.len(),
85                );
86                Ok(())
87            }
88        }
89    }
90
91    /// Appends atomically all the data to the file, returning the start position of the written data in the file
92    pub fn write_all_parallel(&self, data: &[u8], el_size: usize) -> u64 {
93        // Update stats
94        stats::add_files_usage(data.len() as u64);
95
96        let buffer = self.current_buffer.read();
97        if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
98            self.file_length.load(Ordering::Relaxed) + chunk_position
99        } else {
100            drop(buffer);
101            let mut buffer = self.current_buffer.write();
102
103            // Check if now the buffer can be written
104            if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
105                return self.file_length.load(Ordering::Relaxed) + chunk_position;
106            }
107
108            let mut temp_vec = Vec::new();
109
110            let position = self
111                .file_length
112                .fetch_add(buffer.len() as u64, Ordering::SeqCst)
113                + (buffer.len() as u64);
114
115            replace_with::replace_with_or_abort(buffer.deref_mut(), |buffer| {
116                let new_buffer = MemoryFileInternal::reserve_space(
117                    &self.file,
118                    buffer,
119                    &mut temp_vec,
120                    data.len(),
121                    el_size,
122                );
123                new_buffer
124            });
125
126            // Add the completely filled chunks to the file, removing the last size as the current chunk is not counted yet in the file_length
127            self.file_length
128                .fetch_add((data.len() - buffer.len()) as u64, Ordering::SeqCst);
129
130            let _buffer_read = RwLockWriteGuard::downgrade(buffer);
131
132            let mut offset = 0;
133            for (_lock, part) in temp_vec.drain(..) {
134                part.copy_from_slice(&data[offset..(offset + part.len())]);
135                offset += part.len();
136            }
137
138            if self.file.read().is_on_disk() {
139                self.file.write().flush_chunks(usize::MAX);
140            }
141            position
142        }
143    }
144
145    pub fn get_path(&self) -> PathBuf {
146        self.path.clone()
147    }
148
149    pub fn flush_async(&self) {}
150}
151
152impl Write for FileWriter {
153    #[inline(always)]
154    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
155        self.write_all_parallel(buf, 1);
156        Ok(buf.len())
157    }
158
159    #[inline(always)]
160    fn flush(&mut self) -> std::io::Result<()> {
161        Ok(())
162    }
163}
164
165impl Drop for FileWriter {
166    fn drop(&mut self) {
167        let mut current_buffer = self.current_buffer.write();
168        if current_buffer.len() > 0 {
169            MemoryFileInternal::add_chunk(
170                &self.file,
171                std::mem::replace(current_buffer.deref_mut(), AllocatedChunk::INVALID),
172            );
173            if self.file.read().is_on_disk() {
174                self.file.write().flush_chunks(usize::MAX);
175            }
176        }
177        self.file.write().close();
178    }
179}