pub(crate) mod op_ctx_task;
use freenet_stdlib::client_api::{ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
pub(crate) use self::messages::{BroadcastStreamingPayload, UpdateMsg, UpdateStreamingPayload};
use super::{OpError, OpOutcome};
use crate::contract::{ContractHandlerEvent, ExecutorError, StoreResponse};
use crate::message::{NodeEvent, Transaction};
use crate::node::IsOperationCompleted;
use crate::ring::{Location, PeerKeyLocation};
use crate::{client_events::HostResult, node::OpManager};
use std::collections::VecDeque;
use std::net::SocketAddr;
use dashmap::DashMap;
use tokio::time::Instant;
/// Per-contract cache of recently seen broadcast payload hashes, used to
/// drop duplicate UPDATE broadcasts that arrive via multiple gossip paths.
pub(crate) struct BroadcastDedupCache {
    // Keyed by contract; each queue is ordered oldest-first. Eviction is
    // both TTL-based and capacity-based (see `check_and_insert`).
    entries: DashMap<ContractKey, VecDeque<DedupEntry>>,
}
/// A single remembered broadcast payload.
struct DedupEntry {
    // Hash of the payload bytes plus a delta/full-state discriminator byte.
    delta_hash: u64,
    // When the entry was recorded; entries older than `DEDUP_TTL` are evicted.
    inserted_at: Instant,
}
/// Upper bound on remembered payload hashes per contract (oldest evicted first).
const DEDUP_MAX_ENTRIES_PER_CONTRACT: usize = 64;
/// How long a payload hash suppresses identical re-broadcasts.
const DEDUP_TTL: std::time::Duration = std::time::Duration::from_secs(60);
impl BroadcastDedupCache {
    /// Creates an empty dedup cache.
    pub fn new() -> Self {
        Self {
            entries: DashMap::new(),
        }
    }

    /// Returns `true` when an identical broadcast payload for `key` was seen
    /// within the TTL window (the caller should drop it as a duplicate).
    /// Otherwise records the payload's hash and returns `false`.
    pub fn check_and_insert(
        &self,
        key: &ContractKey,
        payload_bytes: &[u8],
        is_delta: bool,
        now: Instant,
    ) -> bool {
        use ahash::AHasher;
        use std::hash::Hasher;

        // Hash the payload together with a delta/full-state discriminator so
        // a delta and a full state with identical bytes never collide.
        let mut hasher = AHasher::default();
        hasher.write_u8(u8::from(is_delta));
        hasher.write(payload_bytes);
        let payload_hash = hasher.finish();

        let mut slot = self.entries.entry(*key).or_default();
        let recent = slot.value_mut();

        // Evict expired entries from the front (queue is oldest-first).
        while recent
            .front()
            .is_some_and(|e| now.duration_since(e.inserted_at) > DEDUP_TTL)
        {
            recent.pop_front();
        }

        // Duplicate inside the window: report without re-inserting.
        if recent.iter().any(|e| e.delta_hash == payload_hash) {
            return true;
        }

        // Enforce the per-contract capacity bound before inserting.
        while recent.len() >= DEDUP_MAX_ENTRIES_PER_CONTRACT {
            recent.pop_front();
        }
        recent.push_back(DedupEntry {
            delta_hash: payload_hash,
            inserted_at: now,
        });
        false
    }
}
/// Outcome of broadcast-target selection for an UPDATE, including counters
/// describing why candidate peers were included or dropped (for logging).
pub(crate) struct BroadcastTargetResult {
    // Deduplicated, sorted list of peers to broadcast to.
    pub targets: Vec<PeerKeyLocation>,
    // Candidates discovered via the proximity (neighbor-hosting) cache.
    pub proximity_found: usize,
    // Proximity candidates that could not be resolved to a connected peer.
    pub proximity_resolve_failed: usize,
    // Candidates discovered via the interest manager.
    pub interest_found: usize,
    // Interest candidates that could not be resolved to a connected peer.
    pub interest_resolve_failed: usize,
    // Candidates skipped because they resolved to our own address.
    pub skipped_self: usize,
    // Candidates skipped because they were the update's sender.
    pub skipped_sender: usize,
}
#[allow(dead_code)]
/// In-flight state of a single UPDATE operation.
pub(crate) struct UpdateOp {
    // Transaction identifying this operation across the network.
    pub id: Transaction,
    // Current phase; `None` once the op has been consumed/finalized.
    pub(crate) state: Option<UpdateState>,
    // Routing stats (target peer / contract location) for metrics; may be
    // filled in gradually while the op progresses.
    pub(crate) stats: Option<UpdateStats>,
    // Peer that forwarded this update to us; `None` for client-initiated ops.
    pub(crate) upstream_addr: Option<std::net::SocketAddr>,
}
#[allow(dead_code)]
impl UpdateOp {
    /// Transaction id driving this operation.
    pub fn id(&self) -> &Transaction {
        &self.id
    }

    /// Classifies the operation for metrics purposes, based on whether it
    /// finished and whether full routing stats were recorded.
    pub fn outcome(&self) -> OpOutcome<'_> {
        // Routing info is only usable when both the target peer and the
        // contract location were recorded.
        let routing = match &self.stats {
            Some(UpdateStats {
                target: Some(target),
                contract_location: Some(loc),
            }) => Some((target, *loc)),
            _ => None,
        };
        match (self.finalized(), routing) {
            (true, Some((target_peer, contract_location))) => {
                OpOutcome::ContractOpSuccessUntimed {
                    target_peer,
                    contract_location,
                }
            }
            (true, None) => OpOutcome::Irrelevant,
            (false, Some((target_peer, contract_location))) => OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            },
            (false, None) => OpOutcome::Incomplete,
        }
    }

    /// A client on this node started the update (there is no upstream peer).
    pub(crate) fn is_client_initiated(&self) -> bool {
        self.upstream_addr.is_none()
    }

    /// Cloned routing info for failure reporting, when fully recorded.
    pub(crate) fn failure_routing_info(&self) -> Option<(PeerKeyLocation, Location)> {
        let stats = self.stats.as_ref()?;
        let target = stats.target.as_ref()?;
        let loc = stats.contract_location?;
        Some((target.clone(), loc))
    }

    /// Terminal-state check: finished, or never materialized at all.
    pub fn finalized(&self) -> bool {
        matches!(self.state, None | Some(UpdateState::Finished(_)))
    }

    /// Socket address of the recorded target peer, if resolvable.
    pub fn get_next_hop_addr(&self) -> Option<std::net::SocketAddr> {
        let stats = self.stats.as_ref()?;
        stats.target.as_ref()?.socket_addr()
    }

    /// Converts the op's terminal state into the client-facing result.
    pub(super) fn to_host_result(&self) -> HostResult {
        match &self.state {
            Some(UpdateState::Finished(data)) => {
                tracing::debug!(
                    "Creating UpdateResponse for transaction {} with key {} and summary length {}",
                    self.id,
                    data.key,
                    data.summary.size()
                );
                let response = freenet_stdlib::client_api::ContractResponse::UpdateResponse {
                    key: data.key,
                    summary: data.summary.clone(),
                };
                Ok(HostResponse::ContractResponse(response))
            }
            _ => {
                tracing::error!(
                    tx = %self.id,
                    state = ?self.state,
                    phase = "error",
                    "UPDATE operation failed to finish successfully"
                );
                Err(ErrorKind::OperationError {
                    cause: "update didn't finish successfully".into(),
                }
                .into())
            }
        }
    }

    /// Fails the operation after a dropped peer connection: notifies the
    /// local client (best-effort) and marks the transaction completed.
    pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> {
        tracing::warn!(
            tx = %self.id,
            "Update operation aborted due to connection failure"
        );
        let error_result: crate::client_events::HostResult =
            Err(freenet_stdlib::client_api::ErrorKind::OperationError {
                cause: "Update operation failed: peer connection dropped".into(),
            }
            .into());
        // try_send: never block the abort path on a saturated router channel.
        if let Err(err) = op_manager
            .result_router_tx
            .try_send((self.id, error_result))
        {
            tracing::error!(
                tx = %self.id,
                error = %err,
                "Failed to send abort notification to client \
                (result router channel full or closed)"
            );
        }
        op_manager.completed(self.id);
        Ok(())
    }
}
#[allow(dead_code)]
/// Routing statistics recorded while processing an UPDATE.
pub(crate) struct UpdateStats {
    // Next-hop / reporting target peer, when known.
    pub(crate) target: Option<PeerKeyLocation>,
    // Ring location of the contract being updated, when known.
    pub(crate) contract_location: Option<Location>,
}
/// Result of executing an update against the local contract runtime.
pub(crate) struct UpdateExecution {
    // Resulting (possibly unchanged) contract state.
    pub(crate) value: WrappedState,
    // Summary of `value`, suitable for responses and notifications.
    pub(crate) summary: StateSummary<'static>,
    // Whether the update actually modified the stored state.
    pub(crate) changed: bool,
}
/// Cooldown between auto-fetch attempts for the same contract (5 minutes).
pub(crate) const CONTRACT_FETCH_COOLDOWN_MS: u64 = 300_000;
impl OpManager {
    /// Best-effort, rate-limited fetch of a contract we received an UPDATE
    /// for but cannot execute locally (e.g. missing parameters). At most one
    /// fetch per contract per `CONTRACT_FETCH_COOLDOWN_MS`.
    pub(crate) fn try_auto_fetch_contract(&self, key: &ContractKey, sender_addr: SocketAddr) {
        use crate::config::GlobalSimulationTime;
        let instance_id = *key.id();
        let now_ms = GlobalSimulationTime::read_time_ms();
        {
            use dashmap::mapref::entry::Entry;
            // Claim the cooldown slot via the entry API so concurrent
            // callers cannot both start a fetch for the same contract.
            match self.pending_contract_fetches.entry(instance_id) {
                Entry::Occupied(mut existing) => {
                    let elapsed_ms = now_ms.saturating_sub(*existing.get());
                    if elapsed_ms < CONTRACT_FETCH_COOLDOWN_MS {
                        return;
                    }
                    *existing.get_mut() = now_ms;
                }
                Entry::Vacant(slot) => {
                    slot.insert(now_ms);
                }
            }
        }
        let sender_pkl = match self.ring.connection_manager.get_peer_by_addr(sender_addr) {
            Some(pkl) => pkl,
            None => {
                tracing::debug!(
                    contract = %key,
                    sender = %sender_addr,
                    "Cannot auto-fetch: UPDATE sender not found in connection manager"
                );
                // Release the cooldown slot so a later attempt is not blocked
                // by this failed peer resolution.
                self.pending_contract_fetches.remove(&instance_id);
                return;
            }
        };
        tracing::info!(
            contract = %key,
            sender = %sender_addr,
            "Auto-fetching contract from UPDATE sender (missing parameters)"
        );
        // Fire-and-forget: the GET sub-operation runs on its own transaction.
        let _tx = super::get::op_ctx_task::start_targeted_sub_op_get(self, instance_id, sender_pkl);
    }

    /// Computes the set of peers an UPDATE for `key` should be broadcast to,
    /// merging proximity-cache neighbors and interest-manager subscribers,
    /// minus the sender and ourselves. Counters describing the selection are
    /// returned for logging/diagnostics.
    pub(crate) fn get_broadcast_targets_update(
        &self,
        key: &ContractKey,
        sender: &SocketAddr,
    ) -> BroadcastTargetResult {
        use std::collections::HashSet;
        let self_addr = self.ring.connection_manager.get_own_addr();
        // When we originated the update locally, `sender` is our own address
        // and the sender/self skip rules below are disabled.
        // NOTE(review): with a local initiator, neither skip applies — this
        // assumes our own identity never appears in the candidate sets;
        // confirm against the proximity/interest sources.
        let is_local_update_initiator = self_addr.as_ref().map(|me| me == sender).unwrap_or(false);
        let mut targets: HashSet<PeerKeyLocation> = HashSet::new();
        let mut proximity_resolve_failed: usize = 0;
        let mut interest_resolve_failed: usize = 0;
        let mut skipped_self: usize = 0;
        let mut skipped_sender: usize = 0;
        // Source 1: neighbors known (via the proximity cache) to host the contract.
        let proximity_pub_keys = self.neighbor_hosting.neighbors_with_contract(key);
        let proximity_found = proximity_pub_keys.len();
        for pub_key in proximity_pub_keys {
            if let Some(pkl) = self.ring.connection_manager.get_peer_by_pub_key(&pub_key) {
                if let Some(pkl_addr) = pkl.socket_addr() {
                    // Never echo the update back to whoever sent it to us.
                    if &pkl_addr == sender && !is_local_update_initiator {
                        skipped_sender += 1;
                        continue;
                    }
                    if !is_local_update_initiator && self_addr.as_ref() == Some(&pkl_addr) {
                        skipped_self += 1;
                        continue;
                    }
                }
                targets.insert(pkl);
            } else {
                proximity_resolve_failed += 1;
                tracing::warn!(
                    contract = %format!("{:.8}", key),
                    proximity_neighbor = %pub_key,
                    is_local = is_local_update_initiator,
                    phase = "target_lookup_failed",
                    "Proximity cache neighbor not found in connection manager"
                );
            }
        }
        // Source 2: peers that registered interest in this contract.
        let interested_peers = self.interest_manager.get_interested_peers(key);
        let interest_found = interested_peers.len();
        for (peer_key, _interest) in interested_peers {
            if let Some(pkl) = self
                .ring
                .connection_manager
                .get_peer_by_pub_key(&peer_key.0)
            {
                if let Some(pkl_addr) = pkl.socket_addr() {
                    if &pkl_addr == sender && !is_local_update_initiator {
                        skipped_sender += 1;
                        continue;
                    }
                    if !is_local_update_initiator && self_addr.as_ref() == Some(&pkl_addr) {
                        skipped_self += 1;
                        continue;
                    }
                }
                // HashSet dedups peers that appear in both sources.
                targets.insert(pkl);
            } else {
                interest_resolve_failed += 1;
                tracing::warn!(
                    contract = %format!("{:.8}", key),
                    interest_peer = %peer_key.0,
                    is_local = is_local_update_initiator,
                    phase = "target_lookup_failed",
                    "Interest manager peer not found in connection manager"
                );
            }
        }
        // Sort for deterministic broadcast order (and stable logs).
        let mut result: Vec<PeerKeyLocation> = targets.into_iter().collect();
        result.sort();
        if !result.is_empty() {
            tracing::info!(
                contract = %format!("{:.8}", key),
                peer_addr = %sender,
                targets = %result
                    .iter()
                    .filter_map(|s| s.socket_addr())
                    .map(|addr| format!("{:.8}", addr))
                    .collect::<Vec<_>>()
                    .join(","),
                count = result.len(),
                proximity_sources = proximity_found,
                interest_sources = interest_found,
                phase = "broadcast",
                "UPDATE_PROPAGATION"
            );
        } else {
            tracing::debug!(
                contract = %format!("{:.8}", key),
                peer_addr = %sender,
                self_addr = ?self_addr.map(|a| format!("{:.8}", a)),
                proximity_sources = proximity_found,
                interest_sources = interest_found,
                phase = "warning",
                "UPDATE_PROPAGATION: NO_TARGETS - update will not propagate further"
            );
        }
        BroadcastTargetResult {
            targets: result,
            proximity_found,
            proximity_resolve_failed,
            interest_found,
            interest_resolve_failed,
            skipped_self,
            skipped_sender,
        }
    }
}
/// Logs an UPDATE merge failure with severity matched to its cause: benign
/// contract rejections (e.g. stale rebroadcasts) log at INFO, while real
/// execution failures stay at ERROR.
fn log_update_contract_failure(key: &ContractKey, err: &ExecutorError) {
    if !err.is_invalid_update_rejection() {
        tracing::error!(
            contract = %key,
            error = %err,
            phase = "error",
            "Failed to update contract value"
        );
        return;
    }
    tracing::info!(
        contract = %key,
        error = %err,
        event = "merge_rejected_invalid_update",
        "Update rejected by contract: incoming state invalid (likely stale rebroadcast), keeping local"
    );
}
/// Logs a BroadcastToStreaming merge failure and reports whether the caller
/// should attempt a self-heal auto-fetch of the contract.
///
/// Returns `false` for contract-execution rejections: the contract code is
/// present locally, so re-fetching it cannot help.
pub(crate) fn log_broadcast_to_streaming_failure(
    tx: &Transaction,
    key: &ContractKey,
    err: &OpError,
) -> bool {
    if !err.is_invalid_update_rejection() {
        tracing::warn!(
            tx = %tx,
            %key,
            error = %err,
            "BroadcastToStreaming update skipped: contract not ready locally"
        );
    } else {
        tracing::info!(
            tx = %tx,
            %key,
            error = %err,
            event = "merge_rejected_invalid_update",
            "BroadcastToStreaming merge rejected: incoming state invalid (likely stale rebroadcast), keeping local"
        );
    }
    // Auto-fetch only when the failure was NOT produced by the contract
    // itself rejecting/faulting during execution.
    let exec_rejection = err.is_contract_exec_rejection();
    !exec_rejection
}
/// Applies `update_data` to the locally stored contract state and returns the
/// resulting state plus a fresh summary.
///
/// Flow:
/// 1. Snapshot the current state (used to answer "no change" outcomes).
/// 2. Ask the contract handler to execute the update.
/// 3. On success, summarize the new state; on `UpdateNoChange`, resolve a
///    state to summarize from the snapshot, a re-fetch, or the payload itself.
pub(crate) async fn update_contract(
    op_manager: &OpManager,
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) -> Result<UpdateExecution, OpError> {
    // Pre-fetch the existing state so an `UpdateNoChange` outcome can usually
    // be answered without an extra round-trip; failures here are non-fatal.
    let previous_state = match op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await
    {
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse { state, .. }),
            ..
        }) => state,
        Ok(other) => {
            tracing::trace!(?other, %key, "Unexpected get response while preparing update summary");
            None
        }
        Err(err) => {
            tracing::debug!(%key, %err, "Failed to fetch existing contract state before update");
            None
        }
    };
    match op_manager
        .notify_contract_handler(ContractHandlerEvent::UpdateQuery {
            key,
            data: update_data.clone(),
            related_contracts,
        })
        .await
    {
        Ok(ContractHandlerEvent::UpdateResponse {
            new_value: Ok(new_val),
            state_changed,
        }) => {
            debug_assert!(
                new_val.size() > 0,
                "update_contract: state must be non-empty after successful UPDATE for contract {key}"
            );
            let new_bytes = State::from(new_val.clone()).into_bytes();
            let summary = StateSummary::from(new_bytes);
            Ok(UpdateExecution {
                value: new_val,
                summary,
                changed: state_changed,
            })
        }
        Ok(ContractHandlerEvent::UpdateResponse {
            new_value: Err(err),
            ..
        }) => {
            log_update_contract_failure(&key, &err);
            Err(err.into())
        }
        Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
            // Extracts a full state embedded in the update payload, if any
            // (delta-only payloads carry no usable state).
            fn extract_state_from_update_data(
                update_data: &UpdateData<'static>,
            ) -> Option<WrappedState> {
                match update_data {
                    UpdateData::State(s) => Some(WrappedState::from(s.clone().into_bytes())),
                    UpdateData::StateAndDelta { state, .. }
                    | UpdateData::RelatedState { state, .. }
                    | UpdateData::RelatedStateAndDelta { state, .. } => {
                        Some(WrappedState::from(state.clone().into_bytes()))
                    }
                    UpdateData::Delta(_) | UpdateData::RelatedDelta { .. } => None,
                    _ => None,
                }
            }
            // Prefer the pre-update snapshot; fall back to a fresh fetch,
            // then to state embedded in the payload.
            let resolved_state = match previous_state {
                Some(prev_state) => prev_state,
                None => {
                    let fetched_state = op_manager
                        .notify_contract_handler(ContractHandlerEvent::GetQuery {
                            instance_id: *key.id(),
                            return_contract_code: false,
                        })
                        .await
                        .ok()
                        .and_then(|event| match event {
                            ContractHandlerEvent::GetResponse {
                                response: Ok(StoreResponse { state, .. }),
                                ..
                            } => state,
                            // Exhaustive listing (no wildcard) so adding a new
                            // handler event forces a conscious decision here.
                            ContractHandlerEvent::DelegateRequest { .. }
                            | ContractHandlerEvent::DelegateResponse(_)
                            | ContractHandlerEvent::PutQuery { .. }
                            | ContractHandlerEvent::PutResponse { .. }
                            | ContractHandlerEvent::GetQuery { .. }
                            | ContractHandlerEvent::GetResponse { .. }
                            | ContractHandlerEvent::UpdateQuery { .. }
                            | ContractHandlerEvent::UpdateResponse { .. }
                            | ContractHandlerEvent::UpdateNoChange { .. }
                            | ContractHandlerEvent::RegisterSubscriberListener { .. }
                            | ContractHandlerEvent::RegisterSubscriberListenerResponse
                            | ContractHandlerEvent::QuerySubscriptions { .. }
                            | ContractHandlerEvent::QuerySubscriptionsResponse
                            | ContractHandlerEvent::GetSummaryQuery { .. }
                            | ContractHandlerEvent::GetSummaryResponse { .. }
                            | ContractHandlerEvent::GetDeltaQuery { .. }
                            | ContractHandlerEvent::GetDeltaResponse { .. }
                            | ContractHandlerEvent::NotifySubscriptionError { .. }
                            | ContractHandlerEvent::NotifySubscriptionErrorResponse
                            | ContractHandlerEvent::ClientDisconnect { .. } => None,
                        });
                    match fetched_state {
                        Some(state) => state,
                        None => {
                            tracing::debug!(
                                %key,
                                "Fallback fetch for UpdateNoChange returned no state; trying to extract from update_data"
                            );
                            match extract_state_from_update_data(&update_data) {
                                Some(state) => state,
                                None => {
                                    tracing::error!(
                                        %key,
                                        "Cannot extract state from delta-only UpdateData in NoChange fallback"
                                    );
                                    return Err(OpError::UnexpectedOpState);
                                }
                            }
                        }
                    }
                }
            };
            let bytes = State::from(resolved_state.clone()).into_bytes();
            let summary = StateSummary::from(bytes);
            Ok(UpdateExecution {
                value: resolved_state,
                summary,
                changed: false,
            })
        }
        Err(err) => Err(err.into()),
        Ok(other) => {
            tracing::error!(event = ?other, contract = %key, phase = "error", "Unexpected event from contract handler during update");
            Err(OpError::UnexpectedOpState)
        }
    }
}
#[allow(dead_code)]
impl IsOperationCompleted for UpdateOp {
    /// Completed means the op reached `Finished`; unlike `finalized`, a
    /// never-materialized op (`state == None`) does not count.
    fn is_completed(&self) -> bool {
        match &self.state {
            Some(UpdateState::Finished(_)) => true,
            None | Some(UpdateState::ReceivedRequest) => false,
        }
    }
}
/// Pushes a freshly-computed state summary to every peer interested in `key`
/// (except the update's sender and ourselves), subject to the interest
/// manager's notification throttle. Send failures are logged and otherwise
/// ignored (best-effort fan-out).
pub(crate) async fn send_proactive_summary_notification(
    op_manager: &OpManager,
    key: &ContractKey,
    sender_addr: SocketAddr,
    summary: StateSummary<'static>,
) {
    use crate::message::{InterestMessage, SummaryEntry};
    use crate::ring::interest::contract_hash;

    // Respect the per-contract throttle before doing any work.
    if !op_manager
        .interest_manager
        .should_send_summary_notification(key)
    {
        return;
    }

    let hash = contract_hash(key);
    let message = InterestMessage::Summaries {
        entries: vec![SummaryEntry::from_summary(hash, Some(&summary))],
    };
    let interested = op_manager.interest_manager.get_interested_peers(key);
    let self_addr = op_manager.ring.connection_manager.get_own_addr();

    for (peer_key, _interest) in &interested {
        // Resolve the peer to a socket address; unresolvable peers are skipped.
        let resolved = op_manager
            .ring
            .connection_manager
            .get_peer_by_pub_key(&peer_key.0)
            .and_then(|pkl| pkl.socket_addr());
        let Some(peer_addr) = resolved else { continue };
        // Never echo back to the sender or to ourselves.
        if peer_addr == sender_addr || self_addr.as_ref() == Some(&peer_addr) {
            continue;
        }
        let sent = op_manager
            .notify_node_event(NodeEvent::SendInterestMessage {
                target: peer_addr,
                message: message.clone(),
            })
            .await;
        if let Err(e) = sent {
            tracing::debug!(
                contract = %key,
                peer = %peer_addr,
                error = %e,
                "Failed to send proactive summary notification"
            );
        }
    }
    tracing::debug!(
        contract = %key,
        peer_count = interested.len(),
        "Sent proactive summary notifications after state change"
    );
}
/// After a broadcast was rejected as an invalid/stale update, echo our own
/// summary back to the sender — but only when the sender's view of our state
/// matched ours exactly, i.e. the rejection was a pure version race.
/// Out-of-sync peers are left for the heartbeat to converge, which avoids
/// amplification loops on attacker-induced rejections.
pub(crate) async fn send_summary_back_on_rejection(
    op_manager: &OpManager,
    key: &ContractKey,
    target_addr: SocketAddr,
    sender_summary_bytes: Vec<u8>,
) {
    use crate::message::{InterestMessage, SummaryEntry};
    use crate::ring::interest::contract_hash;
    // Throttle first: the summary computation below runs WASM, so it must
    // never happen on an unthrottled path.
    if !op_manager
        .interest_manager
        .should_send_summary_notification(key)
    {
        return;
    }
    // Compute our current summary; without one there is nothing to send.
    let Some(our_summary) = op_manager
        .interest_manager
        .get_contract_summary(op_manager, key)
        .await
    else {
        tracing::debug!(
            contract = %key,
            peer = %target_addr,
            "Skipping summary-back on rejection — no local summary available"
        );
        return;
    };
    // Only respond when the sender already believed our exact state: the
    // rejection then stems from a version race and a direct reply converges
    // quickly without reintroducing sync feedback loops.
    if our_summary.as_ref() != sender_summary_bytes.as_slice() {
        tracing::debug!(
            contract = %key,
            peer = %target_addr,
            "Skipping summary-back on rejection — sender's summary differs \
            from ours (peer is genuinely out of sync; heartbeat will converge)"
        );
        return;
    }
    let hash = contract_hash(key);
    let message = InterestMessage::Summaries {
        entries: vec![SummaryEntry::from_summary(hash, Some(&our_summary))],
    };
    if let Err(e) = op_manager
        .notify_node_event(NodeEvent::SendInterestMessage {
            target: target_addr,
            message,
        })
        .await
    {
        tracing::info!(
            contract = %key,
            peer = %target_addr,
            error = %e,
            "Failed to send summary-back after broadcast rejection"
        );
    }
}
mod messages {
    //! Wire messages for the UPDATE operation. These types are serialized
    //! across the network — field layout and variant order must stay stable.
    use std::fmt::Display;
    use freenet_stdlib::prelude::{ContractKey, RelatedContracts, WrappedState};
    use serde::{Deserialize, Serialize};
    use crate::{
        message::{InnerMessage, Transaction},
        ring::Location,
        transport::peer_connection::StreamId,
    };
    /// Streamed body of a RequestUpdateStreaming message.
    #[derive(Debug, Serialize, Deserialize)]
    pub(crate) struct UpdateStreamingPayload {
        #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
        pub related_contracts: RelatedContracts<'static>,
        pub value: WrappedState,
    }
    /// Streamed body of a BroadcastToStreaming message.
    #[derive(Debug, Serialize, Deserialize)]
    pub(crate) struct BroadcastStreamingPayload {
        // Full serialized contract state being broadcast.
        pub state_bytes: Vec<u8>,
        // The sender's summary of the state, used for rejection handling.
        pub sender_summary_bytes: Vec<u8>,
    }
    /// UPDATE protocol messages. `Request*` start an op toward a target;
    /// `Broadcast*` fan a merged state out to interested peers. The
    /// `*Streaming` variants carry only metadata — the payload arrives via a
    /// separate stream identified by `stream_id`.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub(crate) enum UpdateMsg {
        RequestUpdate {
            id: Transaction,
            key: ContractKey,
            #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
            related_contracts: RelatedContracts<'static>,
            value: WrappedState,
        },
        BroadcastTo {
            id: Transaction,
            key: ContractKey,
            payload: crate::message::DeltaOrFullState,
            sender_summary_bytes: Vec<u8>,
        },
        RequestUpdateStreaming {
            id: Transaction,
            stream_id: StreamId,
            key: ContractKey,
            total_size: u64,
        },
        BroadcastToStreaming {
            id: Transaction,
            stream_id: StreamId,
            key: ContractKey,
            total_size: u64,
        },
    }
    impl InnerMessage for UpdateMsg {
        /// Transaction id shared by every variant.
        fn id(&self) -> &Transaction {
            match self {
                UpdateMsg::RequestUpdate { id, .. }
                | UpdateMsg::BroadcastTo { id, .. }
                | UpdateMsg::RequestUpdateStreaming { id, .. }
                | UpdateMsg::BroadcastToStreaming { id, .. } => id,
            }
        }
        /// Ring location derived from the contract key (routing hint).
        fn requested_location(&self) -> Option<crate::ring::Location> {
            match self {
                UpdateMsg::RequestUpdate { key, .. }
                | UpdateMsg::BroadcastTo { key, .. }
                | UpdateMsg::RequestUpdateStreaming { key, .. }
                | UpdateMsg::BroadcastToStreaming { key, .. } => Some(Location::from(key.id())),
            }
        }
    }
    impl Display for UpdateMsg {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                UpdateMsg::RequestUpdate { id, .. } => write!(f, "RequestUpdate(id: {id})"),
                UpdateMsg::BroadcastTo { id, .. } => write!(f, "BroadcastTo(id: {id})"),
                UpdateMsg::RequestUpdateStreaming { id, stream_id, .. } => {
                    write!(f, "RequestUpdateStreaming(id: {id}, stream: {stream_id})")
                }
                UpdateMsg::BroadcastToStreaming { id, stream_id, .. } => {
                    write!(f, "BroadcastToStreaming(id: {id}, stream: {stream_id})")
                }
            }
        }
    }
}
#[allow(dead_code)]
#[derive(Debug)]
/// Payload captured when an UPDATE reaches its terminal state.
pub struct FinishedData {
    // Contract that was updated.
    pub key: ContractKey,
    // Summary of the post-update state, returned to the client.
    pub summary: StateSummary<'static>,
}
#[allow(dead_code)]
#[derive(Debug)]
/// State machine for an UPDATE operation.
pub enum UpdateState {
    // Request received; execution/broadcast still in progress.
    ReceivedRequest,
    // Terminal: update applied (or no-op) and response data ready.
    Finished(FinishedData),
}
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
    // Unit tests for UpdateOp accessors, log-severity classification, and
    // structural checks over this file's own text.
    use super::*;
    use crate::operations::OpOutcome;
    use crate::operations::test_utils::make_contract_key;
    // Builds a minimal client-initiated UpdateOp with the given state/stats.
    fn make_update_op(state: Option<UpdateState>, stats: Option<UpdateStats>) -> UpdateOp {
        UpdateOp {
            id: Transaction::new::<UpdateMsg>(),
            state,
            stats,
            upstream_addr: None,
        }
    }
    #[test]
    fn is_client_initiated_true_when_no_upstream() {
        let op = make_update_op(None, None);
        assert!(op.is_client_initiated());
    }
    #[test]
    fn is_client_initiated_false_when_forwarded() {
        let op = UpdateOp {
            id: Transaction::new::<UpdateMsg>(),
            state: None,
            stats: None,
            upstream_addr: Some("127.0.0.1:12345".parse().unwrap()),
        };
        assert!(!op.is_client_initiated());
    }
    #[test]
    fn update_op_outcome_success_untimed_when_finalized_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        match op.outcome() {
            OpOutcome::ContractOpSuccessUntimed {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpFailure { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpSuccessUntimed for finalized update with full stats")
            }
        }
    }
    #[test]
    fn update_op_outcome_irrelevant_when_finalized_without_stats() {
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            None,
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }
    #[test]
    fn update_op_outcome_irrelevant_when_finalized_with_partial_stats() {
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: None,
                contract_location: Some(Location::random()),
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }
    #[test]
    fn update_op_outcome_failure_when_not_finalized_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpSuccessUntimed { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpFailure for non-finalized update with full stats")
            }
        }
    }
    #[test]
    fn update_op_outcome_incomplete_when_not_finalized_without_stats() {
        let op = make_update_op(Some(UpdateState::ReceivedRequest), None);
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
    }
    #[test]
    fn update_op_failure_routing_info_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            None,
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        let info = op.failure_routing_info().expect("should have routing info");
        assert_eq!(info.0, target);
        assert_eq!(info.1, loc);
    }
    #[test]
    fn update_op_failure_routing_info_without_stats() {
        let op = make_update_op(None, None);
        assert!(op.failure_routing_info().is_none());
    }
    #[test]
    fn update_op_failure_routing_info_with_partial_stats() {
        let op = make_update_op(
            None,
            Some(UpdateStats {
                target: None,
                contract_location: Some(Location::random()),
            }),
        );
        assert!(op.failure_routing_info().is_none());
    }
    // Walks one op through partial stats -> full stats -> finished, checking
    // the outcome classification at each step.
    #[test]
    fn update_op_stats_lifecycle_from_partial_to_complete() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let mut op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: None,
                contract_location: Some(loc),
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
        assert!(op.failure_routing_info().is_none());
        op.stats = Some(UpdateStats {
            target: Some(target.clone()),
            contract_location: Some(loc),
        });
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpSuccessUntimed { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpFailure for in-progress update with stats")
            }
        }
        assert!(op.failure_routing_info().is_some());
        op.state = Some(UpdateState::Finished(FinishedData {
            key: make_contract_key(1),
            summary: StateSummary::from(vec![1u8]),
        }));
        match op.outcome() {
            OpOutcome::ContractOpSuccessUntimed {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpFailure { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpSuccessUntimed for finished update with stats")
            }
        }
    }
    #[test]
    fn update_op_outcome_with_target_but_no_contract_location() {
        let target = PeerKeyLocation::random();
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: None,
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
        assert!(op.failure_routing_info().is_none());
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: Some(target),
                contract_location: None,
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }
    use crate::operations::test_utils::make_peer;
    #[test]
    fn test_update_failure_outcome_with_stats() {
        let target = make_peer(9001);
        let contract_location = Location::from(&make_contract_key(42));
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(contract_location),
            }),
        );
        assert!(!op.finalized());
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location: loc,
            } => {
                assert_eq!(target_peer, &target);
                assert_eq!(loc, contract_location);
            }
            other => panic!("Expected ContractOpFailure, got {other:?}"),
        }
    }
    #[test]
    fn test_update_failure_outcome_without_stats() {
        let op = make_update_op(Some(UpdateState::ReceivedRequest), None);
        assert!(!op.finalized());
        assert!(
            matches!(op.outcome(), OpOutcome::Incomplete),
            "UPDATE without stats should return Incomplete"
        );
    }
    #[test]
    fn test_update_failure_routing_info() {
        let target = make_peer(9002);
        let contract_location = Location::from(&make_contract_key(42));
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(contract_location),
            }),
        );
        let (peer, loc) = op.failure_routing_info().expect("should have routing info");
        assert_eq!(peer, target);
        assert_eq!(loc, contract_location);
    }
    // Verifies that benign merge rejections log at INFO while real execution
    // faults stay at ERROR/WARN, and that the auto-fetch decision matches.
    mod log_severity {
        use super::*;
        use crate::contract::ExecutorError;
        use crate::test_utils::TestLogger;
        use freenet_stdlib::client_api::{ContractError as StdContractError, RequestError};
        fn invalid_update_rejection() -> ExecutorError {
            let req: RequestError = StdContractError::update_exec_error(
                make_contract_key(1),
                "invalid contract update, reason: New state version 100 must be higher than current version 100",
            )
            .into();
            req.into()
        }
        fn out_of_gas_failure() -> ExecutorError {
            let req: RequestError = StdContractError::update_exec_error(
                make_contract_key(1),
                "The operation ran out of gas. This might be caused by an infinite loop or an inefficient computation.",
            )
            .into();
            req.into()
        }
        fn missing_parameters_failure() -> ExecutorError {
            let req: RequestError = StdContractError::Update {
                key: make_contract_key(2),
                cause: "missing contract parameters".into(),
            }
            .into();
            req.into()
        }
        #[test]
        fn update_contract_failure_logs_info_for_invalid_update_rejection() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(1), &invalid_update_rejection());
            assert!(
                logger.contains("merge_rejected_invalid_update"),
                "expected event=merge_rejected_invalid_update in logs, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("INFO"),
                "expected INFO-level log for invalid-update rejection, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger.logs().iter().any(|l| l.contains("ERROR")),
                "invalid-update rejection must not produce ERROR-level logs, got: {:?}",
                logger.logs()
            );
        }
        #[test]
        fn update_contract_failure_logs_error_for_real_failure() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(2), &missing_parameters_failure());
            assert!(
                logger.contains("ERROR"),
                "real failures must remain ERROR-level, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("Failed to update contract value"),
                "expected ERROR message text, got: {:?}",
                logger.logs()
            );
        }
        #[test]
        fn update_contract_failure_logs_error_for_out_of_gas() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(1), &out_of_gas_failure());
            assert!(
                logger.contains("ERROR"),
                "out-of-gas must remain ERROR-level (real WASM fault), got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("merge_rejected_invalid_update")),
                "out-of-gas must NOT be classified as a benign rejection, got: {:?}",
                logger.logs()
            );
        }
        #[test]
        fn broadcast_to_streaming_failure_logs_info_and_skips_auto_fetch_for_invalid_update() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = invalid_update_rejection().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(1), &err);
            assert!(
                !needs_auto_fetch,
                "invalid-update rejection must NOT trigger self-heal auto-fetch (contract code is present)"
            );
            assert!(
                logger.contains("merge_rejected_invalid_update"),
                "expected event=merge_rejected_invalid_update in logs, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger.logs().iter().any(|l| l.contains("WARN")),
                "invalid-update rejection must not produce WARN-level logs (the old misleading 'contract not ready locally' line), got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("contract not ready locally")),
                "the misleading 'contract not ready locally' message must not appear for invalid-update rejections, got: {:?}",
                logger.logs()
            );
        }
        #[test]
        fn broadcast_to_streaming_failure_logs_warn_and_triggers_auto_fetch_for_real_failure() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = missing_parameters_failure().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(2), &err);
            assert!(
                needs_auto_fetch,
                "real failures must trigger self-heal auto-fetch"
            );
            assert!(
                logger.contains("WARN"),
                "real failures remain WARN-level for the streaming branch, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("contract not ready locally"),
                "expected the WARN message text for real failure, got: {:?}",
                logger.logs()
            );
        }
        #[test]
        fn broadcast_to_streaming_failure_logs_warn_and_skips_auto_fetch_for_out_of_gas() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = out_of_gas_failure().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(1), &err);
            assert!(
                logger.contains("WARN"),
                "out-of-gas must remain WARN-level for the streaming branch, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("merge_rejected_invalid_update")),
                "out-of-gas must NOT be classified as a benign rejection, got: {:?}",
                logger.logs()
            );
            assert!(
                !needs_auto_fetch,
                "out-of-gas must NOT trigger self-heal auto-fetch (contract code is present locally; the broader is_contract_exec_rejection predicate catches this case independently of log severity)"
            );
        }
    }
    // The remaining tests grep this file's own source text to pin the shape
    // of handler branches and helper functions against regressions.
    #[test]
    fn legacy_broadcast_to_sends_summary_back_on_rejection() {
        let src = include_str!("update.rs");
        let bcast_arm_pos = src
            .find("UpdateMsg::BroadcastTo {")
            .expect("UpdateMsg::BroadcastTo match arm not found");
        let err_branch_start = src[bcast_arm_pos..]
            .find("Err(err) =>")
            .map(|p| bcast_arm_pos + p)
            .expect("BroadcastTo Err(err) branch not found");
        let branch = &src[err_branch_start..err_branch_start + 3500];
        assert!(
            branch.contains("err.is_invalid_update_rejection()"),
            "BroadcastTo rejection branch MUST gate summary-back on \
            is_invalid_update_rejection (not is_contract_exec_rejection): \
            the broader predicate matches OOG/traps which are \
            attacker-inducible and must not amplify into summary-back"
        );
        assert!(
            branch.contains("send_summary_back_on_rejection"),
            "send_summary_back_on_rejection call missing — stale-version \
            rejections in legacy BroadcastTo will keep amplifying"
        );
    }
    #[test]
    fn legacy_broadcast_to_streaming_sends_summary_back_on_rejection() {
        let src = include_str!("update.rs");
        let stream_arm_pos = src
            .find("UpdateMsg::BroadcastToStreaming")
            .expect("BroadcastToStreaming arm not found");
        let err_branch_start = src[stream_arm_pos..]
            .find("Err(err) =>")
            .map(|p| stream_arm_pos + p)
            .expect("BroadcastToStreaming Err(err) branch not found");
        let branch = &src[err_branch_start..err_branch_start + 3000];
        assert!(
            branch.contains("err.is_invalid_update_rejection()"),
            "BroadcastToStreaming Err branch must gate summary-back on \
            is_invalid_update_rejection"
        );
        assert!(
            branch.contains("send_summary_back_on_rejection"),
            "BroadcastToStreaming Err branch must spawn \
            send_summary_back_on_rejection on invalid-update rejection"
        );
        assert!(
            branch.contains("try_auto_fetch_contract"),
            "BroadcastToStreaming Err branch must still call \
            try_auto_fetch_contract on the non-rejection (missing-contract) path"
        );
    }
    #[test]
    fn summary_back_helper_gates_on_summary_equality() {
        let src = include_str!("update.rs");
        let fn_start = src
            .find("pub(crate) async fn send_summary_back_on_rejection(")
            .expect("send_summary_back_on_rejection fn not found");
        let fn_end_offset = src[fn_start..]
            .find("\n}\n")
            .expect("send_summary_back_on_rejection fn close not found");
        let fn_body = &src[fn_start..fn_start + fn_end_offset];
        assert!(
            fn_body.contains("sender_summary_bytes"),
            "send_summary_back_on_rejection must take sender_summary_bytes \
            as a parameter"
        );
        let throttle_pos = fn_body
            .find("should_send_summary_notification")
            .expect("helper MUST call should_send_summary_notification (throttle)");
        let wasm_call_pos = fn_body
            .find("get_contract_summary")
            .expect("helper must call get_contract_summary to compute our_summary");
        assert!(
            throttle_pos < wasm_call_pos,
            "should_send_summary_notification MUST run before get_contract_summary \
            — otherwise attacker-induced rejections force unbounded WASM amplification"
        );
        let inequality_check_pos = fn_body
            .find("our_summary.as_ref() != sender_summary_bytes.as_slice()")
            .expect(
                "helper MUST use `our_summary.as_ref() != sender_summary_bytes.as_slice()` \
                as the gate condition (reversed direction reintroduces the SyncStateToPeer \
                loop — see node.rs:1791-1839)",
            );
        let after_check = &fn_body[inequality_check_pos..];
        let return_pos = after_check.find("return;").expect(
            "gate must early-return on inequality (reversed direction would bypass \
            the SyncStateToPeer safeguard)",
        );
        let notify_pos = after_check
            .find("notify_node_event")
            .expect("helper must still call notify_node_event on the equality path");
        assert!(
            return_pos < notify_pos,
            "early return on inequality MUST precede the notify_node_event \
            send — otherwise mismatched summaries would still be transmitted"
        );
    }
}