mod common;
use crate::common::{
client::{get_client_and_funded_wallet, get_node_count},
NodeRestart,
};
use ant_logging::LogBuilder;
use ant_protocol::{
storage::{ChunkAddress, GraphEntry, GraphEntryAddress, PointerTarget, ScratchpadAddress},
NetworkAddress,
};
use autonomi::{data::DataAddress, Client, Wallet};
use bls::{PublicKey, SecretKey};
use bytes::Bytes;
use common::client::transfer_to_new_wallet;
use eyre::{bail, ErrReport, Result};
use rand::Rng;
use self_encryption::MAX_CHUNK_SIZE;
use std::{
collections::{BTreeMap, HashMap, VecDeque},
fmt,
fs::create_dir_all,
sync::{Arc, LazyLock},
time::{Duration, Instant},
};
use tempfile::tempdir;
use test_utils::gen_random_data;
use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{debug, error, info, trace, warn};
use xor_name::XorName;
/// Tokens moved from the main funded wallet into each uploader task's own wallet.
const TOKENS_TO_TRANSFER: usize = 10000000;
/// Floor adjustment: at minimum `node_count + EXTRA_CHURN_COUNT` churn events fit in the test duration.
const EXTRA_CHURN_COUNT: u32 = 5;
/// How many times each node should churn over the full test duration (default path).
const CHURN_CYCLES: u32 = 2;
/// Chunk uploads performed per churn period (task delay = churn_period / ratio).
const CHUNK_CREATION_RATIO_TO_CHURN: u32 = 13;
/// Pointer creations/updates performed per churn period.
const POINTER_CREATION_RATIO_TO_CHURN: u32 = 11;
/// ScratchPad creations/updates performed per churn period.
const SCRATCHPAD_CREATION_RATIO_TO_CHURN: u32 = 9;
/// GraphEntry creations performed per churn period.
const GRAPHENTRY_CREATION_RATIO_TO_CHURN: u32 = 7;
/// Size of each random upload: a third of the max chunk size, so uploads stay multi-chunk but cheap.
static DATA_SIZE: LazyLock<usize> = LazyLock::new(|| *MAX_CHUNK_SIZE / 3);
/// Content queries performed per churn period.
const CONTENT_QUERY_RATIO_TO_CHURN: u32 = 40;
/// Maximum query attempts before a piece of content is recorded as a failure.
const MAX_NUM_OF_QUERY_ATTEMPTS: u8 = 5;
/// Default total test runtime when `TEST_DURATION_MINS` is not set.
const TEST_DURATION: Duration = Duration::from_secs(60 * 60);
/// Shared, append-only list of every address uploaded by the creation tasks.
type ContentList = Arc<RwLock<VecDeque<NetworkAddress>>>;
/// A piece of content that failed a query: its address, how many query
/// attempts have been made so far, and the most recent error observed.
struct ContentError {
    // Address of the content that could not be retrieved.
    net_addr: NetworkAddress,
    // Number of failed query attempts recorded so far.
    attempts: u8,
    // The error returned by the latest failed attempt.
    last_err: ErrReport,
}
/// Manual `Debug` so failure summaries render as a compact single line
/// instead of the derive's struct-map form.
impl fmt::Debug for ContentError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self {
            net_addr,
            attempts,
            last_err,
        } = self;
        write!(f, "{net_addr:?}, attempts: {attempts}, last error: {last_err:?}")
    }
}
/// Shared map from a content address to its latest query-failure record.
type ContentErredList = Arc<RwLock<BTreeMap<NetworkAddress, ContentError>>>;
/// End-to-end churn test: spawns background tasks that continuously upload
/// content (chunks, and optionally pointers/graph entries/scratchpads) and
/// query it back while nodes are being restarted, then verifies at the end
/// that every uploaded piece of content is still retrievable.
///
/// Env overrides: `TEST_DURATION_MINS` (total runtime in minutes),
/// `TEST_TOTAL_CHURN_CYCLES` (explicit churn-event count), `CHUNKS_ONLY`
/// (skip pointer/graph-entry/scratchpad tasks).
#[tokio::test(flavor = "multi_thread")]
async fn data_availability_during_churn() -> Result<()> {
    let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("data_with_churn", false);
    // Total runtime: overridable in minutes, otherwise the compiled default.
    let test_duration = if let Ok(str) = std::env::var("TEST_DURATION_MINS") {
        Duration::from_secs(60 * str.parse::<u64>()?)
    } else {
        TEST_DURATION
    };
    let node_count = get_node_count();
    // Churn period: either spread an explicit cycle count over the duration,
    // or default to CHURN_CYCLES churns per node with a floor of
    // node_count + EXTRA_CHURN_COUNT events.
    let churn_period = if let Ok(str) = std::env::var("TEST_TOTAL_CHURN_CYCLES") {
        println!("Using value set in 'TEST_TOTAL_CHURN_CYCLES' env var: {str}");
        info!("Using value set in 'TEST_TOTAL_CHURN_CYCLES' env var: {str}");
        let cycles = str.parse::<u32>()?;
        test_duration / cycles
    } else {
        test_duration
            / std::cmp::max(
                CHURN_CYCLES * node_count as u32,
                node_count as u32 + EXTRA_CHURN_COUNT,
            )
    };
    println!("Nodes will churn every {churn_period:?}");
    info!("Nodes will churn every {churn_period:?}");
    let churn_count = Arc::new(RwLock::new(0_usize));
    let chunks_only = std::env::var("CHUNKS_ONLY").is_ok();
    println!(
        "Running this test for {test_duration:?}{}...",
        if chunks_only { " (Chunks only)" } else { "" }
    );
    info!(
        "Running this test for {test_duration:?}{}...",
        if chunks_only { " (Chunks only)" } else { "" }
    );
    let (client, main_wallet) = get_client_and_funded_wallet().await;
    info!(
        "Client and wallet created. Main wallet address: {:?}",
        main_wallet.address()
    );
    // Shared list of uploaded addresses; creation tasks push, query tasks sample.
    let content = ContentList::default();
    println!("Uploading some chunks before carry out node churning");
    info!("Uploading some chunks before carry out node churning");
    // Each uploader gets its own wallet so they don't contend on nonces/funds.
    let chunk_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
    let store_chunks_handle = store_chunks_task(
        client.clone(),
        chunk_wallet,
        Arc::clone(&content),
        churn_period,
    );
    let create_pointer_handle = if !chunks_only {
        let pointer_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
        let create_pointer_handle = create_pointers_task(
            client.clone(),
            pointer_wallet,
            Arc::clone(&content),
            churn_period,
        );
        Some(create_pointer_handle)
    } else {
        None
    };
    let create_graph_entry_handle = if !chunks_only {
        let graph_entry_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
        let create_graph_entry_handle = create_graph_entry_task(
            client.clone(),
            graph_entry_wallet,
            Arc::clone(&content),
            churn_period,
        );
        Some(create_graph_entry_handle)
    } else {
        None
    };
    let create_scratchpad_handle = if !chunks_only {
        let scratchpad_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
        let create_scratchpad_handle = create_scratchpad_task(
            client.clone(),
            scratchpad_wallet,
            Arc::clone(&content),
            churn_period,
        );
        Some(create_scratchpad_handle)
    } else {
        None
    };
    // Start restarting nodes in the background.
    churn_nodes_task(Arc::clone(&churn_count), test_duration, churn_period);
    let content_erred = ContentErredList::default();
    let failures = ContentErredList::default();
    // Continuously spot-check random content; failures go to `content_erred`.
    query_content_task(
        client.clone(),
        Arc::clone(&content),
        Arc::clone(&content_erred),
        churn_period,
    );
    // Re-query erred content; persistent failures are promoted to `failures`.
    retry_query_content_task(
        client.clone(),
        Arc::clone(&content_erred),
        Arc::clone(&failures),
        churn_period,
    );
    info!("All tasks have been spawned. The test is now running...");
    println!("All tasks have been spawned. The test is now running...");
    let start_time = Instant::now();
    while start_time.elapsed() < test_duration {
        // Any uploader finishing early means it bailed out on an error.
        if store_chunks_handle.is_finished() {
            bail!("Store chunks task has finished before the test duration. Probably due to an error.");
        }
        if let Some(handle) = &create_pointer_handle {
            if handle.is_finished() {
                bail!("Create Pointers task has finished before the test duration. Probably due to an error.");
            }
        }
        if let Some(handle) = &create_graph_entry_handle {
            if handle.is_finished() {
                bail!("Create GraphEntry task has finished before the test duration. Probably due to an error.");
            }
        }
        if let Some(handle) = &create_scratchpad_handle {
            if handle.is_finished() {
                bail!("Create ScratchPad task has finished before the test duration. Probably due to an error.");
            }
        }
        // Acquire the failures read-lock only when actually reporting, and
        // drop it before sleeping — previously the guard was held across the
        // 3s sleep, starving the retry task's writes.
        if start_time.elapsed().as_secs() % 10 == 0 {
            let failed = failures.read().await;
            println!(
                "Current failures after {:?} ({}): {:?}",
                start_time.elapsed(),
                failed.len(),
                failed.values()
            );
            info!(
                "Current failures after {:?} ({}): {:?}",
                start_time.elapsed(),
                failed.len(),
                failed.values()
            );
        }
        sleep(Duration::from_secs(3)).await;
    }
    println!();
    println!(
        ">>>>>> Test stopping after running for {:?}. <<<<<<",
        start_time.elapsed()
    );
    println!("{:?} churn events happened.", *churn_count.read().await);
    println!();
    // Final sweep: every uploaded address must still be retrievable.
    println!("Final querying confirmation of content");
    info!("Final querying confirmation of content");
    let content = content.read().await;
    let uploaded_content_count = content.len();
    let mut handles = Vec::new();
    for net_addr in content.iter() {
        let client = client.clone();
        let net_addr = net_addr.clone();
        let failures = Arc::clone(&failures);
        let handle = tokio::spawn(async move {
            final_retry_query_content(&client, &net_addr, churn_period, failures).await
        });
        handles.push(handle);
    }
    let results: Vec<_> = futures::future::join_all(handles).await;
    // Count only tasks that both joined AND returned Ok — `r.is_ok()` alone
    // would treat a task that bailed (Ok(Err(_))) as a success.
    let content_queried_count = results.iter().filter(|r| matches!(r, Ok(Ok(_)))).count();
    println!("{content_queried_count:?} pieces of content queried");
    assert_eq!(
        content_queried_count, uploaded_content_count,
        "Not all content was queried successfully"
    );
    let failed = failures.read().await;
    if !failed.is_empty() {
        bail!("{} failure/s in test: {:?}", failed.len(), failed.values());
    }
    println!("Test passed after running for {:?}.", start_time.elapsed());
    Ok(())
}
fn create_scratchpad_task(
client: Client,
wallet: Wallet,
content: ContentList,
churn_period: Duration,
) -> JoinHandle<Result<()>> {
let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut owners: HashMap<ScratchpadAddress, SecretKey> = HashMap::new();
let delay = churn_period / SCRATCHPAD_CREATION_RATIO_TO_CHURN;
loop {
sleep(delay).await;
let is_update: bool = if owners.is_empty() {
false
} else {
rand::random()
};
let content_type: u64 = rand::random();
let data_byte: u8 = rand::random();
let mut data = vec![data_byte; 100];
rand::thread_rng().fill(&mut data[..]);
let bytes = Bytes::from(data);
let mut retries = 1;
if is_update {
let index = rand::thread_rng().gen_range(0..owners.len());
let iterator: Vec<_> = owners.iter().collect();
let (addr, owner) = iterator[index];
loop {
match client.scratchpad_update(owner, content_type, &bytes).await {
Ok(_) => {
println!("Updated ScratchPad at {addr:?} after a delay of: {delay:?}");
break;
}
Err(err) => {
println!("Failed to update ScratchPad at {addr:?}. Retrying ...");
error!("Failed to update ScratchPad at {addr:?}. Retrying ...");
if retries >= 3 {
println!(
"Failed to update pointer at {addr:?} after 3 retries: {err}"
);
error!(
"Failed to update pointer at {addr:?} after 3 retries: {err}"
);
bail!(
"Failed to update pointer at {addr:?} after 3 retries: {err}"
);
}
retries += 1;
}
}
}
} else {
let owner = SecretKey::random();
loop {
match client
.scratchpad_create(&owner, content_type, &bytes, (&wallet).into())
.await
{
Ok((cost, addr)) => {
println!("Created new ScratchPad at {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
let net_addr = NetworkAddress::ScratchpadAddress(addr);
content.write().await.push_back(net_addr);
let _ = owners.insert(addr, owner);
break;
}
Err(err) => {
println!("Failed to create ScratchPad: {err:?}. Retrying ...");
error!("Failed to create ScratchPad: {err:?}. Retrying ...");
if retries >= 3 {
println!("Failed to create ScratchPad after 3 retries: {err}");
error!("Failed to create ScratchPad after 3 retries: {err}");
bail!("Failed to create ScratchPad after 3 retries: {err}");
}
retries += 1;
}
}
}
}
}
});
handle
}
fn create_graph_entry_task(
client: Client,
wallet: Wallet,
content_list: ContentList,
churn_period: Duration,
) -> JoinHandle<Result<()>> {
let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut growing_history: Vec<Vec<GraphEntryAddress>> = vec![];
let mut owners: HashMap<PublicKey, SecretKey> = HashMap::new();
let delay = churn_period / GRAPHENTRY_CREATION_RATIO_TO_CHURN;
loop {
sleep(delay).await;
let is_growing: bool = if growing_history.is_empty() {
false
} else {
rand::random()
};
let output = SecretKey::random();
let output_content: [u8; 32] = rand::random();
let outputs = vec![(output.public_key(), output_content)];
#[allow(unused_assignments)]
let mut index = growing_history.len();
let mut graph_entry_to_put = None;
if is_growing {
index = rand::thread_rng().gen_range(0..growing_history.len());
let Some(addr) = growing_history[index].last() else {
println!("Doesn't have history GraphEntry of {index:?}");
error!("Doesn't have history GraphEntry of {index:?}");
continue;
};
let mut retries = 1;
loop {
match client.graph_entry_get(addr).await {
Ok(graph_entry) => {
println!("Fetched graph_entry at {addr:?}");
let Some((old_output, old_content)) = graph_entry.descendants.last()
else {
println!("Can't get output from the graph_entry of {addr:?}");
error!("Can't get output from the graph_entry of {addr:?}");
break;
};
let Some(owner) = owners.get(old_output) else {
println!("Can't get secret_key of {output:?}");
error!("Can't get secret_key of {output:?}");
break;
};
let parents = vec![graph_entry.owner];
let graph_entry =
GraphEntry::new(owner, parents, *old_content, outputs);
growing_history[index].push(graph_entry.address());
graph_entry_to_put = Some(graph_entry);
break;
}
Err(err) => {
println!(
"Failed to get graph_entry at {addr:?}: {err:?}. Retrying ..."
);
error!("Failed to get graph_entry at {addr:?} : {err:?}. Retrying ...");
if retries >= 3 {
println!(
"Failed to get graph_entry at {addr:?} after 3 retries: {err}"
);
error!(
"Failed to get graph_entry at {addr:?} after 3 retries: {err}"
);
bail!(
"Failed to get graph_entry at {addr:?} after 3 retries: {err}"
);
}
retries += 1;
sleep(delay).await;
}
}
}
} else {
let owner = SecretKey::random();
let content: [u8; 32] = rand::random();
let parents = vec![];
let graph_entry = GraphEntry::new(&owner, parents, content, outputs);
growing_history.push(vec![graph_entry.address()]);
let _ = owners.insert(owner.public_key(), owner);
graph_entry_to_put = Some(graph_entry);
};
let Some(graph_entry) = graph_entry_to_put else {
println!("Doesn't have graph_entry to put to network.");
error!("Doesn't have graph_entry to put to network.");
continue;
};
let _ = owners.insert(output.public_key(), output);
let mut retries = 1;
loop {
match client
.graph_entry_put(graph_entry.clone(), (&wallet).into())
.await
{
Ok((cost, addr)) => {
println!("Uploaded graph_entry to {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
let net_addr = NetworkAddress::GraphEntryAddress(addr);
content_list.write().await.push_back(net_addr);
break;
}
Err(err) => {
println!("Failed to upload graph_entry: {err:?}. Retrying ...");
error!("Failed to upload graph_entry: {err:?}. Retrying ...");
if retries >= 3 {
println!("Failed to upload graph_entry after 3 retries: {err}");
error!("Failed to upload graph_entry after 3 retries: {err}");
bail!("Failed to upload graph_entry after 3 retries: {err}");
}
retries += 1;
sleep(delay).await;
}
}
}
}
});
handle
}
/// Spawns a task that keeps creating new Pointers (or, once one exists,
/// randomly updating an existing one) at POINTER_CREATION_RATIO_TO_CHURN
/// operations per churn period, pushing each newly created pointer's
/// address onto `content`. Bails (ending the task) after 3 consecutive
/// failures of one operation.
fn create_pointers_task(
    client: Client,
    wallet: Wallet,
    content: ContentList,
    churn_period: Duration,
) -> JoinHandle<Result<()>> {
    let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
        // Secret key and current target for each pointer we created — needed
        // to issue updates and to keep our local view of the target in sync.
        let mut owners: HashMap<NetworkAddress, (SecretKey, PointerTarget)> = HashMap::new();
        let delay = churn_period / POINTER_CREATION_RATIO_TO_CHURN;
        loop {
            sleep(delay).await;
            // Outcome of this round: (addr, Some(owner)) for a create,
            // (addr, None) for an update — applied to `owners` after the
            // borrow of `owners` via `iterator` below has ended.
            #[allow(unused_assignments)]
            let mut pointer_addr = None;
            // 50/50 create vs update once at least one pointer exists.
            let is_update: bool = if owners.is_empty() {
                false
            } else {
                rand::random()
            };
            let mut retries = 1;
            if is_update {
                // Pick a random pointer we own and repoint it at a fresh
                // random chunk address.
                let index = rand::thread_rng().gen_range(0..owners.len());
                let iterator: Vec<_> = owners.iter().collect();
                let (addr, (owner, old_target)) = iterator[index];
                let new_target =
                    PointerTarget::ChunkAddress(ChunkAddress::new(XorName(rand::random())));
                loop {
                    match client.pointer_update(owner, new_target.clone()).await {
                        Ok(_) => {
                            println!("Updated Pointer at {addr:?} with {old_target:?} to new target {new_target:?} after a delay of: {delay:?}");
                            pointer_addr = Some((addr.clone(), None, new_target));
                            break;
                        }
                        Err(err) => {
                            println!(
                                "Failed to update pointer at {addr:?} with {old_target:?}: {err:?}. Retrying ..."
                            );
                            error!(
                                "Failed to update pointer at {addr:?} with {old_target:?}: {err:?}. Retrying ..."
                            );
                            if retries >= 3 {
                                println!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
                                error!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
                                bail!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
                            }
                            retries += 1;
                        }
                    }
                }
            } else {
                // Create a brand-new pointer under a fresh key, aimed at a
                // random chunk address, and record it for future updates.
                let owner = SecretKey::random();
                let pointer_target =
                    PointerTarget::ChunkAddress(ChunkAddress::new(XorName(rand::random())));
                loop {
                    match client
                        .pointer_create(&owner, pointer_target.clone(), (&wallet).into())
                        .await
                    {
                        Ok((cost, addr)) => {
                            println!("Created new Pointer ({pointer_target:?}) at {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
                            let net_addr = NetworkAddress::PointerAddress(addr);
                            pointer_addr = Some((net_addr.clone(), Some(owner), pointer_target));
                            content.write().await.push_back(net_addr);
                            break;
                        }
                        Err(err) => {
                            println!(
                                "Failed to create pointer {pointer_target:?}: {err:?}. Retrying ..."
                            );
                            error!(
                                "Failed to create pointer {pointer_target:?}: {err:?}. Retrying ..."
                            );
                            if retries >= 3 {
                                println!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
                                error!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
                                bail!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
                            }
                            retries += 1;
                        }
                    }
                }
            }
            // Apply this round's outcome to our ownership map.
            match pointer_addr {
                // New pointer: remember its key and target.
                Some((addr, Some(owner), target)) => {
                    let _ = owners.insert(addr, (owner, target));
                }
                // Updated pointer: refresh the stored target in place.
                Some((addr, None, new_target)) => {
                    if let Some((_owner, target)) = owners.get_mut(&addr) {
                        *target = new_target;
                    }
                }
                _ => {}
            }
        }
    });
    handle
}
fn store_chunks_task(
client: Client,
wallet: Wallet,
content: ContentList,
churn_period: Duration,
) -> JoinHandle<Result<()>> {
let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
let temp_dir = tempdir().expect("Can not create a temp directory for store_chunks_task!");
let output_dir = temp_dir.path().join("chunk_path");
create_dir_all(output_dir.clone())
.expect("failed to create output dir for encrypted chunks");
let delay = churn_period / CHUNK_CREATION_RATIO_TO_CHURN;
loop {
let random_data = gen_random_data(*DATA_SIZE);
let mut retries = 1;
loop {
match client
.data_put_public(random_data.clone(), (&wallet).into())
.await
.inspect_err(|err| {
println!("Error to put chunk: {err:?}");
error!("Error to put chunk: {err:?}")
}) {
Ok((_cost, data_map)) => {
println!("Stored Chunk/s at {data_map:?} after a delay of: {delay:?}");
info!("Stored Chunk/s at {data_map:?} after a delay of: {delay:?}");
content
.write()
.await
.push_back(NetworkAddress::ChunkAddress(ChunkAddress::new(
*data_map.xorname(),
)));
break;
}
Err(err) => {
println!("Failed to store chunk: {err:?}. Retrying ...");
error!("Failed to store chunk: {err:?}. Retrying ...");
if retries >= 3 {
println!("Failed to store chunk after 3 retries: {err}");
error!("Failed to store chunk after 3 retries: {err}");
bail!("Failed to store chunk after 3 retries: {err}");
}
retries += 1;
}
}
}
sleep(delay).await;
}
});
handle
}
/// Spawns a task that repeatedly picks a random stored address and checks it
/// is retrievable; failures are recorded (with an attempt counter) in
/// `content_erred`, and a later success clears the record again.
fn query_content_task(
    client: Client,
    content: ContentList,
    content_erred: ContentErredList,
    churn_period: Duration,
) {
    let _handle = tokio::spawn(async move {
        let delay = churn_period / CONTENT_QUERY_RATIO_TO_CHURN;
        loop {
            let stored = content.read().await.len();
            if stored == 0 {
                println!("No content created/stored just yet, let's try in {delay:?} ...");
                info!("No content created/stored just yet, let's try in {delay:?} ...");
                sleep(delay).await;
                continue;
            }
            // Sample uniformly; the list only grows, so the index stays valid
            // even though it is used after the sleep below.
            let index = rand::thread_rng().gen_range(0..stored);
            let net_addr = content.read().await[index].clone();
            trace!("Querying content (bucket index: {index}) at {net_addr:?} in {delay:?}");
            sleep(delay).await;
            if let Err(last_err) = query_content(&client, &net_addr).await {
                println!(
                    "Failed to query content (index: {index}) at {net_addr}: {last_err:?}"
                );
                error!("Failed to query content (index: {index}) at {net_addr}: {last_err:?}");
                // Bump the attempt count if we've already seen this address
                // fail, otherwise start a fresh error record for it.
                content_erred
                    .write()
                    .await
                    .entry(net_addr.clone())
                    .and_modify(|curr| curr.attempts += 1)
                    .or_insert(ContentError {
                        net_addr,
                        attempts: 1,
                        last_err,
                    });
            } else {
                // Retrieval succeeded — clear any stale error record.
                content_erred.write().await.remove(&net_addr);
            }
        }
    });
}
/// Spawns the churn driver: every `churn_period` it restarts the next node,
/// bumping `churn_count` for each successful restart, until `test_duration`
/// has elapsed. Restart failures are logged and skipped, not fatal.
fn churn_nodes_task(
    churn_count: Arc<RwLock<usize>>,
    test_duration: Duration,
    churn_period: Duration,
) {
    let begun_at = Instant::now();
    let _handle: JoinHandle<Result<()>> = tokio::spawn(async move {
        let mut restarter = NodeRestart::new(true, false).await?;
        loop {
            sleep(churn_period).await;
            if begun_at.elapsed() > test_duration {
                debug!("Test duration reached, stopping churn nodes task");
                break;
            }
            match restarter.restart_next(true, true).await {
                Ok(_) => *churn_count.write().await += 1,
                Err(err) => {
                    println!("Failed to restart node {err}");
                    info!("Failed to restart node {err}");
                }
            }
        }
        Ok(())
    });
}
/// Spawns a task that, every two churn periods, pops one erred address and
/// re-queries it. A success clears it from both maps; a failure re-records
/// it, and once the attempt count reaches MAX_NUM_OF_QUERY_ATTEMPTS the
/// record is promoted to the permanent `failures` map.
fn retry_query_content_task(
    client: Client,
    content_erred: ContentErredList,
    failures: ContentErredList,
    churn_period: Duration,
) {
    let _handle = tokio::spawn(async move {
        let delay = 2 * churn_period;
        loop {
            sleep(delay).await;
            let erred = content_erred.write().await.pop_first();
            if let Some((net_addr, mut content_error)) = erred {
                let attempts = content_error.attempts + 1;
                println!("Querying erred content at {net_addr}, attempt: #{attempts} ...");
                info!("Querying erred content at {net_addr}, attempt: #{attempts} ...");
                if let Err(last_err) = query_content(&client, &net_addr).await {
                    println!("Erred content is still not retrievable at {net_addr} after {attempts} attempts: {last_err:?}");
                    warn!("Erred content is still not retrievable at {net_addr} after {attempts} attempts: {last_err:?}");
                    content_error.attempts = attempts;
                    content_error.last_err = last_err;
                    // `>=` rather than `==`: `query_content_task` also bumps
                    // `attempts` concurrently, so an exact-equality check
                    // could be stepped over and the entry retried forever.
                    if attempts >= MAX_NUM_OF_QUERY_ATTEMPTS {
                        let _ = failures.write().await.insert(net_addr, content_error);
                    } else {
                        let _ = content_erred.write().await.insert(net_addr, content_error);
                    }
                } else {
                    // Retrieval succeeded — drop any record of this address.
                    let _ = failures.write().await.remove(&net_addr);
                    let _ = content_erred.write().await.remove(&net_addr);
                }
            }
        }
    });
}
/// Final verification for one address: query it up to
/// MAX_NUM_OF_QUERY_ATTEMPTS times (waiting two churn periods between
/// attempts). On success, clears the address from `failures` and returns
/// Ok; after exhausting all attempts, bails with the last error.
async fn final_retry_query_content(
    client: &Client,
    net_addr: &NetworkAddress,
    churn_period: Duration,
    failures: ContentErredList,
) -> Result<()> {
    let net_addr = net_addr.clone();
    let mut attempts = 1;
    loop {
        println!("Final querying content at {net_addr}, attempt: #{attempts} ...");
        debug!("Final querying content at {net_addr}, attempt: #{attempts} ...");
        match query_content(client, &net_addr).await {
            Ok(_) => {
                failures.write().await.remove(&net_addr);
                return Ok(());
            }
            Err(last_err) if attempts == MAX_NUM_OF_QUERY_ATTEMPTS => {
                println!("Final check: Content is still not retrievable at {net_addr} after {attempts} attempts: {last_err:?}");
                error!("Final check: Content is still not retrievable at {net_addr} after {attempts} attempts: {last_err:?}");
                bail!("Final check: Content is still not retrievable at {net_addr} after {attempts} attempts: {last_err:?}");
            }
            Err(_) => {
                attempts += 1;
                let delay = 2 * churn_period;
                debug!("Delaying last check of {net_addr:?} for {delay:?} ...");
                sleep(delay).await;
            }
        }
    }
}
/// Fetches whichever record type lives at `net_addr`, discarding the value —
/// only retrievability matters to the churn test.
async fn query_content(client: &Client, net_addr: &NetworkAddress) -> Result<()> {
    match net_addr {
        NetworkAddress::ChunkAddress(addr) => {
            let data_addr = DataAddress::new(*addr.xorname());
            client.data_get_public(&data_addr).await?;
        }
        NetworkAddress::PointerAddress(addr) => {
            let _ = client.pointer_get(addr).await?;
        }
        NetworkAddress::GraphEntryAddress(addr) => {
            let _ = client.graph_entry_get(addr).await?;
        }
        NetworkAddress::ScratchpadAddress(addr) => {
            let _ = client.scratchpad_get(addr).await?;
        }
        // Peer ids and raw record keys are not fetchable content.
        NetworkAddress::PeerId(_) | NetworkAddress::RecordKey(_) => {}
    }
    Ok(())
}