chunk_streamer/
chunk_streamer.rs

1use crate::chunk_receiver::ChunkReceiver;
2use crate::chunk_sender::ChunkSender;
3use async_trait::async_trait;
4use autonomi::client::GetError;
5use autonomi::{Chunk, ChunkAddress};
6use self_encryption::{streaming_decrypt, DataMap};
7use tokio::sync::mpsc::channel;
8use crate::chunk_getter_wrapper::ChunkGetterWrapper;
9
10#[async_trait]
11pub trait ChunkGetter: Clone + Send + Sync + 'static {
12    async fn chunk_get(&self, address: &ChunkAddress) -> Result<Chunk, GetError>;
13}
14
15pub struct ChunkStreamer<T> {
16    id: String,
17    data_map: DataMap,
18    chunk_getter: T,
19    download_threads: usize,
20}
21
22impl<T: ChunkGetter> ChunkStreamer<T> {
23    pub fn new(id: String, data_map: DataMap, chunk_getter: T, download_threads: usize) -> ChunkStreamer<T> {
24        ChunkStreamer { id, data_map, chunk_getter, download_threads }
25    }
26    
27    pub fn open(&self, range_from: u64, range_to: u64) -> ChunkReceiver {
28        let (sender, receiver) = channel(self.download_threads);
29        let chunk_sender = ChunkSender::new(sender, self.id.clone(), self.chunk_getter.clone(), self.data_map.clone());
30        tokio::spawn( Box::pin(async move { chunk_sender.send(range_from, range_to).await; }));
31        ChunkReceiver::new(receiver, self.id.clone())
32    }
33
34    pub async fn get_stream_size(&self) -> usize {
35        let local_data_map = self.data_map.clone();
36        let local_chunk_getter = self.chunk_getter.clone();
37
38        let join_handle = tokio::task::spawn_blocking(move || {
39            let chunk_getter_wrapper = ChunkGetterWrapper::new(local_chunk_getter);
40            let get_chunk_functor = chunk_getter_wrapper.get_chunk_functor();
41            let stream = streaming_decrypt(&local_data_map, get_chunk_functor)
42                .expect("failed to execute streaming_decrypt");
43            stream.file_size()
44        });
45        join_handle.await.unwrap_or(0)
46    }
47}