use super::{
data::{encrypt_large, to_chunk, LargeFile, SmallFile},
Client,
};
use crate::{api::data::DataMapLevel, Error, Result};
use sn_interface::{
messaging::data::{DataCmd, DataQueryVariant, QueryResponse},
types::{Chunk, ChunkAddress},
};
use bincode::deserialize;
use bytes::Bytes;
use futures::future::join_all;
use itertools::Itertools;
use self_encryption::{self, ChunkInfo, DataMap, EncryptedChunk};
use std::collections::BTreeMap;
use tokio::{task, time::sleep};
use tracing::trace;
use xor_name::XorName;
/// Max number of chunks handled concurrently in one batch when
/// uploading, fetching, or verifying a file's chunks.
const CHUNKS_BATCH_MAX_SIZE: usize = 5;
/// Per-replica outcomes of querying the replicas expected to hold a
/// copy of one particular chunk.
#[derive(Debug)]
pub struct QueriedDataReplicas {
    // XorName address of the queried chunk.
    pub name: XorName,
    // Outcome of the query for each replica, keyed by replica index.
    pub outcomes: BTreeMap<usize, Result<()>>,
}
impl Client {
    /// Read all bytes stored at the given address.
    ///
    /// If the chunk at `address` deserializes to a data map, the content
    /// was self-encrypted into several chunks which are fetched and
    /// decrypted; otherwise the chunk's raw content (a small file) is
    /// returned as-is.
    #[instrument(skip(self), level = "debug")]
    pub async fn read_bytes(&self, address: XorName) -> Result<Bytes> {
        let chunk = self.get_chunk(&address).await?;
        if let Ok(data_map) = self.unpack_chunk(chunk.clone()).await {
            self.read_all(data_map).await
        } else {
            // Not a data map: single-chunk (small) content.
            Ok(chunk.value().clone())
        }
    }

    /// Query the given replicas for the chunk at `name`, and — when that
    /// chunk holds a data map — for every chunk the data map references.
    ///
    /// Returns one [`QueriedDataReplicas`] entry per queried chunk.
    #[instrument(skip(self), level = "debug")]
    pub async fn read_bytes_from_replicas(
        &self,
        name: XorName,
        replicas: &[usize],
    ) -> Result<Vec<QueriedDataReplicas>> {
        let (datamap_replicas, found_chunk) = self.get_chunk_from_replicas(name, replicas).await?;
        let mut chunks_replicas = vec![datamap_replicas];
        if let Some(chunk) = found_chunk {
            // Only a data-map chunk references further chunks to query.
            if let Ok(data_map) = self.unpack_chunk(chunk).await {
                chunks_replicas.extend(
                    self.get_chunks_from_replicas(data_map.infos(), replicas)
                        .await?,
                );
            }
        }
        Ok(chunks_replicas)
    }

    /// Read up to `length` bytes starting at `position` from the content
    /// stored at `address`.
    ///
    /// For self-encrypted content only the chunks covering the requested
    /// range are fetched. Reading past the end of the content yields fewer
    /// (possibly zero) bytes rather than an error.
    #[instrument(skip_all, level = "trace")]
    pub async fn read_from(&self, address: XorName, position: usize, length: usize) -> Result<Bytes>
    where
        Self: Sized,
    {
        trace!("Reading {length} bytes at: {address}, starting from position: {position}");
        let chunk = self.get_chunk(&address).await?;
        if let Ok(data_map) = self.unpack_chunk(chunk.clone()).await {
            return self.seek(data_map, position, length).await;
        }
        // Small file: slice the single chunk's content locally. Clamp the
        // position so a read past the end returns empty bytes instead of
        // panicking in `Bytes::split_to` (which panics when at > len).
        let mut bytes = chunk.value().clone();
        let _ = bytes.split_to(position.min(bytes.len()));
        bytes.truncate(length);
        Ok(bytes)
    }

    /// Fetch a chunk from the local cache, or from the network on a cache
    /// miss, in which case the fetched chunk is inserted into the cache.
    #[instrument(skip(self), level = "trace")]
    pub(crate) async fn get_chunk(&self, name: &XorName) -> Result<Chunk> {
        if let Some(chunk) = self
            .chunks_cache
            .write()
            .await
            .find(|c| c.address().name() == name)
        {
            trace!("Chunk retrieved from local cache: {name:?}");
            return Ok(chunk.clone());
        }

        let query = DataQueryVariant::GetChunk(ChunkAddress(*name));
        let res = self.send_query(query.clone()).await?;
        let chunk: Chunk = match res.response {
            QueryResponse::GetChunk(result) => {
                result.map_err(|err| Error::ErrorMsg { source: err })
            }
            response => return Err(Error::UnexpectedQueryResponse { query, response }),
        }?;
        let _ = self.chunks_cache.write().await.insert(chunk.clone());
        Ok(chunk)
    }

    /// Split `bytes` into the chunks that would be stored on the network,
    /// returning the head address together with all chunks (a single chunk
    /// for content too small for self-encryption). No network I/O.
    #[instrument(skip_all, level = "trace")]
    pub fn chunk_bytes(bytes: Bytes) -> Result<(XorName, Vec<Chunk>)> {
        match LargeFile::new(bytes.clone()) {
            Ok(file) => Self::encrypt_large(file),
            Err(Error::TooSmallForSelfEncryption { .. }) => {
                let file = SmallFile::new(bytes)?;
                let chunk = Self::package_small(file)?;
                Ok((*chunk.name(), vec![chunk]))
            }
            Err(error) => Err(error),
        }
    }

    /// Self-encrypt a large file, returning the head (data map) address
    /// together with all resulting chunks.
    #[instrument(skip(file), level = "trace")]
    fn encrypt_large(file: LargeFile) -> Result<(XorName, Vec<Chunk>)> {
        encrypt_large(file.bytes())
    }

    /// Package a small file into a single chunk, verifying it stays below
    /// the self-encryption threshold.
    fn package_small(file: SmallFile) -> Result<Chunk> {
        let chunk = to_chunk(file.bytes());
        if chunk.value().len() >= self_encryption::MIN_ENCRYPTABLE_BYTES {
            return Err(Error::SmallFilePaddingNeeded(chunk.value().len()));
        }
        Ok(chunk)
    }

    /// Upload `bytes` to the network without verifying the stored chunks.
    #[instrument(skip(self, bytes), level = "debug")]
    pub async fn upload(&self, bytes: Bytes) -> Result<XorName> {
        self.upload_bytes(bytes, false).await
    }

    /// Upload `bytes` and verify each stored chunk can be read back.
    #[instrument(skip_all, level = "trace")]
    pub async fn upload_and_verify(&self, bytes: Bytes) -> Result<XorName> {
        self.upload_bytes(bytes, true).await
    }

    /// Calculate the network address `bytes` would be stored at, without
    /// uploading anything.
    #[instrument(skip(bytes), level = "debug")]
    pub fn calculate_address(bytes: Bytes) -> Result<XorName> {
        Self::chunk_bytes(bytes).map(|(name, _)| name)
    }

    /// Route the upload to the large- or small-file path based on size.
    #[instrument(skip(self, bytes), level = "trace")]
    async fn upload_bytes(&self, bytes: Bytes, verify: bool) -> Result<XorName> {
        match LargeFile::new(bytes.clone()) {
            Ok(file) => self.upload_large(file, verify).await,
            Err(Error::TooSmallForSelfEncryption { .. }) => {
                let file = SmallFile::new(bytes)?;
                self.upload_small(file, verify).await
            }
            Err(error) => Err(error),
        }
    }

    /// Self-encrypt and upload a large file, storing chunks in batches of
    /// `CHUNKS_BATCH_MAX_SIZE`. Returns the head (data map) address.
    #[instrument(skip_all, level = "trace")]
    async fn upload_large(&self, large: LargeFile, verify: bool) -> Result<XorName> {
        let (head_address, all_chunks) = Self::encrypt_large(large)?;
        for next_batch in all_chunks.chunks(CHUNKS_BATCH_MAX_SIZE) {
            // Pre-connect to the nodes responsible for this batch so the
            // spawned tasks don't each pay connection-setup cost.
            self.session
                .setup_connections_to_relevant_nodes(next_batch.iter().map(|c| *c.name()).collect())
                .await?;

            let tasks = next_batch.iter().cloned().map(|chunk| {
                let client_clone = self.clone();
                task::spawn(async move {
                    let chunk_addr = *chunk.address().name();
                    client_clone.send_cmd(DataCmd::StoreChunk(chunk)).await?;
                    if verify {
                        client_clone.verify_chunk_is_stored(chunk_addr).await?;
                    }
                    Ok::<(), Error>(())
                })
            });

            // NOTE(review): `.flatten()` skips panicked tasks (`JoinError`);
            // only errors returned by the upload itself are propagated.
            for res in join_all(tasks).await.into_iter().flatten() {
                res?;
            }
        }
        Ok(head_address)
    }

    /// Upload a small file as a single chunk. Returns the chunk address.
    #[instrument(skip_all, level = "trace")]
    async fn upload_small(&self, small: SmallFile, verify: bool) -> Result<XorName> {
        let chunk = Self::package_small(small)?;
        let address = *chunk.name();
        self.send_cmd(DataCmd::StoreChunk(chunk)).await?;
        if verify {
            self.verify_chunk_is_stored(address).await?;
        }
        Ok(address)
    }

    /// Verify a chunk is stored by reading it back, retrying once after a
    /// backoff, all bounded by the client's query timeout.
    async fn verify_chunk_is_stored(&self, address: XorName) -> Result<()> {
        let _chunk = tokio::time::timeout(self.query_timeout, async {
            let mut response = self.get_chunk(&address).await;
            if response.is_err() {
                error!(
                    "Error when attempting to verify chunk was stored at {:?}: {:?}",
                    address, response
                );
                // Give the network a chance to replicate before retrying.
                sleep(self.max_backoff_interval).await;
                response = self.get_chunk(&address).await;
            }
            response
        })
        .await
        .map_err(|_| Error::ChunkUploadValidationTimeout {
            elapsed: self.query_timeout,
            address,
        })??;
        Ok(())
    }

    /// Fetch and decrypt the full content described by `data_map`.
    async fn read_all(&self, data_map: DataMap) -> Result<Bytes> {
        let encrypted_chunks = self.try_get_chunks(data_map.infos()).await?;
        let bytes = self_encryption::decrypt_full_set(&data_map, &encrypted_chunks)?;
        Ok(bytes)
    }

    /// Fetch only the chunks covering `len` bytes from `pos`, then decrypt
    /// that range.
    #[instrument(skip_all, level = "trace")]
    async fn seek(&self, data_map: DataMap, pos: usize, len: usize) -> Result<Bytes> {
        let info = self_encryption::seek_info(data_map.file_size(), pos, len);
        let range = &info.index_range;
        let all_infos = data_map.infos();

        // `index_range` bounds are inclusive chunk indices, hence `..=`.
        let encrypted_chunks = self
            .try_get_chunks(
                (range.start..=range.end)
                    .map(|i| all_infos[i].clone())
                    .collect_vec(),
            )
            .await?;

        let bytes =
            self_encryption::decrypt_range(&data_map, &encrypted_chunks, info.relative_pos, len)?;
        Ok(bytes)
    }

    /// Fetch the given chunks from the network in batches, failing if any
    /// chunk could not be retrieved.
    #[instrument(skip_all, level = "trace")]
    async fn try_get_chunks(&self, chunks_info: Vec<ChunkInfo>) -> Result<Vec<EncryptedChunk>> {
        let expected_count = chunks_info.len();
        let mut retrieved_chunks = vec![];
        for next_batch in chunks_info.chunks(CHUNKS_BATCH_MAX_SIZE) {
            let tasks = next_batch.iter().cloned().map(|chunk_info| {
                let client = self.clone();
                task::spawn(async move {
                    match client.get_chunk(&chunk_info.dst_hash).await {
                        Ok(chunk) => Ok(EncryptedChunk {
                            index: chunk_info.index,
                            content: chunk.value().clone(),
                        }),
                        Err(err) => {
                            warn!(
                                "Reading chunk {} from network, resulted in error {err:?}.",
                                chunk_info.dst_hash
                            );
                            Err(err)
                        }
                    }
                })
            });

            // Failed fetches (and panicked tasks) are dropped here; the
            // count check below turns any shortfall into an error.
            retrieved_chunks.extend(join_all(tasks).await.into_iter().flatten().flatten());
        }

        if expected_count > retrieved_chunks.len() {
            Err(Error::NotEnoughChunksRetrieved {
                expected: expected_count,
                retrieved: retrieved_chunks.len(),
            })
        } else {
            Ok(retrieved_chunks)
        }
    }

    /// Query each replica for the chunk at `name`, recording each replica's
    /// outcome. Also returns the chunk itself if at least one replica
    /// returned it.
    async fn get_chunk_from_replicas(
        &self,
        name: XorName,
        replicas: &[usize],
    ) -> Result<(QueriedDataReplicas, Option<Chunk>)> {
        let mut chunk_replicas = QueriedDataReplicas {
            name,
            outcomes: BTreeMap::new(),
        };
        let mut found_chunk = None;
        let query = DataQueryVariant::GetChunk(ChunkAddress(name));
        let results = self.send_query_to_replicas(query.clone(), replicas).await?;
        for (replica_index, res) in results {
            // NOTE(review): the trailing `?` aborts the whole call on a
            // send/transport error instead of recording it as that
            // replica's outcome — confirm this is intended.
            let outcome = res.map(|query_result| match query_result.response {
                QueryResponse::GetChunk(Ok(chunk)) => {
                    found_chunk = Some(chunk);
                    Ok(())
                }
                QueryResponse::GetChunk(Err(err)) => Err(Error::ErrorMsg { source: err }),
                other => Err(Error::UnexpectedQueryResponse {
                    query: query.clone(),
                    response: other,
                }),
            })?;
            let _ = chunk_replicas.outcomes.insert(replica_index, outcome);
        }
        Ok((chunk_replicas, found_chunk))
    }

    /// For each chunk in `chunks_info`, query all replicas and collect the
    /// per-replica outcomes, processing chunks in batches.
    #[instrument(skip_all, level = "trace")]
    async fn get_chunks_from_replicas(
        &self,
        chunks_info: Vec<ChunkInfo>,
        replicas: &[usize],
    ) -> Result<Vec<QueriedDataReplicas>> {
        let mut chunks_replicas = vec![];
        for next_batch in chunks_info.chunks(CHUNKS_BATCH_MAX_SIZE) {
            let tasks = next_batch.iter().cloned().map(|chunk_info| {
                let client = self.clone();
                let replicas_indexes = replicas.to_vec();
                task::spawn(async move {
                    let name = chunk_info.dst_hash;
                    client
                        .get_chunk_from_replicas(name, &replicas_indexes)
                        .await
                        .map(|(chunk_replicas, _)| chunk_replicas)
                })
            });
            for result in join_all(tasks).await.into_iter().flatten() {
                chunks_replicas.push(result?);
            }
        }
        Ok(chunks_replicas)
    }

    /// Deserialize a chunk into a data map, following any levels of
    /// indirection (a data map may itself be stored self-encrypted) until
    /// the first-level data map is reached.
    #[instrument(skip_all, level = "trace")]
    async fn unpack_chunk(&self, mut chunk: Chunk) -> Result<DataMap> {
        loop {
            match deserialize(chunk.value())? {
                DataMapLevel::First(data_map) => {
                    return Ok(data_map);
                }
                DataMapLevel::Additional(data_map) => {
                    // The data map was too large to fit in one chunk: fetch
                    // and reassemble the next level, then try again.
                    let serialized_chunk = self.read_all(data_map).await?;
                    chunk = deserialize(&serialized_chunk)?;
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::LargeFile;
    use crate::{
        utils::test_utils::{create_test_client, init_logger, try_create_test_client},
        Client,
    };
    use self_encryption::MIN_ENCRYPTABLE_BYTES;
    use sn_interface::types::{log_markers::LogMarker, utils::random_bytes};

    use bytes::Bytes;
    use eyre::{eyre, Result};
    use futures::future::join_all;
    use tokio::time::Instant;
    use tracing::{instrument::Instrumented, Instrument};
    use xor_name::XorName;

    // Uploads above the configured limit must be rejected; uploads at the
    // limit must be accepted.
    #[test]
    #[cfg(feature = "limit-client-upload-size")]
    fn limits_upload_size() -> Result<()> {
        use super::Error;
        use assert_matches::assert_matches;
        let too_large_file = random_bytes(LargeFile::CLIENT_UPLOAD_SIZE_LIMIT + 1);
        assert_matches!(
            Client::chunk_bytes(too_large_file),
            Err(Error::UploadSizeLimitExceeded { .. })
        );
        let ok_file_size = random_bytes(LargeFile::CLIENT_UPLOAD_SIZE_LIMIT);
        assert_matches!(Client::chunk_bytes(ok_file_size), Ok(_));
        Ok(())
    }

    // Self-encrypting the same content must always produce the same head
    // address and the same set of chunks.
    #[test]
    fn deterministic_chunking() -> Result<()> {
        init_logger();
        let file = random_bytes(MIN_ENCRYPTABLE_BYTES);
        use crate::api::data::encrypt_large;
        let (first_address, mut first_chunks) = encrypt_large(file.clone())?;
        first_chunks.sort();
        for _ in 0..100 {
            let (head_address, mut all_chunks) = encrypt_large(file.clone())?;
            assert_eq!(first_address, head_address);
            all_chunks.sort();
            assert_eq!(first_chunks, all_chunks);
        }
        Ok(())
    }

    // Store, read back, and re-upload a file at the self-encryption
    // threshold; the re-upload must resolve to the same address.
    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_3kb() -> Result<()> {
        init_logger();
        let _start_span = tracing::info_span!("store_and_read_3kb").entered();
        let client = create_test_client().await?;
        let file = LargeFile::new(random_bytes(MIN_ENCRYPTABLE_BYTES))?;
        let address = client.upload_and_verify(file.bytes()).await?;
        let read_data = client.read_bytes(address).await?;
        compare(file.bytes(), read_data);
        let reupload_address = client
            .upload_large(file.clone(), false)
            .instrument(tracing::info_span!(
                "checking no conflict on same private upload"
            ))
            .await?;
        assert_eq!(address, reupload_address);
        Ok(())
    }

    // Reading with an unbounded length must return everything from `pos`
    // to the end of the content.
    #[tokio::test(flavor = "multi_thread")]
    async fn seek_with_unknown_length() -> Result<()> {
        init_logger();
        let _outer_span = tracing::info_span!("seek_with_unknown_length").entered();
        let client = create_test_client().await?;
        let size = 2 * MIN_ENCRYPTABLE_BYTES;
        let file = LargeFile::new(random_bytes(size))?;
        let address = client.upload_and_verify(file.bytes()).await?;
        let pos = 512;
        let read_data = read_from_pos(&file, address, pos, usize::MAX, &client).await?;
        assert_eq!(read_data.len(), size - pos);
        compare(file.bytes().split_off(pos), read_data);
        Ok(())
    }

    // Two consecutive ranged reads must stitch together into the original
    // prefix, for a variety of sizes and range lengths.
    #[tokio::test(flavor = "multi_thread")]
    async fn seek_in_data() -> Result<()> {
        init_logger();
        let _outer_span = tracing::info_span!("seek_in_data").entered();
        let client = create_test_client().await?;
        for i in 1..5 {
            let size = i * MIN_ENCRYPTABLE_BYTES;
            let _outer_span = tracing::info_span!("size:", size).entered();
            for divisor in 2..5 {
                let _outer_span = tracing::info_span!("divisor", divisor).entered();
                let len = size / divisor;
                let file = LargeFile::new(random_bytes(size))?;
                let address = client.upload_and_verify(file.bytes()).await?;
                let read_data_1 = {
                    let pos = 0;
                    read_from_pos(&file, address, pos, len, &client).await?
                };
                let read_data_2 = {
                    let pos = len;
                    read_from_pos(&file, address, pos, len, &client).await?
                };
                let read_data: Bytes = [read_data_1, read_data_2]
                    .iter()
                    .flat_map(|bytes| bytes.clone())
                    .collect();
                compare(file.bytes().slice(0..(2 * len)), read_data);
            }
        }
        Ok(())
    }

    // One client uploads 5 MB; many concurrent clients read it back.
    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_5mb_from_many_clients() -> Result<()> {
        init_logger();
        let _start_span = tracing::info_span!("store_and_read_5mb_from_many_clients").entered();
        let uploader = create_test_client().await?;
        let bytes = random_bytes(5 * 1024 * 1024);
        let file = LargeFile::new(bytes)?;
        let address = uploader.upload_and_verify(file.bytes()).await?;
        debug!("======> Data uploaded");
        let concurrent_client_count = 25;
        let clients = create_clients(concurrent_client_count).await?;
        assert_eq!(concurrent_client_count, clients.len());
        let mut tasks = vec![];
        for client in clients {
            let handle: Instrumented<tokio::task::JoinHandle<Result<()>>> =
                tokio::spawn(async move {
                    match client.read_bytes(address).await {
                        Ok(_data) => {
                            debug!("client #{:?} got the data", client.public_key());
                        }
                        Err(err) => {
                            debug!(
                                "client #{:?} failed to get the data: {:?}",
                                client.public_key(),
                                err
                            );
                        }
                    }
                    Ok(())
                })
                .in_current_span();
            tasks.push(handle);
        }
        let results = join_all(tasks).await;
        for res in results {
            res??;
        }
        Ok(())
    }

    // Spin up `count` clients concurrently, collecting every failure so a
    // partial outcome is reported with all respective errors.
    async fn create_clients(count: usize) -> Result<Vec<Client>> {
        let mut tasks = vec![];
        for i in 0..count {
            debug!("starting client on thread #{:?}", i);
            let handle: Instrumented<tokio::task::JoinHandle<Result<Client>>> =
                tokio::spawn(async move {
                    debug!("starting client #{:?}..", i);
                    let client = create_test_client().await;
                    debug!("client #{:?} created ok?: {}", i, client.is_ok());
                    client
                })
                .in_current_span();
            tasks.push(handle);
        }
        let (clients, errors) = join_all(tasks).await.into_iter().flatten().fold(
            (vec![], vec![]),
            |(mut clients, mut errors), result| {
                match result {
                    Ok(client) => clients.push(client),
                    Err(err) => errors.push(err.to_string()),
                }
                (clients, errors)
            },
        );
        if errors.is_empty() {
            Ok(clients)
        } else {
            Err(eyre!(
                "Failed to create {} out of {} clients, due to the following respective errors: {:?}",
                errors.len(), count,
                errors
            ))
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "Testnet network_assert_ tests should be excluded from normal tests runs, they need to be run in sequence to ensure validity of checks"]
    async fn file_network_assert_expected_log_counts() -> Result<()> {
        init_logger();
        let _outer_span = tracing::info_span!("file_network_assert").entered();
        let network_assert_delay: u64 = std::env::var("NETWORK_ASSERT_DELAY")
            .unwrap_or_else(|_| "3".to_string())
            .parse()?;
        let delay = tokio::time::Duration::from_secs(network_assert_delay);
        debug!("Running network asserts with delay of {:?}", delay);
        let bytes = random_bytes(MIN_ENCRYPTABLE_BYTES / 3);
        let client = create_test_client().await?;
        // Snapshot the logs before uploading so only markers produced by
        // this upload are counted.
        let mut the_logs = crate::testnet_grep::NetworkLogState::new()?;
        let _address = client.upload_and_verify(bytes.clone()).await?;
        // Wait for the network to settle before asserting on log counts.
        tokio::time::sleep(delay).await;
        the_logs.assert_count(LogMarker::DataStoreReceivedAtElder, 3)?;
        the_logs.assert_count(LogMarker::StoringChunk, 12)?;
        the_logs.assert_count(LogMarker::StoredNewChunk, 12)?;
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_1kb() {
        init_logger();
        let size = MIN_ENCRYPTABLE_BYTES / 3;
        let _outer_span = tracing::info_span!("store_and_read_1kb", size).entered();
        let client = try_create_test_client().await;
        store_and_read(&client, size).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_1mb() {
        init_logger();
        let _outer_span = tracing::info_span!("store_and_read_1mb").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 1024 * 1024).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn ae_checks_file_test() {
        init_logger();
        let _outer_span = tracing::info_span!("ae_checks_file_test").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 10 * 1024 * 1024).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_10mb() {
        init_logger();
        let _outer_span = tracing::info_span!("store_and_read_10mb").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 10 * 1024 * 1024).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn store_and_read_20mb() {
        init_logger();
        let _outer_span = tracing::info_span!("store_and_read_20mb").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 20 * 1024 * 1024).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "too heavy for CI"]
    async fn store_and_read_40mb() {
        init_logger();
        let _outer_span = tracing::info_span!("store_and_read_40mb").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 40 * 1024 * 1024).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "too heavy for CI"]
    async fn store_and_read_100mb() {
        init_logger();
        let _outer_span = tracing::info_span!("store_and_read_100mb").entered();
        let client = try_create_test_client().await;
        store_and_read(&client, 100 * 1024 * 1024).await;
    }

    // Timing probe: 1000 concurrent uploads. Errors are printed, not
    // propagated, so one failure doesn't mask the rest.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "too heavy for CI"]
    async fn parallel_timings() -> Result<()> {
        init_logger();
        let _outer_span = tracing::info_span!("parallel_timings").entered();
        let client = create_test_client().await?;
        let handles = (0..1000_usize)
            .map(|i| (i, client.clone()))
            .map(|(i, client)| {
                tokio::spawn(async move {
                    let file = LargeFile::new(random_bytes(MIN_ENCRYPTABLE_BYTES))?;
                    let _ = client.upload_large(file, false).await?;
                    println!("Iter: {}", i);
                    let res: Result<()> = Ok(());
                    res
                })
            });
        let results = join_all(handles).await;
        for res1 in results {
            if let Ok(res2) = res1 {
                if res2.is_err() {
                    println!("Error: {:?}", res2);
                }
            } else {
                println!("Error: {:?}", res1);
            }
        }
        Ok(())
    }

    // Timing probe: 1000 sequential uploads, printing per-iteration times.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "too heavy for CI"]
    async fn one_by_one_timings() -> Result<()> {
        init_logger();
        let _outer_span = tracing::info_span!("test__one_by_one_timings").entered();
        let client = create_test_client().await?;
        for i in 0..1000_usize {
            let file = LargeFile::new(random_bytes(MIN_ENCRYPTABLE_BYTES))?;
            let now = Instant::now();
            let _ = client.upload_large(file, false).await?;
            let elapsed = now.elapsed();
            println!("Iter: {}, in {} millis", i, elapsed.as_millis());
        }
        Ok(())
    }

    // Shared helper: upload `size` random bytes, check the returned address
    // matches the locally calculated one, then read back and compare.
    async fn store_and_read(client: &Client, size: usize) {
        let _ = tracing::info_span!("store_and_read_bytes", size).entered();
        let bytes = random_bytes(size);
        let expected_address =
            Client::calculate_address(bytes.clone()).expect("Failed to calculate file address");
        let address = client
            .upload_and_verify(bytes.clone())
            .await
            .expect("Failed to upload and verify file");
        assert_eq!(
            address, expected_address,
            "expected address {:?} doesn't match the returned one after verification {:?}",
            expected_address, address
        );
        let read_data = client
            .read_bytes(address)
            .await
            .expect("Couldn't fetch uploaded file");
        compare(bytes, read_data);
    }

    // Read a range from the network and compare it against the same range
    // sliced locally from `file`.
    async fn read_from_pos(
        file: &LargeFile,
        address: XorName,
        pos: usize,
        len: usize,
        client: &Client,
    ) -> Result<Bytes> {
        let read_data = client.read_from(address, pos, len).await?;
        let mut expected = file.bytes();
        let _ = expected.split_to(pos);
        expected.truncate(len);
        compare(expected, read_data.clone());
        Ok(read_data)
    }

    // Byte-wise comparison that reports the first differing index.
    fn compare(original: Bytes, result: Bytes) {
        assert_eq!(
            original.len(),
            result.len(),
            "original bytes length doesn't match"
        );
        for (index, (a, b)) in original.into_iter().zip(result).enumerate() {
            assert_eq!(a, b, "Bytes don't match the expected at #{} byte", index);
        }
    }
}