use cid::Cid;
use futures::future::{pending, select, Either, FutureExt};
use futures::future::{AbortHandle, Abortable};
use ipfs::Node;
use tokio::{
task,
time::{delay_for, timeout},
};
use std::{
convert::TryFrom,
future::Future,
time::{Duration, Instant},
};
/// Re-runs an asynchronous operation until its output is accepted or time runs out.
///
/// `future` is invoked to build a fresh future for every attempt; the awaited
/// output is handed to `check`. As soon as `check` returns `true` the function
/// yields `Ok(())`. After each rejected attempt it waits 100 ms and then
/// re-measures the time spent since the first attempt; once that exceeds
/// `timeout`, `Err(())` is returned instead of trying again.
///
/// The first attempt is always made, regardless of how small `timeout` is.
async fn bounded_retry<Fun, Fut, F, T>(
    timeout: Duration,
    mut future: Fun,
    check: F,
) -> Result<(), ()>
where
    Fun: FnMut() -> Fut,
    Fut: Future<Output = T>,
    F: Fn(T) -> bool,
{
    let begin = Instant::now();
    // Starts at zero (not at a measured value) so the guard below cannot
    // reject the very first attempt.
    let mut waited = Duration::default();

    while waited <= timeout {
        let outcome = future().await;
        if check(outcome) {
            return Ok(());
        }

        // Back off before the next attempt, then refresh the time budget.
        delay_for(Duration::from_millis(100)).await;
        waited = begin.elapsed();
    }

    Err(())
}
/// Asserts that `ipfs` holds exactly `expected_count` subscriptions for `cid`.
///
/// The collection returned by `get_subscriptions()` is locked only for the
/// duration of the lookup. When `expected_count` is non-zero the collection is
/// additionally required to contain exactly one entry. A missing entry for
/// `cid` is treated as zero subscriptions.
///
/// Panics (via `assert_eq!`) when either expectation does not hold, or when
/// the lock cannot be acquired.
async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize) {
    // The guard lives only inside this block, so the final assertion runs
    // after the lock has been released.
    let actual_count = {
        let registry = ipfs.get_subscriptions().lock().unwrap();

        if expected_count != 0 {
            assert_eq!(registry.len(), 1);
        }

        registry
            .get(&cid.clone().into())
            .map_or(0, |listeners| listeners.len())
    };

    assert_eq!(actual_count, expected_count);
}
// Checks how the bitswap wantlist and the per-CID subscription count react
// when `get_block` requests for the same CID are cancelled one by one.
//
// Three requests for one CID are started. They are then cancelled in turn
// (one by aborting its task, two by dropping their futures). After each
// cancellation the test asserts that the wantlist still has one entry while
// any request remains, and that it becomes empty once the last one is gone.
#[tokio::test(max_threads = 1)]
async fn wantlist_cancellation() {
tracing_subscriber::fmt::init();
let ipfs = Node::new("test_node").await;
// NOTE(review): presumably a CID that nobody provides during the test, so
// every `get_block` below stays pending until it is cancelled - confirm.
let cid = Cid::try_from("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KaGa").unwrap();
// Request 1: runs in a spawned task and is wrapped in `Abortable`, so it can
// be cancelled later through `abort_handle1`.
let ipfs_clone = ipfs.clone();
let cid_clone = cid.clone();
let (abort_handle1, abort_reg) = AbortHandle::new_pair();
let abortable_req = Abortable::new(
async move { ipfs_clone.get_block(&cid_clone).await },
abort_reg,
);
let _get_request1 = task::spawn(abortable_req);
// Poll the wantlist (for up to 1 s) until its first entry is our CID.
let wantlist_populated = bounded_retry(
Duration::from_secs(1),
|| ipfs.bitswap_wantlist(None),
|ret| ret.unwrap().get(0).map(|x| &x.0) == Some(&cid),
)
.await;
assert!(
wantlist_populated.is_ok(),
"the wantlist is still empty after the request was issued"
);
check_cid_subscriptions(&ipfs, &cid, 1).await;
// Request 2: not spawned. The future is raced against a 100 ms timeout that
// wraps a never-completing future, so the timeout side is expected to win.
// `select` then hands back the still-unfinished `get_block` future, which is
// kept in `get_request2` so the request stays alive until it is dropped.
// NOTE(review): presumably polling the future inside `select` is what
// registers the extra subscription checked below - confirm.
let ipfs_clone = ipfs.clone();
let cid_clone = cid.clone();
let get_request2 = ipfs_clone.get_block(&cid_clone);
let get_timeout = timeout(Duration::from_millis(100), pending::<()>());
let get_request2 = match select(get_timeout.boxed(), get_request2.boxed()).await {
Either::Left((_, fut)) => fut,
// The block request finishing first is treated as impossible here.
Either::Right(_) => unreachable!(),
};
check_cid_subscriptions(&ipfs, &cid, 2).await;
// Request 3: same procedure as request 2.
let ipfs_clone = ipfs.clone();
let cid_clone = cid.clone();
let get_request3 = ipfs_clone.get_block(&cid_clone);
let get_timeout = timeout(Duration::from_millis(100), pending::<()>());
let get_request3 = match select(get_timeout.boxed(), get_request3.boxed()).await {
Either::Left((_, fut)) => fut,
Either::Right(_) => unreachable!(),
};
check_cid_subscriptions(&ipfs, &cid, 3).await;
// Cancel request 1 by aborting its task; two requests remain, so the
// wantlist is expected to keep its single entry.
abort_handle1.abort();
let wantlist_partially_cleared1 = bounded_retry(
Duration::from_secs(1),
|| ipfs.bitswap_wantlist(None),
|ret| ret.unwrap().len() == 1,
)
.await;
assert!(
wantlist_partially_cleared1.is_ok(),
"the wantlist is empty despite there still being 2 live get requests"
);
check_cid_subscriptions(&ipfs, &cid, 2).await;
// Cancel request 2 by dropping its future; one request remains.
drop(get_request2);
let wantlist_partially_cleared2 = bounded_retry(
Duration::from_secs(1),
|| ipfs.bitswap_wantlist(None),
|ret| ret.unwrap().len() == 1,
)
.await;
assert!(
wantlist_partially_cleared2.is_ok(),
"the wantlist is empty despite there still being a live get request"
);
check_cid_subscriptions(&ipfs, &cid, 1).await;
// Cancel the last request; the wantlist is now expected to become empty and
// no subscriptions should remain for the CID.
drop(get_request3);
let wantlist_cleared = bounded_retry(
Duration::from_secs(1),
|| ipfs.bitswap_wantlist(None),
|ret| ret.unwrap().is_empty(),
)
.await;
assert!(
wantlist_cleared.is_ok(),
"a block was not removed from the wantlist after all its subscriptions had died"
);
check_cid_subscriptions(&ipfs, &cid, 0).await;
}