#![allow(clippy::too_many_arguments)]
use crate::actor::{GetLinksRequestOptions, NetworkRequestOptions};
use crate::metrics::{
p2p_handle_incoming_request_duration_metric, p2p_handle_incoming_request_ignored_metric,
p2p_outgoing_request_duration_metric, p2p_recv_remote_signal_metric,
};
use crate::*;
use holochain_sqlite::error::{DatabaseError, DatabaseResult};
use holochain_sqlite::helpers::BytesSql;
use holochain_sqlite::rusqlite::types::Value;
use holochain_sqlite::sql::sql_peer_meta_store;
use holochain_state::prelude::named_params;
use holochain_types::cell_config_overrides::CellConfigOverrides;
use kitsune2_api::*;
use kitsune2_core::get_responsive_remote_agents_near_location;
use rand::prelude::IndexedRandom;
use std::collections::HashMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::{Mutex, Weak};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::AbortHandle;
// Wraps a future-producing block and emits a tracing event with the elapsed
// time once that future resolves.
//
// Arguments:
// - `$netaudit`: boolean literal; when `true` the event is emitted with
//   `target: "NETAUDIT"` and the extra field `m = "holochain_p2p"`.
// - `$code`: a block evaluating to a future. The block is evaluated eagerly
//   (outside the returned future); only the `.await` happens inside it.
// - `$rest`: remaining tokens, appended verbatim to the tracing macro
//   invocation (callers start them with a leading comma).
//
// Expands to a `Box::pin(async move { .. })` that yields the wrapped future's
// output unchanged.
macro_rules! timing_trace {
($netaudit:literal, $code:block $($rest:tt)*) => {{
// The clock starts before `$code` is evaluated, so time spent constructing
// the future is included in the measurement.
let __start = std::time::Instant::now();
let __out = $code;
Box::pin(async move {
let __out = __out.await;
let __elapsed_s = __start.elapsed().as_secs_f64();
// Calls taking 5 seconds or longer are logged at WARN, everything else at TRACE.
if __elapsed_s >= 5.0 {
if $netaudit {
tracing::warn!( target: "NETAUDIT", m = "holochain_p2p", elapsed_s = %__elapsed_s $($rest)* );
} else {
tracing::warn!( elapsed_s = %__elapsed_s $($rest)* );
}
} else {
if $netaudit {
tracing::trace!( target: "NETAUDIT", m = "holochain_p2p", elapsed_s = %__elapsed_s $($rest)* );
} else {
tracing::trace!( elapsed_s = %__elapsed_s $($rest)* );
}
}
__out
})
}};
}
/// Wrapper around a [`event::DynHcP2pHandler`] that forwards every call to the
/// inner handler and logs how long the call took via `timing_trace!`.
#[derive(Clone, Debug)]
pub struct WrapEvtSender(pub event::DynHcP2pHandler);
impl event::HcP2pHandler for WrapEvtSender {
/// Forwards to the inner `handle_call_remote`, logging the elapsed time and
/// the serialized payload size (`byte_count`) under `a = "recv_call_remote"`.
fn handle_call_remote(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
zome_call_params_serialized: ExternIO,
signature: Signature,
) -> BoxFut<'_, HolochainP2pResult<SerializedBytes>> {
// Captured before the payload is moved into the inner call.
let byte_count = zome_call_params_serialized.0.len();
timing_trace!(
true,
{
self.0.handle_call_remote(
dna_hash,
to_agent,
zome_call_params_serialized,
signature,
)
},
byte_count,
a = "recv_call_remote",
)
}
/// Forwards to the inner `handle_publish`, logging the elapsed time and the
/// number of ops (`op_count`) under `a = "recv_publish"`.
fn handle_publish(
&self,
dna_hash: DnaHash,
ops: Vec<holochain_types::dht_op::DhtOp>,
) -> BoxFut<'_, HolochainP2pResult<()>> {
// Captured before `ops` is moved into the inner call.
let op_count = ops.len();
timing_trace!(
true,
{
self.0.handle_publish(dna_hash, ops)
}, %op_count, a = "recv_publish")
}
/// Forwards to the inner `handle_get`, logged under `a = "recv_get"`.
fn handle_get(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
dht_hash: holo_hash::AnyDhtHash,
) -> BoxFut<'_, HolochainP2pResult<WireOps>> {
timing_trace!(
true,
{ self.0.handle_get(dna_hash, to_agent, dht_hash) },
a = "recv_get",
)
}
/// Forwards to the inner `handle_get_links`, logged under `a = "recv_get_links"`.
fn handle_get_links(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
link_key: WireLinkKey,
options: event::GetLinksOptions,
) -> BoxFut<'_, HolochainP2pResult<WireLinkOps>> {
timing_trace!(
true,
{
self.0
.handle_get_links(dna_hash, to_agent, link_key, options)
},
a = "recv_get_links",
)
}
/// Forwards to the inner `handle_count_links`, logged under `a = "recv_count_links"`.
fn handle_count_links(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
query: WireLinkQuery,
) -> BoxFut<'_, HolochainP2pResult<CountLinksResponse>> {
timing_trace!(
true,
{ self.0.handle_count_links(dna_hash, to_agent, query) },
a = "recv_count_links"
)
}
/// Forwards to the inner `handle_get_agent_activity`, logged under
/// `a = "recv_get_agent_activity"`.
fn handle_get_agent_activity(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
agent: AgentPubKey,
query: ChainQueryFilter,
options: event::GetActivityOptions,
) -> BoxFut<'_, HolochainP2pResult<AgentActivityResponse>> {
timing_trace!(
true,
{
self.0
.handle_get_agent_activity(dna_hash, to_agent, agent, query, options)
},
a = "recv_get_agent_activity",
)
}
/// Forwards to the inner `handle_must_get_agent_activity`, logged under
/// `a = "recv_must_get_agent_activity"`.
fn handle_must_get_agent_activity(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
agent: AgentPubKey,
filter: holochain_zome_types::chain::ChainFilter,
) -> BoxFut<'_, HolochainP2pResult<MustGetAgentActivityResponse>> {
timing_trace!(
true,
{
self.0
.handle_must_get_agent_activity(dna_hash, to_agent, agent, filter)
},
a = "recv_must_get_agent_activity",
)
}
/// Forwards to the inner `handle_validation_receipts_received`, logged under
/// `a = "recv_validation_receipt_received"`. This call is traced without the
/// `NETAUDIT` target (first macro argument is `false`).
fn handle_validation_receipts_received(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
receipts: ValidationReceiptBundle,
) -> BoxFut<'_, HolochainP2pResult<()>> {
timing_trace!(
false,
{
self.0
.handle_validation_receipts_received(dna_hash, to_agent, receipts)
},
a = "recv_validation_receipt_received",
)
}
/// Forwards to the inner `handle_publish_countersign`, logged under
/// `a = "recv_publish_countersign"`.
fn handle_publish_countersign(
&self,
dna_hash: DnaHash,
op: holochain_types::dht_op::ChainOp,
) -> BoxFut<'_, HolochainP2pResult<()>> {
timing_trace!(
true,
{ self.0.handle_publish_countersign(dna_hash, op) },
a = "recv_publish_countersign"
)
}
/// Forwards to the inner `handle_countersigning_session_negotiation`, logged
/// under `a = "recv_countersigning_session_negotiation"`. This call is traced
/// without the `NETAUDIT` target (first macro argument is `false`).
fn handle_countersigning_session_negotiation(
&self,
dna_hash: DnaHash,
to_agent: AgentPubKey,
message: event::CountersigningSessionNegotiationMessage,
) -> BoxFut<'_, HolochainP2pResult<()>> {
timing_trace!(
false,
{
self.0
.handle_countersigning_session_negotiation(dna_hash, to_agent, message)
},
a = "recv_countersigning_session_negotiation"
)
}
}
/// Sender half of the oneshot channel on which a request's response message
/// is delivered.
type Respond = tokio::sync::oneshot::Sender<crate::wire::WireMessage>;

/// Table of in-flight requests, keyed by message id.
struct Pending {
    /// Weak handle to the mutex that owns this table; upgraded so expiry
    /// tasks can lock the table after their timeout elapses.
    this: Weak<Mutex<Self>>,
    /// Responders for requests that have not yet been answered or expired.
    map: HashMap<u64, Respond>,
}

impl Pending {
    /// Stores `resp` under `msg_id` and spawns a task that removes the entry
    /// again once `timeout` has elapsed.
    ///
    /// Does nothing if the owning `Arc` can no longer be upgraded.
    fn register(&mut self, msg_id: u64, resp: Respond, timeout: Duration) {
        let Some(shared) = self.this.upgrade() else {
            return;
        };

        self.map.insert(msg_id, resp);

        tokio::task::spawn(async move {
            tokio::time::sleep(timeout).await;
            // Dropping the responder (if still present) closes the channel,
            // which the waiting requester observes as a receive error.
            let expired = shared.lock().unwrap().respond(msg_id);
            drop(expired);
        });
    }

    /// Removes and returns the responder registered for `msg_id`, if any.
    fn respond(&mut self, msg_id: u64) -> Option<Respond> {
        self.map.remove(&msg_id)
    }
}
/// State backing the holochain_p2p actor: the kitsune2 instance plus the
/// bookkeeping needed to route requests, responses and events.
pub(crate) struct HolochainP2pActor {
// Weak self-reference, populated through `Arc::new_cyclic` in `create`.
this: Weak<Self>,
// Passed to every `HolochainP2pLocalAgent` created in `join`.
target_arc_factor: u32,
// Compatibility parameters a remote preflight message must match exactly.
compat: NetworkCompatParams,
// Encoded preflight message handed out by `preflight_gather_outgoing`.
preflight: Arc<Mutex<bytes::Bytes>>,
// Event handler; set once by `register_handler`.
evt_sender: Arc<std::sync::OnceLock<WrapEvtSender>>,
// Keystore client, passed to local agents created in `join`.
lair_client: holochain_keystore::MetaLairClient,
// The underlying kitsune2 instance.
kitsune: DynKitsune,
// Base kitsune2 config that per-space overrides are applied on top of.
kitsune2_config: Config,
// NOTE(review): initialised from `config.get_conductor_db`; its use is not
// visible in this part of the file.
blocks_db_getter: GetDbConductor,
// Outstanding requests awaiting a response, keyed by message id.
pending: Arc<Mutex<Pending>>,
// Handle of the background pruning task; aborted when the actor is dropped.
pruning_task_abort_handle: AbortHandle,
// Timeout applied to requests whose options carry no explicit `timeout_ms`.
request_timeout: Duration,
// Caps how many incoming get / links / agent-activity requests are handled
// concurrently; excess requests are ignored.
incoming_request_concurrency_limit_semaphore: Arc<Semaphore>,
}
/// Minimal `Debug` output: only the type name is printed, no fields.
impl std::fmt::Debug for HolochainP2pActor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut printer = f.debug_struct("HolochainP2pActor");
        printer.finish()
    }
}
/// Error text used when an incoming message arrives before an event handler
/// has been registered.
const EVT_REG_ERR: &str = "event handler not registered";
impl SpaceHandler for HolochainP2pActor {
    /// Decodes a batch of wire messages received from `from_peer` and hands
    /// each message to `handle_space_wire_message_received`.
    ///
    /// Get, get-links, count-links and (must-)get-agent-activity requests need
    /// a permit from the incoming-request semaphore. If none is available the
    /// message is dropped and counted in the "ignored" metric.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch cannot be decoded.
    fn recv_notify(&self, from_peer: Url, space: SpaceId, data: bytes::Bytes) -> K2Result<()> {
        let batch = WireMessage::decode_batch(&data).map_err(|err| {
            K2Error::other_src("decode incoming holochain_p2p wire message batch", err)
        })?;

        for msg in batch {
            let needs_permit = matches!(
                msg,
                WireMessage::GetReq { .. }
                    | WireMessage::GetLinksReq { .. }
                    | WireMessage::CountLinksReq { .. }
                    | WireMessage::GetAgentActivityReq { .. }
                    | WireMessage::MustGetAgentActivityReq { .. }
            );

            let permit = if needs_permit {
                let acquired = self
                    .incoming_request_concurrency_limit_semaphore
                    .clone()
                    .try_acquire_owned();
                match acquired {
                    Ok(permit) => Some(permit),
                    Err(_) => {
                        tracing::debug!(?from_peer, ?space, "An incoming authority request message was received but we have already reached the limit of concurrently handled authority requests. The message will be ignored.");
                        let dna_hash = DnaHash::from_k2_space(&space);
                        let attributes = vec![
                            opentelemetry::KeyValue::new("message_type", msg.as_ref().to_string()),
                            opentelemetry::KeyValue::new("dna_hash", format!("{dna_hash:?}")),
                        ];
                        p2p_handle_incoming_request_ignored_metric().add(1, &attributes);
                        continue;
                    }
                }
            } else {
                None
            };

            self.handle_space_wire_message_received(
                msg,
                space.clone(),
                from_peer.clone(),
                permit,
            );
        }

        Ok(())
    }
}
impl kitsune2_api::KitsuneHandler for HolochainP2pActor {
    /// Returns this actor as the space handler for every space.
    ///
    /// # Errors
    ///
    /// Fails if the actor has already been dropped.
    fn create_space(
        &self,
        _space: kitsune2_api::SpaceId,
        _config: Option<&Config>,
    ) -> BoxFut<'_, kitsune2_api::K2Result<kitsune2_api::DynSpaceHandler>> {
        Box::pin(async move {
            let handler: Weak<dyn kitsune2_api::SpaceHandler> = self.this.clone();
            handler.upgrade().ok_or_else(|| {
                kitsune2_api::K2Error::other("HolochainP2pActor instance has been dropped")
            })
        })
    }

    /// Returns a copy of the currently cached, encoded preflight message.
    fn preflight_gather_outgoing(&self, _peer_url: Url) -> BoxFut<'_, K2Result<bytes::Bytes>> {
        Box::pin(async move {
            let encoded = self.preflight.lock().unwrap().clone();
            Ok(encoded)
        })
    }

    /// Validates a remote preflight message and stores the agent infos it carries.
    ///
    /// The message is rejected if it cannot be decoded, if its compatibility
    /// parameters differ from ours, or if any contained agent info fails to
    /// decode. Agent infos for spaces that do not exist locally are skipped.
    fn preflight_validate_incoming(
        &self,
        _peer_url: Url,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            let remote = crate::wire::WirePreflightMessage::decode(&data)
                .map_err(|err| K2Error::other_src("Invalid remote preflight", err))?;

            if remote.compat != self.compat {
                return Err(K2Error::other(format!(
                    "Invalid remote preflight, wanted {:?}, got: {:?}",
                    self.compat, remote.compat
                )));
            }

            // Decode everything up front so that one bad agent info rejects the
            // whole message before anything is inserted.
            let infos = remote
                .agents
                .into_iter()
                .map(|encoded| {
                    AgentInfoSigned::decode(&kitsune2_core::Ed25519Verifier, encoded.as_bytes())
                })
                .collect::<K2Result<Vec<_>>>()?;

            for info in infos {
                let Some(space) = self.kitsune.space_if_exists(info.space.clone()).await else {
                    continue;
                };
                space.peer_store().insert(vec![info]).await?;
            }

            Ok(())
        })
    }
}
/// Bootstrap wrapper that keeps the shared preflight message up to date with
/// the agent infos passed to `put`, then delegates to the wrapped bootstrap.
#[derive(Debug)]
struct BootWrap {
    /// Compatibility parameters embedded in every preflight message.
    compat: NetworkCompatParams,
    /// Shared slot holding the latest encoded preflight message.
    preflight: Arc<std::sync::Mutex<bytes::Bytes>>,
    /// The wrapped bootstrap implementation.
    orig: kitsune2_api::DynBootstrap,
    /// Unexpired agent infos seen so far, at most one per (agent, space) pair.
    cache: std::sync::Mutex<Vec<Arc<AgentInfoSigned>>>,
}

impl kitsune2_api::Bootstrap for BootWrap {
    /// Records `info` in the cache, re-encodes the preflight message from the
    /// cache contents and forwards `info` to the wrapped bootstrap.
    ///
    /// Cached infos that fail to encode are left out of the preflight message.
    /// If the preflight message itself fails to encode, the previous one is kept.
    fn put(&self, info: Arc<AgentInfoSigned>) {
        let encoded_agents = {
            let mut cached = self.cache.lock().unwrap();
            let now = kitsune2_api::Timestamp::now();

            // Drop expired entries and any older entry for the same agent/space.
            cached.retain(|known| {
                let expired = known.expires_at < now;
                let superseded = known.agent == info.agent && known.space == info.space;
                !expired && !superseded
            });
            cached.push(info.clone());

            let encoded: Vec<_> = cached
                .iter()
                .filter_map(|known| known.encode().ok())
                .collect();
            encoded
        };

        let message = crate::wire::WirePreflightMessage {
            compat: self.compat.clone(),
            agents: encoded_agents,
        };
        if let Ok(encoded) = message.encode() {
            *self.preflight.lock().unwrap() = encoded;
        }

        self.orig.put(info);
    }
}
/// Bootstrap factory that wraps each bootstrap produced by `orig` in a
/// [`BootWrap`] sharing this factory's preflight slot.
#[derive(Debug)]
struct BootWrapFact {
    /// Compatibility parameters handed to each created [`BootWrap`].
    compat: NetworkCompatParams,
    /// Shared slot holding the latest encoded preflight message.
    preflight: Arc<std::sync::Mutex<bytes::Bytes>>,
    /// The wrapped bootstrap factory.
    orig: kitsune2_api::DynBootstrapFactory,
}

impl kitsune2_api::BootstrapFactory for BootWrapFact {
    /// Delegates to the wrapped factory.
    fn default_config(&self, config: &mut Config) -> K2Result<()> {
        self.orig.default_config(config)
    }

    /// Delegates to the wrapped factory.
    fn validate_config(&self, config: &Config) -> K2Result<()> {
        self.orig.validate_config(config)
    }

    /// Creates the wrapped bootstrap and returns it inside a [`BootWrap`]
    /// with an empty agent cache.
    fn create(
        &self,
        builder: Arc<Builder>,
        peer_store: DynPeerStore,
        space: SpaceId,
    ) -> BoxFut<'static, K2Result<DynBootstrap>> {
        let inner = self.orig.create(builder, peer_store, space);
        let preflight = self.preflight.clone();
        let compat = self.compat.clone();

        Box::pin(async move {
            inner.await.map(move |orig| {
                let wrapped: DynBootstrap = Arc::new(BootWrap {
                    compat,
                    preflight,
                    orig,
                    cache: Default::default(),
                });
                wrapped
            })
        })
    }
}
/// Stops the background peer-meta pruning task when the actor is dropped.
impl Drop for HolochainP2pActor {
fn drop(&mut self) {
// The pruning task loops forever, so it must be aborted explicitly.
self.pruning_task_abort_handle.abort();
}
}
impl HolochainP2pActor {
/// Builds the kitsune2 instance from `config`, starts the peer-meta pruning
/// task and returns the actor wrapped in an `Arc`.
///
/// # Errors
///
/// Fails if the initial preflight message cannot be encoded, if any module
/// configuration cannot be applied, or if building kitsune2 fails.
pub async fn create(
config: HolochainP2pConfig,
lair_client: holochain_keystore::MetaLairClient,
) -> HolochainP2pResult<actor::DynHcP2p> {
check_k2_init();
let mut builder = kitsune2::default_builder();
// Test-only switches that swap in in-memory or no-op module factories.
#[cfg(feature = "test_utils")]
{
if config.disable_bootstrap {
builder.bootstrap = Arc::new(test::NoopBootstrapFactory);
} else if config.mem_bootstrap {
tracing::info!("Running with mem bootstrap");
builder.bootstrap = kitsune2_core::factories::MemBootstrapFactory::create();
}
if config.disable_gossip {
tracing::info!("Running with gossip disabled");
builder.gossip = Arc::new(test::NoopGossipFactory);
}
if config.disable_publish {
tracing::info!("Running with publish disabled");
builder.publish = Arc::new(test::NoopPublishFactory);
}
}
builder.auth_material_bootstrap = config.auth_material_bootstrap;
builder.auth_material_relay = config.auth_material_relay;
// The handler is filled in later by `register_handler`; the op store
// factory shares the same cell.
let evt_sender = Arc::new(std::sync::OnceLock::new());
if let ReportConfig::JsonLines(_) = &config.report {
builder.report = HcReportFactory::create(lair_client.clone());
}
builder.blocks = Arc::new(HolochainBlocksFactory {
getter: config.get_conductor_db.clone(),
});
builder.peer_meta_store = Arc::new(HolochainPeerMetaStoreFactory {
getter: config.get_db_peer_meta.clone(),
});
builder.op_store = Arc::new(HolochainOpStoreFactory {
getter: config.get_db_op_store.clone(),
cache_getter: config.get_db_cache.clone(),
handler: evt_sender.clone(),
});
// Initial preflight message carries no agents; `BootWrap::put` replaces it
// as agent infos are published.
let preflight = Arc::new(Mutex::new(
crate::wire::WirePreflightMessage {
compat: config.compat.clone(),
agents: Vec::new(),
}
.encode()?,
));
// Wrap whichever bootstrap factory was selected above so that it keeps the
// preflight message up to date.
builder.bootstrap = Arc::new(BootWrapFact {
compat: config.compat.clone(),
preflight: preflight.clone(),
orig: builder.bootstrap,
});
let builder = builder.with_default_config()?;
// Test-only module config overrides, applied before any caller-supplied
// `network_config` below.
#[cfg(feature = "test_utils")]
{
#[cfg(feature = "transport-tx5-backend-go-pion")]
builder
.config
.set_module_config(&kitsune2_transport_tx5::Tx5TransportModConfig {
tx5_transport: kitsune2_transport_tx5::Tx5TransportConfig {
signal_allow_plain_text: true,
timeout_s: 20,
webrtc_connect_timeout_s: 15,
..Default::default()
},
})?;
#[cfg(feature = "transport-iroh")]
builder
.config
.set_module_config(&kitsune2_transport_iroh::IrohTransportModConfig {
iroh_transport: kitsune2_transport_iroh::IrohTransportConfig {
relay_allow_plain_text: true,
..Default::default()
},
})?;
builder.config.set_module_config(
&kitsune2_core::factories::CoreBootstrapModConfig {
core_bootstrap: kitsune2_core::factories::CoreBootstrapConfig {
backoff_min_ms: 1_000,
..Default::default()
},
},
)?;
}
if let ReportConfig::JsonLines(hc_report) = config.report {
builder
.config
.set_module_config(&hc_report::HcReportModConfig { hc_report })?;
}
// Keep a separate copy of the recognised network config modules so that
// per-space overrides can later be layered on top of it.
let mut kitsune2_config = Config::default();
if let Some(network_config) = config.network_config {
builder.config.set_module_config(&network_config)?;
kitsune2_config = Self::kitsune2_params_from_value(network_config)?;
}
// `Pending` needs a weak reference to its own mutex for timeout tasks.
let pending = Arc::new_cyclic(|this| {
Mutex::new(Pending {
this: this.clone(),
map: HashMap::new(),
})
});
let kitsune = builder.build().await?;
let kitsune2 = kitsune.clone();
let db_getter = config.get_db_peer_meta.clone();
let pruning_task_abort_handle = HolochainP2pActor::spawn_pruning_task(
config.peer_meta_pruning_interval_ms,
kitsune2,
db_getter,
);
Ok(Arc::new_cyclic(|this| Self {
this: this.clone(),
target_arc_factor: config.target_arc_factor,
compat: config.compat,
preflight,
evt_sender,
lair_client,
kitsune,
blocks_db_getter: config.get_conductor_db.clone(),
pending,
kitsune2_config,
pruning_task_abort_handle,
request_timeout: config.request_timeout,
incoming_request_concurrency_limit_semaphore: Arc::new(Semaphore::new(
config.incoming_request_concurrency_limit as usize,
)),
}))
}
/// Builds a kitsune2 [`Config`] holding only the module configs that can be
/// parsed out of `value`.
///
/// The core bootstrap module config is always attempted. The tx5 transport
/// module config is attempted only when the `transport-tx5-backend-go-pion`
/// feature is enabled. A module that fails to deserialize is skipped.
///
/// # Errors
///
/// Fails if a successfully parsed module config cannot be stored in the [`Config`].
fn kitsune2_params_from_value(value: serde_json::Value) -> HolochainP2pResult<Config> {
    let params = Config::default();

    let bootstrap = serde_json::from_value::<kitsune2_core::factories::CoreBootstrapModConfig>(
        value.clone(),
    );
    if let Ok(bootstrap) = bootstrap {
        params.set_module_config(&bootstrap)?;
    }

    #[cfg(feature = "transport-tx5-backend-go-pion")]
    {
        let tx5 = serde_json::from_value::<kitsune2_transport_tx5::Tx5TransportModConfig>(value);
        if let Ok(tx5) = tx5 {
            params.set_module_config(&tx5)?;
        }
    }

    Ok(params)
}
/// Spawns a background task that periodically prunes the peer meta store of
/// every space and returns the task's abort handle.
///
/// Each round, after sleeping `interval_ms` milliseconds, the task does the
/// following for every space, concurrently:
/// 1. runs the `PRUNE` statement (the log message describes this as removing
///    expired rows);
/// 2. collects the URLs stored under the unresponsive meta key whose stored
///    timestamp is older than the `created_at` of a known agent info with the
///    same URL;
/// 3. deletes those URLs' unresponsive entries.
///
/// A failure in one space is logged as a warning and does not stop the loop.
fn spawn_pruning_task(
interval_ms: u64,
kitsune2: DynKitsune,
db_getter: GetDbPeerMeta,
) -> AbortHandle {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(interval_ms)).await;
let spaces = kitsune2.list_spaces();
// One future per space; they are awaited together below.
let pruning_futs =
spaces.into_iter().map(|space_id| {
let db_getter = db_getter.clone();
let kitsune2 = kitsune2.clone();
async move {
let space = kitsune2.clone().space(space_id.clone(), None).await?;
let peer_store = space.peer_store().clone();
let db = db_getter(DnaHash::from_k2_space(&space_id)).await?;
// Step 1: run the generic prune statement.
db.write_async(|txn| -> DatabaseResult<()> {
let prune_count = txn.execute(sql_peer_meta_store::PRUNE, [])?;
tracing::debug!("Pruned {prune_count} expired rows from peer meta store");
Ok(())
})
.await
.map_err(HolochainP2pError::other)?;
// Step 2: find unresponsive markers superseded by newer agent infos.
let agents = peer_store.get_all().await?;
let urls_to_prune = db
.read_async(move |txn| -> DatabaseResult<Vec<Value>> {
let mut stmt = txn.prepare(sql_peer_meta_store::GET_ALL_BY_KEY)?;
let mut rows = stmt.query(
named_params! {":meta_key":format!("{KEY_PREFIX_ROOT}:{META_KEY_UNRESPONSIVE}")},
)?;
let mut urls = Vec::new();
while let Some(row) = rows.next()? {
// Column 0 is parsed as the peer URL; `meta_value` holds a JSON-encoded timestamp.
let peer_url = Url::from_str(row.get::<_, String>(0)?).map_err(|err| DatabaseError::Other(err.into()))?;
let meta_value = row.get::<_, BytesSql>("meta_value")?;
let timestamp: kitsune2_api::Timestamp = serde_json::from_slice(&meta_value.0).map_err(|err| DatabaseError::Other(err.into()))?;
if let Some(agent) = agents
.iter()
.find(|agent| agent.url == Some(peer_url.clone()))
{
// Only prune when the agent info was created after the stored timestamp.
if agent.created_at > timestamp {
urls.push(Value::Text(peer_url.to_string()));
}
}
}
Ok(urls)
})
.await
.map_err(HolochainP2pError::other)?;
// Step 3: delete the superseded markers; the URL list is bound as one `Rc<Vec<Value>>` parameter.
db.write_async(|txn| -> DatabaseResult<()> {
let values = Rc::new(urls_to_prune);
let mut stmt = txn.prepare(sql_peer_meta_store::DELETE_URLS)?;
stmt.execute(named_params! {":urls": values, ":meta_key": format!("{KEY_PREFIX_ROOT}:{META_KEY_UNRESPONSIVE}")})?;
tracing::debug!("Pruned {} unexpired {KEY_PREFIX_ROOT}:{META_KEY_UNRESPONSIVE} rows from peer meta store because we have newer agent info", values.len());
Ok(())
})
.await
.map_err(HolochainP2pError::other)?;
Ok::<_, HolochainP2pError>(())
}
});
let results = futures::future::join_all(pruning_futs).await;
for err in results.into_iter().filter_map(Result::err) {
tracing::warn!("Pruning peer meta store failed: {err}");
}
}
})
.abort_handle()
}
/// Returns the responsive remote agents near `loc` that have a URL and whose
/// storage arc contains `loc`, as `(agent key, url)` pairs.
///
/// At most 1024 candidates are requested from kitsune2.
async fn get_peers_for_location(
    &self,
    space: &DynSpace,
    loc: u32,
) -> HolochainP2pResult<Vec<(AgentPubKey, Url)>> {
    let candidates = get_responsive_remote_agents_near_location(
        space.peer_store().clone(),
        space.local_agent_store().clone(),
        space.peer_meta_store().clone(),
        loc,
        1024,
    )
    .await?;

    let mut peers = Vec::new();
    for info in candidates {
        // Agents without a URL cannot be contacted.
        let Some(url) = info.url.as_ref() else {
            continue;
        };
        if !info.storage_arc.contains(loc) {
            continue;
        }
        peers.push((AgentPubKey::from_k2_agent(&info.agent), url.clone()));
    }
    Ok(peers)
}
/// Picks up to `options.remote_agent_count` random peers out of those
/// returned by `get_peers_for_location` for `loc`.
///
/// # Errors
///
/// Returns `NoPeersForLocation` (carrying `tag` and `loc`) when no peer is available.
async fn get_random_peers_for_location(
    &self,
    tag: &'static str,
    space: &DynSpace,
    loc: u32,
    options: &NetworkRequestOptions,
) -> HolochainP2pResult<Vec<(AgentPubKey, Url)>> {
    let candidates = self.get_peers_for_location(space, loc).await?;

    if candidates.is_empty() {
        let err = HolochainP2pError::NoPeersForLocation(tag.to_string(), loc);
        return Err(err);
    }

    let wanted = options.remote_agent_count as usize;
    let picked: Vec<(AgentPubKey, Url)> = candidates
        .choose_multiple(&mut rand::rng(), wanted)
        .cloned()
        .collect();
    Ok(picked)
}
/// Returns `true` when `to_url` is this node's own current URL in `space`,
/// i.e. the message should be delivered locally rather than over the network.
fn should_bridge(&self, space: &DynSpace, to_url: Url) -> bool {
    space
        .current_url()
        .is_some_and(|own_url| own_url == to_url)
}
/// Encodes `req` as a single-message batch and sends it to `to_url` as a
/// notification within `space`.
async fn send_notify(
    &self,
    space: &DynSpace,
    to_url: Url,
    req: WireMessage,
) -> HolochainP2pResult<()> {
    let encoded = WireMessage::encode_batch(&[&req])?;
    space.send_notify(to_url, encoded).await?;
    Ok(())
}
/// Delivers the response `res` for request `msg_id` to `to_url`.
///
/// If `to_url` is this node's own URL, the response is handed directly to the
/// locally pending request. Otherwise it is sent over the network.
///
/// # Errors
///
/// Fails if the space does not exist or the network send fails. A missing or
/// closed local responder is only logged.
async fn send_notify_response(
    &self,
    space_id: SpaceId,
    to_url: Url,
    msg_id: u64,
    res: WireMessage,
) -> HolochainP2pResult<()> {
    let space = match self.kitsune.space_if_exists(space_id).await {
        Some(space) => space,
        None => return Err(HolochainP2pError::other("no such space")),
    };

    if !self.should_bridge(&space, to_url.clone()) {
        return self.send_notify(&space, to_url, res).await;
    }

    // Local delivery: complete the pending request without touching the network.
    let responder = self.pending.lock().unwrap().respond(msg_id);
    match responder {
        Some(responder) => {
            if let Err(err) = responder.send(res) {
                tracing::warn!(?err, "Failed to send bridged response");
            }
        }
        None => {
            tracing::warn!("Attempt to bridge response for unknown msg_id: {msg_id}");
        }
    }
    Ok(())
}
/// Sends `req` to `to_url`, waits for the response carrying `msg_id` and
/// passes it to `cb`.
///
/// The request is registered as pending before it is sent. If `to_url` is this
/// node's own URL the encoded request is fed straight into `recv_notify`
/// instead of going over the network. The wait is bounded by
/// `options.timeout_ms`, falling back to the actor's `request_timeout`; on
/// expiry the pending entry is dropped and the receive fails.
///
/// An outgoing-request duration metric is recorded once a response arrives or
/// the response channel closes. It is tagged with `dna_hash`, `tag`, an
/// `error` flag and, when given, the originating zome and function.
///
/// # Errors
///
/// - encoding or sending the request fails (the pending entry is removed
///   immediately in that case rather than lingering until the timeout);
/// - the response channel is dropped, typically because of a timeout;
/// - whatever error `cb` returns for the received message.
async fn send_request<O, C>(
    &self,
    tag: &'static str,
    space: &DynSpace,
    to_url: Url,
    msg_id: u64,
    req: WireMessage,
    dna_hash: DnaHash,
    options: NetworkRequestOptions,
    zome_call_origin: Option<(ZomeName, FunctionName)>,
    cb: C,
) -> HolochainP2pResult<O>
where
    C: FnOnce(WireMessage) -> HolochainP2pResult<O>,
{
    let req = WireMessage::encode_batch(&[&req])?;
    let timeout = match options.timeout_ms {
        Some(ms) => Duration::from_millis(ms),
        None => self.request_timeout,
    };

    // Register before sending so a fast response cannot race the registration.
    let (s, r) = tokio::sync::oneshot::channel();
    self.pending.lock().unwrap().register(msg_id, s, timeout);
    let start = std::time::Instant::now();

    let sent = if self.should_bridge(space, to_url.clone()) {
        self.recv_notify(to_url.clone(), dna_hash.to_k2_space(), req)
    } else {
        space.send_notify(to_url.clone(), req).await
    };
    if let Err(err) = sent {
        // Nothing was sent, so no response can arrive: release the pending
        // entry now instead of holding it until the timeout task fires.
        drop(self.pending.lock().unwrap().respond(msg_id));
        return Err(err.into());
    }

    let record_metric = |error: bool, zome_call_origin: Option<(ZomeName, FunctionName)>| {
        let mut attributes = vec![
            opentelemetry::KeyValue::new("dna_hash", format!("{dna_hash:?}")),
            opentelemetry::KeyValue::new("tag", tag),
            opentelemetry::KeyValue::new("error", error),
        ];
        if let Some((zome, function)) = zome_call_origin {
            attributes.push(opentelemetry::KeyValue::new("zome", zome.to_string()));
            attributes.push(opentelemetry::KeyValue::new("fn", function.to_string()));
        }
        p2p_outgoing_request_duration_metric()
            .record(start.elapsed().as_secs_f64(), &attributes);
    };

    match r.await {
        Err(_) => {
            record_metric(true, zome_call_origin);
            Err(HolochainP2pError::other(format!(
                "{tag} response channel dropped: likely response timeout"
            )))
        }
        Ok(resp) => {
            let is_err = matches!(resp, WireMessage::ErrorRes { .. });
            record_metric(is_err, zome_call_origin);
            cb(resp)
        }
    }
}
/// Tells the kitsune2 space identified by `space_id` that `ops` have been stored.
async fn inform_ops_stored(
    &self,
    space_id: SpaceId,
    ops: Vec<StoredOp>,
) -> HolochainP2pResult<()> {
    let space = self.kitsune.space(space_id, None).await?;
    let outcome = space.inform_ops_stored(ops).await;
    outcome.map_err(HolochainP2pError::K2Error)
}
/// Applies per-cell overrides on top of a copy of the actor's base kitsune2 config.
///
/// Returns `Ok(Some(config))` when at least one override was applied and
/// `Ok(None)` when `space_overrides` changed nothing. The relay URL override
/// is only honoured for transports enabled via cargo features.
fn space_config_override(
    &self,
    space_overrides: CellConfigOverrides,
) -> HolochainP2pResult<Option<Config>> {
    let config = self.kitsune2_config.clone();
    let mut touched = false;

    if let Some(bootstrap_url) = space_overrides.bootstrap_url.as_ref() {
        let mut bootstrap: kitsune2_core::factories::CoreBootstrapModConfig =
            config.get_module_config().unwrap_or_default();
        bootstrap.core_bootstrap.server_url = Some(bootstrap_url.clone());
        config.set_module_config(&bootstrap)?;
        touched = true;
    }

    #[cfg(feature = "transport-iroh")]
    if let Some(relay_url) = space_overrides.relay_url.as_ref() {
        let mut iroh: kitsune2_transport_iroh::IrohTransportModConfig =
            config.get_module_config().unwrap_or_default();
        iroh.iroh_transport.relay_url = Some(relay_url.clone());
        config.set_module_config(&iroh)?;
        touched = true;
    }

    #[cfg(feature = "transport-tx5-backend-go-pion")]
    if let Some(relay_url) = space_overrides.relay_url.as_ref() {
        let mut tx5: kitsune2_transport_tx5::Tx5TransportModConfig =
            config.get_module_config().unwrap_or_default();
        tx5.tx5_transport.server_url = relay_url.clone();
        config.set_module_config(&tx5)?;
        touched = true;
    }

    Ok(touched.then_some(config))
}
fn handle_space_wire_message_received(
&self,
msg: WireMessage,
space_id: SpaceId,
from_peer: Url,
permit: Option<OwnedSemaphorePermit>,
) {
let evt_sender = self.evt_sender.clone();
let kitsune = self.kitsune.clone();
let pending = self.pending.clone();
let this = self.this.clone();
tokio::task::spawn(async move {
use crate::event::HcP2pHandler;
use crate::wire::WireMessage::*;
let _permit = permit;
let start = std::time::Instant::now();
let dna_hash = DnaHash::from_k2_space(&space_id);
let dna_hash_cloned = dna_hash.clone();
let message_type = msg.as_ref().to_string();
let record_incoming_request_duration =
|additional_attributes: &[opentelemetry::KeyValue]| {
let mut attributes = Vec::with_capacity(additional_attributes.len() + 2);
attributes.push(opentelemetry::KeyValue::new("message_type", message_type));
attributes.push(opentelemetry::KeyValue::new(
"dna_hash",
format!("{dna_hash_cloned:?}"),
));
attributes.extend_from_slice(additional_attributes);
p2p_handle_incoming_request_duration_metric()
.record(start.elapsed().as_secs_f64(), &attributes);
};
match msg {
ErrorRes { msg_id, .. }
| CallRemoteRes { msg_id, .. }
| GetRes { msg_id, .. }
| GetLinksRes { msg_id, .. }
| CountLinksRes { msg_id, .. }
| GetAgentActivityRes { msg_id, .. }
| MustGetAgentActivityRes { msg_id, .. }
| SendValidationReceiptsRes { msg_id } => {
if let Some(resp) = pending.lock().unwrap().respond(msg_id) {
let _ = resp.send(msg);
}
record_incoming_request_duration(&[]);
}
CallRemoteReq {
msg_id,
to_agent,
zome_call_params_serialized,
signature,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_call_remote(
dna_hash,
to_agent.clone(),
zome_call_params_serialized,
signature,
)
.await
{
Ok(response) => CallRemoteRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
if let Some(this) = this.upgrade() {
if let Err(err) = this
.send_notify_response(space_id, from_peer, msg_id, resp)
.await
{
tracing::debug!(?err, "Error sending call remote response");
}
} else {
tracing::debug!("HolochainP2pActor has been dropped");
}
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
GetReq {
msg_id,
to_agent,
dht_hash,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_get(dna_hash, to_agent.clone(), dht_hash)
.await
{
Ok(response) => GetRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending get response");
}
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
GetLinksReq {
msg_id,
to_agent,
link_key,
options,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_get_links(dna_hash, to_agent.clone(), link_key, options)
.await
{
Ok(response) => GetLinksRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending get_links response");
}
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
CountLinksReq {
msg_id,
to_agent,
query,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_count_links(dna_hash, to_agent.clone(), query)
.await
{
Ok(response) => CountLinksRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending count_links response");
}
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
GetAgentActivityReq {
msg_id,
to_agent,
agent,
query,
options,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_get_agent_activity(
dna_hash,
to_agent.clone(),
agent.clone(),
query,
options,
)
.await
{
Ok(response) => GetAgentActivityRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending get_agent_activity response");
}
record_incoming_request_duration(&[
opentelemetry::KeyValue::new("to_agent", format!("{to_agent:?}")),
opentelemetry::KeyValue::new("agent", format!("{agent:?}")),
]);
}
MustGetAgentActivityReq {
msg_id,
to_agent,
agent,
filter,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_must_get_agent_activity(
dna_hash,
to_agent.clone(),
agent.clone(),
filter,
)
.await
{
Ok(response) => MustGetAgentActivityRes { msg_id, response },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending must_get_agent_activity response");
}
record_incoming_request_duration(&[
opentelemetry::KeyValue::new("to_agent", format!("{to_agent:?}")),
opentelemetry::KeyValue::new("agent", format!("{agent:?}")),
]);
}
SendValidationReceiptsReq {
msg_id,
to_agent,
receipts,
} => {
let resp = match evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_validation_receipts_received(dna_hash, to_agent.clone(), receipts)
.await
{
Ok(_) => SendValidationReceiptsRes { msg_id },
Err(err) => ErrorRes {
msg_id,
error: format!("{err:?}"),
},
};
let resp = crate::wire::WireMessage::encode_batch(&[&resp])?;
if let Err(err) = kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| HolochainP2pError::other("no such space"))?
.send_notify(from_peer, resp)
.await
{
tracing::debug!(?err, "Error sending send_validation_receipts response");
}
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
RemoteSignalEvt {
to_agent,
zome_call_params_serialized,
signature,
} => {
let _response = evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_call_remote(
dna_hash.clone(),
to_agent.clone(),
zome_call_params_serialized,
signature,
)
.await;
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
p2p_recv_remote_signal_metric().add(
1,
&[opentelemetry::KeyValue::new(
"dna_hash",
dna_hash.to_string(),
)],
);
}
PublishCountersignEvt { op } => {
evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_publish_countersign(dna_hash, op)
.await?;
record_incoming_request_duration(&[]);
}
CountersigningSessionNegotiationEvt { to_agent, message } => {
evt_sender
.get()
.ok_or_else(|| HolochainP2pError::other(EVT_REG_ERR))?
.handle_countersigning_session_negotiation(
dna_hash,
to_agent.clone(),
message,
)
.await?;
record_incoming_request_duration(&[opentelemetry::KeyValue::new(
"to_agent",
format!("{to_agent:?}"),
)]);
}
}
HolochainP2pResult::Ok(())
});
}
}
// Emits a TRACE-level `NETAUDIT` event describing the outcome of an
// already-completed operation.
//
// Arguments:
// - `$res`: identifier of a `Result` value; it is only borrowed.
// - `$start`: identifier of a value with an `elapsed()` method returning a
//   duration (its `as_secs_f64()` is logged as `elapsed_s`).
// - `$rest`: extra tracing fields appended verbatim to the event.
//
// On `Ok` the event carries `r = "ok"`; on `Err` it carries the error as `?err`.
macro_rules! timing_trace_out {
($res:ident, $start:ident, $($rest:tt)*) => {
match &$res {
Ok(_) => {
tracing::trace!(
target: "NETAUDIT",
m = "holochain_p2p",
r = "ok",
elapsed_s = $start.elapsed().as_secs_f64(),
$($rest)*
);
}
Err(err) => {
tracing::trace!(
target: "NETAUDIT",
m = "holochain_p2p",
?err,
elapsed_s = $start.elapsed().as_secs_f64(),
$($rest)*
);
}
}
};
}
async fn select_ok_non_empty<I, O>(futures: I, is_empty: fn(&O) -> bool) -> HolochainP2pResult<O>
where
I: IntoIterator,
I::Item: Future<Output = HolochainP2pResult<O>> + Unpin,
{
let mut futures: Vec<_> = futures.into_iter().collect();
let mut best_response = None;
loop {
let (out, _, remaining): (HolochainP2pResult<O>, _, _) =
futures::future::select_all(futures).await;
if let Ok(ops) = out.as_ref() {
if is_empty(ops) {
if remaining.is_empty() {
return out;
}
best_response = Some(out);
} else {
return out;
}
} else if remaining.is_empty() {
return best_response.unwrap_or(out);
}
futures = remaining;
}
}
impl actor::HcP2p for HolochainP2pActor {
/// Exposes the underlying kitsune2 instance; only compiled with the
/// `test_utils` feature.
#[cfg(feature = "test_utils")]
fn test_kitsune(&self) -> &DynKitsune {
&self.kitsune
}
/// Returns the peer store of the kitsune2 space that corresponds to `dna_hash`.
fn peer_store(&self, dna_hash: DnaHash) -> BoxFut<'_, HolochainP2pResult<DynPeerStore>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let store = space.peer_store().clone();
        Ok(store)
    })
}
/// Installs `handler` as the event handler and registers this actor as the
/// kitsune2 handler.
///
/// # Errors
///
/// Fails if the actor's `Arc` is no longer alive, if a handler has already
/// been set, or if kitsune2 rejects the registration.
fn register_handler(
    &self,
    handler: event::DynHcP2pHandler,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let Some(this) = self.this.upgrade() else {
            return Err(HolochainP2pError::other(
                "arc wrapping hc_p2p no longer valid",
            ));
        };

        self.evt_sender
            .set(WrapEvtSender(handler))
            .map_err(|_| HolochainP2pError::other("handler already set"))?;
        self.kitsune.register_handler(this).await?;
        Ok(())
    })
}
/// Joins `agent_pub_key` as a local agent of the kitsune space for `dna_hash`.
///
/// `config_override`, when present, is translated by `space_config_override` and passed along
/// when obtaining the space. `_maybe_agent_info` is not used.
fn join(
    &self,
    dna_hash: DnaHash,
    agent_pub_key: AgentPubKey,
    _maybe_agent_info: Option<AgentInfoSigned>,
    config_override: Option<CellConfigOverrides>,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let config_override = if let Some(overrides) = config_override {
            self.space_config_override(overrides)?
        } else {
            None
        };
        let space_id = dna_hash.to_k2_space();
        let space = self.kitsune.space(space_id, config_override).await?;
        // The agent is created with a full arc and this actor's target arc factor.
        let agent = HolochainP2pLocalAgent::new(
            agent_pub_key,
            DhtArc::FULL,
            self.target_arc_factor,
            self.lair_client.clone(),
        );
        let local_agent: DynLocalAgent = Arc::new(agent);
        space.local_agent_join(local_agent).await?;
        Ok(())
    })
}
/// Removes `agent_pub_key` from the local agents of the space for `dna_hash`.
///
/// If the local agent store then reports no remaining agents, the space itself is removed from
/// kitsune. A failure to remove the space is logged as a warning and not returned.
fn leave(
    &self,
    dna_hash: DnaHash,
    agent_pub_key: AgentPubKey,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space_id = dna_hash.to_k2_space();
        let space = self.kitsune.space(space_id.clone(), None).await?;
        space.local_agent_leave(agent_pub_key.to_k2_agent()).await;

        // A failed lookup counts as "agents may remain", so the space is kept.
        let no_agents_left = matches!(
            space.local_agent_store().get_all().await,
            Ok(agents) if agents.is_empty()
        );
        if no_agents_left {
            // Release our handle before asking kitsune to remove the space.
            drop(space);
            if let Err(err) = self.kitsune.remove_space(space_id).await {
                tracing::warn!(?err, "Failed to remove space after last agent left");
            }
        }
        Ok(())
    })
}
/// Forwards the newly integrated `ops` for `space_id` to `inform_ops_stored` and returns its
/// result.
fn new_integrated_data(
&self,
space_id: SpaceId,
ops: Vec<StoredOp>,
) -> BoxFut<'_, HolochainP2pResult<()>> {
Box::pin(async move { self.inform_ops_stored(space_id, ops).await })
}
/// Sends a remote zome call to `to_agent` and returns the serialized response.
///
/// # Errors
///
/// Fails when the peer store holds no URL for `to_agent`, when the request itself fails, or
/// when the reply is not a `CallRemoteRes` message.
fn call_remote(
    &self,
    dna_hash: DnaHash,
    to_agent: AgentPubKey,
    zome_call_params_serialized: ExternIO,
    signature: Signature,
    zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<SerializedBytes>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        // Payload size, recorded in the trace event below.
        let byte_count = zome_call_params_serialized.0.len();

        let peer_info = space.peer_store().get(to_agent.to_k2_agent()).await?;
        let Some(to_url) = peer_info.and_then(|info| info.url.clone()) else {
            return Err(HolochainP2pError::other("call_remote: no url for peer"));
        };

        let (msg_id, req) =
            WireMessage::call_remote_req(to_agent, zome_call_params_serialized, signature);
        let started = std::time::Instant::now();
        let result = self
            .send_request(
                "call_remote",
                &space,
                to_url,
                msg_id,
                req,
                dna_hash,
                NetworkRequestOptions::default(),
                zome_call_origin,
                |res| {
                    if let WireMessage::CallRemoteRes { response, .. } = res {
                        Ok(response)
                    } else {
                        Err(HolochainP2pError::other(format!(
                            "invalid response to call_remote: {res:?}"
                        )))
                    }
                },
            )
            .await;
        timing_trace_out!(result, started, byte_count, a = "send_call_remote");
        result
    })
}
/// Sends a remote signal to every target in `target_payload_list` on a best-effort basis.
///
/// Targets without a known URL in the peer store are skipped. Targets for which
/// `should_bridge` returns `true` are handed directly to `recv_notify`; all others are sent
/// with `send_notify`, and those sends are awaited together. Individual delivery failures are
/// logged at debug level and never fail the call.
///
/// # Errors
///
/// Only fails when the space cannot be obtained or a peer store lookup fails.
fn send_remote_signal(
    &self,
    dna_hash: DnaHash,
    target_payload_list: Vec<(AgentPubKey, ExternIO, Signature)>,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space_id = dna_hash.to_k2_space();
        let space = self.kitsune.space(space_id.clone(), None).await?;
        // Total payload size across all targets, recorded in the trace event below.
        let byte_count: usize = target_payload_list.iter().map(|(_, p, _)| p.0.len()).sum();
        let mut all = Vec::new();
        for (to_agent, payload, signature) in target_payload_list {
            let to_agent_id = to_agent.to_k2_agent();
            let to_url = match space
                .peer_store()
                .get(to_agent_id)
                .await?
                .and_then(|i| i.url.clone())
            {
                Some(to_url) => to_url,
                None => continue,
            };
            let req = WireMessage::remote_signal_evt(to_agent.clone(), payload, signature);
            if self.should_bridge(&space, to_url.clone()) {
                // Both the encode step and the bridged delivery can fail; report either one.
                // Previously a `recv_notify` failure was wrapped in `Ok(..)` and silently lost.
                match WireMessage::encode_batch(&[&req]) {
                    Ok(msg) => {
                        if let Err(err) = self.recv_notify(to_url, space_id.clone(), msg) {
                            tracing::debug!(?err, "send_remote_signal failed to bridge call");
                        }
                    }
                    Err(err) => {
                        tracing::debug!(?err, "send_remote_signal failed to bridge call");
                    }
                }
            } else {
                all.push(async {
                    if let Err(err) = self.send_notify(&space, to_url, req).await {
                        tracing::debug!(?err, "send_remote_signal failed");
                    }
                });
            }
        }
        // Timing covers only the network sends, not the lookups above.
        let start = std::time::Instant::now();
        if !all.is_empty() {
            let _ = futures::future::join_all(all).await;
        }
        let out = Ok(());
        timing_trace_out!(out, start, byte_count, a = "send_remote_signal");
        out
    })
}
/// Publishes the given op hashes to the responsive remote agents near `basis_hash`.
///
/// Tombstoned agent infos and agents without a URL are ignored, and URLs are de-duplicated
/// before publishing. `_source` and `_timeout_ms` are not used.
///
/// # Errors
///
/// Stops and returns the error of the first URL whose publish fails.
fn publish(
    &self,
    dna_hash: DnaHash,
    basis_hash: OpBasis,
    _source: AgentPubKey,
    op_hash_list: Vec<DhtOpHash>,
    _timeout_ms: Option<u64>,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;

        let op_ids: Vec<OpId> = op_hash_list
            .into_iter()
            .map(|hash| hash.to_located_k2_op_id(&basis_hash))
            .collect();

        let nearby_agents = get_responsive_remote_agents_near_location(
            space.peer_store().clone(),
            space.local_agent_store().clone(),
            space.peer_meta_store().clone(),
            basis_hash.get_loc(),
            usize::MAX,
        )
        .await?;

        let target_urls: std::collections::HashSet<Url> = nearby_agents
            .into_iter()
            .filter(|info| !info.is_tombstone)
            .filter_map(|info| info.url.clone())
            .collect();

        for target_url in target_urls {
            space
                .publish()
                .publish_ops(op_ids.clone(), target_url)
                .await?;
        }
        Ok(())
    })
}
/// Notifies the peers returned by `get_peers_for_location` for `basis_hash` about a
/// countersigning `op`.
///
/// All notifications are sent concurrently. Partial failures are logged at info level.
///
/// # Errors
///
/// Fails when no notification succeeded. That includes the case where there were no peers
/// to notify at all.
fn publish_countersign(
    &self,
    dna_hash: DnaHash,
    basis_hash: OpBasis,
    op: ChainOp,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let peers = self
            .get_peers_for_location(&space, basis_hash.get_loc())
            .await?;

        let sends = peers.into_iter().map(|peer| {
            let req = crate::wire::WireMessage::publish_countersign_evt(op.clone());
            let space = space.clone();
            Box::pin(async move {
                // Keep the peer identity next to the error for the log below.
                self.send_notify(&space, peer.1, req)
                    .await
                    .map_err(|e| (peer.0, e))
            })
        });
        let results = futures::future::join_all(sends).await;

        if results.iter().all(|r| r.is_err()) {
            return Err(HolochainP2pError::other(
                "publish_countersign failed to publish to any peers",
            ));
        }
        for (agent, err) in results.into_iter().filter_map(Result::err) {
            tracing::info!(
                ?err,
                ?agent,
                "publish_countersign failed to publish to a peer"
            );
        }
        Ok(())
    })
}
/// Requests the data stored under `dht_hash` from remote peers.
///
/// Peers are chosen with `get_random_peers_for_location` for the hash's location, a `get`
/// request is issued to each of them, and `select_ok_non_empty` decides which response wins.
/// The winning response is returned wrapped in a single-element `Vec`.
///
/// # Errors
///
/// Fails when the space or the peers cannot be obtained, or when `select_ok_non_empty`
/// yields an error.
fn get(
&self,
dna_hash: DnaHash,
dht_hash: AnyDhtHash,
options: NetworkRequestOptions,
zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<Vec<WireOps>>> {
Box::pin(async move {
let space_id = dna_hash.to_k2_space();
let space = self.kitsune.space(space_id.clone(), None).await?;
let loc = dht_hash.get_loc();
let agents = self
.get_random_peers_for_location("get", &space, loc, &options)
.await?;
let start = std::time::Instant::now();
let out = select_ok_non_empty(
// One boxed request future per selected peer.
agents.into_iter().map(|(to_agent, to_url)| {
Box::pin(async {
let (msg_id, req) = WireMessage::get_req(to_agent, dht_hash.clone());
self.send_request(
"get",
&space,
to_url,
msg_id,
req,
dna_hash.clone(),
options.clone(),
zome_call_origin.clone(),
// Any reply other than `GetRes` is treated as an error.
|res| match res {
WireMessage::GetRes { response, .. } => Ok(response),
_ => Err(HolochainP2pError::other(format!(
"invalid response to get: {res:?}"
))),
},
)
.await
})
}),
// Emptiness check: a response is empty only when every listed field carries no data.
|wire_ops| match wire_ops {
WireOps::Entry(WireEntryOps {
creates,
deletes,
updates,
entry,
}) if creates.is_empty()
&& deletes.is_empty()
&& updates.is_empty()
&& entry.is_none() =>
{
true
}
WireOps::Record(WireRecordOps {
action,
deletes,
updates,
entry,
}) if action.is_none()
&& deletes.is_empty()
&& updates.is_empty()
&& entry.is_none() =>
{
true
}
_ => false,
},
)
.await;
timing_trace_out!(out, start, a = "send_get");
out.map(|x| vec![x])
})
}
/// Requests the links matching `link_key` from remote peers.
///
/// Peers are chosen with `get_random_peers_for_location` for the location of the link base, a
/// `get_links` request is issued to each of them, and `select_ok_non_empty` decides which
/// response wins. The winning response is returned wrapped in a single-element `Vec`.
///
/// # Errors
///
/// Fails when the space or the peers cannot be obtained, or when `select_ok_non_empty`
/// yields an error.
fn get_links(
&self,
dna_hash: DnaHash,
link_key: WireLinkKey,
options: GetLinksRequestOptions,
zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<Vec<WireLinkOps>>> {
Box::pin(async move {
let space_id = dna_hash.to_k2_space();
let space = self.kitsune.space(space_id.clone(), None).await?;
let loc = link_key.base.get_loc();
let agents = self
.get_random_peers_for_location(
"get_links",
&space,
loc,
&options.network_req_options,
)
.await?;
let start = std::time::Instant::now();
let out = select_ok_non_empty(
// One boxed request future per selected peer.
agents.into_iter().map(|(to_agent, to_url)| {
Box::pin(async {
// The wire-level options are derived from the caller's request options.
let r_options: event::GetLinksOptions = (&options).into();
let (msg_id, req) = crate::wire::WireMessage::get_links_req(
to_agent,
link_key.clone(),
r_options,
);
self.send_request(
"get_links",
&space,
to_url,
msg_id,
req,
dna_hash.clone(),
options.network_req_options.clone(),
zome_call_origin.clone(),
// Any reply other than `GetLinksRes` is treated as an error.
|res| match res {
crate::wire::WireMessage::GetLinksRes { response, .. } => {
Ok(response)
}
_ => Err(HolochainP2pError::other(format!(
"invalid response to get_links: {res:?}"
))),
},
)
.await
})
}),
// A response without any creates counts as empty.
|wire_link_ops| wire_link_ops.creates.is_empty(),
)
.await;
timing_trace_out!(out, start, a = "send_get_links");
out.map(|x| vec![x])
})
}
/// Asks remote peers for the link count matching `query`.
///
/// Peers are chosen with `get_random_peers_for_location` for the location of the query base.
/// Each receives a `count_links` request, and `select_ok_non_empty` picks the response to
/// return. A response with no create-link actions counts as empty.
fn count_links(
    &self,
    dna_hash: DnaHash,
    query: WireLinkQuery,
    options: NetworkRequestOptions,
    zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<CountLinksResponse>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let loc = query.base.get_loc();
        let targets = self
            .get_random_peers_for_location("count_links", &space, loc, &options)
            .await?;

        let has_no_links: fn(&CountLinksResponse) -> bool =
            |response| response.create_link_actions().is_empty();

        let started = std::time::Instant::now();
        let requests = targets.into_iter().map(|(to_agent, to_url)| {
            Box::pin(async {
                let (msg_id, req) = WireMessage::count_links_req(to_agent, query.clone());
                self.send_request(
                    "count_links",
                    &space,
                    to_url,
                    msg_id,
                    req,
                    dna_hash.clone(),
                    options.clone(),
                    zome_call_origin.clone(),
                    // Any reply other than `CountLinksRes` is treated as an error.
                    |res| match res {
                        WireMessage::CountLinksRes { response, .. } => Ok(response),
                        _ => Err(HolochainP2pError::other(format!(
                            "invalid response to count_links: {res:?}"
                        ))),
                    },
                )
                .await
            })
        });
        let result = select_ok_non_empty(requests, has_no_links).await;
        timing_trace_out!(result, started, a = "send_count_links");
        result
    })
}
/// Requests the chain activity of `agent` matching `query` from remote peers.
///
/// Peers are chosen with `get_random_peers_for_location` for the agent's location, a
/// `get_agent_activity` request is issued to each of them, and `select_ok_non_empty` decides
/// which response wins. The winning response is returned wrapped in a single-element `Vec`.
///
/// # Errors
///
/// Fails when the space or the peers cannot be obtained, or when `select_ok_non_empty`
/// yields an error.
fn get_agent_activity(
&self,
dna_hash: DnaHash,
agent: AgentPubKey,
query: ChainQueryFilter,
options: actor::GetActivityOptions,
zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<Vec<AgentActivityResponse>>> {
Box::pin(async move {
let space_id = dna_hash.to_k2_space();
let space = self.kitsune.space(space_id.clone(), None).await?;
let loc = agent.get_loc();
let agents = self
.get_random_peers_for_location(
"get_agent_activity",
&space,
loc,
&options.network_req_options,
)
.await?;
let start = std::time::Instant::now();
let out = select_ok_non_empty(
// One boxed request future per selected peer.
agents.into_iter().map(|(to_agent, to_url)| {
Box::pin(async {
// The wire-level options are derived from the caller's request options.
let r_options: event::GetActivityOptions = (&options).into();
let (msg_id, req) = WireMessage::get_agent_activity_req(
to_agent,
agent.clone(),
query.clone(),
r_options,
);
self.send_request(
"get_agent_activity",
&space,
to_url,
msg_id,
req,
dna_hash.clone(),
options.network_req_options.clone(),
zome_call_origin.clone(),
// Any reply other than `GetAgentActivityRes` is treated as an error.
|res| match res {
WireMessage::GetAgentActivityRes { response, .. } => Ok(response),
_ => Err(HolochainP2pError::other(format!(
"invalid response to get_agent_activity: {res:?}"
))),
},
)
.await
})
}),
// Emptiness check: nothing requested, empty status, nothing observed and no warrants.
|agent_activity| {
matches!(
agent_activity,
AgentActivityResponse {
valid_activity: ChainItems::NotRequested,
rejected_activity: ChainItems::NotRequested,
status: ChainStatus::Empty,
highest_observed: None,
warrants,
..
} if warrants.is_empty()
)
},
)
.await;
timing_trace_out!(out, start, a = "send_get_agent_activity");
out.map(|x| vec![x])
})
}
/// Requests the chain activity of `author` selected by `filter` from remote peers.
///
/// Peers are chosen with `get_random_peers_for_location` for the author's location. Each
/// receives a `must_get_agent_activity` request, and `select_ok_non_empty` picks the response
/// to return; an `EmptyRange` response counts as empty. The chosen response is returned
/// wrapped in a single-element `Vec`.
fn must_get_agent_activity(
    &self,
    dna_hash: DnaHash,
    author: AgentPubKey,
    filter: ChainFilter,
    options: NetworkRequestOptions,
    zome_call_origin: Option<(ZomeName, FunctionName)>,
) -> BoxFut<'_, HolochainP2pResult<Vec<MustGetAgentActivityResponse>>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let loc = author.get_loc();
        let targets = self
            .get_random_peers_for_location("must_get_agent_activity", &space, loc, &options)
            .await?;

        let is_empty_range: fn(&MustGetAgentActivityResponse) -> bool =
            |response| matches!(response, MustGetAgentActivityResponse::EmptyRange);

        let started = std::time::Instant::now();
        let requests = targets.into_iter().map(|(to_agent, to_url)| {
            Box::pin(async {
                let (msg_id, req) = crate::wire::WireMessage::must_get_agent_activity_req(
                    to_agent,
                    author.clone(),
                    filter.clone(),
                );
                self.send_request(
                    "must_get_agent_activity",
                    &space,
                    to_url,
                    msg_id,
                    req,
                    dna_hash.clone(),
                    options.clone(),
                    zome_call_origin.clone(),
                    // Any reply other than `MustGetAgentActivityRes` is treated as an error.
                    |res| match res {
                        crate::wire::WireMessage::MustGetAgentActivityRes { response, .. } => {
                            Ok(response)
                        }
                        _ => Err(HolochainP2pError::other(format!(
                            "invalid response to must_get_agent_activity: {res:?}"
                        ))),
                    },
                )
                .await
            })
        });
        let result = select_ok_non_empty(requests, is_empty_range).await;
        timing_trace_out!(result, started, a = "send_must_get_agent_activity");
        result.map(|response| vec![response])
    })
}
/// Reports whether `agent` has a known URL that is not marked unresponsive.
///
/// Returns `Ok(false)` when the peer store has no URL for the agent.
///
/// # Errors
///
/// Fails when no space exists for `dna_hash` or when a store lookup fails.
fn was_agent_recently_online(
    &self,
    dna_hash: DnaHash,
    agent: AgentPubKey,
) -> BoxFut<'_, HolochainP2pResult<bool>> {
    Box::pin(async move {
        let space = self
            .kitsune
            .space_if_exists(dna_hash.to_k2_space())
            .await
            .ok_or_else(|| K2Error::other(format!("No space found for: {dna_hash:?}")))?;

        let peer_info = space.peer_store().get(agent.to_k2_agent()).await?;
        let Some(agent_url) = peer_info.and_then(|info| info.url.clone()) else {
            return Ok(false);
        };

        let unresponsive = space.peer_meta_store().get_unresponsive(agent_url).await?;
        Ok(unresponsive.is_none())
    })
}
/// Sends a bundle of validation receipts to `to_agent`.
///
/// Nothing is sent, and `Ok(())` is returned, when the target URL equals this node's own
/// current URL.
///
/// # Errors
///
/// * the peer store holds no URL for `to_agent`;
/// * the request fails, or the reply is not a `SendValidationReceiptsRes` message.
///
/// The request outcome is propagated to the caller. Previously it was traced and then
/// replaced by an unconditional `Ok(())`, which hid delivery failures.
fn send_validation_receipts(
    &self,
    dna_hash: DnaHash,
    to_agent: AgentPubKey,
    receipts: ValidationReceiptBundle,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space_id = dna_hash.to_k2_space();
        let space = self.kitsune.space(space_id, None).await?;
        let agent_id = to_agent.to_k2_agent();
        let to_url = match space
            .peer_store()
            .get(agent_id)
            .await?
            .and_then(|i| i.url.clone())
        {
            Some(to_url) => to_url,
            None => {
                return Err(HolochainP2pError::other(
                    "send_validation_receipts could not find url for peer",
                ))
            }
        };
        if space.current_url() == Some(to_url.clone()) {
            tracing::info!("ignoring send_validation_receipts to ourselves");
            return Ok(());
        }
        let (msg_id, req) = WireMessage::send_validation_receipts_req(to_agent, receipts);
        let start = std::time::Instant::now();
        let out = self
            .send_request(
                "send_validation_receipts",
                &space,
                to_url,
                msg_id,
                req,
                dna_hash,
                NetworkRequestOptions::default(),
                None,
                |res| match res {
                    WireMessage::SendValidationReceiptsRes { .. } => Ok(()),
                    _ => Err(HolochainP2pError::other(format!(
                        "invalid response to send_validation_receipts: {res:?}"
                    ))),
                },
            )
            .await;
        timing_trace_out!(out, start, a = "send_validation_receipts");
        out
    })
}
/// Reports whether any local agent's current storage arc contains the location of `basis`.
fn authority_for_hash(
    &self,
    dna_hash: DnaHash,
    basis: OpBasis,
) -> BoxFut<'_, HolochainP2pResult<bool>> {
    Box::pin(async move {
        let loc = basis.get_loc();
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let local_agents = space.local_agent_store().get_all().await?;
        let is_authority = local_agents
            .into_iter()
            .any(|agent| agent.get_cur_storage_arc().contains(loc));
        Ok(is_authority)
    })
}
/// Sends a countersigning session negotiation `message` to each of `agents`.
///
/// Agents whose peer lookup fails or whose info has no URL are logged at error level and
/// skipped; agents unknown to the peer store are skipped silently. Each remaining agent is
/// notified concurrently: via `recv_notify` when `should_bridge` returns `true`, otherwise via
/// `send_notify`.
///
/// # Errors
///
/// Fails when at least one notification failed; each failure is also logged.
fn countersigning_session_negotiation(
    &self,
    dna_hash: DnaHash,
    agents: Vec<AgentPubKey>,
    message: event::CountersigningSessionNegotiationMessage,
) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        let space_id = dna_hash.to_k2_space();
        let space = self.kitsune.space(space_id.clone(), None).await?;

        // Resolve every agent to a URL, dropping the ones that cannot be reached.
        let mut peer_urls = Vec::with_capacity(agents.len());
        for agent in agents {
            match space.peer_store().get(agent.to_k2_agent()).await {
                Ok(Some(agent_info)) => {
                    if let Some(url) = &agent_info.url {
                        peer_urls.push((agent, url.clone()));
                    } else {
                        tracing::error!(?agent, "Peer has no url for countersigning negotiation");
                    }
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        ?e,
                        ?agent,
                        "Failed to get peer for countersigning negotiation"
                    );
                }
            }
        }

        let sends = peer_urls.into_iter().map(|(agent, url)| {
            let req = WireMessage::countersigning_session_negotiation_evt(agent, message.clone());
            let space = space.clone();
            let space_id = space_id.clone();
            Box::pin(async move {
                if self.should_bridge(&space, url.clone()) {
                    let message = WireMessage::encode_batch(&[&req])?;
                    self.recv_notify(url, space_id, message)?;
                    Ok(())
                } else {
                    self.send_notify(&space, url, req).await
                }
            })
        });
        let results = futures::future::join_all(sends).await;

        let mut any_failed = false;
        for err in results.into_iter().filter_map(Result::err) {
            any_failed = true;
            tracing::error!(
                ?err,
                "Failed to send countersigning session negotiation to a peer"
            );
        }

        if any_failed {
            Err(HolochainP2pError::other(
                "Failed to send countersigning session negotiation to all peers",
            ))
        } else {
            Ok(())
        }
    })
}
/// Collects network metrics for one space or for all spaces known to kitsune.
///
/// When `request.dna_hash` is set, only that space is inspected and it must already exist.
/// Otherwise every space id from `list_spaces` is looked up. For each space the fetch state
/// summary, the gossip state summary and a summary of the local agents are gathered, keyed by
/// the DNA hash derived from the space id.
///
/// # Errors
///
/// Fails when the requested space does not exist, when a space cannot be obtained, or when
/// gathering any of the summaries fails for any space.
fn dump_network_metrics(
&self,
request: Kitsune2NetworkMetricsRequest,
) -> BoxFut<'_, HolochainP2pResult<HashMap<DnaHash, Kitsune2NetworkMetrics>>> {
Box::pin(async move {
// Step 1: resolve the set of (space id, space) pairs to inspect.
let spaces = match request.dna_hash {
Some(dna_hash) => {
let space_id = dna_hash.to_k2_space();
vec![(
space_id.clone(),
self.kitsune
.space_if_exists(space_id)
.await
.ok_or_else(|| {
K2Error::other(format!("No space found for: {dna_hash:?}"))
})?,
)]
}
None => {
let all_space_ids = self.kitsune.list_spaces();
let mut spaces = Vec::with_capacity(all_space_ids.len());
for space_id in all_space_ids {
spaces.push((space_id.clone(), self.kitsune.space(space_id, None).await?));
}
spaces
}
};
// Step 2: gather the metrics of all spaces concurrently; the first error fails the whole
// call when the results are collected.
Ok(
futures::future::join_all(spaces.into_iter().map(|(space_id, space)| {
Box::pin(async move {
let fetch_state_summary = space.fetch().get_state_summary().await?;
let gossip_state_summary = space
.gossip()
.get_state_summary(GossipStateSummaryRequest {
include_dht_summary: request.include_dht_summary,
})
.await?;
let local_agents = space
.local_agent_store()
.get_all()
.await?
.into_iter()
.map(|a| LocalAgentSummary {
agent: AgentPubKey::from_k2_agent(a.agent()),
storage_arc: a.get_cur_storage_arc(),
target_arc: a.get_tgt_storage_arc(),
})
.collect();
Ok((
DnaHash::from_k2_space(&space_id),
Kitsune2NetworkMetrics {
fetch_state_summary,
gossip_state_summary,
local_agents,
},
))
})
}))
.await
.into_iter()
.collect::<K2Result<HashMap<_, _>>>()?,
)
})
}
/// Returns the network statistics reported by the kitsune transport.
fn dump_network_stats(&self) -> BoxFut<'_, HolochainP2pResult<ApiTransportStats>> {
    Box::pin(async move {
        let transport = self.kitsune.transport().await?;
        let stats = transport.dump_network_stats().await?;
        Ok(stats)
    })
}
/// Returns the target storage arc of every local agent in the space for `dna_hash`.
fn target_arcs(
    &self,
    dna_hash: DnaHash,
) -> BoxFut<'_, HolochainP2pResult<Vec<kitsune2_api::DhtArc>>> {
    Box::pin(async move {
        let space = self.kitsune.space(dna_hash.to_k2_space(), None).await?;
        let local_agents = space.local_agent_store().get_all().await?;
        let arcs: Vec<kitsune2_api::DhtArc> = local_agents
            .into_iter()
            .map(|agent| agent.get_tgt_storage_arc())
            .collect();
        Ok(arcs)
    })
}
/// Returns a clone of the stored `blocks_db_getter`.
fn conductor_db_getter(&self) -> GetDbConductor {
self.blocks_db_getter.clone()
}
/// Persists `block` to the conductor database and, for cell targets, removes the blocked
/// agent from the peer store of the matching space.
///
/// The peer removal is best effort: a missing space is logged at debug level and a failed
/// removal is logged as a warning. Neither fails the call.
///
/// # Errors
///
/// Fails only when the block cannot be written to the database.
fn block(&self, block: Block) -> BoxFut<'_, HolochainP2pResult<()>> {
    Box::pin(async move {
        // Remember the target before `block` is consumed by the database write.
        let target = block.target().clone();
        let db = self.conductor_db_getter()().await;
        if let Err(err) = holochain_state::block::block(&db, block).await {
            return Err(HolochainP2pError::other(format!(
                "Could not write block to database: {err}"
            )));
        }

        let holochain_zome_types::block::BlockTarget::Cell(cell_id, _) = target else {
            return Ok(());
        };
        let Some(space) = self
            .kitsune
            .space_if_exists(cell_id.dna_hash().to_k2_space())
            .await
        else {
            tracing::debug!(
                ?cell_id,
                "No Kitsune space exists for this DNA; skipping peer removal"
            );
            return Ok(());
        };
        if let Err(err) = space
            .peer_store()
            .remove(cell_id.agent_pubkey().to_k2_agent())
            .await
        {
            tracing::warn!(
                ?err,
                ?cell_id,
                "Failed to remove agent from peer store after writing block"
            );
        }
        Ok(())
    })
}
/// Queries the conductor database for whether `target` is blocked at the current timestamp.
///
/// # Errors
///
/// Fails when the database read fails.
fn is_blocked(&self, target: BlockTargetId) -> BoxFut<'_, HolochainP2pResult<bool>> {
    Box::pin(async move {
        let db = self.conductor_db_getter()().await;
        let lookup = db
            .read_async(|txn| {
                holochain_state::block::query_is_blocked(
                    txn,
                    target,
                    holochain_timestamp::Timestamp::now(),
                )
            })
            .await;
        match lookup {
            Ok(blocked) => Ok(blocked),
            Err(err) => Err(HolochainP2pError::other(format!(
                "Could not read block from database: {err}"
            ))),
        }
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
actor::HcP2p,
event::{
CountersigningSessionNegotiationMessage, GetActivityOptions, GetLinksOptions,
HcP2pHandler,
},
};
/// Holochain hashes and their kitsune2 counterparts must agree on the DHT location.
#[tokio::test(flavor = "multi_thread")]
async fn correct_id_loc_calc() {
    // The result is ignored; the actor is only created for its side effects.
    let _ = HolochainP2pActor::create(Default::default(), holochain_keystore::test_keystore())
        .await;

    let dna = DnaHash::from_raw_32(vec![0xdb; 32]);
    let space_id = dna.to_k2_space();
    assert_eq!(dna.get_loc(), space_id.loc());

    let agent_key = AgentPubKey::from_raw_32(vec![0xdc; 32]);
    let agent_id = agent_key.to_k2_agent();
    assert_eq!(agent_key.get_loc(), agent_id.loc());

    let op_hash = DhtOpHash::from_raw_32(vec![0xdd; 32]);
    let basis = OpBasis::from_raw_32_and_type(vec![0xde; 32], hash_type::AnyLinkable::Entry);
    let op_id = op_hash.to_located_k2_op_id(&basis);
    // The located op id takes its location from the basis, not from the op hash.
    assert_eq!(basis.get_loc(), op_id.loc());
}
/// Holochain hashes and their kitsune2 counterparts must render to the same string.
#[tokio::test(flavor = "multi_thread")]
async fn correct_id_display() {
    // The result is ignored; the actor is only created for its side effects.
    let _ = HolochainP2pActor::create(Default::default(), holochain_keystore::test_keystore())
        .await;

    let dna = DnaHash::from_raw_32(vec![0xdb; 32]);
    let space_id = dna.to_k2_space();
    assert_eq!(dna.to_string(), space_id.to_string());

    let agent_key = AgentPubKey::from_raw_32(vec![0xdc; 32]);
    let agent_id = agent_key.to_k2_agent();
    assert_eq!(agent_key.to_string(), agent_id.to_string());

    let op_hash = DhtOpHash::from_raw_32(vec![0xdd; 32]);
    let basis = OpBasis::from_raw_32_and_type(vec![0xde; 32], hash_type::AnyLinkable::Entry);
    let op_id = op_hash.to_located_k2_op_id(&basis);
    assert_eq!(op_hash.to_string(), op_id.to_string());
}
/// The module configs passed at creation must be retrievable from the actor's kitsune config.
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "kitsune2_transport_tx5")]
async fn should_set_kitsune2_config() {
    let actor_p2p: Arc<HolochainP2pActor> =
        Arc::downcast(test_p2p_actor().await).expect("failed to downcast actor");
    let kitsune_config = actor_p2p.kitsune2_config.clone();

    let bootstrap: kitsune2_core::factories::CoreBootstrapModConfig = kitsune_config
        .get_module_config()
        .expect("failed to get bootstrap config");
    assert_eq!(
        bootstrap.core_bootstrap.backoff_max_ms, 5_000,
        "backoff_max_ms should match"
    );
    assert_eq!(
        bootstrap.core_bootstrap.backoff_min_ms, 100,
        "backoff_min_ms should match"
    );

    let tx5: kitsune2_transport_tx5::Tx5TransportModConfig = kitsune_config
        .get_module_config()
        .expect("failed to get tx5 transport config");
    assert_eq!(
        tx5.tx5_transport.server_url, "wss://localhost:9999",
        "server_url should match"
    );
    assert_eq!(
        tx5.tx5_transport.timeout_s, 300,
        "timeout_s should match"
    );
}
/// Default cell overrides must not produce a space config override.
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "kitsune2_transport_tx5")]
async fn should_get_no_overrides_for_space_if_default() {
    let actor_p2p: Arc<HolochainP2pActor> =
        Arc::downcast(test_p2p_actor().await).expect("failed to downcast actor");
    let overrides = actor_p2p
        .space_config_override(CellConfigOverrides::default())
        .expect("failed to get overrides");
    assert!(overrides.is_none(), "overrides should be none");
}
/// Explicit cell overrides must show up in the bootstrap and tx5 transport module configs.
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "kitsune2_transport_tx5")]
async fn should_get_overrides_for_space_if_provided() {
    let actor_p2p: Arc<HolochainP2pActor> =
        Arc::downcast(test_p2p_actor().await).expect("failed to downcast actor");

    let requested = CellConfigOverrides {
        bootstrap_url: Some("http://override:1234".to_string()),
        relay_url: Some("wss://override:5678".to_string()),
    };
    let space_config = actor_p2p
        .space_config_override(requested)
        .expect("failed to get overrides")
        .expect("overrides should be some");

    let bootstrap: kitsune2_core::factories::CoreBootstrapModConfig = space_config
        .get_module_config()
        .expect("failed to get bootstrap config");
    assert_eq!(
        bootstrap.core_bootstrap.server_url,
        Some("http://override:1234".to_string()),
        "bootstrap_url should match"
    );

    let tx5: kitsune2_transport_tx5::Tx5TransportModConfig = space_config
        .get_module_config()
        .expect("failed to get tx5 transport config");
    assert_eq!(
        tx5.tx5_transport.server_url, "wss://override:5678",
        "relay_url should match"
    );
}
/// Explicit cell overrides must show up in the bootstrap and iroh transport module configs.
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "transport-iroh")]
async fn should_get_overrides_for_space_with_iroh_relay_url() {
    let actor_p2p: Arc<HolochainP2pActor> =
        Arc::downcast(test_p2p_actor_iroh().await).expect("failed to downcast actor");

    let requested = CellConfigOverrides {
        bootstrap_url: Some("http://override:1234".to_string()),
        relay_url: Some("wss://override:5678".to_string()),
    };
    let space_config = actor_p2p
        .space_config_override(requested)
        .expect("failed to get overrides")
        .expect("overrides should be some");

    let bootstrap: kitsune2_core::factories::CoreBootstrapModConfig = space_config
        .get_module_config()
        .expect("failed to get bootstrap config");
    assert_eq!(
        bootstrap.core_bootstrap.server_url,
        Some("http://override:1234".to_string()),
        "bootstrap_url should match"
    );

    let iroh: kitsune2_transport_iroh::IrohTransportModConfig = space_config
        .get_module_config()
        .expect("failed to get iroh transport config");
    assert_eq!(
        iroh.iroh_transport.relay_url,
        Some("wss://override:5678".to_string()),
        "relay_url should match"
    );
}
/// Builds an actor configured with explicit bootstrap and tx5 transport module configs.
///
/// Panics if any configuration step or the actor creation fails.
#[cfg(feature = "kitsune2_transport_tx5")]
async fn test_p2p_actor() -> Arc<dyn HcP2p> {
    use kitsune2_core::factories::{CoreBootstrapConfig, CoreBootstrapModConfig};

    let bootstrap_mod = CoreBootstrapModConfig {
        core_bootstrap: CoreBootstrapConfig {
            server_url: None,
            auth_material_base64: None,
            backoff_max_ms: 5_000,
            backoff_min_ms: 100,
        },
    };
    let tx5_mod = kitsune2_transport_tx5::Tx5TransportModConfig {
        tx5_transport: kitsune2_transport_tx5::Tx5TransportConfig {
            server_url: "wss://localhost:9999".to_string(),
            timeout_s: 300,
            ..Default::default()
        },
    };

    let kitsune_config = Config::default();
    kitsune_config
        .set_module_config(&bootstrap_mod)
        .expect("failed to set config");
    kitsune_config
        .set_module_config(&tx5_mod)
        .expect("failed to set config");

    let network_config =
        serde_json::to_value(&kitsune_config).expect("failed to serialize kitsune config");
    let p2p_config = HolochainP2pConfig {
        network_config: Some(network_config),
        ..Default::default()
    };
    HolochainP2pActor::create(p2p_config, holochain_keystore::test_keystore())
        .await
        .expect("failed to create actor")
}
/// Builds an actor configured with an explicit bootstrap module config only.
///
/// Panics if any configuration step or the actor creation fails.
#[cfg(feature = "transport-iroh")]
async fn test_p2p_actor_iroh() -> Arc<dyn HcP2p> {
    use kitsune2_core::factories::{CoreBootstrapConfig, CoreBootstrapModConfig};

    let bootstrap_mod = CoreBootstrapModConfig {
        core_bootstrap: CoreBootstrapConfig {
            server_url: None,
            auth_material_base64: None,
            backoff_max_ms: 5_000,
            backoff_min_ms: 100,
        },
    };

    let kitsune_config = Config::default();
    kitsune_config
        .set_module_config(&bootstrap_mod)
        .expect("failed to set config");

    let network_config =
        serde_json::to_value(&kitsune_config).expect("failed to serialize kitsune config");
    let p2p_config = HolochainP2pConfig {
        network_config: Some(network_config),
        ..Default::default()
    };
    HolochainP2pActor::create(p2p_config, holochain_keystore::test_keystore())
        .await
        .expect("failed to create actor")
}
/// Test fixture bundling an actor with the event handler registered on it.
struct TestP2pActorHarness {
// The concrete actor under test.
pub actor: Arc<HolochainP2pActor>,
// The handler whose call counters the tests inspect.
pub event_handler: Arc<BlockingEventHandler>,
}
/// Event handler for tests that keeps one call counter per handler method.
#[derive(Clone, Debug)]
struct BlockingEventHandler {
// Each field counts the invocations of the handler method named in the field.
pub handle_call_remote_count: Arc<Mutex<u32>>,
pub handle_publish_count: Arc<Mutex<u32>>,
pub handle_get_count: Arc<Mutex<u32>>,
pub handle_get_links_count: Arc<Mutex<u32>>,
pub handle_count_links_count: Arc<Mutex<u32>>,
pub handle_get_agent_activity_count: Arc<Mutex<u32>>,
pub handle_must_get_agent_activity_count: Arc<Mutex<u32>>,
pub handle_validation_receipts_received_count: Arc<Mutex<u32>>,
pub handle_publish_countersign_count: Arc<Mutex<u32>>,
pub handle_countersigning_session_negotiation_count: Arc<Mutex<u32>>,
}
impl BlockingEventHandler {
fn new() -> Self {
Self {
handle_call_remote_count: Arc::new(Mutex::new(0)),
handle_publish_count: Arc::new(Mutex::new(0)),
handle_get_count: Arc::new(Mutex::new(0)),
handle_get_links_count: Arc::new(Mutex::new(0)),
handle_count_links_count: Arc::new(Mutex::new(0)),
handle_get_agent_activity_count: Arc::new(Mutex::new(0)),
handle_must_get_agent_activity_count: Arc::new(Mutex::new(0)),
handle_validation_receipts_received_count: Arc::new(Mutex::new(0)),
handle_publish_countersign_count: Arc::new(Mutex::new(0)),
handle_countersigning_session_negotiation_count: Arc::new(Mutex::new(0)),
}
}
}
/// Every handler bumps its own counter and then returns a future that never completes, so a
/// request handled by this type stays in flight forever.
impl HcP2pHandler for BlockingEventHandler {
    fn handle_call_remote(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _zome_call_params_serialized: ExternIO,
        _signature: holochain_types::prelude::Signature,
    ) -> BoxFut<'_, HolochainP2pResult<SerializedBytes>> {
        *self.handle_call_remote_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_publish(
        &self,
        _dna_hash: DnaHash,
        _ops: Vec<holochain_types::dht_op::DhtOp>,
    ) -> BoxFut<'_, HolochainP2pResult<()>> {
        *self.handle_publish_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_get(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _dht_hash: holo_hash::AnyDhtHash,
    ) -> BoxFut<'_, HolochainP2pResult<WireOps>> {
        *self.handle_get_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_get_links(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _link_key: WireLinkKey,
        _options: GetLinksOptions,
    ) -> BoxFut<'_, HolochainP2pResult<WireLinkOps>> {
        *self.handle_get_links_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_count_links(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _query: WireLinkQuery,
    ) -> BoxFut<'_, HolochainP2pResult<CountLinksResponse>> {
        *self.handle_count_links_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_get_agent_activity(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _agent: AgentPubKey,
        _query: ChainQueryFilter,
        _options: GetActivityOptions,
    ) -> BoxFut<'_, HolochainP2pResult<AgentActivityResponse>> {
        *self.handle_get_agent_activity_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_must_get_agent_activity(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _author: AgentPubKey,
        _filter: holochain_zome_types::chain::ChainFilter,
    ) -> BoxFut<'_, HolochainP2pResult<MustGetAgentActivityResponse>> {
        *self.handle_must_get_agent_activity_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_validation_receipts_received(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _receipts: holochain_types::prelude::ValidationReceiptBundle,
    ) -> BoxFut<'_, HolochainP2pResult<()>> {
        *self
            .handle_validation_receipts_received_count
            .lock()
            .unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_publish_countersign(
        &self,
        _dna_hash: DnaHash,
        _op: holochain_types::dht_op::ChainOp,
    ) -> BoxFut<'_, HolochainP2pResult<()>> {
        *self.handle_publish_countersign_count.lock().unwrap() += 1;
        Box::pin(std::future::pending())
    }

    fn handle_countersigning_session_negotiation(
        &self,
        _dna_hash: DnaHash,
        _to_agent: AgentPubKey,
        _message: CountersigningSessionNegotiationMessage,
    ) -> BoxFut<'_, HolochainP2pResult<()>> {
        *self
            .handle_countersigning_session_negotiation_count
            .lock()
            .unwrap() += 1;
        Box::pin(std::future::pending())
    }
}
impl TestP2pActorHarness {
    /// Creates an actor with the given incoming-request concurrency limit and registers a
    /// fresh `BlockingEventHandler` on it.
    ///
    /// Panics if the actor cannot be created, downcast, or have its handler registered.
    async fn new(concurrency_limit: u16) -> Self {
        let network_config = serde_json::json!({
            "coreBootstrap": {
                "serverUrl": "https://not_a_host"
            },
            "tx5Transport": {
                "serverUrl": "wss://not_a_host",
                "timeoutS": 30,
                "webrtcConnectTimeoutS": 25,
            }
        });
        let p2p_config = HolochainP2pConfig {
            incoming_request_concurrency_limit: concurrency_limit,
            network_config: Some(network_config),
            ..Default::default()
        };

        let created = HolochainP2pActor::create(p2p_config, holochain_keystore::test_keystore())
            .await
            .unwrap();
        let actor: Arc<HolochainP2pActor> = Arc::downcast(created).unwrap();

        let event_handler = Arc::new(BlockingEventHandler::new());
        actor.register_handler(event_handler.clone()).await.unwrap();

        Self {
            actor,
            event_handler,
        }
    }

    /// Feeds `data` into the actor's `recv_notify` as if it had arrived from `from_peer`.
    fn recv_notify(
        &self,
        from_peer: Url,
        space_id: SpaceId,
        data: bytes::Bytes,
    ) -> K2Result<()> {
        self.actor.recv_notify(from_peer, space_id, data)
    }

    /// Registers a pending response slot for `msg_id` with a 60 second duration and returns
    /// the receiving end of its channel.
    fn register_pending_message_response_handler(
        &self,
        msg_id: u64,
    ) -> tokio::sync::oneshot::Receiver<WireMessage> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.actor
            .pending
            .lock()
            .unwrap()
            .register(msg_id, tx, Duration::from_secs(60));
        rx
    }
}
/// Encodes a single-message batch containing a `GetReq` with fixed agent and hash bytes and
/// the given `msg_id`.
fn create_encode_wire_message_get_req(msg_id: u64) -> bytes::Bytes {
    let to_agent = AgentPubKey::from_raw_32(vec![1; 32]);
    let request = WireMessage::GetReq {
        msg_id,
        to_agent,
        dht_hash: ActionHash::from_raw_32(vec![2; 32]).into(),
    };
    WireMessage::encode_batch(&[&request]).unwrap()
}
/// Sends exactly `concurrency_limit` get requests, checks that all of them
/// reach the handler and that no semaphore permits remain, then checks that
/// one further get request does not reach the handler.
#[tokio::test(flavor = "multi_thread")]
async fn incoming_authority_requests_are_concurrency_limited() {
    let dna_hash = DnaHash::from_raw_32(vec![0; 32]);
    let space_id = dna_hash.to_k2_space();
    let from_peer = kitsune2_api::Url::from_str("ws://test:80/1").unwrap();
    let concurrency_limit: u16 = 15;
    let harness = TestP2pActorHarness::new(concurrency_limit).await;

    // Receive messages up to the concurrency limit, using ids 0..limit.
    for i in 0..concurrency_limit {
        let msg_data = create_encode_wire_message_get_req(u64::from(i));
        harness
            .recv_notify(from_peer.clone(), space_id.clone(), msg_data)
            .unwrap();
    }

    // Every one of those requests must reach the handler.
    let all_handled = retry_fn_until_timeout(
        async || {
            let handled_count = harness.event_handler.handle_get_count.lock().unwrap();
            *handled_count == u32::from(concurrency_limit)
        },
        None,
        None,
    )
    .await;
    assert!(
        all_handled.is_ok(),
        "requests up to the concurrency limit should all be handled"
    );

    // With `concurrency_limit` requests in flight no permit may be left.
    assert_eq!(
        harness
            .actor
            .incoming_request_concurrency_limit_semaphore
            .available_permits(),
        0,
        "all concurrency permits should be taken"
    );

    // Receive one more request. Its id is derived from the limit so that it
    // cannot collide with an id already used above, whatever the limit is.
    let extra_msg_id = u64::from(concurrency_limit);
    let msg_data = create_encode_wire_message_get_req(extra_msg_id);
    harness
        .recv_notify(from_peer.clone(), space_id.clone(), msg_data)
        .unwrap();

    // The handled count must never go past the limit, so the retry is
    // expected to time out.
    let is_handled = retry_fn_until_timeout(
        async || {
            let handled_count = harness.event_handler.handle_get_count.lock().unwrap();
            *handled_count == u32::from(concurrency_limit) + 1
        },
        None,
        None,
    )
    .await;
    assert!(
        is_handled.is_err(),
        "a request beyond the concurrency limit should not be handled"
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn authority_request_wire_messages_are_concurrency_limited() {
let dna_hash = DnaHash::from_raw_32(vec![0; 32]);
let space_id = dna_hash.to_k2_space();
let from_peer = kitsune2_api::Url::from_str("ws://test:80/1").unwrap();
let concurrency_limit = 2;
let harness = TestP2pActorHarness::new(concurrency_limit).await;
assert_eq!(
harness
.actor
.incoming_request_concurrency_limit_semaphore
.available_permits(),
concurrency_limit as usize
);
for i in 0..concurrency_limit {
let msg_data = create_encode_wire_message_get_req(i as u64);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
}
let msg = WireMessage::GetReq {
msg_id: 1,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
dht_hash: ActionHash::from_raw_32(vec![2; 32]).into(),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness.event_handler.handle_get_count.lock().unwrap();
*handled_count == concurrency_limit as u32 + 1
},
Some(1000),
None,
)
.await;
assert!(is_handled.is_err());
let msg = WireMessage::GetLinksReq {
msg_id: 2,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
options: GetLinksOptions {},
link_key: WireLinkKey {
base: ActionHash::from_raw_32(vec![2; 32]).into(),
type_query: LinkTypeFilter::Types(Vec::new()),
tag: None,
after: None,
before: None,
author: None,
},
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness.event_handler.handle_get_links_count.lock().unwrap();
*handled_count == 1
},
Some(1000),
None,
)
.await;
assert!(is_handled.is_err());
let msg = WireMessage::CountLinksReq {
msg_id: 3,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
query: WireLinkQuery {
base: ActionHash::from_raw_32(vec![2; 32]).into(),
link_type: LinkTypeFilter::Types(Vec::new()),
tag_prefix: None,
before: None,
after: None,
author: None,
},
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_count_links_count
.lock()
.unwrap();
*handled_count == 1
},
Some(1000),
None,
)
.await;
assert!(is_handled.is_err());
let msg = WireMessage::GetAgentActivityReq {
msg_id: 4,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
agent: AgentPubKey::from_raw_32(vec![2; 32]),
query: ChainQueryFilter::new(),
options: GetActivityOptions {
include_valid_activity: true,
include_rejected_activity: true,
include_warrants: true,
include_full_records: true,
},
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_get_agent_activity_count
.lock()
.unwrap();
*handled_count == 1
},
Some(1000),
None,
)
.await;
assert!(is_handled.is_err());
let msg = WireMessage::MustGetAgentActivityReq {
msg_id: 5,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
agent: AgentPubKey::from_raw_32(vec![2; 32]),
filter: ChainFilter::new(ActionHash::from_raw_32(vec![3; 32])),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_must_get_agent_activity_count
.lock()
.unwrap();
*handled_count == 1
},
Some(1000),
None,
)
.await;
assert!(is_handled.is_err());
}
#[tokio::test(flavor = "multi_thread")]
async fn other_wire_messages_are_not_concurrency_limited() {
let dna_hash = DnaHash::from_raw_32(vec![0; 32]);
let space_id = dna_hash.to_k2_space();
let from_peer = kitsune2_api::Url::from_str("ws://test:80/1").unwrap();
let concurrency_limit = 2;
let harness = TestP2pActorHarness::new(concurrency_limit).await;
assert_eq!(
harness
.actor
.incoming_request_concurrency_limit_semaphore
.available_permits(),
concurrency_limit as usize
);
for i in 0..concurrency_limit {
let msg_data = create_encode_wire_message_get_req(i as u64);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
}
let msg = WireMessage::ErrorRes {
msg_id: 1,
error: "test error".to_string(),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(1);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::CallRemoteRes {
msg_id: 2,
response: SerializedBytes::try_from(()).unwrap(),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(2);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::GetRes {
msg_id: 3,
response: WireOps::Record(WireRecordOps::new()),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(3);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::GetLinksRes {
msg_id: 4,
response: WireLinkOps::default(),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(4);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::CountLinksRes {
msg_id: 5,
response: CountLinksResponse::new(Vec::new()),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(5);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::GetAgentActivityRes {
msg_id: 6,
response: AgentActivityResponse {
agent: AgentPubKey::from_raw_32(vec![1; 32]),
valid_activity: ChainItems::NotRequested,
rejected_activity: ChainItems::NotRequested,
status: ChainStatus::Valid(ChainHead {
action_seq: 0,
hash: ActionHash::from_raw_32(vec![2; 32]),
}),
highest_observed: None,
warrants: Vec::new(),
},
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(6);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::MustGetAgentActivityRes {
msg_id: 7,
response: MustGetAgentActivityResponse::EmptyRange,
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(7);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::SendValidationReceiptsRes { msg_id: 8 };
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
let msg_receiver = harness.register_pending_message_response_handler(8);
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
assert!(msg_receiver.await.is_ok());
let msg = WireMessage::CallRemoteReq {
msg_id: 9,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
zome_call_params_serialized: ExternIO::encode(()).unwrap(),
signature: Signature([0; 64]),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_call_remote_count
.lock()
.unwrap();
*handled_count == 1
},
None,
None,
)
.await;
assert!(is_handled.is_ok());
let msg = WireMessage::SendValidationReceiptsReq {
msg_id: 10,
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
receipts: Vec::new().into(),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_validation_receipts_received_count
.lock()
.unwrap();
*handled_count == 1
},
None,
None,
)
.await;
assert!(is_handled.is_ok());
let msg = WireMessage::RemoteSignalEvt {
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
zome_call_params_serialized: ExternIO::encode(()).unwrap(),
signature: Signature([0; 64]),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_call_remote_count
.lock()
.unwrap();
*handled_count == 2 },
None,
None,
)
.await;
assert!(is_handled.is_ok());
let msg = WireMessage::PublishCountersignEvt {
op: holochain_types::dht_op::ChainOp::RegisterAgentActivity(
Signature([0; 64]),
Action::InitZomesComplete(InitZomesComplete {
author: AgentPubKey::from_raw_32(vec![1; 32]),
timestamp: holochain_types::prelude::Timestamp::now(),
action_seq: 0,
prev_action: ActionHash::from_raw_32(vec![2; 32]),
}),
),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_publish_countersign_count
.lock()
.unwrap();
*handled_count == 1 },
None,
None,
)
.await;
assert!(is_handled.is_ok());
let msg = WireMessage::CountersigningSessionNegotiationEvt {
to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
message: CountersigningSessionNegotiationMessage::AuthorityResponse(Vec::new()),
};
let msg_data = WireMessage::encode_batch(&[&msg]).unwrap();
harness
.recv_notify(from_peer.clone(), space_id.clone(), msg_data)
.unwrap();
let is_handled = retry_fn_until_timeout(
async || {
let handled_count = harness
.event_handler
.handle_countersigning_session_negotiation_count
.lock()
.unwrap();
*handled_count == 1 },
None,
None,
)
.await;
assert!(is_handled.is_ok());
}
/// Delivers a single batch containing `concurrency_limit` get requests, one
/// `ErrorRes` response and one additional get request, then checks that
/// exactly `concurrency_limit` get requests are handled while the response
/// still reaches its pending receiver.
#[tokio::test(flavor = "multi_thread")]
async fn batch_wire_messages_apply_concurrency_limit_only_to_authority_requests() {
    let space_id = DnaHash::from_raw_32(vec![0; 32]).to_k2_space();
    let from_peer = kitsune2_api::Url::from_str("ws://test:80/1").unwrap();
    let concurrency_limit: u16 = 2;
    let harness = TestP2pActorHarness::new(concurrency_limit).await;

    // All permits are free before the batch arrives.
    let permits_at_start = harness
        .actor
        .incoming_request_concurrency_limit_semaphore
        .available_permits();
    assert_eq!(permits_at_start, usize::from(concurrency_limit));

    // Start the batch with as many get requests as the limit allows.
    let mut batch: Vec<WireMessage> = (0..concurrency_limit)
        .map(|i| WireMessage::GetReq {
            msg_id: u64::from(i),
            to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
            dht_hash: ActionHash::from_raw_32(vec![2; 32]).into(),
        })
        .collect();

    // Then a response message, for which a pending receiver is registered.
    let unlimited_msg_id = 888;
    let unlimited_message_receiver =
        harness.register_pending_message_response_handler(unlimited_msg_id);
    batch.push(WireMessage::ErrorRes {
        msg_id: unlimited_msg_id,
        error: "test error".to_string(),
    });

    // And finally one get request more than the limit allows.
    batch.push(WireMessage::GetReq {
        msg_id: 999,
        to_agent: AgentPubKey::from_raw_32(vec![1; 32]),
        dht_hash: ActionHash::from_raw_32(vec![2; 32]).into(),
    });

    let batch_refs: Vec<&WireMessage> = batch.iter().collect();
    let encoded_batch = WireMessage::encode_batch(batch_refs.as_slice()).unwrap();
    harness
        .recv_notify(from_peer.clone(), space_id.clone(), encoded_batch)
        .unwrap();

    // The get requests within the limit are handled ...
    let within_limit = retry_fn_until_timeout(
        async || {
            let seen = harness.event_handler.handle_get_count.lock().unwrap();
            *seen == u32::from(concurrency_limit)
        },
        None,
        None,
    )
    .await;
    assert!(within_limit.is_ok());

    // ... but the count never goes past the limit, so this retry times out.
    let beyond_limit = retry_fn_until_timeout(
        async || {
            let seen = harness.event_handler.handle_get_count.lock().unwrap();
            *seen == u32::from(concurrency_limit) + 1
        },
        None,
        None,
    )
    .await;
    assert!(beyond_limit.is_err());

    // The response in the middle of the batch was still delivered.
    assert!(unlimited_message_receiver.await.is_ok());
}
}