use crate::errors::Error;
use crate::Client;
use bincode::{deserialize, serialize};
use log::{info, trace};
use serde::{Deserialize, Serialize};
use crate::client::blob_storage::{BlobStorage, BlobStorageDryRun};
use self_encryption::{DataMap, SelfEncryptor};
use sn_data_types::{Blob, BlobAddress, PrivateBlob, PublicBlob};
use sn_messaging::{BlobRead, BlobWrite, DataCmd, DataQuery, Query, QueryResponse};
/// Tags a serialized `DataMap` with the level it was produced at, so that `unpack` knows
/// whether it has reached the final data map or has to follow another level.
#[derive(Serialize, Deserialize)]
enum DataMapLevel {
/// Data map returned by writing the caller's data; `create_new_blob` wraps it in this variant.
Root(DataMap),
/// Data map returned by writing a serialized blob that failed `validate_size` in `pack`;
/// `unpack` reads through it to recover that blob.
Child(DataMap),
}
impl Client {
/// Reads the data stored behind a blob address.
///
/// Fetches the blob at `address`, resolves it to the root data map via `unpack`, and reads
/// the requested range through the self-encryptor.
///
/// * `position` - offset to start reading from; `None` starts at 0.
/// * `len` - number of bytes to read; `None` uses the length reported by the self-encryptor.
///
/// # Errors
///
/// Returns the first error raised while fetching the blob, unpacking the data map, or
/// reading the content.
pub async fn read_blob(
    &self,
    address: BlobAddress,
    position: Option<u64>,
    len: Option<u64>,
) -> Result<Vec<u8>, Error>
where
    Self: Sized,
{
    trace!("Fetch Blob");

    let root_blob = self.fetch_blob_from_network(address).await?;
    // The public/private flag is taken from the requested address, not from the fetched blob.
    let is_public = address.is_pub();
    let root_map = self.unpack(root_blob).await?;

    self.read_using_data_map(root_map, is_public, position, len)
        .await
}
/// Stores `data` as a public blob and returns the address handed back by `create_new_blob`.
///
/// Thin wrapper: calls `create_new_blob` with `published = true`.
pub async fn store_public_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
self.create_new_blob(data, true).await
}
/// Stores `data` as a private blob and returns the address handed back by `create_new_blob`.
///
/// Thin wrapper: calls `create_new_blob` with `published = false`.
pub async fn store_private_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
self.create_new_blob(data, false).await
}
/// Writes `data` through the self-encryptor and stores a blob holding the resulting data map.
///
/// The data map is serialized as `DataMapLevel::Root`, packed into a blob that passes
/// `validate_size` (see `pack`), and that blob is stored on the network.
///
/// Returns the address of the stored data-map blob.
async fn create_new_blob(&self, data: &[u8], published: bool) -> Result<BlobAddress, Error> {
    // Write the content itself; what comes back is the map describing it.
    let root_map = self.write_to_network(data, published).await?;
    let serialized_root = serialize(&DataMapLevel::Root(root_map))?;

    // Wrap the serialized map in a blob and remember its address before the blob is moved.
    let map_blob = self.pack(serialized_root, published).await?;
    let map_address = *map_blob.address();

    self.store_blob_on_network(map_blob).await?;
    Ok(map_address)
}
pub(crate) async fn fetch_blob_from_network(
&self,
address: BlobAddress,
) -> Result<Blob, Error> {
let res = self
.send_query(Query::Data(DataQuery::Blob(BlobRead::Get(address))))
.await?;
let data: Blob = match res {
QueryResponse::GetBlob(res) => res.map_err(Error::from),
_ => return Err(Error::ReceivedUnexpectedEvent),
}?;
Ok(data)
}
pub(crate) async fn store_blob_on_network(&self, blob: Blob) -> Result<(), Error> {
if !blob.validate_size() {
return Err(Error::NetworkDataError(sn_data_types::Error::ExceededSize));
}
let cmd = DataCmd::Blob(BlobWrite::New(blob));
self.pay_and_send_data_command(cmd).await?;
Ok(())
}
/// Sends a `BlobWrite::DeletePrivate` command for `address`.
///
/// NOTE(review): the command variant is `DeletePrivate` regardless of the kind of `address`;
/// there is no local check that the address is private — confirm the network rejects
/// public addresses.
///
/// # Errors
///
/// Returns any error from `pay_and_send_data_command`.
pub async fn delete_blob(&self, address: BlobAddress) -> Result<(), Error> {
    info!("Deleting blob at given address: {:?}", address);

    self.pay_and_send_data_command(DataCmd::Blob(BlobWrite::DeletePrivate(address)))
        .await?;
    Ok(())
}
pub async fn generate_data_map(&self, the_blob: &Blob) -> Result<DataMap, Error> {
let blob_storage = BlobStorageDryRun::new(self.clone(), the_blob.is_pub());
let self_encryptor =
SelfEncryptor::new(blob_storage, DataMap::None).map_err(Error::SelfEncryption)?;
self_encryptor
.write(the_blob.value(), 0)
.await
.map_err(Error::SelfEncryption)?;
let (data_map, _) = self_encryptor
.close()
.await
.map_err(Error::SelfEncryption)?;
Ok(data_map)
}
async fn write_to_network(&self, data: &[u8], published: bool) -> Result<DataMap, Error> {
let blob_storage = BlobStorage::new(self.clone(), published);
let self_encryptor = SelfEncryptor::new(blob_storage.clone(), DataMap::None)
.map_err(Error::SelfEncryption)?;
self_encryptor
.write(data, 0)
.await
.map_err(Error::SelfEncryption)?;
let (data_map, _) = self_encryptor
.close()
.await
.map_err(Error::SelfEncryption)?;
Ok(data_map)
}
async fn read_using_data_map(
&self,
data_map: DataMap,
published: bool,
position: Option<u64>,
len: Option<u64>,
) -> Result<Vec<u8>, Error> {
let blob_storage = BlobStorage::new(self.clone(), published);
let self_encryptor =
SelfEncryptor::new(blob_storage, data_map).map_err(Error::SelfEncryption)?;
let length = match len {
None => self_encryptor.len().await,
Some(request_length) => request_length,
};
let read_position = position.unwrap_or(0);
match self_encryptor.read(read_position, length).await {
Ok(data) => Ok(data),
Err(error) => Err(Error::SelfEncryption(error)),
}
}
/// Wraps `contents` in a blob that passes `validate_size`.
///
/// Builds a public or private blob (private blobs are owned by this client's public key).
/// If the blob fails `validate_size`, the blob itself is serialized, written through
/// `write_to_network`, and the resulting data map — tagged `DataMapLevel::Child` — becomes
/// the contents for the next attempt. Repeats until a blob passes the size check.
async fn pack(&self, mut contents: Vec<u8>, published: bool) -> Result<Blob, Error> {
    loop {
        let candidate: Blob = if published {
            PublicBlob::new(contents).into()
        } else {
            let owner = self.public_key().await;
            PrivateBlob::new(contents, owner)?.into()
        };

        if candidate.validate_size() {
            return Ok(candidate);
        }

        // Too large: push the whole blob through another level and retry with its data map.
        let oversized = serialize(&candidate)?;
        let child_map = self.write_to_network(&oversized, published).await?;
        contents = serialize(&DataMapLevel::Child(child_map))?;
    }
}
/// Resolves a data-map blob to the root data map; the inverse of `pack`.
///
/// Deserializes the blob's value as a `DataMapLevel`. A `Root` map is returned directly.
/// A `Child` map is read in full through `read_using_data_map`, the bytes are deserialized
/// into the next blob, and the loop continues with that blob.
async fn unpack(&self, mut data: Blob) -> Result<DataMap, Error> {
    loop {
        let is_public = data.is_pub();

        let child_map = match deserialize(data.value())? {
            DataMapLevel::Root(root_map) => return Ok(root_map),
            DataMapLevel::Child(child_map) => child_map,
        };

        // One more level to peel: the child map points at a serialized blob.
        let next_blob_bytes = self
            .read_using_data_map(child_map, is_public, None, None)
            .await?;
        data = deserialize(&next_blob_bytes)?;
    }
}
}
#[allow(missing_docs)]
#[cfg(any(test, feature = "simulated-payouts"))]
pub mod exported_tests {
use super::{Blob, BlobAddress, Client, Error};
use crate::utils::{generate_random_vector, test_utils::gen_bls_keypair};
use anyhow::{bail, Result};
use sn_data_types::{Money, PrivateBlob, PublicBlob};
use sn_messaging::Error as ErrorMessage;
use std::str::FromStr;
use unwrap::unwrap;
/// Stores a small public blob and checks it can be read back unchanged.
///
/// Before storing, a read at the address of a locally built `PublicBlob` over the same
/// bytes must fail with `NoSuchData`.
pub async fn pub_blob_test() -> Result<()> {
    let client = Client::new(None, None).await?;
    let _start_bal = unwrap!(Money::from_str("10"));

    let value = generate_random_vector::<u8>(10);
    let data = Blob::Public(PublicBlob::new(value.clone()));
    let address = *data.address();
    let _pk = gen_bls_keypair().public_key();

    // Nothing has been stored yet, so the only acceptable outcome is `NoSuchData`.
    match client.read_blob(address, None, None).await {
        Ok(data) => bail!("Pub blob should not exist yet: {:?}", data),
        Err(Error::ErrorMessage(ErrorMessage::NoSuchData)) => (),
        Err(e) => bail!("Unexpected: {:?}", e),
    }

    let address = client.store_public_blob(&value).await?;

    // Keep reading until the stored blob becomes readable.
    let fetched = loop {
        if let Ok(bytes) = client.read_blob(address, None, None).await {
            break bytes;
        }
    };

    assert_eq!(value, fetched);
    Ok(())
}
/// Exercises the private-blob lifecycle: store, duplicate store, coexistence with a public
/// blob over the same bytes, delete, and store again.
pub async fn unpub_blob_test() -> Result<()> {
let client = Client::new(None, None).await?;
let pk = client.public_key().await;
let value = generate_random_vector::<u8>(10);
// Build the blob locally only to obtain an address to probe before anything is stored.
let data = Blob::Private(PrivateBlob::new(value.clone(), pk)?);
let address = *data.address();
let res = client
.read_blob(address, None, None)
.await;
// The read must fail with `NoSuchData`; success or any other error aborts the test.
match res {
Ok(_) => bail!("Private blob should not exist yet"),
Err(Error::ErrorMessage(ErrorMessage::NoSuchData)) => (),
Err(e) => bail!("Unexpected: {:?}", e),
}
let address = client.store_private_blob(&value).await?;
// Retry the read until it succeeds (no upper bound on attempts).
let mut res = client.read_blob(address, None, None).await;
while res.is_err() {
res = client.read_blob(address, None, None).await;
}
// Store the same bytes a second time; the direct result is ignored and the outcome is
// observed through the client's notification receiver instead.
let _ = client.store_private_blob(&value).await;
// Wait up to 60 seconds for the next notification, which must be a `DataExists` error.
let _ = match tokio::time::timeout(
std::time::Duration::from_secs(60),
client.notification_receiver.clone().lock().await.recv(),
)
.await
{
// Expected outcome: nothing to do.
Ok(Some(Error::ErrorMessage(ErrorMessage::DataExists))) => {
}
Ok(Some(error)) => bail!("Expecting DataExists error got: {:?}", error),
Ok(None) => bail!("Expecting DataExists Error, got None"),
Err(_) => bail!("Timeout when expecting DataExists error"),
};
// The same bytes stored as a public blob must succeed and become readable.
let pub_address = client.store_public_blob(&value).await?;
let mut fetched_data = client.read_blob(pub_address, None, None).await;
while fetched_data.is_err() {
fetched_data = client.read_blob(pub_address, None, None).await;
}
// Delete the private blob, then keep reading until reads at its address start failing.
client.delete_blob(address).await?;
let mut fetched_data = client.read_blob(address, None, None).await;
while fetched_data.is_ok() {
fetched_data = client.read_blob(address, None, None).await;
}
// After the deletion, storing the same private data again must succeed.
let _ = client.store_private_blob(&value).await?;
Ok(())
}
/// Checks that deleting a private blob changes the client's balance.
///
/// Asserts that the balance before the delete is non-zero and differs from the balance
/// read after the delete.
pub async fn blob_deletions_should_cost_put_price() -> Result<()> {
    let client = Client::new(None, None).await?;

    let payload = generate_random_vector::<u8>(10);
    let address = client.store_private_blob(&payload).await?;

    let balance_before_delete = client.get_balance().await?;
    client.delete_blob(address).await?;
    let balance_after_delete = client.get_balance().await?;

    let zero = Money::from_str("0")?;
    assert_ne!(balance_before_delete, zero);
    assert_ne!(balance_before_delete, balance_after_delete);
    Ok(())
}
/// Round-trips 1024 random bytes as a public blob via `gen_data_then_create_and_retrieve`.
pub async fn create_and_retrieve_1kb_pub_unencrypted() -> Result<()> {
    const ONE_KB: usize = 1024;
    gen_data_then_create_and_retrieve(ONE_KB, true).await
}
/// Round-trips 1024 random bytes as a private blob via `gen_data_then_create_and_retrieve`.
pub async fn create_and_retrieve_1kb_private_unencrypted() -> Result<()> {
    const ONE_KB: usize = 1024;
    gen_data_then_create_and_retrieve(ONE_KB, false).await
}
/// Stores 1024 random bytes as a public blob and checks that reading the same name
/// through a private address fails.
pub async fn create_and_retrieve_1kb_put_pub_retrieve_private() -> Result<()> {
    let payload = generate_random_vector(1024);
    let client = Client::new(None, None).await?;
    let public_address = client.store_public_blob(&payload).await?;

    // Same name, but addressed as if the blob were private.
    let private_address = BlobAddress::Private(*public_address.name());
    let outcome = client.read_blob(private_address, None, None).await;

    assert!(outcome.is_err());
    Ok(())
}
/// Stores 1024 random bytes as a private blob and checks that reading the same name
/// through a public address fails.
pub async fn create_and_retrieve_1kb_put_private_retrieve_pub() -> Result<()> {
    let payload = generate_random_vector(1024);
    let client = Client::new(None, None).await?;
    let private_address = client.store_private_blob(&payload).await?;

    // Same name, but addressed as if the blob were public.
    let public_address = BlobAddress::Public(*private_address.name());
    let outcome = client.read_blob(public_address, None, None).await;

    assert!(outcome.is_err());
    Ok(())
}
/// Round-trips 10 MiB of random bytes as a private blob via
/// `gen_data_then_create_and_retrieve`.
pub async fn create_and_retrieve_10mb_private() -> Result<()> {
    const TEN_MB: usize = 1024 * 1024 * 10;
    gen_data_then_create_and_retrieve(TEN_MB, false).await
}
/// Round-trips 10 MiB of random bytes as a public blob via
/// `gen_data_then_create_and_retrieve`.
pub async fn create_and_retrieve_10mb_public() -> Result<()> {
    const TEN_MB: usize = 1024 * 1024 * 10;
    gen_data_then_create_and_retrieve(TEN_MB, true).await
}
/// Runs the position/length read checks of `create_and_index_based_retrieve` on 1024 bytes.
pub async fn create_and_retrieve_index_based() -> Result<()> {
create_and_index_based_retrieve(1024).await
}
/// Checks partial reads of a public blob of `size` random bytes.
///
/// Two independent rounds, each with a fresh client and fresh data:
/// 1. read with no position and a length of half the size; expect the first half;
/// 2. read from the midpoint with a length of half the size; expect the second half.
async fn create_and_index_based_retrieve(size: usize) -> Result<()> {
    let midpoint = size / 2;
    let half_len = size as u64 / 2;

    // Round 1: the first half, addressed by length only.
    let data = generate_random_vector(size);
    {
        let client = Client::new(None, None).await?;
        let address = client.store_public_blob(&data).await?;

        // Retry until the read succeeds.
        let fetched_data = loop {
            if let Ok(bytes) = client.read_blob(address, None, Some(half_len)).await {
                break bytes;
            }
        };
        assert_eq!(fetched_data, data[0..midpoint].to_vec());
    }

    // Round 2: the second half, addressed by position and length.
    let data = generate_random_vector(size);
    {
        let client = Client::new(None, None).await?;
        let address = client.store_public_blob(&data).await?;

        // Retry until the read succeeds.
        let fetched_data = loop {
            let attempt = client
                .read_blob(address, Some(half_len), Some(half_len))
                .await;
            if let Ok(bytes) = attempt {
                break bytes;
            }
        };
        assert_eq!(fetched_data, data[midpoint..size].to_vec());
    }

    Ok(())
}
#[allow(clippy::match_wild_err_arm)]
async fn gen_data_then_create_and_retrieve(size: usize, publish: bool) -> Result<()> {
let raw_data = generate_random_vector(size);
let client = Client::new(None, None).await?;
let blob = if publish {
Blob::Public(PublicBlob::new(raw_data.clone()))
} else {
Blob::Private(PrivateBlob::new(
raw_data.clone(),
client.public_key().await,
)?)
};
let address_before = blob.address();
let res = client.read_blob(*address_before, None, None).await;
match res {
Err(Error::ErrorMessage(ErrorMessage::NoSuchData)) => (),
Ok(_) => bail!("Blob unexpectedly retrieved using address generated by gen_data_map"),
Err(_) => bail!(
"Unexpected error when Blob retrieved using address generated by gen_data_map"
),
};
let address = if publish {
client.store_public_blob(&raw_data).await?
} else {
client.store_private_blob(&raw_data).await?
};
let mut fetch_result;
fetch_result = client.read_blob(address, None, None).await;
while fetch_result.is_err() {
fetch_result = client.read_blob(address, None, None).await;
}
assert_eq!(fetch_result?, raw_data);
Ok(())
}
}
// Test harness module: each `#[tokio::test]` below forwards to the function of the same name
// in `exported_tests`. Compiled only for test builds with the `simulated-payouts` feature.
#[allow(missing_docs)]
#[cfg(all(test, feature = "simulated-payouts"))]
mod tests {
use super::exported_tests;
use anyhow::Result;
// Public blob store / read round trip.
#[tokio::test]
async fn pub_blob_test() -> Result<()> {
exported_tests::pub_blob_test().await
}
// Private blob lifecycle: store, duplicate store, delete, store again.
#[tokio::test]
async fn unpub_blob_test() -> Result<()> {
exported_tests::unpub_blob_test().await
}
// Deleting a private blob must change the balance.
#[tokio::test]
async fn blob_deletions_should_cost_put_price() -> Result<()> {
exported_tests::blob_deletions_should_cost_put_price().await
}
// 1024-byte round trips, public and private.
#[tokio::test]
async fn create_and_retrieve_1kb_pub_unencrypted() -> Result<()> {
exported_tests::create_and_retrieve_1kb_pub_unencrypted().await
}
#[tokio::test]
async fn create_and_retrieve_1kb_private_unencrypted() -> Result<()> {
exported_tests::create_and_retrieve_1kb_private_unencrypted().await
}
// Reading with the wrong address kind must fail.
#[tokio::test]
async fn create_and_retrieve_1kb_put_pub_retrieve_private() -> Result<()> {
exported_tests::create_and_retrieve_1kb_put_pub_retrieve_private().await
}
#[tokio::test]
async fn create_and_retrieve_1kb_put_private_retrieve_pub() -> Result<()> {
exported_tests::create_and_retrieve_1kb_put_private_retrieve_pub().await
}
// 10 MiB round trips, private and public.
#[tokio::test]
async fn create_and_retrieve_10mb_private() -> Result<()> {
exported_tests::create_and_retrieve_10mb_private().await
}
#[tokio::test]
async fn create_and_retrieve_10mb_public() -> Result<()> {
exported_tests::create_and_retrieve_10mb_public().await
}
// Partial reads by position and length.
#[tokio::test]
async fn create_and_retrieve_index_based() -> Result<()> {
exported_tests::create_and_retrieve_index_based().await
}
}