parallel_processor/buckets/readers/
async_binary_reader.rs

1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::readers::compressed_binary_reader::CompressedBinaryReader;
3use crate::buckets::readers::generic_binary_reader::{ChunkDecoder, GenericChunkedBinaryReader};
4use crate::buckets::readers::lock_free_binary_reader::LockFreeBinaryReader;
5use crate::memory_fs::file::reader::FileRangeReference;
6use crate::memory_fs::RemoveFileMode;
7use crate::scheduler::{PriorityScheduler, ThreadPriorityHandle};
8use crossbeam::channel::*;
9use parking_lot::{Condvar, Mutex, RwLock, RwLockWriteGuard};
10use serde::de::DeserializeOwned;
11use std::cmp::min;
12use std::io::Read;
13use std::ops::Deref;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::thread::JoinHandle;
17use std::time::Duration;
18
19use super::generic_binary_reader::ChunkReader;
20use super::BucketReader;
21
/// State of the file currently attached to an [`AsyncReaderThread`].
///
/// Cloning is cheap: the open variants only hold `Arc`s to the readers.
#[derive(Clone)]
enum OpenedFile {
    // No file has been attached yet.
    NotOpened,
    // An uncompressed file, read through the lock-free reader.
    Plain(Arc<LockFreeBinaryReader>),
    // A compressed file, read through the decompressing reader.
    Compressed(Arc<CompressedBinaryReader>),
    // The attached file has been completely consumed.
    Finished,
}
29
30impl OpenedFile {
31    pub fn is_finished(&self) -> bool {
32        match self {
33            OpenedFile::NotOpened => false,
34            OpenedFile::Finished => true,
35            OpenedFile::Plain(f) => f.is_finished(),
36            OpenedFile::Compressed(f) => f.is_finished(),
37        }
38    }
39
40    #[allow(dead_code)]
41    pub fn get_path(&self) -> PathBuf {
42        match self {
43            OpenedFile::Plain(f) => f.get_name(),
44            OpenedFile::Compressed(f) => f.get_name(),
45            _ => panic!("File not opened"),
46        }
47    }
48
49    pub fn get_chunks_count(&self) -> usize {
50        match self {
51            OpenedFile::Plain(file) => file.get_chunks_count(),
52            OpenedFile::Compressed(file) => file.get_chunks_count(),
53            OpenedFile::NotOpened | OpenedFile::Finished => 0,
54        }
55    }
56}
57
/// A unit of data produced by the background reader thread.
///
/// Note: "Passtrough" (sic) is the established public spelling and is kept
/// for API compatibility.
pub enum AsyncReaderBuffer {
    /// The raw (still compressed/encoded) file range is forwarded directly,
    /// without decompression, together with its optional serialized
    /// checkpoint payload.
    Passtrough {
        file_range: FileRangeReference,
        checkpoint_data: Option<Vec<u8>>,
    },
    /// A chunk of decoded bytes. `is_continuation` is true when this chunk
    /// continues the previous checkpoint's data instead of starting a new one.
    Decompressed {
        data: Vec<u8>,
        checkpoint_data: Option<Vec<u8>>,
        is_continuation: bool,
    },
    /// End of stream: the file was completely read.
    Closed,
}
70
71impl Default for AsyncReaderBuffer {
72    fn default() -> Self {
73        Self::Closed
74    }
75}
76
77impl AsyncReaderBuffer {
78    fn into_buffer(self) -> Option<Vec<u8>> {
79        match self {
80            AsyncReaderBuffer::Passtrough { .. } | AsyncReaderBuffer::Closed => None,
81            AsyncReaderBuffer::Decompressed { data, .. } => Some(data),
82        }
83    }
84    fn is_continuation(&self) -> bool {
85        match self {
86            AsyncReaderBuffer::Passtrough { .. } | AsyncReaderBuffer::Closed => false,
87            AsyncReaderBuffer::Decompressed {
88                is_continuation, ..
89            } => *is_continuation,
90        }
91    }
92}
93
/// Policy deciding how checkpoints may be delivered to the consumer.
///
/// `T` is the checkpoint-data type the predicate inspects; it is `?Sized`
/// so the strategy can also operate on raw `[u8]` payloads.
pub enum AllowedCheckpointStrategy<T: ?Sized> {
    /// Always decode the data; never forward raw file ranges.
    DecompressOnly,
    /// Allow forwarding raw file ranges ("passtrough") when the given
    /// predicate approves the (optional) checkpoint data.
    AllowPasstrough(Arc<dyn (Fn(Option<&T>) -> bool) + Sync + Send>),
}
98
99impl Clone for AllowedCheckpointStrategy<[u8]> {
100    fn clone(&self) -> Self {
101        match self {
102            AllowedCheckpointStrategy::DecompressOnly => AllowedCheckpointStrategy::DecompressOnly,
103            AllowedCheckpointStrategy::AllowPasstrough(f) => {
104                AllowedCheckpointStrategy::AllowPasstrough(f.clone())
105            }
106        }
107    }
108}
109
/// A background thread that reads/decodes a bucket file and hands buffers
/// to a consumer through bounded channels.
pub struct AsyncReaderThread {
    // Channel carrying filled buffers from the reader thread to the consumer.
    buffers: (Sender<AsyncReaderBuffer>, Receiver<AsyncReaderBuffer>),
    // Pool of reusable byte buffers, recycled by the consumer after use.
    buffers_pool: (Sender<Vec<u8>>, Receiver<Vec<u8>>),
    // Currently attached file plus the checkpoint strategy to apply to it.
    opened_file: Mutex<(OpenedFile, AllowedCheckpointStrategy<[u8]>)>,
    // Signaled when a new file is attached, waking the reader thread.
    file_wait_condvar: Condvar,
    // Lazily spawned reader thread handle (spawned on first read_bucket call).
    thread: Mutex<Option<JoinHandle<()>>>,
}
117
impl AsyncReaderThread {
    /// Creates a reader-thread handle with `buffers_count` reusable buffers
    /// of `buffers_size` capacity each. The OS thread itself is spawned
    /// lazily on the first `read_bucket` call.
    pub fn new(buffers_size: usize, buffers_count: usize) -> Arc<Self> {
        let buffers_pool = bounded(buffers_count);

        // Pre-fill the pool; the channel is sized to hold them all, so these
        // sends cannot block.
        for _ in 0..buffers_count {
            buffers_pool
                .0
                .send(Vec::with_capacity(buffers_size))
                .unwrap();
        }

        Arc::new(Self {
            buffers: bounded(buffers_count),
            buffers_pool,
            // Start in the Finished state so read_bucket's "previous file
            // finished" assertion holds for the first file.
            opened_file: Mutex::new((
                OpenedFile::Finished,
                AllowedCheckpointStrategy::DecompressOnly,
            )),
            file_wait_condvar: Condvar::new(),
            thread: Mutex::new(None),
        })
    }

    /// Main loop of the background reader thread.
    ///
    /// Repeatedly takes a buffer from the pool, fills it from the currently
    /// attached file and sends it to the consumer. Exits when this `Arc` is
    /// the last strong reference (i.e. all external handles were dropped).
    fn read_thread(self: Arc<Self>) {
        // Per-format stream state; only one of the two is active at a time,
        // depending on whether the attached file is plain or compressed.
        let mut current_stream_compr = None;
        let mut current_stream_uncompr = None;

        const READ_THREAD_PRIORITY: usize = 3;

        let thread_handle = PriorityScheduler::declare_thread(READ_THREAD_PRIORITY);

        while Arc::strong_count(&self) > 1 {
            let mut file_guard = self.opened_file.lock();

            let mut buffer = self.buffers_pool.1.recv().unwrap();
            // NOTE(review): this exposes the buffer's full capacity,
            // including stale/uninitialized bytes; the read loop below only
            // ever hands out the first `total_read_bytes` bytes (truncate)
            // before sending, so uninitialized data is not forwarded.
            unsafe {
                buffer.set_len(buffer.capacity());
            }
            let mut cached_buffer = Some(buffer);

            /// Fills `cached_buffer` from `file`, (re)opening a parallel
            /// chunk stream as needed. Returns:
            /// - `Some(Passtrough..)` when the strategy allows forwarding a
            ///   raw file range,
            /// - `Some(Decompressed..)` with decoded bytes otherwise,
            /// - `None` when the file is completely consumed.
            fn read_buffer<D: ChunkDecoder>(
                file: &GenericChunkedBinaryReader<D>,
                stream: &mut Option<ChunkReader<Vec<u8>, D::ReaderType>>,
                allowed_strategy: AllowedCheckpointStrategy<[u8]>,
                cached_buffer: &mut Option<Vec<u8>>,
                thread_handle: &ThreadPriorityHandle,
            ) -> Option<AsyncReaderBuffer> {
                let mut total_read_bytes = 0;
                let mut checkpoint_data = None;
                // A buffer read from an already-open stream continues the
                // previous checkpoint; opening a new stream resets this.
                let mut is_continuation = true;

                let out_buffer = loop {
                    if stream.is_none() {
                        is_continuation = false;

                        *stream = file.get_read_parallel_stream(allowed_strategy.clone());

                        match &stream {
                            Some(stream_) => match stream_ {
                                ChunkReader::Reader(_, data) => checkpoint_data = data.clone(),
                                ChunkReader::Passtrough { file_range, data } => {
                                    // Just pass the file range and take the current stream
                                    let file_range = file_range.clone();
                                    let checkpoint_data = data.clone();

                                    stream.take();
                                    return Some(AsyncReaderBuffer::Passtrough {
                                        file_range,
                                        checkpoint_data,
                                    });
                                }
                            },
                            // File finished
                            None => return None,
                        }
                    }

                    let reader_stream = stream.as_mut().unwrap();

                    let out_buffer = cached_buffer.as_mut().unwrap();

                    // Fill the buffer as much as possible; a 0-byte read
                    // marks the end of the current chunk stream.
                    let mut last_read = usize::MAX;
                    while total_read_bytes < out_buffer.len() && last_read > 0 {
                        last_read = match reader_stream {
                            ChunkReader::Reader(reader, _) => {
                                // Mark the blocking read for the scheduler so
                                // other threads can run in the meantime.
                                PriorityScheduler::execute_blocking_call(thread_handle, || {
                                    reader.read(&mut out_buffer[total_read_bytes..]).unwrap()
                                })
                            }
                            // Passtrough streams are returned early above.
                            _ => unreachable!(),
                        };
                        total_read_bytes += last_read;
                    }

                    if last_read == 0 {
                        // Current stream finished
                        stream.take();
                    }

                    // Avoid passing 0-sized buffers
                    if total_read_bytes > 0 {
                        out_buffer.truncate(total_read_bytes);
                        break cached_buffer.take().unwrap();
                    }
                };

                Some(AsyncReaderBuffer::Decompressed {
                    data: out_buffer,
                    checkpoint_data,
                    is_continuation,
                })
            }

            let allowed_strategy = file_guard.1.clone();

            let data = match &mut file_guard.0 {
                OpenedFile::NotOpened | OpenedFile::Finished => {
                    // No file to read: wait (with a 5s timeout so the
                    // strong_count exit condition is re-checked periodically)
                    // and return the unused buffer to the pool.
                    self.file_wait_condvar
                        .wait_for(&mut file_guard, Duration::from_secs(5));
                    let _ = self.buffers_pool.0.send(cached_buffer.take().unwrap());
                    continue;
                }
                OpenedFile::Plain(file) => read_buffer(
                    &file,
                    &mut current_stream_uncompr,
                    allowed_strategy,
                    &mut cached_buffer,
                    &thread_handle,
                ),
                OpenedFile::Compressed(file) => read_buffer(
                    file,
                    &mut current_stream_compr,
                    allowed_strategy,
                    &mut cached_buffer,
                    &thread_handle,
                ),
            };

            match data {
                Some(data) => {
                    let _ = self.buffers.0.send(data);
                }
                None => {
                    // File completely read
                    current_stream_compr = None;
                    current_stream_uncompr = None;
                    file_guard.0 = OpenedFile::Finished;
                    // Clear the current closure
                    file_guard.1 = AllowedCheckpointStrategy::DecompressOnly;
                    let _ = self.buffers.0.send(AsyncReaderBuffer::Closed);
                }
            }

            if let Some(buffer) = cached_buffer {
                // Add back the buffer to the pool if it was not used
                let _ = self.buffers_pool.0.send(buffer);
            }
        }
    }

    /// Attaches `new_opened_file` to this reader thread and returns a
    /// blocking `Read` stream over its decoded contents.
    ///
    /// The typed checkpoint predicate is adapted into a `[u8]` predicate by
    /// deserializing the raw checkpoint payload with bincode before calling it.
    ///
    /// # Panics
    /// Panics if the previously attached file was not fully consumed.
    fn read_bucket<'a, T: DeserializeOwned + 'static>(
        self: Arc<Self>,
        new_opened_file: OpenedFile,
        allowed_strategy: AllowedCheckpointStrategy<T>,
        thread_handle: &'a ThreadPriorityHandle,
    ) -> AsyncStreamThreadReader<'a> {
        let mut opened_file = self.opened_file.lock();

        // Ensure that the previous file is finished
        match &opened_file.0 {
            OpenedFile::Finished => {}
            _ => panic!("File not finished!"),
        }

        *opened_file = (
            new_opened_file,
            match allowed_strategy {
                AllowedCheckpointStrategy::DecompressOnly => {
                    AllowedCheckpointStrategy::DecompressOnly
                }
                AllowedCheckpointStrategy::AllowPasstrough(f) => {
                    // Wrap the typed predicate: decode the raw bytes first,
                    // then delegate the decision to the user callback.
                    AllowedCheckpointStrategy::AllowPasstrough(Arc::new(
                        move |data: Option<&[u8]>| {
                            let data = data.map(|data| {
                                bincode::deserialize(data)
                                    .expect("Failed to deserialize checkpoint data")
                            });
                            f(data.as_ref())
                        },
                    ))
                }
            },
        );

        // Wake the reader thread waiting for a file, then release the lock.
        self.file_wait_condvar.notify_all();
        drop(opened_file);

        let stream_recv = self.buffers.1.clone();
        let owner = self.clone();

        // Lazily spawn the background thread on first use.
        let mut thread = self.thread.lock();
        let mt_self = self.clone();
        if thread.is_none() {
            *thread = Some(
                std::thread::Builder::new()
                    .name(String::from("async_reader"))
                    .spawn(move || {
                        mt_self.read_thread();
                    })
                    .unwrap(),
            );
        }
        drop(thread);

        // Block until the first buffer of the new file is available.
        let current = stream_recv.recv().unwrap();

        AsyncStreamThreadReader {
            receiver: stream_recv,
            owner,
            current,
            current_pos: 0,
            // Start "between checkpoints": the consumer must call
            // get_checkpoint_info_and_reset_reader before reading bytes.
            checkpoint_finished: true,
            stream_finished: false,
            thread_handle,
        }
    }
}
345
/// Consumer-side stream over the buffers produced by an [`AsyncReaderThread`].
/// Implements `Read` over the current checkpoint's decoded bytes.
struct AsyncStreamThreadReader<'a> {
    // Receives filled buffers from the background thread.
    receiver: Receiver<AsyncReaderBuffer>,
    // Keeps the reader thread alive and gives access to the buffer pool.
    owner: Arc<AsyncReaderThread>,
    // Buffer currently being consumed.
    current: AsyncReaderBuffer,
    // Read offset inside `current`'s data.
    current_pos: usize,
    // True when the current checkpoint has been fully read.
    checkpoint_finished: bool,
    // True when the whole file has been consumed (Closed observed).
    stream_finished: bool,
    thread_handle: &'a ThreadPriorityHandle,
}
355
/// Deserialized description of the next checkpoint in the stream.
enum AsyncCheckpointInfo<T> {
    /// The checkpoint's content must be read via the `Read` impl;
    /// carries the optional decoded checkpoint data.
    Stream(Option<T>),
    /// The checkpoint is forwarded as a raw file range (no decoding).
    Passtrough {
        file_range: FileRangeReference,
        checkpoint_data: Option<T>,
    },
}
363
impl<'a> AsyncStreamThreadReader<'a> {
    /// Advances to the next checkpoint and returns its info, or `None` when
    /// the whole stream is finished.
    ///
    /// Must only be called once the previous checkpoint has been fully read
    /// (`checkpoint_finished == true`); for a `Stream` result this flag is
    /// cleared so the `Read` impl can consume the checkpoint's bytes.
    ///
    /// # Panics
    /// Asserts that the previous checkpoint was completely consumed, and
    /// panics if a checkpoint payload fails to deserialize.
    fn get_checkpoint_info_and_reset_reader<T: DeserializeOwned>(
        &mut self,
    ) -> Option<AsyncCheckpointInfo<T>> {
        assert!(self.checkpoint_finished);

        if self.stream_finished {
            return None;
        }

        match &self.current {
            AsyncReaderBuffer::Closed => {
                self.stream_finished = true;
                None
            }
            AsyncReaderBuffer::Passtrough {
                file_range,
                checkpoint_data,
            } => {
                let info = AsyncCheckpointInfo::Passtrough {
                    checkpoint_data: checkpoint_data.as_ref().map(|data| {
                        bincode::deserialize(data).expect("Failed to deserialize checkpoint data")
                    }),
                    file_range: file_range.clone(),
                };

                // This buffer is now used, change it
                PriorityScheduler::execute_blocking_call(&mut self.thread_handle, || {
                    self.current = self.receiver.recv().unwrap();
                });

                Some(info)
            }
            AsyncReaderBuffer::Decompressed {
                checkpoint_data, ..
            } => {
                // The checkpoint's bytes stay in `current`; the Read impl
                // will consume them starting at `current_pos`.
                self.checkpoint_finished = false;
                Some(AsyncCheckpointInfo::Stream(checkpoint_data.as_ref().map(
                    |data| {
                        bincode::deserialize(data).expect("Failed to deserialize checkpoint data")
                    },
                )))
            }
        }
    }
}
410
impl<'a> Read for AsyncStreamThreadReader<'a> {
    /// Reads decoded bytes of the current checkpoint into `buf`.
    ///
    /// Returns fewer bytes than requested (possibly 0) once the current
    /// checkpoint ends; call `get_checkpoint_info_and_reset_reader` to move
    /// to the next one. Exhausted buffers are recycled into the owner's pool.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut bytes_read = 0;
        loop {
            if self.checkpoint_finished {
                return Ok(bytes_read);
            }

            match &self.current {
                AsyncReaderBuffer::Closed => {
                    self.checkpoint_finished = true;
                    return Ok(bytes_read);
                }
                // Passtrough buffers are handled (and replaced) in
                // get_checkpoint_info_and_reset_reader, never here.
                AsyncReaderBuffer::Passtrough { .. } => unreachable!(),
                AsyncReaderBuffer::Decompressed { data, .. } => {
                    if self.current_pos == data.len() {
                        // Current buffer exhausted: fetch the next one and
                        // return the old buffer's allocation to the pool.
                        if let Some(buffer) = std::mem::replace(
                            &mut self.current,
                            PriorityScheduler::execute_blocking_call(
                                &mut self.thread_handle,
                                || self.receiver.recv().unwrap(),
                            ),
                        )
                        .into_buffer()
                        {
                            PriorityScheduler::execute_blocking_call(
                                &mut self.thread_handle,
                                || {
                                    let _ = self.owner.buffers_pool.0.send(buffer);
                                },
                            );
                        }
                        self.current_pos = 0;
                        // A non-continuation buffer starts a new checkpoint,
                        // so the current one is done.
                        self.checkpoint_finished = !self.current.is_continuation();
                        continue;
                    }

                    // Copy as much as fits into the caller's buffer.
                    let avail = data.len() - self.current_pos;
                    let to_read = min(buf.len() - bytes_read, avail);
                    buf[bytes_read..(bytes_read + to_read)]
                        .copy_from_slice(&data[self.current_pos..(self.current_pos + to_read)]);
                    bytes_read += to_read;
                    self.current_pos += to_read;

                    if bytes_read == buf.len() {
                        return Ok(bytes_read);
                    }
                }
            }
        }
    }
}
463
impl<'a> Drop for AsyncStreamThreadReader<'a> {
    /// Enforces that the stream was fully consumed (the `Closed` marker was
    /// received) before being dropped, so the shared reader thread is not
    /// left mid-file for the next user.
    fn drop(&mut self) {
        assert!(matches!(self.current, AsyncReaderBuffer::Closed));
    }
}
469
/// Lazily-opened bucket file that can be streamed through an
/// [`AsyncReaderThread`].
pub struct AsyncBinaryReader {
    // Path of the bucket file on (memory-)disk.
    path: PathBuf,
    // Lazily initialized file handle; opened on first get_items_stream call.
    opened_file: RwLock<OpenedFile>,
    // Whether the file must be opened with the compressed reader.
    compressed: bool,
    // File removal policy applied when the real (streaming) open happens.
    remove_file: RemoveFileMode,
    // Optional prefetch amount for the plain (uncompressed) reader.
    prefetch: Option<usize>,
}
477
478impl AsyncBinaryReader {
479    fn open_file(
480        path: &PathBuf,
481        compressed: bool,
482        remove_file: RemoveFileMode,
483        prefetch: Option<usize>,
484    ) -> OpenedFile {
485        if compressed {
486            OpenedFile::Compressed(Arc::new(CompressedBinaryReader::new(
487                path,
488                remove_file,
489                None,
490            )))
491        } else {
492            OpenedFile::Plain(Arc::new(LockFreeBinaryReader::new(
493                path,
494                remove_file,
495                prefetch,
496            )))
497        }
498    }
499
500    pub fn new(
501        path: &PathBuf,
502        compressed: bool,
503        remove_file: RemoveFileMode,
504        prefetch: Option<usize>,
505    ) -> Self {
506        Self {
507            path: path.clone(),
508            opened_file: RwLock::new(OpenedFile::NotOpened),
509            compressed,
510            remove_file,
511            prefetch,
512        }
513    }
514
515    fn with_opened_file<T>(&self, f: impl FnOnce(&OpenedFile) -> T) -> T {
516        let tmp_file;
517        let opened_file = &self.opened_file.read();
518        let file = match opened_file.deref() {
519            OpenedFile::NotOpened | OpenedFile::Finished => {
520                tmp_file = Self::open_file(&self.path, self.compressed, RemoveFileMode::Keep, None);
521                &tmp_file
522            }
523            file => file,
524        };
525        f(file)
526    }
527
528    pub fn get_data_format_info<T: DeserializeOwned>(&self) -> Option<T> {
529        self.with_opened_file(|file| match file {
530            OpenedFile::Plain(file) => Some(file.get_data_format_info()),
531            OpenedFile::Compressed(file) => Some(file.get_data_format_info()),
532            OpenedFile::NotOpened | OpenedFile::Finished => None,
533        })
534    }
535
536    pub fn get_chunks_count(&self) -> usize {
537        self.with_opened_file(|file| file.get_chunks_count())
538    }
539
540    pub fn get_file_size(&self) -> usize {
541        self.with_opened_file(|file| match file {
542            OpenedFile::Plain(file) => file.get_length(),
543            OpenedFile::Compressed(file) => file.get_length(),
544            OpenedFile::NotOpened | OpenedFile::Finished => 0,
545        })
546    }
547}
548
impl AsyncBinaryReader {
    /// True when the attached file has been completely consumed.
    pub fn is_finished(&self) -> bool {
        self.opened_file.read().is_finished()
    }

    /// Opens the file (on first call) and attaches it to `read_thread`,
    /// returning an iterator that deserializes items with `S`.
    ///
    /// The real open uses this reader's configured `remove_file`/`prefetch`
    /// settings, unlike the temporary opens done by the metadata getters.
    pub fn get_items_stream<'a, S: BucketItemSerializer>(
        &self,
        read_thread: Arc<AsyncReaderThread>,
        buffer: S::ReadBuffer,
        extra_buffer: S::ExtraDataBuffer,
        allowed_strategy: AllowedCheckpointStrategy<S::CheckpointData>,
        thread_handle: &'a ThreadPriorityHandle,
        deserializer_init_data: S::InitData,
    ) -> AsyncBinaryReaderItemsIterator<'a, S> {
        // Double-checked open: re-test under the write lock because another
        // thread may have opened the file between the two lock acquisitions.
        let mut opened_file = self.opened_file.read();
        if matches!(*opened_file, OpenedFile::NotOpened) {
            drop(opened_file);
            let mut writable = self.opened_file.write();
            if matches!(*writable, OpenedFile::NotOpened) {
                *writable =
                    Self::open_file(&self.path, self.compressed, self.remove_file, self.prefetch);
            }
            // Atomically downgrade so no writer can sneak in between.
            opened_file = RwLockWriteGuard::downgrade(writable);
        }

        let stream = read_thread.read_bucket(opened_file.clone(), allowed_strategy, thread_handle);
        AsyncBinaryReaderItemsIterator::<_> {
            buffer,
            extra_buffer,
            stream,
            deserializer: S::new(deserializer_init_data),
        }
    }

    /// Returns the path of the bucket file.
    pub fn get_name(&self) -> PathBuf {
        self.path.clone()
    }
}
587
/// Checkpoint-aware item iterator over an asynchronously read bucket file.
pub struct AsyncBinaryReaderItemsIterator<'a, S: BucketItemSerializer> {
    // Scratch buffer the serializer deserializes items into.
    buffer: S::ReadBuffer,
    // Extra per-item data buffer managed by the serializer.
    extra_buffer: S::ExtraDataBuffer,
    // Underlying byte stream fed by the background reader thread.
    stream: AsyncStreamThreadReader<'a>,
    deserializer: S,
}
594
/// Result of advancing to the next checkpoint in extended mode.
pub enum AsyncBinaryReaderIteratorData<'a, S: BucketItemSerializer> {
    /// The checkpoint must be consumed item-by-item through the returned
    /// checkpoint cursor, together with its optional decoded checkpoint data.
    Stream(
        &'a mut AsyncBinaryReaderItemsIteratorCheckpoint<'a, S>,
        Option<S::CheckpointData>,
    ),
    /// The checkpoint is forwarded as a raw (still encoded) file range.
    Passtrough {
        file_range: FileRangeReference,
        checkpoint_data: Option<S::CheckpointData>,
    },
}
605
impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIterator<'a, S> {
    /// Advances to the next checkpoint, also surfacing passtrough (raw file
    /// range) checkpoints. Returns `None` when the stream is finished.
    pub fn get_next_checkpoint_extended(&mut self) -> Option<AsyncBinaryReaderIteratorData<S>> {
        let info = self.stream.get_checkpoint_info_and_reset_reader()?;
        Some(match info {
            AsyncCheckpointInfo::Stream(data) => {
                // SAFETY: AsyncBinaryReaderItemsIteratorCheckpoint is
                // #[repr(transparent)] over AsyncBinaryReaderItemsIterator,
                // so reinterpreting the &mut reference is layout-sound.
                // NOTE(review): the transmute also unifies the reference
                // lifetime with 'a — verify callers cannot alias `self`.
                AsyncBinaryReaderIteratorData::Stream(unsafe { std::mem::transmute(self) }, data)
            }
            AsyncCheckpointInfo::Passtrough {
                file_range,
                checkpoint_data,
            } => AsyncBinaryReaderIteratorData::Passtrough {
                file_range,
                checkpoint_data,
            },
        })
    }
}
623
impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIterator<'a, S> {
    /// Advances to the next checkpoint, assuming the reader was configured
    /// with `AllowedCheckpointStrategy::DecompressOnly` (a passtrough
    /// checkpoint here is a logic error). Returns `None` at end of stream.
    pub fn get_next_checkpoint(
        &mut self,
    ) -> Option<(
        &mut AsyncBinaryReaderItemsIteratorCheckpoint<S>,
        Option<S::CheckpointData>,
    )> {
        let info = self.stream.get_checkpoint_info_and_reset_reader()?;
        Some(match info {
            // SAFETY: the checkpoint wrapper is #[repr(transparent)] over
            // this iterator type, so the &mut reinterpretation is sound.
            AsyncCheckpointInfo::Stream(data) => (unsafe { std::mem::transmute(self) }, data),
            // Passtrough is impossible under DecompressOnly strategy.
            AsyncCheckpointInfo::Passtrough { .. } => unreachable!(),
        })
    }
}
638
/// Cursor over the items of a single checkpoint.
///
/// `#[repr(transparent)]` is required: the iterator hands out `&mut` to this
/// type by transmuting `&mut AsyncBinaryReaderItemsIterator`, which is only
/// sound because both types share the exact same layout.
#[repr(transparent)]
pub struct AsyncBinaryReaderItemsIteratorCheckpoint<'a, S: BucketItemSerializer>(
    AsyncBinaryReaderItemsIterator<'a, S>,
);
643
impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIteratorCheckpoint<'a, S> {
    /// Deserializes and returns the next item of the current checkpoint
    /// (plus mutable access to the serializer's extra-data buffer), or
    /// `None` when the checkpoint's byte stream is exhausted.
    pub fn next(&mut self) -> Option<(S::ReadType<'_>, &mut S::ExtraDataBuffer)> {
        let item = self.0.deserializer.read_from(
            &mut self.0.stream,
            &mut self.0.buffer,
            &mut self.0.extra_buffer,
        )?;
        Some((item, &mut self.0.extra_buffer))
    }
}