use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, warn};
use safe_network::client::{Client, ClientConfig, Error, Result};
use sn_interface::types::{utils::random_bytes, BytesAddress, Scope};
use tiny_keccak::{Hasher, Sha3};
#[cfg(feature = "test-utils")]
use safe_network::{client::utils::test_utils::read_network_conn_info, init_test_logger};
/// Binary entry point: calls `init_test_logger` and then drives `run_chunk_soak` to completion,
/// returning whatever result the soak run produced.
///
/// The whole function is compiled only when the `test-utils` feature is enabled, so the calls
/// inside need no feature gate of their own.
#[cfg(feature = "test-utils")]
#[tokio::main]
async fn main() -> Result<()> {
    init_test_logger();
    run_chunk_soak().await
}
/// Name of the environment variable that `files_count` reads to decide how many files to put.
const TEST_FILES_COUNT: &str = "TEST_FILES_COUNT";
/// Number of files used when the environment variable is unset or cannot be parsed.
const DEFAULT_FILES_COUNT: usize = 10;
/// Returns how many files the soak run should upload.
///
/// The value is taken from the environment variable named by `TEST_FILES_COUNT`, parsed as a
/// `usize` after trimming surrounding whitespace. `DEFAULT_FILES_COUNT` is returned when the
/// variable cannot be read or when its content does not parse; a parse failure is logged as a
/// warning together with the parse error.
pub(crate) fn files_count() -> usize {
    let raw = match std::env::var(TEST_FILES_COUNT) {
        Ok(raw) => raw,
        // Variable not set (or not valid unicode): silently use the default.
        Err(_) => return DEFAULT_FILES_COUNT,
    };

    // Trim so that values such as "10\n" (e.g. produced by shell substitution) still parse.
    match raw.trim().parse::<usize>() {
        Ok(count) => {
            // Log the value that was actually picked up, not just the variable name.
            warn!(
                "FILES_COUNT set from env var {}: {:?}",
                TEST_FILES_COUNT, count
            );
            count
        }
        Err(error) => {
            warn!("There was an error parsing {:?} env var. DEFAULT_FILES_COUNT will be used: {:?}", TEST_FILES_COUNT, error);
            DEFAULT_FILES_COUNT
        }
    }
}
/// Uploads a batch of files concurrently, then reads every one back and checks its hash.
///
/// The number of files comes from `files_count()`. Upload number `i` (1-based) is delegated to
/// `upload_data_using_client(client, i)` on its own spawned task, and each task records the
/// returned `(address, hash)` pair in a shared list. After all tasks have finished, a second
/// client is created and every recorded address is read back (retrying failed reads once per
/// second, up to 10 attempts in total) and its SHA3-256 digest is compared with the recorded one.
///
/// # Errors
///
/// Returns `Error::NoNetworkKnowledge` when `read_network_conn_info` fails, and propagates any
/// error from client creation, from an upload task, or from a read that still fails on the
/// last attempt.
///
/// # Panics
///
/// Panics if an upload task does not run to completion, if the number of recorded uploads
/// differs from the number requested, or if a digest of the data read back does not match.
#[cfg(feature = "test-utils")]
pub async fn run_chunk_soak() -> Result<()> {
    let all_data_put = std::sync::Arc::new(tokio::sync::RwLock::new(vec![]));
    let (genesis_key, bootstrap_nodes) =
        read_network_conn_info().map_err(|_e| Error::NoNetworkKnowledge)?;
    debug!("Contacting nodes: {:?}", bootstrap_nodes);

    let files_to_put = files_count();
    let config = ClientConfig::new(None, None, genesis_key, None, None, None, None).await;
    let client = Client::new(config.clone(), bootstrap_nodes.clone(), None).await?;

    // Start the clock before spawning: spawned tasks may begin running immediately, so
    // starting it afterwards would leave part of the upload work out of the measurement.
    let start_putting = Instant::now();
    let mut put_tasks = vec![];
    for i in 1..=files_to_put {
        let client = client.clone();
        let all_data_put = all_data_put.clone();
        let put_handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
            let (address, hash) = upload_data_using_client(client, i).await?;
            all_data_put.write().await.push((address, hash));
            Ok(())
        });
        put_tasks.push(put_handle);
    }

    let put_results = futures::future::join_all(put_tasks).await;
    let duration = start_putting.elapsed();
    println!("Time elapsed in while putting all data is: {:?}", duration);

    // Surface the real cause of a failed upload instead of only noticing a missing entry
    // in the length check below.
    for put_result in put_results {
        match put_result {
            Ok(upload_outcome) => upload_outcome?,
            Err(join_error) => panic!("put task did not run to completion: {:?}", join_error),
        }
    }

    assert_eq!(
        all_data_put.read().await.len(),
        files_to_put,
        "put data len is same as we tried to put"
    );

    // Read back through a separately created client rather than the one used for uploading.
    let client = Client::new(config, bootstrap_nodes, None).await?;
    println!("Now we retrieve the data");
    let start_reading = Instant::now();
    for (address, known_hash) in all_data_put.read().await.iter() {
        println!("...reading bytes at address {:?} ...", address);
        let mut bytes = client.read_bytes(*address).await;
        let mut attempts = 1;
        // Retry failed reads with a one second pause, giving up after the 10th attempt.
        while bytes.is_err() && attempts < 10 {
            attempts += 1;
            sleep(Duration::from_secs(1)).await;
            println!(
                "attempt #{attempts}...reading bytes at address {:?} ...",
                address
            );
            bytes = client.read_bytes(*address).await;
        }
        let bytes = bytes?;
        let bytes_len_mbs = bytes.len() / (1024 * 1024);
        println!("{bytes_len_mbs}mbs read from {:?}:", address);

        // Recompute the digest of what was read and compare it with the one recorded at upload.
        let mut hasher = Sha3::v256();
        let mut data_hash = [0; 32];
        hasher.update(&bytes);
        hasher.finalize(&mut data_hash);
        assert_eq!(&data_hash, known_hash);
    }
    let duration = start_reading.elapsed();
    println!("Time elapsed in while reading all data: {:?}", duration);
    println!("All okay");
    Ok(())
}
#[allow(dead_code)]
async fn upload_data_using_fresh_client(iteration: usize) -> Result<(BytesAddress, [u8; 32])> {
let (genesis_key, bootstrap_nodes) =
read_network_conn_info().map_err(|_e| Error::NoNetworkKnowledge)?;
let config = ClientConfig::new(None, None, genesis_key, None, None, None, None).await;
let client = Client::new(config, bootstrap_nodes, None).await?;
upload_data_using_client(client, iteration).await
}
/// Generates `iteration` MiB of random bytes, uploads them with `client` under
/// `Scope::Public`, and returns the resulting address together with the SHA3-256 digest of
/// the uploaded bytes.
///
/// Progress and timing information is printed to stdout along the way.
///
/// # Errors
///
/// Propagates any error returned by `client.upload`.
async fn upload_data_using_client(
    client: Client,
    iteration: usize,
) -> Result<(BytesAddress, [u8; 32])> {
    const ONE_MB: usize = 1024 * 1024;

    let payload = random_bytes(ONE_MB * iteration);
    // The payload is exactly `iteration` MiB, so the iteration number doubles as the size label.
    let bytes_len_mbs = iteration;
    println!("{bytes_len_mbs}mbs putting");

    // Digest of the payload, handed back to the caller for later verification.
    let digest = {
        let mut sha3 = Sha3::v256();
        sha3.update(&payload);
        let mut out = [0u8; 32];
        sha3.finalize(&mut out);
        out
    };

    let bytes_len = payload.len();
    println!("==================== Upload iteration {iteration:?} ======================= ");
    println!("Storing bytes.len : {bytes_len:?} w/ hash {:?}", digest);

    let upload_started = Instant::now();
    let stored_at = client.upload(payload, Scope::Public).await?;
    let elapsed = upload_started.elapsed();

    println!("Time elapsed in while putting {bytes_len_mbs}mbs: {:?}", elapsed);
    println!("Bytes stored at xorname: {:?}", stored_at);

    Ok((stored_at, digest))
}