use super::{
blob_storage::{BlobStorage, BlobStorageDryRun},
Client,
};
use crate::Error;
use bincode::{deserialize, serialize};
use log::{info, trace};
use self_encryption::{DataMap, SelfEncryptor};
use serde::{Deserialize, Serialize};
use sn_data_types::{Blob, BlobAddress, PrivateBlob, PublicBlob, PublicKey};
use sn_messaging::client::{BlobRead, BlobWrite, DataCmd, DataQuery, Query, QueryResponse};
/// Tags the data map that is serialised into a Blob's payload, so that a reader
/// knows whether it has reached the caller's content or an intermediate layer.
#[derive(Serialize, Deserialize)]
enum DataMapLevel {
/// Data map produced by self-encrypting the caller's original content.
Root(DataMap),
/// Data map produced by self-encrypting a serialised Blob that was too large
/// to be stored directly; reading it back yields another serialised Blob.
Child(DataMap),
}
impl Client {
/// Reads the content referenced by the Blob stored at `address`.
///
/// The Blob is fetched from the network, unpacked down to the root data map,
/// and the content is then read through that data map.
///
/// * `position` - offset to start reading from; `None` starts at the beginning.
/// * `len` - number of bytes requested; `None` lets the length be derived from
///   the size of the stored content.
///
/// # Errors
///
/// Returns an error if the Blob cannot be fetched, if its payload cannot be
/// deserialised, or if reading through the data map fails.
pub async fn read_blob(
    &self,
    address: BlobAddress,
    position: Option<usize>,
    len: Option<usize>,
) -> Result<Vec<u8>, Error>
where
    Self: Sized,
{
    trace!(
        "Fetch Blob: {:?} Position: {:?} Len: {:?}",
        &address,
        &position,
        &len
    );

    let is_public = address.is_public();
    let head_blob = self.fetch_blob_from_network(address).await?;
    let root_data_map = self.unpack(head_blob).await?;

    self.read_using_data_map(root_data_map, is_public, position, len)
        .await
}
/// Stores `data` on the network as a public Blob and returns the address of
/// the Blob that was written.
///
/// # Errors
///
/// Propagates any error raised by `create_new_blob`.
pub async fn store_public_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
    let public = true;
    self.create_new_blob(data, public).await
}
/// Stores `data` on the network as a private Blob and returns the address of
/// the Blob that was written.
///
/// # Errors
///
/// Propagates any error raised by `create_new_blob`.
pub async fn store_private_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
    let public = false;
    self.create_new_blob(data, public).await
}
/// Self-encrypts `data` onto the network, wraps the resulting data map as a
/// `DataMapLevel::Root`, packs it into a Blob of valid size and stores that
/// Blob. Returns the address of the stored Blob.
async fn create_new_blob(&self, data: &[u8], public: bool) -> Result<BlobAddress, Error> {
    let root_level = DataMapLevel::Root(self.write_to_network(data, public).await?);
    let head_blob = self.pack(serialize(&root_level)?, public).await?;

    // Copy the address out first: storing the Blob consumes it.
    let address = *head_blob.address();
    self.store_blob_on_network(head_blob)
        .await
        .map(|()| address)
}
pub(crate) async fn fetch_blob_from_network(
&self,
address: BlobAddress,
) -> Result<Blob, Error> {
let res = self
.send_query(Query::Data(DataQuery::Blob(BlobRead::Get(address))))
.await?;
let msg_id = res.msg_id;
let data: Blob = match res.response {
QueryResponse::GetBlob(result) => result.map_err(|err| Error::from((err, msg_id))),
_ => return Err(Error::ReceivedUnexpectedEvent),
}?;
Ok(data)
}
/// Issues a `BlobWrite::DeletePrivate` command for the Blob at `address`
/// through `pay_and_send_data_command`.
pub(crate) async fn delete_blob_from_network(&self, address: BlobAddress) -> Result<(), Error> {
    self.pay_and_send_data_command(DataCmd::Blob(BlobWrite::DeletePrivate(address)))
        .await?;
    Ok(())
}
pub(crate) async fn store_blob_on_network(&self, blob: Blob) -> Result<(), Error> {
if !blob.validate_size() {
return Err(Error::NetworkDataError(sn_data_types::Error::ExceededSize));
}
let cmd = DataCmd::Blob(BlobWrite::New(blob));
self.pay_and_send_data_command(cmd).await?;
Ok(())
}
pub async fn delete_blob(&self, address: BlobAddress) -> Result<(), Error> {
info!("Deleting blob at given address: {:?}", address);
let mut data = self.fetch_blob_from_network(address).await?;
self.delete_blob_from_network(address).await?;
loop {
match deserialize(data.value())? {
DataMapLevel::Root(data_map) => {
self.delete_using_data_map(data_map).await?;
return Ok(());
}
DataMapLevel::Child(data_map) => {
let serialized_blob = self
.read_using_data_map(data_map.clone(), false, None, None)
.await?;
self.delete_using_data_map(data_map).await?;
data = deserialize(&serialized_blob)?;
}
}
}
}
/// Computes, without a `Client`, the data map and the address of the head Blob
/// that storing `data` would yield.
///
/// Self-encryption runs against a `BlobStorageDryRun`. The same layering as the
/// real store path is applied: the first data map is tagged `Root`; while the
/// resulting Blob fails size validation it is serialised and self-encrypted
/// again, with every later data map tagged `Child`.
///
/// * `privately_owned` - `Some(owner)` builds private Blobs owned by `owner`,
///   `None` builds public Blobs.
///
/// Returns the data map of the last self-encryption pass and the address of
/// the Blob that wraps it.
///
/// # Errors
///
/// Returns `Error::SelfEncryption` if self-encryption fails, or a
/// serialisation error if a data map or Blob cannot be serialised.
pub async fn blob_data_map(
    mut data: Vec<u8>,
    privately_owned: Option<PublicKey>,
) -> Result<(DataMap, BlobAddress), Error> {
    let mut is_original_data = true;

    loop {
        let self_encryptor =
            SelfEncryptor::new(BlobStorageDryRun::new(privately_owned), DataMap::None)
                .map_err(Error::SelfEncryption)?;
        self_encryptor
            .write(&data, 0)
            .await
            .map_err(Error::SelfEncryption)?;
        let (data_map, _) = self_encryptor
            .close()
            .await
            .map_err(Error::SelfEncryption)?;

        // Only the very first pass works on the caller's own bytes.
        let level = if is_original_data {
            DataMapLevel::Root(data_map.clone())
        } else {
            DataMapLevel::Child(data_map.clone())
        };
        is_original_data = false;
        let blob_content = serialize(&level)?;

        let blob: Blob = match privately_owned {
            Some(owner) => PrivateBlob::new(blob_content, owner).into(),
            None => PublicBlob::new(blob_content).into(),
        };

        if blob.validate_size() {
            return Ok((data_map, *blob.address()));
        }

        // Too large to be a single Blob: go round again on its serialised form.
        data = serialize(&blob)?;
    }
}
async fn write_to_network(&self, data: &[u8], public: bool) -> Result<DataMap, Error> {
let blob_storage = BlobStorage::new(self.clone(), public);
let self_encryptor = SelfEncryptor::new(blob_storage.clone(), DataMap::None)
.map_err(Error::SelfEncryption)?;
self_encryptor
.write(data, 0)
.await
.map_err(Error::SelfEncryption)?;
let (data_map, _) = self_encryptor
.close()
.await
.map_err(Error::SelfEncryption)?;
Ok(data_map)
}
async fn read_using_data_map(
&self,
data_map: DataMap,
public: bool,
position: Option<usize>,
len: Option<usize>,
) -> Result<Vec<u8>, Error> {
let blob_storage = BlobStorage::new(self.clone(), public);
let self_encryptor =
SelfEncryptor::new(blob_storage, data_map).map_err(Error::SelfEncryption)?;
let length = match len {
None => self_encryptor.len().await,
Some(request_length) => request_length,
};
let read_position = position.unwrap_or(0);
match self_encryptor.read(read_position, length).await {
Ok(data) => Ok(data),
Err(error) => Err(Error::SelfEncryption(error)),
}
}
async fn delete_using_data_map(&self, data_map: DataMap) -> Result<(), Error> {
let blob_storage = BlobStorage::new(self.clone(), false);
let self_encryptor =
SelfEncryptor::new(blob_storage, data_map).map_err(Error::SelfEncryption)?;
match self_encryptor.delete().await {
Ok(_) => Ok(()),
Err(error) => Err(Error::SelfEncryption(error)),
}
}
/// Wraps `contents` in a Blob, adding indirection levels until the Blob passes
/// size validation.
///
/// While the candidate Blob is too large it is serialised and self-encrypted
/// onto the network, and the resulting data map, tagged `DataMapLevel::Child`,
/// becomes the contents of the next candidate.
///
/// * `public` - selects `PublicBlob` or `PrivateBlob` (owned by this client's
///   public key).
async fn pack(&self, mut contents: Vec<u8>, public: bool) -> Result<Blob, Error> {
    loop {
        let candidate: Blob = if public {
            PublicBlob::new(contents).into()
        } else {
            PrivateBlob::new(contents, self.public_key().await).into()
        };

        if candidate.validate_size() {
            return Ok(candidate);
        }

        // Oversized: push the serialised Blob to the network as chunks and
        // retry with the much smaller data map that describes them.
        let oversized = serialize(&candidate)?;
        let child_map = self.write_to_network(&oversized, public).await?;
        contents = serialize(&DataMapLevel::Child(child_map))?;
    }
}
/// Follows the `DataMapLevel::Child` indirections held in `data` until a
/// `DataMapLevel::Root` is found, and returns that root data map.
///
/// Each `Child` data map is read in full from the network and the bytes are
/// deserialised into the next Blob to inspect.
///
/// # Errors
///
/// Returns an error if a payload cannot be deserialised or if reading a child
/// level from the network fails.
async fn unpack(&self, mut data: Blob) -> Result<DataMap, Error> {
    loop {
        let is_public = data.is_public();
        let child_map = match deserialize(data.value())? {
            DataMapLevel::Root(root_map) => return Ok(root_map),
            DataMapLevel::Child(child_map) => child_map,
        };

        let nested_bytes = self
            .read_using_data_map(child_map, is_public, None, None)
            .await?;
        data = deserialize(&nested_bytes)?;
    }
}
}
#[cfg(test)]
mod tests {
use super::{Blob, BlobAddress, Client, DataMap, DataMapLevel, Error};
use crate::utils::{generate_random_vector, test_utils::create_test_client};
use crate::{client::blob_storage::BlobStorage, retry_loop};
use anyhow::{anyhow, bail, Result};
use bincode::deserialize;
use self_encryption::Storage;
use sn_data_types::{PrivateBlob, PublicBlob, Token};
use sn_messaging::client::Error as ErrorMessage;
use std::str::FromStr;
// Public Blob round trip: the address predicted by `blob_data_map` is empty
// before the store, matches the stored address, yields the stored bytes, and
// storing the same bytes a second time returns the same address.
#[tokio::test]
pub async fn public_blob_test() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector::<u8>(10);
    let (_, expected_address) = Client::blob_data_map(value.clone(), None).await?;

    // Nothing has been stored yet, so the read must report DataNotFound.
    match client.read_blob(expected_address, None, None).await {
        Err(Error::ErrorMessage {
            source: ErrorMessage::DataNotFound(_),
            ..
        }) => {}
        Err(e) => bail!("Unexpected: {:?}", e),
        Ok(data) => bail!("Public Blob should not exist yet: {:?}", data),
    }

    let pub_address = client.store_public_blob(&value).await?;
    assert_eq!(expected_address, pub_address);

    let fetched_data = retry_loop!(client.read_blob(pub_address, None, None));
    assert_eq!(value, fetched_data);

    // Storing identical content again must resolve to the same address.
    let addr = client.store_public_blob(&value).await?;
    assert_eq!(addr, pub_address);

    Ok(())
}
// Private Blob life cycle: predicted address, store, read back, store the same
// bytes publicly, delete the private copy, then store it again at the same
// address.
#[tokio::test]
pub async fn private_blob_test() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector::<u8>(10);
    let owner = client.public_key().await;
    let (_, expected_address) = Client::blob_data_map(value.clone(), Some(owner)).await?;

    // Nothing has been stored yet, so the read must report DataNotFound.
    match client.read_blob(expected_address, None, None).await {
        Err(Error::ErrorMessage {
            source: ErrorMessage::DataNotFound(_),
            ..
        }) => {}
        Err(e) => bail!("Expecting DataNotFound error, got: {:?}", e),
        Ok(_) => bail!("Private Blob should not exist yet"),
    }

    let priv_address = client.store_private_blob(&value).await?;
    assert_eq!(expected_address, priv_address);

    let fetched_data = retry_loop!(client.read_blob(priv_address, None, None));
    assert_eq!(value, fetched_data);

    // Storing identical content again must resolve to the same address.
    let addr = client.store_private_blob(&value).await?;
    assert_eq!(addr, priv_address);

    // The same bytes can also be stored and read back as a public Blob.
    let pub_address = client.store_public_blob(&value).await?;
    let fetched_data = retry_loop!(client.read_blob(pub_address, None, None));
    assert_eq!(value, fetched_data);

    client.delete_blob(priv_address).await?;

    // Poll until the private Blob can no longer be read.
    let mut attempts_left = 3u8;
    while client.read_blob(priv_address, None, None).await.is_ok() {
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        if attempts_left == 0 {
            bail!("The private Blob was not deleted: {:?}", priv_address);
        }
        attempts_left -= 1;
    }

    // After deletion the same content can be stored again at the same address.
    let new_addr = client.store_private_blob(&value).await?;
    assert_eq!(new_addr, priv_address);

    let fetched_data = retry_loop!(client.read_blob(priv_address, None, None));
    assert_eq!(value, fetched_data);

    Ok(())
}
// Deleting a 1 MiB private Blob must also remove every chunk listed in its
// root data map.
#[tokio::test]
pub async fn private_delete_large() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector::<u8>(1024 * 1024);
    let address = client.store_private_blob(&value).await?;

    // Wait until the content is readable before inspecting the head Blob.
    let _ = retry_loop!(client.read_blob(address, None, None));

    let head_blob = retry_loop!(client.fetch_blob_from_network(address));
    let root_data_map = match deserialize(head_blob.value())? {
        DataMapLevel::Root(data_map) => data_map,
        DataMapLevel::Child(data_map) => bail!(
            "A DataMapLevel::Child data-map was unexpectedly returned: {:?}",
            data_map
        ),
    };

    client.delete_blob(address).await?;

    let mut blob_storage = BlobStorage::new(client, false);
    let chunks = match root_data_map {
        DataMap::Chunks(chunks) => chunks,
        other => {
            return Err(anyhow!(
                "It didn't return DataMap::Chunks, instead: {:?}",
                other
            ))
        }
    };

    // Poll each chunk until it can no longer be fetched.
    for chunk in chunks {
        let mut attempts_left = 3u8;
        while blob_storage.get(&chunk.hash).await.is_ok() {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            if attempts_left == 0 {
                bail!(
                    "At least one of the chunks of the private Blob was not deleted: {:?}",
                    chunk.hash
                );
            }
            attempts_left -= 1;
        }
    }

    Ok(())
}
// Deleting a private Blob must change the client's balance.
#[tokio::test]
pub async fn blob_deletions_should_cost_put_price() -> Result<()> {
    let client = create_test_client().await?;

    let payload = generate_random_vector::<u8>(10);
    let address = client.store_private_blob(&payload).await?;

    // Make sure the Blob is readable before it is deleted.
    let _ = retry_loop!(client.read_blob(address, None, None));

    let balance_before_delete = client.get_balance().await?;
    client.delete_blob(address).await?;
    let new_balance = client.get_balance().await?;

    // There has to be a non-zero balance to start from, and it has to move.
    assert_ne!(balance_before_delete, Token::from_str("0")?);
    assert_ne!(balance_before_delete, new_balance);

    Ok(())
}
// Round trip of 1 KiB of random data stored as a public Blob.
#[tokio::test]
pub async fn create_and_retrieve_1kb_pub_unencrypted() -> Result<()> {
    gen_data_then_create_and_retrieve(1024, true).await
}
// Round trip of 1 KiB of random data stored as a private Blob.
#[tokio::test]
pub async fn create_and_retrieve_1kb_private_unencrypted() -> Result<()> {
    gen_data_then_create_and_retrieve(1024, false).await
}
// Data stored publicly must not be readable through a private address that
// carries the same name.
#[tokio::test]
pub async fn create_and_retrieve_1kb_put_pub_retrieve_private() -> Result<()> {
    let client = create_test_client().await?;
    let data = generate_random_vector(1024);

    let address = client.store_public_blob(&data).await?;
    // Wait until the public Blob is readable.
    let _ = retry_loop!(client.read_blob(address, None, None));

    let private_address = BlobAddress::Private(*address.name());
    let res = client.read_blob(private_address, None, None).await;
    assert!(res.is_err());

    Ok(())
}
// Data stored privately must not be readable through a public address that
// carries the same name.
#[tokio::test]
pub async fn create_and_retrieve_1kb_put_private_retrieve_pub() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector(1024);

    let address = client.store_private_blob(&value).await?;
    // Wait until the private Blob is readable.
    let _ = retry_loop!(client.read_blob(address, None, None));

    let public_address = BlobAddress::Public(*address.name());
    let res = client.read_blob(public_address, None, None).await;
    assert!(res.is_err());

    Ok(())
}
// Round trip of 10 MiB of random data stored as a private Blob.
#[tokio::test]
pub async fn create_and_retrieve_10mb_private() -> Result<()> {
    gen_data_then_create_and_retrieve(10 * 1024 * 1024, false).await
}
// Round trip of 10 MiB of random data stored as a public Blob.
#[tokio::test]
pub async fn create_and_retrieve_10mb_public() -> Result<()> {
    gen_data_then_create_and_retrieve(10 * 1024 * 1024, true).await
}
// Partial reads (by position and length) on 1 KiB of public data.
#[tokio::test]
pub async fn create_and_retrieve_index_based() -> Result<()> {
    let size = 1024;
    create_and_index_based_retrieve(size).await?;
    Ok(())
}
// Stores `size` random bytes publicly, twice, each time with a fresh client:
// the first time the leading half is read back, the second time the trailing
// half, and each is compared with the matching slice of the stored data.
async fn create_and_index_based_retrieve(size: usize) -> Result<()> {
    let half = size / 2;

    // Leading half: no position, explicit length.
    let data = generate_random_vector(size);
    let client = create_test_client().await?;
    let address = client.store_public_blob(&data).await?;
    let fetched_data = retry_loop!(client.read_blob(address, None, Some(half)));
    assert_eq!(fetched_data, data[..half].to_vec());

    // Trailing half: explicit position and length.
    let data = generate_random_vector(size);
    let client = create_test_client().await?;
    let address = client.store_public_blob(&data).await?;
    let fetched_data = retry_loop!(client.read_blob(address, Some(half), Some(half)));
    assert_eq!(fetched_data, data[half..size].to_vec());

    Ok(())
}
// Shared round-trip helper. Generates `size` random bytes, checks that the
// address of a Blob built directly from those bytes holds nothing yet, stores
// the bytes (publicly or privately), reads them back, and finally checks that
// `Client::blob_data_map` predicts the address that was actually used.
async fn gen_data_then_create_and_retrieve(size: usize, public: bool) -> Result<()> {
    let raw_data = generate_random_vector(size);
    let client = create_test_client().await?;

    // Blob built straight from the raw bytes, used only for its address.
    let blob = if public {
        Blob::Public(PublicBlob::new(raw_data.clone()))
    } else {
        Blob::Private(PrivateBlob::new(
            raw_data.clone(),
            client.public_key().await,
        ))
    };
    let address_before = blob.address();

    match client.read_blob(*address_before, None, None).await {
        Err(Error::ErrorMessage {
            source: ErrorMessage::DataNotFound(_),
            ..
        }) => (),
        Ok(_) => bail!("Blob unexpectedly retrieved using address generated by gen_data_map"),
        // Report the underlying error so an unexpected failure can be diagnosed.
        Err(err) => bail!(
            "Unexpected error when Blob retrieved using address generated by gen_data_map: {:?}",
            err
        ),
    };

    let address = if public {
        client.store_public_blob(&raw_data).await?
    } else {
        client.store_private_blob(&raw_data).await?
    };

    let fetched_data = retry_loop!(client.read_blob(address, None, None));
    assert_eq!(fetched_data, raw_data);

    // The dry-run computation must agree with the address the network used.
    let privately_owned = if public {
        None
    } else {
        Some(client.public_key().await)
    };
    let (_, blob_address) = Client::blob_data_map(raw_data, privately_owned).await?;
    assert_eq!(blob_address, address);

    Ok(())
}
}