chunk_streamer/
chunk_streamer.rs1use crate::chunk_receiver::ChunkReceiver;
2use crate::chunk_sender::ChunkSender;
3use async_trait::async_trait;
4use autonomi::client::GetError;
5use autonomi::{Chunk, ChunkAddress};
6use bytes::Bytes;
7use self_encryption::streaming_decrypt;
8use tokio::sync::mpsc::channel;
9use crate::chunk_getter::blocking_chunk_getter;
10use crate::data_map_builder::DataMapBuilder;
11
12#[async_trait]
13pub trait ChunkGetter: Clone + Send + Sync + 'static {
14 async fn chunk_get(&self, address: &ChunkAddress) -> Result<Chunk, GetError>;
15}
16
17pub struct ChunkStreamer<T> {
18 id: String,
19 data_map_chunk_bytes: Bytes,
20 chunk_getter: T,
21 download_threads: usize,
22}
23
24impl<T: ChunkGetter> ChunkStreamer<T> {
25 pub fn new(id: String, data_map_chunk_bytes: Bytes, chunk_getter: T, download_threads: usize) -> ChunkStreamer<T> {
26 ChunkStreamer { id, data_map_chunk_bytes, chunk_getter, download_threads }
27 }
28
29 pub async fn open(&self, range_from: u64, range_to: u64) -> Result<ChunkReceiver, GetError> {
30 let data_map_builder = DataMapBuilder::new(self.chunk_getter.clone(), self.download_threads);
31 match data_map_builder.get_data_map_from_bytes(&self.data_map_chunk_bytes).await {
32 Ok(data_map) => {
33 let (sender, receiver) = channel(self.download_threads);
34 let chunk_sender = ChunkSender::new(sender, self.id.clone(), self.chunk_getter.clone(), data_map);
35 tokio::spawn(Box::pin(async move { chunk_sender.send(range_from, range_to).await; }));
36 Ok(ChunkReceiver::new(receiver, self.id.clone()))
37 },
38 Err(error) => Err(error)
39 }
40 }
41
42 pub async fn get_stream_size(&self) -> usize {
43 let data_map_builder = DataMapBuilder::new(self.chunk_getter.clone(), self.download_threads);
44 let data_map = data_map_builder.get_data_map_from_bytes(&self.data_map_chunk_bytes).await.expect("failed to build data map from chunk");
45 let local_chunk_getter = self.chunk_getter.clone();
46
47 let join_handle = tokio::task::spawn_blocking(move || {
48 let get_chunk_functor = blocking_chunk_getter(local_chunk_getter);
49 let stream = streaming_decrypt(&data_map, get_chunk_functor)
50 .expect("failed to execute streaming_decrypt");
51 stream.file_size()
52 });
53 join_handle.await.unwrap_or(0)
54 }
55}