parallel_processor/memory_fs/file/
reader.rs1use crate::memory_fs::file::internal::{FileChunk, MemoryFileInternal, OpenMode};
2use parking_lot::lock_api::ArcRwLockReadGuard;
3use parking_lot::{RawRwLock, RwLock};
4use std::cmp::min;
5use std::io;
6use std::io::{ErrorKind, Read, Seek, SeekFrom};
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::slice::from_raw_parts;
10use std::sync::Arc;
11
12use super::writer::FileWriter;
13
14#[derive(Clone)]
15pub struct FileRangeReference {
16 file: Arc<RwLock<MemoryFileInternal>>,
17 start_chunk: usize,
18 start_chunk_offset: usize,
19 bytes_count: usize,
20}
21
22impl FileRangeReference {
23 pub unsafe fn copy_to_unsync(&self, other: &FileWriter) {
24 let file = self.file.read();
25 let mut chunk_index = self.start_chunk;
26 let mut chunk_offset = self.start_chunk_offset;
27 let mut written_bytes = 0;
28
29 while written_bytes < self.bytes_count {
30 let chunk = file.get_chunk(chunk_index);
31 let chunk = chunk.read();
32 let to_copy = (chunk.get_length() - chunk_offset).min(self.bytes_count - written_bytes);
33
34 let data = chunk
35 .get_ptr(&file.get_underlying_file(), None)
36 .add(chunk_offset);
37
38 other.write_all_parallel(from_raw_parts(data, to_copy), to_copy);
39
40 written_bytes += to_copy;
41 chunk_index += 1;
42 chunk_offset = 0;
43 }
44
45 }
47}
48
49pub struct FileReader {
50 path: PathBuf,
51 file: Arc<RwLock<MemoryFileInternal>>,
52 current_chunk_ref: Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>,
53 current_chunk_index: usize,
54 chunks_count: usize,
55 current_ptr: *const u8,
56 current_len: usize,
57 current_file_position: usize,
58 prefetch_amount: Option<usize>,
59}
60
61unsafe impl Sync for FileReader {}
62unsafe impl Send for FileReader {}
63
64impl Clone for FileReader {
65 fn clone(&self) -> Self {
66 Self {
67 path: self.path.clone(),
68 file: self.file.clone(),
69 current_chunk_ref: self
70 .current_chunk_ref
71 .as_ref()
72 .map(|c| ArcRwLockReadGuard::rwlock(&c).read_arc()),
73 current_chunk_index: self.current_chunk_index,
74 chunks_count: self.chunks_count,
75 current_ptr: self.current_ptr,
76 current_len: self.current_len,
77 current_file_position: self.current_file_position,
78 prefetch_amount: self.prefetch_amount,
79 }
80 }
81}
82
83impl FileReader {
84 fn set_chunk_info(&mut self, index: usize) {
85 let file = self.file.read();
86
87 let chunk = file.get_chunk(index);
88 let chunk_guard = chunk.read_arc();
89
90 let underlying_file = file.get_underlying_file();
91
92 self.current_ptr = chunk_guard.get_ptr(&underlying_file, self.prefetch_amount);
93 self.current_len = chunk_guard.get_length();
94 self.current_chunk_ref = Some(chunk_guard);
95 }
96
97 pub fn open(path: impl AsRef<Path>, prefetch_amount: Option<usize>) -> Option<Self> {
98 let file = match MemoryFileInternal::retrieve_reference(&path) {
99 None => MemoryFileInternal::create_from_fs(&path)?,
100 Some(x) => x,
101 };
102
103 let mut file_lock = file.write();
104
105 file_lock.open(OpenMode::Read).unwrap();
106 let chunks_count = file_lock.get_chunks_count();
107 drop(file_lock);
108
109 let mut reader = Self {
110 path: path.as_ref().into(),
111 file,
112 current_chunk_ref: None,
113 current_chunk_index: 0,
114 chunks_count,
115 current_ptr: std::ptr::null(),
116 current_len: 0,
117 current_file_position: 0,
118 prefetch_amount,
119 };
120
121 if reader.chunks_count > 0 {
122 reader.set_chunk_info(0);
123 }
124
125 Some(reader)
126 }
127
128 pub fn get_unique_file_id(&self) -> usize {
129 self.file.data_ptr() as usize
130 }
131
132 pub fn total_file_size(&self) -> usize {
133 self.file.read().len()
134 }
135
136 pub fn get_file_path(&self) -> &Path {
137 &self.path
138 }
139
140 pub fn close_and_remove(self, remove_fs: bool) -> bool {
141 MemoryFileInternal::delete(self.path, remove_fs)
142 }
143
144 pub fn get_range_reference(&self, file_range: Range<u64>) -> FileRangeReference {
145 let file = self.file.read();
146 let mut chunk_index = 0;
147 let mut chunk_offset = 0;
148 let mut start = file_range.start;
149
150 while start > 0 {
151 let chunk = file.get_chunk(chunk_index);
152 let chunk = chunk.read();
153 let len = chunk.get_length() as u64;
154 if start < len {
155 chunk_offset = start as usize;
156 break;
157 }
158 start -= len;
159 chunk_index += 1;
160 }
161
162 FileRangeReference {
163 file: self.file.clone(),
164 start_chunk: chunk_index,
165 start_chunk_offset: chunk_offset,
166 bytes_count: file_range.end as usize - file_range.start as usize,
167 }
168 }
169
170 }
175
176impl Read for FileReader {
177 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
178 let mut bytes_written = 0;
179
180 while bytes_written != buf.len() {
181 if self.current_len == 0 {
182 self.current_chunk_index += 1;
183 if self.current_chunk_index >= self.chunks_count {
184 return Ok(bytes_written);
186 }
187 self.set_chunk_info(self.current_chunk_index);
188 }
189
190 let copyable_bytes = min(buf.len() - bytes_written, self.current_len);
191
192 unsafe {
193 std::ptr::copy_nonoverlapping(
194 self.current_ptr,
195 buf.as_mut_ptr().add(bytes_written),
196 copyable_bytes,
197 );
198 self.current_ptr = self.current_ptr.add(copyable_bytes);
199 self.current_len -= copyable_bytes;
200 self.current_file_position += copyable_bytes;
201 bytes_written += copyable_bytes;
202 }
203 }
204
205 Ok(bytes_written)
206 }
207
208 #[inline(always)]
209 fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
210 match self.read(buf) {
211 Ok(count) => {
212 if count == buf.len() {
213 Ok(())
214 } else {
215 Err(io::Error::new(
216 io::ErrorKind::Other,
217 "Unexpected error while reading",
218 ))
219 }
220 }
221 Err(err) => Err(err),
222 }
223 }
224}
225
226impl Seek for FileReader {
227 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
228 match pos {
229 SeekFrom::Start(mut offset) => {
230 let mut chunk_idx = 0;
231
232 let file = self.file.read();
233 while chunk_idx < self.chunks_count {
234 let len = file.get_chunk(chunk_idx).read().get_length();
235 if offset < (len as u64) {
236 break;
237 }
238 chunk_idx += 1;
239 offset -= len as u64;
240 }
241
242 if chunk_idx == self.chunks_count {
243 return Err(std::io::Error::new(
244 ErrorKind::UnexpectedEof,
245 "Unexpected eof",
246 ));
247 }
248
249 self.current_chunk_index = chunk_idx;
250 drop(file);
251 self.set_chunk_info(chunk_idx);
252
253 self.current_ptr = unsafe { self.current_ptr.add(offset as usize) };
254 self.current_len -= offset as usize;
255 self.current_file_position = offset as usize;
256
257 return Ok(offset);
258 }
259 SeekFrom::Current(offset) => {
260 assert!(offset >= 0); let mut offset = offset as usize;
262 loop {
263 let clen_offset = offset.min(self.current_len);
264 offset -= clen_offset;
265 self.current_len -= clen_offset;
266 self.current_ptr = unsafe { self.current_ptr.add(clen_offset) };
267 self.current_file_position += clen_offset;
268
269 if offset == 0 {
270 break Ok(self.current_file_position as u64);
271 }
272
273 if self.current_chunk_index >= self.chunks_count - 1 {
274 break Err(std::io::Error::new(
275 ErrorKind::UnexpectedEof,
276 "Unexpected eof",
277 ));
278 }
279
280 self.current_chunk_index += 1;
281 self.set_chunk_info(self.current_chunk_index);
282 }
283 }
284 _ => {
285 unimplemented!()
286 }
287 }
288 }
289
290 fn stream_position(&mut self) -> io::Result<u64> {
291 let mut position = 0;
292
293 let file_read = self.file.read();
294
295 for i in 0..self.current_chunk_index {
296 position += file_read.get_chunk(i).read().get_length();
297 }
298
299 position += file_read
300 .get_chunk(self.current_chunk_index)
301 .read()
302 .get_length()
303 - self.current_len;
304
305 Ok(position as u64)
306 }
307}