use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::time::Instant;
use either::Either;
pub(crate) use self::messages::{SubscribeMsg, SubscribeMsgResult};
use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult, get};
use crate::contract::{ContractHandlerEvent, StoreResponse};
use crate::node::IsOperationCompleted;
use crate::ring::PeerKeyLocation;
use crate::tracing::NetEventLog;
use crate::{
client_events::HostResult,
message::{InnerMessage, NetMessage, Transaction},
node::{NetworkBridge, OpManager},
ring::{KnownPeerKeyLocation, Location, RingError},
};
use freenet_stdlib::{
client_api::{ContractResponse, ErrorKind, HostResponse},
prelude::*,
};
use serde::{Deserialize, Serialize};
use tokio::time::{Duration, sleep};
/// Number of candidate peers requested from the ring per routing query, and the
/// upper bound compared against `attempts_at_hop` before giving up on the
/// remaining alternatives.
const MAX_BREADTH: usize = 3;
/// Upper bound on `retries`, i.e. how many times an aborted subscribe may ask
/// the ring for a fresh set of candidates.
const MAX_RETRIES: usize = 10;
/// Lower bound applied to the HTL of a retried request (still clamped to the
/// ring's maximum HTL by the caller).
const MIN_RETRY_HTL: usize = 3;
/// Time, in milliseconds, to wait for a contract to show up locally.
const CONTRACT_WAIT_TIMEOUT_MS: u64 = 2_000;
/// Waits for the contract `instance_id` to become available locally, giving up
/// after `timeout_ms` milliseconds.
///
/// The contract is looked up three times: before registering for a
/// notification, right after registering (so an arrival between the first
/// lookup and the registration is not missed), and once more after either the
/// notification fires or the timeout elapses.
///
/// Returns `Ok(Some(key))` if the contract is present, `Ok(None)` if it is
/// still absent after the wait, and an error if a lookup fails.
async fn wait_for_contract_with_timeout(
    op_manager: &OpManager,
    instance_id: ContractInstanceId,
    timeout_ms: u64,
) -> Result<Option<ContractKey>, OpError> {
    let initially_present = super::has_contract(op_manager, instance_id).await?;
    if initially_present.is_some() {
        return Ok(initially_present);
    }

    // Register interest first, then look again, so nothing slips through the gap.
    let contract_arrived = op_manager.wait_for_contract(instance_id);
    let present_after_registration = super::has_contract(op_manager, instance_id).await?;
    if present_after_registration.is_some() {
        return Ok(present_after_registration);
    }

    crate::deterministic_select! {
        _ = contract_arrived => {},
        _ = sleep(Duration::from_millis(timeout_ms)) => {},
    }

    // Whichever branch completed, the store is the source of truth.
    super::has_contract(op_manager, instance_id).await
}
/// Makes sure the contract `instance_id` is available locally, starting a GET
/// operation for it when it is not.
///
/// Returns the contract key if it is already present or arrives within
/// `CONTRACT_WAIT_TIMEOUT_MS` after the GET was requested; `Ok(None)` if it is
/// still missing afterwards.
async fn fetch_contract_if_missing(
    op_manager: &OpManager,
    instance_id: ContractInstanceId,
) -> Result<Option<ContractKey>, OpError> {
    match super::has_contract(op_manager, instance_id).await? {
        Some(existing) => Ok(Some(existing)),
        None => {
            let fetch_op = get::start_op(instance_id, true, false, false);
            let visited_peers = super::VisitedPeers::new(&fetch_op.id);
            get::request_get(op_manager, fetch_op, visited_peers).await?;
            wait_for_contract_with_timeout(op_manager, instance_id, CONTRACT_WAIT_TIMEOUT_MS).await
        }
    }
}
/// Payload of [`SubscribeState::PrepareRequest`]: everything needed to issue
/// the first subscribe request for a contract.
#[derive(Debug)]
struct PrepareRequestData {
// Transaction this subscribe operation runs under.
id: Transaction,
// Contract instance being subscribed to.
instance_id: ContractInstanceId,
// Whether this request renews a subscription rather than creating one.
is_renewal: bool,
}
impl PrepareRequestData {
    /// Converts the prepared request into a fresh awaiting-response payload
    /// aimed at `next_hop`, with all retry bookkeeping reset and a new visited
    /// set created for this transaction.
    // No caller appears in this portion of the file, hence the lint suppression.
    #[allow(dead_code)]
    fn into_awaiting_response(
        self,
        next_hop: Option<std::net::SocketAddr>,
    ) -> AwaitingResponseData {
        let Self {
            id, instance_id, ..
        } = self;
        let visited = super::VisitedPeers::new(&id);
        AwaitingResponseData {
            next_hop,
            instance_id,
            retries: 0,
            current_hop: 0,
            tried_peers: HashSet::new(),
            alternatives: Vec::new(),
            attempts_at_hop: 0,
            visited,
        }
    }

    /// Consumes the prepared request and yields the completed payload for `key`.
    // No caller appears in this portion of the file, hence the lint suppression.
    #[allow(dead_code)]
    fn into_completed(self, key: ContractKey) -> CompletedData {
        CompletedData { key }
    }
}
/// Payload of [`SubscribeState::AwaitingResponse`]: routing and retry
/// bookkeeping for a subscribe request that is in flight.
#[derive(Debug)]
struct AwaitingResponseData {
// Peer the request was last sent to, when recorded; `handle_abort` treats it as the hop that failed.
next_hop: Option<std::net::SocketAddr>,
// Contract instance being subscribed to.
instance_id: ContractInstanceId,
// How many times a fresh candidate set has been requested after an abort (bounded by MAX_RETRIES).
retries: usize,
// Value used as the `htl` of requests re-sent from this state.
current_hop: usize,
// Addresses of peers already contacted.
tried_peers: HashSet<std::net::SocketAddr>,
// Remaining candidate peers to try, consumed from the front.
alternatives: Vec<PeerKeyLocation>,
// Number of peers attempted so far; compared against MAX_BREADTH.
attempts_at_hop: usize,
// Visited-peer set carried in outgoing `SubscribeMsg::Request` messages.
visited: super::VisitedPeers,
}
impl AwaitingResponseData {
    /// Consumes the in-flight bookkeeping and yields the completed payload
    /// holding `key`; none of the routing state is carried over.
    // No caller appears in this portion of the file, hence the lint suppression.
    #[allow(dead_code)]
    fn into_completed(self, key: ContractKey) -> CompletedData {
        CompletedData { key }
    }
}
/// Payload of [`SubscribeState::Completed`].
#[derive(Debug, Clone, Copy)]
struct CompletedData {
// Key of the contract the subscription was established for.
key: ContractKey,
}
/// Lifecycle states of a subscribe operation.
#[derive(Debug)]
enum SubscribeState {
// Created but not yet sent; the only state `request_subscribe` accepts.
PrepareRequest(PrepareRequestData),
// A request has been sent (or the op was created for an incoming message) and a reply is pending.
AwaitingResponse(AwaitingResponseData),
// Subscription succeeded; the only state reported as finalized.
Completed(CompletedData),
// Terminal failure; also used as a temporary placeholder while state is moved out.
Failed,
}
/// Marker result of a subscribe operation; obtainable from a [`SubscribeOp`]
/// only once it has reached the completed state.
pub(crate) struct SubscribeResult {}
impl TryFrom<SubscribeOp> for SubscribeResult {
    type Error = OpError;

    /// Succeeds only for an operation in the `Completed` state; every other
    /// state yields [`OpError::UnexpectedOpState`].
    fn try_from(value: SubscribeOp) -> Result<Self, Self::Error> {
        match value.state {
            SubscribeState::Completed(_) => Ok(SubscribeResult {}),
            SubscribeState::PrepareRequest(_)
            | SubscribeState::AwaitingResponse(_)
            | SubscribeState::Failed => Err(OpError::UnexpectedOpState),
        }
    }
}
pub(crate) fn start_op(instance_id: ContractInstanceId, is_renewal: bool) -> SubscribeOp {
let id = Transaction::new::<SubscribeMsg>();
SubscribeOp {
id,
state: SubscribeState::PrepareRequest(PrepareRequestData {
id,
instance_id,
is_renewal,
}),
requester_addr: None, requester_pub_key: None,
is_renewal,
stats: None,
ack_received: false,
speculative_paths: 0,
}
}
/// Creates a subscribe operation for `instance_id` under the caller-supplied
/// transaction `id`, in the `PrepareRequest` state and with no requester
/// (i.e. originated locally).
pub(crate) fn start_op_with_id(
    instance_id: ContractInstanceId,
    id: Transaction,
    is_renewal: bool,
) -> SubscribeOp {
    let initial_state = SubscribeState::PrepareRequest(PrepareRequestData {
        id,
        instance_id,
        is_renewal,
    });
    SubscribeOp {
        id,
        state: initial_state,
        requester_addr: None,
        requester_pub_key: None,
        is_renewal,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    }
}
/// Builds the operation state used to send an unsubscribe for `instance_id`
/// to `target_addr` under transaction `tx`.
///
/// The operation starts in `AwaitingResponse` with `target_addr` as its next
/// hop and empty retry bookkeeping; it is never a renewal.
pub(crate) fn create_unsubscribe_op(
    instance_id: ContractInstanceId,
    tx: Transaction,
    target_addr: std::net::SocketAddr,
) -> SubscribeOp {
    let awaiting = AwaitingResponseData {
        next_hop: Some(target_addr),
        instance_id,
        retries: 0,
        current_hop: 0,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 0,
        visited: super::VisitedPeers::new(&tx),
    };
    SubscribeOp {
        id: tx,
        state: SubscribeState::AwaitingResponse(awaiting),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal: false,
        stats: None,
        ack_received: false,
        speculative_paths: 0,
    }
}
/// Starts a locally originated subscribe operation.
///
/// `sub_op` must be in the `PrepareRequest` state, otherwise
/// [`OpError::UnexpectedOpState`] is returned. Depending on what
/// `prepare_initial_request` decides, the subscription is either completed
/// locally, rejected (`NoHostingPeers` / `PeerNotJoined`), or sent to the
/// selected peer by handing the request message and the new awaiting-response
/// state to the op manager. Renewals use the non-blocking notification.
pub(crate) async fn request_subscribe(
    op_manager: &OpManager,
    sub_op: SubscribeOp,
) -> Result<(), OpError> {
    let (id, instance_id, is_renewal) = match &sub_op.state {
        SubscribeState::PrepareRequest(prepared) => {
            (prepared.id, prepared.instance_id, prepared.is_renewal)
        }
        SubscribeState::AwaitingResponse(_)
        | SubscribeState::Completed(_)
        | SubscribeState::Failed => return Err(OpError::UnexpectedOpState),
    };
    tracing::debug!(tx = %id, contract = %instance_id, is_renewal, "subscribe: request_subscribe invoked");

    // Everything except a network request resolves right here.
    let (target, target_addr, visited, alternatives, htl) =
        match prepare_initial_request(op_manager, id, instance_id, is_renewal).await? {
            InitialRequest::NetworkRequest {
                target,
                target_addr,
                visited,
                alternatives,
                htl,
            } => (target, target_addr, visited, alternatives, htl),
            InitialRequest::LocallyComplete { key } => {
                return complete_local_subscription(op_manager, id, key, is_renewal).await;
            }
            InitialRequest::NoHostingPeers => {
                return Err(RingError::NoHostingPeers(instance_id).into());
            }
            InitialRequest::PeerNotJoined => return Err(RingError::PeerNotJoined.into()),
        };

    let msg = SubscribeMsg::Request {
        id,
        instance_id,
        htl,
        visited: visited.clone(),
        is_renewal,
    };
    let awaiting = AwaitingResponseData {
        next_hop: Some(target_addr),
        instance_id,
        retries: 0,
        current_hop: htl,
        // The chosen target counts as the first attempt.
        tried_peers: HashSet::from([target_addr]),
        alternatives,
        attempts_at_hop: 1,
        visited,
    };
    let pending_op = SubscribeOp {
        id,
        state: SubscribeState::AwaitingResponse(awaiting),
        requester_addr: None,
        requester_pub_key: None,
        is_renewal,
        stats: Some(SubscribeStats {
            target_peer: target,
            contract_location: Location::from(&instance_id),
            request_sent_at: Instant::now(),
        }),
        ack_received: false,
        speculative_paths: 0,
    };

    let outbound = NetMessage::from(msg);
    let next_state = OpEnum::Subscribe(pending_op);
    if is_renewal {
        op_manager
            .notify_op_change_nonblocking(outbound, next_state)
            .await?;
    } else {
        op_manager.notify_op_change(outbound, next_state).await?;
    }
    Ok(())
}
/// Outcome of `prepare_initial_request`: how a locally originated subscribe
/// should proceed.
#[derive(Debug)]
pub(super) enum InitialRequest {
// The contract is available locally and no network request will be made.
LocallyComplete { key: ContractKey },
// No remote peer could be selected and the contract is not available locally.
NoHostingPeers,
// This node has no own address yet and the contract is not available locally.
PeerNotJoined,
// Send the request to `target`.
NetworkRequest {
// Peer selected as the first hop.
target: PeerKeyLocation,
// Socket address of `target`.
target_addr: std::net::SocketAddr,
// Visited set with this node and the target already marked.
visited: super::VisitedPeers,
// Remaining routing candidates that were not chosen as the target.
alternatives: Vec<PeerKeyLocation>,
// Hops-to-live for the outgoing request.
htl: usize,
},
}
pub(super) async fn prepare_initial_request(
op_manager: &OpManager,
id: Transaction,
instance_id: ContractInstanceId,
is_renewal: bool,
) -> Result<InitialRequest, OpError> {
let own_addr = match op_manager.ring.connection_manager.peer_addr() {
Ok(addr) => addr,
Err(_) => {
if let Some(key) = super::has_contract(op_manager, instance_id).await? {
tracing::info!(
tx = %id,
contract = %key,
phase = "local_complete",
"Peer not joined, but contract available locally - completing subscription locally"
);
return Ok(InitialRequest::LocallyComplete { key });
}
tracing::warn!(
tx = %id,
contract = %instance_id,
phase = "peer_not_joined",
"Cannot subscribe: peer has not joined network yet and contract not available locally"
);
return Ok(InitialRequest::PeerNotJoined);
}
};
let mut visited = super::VisitedPeers::new(&id);
visited.mark_visited(own_addr);
let mut candidates =
op_manager
.ring
.k_closest_potentially_hosting(&instance_id, &visited, MAX_BREADTH);
let target = if !candidates.is_empty() {
candidates.remove(0)
} else {
let connections = op_manager
.ring
.connection_manager
.get_connections_by_location();
let mut sorted_keys: Vec<_> = connections.keys().collect();
sorted_keys.sort();
let fallback_target = sorted_keys
.into_iter()
.filter_map(|loc| connections.get(loc))
.flatten()
.find(|conn| {
conn.location
.socket_addr()
.map(|addr| !visited.probably_visited(addr))
.unwrap_or(false)
})
.map(|conn| conn.location.clone());
match fallback_target {
Some(target) => {
tracing::debug!(
tx = %id,
contract = %instance_id,
target = ?target.socket_addr(),
phase = "fallback_routing",
"Using fallback connection for subscription (k_closest returned empty)"
);
target
}
None => {
if let Some(key) = super::has_contract(op_manager, instance_id).await? {
tracing::info!(
tx = %id,
contract = %key,
phase = "local_complete",
"Contract available locally and no network connections, completing subscription locally"
);
return Ok(InitialRequest::LocallyComplete { key });
}
tracing::warn!(tx = %id, contract = %instance_id, phase = "error", "No remote peers available for subscription");
return Ok(InitialRequest::NoHostingPeers);
}
}
};
let target_addr = target
.socket_addr()
.expect("target must have socket address");
visited.mark_visited(target_addr);
tracing::debug!(
tx = %id,
contract = %instance_id,
is_renewal,
target_peer = %target_addr,
"subscribe: forwarding Request to target peer"
);
if let Some(event) = NetEventLog::subscribe_request(
&id,
&op_manager.ring,
instance_id,
target.clone(),
op_manager.ring.max_hops_to_live,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
Ok(InitialRequest::NetworkRequest {
target,
target_addr,
visited,
alternatives: candidates,
htl: op_manager.ring.max_hops_to_live,
})
}
/// Finishes a subscription that can be satisfied without any network request.
///
/// For non-renewals the local client is registered with the interest manager,
/// and an interest change is broadcast when that registration reports that
/// this node became newly interested. A `LocalSubscribeComplete` node event is
/// then emitted and the transaction is marked completed.
async fn complete_local_subscription(
    op_manager: &OpManager,
    id: Transaction,
    key: ContractKey,
    is_renewal: bool,
) -> Result<(), OpError> {
    tracing::debug!(
        %key,
        tx = %id,
        is_renewal,
        "Local subscription completed - client will receive updates via executor notification channel"
    );

    // Renewals never touch the client-interest bookkeeping.
    if !is_renewal && op_manager.interest_manager.add_local_client(&key) {
        super::broadcast_change_interests(op_manager, vec![key], vec![]).await;
    }

    let completion = crate::message::NodeEvent::LocalSubscribeComplete {
        tx: id,
        key,
        subscribed: true,
        is_renewal,
    };
    op_manager.notify_node_event(completion).await?;
    op_manager.completed(id);
    Ok(())
}
/// Routing statistics recorded when a subscribe request is sent; used to
/// report the operation outcome.
struct SubscribeStats {
// Peer the request was sent to.
target_peer: crate::ring::PeerKeyLocation,
// Ring location derived from the contract instance id.
contract_location: Location,
// Moment the request was sent; elapsed time is reported as the first response time.
request_sent_at: Instant,
}
/// State of a single subscribe (or unsubscribe) operation, keyed by transaction.
pub(crate) struct SubscribeOp {
// Transaction id of this operation.
pub id: Transaction,
// Current lifecycle state.
state: SubscribeState,
// Address of the upstream peer that sent the request; `None` when this node originated it.
requester_addr: Option<std::net::SocketAddr>,
// Public key of the upstream requester, when it could be resolved.
requester_pub_key: Option<crate::transport::TransportPublicKey>,
// Whether this operation renews a subscription.
is_renewal: bool,
// Routing statistics, present once a request has been sent.
stats: Option<SubscribeStats>,
// NOTE(review): only ever initialised to `false` in the visible code; presumably set elsewhere when a ForwardingAck arrives - confirm.
pub(crate) ack_received: bool,
// NOTE(review): only ever initialised to `0` in the visible code; semantics are defined by its users elsewhere - confirm.
pub(crate) speculative_paths: u8,
}
impl SubscribeOp {
pub(crate) fn instance_id(&self) -> Option<ContractInstanceId> {
match &self.state {
SubscribeState::PrepareRequest(data) => Some(data.instance_id),
SubscribeState::AwaitingResponse(data) => Some(data.instance_id),
SubscribeState::Completed(_) | SubscribeState::Failed => None,
}
}
/// Key of the subscribed contract once the operation has completed; `None`
/// in every other state.
pub(crate) fn completed_key(&self) -> Option<ContractKey> {
    if let SubscribeState::Completed(done) = &self.state {
        Some(done.key)
    } else {
        None
    }
}
pub(crate) fn is_renewal(&self) -> bool {
self.is_renewal
}
/// Summarises the operation for routing feedback.
///
/// * completed with stats: success, with the time elapsed since the request
///   was sent as the first response time (no payload is accounted for);
/// * completed without stats: irrelevant;
/// * not completed with stats: failure against the recorded target;
/// * not completed without stats: incomplete.
pub(super) fn outcome(&self) -> OpOutcome<'_> {
    match (self.finalized(), self.stats.as_ref()) {
        (true, Some(stats)) => OpOutcome::ContractOpSuccess {
            target_peer: &stats.target_peer,
            contract_location: stats.contract_location,
            first_response_time: stats.request_sent_at.elapsed(),
            payload_size: 0,
            payload_transfer_time: std::time::Duration::ZERO,
        },
        (true, None) => OpOutcome::Irrelevant,
        (false, Some(stats)) => OpOutcome::ContractOpFailure {
            target_peer: &stats.target_peer,
            contract_location: stats.contract_location,
        },
        (false, None) => OpOutcome::Incomplete,
    }
}
/// Target peer and contract location recorded for the last sent request, or
/// `None` when no request statistics exist.
pub(crate) fn failure_routing_info(&self) -> Option<(crate::ring::PeerKeyLocation, Location)> {
    let stats = self.stats.as_ref()?;
    Some((stats.target_peer.clone(), stats.contract_location))
}
/// `true` only when the operation reached the `Completed` state.
pub(super) fn finalized(&self) -> bool {
    match self.state {
        SubscribeState::Completed(_) => true,
        SubscribeState::PrepareRequest(_)
        | SubscribeState::AwaitingResponse(_)
        | SubscribeState::Failed => false,
    }
}
/// Converts the operation into the response delivered to the client: a
/// successful `SubscribeResponse` when completed, an `OperationError` otherwise.
pub(super) fn to_host_result(&self) -> HostResult {
    let SubscribeState::Completed(done) = &self.state else {
        return Err(ErrorKind::OperationError {
            cause: "subscribe didn't finish successfully".into(),
        }
        .into());
    };
    let response = ContractResponse::SubscribeResponse {
        key: done.key,
        subscribed: true,
    };
    Ok(HostResponse::ContractResponse(response))
}
/// Address recorded as the next hop while a response is awaited; `None` in
/// any other state or when no hop was recorded.
pub(crate) fn get_next_hop_addr(&self) -> Option<std::net::SocketAddr> {
    if let SubscribeState::AwaitingResponse(awaiting) = &self.state {
        awaiting.next_hop
    } else {
        None
    }
}
/// Whether this node originated the operation, i.e. there is no upstream
/// requester to answer to.
pub(crate) fn is_originator(&self) -> bool {
    matches!(self.requester_addr, None)
}
/// Re-targets an in-flight subscribe at the next alternative peer.
///
/// Only applies in the `AwaitingResponse` state. When the alternatives list is
/// empty it is first refilled from `fallback_peers`, keeping only peers that
/// have an address and were neither tried nor (probably) visited. The first
/// alternative with a known address becomes the new next hop; it is recorded
/// as tried and visited, the attempt counter is bumped and the stats are reset
/// for the new target.
///
/// On success returns the updated operation together with the
/// `SubscribeMsg::Request` to send. The request's HTL is
/// `max_hops_to_live / attempts_at_hop`, raised to at least `MIN_RETRY_HTL`
/// and capped at `max_hops_to_live`.
///
/// Returns `Err` with the operation handed back (boxed) when the state is not
/// `AwaitingResponse` or when no alternative with an address is left.
pub(crate) fn retry_with_next_alternative(
mut self,
max_hops_to_live: usize,
fallback_peers: &[PeerKeyLocation],
) -> Result<(SubscribeOp, SubscribeMsg), Box<SubscribeOp>> {
match self.state {
SubscribeState::AwaitingResponse(ref mut data) => {
// Out of routed alternatives: broaden the search to the supplied fallback peers.
if data.alternatives.is_empty() && !fallback_peers.is_empty() {
for peer in fallback_peers {
if let Some(addr) = peer.socket_addr() {
if !data.tried_peers.contains(&addr)
&& !data.visited.probably_visited(addr)
{
data.alternatives.push(peer.clone());
}
}
}
if !data.alternatives.is_empty() {
tracing::info!(
tx = %self.id,
contract = %data.instance_id,
new_candidates = data.alternatives.len(),
"Subscribe broadening search to all connected peers (fallback)"
);
}
}
// Nothing to retry with; the state is still intact at this point.
if data.alternatives.is_empty() {
return Err(Box::new(self));
}
// Take ownership of the payload; `Failed` is only a placeholder until the state is put back.
let SubscribeState::AwaitingResponse(mut data) =
std::mem::replace(&mut self.state, SubscribeState::Failed)
else {
unreachable!();
};
let instance_id = data.instance_id;
let is_renewal = self.is_renewal;
// Pop alternatives until one with a known address turns up; address-less ones are discarded.
let (next_target, addr) = loop {
if data.alternatives.is_empty() {
// Restore the original state before handing the op back.
self.state = SubscribeState::AwaitingResponse(data);
return Err(Box::new(self));
}
let candidate = data.alternatives.remove(0);
if let Some(addr) = candidate.socket_addr() {
break (candidate, addr);
}
};
data.tried_peers.insert(addr);
data.visited.mark_visited(addr);
data.next_hop = Some(addr);
data.attempts_at_hop += 1;
let visited = data.visited.clone();
tracing::info!(
tx = %self.id,
contract = %instance_id,
target = ?next_target.socket_addr(),
remaining_alternatives = data.alternatives.len(),
"Subscribe retrying with alternative peer after timeout"
);
// Fresh stats: the timing restarts for the new target.
self.stats = Some(SubscribeStats {
target_peer: next_target,
contract_location: Location::from(&instance_id),
request_sent_at: Instant::now(),
});
// The HTL shrinks as attempts accumulate, within [MIN_RETRY_HTL, max_hops_to_live]
// (the final `min` wins if max_hops_to_live is below MIN_RETRY_HTL).
let retry_htl = (max_hops_to_live / (data.attempts_at_hop.max(1)))
.max(MIN_RETRY_HTL)
.min(max_hops_to_live);
self.state = SubscribeState::AwaitingResponse(data);
let msg = SubscribeMsg::Request {
id: self.id,
instance_id,
htl: retry_htl,
visited,
is_renewal,
};
Ok((self, msg))
}
SubscribeState::PrepareRequest(_)
| SubscribeState::Completed(_)
| SubscribeState::Failed => Err(Box::new(self)),
}
}
/// Produces the result for a subscribe that could not be satisfied.
///
/// With an upstream requester, a `NotFound` response is sent to it and the
/// operation completes. Without one (this node is the originator) the failure
/// is logged and `RingError::NoHostingPeers` is returned. `reason` is only
/// used for logging.
fn not_found_result(
    id: Transaction,
    instance_id: ContractInstanceId,
    requester_addr: Option<std::net::SocketAddr>,
    reason: &str,
) -> Result<OperationResult, OpError> {
    let Some(requester_addr) = requester_addr else {
        tracing::warn!(
            tx = %id,
            %instance_id,
            %reason,
            phase = "not_found",
            "Subscribe failed at originator"
        );
        return Err(RingError::NoHostingPeers(instance_id).into());
    };

    tracing::debug!(
        tx = %id,
        %instance_id,
        requester = %requester_addr,
        %reason,
        "Sending NotFound response to requester"
    );
    let not_found = SubscribeMsg::Response {
        id,
        instance_id,
        result: SubscribeMsgResult::NotFound,
    };
    Ok(OperationResult::SendAndComplete {
        msg: NetMessage::from(not_found),
        next_hop: Some(requester_addr),
        stream_data: None,
    })
}
/// Handles an abort of this operation (logged as a connection failure).
///
/// For an operation in `AwaitingResponse`, recovery is attempted in order:
/// 1. retry with the next stored alternative, while fewer than `MAX_BREADTH`
///    attempts were made;
/// 2. ask the ring for fresh candidates, while fewer than `MAX_RETRIES`
///    re-queries were made;
/// 3. if there is an upstream requester, report `NotFound` to it;
/// 4. otherwise notify the local waiter via `notify_abort_failure` and mark
///    the transaction completed.
///
/// Operations in any other state are simply marked completed.
///
/// Returns `Err(OpError::StatePushed)` whenever a replacement operation state
/// was handed to the op manager (cases 1-3), `Ok(())` when the transaction was
/// completed here, and propagates errors from `notify_op_change`.
pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> {
let tx_id = self.id;
let requester_addr = self.requester_addr;
let requester_pub_key = self.requester_pub_key;
let is_renewal = self.is_renewal;
let stats = self.stats;
let is_sub_op = op_manager.is_sub_operation(tx_id);
tracing::debug!(
tx = %tx_id,
?requester_addr,
"Subscribe operation aborted due to connection failure"
);
if let SubscribeState::AwaitingResponse(AwaitingResponseData {
next_hop: failed_hop,
instance_id,
retries,
current_hop,
mut tried_peers,
mut alternatives,
attempts_at_hop,
mut visited,
}) = self.state
{
// The hop that failed must not be selected again.
if let Some(addr) = failed_hop {
tried_peers.insert(addr);
visited.mark_visited(addr);
}
// Step 1: next stored alternative, bounded by MAX_BREADTH attempts.
if !alternatives.is_empty() && attempts_at_hop < MAX_BREADTH {
let next_target = alternatives.remove(0);
// An alternative without an address is dropped and control falls through to step 2.
if let Some(next_addr) = next_target.socket_addr() {
tried_peers.insert(next_addr);
visited.mark_visited(next_addr);
tracing::debug!(
tx = %tx_id,
%instance_id,
next_target = %next_addr,
"Subscribe: connection aborted, trying alternative peer"
);
let msg = SubscribeMsg::Request {
id: tx_id,
instance_id,
htl: current_hop,
visited: visited.clone(),
is_renewal,
};
let op = SubscribeOp {
id: tx_id,
state: SubscribeState::AwaitingResponse(AwaitingResponseData {
next_hop: Some(next_addr),
instance_id,
retries,
current_hop,
tried_peers,
alternatives,
attempts_at_hop: attempts_at_hop + 1,
visited,
}),
requester_addr,
requester_pub_key,
is_renewal,
// Only the target is replaced; the original send timestamp is kept.
stats: stats.map(|mut s| {
s.target_peer = next_target.clone();
s
}),
ack_received: false,
speculative_paths: 0,
};
op_manager
.notify_op_change(NetMessage::from(msg), OpEnum::Subscribe(op))
.await?;
return Err(OpError::StatePushed);
}
}
// Step 2: query the ring again, excluding every peer tried so far.
if retries < MAX_RETRIES {
for addr in &tried_peers {
visited.mark_visited(*addr);
}
let mut new_candidates = op_manager.ring.k_closest_potentially_hosting(
&instance_id,
&visited,
MAX_BREADTH,
);
if !new_candidates.is_empty() {
let next_target = new_candidates.remove(0);
if let Some(next_addr) = next_target.socket_addr() {
// The tried set starts over for the new candidate batch; `visited` keeps the history.
let mut new_tried_peers = HashSet::new();
new_tried_peers.insert(next_addr);
visited.mark_visited(next_addr);
tracing::debug!(
tx = %tx_id,
%instance_id,
next_target = %next_addr,
retries = retries + 1,
"Subscribe: connection aborted, found new candidate"
);
let msg = SubscribeMsg::Request {
id: tx_id,
instance_id,
htl: current_hop,
visited: visited.clone(),
is_renewal,
};
let op = SubscribeOp {
id: tx_id,
state: SubscribeState::AwaitingResponse(AwaitingResponseData {
next_hop: Some(next_addr),
instance_id,
retries: retries + 1,
current_hop,
tried_peers: new_tried_peers,
alternatives: new_candidates,
attempts_at_hop: 1,
visited,
}),
requester_addr,
requester_pub_key,
is_renewal,
// Only the target is replaced; the original send timestamp is kept.
stats: stats.map(|mut s| {
s.target_peer = next_target.clone();
s
}),
ack_received: false,
speculative_paths: 0,
};
op_manager
.notify_op_change(NetMessage::from(msg), OpEnum::Subscribe(op))
.await?;
return Err(OpError::StatePushed);
}
}
}
// Step 3: nothing left to try; tell the upstream requester, if any.
if let Some(req_addr) = requester_addr {
tracing::warn!(
tx = %tx_id,
%instance_id,
requester = %req_addr,
phase = "not_found",
"Subscribe aborted (retries exhausted) - sending NotFound upstream"
);
if let Some(event) =
NetEventLog::subscribe_not_found(&tx_id, &op_manager.ring, instance_id, None)
{
op_manager.ring.register_events(Either::Left(event)).await;
}
let response_op = SubscribeOp {
id: tx_id,
state: SubscribeState::Failed,
requester_addr,
requester_pub_key,
is_renewal,
stats,
ack_received: false,
speculative_paths: 0,
};
op_manager
.notify_op_change(
NetMessage::from(SubscribeMsg::Response {
id: tx_id,
instance_id,
result: SubscribeMsgResult::NotFound,
}),
OpEnum::Subscribe(response_op),
)
.await?;
return Err(OpError::StatePushed);
}
// Step 4: this node originated the request; surface the failure locally.
notify_abort_failure(op_manager, tx_id, is_sub_op, is_renewal, &instance_id).await;
op_manager.completed(tx_id);
return Ok(());
}
// Not awaiting a response: nothing to retry.
op_manager.completed(tx_id);
Ok(())
}
}
async fn notify_abort_failure(
op_manager: &OpManager,
tx_id: Transaction,
is_sub_op: bool,
is_renewal: bool,
instance_id: &ContractInstanceId,
) {
if is_sub_op {
let reason = format!(
"Subscription failed for contract {}: peer connection dropped",
instance_id
);
if let Err(e) = op_manager
.notify_contract_handler(
crate::contract::ContractHandlerEvent::NotifySubscriptionError {
key: *instance_id,
reason,
},
)
.await
{
tracing::debug!(
tx = %tx_id,
contract = %instance_id,
error = %e,
"Failed to send subscription abort error to notification channels"
);
}
} else if is_renewal {
tracing::debug!(
tx = %tx_id,
"Subscription renewal aborted, no client to notify"
);
} else {
let error_result: crate::client_events::HostResult =
Err(freenet_stdlib::client_api::ErrorKind::OperationError {
cause: "Subscribe operation failed: peer connection dropped".into(),
}
.into());
if let Err(err) = op_manager.result_router_tx.try_send((tx_id, error_result)) {
tracing::error!(
tx = %tx_id,
error = %err,
"Failed to send abort notification to client \
(result router channel full or closed)"
);
}
}
}
/// Registers the requesting peer as a downstream subscriber of `key`.
///
/// The peer identity comes from `requester_pub_key` when given; otherwise it
/// is looked up by `requester_addr` and, failing that, by `source_addr`. If no
/// identity can be resolved, or the ring refuses the subscriber, a warning is
/// logged and nothing else happens. When the interest manager reports the peer
/// as new and this node thereby becomes interested in the contract, the
/// interest change is broadcast.
///
/// `tx` is used for logging only; `warn_suffix` is appended to the
/// "could not find peer" warning.
pub(crate) async fn register_downstream_subscriber(
    op_manager: &OpManager,
    key: &ContractKey,
    requester_addr: std::net::SocketAddr,
    requester_pub_key: Option<&crate::transport::TransportPublicKey>,
    source_addr: Option<std::net::SocketAddr>,
    tx: &Transaction,
    warn_suffix: &str,
) {
    let resolved_key = match requester_pub_key {
        Some(pk) => Some(crate::ring::interest::PeerKey::from(pk.clone())),
        None => {
            let connections = &op_manager.ring.connection_manager;
            connections
                .get_peer_by_addr(requester_addr)
                .or_else(|| source_addr.and_then(|sa| connections.get_peer_by_addr(sa)))
                .map(|pkl| crate::ring::interest::PeerKey::from(pkl.pub_key.clone()))
        }
    };

    let Some(peer_key) = resolved_key else {
        tracing::warn!(
            tx = %tx,
            contract = %key,
            requester_addr = %requester_addr,
            source_addr = ?source_addr,
            "Subscribe: could not find peer to register interest{}",
            warn_suffix
        );
        return;
    };

    let accepted = op_manager
        .ring
        .add_downstream_subscriber(key, peer_key.clone());
    if !accepted {
        tracing::warn!(
            tx = %tx,
            contract = %key,
            "Downstream subscriber limit reached — skipping peer interest registration"
        );
        return;
    }

    let is_new_peer = op_manager
        .interest_manager
        .register_peer_interest(key, peer_key, None, false);
    // The subscriber count only changes for peers not registered before.
    if is_new_peer && op_manager.interest_manager.add_downstream_subscriber(key) {
        super::broadcast_change_interests(op_manager, vec![*key], vec![]).await;
    }
}
impl Operation for SubscribeOp {
type Message = SubscribeMsg;
type Result = SubscribeResult;
/// Finds or creates the operation state for an incoming subscribe message.
///
/// * An existing subscribe operation for the transaction is returned as is.
/// * An existing operation of another kind is pushed back and
///   `OpError::OpNotPresent` is returned.
/// * With no stored operation, `Response` and `ForwardingAck` messages are
///   rejected with `OpError::OpNotPresent`; `Request` and `Unsubscribe`
///   messages create a new operation in `AwaitingResponse`, with the sender
///   (`source_addr`) recorded as requester and its public key resolved from
///   the connection manager when possible.
async fn load_or_init<'a>(
    op_manager: &'a OpManager,
    msg: &'a Self::Message,
    source_addr: Option<std::net::SocketAddr>,
) -> Result<OpInitialization<Self>, OpError> {
    let id = *msg.id();
    let msg_type = match msg {
        SubscribeMsg::Request { .. } => "Request",
        SubscribeMsg::Response { .. } => "Response",
        SubscribeMsg::Unsubscribe { .. } => "Unsubscribe",
        SubscribeMsg::ForwardingAck { .. } => "ForwardingAck",
    };
    tracing::debug!(
        tx = %id,
        %msg_type,
        source_addr = ?source_addr,
        "LOAD_OR_INIT_ENTRY: entering load_or_init for Subscribe"
    );

    let stored = match op_manager.pop(msg.id()) {
        Ok(stored) => stored,
        Err(err) => return Err(err.into()),
    };
    match stored {
        Some(OpEnum::Subscribe(subscribe_op)) => {
            tracing::debug!(
                tx = %id,
                %msg_type,
                "LOAD_OR_INIT_POPPED: found existing Subscribe operation"
            );
            return Ok(OpInitialization {
                op: subscribe_op,
                source_addr,
            });
        }
        Some(other_op) => {
            // Not ours: put it back so its real owner can still find it.
            if let Err(e) = op_manager.push(id, other_op).await {
                tracing::warn!(tx = %id, error = %e, "failed to push mismatched op back");
            }
            return Err(OpError::OpNotPresent(id));
        }
        None => {}
    }

    // No stored state: only messages that start an operation may proceed.
    let (is_renewal, msg_instance_id) = match msg {
        SubscribeMsg::Request {
            is_renewal,
            instance_id,
            ..
        } => (*is_renewal, *instance_id),
        SubscribeMsg::Unsubscribe { instance_id, .. } => (false, *instance_id),
        SubscribeMsg::Response { .. } | SubscribeMsg::ForwardingAck { .. } => {
            tracing::debug!(
                tx = %id,
                %msg_type,
                phase = "load_or_init",
                "SUBSCRIBE_OP_MISSING: response/ack arrived for non-existent operation"
            );
            return Err(OpError::OpNotPresent(id));
        }
    };

    let requester_pub_key = match source_addr {
        Some(addr) => op_manager
            .ring
            .connection_manager
            .get_peer_by_addr(addr)
            .map(|pkl| pkl.pub_key().clone()),
        None => None,
    };
    let fresh_state = SubscribeState::AwaitingResponse(AwaitingResponseData {
        next_hop: None,
        instance_id: msg_instance_id,
        retries: 0,
        current_hop: 0,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 0,
        visited: super::VisitedPeers::new(&id),
    });
    Ok(OpInitialization {
        op: Self {
            id,
            state: fresh_state,
            requester_addr: source_addr,
            requester_pub_key,
            is_renewal,
            stats: None,
            ack_received: false,
            speculative_paths: 0,
        },
        source_addr,
    })
}
/// Transaction id this operation is tracked under.
fn id(&self) -> &Transaction {
&self.id
}
fn process_message<'a, NB: NetworkBridge>(
self,
conn_manager: &'a mut NB,
op_manager: &'a OpManager,
input: &'a Self::Message,
source_addr: Option<std::net::SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<OperationResult, OpError>> + Send + 'a>> {
Box::pin(async move {
let id = self.id;
match input {
SubscribeMsg::Request {
id,
instance_id,
htl,
visited,
is_renewal,
} => {
tracing::debug!(
tx = %id,
%instance_id,
htl,
is_renewal,
requester_addr = ?self.requester_addr,
"subscribe: processing Request"
);
if let Some(key) = super::has_contract(op_manager, *instance_id).await? {
if let Some(requester_addr) = self.requester_addr {
register_downstream_subscriber(
op_manager,
&key,
requester_addr,
self.requester_pub_key.as_ref(),
source_addr,
id,
"",
)
.await;
tracing::info!(tx = %id, contract = %key, is_renewal, phase = "response", "Subscription fulfilled, sending Response");
return Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *id,
instance_id: *instance_id,
result: SubscribeMsgResult::Subscribed { key },
}),
next_hop: Some(requester_addr),
stream_data: None,
});
} else {
tracing::info!(tx = %id, contract = %key, phase = "complete", "Subscribe completed (originator has contract locally)");
return Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id: *id,
state: SubscribeState::Completed(CompletedData { key }),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)));
}
}
if let Some(key) = wait_for_contract_with_timeout(
op_manager,
*instance_id,
CONTRACT_WAIT_TIMEOUT_MS,
)
.await?
{
if let Some(requester_addr) = self.requester_addr {
register_downstream_subscriber(
op_manager,
&key,
requester_addr,
self.requester_pub_key.as_ref(),
source_addr,
id,
" (after contract wait)",
)
.await;
return Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *id,
instance_id: *instance_id,
result: SubscribeMsgResult::Subscribed { key },
}),
next_hop: Some(requester_addr),
stream_data: None,
});
} else {
tracing::info!(tx = %id, contract = %key, phase = "complete", "Subscribe completed (originator, contract arrived after wait)");
return Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id: *id,
state: SubscribeState::Completed(CompletedData { key }),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)));
}
}
if *htl == 0 {
tracing::warn!(tx = %id, contract = %instance_id, htl = 0, phase = "not_found", "Subscribe request exhausted HTL");
if self.requester_addr.is_some() {
if let Some(event) = NetEventLog::subscribe_not_found(
id,
&op_manager.ring,
*instance_id,
Some(op_manager.ring.max_hops_to_live),
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
}
return Self::not_found_result(
*id,
*instance_id,
self.requester_addr,
"HTL exhausted",
);
}
let own_addr = op_manager.ring.connection_manager.peer_addr()?;
let mut new_visited = visited.clone().with_transaction(id);
new_visited.mark_visited(own_addr);
if let Some(requester) = self.requester_addr {
new_visited.mark_visited(requester);
}
let mut candidates = op_manager.ring.k_closest_potentially_hosting(
instance_id,
&new_visited,
MAX_BREADTH,
);
if candidates.is_empty() {
tracing::warn!(tx = %id, contract = %instance_id, phase = "not_found", "No closer peers to forward subscribe request");
if self.requester_addr.is_some() {
if let Some(event) = NetEventLog::subscribe_not_found(
id,
&op_manager.ring,
*instance_id,
None,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
}
return Self::not_found_result(
*id,
*instance_id,
self.requester_addr,
"no closer peers available",
);
}
let next_hop = candidates.remove(0);
let next_hop_known = match KnownPeerKeyLocation::try_from(&next_hop) {
Ok(known) => known,
Err(e) => {
tracing::error!(
tx = %id,
pub_key = %e.pub_key,
"INTERNAL ERROR: next_hop has unknown address - routing selected peer without address"
);
return Self::not_found_result(
*id,
*instance_id,
self.requester_addr,
"next hop has unknown address",
);
}
};
let next_addr = next_hop_known.socket_addr();
new_visited.mark_visited(next_addr);
let new_htl = htl.saturating_sub(1);
let mut tried_peers = HashSet::new();
tried_peers.insert(next_addr);
tracing::debug!(tx = %id, %instance_id, next = %next_addr, alternatives = candidates.len(), is_renewal, "Forwarding subscribe request");
if let Some(event) = NetEventLog::subscribe_request(
id,
&op_manager.ring,
*instance_id,
next_hop.clone(),
new_htl,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
if let Some(requester) = self.requester_addr {
let ack = NetMessage::from(SubscribeMsg::ForwardingAck {
id: *id,
instance_id: *instance_id,
});
drop(conn_manager.send(requester, ack).await);
}
Ok(OperationResult::SendAndContinue {
msg: NetMessage::from(SubscribeMsg::Request {
id: *id,
instance_id: *instance_id,
htl: new_htl,
visited: new_visited.clone(),
is_renewal: *is_renewal,
}),
next_hop: Some(next_addr),
state: OpEnum::Subscribe(SubscribeOp {
id: *id,
state: SubscribeState::AwaitingResponse(AwaitingResponseData {
next_hop: None,
instance_id: *instance_id,
retries: 0,
current_hop: new_htl,
tried_peers,
alternatives: candidates,
attempts_at_hop: 1,
visited: new_visited,
}),
requester_addr: self.requester_addr,
requester_pub_key: self.requester_pub_key,
is_renewal: self.is_renewal,
stats: Some(SubscribeStats {
target_peer: next_hop.clone(),
contract_location: Location::from(instance_id),
request_sent_at: Instant::now(),
}),
ack_received: false,
speculative_paths: 0,
}),
stream_data: None,
})
}
SubscribeMsg::Response {
id: msg_id,
instance_id,
result,
} => {
tracing::debug!(
tx = %msg_id,
%instance_id,
requester_addr = ?self.requester_addr,
source_addr = ?source_addr,
"SUBSCRIBE_RESPONSE_ENTRY: entered Response handler"
);
match result {
SubscribeMsgResult::Subscribed { key } => {
tracing::debug!(
tx = %msg_id,
%key,
requester_addr = ?self.requester_addr,
source_addr = ?source_addr,
"subscribe: processing Subscribed response"
);
if let Some(requester_addr) = self.requester_addr {
register_downstream_subscriber(
op_manager,
key,
requester_addr,
self.requester_pub_key.as_ref(),
None,
msg_id,
" (relay registration on Response)",
)
.await;
tracing::debug!(tx = %msg_id, %key, requester = %requester_addr, "Forwarding Subscribed response to requester");
return Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *msg_id,
instance_id: *instance_id,
result: SubscribeMsgResult::Subscribed { key: *key },
}),
next_hop: Some(requester_addr),
stream_data: None,
});
}
op_manager.ring.subscribe(*key);
op_manager.ring.complete_subscription_request(key, true);
tracing::info!(
tx = %msg_id,
contract = %format!("{:.8}", key),
"SUBSCRIPTION_ACCEPTED: registered lease-based subscription"
);
crate::node::network_status::record_subscription(format!("{key}"));
if let Err(e) = fetch_contract_if_missing(op_manager, *key.id()).await {
tracing::debug!(
tx = %msg_id,
contract = %format!("{:.8}", key),
error = ?e,
"fetch_contract_if_missing failed, will receive state via UPDATE"
);
}
super::announce_contract_hosted(op_manager, key).await;
if let Some(resp_addr) = source_addr {
if let Some(pkl) = op_manager
.ring
.connection_manager
.get_peer_by_addr(resp_addr)
{
let peer_key =
crate::ring::interest::PeerKey::from(pkl.pub_key.clone());
op_manager
.interest_manager
.register_peer_interest(key, peer_key, None, true);
}
}
tracing::info!(tx = %msg_id, contract = %key, phase = "complete", "Subscribe completed (originator)");
if !self.is_renewal {
let became_interested =
op_manager.interest_manager.add_local_client(key);
if became_interested {
super::broadcast_change_interests(
op_manager,
vec![*key],
vec![],
)
.await;
}
}
let own_loc = op_manager.ring.connection_manager.own_location();
if let Some(event) = NetEventLog::subscribe_success(
msg_id,
&op_manager.ring,
*key,
own_loc,
None, ) {
op_manager.ring.register_events(Either::Left(event)).await;
}
Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id,
state: SubscribeState::Completed(CompletedData { key: *key }),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)))
}
SubscribeMsgResult::NotFound => {
tracing::debug!(
tx = %msg_id,
%instance_id,
requester_addr = ?self.requester_addr,
source_addr = ?source_addr,
"subscribe: processing NotFound response"
);
let (
retries,
current_hop,
mut tried_peers,
mut alternatives,
attempts_at_hop,
mut visited,
) = if let SubscribeState::AwaitingResponse(ref data) = self.state {
(
data.retries,
data.current_hop,
data.tried_peers.clone(),
data.alternatives.clone(),
data.attempts_at_hop,
data.visited.clone(),
)
} else {
(
0,
0,
HashSet::new(),
Vec::new(),
0,
super::VisitedPeers::new(msg_id),
)
};
if let Some(addr) = source_addr {
tried_peers.insert(addr);
visited.mark_visited(addr);
}
if !alternatives.is_empty() && attempts_at_hop < MAX_BREADTH {
let next_target = alternatives.remove(0);
if let Some(next_addr) = next_target.socket_addr() {
tried_peers.insert(next_addr);
visited.mark_visited(next_addr);
tracing::info!(
tx = %msg_id,
%instance_id,
peer_addr = %next_addr,
attempts_at_hop = attempts_at_hop + 1,
max_attempts = MAX_BREADTH,
phase = "retry",
"Subscribe: trying alternative peer at same hop"
);
return Ok(OperationResult::SendAndContinue {
msg: NetMessage::from(SubscribeMsg::Request {
id: *msg_id,
instance_id: *instance_id,
htl: current_hop,
visited: visited.clone(),
is_renewal: self.is_renewal,
}),
next_hop: Some(next_addr),
state: OpEnum::Subscribe(SubscribeOp {
id,
state: SubscribeState::AwaitingResponse(
AwaitingResponseData {
next_hop: Some(next_addr),
instance_id: *instance_id,
retries,
current_hop,
tried_peers,
alternatives,
attempts_at_hop: attempts_at_hop + 1,
visited,
},
),
requester_addr: self.requester_addr,
requester_pub_key: self.requester_pub_key,
is_renewal: self.is_renewal,
stats: self.stats.map(|mut s| {
s.target_peer = next_target.clone();
s
}),
ack_received: false,
speculative_paths: 0,
}),
stream_data: None,
});
}
}
if retries < MAX_RETRIES {
for addr in &tried_peers {
visited.mark_visited(*addr);
}
let mut new_candidates =
op_manager.ring.k_closest_potentially_hosting(
instance_id,
&visited,
MAX_BREADTH,
);
if !new_candidates.is_empty() {
let next_target = new_candidates.remove(0);
if let Some(next_addr) = next_target.socket_addr() {
let mut new_tried_peers = HashSet::new();
new_tried_peers.insert(next_addr);
visited.mark_visited(next_addr);
tracing::info!(
tx = %msg_id,
%instance_id,
peer_addr = %next_addr,
retries = retries + 1,
max_retries = MAX_RETRIES,
new_candidates = new_candidates.len(),
phase = "retry",
"Subscribe: seeking new candidates after exhausted alternatives"
);
return Ok(OperationResult::SendAndContinue {
msg: NetMessage::from(SubscribeMsg::Request {
id: *msg_id,
instance_id: *instance_id,
htl: current_hop,
visited: visited.clone(),
is_renewal: self.is_renewal,
}),
next_hop: Some(next_addr),
state: OpEnum::Subscribe(SubscribeOp {
id,
state: SubscribeState::AwaitingResponse(
AwaitingResponseData {
next_hop: Some(next_addr),
instance_id: *instance_id,
retries: retries + 1,
current_hop,
tried_peers: new_tried_peers,
alternatives: new_candidates,
attempts_at_hop: 1,
visited,
},
),
requester_addr: self.requester_addr,
requester_pub_key: self.requester_pub_key,
is_renewal: self.is_renewal,
stats: self.stats.map(|mut s| {
s.target_peer = next_target.clone();
s
}),
ack_received: false,
speculative_paths: 0,
}),
stream_data: None,
});
}
}
}
if let Some(requester_addr) = self.requester_addr {
tracing::debug!(tx = %msg_id, %instance_id, requester = %requester_addr, "Forwarding NotFound response to requester (retries exhausted)");
Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *msg_id,
instance_id: *instance_id,
result: SubscribeMsgResult::NotFound,
}),
next_hop: Some(requester_addr),
stream_data: None,
})
} else {
let local_contract = match op_manager
.notify_contract_handler(ContractHandlerEvent::GetQuery {
instance_id: *instance_id,
return_contract_code: true,
})
.await
{
Ok(ContractHandlerEvent::GetResponse {
key: Some(key),
response:
Ok(StoreResponse {
state: Some(state),
contract,
}),
}) => Some((key, state, contract)),
_ => None,
};
if let Some((key, state, contract)) = local_contract {
tracing::info!(
tx = %msg_id,
contract = %key,
phase = "reseed",
"Subscribe: Network returned NotFound, re-hosting from local cache"
);
if let Some(contract_code) = contract {
let put_result = op_manager
.notify_contract_handler(
ContractHandlerEvent::PutQuery {
key,
state,
related_contracts: RelatedContracts::default(),
contract: Some(contract_code),
},
)
.await;
match put_result {
Ok(ContractHandlerEvent::PutResponse {
new_value: Ok(_),
..
}) => {
tracing::debug!(tx = %msg_id, %key, "Re-hosted contract to network");
super::announce_contract_hosted(op_manager, &key)
.await;
}
_ => {
tracing::warn!(tx = %msg_id, %key, "Failed to re-host contract");
}
}
}
let own_loc = op_manager.ring.connection_manager.own_location();
if let Some(event) = NetEventLog::subscribe_success(
msg_id,
&op_manager.ring,
key,
own_loc,
None, ) {
op_manager.ring.register_events(Either::Left(event)).await;
}
Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id,
state: SubscribeState::Completed(CompletedData { key }),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)))
} else {
tracing::warn!(tx = %msg_id, %instance_id, phase = "not_found", "Subscribe failed - contract not found");
let reason = format!(
"Subscription failed: contract {} not found in network",
instance_id
);
if let Err(e) = op_manager
.notify_contract_handler(
ContractHandlerEvent::NotifySubscriptionError {
key: *instance_id,
reason,
},
)
.await
{
tracing::debug!(
contract = %instance_id,
error = %e,
"Failed to send subscription error to client notification channels"
);
}
if self.requester_addr.is_some() {
if let Some(event) = NetEventLog::subscribe_not_found(
msg_id,
&op_manager.ring,
*instance_id,
None, ) {
op_manager
.ring
.register_events(Either::Left(event))
.await;
}
}
Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id,
state: SubscribeState::Failed,
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)))
}
}
}
}
}
SubscribeMsg::Unsubscribe { id, instance_id } => {
tracing::debug!(
tx = %id,
%instance_id,
source_addr = ?source_addr,
"received unsubscribe notification"
);
let sender_peer = self
.requester_pub_key
.as_ref()
.map(|pk| crate::ring::interest::PeerKey::from(pk.clone()))
.or_else(|| {
source_addr.and_then(|addr| {
op_manager
.ring
.connection_manager
.get_peer_by_addr(addr)
.map(|pkl| {
crate::ring::interest::PeerKey::from(pkl.pub_key.clone())
})
})
});
if let Some(key) = super::has_contract(op_manager, *instance_id).await? {
if let Some(peer) = &sender_peer {
let was_downstream =
op_manager.ring.remove_downstream_subscriber(&key, peer);
let was_interested =
op_manager.interest_manager.remove_peer_interest(&key, peer);
if was_downstream || was_interested {
let lost_interest = op_manager
.interest_manager
.remove_downstream_subscriber(&key);
if lost_interest {
super::broadcast_change_interests(
op_manager,
vec![],
vec![key],
)
.await;
}
}
} else {
tracing::warn!(
tx = %id,
%instance_id,
source_addr = ?source_addr,
"Unsubscribe: could not resolve sender peer, downstream entry not removed"
);
}
if op_manager.ring.should_unsubscribe_upstream(&key) {
tracing::debug!(
tx = %id,
contract = %key,
"No remaining subscribers, propagating unsubscribe upstream"
);
op_manager.send_unsubscribe_upstream(&key).await;
} else {
tracing::debug!(
tx = %id,
contract = %key,
"Still have subscribers, not propagating unsubscribe"
);
}
} else {
tracing::debug!(
tx = %id,
%instance_id,
"Contract not found locally, ignoring unsubscribe"
);
}
Ok(OperationResult::Completed)
}
SubscribeMsg::ForwardingAck { id, instance_id } => {
tracing::debug!(
tx = %id,
%instance_id,
"Received forwarding ACK from downstream relay"
);
Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
ack_received: true,
..self
},
)))
}
}
})
}
}
impl IsOperationCompleted for SubscribeOp {
    /// Reports whether this subscribe operation is in the
    /// `SubscribeState::Completed` state.
    ///
    /// Every other state (including `SubscribeState::Failed`) yields `false`.
    fn is_completed(&self) -> bool {
        // Guard clause: anything that is not `Completed` is "not done yet".
        let SubscribeState::Completed(_) = &self.state else {
            return false;
        };
        true
    }
}
#[cfg(test)]
mod tests;
mod op_ctx_task;
pub(crate) use op_ctx_task::start_client_subscribe;
/// Wire-level message types exchanged by the subscribe operation.
mod messages {
    use std::fmt;

    use super::*;

    /// Outcome carried inside a [`SubscribeMsg::Response`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub(crate) enum SubscribeMsgResult {
        /// The subscription succeeded for the contract identified by `key`.
        Subscribed { key: ContractKey },
        /// No peer along the path could satisfy the subscription.
        NotFound,
    }

    /// Messages of the subscribe protocol.
    ///
    /// Every variant carries the transaction `id` it belongs to and the
    /// `instance_id` of the contract concerned.
    ///
    /// NOTE(review): variant and field order is kept exactly as declared, since
    /// the serde encoding may depend on it.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub(crate) enum SubscribeMsg {
        /// Request to subscribe to a contract.
        Request {
            /// Transaction this request belongs to.
            id: Transaction,
            /// Contract instance being subscribed to.
            instance_id: ContractInstanceId,
            /// NOTE(review): presumably the remaining hops-to-live budget — confirm.
            htl: usize,
            /// Peers already marked as visited for this transaction.
            visited: super::super::VisitedPeers,
            /// NOTE(review): presumably set when renewing an existing
            /// subscription rather than creating a new one — confirm.
            is_renewal: bool,
        },
        /// Reply to a [`SubscribeMsg::Request`], carrying its outcome.
        Response {
            /// Transaction this response belongs to.
            id: Transaction,
            /// Contract instance the response refers to.
            instance_id: ContractInstanceId,
            /// Whether the subscription succeeded or the contract was not found.
            result: SubscribeMsgResult,
        },
        /// Notification that the sender is unsubscribing from the contract.
        Unsubscribe {
            /// Transaction this notification belongs to.
            id: Transaction,
            /// Contract instance being unsubscribed from.
            instance_id: ContractInstanceId,
        },
        /// Acknowledgement of forwarding; the handler logs it as coming from a
        /// downstream relay.
        ForwardingAck {
            /// Transaction being acknowledged.
            id: Transaction,
            /// Contract instance the acknowledged request refers to.
            instance_id: ContractInstanceId,
        },
    }

    impl InnerMessage for SubscribeMsg {
        /// Returns the transaction id carried by the message, whatever its variant.
        fn id(&self) -> &Transaction {
            // Irrefutable or-pattern: adding a variant without an `id` field
            // becomes a compile error here.
            let (Self::Request { id, .. }
            | Self::Response { id, .. }
            | Self::Unsubscribe { id, .. }
            | Self::ForwardingAck { id, .. }) = self;
            id
        }

        /// Returns the ring location derived from the message's contract
        /// instance id. Always `Some` for subscribe messages.
        fn requested_location(&self) -> Option<Location> {
            let (Self::Request { instance_id, .. }
            | Self::Response { instance_id, .. }
            | Self::Unsubscribe { instance_id, .. }
            | Self::ForwardingAck { instance_id, .. }) = self;
            Some(Location::from(instance_id))
        }
    }

    impl fmt::Display for SubscribeMsg {
        /// Formats the message as `Subscribe::<Variant>(...)`, including the
        /// transaction id and contract instance id (plus the result for responses).
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Request {
                    id, instance_id, ..
                } => {
                    write!(f, "Subscribe::Request(id: {id}, contract: {instance_id})")
                }
                Self::Response {
                    id,
                    instance_id,
                    result,
                } => {
                    let result_str = match result {
                        SubscribeMsgResult::NotFound => String::from("NotFound"),
                        SubscribeMsgResult::Subscribed { key } => format!("Subscribed({key})"),
                    };
                    write!(
                        f,
                        "Subscribe::Response(id: {id}, instance_id: {instance_id}, result: {result_str})"
                    )
                }
                Self::Unsubscribe { id, instance_id } => {
                    write!(
                        f,
                        "Subscribe::Unsubscribe(id: {id}, contract: {instance_id})"
                    )
                }
                Self::ForwardingAck { id, instance_id } => {
                    write!(
                        f,
                        "Subscribe::ForwardingAck(id: {id}, instance_id: {instance_id})"
                    )
                }
            }
        }
    }
}