Skip to main content

chunk_streamer/
chunk_receiver.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use bytes::Bytes;
4use futures::Stream;
5use futures_util::{FutureExt};
6use log::{debug, info};
7use self_encryption::Error;
8use tokio::sync::mpsc::{Receiver};
9use tokio::task::{JoinHandle};
10
11pub struct ChunkReceiver {
12    receiver: Receiver<JoinHandle<Result<Bytes, Error>>>,
13    id: String,
14    file_position: usize,
15    chunk_index: i32,
16    current_task: Option<JoinHandle<Result<Bytes, Error>>>,
17}
18
19impl ChunkReceiver {
20    pub fn new(receiver: Receiver<JoinHandle<Result<Bytes, Error>>>, id: String) -> ChunkReceiver {
21        ChunkReceiver { receiver, id, file_position: 0, chunk_index: 1, current_task: None }
22    }
23
24    fn poll_current_task(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
25        match self.current_task.as_mut() {
26            Some(join_handle) => {
27                match join_handle.poll_unpin(cx) {
28                    Poll::Pending => {
29                        debug!("Join handle pending");
30                        Poll::Pending
31                    }
32                    Poll::Ready(result) => {
33                        match result {
34                            Ok(result) => match result {
35                                Ok(data) => {
36                                    let bytes_read = data.len();
37                                    if bytes_read > 0 {
38                                        info!("Read [{}] bytes from chunk [{}] at file position [{}] for ID [{}]", bytes_read, self.chunk_index, self.file_position, self.id);
39                                        self.file_position += bytes_read;
40                                        self.chunk_index += 1;
41                                        self.current_task = None;
42                                        Poll::Ready(Some(Ok(data))) // Sending data to the client here
43                                    } else {
44                                        info!("No more data at file position [{}] for ID [{}]", self.file_position, self.id);
45                                        debug!("End of stream A - closing channel");
46                                        self.receiver.close();
47                                        Poll::Ready(None) // end of stream - break
48                                    }
49                                }
50                                Err(e) => {
51                                    self.receiver.close();
52                                    debug!("Error getting chunk data: {:?}", e);
53                                    Poll::Ready(Some(Err(e))) // Sending error to the client here
54                                },
55                            },
56                            Err(e) => {
57                                self.receiver.close();
58                                debug!("Error getting chunk data: {:?}", e);
59                                Poll::Ready(Some(Err(Error::Generic(e.to_string())))) // Sending error to the client here
60                            }
61                        }
62                    },
63                }
64            },
65            None => {
66                debug!("End of stream B - closing channel");
67                self.receiver.close();
68                Poll::Ready(None) // end of stream - break
69            }
70        }
71    }
72}
73
74impl Stream for ChunkReceiver {
75    type Item = Result<Bytes, Error>;
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        if self.current_task.is_none() {
78            match self.receiver.poll_recv(cx) {
79                Poll::Pending => {
80                    debug!("Pending data in receiver");
81                    Poll::Pending
82                },
83                Poll::Ready(maybe_join_handle) => {
84                    self.current_task = maybe_join_handle;
85                    self.poll_current_task(cx)
86                }
87            }
88        } else {
89            match self.poll_current_task(cx) {
90                Poll::Pending => {
91                    debug!("Pending join handle finishing");
92                    Poll::Pending
93                },
94                Poll::Ready(data) => {
95                    debug!("Returning join handle result");
96                    Poll::Ready(data)
97                }
98            }
99        }
100    }
101}