#[cfg(debug_assertions)]
use std::backtrace::Backtrace as StdTrace;
use std::time::Duration;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use tokio::sync::mpsc::error::SendError;
use crate::{
config::GlobalExecutor,
contract::{ContractError, ExecutorError},
message::{Transaction, TransactionType},
node::{ConnectionError, OpManager},
ring::{Location, PeerKeyLocation, RingError},
};
pub(crate) mod connect;
pub(crate) mod get;
pub(crate) mod op_ctx;
pub(crate) mod orphan_streams;
pub(crate) mod put;
pub(crate) mod subscribe;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod update;
pub(crate) mod visited_peers;
pub(crate) use op_ctx::OpCtx;
pub(crate) use visited_peers::VisitedPeers;
/// Classification of how an operation ended.
///
/// The peer-carrying variants borrow the target [`PeerKeyLocation`] for `'a`
/// instead of owning a copy.
// NOTE(review): the consumer of this type is not visible in this file; presumably it
// feeds routing feedback — confirm against callers.
#[derive(Debug)]
// NOTE(review): `dead_code` is silenced for the whole enum; which variants or fields
// are actually unused cannot be determined from this file.
#[allow(dead_code)]
pub(crate) enum OpOutcome<'a> {
/// A contract operation succeeded and timing/size measurements are available.
ContractOpSuccess {
/// Peer the operation was directed at.
target_peer: &'a PeerKeyLocation,
/// Ring location associated with the contract.
contract_location: Location,
/// Measured time to first response (what starts the clock is defined by the caller).
first_response_time: Duration,
/// Size of the transferred payload (presumably bytes — TODO confirm).
payload_size: usize,
/// Measured duration of the payload transfer.
payload_transfer_time: Duration,
},
/// A contract operation succeeded but no timing measurements are carried.
ContractOpSuccessUntimed {
/// Peer the operation was directed at.
target_peer: &'a PeerKeyLocation,
/// Ring location associated with the contract.
contract_location: Location,
},
/// A contract operation failed against the given peer.
ContractOpFailure {
/// Peer the operation was directed at.
target_peer: &'a PeerKeyLocation,
/// Ring location associated with the contract.
contract_location: Location,
},
/// No peer or location data is carried; by name, the operation has not finished.
Incomplete,
/// No peer or location data is carried; by name, the outcome should be ignored.
Irrelevant,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OpError {
#[error(transparent)]
ConnError(#[from] ConnectionError),
#[error(transparent)]
RingError(#[from] RingError),
#[error(transparent)]
ContractError(#[from] ContractError),
#[error(transparent)]
ExecutorError(#[from] ExecutorError),
#[error("unexpected operation state")]
UnexpectedOpState,
#[error(
"cannot perform a state transition from the current state with the provided input (tx: {tx})"
)]
InvalidStateTransition {
tx: Transaction,
#[cfg(debug_assertions)]
state: Option<Box<dyn std::fmt::Debug + Send + Sync>>,
#[cfg(debug_assertions)]
trace: StdTrace,
},
#[error("failed notifying, channel closed")]
NotificationError,
#[error("notification channel error: {0}")]
NotificationChannelError(String),
#[allow(dead_code)]
#[error("unspected transaction type, trying to get a {0:?} from a {1:?}")]
IncorrectTxType(TransactionType, TransactionType),
#[allow(dead_code)]
#[error("op not present: {0}")]
OpNotPresent(Transaction),
#[error("stream was cancelled")]
StreamCancelled,
#[error("failed to claim orphan stream")]
OrphanStreamClaimFailed,
}
impl OpError {
    /// Builds an [`OpError::InvalidStateTransition`] for `tx`.
    ///
    /// In builds with `debug_assertions` the error carries no state snapshot (`None`)
    /// and a backtrace that is force-captured at the point of construction.
    pub fn invalid_transition(tx: Transaction) -> Self {
        Self::InvalidStateTransition {
            tx,
            #[cfg(debug_assertions)]
            state: None,
            #[cfg(debug_assertions)]
            trace: StdTrace::force_capture(),
        }
    }

    /// `true` only for an executor error that reports a contract execution rejection.
    pub fn is_contract_exec_rejection(&self) -> bool {
        match self {
            Self::ExecutorError(inner) => inner.is_contract_exec_rejection(),
            _ => false,
        }
    }

    /// `true` only for an executor error that reports missing contract parameters.
    pub fn is_missing_contract_parameters(&self) -> bool {
        match self {
            Self::ExecutorError(inner) => inner.is_missing_contract_parameters(),
            _ => false,
        }
    }

    /// `true` only for an executor error that reports an invalid-update rejection.
    pub fn is_invalid_update_rejection(&self) -> bool {
        match self {
            Self::ExecutorError(inner) => inner.is_invalid_update_rejection(),
            _ => false,
        }
    }

    /// `true` only for an executor error that reports a full contract queue.
    pub fn is_contract_queue_full(&self) -> bool {
        match self {
            Self::ExecutorError(inner) => inner.is_contract_queue_full(),
            _ => false,
        }
    }
}
impl<T> From<SendError<T>> for OpError {
fn from(_: SendError<T>) -> OpError {
OpError::NotificationError
}
}
/// Announces to neighbors that this node hosts `key`, when an announcement is due.
///
/// `neighbor_hosting.on_contract_hosted` decides whether there is anything to
/// announce; if it yields a message, that message is sent as a
/// `BroadcastHostingUpdate` node event. A failed send is logged at `warn` level and
/// otherwise ignored.
pub(crate) async fn announce_contract_hosted(op_manager: &OpManager, key: &ContractKey) {
    // Nothing to announce: leave quietly.
    let Some(announcement) = op_manager.neighbor_hosting.on_contract_hosted(key) else {
        return;
    };

    tracing::debug!(
        %key,
        "NEIGHBOR_HOSTING: Announcing contract hosted to neighbors"
    );

    let event = crate::message::NodeEvent::BroadcastHostingUpdate {
        message: announcement,
    };
    let delivery = op_manager.notify_node_event(event).await;
    if let Err(err) = delivery {
        tracing::warn!(
            contract = %key,
            error = %err,
            phase = "error",
            "NEIGHBOR_HOSTING: Failed to broadcast hosting announcement"
        );
    }
}
/// Requests disk reclamation for an evicted contract, unless it is still in use.
///
/// * Not in use: an `EvictContract` event carrying `key` and `expected_generation` is
///   handed to the contract handler without waiting for a reply.
/// * Still in use (per `ring.contract_in_use`): the pair is queued through
///   `ring.pending_reclamation_add` and nothing is evicted now.
pub(crate) fn reclaim_evicted_contract(
    op_manager: &OpManager,
    key: ContractKey,
    expected_generation: u64,
) {
    let still_in_use = op_manager.ring.contract_in_use(&key);

    if !still_in_use {
        let eviction = crate::contract::ContractHandlerEvent::EvictContract {
            key,
            expected_generation,
        };
        op_manager.notify_contract_handler_fire_and_forget(eviction);
        return;
    }

    tracing::debug!(
        contract = %key,
        "Skipping disk reclamation for evicted contract — still in use \
         (client subscription or downstream subscriber); queued for retry"
    );
    op_manager
        .ring
        .pending_reclamation_add(key, expected_generation);
}
/// Finalizes a subscription that was obtained as part of a GET ("piggyback").
///
/// In order:
/// 1. marks the contract as subscribed in the ring and completes the pending
///    subscription request as successful;
/// 2. registers a local client with the interest manager and, if that call reports
///    the node just became interested, broadcasts the interest change;
/// 3. announces the contract as hosted;
/// 4. if an upstream peer is known, registers that peer's interest; otherwise logs a
///    warning.
pub(crate) async fn complete_piggyback_subscription(
    op_manager: &OpManager,
    key: &ContractKey,
    tx: &crate::message::Transaction,
    sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
) {
    let ring = &op_manager.ring;
    ring.subscribe(*key);
    ring.complete_subscription_request(key, true);

    let newly_interested = op_manager.interest_manager.add_local_client(key);
    if newly_interested {
        broadcast_change_interests(op_manager, vec![*key], vec![]).await;
    }

    announce_contract_hosted(op_manager, key).await;

    match sender_from_addr {
        Some(upstream_pkl) => {
            let upstream_key = crate::ring::interest::PeerKey::from(upstream_pkl.pub_key.clone());
            op_manager
                .interest_manager
                .register_peer_interest(key, upstream_key, None, true);
            tracing::debug!(tx = %tx, contract = %key, "Subscription completed via GET piggyback");
        }
        None => {
            tracing::warn!(
                tx = %tx,
                contract = %key,
                "GET piggyback: upstream peer not in ring, subscription tree incomplete -- renewal will heal"
            );
        }
    }
}
/// Sets up a subscription after a GET response has been handled.
///
/// * `subscribe_requested == true`: the subscription is completed in place through
///   [`complete_piggyback_subscription`].
/// * otherwise: a separate child subscribe operation is started via
///   `start_subscription_request`, and its transaction is logged together with
///   `path_label` and the `blocking_sub` flag.
pub(crate) async fn auto_subscribe_on_get_response(
    op_manager: &OpManager,
    key: &ContractKey,
    tx: &crate::message::Transaction,
    sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
    subscribe_requested: bool,
    blocking_sub: bool,
    path_label: &str,
) {
    if subscribe_requested {
        complete_piggyback_subscription(op_manager, key, tx, sender_from_addr).await;
        return;
    }

    // Fallback path: spawn a dedicated subscribe operation as a child of `tx`.
    let child_tx = start_subscription_request(op_manager, *tx, *key, blocking_sub);
    tracing::debug!(tx = %tx, %child_tx, blocking = %blocking_sub, "started subscription ({path_label}, fallback)");
}
/// Tells neighbors which contracts this node gained or lost interest in.
///
/// Each key is reduced to a `u32` with `contract_hash`, and the two hash lists are
/// sent as one `BroadcastChangeInterests` node event using the non-blocking
/// `try_notify_node_event`. The send is best-effort: a failure is only logged at
/// `debug` level. Does nothing when both lists are empty.
pub(crate) async fn broadcast_change_interests(
    op_manager: &OpManager,
    added: Vec<ContractKey>,
    removed: Vec<ContractKey>,
) {
    use crate::ring::interest::contract_hash;

    let nothing_changed = added.is_empty() && removed.is_empty();
    if nothing_changed {
        return;
    }

    let hash_all = |keys: &[ContractKey]| -> Vec<u32> { keys.iter().map(contract_hash).collect() };
    let hashed_added = hash_all(&added);
    let hashed_removed = hash_all(&removed);

    tracing::debug!(
        added_count = hashed_added.len(),
        removed_count = hashed_removed.len(),
        "Broadcasting ChangeInterests to neighbors"
    );

    let event = crate::message::NodeEvent::BroadcastChangeInterests {
        added: hashed_added,
        removed: hashed_removed,
    };
    if let Err(err) = op_manager.try_notify_node_event(event) {
        tracing::debug!(
            error = %err,
            "Failed to broadcast ChangeInterests (best-effort)"
        );
    }
}
/// Spawns a child subscribe operation for `key` and returns its transaction.
///
/// The child transaction is derived from `parent_tx`. The subscribe driver runs as a
/// detached task on the global executor with its own clone of the operation manager;
/// this function does not wait for it and does not observe its result.
///
/// `blocking` is only recorded in the log line here; it is not forwarded to the
/// driver.
// NOTE(review): the text of this function is pinned by source-scanning tests in this
// file, so keep the driver call spelled with its `subscribe::` path.
pub(super) fn start_subscription_request(
    op_manager: &OpManager,
    parent_tx: Transaction,
    key: ContractKey,
    blocking: bool,
) -> Transaction {
    let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&parent_tx);
    tracing::debug!(
        %parent_tx,
        %child_tx,
        %key,
        blocking,
        "spawning child subscription operation (driver)"
    );

    // The spawned task must own everything it touches.
    let shared_manager = std::sync::Arc::new(op_manager.clone());
    let instance_id = *key.id();
    let driver = async move {
        subscribe::run_client_subscribe(shared_manager, instance_id, child_tx).await;
    };
    GlobalExecutor::spawn(driver);

    child_tx
}
pub(crate) async fn has_contract(
op_manager: &OpManager,
instance_id: ContractInstanceId,
) -> Result<Option<ContractKey>, OpError> {
match op_manager
.notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
instance_id,
return_contract_code: false,
})
.await?
{
crate::contract::ContractHandlerEvent::GetResponse {
key,
response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
} => Ok(key),
crate::contract::ContractHandlerEvent::DelegateRequest { .. }
| crate::contract::ContractHandlerEvent::DelegateResponse(_)
| crate::contract::ContractHandlerEvent::PutQuery { .. }
| crate::contract::ContractHandlerEvent::PutResponse { .. }
| crate::contract::ContractHandlerEvent::GetQuery { .. }
| crate::contract::ContractHandlerEvent::GetResponse { .. }
| crate::contract::ContractHandlerEvent::UpdateQuery { .. }
| crate::contract::ContractHandlerEvent::UpdateResponse { .. }
| crate::contract::ContractHandlerEvent::UpdateNoChange { .. }
| crate::contract::ContractHandlerEvent::RegisterSubscriberListener { .. }
| crate::contract::ContractHandlerEvent::RegisterSubscriberListenerResponse
| crate::contract::ContractHandlerEvent::QuerySubscriptions { .. }
| crate::contract::ContractHandlerEvent::QuerySubscriptionsResponse
| crate::contract::ContractHandlerEvent::GetSummaryQuery { .. }
| crate::contract::ContractHandlerEvent::GetSummaryResponse { .. }
| crate::contract::ContractHandlerEvent::GetDeltaQuery { .. }
| crate::contract::ContractHandlerEvent::GetDeltaResponse { .. }
| crate::contract::ContractHandlerEvent::ClientDisconnect { .. }
| crate::contract::ContractHandlerEvent::EvictContract { .. } => Ok(None),
}
}
/// Returns `true` when `payload_size` is strictly greater than `streaming_threshold`.
///
/// A payload exactly at the threshold does not stream.
pub(crate) fn should_use_streaming(streaming_threshold: usize, payload_size: usize) -> bool {
    streaming_threshold < payload_size
}
/// Divisor applied to the payload size to obtain the drain allowance in seconds
/// (20 * 1024; presumably a worst-case throughput in bytes per second).
const STREAMING_THROUGHPUT_FLOOR_BPS: usize = 20 * 1024;
/// Lower bound, in seconds, on the drain allowance added for streaming payloads.
const STREAMING_MIN_DRAIN_SECS: u64 = 30;
/// Upper bound on the timeout returned by `streaming_aware_attempt_timeout` for
/// streaming payloads (600 seconds).
const STREAMING_ATTEMPT_TIMEOUT_CAP: std::time::Duration = std::time::Duration::from_secs(600);
/// Computes the per-attempt timeout for an operation carrying `payload_size`.
///
/// * Payloads that do not stream (see [`should_use_streaming`]) get
///   `crate::config::OPERATION_TTL` unchanged.
/// * Streaming payloads get `OPERATION_TTL` plus a drain allowance of
///   `payload_size / STREAMING_THROUGHPUT_FLOOR_BPS` seconds (integer division),
///   raised to at least `STREAMING_MIN_DRAIN_SECS`, with the total clamped to
///   `STREAMING_ATTEMPT_TIMEOUT_CAP`.
pub(crate) fn streaming_aware_attempt_timeout(
    streaming_threshold: usize,
    payload_size: usize,
) -> std::time::Duration {
    let base = crate::config::OPERATION_TTL;
    if !should_use_streaming(streaming_threshold, payload_size) {
        return base;
    }

    // Integer division may round down to zero; the minimum drain covers that gap.
    let scaled_secs = (payload_size / STREAMING_THROUGHPUT_FLOOR_BPS) as u64;
    let drain = std::time::Duration::from_secs(scaled_secs.max(STREAMING_MIN_DRAIN_SECS));

    std::cmp::min(base + drain, STREAMING_ATTEMPT_TIMEOUT_CAP)
}
/// Records a routing outcome for a relayed operation in the ring's router.
///
/// Builds a `RouteEvent` from the arguments (with `op_type` wrapped in `Some`) and
/// adds it to the router under its write lock. In test builds, or with the
/// `testing` feature, the per-operation-type counter is incremented first.
pub(crate) fn record_relay_route_event(
    op_manager: &OpManager,
    next_hop: PeerKeyLocation,
    contract_location: Location,
    outcome: crate::router::RouteOutcome,
    op_type: crate::node::network_status::OpType,
) {
    #[cfg(any(test, feature = "testing"))]
    {
        use crate::node::network_status::OpType;
        use std::sync::atomic::Ordering::Relaxed;

        let counter = match op_type {
            OpType::Get => &RELAY_GET_ROUTE_EVENT_COUNT,
            OpType::Put => &RELAY_PUT_ROUTE_EVENT_COUNT,
            OpType::Update => &RELAY_UPDATE_ROUTE_EVENT_COUNT,
            OpType::Subscribe => &RELAY_SUBSCRIBE_ROUTE_EVENT_COUNT,
        };
        counter.fetch_add(1, Relaxed);
    }

    let event = crate::router::RouteEvent {
        peer: next_hop,
        contract_location,
        outcome,
        op_type: Some(op_type),
    };
    op_manager.ring.router.write().add_event(event);
}
/// Number of times `record_relay_route_event` was called with `OpType::Get`.
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_GET_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Number of times `record_relay_route_event` was called with `OpType::Put`.
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Number of times `record_relay_route_event` was called with `OpType::Update`.
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_UPDATE_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Number of times `record_relay_route_event` was called with `OpType::Subscribe`.
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_SUBSCRIBE_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
/// Tests around message-send ordering, using the mock network bridge.
#[cfg(test)]
mod ordering_invariant_tests {
    use super::test_utils::MockNetworkBridge;
    use crate::message::{NetMessage, NetMessageV1, Transaction};
    use crate::node::NetworkBridge;
    use crate::operations::connect::ConnectMsg;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// The mock bridge must report sent messages in the order they were sent.
    #[tokio::test]
    async fn mock_network_bridge_records_send_ordering() {
        let bridge = MockNetworkBridge::new();

        let loopback = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
        let addr1 = SocketAddr::new(loopback, 5000);
        let addr2 = SocketAddr::new(loopback, 5001);

        let first_tx = Transaction::new::<ConnectMsg>();
        let second_tx = Transaction::new::<ConnectMsg>();

        let first_msg = NetMessage::V1(NetMessageV1::Aborted(first_tx));
        bridge.send(addr1, first_msg).await.unwrap();

        let second_msg = NetMessage::V1(NetMessageV1::Aborted(second_tx));
        bridge.send(addr2, second_msg).await.unwrap();

        let sent = bridge.sent_messages();
        assert_eq!(sent.len(), 2);
        assert_eq!(sent[0].0, addr1, "First send should be to addr1");
        assert_eq!(sent[1].0, addr2, "Second send should be to addr2");
    }

    /// Intentionally empty: asserts nothing.
    // NOTE(review): presumably a named anchor for documentation of the
    // push-before-send invariant — confirm where that documentation lives.
    #[test]
    fn push_before_send_invariant_is_documented() {}
}
/// Tests for the streaming decision and the streaming-aware attempt timeout.
#[cfg(test)]
mod streaming_tests {
    use super::{
        STREAMING_ATTEMPT_TIMEOUT_CAP, STREAMING_MIN_DRAIN_SECS, should_use_streaming,
        streaming_aware_attempt_timeout,
    };
    use crate::config::OPERATION_TTL;
    use std::time::Duration;

    /// Threshold used by most tests in this module (64 KiB).
    const DEFAULT_THRESHOLD: usize = 64 * 1024;

    /// Streaming starts strictly above the threshold.
    #[test]
    fn test_streaming_respects_threshold() {
        for at_or_below in [0, 1000, DEFAULT_THRESHOLD] {
            assert!(!should_use_streaming(DEFAULT_THRESHOLD, at_or_below));
        }
        for above in [DEFAULT_THRESHOLD + 1, 1024 * 1024] {
            assert!(should_use_streaming(DEFAULT_THRESHOLD, above));
        }
    }

    /// The same boundary rule holds for a non-default threshold.
    #[test]
    fn test_streaming_custom_threshold() {
        let custom_threshold = 128 * 1024;
        for at_or_below in [64 * 1024, custom_threshold] {
            assert!(!should_use_streaming(custom_threshold, at_or_below));
        }
        assert!(should_use_streaming(custom_threshold, custom_threshold + 1));
    }

    /// With a zero threshold every non-empty payload streams.
    #[test]
    fn test_streaming_zero_threshold() {
        assert!(!should_use_streaming(0, 0));
        for non_empty in [1, 100] {
            assert!(should_use_streaming(0, non_empty));
        }
    }

    /// Payloads at or below the threshold keep the plain operation TTL.
    #[test]
    fn non_streaming_payload_uses_operation_ttl() {
        for payload_size in [0, 1024, DEFAULT_THRESHOLD] {
            assert_eq!(
                streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, payload_size),
                OPERATION_TTL
            );
        }
    }

    /// Regression check for the payload size and completion time cited in #4001.
    #[test]
    fn website_payload_attempt_timeout_exceeds_observed_completion() {
        let payload_bytes = 2_460_242;
        let observed_completion = Duration::from_secs(63);
        let timeout = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, payload_bytes);

        assert!(
            timeout > observed_completion,
            "streaming-aware timeout ({timeout:?}) must exceed observed \
             completion time ({observed_completion:?}) so the retry loop \
             does not fire while the original streaming PUT is still in \
             flight (issue #4001)"
        );
        assert!(
            timeout > OPERATION_TTL,
            "streaming-aware timeout ({timeout:?}) must exceed OPERATION_TTL \
             ({OPERATION_TTL:?}); otherwise the fix is a no-op for the bug \
             reported in #4001"
        );
    }

    /// Larger streaming payloads get longer timeouts.
    #[test]
    fn streaming_timeout_scales_with_payload_size() {
        let small_streaming = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 1_000_000);
        let medium_streaming = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 10_000_000);

        assert!(
            small_streaming < medium_streaming,
            "1 MB timeout ({small_streaming:?}) must be smaller than \
             10 MB timeout ({medium_streaming:?})"
        );
        for streaming_timeout in [small_streaming, medium_streaming] {
            assert!(streaming_timeout > OPERATION_TTL);
        }
    }

    /// Very large payloads are clamped to the cap.
    #[test]
    fn streaming_timeout_capped_at_ceiling() {
        let one_gib = 1024 * 1024 * 1024;
        let clamped = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, one_gib);
        assert_eq!(
            clamped, STREAMING_ATTEMPT_TIMEOUT_CAP,
            "huge payloads must clamp to the cap"
        );
    }

    /// Crossing the threshold must raise the timeout even when the size-based
    /// allowance rounds down to zero.
    #[test]
    fn streaming_timeout_jumps_above_threshold_boundary() {
        let at_threshold = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD);
        let just_above = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD + 1);
        let gap_payload = DEFAULT_THRESHOLD + 19 * 1024;
        let in_truncation_gap = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, gap_payload);

        assert_eq!(at_threshold, OPERATION_TTL);
        assert!(
            just_above > OPERATION_TTL,
            "just-above-threshold timeout {just_above:?} must STRICTLY \
             exceed OPERATION_TTL ({OPERATION_TTL:?}); a fix that lets \
             this equal OPERATION_TTL is a no-op for the size range \
             (threshold, threshold + 20 KiB) — exactly the truncation \
             gap STREAMING_MIN_DRAIN_SECS exists to close (#4001 \
             skeptical review)"
        );
        assert!(
            in_truncation_gap > OPERATION_TTL,
            "payload in the truncation gap (threshold + 19 KiB) must \
             exceed OPERATION_TTL — STREAMING_MIN_DRAIN_SECS guarantees it"
        );
    }

    /// The smallest streaming payload gets exactly TTL plus the minimum drain.
    #[test]
    fn streaming_timeout_min_drain_floor() {
        let smallest_streaming =
            streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD + 1);
        let expected = OPERATION_TTL + Duration::from_secs(STREAMING_MIN_DRAIN_SECS);
        assert_eq!(
            smallest_streaming, expected,
            "streaming-eligible payloads must get at least \
             OPERATION_TTL + STREAMING_MIN_DRAIN_SECS"
        );
    }

    /// At, and one byte past, the size where scaling reaches the cap, the result is
    /// the cap.
    #[test]
    fn streaming_timeout_cap_boundary() {
        const FLOOR_BPS: usize = 20 * 1024;
        let scaling_window_secs =
            (STREAMING_ATTEMPT_TIMEOUT_CAP - OPERATION_TTL).as_secs() as usize;
        let scaling_max_bytes = scaling_window_secs * FLOOR_BPS;

        for payload_size in [scaling_max_bytes, scaling_max_bytes + 1] {
            assert_eq!(
                streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, payload_size),
                STREAMING_ATTEMPT_TIMEOUT_CAP
            );
        }
    }
}
/// Source-level pin tests: they read this file's own text and check how the child
/// subscription helper is written.
#[cfg(test)]
mod sub_op_subscribe_pin_tests {
    /// Returns the source text of the helper, from its `fn` keyword through the brace
    /// that closes its body.
    ///
    /// Braces are counted character by character, with no awareness of strings or
    /// comments.
    fn extract_start_subscription_request_body() -> &'static str {
        let src = include_str!("operations.rs");
        // The needle is assembled from two pieces, so the complete text never appears
        // as a single literal.
        let head = ["fn ", "start_subscription_request("].concat();
        let start = src
            .find(&head)
            .expect("`fn start_subscription_request(` must exist in operations.rs");
        let body_open = start
            + src[start..]
                .find('{')
                .expect("expected `{` after function signature");

        let mut depth: i32 = 0;
        let end = src[body_open..]
            .char_indices()
            .find_map(|(offset, ch)| {
                match ch {
                    '{' => depth += 1,
                    '}' => {
                        depth -= 1;
                        if depth == 0 {
                            return Some(body_open + offset + 1);
                        }
                    }
                    _ => {}
                }
                None
            })
            .unwrap_or(body_open);

        assert!(
            end > body_open,
            "failed to find matching `}}` for start_subscription_request"
        );
        &src[start..end]
    }

    /// The helper must not call the `request_subscribe` entry point.
    #[test]
    fn start_subscription_request_uses_run_client_subscribe() {
        let helper_src = extract_start_subscription_request_body();
        let uses_old_entry_point = helper_src.contains("subscribe::request_subscribe");
        assert!(
            !uses_old_entry_point,
            "`start_subscription_request` must route through \
             `subscribe::run_client_subscribe`, not `request_subscribe`."
        );
    }

    /// The helper must not mention sub-operation tracker registration or failure
    /// propagation.
    #[test]
    fn start_subscription_request_does_not_register_with_sub_op_tracker() {
        let helper_src = extract_start_subscription_request_body();

        let registers = helper_src.contains("expect_and_register_sub_operation");
        assert!(
            !registers,
            "`start_subscription_request` must NOT register with a \
             sub-operation tracker."
        );

        let propagates_failure = helper_src.contains("sub_operation_failed");
        assert!(
            !propagates_failure,
            "`start_subscription_request` must NOT propagate failures \
             via `sub_operation_failed` — the subscribe driver \
             publishes its own `HostResult::Err`."
        );
    }

    /// The helper must call the subscribe driver by its qualified path.
    #[test]
    fn start_subscription_request_spawns_driver_driver() {
        let helper_src = extract_start_subscription_request_body();
        let spawns_driver = helper_src.contains("subscribe::run_client_subscribe");
        assert!(
            spawns_driver,
            "`start_subscription_request` must spawn the driver \
             subscribe driver `subscribe::run_client_subscribe` — \
             matches the `maybe_subscribe_child` pattern in \
             `put/op_ctx_task.rs` and `get/op_ctx_task.rs`."
        );
    }
}