1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::readers::compressed_binary_reader::CompressedBinaryReader;
3use crate::buckets::readers::generic_binary_reader::{ChunkDecoder, GenericChunkedBinaryReader};
4use crate::buckets::readers::lock_free_binary_reader::LockFreeBinaryReader;
5use crate::memory_fs::file::reader::FileRangeReference;
6use crate::memory_fs::RemoveFileMode;
7use crate::scheduler::{PriorityScheduler, ThreadPriorityHandle};
8use crossbeam::channel::*;
9use parking_lot::{Condvar, Mutex, RwLock, RwLockWriteGuard};
10use serde::de::DeserializeOwned;
11use std::cmp::min;
12use std::io::Read;
13use std::ops::Deref;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::thread::JoinHandle;
17use std::time::Duration;
18
19use super::generic_binary_reader::ChunkReader;
20use super::BucketReader;
21
/// Lifecycle states of the bucket file driven by an [`AsyncReaderThread`].
#[derive(Clone)]
enum OpenedFile {
    // No reader has been created for the file yet.
    NotOpened,
    // File opened through the plain (uncompressed) lock-free reader.
    Plain(Arc<LockFreeBinaryReader>),
    // File opened through the decompressing reader.
    Compressed(Arc<CompressedBinaryReader>),
    // All data has been consumed, or the slot was reset after a read.
    Finished,
}
29
30impl OpenedFile {
31 pub fn is_finished(&self) -> bool {
32 match self {
33 OpenedFile::NotOpened => false,
34 OpenedFile::Finished => true,
35 OpenedFile::Plain(f) => f.is_finished(),
36 OpenedFile::Compressed(f) => f.is_finished(),
37 }
38 }
39
40 #[allow(dead_code)]
41 pub fn get_path(&self) -> PathBuf {
42 match self {
43 OpenedFile::Plain(f) => f.get_name(),
44 OpenedFile::Compressed(f) => f.get_name(),
45 _ => panic!("File not opened"),
46 }
47 }
48
49 pub fn get_chunks_count(&self) -> usize {
50 match self {
51 OpenedFile::Plain(file) => file.get_chunks_count(),
52 OpenedFile::Compressed(file) => file.get_chunks_count(),
53 OpenedFile::NotOpened | OpenedFile::Finished => 0,
54 }
55 }
56}
57
/// A unit of data handed from the background reader thread to the consumer.
pub enum AsyncReaderBuffer {
    /// The checkpoint is forwarded as a raw file range without decompression
    /// (only produced under `AllowedCheckpointStrategy::AllowPasstrough`).
    Passtrough {
        file_range: FileRangeReference,
        // Serialized checkpoint metadata, if the checkpoint carries any.
        checkpoint_data: Option<Vec<u8>>,
    },
    /// A buffer of decompressed bytes belonging to a checkpoint.
    Decompressed {
        data: Vec<u8>,
        // Serialized checkpoint metadata, if the checkpoint carries any.
        checkpoint_data: Option<Vec<u8>>,
        // True when this buffer continues the previous buffer's checkpoint
        // instead of starting a new one.
        is_continuation: bool,
    },
    /// End of stream: no more buffers will follow for the current file.
    Closed,
}
70
impl Default for AsyncReaderBuffer {
    /// A default buffer carries no data and marks the end of a stream.
    fn default() -> Self {
        Self::Closed
    }
}
76
77impl AsyncReaderBuffer {
78 fn into_buffer(self) -> Option<Vec<u8>> {
79 match self {
80 AsyncReaderBuffer::Passtrough { .. } | AsyncReaderBuffer::Closed => None,
81 AsyncReaderBuffer::Decompressed { data, .. } => Some(data),
82 }
83 }
84 fn is_continuation(&self) -> bool {
85 match self {
86 AsyncReaderBuffer::Passtrough { .. } | AsyncReaderBuffer::Closed => false,
87 AsyncReaderBuffer::Decompressed {
88 is_continuation, ..
89 } => *is_continuation,
90 }
91 }
92}
93
/// Controls how the reader may deliver checkpoints to the consumer.
pub enum AllowedCheckpointStrategy<T: ?Sized> {
    /// Always decompress checkpoint contents into in-memory buffers.
    DecompressOnly,
    /// Allow forwarding the raw file range (no decompression) when the
    /// predicate returns true for the checkpoint's data.
    AllowPasstrough(Arc<dyn (Fn(Option<&T>) -> bool) + Sync + Send>),
}
98
99impl Clone for AllowedCheckpointStrategy<[u8]> {
100 fn clone(&self) -> Self {
101 match self {
102 AllowedCheckpointStrategy::DecompressOnly => AllowedCheckpointStrategy::DecompressOnly,
103 AllowedCheckpointStrategy::AllowPasstrough(f) => {
104 AllowedCheckpointStrategy::AllowPasstrough(f.clone())
105 }
106 }
107 }
108}
109
/// Shared state for a lazily-spawned background thread that reads and
/// (optionally) decompresses a bucket file, sending filled buffers to the
/// consumer through a bounded channel.
pub struct AsyncReaderThread {
    // Filled buffers travel from the reader thread (sender) to the consumer.
    buffers: (Sender<AsyncReaderBuffer>, Receiver<AsyncReaderBuffer>),
    // Pool of reusable byte buffers; the consumer returns buffers here.
    buffers_pool: (Sender<Vec<u8>>, Receiver<Vec<u8>>),
    // The file currently being read plus the checkpoint strategy to apply.
    opened_file: Mutex<(OpenedFile, AllowedCheckpointStrategy<[u8]>)>,
    // Signaled when a new file is installed into `opened_file`.
    file_wait_condvar: Condvar,
    // Join handle of the background thread, spawned on first `read_bucket`.
    thread: Mutex<Option<JoinHandle<()>>>,
}
117
impl AsyncReaderThread {
    /// Creates the shared reader state with `buffers_count` reusable buffers
    /// of `buffers_size` bytes each. The OS thread itself is spawned lazily
    /// on the first call to `read_bucket`.
    pub fn new(buffers_size: usize, buffers_count: usize) -> Arc<Self> {
        let buffers_pool = bounded(buffers_count);

        // Pre-fill the pool so the reader thread never blocks on its first
        // buffer acquisition.
        for _ in 0..buffers_count {
            buffers_pool
                .0
                .send(Vec::with_capacity(buffers_size))
                .unwrap();
        }

        Arc::new(Self {
            buffers: bounded(buffers_count),
            buffers_pool,
            // Start in the `Finished` state: `read_bucket` asserts the
            // previous file is finished before installing a new one.
            opened_file: Mutex::new((
                OpenedFile::Finished,
                AllowedCheckpointStrategy::DecompressOnly,
            )),
            file_wait_condvar: Condvar::new(),
            thread: Mutex::new(None),
        })
    }

    /// Body of the background reader thread: repeatedly fills buffers from
    /// the currently opened file and sends them to the consumer. Exits once
    /// this `Arc` is the last reference alive.
    fn read_thread(self: Arc<Self>) {
        // One decompression stream per file format; only one is active at a
        // time, matching the variant stored in `opened_file`.
        let mut current_stream_compr = None;
        let mut current_stream_uncompr = None;

        const READ_THREAD_PRIORITY: usize = 3;

        let thread_handle = PriorityScheduler::declare_thread(READ_THREAD_PRIORITY);

        while Arc::strong_count(&self) > 1 {
            let mut file_guard = self.opened_file.lock();

            let mut buffer = self.buffers_pool.1.recv().unwrap();
            // NOTE(review): exposes the buffer's full capacity (bytes may be
            // uninitialized) so `read` below can fill it in place; the length
            // is truncated to the bytes actually read before the buffer is
            // handed out.
            unsafe {
                buffer.set_len(buffer.capacity());
            }
            let mut cached_buffer = Some(buffer);

            /// Fills `cached_buffer` from the current stream, opening the
            /// next checkpoint stream as needed. Returns `None` when the file
            /// has no further checkpoints, or a `Passtrough` buffer when the
            /// strategy allows forwarding the raw file range.
            fn read_buffer<D: ChunkDecoder>(
                file: &GenericChunkedBinaryReader<D>,
                stream: &mut Option<ChunkReader<Vec<u8>, D::ReaderType>>,
                allowed_strategy: AllowedCheckpointStrategy<[u8]>,
                cached_buffer: &mut Option<Vec<u8>>,
                thread_handle: &ThreadPriorityHandle,
            ) -> Option<AsyncReaderBuffer> {
                let mut total_read_bytes = 0;
                let mut checkpoint_data = None;
                // Becomes false if we had to open a new stream, i.e. the
                // produced buffer starts a fresh checkpoint.
                let mut is_continuation = true;

                let out_buffer = loop {
                    if stream.is_none() {
                        is_continuation = false;

                        *stream = file.get_read_parallel_stream(allowed_strategy.clone());

                        match &stream {
                            Some(stream_) => match stream_ {
                                ChunkReader::Reader(_, data) => checkpoint_data = data.clone(),
                                ChunkReader::Passtrough { file_range, data } => {
                                    let file_range = file_range.clone();
                                    let checkpoint_data = data.clone();

                                    // A passthrough checkpoint is delivered
                                    // whole; drop the stream immediately.
                                    stream.take();
                                    return Some(AsyncReaderBuffer::Passtrough {
                                        file_range,
                                        checkpoint_data,
                                    });
                                }
                            },
                            // No more checkpoints in the file.
                            None => return None,
                        }
                    }

                    let reader_stream = stream.as_mut().unwrap();

                    let out_buffer = cached_buffer.as_mut().unwrap();

                    // Fill the buffer as much as possible from the current
                    // stream; `read` returning 0 means the stream is drained.
                    let mut last_read = usize::MAX;
                    while total_read_bytes < out_buffer.len() && last_read > 0 {
                        last_read = match reader_stream {
                            ChunkReader::Reader(reader, _) => {
                                PriorityScheduler::execute_blocking_call(thread_handle, || {
                                    reader.read(&mut out_buffer[total_read_bytes..]).unwrap()
                                })
                            }
                            // Passthrough streams never reach this point:
                            // they return early when the stream is opened.
                            _ => unreachable!(),
                        };
                        total_read_bytes += last_read;
                    }

                    if last_read == 0 {
                        // Stream exhausted; the next loop iteration opens the
                        // next checkpoint (if any) into the same buffer.
                        stream.take();
                    }

                    if total_read_bytes > 0 {
                        out_buffer.truncate(total_read_bytes);
                        break cached_buffer.take().unwrap();
                    }
                };

                Some(AsyncReaderBuffer::Decompressed {
                    data: out_buffer,
                    checkpoint_data,
                    is_continuation,
                })
            }

            let allowed_strategy = file_guard.1.clone();

            let data = match &mut file_guard.0 {
                OpenedFile::NotOpened | OpenedFile::Finished => {
                    // Nothing to read: park (with a timeout, so the thread can
                    // notice shutdown) until `read_bucket` installs a file,
                    // and return the unused buffer to the pool.
                    self.file_wait_condvar
                        .wait_for(&mut file_guard, Duration::from_secs(5));
                    let _ = self.buffers_pool.0.send(cached_buffer.take().unwrap());
                    continue;
                }
                OpenedFile::Plain(file) => read_buffer(
                    &file,
                    &mut current_stream_uncompr,
                    allowed_strategy,
                    &mut cached_buffer,
                    &thread_handle,
                ),
                OpenedFile::Compressed(file) => read_buffer(
                    file,
                    &mut current_stream_compr,
                    allowed_strategy,
                    &mut cached_buffer,
                    &thread_handle,
                ),
            };

            match data {
                Some(data) => {
                    let _ = self.buffers.0.send(data);
                }
                None => {
                    // File fully read: reset the slot so `read_bucket` may
                    // install the next file, and tell the consumer we closed.
                    current_stream_compr = None;
                    current_stream_uncompr = None;
                    file_guard.0 = OpenedFile::Finished;
                    file_guard.1 = AllowedCheckpointStrategy::DecompressOnly;
                    let _ = self.buffers.0.send(AsyncReaderBuffer::Closed);
                }
            }

            // If the buffer was not consumed (e.g. passthrough or EOF),
            // recycle it immediately.
            if let Some(buffer) = cached_buffer {
                let _ = self.buffers_pool.0.send(buffer);
            }
        }
    }

    /// Installs `new_opened_file` as the file to read, spawns the background
    /// thread on first use, and returns a `Read`-able stream over its data.
    ///
    /// The typed checkpoint predicate is adapted to operate on raw serialized
    /// bytes by deserializing with `bincode` before invoking it.
    ///
    /// # Panics
    /// Panics if the previously installed file is not yet finished.
    fn read_bucket<'a, T: DeserializeOwned + 'static>(
        self: Arc<Self>,
        new_opened_file: OpenedFile,
        allowed_strategy: AllowedCheckpointStrategy<T>,
        thread_handle: &'a ThreadPriorityHandle,
    ) -> AsyncStreamThreadReader<'a> {
        let mut opened_file = self.opened_file.lock();

        // Only one bucket may be streamed at a time per reader thread.
        match &opened_file.0 {
            OpenedFile::Finished => {}
            _ => panic!("File not finished!"),
        }

        *opened_file = (
            new_opened_file,
            match allowed_strategy {
                AllowedCheckpointStrategy::DecompressOnly => {
                    AllowedCheckpointStrategy::DecompressOnly
                }
                AllowedCheckpointStrategy::AllowPasstrough(f) => {
                    // Wrap the typed predicate into a byte-level one: the
                    // reader thread only sees serialized checkpoint data.
                    AllowedCheckpointStrategy::AllowPasstrough(Arc::new(
                        move |data: Option<&[u8]>| {
                            let data = data.map(|data| {
                                bincode::deserialize(data)
                                    .expect("Failed to deserialize checkpoint data")
                            });
                            f(data.as_ref())
                        },
                    ))
                }
            },
        );

        // Wake the reader thread waiting in `read_thread`'s condvar.
        self.file_wait_condvar.notify_all();
        drop(opened_file);

        let stream_recv = self.buffers.1.clone();
        let owner = self.clone();

        // Lazily spawn the background thread exactly once.
        let mut thread = self.thread.lock();
        let mt_self = self.clone();
        if thread.is_none() {
            *thread = Some(
                std::thread::Builder::new()
                    .name(String::from("async_reader"))
                    .spawn(move || {
                        mt_self.read_thread();
                    })
                    .unwrap(),
            );
        }
        drop(thread);

        // Block until the first buffer of the new file arrives.
        let current = stream_recv.recv().unwrap();

        AsyncStreamThreadReader {
            receiver: stream_recv,
            owner,
            current,
            current_pos: 0,
            checkpoint_finished: true,
            stream_finished: false,
            thread_handle,
        }
    }
}
345
/// Consumer-side handle over the buffers produced by an [`AsyncReaderThread`].
/// Implements `Read` over the decompressed bytes of one checkpoint at a time.
struct AsyncStreamThreadReader<'a> {
    // Receives filled buffers from the background thread.
    receiver: Receiver<AsyncReaderBuffer>,
    // Keeps the reader thread alive and provides the pool to recycle buffers.
    owner: Arc<AsyncReaderThread>,
    // Buffer currently being consumed.
    current: AsyncReaderBuffer,
    // Read offset inside `current`'s data.
    current_pos: usize,
    // True when the current checkpoint's bytes are exhausted.
    checkpoint_finished: bool,
    // True once `Closed` has been observed; later calls return None/EOF.
    stream_finished: bool,
    thread_handle: &'a ThreadPriorityHandle,
}
355
/// Checkpoint descriptor produced by
/// `AsyncStreamThreadReader::get_checkpoint_info_and_reset_reader`.
enum AsyncCheckpointInfo<T> {
    /// The checkpoint's contents must be read (decompressed) via the stream.
    Stream(Option<T>),
    /// The checkpoint is forwarded as a raw file range, skipping the stream.
    Passtrough {
        file_range: FileRangeReference,
        checkpoint_data: Option<T>,
    },
}
363
impl<'a> AsyncStreamThreadReader<'a> {
    /// Advances to the next checkpoint and returns its metadata, or `None`
    /// when the whole stream is exhausted.
    ///
    /// # Panics
    /// Panics if the current checkpoint still has unread bytes
    /// (`checkpoint_finished` must already be true).
    fn get_checkpoint_info_and_reset_reader<T: DeserializeOwned>(
        &mut self,
    ) -> Option<AsyncCheckpointInfo<T>> {
        assert!(self.checkpoint_finished);

        if self.stream_finished {
            return None;
        }

        match &self.current {
            AsyncReaderBuffer::Closed => {
                // End of stream: latch the flag so later calls return None
                // without touching the channel again.
                self.stream_finished = true;
                None
            }
            AsyncReaderBuffer::Passtrough {
                file_range,
                checkpoint_data,
            } => {
                let info = AsyncCheckpointInfo::Passtrough {
                    checkpoint_data: checkpoint_data.as_ref().map(|data| {
                        bincode::deserialize(data).expect("Failed to deserialize checkpoint data")
                    }),
                    file_range: file_range.clone(),
                };

                // A passthrough checkpoint carries no streamed bytes, so
                // fetch the next buffer right away.
                PriorityScheduler::execute_blocking_call(&mut self.thread_handle, || {
                    self.current = self.receiver.recv().unwrap();
                });

                Some(info)
            }
            AsyncReaderBuffer::Decompressed {
                checkpoint_data, ..
            } => {
                // Mark the checkpoint as in-progress; `Read::read` flips it
                // back once the last continuation buffer is consumed.
                self.checkpoint_finished = false;
                Some(AsyncCheckpointInfo::Stream(checkpoint_data.as_ref().map(
                    |data| {
                        bincode::deserialize(data).expect("Failed to deserialize checkpoint data")
                    },
                )))
            }
        }
    }
}
410
impl<'a> Read for AsyncStreamThreadReader<'a> {
    /// Reads decompressed bytes of the *current checkpoint only*: once the
    /// checkpoint ends this returns `Ok(0)` (EOF) until
    /// `get_checkpoint_info_and_reset_reader` starts the next one.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut bytes_read = 0;
        loop {
            if self.checkpoint_finished {
                return Ok(bytes_read);
            }

            match &self.current {
                AsyncReaderBuffer::Closed => {
                    self.checkpoint_finished = true;
                    return Ok(bytes_read);
                }
                // Passthrough buffers are consumed only by
                // `get_checkpoint_info_and_reset_reader`, never via `read`.
                AsyncReaderBuffer::Passtrough { .. } => unreachable!(),
                AsyncReaderBuffer::Decompressed { data, .. } => {
                    if self.current_pos == data.len() {
                        // Current buffer exhausted: swap in the next buffer
                        // and recycle the old one back into the owner's pool.
                        if let Some(buffer) = std::mem::replace(
                            &mut self.current,
                            PriorityScheduler::execute_blocking_call(
                                &mut self.thread_handle,
                                || self.receiver.recv().unwrap(),
                            ),
                        )
                        .into_buffer()
                        {
                            PriorityScheduler::execute_blocking_call(
                                &mut self.thread_handle,
                                || {
                                    let _ = self.owner.buffers_pool.0.send(buffer);
                                },
                            );
                        }
                        self.current_pos = 0;
                        // A non-continuation buffer belongs to the *next*
                        // checkpoint, so this one is done.
                        self.checkpoint_finished = !self.current.is_continuation();
                        continue;
                    }

                    // Copy as much as fits into the caller's buffer.
                    let avail = data.len() - self.current_pos;
                    let to_read = min(buf.len() - bytes_read, avail);
                    buf[bytes_read..(bytes_read + to_read)]
                        .copy_from_slice(&data[self.current_pos..(self.current_pos + to_read)]);
                    bytes_read += to_read;
                    self.current_pos += to_read;

                    if bytes_read == buf.len() {
                        return Ok(bytes_read);
                    }
                }
            }
        }
    }
}
463
impl<'a> Drop for AsyncStreamThreadReader<'a> {
    fn drop(&mut self) {
        // The stream must be fully drained before dropping: the background
        // thread resets `opened_file` to `Finished` only after sending
        // `Closed`, and the next `read_bucket` asserts that state.
        assert!(matches!(self.current, AsyncReaderBuffer::Closed));
    }
}
469
/// A lazily-opened bucket file that can be streamed through an
/// [`AsyncReaderThread`].
pub struct AsyncBinaryReader {
    // Path of the bucket file on the memory filesystem.
    path: PathBuf,
    // Cached reader; `NotOpened` until first use.
    opened_file: RwLock<OpenedFile>,
    // Whether the file must be opened with the decompressing reader.
    compressed: bool,
    // How to dispose of the file once it is consumed.
    remove_file: RemoveFileMode,
    // Optional prefetch hint (honored by the plain reader only).
    prefetch: Option<usize>,
}
477
478impl AsyncBinaryReader {
479 fn open_file(
480 path: &PathBuf,
481 compressed: bool,
482 remove_file: RemoveFileMode,
483 prefetch: Option<usize>,
484 ) -> OpenedFile {
485 if compressed {
486 OpenedFile::Compressed(Arc::new(CompressedBinaryReader::new(
487 path,
488 remove_file,
489 None,
490 )))
491 } else {
492 OpenedFile::Plain(Arc::new(LockFreeBinaryReader::new(
493 path,
494 remove_file,
495 prefetch,
496 )))
497 }
498 }
499
500 pub fn new(
501 path: &PathBuf,
502 compressed: bool,
503 remove_file: RemoveFileMode,
504 prefetch: Option<usize>,
505 ) -> Self {
506 Self {
507 path: path.clone(),
508 opened_file: RwLock::new(OpenedFile::NotOpened),
509 compressed,
510 remove_file,
511 prefetch,
512 }
513 }
514
515 fn with_opened_file<T>(&self, f: impl FnOnce(&OpenedFile) -> T) -> T {
516 let tmp_file;
517 let opened_file = &self.opened_file.read();
518 let file = match opened_file.deref() {
519 OpenedFile::NotOpened | OpenedFile::Finished => {
520 tmp_file = Self::open_file(&self.path, self.compressed, RemoveFileMode::Keep, None);
521 &tmp_file
522 }
523 file => file,
524 };
525 f(file)
526 }
527
528 pub fn get_data_format_info<T: DeserializeOwned>(&self) -> Option<T> {
529 self.with_opened_file(|file| match file {
530 OpenedFile::Plain(file) => Some(file.get_data_format_info()),
531 OpenedFile::Compressed(file) => Some(file.get_data_format_info()),
532 OpenedFile::NotOpened | OpenedFile::Finished => None,
533 })
534 }
535
536 pub fn get_chunks_count(&self) -> usize {
537 self.with_opened_file(|file| file.get_chunks_count())
538 }
539
540 pub fn get_file_size(&self) -> usize {
541 self.with_opened_file(|file| match file {
542 OpenedFile::Plain(file) => file.get_length(),
543 OpenedFile::Compressed(file) => file.get_length(),
544 OpenedFile::NotOpened | OpenedFile::Finished => 0,
545 })
546 }
547}
548
impl AsyncBinaryReader {
    /// True when the cached reader reports that all data was consumed.
    pub fn is_finished(&self) -> bool {
        self.opened_file.read().is_finished()
    }

    /// Opens the file (on first use) with the configured remove/prefetch
    /// settings and hands it to `read_thread`, returning an iterator over the
    /// bucket's deserialized items.
    pub fn get_items_stream<'a, S: BucketItemSerializer>(
        &self,
        read_thread: Arc<AsyncReaderThread>,
        buffer: S::ReadBuffer,
        extra_buffer: S::ExtraDataBuffer,
        allowed_strategy: AllowedCheckpointStrategy<S::CheckpointData>,
        thread_handle: &'a ThreadPriorityHandle,
        deserializer_init_data: S::InitData,
    ) -> AsyncBinaryReaderItemsIterator<'a, S> {
        let mut opened_file = self.opened_file.read();
        if matches!(*opened_file, OpenedFile::NotOpened) {
            // Upgrade to a write lock to open the file, re-checking the state
            // because another thread may have opened it in the gap between
            // dropping the read lock and acquiring the write lock.
            drop(opened_file);
            let mut writable = self.opened_file.write();
            if matches!(*writable, OpenedFile::NotOpened) {
                *writable =
                    Self::open_file(&self.path, self.compressed, self.remove_file, self.prefetch);
            }
            // Downgrade atomically so no other writer can sneak in.
            opened_file = RwLockWriteGuard::downgrade(writable);
        }

        let stream = read_thread.read_bucket(opened_file.clone(), allowed_strategy, thread_handle);
        AsyncBinaryReaderItemsIterator::<_> {
            buffer,
            extra_buffer,
            stream,
            deserializer: S::new(deserializer_init_data),
        }
    }

    /// Path of the underlying bucket file.
    pub fn get_name(&self) -> PathBuf {
        self.path.clone()
    }
}
587
/// Checkpoint-by-checkpoint iterator over the deserialized items of a bucket.
pub struct AsyncBinaryReaderItemsIterator<'a, S: BucketItemSerializer> {
    // Scratch buffer reused by the deserializer across items.
    buffer: S::ReadBuffer,
    // Extra-data buffer shared with the caller (returned alongside items).
    extra_buffer: S::ExtraDataBuffer,
    // Byte stream over the current checkpoint's decompressed data.
    stream: AsyncStreamThreadReader<'a>,
    deserializer: S,
}
594
/// Result of advancing to the next checkpoint via
/// `get_next_checkpoint_extended`.
pub enum AsyncBinaryReaderIteratorData<'a, S: BucketItemSerializer> {
    /// A streamed checkpoint: items are read through the returned handle.
    Stream(
        &'a mut AsyncBinaryReaderItemsIteratorCheckpoint<'a, S>,
        Option<S::CheckpointData>,
    ),
    /// A raw passthrough range that can be copied without deserialization.
    Passtrough {
        file_range: FileRangeReference,
        checkpoint_data: Option<S::CheckpointData>,
    },
}
605
impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIterator<'a, S> {
    /// Advances to the next checkpoint, which may be either a streamed
    /// (decompressed) checkpoint or a raw passthrough file range. Returns
    /// `None` when the bucket is exhausted.
    pub fn get_next_checkpoint_extended(&mut self) -> Option<AsyncBinaryReaderIteratorData<S>> {
        let info = self.stream.get_checkpoint_info_and_reset_reader()?;
        Some(match info {
            AsyncCheckpointInfo::Stream(data) => {
                // SAFETY: `AsyncBinaryReaderItemsIteratorCheckpoint` is a
                // #[repr(transparent)] wrapper around `Self`, so transmuting
                // the mutable reference is layout-compatible.
                AsyncBinaryReaderIteratorData::Stream(unsafe { std::mem::transmute(self) }, data)
            }
            AsyncCheckpointInfo::Passtrough {
                file_range,
                checkpoint_data,
            } => AsyncBinaryReaderIteratorData::Passtrough {
                file_range,
                checkpoint_data,
            },
        })
    }
}
623
impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIterator<'a, S> {
    /// Advances to the next streamed checkpoint. Only valid when the stream
    /// was started with `AllowedCheckpointStrategy::DecompressOnly`:
    /// passthrough checkpoints hit the `unreachable!` branch.
    pub fn get_next_checkpoint(
        &mut self,
    ) -> Option<(
        &mut AsyncBinaryReaderItemsIteratorCheckpoint<S>,
        Option<S::CheckpointData>,
    )> {
        let info = self.stream.get_checkpoint_info_and_reset_reader()?;
        Some(match info {
            // SAFETY: `AsyncBinaryReaderItemsIteratorCheckpoint` is a
            // #[repr(transparent)] wrapper around `Self`, so transmuting the
            // mutable reference is layout-compatible.
            AsyncCheckpointInfo::Stream(data) => (unsafe { std::mem::transmute(self) }, data),
            AsyncCheckpointInfo::Passtrough { .. } => unreachable!(),
        })
    }
}
638
/// Item iterator scoped to a single checkpoint.
///
/// `#[repr(transparent)]` is load-bearing: checkpoint handles are created by
/// transmuting `&mut AsyncBinaryReaderItemsIterator` references, which is
/// only sound while the layouts are identical.
#[repr(transparent)]
pub struct AsyncBinaryReaderItemsIteratorCheckpoint<'a, S: BucketItemSerializer>(
    AsyncBinaryReaderItemsIterator<'a, S>,
);
643
644impl<'a, S: BucketItemSerializer> AsyncBinaryReaderItemsIteratorCheckpoint<'a, S> {
645 pub fn next(&mut self) -> Option<(S::ReadType<'_>, &mut S::ExtraDataBuffer)> {
646 let item = self.0.deserializer.read_from(
647 &mut self.0.stream,
648 &mut self.0.buffer,
649 &mut self.0.extra_buffer,
650 )?;
651 Some((item, &mut self.0.extra_buffer))
652 }
653}