1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*#[double]
use crate::client::CachingClient;
#[double]
use crate::client::StreamingClient;
use crate::client::PublicDataCachingClient;
use crate::client::PUBLIC_ARCHIVE_CACHE_KEY;
use autonomi::client::payment::PaymentOption;
use autonomi::data::DataAddress;
use autonomi::files::archive_public::ArchiveAddress;
use autonomi::files::PublicArchive;
use bytes::Bytes;
use log::info;
use mockall::mock;
use mockall_double::double;
use crate::error::CreateError;
use crate::controller::StoreType;
use crate::error::public_archive_error::PublicArchiveError;
/// Client for storing and retrieving public archives.
///
/// It bundles the two lower-level clients that the methods on this type rely on:
/// the caching client (which exposes the hybrid cache) and the streaming client
/// (which downloads archive bytes).
#[derive(Debug, Clone)]
pub struct PublicArchiveCachingClient {
    /// Gives access to the hybrid cache used by `archive_get_public_raw`.
    caching_client: CachingClient,
    /// Downloads archive bytes inside the fetch closure of `archive_get_public_raw`.
    streaming_client: StreamingClient,
}
// Mock counterpart of `PublicArchiveCachingClient`, declared with mockall's `mock!` macro.
// The method signatures listed here match the inherent impl of the real type in this file;
// keep both in sync whenever a signature changes.
mock! {
#[derive(Debug)]
pub PublicArchiveCachingClient {
// Same constructor signature as the real `new`.
pub fn new(caching_client: CachingClient, streaming_client: StreamingClient) -> Self;
// Same signatures as the real archive put/get methods.
pub async fn archive_put_public(&self, archive: &PublicArchive, payment_option: PaymentOption, store_type: StoreType) -> Result<ArchiveAddress, PublicArchiveError>;
pub async fn archive_get_public(&self, archive_address: ArchiveAddress) -> Result<PublicArchive, PublicArchiveError>;
pub async fn archive_get_public_raw(&self, addr: &DataAddress) -> Result<Bytes, PublicArchiveError>;
}
// `Clone` is mocked too, since the real struct derives `Clone`.
impl Clone for PublicArchiveCachingClient {
fn clone(&self) -> Self;
}
}
impl PublicArchiveCachingClient {
/// Builds a client from the caching client and the streaming client it will use.
pub fn new(caching_client: CachingClient, streaming_client: StreamingClient) -> Self {
    Self {
        caching_client,
        streaming_client,
    }
}
/// Serializes `archive` and uploads the resulting bytes as public data.
///
/// The upload itself is delegated to a `PublicDataCachingClient` constructed from
/// clones of this client's caching and streaming clients.
///
/// # Errors
///
/// Returns `CreateError::Serialization` (converted into `PublicArchiveError`) when
/// `archive.to_bytes()` fails, or the converted error of `data_put_public` when the
/// upload fails.
pub async fn archive_put_public(
    &self,
    archive: &PublicArchive,
    payment_option: PaymentOption,
    store_type: StoreType,
) -> Result<ArchiveAddress, PublicArchiveError> {
    // Bail out early on a serialization failure so the upload path stays flat.
    // `{}` already formats via `Display`, so no explicit `to_string()` is needed.
    let bytes = archive.to_bytes().map_err(|e| -> PublicArchiveError {
        CreateError::Serialization(format!("Failed to serialize archive: {}", e)).into()
    })?;

    // todo: can this be injected?
    let public_data_caching_client = PublicDataCachingClient::new(
        self.caching_client.clone(),
        self.streaming_client.clone(),
    );
    Ok(public_data_caching_client
        .data_put_public(bytes, payment_option, store_type)
        .await?)
}
/// Fetches the raw bytes for `archive_address` via `archive_get_public_raw` and
/// deserializes them into a `PublicArchive`.
///
/// # Errors
///
/// Propagates any error from `archive_get_public_raw` unchanged. A failure of
/// `PublicArchive::from_bytes` is reported as `CreateError::Serialization`
/// (converted into `PublicArchiveError`).
pub async fn archive_get_public(
    &self,
    archive_address: ArchiveAddress,
) -> Result<PublicArchive, PublicArchiveError> {
    let bytes = self.archive_get_public_raw(&archive_address).await?;

    // `{}` already formats via `Display`, so no explicit `to_string()` is needed.
    PublicArchive::from_bytes(bytes).map_err(|e| -> PublicArchiveError {
        CreateError::Serialization(format!("Failed to deserialize archive: {}", e)).into()
    })
}
/// Returns the raw bytes stored for `addr`, going through the hybrid cache.
///
/// The cache key is `PUBLIC_ARCHIVE_CACHE_KEY` followed by the hex form of the address.
/// The fetch closure handed to `get_or_fetch` downloads the data through the streaming
/// client, passing `0` and `524288` (512 KiB) as the range arguments.
/// NOTE(review): presumably the closure only runs when the key is not cached yet —
/// confirm against the cache implementation.
///
/// # Errors
///
/// Any error returned by `get_or_fetch` (including a failed download inside the fetch
/// closure) is wrapped in `PublicArchiveError::GetError`.
pub async fn archive_get_public_raw(&self, addr: &DataAddress) -> Result<Bytes, PublicArchiveError> {
    // Owned copies are required because the fetch closure is `async move`.
    let streaming_client = self.streaming_client.clone();
    let address = addr.clone();
    let cache_key = format!("{}{}", PUBLIC_ARCHIVE_CACHE_KEY, address.to_hex());

    match self.caching_client.get_hybrid_cache().get_ref().get_or_fetch(&cache_key, || async move {
        // todo: optimise range_to to first chunk length (to avoid downloading other chunks when not needed)
        let download = streaming_client.download_stream(&address, 0, 524288).await;
        match download {
            Err(e) => Err(anyhow::anyhow!(format!("Failed to download stream for [{}] from network: {:?}", address.to_hex(), e))),
            Ok(bytes) => {
                info!("retrieved public data for [{}] from network - storing in hybrid cache", address.to_hex());
                Ok(Vec::from(bytes))
            }
        }
    }).await {
        Err(e) => Err(PublicArchiveError::GetError(e.into())),
        Ok(cache_entry) => {
            info!("retrieved public data for [{}] from hybrid cache", addr.to_hex());
            // Copy the cached value out so the caller owns its `Bytes`.
            Ok(Bytes::from(cache_entry.value().to_vec()))
        }
    }
}
}*/