pub(crate) mod op_ctx_task;
use either::Either;
use freenet_stdlib::client_api::{ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
pub(crate) use self::messages::{BroadcastStreamingPayload, UpdateMsg, UpdateStreamingPayload};
use super::{
OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult, should_use_streaming,
};
use crate::contract::{ContractHandlerEvent, ExecutorError, StoreResponse};
use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
use crate::node::IsOperationCompleted;
use crate::ring::{Location, PeerKeyLocation, RingError};
use crate::transport::peer_connection::StreamId;
use crate::{
client_events::HostResult,
node::{NetworkBridge, OpManager},
tracing::{NetEventLog, OperationFailure, state_hash_full},
};
use std::collections::VecDeque;
use std::net::SocketAddr;
use dashmap::DashMap;
use tokio::time::Instant;
/// Per-contract cache of recently seen broadcast UPDATE payloads, used to
/// suppress re-application of duplicate broadcasts (see `check_and_insert`).
pub(crate) struct BroadcastDedupCache {
    // Keyed by contract; each queue holds recent payload fingerprints in
    // insertion order (front = oldest), which is what makes the TTL and
    // size-bound eviction in `check_and_insert` correct.
    // NOTE(review): map entries for contracts that go quiet are never removed,
    // only their queues are pruned — presumably acceptable growth; verify.
    entries: DashMap<ContractKey, VecDeque<DedupEntry>>,
}
/// One remembered broadcast payload fingerprint.
struct DedupEntry {
    // Hash over (is_delta flag, payload bytes) — see `check_and_insert`.
    delta_hash: u64,
    // Insertion time; entries older than `DEDUP_TTL` are pruned on access.
    inserted_at: Instant,
}
// Upper bound on remembered payloads per contract (oldest evicted first).
const DEDUP_MAX_ENTRIES_PER_CONTRACT: usize = 64;
// Entries older than this are dropped before the duplicate check.
const DEDUP_TTL: std::time::Duration = std::time::Duration::from_secs(60);
impl BroadcastDedupCache {
    /// Creates an empty dedup cache.
    pub fn new() -> Self {
        Self {
            entries: DashMap::new(),
        }
    }

    /// Reports whether an identical payload (same delta/full flag, same bytes)
    /// was already recorded for `key` within the TTL window, inserting it when
    /// it is new.
    ///
    /// Returns `true` for a duplicate (caller should skip processing) and
    /// `false` after recording a fresh entry. Expired entries are pruned first
    /// and the per-contract queue is capped at `DEDUP_MAX_ENTRIES_PER_CONTRACT`.
    pub fn check_and_insert(
        &self,
        key: &ContractKey,
        payload_bytes: &[u8],
        is_delta: bool,
        now: Instant,
    ) -> bool {
        use ahash::AHasher;
        use std::hash::Hasher;

        // Domain-separate delta vs. full-state payloads before hashing bytes,
        // so identical bytes under different kinds don't collide.
        let mut hasher = AHasher::default();
        hasher.write_u8(u8::from(is_delta));
        hasher.write(payload_bytes);
        let fingerprint = hasher.finish();

        let mut slot = self.entries.entry(*key).or_default();
        let recent = slot.value_mut();

        // Evict expired entries; the queue is in insertion order, so stop at
        // the first still-fresh one.
        while recent
            .front()
            .map_or(false, |e| now.duration_since(e.inserted_at) > DEDUP_TTL)
        {
            recent.pop_front();
        }

        // Duplicate within the window: report without refreshing its timestamp.
        if recent.iter().any(|e| e.delta_hash == fingerprint) {
            return true;
        }

        // Enforce the size bound, dropping oldest first, then record this one.
        while recent.len() >= DEDUP_MAX_ENTRIES_PER_CONTRACT {
            recent.pop_front();
        }
        recent.push_back(DedupEntry {
            delta_hash: fingerprint,
            inserted_at: now,
        });
        false
    }
}
/// Outcome of computing broadcast targets for an UPDATE, including telemetry
/// counters describing why candidates were included or dropped
/// (see `OpManager::get_broadcast_targets_update`).
pub(crate) struct BroadcastTargetResult {
    // Deduplicated, sorted set of peers to broadcast to.
    pub targets: Vec<PeerKeyLocation>,
    // Candidates discovered via the proximity (neighbor hosting) cache.
    pub proximity_found: usize,
    // Proximity candidates that could not be resolved in the connection manager.
    pub proximity_resolve_failed: usize,
    // Candidates discovered via the interest manager.
    pub interest_found: usize,
    // Interest candidates that could not be resolved in the connection manager.
    pub interest_resolve_failed: usize,
    // Candidates skipped because they were this node itself.
    pub skipped_self: usize,
    // Candidates skipped because they were the update's sender.
    pub skipped_sender: usize,
}
/// In-flight state of a single UPDATE operation.
pub(crate) struct UpdateOp {
    pub id: Transaction,
    // `None` or `Some(Finished(_))` means the op is finalized (see `finalized`).
    pub(crate) state: Option<UpdateState>,
    // Routing/telemetry info; populated when the op forwards to another peer.
    stats: Option<UpdateStats>,
    // Address of the peer we received this op from; `None` means the op was
    // initiated by a local client (see `is_client_initiated`).
    upstream_addr: Option<std::net::SocketAddr>,
}
impl UpdateOp {
    /// Classifies the operation for metrics: success when finalized with
    /// routing info, failure when unfinished with routing info, otherwise
    /// irrelevant/incomplete.
    pub fn outcome(&self) -> OpOutcome<'_> {
        // Routing info is only available when both target and location were set.
        let routing = match self.stats {
            Some(UpdateStats {
                target: Some(ref target),
                contract_location: Some(loc),
            }) => Some((target, loc)),
            _ => None,
        };
        match (self.finalized(), routing) {
            (true, Some((target_peer, contract_location))) => {
                OpOutcome::ContractOpSuccessUntimed {
                    target_peer,
                    contract_location,
                }
            }
            (true, None) => OpOutcome::Irrelevant,
            (false, Some((target_peer, contract_location))) => OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            },
            (false, None) => OpOutcome::Incomplete,
        }
    }

    /// `true` when this op came from a local client rather than another peer.
    pub(crate) fn is_client_initiated(&self) -> bool {
        self.upstream_addr.is_none()
    }

    /// Returns the (target peer, contract location) pair used for failure
    /// routing, when both were recorded.
    pub(crate) fn failure_routing_info(&self) -> Option<(PeerKeyLocation, Location)> {
        let stats = self.stats.as_ref()?;
        match (&stats.target, stats.contract_location) {
            (Some(target), Some(loc)) => Some((target.clone(), loc)),
            _ => None,
        }
    }

    /// `true` once the op has reached a terminal state (no state left, or
    /// explicitly finished).
    pub fn finalized(&self) -> bool {
        match &self.state {
            None | Some(UpdateState::Finished(_)) => true,
            Some(_) => false,
        }
    }

    /// Socket address of the next hop recorded in stats, if any.
    pub fn get_next_hop_addr(&self) -> Option<std::net::SocketAddr> {
        self.stats.as_ref()?.target.as_ref()?.socket_addr()
    }

    /// Converts the op into the client-facing result: an `UpdateResponse` when
    /// finished, otherwise an operation error.
    pub(super) fn to_host_result(&self) -> HostResult {
        match &self.state {
            Some(UpdateState::Finished(data)) => {
                tracing::debug!(
                    "Creating UpdateResponse for transaction {} with key {} and summary length {}",
                    self.id,
                    data.key,
                    data.summary.size()
                );
                Ok(HostResponse::ContractResponse(
                    freenet_stdlib::client_api::ContractResponse::UpdateResponse {
                        key: data.key,
                        summary: data.summary.clone(),
                    },
                ))
            }
            other => {
                tracing::error!(
                    tx = %self.id,
                    state = ?other,
                    phase = "error",
                    "UPDATE operation failed to finish successfully"
                );
                Err(ErrorKind::OperationError {
                    cause: "update didn't finish successfully".into(),
                }
                .into())
            }
        }
    }

    /// Aborts the op (peer connection dropped): notifies the waiting client
    /// with an error and marks the transaction completed.
    pub(crate) async fn handle_abort(self, op_manager: &OpManager) -> Result<(), OpError> {
        tracing::warn!(
            tx = %self.id,
            "Update operation aborted due to connection failure"
        );
        let error_result: crate::client_events::HostResult =
            Err(freenet_stdlib::client_api::ErrorKind::OperationError {
                cause: "Update operation failed: peer connection dropped".into(),
            }
            .into());
        // Best-effort notification: a full/closed router channel is logged,
        // not propagated.
        if let Err(err) = op_manager
            .result_router_tx
            .try_send((self.id, error_result))
        {
            tracing::error!(
                tx = %self.id,
                error = %err,
                "Failed to send abort notification to client \
(result router channel full or closed)"
            );
        }
        op_manager.completed(self.id);
        Ok(())
    }
}
/// Routing/telemetry info recorded while an UPDATE is in flight.
struct UpdateStats {
    // Peer the update was forwarded to, if any.
    target: Option<PeerKeyLocation>,
    // Ring location of the contract being updated.
    contract_location: Option<Location>,
}
/// Result of applying an update to a contract via the contract handler.
pub(crate) struct UpdateExecution {
    // The contract state after the update was applied.
    pub(crate) value: WrappedState,
    // Summary of the new state, sent back to the requesting client.
    pub(crate) summary: StateSummary<'static>,
    // Whether the update actually changed the stored state (gates broadcast).
    pub(crate) changed: bool,
}
/// Zero-sized marker produced when an UPDATE operation has finished.
pub(crate) struct UpdateResult {}

impl TryFrom<UpdateOp> for UpdateResult {
    type Error = OpError;

    /// Succeeds only for ops in a terminal state; anything still in flight is
    /// an unexpected-state error.
    fn try_from(op: UpdateOp) -> Result<Self, Self::Error> {
        // `finalized()` is exactly the `None | Some(Finished(_))` predicate.
        if op.finalized() {
            Ok(UpdateResult {})
        } else {
            Err(OpError::UnexpectedOpState)
        }
    }
}
/// UPDATE operation state machine. `process_message` handles the four wire
/// variants: direct request, broadcast, and their streaming counterparts.
impl Operation for UpdateOp {
    type Message = UpdateMsg;
    type Result = UpdateResult;

    /// Loads the in-flight op for `msg`'s transaction, or initializes a fresh
    /// one in `ReceivedRequest` state when none is registered. A transaction
    /// holding a non-UPDATE op is pushed back and reported as not present.
    async fn load_or_init<'a>(
        op_manager: &'a crate::node::OpManager,
        msg: &'a Self::Message,
        source_addr: Option<std::net::SocketAddr>,
    ) -> Result<super::OpInitialization<Self>, OpError> {
        let tx = *msg.id();
        match op_manager.pop(msg.id()) {
            Ok(Some(OpEnum::Update(update_op))) => {
                Ok(OpInitialization {
                    op: update_op,
                    source_addr,
                })
            }
            Ok(Some(op)) => {
                // Wrong op kind for this transaction: restore it and bail out.
                if let Err(e) = op_manager.push(tx, op).await {
                    tracing::warn!(tx = %tx, error = %e, "failed to push mismatched op back");
                }
                Err(OpError::OpNotPresent(tx))
            }
            Ok(None) => {
                tracing::debug!(tx = %tx, ?source_addr, "initializing new op");
                Ok(OpInitialization {
                    op: Self {
                        state: Some(UpdateState::ReceivedRequest),
                        id: tx,
                        // upstream_addr == None marks a client-initiated op.
                        stats: None,
                        upstream_addr: source_addr,
                    },
                    source_addr,
                })
            }
            Err(err) => Err(err.into()),
        }
    }

    fn id(&self) -> &crate::message::Transaction {
        &self.id
    }

    /// Processes one incoming UPDATE message and computes the next action
    /// (message to send, state to keep, stream to open) via `build_op_result`.
    fn process_message<'a, NB: NetworkBridge>(
        self,
        _conn_manager: &'a mut NB,
        op_manager: &'a crate::node::OpManager,
        input: &'a Self::Message,
        source_addr: Option<std::net::SocketAddr>,
    ) -> std::pin::Pin<
        Box<dyn futures::Future<Output = Result<super::OperationResult, OpError>> + Send + 'a>,
    > {
        Box::pin(async move {
            let return_msg;
            let new_state;
            let mut stats = self.stats;
            // Next-hop address for an outgoing forwarded message, if any.
            let mut forward_hop: Option<SocketAddr> = None;
            // Outbound stream payload when the message is too large to inline.
            let mut stream_data: Option<(StreamId, bytes::Bytes)> = None;
            match input {
                // Direct (inline) update request: apply locally if we host the
                // contract, otherwise forward toward a likely host.
                UpdateMsg::RequestUpdate {
                    id,
                    key,
                    related_contracts,
                    value,
                } => {
                    let self_location = op_manager.ring.connection_manager.own_location();
                    let executing_addr = self_location.socket_addr();
                    tracing::debug!(
                        tx = %id,
                        %key,
                        executing_peer = ?executing_addr,
                        request_sender = ?source_addr,
                        "UPDATE RequestUpdate: processing request"
                    );
                    {
                        // Probe the local store: only a peer that already holds
                        // the contract state applies the update itself.
                        let state_before = match op_manager
                            .notify_contract_handler(ContractHandlerEvent::GetQuery {
                                instance_id: *key.id(),
                                return_contract_code: false,
                            })
                            .await
                        {
                            Ok(ContractHandlerEvent::GetResponse {
                                response: Ok(StoreResponse { state: Some(s), .. }),
                                ..
                            }) => {
                                tracing::debug!(tx = %id, %key, "Contract exists locally, handling UPDATE");
                                Some(s)
                            }
                            _ => {
                                tracing::debug!(tx = %id, %key, "Contract not found locally");
                                None
                            }
                        };
                        if state_before.is_some() {
                            tracing::debug!(
                                tx = %id,
                                %key,
                                "Handling UPDATE locally - contract exists"
                            );
                            let hash_before = state_before.as_ref().map(state_hash_full);
                            let UpdateExecution {
                                value: updated_value,
                                summary,
                                changed,
                                ..
                            } = update_contract(
                                op_manager,
                                *key,
                                UpdateData::State(State::from(value.clone())),
                                related_contracts.clone(),
                            )
                            .await?;
                            let hash_after = Some(state_hash_full(&updated_value));
                            // Panics if absent: this arm only runs for remote
                            // requests, which always carry a source address.
                            let requester_addr = source_addr.expect(
                                "remote UpdateMsg::RequestUpdate must have source_addr for telemetry",
                            );
                            if let Some(requester_pkl) = op_manager
                                .ring
                                .connection_manager
                                .get_peer_by_addr(requester_addr)
                            {
                                if let Some(event) = NetEventLog::update_success(
                                    id,
                                    &op_manager.ring,
                                    *key,
                                    requester_pkl,
                                    hash_before,
                                    hash_after.clone(),
                                    Some(updated_value.len()),
                                ) {
                                    op_manager.ring.register_events(Either::Left(event)).await;
                                }
                            }
                            if !changed {
                                tracing::debug!(
                                    tx = %id,
                                    %key,
                                    "UPDATE yielded no state change, skipping broadcast"
                                );
                            } else {
                                tracing::debug!(
                                    tx = %id,
                                    %key,
                                    "UPDATE succeeded, state changed"
                                );
                            }
                            // Client-initiated ops finish here (response to the
                            // client); forwarded ops keep no state.
                            if self.upstream_addr.is_none() {
                                new_state = Some(UpdateState::Finished(FinishedData {
                                    key: *key,
                                    summary: summary.clone(),
                                }));
                            } else {
                                new_state = None;
                            }
                            return_msg = None;
                        } else {
                            // Not hosted locally: forward to the closest peer
                            // that might host the contract, excluding ourselves
                            // and the sender.
                            let self_addr = op_manager.ring.connection_manager.peer_addr()?;
                            let sender_addr = source_addr
                                .expect("remote UpdateMsg::RequestUpdate must have source_addr");
                            let skip_list = vec![self_addr, sender_addr];
                            let next_target = op_manager
                                .ring
                                .closest_potentially_hosting(key, skip_list.as_slice());
                            if let Some(forward_target) = next_target {
                                let forward_addr = forward_target
                                    .socket_addr()
                                    .expect("forward target must have socket address");
                                tracing::debug!(
                                    tx = %id,
                                    %key,
                                    next_peer = %forward_addr,
                                    "Forwarding UPDATE to peer that might have contract"
                                );
                                if let Some(event) = NetEventLog::update_request(
                                    id,
                                    &op_manager.ring,
                                    *key,
                                    forward_target.clone(),
                                ) {
                                    op_manager.ring.register_events(Either::Left(event)).await;
                                }
                                let payload = UpdateStreamingPayload {
                                    related_contracts: related_contracts.clone(),
                                    value: value.clone(),
                                };
                                let payload_bytes = bincode::serialize(&payload).map_err(|e| {
                                    OpError::NotificationChannelError(format!(
                                        "Failed to serialize UpdateStreamingPayload: {e}"
                                    ))
                                })?;
                                let payload_size = payload_bytes.len();
                                // Large payloads go over a dedicated stream;
                                // small ones are forwarded inline.
                                if should_use_streaming(
                                    op_manager.streaming_threshold,
                                    payload_size,
                                ) {
                                    let sid = StreamId::next_operations();
                                    tracing::debug!(
                                        tx = %id,
                                        %key,
                                        stream_id = %sid,
                                        payload_size,
                                        "Using streaming for UPDATE RequestUpdate forward"
                                    );
                                    return_msg = Some(UpdateMsg::RequestUpdateStreaming {
                                        id: *id,
                                        stream_id: sid,
                                        key: *key,
                                        total_size: payload_size as u64,
                                    });
                                    stream_data = Some((sid, bytes::Bytes::from(payload_bytes)));
                                } else {
                                    return_msg = Some(UpdateMsg::RequestUpdate {
                                        id: *id,
                                        key: *key,
                                        related_contracts: related_contracts.clone(),
                                        value: value.clone(),
                                    });
                                }
                                new_state = Some(UpdateState::ReceivedRequest);
                                forward_hop = Some(forward_addr);
                                stats = Some(UpdateStats {
                                    target: Some(forward_target),
                                    contract_location: Some(Location::from(key)),
                                });
                            } else {
                                // No forwarding candidate: log diagnostics and
                                // fail the op.
                                let candidates = op_manager
                                    .ring
                                    .k_closest_potentially_hosting(key, skip_list.as_slice(), 5)
                                    .into_iter()
                                    .filter_map(|loc| loc.socket_addr())
                                    .map(|addr| format!("{:.8}", addr))
                                    .collect::<Vec<_>>();
                                let connection_count =
                                    op_manager.ring.connection_manager.num_connections();
                                tracing::error!(
                                    tx = %id,
                                    contract = %key,
                                    candidates = ?candidates,
                                    connection_count,
                                    peer_addr = ?sender_addr,
                                    phase = "error",
                                    "Cannot handle UPDATE: contract not found locally and no peers to forward to"
                                );
                                if let Some(event) = NetEventLog::update_failure(
                                    id,
                                    &op_manager.ring,
                                    *key,
                                    OperationFailure::NoPeersAvailable,
                                ) {
                                    op_manager.ring.register_events(Either::Left(event)).await;
                                }
                                return Err(OpError::RingError(RingError::NoHostingPeers(
                                    *key.id(),
                                )));
                            }
                        }
                    }
                }
                // Inline broadcast from another peer: dedup, apply, and notify
                // the sender with our summary if anything changed.
                UpdateMsg::BroadcastTo {
                    id,
                    key,
                    payload,
                    sender_summary_bytes,
                } => {
                    let sender_addr = source_addr.expect("BroadcastTo requires source_addr");
                    let self_location = op_manager.ring.connection_manager.own_location();
                    let sender_summary = StateSummary::from(sender_summary_bytes.clone());
                    // Record the sender's view of the contract state for
                    // interest-based propagation.
                    if let Some(sender_pkl) = op_manager
                        .ring
                        .connection_manager
                        .get_peer_by_addr(sender_addr)
                    {
                        let sender_key = crate::ring::PeerKey::from(sender_pkl.pub_key().clone());
                        op_manager.interest_manager.update_peer_summary(
                            key,
                            &sender_key,
                            Some(sender_summary.clone()),
                        );
                    }
                    let update_data = match payload {
                        crate::message::DeltaOrFullState::Delta(bytes) => {
                            tracing::debug!(
                                contract = %key,
                                delta_size = bytes.len(),
                                "Received delta update"
                            );
                            UpdateData::Delta(StateDelta::from(bytes.clone()))
                        }
                        crate::message::DeltaOrFullState::FullState(bytes) => {
                            tracing::debug!(
                                contract = %key,
                                state_size = bytes.len(),
                                "Received full state update"
                            );
                            UpdateData::State(State::from(bytes.clone()))
                        }
                    };
                    // Raw bytes are reused for dedup hashing and telemetry.
                    let payload_bytes = match payload {
                        crate::message::DeltaOrFullState::FullState(bytes)
                        | crate::message::DeltaOrFullState::Delta(bytes) => bytes,
                    };
                    let state_for_telemetry = WrappedState::from(payload_bytes.clone());
                    if let Some(requester_pkl) = op_manager
                        .ring
                        .connection_manager
                        .get_peer_by_addr(sender_addr)
                    {
                        if let Some(event) = NetEventLog::update_broadcast_received(
                            id,
                            &op_manager.ring,
                            *key,
                            requester_pkl,
                            state_for_telemetry.clone(),
                        ) {
                            op_manager.ring.register_events(Either::Left(event)).await;
                        }
                    }
                    let is_delta_payload =
                        matches!(payload, crate::message::DeltaOrFullState::Delta(_));
                    // Duplicate broadcast payloads short-circuit the whole arm.
                    if op_manager.broadcast_dedup_cache.check_and_insert(
                        key,
                        payload_bytes,
                        is_delta_payload,
                        op_manager.interest_manager.now(),
                    ) {
                        tracing::debug!(
                            tx = %id,
                            %key,
                            "BroadcastTo skipped — duplicate payload (dedup cache hit)"
                        );
                        new_state = None;
                        return_msg = None;
                        return build_op_result(
                            self.id,
                            new_state,
                            return_msg,
                            stats,
                            self.upstream_addr,
                            forward_hop,
                            stream_data,
                        );
                    }
                    tracing::debug!("Attempting contract value update - BroadcastTo - update");
                    let is_delta = matches!(payload, crate::message::DeltaOrFullState::Delta(_));
                    let update_result =
                        update_contract(op_manager, *key, update_data, RelatedContracts::default())
                            .await;
                    let UpdateExecution {
                        value: updated_value,
                        summary: update_summary,
                        changed,
                        ..
                    } = match update_result {
                        Ok(result) => result,
                        Err(err) => {
                            // Rejected as invalid (likely stale): send our
                            // summary back so the sender can reconcile.
                            if err.is_invalid_update_rejection() {
                                let op_mgr = op_manager.clone();
                                let contract_key = *key;
                                let sender_summary = sender_summary_bytes.clone();
                                tokio::spawn(async move {
                                    send_summary_back_on_rejection(
                                        &op_mgr,
                                        &contract_key,
                                        sender_addr,
                                        sender_summary,
                                    )
                                    .await;
                                });
                            }
                            // A failed delta may mean we diverged: clear the
                            // sender's cached summary and request a full resync.
                            if is_delta {
                                tracing::warn!(
                                    tx = %id,
                                    contract = %key,
                                    sender = %sender_addr,
                                    error = %err,
                                    event = "delta_apply_failed",
                                    "Delta application failed, sending ResyncRequest to get full state"
                                );
                                if let Some(sender_pkl) = op_manager
                                    .ring
                                    .connection_manager
                                    .get_peer_by_addr(sender_addr)
                                {
                                    let sender_key =
                                        crate::ring::PeerKey::from(sender_pkl.pub_key().clone());
                                    op_manager.interest_manager.update_peer_summary(
                                        key,
                                        &sender_key,
                                        None,
                                    );
                                }
                                tracing::info!(
                                    tx = %id,
                                    contract = %key,
                                    target = %sender_addr,
                                    event = "resync_request_sent",
                                    "Sending ResyncRequest to peer after delta failure"
                                );
                                if let Err(e) = op_manager
                                    .notify_node_event(
                                        crate::message::NodeEvent::SendInterestMessage {
                                            target: sender_addr,
                                            message:
                                                crate::message::InterestMessage::ResyncRequest {
                                                    key: *key,
                                                },
                                        },
                                    )
                                    .await
                                {
                                    tracing::warn!(tx = %id, error = %e, "failed to send ResyncRequest");
                                }
                            } else if !err.is_contract_exec_rejection() {
                                // Missing contract code/params: try to fetch it
                                // from the sender (rate-limited).
                                op_manager.try_auto_fetch_contract(key, sender_addr);
                            }
                            return Err(err);
                        }
                    };
                    tracing::debug!("Contract successfully updated - BroadcastTo - update");
                    if let Some(event) = NetEventLog::update_broadcast_applied(
                        id,
                        &op_manager.ring,
                        *key,
                        &state_for_telemetry,
                        &updated_value,
                        changed,
                    ) {
                        op_manager.ring.register_events(Either::Left(event)).await;
                    }
                    if !changed {
                        tracing::debug!(
                            tx = %id,
                            %key,
                            "BroadcastTo update produced no change, ending propagation"
                        );
                    } else {
                        tracing::debug!(
                            "Successfully updated contract {} @ {:?} - BroadcastTo",
                            key,
                            self_location.location()
                        );
                        // Proactively push our new summary to the sender off
                        // the hot path.
                        {
                            let op_mgr = op_manager.clone();
                            let contract_key = *key;
                            let summary = update_summary.clone();
                            tokio::spawn(async move {
                                send_proactive_summary_notification(
                                    &op_mgr,
                                    &contract_key,
                                    sender_addr,
                                    summary,
                                )
                                .await;
                            });
                        }
                    }
                    new_state = None;
                    return_msg = None;
                }
                // Streaming variant of RequestUpdate: claim the inbound stream,
                // assemble and deserialize the payload, then apply locally.
                UpdateMsg::RequestUpdateStreaming {
                    id,
                    stream_id,
                    key,
                    total_size,
                } => {
                    use crate::operations::orphan_streams::{
                        OrphanStreamError, STREAM_CLAIM_TIMEOUT,
                    };
                    tracing::info!(
                        tx = %id,
                        contract = %key,
                        stream_id = %stream_id,
                        total_size,
                        "Processing UPDATE RequestUpdateStreaming"
                    );
                    let peer_addr = match source_addr {
                        Some(addr) => addr,
                        None => {
                            tracing::error!(tx = %id, "source_addr missing for streaming UPDATE request");
                            return Err(OpError::UnexpectedOpState);
                        }
                    };
                    let stream_handle = match op_manager
                        .orphan_stream_registry()
                        .claim_or_wait(peer_addr, *stream_id, STREAM_CLAIM_TIMEOUT)
                        .await
                    {
                        Ok(handle) => handle,
                        // Another task already claimed the stream: restore our
                        // state (if any) and dedup-out.
                        Err(OrphanStreamError::AlreadyClaimed) => {
                            tracing::debug!(
                                tx = %id,
                                stream_id = %stream_id,
                                "UPDATE RequestUpdateStreaming skipped — stream already claimed (dedup)"
                            );
                            if self.state.is_some() {
                                if let Err(e) = op_manager
                                    .push(
                                        *id,
                                        OpEnum::Update(UpdateOp {
                                            id: *id,
                                            state: self.state,
                                            stats,
                                            upstream_addr: self.upstream_addr,
                                        }),
                                    )
                                    .await
                                {
                                    tracing::warn!(tx = %id, error = %e, "failed to push UPDATE op state back after dedup");
                                }
                            }
                            return Err(OpError::OpNotPresent(*id));
                        }
                        Err(e) => {
                            tracing::error!(
                                tx = %id,
                                stream_id = %stream_id,
                                error = %e,
                                "Failed to claim stream from orphan registry for UPDATE"
                            );
                            if self.state.is_some() {
                                if let Err(e) = op_manager
                                    .push(
                                        *id,
                                        OpEnum::Update(UpdateOp {
                                            id: *id,
                                            state: self.state,
                                            stats,
                                            upstream_addr: self.upstream_addr,
                                        }),
                                    )
                                    .await
                                {
                                    tracing::warn!(tx = %id, error = %e, "failed to push UPDATE op state back after orphan claim failure");
                                }
                            }
                            return Err(OpError::OrphanStreamClaimFailed);
                        }
                    };
                    // NOTE: shadows the outer (outbound) `stream_data` within
                    // this arm — these are the assembled *inbound* bytes.
                    let stream_data = match stream_handle.assemble().await {
                        Ok(data) => {
                            tracing::debug!(
                                tx = %id,
                                stream_id = %stream_id,
                                assembled_size = data.len(),
                                expected_size = total_size,
                                "Stream assembled for UPDATE"
                            );
                            data
                        }
                        Err(e) => {
                            tracing::error!(
                                tx = %id,
                                stream_id = %stream_id,
                                error = %e,
                                "Failed to assemble stream for UPDATE"
                            );
                            return Err(OpError::StreamCancelled);
                        }
                    };
                    let payload: UpdateStreamingPayload = match bincode::deserialize(&stream_data) {
                        Ok(p) => p,
                        Err(e) => {
                            tracing::error!(
                                tx = %id,
                                error = %e,
                                "Failed to deserialize UpdateStreamingPayload"
                            );
                            return Err(OpError::invalid_transition(self.id));
                        }
                    };
                    let UpdateStreamingPayload {
                        related_contracts,
                        value,
                    } = payload;
                    let self_location = op_manager.ring.connection_manager.own_location();
                    let executing_addr = self_location.socket_addr();
                    tracing::debug!(
                        tx = %id,
                        %key,
                        executing_peer = ?executing_addr,
                        request_sender = ?source_addr,
                        "UPDATE RequestUpdateStreaming: applying update"
                    );
                    let UpdateExecution {
                        value: updated_value,
                        summary,
                        changed,
                        ..
                    } = update_contract(
                        op_manager,
                        *key,
                        UpdateData::State(State::from(value.clone())),
                        related_contracts.clone(),
                    )
                    .await?;
                    let hash_after = Some(state_hash_full(&updated_value));
                    if let Some(sender_addr) = source_addr {
                        if let Some(requester_pkl) = op_manager
                            .ring
                            .connection_manager
                            .get_peer_by_addr(sender_addr)
                        {
                            if let Some(event) = NetEventLog::update_success(
                                id,
                                &op_manager.ring,
                                *key,
                                requester_pkl,
                                // No before-hash available in the streaming path.
                                None,
                                hash_after.clone(),
                                Some(updated_value.len()),
                            ) {
                                op_manager.ring.register_events(Either::Left(event)).await;
                            }
                        }
                    }
                    if !changed {
                        tracing::debug!(
                            tx = %id,
                            %key,
                            "UPDATE streaming yielded no state change"
                        );
                    } else {
                        tracing::debug!(
                            tx = %id,
                            %key,
                            "UPDATE streaming succeeded, state changed"
                        );
                    }
                    // Same terminal logic as the inline path: only the
                    // client-facing op keeps a Finished state.
                    if self.upstream_addr.is_none() {
                        new_state = Some(UpdateState::Finished(FinishedData {
                            key: *key,
                            summary: summary.clone(),
                        }));
                    } else {
                        new_state = None;
                    }
                    return_msg = None;
                }
                // Streaming variant of BroadcastTo: claim/assemble the stream,
                // dedup on the full-state bytes, then apply.
                UpdateMsg::BroadcastToStreaming {
                    id,
                    stream_id,
                    key,
                    total_size,
                } => {
                    use crate::operations::orphan_streams::{
                        OrphanStreamError, STREAM_CLAIM_TIMEOUT,
                    };
                    let sender_addr = match source_addr {
                        Some(addr) => addr,
                        None => {
                            tracing::error!(
                                tx = %id,
                                contract = %key,
                                stream_id = %stream_id,
                                "BroadcastToStreaming received without source_addr"
                            );
                            return Err(OpError::UnexpectedOpState);
                        }
                    };
                    tracing::info!(
                        tx = %id,
                        contract = %key,
                        stream_id = %stream_id,
                        total_size,
                        sender = %sender_addr,
                        "Processing UPDATE BroadcastToStreaming"
                    );
                    let stream_handle = match op_manager
                        .orphan_stream_registry()
                        .claim_or_wait(sender_addr, *stream_id, STREAM_CLAIM_TIMEOUT)
                        .await
                    {
                        Ok(handle) => handle,
                        Err(OrphanStreamError::AlreadyClaimed) => {
                            tracing::debug!(
                                tx = %id,
                                stream_id = %stream_id,
                                "UPDATE BroadcastToStreaming skipped — stream already claimed (dedup)"
                            );
                            if self.state.is_some() {
                                if let Err(e) = op_manager
                                    .push(
                                        *id,
                                        OpEnum::Update(UpdateOp {
                                            id: *id,
                                            state: self.state,
                                            stats,
                                            upstream_addr: self.upstream_addr,
                                        }),
                                    )
                                    .await
                                {
                                    tracing::warn!(tx = %id, error = %e, "failed to push UPDATE broadcast op state back after dedup");
                                }
                            }
                            return Err(OpError::OpNotPresent(*id));
                        }
                        Err(e) => {
                            tracing::error!(
                                tx = %id,
                                stream_id = %stream_id,
                                error = %e,
                                "Failed to claim stream from orphan registry for broadcast"
                            );
                            if self.state.is_some() {
                                if let Err(e) = op_manager
                                    .push(
                                        *id,
                                        OpEnum::Update(UpdateOp {
                                            id: *id,
                                            state: self.state,
                                            stats,
                                            upstream_addr: self.upstream_addr,
                                        }),
                                    )
                                    .await
                                {
                                    tracing::warn!(tx = %id, error = %e, "failed to push UPDATE broadcast op state back after orphan claim failure");
                                }
                            }
                            return Err(OpError::OrphanStreamClaimFailed);
                        }
                    };
                    // NOTE: shadows the outer (outbound) `stream_data` — these
                    // are the assembled *inbound* bytes for this broadcast.
                    let stream_data = match stream_handle.assemble().await {
                        Ok(data) => {
                            tracing::debug!(
                                tx = %id,
                                stream_id = %stream_id,
                                assembled_size = data.len(),
                                expected_size = total_size,
                                "Stream assembled for broadcast"
                            );
                            data
                        }
                        Err(e) => {
                            tracing::error!(
                                tx = %id,
                                stream_id = %stream_id,
                                error = %e,
                                "Failed to assemble stream for broadcast"
                            );
                            return Err(OpError::StreamCancelled);
                        }
                    };
                    let payload: BroadcastStreamingPayload =
                        match bincode::deserialize(&stream_data) {
                            Ok(p) => p,
                            Err(e) => {
                                tracing::error!(
                                    tx = %id,
                                    error = %e,
                                    "Failed to deserialize BroadcastStreamingPayload"
                                );
                                return Err(OpError::invalid_transition(self.id));
                            }
                        };
                    let BroadcastStreamingPayload {
                        state_bytes,
                        sender_summary_bytes,
                    } = payload;
                    let sender_summary = StateSummary::from(sender_summary_bytes.clone());
                    if let Some(sender_pkl) = op_manager
                        .ring
                        .connection_manager
                        .get_peer_by_addr(sender_addr)
                    {
                        let sender_key = crate::ring::PeerKey::from(sender_pkl.pub_key().clone());
                        op_manager.interest_manager.update_peer_summary(
                            key,
                            &sender_key,
                            Some(sender_summary.clone()),
                        );
                    }
                    // Streaming broadcasts always carry a full state, so the
                    // dedup flag is `false` (not a delta).
                    if op_manager.broadcast_dedup_cache.check_and_insert(
                        key,
                        &state_bytes,
                        false,
                        op_manager.interest_manager.now(),
                    ) {
                        tracing::debug!(
                            tx = %id,
                            %key,
                            "BroadcastToStreaming skipped — duplicate payload (dedup cache hit)"
                        );
                        new_state = None;
                        return_msg = None;
                        return build_op_result(
                            self.id,
                            new_state,
                            return_msg,
                            stats,
                            self.upstream_addr,
                            forward_hop,
                            // No outbound stream for a dedup'd broadcast.
                            None,
                        );
                    }
                    let update_data = UpdateData::State(State::from(state_bytes.clone()));
                    let state_for_telemetry = WrappedState::from(state_bytes.clone());
                    if let Some(requester_pkl) = op_manager
                        .ring
                        .connection_manager
                        .get_peer_by_addr(sender_addr)
                    {
                        if let Some(event) = NetEventLog::update_broadcast_received(
                            id,
                            &op_manager.ring,
                            *key,
                            requester_pkl,
                            state_for_telemetry.clone(),
                        ) {
                            op_manager.ring.register_events(Either::Left(event)).await;
                        }
                    }
                    tracing::debug!("Attempting contract value update - BroadcastToStreaming");
                    match update_contract(
                        op_manager,
                        *key,
                        update_data,
                        RelatedContracts::default(),
                    )
                    .await
                    {
                        Ok(UpdateExecution {
                            value: updated_value,
                            summary: streaming_update_summary,
                            changed,
                            ..
                        }) => {
                            tracing::debug!("Contract successfully updated - BroadcastToStreaming");
                            if let Some(event) = NetEventLog::update_broadcast_applied(
                                id,
                                &op_manager.ring,
                                *key,
                                &state_for_telemetry,
                                &updated_value,
                                changed,
                            ) {
                                op_manager.ring.register_events(Either::Left(event)).await;
                            }
                            if !changed {
                                tracing::debug!(
                                    tx = %id,
                                    %key,
                                    "BroadcastToStreaming update produced no change"
                                );
                            } else {
                                crate::node::network_status::record_update_received();
                                tracing::debug!(
                                    tx = %id,
                                    %key,
                                    "Successfully updated contract via BroadcastToStreaming"
                                );
                                // Push our new summary back to the sender off
                                // the hot path.
                                {
                                    let op_mgr = op_manager.clone();
                                    let contract_key = *key;
                                    let summary = streaming_update_summary.clone();
                                    tokio::spawn(async move {
                                        send_proactive_summary_notification(
                                            &op_mgr,
                                            &contract_key,
                                            sender_addr,
                                            summary,
                                        )
                                        .await;
                                    });
                                }
                            }
                        }
                        // Failures here are non-fatal to the op: log, maybe
                        // auto-fetch the contract or return our summary.
                        Err(err) => {
                            if log_broadcast_to_streaming_failure(id, key, &err) {
                                op_manager.try_auto_fetch_contract(key, sender_addr);
                            } else if err.is_invalid_update_rejection() {
                                let op_mgr = op_manager.clone();
                                let contract_key = *key;
                                let sender_summary = sender_summary_bytes.clone();
                                tokio::spawn(async move {
                                    send_summary_back_on_rejection(
                                        &op_mgr,
                                        &contract_key,
                                        sender_addr,
                                        sender_summary,
                                    )
                                    .await;
                                });
                            }
                        }
                    }
                    new_state = None;
                    return_msg = None;
                }
            }
            build_op_result(
                self.id,
                new_state,
                return_msg,
                stats,
                self.upstream_addr,
                forward_hop,
                stream_data,
            )
        })
    }
}
// Minimum interval between auto-fetch attempts for the same contract (5 min).
pub(crate) const CONTRACT_FETCH_COOLDOWN_MS: u64 = 300_000;
impl OpManager {
    /// Starts a rate-limited, targeted GET toward `sender_addr` to fetch a
    /// contract we received an UPDATE for but cannot execute (e.g. missing
    /// code/parameters). No-ops while the per-contract cooldown is active.
    pub(crate) fn try_auto_fetch_contract(&self, key: &ContractKey, sender_addr: SocketAddr) {
        use crate::config::GlobalSimulationTime;
        let instance_id = *key.id();
        let now_ms = GlobalSimulationTime::read_time_ms();
        {
            use dashmap::mapref::entry::Entry;
            // Cooldown gate: record the attempt time, or bail if a recent
            // attempt is still within the window.
            match self.pending_contract_fetches.entry(instance_id) {
                Entry::Occupied(mut existing) => {
                    let elapsed_ms = now_ms.saturating_sub(*existing.get());
                    if elapsed_ms < CONTRACT_FETCH_COOLDOWN_MS {
                        return;
                    }
                    *existing.get_mut() = now_ms;
                }
                Entry::Vacant(slot) => {
                    slot.insert(now_ms);
                }
            }
        }
        let sender_pkl = match self.ring.connection_manager.get_peer_by_addr(sender_addr) {
            Some(pkl) => pkl,
            None => {
                tracing::debug!(
                    contract = %key,
                    sender = %sender_addr,
                    "Cannot auto-fetch: UPDATE sender not found in connection manager"
                );
                // Roll back the cooldown entry so a later attempt can retry.
                self.pending_contract_fetches.remove(&instance_id);
                return;
            }
        };
        tracing::info!(
            contract = %key,
            sender = %sender_addr,
            "Auto-fetching contract from UPDATE sender (missing parameters)"
        );
        let max_htl = self.ring.max_hops_to_live;
        let op_manager = self.clone();
        // Fire-and-forget: the GET is sent from a spawned task.
        crate::config::GlobalExecutor::spawn(async move {
            let (targeted_op, msg) =
                super::get::start_targeted_op(instance_id, sender_pkl, max_htl);
            match op_manager
                .notify_op_change(
                    crate::message::NetMessage::from(msg),
                    super::OpEnum::Get(targeted_op),
                )
                .await
            {
                Ok(()) => {
                    tracing::info!(
                        contract = %instance_id,
                        target = %sender_addr,
                        "Auto-fetch GET sent directly to UPDATE sender"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        contract = %instance_id,
                        error = %e,
                        "Auto-fetch GET failed to send to UPDATE sender"
                    );
                    // Allow a retry sooner than the full cooldown on send failure.
                    op_manager.pending_contract_fetches.remove(&instance_id);
                }
            }
        });
    }
    /// Computes the broadcast targets for an UPDATE to `key` received from (or
    /// initiated at) `sender`, merging two sources: neighbors known to host the
    /// contract (proximity cache) and peers registered as interested. Self and
    /// the sender are skipped unless this node is the local initiator.
    pub(crate) fn get_broadcast_targets_update(
        &self,
        key: &ContractKey,
        sender: &SocketAddr,
    ) -> BroadcastTargetResult {
        use std::collections::HashSet;
        let self_addr = self.ring.connection_manager.get_own_addr();
        // When we initiated the update locally, `sender` is our own address and
        // the self/sender skip rules are disabled.
        let is_local_update_initiator = self_addr.as_ref().map(|me| me == sender).unwrap_or(false);
        let mut targets: HashSet<PeerKeyLocation> = HashSet::new();
        let mut proximity_resolve_failed: usize = 0;
        let mut interest_resolve_failed: usize = 0;
        let mut skipped_self: usize = 0;
        let mut skipped_sender: usize = 0;
        // Source 1: neighbors that advertise hosting this contract.
        let proximity_pub_keys = self.neighbor_hosting.neighbors_with_contract(key);
        let proximity_found = proximity_pub_keys.len();
        for pub_key in proximity_pub_keys {
            if let Some(pkl) = self.ring.connection_manager.get_peer_by_pub_key(&pub_key) {
                if let Some(pkl_addr) = pkl.socket_addr() {
                    if &pkl_addr == sender && !is_local_update_initiator {
                        skipped_sender += 1;
                        continue;
                    }
                    if !is_local_update_initiator && self_addr.as_ref() == Some(&pkl_addr) {
                        skipped_self += 1;
                        continue;
                    }
                }
                targets.insert(pkl);
            } else {
                proximity_resolve_failed += 1;
                tracing::warn!(
                    contract = %format!("{:.8}", key),
                    proximity_neighbor = %pub_key,
                    is_local = is_local_update_initiator,
                    phase = "target_lookup_failed",
                    "Proximity cache neighbor not found in connection manager"
                );
            }
        }
        // Source 2: peers registered as interested in this contract.
        let interested_peers = self.interest_manager.get_interested_peers(key);
        let interest_found = interested_peers.len();
        for (peer_key, _interest) in interested_peers {
            if let Some(pkl) = self
                .ring
                .connection_manager
                .get_peer_by_pub_key(&peer_key.0)
            {
                if let Some(pkl_addr) = pkl.socket_addr() {
                    if &pkl_addr == sender && !is_local_update_initiator {
                        skipped_sender += 1;
                        continue;
                    }
                    if !is_local_update_initiator && self_addr.as_ref() == Some(&pkl_addr) {
                        skipped_self += 1;
                        continue;
                    }
                }
                // HashSet dedups peers found via both sources.
                targets.insert(pkl);
            } else {
                interest_resolve_failed += 1;
                tracing::warn!(
                    contract = %format!("{:.8}", key),
                    interest_peer = %peer_key.0,
                    is_local = is_local_update_initiator,
                    phase = "target_lookup_failed",
                    "Interest manager peer not found in connection manager"
                );
            }
        }
        // Deterministic ordering for downstream iteration and logging.
        let mut result: Vec<PeerKeyLocation> = targets.into_iter().collect();
        result.sort();
        if !result.is_empty() {
            tracing::info!(
                contract = %format!("{:.8}", key),
                peer_addr = %sender,
                targets = %result
                    .iter()
                    .filter_map(|s| s.socket_addr())
                    .map(|addr| format!("{:.8}", addr))
                    .collect::<Vec<_>>()
                    .join(","),
                count = result.len(),
                proximity_sources = proximity_found,
                interest_sources = interest_found,
                phase = "broadcast",
                "UPDATE_PROPAGATION"
            );
        } else {
            tracing::debug!(
                contract = %format!("{:.8}", key),
                peer_addr = %sender,
                self_addr = ?self_addr.map(|a| format!("{:.8}", a)),
                proximity_sources = proximity_found,
                interest_sources = interest_found,
                phase = "warning",
                "UPDATE_PROPAGATION: NO_TARGETS - update will not propagate further"
            );
        }
        BroadcastTargetResult {
            targets: result,
            proximity_found,
            proximity_resolve_failed,
            interest_found,
            interest_resolve_failed,
            skipped_self,
            skipped_sender,
        }
    }
}
/// Assembles the final `OperationResult` for an UPDATE step from the surviving
/// state, the optional outgoing message, the next hop, and any outbound stream
/// payload. The four (message, state) combinations map onto the four
/// `OperationResult` variants.
fn build_op_result(
    id: Transaction,
    state: Option<UpdateState>,
    return_msg: Option<UpdateMsg>,
    stats: Option<UpdateStats>,
    upstream_addr: Option<std::net::SocketAddr>,
    forward_hop: Option<std::net::SocketAddr>,
    stream_data: Option<(StreamId, bytes::Bytes)>,
) -> Result<super::OperationResult, OpError> {
    // Surviving state is wrapped back into an op enum for the manager.
    let op_state = state.map(|state| {
        OpEnum::Update(UpdateOp {
            id,
            state: Some(state),
            stats,
            upstream_addr,
        })
    });
    let outgoing = return_msg.map(NetMessage::from);
    let result = match (outgoing, op_state) {
        (Some(msg), Some(state)) => OperationResult::SendAndContinue {
            msg,
            next_hop: forward_hop,
            state,
            stream_data,
        },
        (Some(msg), None) => OperationResult::SendAndComplete {
            msg,
            next_hop: forward_hop,
            stream_data,
        },
        (None, Some(state)) => OperationResult::ContinueOp(state),
        (None, None) => OperationResult::Completed,
    };
    Ok(result)
}
/// Logs an UPDATE contract-execution failure at the appropriate severity:
/// invalid-update rejections are expected (stale rebroadcasts) and logged at
/// info level; everything else is a genuine error.
fn log_update_contract_failure(key: &ContractKey, err: &ExecutorError) {
    if !err.is_invalid_update_rejection() {
        tracing::error!(
            contract = %key,
            error = %err,
            phase = "error",
            "Failed to update contract value"
        );
        return;
    }
    tracing::info!(
        contract = %key,
        error = %err,
        event = "merge_rejected_invalid_update",
        "Update rejected by contract: incoming state invalid (likely stale rebroadcast), keeping local"
    );
}
/// Logs a BroadcastToStreaming update failure and returns whether the caller
/// should attempt to auto-fetch the contract (`true` for anything that is not
/// a contract-execution rejection).
pub(crate) fn log_broadcast_to_streaming_failure(
    tx: &Transaction,
    key: &ContractKey,
    err: &OpError,
) -> bool {
    if err.is_invalid_update_rejection() {
        // Expected when a stale rebroadcast loses the merge; not an error.
        tracing::info!(
            tx = %tx,
            %key,
            error = %err,
            event = "merge_rejected_invalid_update",
            "BroadcastToStreaming merge rejected: incoming state invalid (likely stale rebroadcast), keeping local"
        );
    } else {
        tracing::warn!(
            tx = %tx,
            %key,
            error = %err,
            "BroadcastToStreaming update skipped: contract not ready locally"
        );
    }
    !err.is_contract_exec_rejection()
}
/// Applies `update_data` to the locally stored state of `key` through the
/// contract handler and returns the resulting state plus its summary.
///
/// Flow:
/// 1. Best-effort fetch of the current state (kept as a fallback for the
///    no-change path below).
/// 2. Submit the `UpdateQuery` to the contract handler.
/// 3. On success, return the new state, its summary, and whether it changed.
/// 4. On `UpdateNoChange`, resolve the (unchanged) state — previous fetch,
///    then a re-fetch, then extraction from the update payload itself — and
///    return it with `changed: false`.
///
/// # Errors
/// Propagates the handler's error on a failed update; returns
/// `OpError::UnexpectedOpState` when the state cannot be resolved on the
/// no-change path or the handler replies with an unexpected event.
pub(crate) async fn update_contract(
    op_manager: &OpManager,
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) -> Result<UpdateExecution, OpError> {
    // Best-effort snapshot of the pre-update state; failures here are not
    // fatal — they only remove one fallback option for the no-change path.
    let previous_state = match op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await
    {
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse { state, .. }),
            ..
        }) => state,
        Ok(other) => {
            tracing::trace!(?other, %key, "Unexpected get response while preparing update summary");
            None
        }
        Err(err) => {
            tracing::debug!(%key, %err, "Failed to fetch existing contract state before update");
            None
        }
    };
    match op_manager
        .notify_contract_handler(ContractHandlerEvent::UpdateQuery {
            key,
            data: update_data.clone(),
            related_contracts,
        })
        .await
    {
        Ok(ContractHandlerEvent::UpdateResponse {
            new_value: Ok(new_val),
            state_changed,
        }) => {
            debug_assert!(
                new_val.size() > 0,
                "update_contract: state must be non-empty after successful UPDATE for contract {key}"
            );
            let new_bytes = State::from(new_val.clone()).into_bytes();
            let summary = StateSummary::from(new_bytes);
            Ok(UpdateExecution {
                value: new_val,
                summary,
                changed: state_changed,
            })
        }
        Ok(ContractHandlerEvent::UpdateResponse {
            new_value: Err(err),
            ..
        }) => {
            // Severity-classified logging lives in the shared helper so all
            // update paths report failures consistently.
            log_update_contract_failure(&key, &err);
            Err(err.into())
        }
        Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
            // Pulls a full state out of the update payload when one is
            // embedded; delta-only payloads carry no state to fall back on.
            fn extract_state_from_update_data(
                update_data: &UpdateData<'static>,
            ) -> Option<WrappedState> {
                match update_data {
                    UpdateData::State(s) => Some(WrappedState::from(s.clone().into_bytes())),
                    UpdateData::StateAndDelta { state, .. }
                    | UpdateData::RelatedState { state, .. }
                    | UpdateData::RelatedStateAndDelta { state, .. } => {
                        Some(WrappedState::from(state.clone().into_bytes()))
                    }
                    UpdateData::Delta(_) | UpdateData::RelatedDelta { .. } => None,
                    _ => None,
                }
            }
            // No change means the current state is the result; resolve it via
            // the pre-update snapshot, a fresh fetch, or the payload itself.
            let resolved_state = match previous_state {
                Some(prev_state) => prev_state,
                None => {
                    let fetched_state = op_manager
                        .notify_contract_handler(ContractHandlerEvent::GetQuery {
                            instance_id: *key.id(),
                            return_contract_code: false,
                        })
                        .await
                        .ok()
                        .and_then(|event| match event {
                            ContractHandlerEvent::GetResponse {
                                response: Ok(StoreResponse { state, .. }),
                                ..
                            } => state,
                            // Exhaustive listing (instead of `_`) so adding a
                            // handler event variant forces a review here.
                            ContractHandlerEvent::DelegateRequest { .. }
                            | ContractHandlerEvent::DelegateResponse(_)
                            | ContractHandlerEvent::PutQuery { .. }
                            | ContractHandlerEvent::PutResponse { .. }
                            | ContractHandlerEvent::GetQuery { .. }
                            | ContractHandlerEvent::GetResponse { .. }
                            | ContractHandlerEvent::UpdateQuery { .. }
                            | ContractHandlerEvent::UpdateResponse { .. }
                            | ContractHandlerEvent::UpdateNoChange { .. }
                            | ContractHandlerEvent::RegisterSubscriberListener { .. }
                            | ContractHandlerEvent::RegisterSubscriberListenerResponse
                            | ContractHandlerEvent::QuerySubscriptions { .. }
                            | ContractHandlerEvent::QuerySubscriptionsResponse
                            | ContractHandlerEvent::GetSummaryQuery { .. }
                            | ContractHandlerEvent::GetSummaryResponse { .. }
                            | ContractHandlerEvent::GetDeltaQuery { .. }
                            | ContractHandlerEvent::GetDeltaResponse { .. }
                            | ContractHandlerEvent::NotifySubscriptionError { .. }
                            | ContractHandlerEvent::NotifySubscriptionErrorResponse
                            | ContractHandlerEvent::ClientDisconnect { .. } => None,
                        });
                    match fetched_state {
                        Some(state) => state,
                        None => {
                            tracing::debug!(
                                %key,
                                "Fallback fetch for UpdateNoChange returned no state; trying to extract from update_data"
                            );
                            match extract_state_from_update_data(&update_data) {
                                Some(state) => state,
                                None => {
                                    tracing::error!(
                                        %key,
                                        "Cannot extract state from delta-only UpdateData in NoChange fallback"
                                    );
                                    return Err(OpError::UnexpectedOpState);
                                }
                            }
                        }
                    }
                }
            };
            let bytes = State::from(resolved_state.clone()).into_bytes();
            let summary = StateSummary::from(bytes);
            Ok(UpdateExecution {
                value: resolved_state,
                summary,
                changed: false,
            })
        }
        Err(err) => Err(err.into()),
        Ok(other) => {
            tracing::error!(event = ?other, contract = %key, phase = "error", "Unexpected event from contract handler during update");
            Err(OpError::UnexpectedOpState)
        }
    }
}
/// Builds a client-initiated UPDATE operation for `key` with a fresh
/// transaction id, holding the payload until `request_update` processes it.
pub(crate) fn start_op(
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) -> UpdateOp {
    let location = Location::from(&key);
    tracing::debug!(contract_location = %location, %key, "Requesting update");
    UpdateOp {
        id: Transaction::new::<UpdateMsg>(),
        state: Some(UpdateState::PrepareRequest(PrepareRequestData {
            key,
            related_contracts,
            update_data,
        })),
        // Target is filled in later, once a peer is selected for forwarding.
        stats: Some(UpdateStats {
            target: None,
            contract_location: Some(location),
        }),
        upstream_addr: None,
    }
}
/// Builds an UPDATE operation like [`start_op`], but under a caller-supplied
/// transaction id instead of a freshly generated one.
#[allow(dead_code)]
pub(crate) fn start_op_with_id(
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
    id: Transaction,
) -> UpdateOp {
    let location = Location::from(&key);
    tracing::debug!(contract_location = %location, %key, "Requesting update with transaction ID {}", id);
    UpdateOp {
        id,
        state: Some(UpdateState::PrepareRequest(PrepareRequestData {
            key,
            related_contracts,
            update_data,
        })),
        // Target is filled in later, once a peer is selected for forwarding.
        stats: Some(UpdateStats {
            target: None,
            contract_location: Some(location),
        }),
        upstream_addr: None,
    }
}
/// Entry point for a client-initiated UPDATE: applies the update locally and,
/// when a suitable peer exists, forwards the resulting state to it.
///
/// Target selection order:
/// 1. A connected neighbor that the proximity cache reports as hosting the
///    contract (skipping ourselves).
/// 2. Otherwise, the ring's closest potentially-hosting peer.
/// 3. Otherwise (isolated node), the update is applied purely locally — but
///    only if this node actually hosts the contract.
///
/// # Errors
/// `OpError::UnexpectedOpState` if the op is not in the `PrepareRequest`
/// phase; `RingError::NoHostingPeers` on an isolated node that does not host
/// the contract; plus any local-update or notification failure.
pub(crate) async fn request_update(
    op_manager: &OpManager,
    mut update_op: UpdateOp,
) -> Result<(), OpError> {
    // The op must still carry its prepared payload; anything else means this
    // was invoked out of order.
    let (key, update_data, related_contracts) =
        if let Some(UpdateState::PrepareRequest(data)) = update_op.state.take() {
            (data.key, data.update_data, data.related_contracts)
        } else {
            return Err(OpError::UnexpectedOpState);
        };
    let sender_addr = op_manager.ring.connection_manager.peer_addr()?;
    // Prefer neighbors that the proximity cache already knows host this
    // contract; fall through to ring routing if none is connected.
    let proximity_neighbors: Vec<_> = op_manager.neighbor_hosting.neighbors_with_contract(&key);
    let mut target_from_proximity = None;
    for pub_key in &proximity_neighbors {
        match op_manager
            .ring
            .connection_manager
            .get_peer_by_pub_key(pub_key)
        {
            Some(peer) => {
                // Never pick ourselves as the forwarding target.
                if peer
                    .socket_addr()
                    .map(|a| a == sender_addr)
                    .unwrap_or(false)
                {
                    continue;
                }
                target_from_proximity = Some(peer);
                break;
            }
            None => {
                tracing::debug!(
                    %key,
                    peer = %pub_key,
                    "UPDATE: Proximity cache neighbor not connected, trying next"
                );
            }
        }
    }
    let target = if let Some(proximity_neighbor) = target_from_proximity {
        tracing::debug!(
            %key,
            target = ?proximity_neighbor.socket_addr(),
            proximity_neighbors_found = proximity_neighbors.len(),
            "UPDATE: Using proximity cache neighbor as target"
        );
        proximity_neighbor
    } else {
        let remote_target = op_manager
            .ring
            .closest_potentially_hosting(&key, [sender_addr].as_slice());
        if let Some(target) = remote_target {
            target
        } else {
            // Isolated-node path: no remote peer to forward to, so the update
            // is applied locally and the result delivered directly.
            tracing::debug!(
                "UPDATE: No remote peers available for contract {}, handling locally",
                key
            );
            let id = update_op.id;
            let is_hosting = op_manager.ring.is_hosting_contract(&key);
            let should_handle_update = is_hosting;
            if !should_handle_update {
                tracing::error!(
                    contract = %key,
                    phase = "error",
                    "UPDATE: Cannot update contract on isolated node - contract not hosted"
                );
                return Err(OpError::RingError(RingError::NoHostingPeers(*key.id())));
            }
            let UpdateExecution {
                value: _updated_value,
                summary,
                changed,
                ..
            } = update_contract(op_manager, key, update_data, related_contracts).await?;
            tracing::debug!(
                tx = %id,
                %key,
                "Successfully updated contract locally on isolated node"
            );
            // Both branches deliver the same result; with no peers there is
            // nothing to broadcast either way, only the log line differs.
            if !changed {
                tracing::debug!(
                    tx = %id,
                    %key,
                    "Local update resulted in no change; finishing without broadcast"
                );
                deliver_update_result(op_manager, id, key, summary.clone()).await?;
                return Ok(());
            }
            deliver_update_result(op_manager, id, key, summary.clone()).await?;
            tracing::debug!(
                tx = %id,
                %key,
                "UPDATE operation complete on isolated node"
            );
            return Ok(());
        }
    };
    let id = update_op.id;
    // NOTE(review): both selection paths return connected peers, which
    // presumably always carry an address — confirm this expect cannot fire.
    let target_addr = target
        .socket_addr()
        .expect("target must have socket address");
    tracing::debug!(
        tx = %id,
        %key,
        target_peer = %target_addr,
        "Applying UPDATE locally before forwarding to target peer"
    );
    // Apply locally first so the forwarded message carries the merged state.
    let UpdateExecution {
        value: updated_value,
        summary,
        changed: _changed,
        ..
    } = update_contract(
        op_manager,
        key,
        update_data.clone(),
        related_contracts.clone(),
    )
    .await
    .map_err(|e| {
        tracing::error!(
            tx = %id,
            contract = %key,
            error = %e,
            phase = "error",
            "Failed to apply update locally before forwarding UPDATE"
        );
        e
    })?;
    tracing::debug!(
        tx = %id,
        %key,
        "Local update complete, now forwarding UPDATE to target peer"
    );
    if let Some(stats) = &mut update_op.stats {
        stats.target = Some(target.clone());
    }
    if let Some(event) = NetEventLog::update_request(&id, &op_manager.ring, key, target.clone()) {
        op_manager.ring.register_events(Either::Left(event)).await;
    }
    let msg = UpdateMsg::RequestUpdate {
        id,
        key,
        related_contracts,
        value: updated_value,
    };
    let op_state = UpdateOp {
        id,
        state: Some(UpdateState::ReceivedRequest),
        stats: Some(UpdateStats {
            target: Some(target),
            contract_location: Some(Location::from(&key)),
        }),
        upstream_addr: None,
    };
    op_manager
        .notify_op_change(NetMessage::from(msg), OpEnum::Update(op_state))
        .await?;
    // Deliver the client-facing result now; unlike `deliver_update_result`
    // this does not mark the transaction completed — the forwarded
    // RequestUpdate is still in flight.
    let op = UpdateOp {
        id,
        state: Some(UpdateState::Finished(FinishedData {
            key,
            summary: summary.clone(),
        })),
        stats: None,
        upstream_addr: None,
    };
    let host_result = op.to_host_result();
    op_manager
        .result_router_tx
        .try_send((id, host_result))
        .map_err(|error| {
            tracing::error!(tx = %id, error = %error, phase = "error", "Failed to send UPDATE result to result router (channel full or closed)");
            OpError::NotificationError
        })?;
    Ok(())
}
/// Delivers the final result of a locally-completed UPDATE to the client and
/// marks the transaction as finished.
///
/// # Errors
/// `OpError::NotificationError` when the result-router channel is full or
/// closed; the completion notification itself is best-effort.
async fn deliver_update_result(
    op_manager: &OpManager,
    id: Transaction,
    key: ContractKey,
    summary: StateSummary<'static>,
) -> Result<(), OpError> {
    let finished = UpdateOp {
        id,
        state: Some(UpdateState::Finished(FinishedData {
            key,
            summary: summary.clone(),
        })),
        stats: None,
        upstream_addr: None,
    };
    let result = finished.to_host_result();
    if let Err(error) = op_manager.result_router_tx.try_send((id, result)) {
        tracing::error!(
            tx = %id,
            error = %error,
            phase = "error",
            "Failed to send UPDATE result to result router (channel full or closed)"
        );
        return Err(OpError::NotificationError);
    }
    // Completion bookkeeping is best-effort: a full channel only degrades
    // internal accounting, never the client-visible result.
    let completion = op_manager
        .to_event_listener
        .notifications_sender()
        .try_send(Either::Right(NodeEvent::TransactionCompleted(id)));
    if let Err(error) = completion {
        tracing::warn!(
            tx = %id,
            error = %error,
            phase = "error",
            "Failed to notify transaction completion for UPDATE"
        );
    }
    op_manager.completed(id);
    Ok(())
}
impl IsOperationCompleted for UpdateOp {
    /// An UPDATE is complete exactly when it has reached the terminal
    /// `Finished` state.
    fn is_completed(&self) -> bool {
        self.state
            .as_ref()
            .is_some_and(|s| matches!(s, UpdateState::Finished(_)))
    }
}
/// Pushes a freshly-computed state summary of `key` to every peer registered
/// as interested in it, excluding the update's sender and ourselves.
///
/// Sends are gated by the interest manager's per-contract notification
/// throttle; individual send failures are logged and never propagated.
pub(crate) async fn send_proactive_summary_notification(
    op_manager: &OpManager,
    key: &ContractKey,
    sender_addr: SocketAddr,
    summary: StateSummary<'static>,
) {
    use crate::message::{InterestMessage, SummaryEntry};
    use crate::ring::interest::contract_hash;
    // Honor the throttle before doing any further work.
    if !op_manager
        .interest_manager
        .should_send_summary_notification(key)
    {
        return;
    }
    let hash = contract_hash(key);
    let message = InterestMessage::Summaries {
        entries: vec![SummaryEntry::from_summary(hash, Some(&summary))],
    };
    let interested = op_manager.interest_manager.get_interested_peers(key);
    let own_addr = op_manager.ring.connection_manager.get_own_addr();
    for (peer_key, _interest) in &interested {
        // Skip peers that are not currently connected or have no address.
        let Some(addr) = op_manager
            .ring
            .connection_manager
            .get_peer_by_pub_key(&peer_key.0)
            .and_then(|pkl| pkl.socket_addr())
        else {
            continue;
        };
        // Never echo the summary back to the sender or to ourselves.
        if addr == sender_addr || own_addr.as_ref() == Some(&addr) {
            continue;
        }
        if let Err(e) = op_manager
            .notify_node_event(NodeEvent::SendInterestMessage {
                target: addr,
                message: message.clone(),
            })
            .await
        {
            tracing::debug!(
                contract = %key,
                peer = %addr,
                error = %e,
                "Failed to send proactive summary notification"
            );
        }
    }
    tracing::debug!(
        contract = %key,
        peer_count = interested.len(),
        "Sent proactive summary notifications after state change"
    );
}
/// Sends our own state summary back to a peer whose broadcast we just
/// rejected as an invalid update.
///
/// This only fires when the rejecting side and the sender report the *same*
/// summary: in that case the sender is rebroadcasting a stale delta, and
/// hearing our (identical) summary lets it stop. When the summaries differ
/// the peer is genuinely out of sync and the periodic heartbeat is left to
/// converge the states instead — replying here could reintroduce a sync
/// feedback loop.
pub(crate) async fn send_summary_back_on_rejection(
    op_manager: &OpManager,
    key: &ContractKey,
    target_addr: SocketAddr,
    sender_summary_bytes: Vec<u8>,
) {
    use crate::message::{InterestMessage, SummaryEntry};
    use crate::ring::interest::contract_hash;
    // Throttle check first: it must run before any summary computation so
    // attacker-induced rejections cannot force unbounded WASM work.
    if !op_manager
        .interest_manager
        .should_send_summary_notification(key)
    {
        return;
    }
    let Some(our_summary) = op_manager
        .interest_manager
        .get_contract_summary(op_manager, key)
        .await
    else {
        tracing::debug!(
            contract = %key,
            peer = %target_addr,
            "Skipping summary-back on rejection — no local summary available"
        );
        return;
    };
    // Equality gate: only reply when the sender's summary already matches
    // ours; otherwise bail out early and let the heartbeat converge.
    if our_summary.as_ref() != sender_summary_bytes.as_slice() {
        tracing::debug!(
            contract = %key,
            peer = %target_addr,
            "Skipping summary-back on rejection — sender's summary differs \
             from ours (peer is genuinely out of sync; heartbeat will converge)"
        );
        return;
    }
    let hash = contract_hash(key);
    let message = InterestMessage::Summaries {
        entries: vec![SummaryEntry::from_summary(hash, Some(&our_summary))],
    };
    if let Err(e) = op_manager
        .notify_node_event(NodeEvent::SendInterestMessage {
            target: target_addr,
            message,
        })
        .await
    {
        tracing::info!(
            contract = %key,
            peer = %target_addr,
            error = %e,
            "Failed to send summary-back after broadcast rejection"
        );
    }
}
mod messages {
    use std::fmt::Display;

    use freenet_stdlib::prelude::{ContractKey, RelatedContracts, WrappedState};
    use serde::{Deserialize, Serialize};

    use crate::{
        message::{InnerMessage, Transaction},
        ring::Location,
        transport::peer_connection::StreamId,
    };

    /// Payload carried over a dedicated stream for the streaming variant of a
    /// request-update message (used when the value is too large to inline).
    #[derive(Debug, Serialize, Deserialize)]
    pub(crate) struct UpdateStreamingPayload {
        #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
        pub related_contracts: RelatedContracts<'static>,
        // Full contract state to apply on the receiving peer.
        pub value: WrappedState,
    }

    /// Payload carried over a dedicated stream for the streaming broadcast
    /// variant.
    #[derive(Debug, Serialize, Deserialize)]
    pub(crate) struct BroadcastStreamingPayload {
        // Serialized contract state being broadcast.
        pub state_bytes: Vec<u8>,
        // Sender's state summary at broadcast time; the receiver uses it to
        // decide whether to reply with its own summary on rejection.
        pub sender_summary_bytes: Vec<u8>,
    }

    /// Wire messages of the UPDATE operation.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub(crate) enum UpdateMsg {
        /// Ask a peer to apply an update; carries the full updated state inline.
        RequestUpdate {
            id: Transaction,
            key: ContractKey,
            #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
            related_contracts: RelatedContracts<'static>,
            value: WrappedState,
        },
        /// Propagate an already-applied update (delta or full state) to a peer.
        BroadcastTo {
            id: Transaction,
            key: ContractKey,
            payload: crate::message::DeltaOrFullState,
            sender_summary_bytes: Vec<u8>,
        },
        /// Streaming variant of the request: the payload follows on the
        /// stream identified by `stream_id`.
        RequestUpdateStreaming {
            id: Transaction,
            stream_id: StreamId,
            key: ContractKey,
            total_size: u64,
        },
        /// Streaming variant of the broadcast.
        BroadcastToStreaming {
            id: Transaction,
            stream_id: StreamId,
            key: ContractKey,
            total_size: u64,
        },
    }

    impl InnerMessage for UpdateMsg {
        /// Every UPDATE variant carries the transaction id.
        fn id(&self) -> &Transaction {
            match self {
                UpdateMsg::RequestUpdate { id, .. }
                | UpdateMsg::BroadcastTo { id, .. }
                | UpdateMsg::RequestUpdateStreaming { id, .. }
                | UpdateMsg::BroadcastToStreaming { id, .. } => id,
            }
        }

        /// Routing hint: the ring location derived from the contract key.
        fn requested_location(&self) -> Option<crate::ring::Location> {
            match self {
                UpdateMsg::RequestUpdate { key, .. }
                | UpdateMsg::BroadcastTo { key, .. }
                | UpdateMsg::RequestUpdateStreaming { key, .. }
                | UpdateMsg::BroadcastToStreaming { key, .. } => Some(Location::from(key.id())),
            }
        }
    }

    impl Display for UpdateMsg {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                UpdateMsg::RequestUpdate { id, .. } => write!(f, "RequestUpdate(id: {id})"),
                UpdateMsg::BroadcastTo { id, .. } => write!(f, "BroadcastTo(id: {id})"),
                UpdateMsg::RequestUpdateStreaming { id, stream_id, .. } => {
                    write!(f, "RequestUpdateStreaming(id: {id}, stream: {stream_id})")
                }
                UpdateMsg::BroadcastToStreaming { id, stream_id, .. } => {
                    write!(f, "BroadcastToStreaming(id: {id}, stream: {stream_id})")
                }
            }
        }
    }
}
/// Data captured when an UPDATE reaches its terminal state.
#[derive(Debug)]
pub struct FinishedData {
    // Contract that was updated.
    pub key: ContractKey,
    // Summary of the state after the update was applied.
    pub summary: StateSummary<'static>,
}
/// Payload of a client-initiated UPDATE that has not been processed yet:
/// everything `request_update` needs to apply and forward the update.
#[derive(Debug)]
pub struct PrepareRequestData {
    // Contract to update.
    pub key: ContractKey,
    // Related contract states required by the contract's update logic.
    pub related_contracts: RelatedContracts<'static>,
    // The update payload itself (delta, full state, or a combination).
    pub update_data: UpdateData<'static>,
}
impl PrepareRequestData {
    /// Consumes the prepared request and converts it into terminal
    /// [`FinishedData`], pairing the contract key with the final summary.
    #[allow(dead_code)]
    pub fn into_finished(self, summary: StateSummary<'static>) -> FinishedData {
        let Self { key, .. } = self;
        FinishedData { key, summary }
    }
}
/// Lifecycle states of an UPDATE operation.
#[derive(Debug)]
pub enum UpdateState {
    /// A forwarded update request was received and is being processed.
    ReceivedRequest,
    /// Terminal state: the update finished and a summary is available.
    Finished(FinishedData),
    /// Client-initiated op holding its payload until `request_update` runs.
    PrepareRequest(PrepareRequestData),
}
// Unit tests for the UPDATE op: outcome classification, routing info, log
// severity of failures, and source-scan guards over the rejection branches.
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
    use super::*;
    use crate::operations::OpOutcome;
    use crate::operations::test_utils::make_contract_key;

    // Convenience constructor: an UpdateOp with a fresh transaction id and no
    // upstream address (i.e. client-initiated).
    fn make_update_op(state: Option<UpdateState>, stats: Option<UpdateStats>) -> UpdateOp {
        UpdateOp {
            id: Transaction::new::<UpdateMsg>(),
            state,
            stats,
            upstream_addr: None,
        }
    }

    #[test]
    fn is_client_initiated_true_when_no_upstream() {
        let op = make_update_op(None, None);
        assert!(op.is_client_initiated());
    }

    #[test]
    fn is_client_initiated_false_when_forwarded() {
        let op = UpdateOp {
            id: Transaction::new::<UpdateMsg>(),
            state: None,
            stats: None,
            upstream_addr: Some("127.0.0.1:12345".parse().unwrap()),
        };
        assert!(!op.is_client_initiated());
    }

    // outcome() classification: finished + full stats => untimed success.
    #[test]
    fn update_op_outcome_success_untimed_when_finalized_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        match op.outcome() {
            OpOutcome::ContractOpSuccessUntimed {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpFailure { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpSuccessUntimed for finalized update with full stats")
            }
        }
    }

    #[test]
    fn update_op_outcome_irrelevant_when_finalized_without_stats() {
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            None,
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }

    #[test]
    fn update_op_outcome_irrelevant_when_finalized_with_partial_stats() {
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: None,
                contract_location: Some(Location::random()),
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }

    #[test]
    fn update_op_outcome_failure_when_not_finalized_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpSuccessUntimed { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpFailure for non-finalized update with full stats")
            }
        }
    }

    #[test]
    fn update_op_outcome_incomplete_when_not_finalized_without_stats() {
        let op = make_update_op(Some(UpdateState::ReceivedRequest), None);
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
    }

    // failure_routing_info(): only available when both the target peer and
    // the contract location are known.
    #[test]
    fn update_op_failure_routing_info_with_full_stats() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let op = make_update_op(
            None,
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(loc),
            }),
        );
        let info = op.failure_routing_info().expect("should have routing info");
        assert_eq!(info.0, target);
        assert_eq!(info.1, loc);
    }

    #[test]
    fn update_op_failure_routing_info_without_stats() {
        let op = make_update_op(None, None);
        assert!(op.failure_routing_info().is_none());
    }

    #[test]
    fn update_op_failure_routing_info_with_partial_stats() {
        let op = make_update_op(
            None,
            Some(UpdateStats {
                target: None,
                contract_location: Some(Location::random()),
            }),
        );
        assert!(op.failure_routing_info().is_none());
    }

    #[test]
    fn start_op_creates_update_with_partial_stats() {
        use crate::operations::test_utils::make_test_contract;
        let contract = make_test_contract(&[1u8]);
        let contract_key = contract.key();
        let contract_location = Location::from(&contract_key);
        let op = start_op(
            contract_key,
            UpdateData::State(WrappedState::new(vec![1u8]).into()),
            RelatedContracts::default(),
        );
        let stats = op.stats.as_ref().expect("start_op should create stats");
        assert!(
            stats.target.is_none(),
            "target should be None before forwarding"
        );
        assert_eq!(
            stats.contract_location,
            Some(contract_location),
            "contract_location should be set from start_op"
        );
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
    }

    // Walks a single op through partial stats -> full stats -> finished and
    // checks the outcome classification at each step.
    #[test]
    fn update_op_stats_lifecycle_from_partial_to_complete() {
        let target = PeerKeyLocation::random();
        let loc = Location::random();
        let mut op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: None,
                contract_location: Some(loc),
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
        assert!(op.failure_routing_info().is_none());
        op.stats = Some(UpdateStats {
            target: Some(target.clone()),
            contract_location: Some(loc),
        });
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpSuccessUntimed { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpFailure for in-progress update with stats")
            }
        }
        assert!(op.failure_routing_info().is_some());
        op.state = Some(UpdateState::Finished(FinishedData {
            key: make_contract_key(1),
            summary: StateSummary::from(vec![1u8]),
        }));
        match op.outcome() {
            OpOutcome::ContractOpSuccessUntimed {
                target_peer,
                contract_location,
            } => {
                assert_eq!(*target_peer, target);
                assert_eq!(contract_location, loc);
            }
            OpOutcome::ContractOpSuccess { .. }
            | OpOutcome::ContractOpFailure { .. }
            | OpOutcome::Incomplete
            | OpOutcome::Irrelevant => {
                panic!("Expected ContractOpSuccessUntimed for finished update with stats")
            }
        }
    }

    #[test]
    fn update_op_outcome_with_target_but_no_contract_location() {
        let target = PeerKeyLocation::random();
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: None,
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Incomplete));
        assert!(op.failure_routing_info().is_none());
        let op = make_update_op(
            Some(UpdateState::Finished(FinishedData {
                key: make_contract_key(1),
                summary: StateSummary::from(vec![1u8]),
            })),
            Some(UpdateStats {
                target: Some(target),
                contract_location: None,
            }),
        );
        assert!(matches!(op.outcome(), OpOutcome::Irrelevant));
    }

    use crate::operations::test_utils::make_peer;

    #[test]
    fn test_update_failure_outcome_with_stats() {
        let target = make_peer(9001);
        let contract_location = Location::from(&make_contract_key(42));
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(contract_location),
            }),
        );
        assert!(!op.finalized());
        match op.outcome() {
            OpOutcome::ContractOpFailure {
                target_peer,
                contract_location: loc,
            } => {
                assert_eq!(target_peer, &target);
                assert_eq!(loc, contract_location);
            }
            other => panic!("Expected ContractOpFailure, got {other:?}"),
        }
    }

    #[test]
    fn test_update_failure_outcome_without_stats() {
        let op = make_update_op(Some(UpdateState::ReceivedRequest), None);
        assert!(!op.finalized());
        assert!(
            matches!(op.outcome(), OpOutcome::Incomplete),
            "UPDATE without stats should return Incomplete"
        );
    }

    #[test]
    fn test_update_failure_routing_info() {
        let target = make_peer(9002);
        let contract_location = Location::from(&make_contract_key(42));
        let op = make_update_op(
            Some(UpdateState::ReceivedRequest),
            Some(UpdateStats {
                target: Some(target.clone()),
                contract_location: Some(contract_location),
            }),
        );
        let (peer, loc) = op.failure_routing_info().expect("should have routing info");
        assert_eq!(peer, target);
        assert_eq!(loc, contract_location);
    }

    // Log-severity classification: benign stale-rebroadcast rejections must
    // log at INFO, real failures at ERROR/WARN, and out-of-gas must never be
    // misclassified as benign.
    mod log_severity {
        use super::*;
        use crate::contract::ExecutorError;
        use crate::test_utils::TestLogger;
        use freenet_stdlib::client_api::{ContractError as StdContractError, RequestError};

        // An error shaped like the contract refusing a stale version bump.
        fn invalid_update_rejection() -> ExecutorError {
            let req: RequestError = StdContractError::update_exec_error(
                make_contract_key(1),
                "invalid contract update, reason: New state version 100 must be higher than current version 100",
            )
            .into();
            req.into()
        }

        // A WASM out-of-gas fault: a real execution failure.
        fn out_of_gas_failure() -> ExecutorError {
            let req: RequestError = StdContractError::update_exec_error(
                make_contract_key(1),
                "The operation ran out of gas. This might be caused by an infinite loop or an inefficient computation.",
            )
            .into();
            req.into()
        }

        // A genuine local-setup failure (contract parameters missing).
        fn missing_parameters_failure() -> ExecutorError {
            let req: RequestError = StdContractError::Update {
                key: make_contract_key(2),
                cause: "missing contract parameters".into(),
            }
            .into();
            req.into()
        }

        #[test]
        fn update_contract_failure_logs_info_for_invalid_update_rejection() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(1), &invalid_update_rejection());
            assert!(
                logger.contains("merge_rejected_invalid_update"),
                "expected event=merge_rejected_invalid_update in logs, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("INFO"),
                "expected INFO-level log for invalid-update rejection, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger.logs().iter().any(|l| l.contains("ERROR")),
                "invalid-update rejection must not produce ERROR-level logs, got: {:?}",
                logger.logs()
            );
        }

        #[test]
        fn update_contract_failure_logs_error_for_real_failure() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(2), &missing_parameters_failure());
            assert!(
                logger.contains("ERROR"),
                "real failures must remain ERROR-level, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("Failed to update contract value"),
                "expected ERROR message text, got: {:?}",
                logger.logs()
            );
        }

        #[test]
        fn update_contract_failure_logs_error_for_out_of_gas() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            log_update_contract_failure(&make_contract_key(1), &out_of_gas_failure());
            assert!(
                logger.contains("ERROR"),
                "out-of-gas must remain ERROR-level (real WASM fault), got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("merge_rejected_invalid_update")),
                "out-of-gas must NOT be classified as a benign rejection, got: {:?}",
                logger.logs()
            );
        }

        #[test]
        fn broadcast_to_streaming_failure_logs_info_and_skips_auto_fetch_for_invalid_update() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = invalid_update_rejection().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(1), &err);
            assert!(
                !needs_auto_fetch,
                "invalid-update rejection must NOT trigger self-heal auto-fetch (contract code is present)"
            );
            assert!(
                logger.contains("merge_rejected_invalid_update"),
                "expected event=merge_rejected_invalid_update in logs, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger.logs().iter().any(|l| l.contains("WARN")),
                "invalid-update rejection must not produce WARN-level logs (the old misleading 'contract not ready locally' line), got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("contract not ready locally")),
                "the misleading 'contract not ready locally' message must not appear for invalid-update rejections, got: {:?}",
                logger.logs()
            );
        }

        #[test]
        fn broadcast_to_streaming_failure_logs_warn_and_triggers_auto_fetch_for_real_failure() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = missing_parameters_failure().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(2), &err);
            assert!(
                needs_auto_fetch,
                "real failures must trigger self-heal auto-fetch"
            );
            assert!(
                logger.contains("WARN"),
                "real failures remain WARN-level for the streaming branch, got: {:?}",
                logger.logs()
            );
            assert!(
                logger.contains("contract not ready locally"),
                "expected the WARN message text for real failure, got: {:?}",
                logger.logs()
            );
        }

        #[test]
        fn broadcast_to_streaming_failure_logs_warn_and_skips_auto_fetch_for_out_of_gas() {
            let logger = TestLogger::new().capture_logs().with_level("info").init();
            let tx = Transaction::new::<UpdateMsg>();
            let err: OpError = out_of_gas_failure().into();
            let needs_auto_fetch =
                log_broadcast_to_streaming_failure(&tx, &make_contract_key(1), &err);
            assert!(
                logger.contains("WARN"),
                "out-of-gas must remain WARN-level for the streaming branch, got: {:?}",
                logger.logs()
            );
            assert!(
                !logger
                    .logs()
                    .iter()
                    .any(|l| l.contains("merge_rejected_invalid_update")),
                "out-of-gas must NOT be classified as a benign rejection, got: {:?}",
                logger.logs()
            );
            assert!(
                !needs_auto_fetch,
                "out-of-gas must NOT trigger self-heal auto-fetch (contract code is present locally; the broader is_contract_exec_rejection predicate catches this case independently of log severity)"
            );
        }
    }

    // Source-scan guards: these read this file's own text to ensure the
    // broadcast rejection branches and the summary-back helper keep their
    // safeguards in place (the branches live in code outside normal
    // unit-test reach).
    #[test]
    fn legacy_broadcast_to_sends_summary_back_on_rejection() {
        let src = include_str!("update.rs");
        let bcast_arm_pos = src
            .find("UpdateMsg::BroadcastTo {")
            .expect("UpdateMsg::BroadcastTo match arm not found");
        let err_branch_start = src[bcast_arm_pos..]
            .find("Err(err) =>")
            .map(|p| bcast_arm_pos + p)
            .expect("BroadcastTo Err(err) branch not found");
        let branch = &src[err_branch_start..err_branch_start + 3500];
        assert!(
            branch.contains("err.is_invalid_update_rejection()"),
            "BroadcastTo rejection branch MUST gate summary-back on \
             is_invalid_update_rejection (not is_contract_exec_rejection): \
             the broader predicate matches OOG/traps which are \
             attacker-inducible and must not amplify into summary-back"
        );
        assert!(
            branch.contains("send_summary_back_on_rejection"),
            "send_summary_back_on_rejection call missing — stale-version \
             rejections in legacy BroadcastTo will keep amplifying"
        );
    }

    #[test]
    fn legacy_broadcast_to_streaming_sends_summary_back_on_rejection() {
        let src = include_str!("update.rs");
        let stream_arm_pos = src
            .find("UpdateMsg::BroadcastToStreaming")
            .expect("BroadcastToStreaming arm not found");
        let err_branch_start = src[stream_arm_pos..]
            .find("Err(err) =>")
            .map(|p| stream_arm_pos + p)
            .expect("BroadcastToStreaming Err(err) branch not found");
        let branch = &src[err_branch_start..err_branch_start + 3000];
        assert!(
            branch.contains("err.is_invalid_update_rejection()"),
            "BroadcastToStreaming Err branch must gate summary-back on \
             is_invalid_update_rejection"
        );
        assert!(
            branch.contains("send_summary_back_on_rejection"),
            "BroadcastToStreaming Err branch must spawn \
             send_summary_back_on_rejection on invalid-update rejection"
        );
        assert!(
            branch.contains("try_auto_fetch_contract"),
            "BroadcastToStreaming Err branch must still call \
             try_auto_fetch_contract on the non-rejection (missing-contract) path"
        );
    }

    #[test]
    fn summary_back_helper_gates_on_summary_equality() {
        let src = include_str!("update.rs");
        let fn_start = src
            .find("pub(crate) async fn send_summary_back_on_rejection(")
            .expect("send_summary_back_on_rejection fn not found");
        let fn_end_offset = src[fn_start..]
            .find("\n}\n")
            .expect("send_summary_back_on_rejection fn close not found");
        let fn_body = &src[fn_start..fn_start + fn_end_offset];
        assert!(
            fn_body.contains("sender_summary_bytes"),
            "send_summary_back_on_rejection must take sender_summary_bytes \
             as a parameter"
        );
        let throttle_pos = fn_body
            .find("should_send_summary_notification")
            .expect("helper MUST call should_send_summary_notification (throttle)");
        let wasm_call_pos = fn_body
            .find("get_contract_summary")
            .expect("helper must call get_contract_summary to compute our_summary");
        assert!(
            throttle_pos < wasm_call_pos,
            "should_send_summary_notification MUST run before get_contract_summary \
             — otherwise attacker-induced rejections force unbounded WASM amplification"
        );
        let inequality_check_pos = fn_body
            .find("our_summary.as_ref() != sender_summary_bytes.as_slice()")
            .expect(
                "helper MUST use `our_summary.as_ref() != sender_summary_bytes.as_slice()` \
                 as the gate condition (reversed direction reintroduces the SyncStateToPeer \
                 loop — see node.rs:1791-1839)",
            );
        let after_check = &fn_body[inequality_check_pos..];
        let return_pos = after_check.find("return;").expect(
            "gate must early-return on inequality (reversed direction would bypass \
             the SyncStateToPeer safeguard)",
        );
        let notify_pos = after_check
            .find("notify_node_event")
            .expect("helper must still call notify_node_event on the equality path");
        assert!(
            return_pos < notify_pos,
            "early return on inequality MUST precede the notify_node_event \
             send — otherwise mismatched summaries would still be transmitted"
        );
    }
}