chunk_streamer/
chunk_receiver.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use bytes::Bytes;
4use futures::Stream;
5use futures_util::{FutureExt};
6use log::{debug, info};
7use self_encryption::Error;
8use tokio::sync::mpsc::{Receiver};
9use tokio::task::{JoinHandle};
10
11pub struct ChunkReceiver {
12    receiver: Receiver<JoinHandle<Result<Bytes, Error>>>,
13    id: String,
14    file_position: usize,
15    chunk_index: i32,
16    current_task: Option<JoinHandle<Result<Bytes, Error>>>,
17}
18
19impl ChunkReceiver {
20    pub fn new(receiver: Receiver<JoinHandle<Result<Bytes, Error>>>, id: String) -> ChunkReceiver {
21        ChunkReceiver { receiver, id, file_position: 0, chunk_index: 1, current_task: None }
22    }
23
24    fn poll_current_task(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
25        match self.current_task.as_mut() {
26            Some(join_handle) => {
27                match join_handle.poll_unpin(cx) {
28                    Poll::Pending => {
29                        debug!("Join handle pending");
30                        Poll::Pending
31                    }
32                    Poll::Ready(result) => {
33                        let data = match result {
34                            Ok(result) => match result {
35                                Ok(bytes) => bytes,
36                                Err(e) => {
37                                    self.receiver.close();
38                                    panic!("Error getting chunk data: {:?}", e)
39                                },
40                            },
41                            Err(e) => {
42                                self.receiver.close();
43                                panic!("Error getting chunk data: {:?}", e)
44                            }
45                        };
46                        let bytes_read = data.len();
47                        if bytes_read > 0 {
48                            info!("Read [{}] bytes from chunk [{}] at file position [{}] for ID [{}]", bytes_read, self.chunk_index, self.file_position, self.id);
49                            self.file_position += bytes_read;
50                            self.chunk_index += 1;
51                            self.current_task = None;
52                            Poll::Ready(Some(Ok(data))) // Sending data to the client here
53                        } else {
54                            info!("No more data at file position [{}] for ID [{}]", self.file_position, self.id);
55                            debug!("End of stream A - closing channel");
56                            self.receiver.close();
57                            Poll::Ready(None) // end of stream - break
58                        }
59                    },
60                }
61            },
62            None => {
63                debug!("End of stream B - closing channel");
64                self.receiver.close();
65                Poll::Ready(None) // end of stream - break
66            }
67        }
68    }
69}
70
71impl Stream for ChunkReceiver {
72    type Item = Result<Bytes, Error>;
73    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74        if self.current_task.is_none() {
75            match self.receiver.poll_recv(cx) {
76                Poll::Pending => {
77                    debug!("Pending data in receiver");
78                    Poll::Pending
79                },
80                Poll::Ready(maybe_join_handle) => {
81                    self.current_task = maybe_join_handle;
82                    self.poll_current_task(cx)
83                }
84            }
85        } else {
86            match self.poll_current_task(cx) {
87                Poll::Pending => {
88                    debug!("Pending join handle finishing");
89                    Poll::Pending
90                },
91                Poll::Ready(data) => {
92                    debug!("Returning join handle result");
93                    Poll::Ready(data)
94                }
95            }
96        }
97    }
98}