parallel_processor/memory_fs/file/writer.rs

use crate::memory_fs::allocator::{AllocatedChunk, CHUNKS_ALLOCATOR};
use crate::memory_fs::file::internal::{
    FileChunk, MemoryFileInternal, MemoryFileMode, OpenMode, UnderlyingFile,
};
use crate::memory_fs::stats;
use parking_lot::{RwLock, RwLockWriteGuard};
use std::io::{Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

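/// Writer over a [`MemoryFileInternal`] that accumulates data in a single
/// allocated chunk and hands completed chunks over to the underlying file.
/// Appends can be performed concurrently from multiple threads through
/// `write_all_parallel`.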
pub struct FileWriter {
    /// Path of the file being written.
    path: PathBuf,
    /// Chunk currently being filled by incoming writes.
    current_buffer: RwLock<AllocatedChunk>,
    /// Bytes already handed over to `file` (excludes `current_buffer`).
    file_length: AtomicU64,
    /// The backing in-memory (or disk-backed) file.
    file: Arc<RwLock<MemoryFileInternal>>,
}

impl FileWriter {
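    /// Creates the backing file with the given `mode`, opens it for writing
    /// and allocates the initial buffering chunk.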
    pub fn create(path: impl AsRef<Path>, mode: MemoryFileMode) -> Self {
        Self {
            path: PathBuf::from(path.as_ref()),
            current_buffer: RwLock::new(
                CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)),
            ),
            file_length: AtomicU64::new(0),
            file: {
                let file = MemoryFileInternal::create_new(path, mode);
                file.write().open(OpenMode::Write).unwrap();
                file
            },
        }
    }

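    /// Total bytes written so far: data already in the underlying file plus
    /// the contents of the chunk currently being filled.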
    pub fn len(&self) -> usize {
        self.file.read().len() + self.current_buffer.read().len()
    }

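    /// Overwrites up to 128 bytes at the very start of the file (e.g. to
    /// patch a header once the final length is known), whether the first
    /// bytes live on disk, in a completed memory chunk, or still in the
    /// current buffer. Fails if `data` exceeds 128 bytes or the underlying
    /// file is not open in write mode.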
    pub fn write_at_start(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > 128 {
            return Err(());
        }

        unsafe {
            let current_buffer_lock = self.current_buffer.read();

            let file_read = self.file.read();

            if file_read.get_chunks_count() > 0 {
                // At least one chunk was already completed: patch the first
                // chunk, wherever it currently lives.
                drop(current_buffer_lock);
                match file_read.get_chunk(0).read().deref() {
                    FileChunk::OnDisk { .. } => {
                        if let UnderlyingFile::WriteMode { file, .. } =
                            file_read.get_underlying_file()
                        {
                            // Seek to the start, overwrite the leading bytes,
                            // then restore the previous write position.
                            let mut disk_file_lock = file.lock();
                            let mut disk_file = disk_file_lock.get_file();
                            let position = disk_file.stream_position().unwrap();
                            disk_file.seek(SeekFrom::Start(0)).unwrap();
                            disk_file.write_all(data).unwrap();
                            disk_file.seek(SeekFrom::Start(position)).unwrap();
                            Ok(())
                        } else {
                            Err(())
                        }
                    }
                    FileChunk::OnMemory { chunk } => {
                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            chunk.get_mut_ptr(),
                            data.len(),
                        );
                        Ok(())
                    }
                }
            } else {
                // No chunk completed yet: the file start is still inside the
                // current buffer.
                std::ptr::copy_nonoverlapping(
                    data.as_ptr(),
                    current_buffer_lock.get_mut_ptr(),
                    data.len(),
                );
                Ok(())
            }
        }
    }

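    /// Appends `data` to the file and returns the absolute offset at which
    /// it was written. Fast path: copy into the current buffer under a read
    /// lock. Slow path: take the write lock, reserve space across one or
    /// more new chunks, then downgrade to a read lock and copy the data in,
    /// so other writers can proceed during the copy. `el_size` is passed to
    /// `reserve_space`, presumably so that chunks are only split at element
    /// boundaries.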
    pub fn write_all_parallel(&self, data: &[u8], el_size: usize) -> u64 {
        stats::add_files_usage(data.len() as u64);

        let buffer = self.current_buffer.read();
        if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
            self.file_length.load(Ordering::Relaxed) + chunk_position
        } else {
            drop(buffer);
            let mut buffer = self.current_buffer.write();

            // Retry under the write lock: another thread may have already
            // swapped in a fresh buffer with enough space.
            if let Some(chunk_position) = buffer.write_bytes_noextend(data) {
                return self.file_length.load(Ordering::Relaxed) + chunk_position;
            }

            let mut temp_vec = Vec::new();

            // The current buffer will be retired into the file; `data`
            // starts right after its contents.
            let position = self
                .file_length
                .fetch_add(buffer.len() as u64, Ordering::SeqCst)
                + (buffer.len() as u64);

            replace_with::replace_with_or_abort(buffer.deref_mut(), |buffer| {
                MemoryFileInternal::reserve_space(
                    &self.file,
                    buffer,
                    &mut temp_vec,
                    data.len(),
                    el_size,
                )
            });

            // The new buffer keeps the tail of `data`; everything else went
            // into chunks already handed over to the file.
            self.file_length
                .fetch_add((data.len() - buffer.len()) as u64, Ordering::SeqCst);

            // Downgrade to a read lock so other writers can append while the
            // reserved parts are filled in.
            let _buffer_read = RwLockWriteGuard::downgrade(buffer);

            let mut offset = 0;
            for (_lock, part) in temp_vec.drain(..) {
                part.copy_from_slice(&data[offset..(offset + part.len())]);
                offset += part.len();
            }

            // If the file is disk-backed, schedule all completed chunks for
            // flushing.
            if self.file.read().is_on_disk() {
                self.file.write().flush_chunks(usize::MAX);
            }
            position
        }
    }

    pub fn get_path(&self) -> PathBuf {
        self.path.clone()
    }

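    /// Currently a no-op: when the file is disk-backed, completed chunks
    /// are already scheduled for flushing as they fill up (see
    /// `write_all_parallel` and `drop`).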
    pub fn flush_async(&self) {}
}

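// `std::io::Write` adapter: every buffer is accepted in full with an
// element size of 1, and `flush` is a no-op.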
impl Write for FileWriter {
    #[inline(always)]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_all_parallel(buf, 1);
        Ok(buf.len())
    }

    #[inline(always)]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

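// On drop, any pending buffered data is appended to the file and flushed
// before the file is closed.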
impl Drop for FileWriter {
    fn drop(&mut self) {
        let mut current_buffer = self.current_buffer.write();
        if current_buffer.len() > 0 {
            // Hand the partially filled buffer over to the file, leaving an
            // invalid placeholder chunk in its place.
            MemoryFileInternal::add_chunk(
                &self.file,
                std::mem::replace(current_buffer.deref_mut(), AllocatedChunk::INVALID),
            );
            if self.file.read().is_on_disk() {
                self.file.write().flush_chunks(usize::MAX);
            }
        }
        self.file.write().close();
    }
}