parallel_processor/memory_fs/file/
reader.rs1use crate::memory_fs::file::internal::{FileChunk, MemoryFileInternal, OpenMode};
2use parking_lot::lock_api::ArcRwLockReadGuard;
3use parking_lot::{RawRwLock, RwLock};
4use std::cmp::min;
5use std::io;
6use std::io::{ErrorKind, Read, Seek, SeekFrom};
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::slice::from_raw_parts;
10use std::sync::Arc;
11
12use super::writer::FileWriter;
13
/// A byte range of a memory file, already resolved to the chunk where it starts.
///
/// Built by `FileReader::get_range_reference`; the bytes are copied out later with
/// `copy_to_unsync`. Cloning shares the same file (only the `Arc` handle is cloned).
#[derive(Clone)]
pub struct FileRangeReference {
    /// Shared handle to the file the range belongs to.
    file: Arc<RwLock<MemoryFileInternal>>,
    /// Index of the chunk containing the first byte of the range.
    start_chunk: usize,
    /// Offset of the first byte of the range inside `start_chunk`.
    start_chunk_offset: usize,
    /// Length of the range in bytes.
    bytes_count: usize,
}
21
22impl FileRangeReference {
23 pub unsafe fn copy_to_unsync(&self, other: &FileWriter) {
24 let file = self.file.read();
25 let mut chunk_index = self.start_chunk;
26 let mut chunk_offset = self.start_chunk_offset;
27 let mut written_bytes = 0;
28
29 while written_bytes < self.bytes_count {
30 let chunk = file.get_chunk(chunk_index);
31 let chunk = chunk.read();
32 let to_copy = (chunk.get_length() - chunk_offset).min(self.bytes_count - written_bytes);
33
34 let data = chunk
35 .get_ptr(&file.get_underlying_file(), None)
36 .add(chunk_offset);
37
38 other.write_all_parallel(from_raw_parts(data, to_copy), to_copy);
39
40 written_bytes += to_copy;
41 chunk_index += 1;
42 chunk_offset = 0;
43 }
44
45 }
47}
48
/// Sequential reader over a chunked memory file, implementing `Read` and `Seek`.
///
/// The reader holds a read lock on the chunk it is currently reading (`current_chunk_ref`) and
/// copies bytes straight out of that chunk's memory through `current_ptr`.
pub struct FileReader {
    /// Path the file was opened with; passed to `MemoryFileInternal::delete` by
    /// `close_and_remove`.
    path: PathBuf,
    /// Shared handle to the underlying memory file.
    file: Arc<RwLock<MemoryFileInternal>>,
    /// Read guard on the current chunk; keeps the memory `current_ptr` points into valid.
    /// `None` when no chunk has been loaded (the file has no chunks).
    current_chunk_ref: Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>,
    /// Index of the chunk being read; advanced by `read` when a chunk is exhausted and reset by
    /// `seek`.
    current_chunk_index: usize,
    /// Number of chunks, captured when the file was opened.
    chunks_count: usize,
    /// Next byte to read inside the current chunk.
    current_ptr: *const u8,
    /// Number of bytes left in the current chunk starting at `current_ptr`.
    current_len: usize,
    /// Prefetch hint forwarded to every chunk's `get_ptr` call.
    prefetch_amount: Option<usize>,
}
59
60unsafe impl Sync for FileReader {}
61unsafe impl Send for FileReader {}
62
63impl Clone for FileReader {
64 fn clone(&self) -> Self {
65 Self {
66 path: self.path.clone(),
67 file: self.file.clone(),
68 current_chunk_ref: self
69 .current_chunk_ref
70 .as_ref()
71 .map(|c| ArcRwLockReadGuard::rwlock(&c).read_arc()),
72 current_chunk_index: self.current_chunk_index,
73 chunks_count: self.chunks_count,
74 current_ptr: self.current_ptr,
75 current_len: self.current_len,
76 prefetch_amount: self.prefetch_amount,
77 }
78 }
79}
80
81impl FileReader {
82 fn set_chunk_info(&mut self, index: usize) {
83 let file = self.file.read();
84
85 let chunk = file.get_chunk(index);
86 let chunk_guard = chunk.read_arc();
87
88 let underlying_file = file.get_underlying_file();
89
90 self.current_ptr = chunk_guard.get_ptr(&underlying_file, self.prefetch_amount);
91 self.current_len = chunk_guard.get_length();
92 self.current_chunk_ref = Some(chunk_guard);
93 }
94
95 pub fn open(path: impl AsRef<Path>, prefetch_amount: Option<usize>) -> Option<Self> {
96 let file = match MemoryFileInternal::retrieve_reference(&path) {
97 None => MemoryFileInternal::create_from_fs(&path)?,
98 Some(x) => x,
99 };
100
101 let mut file_lock = file.write();
102
103 file_lock.open(OpenMode::Read).unwrap();
104 let chunks_count = file_lock.get_chunks_count();
105 drop(file_lock);
106
107 let mut reader = Self {
108 path: path.as_ref().into(),
109 file,
110 current_chunk_ref: None,
111 current_chunk_index: 0,
112 chunks_count,
113 current_ptr: std::ptr::null(),
114 current_len: 0,
115 prefetch_amount,
116 };
117
118 if reader.chunks_count > 0 {
119 reader.set_chunk_info(0);
120 }
121
122 Some(reader)
123 }
124
125 pub fn total_file_size(&self) -> usize {
126 self.file.read().len()
127 }
128
129 pub fn close_and_remove(self, remove_fs: bool) -> bool {
130 MemoryFileInternal::delete(self.path, remove_fs)
131 }
132
133 pub fn get_range_reference(&self, file_range: Range<u64>) -> FileRangeReference {
134 let file = self.file.read();
135 let mut chunk_index = 0;
136 let mut chunk_offset = 0;
137 let mut start = file_range.start;
138
139 while start > 0 {
140 let chunk = file.get_chunk(chunk_index);
141 let chunk = chunk.read();
142 let len = chunk.get_length() as u64;
143 if start < len {
144 chunk_offset = start as usize;
145 break;
146 }
147 start -= len;
148 chunk_index += 1;
149 }
150
151 FileRangeReference {
152 file: self.file.clone(),
153 start_chunk: chunk_index,
154 start_chunk_offset: chunk_offset,
155 bytes_count: file_range.end as usize - file_range.start as usize,
156 }
157 }
158
159 }
164
165impl Read for FileReader {
166 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
167 let mut bytes_written = 0;
168
169 while bytes_written != buf.len() {
170 if self.current_len == 0 {
171 self.current_chunk_index += 1;
172 if self.current_chunk_index >= self.chunks_count {
173 return Ok(bytes_written);
175 }
176 self.set_chunk_info(self.current_chunk_index);
177 }
178
179 let copyable_bytes = min(buf.len() - bytes_written, self.current_len);
180
181 unsafe {
182 std::ptr::copy_nonoverlapping(
183 self.current_ptr,
184 buf.as_mut_ptr().add(bytes_written),
185 copyable_bytes,
186 );
187 self.current_ptr = self.current_ptr.add(copyable_bytes);
188 self.current_len -= copyable_bytes;
189 bytes_written += copyable_bytes;
190 }
191 }
192
193 Ok(bytes_written)
194 }
195
196 #[inline(always)]
197 fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
198 match self.read(buf) {
199 Ok(count) => {
200 if count == buf.len() {
201 Ok(())
202 } else {
203 Err(io::Error::new(
204 io::ErrorKind::Other,
205 "Unexpected error while reading",
206 ))
207 }
208 }
209 Err(err) => Err(err),
210 }
211 }
212}
213
214impl Seek for FileReader {
215 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
216 match pos {
217 SeekFrom::Start(mut offset) => {
218 let mut chunk_idx = 0;
219
220 let file = self.file.read();
221 while chunk_idx < self.chunks_count {
222 let len = file.get_chunk(chunk_idx).read().get_length();
223 if offset < (len as u64) {
224 break;
225 }
226 chunk_idx += 1;
227 offset -= len as u64;
228 }
229
230 if chunk_idx == self.chunks_count {
231 return Err(std::io::Error::new(
232 ErrorKind::UnexpectedEof,
233 "Unexpected eof",
234 ));
235 }
236
237 self.current_chunk_index = chunk_idx;
238 drop(file);
239 self.set_chunk_info(chunk_idx);
240 unsafe {
241 self.current_ptr = self.current_ptr.add(offset as usize);
242 self.current_len -= offset as usize;
243 }
244
245 return Ok(offset);
246 }
247 _ => {
248 unimplemented!()
249 }
250 }
251 }
252
253 fn stream_position(&mut self) -> io::Result<u64> {
254 let mut position = 0;
255
256 let file_read = self.file.read();
257
258 for i in 0..self.current_chunk_index {
259 position += file_read.get_chunk(i).read().get_length();
260 }
261
262 position += file_read
263 .get_chunk(self.current_chunk_index)
264 .read()
265 .get_length()
266 - self.current_len;
267
268 Ok(position as u64)
269 }
270}