// parallel_processor/memory_fs/file/writer.rs

1use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
2use crate::memory_fs::file::internal::{
3    FileChunk, MemoryFileInternal, MemoryFileMode, OpenMode, UnderlyingFile,
4};
5use crate::memory_fs::stats;
6use bincode::enc::write::Writer;
7use parking_lot::{RwLock, RwLockWriteGuard};
8use std::io::{Seek, SeekFrom, Write};
9use std::ops::{Deref, DerefMut};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
/// Append-oriented writer over a memory-fs file, supporting concurrent
/// appends from multiple threads via `write_all_parallel`.
pub struct FileWriter {
    // Path the file was created with (only returned via `get_path`).
    path: PathBuf,
    // Chunk currently being filled; appends land here until it is full.
    current_buffer: RwLock<AllocatedChunk>,
    // Bytes already committed to `file`; bytes still pending in
    // `current_buffer` are NOT counted here (see `len`).
    file_length: AtomicU64,
    // Backing file, opened in `OpenMode::Write` on creation.
    file: Arc<RwLock<MemoryFileInternal>>,
}
20
impl FileWriter {
    /// Creates a new file with the specified mode
    pub fn create(path: impl AsRef<Path>, mode: MemoryFileMode) -> Self {
        Self {
            path: PathBuf::from(path.as_ref()),
            // Grab the first buffer chunk eagerly so appends can start
            // without an allocation on the hot path.
            current_buffer: RwLock::new(
                CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)),
            ),
            file_length: AtomicU64::new(0),
            file: {
                let file = MemoryFileInternal::create_new(path, mode);
                file.write().open(OpenMode::Write).unwrap();
                file
            },
        }
    }

    /// Returns the total length of the file
    /// (bytes committed to the file plus bytes pending in the current buffer).
    pub fn len(&self) -> u64 {
        self.file_length.load(Ordering::Relaxed) + self.current_buffer.read().len() as u64
    }

    /// Overwrites bytes at the start of the file, the data field should not be longer than 128 bytes
    ///
    /// Returns `Err(())` if `data` exceeds 128 bytes, or if the first chunk is
    /// on disk but the underlying file is not open in write mode.
    ///
    /// NOTE(review): both memory-copy paths write `data.len()` bytes without
    /// checking the destination's current length — this assumes callers only
    /// patch a header that was already written; confirm against call sites.
    pub fn write_at_start(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > 128 {
            return Err(());
        }

        unsafe {
            let current_buffer_lock = self.current_buffer.read();

            let file_read = self.file.read();

            if file_read.get_chunks_count() > 0 {
                // The file start lives in its first committed chunk, so the
                // pending buffer is irrelevant; release its lock before
                // taking the chunk lock.
                drop(current_buffer_lock);
                match file_read.get_chunk(0).read().deref() {
                    FileChunk::OnDisk { .. } => {
                        // Patch the on-disk copy in place, saving and
                        // restoring the write cursor so subsequent appends
                        // continue where they left off.
                        if let UnderlyingFile::WriteMode { file, .. } =
                            file_read.get_underlying_file()
                        {
                            let mut disk_file_lock = file.lock();
                            let mut disk_file = disk_file_lock.get_file();
                            let position = disk_file.stream_position().unwrap();
                            disk_file.seek(SeekFrom::Start(0)).unwrap();
                            disk_file.write_all(data).unwrap();
                            disk_file.seek(SeekFrom::Start(position)).unwrap();
                            Ok(())
                        } else {
                            Err(())
                        }
                    }
                    FileChunk::OnMemory { chunk } => {
                        // In-memory chunk: overwrite its first `data.len()` bytes.
                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            chunk.get_mut_ptr(),
                            data.len(),
                        );
                        Ok(())
                    }
                }
            } else {
                // No chunk committed yet: the file starts at the beginning
                // of the pending buffer.
                std::ptr::copy_nonoverlapping(
                    data.as_ptr(),
                    current_buffer_lock.get_mut_ptr(),
                    data.len(),
                );
                Ok(())
            }
        }
    }

    /// Appends atomically all the data to the file, returning the start position of the written data in the file
    pub fn write_all_parallel(&self, data: &[u8], el_size: usize) -> u64 {
        // Update stats
        stats::add_files_usage(data.len() as u64);

        // Fast path: append into the current buffer under a shared lock so
        // concurrent writers do not serialize. `file_length` excludes the
        // current buffer, so committed length + in-chunk offset is the
        // global position of the written data.
        let buffer = self.current_buffer.read();
        if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
            self.file_length.load(Ordering::Relaxed) + chunk_position
        } else {
            // Buffer full: retry under the exclusive lock.
            drop(buffer);
            let mut buffer = self.current_buffer.write();

            // Check if now the buffer can be written
            // (another writer may have swapped in a fresh buffer meanwhile).
            if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
                return self.file_length.load(Ordering::Relaxed) + chunk_position;
            }

            // Destination slices for `data`, filled in by `reserve_space`.
            let mut temp_vec = Vec::new();

            // Commit the (full) current buffer to the file length; the new
            // data starts immediately after its contents.
            let position = self
                .file_length
                .fetch_add(buffer.len() as u64, Ordering::SeqCst)
                + (buffer.len() as u64);

            // Hand the full buffer over to the file and receive a fresh one,
            // with room for `data` reserved across `temp_vec` parts.
            // NOTE(review): `el_size` presumably controls element-boundary
            // splitting inside `reserve_space` — verify there.
            replace_with::replace_with_or_abort(buffer.deref_mut(), |buffer| {
                let new_buffer = MemoryFileInternal::reserve_space(
                    &self.file,
                    buffer,
                    &mut temp_vec,
                    data.len(),
                    el_size,
                );
                new_buffer
            });

            // Add the completely filled chunks to the file, removing the last size as the current chunk is not counted yet in the file_length
            self.file_length
                .fetch_add((data.len() - buffer.len()) as u64, Ordering::SeqCst);

            // Downgrade to a shared lock while copying: the buffer cannot be
            // swapped out from under us, but other readers/writers proceed.
            let _buffer_read = RwLockWriteGuard::downgrade(buffer);

            // Scatter `data` across the reserved parts, in order.
            let mut offset = 0;
            for (_lock, part) in temp_vec.drain(..) {
                part.copy_from_slice(&data[offset..(offset + part.len())]);
                offset += part.len();
            }

            // Disk-backed files: push all completed chunks out immediately.
            if self.file.read().is_on_disk() {
                self.file.write().flush_chunks(usize::MAX);
            }
            position
        }
    }

    /// Returns a clone of the path this writer was created with.
    pub fn get_path(&self) -> PathBuf {
        self.path.clone()
    }

    /// No-op: data is committed as it is written (see `write_all_parallel`).
    pub fn flush_async(&self) {}
}
152
153impl Writer for FileWriter {
154    fn write(&mut self, bytes: &[u8]) -> Result<(), bincode::error::EncodeError> {
155        self.write_all_parallel(bytes, 1);
156        Ok(())
157    }
158}
159
160impl Write for FileWriter {
161    #[inline(always)]
162    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
163        self.write_all_parallel(buf, 1);
164        Ok(buf.len())
165    }
166
167    #[inline(always)]
168    fn flush(&mut self) -> std::io::Result<()> {
169        Ok(())
170    }
171}
172
173impl Drop for FileWriter {
174    fn drop(&mut self) {
175        let mut current_buffer = self.current_buffer.write();
176        if current_buffer.len() > 0 {
177            MemoryFileInternal::add_chunk(
178                &self.file,
179                std::mem::replace(current_buffer.deref_mut(), AllocatedChunk::INVALID),
180            );
181            if self.file.read().is_on_disk() {
182                self.file.write().flush_chunks(usize::MAX);
183            }
184        }
185        self.file.write().close();
186    }
187}