parallel_processor/memory_fs/file/
reader.rs

1use crate::memory_fs::file::internal::{FileChunk, MemoryFileInternal, OpenMode};
2use parking_lot::lock_api::ArcRwLockReadGuard;
3use parking_lot::{RawRwLock, RwLock};
4use std::cmp::min;
5use std::io;
6use std::io::{ErrorKind, Read, Seek, SeekFrom};
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::slice::from_raw_parts;
10use std::sync::Arc;
11
12use super::writer::FileWriter;
13
/// A reference to a contiguous byte range of an in-memory file.
///
/// Built by `FileReader::get_range_reference`; the range is stored as a starting
/// chunk, a byte offset inside that chunk, and a total byte count.
#[derive(Clone)]
pub struct FileRangeReference {
    /// Shared handle to the file that owns the chunks.
    file: Arc<RwLock<MemoryFileInternal>>,
    /// Index of the chunk in which the range begins.
    start_chunk: usize,
    /// Byte offset of the range start inside `start_chunk`.
    start_chunk_offset: usize,
    /// Total number of bytes covered by the range (it may span several chunks).
    bytes_count: usize,
}
21
22impl FileRangeReference {
23    pub unsafe fn copy_to_unsync(&self, other: &FileWriter) {
24        let file = self.file.read();
25        let mut chunk_index = self.start_chunk;
26        let mut chunk_offset = self.start_chunk_offset;
27        let mut written_bytes = 0;
28
29        while written_bytes < self.bytes_count {
30            let chunk = file.get_chunk(chunk_index);
31            let chunk = chunk.read();
32            let to_copy = (chunk.get_length() - chunk_offset).min(self.bytes_count - written_bytes);
33
34            let data = chunk
35                .get_ptr(&file.get_underlying_file(), None)
36                .add(chunk_offset);
37
38            other.write_all_parallel(from_raw_parts(data, to_copy), to_copy);
39
40            written_bytes += to_copy;
41            chunk_index += 1;
42            chunk_offset = 0;
43        }
44
45        // other.write_all_parallel(data, el_size)
46    }
47}
48
/// Sequential reader over the chunks of an in-memory file.
///
/// The reader keeps a read guard on the chunk it is currently consuming together
/// with a raw cursor (`current_ptr` / `current_len`) into that chunk's data.
pub struct FileReader {
    /// Path the reader was opened with; consumed by `close_and_remove`.
    path: PathBuf,
    /// Shared handle to the underlying in-memory file.
    file: Arc<RwLock<MemoryFileInternal>>,
    /// Read guard on the current chunk; `None` until a chunk has been loaded.
    current_chunk_ref: Option<ArcRwLockReadGuard<RawRwLock, FileChunk>>,
    /// Index of the chunk the cursor currently points into.
    current_chunk_index: usize,
    /// Number of chunks, captured once when the reader is opened.
    chunks_count: usize,
    /// Pointer to the next unread byte of the current chunk (null before any chunk is loaded).
    current_ptr: *const u8,
    /// Number of unread bytes left in the current chunk.
    current_len: usize,
    /// Forwarded to `FileChunk::get_ptr` whenever a chunk is loaded.
    /// NOTE(review): its exact meaning is defined by `get_ptr`, not visible here.
    prefetch_amount: Option<usize>,
}
59
// `FileReader` stores a raw `*const u8` (and a chunk read guard), so it is not
// automatically `Send`/`Sync`; these impls assert thread-safety manually.
// NOTE(review): soundness is not checked by the compiler here — presumably it
// relies on the pointed-to chunk data staying valid while `current_chunk_ref`
// is held and on the guard being safe to move across threads; confirm.
unsafe impl Sync for FileReader {}
unsafe impl Send for FileReader {}
62
63impl Clone for FileReader {
64    fn clone(&self) -> Self {
65        Self {
66            path: self.path.clone(),
67            file: self.file.clone(),
68            current_chunk_ref: self
69                .current_chunk_ref
70                .as_ref()
71                .map(|c| ArcRwLockReadGuard::rwlock(&c).read_arc()),
72            current_chunk_index: self.current_chunk_index,
73            chunks_count: self.chunks_count,
74            current_ptr: self.current_ptr,
75            current_len: self.current_len,
76            prefetch_amount: self.prefetch_amount,
77        }
78    }
79}
80
81impl FileReader {
82    fn set_chunk_info(&mut self, index: usize) {
83        let file = self.file.read();
84
85        let chunk = file.get_chunk(index);
86        let chunk_guard = chunk.read_arc();
87
88        let underlying_file = file.get_underlying_file();
89
90        self.current_ptr = chunk_guard.get_ptr(&underlying_file, self.prefetch_amount);
91        self.current_len = chunk_guard.get_length();
92        self.current_chunk_ref = Some(chunk_guard);
93    }
94
95    pub fn open(path: impl AsRef<Path>, prefetch_amount: Option<usize>) -> Option<Self> {
96        let file = match MemoryFileInternal::retrieve_reference(&path) {
97            None => MemoryFileInternal::create_from_fs(&path)?,
98            Some(x) => x,
99        };
100
101        let mut file_lock = file.write();
102
103        file_lock.open(OpenMode::Read).unwrap();
104        let chunks_count = file_lock.get_chunks_count();
105        drop(file_lock);
106
107        let mut reader = Self {
108            path: path.as_ref().into(),
109            file,
110            current_chunk_ref: None,
111            current_chunk_index: 0,
112            chunks_count,
113            current_ptr: std::ptr::null(),
114            current_len: 0,
115            prefetch_amount,
116        };
117
118        if reader.chunks_count > 0 {
119            reader.set_chunk_info(0);
120        }
121
122        Some(reader)
123    }
124
125    pub fn total_file_size(&self) -> usize {
126        self.file.read().len()
127    }
128
129    pub fn close_and_remove(self, remove_fs: bool) -> bool {
130        MemoryFileInternal::delete(self.path, remove_fs)
131    }
132
133    pub fn get_range_reference(&self, file_range: Range<u64>) -> FileRangeReference {
134        let file = self.file.read();
135        let mut chunk_index = 0;
136        let mut chunk_offset = 0;
137        let mut start = file_range.start;
138
139        while start > 0 {
140            let chunk = file.get_chunk(chunk_index);
141            let chunk = chunk.read();
142            let len = chunk.get_length() as u64;
143            if start < len {
144                chunk_offset = start as usize;
145                break;
146            }
147            start -= len;
148            chunk_index += 1;
149        }
150
151        FileRangeReference {
152            file: self.file.clone(),
153            start_chunk: chunk_index,
154            start_chunk_offset: chunk_offset,
155            bytes_count: file_range.end as usize - file_range.start as usize,
156        }
157    }
158
159    // pub fn get_typed_chunks_mut<T>(&mut self) -> Option<impl Iterator<Item = &mut [T]>> {
160    //     todo!();
161    //     Some((0..1).into_iter().map(|_| &mut [0, 1][..]))
162    // }
163}
164
165impl Read for FileReader {
166    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
167        let mut bytes_written = 0;
168
169        while bytes_written != buf.len() {
170            if self.current_len == 0 {
171                self.current_chunk_index += 1;
172                if self.current_chunk_index >= self.chunks_count {
173                    // End of file
174                    return Ok(bytes_written);
175                }
176                self.set_chunk_info(self.current_chunk_index);
177            }
178
179            let copyable_bytes = min(buf.len() - bytes_written, self.current_len);
180
181            unsafe {
182                std::ptr::copy_nonoverlapping(
183                    self.current_ptr,
184                    buf.as_mut_ptr().add(bytes_written),
185                    copyable_bytes,
186                );
187                self.current_ptr = self.current_ptr.add(copyable_bytes);
188                self.current_len -= copyable_bytes;
189                bytes_written += copyable_bytes;
190            }
191        }
192
193        Ok(bytes_written)
194    }
195
196    #[inline(always)]
197    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
198        match self.read(buf) {
199            Ok(count) => {
200                if count == buf.len() {
201                    Ok(())
202                } else {
203                    Err(io::Error::new(
204                        io::ErrorKind::Other,
205                        "Unexpected error while reading",
206                    ))
207                }
208            }
209            Err(err) => Err(err),
210        }
211    }
212}
213
214impl Seek for FileReader {
215    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
216        match pos {
217            SeekFrom::Start(mut offset) => {
218                let mut chunk_idx = 0;
219
220                let file = self.file.read();
221                while chunk_idx < self.chunks_count {
222                    let len = file.get_chunk(chunk_idx).read().get_length();
223                    if offset < (len as u64) {
224                        break;
225                    }
226                    chunk_idx += 1;
227                    offset -= len as u64;
228                }
229
230                if chunk_idx == self.chunks_count {
231                    return Err(std::io::Error::new(
232                        ErrorKind::UnexpectedEof,
233                        "Unexpected eof",
234                    ));
235                }
236
237                self.current_chunk_index = chunk_idx;
238                drop(file);
239                self.set_chunk_info(chunk_idx);
240                unsafe {
241                    self.current_ptr = self.current_ptr.add(offset as usize);
242                    self.current_len -= offset as usize;
243                }
244
245                return Ok(offset);
246            }
247            _ => {
248                unimplemented!()
249            }
250        }
251    }
252
253    fn stream_position(&mut self) -> io::Result<u64> {
254        let mut position = 0;
255
256        let file_read = self.file.read();
257
258        for i in 0..self.current_chunk_index {
259            position += file_read.get_chunk(i).read().get_length();
260        }
261
262        position += file_read
263            .get_chunk(self.current_chunk_index)
264            .read()
265            .get_length()
266            - self.current_len;
267
268        Ok(position as u64)
269    }
270}