use super::{
blob_storage::{BlobStorage, BlobStorageDryRun},
Client,
};
use crate::Error;
use bincode::{deserialize, serialize};
use log::{info, trace};
use self_encryption::{DataMap, SelfEncryptor};
use serde::{Deserialize, Serialize};
use sn_data_types::{Blob, BlobAddress, PrivateBlob, PublicBlob, PublicKey};
use sn_messaging::client::{BlobRead, BlobWrite, DataCmd, DataQuery, Query, QueryResponse};
/// Tag wrapped around a `DataMap` before it is serialised into a blob's content.
///
/// It lets a reader tell whether the data map describes the caller's original
/// content or an intermediate, serialised blob that has to be read and decoded again
/// (see how `unpack` loops on `Child` and stops on `Root`).
///
/// NOTE(review): values of this enum are (de)serialised with bincode in this file, so
/// the variant order is presumably part of the stored format — confirm before reordering.
#[derive(Serialize, Deserialize)]
enum DataMapLevel {
/// Data map produced from the content originally handed in (see `create_new_blob`).
Root(DataMap),
/// Data map of a serialised blob that failed `validate_size()` (see `pack`).
Child(DataMap),
}
impl Client {
/// Read the content stored at `address`, optionally restricted to a window.
///
/// The blob at `address` is fetched from the network, unpacked down to its root
/// data map, and the content is then read through that data map.
///
/// * `position` - offset to start reading from; `None` means the start of the content.
/// * `len` - number of bytes requested; forwarded unchanged to the data-map read.
///
/// # Errors
///
/// Propagates any error raised while fetching the blob, unpacking it, or reading
/// through the data map.
pub async fn read_blob(
    &self,
    address: BlobAddress,
    position: Option<usize>,
    len: Option<usize>,
) -> Result<Vec<u8>, Error>
where
    Self: Sized,
{
    trace!(
        "Fetch Blob: {:?} Position: {:?} Len: {:?}",
        &address,
        &position,
        &len
    );

    // The address scope decides which kind of storage the chunks are read from.
    let is_public = address.is_public();
    let root_blob = self.fetch_blob_from_network(address).await?;
    let root_map = self.unpack(root_blob).await?;

    self.read_using_data_map(root_map, is_public, position, len)
        .await
}
/// Store `data` on the network as a public blob and return the address it was stored at.
///
/// # Errors
///
/// Propagates any error from `create_new_blob`.
pub async fn store_public_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
    let public = true;
    self.create_new_blob(data, public).await
}
/// Store `data` on the network as a private blob and return the address it was stored at.
///
/// # Errors
///
/// Propagates any error from `create_new_blob`.
pub async fn store_private_blob(&self, data: &[u8]) -> Result<BlobAddress, Error> {
    let public = false;
    self.create_new_blob(data, public).await
}
/// Self-encrypt `data`, wrap the resulting data map as the root level, pack it into
/// a blob that passes the size check, store that blob and return its address.
async fn create_new_blob(&self, data: &[u8], public: bool) -> Result<BlobAddress, Error> {
    let root_map = self.write_to_network(data, public).await?;
    let root_level = DataMapLevel::Root(root_map);

    let packed = self.pack(serialize(&root_level)?, public).await?;
    // Copy the address out before the blob is moved into the store call.
    let address = *packed.address();

    self.store_blob_on_network(packed).await.map(|()| address)
}
pub(crate) async fn fetch_blob_from_network(
&self,
address: BlobAddress,
) -> Result<Blob, Error> {
let res = self
.send_query(Query::Data(DataQuery::Blob(BlobRead::Get(address))))
.await?;
let msg_id = res.msg_id;
let data: Blob = match res.response {
QueryResponse::GetBlob(result) => result.map_err(|err| Error::from((err, msg_id))),
_ => return Err(Error::ReceivedUnexpectedEvent),
}?;
Ok(data)
}
/// Send a paid `BlobWrite::DeletePrivate` command for the blob at `address`.
///
/// Only the single blob at `address` is targeted; nothing it refers to is touched here.
pub(crate) async fn delete_blob_from_network(&self, address: BlobAddress) -> Result<(), Error> {
    self.pay_and_send_data_command(DataCmd::Blob(BlobWrite::DeletePrivate(address)))
        .await?;
    Ok(())
}
pub(crate) async fn store_blob_on_network(&self, blob: Blob) -> Result<(), Error> {
if !blob.validate_size() {
return Err(Error::NetworkDataError(sn_data_types::Error::ExceededSize));
}
let cmd = DataCmd::Blob(BlobWrite::New(blob));
self.pay_and_send_data_command(cmd).await?;
Ok(())
}
pub async fn delete_blob(&self, address: BlobAddress) -> Result<(), Error> {
info!("Deleting blob at given address: {:?}", address);
let mut data = self.fetch_blob_from_network(address).await?;
self.delete_blob_from_network(address).await?;
loop {
match deserialize(data.value())? {
DataMapLevel::Root(data_map) => {
self.delete_using_data_map(data_map).await?;
return Ok(());
}
DataMapLevel::Child(data_map) => {
let serialized_blob = self
.read_using_data_map(data_map.clone(), false, None, None)
.await?;
self.delete_using_data_map(data_map).await?;
data = deserialize(&serialized_blob)?;
}
}
}
}
pub async fn blob_data_map(
mut data: Vec<u8>,
privately_owned: Option<PublicKey>,
) -> Result<(DataMap, BlobAddress), Error> {
let mut is_original_data = true;
let (data_map, blob) = loop {
let blob_storage = BlobStorageDryRun::new(privately_owned);
let self_encryptor =
SelfEncryptor::new(blob_storage, DataMap::None).map_err(Error::SelfEncryption)?;
self_encryptor
.write(&data, 0)
.await
.map_err(Error::SelfEncryption)?;
let (data_map, _) = self_encryptor
.close()
.await
.map_err(Error::SelfEncryption)?;
let blob_content = if is_original_data {
is_original_data = false;
serialize(&DataMapLevel::Root(data_map.clone()))?
} else {
serialize(&DataMapLevel::Child(data_map.clone()))?
};
let blob: Blob = if let Some(owner) = privately_owned {
PrivateBlob::new(blob_content, owner).into()
} else {
PublicBlob::new(blob_content).into()
};
if blob.validate_size() {
break (data_map, blob);
} else {
data = serialize(&blob)?;
}
};
Ok((data_map, *blob.address()))
}
async fn write_to_network(&self, data: &[u8], public: bool) -> Result<DataMap, Error> {
let blob_storage = BlobStorage::new(self.clone(), public);
let self_encryptor = SelfEncryptor::new(blob_storage.clone(), DataMap::None)
.map_err(Error::SelfEncryption)?;
self_encryptor
.write(data, 0)
.await
.map_err(Error::SelfEncryption)?;
let (data_map, _) = self_encryptor
.close()
.await
.map_err(Error::SelfEncryption)?;
Ok(data_map)
}
async fn read_using_data_map(
&self,
data_map: DataMap,
public: bool,
position: Option<usize>,
len: Option<usize>,
) -> Result<Vec<u8>, Error> {
let blob_storage = BlobStorage::new(self.clone(), public);
let self_encryptor =
SelfEncryptor::new(blob_storage, data_map).map_err(Error::SelfEncryption)?;
let length = match len {
None => self_encryptor.len().await,
Some(request_length) => request_length,
};
let read_position = position.unwrap_or(0);
match self_encryptor.read(read_position, length).await {
Ok(data) => Ok(data),
Err(error) => Err(Error::SelfEncryption(error)),
}
}
async fn delete_using_data_map(&self, data_map: DataMap) -> Result<(), Error> {
let blob_storage = BlobStorage::new(self.clone(), false);
let self_encryptor =
SelfEncryptor::new(blob_storage, data_map).map_err(Error::SelfEncryption)?;
match self_encryptor.delete().await {
Ok(_) => Ok(()),
Err(error) => Err(Error::SelfEncryption(error)),
}
}
/// Wrap `contents` in a blob, adding data-map levels until the blob passes the size check.
///
/// Each round builds a public blob, or a private blob owned by this client's public
/// key. If `validate_size()` fails, the blob is serialised and written to the network
/// through self-encryption, and the resulting data map (tagged `Child`) becomes the
/// contents for the next round.
///
/// # Errors
///
/// Propagates serialisation errors and any error from `write_to_network`.
async fn pack(&self, mut contents: Vec<u8>, public: bool) -> Result<Blob, Error> {
    loop {
        let candidate: Blob = if public {
            PublicBlob::new(contents).into()
        } else {
            PrivateBlob::new(contents, self.public_key().await).into()
        };

        if candidate.validate_size() {
            return Ok(candidate);
        }

        let oversized = serialize(&candidate)?;
        let child_map = self.write_to_network(&oversized, public).await?;
        contents = serialize(&DataMapLevel::Child(child_map))?;
    }
}
/// Follow data-map levels starting at `data` until the root data map is reached.
///
/// A `Root` level ends the walk and its data map is returned. A `Child` level is read
/// through its data map (using the scope of the blob that carried it) to obtain the
/// next serialised blob, which is decoded and examined in turn.
///
/// # Errors
///
/// Propagates deserialisation errors and any error from `read_using_data_map`.
async fn unpack(&self, mut data: Blob) -> Result<DataMap, Error> {
    loop {
        let public = data.is_public();

        let child_map = match deserialize(data.value())? {
            DataMapLevel::Root(root_map) => return Ok(root_map),
            DataMapLevel::Child(child_map) => child_map,
        };

        let serialized_blob = self
            .read_using_data_map(child_map, public, None, None)
            .await?;
        data = deserialize(&serialized_blob)?;
    }
}
}
#[cfg(test)]
mod tests {
use super::{Blob, BlobAddress, Client, DataMap, DataMapLevel, Error};
use crate::client::blob_storage::BlobStorage;
use crate::utils::{
generate_random_vector, test_utils::create_test_client, test_utils::gen_ed_keypair,
};
use anyhow::{bail, Result};
use bincode::deserialize;
use self_encryption::Storage;
use sn_data_types::{PrivateBlob, PublicBlob, Token};
use sn_messaging::client::Error as ErrorMessage;
use std::str::FromStr;
use tokio::time::{sleep, Duration};
// Reading a public blob that was never stored must report `DataNotFound`; after
// storing it, the content must eventually read back unchanged.
#[tokio::test]
pub async fn pub_blob_test() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector::<u8>(10);
    let address = *Blob::Public(PublicBlob::new(value.clone())).address();
    let _pk = gen_ed_keypair().public_key();

    match client.read_blob(address, None, None).await {
        Ok(data) => bail!("Pub blob should not exist yet: {:?}", data),
        Err(Error::ErrorMessage {
            source: ErrorMessage::DataNotFound(_),
            ..
        }) => (),
        Err(e) => bail!("Unexpected: {:?}", e),
    }

    let address = client.store_public_blob(&value).await?;

    // Poll until the stored blob becomes readable.
    let fetched_data = loop {
        match client.read_blob(address, None, None).await {
            Ok(content) => break content,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    assert_eq!(value, fetched_data);
    Ok(())
}
// Private blob lifecycle: not found before storing, readable after storing, a
// duplicate store surfaces `DataExists` on the notification receiver, the same
// content can also be stored publicly, and after deletion the private blob can be
// stored again.
#[tokio::test]
pub async fn unpub_blob_test() -> Result<()> {
let client = create_test_client().await?;
let pk = client.public_key().await;
let value = generate_random_vector::<u8>(10);
let data = Blob::Private(PrivateBlob::new(value.clone(), pk));
let address = *data.address();
// Nothing has been stored yet, so the read must fail with DataNotFound.
let res = client
.read_blob(address, None, None)
.await;
match res {
Ok(_) => bail!("Private blob should not exist yet"),
Err(Error::ErrorMessage {
source: ErrorMessage::DataNotFound(_),
..
}) => (),
Err(e) => bail!("Unexpected: {:?}", e),
}
let address = client.store_private_blob(&value).await?;
// Poll until the stored blob becomes readable.
let mut res = client.read_blob(address, None, None).await;
while res.is_err() {
sleep(Duration::from_millis(200)).await;
res = client.read_blob(address, None, None).await;
}
// Store the same content again; the direct result is ignored because the failure
// is expected to arrive through the notification receiver instead.
let _ = client.store_private_blob(&value).await;
// Wait up to 60 seconds for the DataExists error to be reported.
let _ = match tokio::time::timeout(
std::time::Duration::from_secs(60),
client.notification_receiver.clone().lock().await.recv(),
)
.await
{
Ok(Some(Error::ErrorMessage {
source: ErrorMessage::DataExists,
..
})) => {
}
Ok(Some(error)) => bail!("Expecting DataExists error got: {:?}", error),
Ok(None) => bail!("Expecting DataExists Error, got None"),
Err(_) => bail!("Timeout when expecting DataExists error"),
};
// The same content stored publicly must succeed and become readable.
let pub_address = client.store_public_blob(&value).await?;
let mut fetched_data = client.read_blob(pub_address, None, None).await;
while fetched_data.is_err() {
sleep(Duration::from_millis(200)).await;
fetched_data = client.read_blob(pub_address, None, None).await;
}
// Delete the private blob and poll until reads of it start failing.
client.delete_blob(address).await?;
let mut fetched_data = client.read_blob(address, None, None).await;
while fetched_data.is_ok() {
sleep(Duration::from_millis(200)).await;
fetched_data = client.read_blob(address, None, None).await;
}
// After deletion, storing the same private content again must succeed.
let _ = client.store_private_blob(&value).await?;
Ok(())
}
// Deleting a 1 MiB private blob must also make every chunk listed in its data map
// unavailable from blob storage.
#[tokio::test]
pub async fn unpub_delete_large() -> Result<()> {
    let client = create_test_client().await?;
    let value = generate_random_vector::<u8>(1024 * 1024);
    let address = client.store_private_blob(&value).await?;

    // Poll until the stored blob can be fetched.
    let stored_blob = loop {
        match client.fetch_blob_from_network(address).await {
            Ok(blob) => break blob,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };

    let root_data_map = match deserialize(stored_blob.value())? {
        DataMapLevel::Root(data_map) | DataMapLevel::Child(data_map) => data_map,
    };

    client.delete_blob(address).await?;

    let mut blob_storage = BlobStorage::new(client, false);
    let chunks = match &root_data_map {
        DataMap::Chunks(chunks) => chunks,
        DataMap::None | DataMap::Content(_) => bail!("shall return DataMap::Chunks"),
    };

    // Wait for each chunk to stop being retrievable.
    for chunk in chunks {
        while blob_storage.get(&chunk.hash).await.is_ok() {
            sleep(Duration::from_millis(500)).await;
        }
    }

    Ok(())
}
// Deleting a blob must change the client's balance, and the balance before the
// deletion must not be zero.
#[tokio::test]
pub async fn blob_deletions_should_cost_put_price() -> Result<()> {
    let client = create_test_client().await?;

    let payload = generate_random_vector::<u8>(10);
    let address = client.store_private_blob(&payload).await?;

    // Poll until the stored blob can be fetched.
    while client.fetch_blob_from_network(address).await.is_err() {
        sleep(Duration::from_millis(200)).await;
    }

    let balance_before_delete = client.get_balance().await?;
    client.delete_blob(address).await?;
    let new_balance = client.get_balance().await?;

    assert_ne!(balance_before_delete, Token::from_str("0")?);
    assert_ne!(balance_before_delete, new_balance);
    Ok(())
}
// Round-trip 1 KiB of random data as a public blob.
#[tokio::test]
pub async fn create_and_retrieve_1kb_pub_unencrypted() -> Result<()> {
    gen_data_then_create_and_retrieve(1024, true).await
}
// Round-trip 1 KiB of random data as a private blob.
#[tokio::test]
pub async fn create_and_retrieve_1kb_private_unencrypted() -> Result<()> {
    gen_data_then_create_and_retrieve(1024, false).await
}
// A blob stored publicly must not be readable through a private address with the same name.
#[tokio::test]
pub async fn create_and_retrieve_1kb_put_pub_retrieve_private() -> Result<()> {
    let payload = generate_random_vector::<u8>(1024);
    let client = create_test_client().await?;

    let public_address = client.store_public_blob(&payload).await?;
    let private_address = BlobAddress::Private(*public_address.name());

    let outcome = client.read_blob(private_address, None, None).await;
    assert!(outcome.is_err());
    Ok(())
}
// A blob stored privately must not be readable through a public address with the same name.
#[tokio::test]
pub async fn create_and_retrieve_1kb_put_private_retrieve_pub() -> Result<()> {
    let payload = generate_random_vector::<u8>(1024);
    let client = create_test_client().await?;

    let private_address = client.store_private_blob(&payload).await?;
    let public_address = BlobAddress::Public(*private_address.name());

    let outcome = client.read_blob(public_address, None, None).await;
    assert!(outcome.is_err());
    Ok(())
}
// Round-trip 10 MiB of random data as a private blob.
#[tokio::test]
pub async fn create_and_retrieve_10mb_private() -> Result<()> {
    gen_data_then_create_and_retrieve(1024 * 1024 * 10, false).await
}
// Round-trip 10 MiB of random data as a public blob.
#[tokio::test]
pub async fn create_and_retrieve_10mb_public() -> Result<()> {
    gen_data_then_create_and_retrieve(1024 * 1024 * 10, true).await
}
// Windowed reads (first half, second half) over 1 KiB of stored data.
#[tokio::test]
pub async fn create_and_retrieve_index_based() -> Result<()> {
    let size = 1024;
    create_and_index_based_retrieve(size).await
}
// Stores `size` random bytes twice (each time with a fresh client) and checks that
// reading the first half (no position, explicit length) and the second half
// (explicit position and length) returns the matching slices.
async fn create_and_index_based_retrieve(size: usize) -> Result<()> {
    let half = size / 2;

    // First half: no position, length of half the content.
    let data = generate_random_vector::<u8>(size);
    {
        let client = create_test_client().await?;
        let address = client.store_public_blob(&data).await?;

        let fetched_data = loop {
            match client.read_blob(address, None, Some(half)).await {
                Ok(content) => break content,
                Err(_) => sleep(Duration::from_millis(200)).await,
            }
        };

        assert_eq!(fetched_data, data[0..half].to_vec());
    }

    // Second half: start at the midpoint, length of half the content.
    let data = generate_random_vector::<u8>(size);
    {
        let client = create_test_client().await?;
        let address = client.store_public_blob(&data).await?;

        let fetched_data = loop {
            match client.read_blob(address, Some(half), Some(half)).await {
                Ok(content) => break content,
                Err(_) => sleep(Duration::from_millis(200)).await,
            }
        };

        assert_eq!(fetched_data, data[half..size].to_vec());
    }

    Ok(())
}
// Shared round-trip check: generates `size` random bytes, verifies that a blob built
// directly from them cannot be read yet, stores them (public or private), polls until
// they read back unchanged, and finally checks that `Client::blob_data_map` predicts
// the same address the store returned.
//
// The lint is allowed because the catch-all `Err(_)` arm below deliberately bails.
#[allow(clippy::match_wild_err_arm)]
async fn gen_data_then_create_and_retrieve(size: usize, public: bool) -> Result<()> {
    let raw_data = generate_random_vector(size);
    let client = create_test_client().await?;

    // Blob built straight from the raw bytes, used only for the "not stored yet" probe.
    let blob = if public {
        Blob::Public(PublicBlob::new(raw_data.clone()))
    } else {
        let owner = client.public_key().await;
        Blob::Private(PrivateBlob::new(raw_data.clone(), owner))
    };
    let address_before = blob.address();

    match client.read_blob(*address_before, None, None).await {
        Err(Error::ErrorMessage {
            source: ErrorMessage::DataNotFound(_),
            ..
        }) => (),
        Ok(_) => bail!("Blob unexpectedly retrieved using address generated by gen_data_map"),
        Err(_) => bail!(
            "Unexpected error when Blob retrieved using address generated by gen_data_map"
        ),
    };

    let address = if public {
        client.store_public_blob(&raw_data).await?
    } else {
        client.store_private_blob(&raw_data).await?
    };

    // Poll until the stored content becomes readable.
    let fetched = loop {
        match client.read_blob(address, None, None).await {
            Ok(content) => break content,
            Err(_) => sleep(Duration::from_millis(200)).await,
        }
    };
    assert_eq!(fetched, raw_data);

    let privately_owned = if public {
        None
    } else {
        Some(client.public_key().await)
    };
    let (_, blob_address) = Client::blob_data_map(raw_data, privately_owned).await?;
    assert_eq!(blob_address, address);

    Ok(())
}
}