// parallel_processor/memory_fs/file/writer.rs

use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
use crate::memory_fs::file::internal::{
    FileChunk, MemoryFileInternal, MemoryFileMode, OpenMode, UnderlyingFile,
};
use crate::memory_fs::stats;
use bincode::enc::write::Writer;
use parking_lot::{RwLock, RwLockWriteGuard};
use std::io::{Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

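/// Concurrent buffered writer backed by the in-memory file system. Bytes first
/// accumulate in `current_buffer`, a chunk obtained from `CHUNKS_ALLOCATOR`; when a
/// write does not fit, the buffer is sealed into the underlying
/// `MemoryFileInternal`, which keeps chunks in memory or flushes them to disk
/// according to its `MemoryFileMode`.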
pub struct FileWriter {
    path: PathBuf,
    current_buffer: RwLock<AllocatedChunk>,
    file_length: AtomicU64,
    file: Arc<RwLock<MemoryFileInternal>>,
}

impl FileWriter {
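    /// Creates the backing file with the given mode, opens it for writing and
    /// requests the first buffer chunk from the global allocator.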
    pub fn create(path: impl AsRef<Path>, mode: MemoryFileMode) -> Self {
        Self {
            path: PathBuf::from(path.as_ref()),
            current_buffer: RwLock::new(
                CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)),
            ),
            file_length: AtomicU64::new(0),
            file: {
                let file = MemoryFileInternal::create_new(path, mode);
                file.write().open(OpenMode::Write).unwrap();
                file
            },
        }
    }

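    /// Returns the total number of bytes written so far: the length already
    /// committed to the file plus the bytes still pending in the current buffer.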
    pub fn len(&self) -> u64 {
        self.file_length.load(Ordering::Relaxed) + self.current_buffer.read().len() as u64
    }

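    /// Overwrites the first bytes of the file in place, e.g. to patch a header
    /// after the rest of the data has been written. Fails if `data` exceeds 128
    /// bytes, or if the first chunk is already on disk and the underlying file is
    /// not open in write mode.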
    pub fn write_at_start(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > 128 {
            return Err(());
        }

        unsafe {
            let current_buffer_lock = self.current_buffer.read();
            let file_read = self.file.read();

            if file_read.get_chunks_count() > 0 {
                // The first chunk was already committed to the file: patch it there.
                drop(current_buffer_lock);
                match file_read.get_chunk(0).read().deref() {
                    FileChunk::OnDisk { .. } => {
                        if let UnderlyingFile::WriteMode { file, .. } =
                            file_read.get_underlying_file()
                        {
                            // Seek to the start, overwrite, then restore the
                            // previous position.
                            let mut disk_file_lock = file.lock();
                            let mut disk_file = disk_file_lock.get_file();
                            let position = disk_file.stream_position().unwrap();
                            disk_file.seek(SeekFrom::Start(0)).unwrap();
                            disk_file.write_all(data).unwrap();
                            disk_file.seek(SeekFrom::Start(position)).unwrap();
                            Ok(())
                        } else {
                            Err(())
                        }
                    }
                    FileChunk::OnMemory { chunk } => {
                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            chunk.get_mut_ptr(),
                            data.len(),
                        );
                        Ok(())
                    }
                }
            } else {
                // Nothing committed yet: patch the start of the pending buffer.
                std::ptr::copy_nonoverlapping(
                    data.as_ptr(),
                    current_buffer_lock.get_mut_ptr(),
                    data.len(),
                );
                Ok(())
            }
        }
    }

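    /// Appends `data` to the file, coordinating with concurrent writers through the
    /// buffer lock, and returns the absolute offset at which the data begins.
    /// `el_size` is forwarded to `MemoryFileInternal::reserve_space`, presumably so
    /// that chunk boundaries fall on element boundaries.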
    pub fn write_all_parallel(&self, data: &[u8], el_size: usize) -> u64 {
        stats::add_files_usage(data.len() as u64);

        // Fast path: append to the current buffer under the shared lock.
        let buffer = self.current_buffer.read();
        if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
            self.file_length.load(Ordering::Relaxed) + chunk_position
        } else {
            drop(buffer);
            let mut buffer = self.current_buffer.write();

            // Another thread may have swapped in a fresh buffer while we waited
            // for the exclusive lock; retry the fast path.
            if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
                return self.file_length.load(Ordering::Relaxed) + chunk_position;
            }

            let mut temp_vec = Vec::new();

            // Commit the full buffer to the file length and compute the offset at
            // which `data` will start.
            let position = self
                .file_length
                .fetch_add(buffer.len() as u64, Ordering::SeqCst)
                + (buffer.len() as u64);

            // Seal the current buffer into the file and reserve enough chunk space
            // for `data`; the writable sub-slices are collected into `temp_vec`.
            replace_with::replace_with_or_abort(buffer.deref_mut(), |buffer| {
                MemoryFileInternal::reserve_space(
                    &self.file,
                    buffer,
                    &mut temp_vec,
                    data.len(),
                    el_size,
                )
            });

            // Everything except what remains in the new current buffer now counts
            // towards the committed file length.
            self.file_length
                .fetch_add((data.len() - buffer.len()) as u64, Ordering::SeqCst);

            // Downgrade to a shared lock so other writers can take the fast path
            // while this thread fills the reserved slices.
            let _buffer_read = RwLockWriteGuard::downgrade(buffer);

            let mut offset = 0;
            for (_lock, part) in temp_vec.drain(..) {
                part.copy_from_slice(&data[offset..(offset + part.len())]);
                offset += part.len();
            }

            if self.file.read().is_on_disk() {
                self.file.write().flush_chunks(usize::MAX);
            }
            position
        }
    }

    pub fn get_path(&self) -> PathBuf {
        self.path.clone()
    }

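    /// Currently a no-op: `write_all_parallel` and `Drop` already flush chunks
    /// whenever the file lives on disk.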
    pub fn flush_async(&self) {}
}

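// Lets a `FileWriter` be used directly as a bincode 2.x output target.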
impl Writer for FileWriter {
    fn write(&mut self, bytes: &[u8]) -> Result<(), bincode::error::EncodeError> {
        self.write_all_parallel(bytes, 1);
        Ok(())
    }
}

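// Standard `Write` implementation. Writes always report full success: space is
// reserved before the bytes are copied, so partial writes never surface.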
impl Write for FileWriter {
    #[inline(always)]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_all_parallel(buf, 1);
        Ok(buf.len())
    }

    #[inline(always)]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

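// On drop, any partially filled buffer is committed to the file as a final chunk
// before the file is closed.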
impl Drop for FileWriter {
    fn drop(&mut self) {
        let mut current_buffer = self.current_buffer.write();
        if current_buffer.len() > 0 {
            MemoryFileInternal::add_chunk(
                &self.file,
                std::mem::replace(current_buffer.deref_mut(), AllocatedChunk::INVALID),
            );
            if self.file.read().is_on_disk() {
                self.file.write().flush_chunks(usize::MAX);
            }
        }
        self.file.write().close();
    }
}
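
// A minimal usage sketch (illustrative, not part of the original file). The
// `MemoryFileMode::AlwaysMemory` variant is an assumption; substitute whatever
// mode variants the crate actually defines:
//
//     use std::io::Write;
//
//     // Hypothetical mode: keep every chunk in memory.
//     let mut writer = FileWriter::create("/tmp/example.dat", MemoryFileMode::AlwaysMemory);
//     writer.write_all(b"payload").unwrap(); // lock-coordinated buffered append
//     assert_eq!(writer.len(), 7);
//     drop(writer); // commits the residual buffer chunk and closes the file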