chunk_streamer/
chunk_streamer.rs1use ant_core::data::{DataChunk, XorName};
2use crate::chunk_receiver::ChunkReceiver;
3use crate::chunk_sender::ChunkSender;
4use async_trait::async_trait;
5use ant_core::data::error::{Result};
6use bytes::Bytes;
7use log::warn;
8use self_encryption::{streaming_decrypt};
9use tokio::sync::mpsc::channel;
10use crate::chunk_getter::blocking_chunk_getter;
11use crate::data_map_builder::DataMapBuilder;
12
13#[async_trait]
14pub trait ChunkGetter: Clone + Send + Sync + 'static {
15 async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>>;
16}
17
18pub struct ChunkStreamer<T> {
19 id: String,
20 data_map_chunk_bytes: Bytes,
21 chunk_getter: T,
22 download_threads: usize,
23}
24
25impl<T: ChunkGetter> ChunkStreamer<T> {
26 pub fn new(id: String, data_map_chunk_bytes: Bytes, chunk_getter: T, download_threads: usize) -> ChunkStreamer<T> {
27 ChunkStreamer { id, data_map_chunk_bytes, chunk_getter, download_threads }
28 }
29
30 pub async fn open(&self, range_from: u64, range_to: u64) -> Result<ChunkReceiver> {
31 let data_map_builder = DataMapBuilder::new(self.chunk_getter.clone(), self.download_threads);
32 match data_map_builder.get_data_map_from_bytes(&self.data_map_chunk_bytes).await {
33 Ok(data_map) => {
34 let (sender, receiver) = channel(self.download_threads);
35 let chunk_sender = ChunkSender::new(sender, self.id.clone(), self.chunk_getter.clone(), data_map);
36 tokio::spawn(Box::pin(async move { chunk_sender.send(range_from, range_to).await }));
37 Ok(ChunkReceiver::new(receiver, self.id.clone()))
38 },
39 Err(_) => {
40 let (sender, receiver) = channel(self.download_threads);
42 let raw_chunk = self.data_map_chunk_bytes.clone();
43 let join_handle = tokio::task::spawn_blocking(move || { Ok(raw_chunk) });
44 tokio::spawn(Box::pin(async move { sender.send(join_handle).await }));
45 Ok(ChunkReceiver::new(receiver, self.id.clone()))
46 }
47 }
48 }
49
50 pub async fn get_stream_size(&self) -> usize {
51 let data_map_builder = DataMapBuilder::new(self.chunk_getter.clone(), self.download_threads);
52 let data_map = match data_map_builder.get_data_map_from_bytes(&self.data_map_chunk_bytes).await {
53 Ok(data_map) => data_map,
54 Err(e) => {
55 warn!("failed to build data map from chunk: {}", e);
56 return 0;
57 }
58 };
59 let local_chunk_getter = self.chunk_getter.clone();
60
61 let join_handle = tokio::task::spawn_blocking(move || {
62 let get_chunk_functor = blocking_chunk_getter(local_chunk_getter);
63 match streaming_decrypt(&data_map, get_chunk_functor) {
64 Ok(stream) => stream.file_size(),
65 Err(e) => {
66 warn!("failed to call streaming_decrypt: {}", e);
67 0
68 }
69 }
70 });
71 join_handle.await.unwrap_or(0)
72 }
73}