chunk_streamer/
chunk_receiver.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3use bytes::Bytes;
4use futures::Stream;
5use futures_util::{FutureExt};
6use log::{debug, info};
7use self_encryption_old::Error;
8use tokio::sync::mpsc::{Receiver};
9use tokio::task::{JoinHandle};
10
11pub struct ChunkReceiver {
12 receiver: Receiver<JoinHandle<Result<Bytes, Error>>>,
13 id: String,
14 file_position: usize,
15 chunk_index: i32,
16 current_task: Option<JoinHandle<Result<Bytes, Error>>>,
17}
18
19impl ChunkReceiver {
20 pub fn new(receiver: Receiver<JoinHandle<Result<Bytes, Error>>>, id: String) -> ChunkReceiver {
21 ChunkReceiver { receiver, id, file_position: 0, chunk_index: 1, current_task: None }
22 }
23
24 fn poll_current_task(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
25 match self.current_task.as_mut() {
26 Some(join_handle) => {
27 match join_handle.poll_unpin(cx) {
28 Poll::Pending => {
29 debug!("Join handle pending");
30 Poll::Pending
31 }
32 Poll::Ready(result) => {
33 let data = match result {
34 Ok(result) => match result {
35 Ok(bytes) => bytes,
36 Err(e) => {
37 self.receiver.close();
38 panic!("Error getting chunk data: {:?}", e)
39 },
40 },
41 Err(e) => {
42 self.receiver.close();
43 panic!("Error getting chunk data: {:?}", e)
44 }
45 };
46 let bytes_read = data.len();
47 if bytes_read > 0 {
48 info!("Read [{}] bytes from chunk [{}] at file position [{}] for ID [{}]", bytes_read, self.chunk_index, self.file_position, self.id);
49 self.file_position += bytes_read;
50 self.chunk_index += 1;
51 self.current_task = None;
52 Poll::Ready(Some(Ok(data))) } else {
54 info!("No more data at file position [{}] for ID [{}]", self.file_position, self.id);
55 debug!("End of stream A - closing channel");
56 self.receiver.close();
57 Poll::Ready(None) }
59 },
60 }
61 },
62 None => {
63 debug!("End of stream B - closing channel");
64 self.receiver.close();
65 Poll::Ready(None) }
67 }
68 }
69}
70
71impl Stream for ChunkReceiver {
72 type Item = Result<Bytes, Error>;
73 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74 if self.current_task.is_none() {
75 match self.receiver.poll_recv(cx) {
76 Poll::Pending => {
77 debug!("Pending data in receiver");
78 Poll::Pending
79 },
80 Poll::Ready(maybe_join_handle) => {
81 self.current_task = maybe_join_handle;
82 self.poll_current_task(cx)
83 }
84 }
85 } else {
86 match self.poll_current_task(cx) {
87 Poll::Pending => {
88 debug!("Pending join handle finishing");
89 Poll::Pending
90 },
91 Poll::Ready(data) => {
92 debug!("Returning join handle result");
93 Poll::Ready(data)
94 }
95 }
96 }
97 }
98}