parallel_processor/memory_fs/file/reader.rs

1use crate::memory_fs::file::internal::{FileChunk, MemoryFileInternal, OpenMode};
2use parking_lot::lock_api::ArcRwLockReadGuard;
3use parking_lot::{RawRwLock, RwLock};
4use std::cmp::min;
5use std::io;
6use std::io::{ErrorKind, Read, Seek, SeekFrom};
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::slice::from_raw_parts;
10use std::sync::Arc;
11
12use super::writer::FileWriter;
13
14#[derive(Clone)]
15pub struct FileRangeReference {
16    file: Arc<RwLock<MemoryFileInternal>>,
17    start_chunk: usize,
18    start_chunk_offset: usize,
19    bytes_count: usize,
20}
21
22impl FileRangeReference {
23    pub unsafe fn copy_to_unsync(&self, other: &FileWriter) {
24        let file = self.file.read();
25        let mut chunk_index = self.start_chunk;
26        let mut chunk_offset = self.start_chunk_offset;
27        let mut written_bytes = 0;
28
29        while written_bytes < self.bytes_count {
30            let chunk = file.get_chunk(chunk_index);
31            let chunk = chunk.read();
32            let to_copy = (chunk.get_length() - chunk_offset).min(self.bytes_count - written_bytes);
33
34            let data = chunk
35                .get_ptr(&file.get_underlying_file(), None)
36                .add(chunk_offset);
37
38            other.write_all_parallel(from_raw_parts(data, to_copy), to_copy);
39
40            written_bytes += to_copy;
41            chunk_index += 1;
42            chunk_offset = 0;
43        }
44
45        // other.write_all_parallel(data, el_size)
46    }
47}
48
49pub struct FileReader {
50    path: PathBuf,
51    file: Arc<RwLock<MemoryFileInternal>>,
52    current_chunk_ref: Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>,
53    current_chunk_index: usize,
54    chunks_count: usize,
55    current_ptr: *const u8,
56    current_len: usize,
57    current_file_position: usize,
58    prefetch_amount: Option<usize>,
59}
60
61unsafe impl Sync for FileReader {}
62unsafe impl Send for FileReader {}
63
64impl Clone for FileReader {
65    fn clone(&self) -> Self {
66        Self {
67            path: self.path.clone(),
68            file: self.file.clone(),
69            current_chunk_ref: self
70                .current_chunk_ref
71                .as_ref()
72                .map(|c| ArcRwLockReadGuard::rwlock(&c).read_arc()),
73            current_chunk_index: self.current_chunk_index,
74            chunks_count: self.chunks_count,
75            current_ptr: self.current_ptr,
76            current_len: self.current_len,
77            current_file_position: self.current_file_position,
78            prefetch_amount: self.prefetch_amount,
79        }
80    }
81}
82
83impl FileReader {
84    fn set_chunk_info(&mut self, index: usize) {
85        let file = self.file.read();
86
87        let chunk = file.get_chunk(index);
88        let chunk_guard = chunk.read_arc();
89
90        let underlying_file = file.get_underlying_file();
91
92        self.current_ptr = chunk_guard.get_ptr(&underlying_file, self.prefetch_amount);
93        self.current_len = chunk_guard.get_length();
94        self.current_chunk_ref = Some(chunk_guard);
95    }
96
97    pub fn open(path: impl AsRef<Path>, prefetch_amount: Option<usize>) -> Option<Self> {
98        let file = match MemoryFileInternal::retrieve_reference(&path) {
99            None => MemoryFileInternal::create_from_fs(&path)?,
100            Some(x) => x,
101        };
102
103        let mut file_lock = file.write();
104
105        file_lock.open(OpenMode::Read).unwrap();
106        let chunks_count = file_lock.get_chunks_count();
107        drop(file_lock);
108
109        let mut reader = Self {
110            path: path.as_ref().into(),
111            file,
112            current_chunk_ref: None,
113            current_chunk_index: 0,
114            chunks_count,
115            current_ptr: std::ptr::null(),
116            current_len: 0,
117            current_file_position: 0,
118            prefetch_amount,
119        };
120
121        if reader.chunks_count > 0 {
122            reader.set_chunk_info(0);
123        }
124
125        Some(reader)
126    }
127
128    pub fn get_unique_file_id(&self) -> usize {
129        self.file.data_ptr() as usize
130    }
131
132    pub fn total_file_size(&self) -> usize {
133        self.file.read().len()
134    }
135
136    pub fn get_file_path(&self) -> &Path {
137        &self.path
138    }
139
140    pub fn close_and_remove(self, remove_fs: bool) -> bool {
141        MemoryFileInternal::delete(self.path, remove_fs)
142    }
143
144    pub fn get_range_reference(&self, file_range: Range<u64>) -> FileRangeReference {
145        let file = self.file.read();
146        let mut chunk_index = 0;
147        let mut chunk_offset = 0;
148        let mut start = file_range.start;
149
150        while start > 0 {
151            let chunk = file.get_chunk(chunk_index);
152            let chunk = chunk.read();
153            let len = chunk.get_length() as u64;
154            if start < len {
155                chunk_offset = start as usize;
156                break;
157            }
158            start -= len;
159            chunk_index += 1;
160        }
161
162        FileRangeReference {
163            file: self.file.clone(),
164            start_chunk: chunk_index,
165            start_chunk_offset: chunk_offset,
166            bytes_count: file_range.end as usize - file_range.start as usize,
167        }
168    }
169
170    // pub fn get_typed_chunks_mut<T>(&mut self) -> Option<impl Iterator<Item = &mut [T]>> {
171    //     todo!();
172    //     Some((0..1).into_iter().map(|_| &mut [0, 1][..]))
173    // }
174}
175
176impl Read for FileReader {
177    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
178        let mut bytes_written = 0;
179
180        while bytes_written != buf.len() {
181            if self.current_len == 0 {
182                self.current_chunk_index += 1;
183                if self.current_chunk_index >= self.chunks_count {
184                    // End of file
185                    return Ok(bytes_written);
186                }
187                self.set_chunk_info(self.current_chunk_index);
188            }
189
190            let copyable_bytes = min(buf.len() - bytes_written, self.current_len);
191
192            unsafe {
193                std::ptr::copy_nonoverlapping(
194                    self.current_ptr,
195                    buf.as_mut_ptr().add(bytes_written),
196                    copyable_bytes,
197                );
198                self.current_ptr = self.current_ptr.add(copyable_bytes);
199                self.current_len -= copyable_bytes;
200                self.current_file_position += copyable_bytes;
201                bytes_written += copyable_bytes;
202            }
203        }
204
205        Ok(bytes_written)
206    }
207
208    #[inline(always)]
209    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
210        match self.read(buf) {
211            Ok(count) => {
212                if count == buf.len() {
213                    Ok(())
214                } else {
215                    Err(io::Error::new(
216                        io::ErrorKind::Other,
217                        "Unexpected error while reading",
218                    ))
219                }
220            }
221            Err(err) => Err(err),
222        }
223    }
224}
225
226impl Seek for FileReader {
227    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
228        match pos {
229            SeekFrom::Start(mut offset) => {
230                let mut chunk_idx = 0;
231
232                let file = self.file.read();
233                while chunk_idx < self.chunks_count {
234                    let len = file.get_chunk(chunk_idx).read().get_length();
235                    if offset < (len as u64) {
236                        break;
237                    }
238                    chunk_idx += 1;
239                    offset -= len as u64;
240                }
241
242                if chunk_idx == self.chunks_count {
243                    return Err(std::io::Error::new(
244                        ErrorKind::UnexpectedEof,
245                        "Unexpected eof",
246                    ));
247                }
248
249                self.current_chunk_index = chunk_idx;
250                drop(file);
251                self.set_chunk_info(chunk_idx);
252
253                self.current_ptr = unsafe { self.current_ptr.add(offset as usize) };
254                self.current_len -= offset as usize;
255                self.current_file_position = offset as usize;
256
257                return Ok(offset);
258            }
259            SeekFrom::Current(offset) => {
260                assert!(offset >= 0); // < 0  not supported yet
261                let mut offset = offset as usize;
262                loop {
263                    let clen_offset = offset.min(self.current_len);
264                    offset -= clen_offset;
265                    self.current_len -= clen_offset;
266                    self.current_ptr = unsafe { self.current_ptr.add(clen_offset) };
267                    self.current_file_position += clen_offset;
268
269                    if offset == 0 {
270                        break Ok(self.current_file_position as u64);
271                    }
272
273                    if self.current_chunk_index >= self.chunks_count - 1 {
274                        break Err(std::io::Error::new(
275                            ErrorKind::UnexpectedEof,
276                            "Unexpected eof",
277                        ));
278                    }
279
280                    self.current_chunk_index += 1;
281                    self.set_chunk_info(self.current_chunk_index);
282                }
283            }
284            _ => {
285                unimplemented!()
286            }
287        }
288    }
289
290    fn stream_position(&mut self) -> io::Result<u64> {
291        let mut position = 0;
292
293        let file_read = self.file.read();
294
295        for i in 0..self.current_chunk_index {
296            position += file_read.get_chunk(i).read().get_length();
297        }
298
299        position += file_read
300            .get_chunk(self.current_chunk_index)
301            .read()
302            .get_length()
303            - self.current_len;
304
305        Ok(position as u64)
306    }
307}