use either::Either;
pub(crate) use self::messages::{SubscribeMsg, SubscribeMsgResult};
use super::OpError;
use crate::ring::PeerKeyLocation;
use crate::tracing::NetEventLog;
use crate::{
message::{InnerMessage, Transaction},
node::OpManager,
ring::Location,
};
use freenet_stdlib::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time::{Duration, sleep};
/// Number of candidate peers requested from `k_closest_potentially_hosting` when
/// choosing the initial subscribe target; the first candidate becomes the target
/// and the remaining ones are kept as alternatives.
pub(super) const MAX_BREADTH: usize = 3;
/// NOTE(review): not referenced in the visible part of this file; presumably an
/// upper bound on subscribe retry attempts — confirm at the use sites.
pub(super) const MAX_RETRIES: usize = 10;
/// Timeout value in milliseconds (2 seconds).
/// NOTE(review): presumably meant to be passed as `timeout_ms` to
/// `wait_for_contract_with_timeout` — confirm with callers.
pub(super) const CONTRACT_WAIT_TIMEOUT_MS: u64 = 2_000;
/// Looks up `instance_id` locally, waiting up to `timeout_ms` milliseconds for it
/// to show up if it is not there yet.
///
/// The lookup is performed up to three times: before obtaining the notifier from
/// `OpManager::wait_for_contract`, right after obtaining it, and once more after
/// either the notifier or the timeout has fired.
///
/// # Returns
/// `Ok(Some(key))` when the contract is found, `Ok(None)` when the final lookup
/// still finds nothing.
///
/// # Errors
/// Propagates any error returned by `super::has_contract`.
pub(super) async fn wait_for_contract_with_timeout(
    op_manager: &OpManager,
    instance_id: ContractInstanceId,
    timeout_ms: u64,
) -> Result<Option<ContractKey>, OpError> {
    // Fast path: nothing to wait for if the contract is already known.
    let already_present = super::has_contract(op_manager, instance_id).await?;
    if already_present.is_some() {
        return Ok(already_present);
    }

    let contract_notifier = op_manager.wait_for_contract(instance_id);

    // Look again now that the notifier exists — presumably this closes the window
    // in which the contract could arrive between the first lookup and the
    // notifier being set up (confirm against `OpManager::wait_for_contract`).
    let after_registration = super::has_contract(op_manager, instance_id).await?;
    if after_registration.is_some() {
        return Ok(after_registration);
    }

    // Whichever completes first ends the wait; the outcome itself is ignored
    // because the final lookup below is what determines the result.
    crate::deterministic_select! {
        _ = contract_notifier => {},
        _ = sleep(Duration::from_millis(timeout_ms)) => {},
    }

    super::has_contract(op_manager, instance_id).await
}
/// Outcome of `prepare_initial_request`: how a subscribe operation should begin.
#[derive(Debug)]
pub(super) enum InitialRequest {
/// The contract is available locally and no remote peer could (or needed to) be
/// contacted, so the subscription can be completed without the network.
LocallyComplete { key: ContractKey },
/// No candidate peer and no unvisited connection was found, and the contract is
/// not available locally.
NoHostingPeers,
/// This peer has no own address yet (`peer_addr()` failed) and the contract is
/// not available locally.
PeerNotJoined,
/// A remote peer was selected and the request should be sent over the network.
NetworkRequest {
// Peer chosen to receive the request.
target: PeerKeyLocation,
// Socket address of `target`.
target_addr: std::net::SocketAddr,
// Visited set with this peer's own address and `target_addr` already marked.
visited: super::VisitedPeers,
// Remaining candidates returned by the routing lookup (may be empty).
alternatives: Vec<PeerKeyLocation>,
// Hops-to-live, taken from `ring.max_hops_to_live`.
htl: usize,
},
}
/// Works out how a subscribe operation for `instance_id` should start.
///
/// Steps, in order:
/// 1. If this peer has no own address yet, finish locally when the contract is
///    available, otherwise report [`InitialRequest::PeerNotJoined`].
/// 2. Ask the ring for up to [`MAX_BREADTH`] candidate peers; the first one is the
///    target and the rest are returned as alternatives.
/// 3. With no candidates, fall back to the first connection (in sorted location
///    order) whose address has not been visited.
/// 4. With no usable connection either, finish locally when the contract is
///    available, otherwise report [`InitialRequest::NoHostingPeers`].
///
/// When a target is found it is marked as visited and a subscribe-request event
/// is registered with the ring if `NetEventLog::subscribe_request` produces one.
///
/// # Errors
/// Propagates any error returned by `super::has_contract`.
///
/// # Panics
/// Panics if the selected target has no socket address.
pub(super) async fn prepare_initial_request(
    op_manager: &OpManager,
    id: Transaction,
    instance_id: ContractInstanceId,
    is_renewal: bool,
) -> Result<InitialRequest, OpError> {
    let Ok(own_addr) = op_manager.ring.connection_manager.peer_addr() else {
        // Without an own address no request can be routed; the only way to
        // succeed is to already have the contract.
        return match super::has_contract(op_manager, instance_id).await? {
            Some(key) => {
                tracing::info!(
                    tx = %id,
                    contract = %key,
                    phase = "local_complete",
                    "Peer not joined, but contract available locally - completing subscription locally"
                );
                Ok(InitialRequest::LocallyComplete { key })
            }
            None => {
                tracing::warn!(
                    tx = %id,
                    contract = %instance_id,
                    phase = "peer_not_joined",
                    "Cannot subscribe: peer has not joined network yet and contract not available locally"
                );
                Ok(InitialRequest::PeerNotJoined)
            }
        };
    };

    // Never route the request back to ourselves.
    let mut visited = super::VisitedPeers::new(&id);
    visited.mark_visited(own_addr);

    let mut candidates =
        op_manager
            .ring
            .k_closest_potentially_hosting(&instance_id, &visited, MAX_BREADTH);

    let target = if candidates.is_empty() {
        // Routing produced nothing: walk the open connections in sorted location
        // order and take the first one that has an unvisited address.
        let connections = op_manager
            .ring
            .connection_manager
            .get_connections_by_location();
        let mut ordered_locations: Vec<_> = connections.keys().collect();
        ordered_locations.sort();
        let first_unvisited = ordered_locations
            .into_iter()
            .filter_map(|location| connections.get(location))
            .flatten()
            .find(|connection| {
                connection
                    .location
                    .socket_addr()
                    .map(|peer_addr| !visited.probably_visited(peer_addr))
                    .unwrap_or(false)
            })
            .map(|connection| connection.location.clone());

        let Some(fallback) = first_unvisited else {
            if let Some(key) = super::has_contract(op_manager, instance_id).await? {
                tracing::info!(
                    tx = %id,
                    contract = %key,
                    phase = "local_complete",
                    "Contract available locally and no network connections, completing subscription locally"
                );
                return Ok(InitialRequest::LocallyComplete { key });
            }
            tracing::warn!(tx = %id, contract = %instance_id, phase = "error", "No remote peers available for subscription");
            return Ok(InitialRequest::NoHostingPeers);
        };

        tracing::debug!(
            tx = %id,
            contract = %instance_id,
            target = ?fallback.socket_addr(),
            phase = "fallback_routing",
            "Using fallback connection for subscription (k_closest returned empty)"
        );
        fallback
    } else {
        // Best candidate is the target; whatever is left becomes the alternatives.
        candidates.remove(0)
    };

    let target_addr = target
        .socket_addr()
        .expect("target must have socket address");
    visited.mark_visited(target_addr);

    tracing::debug!(
        tx = %id,
        contract = %instance_id,
        is_renewal,
        target_peer = %target_addr,
        "subscribe: forwarding Request to target peer"
    );

    let htl = op_manager.ring.max_hops_to_live;
    let request_event =
        NetEventLog::subscribe_request(&id, &op_manager.ring, instance_id, target.clone(), htl);
    if let Some(event) = request_event {
        op_manager.ring.register_events(Either::Left(event)).await;
    }

    Ok(InitialRequest::NetworkRequest {
        target,
        target_addr,
        visited,
        alternatives: candidates,
        htl,
    })
}
/// Finishes a subscription that was satisfied without contacting another peer.
///
/// For a non-renewal subscription the local client is added to the interest
/// manager, and an interest change is broadcast if that call reports that this
/// node just became interested in `key`. In all cases a
/// `NodeEvent::LocalSubscribeComplete` event is sent and the transaction is
/// marked as completed.
///
/// # Errors
/// Returns the error from `notify_node_event`; in that case the transaction is
/// not marked as completed.
async fn complete_local_subscription(
    op_manager: &OpManager,
    id: Transaction,
    key: ContractKey,
    is_renewal: bool,
) -> Result<(), OpError> {
    tracing::debug!(
        %key,
        tx = %id,
        is_renewal,
        "Local subscription completed - client will receive updates via executor notification channel"
    );

    // Renewals skip the local-client registration entirely (short-circuit).
    if !is_renewal && op_manager.interest_manager.add_local_client(&key) {
        super::broadcast_change_interests(op_manager, vec![key], vec![]).await;
    }

    let completion = crate::message::NodeEvent::LocalSubscribeComplete {
        tx: id,
        key,
        subscribed: true,
        is_renewal,
    };
    op_manager.notify_node_event(completion).await?;

    op_manager.completed(id);
    Ok(())
}
/// Records the requesting peer as a downstream subscriber of `key`.
///
/// The peer's key is taken from `requester_pub_key` when provided; otherwise it
/// is looked up in the connection manager by `requester_addr`, and failing that
/// by `source_addr`.
///
/// If the key cannot be resolved, or the ring refuses the subscriber, a warning
/// is logged and nothing else happens. Otherwise the peer's interest is
/// registered, and an interest change is broadcast when the peer is new and the
/// interest manager reports that this node just became interested in `key`.
///
/// `tx` is used for logging only; `warn_suffix` is appended to the
/// "could not find peer" warning.
pub(crate) async fn register_downstream_subscriber(
    op_manager: &OpManager,
    key: &ContractKey,
    requester_addr: std::net::SocketAddr,
    requester_pub_key: Option<&crate::transport::TransportPublicKey>,
    source_addr: Option<std::net::SocketAddr>,
    tx: &Transaction,
    warn_suffix: &str,
) {
    let resolved_peer = match requester_pub_key {
        Some(pub_key) => Some(crate::ring::interest::PeerKey::from(pub_key.clone())),
        None => op_manager
            .ring
            .connection_manager
            .get_peer_by_addr(requester_addr)
            .or_else(|| {
                source_addr
                    .and_then(|addr| op_manager.ring.connection_manager.get_peer_by_addr(addr))
            })
            .map(|peer| crate::ring::interest::PeerKey::from(peer.pub_key)),
    };

    let Some(peer_key) = resolved_peer else {
        tracing::warn!(
            tx = %tx,
            contract = %key,
            requester_addr = %requester_addr,
            source_addr = ?source_addr,
            "Subscribe: could not find peer to register interest{}",
            warn_suffix
        );
        return;
    };

    let accepted = op_manager
        .ring
        .add_downstream_subscriber(key, peer_key.clone());
    if !accepted {
        tracing::warn!(
            tx = %tx,
            contract = %key,
            "Downstream subscriber limit reached — skipping peer interest registration"
        );
        return;
    }

    let is_new_peer = op_manager
        .interest_manager
        .register_peer_interest(key, peer_key, None, false);

    // The downstream counter is only touched for a peer that was not yet known.
    if is_new_peer && op_manager.interest_manager.add_downstream_subscriber(key) {
        super::broadcast_change_interests(op_manager, vec![*key], vec![]).await;
    }
}
/// Processes an unsubscribe notification received for `instance_id`.
///
/// The sender is resolved from `source_addr` through the connection manager.
/// If the contract is not known locally, or the lookup fails, the notification
/// is logged and dropped. Otherwise:
///
/// * when the sender was resolved, it is removed from the ring's downstream
///   subscribers and from the interest manager; if either removal had an effect,
///   the interest manager's downstream counter is decremented and an interest
///   change is broadcast when that reports interest was lost;
/// * when the sender could not be resolved, a warning is logged and no
///   downstream entry is removed;
/// * in both cases, the unsubscribe is propagated upstream if the ring reports
///   that it should be.
pub(crate) async fn handle_unsubscribe_inbound(
    op_manager: &OpManager,
    tx: Transaction,
    instance_id: ContractInstanceId,
    source_addr: Option<std::net::SocketAddr>,
) {
    tracing::debug!(
        tx = %tx,
        %instance_id,
        ?source_addr,
        "received unsubscribe notification"
    );

    // Resolve the sender before the (async) contract lookup.
    let sender_peer = source_addr
        .and_then(|addr| op_manager.ring.connection_manager.get_peer_by_addr(addr))
        .map(|peer| crate::ring::interest::PeerKey::from(peer.pub_key));

    let key = match super::has_contract(op_manager, instance_id).await {
        Err(err) => {
            tracing::warn!(
                tx = %tx,
                %instance_id,
                %err,
                "Contract lookup failed while handling unsubscribe"
            );
            return;
        }
        Ok(None) => {
            tracing::debug!(
                tx = %tx,
                %instance_id,
                "Contract not found locally, ignoring unsubscribe"
            );
            return;
        }
        Ok(Some(found)) => found,
    };

    match &sender_peer {
        None => {
            tracing::warn!(
                tx = %tx,
                %instance_id,
                ?source_addr,
                "Unsubscribe: could not resolve sender peer, downstream entry not removed"
            );
        }
        Some(peer) => {
            // Both removals must run, so they are evaluated before being combined.
            let was_downstream = op_manager.ring.remove_downstream_subscriber(&key, peer);
            let was_interested = op_manager.interest_manager.remove_peer_interest(&key, peer);
            let removed_something = was_downstream || was_interested;
            if removed_something
                && op_manager
                    .interest_manager
                    .remove_downstream_subscriber(&key)
            {
                super::broadcast_change_interests(op_manager, vec![], vec![key]).await;
            }
        }
    }

    let propagate_upstream = op_manager.ring.should_unsubscribe_upstream(&key);
    if propagate_upstream {
        tracing::debug!(
            tx = %tx,
            contract = %key,
            "No remaining subscribers, propagating unsubscribe upstream"
        );
        op_manager.send_unsubscribe_upstream(&key).await;
    } else {
        tracing::debug!(
            tx = %tx,
            contract = %key,
            "Still have subscribers, not propagating unsubscribe"
        );
    }
}
#[cfg(test)]
mod tests;
pub(crate) mod op_ctx_task;
pub(crate) use op_ctx_task::{
RenewalOutcome, run_client_subscribe, run_executor_subscribe, run_renewal_subscribe,
start_client_subscribe,
};
/// Wire messages exchanged by the subscribe operation.
mod messages {
    use std::fmt::Display;

    use super::*;

    /// Result carried by a [`SubscribeMsg::Response`].
    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub(crate) enum SubscribeMsgResult {
        /// The subscription succeeded for the contract identified by `key`.
        Subscribed { key: ContractKey },
        /// The contract was not found.
        NotFound,
    }

    /// Messages of the subscribe protocol; every variant carries the transaction
    /// id and the instance id of the contract it refers to.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub(crate) enum SubscribeMsg {
        /// Request to subscribe to `instance_id`.
        Request {
            id: Transaction,
            instance_id: ContractInstanceId,
            htl: usize,
            visited: super::super::VisitedPeers,
            is_renewal: bool,
        },
        /// Answer to a `Request`.
        Response {
            id: Transaction,
            instance_id: ContractInstanceId,
            result: SubscribeMsgResult,
        },
        /// Notification that the sender unsubscribes from `instance_id`.
        Unsubscribe {
            id: Transaction,
            instance_id: ContractInstanceId,
        },
        /// Acknowledgement message carrying only the transaction and instance ids.
        ForwardingAck {
            id: Transaction,
            instance_id: ContractInstanceId,
        },
    }

    impl InnerMessage for SubscribeMsg {
        /// Transaction id, present in every variant.
        fn id(&self) -> &Transaction {
            let (Self::Request { id, .. }
            | Self::Response { id, .. }
            | Self::Unsubscribe { id, .. }
            | Self::ForwardingAck { id, .. }) = self;
            id
        }

        /// Location derived from the instance id; always `Some`.
        fn requested_location(&self) -> Option<Location> {
            let (Self::Request { instance_id, .. }
            | Self::Response { instance_id, .. }
            | Self::Unsubscribe { instance_id, .. }
            | Self::ForwardingAck { instance_id, .. }) = self;
            Some(Location::from(instance_id))
        }
    }

    impl Display for SubscribeMsg {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let id = self.id();
            match self {
                Self::ForwardingAck { instance_id, .. } => {
                    write!(
                        f,
                        "Subscribe::ForwardingAck(id: {id}, instance_id: {instance_id})"
                    )
                }
                Self::Unsubscribe { instance_id, .. } => {
                    write!(
                        f,
                        "Subscribe::Unsubscribe(id: {id}, contract: {instance_id})"
                    )
                }
                Self::Response {
                    instance_id,
                    result,
                    ..
                } => {
                    let result_str = match result {
                        SubscribeMsgResult::NotFound => "NotFound".to_string(),
                        SubscribeMsgResult::Subscribed { key } => format!("Subscribed({key})"),
                    };
                    write!(
                        f,
                        "Subscribe::Response(id: {id}, instance_id: {instance_id}, result: {result_str})"
                    )
                }
                Self::Request { instance_id, .. } => {
                    write!(f, "Subscribe::Request(id: {id}, contract: {instance_id})")
                }
            }
        }
    }
}