chunk_streamer/
chunk_receiver.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3use bytes::Bytes;
4use futures::Stream;
5use futures_util::{FutureExt};
6use log::{debug, info};
7use self_encryption::Error;
8use tokio::sync::mpsc::{Receiver};
9use tokio::task::{JoinHandle};
10
11pub struct ChunkReceiver {
12 receiver: Receiver<JoinHandle<Result<Bytes, Error>>>,
13 id: String,
14 file_position: usize,
15 chunk_index: i32,
16 current_task: Option<JoinHandle<Result<Bytes, Error>>>,
17}
18
19impl ChunkReceiver {
20 pub fn new(receiver: Receiver<JoinHandle<Result<Bytes, Error>>>, id: String) -> ChunkReceiver {
21 ChunkReceiver { receiver, id, file_position: 0, chunk_index: 1, current_task: None }
22 }
23
24 fn poll_current_task(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
25 match self.current_task.as_mut() {
26 Some(join_handle) => {
27 match join_handle.poll_unpin(cx) {
28 Poll::Pending => {
29 debug!("Join handle pending");
30 Poll::Pending
31 }
32 Poll::Ready(result) => {
33 match result {
34 Ok(result) => match result {
35 Ok(data) => {
36 let bytes_read = data.len();
37 if bytes_read > 0 {
38 info!("Read [{}] bytes from chunk [{}] at file position [{}] for ID [{}]", bytes_read, self.chunk_index, self.file_position, self.id);
39 self.file_position += bytes_read;
40 self.chunk_index += 1;
41 self.current_task = None;
42 Poll::Ready(Some(Ok(data))) } else {
44 info!("No more data at file position [{}] for ID [{}]", self.file_position, self.id);
45 debug!("End of stream A - closing channel");
46 self.receiver.close();
47 Poll::Ready(None) }
49 }
50 Err(e) => {
51 self.receiver.close();
52 debug!("Error getting chunk data: {:?}", e);
53 Poll::Ready(Some(Err(e))) },
55 },
56 Err(e) => {
57 self.receiver.close();
58 debug!("Error getting chunk data: {:?}", e);
59 Poll::Ready(Some(Err(Error::Generic(e.to_string())))) }
61 }
62 },
63 }
64 },
65 None => {
66 debug!("End of stream B - closing channel");
67 self.receiver.close();
68 Poll::Ready(None) }
70 }
71 }
72}
73
74impl Stream for ChunkReceiver {
75 type Item = Result<Bytes, Error>;
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 if self.current_task.is_none() {
78 match self.receiver.poll_recv(cx) {
79 Poll::Pending => {
80 debug!("Pending data in receiver");
81 Poll::Pending
82 },
83 Poll::Ready(maybe_join_handle) => {
84 self.current_task = maybe_join_handle;
85 self.poll_current_task(cx)
86 }
87 }
88 } else {
89 match self.poll_current_task(cx) {
90 Poll::Pending => {
91 debug!("Pending join handle finishing");
92 Poll::Pending
93 },
94 Poll::Ready(data) => {
95 debug!("Returning join handle result");
96 Poll::Ready(data)
97 }
98 }
99 }
100 }
101}