use bytes::Bytes;
use kitsune2::default_builder;
use kitsune2_api::{
BoxFut, Builder, Config, DhtArc, DynKitsune, DynSpace, DynSpaceHandler, Id,
K2Result, KitsuneHandler, LocalAgent, OpId, SpaceHandler, SpaceId,
Timestamp,
};
use kitsune2_core::{
Ed25519LocalAgent,
factories::{
MemoryOp,
config::{CoreBootstrapConfig, CoreBootstrapModConfig},
},
};
use kitsune2_gossip::{K2GossipConfig, K2GossipModConfig};
use kitsune2_test_utils::{
bootstrap::TestBootstrapSrv, enable_tracing, iter_check, random_bytes,
space::TEST_SPACE_ID,
};
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
use kitsune2_transport_iroh::{
IrohTransportFactory,
config::{IrohTransportConfig, IrohTransportModConfig},
};
use std::sync::Arc;
#[cfg(feature = "transport-tx5-backend-go-pion")]
use {
kitsune2_transport_tx5::{
Tx5TransportFactory,
config::{Tx5TransportConfig, Tx5TransportModConfig},
},
sbd_server::SbdServer,
};
fn create_op_list(num_ops: u16) -> (Vec<Bytes>, Vec<OpId>) {
let mut ops = Vec::new();
let mut op_ids = Vec::new();
for _ in 0..num_ops {
let op = MemoryOp::new(Timestamp::from_micros(0), random_bytes(256));
let op_id = op.compute_op_id();
ops.push(op.into());
op_ids.push(op_id);
}
(ops, op_ids)
}
#[derive(Debug)]
struct TestKitsuneHandler;
impl KitsuneHandler for TestKitsuneHandler {
fn create_space(
&self,
_space_id: SpaceId,
_config_override: Option<&Config>,
) -> BoxFut<'_, K2Result<DynSpaceHandler>> {
Box::pin(async {
let space_handler: DynSpaceHandler = Arc::new(TestSpaceHandler);
Ok(space_handler)
})
}
}
#[derive(Debug)]
struct TestSpaceHandler;
impl SpaceHandler for TestSpaceHandler {}
async fn make_kitsune_node(
relay_server_url: &str,
bootstrap_server_url: &str,
) -> DynKitsune {
let kitsune_builder = Builder {
#[cfg(feature = "transport-tx5-backend-go-pion")]
transport: Tx5TransportFactory::create(),
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
transport: IrohTransportFactory::create(),
..default_builder()
}
.with_default_config()
.unwrap();
kitsune_builder
.config
.set_module_config(&CoreBootstrapModConfig {
core_bootstrap: CoreBootstrapConfig {
server_url: Some(bootstrap_server_url.to_owned()),
backoff_min_ms: 1000,
backoff_max_ms: 1000,
..Default::default()
},
})
.unwrap();
#[cfg(feature = "transport-tx5-backend-go-pion")]
kitsune_builder
.config
.set_module_config(&Tx5TransportModConfig {
tx5_transport: Tx5TransportConfig {
server_url: relay_server_url.to_owned(),
signal_allow_plain_text: true,
timeout_s: 5,
webrtc_connect_timeout_s: 3,
..Default::default()
},
})
.unwrap();
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
kitsune_builder
.config
.set_module_config(&IrohTransportModConfig {
iroh_transport: IrohTransportConfig {
relay_url: Some(relay_server_url.to_string()),
relay_allow_plain_text: true,
..Default::default()
},
})
.unwrap();
kitsune_builder
.config
.set_module_config(&K2GossipModConfig {
k2_gossip: K2GossipConfig {
initiate_interval_ms: 1000,
min_initiate_interval_ms: 100,
initiate_jitter_ms: 100,
round_timeout_ms: 10_000,
..Default::default()
},
})
.unwrap();
let kitsune_handler = Arc::new(TestKitsuneHandler);
let kitsune = kitsune_builder.build().await.unwrap();
kitsune
.register_handler(kitsune_handler.clone())
.await
.unwrap();
kitsune
}
#[cfg(feature = "transport-tx5-backend-go-pion")]
async fn sbd_signal_server() -> (String, SbdServer) {
let signal_server = SbdServer::new(Arc::new(sbd_server::Config {
bind: vec!["127.0.0.1:0".to_string()],
..Default::default()
}))
.await
.unwrap();
let relay_server_url = format!("ws://{}", signal_server.bind_addrs()[0]);
(relay_server_url, signal_server)
}
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
async fn iroh_relay_from_bootstrap(bootstrap: &TestBootstrapSrv) -> String {
format!("{}/relay", bootstrap.addr())
}
async fn start_space(kitsune: &DynKitsune) -> DynSpace {
let space = kitsune.space(TEST_SPACE_ID, None).await.unwrap();
let local_agent = Arc::new(Ed25519LocalAgent::default());
local_agent.set_tgt_storage_arc_hint(DhtArc::FULL);
space.local_agent_join(local_agent.clone()).await.unwrap();
iter_check!(5000, 100, {
let agent = local_agent.agent().clone();
match space.peer_store().get(agent.clone()).await {
Ok(Some(peer)) => {
tracing::info!("Found local agent in peer store: {:?}", peer);
break;
}
Ok(None) => {
tracing::debug!(
"Local agent not yet in peer store: {:?}",
agent
);
}
Err(e) => {
tracing::error!(
"Error getting local agent from peer store: {:?}",
e
);
panic!("Peer store error: {e:?}");
}
}
});
space
}
#[tokio::test]
async fn two_node_gossip() {
enable_tracing();
let bootstrap_server = TestBootstrapSrv::new(false).await;
let bootstrap_server_url = bootstrap_server.addr().to_string();
#[cfg(feature = "transport-tx5-backend-go-pion")]
let (relay_server_url, _relay_server) = sbd_signal_server().await;
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
let relay_server_url = iroh_relay_from_bootstrap(&bootstrap_server).await;
let kitsune_1 =
make_kitsune_node(&relay_server_url, &bootstrap_server_url).await;
let kitsune_2 =
make_kitsune_node(&relay_server_url, &bootstrap_server_url).await;
let space_1 = start_space(&kitsune_1).await;
let space_2 = start_space(&kitsune_2).await;
let (ops_1, op_ids_1) = create_op_list(1000);
space_1
.op_store()
.process_incoming_ops(ops_1.clone())
.await
.unwrap();
let (ops_2, op_ids_2) = create_op_list(1000);
space_2
.op_store()
.process_incoming_ops(ops_2.clone())
.await
.unwrap();
iter_check!(60_000, 1_000, {
let actual_ops_1 = space_1
.op_store()
.retrieve_ops(op_ids_2.clone())
.await
.unwrap();
let actual_ops_2 = space_2
.op_store()
.retrieve_ops(op_ids_1.clone())
.await
.unwrap();
if actual_ops_1.len() == ops_2.len()
&& actual_ops_2.len() == ops_1.len()
{
break;
} else {
tracing::info!(
"space 1 actual ops received {}/expected {}",
actual_ops_1.len(),
ops_2.len()
);
tracing::info!(
"space 2 actual ops received {}/expected {}",
actual_ops_2.len(),
ops_1.len()
);
}
});
}
#[cfg(feature = "transport-tx5-backend-go-pion")]
#[tokio::test]
async fn shutdown_space() {
enable_tracing();
let bootstrap_server = TestBootstrapSrv::new(false).await;
let bootstrap_server_url = bootstrap_server.addr().to_string();
#[cfg(feature = "transport-tx5-backend-go-pion")]
let (relay_server_url, _relay_server) = sbd_signal_server().await;
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
let relay_server_url = iroh_relay_from_bootstrap(&bootstrap_server).await;
let kitsune_1 =
make_kitsune_node(&relay_server_url, &bootstrap_server_url).await;
let kitsune_2 =
make_kitsune_node(&relay_server_url, &bootstrap_server_url).await;
let metrics = tokio::runtime::Handle::current().metrics();
let initial_tasks = metrics.num_alive_tasks();
let space_1 = start_space(&kitsune_1).await;
let space_2 = start_space(&kitsune_2).await;
let (ops_1, op_ids_1) = create_op_list(10);
space_1
.op_store()
.process_incoming_ops(ops_1.clone())
.await
.unwrap();
let (ops_2, op_ids_2) = create_op_list(10);
space_2
.op_store()
.process_incoming_ops(ops_2.clone())
.await
.unwrap();
iter_check!(15000, 500, {
let actual_ops_1 = space_1
.op_store()
.retrieve_ops(op_ids_2.clone())
.await
.unwrap();
let actual_ops_2 = space_2
.op_store()
.retrieve_ops(op_ids_1.clone())
.await
.unwrap();
if actual_ops_1.len() == ops_2.len()
&& actual_ops_2.len() == ops_1.len()
{
break;
} else {
println!(
"space 1 actual ops received {}/expected {}",
actual_ops_1.len(),
ops_2.len()
);
println!(
"space 2 actual ops received {}/expected {}",
actual_ops_2.len(),
ops_1.len()
);
}
});
let err = kitsune_1.remove_space(TEST_SPACE_ID).await.unwrap_err();
assert!(
err.to_string()
.contains("Cannot remove space with local agents"),
"Got error: {err}"
);
for local_agent in space_1.local_agent_store().get_all().await.unwrap() {
space_1.local_agent_leave(local_agent.agent().clone()).await;
}
for local_agent in space_2.local_agent_store().get_all().await.unwrap() {
space_2.local_agent_leave(local_agent.agent().clone()).await;
}
iter_check!(5000, 500, {
let all_peers_tombstone_1 = space_1
.peer_store()
.get_all()
.await
.unwrap()
.iter()
.all(|a| a.url.is_none());
let all_peers_tombstone_2 = space_2
.peer_store()
.get_all()
.await
.unwrap()
.iter()
.all(|a| a.url.is_none());
if all_peers_tombstone_1 && all_peers_tombstone_2 {
break;
} else {
println!(
"space 1 peers: {:?}",
space_1.peer_store().get_all().await.unwrap()
);
println!(
"space 2 peers: {:?}",
space_2.peer_store().get_all().await.unwrap()
);
}
});
drop(space_1);
drop(space_2);
kitsune_1.remove_space(TEST_SPACE_ID).await.unwrap();
kitsune_2.remove_space(TEST_SPACE_ID).await.unwrap();
iter_check!(30000, 100, {
let current_tasks = metrics.num_alive_tasks();
if current_tasks == initial_tasks {
break;
} else {
println!(
"Current tasks: {current_tasks}, Initial tasks: {initial_tasks}"
);
}
});
assert!(kitsune_1.space_if_exists(TEST_SPACE_ID).await.is_none());
assert!(kitsune_2.space_if_exists(TEST_SPACE_ID).await.is_none());
}
#[tokio::test]
async fn test_space_should_not_start_without_bootstrap_url_configured() {
enable_tracing();
let kitsune_builder = default_builder().with_default_config().unwrap();
#[cfg(feature = "transport-tx5-backend-go-pion")]
{
let signal_server = SbdServer::new(Arc::new(sbd_server::Config {
bind: vec!["127.0.0.1:0".to_string()],
..Default::default()
}))
.await
.unwrap();
let signal_server_url =
format!("ws://{}", signal_server.bind_addrs()[0]);
kitsune_builder
.config
.set_module_config(&Tx5TransportModConfig {
tx5_transport: Tx5TransportConfig {
server_url: signal_server_url.to_owned(),
signal_allow_plain_text: true,
timeout_s: 5,
webrtc_connect_timeout_s: 3,
..Default::default()
},
})
.unwrap();
}
kitsune_builder
.config
.set_module_config(&K2GossipModConfig {
k2_gossip: K2GossipConfig {
initiate_interval_ms: 1000,
min_initiate_interval_ms: 100,
initiate_jitter_ms: 100,
round_timeout_ms: 10_000,
..Default::default()
},
})
.unwrap();
let kitsune = kitsune_builder.build().await.expect("Build Kitsune2");
let kitsune_handler = Arc::new(TestKitsuneHandler);
kitsune
.register_handler(kitsune_handler.clone())
.await
.expect("Register handler");
let result = kitsune.space(TEST_SPACE_ID, None).await;
assert!(
result.is_err(),
"Expected creating a space to fail when no bootstrap URL is configured"
);
}
#[tokio::test]
async fn test_should_start_space_with_different_bootstrap_urls() {
enable_tracing();
let bootstrap_a = TestBootstrapSrv::new(false).await;
let bootstrap_b = TestBootstrapSrv::new(false).await;
let bootstrap_url_a = bootstrap_a.addr().to_string();
let bootstrap_url_b = bootstrap_b.addr().to_string();
let kitsune_builder = default_builder().with_default_config().unwrap();
#[cfg(feature = "transport-tx5-backend-go-pion")]
{
let signal_server = SbdServer::new(Arc::new(sbd_server::Config {
bind: vec!["127.0.0.1:0".to_string()],
..Default::default()
}))
.await
.unwrap();
let signal_server_url =
format!("ws://{}", signal_server.bind_addrs()[0]);
kitsune_builder
.config
.set_module_config(&Tx5TransportModConfig {
tx5_transport: Tx5TransportConfig {
server_url: signal_server_url.to_owned(),
signal_allow_plain_text: true,
timeout_s: 5,
webrtc_connect_timeout_s: 3,
..Default::default()
},
})
.unwrap();
}
kitsune_builder
.config
.set_module_config(&K2GossipModConfig {
k2_gossip: K2GossipConfig {
initiate_interval_ms: 1000,
min_initiate_interval_ms: 100,
initiate_jitter_ms: 100,
round_timeout_ms: 10_000,
..Default::default()
},
})
.unwrap();
let kitsune = kitsune_builder.build().await.unwrap();
kitsune
.register_handler(Arc::new(TestKitsuneHandler))
.await
.unwrap();
let config_a = Config::default();
config_a
.set_module_config(&CoreBootstrapModConfig {
core_bootstrap: CoreBootstrapConfig {
server_url: Some(bootstrap_url_a.clone()),
..Default::default()
},
})
.unwrap();
let config_b = Config::default();
config_b
.set_module_config(&CoreBootstrapModConfig {
core_bootstrap: CoreBootstrapConfig {
server_url: Some(bootstrap_url_b.clone()),
..Default::default()
},
})
.unwrap();
let space_a = kitsune
.space(TEST_SPACE_ID, Some(config_a))
.await
.expect("Create space A");
let space_b = kitsune
.space(SpaceId(Id(Bytes::from("space_b"))), Some(config_b))
.await
.expect("Create space B");
let agent_a = Arc::new(Ed25519LocalAgent::default());
agent_a.set_tgt_storage_arc_hint(DhtArc::FULL);
iter_check!(60_000, 1_000, {
if space_a.local_agent_join(agent_a.clone()).await.is_ok() {
break;
}
});
let agent_b = Arc::new(Ed25519LocalAgent::default());
agent_b.set_tgt_storage_arc_hint(DhtArc::FULL);
iter_check!(60_000, 1_000, {
if space_b.local_agent_join(agent_b.clone()).await.is_ok() {
break;
}
});
let agent_a_id = agent_a.agent();
let agent_b_id = agent_b.agent();
assert!(space_a.peer_store().get(agent_a_id.clone()).await.is_ok());
assert!(space_b.peer_store().get(agent_b_id.clone()).await.is_ok());
assert!(
space_a
.peer_store()
.get(agent_b_id.clone())
.await
.unwrap()
.is_none(),
"Agent B should not be in space A's peer store"
);
assert!(
space_b
.peer_store()
.get(agent_a_id.clone())
.await
.unwrap()
.is_none(),
"Agent A should not be in space B's peer store"
);
}
#[cfg(all(
not(feature = "transport-tx5-backend-go-pion"),
feature = "transport-iroh"
))]
#[tokio::test]
async fn two_spaces_different_relays() {
enable_tracing();
let bootstrap_a = TestBootstrapSrv::new(false).await;
let bootstrap_b = TestBootstrapSrv::new(false).await;
let bootstrap_url_a = bootstrap_a.addr().to_string();
let bootstrap_url_b = bootstrap_b.addr().to_string();
let relay_url_a = format!("{}/relay", bootstrap_a.addr());
let relay_url_b = format!("{}/relay", bootstrap_b.addr());
let kitsune_builder = default_builder().with_default_config().unwrap();
kitsune_builder
.config
.set_module_config(&IrohTransportModConfig {
iroh_transport: IrohTransportConfig {
relay_url: None,
relay_allow_plain_text: true,
..Default::default()
},
})
.unwrap();
kitsune_builder
.config
.set_module_config(&K2GossipModConfig {
k2_gossip: K2GossipConfig {
initiate_interval_ms: 1000,
min_initiate_interval_ms: 100,
initiate_jitter_ms: 100,
round_timeout_ms: 10_000,
..Default::default()
},
})
.unwrap();
let kitsune = kitsune_builder.build().await.unwrap();
kitsune
.register_handler(Arc::new(TestKitsuneHandler))
.await
.unwrap();
let config_a = Config::default();
config_a
.set_module_config(&CoreBootstrapModConfig {
core_bootstrap: CoreBootstrapConfig {
server_url: Some(bootstrap_url_a.clone()),
..Default::default()
},
})
.unwrap();
config_a
.set_module_config(&IrohTransportModConfig {
iroh_transport: IrohTransportConfig {
relay_url: Some(relay_url_a.clone()),
relay_allow_plain_text: true,
..Default::default()
},
})
.unwrap();
let config_b = Config::default();
config_b
.set_module_config(&CoreBootstrapModConfig {
core_bootstrap: CoreBootstrapConfig {
server_url: Some(bootstrap_url_b.clone()),
..Default::default()
},
})
.unwrap();
config_b
.set_module_config(&IrohTransportModConfig {
iroh_transport: IrohTransportConfig {
relay_url: Some(relay_url_b.clone()),
relay_allow_plain_text: true,
..Default::default()
},
})
.unwrap();
let space_id_a = TEST_SPACE_ID;
let space_id_b = SpaceId(Id(Bytes::from("space_b_relay")));
let space_a = kitsune
.space(space_id_a, Some(config_a))
.await
.expect("Create space A with per-space relay");
let space_b = kitsune
.space(space_id_b, Some(config_b))
.await
.expect("Create space B with per-space relay");
let agent_a = Arc::new(Ed25519LocalAgent::default());
agent_a.set_tgt_storage_arc_hint(DhtArc::FULL);
iter_check!(60_000, 1_000, {
if space_a.local_agent_join(agent_a.clone()).await.is_ok() {
break;
}
});
let agent_b = Arc::new(Ed25519LocalAgent::default());
agent_b.set_tgt_storage_arc_hint(DhtArc::FULL);
iter_check!(60_000, 1_000, {
if space_b.local_agent_join(agent_b.clone()).await.is_ok() {
break;
}
});
let agent_a_id = agent_a.agent();
let agent_b_id = agent_b.agent();
iter_check!(10_000, 500, {
if space_a
.peer_store()
.get(agent_a_id.clone())
.await
.unwrap()
.is_some()
{
break;
}
});
iter_check!(10_000, 500, {
if space_b
.peer_store()
.get(agent_b_id.clone())
.await
.unwrap()
.is_some()
{
break;
}
});
assert!(
space_a
.peer_store()
.get(agent_b_id.clone())
.await
.unwrap()
.is_none(),
"Agent B should not be in space A's peer store"
);
assert!(
space_b
.peer_store()
.get(agent_a_id.clone())
.await
.unwrap()
.is_none(),
"Agent A should not be in space B's peer store"
);
let host_a = bootstrap_a
.addr()
.strip_prefix("http://")
.unwrap_or(&bootstrap_url_a)
.to_string();
let host_b = bootstrap_b
.addr()
.strip_prefix("http://")
.unwrap_or(&bootstrap_url_b)
.to_string();
iter_check!(10_000, 500, {
if let Some(url) = space_a.current_url() {
let s = url.to_string();
if s.contains(&host_a) {
tracing::info!("Space A URL: {s}");
break;
}
}
});
iter_check!(10_000, 500, {
if let Some(url) = space_b.current_url() {
let s = url.to_string();
if s.contains(&host_b) {
tracing::info!("Space B URL: {s}");
break;
}
}
});
assert_ne!(
host_a, host_b,
"Relay hosts should differ between the two spaces"
);
}