use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
prelude::ContractInstanceId,
};
use std::{
borrow::Cow,
fs::File,
io::Read,
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::Arc,
time::Duration,
};
use std::{collections::HashSet, convert::Infallible};
use self::p2p_impl::NodeP2P;
use crate::{
client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
config::{Address, GatewayConfig, WebsocketApiConfig},
contract::{Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler},
local_node::Executor,
message::{InnerMessage, NetMessage, NodeEvent, Transaction, TransactionType},
operations::{
OpEnum, OpError, OpOutcome,
connect::{self, ConnectOp},
get, put, subscribe, update,
},
ring::{Location, PeerKeyLocation},
router::{RouteEvent, RouteOutcome},
tracing::{EventRegister, NetEventLog, NetEventRegister},
};
use crate::{
config::Config,
message::{MessageStats, NetMessageV1},
};
use freenet_stdlib::client_api::DelegateRequest;
use serde::{Deserialize, Serialize};
use tracing::Instrument;
use crate::operations::handle_op_request;
pub(crate) use network_bridge::{
ConnectionError, EventLoopNotificationsSender, NetworkBridge, OpExecutionPayload,
};
#[cfg(test)]
pub(crate) use network_bridge::{EventLoopNotificationsReceiver, event_loop_notification_channel};
pub use network_bridge::{EventLoopExitReason, NetworkStats, reset_channel_id_counter};
use crate::topology::rate::Rate;
use crate::transport::{TransportKeypair, TransportPublicKey};
pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
mod network_bridge;
pub use network_bridge::in_memory::{FaultInjectorState, get_fault_injector, set_fault_injector};
pub(crate) mod background_task_monitor;
pub(crate) mod neighbor_hosting;
pub(crate) mod network_status;
mod op_state_manager;
mod p2p_impl;
mod request_router;
pub(crate) mod testing_impl;
pub use request_router::{DeduplicatedRequest, RequestRouter};
/// Cloneable handle for requesting a graceful node shutdown.
///
/// Wraps the sender side of the node's event channel; see
/// [`ShutdownHandle::shutdown`].
#[derive(Clone)]
pub struct ShutdownHandle {
    // Sender into the node event loop's channel.
    tx: tokio::sync::mpsc::Sender<NodeEvent>,
}
impl ShutdownHandle {
    /// Request a graceful shutdown by sending a `Disconnect` event into the
    /// node's event loop.
    ///
    /// A delivery failure is only logged at debug level: during teardown the
    /// event loop (and thus the channel) may already be gone, which is not an
    /// error worth surfacing.
    pub async fn shutdown(&self) {
        let event = NodeEvent::Disconnect {
            cause: Some("graceful shutdown".into()),
        };
        match self.tx.send(event).await {
            Ok(()) => {}
            Err(err) => {
                tracing::debug!(
                    error = %err,
                    "failed to send graceful shutdown signal; shutdown channel may already be closed"
                );
            }
        }
    }
}
/// A running (or ready-to-run) Freenet node.
///
/// Thin wrapper around the p2p implementation plus a shutdown handle;
/// consumed by [`Node::run`].
pub struct Node {
    // The actual p2p node implementation.
    inner: NodeP2P,
    // Handle handed out via `shutdown_handle()` for graceful termination.
    shutdown_handle: ShutdownHandle,
}
impl Node {
    /// Set this node's own location on the ring's connection manager.
    pub fn update_location(&mut self, location: Location) {
        self.inner
            .op_manager
            .ring
            .connection_manager
            .update_location(Some(location));
    }

    /// Obtain a cloneable handle that can later request a graceful shutdown.
    pub fn shutdown_handle(&self) -> ShutdownHandle {
        self.shutdown_handle.clone()
    }

    /// Run the node's event loop. The `Infallible` success type means this
    /// only ever returns with an error.
    pub async fn run(self) -> anyhow::Result<Infallible> {
        self.inner.run_node().await
    }
}
/// Runtime configuration used to build a [`Node`].
///
/// Produced from the on-disk [`Config`] by [`NodeConfig::new`], then
/// optionally tweaked through the builder-style setters before calling
/// [`NodeConfig::build`].
#[derive(Serialize, Deserialize, Clone, Debug)]
#[non_exhaustive] pub struct NodeConfig {
    /// Whether the node should try to join an existing network through the
    /// configured gateways (disabled via [`NodeConfig::first_gateway`]).
    pub should_connect: bool,
    /// True when this node acts as a gateway.
    pub is_gateway: bool,
    /// Transport-layer keypair identifying this node.
    pub key_pair: TransportKeypair,
    /// IP the network listener binds to.
    pub network_listener_ip: IpAddr,
    /// Port the network listener binds to.
    pub network_listener_port: u16,
    /// Externally reachable address of this peer, when known.
    pub(crate) own_addr: Option<SocketAddr>,
    /// Full on-disk configuration this node config was derived from.
    pub(crate) config: Arc<Config>,
    /// Gateways used to join the network.
    pub(crate) gateways: Vec<InitPeerNode>,
    /// Preassigned ring location, if any.
    pub(crate) location: Option<Location>,
    // Routing knobs; `None` means "use defaults" — set via the builder setters.
    pub(crate) max_hops_to_live: Option<usize>,
    // NOTE(review): presumably randomizes routing when HTL exceeds this
    // threshold — confirm against the routing code.
    pub(crate) rnd_if_htl_above: Option<usize>,
    pub(crate) max_number_conn: Option<usize>,
    pub(crate) min_number_conn: Option<usize>,
    pub(crate) max_upstream_bandwidth: Option<Rate>,
    pub(crate) max_downstream_bandwidth: Option<Rate>,
    /// Addresses this node refuses to connect to.
    pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
    pub(crate) transient_budget: usize,
    pub(crate) transient_ttl: Duration,
    /// Ready relay connections required before relaying; `Some(0)` when the
    /// initial network load is skipped (see [`NodeConfig::new`]).
    #[serde(default)]
    pub(crate) relay_ready_connections: Option<usize>,
}
impl NodeConfig {
/// Build a [`NodeConfig`] from the on-disk [`Config`].
///
/// For every configured gateway the public key file is read and expected to
/// contain a hex-encoded 32-byte key. Files still holding an RSA PEM block
/// are polled (up to 10 attempts, 500 ms apart) to give an external
/// conversion to X25519 time to finish; gateways that never convert, and
/// gateways whose key equals our own, are skipped. Gateway addresses are
/// resolved via [`Self::parse_socket_addr`].
///
/// # Errors
/// Returns an error when a key file cannot be opened/read, decodes to the
/// wrong length or invalid hex, or when gateway address resolution fails.
pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
    tracing::info!("Loading node configuration for mode {}", config.mode);
    let own_pub_key = config.transport_keypair().public();
    let mut gateways = Vec::with_capacity(config.gateways.len());
    for gw in &config.gateways {
        let GatewayConfig {
            address,
            public_key_path,
            location,
        } = gw;
        // Holds the decoded 32-byte key once a valid hex payload is read.
        let mut key_bytes = None;
        for attempt in 0..10 {
            // Re-open on each attempt: the file may be rewritten between polls.
            let mut key_file = File::open(public_key_path).with_context(|| {
                format!("failed loading gateway pubkey from {public_key_path:?}")
            })?;
            let mut buf = String::new();
            key_file.read_to_string(&mut buf)?;
            let buf = buf.trim();
            if buf.starts_with("-----BEGIN") {
                // Still an RSA PEM: wait for the external X25519 conversion.
                if attempt < 9 {
                    tracing::debug!(
                        public_key_path = ?public_key_path,
                        attempt = attempt + 1,
                        "Gateway public key is still RSA PEM format, waiting for X25519 conversion..."
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                    continue;
                } else {
                    // Give up on this gateway; it is skipped, not fatal.
                    tracing::warn!(
                        public_key_path = ?public_key_path,
                        "Gateway public key still in RSA PEM format after 5s. Skipping this gateway."
                    );
                    break;
                }
            }
            match hex::decode(buf) {
                Ok(bytes) if bytes.len() == 32 => {
                    key_bytes = Some(bytes);
                    break;
                }
                Ok(bytes) => {
                    // Valid hex but wrong size: configuration error, fail fast.
                    anyhow::bail!(
                        "invalid gateway pubkey length {} (expected 32) from {public_key_path:?}",
                        bytes.len()
                    );
                }
                Err(e) => {
                    anyhow::bail!(
                        "failed to decode gateway pubkey hex from {public_key_path:?}: {e}"
                    );
                }
            }
        }
        // No usable key after all attempts: skip this gateway entirely.
        let key_bytes = match key_bytes {
            Some(bytes) => bytes,
            None => continue, };
        let mut key_arr = [0u8; 32];
        key_arr.copy_from_slice(&key_bytes);
        let transport_pub_key = TransportPublicKey::from_bytes(key_arr);
        // A gateway entry pointing at ourselves would be a self-connection.
        if &transport_pub_key == own_pub_key {
            tracing::warn!(
                "Skipping gateway with same public key as self: {:?}",
                public_key_path
            );
            continue;
        }
        let address = Self::parse_socket_addr(address).await?;
        let peer_key_location = PeerKeyLocation::new(transport_pub_key, address);
        // Use the explicitly configured location, else derive it from the address.
        let location = location
            .map(Location::new)
            .unwrap_or_else(|| Location::from_address(&address));
        gateways.push(InitPeerNode::new(peer_key_location, location));
    }
    tracing::info!(
        "Node will be listening at {}:{} internal address",
        config.network_api.address,
        config.network_api.port
    );
    if let Some(own_addr) = &config.peer_id {
        tracing::info!("Node external address: {}", own_addr.socket_addr());
    }
    Ok(NodeConfig {
        should_connect: true,
        is_gateway: config.is_gateway,
        key_pair: config.transport_keypair().clone(),
        gateways,
        own_addr: config.peer_id.clone().map(|p| p.socket_addr()),
        network_listener_ip: config.network_api.address,
        network_listener_port: config.network_api.port,
        location: config.location.map(Location::new),
        config: Arc::new(config.clone()),
        max_hops_to_live: None,
        rnd_if_htl_above: None,
        max_number_conn: Some(config.network_api.max_connections),
        min_number_conn: Some(config.network_api.min_connections),
        max_upstream_bandwidth: None,
        max_downstream_bandwidth: None,
        blocked_addresses: config.network_api.blocked_addresses.clone(),
        transient_budget: config.network_api.transient_budget,
        transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
        // Nodes skipping the initial network load need no ready relay
        // connections before proceeding.
        relay_ready_connections: if config.network_api.skip_load_from_network {
            Some(0) } else {
            Some(3) },
    })
}
/// Resolve a configured [`Address`] into a concrete [`SocketAddr`].
///
/// Literal socket addresses are returned as-is. Hostnames are first tried
/// through the blocking system resolver (`ToSocketAddrs`) — with the default
/// network API port appended when none is given — and, failing that, looked
/// up asynchronously via hickory-resolver on the host part only.
///
/// NOTE(review): the port is split off with `rsplit_once(':')`, so a bare
/// IPv6 literal such as `::1` (no brackets, no port) would be split at its
/// last colon — TODO confirm such inputs cannot reach this function.
///
/// # Errors
/// Fails on an unparsable port, resolver-configuration or lookup errors, or
/// when the DNS lookup yields no IP address.
pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
    let (hostname, port) = match address {
        crate::config::Address::Hostname(hostname) => {
            match hostname.rsplit_once(':') {
                None => {
                    // No explicit port: try system resolution with the default
                    // port appended before falling back to async DNS.
                    let hostname_with_port =
                        format!("{}:{}", hostname, crate::config::default_network_api_port());
                    if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
                        if let Some(addr) = addrs.next() {
                            return Ok(addr);
                        }
                    }
                    (Cow::Borrowed(hostname.as_str()), None)
                }
                Some((host, port)) => match port.parse::<u16>() {
                    Ok(port) => {
                        // `hostname` still includes the port here — the
                        // `host:port` form `to_socket_addrs` expects.
                        if let Ok(mut addrs) = hostname.to_socket_addrs() {
                            if let Some(addr) = addrs.next() {
                                return Ok(addr);
                            }
                        }
                        (Cow::Borrowed(host), Some(port))
                    }
                    Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
                },
            }
        }
        Address::HostAddress(addr) => return Ok(*addr),
    };
    // Fallback: async DNS lookup using the system resolver configuration.
    let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
    let resolver = hickory_resolver::TokioAsyncResolver::new(
        conf,
        opts,
        hickory_resolver::name_server::GenericConnector::new(
            hickory_resolver::name_server::TokioRuntimeProvider::new(),
        ),
    );
    // Fully qualify the name (trailing dot) to avoid search-domain expansion.
    let hostname = if hostname.ends_with('.') {
        hostname
    } else {
        Cow::Owned(format!("{hostname}."))
    };
    let ips = resolver.lookup_ip(hostname.as_ref()).await?;
    match ips.into_iter().next() {
        Some(ip) => Ok(SocketAddr::new(
            ip,
            port.unwrap_or_else(crate::config::default_network_api_port),
        )),
        None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
    }
}
/// Borrow the full on-disk configuration backing this node config.
pub fn config(&self) -> &Config {
    &self.config
}

/// Mark this node as a gateway (builder-style).
pub fn is_gateway(&mut self) -> &mut Self {
    self.is_gateway = true;
    self
}

/// Mark this node as the first gateway of a brand-new network: there is no
/// existing network to connect to, so joining is disabled.
pub fn first_gateway(&mut self) {
    self.should_connect = false;
}

/// Set whether the node should join the network through its gateways.
pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
    self.should_connect = should_connect;
    self
}

/// Set the maximum hops-to-live for outgoing operations.
pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
    self.max_hops_to_live = Some(num_hops);
    self
}

// NOTE(review): presumably the HTL threshold above which routing is
// randomized — confirm against the routing code.
pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
    self.rnd_if_htl_above = Some(num_hops);
    self
}

/// Cap the number of open connections.
pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
    self.max_number_conn = Some(num);
    self
}

/// Set the minimum number of connections to maintain.
pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
    self.min_number_conn = Some(num);
    self
}

/// Override how many ready relay connections are required (`None` = default).
pub fn relay_ready_connections(&mut self, num: Option<usize>) -> &mut Self {
    self.relay_ready_connections = num;
    self
}

/// Set this peer's externally reachable address.
pub fn with_own_addr(&mut self, addr: SocketAddr) -> &mut Self {
    self.own_addr = Some(addr);
    self
}

/// Pin this node to a fixed ring location.
pub fn with_location(&mut self, loc: Location) -> &mut Self {
    self.location = Some(loc);
    self
}

/// Append a gateway to connect through.
pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
    self.gateways.push(peer);
    self
}
/// Build the node from this configuration, discarding the event-log flush
/// handle that [`Self::build_with_flush_handle`] additionally returns.
///
/// # Errors
/// Propagates any failure from the underlying build.
pub async fn build<const CLIENTS: usize>(
    self,
    clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<Node> {
    self.build_with_flush_handle(clients)
        .await
        .map(|(node, _flush_handle)| node)
}
/// Build the node and also return a handle for flushing buffered
/// network-event logs.
///
/// Event registration is layered: the on-disk [`EventRegister`] is always
/// active, an OpenTelemetry register is added under the `trace-ot` feature,
/// and a telemetry reporter is added when one can be constructed from the
/// configuration.
///
/// # Errors
/// Fails if the underlying [`NodeP2P::build`] fails.
pub async fn build_with_flush_handle<const CLIENTS: usize>(
    self,
    clients: [BoxedClient; CLIENTS],
) -> anyhow::Result<(Node, crate::tracing::EventFlushHandle)> {
    let (event_register, flush_handle) = {
        use super::tracing::{DynamicRegister, TelemetryReporter};
        let event_reg = EventRegister::new(self.config.event_log());
        // Grab the flush handle before the register is boxed away.
        let flush_handle = event_reg.flush_handle();
        let mut registers: Vec<Box<dyn NetEventRegister>> = vec![Box::new(event_reg)];
        #[cfg(feature = "trace-ot")]
        {
            use super::tracing::OTEventRegister;
            registers.push(Box::new(OTEventRegister::new()));
        }
        if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) {
            registers.push(Box::new(telemetry));
        }
        (DynamicRegister::new(registers), flush_handle)
    };
    // Clone the config Arc before `self` is moved into the build call.
    let cfg = self.config.clone();
    let (node_inner, shutdown_tx) = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
        self,
        clients,
        event_register,
        cfg,
    )
    .await?;
    let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
    Ok((
        Node {
            inner: node_inner,
            shutdown_handle,
        },
        flush_handle,
    ))
}
/// Externally reachable address of this peer, if one was configured.
pub fn get_own_addr(&self) -> Option<SocketAddr> {
    self.own_addr
}
/// Collect the key/address locations of all configured gateways.
///
/// # Errors
/// Non-gateway nodes need at least one gateway to join an existing network,
/// so an empty list is an error unless this node is itself a gateway.
fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
    let gateways: Vec<PeerKeyLocation> = self
        .gateways
        .iter()
        .map(|gw| gw.peer_key_location.clone())
        .collect();
    if gateways.is_empty() && !self.is_gateway {
        anyhow::bail!(
            "At least one remote gateway is required to join an existing network for non-gateway nodes."
        );
    }
    Ok(gateways)
}
}
/// A gateway peer used when joining the network: its key/address plus its
/// ring location.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitPeerNode {
    // Public key and socket address of the gateway.
    peer_key_location: PeerKeyLocation,
    // The gateway's location on the ring.
    location: Location,
}
impl InitPeerNode {
    /// Pair a gateway's key/address with its ring location.
    pub fn new(peer_key_location: PeerKeyLocation, location: Location) -> Self {
        Self {
            peer_key_location,
            location,
        }
    }
}
/// Deliver the outcome of a finished (or failed) operation: notify the
/// requesting client, record routing/telemetry events, and answer a waiting
/// executor callback, if any.
///
/// Client notification is deliberately skipped for sub-operations and for
/// subscription renewals. On error, the client (if any) is told about the
/// failure and the transaction is marked completed; in debug/test builds an
/// abbreviated state-transition trace is additionally written to stderr.
async fn report_result(
    tx: Option<Transaction>,
    op_result: Result<Option<OpEnum>, OpError>,
    op_manager: &OpManager,
    executor_callback: Option<ExecutorToEventLoopChannel<Callback>>,
    event_listener: &mut dyn NetEventRegister,
) {
    // Extra visibility for UPDATE transactions while debugging that flow.
    if let Some(tx_id) = tx {
        if matches!(tx_id.transaction_type(), TransactionType::Update) {
            tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
        }
    }
    match op_result {
        Ok(Some(op_res)) => {
            if let crate::operations::OpEnum::Update(ref update_op) = op_res {
                tracing::debug!(
                    "UPDATE operation {} completed, finalized: {}",
                    update_op.id,
                    update_op.finalized()
                );
            }
            // Notify the client unless this result must stay internal.
            if let Some(transaction) = tx {
                if transaction.is_sub_operation() {
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for sub-operation"
                    );
                } else if op_res.is_subscription_renewal() {
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for subscription renewal"
                    );
                } else {
                    let host_result = op_res.to_host_result();
                    op_manager.send_client_result(transaction, host_result);
                }
            }
            // Feed success/failure stats for this operation type.
            let (classified_op_type, classified_success) =
                classify_op_outcome(op_res.id().transaction_type(), op_res.outcome());
            if let Some(op_type) = classified_op_type {
                network_status::record_op_result(op_type, classified_success);
            }
            // Translate the op outcome into a router event (if relevant).
            let route_event = match op_res.outcome() {
                OpOutcome::ContractOpSuccess {
                    target_peer,
                    contract_location,
                    first_response_time,
                    payload_size,
                    payload_transfer_time,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Success {
                        time_to_response_start: first_response_time,
                        payload_size,
                        payload_transfer_time,
                    },
                    op_type: classified_op_type,
                }),
                OpOutcome::ContractOpSuccessUntimed {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::SuccessUntimed,
                    op_type: classified_op_type,
                }),
                OpOutcome::ContractOpFailure {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Failure,
                    op_type: classified_op_type,
                }),
                OpOutcome::Incomplete | OpOutcome::Irrelevant => None,
            };
            if let Some(event) = route_event {
                if let Some(log_event) =
                    NetEventLog::route_event(op_res.id(), &op_manager.ring, &event)
                {
                    event_listener
                        .register_events(Either::Left(log_event))
                        .await;
                }
                op_manager.ring.routing_finished(event);
            }
            // Hand the result back to a waiting executor, if one registered.
            if let Some(mut cb) = executor_callback {
                cb.response(op_res).await;
            }
        }
        Ok(None) => {
            tracing::debug!(?tx, "No operation result found");
        }
        Err(err) => {
            // Tell the client about the failure (sub-operations stay internal)
            // and mark the transaction as done either way.
            if let Some(tx) = tx {
                if !tx.is_sub_operation() {
                    let client_error = freenet_stdlib::client_api::ClientError::from(
                        freenet_stdlib::client_api::ErrorKind::OperationError {
                            cause: err.to_string().into(),
                        },
                    );
                    op_manager.send_client_result(tx, Err(client_error));
                }
                op_manager.completed(tx);
            }
            // Debug/test builds: dump an abbreviated invalid-state-transition
            // trace to stderr. `InvalidStateTransition` carries `state`/`trace`
            // only when debug_assertions are on, hence the duplicated patterns.
            #[cfg(any(debug_assertions, test))]
            {
                use std::io::Write;
                #[cfg(debug_assertions)]
                let OpError::InvalidStateTransition { tx, state, trace } = err else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(not(debug_assertions))]
                let OpError::InvalidStateTransition { tx } = err else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(debug_assertions)]
                let trace = format!("{trace}");
                #[cfg(debug_assertions)]
                {
                    // Keep only the two most relevant frames of the backtrace.
                    let mut tr_lines = trace.lines();
                    let trace = tr_lines
                        .nth(2)
                        .map(|second_trace| {
                            let second_trace_lines =
                                [second_trace, tr_lines.next().unwrap_or_default()];
                            second_trace_lines.join("\n")
                        })
                        .unwrap_or_default();
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!(
                        "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
                    );
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
                #[cfg(not(debug_assertions))]
                {
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!("Transaction ({tx} @ {peer}) error\n");
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
            }
            #[cfg(not(any(debug_assertions, test)))]
            {
                tracing::debug!("Finished transaction with error: {err}");
            }
        }
    }
}
/// Process one inbound network message end-to-end: run the pure network
/// handler, then report the outcome (client notification, routing events,
/// executor callback) via [`report_result`].
pub(crate) async fn process_message_decoupled<CB>(
    msg: NetMessage,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    conn_manager: CB,
    mut event_listener: Box<dyn NetEventRegister>,
    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) where
    CB: NetworkBridge,
{
    // Capture the transaction id before the message is consumed.
    let tx = *msg.id();
    let outcome = handle_pure_network_message(
        msg,
        source_addr,
        Arc::clone(&op_manager),
        conn_manager,
        event_listener.as_mut(),
        pending_op_result,
    )
    .await;
    report_result(
        Some(tx),
        outcome,
        &op_manager,
        executor_callback,
        event_listener.as_mut(),
    )
    .await;
}
#[allow(clippy::too_many_arguments)]
/// Version-dispatch shim: unwrap the versioned [`NetMessage`] envelope and
/// hand the payload to the V1 handler.
async fn handle_pure_network_message<CB>(
    msg: NetMessage,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    conn_manager: CB,
    event_listener: &mut dyn NetEventRegister,
    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
where
    CB: NetworkBridge,
{
    // `NetMessage` currently has a single versioned variant, so this pattern
    // is irrefutable.
    let NetMessage::V1(msg_v1) = msg;
    handle_pure_network_message_v1(
        msg_v1,
        source_addr,
        op_manager,
        conn_manager,
        event_listener,
        pending_op_result,
    )
    .await
}
/// Exponential backoff delay for retrying an operation whose state is busy.
///
/// Starts at 5 ms and doubles per attempt; the shift saturates at 8
/// (5 ms << 8 = 1280 ms) and the result is clamped to 1 second.
fn op_retry_backoff(attempt: usize) -> Duration {
    let shift = attempt.min(8) as u32;
    let millis = (5u64 << shift).min(1_000);
    Duration::from_millis(millis)
}
/// Forward a reply message to the per-transaction OpCtx task, when one is
/// listening.
///
/// Returns `true` when a callback channel existed (the reply was consumed
/// here, whether or not delivery succeeded) and `false` when the caller
/// should process the message itself. Delivery failures are logged only.
fn try_forward_task_per_tx_reply(
    pending_op_result: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
    reply: NetMessage,
    op_label: &'static str,
) -> bool {
    match pending_op_result {
        None => false,
        Some(callback) => {
            let tx_id = *reply.id();
            if let Err(err) = callback.try_send(reply) {
                tracing::error!(
                    %err,
                    %tx_id,
                    op = op_label,
                    "Failed to forward task-per-tx reply to OpCtx task"
                );
            }
            true
        }
    }
}
/// If the operation just completed, push the triggering message into the
/// pending-result channel (when one is registered) so the waiting task can
/// observe completion. Send failures are logged only.
fn forward_pending_op_result_if_completed(
    op_result: &Result<Option<OpEnum>, OpError>,
    pending_op_result: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
    reply: NetMessage,
) {
    if is_operation_completed(op_result) {
        if let Some(callback) = pending_op_result {
            let tx_id = *reply.id();
            if let Err(err) = callback.try_send(reply) {
                tracing::error!(%err, %tx_id, "Failed to send message to executor");
            }
        }
    }
}
#[allow(clippy::too_many_arguments)]
/// Core dispatcher for inbound V1 network messages.
///
/// The message is first registered with the event log, then dispatched by
/// type. Operation messages (connect/put/get/update/subscribe) are retried
/// up to `MAX_RETRIES` times with exponential backoff while the target
/// operation's state is busy (`OpNotAvailable::Running`); messages for
/// already-completed operations are dropped with `Ok(None)`. Reply messages
/// belonging to a task-per-transaction context are forwarded to that task
/// instead of being processed here, and GET/UPDATE requests for operations
/// this node does not yet track are handed to dedicated relay tasks.
/// Non-operation messages (neighbor hosting, interest sync, ready state,
/// aborts) are handled inline and always yield `Ok(None)`.
async fn handle_pure_network_message_v1<CB>(
    msg: NetMessageV1,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    mut conn_manager: CB,
    event_listener: &mut dyn NetEventRegister,
    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
where
    CB: NetworkBridge,
{
    // Log the inbound message before any processing happens.
    event_listener
        .register_events(NetEventLog::from_inbound_msg_v1(
            &msg,
            &op_manager,
            source_addr,
        ))
        .await;
    const MAX_RETRIES: usize = 15usize;
    // Retry loop: each iteration re-dispatches the same message; only the
    // `OpNotAvailable::Running` case falls through to the next iteration.
    for i in 0..MAX_RETRIES {
        let tx = Some(*msg.id());
        tracing::debug!(?tx, "Processing pure network operation, iteration: {i}");
        match msg {
            NetMessageV1::Connect(ref op) => {
                let parent_span = tracing::Span::current();
                let span = tracing::info_span!(
                    parent: parent_span,
                    "handle_connect_op_request",
                    transaction = %msg.id(),
                    tx_type = %msg.id().transaction_type()
                );
                let op_result = handle_op_request::<ConnectOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .instrument(span)
                .await;
                // Wake up any task waiting on this transaction's completion.
                forward_pending_op_result_if_completed(
                    &op_result,
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Connect((*op).clone())),
                );
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            // Op state is held elsewhere: back off and retry.
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!(
                                tx = %msg.id(),
                                tx_type = ?msg.id().transaction_type(),
                                "Pure network: Operation already completed"
                            );
                            return Ok(None);
                        }
                    }
                }
                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Put(ref op) => {
                // PUT responses owned by a task-per-tx context go straight to
                // that task.
                if matches!(
                    op,
                    put::PutMsg::Response { .. } | put::PutMsg::ResponseStreaming { .. }
                ) && try_forward_task_per_tx_reply(
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Put((*op).clone())),
                    "put",
                ) {
                    return Ok(None);
                }
                tracing::debug!(
                    tx = %op.id(),
                    "handle_pure_network_message_v1: Processing PUT message"
                );
                let op_result = handle_op_request::<put::PutOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;
                tracing::debug!(
                    tx = %op.id(),
                    op_result_ok = op_result.is_ok(),
                    "handle_pure_network_message_v1: PUT handle_op_request completed"
                );
                forward_pending_op_result_if_completed(
                    &op_result,
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Put((*op).clone())),
                );
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }
                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Get(ref op) => {
                // GET responses owned by a task-per-tx context go straight to
                // that task.
                if matches!(
                    op,
                    get::GetMsg::Response { .. } | get::GetMsg::ResponseStreaming { .. }
                ) && try_forward_task_per_tx_reply(
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Get((*op).clone())),
                    "get",
                ) {
                    return Ok(None);
                }
                // A GET request for a transaction we do not yet track is
                // relayed via a dedicated task rather than handled inline.
                if let get::GetMsg::Request {
                    id,
                    instance_id,
                    fetch_contract,
                    htl,
                    visited,
                    subscribe,
                } = op
                {
                    if let Some(upstream_addr) = source_addr {
                        if !op_manager.has_get_op(id) {
                            if let Err(err) = get::op_ctx_task::start_relay_get(
                                op_manager.clone(),
                                *id,
                                *instance_id,
                                *htl,
                                upstream_addr,
                                visited.clone(),
                                *fetch_contract,
                                *subscribe,
                            )
                            .await
                            {
                                tracing::error!(
                                    tx = %id,
                                    %instance_id,
                                    error = %err,
                                    "GET relay dispatch: start_relay_get failed"
                                );
                            }
                            return Ok(None);
                        }
                    }
                }
                let op_result = handle_op_request::<get::GetOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;
                forward_pending_op_result_if_completed(
                    &op_result,
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Get((*op).clone())),
                );
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }
                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Update(ref op) => {
                // UPDATE requests/broadcasts for untracked transactions are
                // relayed via dedicated tasks (mirrors the GET relay path).
                if let Some(sender_addr) = source_addr {
                    #[allow(clippy::wildcard_enum_match_arm)]
                    match op {
                        update::UpdateMsg::RequestUpdate {
                            id,
                            key,
                            related_contracts,
                            value,
                        } if !op_manager.has_update_op(id) => {
                            if let Err(err) = update::op_ctx_task::start_relay_request_update(
                                op_manager.clone(),
                                *id,
                                *key,
                                related_contracts.clone(),
                                value.clone(),
                                sender_addr,
                            )
                            .await
                            {
                                tracing::error!(
                                    tx = %id,
                                    %key,
                                    error = %err,
                                    "UPDATE relay dispatch: start_relay_request_update failed"
                                );
                            }
                            return Ok(None);
                        }
                        update::UpdateMsg::BroadcastTo {
                            id,
                            key,
                            payload,
                            sender_summary_bytes,
                        } if !op_manager.has_update_op(id) => {
                            if let Err(err) = update::op_ctx_task::start_relay_broadcast_to(
                                op_manager.clone(),
                                *id,
                                *key,
                                payload.clone(),
                                sender_summary_bytes.clone(),
                                sender_addr,
                            )
                            .await
                            {
                                tracing::error!(
                                    tx = %id,
                                    %key,
                                    error = %err,
                                    "UPDATE relay dispatch: start_relay_broadcast_to failed"
                                );
                            }
                            return Ok(None);
                        }
                        _ => {}
                    }
                }
                let op_result = handle_op_request::<update::UpdateOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;
                forward_pending_op_result_if_completed(
                    &op_result,
                    pending_op_result.as_ref(),
                    NetMessage::V1(NetMessageV1::Update((*op).clone())),
                );
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }
                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Subscribe(ref op) => {
                // Subscribe responses owned by a task-per-tx context go
                // straight to that task.
                if matches!(op, subscribe::SubscribeMsg::Response { .. })
                    && try_forward_task_per_tx_reply(
                        pending_op_result.as_ref(),
                        NetMessage::V1(NetMessageV1::Subscribe((*op).clone())),
                        "subscribe",
                    )
                {
                    return Ok(None);
                }
                let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;
                // If the subscribe completed locally, synthesize a Response so
                // the waiting task observes the result even though no network
                // reply will arrive.
                if let Some(ref callback) = pending_op_result {
                    if is_operation_completed(&op_result) {
                        let instance_id = match op {
                            subscribe::SubscribeMsg::Request { instance_id, .. }
                            | subscribe::SubscribeMsg::Response { instance_id, .. }
                            | subscribe::SubscribeMsg::Unsubscribe { instance_id, .. }
                            | subscribe::SubscribeMsg::ForwardingAck { instance_id, .. } => {
                                *instance_id
                            }
                        };
                        let result = match &op_result {
                            Ok(Some(OpEnum::Subscribe(sub_op))) => match sub_op.completed_key() {
                                Some(key) => subscribe::SubscribeMsgResult::Subscribed { key },
                                None => subscribe::SubscribeMsgResult::NotFound,
                            },
                            _ => subscribe::SubscribeMsgResult::NotFound,
                        };
                        let reply = NetMessage::from(subscribe::SubscribeMsg::Response {
                            id: *op.id(),
                            instance_id,
                            result,
                        });
                        if let Err(err) = callback.try_send(reply) {
                            tracing::debug!(
                                %err,
                                "subscribe local-completion: callback send failed \
                                (task may have timed out)"
                            );
                        }
                    }
                }
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }
                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::NeighborHosting { ref message } => {
                // Requires a source address to attribute the message to a peer.
                let Some(source) = source_addr else {
                    tracing::warn!(
                        "Received NeighborHosting message without source address (pure network)"
                    );
                    return Ok(None);
                };
                tracing::debug!(
                    from = %source,
                    "Processing NeighborHosting message (pure network)"
                );
                let source_pub_key = op_manager
                    .ring
                    .connection_manager
                    .get_peer_by_addr(source)
                    .map(|pkl| pkl.pub_key().clone());
                let Some(source_pub_key) = source_pub_key else {
                    tracing::debug!(
                        %source,
                        "NeighborHosting: could not resolve source addr to pub_key, skipping"
                    );
                    return Ok(None);
                };
                let result = op_manager
                    .neighbor_hosting
                    .handle_message(&source_pub_key, message.clone());
                // Answer the peer directly when the handler produced a reply.
                if let Some(response) = result.response {
                    let response_msg =
                        NetMessage::V1(NetMessageV1::NeighborHosting { message: response });
                    if let Err(err) = conn_manager.send(source, response_msg).await {
                        tracing::error!(%err, %source, "Failed to send NeighborHosting response");
                    }
                }
                // Push state for contracts both sides care about, but only for
                // contracts we actively track (updates or downstream subs).
                for instance_id in result.overlapping_contracts {
                    if let Some((key, state)) =
                        get_contract_state_by_id(&op_manager, &instance_id).await
                    {
                        if !op_manager.ring.is_receiving_updates(&key)
                            && !op_manager.ring.has_downstream_subscribers(&key)
                        {
                            continue;
                        }
                        tracing::debug!(
                            contract = %key,
                            peer = %source_pub_key,
                            "Proximity cache overlap — syncing state to neighbor"
                        );
                        if let Err(e) = op_manager
                            .notify_node_event(NodeEvent::SyncStateToPeer {
                                key,
                                new_state: state,
                                target: source,
                            })
                            .await
                        {
                            tracing::warn!(
                                contract = %instance_id,
                                error = %e,
                                "Failed to emit SyncStateToPeer for proximity sync"
                            );
                        }
                    }
                }
                return Ok(None);
            }
            NetMessageV1::InterestSync { ref message } => {
                let Some(source) = source_addr else {
                    tracing::warn!("Received InterestSync message without source address");
                    return Ok(None);
                };
                tracing::debug!(
                    from = %source,
                    "Processing InterestSync message"
                );
                // Delegate; a returned message is the protocol reply to send back.
                if let Some(response) =
                    handle_interest_sync_message(&op_manager, source, message.clone()).await
                {
                    let response_msg =
                        NetMessage::V1(NetMessageV1::InterestSync { message: response });
                    if let Err(err) = conn_manager.send(source, response_msg).await {
                        tracing::error!(%err, %source, "Failed to send InterestSync response");
                    }
                }
                return Ok(None);
            }
            NetMessageV1::ReadyState { ready } => {
                let Some(source) = source_addr else {
                    tracing::warn!("Received ReadyState message without source address");
                    return Ok(None);
                };
                // Flip the peer's readiness flag in the connection manager.
                if ready {
                    op_manager.ring.connection_manager.mark_peer_ready(source);
                } else {
                    op_manager
                        .ring
                        .connection_manager
                        .mark_peer_not_ready(source);
                }
                tracing::debug!(
                    from = %source,
                    ready,
                    "Processed ReadyState from peer"
                );
                return Ok(None);
            }
            NetMessageV1::Aborted(tx) => {
                tracing::debug!(
                    %tx,
                    tx_type = ?tx.transaction_type(),
                    "Received Aborted message, delegating to handle_aborted_op"
                );
                // `StatePushed` is an expected control-flow signal, not an error.
                if let Err(err) = handle_aborted_op(tx, &op_manager, &[]).await {
                    if !matches!(err, OpError::StatePushed) {
                        tracing::error!(
                            %tx,
                            error = %err,
                            "Error handling aborted operation"
                        );
                    }
                }
                return Ok(None);
            }
        }
    }
    // All retries exhausted while the operation stayed busy: drop the message.
    tracing::warn!(
        tx = %msg.id(),
        tx_type = ?msg.id().transaction_type(),
        "Dropping message after {MAX_RETRIES} retry attempts (operation busy)"
    );
    Ok(None)
}
/// Log the outcome of a pure-network operation and normalize expected
/// "non-results" — `StatePushed` (state was handed off) and `OpNotPresent`
/// (op already gone, e.g. timed out) — into `Ok(None)` so callers do not
/// treat them as failures. All other outcomes pass through unchanged.
async fn handle_pure_network_result(
    tx: Option<Transaction>,
    op_result: Result<Option<crate::operations::OpEnum>, OpError>,
    _op_manager: &Arc<OpManager>,
    _event_listener: &mut dyn NetEventRegister,
) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError> {
    tracing::debug!("Pure network result handling for transaction: {:?}", tx);
    match &op_result {
        Ok(Some(_op_res)) => {
            tracing::debug!(
                "Network operation completed successfully for transaction: {:?}",
                tx
            );
            if let Some(tx_id) = tx {
                tracing::debug!("Network operation completed for transaction: {}", tx_id);
            }
        }
        Ok(None) => {
            tracing::debug!("Network operation returned no result");
        }
        Err(OpError::StatePushed) => {
            // Expected control-flow signal: the op state moved elsewhere.
            return Ok(None);
        }
        Err(OpError::OpNotPresent(tx_id)) => {
            tracing::debug!(
                tx = %tx_id,
                "Network response arrived for non-existent operation (likely timed out or already completed)"
            );
            return Ok(None);
        }
        Err(e) => {
            tracing::error!("Network operation failed: {}", e);
            if let Some(tx_id) = tx {
                tracing::debug!(
                    "Network operation failed for transaction: {} with error: {}",
                    tx_id,
                    e
                );
            }
        }
    }
    op_result
}
async fn handle_interest_sync_message(
op_manager: &Arc<OpManager>,
source: std::net::SocketAddr,
message: crate::message::InterestMessage,
) -> Option<crate::message::InterestMessage> {
use crate::message::{InterestMessage, NodeEvent, SummaryEntry};
use crate::ring::interest::contract_hash;
match message {
InterestMessage::Interests { hashes } => {
tracing::debug!(
from = %source,
hash_count = hashes.len(),
"Received Interests message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
let incoming_hashes: std::collections::HashSet<u32> =
hashes.iter().copied().collect();
let current_contracts = op_manager.interest_manager.get_contracts_for_peer(pk);
let mut removed = 0usize;
for contract in ¤t_contracts {
let h = contract_hash(contract);
if !incoming_hashes.contains(&h) {
op_manager
.interest_manager
.remove_peer_interest(contract, pk);
removed += 1;
}
}
if removed > 0 {
tracing::debug!(
from = %source,
removed,
"Full-replace: removed stale interest entries"
);
}
}
let matching = op_manager.interest_manager.get_matching_contracts(&hashes);
let mut entries = Vec::with_capacity(matching.len());
for contract in matching {
let hash = contract_hash(&contract);
let summary = get_contract_summary(op_manager, &contract).await;
entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
if let Some(ref pk) = peer_key {
if op_manager
.interest_manager
.get_peer_interest(&contract, pk)
.is_some()
{
op_manager
.interest_manager
.refresh_peer_interest(&contract, pk);
} else {
op_manager.interest_manager.register_peer_interest(
&contract,
pk.clone(),
None, false,
);
}
}
}
if entries.is_empty() {
None
} else {
Some(InterestMessage::Summaries { entries })
}
}
InterestMessage::Summaries { entries } => {
tracing::debug!(
from = %source,
entry_count = entries.len(),
"Received Summaries message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
let mut stale_contracts = Vec::new();
let emit_confirmed = crate::config::SimulationIdleTimeout::is_enabled();
let mut confirmed_states: Vec<(freenet_stdlib::prelude::ContractKey, String)> =
Vec::new();
if let Some(pk) = peer_key {
for entry in entries {
for contract in op_manager.interest_manager.lookup_by_hash(entry.hash) {
if !op_manager.interest_manager.has_local_interest(&contract) {
continue;
}
let their_summary = entry.to_summary();
let our_summary = get_contract_summary(op_manager, &contract).await;
if emit_confirmed {
if let Some(ref summary) = our_summary {
confirmed_states.push((contract, hex::encode(summary.as_ref())));
}
}
let is_stale = our_summary
.as_ref()
.zip(their_summary.as_ref())
.is_some_and(|(ours, theirs)| ours.as_ref() != theirs.as_ref());
op_manager.interest_manager.update_peer_summary(
&contract,
&pk,
their_summary,
);
if is_stale && !stale_contracts.contains(&contract) {
stale_contracts.push(contract);
}
}
}
}
for contract in stale_contracts {
let Some(state) = get_contract_state(op_manager, &contract).await else {
tracing::trace!(
contract = %contract,
"Skipping stale-peer sync — no local state available"
);
continue;
};
tracing::info!(
contract = %contract,
stale_peer = %source,
"Summary mismatch in interest sync — syncing state to stale peer"
);
if let Err(e) = op_manager
.notify_node_event(NodeEvent::SyncStateToPeer {
key: contract,
new_state: state,
target: source,
})
.await
{
tracing::warn!(
contract = %contract,
error = %e,
"Failed to emit SyncStateToPeer for stale peer correction"
);
}
}
for (key, state_hash) in confirmed_states {
if let Some(event) =
crate::tracing::NetEventLog::state_confirmed(&op_manager.ring, key, state_hash)
{
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
}
None
}
InterestMessage::ChangeInterests { added, removed } => {
tracing::debug!(
from = %source,
added_count = added.len(),
removed_count = removed.len(),
"Received ChangeInterests message"
);
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
for hash in removed {
for contract in op_manager.interest_manager.lookup_by_hash(hash) {
op_manager
.interest_manager
.remove_peer_interest(&contract, pk);
}
}
}
let mut entries = Vec::new();
if let Some(ref pk) = peer_key {
for hash in added {
for contract in op_manager.interest_manager.lookup_by_hash(hash) {
if !op_manager.interest_manager.has_local_interest(&contract) {
continue;
}
op_manager.interest_manager.register_peer_interest(
&contract,
pk.clone(),
None,
false,
);
let summary = get_contract_summary(op_manager, &contract).await;
entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
}
}
}
if entries.is_empty() {
None
} else {
Some(InterestMessage::Summaries { entries })
}
}
InterestMessage::ResyncRequest { key } => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_request_received",
"Received ResyncRequest - peer needs full state"
);
op_manager.interest_manager.record_resync_request_received();
crate::config::GlobalTestMetrics::record_resync_request();
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(ref pk) = peer_key {
op_manager
.interest_manager
.update_peer_summary(&key, pk, None);
}
let from_peer = op_manager.ring.connection_manager.get_peer_by_addr(source);
if let Some(ref from_pkl) = from_peer {
if let Some(event) = crate::tracing::NetEventLog::resync_request_received(
&op_manager.ring,
key,
from_pkl.clone(),
) {
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
} else {
tracing::debug!(
contract = %key,
source = %source,
"ResyncRequest telemetry skipped: peer lookup failed"
);
}
let state = get_contract_state(op_manager, &key).await;
let Some(state) = state else {
tracing::warn!(
contract = %key,
"ResyncRequest for contract we don't have state for"
);
return None;
};
let summary = get_contract_summary(op_manager, &key).await;
let Some(summary) = summary else {
tracing::warn!(
contract = %key,
"ResyncRequest for contract we can't compute summary for"
);
return None;
};
tracing::info!(
to = %source,
contract = %key,
state_size = state.as_ref().len(),
summary_size = summary.as_ref().len(),
event = "resync_response_sent",
"Sending ResyncResponse with full state"
);
if let Some(ref to_pkl) = from_peer {
if let Some(event) = crate::tracing::NetEventLog::resync_response_sent(
&op_manager.ring,
key,
to_pkl.clone(),
state.as_ref().len(),
) {
op_manager
.ring
.register_events(either::Either::Left(event))
.await;
}
}
Some(InterestMessage::ResyncResponse {
key,
state_bytes: state.as_ref().to_vec(),
summary_bytes: summary.as_ref().to_vec(),
})
}
InterestMessage::ResyncResponse {
key,
state_bytes,
summary_bytes,
} => {
tracing::info!(
from = %source,
contract = %key,
state_size = state_bytes.len(),
event = "resync_response_received",
"Received ResyncResponse with full state"
);
let state = freenet_stdlib::prelude::State::from(state_bytes.clone());
let update_data = freenet_stdlib::prelude::UpdateData::State(state);
use crate::contract::ContractHandlerEvent;
match op_manager
.notify_contract_handler(ContractHandlerEvent::UpdateQuery {
key,
data: update_data,
related_contracts: Default::default(),
})
.await
{
Ok(ContractHandlerEvent::UpdateResponse {
new_value: Ok(_), ..
}) => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_applied",
changed = true,
"ResyncResponse state applied successfully"
);
}
Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
tracing::info!(
from = %source,
contract = %key,
event = "resync_applied",
changed = false,
"ResyncResponse state unchanged (already had this state)"
);
}
Ok(other) => {
tracing::warn!(
from = %source,
contract = %key,
event = "resync_failed",
response = ?other,
"Unexpected response to resync update"
);
}
Err(e) => {
tracing::error!(
from = %source,
contract = %key,
event = "resync_failed",
error = %e,
"Failed to apply resync state"
);
}
}
let peer_key = get_peer_key_from_addr(op_manager, source);
if let Some(pk) = peer_key {
let summary = freenet_stdlib::prelude::StateSummary::from(summary_bytes);
op_manager
.interest_manager
.update_peer_summary(&key, &pk, Some(summary));
}
None
}
}
}
/// Fetch the locally stored state for `key`, if any.
///
/// Thin convenience wrapper over [`get_contract_state_by_id`] that discards
/// the resolved `ContractKey` and keeps only the state payload.
async fn get_contract_state(
    op_manager: &Arc<OpManager>,
    key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::WrappedState> {
    match get_contract_state_by_id(op_manager, key.id()).await {
        Some((_resolved_key, state)) => Some(state),
        None => None,
    }
}
/// Look up the locally stored state for a contract by its instance id.
///
/// Issues a `GetQuery` (without contract code) to the contract handler and,
/// on success, returns both the full `ContractKey` the store resolved and the
/// stored state. Returns `None` when the contract is unknown locally, has no
/// state, or the handler reports an error (logged at `warn`) or an
/// unexpected response shape.
async fn get_contract_state_by_id(
    op_manager: &Arc<OpManager>,
    instance_id: &freenet_stdlib::prelude::ContractInstanceId,
) -> Option<(
    freenet_stdlib::prelude::ContractKey,
    freenet_stdlib::prelude::WrappedState,
)> {
    use crate::contract::ContractHandlerEvent;
    match op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *instance_id,
            // Only the state is needed here; skip shipping the contract code.
            return_contract_code: false,
        })
        .await
    {
        Ok(ContractHandlerEvent::GetResponse {
            key: Some(key),
            response: Ok(store_response),
        }) => store_response.state.map(|state| (key, state)),
        Ok(ContractHandlerEvent::GetResponse {
            response: Err(e), ..
        }) => {
            tracing::warn!(
                contract = %instance_id,
                error = %e,
                "Failed to get contract state by instance id"
            );
            None
        }
        // Any other handler response (including a response with no key)
        // means no usable state.
        _ => None,
    }
}
/// Ask the contract handler for the current state summary of `key`.
///
/// Returns `None` when the summary cannot be produced — either the handler
/// reported an error (logged at `warn`) or replied with an unexpected event.
async fn get_contract_summary(
    op_manager: &Arc<OpManager>,
    key: &freenet_stdlib::prelude::ContractKey,
) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
    use crate::contract::ContractHandlerEvent;
    let response = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetSummaryQuery { key: *key })
        .await;
    match response {
        Ok(ContractHandlerEvent::GetSummaryResponse { summary: Ok(summary), .. }) => Some(summary),
        Ok(ContractHandlerEvent::GetSummaryResponse { summary: Err(e), .. }) => {
            tracing::warn!(
                contract = %key,
                error = %e,
                "Failed to get contract summary"
            );
            None
        }
        _ => None,
    }
}
/// Resolve a socket address to the interest-tracking key of the connected
/// peer at that address, if one is currently known to the connection manager.
fn get_peer_key_from_addr(
    op_manager: &Arc<OpManager>,
    addr: std::net::SocketAddr,
) -> Option<crate::ring::interest::PeerKey> {
    let peer = op_manager.ring.connection_manager.get_peer_by_addr(addr)?;
    Some(crate::ring::interest::PeerKey::from(peer.pub_key.clone()))
}
/// Subscribe to updates for `instance_id` using a freshly generated
/// transaction id.
///
/// Convenience wrapper over [`subscribe_with_id`] with `transaction_id = None`.
/// Returns the transaction driving the subscription so callers can correlate
/// its eventual result.
#[allow(dead_code)]
pub async fn subscribe(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_id: Option<ClientId>,
) -> Result<Transaction, OpError> {
    subscribe_with_id(op_manager, instance_id, client_id, None).await
}
/// Start a client-initiated subscription for `instance_id`.
///
/// When `transaction_id` is provided it is reused (so retries keep the same
/// id); otherwise a fresh subscribe transaction is minted. If a `client_id`
/// is given, the client is first registered to be notified of the
/// subscription result; a registration failure is logged but does not abort
/// the subscription. Returns the transaction via `start_client_subscribe`.
pub async fn subscribe_with_id(
    op_manager: Arc<OpManager>,
    instance_id: ContractInstanceId,
    client_id: Option<ClientId>,
    transaction_id: Option<Transaction>,
) -> Result<Transaction, OpError> {
    // Reuse the caller-supplied transaction when present; mint one otherwise.
    let client_tx = transaction_id.unwrap_or_else(Transaction::new::<subscribe::SubscribeMsg>);
    if let Some(client_id) = client_id {
        use crate::client_events::RequestId;
        let request_id = RequestId::new();
        let registration = op_manager
            .ch_outbound
            .waiting_for_subscription_result(client_tx, instance_id, client_id, request_id)
            .await;
        if let Err(e) = registration {
            tracing::warn!(tx = %client_tx, error = %e, "failed to register subscription result waiter");
        }
    }
    subscribe::start_client_subscribe(op_manager, instance_id, client_tx).await
}
/// React to a transaction that was aborted (e.g. timed out) by cleaning up
/// and, where appropriate, retrying.
///
/// Connect transactions get special treatment: a failed gateway join is
/// retried after a backoff (with jitter-capped wait when we already have
/// connections), and a gateway that lost its last connection retries with a
/// random other gateway. Contract operations (get/put/subscribe/update)
/// delegate to their own `handle_abort`; operations popped under a different
/// variant than expected are pushed back untouched.
///
/// Errors from retry attempts and abort handlers propagate to the caller,
/// except `OpError::StatePushed`, which is treated as benign.
async fn handle_aborted_op(
    tx: Transaction,
    op_manager: &OpManager,
    gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
    use crate::util::IterExt;
    match tx.transaction_type() {
        TransactionType::Connect => {
            match op_manager.pop(&tx) {
                // Below the minimum connection count: aggressively retry the
                // same gateway (after honoring its backoff window).
                Ok(Some(OpEnum::Connect(op)))
                    if op_manager.ring.open_connections()
                        < op_manager.ring.connection_manager.min_connections =>
                {
                    let gateway = op.gateway().cloned();
                    if let Some(gateway) = gateway {
                        if let Some(peer_addr) = gateway.peer_addr.as_known() {
                            // Drop the half-open connection attempt before retrying.
                            op_manager
                                .ring
                                .connection_manager
                                .prune_in_transit_connection(*peer_addr);
                            // Record the failure and compute how long to wait;
                            // scoped block so the lock is released before sleeping.
                            let backoff_duration = {
                                let mut backoff = op_manager.gateway_backoff.lock();
                                backoff.record_failure(*peer_addr);
                                backoff.remaining_backoff(*peer_addr)
                            };
                            if let Some(duration) = backoff_duration {
                                let open_conns = op_manager.ring.open_connections();
                                // With at least one live connection we are not
                                // isolated, so cap the wait (80% of the poll cap
                                // plus jitter) instead of honoring the full backoff.
                                let effective = if open_conns > 0 {
                                    let jitter_ms = crate::config::GlobalRng::random_range(
                                        0u64..(connect::GATEWAY_BACKOFF_POLL_CAP.as_millis() / 5)
                                            as u64,
                                    );
                                    let cap = connect::GATEWAY_BACKOFF_POLL_CAP.mul_f64(0.8)
                                        + Duration::from_millis(jitter_ms);
                                    duration.min(cap)
                                } else {
                                    duration
                                };
                                tracing::info!(
                                    gateway = %gateway,
                                    backoff_secs = duration.as_secs(),
                                    effective_wait_secs = effective.as_secs(),
                                    open_connections = open_conns,
                                    "Gateway connection failed, waiting before retry"
                                );
                                // Sleep out the backoff, but wake immediately if
                                // the backoff is cleared externally.
                                tokio::select! {
                                    _ = tokio::time::sleep(effective) => {},
                                    _ = op_manager.gateway_backoff_cleared.notified() => {
                                        tracing::info!(
                                            gateway = %gateway,
                                            "Gateway backoff cleared externally, retrying immediately"
                                        );
                                    },
                                }
                            }
                        }
                        tracing::debug!("Retrying connection to gateway {}", gateway);
                        connect::join_ring_request(&gateway, op_manager).await?;
                    }
                }
                // At or above the minimum: just clean up, and only re-join if a
                // gateway node finds itself fully disconnected.
                Ok(Some(OpEnum::Connect(op))) => {
                    if let Some(peer_addr) = op.get_next_hop_addr() {
                        op_manager
                            .ring
                            .connection_manager
                            .prune_in_transit_connection(peer_addr);
                    }
                    if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
                        tracing::warn!("Retrying joining the ring with an other gateway");
                        if let Some(gateway) = gateways.iter().shuffle().next() {
                            connect::join_ring_request(gateway, op_manager).await?
                        }
                    }
                }
                // Popped something that isn't a connect op: put it back untouched.
                Ok(Some(other)) => {
                    op_manager.push(tx, other).await?;
                }
                _ => {}
            }
        }
        // The four contract-op arms are intentionally parallel: pop the op,
        // run its abort handler (treating StatePushed as benign), and push
        // back any op of an unexpected variant.
        TransactionType::Get => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Get(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Subscribe => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Subscribe(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Put => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Put(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Update => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Update(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
    }
    Ok(())
}
pub type PeerId = crate::ring::KnownPeerKeyLocation;
/// Run a standalone (non-networked) node, serving the client API locally.
///
/// Binds the websocket API at `socket` — which must be a loopback or private
/// address — and then loops forever, multiplexing requests from the websocket
/// proxy and the HTTP gateway into the local [`Executor`]. Each response (or
/// error) is routed back on whichever channel the request arrived on.
///
/// Only returns on a bind/validation error or a channel failure; the serve
/// loop itself has no normal exit.
pub async fn run_local_node(
    mut executor: Executor,
    socket: WebsocketApiConfig,
) -> anyhow::Result<()> {
    // Refuse to expose the local API on a public address.
    if !crate::server::is_private_ip(&socket.address) {
        anyhow::bail!(
            "invalid ip: {}, only loopback and private network addresses are allowed",
            socket.address
        )
    }
    let (mut gw, mut ws_proxy) = crate::server::serve_client_api_in(socket).await?;
    // Tracks which channel the current request came from so the reply can be
    // sent back on the same one.
    enum Receiver {
        Ws,
        Gw,
    }
    let mut receiver;
    loop {
        let req = crate::deterministic_select! {
            req = ws_proxy.recv() => {
                receiver = Receiver::Ws;
                req?
            },
            req = gw.recv() => {
                receiver = Receiver::Gw;
                req?
            },
        };
        let OpenRequest {
            client_id: id,
            request,
            notification_channel,
            token,
            origin_contract,
            ..
        } = req;
        tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
        let res = match *request {
            ClientRequest::ContractOp(op) => {
                executor
                    .contract_requests(op, id, notification_channel)
                    .await
            }
            ClientRequest::DelegateOp(op) => {
                // Human-readable variant name, purely for the debug log below.
                let op_name = match op {
                    DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
                    DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
                    DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
                    _ => "Unknown",
                };
                tracing::debug!(
                    op_name = ?op_name,
                    ?origin_contract,
                    "Handling ClientRequest::DelegateOp"
                );
                executor.delegate_request(op, origin_contract.as_ref(), None)
            }
            ClientRequest::Disconnect { cause } => {
                if let Some(cause) = cause {
                    tracing::info!("disconnecting cause: {cause}");
                }
                // Nothing to reply with; go back to waiting for requests.
                continue;
            }
            // The named variants are listed for documentation only — the
            // trailing `_` makes this the catch-all "unsupported" arm.
            ClientRequest::Authenticate { .. }
            | ClientRequest::NodeQueries(_)
            | ClientRequest::Close
            | _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
        };
        // Route the result back on the channel the request came in on.
        match res {
            Ok(res) => {
                match receiver {
                    Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
                    Receiver::Gw => gw.send(id, Ok(res)).await?,
                };
            }
            // Client-caused failures are reported as request errors…
            Err(err) if err.is_request() => {
                let err = ErrorKind::RequestError(err.unwrap_request());
                match receiver {
                    Receiver::Ws => {
                        ws_proxy.send(id, Err(err.into())).await?;
                    }
                    Receiver::Gw => {
                        gw.send(id, Err(err.into())).await?;
                    }
                };
            }
            // …everything else is logged and surfaced as an unhandled error.
            Err(err) => {
                tracing::error!("{err}");
                let err = Err(ErrorKind::Unhandled {
                    cause: format!("{err}").into(),
                }
                .into());
                match receiver {
                    Receiver::Ws => {
                        ws_proxy.send(id, err).await?;
                    }
                    Receiver::Gw => {
                        gw.send(id, err).await?;
                    }
                };
            }
        }
    }
}
/// Run a network-attached node to completion.
///
/// If no explicit ring location was configured, a gateway derives an initial
/// location from its own socket address; a regular peer starts without one.
/// The node's run result is logged (role-appropriately on success) and
/// propagated to the caller.
pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
    tracing::info!("Starting node");
    let is_gateway = node.inner.is_gateway;
    // Prefer the configured location; otherwise gateways fall back to an
    // address-derived one, and regular peers get none.
    let location = node.inner.location.or_else(|| {
        if !is_gateway {
            return None;
        }
        node.inner
            .peer_id
            .as_ref()
            .map(|id| Location::from_address(&id.socket_addr()))
    });
    if let Some(location) = location {
        tracing::info!("Setting initial location: {location}");
        node.update_location(location);
    }
    let outcome = node.run().await;
    match outcome {
        Ok(_) => {
            if is_gateway {
                tracing::info!("Gateway finished");
            } else {
                tracing::info!("Node finished");
            }
            Ok(())
        }
        Err(e) => {
            tracing::error!("{e}");
            Err(e)
        }
    }
}
/// Query whether an operation's state machine has reached a terminal state.
pub trait IsOperationCompleted {
    /// Returns `true` once the operation has finished (successfully or not).
    fn is_completed(&self) -> bool;
}
impl IsOperationCompleted for OpEnum {
    /// Delegates to the concrete operation's own completion check;
    /// exhaustive so a new `OpEnum` variant forces an update here.
    fn is_completed(&self) -> bool {
        match self {
            OpEnum::Connect(op) => op.is_completed(),
            OpEnum::Put(op) => op.is_completed(),
            OpEnum::Get(op) => op.is_completed(),
            OpEnum::Subscribe(op) => op.is_completed(),
            OpEnum::Update(op) => op.is_completed(),
        }
    }
}
fn classify_op_outcome(
tx_type: TransactionType,
outcome: OpOutcome<'_>,
) -> (Option<network_status::OpType>, bool) {
use network_status::OpType;
match (tx_type, outcome) {
(
TransactionType::Get,
OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
) => (Some(OpType::Get), true),
(TransactionType::Get, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Get), false),
(
TransactionType::Put,
OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
) => (Some(OpType::Put), true),
(TransactionType::Put, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Put), false),
(
TransactionType::Update,
OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
) => (Some(OpType::Update), true),
(TransactionType::Update, OpOutcome::ContractOpFailure { .. }) => {
(Some(OpType::Update), false)
}
(
TransactionType::Subscribe,
OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
) => (Some(OpType::Subscribe), true),
(TransactionType::Subscribe, OpOutcome::ContractOpFailure { .. }) => {
(Some(OpType::Subscribe), false)
}
(TransactionType::Get, OpOutcome::Irrelevant) => (Some(OpType::Get), true),
(TransactionType::Put, OpOutcome::Irrelevant) => (Some(OpType::Put), true),
(TransactionType::Update, OpOutcome::Irrelevant) => (Some(OpType::Update), true),
(TransactionType::Subscribe, OpOutcome::Irrelevant) => (Some(OpType::Subscribe), true),
(TransactionType::Get, OpOutcome::Incomplete) => (Some(OpType::Get), false),
(TransactionType::Put, OpOutcome::Incomplete) => (Some(OpType::Put), false),
(TransactionType::Update, OpOutcome::Incomplete) => (Some(OpType::Update), false),
(TransactionType::Subscribe, OpOutcome::Incomplete) => (Some(OpType::Subscribe), false),
_ => (None, false),
}
}
/// Convenience predicate over an op-state result: `true` only when the result
/// actually carries an operation and that operation has completed.
pub fn is_operation_completed(op_result: &Result<Option<OpEnum>, OpError>) -> bool {
    matches!(op_result, Ok(Some(op)) if op.is_completed())
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
use super::*;
use crate::operations::OpError;
use rstest::rstest;
// --- NodeConfig::parse_socket_addr resolution tests ---

// "localhost" must resolve to a loopback address (v4 or v6, OS-dependent)
// with a non-privileged port assigned.
#[tokio::test]
async fn test_hostname_resolution_localhost() {
    let addr = Address::Hostname("localhost".to_string());
    let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
    assert!(
        socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
            || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
    );
    assert!(socket_addr.port() > 1024);
}
// An explicit `host:port` must keep the given port.
// NOTE(review): resolves a public hostname — presumably requires working DNS
// in the test environment; confirm this is acceptable for CI.
#[tokio::test]
async fn test_hostname_resolution_with_port() {
    let addr = Address::Hostname("google.com:8080".to_string());
    let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
    assert_eq!(socket_addr.port(), 8080);
}
// A trailing-dot FQDN ("localhost.") may or may not resolve depending on the
// platform resolver; only the success path is asserted.
#[tokio::test]
async fn test_hostname_resolution_with_trailing_dot() {
    let addr = Address::Hostname("localhost.".to_string());
    let result = NodeConfig::parse_socket_addr(&addr).await;
    if let Ok(socket_addr) = result {
        assert!(
            socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
                || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
        );
    }
}
// A literal socket address must pass through unchanged (no resolution).
#[tokio::test]
async fn test_hostname_resolution_direct_socket_addr() {
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let addr = Address::HostAddress(socket);
    let resolved = NodeConfig::parse_socket_addr(&addr).await.unwrap();
    assert_eq!(resolved, socket);
}
// A non-numeric port must be rejected, not silently defaulted.
#[tokio::test]
async fn test_hostname_resolution_invalid_port() {
    let addr = Address::Hostname("localhost:not_a_port".to_string());
    let result = NodeConfig::parse_socket_addr(&addr).await;
    assert!(result.is_err());
}
// --- PeerId equality semantics ---
// NOTE(review): this ignored rstest's case names look inverted relative to
// their bodies — `same_addr_different_keys(8080, 8080, true)` builds peers
// with DIFFERENT keys yet expects equality, which contradicts
// `test_peer_id_equality_different_key_same_addr` below. Confirm intended
// semantics before un-ignoring.
#[ignore]
#[rstest]
#[case::same_addr_different_keys(8080, 8080, true)]
#[case::different_addr_same_key(8080, 8081, false)]
fn test_peer_id_equality(#[case] port1: u16, #[case] port2: u16, #[case] expected_equal: bool) {
    let keypair1 = TransportKeypair::new();
    let keypair2 = TransportKeypair::new();
    let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
    let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);
    let peer1 = PeerId::new(keypair1.public().clone(), addr1);
    let peer2 = PeerId::new(keypair2.public().clone(), addr2);
    assert_eq!(peer1 == peer2, expected_equal);
}
// Same key + same address → equal.
#[test]
fn test_peer_id_equality_same_key_same_addr() {
    let keypair = TransportKeypair::new();
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let peer1 = PeerId::new(keypair.public().clone(), addr);
    let peer2 = PeerId::new(keypair.public().clone(), addr);
    assert_eq!(peer1, peer2);
}
// Different keys at the same address → NOT equal (identity is key-sensitive).
#[test]
fn test_peer_id_equality_different_key_same_addr() {
    let keypair1 = TransportKeypair::new();
    let keypair2 = TransportKeypair::new();
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let peer1 = PeerId::new(keypair1.public().clone(), addr);
    let peer2 = PeerId::new(keypair2.public().clone(), addr);
    assert_ne!(peer1, peer2);
}
// Same key at different addresses → NOT equal (identity is address-sensitive).
#[test]
fn test_peer_id_equality_different_addr() {
    let keypair = TransportKeypair::new();
    let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081);
    let peer1 = PeerId::new(keypair.public().clone(), addr1);
    let peer2 = PeerId::new(keypair.public().clone(), addr2);
    assert_ne!(peer1, peer2);
}
// With equal keys, PeerId ordering follows the socket address (port) order.
#[rstest]
#[case::lower_port_first(8080, 8081)]
#[case::high_port_diff(1024, 65535)]
fn test_peer_id_ordering(#[case] lower_port: u16, #[case] higher_port: u16) {
    let keypair = TransportKeypair::new();
    let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), lower_port);
    let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), higher_port);
    let peer1 = PeerId::new(keypair.public().clone(), addr1);
    let peer2 = PeerId::new(keypair.public().clone(), addr2);
    assert!(peer1 < peer2);
    assert!(peer2 > peer1);
}
// Hash/Eq contract: equal PeerIds must produce equal hashes.
#[test]
fn test_peer_id_hash_consistency() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let keypair = TransportKeypair::new();
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let peer1 = PeerId::new(keypair.public().clone(), addr);
    let peer2 = PeerId::new(keypair.public().clone(), addr);
    let mut hasher1 = DefaultHasher::new();
    let mut hasher2 = DefaultHasher::new();
    peer1.hash(&mut hasher1);
    peer2.hash(&mut hasher2);
    assert_eq!(hasher1.finish(), hasher2.finish());
}
// Two random PeerIds should get distinct socket addresses.
#[test]
fn test_peer_id_random_produces_unique() {
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    assert_ne!(peer1.socket_addr(), peer2.socket_addr());
}
// Round-trip through bincode preserves the socket address.
#[test]
fn test_peer_id_serialization() {
    let peer = PeerId::random();
    let bytes = peer.to_bytes();
    assert!(!bytes.is_empty());
    let deserialized: PeerId = bincode::deserialize(&bytes).unwrap();
    assert_eq!(peer.socket_addr(), deserialized.socket_addr());
}
// Display and Debug are expected to render identically and non-empty.
#[test]
fn test_peer_id_display() {
    let peer = PeerId::random();
    let display = format!("{}", peer);
    let debug = format!("{:?}", peer);
    assert_eq!(display, debug);
    assert!(!display.is_empty());
}
// InitPeerNode::new stores its key/location arguments verbatim.
#[test]
fn test_init_peer_node_construction() {
    let keypair = TransportKeypair::new();
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let peer_key_location = PeerKeyLocation::new(keypair.public().clone(), addr);
    let location = Location::new(0.5);
    let init_peer = InitPeerNode::new(peer_key_location.clone(), location);
    assert_eq!(init_peer.peer_key_location, peer_key_location);
    assert_eq!(init_peer.location, location);
}
// is_operation_completed must be false for Ok(None) and for every error,
// including the transient OpNotAvailable/StatePushed cases.
#[rstest]
#[case::with_none(Ok(None), false)]
#[case::with_running_error(Err(OpError::OpNotAvailable(super::OpNotAvailable::Running)), false)]
#[case::with_state_pushed_error(Err(OpError::StatePushed), false)]
fn test_is_operation_completed(
    #[case] result: Result<Option<OpEnum>, OpError>,
    #[case] expected: bool,
) {
    assert_eq!(is_operation_completed(&result), expected);
}
// Pins the success/failure classification rules of `classify_op_outcome`.
mod classify_op_outcome_tests {
    use super::super::{classify_op_outcome, network_status::OpType};
    use crate::message::TransactionType;
    use crate::operations::OpOutcome;
    // Irrelevant outcomes are accounted as successes (nothing went wrong).
    #[test]
    fn irrelevant_counted_as_success() {
        let (op_type, success) =
            classify_op_outcome(TransactionType::Update, OpOutcome::Irrelevant);
        assert!(matches!(op_type, Some(OpType::Update)));
        assert!(success);
    }
    // Incomplete outcomes are accounted as failures.
    #[test]
    fn incomplete_counted_as_failure() {
        let (op_type, success) =
            classify_op_outcome(TransactionType::Get, OpOutcome::Incomplete);
        assert!(matches!(op_type, Some(OpType::Get)));
        assert!(!success);
    }
    // Connect transactions are never tracked, regardless of outcome.
    #[test]
    fn connect_skipped() {
        let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Irrelevant);
        assert!(op_type.is_none());
        let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Incomplete);
        assert!(op_type.is_none());
    }
    #[test]
    fn subscribe_irrelevant_is_success() {
        let (op_type, success) =
            classify_op_outcome(TransactionType::Subscribe, OpOutcome::Irrelevant);
        assert!(matches!(op_type, Some(OpType::Subscribe)));
        assert!(success);
    }
    #[test]
    fn put_incomplete_is_failure() {
        let (op_type, success) =
            classify_op_outcome(TransactionType::Put, OpOutcome::Incomplete);
        assert!(matches!(op_type, Some(OpType::Put)));
        assert!(!success);
    }
}
mod callback_forward_tests {
use super::super::{
OpError, OpNotAvailable, forward_pending_op_result_if_completed,
try_forward_task_per_tx_reply,
};
use crate::message::{MessageStats, NetMessage, NetMessageV1, Transaction};
use crate::operations::OpEnum;
use crate::operations::connect::{ConnectMsg, ConnectOp, ConnectState};
// Builds a connect op already in its terminal state.
fn completed_connect_op() -> ConnectOp {
    ConnectOp::with_state(ConnectState::Completed)
}
// Minimal NetMessage stand-in carrying a fresh transaction id.
fn dummy_reply() -> NetMessage {
    NetMessage::V1(NetMessageV1::Aborted(Transaction::new::<ConnectMsg>()))
}
// A completed op with a registered sender must have its reply forwarded.
#[tokio::test]
async fn forwards_reply_when_completed_and_sender_present() {
    let op = completed_connect_op();
    let op_result = Ok(Some(OpEnum::Connect(Box::new(op))));
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let reply = dummy_reply();
    let expected_id = *reply.id();
    forward_pending_op_result_if_completed(&op_result, Some(&tx), reply);
    let received = rx.try_recv().expect("helper should forward the reply");
    assert_eq!(*received.id(), expected_id);
}
// No sender registered: the helper must be a no-op (and must not panic).
#[tokio::test]
async fn no_forward_when_sender_absent() {
    let op = completed_connect_op();
    let op_result = Ok(Some(OpEnum::Connect(Box::new(op))));
    forward_pending_op_result_if_completed(&op_result, None, dummy_reply());
}
// Ok(None) and OpNotAvailable errors carry no completed op — nothing forwards.
#[tokio::test]
async fn no_forward_when_op_not_completed() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let ok_none: Result<Option<OpEnum>, OpError> = Ok(None);
    forward_pending_op_result_if_completed(&ok_none, Some(&tx), dummy_reply());
    assert!(rx.try_recv().is_err(), "Ok(None) must not forward");
    let err_running: Result<Option<OpEnum>, OpError> =
        Err(OpError::OpNotAvailable(OpNotAvailable::Running));
    forward_pending_op_result_if_completed(&err_running, Some(&tx), dummy_reply());
    assert!(
        rx.try_recv().is_err(),
        "OpNotAvailable::Running must not forward"
    );
    let err_completed: Result<Option<OpEnum>, OpError> =
        Err(OpError::OpNotAvailable(OpNotAvailable::Completed));
    forward_pending_op_result_if_completed(&err_completed, Some(&tx), dummy_reply());
    assert!(
        rx.try_recv().is_err(),
        "OpNotAvailable::Completed must not forward (no OpEnum payload)"
    );
}
// An op still mid-flight (WaitingForResponses) must not be forwarded.
#[tokio::test]
async fn no_forward_when_op_in_progress() {
    use crate::operations::connect::JoinerState;
    use std::collections::HashSet;
    use tokio::time::Instant;
    let waiting = ConnectState::WaitingForResponses(JoinerState {
        target_connections: 1,
        observed_address: None,
        accepted: HashSet::new(),
        last_progress: Instant::now(),
        started_without_address: true,
    });
    let op = ConnectOp::with_state(waiting);
    assert!(
        !op.is_completed(),
        "precondition: WaitingForResponses must not be completed"
    );
    let op_result = Ok(Some(OpEnum::Connect(Box::new(op))));
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    forward_pending_op_result_if_completed(&op_result, Some(&tx), dummy_reply());
    assert!(
        rx.try_recv().is_err(),
        "in-progress op must not forward to pending_op_result"
    );
}
// A dropped receiver must not block or panic the forwarding helper.
#[tokio::test]
async fn no_hang_when_receiver_dropped() {
    let op = completed_connect_op();
    let op_result = Ok(Some(OpEnum::Connect(Box::new(op))));
    let (tx, rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    drop(rx);
    forward_pending_op_result_if_completed(&op_result, Some(&tx), dummy_reply());
}
// With a callback registered, the bypass must be taken and deliver the reply.
#[tokio::test]
async fn bypass_forwards_when_callback_registered() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let reply = dummy_reply();
    let expected_id = *reply.id();
    let taken = try_forward_task_per_tx_reply(Some(&tx), reply, "subscribe");
    assert!(taken, "callback present → bypass must be taken");
    let received = rx
        .try_recv()
        .expect("helper should forward the reply to the callback");
    assert_eq!(*received.id(), expected_id);
}
// No callback registered → bypass must report "not taken".
#[tokio::test]
async fn bypass_returns_false_when_no_callback() {
    let taken = try_forward_task_per_tx_reply(None, dummy_reply(), "subscribe");
    assert!(!taken, "no callback → bypass must not be taken");
}
// A dropped receiver still counts as "taken": the bypass decision depends on
// callback presence, not on delivery success.
#[tokio::test]
async fn bypass_returns_true_even_when_receiver_dropped() {
    let (tx, rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    drop(rx);
    let taken = try_forward_task_per_tx_reply(Some(&tx), dummy_reply(), "subscribe");
    assert!(
        taken,
        "callback present but receiver dropped → bypass still taken"
    );
}
// Source-scanning regression guard: verifies the SUBSCRIBE match arm still
// invokes the bypass (and gates it on Response-only) by inspecting node.rs.
#[test]
fn bypass_is_wired_into_subscribe_branch_regression_guard() {
    const SOURCE: &str = include_str!("node.rs");
    let subscribe_branch_anchor = "NetMessageV1::Subscribe(ref op) => {";
    let branch_start = SOURCE.find(subscribe_branch_anchor).expect(
        "SUBSCRIBE branch of handle_pure_network_message_v1 not found; \
         the match arm has been renamed or moved — update this regression guard",
    );
    let window_end = SOURCE[branch_start..]
        .find("handle_op_request::<subscribe::SubscribeOp, _>")
        .expect("SUBSCRIBE branch no longer calls handle_op_request — update guard")
        + branch_start;
    let window = &SOURCE[branch_start..window_end];
    assert!(
        window.contains("try_forward_task_per_tx_reply("),
        "SUBSCRIBE branch no longer calls try_forward_task_per_tx_reply \
         before handle_op_request. This is the bypass Phase 2b (#1454) \
         added to prevent task-per-tx callers from hanging on replies \
         that load_or_init would drop as OpNotPresent. Either restore \
         the bypass invocation or update this regression guard if the \
         branch has been legitimately refactored."
    );
    assert!(
        window.contains("matches!(op, subscribe::SubscribeMsg::Response { .. })"),
        "SUBSCRIBE branch bypass is not gated on Response-only. \
         Non-terminal messages (ForwardingAck, Unsubscribe) must NOT \
         be forwarded to the task-per-tx channel — they would fill \
         the capacity-1 reply slot and block the real Response."
    );
}
// A full capacity-1 channel must not block the bypass (fire-and-forget send).
#[tokio::test]
async fn bypass_does_not_block_when_channel_already_full() {
    let (tx, _rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    tx.try_send(dummy_reply())
        .expect("capacity-1 channel should accept first message");
    let taken = try_forward_task_per_tx_reply(Some(&tx), dummy_reply(), "subscribe");
    assert!(
        taken,
        "callback present but channel full → bypass still taken"
    );
}
use crate::operations::VisitedPeers;
use crate::operations::subscribe::{SubscribeMsg, SubscribeMsgResult};
// Mirrors the gating used in the SUBSCRIBE match arm: only Response messages
// may be forwarded to the task-per-tx callback.
fn subscribe_branch_would_forward(
    op: &SubscribeMsg,
    callback: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
) -> bool {
    matches!(op, SubscribeMsg::Response { .. })
        && try_forward_task_per_tx_reply(
            callback,
            NetMessage::V1(NetMessageV1::Subscribe(op.clone())),
            "subscribe",
        )
}
// A terminal Response with a registered callback is forwarded, id intact.
#[tokio::test]
async fn subscribe_response_is_forwarded_to_task() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let sub_tx = Transaction::new::<SubscribeMsg>();
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([1u8; 32]);
    let key = freenet_stdlib::prelude::ContractKey::from_id_and_code(
        instance_id,
        freenet_stdlib::prelude::CodeHash::new([2u8; 32]),
    );
    let op = SubscribeMsg::Response {
        id: sub_tx,
        instance_id,
        result: SubscribeMsgResult::Subscribed { key },
    };
    let taken = subscribe_branch_would_forward(&op, Some(&tx));
    assert!(taken, "Response with callback → must be forwarded");
    let received = rx.try_recv().expect("Response should be in channel");
    assert_eq!(*received.id(), sub_tx);
}
// Non-terminal ForwardingAck must never occupy the capacity-1 reply slot.
#[tokio::test]
async fn forwarding_ack_is_not_forwarded_to_task() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let sub_tx = Transaction::new::<SubscribeMsg>();
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([3u8; 32]);
    let op = SubscribeMsg::ForwardingAck {
        id: sub_tx,
        instance_id,
    };
    let taken = subscribe_branch_would_forward(&op, Some(&tx));
    assert!(
        !taken,
        "ForwardingAck must NOT be forwarded to task channel"
    );
    assert!(
        rx.try_recv().is_err(),
        "channel must remain empty after ForwardingAck"
    );
}
// Unsubscribe is likewise non-terminal and must not be forwarded.
#[tokio::test]
async fn unsubscribe_is_not_forwarded_to_task() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let sub_tx = Transaction::new::<SubscribeMsg>();
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([4u8; 32]);
    let op = SubscribeMsg::Unsubscribe {
        id: sub_tx,
        instance_id,
    };
    let taken = subscribe_branch_would_forward(&op, Some(&tx));
    assert!(!taken, "Unsubscribe must NOT be forwarded to task channel");
    assert!(rx.try_recv().is_err(), "channel must remain empty");
}
// Outbound Requests are not replies and must not be forwarded either.
#[tokio::test]
async fn request_is_not_forwarded_to_task() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let sub_tx = Transaction::new::<SubscribeMsg>();
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([5u8; 32]);
    let op = SubscribeMsg::Request {
        id: sub_tx,
        instance_id,
        htl: 5,
        visited: VisitedPeers::new(&sub_tx),
        is_renewal: false,
    };
    let taken = subscribe_branch_would_forward(&op, Some(&tx));
    assert!(!taken, "Request must NOT be forwarded to task channel");
    assert!(rx.try_recv().is_err(), "channel must remain empty");
}
// Without a callback, even a Response falls through to the legacy path.
#[tokio::test]
async fn response_without_callback_falls_through() {
    let sub_tx = Transaction::new::<SubscribeMsg>();
    let instance_id = freenet_stdlib::prelude::ContractInstanceId::new([6u8; 32]);
    let op = SubscribeMsg::Response {
        id: sub_tx,
        instance_id,
        result: SubscribeMsgResult::NotFound,
    };
    let taken = subscribe_branch_would_forward(&op, None);
    assert!(
        !taken,
        "Response without callback → must fall through to legacy path"
    );
}
/// Source-level regression guard: the PUT arm of
/// `handle_pure_network_message_v1` must keep the Phase 3a (#1454)
/// task-per-tx bypass, gated on both terminal PUT variants.
#[test]
fn bypass_is_wired_into_put_branch_regression_guard() {
    const SOURCE: &str = include_str!("node.rs");
    // Window = everything in the PUT match arm before the legacy
    // handle_op_request call.
    let branch_start = SOURCE.find("NetMessageV1::Put(ref op) => {").expect(
        "PUT branch of handle_pure_network_message_v1 not found; \
        the match arm has been renamed or moved — update this regression guard",
    );
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<put::PutOp, _>")
        .expect("PUT branch no longer calls handle_op_request — update guard");
    let window = &tail[..rel_end];
    // Table of (required needle, failure explanation).
    let checks: &[(&str, &str)] = &[
        (
            "try_forward_task_per_tx_reply(",
            "PUT branch no longer calls try_forward_task_per_tx_reply \
            before handle_op_request. This is the bypass Phase 3a (#1454) \
            added to prevent task-per-tx callers from hanging on replies \
            that load_or_init would drop as OpNotPresent. Either restore \
            the bypass invocation or update this regression guard if the \
            branch has been legitimately refactored.",
        ),
        (
            "put::PutMsg::Response { .. }",
            "PUT branch bypass is not gated on Response. \
            Non-terminal messages must NOT be forwarded to the task-per-tx channel.",
        ),
        (
            "put::PutMsg::ResponseStreaming { .. }",
            "PUT branch bypass is not gated on ResponseStreaming. \
            Both terminal variants must be forwarded.",
        ),
    ];
    for (needle, explanation) in checks {
        assert!(window.contains(needle), "{}", explanation);
    }
}
use crate::operations::put::PutMsg;
use freenet_stdlib::prelude::*;
/// Builds a throwaway `ContractKey` whose instance id and code hash are
/// filled with the given repeated bytes.
fn dummy_put_key(a: u8, b: u8) -> ContractKey {
    let id = ContractInstanceId::new([a; 32]);
    let code = CodeHash::new([b; 32]);
    ContractKey::from_id_and_code(id, code)
}
/// Mirrors the PUT-branch bypass predicate: only the two terminal
/// variants (`Response`, `ResponseStreaming`) are eligible, and
/// forwarding only succeeds when `try_forward_task_per_tx_reply`
/// accepts the reply.
fn put_branch_would_forward(
    op: &PutMsg,
    callback: Option<&tokio::sync::mpsc::Sender<NetMessage>>,
) -> bool {
    // Short-circuit on non-terminal variants before cloning the message.
    if !matches!(
        op,
        PutMsg::Response { .. } | PutMsg::ResponseStreaming { .. }
    ) {
        return false;
    }
    let reply = NetMessage::V1(NetMessageV1::Put(op.clone()));
    try_forward_task_per_tx_reply(callback, reply, "put")
}
/// Terminal `Response` with a callback must land in the task-per-tx
/// channel, carrying the originating transaction id.
#[tokio::test]
async fn put_response_is_forwarded_to_task() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let txn = Transaction::new::<PutMsg>();
    let msg = PutMsg::Response {
        id: txn,
        key: dummy_put_key(10, 11),
    };
    assert!(
        put_branch_would_forward(&msg, Some(&sender)),
        "Response with callback → must be forwarded"
    );
    let forwarded = receiver.try_recv().expect("Response should be in channel");
    assert_eq!(*forwarded.id(), txn);
}
/// `ResponseStreaming` is the second terminal variant and must be
/// forwarded exactly like `Response`.
#[tokio::test]
async fn put_response_streaming_is_forwarded_to_task() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let txn = Transaction::new::<PutMsg>();
    let msg = PutMsg::ResponseStreaming {
        id: txn,
        key: dummy_put_key(12, 13),
        continue_forwarding: false,
    };
    assert!(
        put_branch_would_forward(&msg, Some(&sender)),
        "ResponseStreaming with callback → must be forwarded"
    );
    let forwarded = receiver
        .try_recv()
        .expect("ResponseStreaming should be in channel");
    assert_eq!(*forwarded.id(), txn);
}
/// `ForwardingAck` is non-terminal and must never be diverted to the
/// task-per-tx channel.
#[tokio::test]
async fn put_forwarding_ack_is_not_forwarded_to_task() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel::<NetMessage>(1);
    let txn = Transaction::new::<PutMsg>();
    let msg = PutMsg::ForwardingAck {
        id: txn,
        contract_key: dummy_put_key(14, 15),
    };
    let taken = put_branch_would_forward(&msg, Some(&sender));
    assert!(
        !taken,
        "ForwardingAck must NOT be forwarded to task channel"
    );
    assert!(
        receiver.try_recv().is_err(),
        "channel must remain empty after ForwardingAck"
    );
}
#[tokio::test]
async fn put_request_is_not_forwarded_to_task() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<NetMessage>(1);
let put_tx = Transaction::new::<PutMsg>();
let op = PutMsg::Request {
id: put_tx,
contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(
WrappedContract::new(
std::sync::Arc::new(ContractCode::from(vec![0u8])),
Parameters::from(vec![]),
),
)),
related_contracts: RelatedContracts::default(),
value: WrappedState::new(vec![1u8]),
htl: 5,
skip_list: std::collections::HashSet::new(),
};
let taken = put_branch_would_forward(&op, Some(&tx));
assert!(!taken, "Request must NOT be forwarded to task channel");
assert!(rx.try_recv().is_err(), "channel must remain empty");
}
/// Without a callback even a terminal `Response` must fall through to
/// the legacy processing path.
#[tokio::test]
async fn put_response_without_callback_falls_through() {
    let txn = Transaction::new::<PutMsg>();
    let msg = PutMsg::Response {
        id: txn,
        key: dummy_put_key(16, 17),
    };
    assert!(
        !put_branch_would_forward(&msg, None),
        "Response without callback → must fall through to legacy path"
    );
}
/// Source-level regression guard: the GET arm of
/// `handle_pure_network_message_v1` must keep both the Phase-3b (#1454)
/// task-per-tx bypass and the #3883 phase-5 relay dispatch, with their
/// gating conditions intact.
#[test]
fn bypass_is_wired_into_get_branch_regression_guard() {
    const SOURCE: &str = include_str!("node.rs");
    // Window = everything in the GET match arm before the legacy
    // handle_op_request call.
    let branch_start = SOURCE.find("NetMessageV1::Get(ref op) => {").expect(
        "GET branch of handle_pure_network_message_v1 not found; \
        the match arm has been renamed or moved — update this regression guard",
    );
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<get::GetOp, _>")
        .expect("GET branch no longer calls handle_op_request — update guard");
    let window = &tail[..rel_end];
    // Table of (required needle, failure explanation).
    let checks: &[(&str, &str)] = &[
        (
            "try_forward_task_per_tx_reply(",
            "GET branch no longer calls try_forward_task_per_tx_reply \
            before handle_op_request. Phase-3b (#1454) bypass removed — restore it.",
        ),
        (
            "get::GetMsg::Response { .. }",
            "GET branch bypass is not gated on Response. \
            Non-terminal messages must NOT be forwarded to the task-per-tx channel.",
        ),
        (
            "get::GetMsg::ResponseStreaming { .. }",
            "GET branch bypass is not gated on ResponseStreaming. \
            Both terminal variants must be forwarded.",
        ),
        (
            "start_relay_get(",
            "GET branch no longer calls start_relay_get for relay dispatch. \
            #3883 phase-5 relay dispatch was removed — restore it.",
        ),
        (
            "source_addr",
            "GET branch relay dispatch is not gated on source_addr. \
            Originator loop-back (source_addr.is_none()) must fall through to legacy.",
        ),
        (
            "has_get_op",
            "GET branch relay dispatch is not guarded by has_get_op. \
            GC-spawned retries must fall through to legacy handle_op_request.",
        ),
    ];
    for (needle, explanation) in checks {
        assert!(window.contains(needle), "{}", explanation);
    }
}
/// Ordering guard: within the GET arm, the Phase-3b bypass must be
/// evaluated before the relay dispatch so terminal replies hit the
/// client-driver fast path first.
#[test]
fn get_branch_phase3b_bypass_precedes_relay_dispatch() {
    const SOURCE: &str = include_str!("node.rs");
    let branch_start = SOURCE
        .find("NetMessageV1::Get(ref op) => {")
        .expect("GET branch not found");
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<get::GetOp, _>")
        .expect("GET branch no longer calls handle_op_request");
    let window = &tail[..rel_end];
    // Byte offsets within the window establish source ordering.
    let bypass_pos = window
        .find("try_forward_task_per_tx_reply(")
        .expect("try_forward_task_per_tx_reply not found in GET branch");
    let relay_pos = window
        .find("start_relay_get(")
        .expect("start_relay_get not found in GET branch");
    assert!(
        bypass_pos < relay_pos,
        "Phase-3b bypass (try_forward_task_per_tx_reply) must appear \
        BEFORE relay dispatch (start_relay_get) in the GET branch. \
        Swapping order would break the client-driver terminal-reply fast path."
    );
}
/// Source-level regression guard: the UPDATE arm must keep both #1454
/// phase-5 relay driver entry points.
#[test]
fn update_branch_dispatch_calls_relay_drivers() {
    const SOURCE: &str = include_str!("node.rs");
    let branch_start = SOURCE.find("NetMessageV1::Update(ref op) => {").expect(
        "UPDATE branch of handle_pure_network_message_v1 not found; \
        the match arm has been renamed or moved — update this regression guard",
    );
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<update::UpdateOp, _>")
        .expect("UPDATE branch no longer calls handle_op_request — update guard");
    let window = &tail[..rel_end];
    // Table of (required needle, failure explanation).
    let checks: &[(&str, &str)] = &[
        (
            "start_relay_request_update(",
            "UPDATE branch no longer calls start_relay_request_update for relay \
            dispatch. #1454 phase-5 relay UPDATE dispatch was removed — restore it.",
        ),
        (
            "start_relay_broadcast_to(",
            "UPDATE branch no longer calls start_relay_broadcast_to for relay \
            dispatch. #1454 phase-5 relay UPDATE dispatch was removed — restore it.",
        ),
    ];
    for (needle, explanation) in checks {
        assert!(window.contains(needle), "{}", explanation);
    }
}
/// Gating guard: the UPDATE relay dispatch must stay conditioned on
/// `source_addr` (internal callers fall through) and `has_update_op`
/// (pre-registered ops fall through).
#[test]
fn update_branch_dispatch_gates_on_source_and_existing_op() {
    const SOURCE: &str = include_str!("node.rs");
    let branch_start = SOURCE
        .find("NetMessageV1::Update(ref op) => {")
        .expect("UPDATE branch not found");
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<update::UpdateOp, _>")
        .expect("UPDATE branch no longer calls handle_op_request");
    let window = &tail[..rel_end];
    assert!(
        window.contains("source_addr"),
        "UPDATE relay dispatch must be gated on source_addr — \
        internal callers (source_addr.is_none()) must fall through to legacy."
    );
    assert!(
        window.contains("has_update_op"),
        "UPDATE relay dispatch must be guarded by has_update_op — \
        GC-spawned retries / pre-registered ops must fall through to legacy."
    );
}
/// Negative guard: streaming UPDATE variants and the deprecated
/// `Broadcasting` variant must stay on the legacy path (Slice A scope).
#[test]
fn update_branch_does_not_dispatch_streaming_or_broadcasting() {
    const SOURCE: &str = include_str!("node.rs");
    let branch_start = SOURCE
        .find("NetMessageV1::Update(ref op) => {")
        .expect("UPDATE branch not found");
    let tail = &SOURCE[branch_start..];
    let rel_end = tail
        .find("handle_op_request::<update::UpdateOp, _>")
        .expect("UPDATE branch no longer calls handle_op_request");
    let window = &tail[..rel_end];
    // Streaming variants are tolerated only when the "// Streaming
    // variants" marker comment is present (i.e. they are explicitly
    // annotated as legacy-path, not dispatched).
    let streaming_marker_present = window.contains("// Streaming variants");
    assert!(
        !window.contains("RequestUpdateStreaming {") || streaming_marker_present,
        "UPDATE branch appears to dispatch RequestUpdateStreaming through \
        the relay driver. Slice A keeps streaming on the legacy path — \
        see docs/port-plans/relay-update-task-per-tx.md."
    );
    assert!(
        !window.contains("BroadcastToStreaming {") || streaming_marker_present,
        "UPDATE branch appears to dispatch BroadcastToStreaming through \
        the relay driver. Slice A keeps streaming on the legacy path."
    );
    assert!(
        !window.contains("UpdateMsg::Broadcasting {"),
        "UPDATE branch dispatches deprecated Broadcasting variant through \
        the relay driver. Broadcasting must stay on the legacy path."
    );
}
}
}