use std::{collections::HashMap, future::Future, sync::Arc};
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use futures_lite::Stream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use iroh::{endpoint::presets, Endpoint, PublicKey, SecretKey};
use iroh_blobs::Hash;
use iroh_docs::{
api::{
protocol::{AddrInfoOptions, ShareMode},
Doc,
},
engine::LiveEvent,
store::{DownloadPolicy, FilterKind, Query},
AuthorId, ContentStatus, Entry,
};
use n0_future::time::{Duration, Instant};
use rand::{CryptoRng, RngExt, SeedableRng};
#[cfg(feature = "fs-store")]
use tempfile::tempdir;
use tracing::{debug, error_span, info, Instrument};
use tracing_test::traced_test;
mod util;
use util::{Builder, Node};
use crate::util::empty_endpoint;
/// Upper bound for any single event-wait in these tests.
const TIMEOUT: Duration = Duration::from_secs(60);
/// Binds a fresh minimal endpoint using `secret_key` and wraps it in an
/// in-memory node [`Builder`].
async fn test_node(secret_key: SecretKey) -> Result<Builder> {
    let endpoint = Endpoint::builder(presets::Minimal)
        .secret_key(secret_key)
        .bind()
        .await?;
    Ok(Node::memory(endpoint))
}
/// Returns a `'static` future that spawns test node number `i`.
///
/// The secret key is drawn from `rng` before the async block so the
/// returned future does not borrow the rng.
fn spawn_node(
    i: usize,
    rng: &mut impl CryptoRng,
) -> impl Future<Output = anyhow::Result<Node>> + 'static {
    let secret_key = SecretKey::from_bytes(&rng.random());
    async move {
        let node = test_node(secret_key).await?;
        let node = node.spawn().await?;
        info!(?i, me = %node.id().fmt_short(), "node spawned");
        Ok(node)
    }
}
/// Spawns `n` test nodes concurrently, failing if any node fails to start.
///
/// Note: the previous `mut rng: &mut impl CryptoRng` + `&mut rng` pattern made
/// `spawn_node` generic over `&mut &mut R`; an explicit reborrow (`&mut *rng`)
/// avoids the extra level of indirection.
async fn spawn_nodes(n: usize, rng: &mut impl CryptoRng) -> anyhow::Result<Vec<Node>> {
    // `spawn_node` draws the secret key synchronously, so the futures can be
    // built up front and then awaited concurrently.
    let futs: Vec<_> = (0..n).map(|i| spawn_node(i, &mut *rng)).collect();
    futures_buffered::join_all(futs).await.into_iter().collect()
}
/// Derives a deterministic ChaCha12 RNG from an arbitrary byte seed by
/// hashing the seed down to the 32 bytes `from_seed` requires.
pub fn test_rng(seed: &[u8]) -> rand::rngs::ChaCha12Rng {
    let digest = Hash::new(seed);
    rand::rngs::ChaCha12Rng::from_seed(*digest.as_bytes())
}
/// Expands to a boxed matcher closure for the `assert_next*` helpers:
/// the closure returns `true` when the event matches `$pattern`
/// (with an optional `if` guard).
macro_rules! match_event {
    ($pattern:pat $(if $guard:expr)? $(,)?) => {
        Box::new(move |e| matches!(e, $pattern $(if $guard)?))
    };
}
/// Two nodes: node0 creates a doc with one entry, node1 joins via a write
/// ticket; both sides must observe the expected live events and end up with
/// the same entry content.
#[tokio::test]
#[traced_test]
async fn sync_simple() -> Result<()> {
    let mut rng = test_rng(b"sync_simple");
    let nodes = spawn_nodes(2, &mut rng).await?;
    let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
    let peer0 = nodes[0].id();
    // node0: create doc, author, and a first entry.
    let author0 = clients[0].docs().author_create().await?;
    let doc0 = clients[0].docs().create().await?;
    let blobs0 = clients[0].blobs();
    let hash0 = doc0
        .set_bytes(author0, b"k1".to_vec(), b"v1".to_vec())
        .await?;
    assert_latest(blobs0, &doc0, b"k1", b"v1").await;
    let ticket = doc0
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    // Subscribe before node1 joins so no event is missed.
    let mut events0 = doc0.subscribe().await?;
    info!("node1: join");
    let peer1 = nodes[1].id();
    let doc1 = clients[1].docs().import(ticket.clone()).await?;
    let blobs1 = clients[1].blobs();
    let mut events1 = doc1.subscribe().await?;
    info!("node1: assert 5 events");
    // node1 sees (in any order): neighbor up, remote insert, sync finished,
    // content ready, pending content ready.
    assert_next_unordered(
        &mut events1,
        TIMEOUT,
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)),
            Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )),
            Box::new(move |e| match_sync_finished(e, peer0)),
            Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    assert_latest(blobs1, &doc1, b"k1", b"v1").await;
    info!("node0: assert 2 events");
    assert_next_unordered(
        &mut events0,
        TIMEOUT,
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)),
            Box::new(move |e| match_sync_finished(e, peer1)),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    for node in nodes {
        node.shutdown().await?;
    }
    Ok(())
}
/// A subscription on a doc without any sync peers must still deliver
/// `InsertLocal` for local writes.
#[tokio::test]
#[traced_test]
async fn sync_subscribe_no_sync() -> Result<()> {
    let mut rng = test_rng(b"sync_subscribe");
    let node = spawn_node(0, &mut rng).await?;
    let client = node.client();
    let doc = client.docs().create().await?;
    // Subscribe before writing so the insert event cannot be missed.
    let mut sub = doc.subscribe().await?;
    let author = client.docs().author_create().await?;
    doc.set_bytes(author, b"k".to_vec(), b"v".to_vec()).await?;
    // Local events should be near-instant; 100ms is a generous bound.
    let event = n0_future::time::timeout(Duration::from_millis(100), sub.next()).await?;
    assert!(
        matches!(event, Some(Ok(LiveEvent::InsertLocal { .. }))),
        "expected InsertLocal but got {event:?}"
    );
    node.shutdown().await?;
    Ok(())
}
/// Writes a batch of entries before and after starting sync and logs how
/// long the initial sync and the subsequent gossip propagation take.
/// The entry count is tunable via the `N_ENTRIES` env var (default 100).
#[tokio::test]
#[traced_test]
async fn sync_gossip_bulk() -> Result<()> {
    let n_entries: usize = std::env::var("N_ENTRIES")
        .map(|x| x.parse().expect("N_ENTRIES must be a number"))
        .unwrap_or(100);
    let mut rng = test_rng(b"sync_gossip_bulk");
    let nodes = spawn_nodes(2, &mut rng).await?;
    let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
    let _peer0 = nodes[0].id();
    let author0 = clients[0].docs().author_create().await?;
    let doc0 = clients[0].docs().create().await?;
    let mut ticket = doc0
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    // Keep node0's addresses for the explicit `start_sync` below, but import
    // the ticket without them so node1 does not connect immediately.
    let peers = ticket.nodes.clone();
    ticket.nodes = vec![];
    let doc1 = clients[1].docs().import(ticket).await?;
    let mut events = doc1.subscribe().await?;
    // Phase 1: insert everything on node0 before any connection exists.
    let now = Instant::now();
    let value = b"foo";
    for i in 0..n_entries {
        let key = format!("init/{i}");
        doc0.set_bytes(author0, key.as_bytes().to_vec(), value.to_vec())
            .await?;
    }
    let elapsed = now.elapsed();
    info!(
        "insert took {elapsed:?} for {n_entries} ({:?} per entry)",
        elapsed / n_entries as u32
    );
    // Connect both sides and count remote inserts until every entry arrived.
    let now = Instant::now();
    let mut count = 0;
    doc0.start_sync(vec![]).await?;
    doc1.start_sync(peers).await?;
    while let Some(event) = events.next().await {
        let event = event?;
        if matches!(event, LiveEvent::InsertRemote { .. }) {
            count += 1;
        }
        if count == n_entries {
            break;
        }
    }
    let elapsed = now.elapsed();
    info!(
        "initial sync took {elapsed:?} for {n_entries} ({:?} per entry)",
        elapsed / n_entries as u32
    );
    // Phase 2: with the connection live, new inserts propagate via gossip.
    let mut count = 0;
    let value = b"foo";
    let now = Instant::now();
    for i in 0..n_entries {
        let key = format!("gossip/{i}");
        doc0.set_bytes(author0, key.as_bytes().to_vec(), value.to_vec())
            .await?;
    }
    let elapsed = now.elapsed();
    info!(
        "insert took {elapsed:?} for {n_entries} ({:?} per entry)",
        elapsed / n_entries as u32
    );
    while let Some(event) = events.next().await {
        let event = event?;
        if matches!(event, LiveEvent::InsertRemote { .. }) {
            count += 1;
        }
        if count == n_entries {
            break;
        }
    }
    let elapsed = now.elapsed();
    info!(
        "gossip recv took {elapsed:?} for {n_entries} ({:?} per entry)",
        elapsed / n_entries as u32
    );
    Ok(())
}
/// Full three-peer scenario: peer0 creates a doc, peer1 joins and writes a
/// second entry, peer2 joins late and must catch up on both entries; each
/// step asserts the exact set of live events every peer observes.
#[tokio::test]
#[traced_test]
#[ignore = "flaky"]
async fn sync_full_basic() -> testresult::TestResult<()> {
    let mut rng = test_rng(b"sync_full_basic");
    let mut nodes = spawn_nodes(2, &mut rng).await?;
    let mut clients = nodes
        .iter()
        .map(|node| node.client().clone())
        .collect::<Vec<_>>();
    // peer0: create doc and insert the first entry.
    let peer0 = nodes[0].id();
    let author0 = clients[0].docs().author_create().await?;
    let doc0 = clients[0].docs().create().await?;
    let blobs0 = clients[0].blobs();
    let mut events0 = doc0.subscribe().await?;
    let key0 = b"k1";
    let value0 = b"v1";
    let hash0 = doc0
        .set_bytes(author0, key0.to_vec(), value0.to_vec())
        .await?;
    info!("peer0: wait for 1 event (local insert)");
    let e = next(&mut events0).await;
    assert!(
        matches!(&e, LiveEvent::InsertLocal { entry } if entry.content_hash() == hash0),
        "expected LiveEvent::InsertLocal but got {e:?}",
    );
    assert_latest(blobs0, &doc0, key0, value0).await;
    let ticket = doc0
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    info!("peer1: spawn");
    let peer1 = nodes[1].id();
    let author1 = clients[1].docs().author_create().await?;
    info!("peer1: join doc");
    let doc1 = clients[1].docs().import(ticket.clone()).await?;
    let blobs1 = clients[1].blobs();
    info!("peer1: wait for 4 events (for sync and join with peer0)");
    let mut events1 = doc1.subscribe().await?;
    assert_next_unordered(
        &mut events1,
        TIMEOUT,
        vec![
            match_event!(LiveEvent::NeighborUp(peer) if *peer == peer0),
            match_event!(LiveEvent::InsertRemote { from, .. } if *from == peer0 ),
            Box::new(move |e| match_sync_finished(e, peer0)),
            match_event!(LiveEvent::ContentReady { hash } if *hash == hash0),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    info!("peer0: wait for 2 events (join & accept sync finished from peer1)");
    assert_next(
        &mut events0,
        TIMEOUT,
        vec![
            match_event!(LiveEvent::NeighborUp(peer) if *peer == peer1),
            Box::new(move |e| match_sync_finished(e, peer1)),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    // peer1 writes a second entry; it reaches peer0 via gossip.
    info!("peer1: insert entry");
    let key1 = b"k2";
    let value1 = b"v2";
    let hash1 = doc1
        .set_bytes(author1, key1.to_vec(), value1.to_vec())
        .await?;
    assert_latest(blobs1, &doc1, key1, value1).await;
    info!("peer1: wait for 1 event (local insert, and pendingcontentready)");
    assert_next(
        &mut events1,
        TIMEOUT,
        vec![match_event!(LiveEvent::InsertLocal { entry} if entry.content_hash() == hash1)],
    )
    .await;
    info!("peer0: wait for 2 events (gossip'ed entry from peer1)");
    assert_next(
        &mut events0,
        TIMEOUT,
        vec![
            Box::new(
                move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == peer1),
            ),
            Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)),
        ],
    ).await;
    assert_latest(blobs0, &doc0, key1, value1).await;
    // peer2 joins late and has to sync both existing entries.
    info!("peer2: spawn");
    nodes.push(spawn_node(nodes.len(), &mut rng).await?);
    clients.push(nodes.last().unwrap().client().clone());
    let doc2 = clients[2].docs().import(ticket).await?;
    let blobs2 = clients[2].blobs();
    let peer2 = nodes[2].id();
    let mut events2 = doc2.subscribe().await?;
    info!("peer2: wait for 9 events (from sync with peers)");
    assert_next_unordered_with_optionals(
        &mut events2,
        TIMEOUT,
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)),
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)),
            Box::new(move |e| match_sync_finished(e, peer0)),
            Box::new(move |e| match_sync_finished(e, peer1)),
            Box::new(
                move |e| matches!(e, LiveEvent::InsertRemote { entry, content_status: ContentStatus::Missing, .. } if entry.content_hash() == hash0),
            ),
            Box::new(
                move |e| matches!(e, LiveEvent::InsertRemote { entry, content_status: ContentStatus::Missing, .. } if entry.content_hash() == hash1),
            ),
            Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)),
            Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)),
            match_event!(LiveEvent::PendingContentReady),
        ],
        // Optional: additional sync-finished/pending-content rounds may or
        // may not occur — presumably timing dependent; tolerated here.
        vec![
            Box::new(move |e| match_sync_finished(e, peer0)),
            Box::new(move |e| match_sync_finished(e, peer1)),
            match_event!(LiveEvent::PendingContentReady),
            match_event!(LiveEvent::PendingContentReady),
        ]
    ).await;
    assert_latest(blobs2, &doc2, b"k1", b"v1").await;
    assert_latest(blobs2, &doc2, b"k2", b"v2").await;
    info!("peer0: wait for 2 events (join & accept sync finished from peer2)");
    assert_next(
        &mut events0,
        TIMEOUT,
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)),
            Box::new(move |e| match_sync_finished(e, peer2)),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    info!("peer1: wait for 2 events (join & accept sync finished from peer2)");
    assert_next(
        &mut events1,
        TIMEOUT,
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)),
            Box::new(move |e| match_sync_finished(e, peer2)),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    info!("shutdown");
    for node in nodes {
        node.shutdown().await?;
    }
    Ok(())
}
/// Opening a doc twice yields two handles on the same doc; closing one
/// handle invalidates only that handle and decrements the handle count.
#[tokio::test]
#[traced_test]
async fn sync_open_close() -> Result<()> {
    let mut rng = test_rng(b"sync_subscribe_stop_close");
    let node = spawn_node(0, &mut rng).await?;
    let client = node.client();
    let doc = client.docs().create().await?;
    let status = doc.status().await?;
    assert_eq!(status.handles, 1);
    let doc2 = client.docs().open(doc.id()).await?.unwrap();
    let status = doc2.status().await?;
    assert_eq!(status.handles, 2);
    doc.close().await?;
    // The closed handle can no longer be queried...
    assert!(doc.status().await.is_err());
    // ...but the second handle still works and sees the decremented count.
    let status = doc2.status().await?;
    assert_eq!(status.handles, 1);
    Ok(())
}
/// Checks subscriber/handle accounting: `start_sync` adds a subscriber and
/// an extra handle, an explicit `subscribe` adds another subscriber,
/// dropping the subscription removes it, and `leave` resets everything.
#[tokio::test]
#[traced_test]
async fn sync_subscribe_stop_close() -> Result<()> {
    let mut rng = test_rng(b"sync_subscribe_stop_close");
    let node = spawn_node(0, &mut rng).await?;
    let client = node.client();
    let doc = client.docs().create().await?;
    let author = client.docs().author_create().await?;
    let status = doc.status().await?;
    assert_eq!(status.subscribers, 0);
    assert_eq!(status.handles, 1);
    assert!(!status.sync);
    doc.start_sync(vec![]).await?;
    let status = doc.status().await?;
    assert!(status.sync);
    assert_eq!(status.handles, 2);
    assert_eq!(status.subscribers, 1);
    let sub = doc.subscribe().await?;
    let status = doc.status().await?;
    assert_eq!(status.subscribers, 2);
    drop(sub);
    // NOTE(review): the write presumably flushes out the dropped subscriber —
    // the count drops back to 1 afterwards.
    doc.set_bytes(author, b"x".to_vec(), b"x".to_vec()).await?;
    let status = doc.status().await?;
    assert_eq!(status.subscribers, 1);
    doc.leave().await?;
    let status = doc.status().await?;
    assert_eq!(status.subscribers, 0);
    assert_eq!(status.handles, 1);
    assert!(!status.sync);
    Ok(())
}
/// Syncs two nodes whose ticket is stripped down to relay addresses only,
/// so all traffic has to go through the local test relay server; then
/// verifies a subsequent update also propagates.
#[tokio::test]
#[traced_test]
async fn test_sync_via_relay() -> Result<()> {
    let mut rng = test_rng(b"test_sync_via_relay");
    let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
    use crate::util::endpoint;
    let ep1 = endpoint(
        SecretKey::from_bytes(&rng.random()),
        relay_map.clone(),
        None,
    )
    .await?;
    let node1 = Node::memory(ep1).spawn().await?;
    let node1_id = node1.id();
    let ep2 = endpoint(
        SecretKey::from_bytes(&rng.random()),
        relay_map.clone(),
        None,
    )
    .await?;
    let node2 = Node::memory(ep2).spawn().await?;
    node1.online().await;
    node2.online().await;
    let doc1 = node1.docs().create().await?;
    let author1 = node1.docs().author_create().await?;
    let inserted_hash = doc1
        .set_bytes(author1, b"foo".to_vec(), b"bar".to_vec())
        .await?;
    let mut ticket = doc1
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    // Drop every non-relay transport address from the ticket so node2 can
    // only reach node1 via the relay.
    let mut relay_ticket = ticket.nodes[0].clone();
    relay_ticket.addrs = relay_ticket
        .addrs
        .iter()
        .filter(|addr| matches!(addr, iroh::TransportAddr::Relay(_)))
        .cloned()
        .collect();
    ticket.nodes[0] = relay_ticket;
    let doc2 = node2.docs().import(ticket).await?;
    let blobs2 = node2.blobs();
    let mut events = doc2.subscribe().await?;
    assert_next_unordered_with_optionals(
        &mut events,
        Duration::from_secs(2),
        vec![
            Box::new(move |e| matches!(e, LiveEvent::NeighborUp(n) if *n== node1_id)),
            Box::new(move |e| match_sync_finished(e, node1_id)),
            Box::new(
                move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing | ContentStatus::Incomplete, .. } if *from == node1_id),
            ),
            Box::new(
                move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == inserted_hash),
            ),
            match_event!(LiveEvent::PendingContentReady),
        ],
        vec![Box::new(move |e| match_sync_finished(e, node1_id))],
    ).await;
    let actual = blobs2
        .get_bytes(
            doc2.get_exact(author1, b"foo", false)
                .await?
                .expect("entry to exist")
                .content_hash(),
        )
        .await?;
    assert_eq!(actual.as_ref(), b"bar");
    // A subsequent update must also propagate over the relay connection.
    let updated_hash = doc1
        .set_bytes(author1, b"foo".to_vec(), b"update".to_vec())
        .await?;
    assert_next_unordered_with_optionals(
        &mut events,
        Duration::from_secs(10),
        vec![
            Box::new(
                move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing | ContentStatus::Incomplete, .. } if *from == node1_id),
            ),
            Box::new(
                move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == updated_hash),
            ),
        ],
        vec![
            Box::new(move |e| match_sync_finished(e, node1_id)),
            Box::new(move |e| matches!(e, LiveEvent::PendingContentReady)),
        ],
    ).await;
    let actual = blobs2
        .get_bytes(
            doc2.get_exact(author1, b"foo", false)
                .await?
                .expect("entry to exist")
                .content_hash(),
        )
        .await?;
    assert_eq!(actual.as_ref(), b"update");
    Ok(())
}
/// Persistent node1 syncs with in-memory node2, shuts down, is respawned
/// from the same on-disk directory with the same secret key, and must
/// resume syncing — including an entry node2 wrote while node1 was offline.
#[tokio::test]
#[traced_test]
#[ignore = "flaky"]
#[cfg(feature = "fs-store")]
async fn sync_restart_node() -> Result<()> {
    use crate::util::endpoint;
    let mut rng = test_rng(b"sync_restart_node");
    let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
    let lookup_server = iroh::test_utils::DnsPkarrServer::run().await?;
    let node1_dir = tempfile::TempDir::with_prefix("test-sync_restart_node-node1")?;
    // node1: persistent store + fixed secret key so it can be respawned with
    // the same identity later.
    let secret_key_1 = SecretKey::from_bytes(&rng.random());
    let ep = endpoint(
        secret_key_1.clone(),
        relay_map.clone(),
        Some(&lookup_server),
    )
    .await?;
    let node1 = Node::persistent(&node1_dir, ep).spawn().await?;
    let id1 = node1.id();
    let doc1 = node1.docs().create().await?;
    let blobs1 = node1.blobs();
    let mut events1 = doc1.subscribe().await?;
    let ticket = doc1
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    let secret_key_2 = SecretKey::from_bytes(&rng.random());
    let ep = endpoint(secret_key_2, relay_map.clone(), Some(&lookup_server)).await?;
    let node2 = Node::memory(ep).spawn().await?;
    let id2 = node2.id();
    let author2 = node2.docs().author_create().await?;
    let doc2 = node2.docs().import(ticket.clone()).await?;
    let blobs2 = node2.blobs();
    info!("node2 set a");
    let hash_a = doc2.set_bytes(author2, "n2/a", "a").await?;
    assert_latest(blobs2, &doc2, b"n2/a", b"a").await;
    assert_next_unordered_with_optionals(
        &mut events1,
        Duration::from_secs(10),
        vec![
            match_event!(LiveEvent::NeighborUp(n) if *n == id2),
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
            match_event!(LiveEvent::ContentReady { hash } if *hash == hash_a),
            match_event!(LiveEvent::PendingContentReady),
        ],
        vec![
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::PendingContentReady),
        ],
    )
    .await;
    assert_latest(blobs1, &doc1, b"n2/a", b"a").await;
    info!(me = %id1.fmt_short(), "node1 start shutdown");
    node1.shutdown().await?;
    info!(me = %id1.fmt_short(), "node1 down");
    info!(me = %id1.fmt_short(), "sleep 1s");
    n0_future::time::sleep(Duration::from_secs(1)).await;
    // Written while node1 is offline; must arrive after the restart.
    info!(me = %id2.fmt_short(), "node2 set b");
    let hash_b = doc2.set_bytes(author2, "n2/b", "b").await?;
    info!(me = %id1.fmt_short(), "node1 respawn");
    let ep = endpoint(
        secret_key_1.clone(),
        relay_map.clone(),
        Some(&lookup_server),
    )
    .await?;
    let node1 = Node::persistent(&node1_dir, ep).spawn().await?;
    assert_eq!(id1, node1.id());
    let doc1 = node1.docs().open(doc1.id()).await?.expect("doc to exist");
    let blobs1 = node1.blobs();
    let mut events1 = doc1.subscribe().await?;
    // Entry "a" survived the restart in the persistent store.
    assert_latest(blobs1, &doc1, b"n2/a", b"a").await;
    doc1.start_sync(vec![]).await?;
    assert_next_unordered_with_optionals(
        &mut events1,
        Duration::from_secs(10),
        vec![
            match_event!(LiveEvent::NeighborUp(n) if *n== id2),
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
            match_event!(LiveEvent::ContentReady { hash } if *hash == hash_b),
        ],
        vec![
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::PendingContentReady),
        ]
    ).await;
    assert_latest(blobs1, &doc1, b"n2/b", b"b").await;
    // Live gossip after the restart: a third entry must also arrive.
    info!(me = %id2.fmt_short(), "node2 set c");
    let hash_c = doc2.set_bytes(author2, "n2/c", "c").await?;
    assert_next_unordered_with_optionals(
        &mut events1,
        Duration::from_secs(10),
        vec![
            match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
            match_event!(LiveEvent::ContentReady { hash } if *hash == hash_c),
        ],
        vec![
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::PendingContentReady),
            match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
            match_event!(LiveEvent::PendingContentReady),
        ]
    ).await;
    assert_latest(blobs1, &doc1, b"n2/c", b"c").await;
    Ok(())
}
/// Verifies download policies: doc A downloads nothing except one exact
/// key, doc B downloads everything except keys under one prefix — while
/// both still sync (receive records for) all remote entries.
#[tokio::test]
async fn test_download_policies() -> Result<()> {
    // Keys written by doc A.
    let star_wars_movies = &[
        "star_wars/prequel/the_phantom_menace",
        "star_wars/prequel/attack_of_the_clones",
        "star_wars/prequel/revenge_of_the_sith",
        "star_wars/og/a_new_hope",
        "star_wars/og/the_empire_strikes_back",
        "star_wars/og/return_of_the_jedi",
    ];
    // Keys written by doc B.
    let lotr_movies = &[
        "lotr/fellowship_of_the_ring",
        "lotr/the_two_towers",
        "lotr/return_of_the_king",
    ];
    // B: download everything except the "star_wars/og" prefix (3 of 6).
    let policy_b =
        DownloadPolicy::EverythingExcept(vec![FilterKind::Prefix("star_wars/og".into())]);
    // A: download nothing except this single exact key (1 of 3).
    let policy_a = DownloadPolicy::NothingExcept(vec![FilterKind::Exact(
        "lotr/fellowship_of_the_ring".into(),
    )]);
    // "synced" counts InsertRemote events; "downloaded" counts actual content.
    const EXPECTED_A_SYNCED: usize = 3;
    const EXPECTED_A_DOWNLOADED: usize = 1;
    const EXPECTED_B_SYNCED: usize = 6;
    const EXPECTED_B_DOWNLOADED: usize = 3;
    let mut rng = test_rng(b"sync_download_policies");
    let nodes = spawn_nodes(2, &mut rng).await?;
    let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
    let doc_a = clients[0].docs().create().await?;
    let author_a = clients[0].docs().author_create().await?;
    let ticket = doc_a
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    let doc_b = clients[1].docs().import(ticket).await?;
    let author_b = clients[1].docs().author_create().await?;
    doc_a.set_download_policy(policy_a).await?;
    doc_b.set_download_policy(policy_b).await?;
    let mut events_a = doc_a.subscribe().await?;
    let mut events_b = doc_b.subscribe().await?;
    // Maps content hash -> key so events can be translated back to keys.
    let mut key_hashes: HashMap<iroh_blobs::Hash, &'static str> = HashMap::default();
    for k in star_wars_movies.iter() {
        let hash = doc_a
            .set_bytes(author_a, k.to_owned(), k.to_owned())
            .await?;
        key_hashes.insert(hash, k);
    }
    for k in lotr_movies.iter() {
        let hash = doc_b
            .set_bytes(author_b, k.to_owned(), k.to_owned())
            .await?;
        key_hashes.insert(hash, k);
    }
    assert_eq!(key_hashes.len(), star_wars_movies.len() + lotr_movies.len());
    // Drive both event streams until each side reached its expected counts.
    let fut = async {
        use LiveEvent::*;
        let mut downloaded_a: Vec<&'static str> = Vec::new();
        let mut downloaded_b: Vec<&'static str> = Vec::new();
        let mut synced_a = 0usize;
        let mut synced_b = 0usize;
        loop {
            tokio::select! {
                Some(Ok(ev)) = events_a.next() => {
                    match ev {
                        InsertRemote { content_status, entry, .. } => {
                            synced_a += 1;
                            // Content can already be complete on insert.
                            if let ContentStatus::Complete = content_status {
                                downloaded_a.push(key_hashes.get(&entry.content_hash()).unwrap())
                            }
                        },
                        ContentReady { hash } => {
                            downloaded_a.push(key_hashes.get(&hash).unwrap());
                        },
                        _ => {}
                    }
                }
                Some(Ok(ev)) = events_b.next() => {
                    match ev {
                        InsertRemote { content_status, entry, .. } => {
                            synced_b += 1;
                            if let ContentStatus::Complete = content_status {
                                downloaded_b.push(key_hashes.get(&entry.content_hash()).unwrap())
                            }
                        },
                        ContentReady { hash } => {
                            downloaded_b.push(key_hashes.get(&hash).unwrap());
                        },
                        _ => {}
                    }
                }
            }
            if synced_a == EXPECTED_A_SYNCED
                && downloaded_a.len() == EXPECTED_A_DOWNLOADED
                && synced_b == EXPECTED_B_SYNCED
                && downloaded_b.len() == EXPECTED_B_DOWNLOADED
            {
                break;
            }
        }
        (downloaded_a, downloaded_b)
    };
    let (downloaded_a, mut downloaded_b) = n0_future::time::timeout(TIMEOUT, fut)
        .await
        .context("timeout elapsed")?;
    // Download order is not deterministic; compare B's list sorted.
    downloaded_b.sort();
    assert_eq!(downloaded_a, vec!["lotr/fellowship_of_the_ring"]);
    assert_eq!(
        downloaded_b,
        vec![
            "star_wars/prequel/attack_of_the_clones",
            "star_wars/prequel/revenge_of_the_sith",
            "star_wars/prequel/the_phantom_menace",
        ]
    );
    Ok(())
}
/// Stress test: `NODES` (default 10) nodes each publish their own entries,
/// then everyone joins via node0 and must converge on the full entry set.
#[tokio::test(flavor = "multi_thread")]
#[traced_test]
#[ignore = "flaky"]
async fn sync_big() -> Result<()> {
    let mut rng = test_rng(b"sync_big");
    let n_nodes = std::env::var("NODES")
        .map(|v| v.parse().expect("NODES must be a number"))
        .unwrap_or(10);
    let n_entries_init = 1;
    // Background ticker that makes progress visible in the logs.
    tokio::task::spawn(async move {
        for i in 0.. {
            n0_future::time::sleep(Duration::from_secs(1)).await;
            info!("tick {i}");
        }
    });
    let nodes = spawn_nodes(n_nodes, &mut rng).await?;
    let node_ids = nodes.iter().map(|node| node.id()).collect::<Vec<_>>();
    let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
    let authors = collect_futures(clients.iter().map(|c| c.docs().author_create())).await?;
    let doc0 = clients[0].docs().create().await?;
    let mut ticket = doc0
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await?;
    // Keep node0's address for the explicit `start_sync` below; import the
    // ticket itself without any addresses.
    let peer0 = ticket.nodes[0].clone();
    ticket.nodes = vec![];
    let docs_clients: Vec<_> = clients.iter().skip(1).collect();
    let mut docs = vec![];
    docs.push(doc0);
    docs.extend_from_slice(
        &collect_futures(docs_clients.into_iter().map(|c| {
            let ticket = ticket.clone();
            async move { c.docs().import(ticket).await }
        }))
        .await?,
    );
    // Every node publishes its own entries before any sync starts.
    let mut expected = vec![];
    publish(&docs, &mut expected, n_entries_init, |i, j| {
        (
            authors[i],
            format!("init/{}/{j}", node_ids[i].fmt_short()),
            format!("init:{i}:{j}"),
        )
    })
    .await?;
    // Pre-sync: each node only holds the entries it authored itself.
    for (i, doc) in docs.iter().enumerate() {
        let blobs = nodes[i].blobs();
        let entries = get_all_with_content(blobs, doc).await?;
        let mut expected = expected
            .iter()
            .filter(|e| e.author == authors[i])
            .cloned()
            .collect::<Vec<_>>();
        expected.sort();
        assert_eq!(entries, expected, "phase1 pre-sync correct");
    }
    let events = collect_futures(docs.iter().map(|d| d.subscribe())).await?;
    for (i, doc) in docs.iter().enumerate().skip(1) {
        info!(me = %node_ids[i].fmt_short(), peer = %peer0.id.fmt_short(), "join");
        doc.start_sync(vec![peer0.clone()]).await?;
    }
    info!("wait for all peers to receive insert events");
    // Each node must receive everything authored by the other n-1 nodes.
    let expected_inserts = (n_nodes - 1) * n_entries_init;
    let mut tasks = tokio::task::JoinSet::default();
    for (i, events) in events.into_iter().enumerate() {
        let doc = docs[i].clone();
        let me = doc.id().fmt_short();
        let expected = expected.clone();
        let fut = async move {
            wait_for_events(events, expected_inserts, TIMEOUT, |e| {
                matches!(e, LiveEvent::InsertRemote { .. })
            })
            .await?;
            let entries = get_all(&doc).await?;
            if entries != expected {
                Err(anyhow!(
                    "node {i} failed (has {} entries but expected to have {})",
                    entries.len(),
                    expected.len()
                ))
            } else {
                info!(
                    "received and checked all {} expected entries",
                    expected.len()
                );
                Ok(())
            }
        }
        .instrument(error_span!("sync-test", %me));
        let fut = fut.map(move |r| r.with_context(move || format!("node {i} ({me})")));
        tasks.spawn(fut);
    }
    while let Some(res) = tasks.join_next().await {
        res??;
    }
    assert_all_docs(&docs, &node_ids, &expected, "after initial sync").await;
    info!("shutdown");
    for node in nodes {
        node.shutdown().await?;
    }
    Ok(())
}
/// Creates 200 docs and verifies `docs().list()` yields all of them, each
/// re-openable while the list stream is still being consumed.
#[tokio::test]
#[traced_test]
async fn test_list_docs_stream() -> testresult::TestResult<()> {
    let node = Node::memory(empty_endpoint().await?).spawn().await?;
    let count = 200;
    for _i in 0..count {
        let doc = node.docs().create().await?;
        doc.close().await?;
    }
    let mut stream = node.docs().list().await?;
    let mut i = 0;
    // Open each listed doc while iterating to make sure listing does not
    // block other doc operations.
    let fut = async {
        while let Some((id, _)) = stream.try_next().await.unwrap() {
            let _doc = node.docs().open(id).await.unwrap().unwrap();
            i += 1;
        }
    };
    n0_future::time::timeout(Duration::from_secs(2), fut)
        .await
        .expect("not to timeout");
    assert_eq!(i, count);
    Ok(())
}
/// Collects every entry of `doc` (all authors, all keys) into a vector,
/// failing on the first stream error.
async fn get_all(doc: &Doc) -> anyhow::Result<Vec<Entry>> {
    let stream = doc.get_many(Query::all()).await?;
    stream.try_collect().await
}
/// Like `get_all`, but additionally fetches each entry's content bytes
/// from `blobs`, failing if any entry or blob is unavailable.
async fn get_all_with_content(
    blobs: &iroh_blobs::api::Store,
    doc: &Doc,
) -> anyhow::Result<Vec<(Entry, Bytes)>> {
    let entries = doc.get_many(Query::all()).await?;
    // Pair each entry with its content, propagating blob-fetch errors.
    let entries = entries.and_then(|entry| async {
        let hash = entry.content_hash();
        let content = blobs.get_bytes(hash).await.map_err(anyhow::Error::from);
        content.map(|c| (entry, c))
    });
    let entries = entries.collect::<Vec<_>>().await;
    let entries = entries.into_iter().collect::<Result<Vec<_>>>()?;
    Ok(entries)
}
/// Writes `n` entries into every doc. For doc index `i` and entry index `j`,
/// `cb(i, j)` supplies the author, key and value. All written entries are
/// appended to `expected`, which is sorted before returning.
async fn publish(
    docs: &[Doc],
    expected: &mut Vec<ExpectedEntry>,
    n: usize,
    cb: impl Fn(usize, usize) -> (AuthorId, String, String),
) -> anyhow::Result<()> {
    for (doc_idx, doc) in docs.iter().enumerate() {
        for entry_idx in 0..n {
            let (author, key, value) = cb(doc_idx, entry_idx);
            let key_bytes = key.as_bytes().to_vec();
            let value_bytes = value.as_bytes().to_vec();
            doc.set_bytes(author, key_bytes, value_bytes).await?;
            expected.push(ExpectedEntry { author, key, value });
        }
    }
    expected.sort();
    Ok(())
}
/// Awaits all futures concurrently and collects their outputs, returning
/// the first error if any future failed.
async fn collect_futures<T>(
    futs: impl IntoIterator<Item = impl Future<Output = anyhow::Result<T>>>,
) -> anyhow::Result<Vec<T>> {
    let results = futures_buffered::join_all(futs).await;
    results.into_iter().collect::<Result<Vec<_>>>()
}
/// Reads `events` until `count` items accepted by `matcher` were collected.
///
/// Uses a single deadline (`timeout`) for the whole collection, not per
/// event. Fails on timeout, on a stream error, or if the stream ends before
/// `count` matches were seen. Non-matching events are silently skipped.
async fn wait_for_events(
    mut events: impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static,
    count: usize,
    timeout: Duration,
    matcher: impl Fn(&LiveEvent) -> bool,
) -> anyhow::Result<Vec<LiveEvent>> {
    let mut res = Vec::with_capacity(count);
    let sleep = n0_future::time::sleep(timeout);
    tokio::pin!(sleep);
    while res.len() < count {
        tokio::select! {
            () = &mut sleep => {
                bail!("Failed to collect {count} elements in {timeout:?} (collected only {})", res.len());
            },
            event = events.try_next() => {
                let event = event?;
                match event {
                    None => bail!("stream ended after {} items, but expected {count}", res.len()),
                    Some(event) => if matcher(&event) {
                        res.push(event);
                        debug!("recv event {} of {count}", res.len());
                    }
                }
            }
        }
    }
    Ok(res)
}
/// Asserts that every doc in `docs` contains exactly the `expected`
/// (pre-sorted) entries, panicking with `label` and the peer id on mismatch.
///
/// Takes `&[ExpectedEntry]` instead of `&Vec<ExpectedEntry>` (the idiomatic
/// slice borrow); existing `&Vec` call sites coerce unchanged.
async fn assert_all_docs(
    docs: &[Doc],
    node_ids: &[PublicKey],
    expected: &[ExpectedEntry],
    label: &str,
) {
    info!("validate all peers: {label}");
    for (i, doc) in docs.iter().enumerate() {
        let entries = get_all(doc).await.unwrap_or_else(|err| {
            panic!("failed to get entries for peer {:?}: {err:?}", node_ids[i])
        });
        // Entry: PartialEq<ExpectedEntry> (defined below) drives the compare.
        assert_eq!(
            entries,
            expected,
            "{label}: peer {i} {:?} failed (have {} but expected {})",
            node_ids[i],
            entries.len(),
            expected.len()
        );
    }
}
/// An entry we expect to exist in a doc, kept as plain owned data so
/// expectations can be built up front and compared against real [`Entry`]s
/// via the `PartialEq` impls below.
#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Clone)]
struct ExpectedEntry {
    // Author expected to have written the entry.
    author: AuthorId,
    // Entry key; compared as bytes against `Entry::key`.
    key: String,
    // Entry value; compared by hash against `Entry::content_hash`.
    value: String,
}
impl PartialEq<Entry> for ExpectedEntry {
    /// An [`Entry`] matches when key bytes, author, and the hash of the
    /// expected value all agree (the content itself is not fetched here).
    fn eq(&self, other: &Entry) -> bool {
        let key_ok = other.key() == self.key.as_bytes();
        let hash_ok = other.content_hash() == Hash::new(&self.value);
        let author_ok = other.author() == self.author;
        key_ok && hash_ok && author_ok
    }
}
impl PartialEq<(Entry, Bytes)> for ExpectedEntry {
    /// Like the [`Entry`] comparison, but additionally requires the actual
    /// content bytes to equal the expected value.
    fn eq(&self, (entry, content): &(Entry, Bytes)) -> bool {
        let entry_ok = entry.key() == self.key.as_bytes()
            && entry.content_hash() == Hash::new(&self.value)
            && entry.author() == self.author;
        entry_ok && content.as_ref() == self.value.as_bytes()
    }
}
/// Symmetric counterpart of `ExpectedEntry == Entry`; delegates to it.
impl PartialEq<ExpectedEntry> for Entry {
    fn eq(&self, other: &ExpectedEntry) -> bool {
        other.eq(self)
    }
}
/// Symmetric counterpart of `ExpectedEntry == (Entry, Bytes)`; delegates.
impl PartialEq<ExpectedEntry> for (Entry, Bytes) {
    fn eq(&self, other: &ExpectedEntry) -> bool {
        other.eq(self)
    }
}
/// Deleting an entry removes it from the doc; with a 100ms GC interval the
/// backing blob becomes unavailable shortly afterwards.
#[tokio::test]
#[traced_test]
#[cfg(feature = "fs-store")]
async fn doc_delete() -> Result<()> {
    let tempdir = tempdir()?;
    let ep = empty_endpoint().await?;
    let node = Node::persistent(tempdir.path(), ep)
        .gc_interval(Some(Duration::from_millis(100)))
        .spawn()
        .await?;
    let client = node.client();
    let doc = client.docs().create().await?;
    let blobs = client.blobs();
    let author = client.docs().author_create().await?;
    let hash = doc
        .set_bytes(author, b"foo".to_vec(), b"hi".to_vec())
        .await?;
    assert_latest(blobs, &doc, b"foo", b"hi").await;
    let deleted = doc.del(author, b"foo".to_vec()).await?;
    assert_eq!(deleted, 1);
    let entry = doc.get_exact(author, b"foo".to_vec(), false).await?;
    assert!(entry.is_none());
    // Give the garbage collector (100ms interval) ample time to run.
    n0_future::time::sleep(Duration::from_secs(2)).await;
    let bytes = client.blobs().get_bytes(hash).await;
    assert!(bytes.is_err());
    node.shutdown().await?;
    Ok(())
}
/// After `drop_doc`, every operation on the doc handle errors and the open
/// event subscription terminates.
#[tokio::test]
#[traced_test]
async fn sync_drop_doc() -> Result<()> {
    let mut rng = test_rng(b"sync_drop_doc");
    let node = spawn_node(0, &mut rng).await?;
    let client = node.client();
    let doc = client.docs().create().await?;
    let author = client.docs().author_create().await?;
    let mut sub = doc.subscribe().await?;
    doc.set_bytes(author, b"foo".to_vec(), b"bar".to_vec())
        .await?;
    let ev = sub.next().await;
    assert!(matches!(ev, Some(Ok(LiveEvent::InsertLocal { .. }))));
    client.docs().drop_doc(doc.id()).await?;
    // All operations on the dropped doc now fail ...
    let res = doc.get_exact(author, b"foo".to_vec(), true).await;
    assert!(res.is_err());
    let res = doc
        .set_bytes(author, b"foo".to_vec(), b"bar".to_vec())
        .await;
    assert!(res.is_err());
    let res = client.docs().open(doc.id()).await;
    assert!(res.is_err());
    // ... and the subscription stream ends.
    let ev = sub.next().await;
    assert!(ev.is_none());
    Ok(())
}
/// Fetches the latest value stored under `key` in `doc` and asserts that it
/// equals `value`.
async fn assert_latest(blobs: &iroh_blobs::api::Store, doc: &Doc, key: &[u8], value: &[u8]) {
    let actual = get_latest(blobs, doc, key).await.unwrap();
    assert_eq!(actual, value.to_vec());
}
/// Returns the content bytes of the latest entry under `key` in `doc`,
/// erroring if no entry exists or the blob cannot be fetched.
async fn get_latest(
    blobs: &iroh_blobs::api::Store,
    doc: &Doc,
    key: &[u8],
) -> anyhow::Result<Vec<u8>> {
    let stream = doc
        .get_many(Query::single_latest_per_key().key_exact(key))
        .await?;
    tokio::pin!(stream);
    // The query yields at most one entry per key; take the first.
    let first = stream.next().await;
    let entry = first.ok_or_else(|| anyhow!("entry not found"))??;
    let bytes = blobs.get_bytes(entry.content_hash()).await?;
    Ok(bytes.to_vec())
}
/// Pulls the next item off `stream`, panicking if the stream has ended or
/// produced an error; logs the event at debug level.
async fn next<T: std::fmt::Debug>(mut stream: impl Stream<Item = Result<T>> + Unpin) -> T {
    let item = stream.next().await;
    let event = item.expect("stream ended").expect("stream produced error");
    debug!("Event: {event:?}");
    event
}
/// Consumes the first matcher in `matchers` that accepts `item`, returning
/// whether any matcher accepted it.
#[allow(clippy::type_complexity)]
fn apply_matchers<T>(item: &T, matchers: &mut Vec<Box<dyn Fn(&T) -> bool + Send>>) -> bool {
    match matchers.iter().position(|matcher| matcher(item)) {
        Some(idx) => {
            // Each matcher may only match a single event, so drop it after use.
            matchers.remove(idx);
            true
        }
        None => false,
    }
}
/// Asserts that the next events on `stream` satisfy `matchers` one by one,
/// in order, all within `timeout`. Panics on mismatch, premature stream
/// end, stream error, or timeout. Returns the matched events.
#[allow(clippy::type_complexity)]
async fn assert_next<T: std::fmt::Debug + Clone>(
    mut stream: impl Stream<Item = Result<T>> + Unpin + Send,
    timeout: Duration,
    matchers: Vec<Box<dyn Fn(&T) -> bool + Send>>,
) -> Vec<T> {
    let collect_matched = async {
        let mut received = Vec::with_capacity(matchers.len());
        for (i, matcher) in matchers.iter().enumerate() {
            let item = stream
                .next()
                .await
                .expect("event stream ended prematurely")
                .expect("event stream errored");
            assert!(matcher(&item), "assertion failed for event {i} {item:?}");
            received.push(item);
        }
        received
    };
    n0_future::time::timeout(timeout, collect_matched)
        .await
        .expect("timeout reached")
}
/// Asserts that `stream` yields (within `timeout`) exactly one event per
/// matcher, in any order. Convenience wrapper around
/// [`assert_next_unordered_with_optionals`] with no optional matchers.
#[allow(clippy::type_complexity)]
async fn assert_next_unordered<T: std::fmt::Debug + Clone>(
    stream: impl Stream<Item = Result<T>> + Unpin + Send,
    timeout: Duration,
    matchers: Vec<Box<dyn Fn(&T) -> bool + Send>>,
) -> Vec<T> {
    assert_next_unordered_with_optionals(stream, timeout, matchers, vec![]).await
}
/// Asserts that within `timeout` the stream yields events matching, in any
/// order, all `required_matchers` plus any subset of `optional_matchers`.
///
/// Each matcher is consumed by the first event it matches; an event that
/// matches neither a remaining required nor optional matcher fails the
/// assertion. Panics (after printing every event received so far) on
/// timeout, stream error, or unmatched required matchers. Returns all
/// received events.
#[allow(clippy::type_complexity)]
async fn assert_next_unordered_with_optionals<T: std::fmt::Debug + Clone>(
    mut stream: impl Stream<Item = Result<T>> + Unpin + Send,
    timeout: Duration,
    mut required_matchers: Vec<Box<dyn Fn(&T) -> bool + Send>>,
    mut optional_matchers: Vec<Box<dyn Fn(&T) -> bool + Send>>,
) -> Vec<T> {
    let max = required_matchers.len() + optional_matchers.len();
    let required = required_matchers.len();
    // Shared so received events can still be printed after a failure.
    let events = Arc::new(parking_lot::Mutex::new(vec![]));
    let fut = async {
        while let Some(event) = stream.next().await {
            let event = event.context("failed to read from stream")?;
            let len = {
                let mut events = events.lock();
                events.push(event.clone());
                events.len()
            };
            // Required matchers get first shot at each event.
            if !apply_matchers(&event, &mut required_matchers)
                && !apply_matchers(&event, &mut optional_matchers)
            {
                bail!("Event didn't match any matcher: {event:?}");
            }
            // Done once all required matched or `max` events were consumed.
            if required_matchers.is_empty() || len == max {
                break;
            }
        }
        if !required_matchers.is_empty() {
            bail!(
                "Matched only {} of {required} required matchers",
                required - required_matchers.len()
            );
        }
        Ok(())
    };
    tokio::pin!(fut);
    let res = n0_future::time::timeout(timeout, fut)
        .await
        .map_err(|_| anyhow!("Timeout reached ({timeout:?})"))
        .and_then(|res| res);
    let events = events.lock().clone();
    if let Err(err) = &res {
        println!("Received events: {events:#?}");
        println!(
            "Received {} events, expected between {required} and {max}",
            events.len()
        );
        panic!("Failed to receive or match all events: {err:?}");
    }
    events
}
/// Returns `true` iff `event` is a successful `SyncFinished` from `peer`.
fn match_sync_finished(event: &LiveEvent, peer: PublicKey) -> bool {
    match event {
        LiveEvent::SyncFinished(details) => details.peer == peer && details.result.is_ok(),
        _ => false,
    }
}