use std::net::SocketAddr;
use std::sync::Arc;
use either::Either;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::GlobalExecutor;
use crate::contract::{ContractHandlerEvent, StoreResponse};
use crate::message::{DeltaOrFullState, InterestMessage, NetMessage, NodeEvent, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::ring::{PeerKeyLocation, RingError};
use crate::tracing::{NetEventLog, OperationFailure, state_hash_full};
use super::{UpdateExecution, UpdateMsg};
/// Counts every invocation of a relay UPDATE entry point (test builds only).
#[cfg(any(test, feature = "testing"))]
pub static RELAY_UPDATE_DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Relay UPDATE tasks currently in flight: incremented at spawn, decremented
/// by `RelayUpdateInflightGuard::drop`.
pub static RELAY_UPDATE_INFLIGHT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Total relay UPDATE tasks ever spawned.
pub static RELAY_UPDATE_SPAWNED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Total relay UPDATE tasks that have finished (guard dropped).
pub static RELAY_UPDATE_COMPLETED_TOTAL: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Duplicate relay UPDATE messages rejected by the per-node dedup gate.
pub static RELAY_UPDATE_DEDUP_REJECTS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Entry point for a client-initiated UPDATE.
///
/// Spawns a detached task that drives the whole operation and returns the
/// transaction id right away; the result is published to the client later by
/// the task itself (see `deliver_outcome`).
pub(crate) async fn start_client_update(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) -> Result<Transaction, OpError> {
    tracing::debug!(
        tx = %client_tx,
        contract = %key,
        "update (task-per-tx): spawning client-initiated task"
    );
    // Build the task future first, then hand it to the executor.
    let task = run_client_update(op_manager, client_tx, key, update_data, related_contracts);
    GlobalExecutor::spawn(task);
    Ok(client_tx)
}
/// Task body for a client-initiated UPDATE: runs the driver, folds any
/// infrastructure error into a `DriverOutcome`, then publishes the result
/// and releases the transaction via `deliver_outcome`.
async fn run_client_update(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    update_data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) {
    let driven =
        drive_client_update(&op_manager, client_tx, key, update_data, related_contracts).await;
    let outcome = match driven {
        Ok(outcome) => outcome,
        Err(err) => DriverOutcome::InfrastructureError(err),
    };
    deliver_outcome(&op_manager, client_tx, outcome);
}
/// Result of `drive_client_update`, consumed by `deliver_outcome`.
#[derive(Debug)]
enum DriverOutcome {
    // A host result (success or client-visible error) to publish as-is.
    Publish(HostResult),
    // A node-side failure; `deliver_outcome` synthesizes a client error from it.
    InfrastructureError(OpError),
}
async fn drive_client_update(
op_manager: &OpManager,
client_tx: Transaction,
key: ContractKey,
update_data: UpdateData<'static>,
related_contracts: RelatedContracts<'static>,
) -> Result<DriverOutcome, OpError> {
let sender_addr = op_manager.ring.connection_manager.peer_addr()?;
let proximity_neighbors: Vec<_> = op_manager.neighbor_hosting.neighbors_with_contract(&key);
let mut target_from_proximity: Option<PeerKeyLocation> = None;
for pub_key in &proximity_neighbors {
match op_manager
.ring
.connection_manager
.get_peer_by_pub_key(pub_key)
{
Some(peer) => {
if peer
.socket_addr()
.map(|a| a == sender_addr)
.unwrap_or(false)
{
continue;
}
target_from_proximity = Some(peer);
break;
}
None => {
tracing::debug!(
%key,
peer = %pub_key,
"update (task-per-tx): proximity cache neighbor not connected, trying next"
);
}
}
}
let target = if let Some(proximity_neighbor) = target_from_proximity {
tracing::debug!(
%key,
target = ?proximity_neighbor.socket_addr(),
proximity_neighbors_found = proximity_neighbors.len(),
"update (task-per-tx): using proximity cache neighbor as target"
);
Some(proximity_neighbor)
} else {
op_manager
.ring
.closest_potentially_hosting(&key, [sender_addr].as_slice())
};
match target {
None => {
tracing::debug!(
tx = %client_tx,
%key,
"update (task-per-tx): no remote peers, handling locally"
);
let is_hosting = op_manager.ring.is_hosting_contract(&key);
if !is_hosting {
tracing::error!(
contract = %key,
phase = "error",
"update (task-per-tx): cannot update contract on isolated node — not hosted"
);
return Err(OpError::RingError(RingError::NoHostingPeers(*key.id())));
}
let UpdateExecution {
value: _,
summary,
changed,
..
} = super::update_contract(op_manager, key, update_data, related_contracts).await?;
if !changed {
tracing::debug!(
tx = %client_tx,
%key,
"update (task-per-tx): local update resulted in no change"
);
} else {
tracing::debug!(
tx = %client_tx,
%key,
"update (task-per-tx): local-only update complete"
);
}
let host_result: HostResult = Ok(HostResponse::ContractResponse(
ContractResponse::UpdateResponse {
key,
summary: summary.clone(),
},
));
Ok(DriverOutcome::Publish(host_result))
}
Some(target) => {
let target_addr = match target.socket_addr() {
Some(addr) => addr,
None => {
tracing::error!(
tx = %client_tx,
%key,
target_pub_key = %target.pub_key(),
"update (task-per-tx): target peer has no socket address"
);
return Err(OpError::RingError(RingError::NoHostingPeers(*key.id())));
}
};
tracing::debug!(
tx = %client_tx,
%key,
target_peer = %target_addr,
"update (task-per-tx): applying locally before forwarding"
);
let UpdateExecution {
value: updated_value,
summary,
changed: _,
..
} = super::update_contract(
op_manager,
key,
update_data.clone(),
related_contracts.clone(),
)
.await
.map_err(|e| {
tracing::error!(
tx = %client_tx,
contract = %key,
error = %e,
phase = "error",
"update (task-per-tx): failed to apply update locally before forwarding"
);
e
})?;
if let Some(event) =
NetEventLog::update_request(&client_tx, &op_manager.ring, key, target.clone())
{
op_manager.ring.register_events(Either::Left(event)).await;
}
let msg = NetMessage::from(UpdateMsg::RequestUpdate {
id: client_tx,
key,
related_contracts,
value: updated_value,
});
let mut ctx = op_manager.op_ctx(client_tx);
ctx.send_fire_and_forget(target_addr, msg).await?;
tracing::debug!(
tx = %client_tx,
%key,
target_peer = %target_addr,
"update (task-per-tx): forwarded to target, operation complete"
);
let host_result: HostResult = Ok(HostResponse::ContractResponse(
ContractResponse::UpdateResponse {
key,
summary: summary.clone(),
},
));
Ok(DriverOutcome::Publish(host_result))
}
}
}
/// Publishes the driver's outcome to the client and releases the transaction.
///
/// Infrastructure errors never reach the client raw: they are logged here and
/// converted into a synthesized `ErrorKind::OperationError` host result.
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
    let result: HostResult = match outcome {
        DriverOutcome::Publish(result) => result,
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                error = %err,
                "update (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            Err(ErrorKind::OperationError {
                cause: format!("UPDATE failed: {err}").into(),
            }
            .into())
        }
    };
    op_manager.send_client_result(client_tx, result);
    // Always release the transaction, regardless of outcome.
    op_manager.completed(client_tx);
}
/// Entry point for an incoming `RequestUpdate` relayed from another peer.
///
/// Applies a per-node dedup gate keyed on the transaction id, updates the
/// in-flight metrics, and spawns a detached driver task. Returns `Ok(())`
/// both when the task was spawned and when the message was dropped as a
/// duplicate.
pub(crate) async fn start_relay_request_update(
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    key: ContractKey,
    related_contracts: RelatedContracts<'static>,
    value: WrappedState,
    sender_addr: SocketAddr,
) -> Result<(), OpError> {
    #[cfg(any(test, feature = "testing"))]
    RELAY_UPDATE_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    // Dedup gate: the set insert reports false when this tx is already in flight.
    if !op_manager.active_relay_update_txs.insert(incoming_tx) {
        RELAY_UPDATE_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        tracing::debug!(
            tx = %incoming_tx,
            %key,
            %sender_addr,
            phase = "relay_update_dedup_reject",
            "UPDATE relay (task-per-tx): duplicate RequestUpdate for in-flight tx, dropping"
        );
        return Ok(());
    }
    tracing::debug!(
        tx = %incoming_tx,
        %key,
        %sender_addr,
        phase = "relay_update_request_start",
        "UPDATE relay (task-per-tx): spawning RequestUpdate driver"
    );
    RELAY_UPDATE_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    RELAY_UPDATE_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // The guard frees the dedup slot and fixes up the metrics when the
    // spawned task ends, on any exit path.
    let guard = RelayUpdateInflightGuard {
        op_manager: op_manager.clone(),
        incoming_tx,
    };
    GlobalExecutor::spawn(run_relay_request_update(
        guard,
        op_manager,
        incoming_tx,
        key,
        related_contracts,
        value,
        sender_addr,
    ));
    Ok(())
}
/// Entry point for an incoming `BroadcastTo` relayed from another peer.
///
/// Mirrors `start_relay_request_update`: per-tx dedup gate, metrics
/// bookkeeping, then a detached driver task.
pub(crate) async fn start_relay_broadcast_to(
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    key: ContractKey,
    payload: DeltaOrFullState,
    sender_summary_bytes: Vec<u8>,
    sender_addr: SocketAddr,
) -> Result<(), OpError> {
    #[cfg(any(test, feature = "testing"))]
    RELAY_UPDATE_DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    // Dedup gate: the set insert reports false when this tx is already in flight.
    if !op_manager.active_relay_update_txs.insert(incoming_tx) {
        RELAY_UPDATE_DEDUP_REJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        tracing::debug!(
            tx = %incoming_tx,
            %key,
            %sender_addr,
            phase = "relay_update_dedup_reject",
            "UPDATE relay (task-per-tx): duplicate BroadcastTo for in-flight tx, dropping"
        );
        return Ok(());
    }
    tracing::debug!(
        tx = %incoming_tx,
        %key,
        %sender_addr,
        phase = "relay_update_broadcast_start",
        "UPDATE relay (task-per-tx): spawning BroadcastTo driver"
    );
    RELAY_UPDATE_INFLIGHT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    RELAY_UPDATE_SPAWNED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // RAII guard: releases the dedup slot and metrics when the task ends.
    let guard = RelayUpdateInflightGuard {
        op_manager: op_manager.clone(),
        incoming_tx,
    };
    GlobalExecutor::spawn(run_relay_broadcast_to(
        guard,
        op_manager,
        incoming_tx,
        key,
        payload,
        sender_summary_bytes,
        sender_addr,
    ));
    Ok(())
}
/// RAII guard tied to one spawned relay UPDATE task: owns the transaction's
/// slot in the per-node dedup set for the task's lifetime.
struct RelayUpdateInflightGuard {
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
}
impl Drop for RelayUpdateInflightGuard {
    // Runs on any task exit: frees the dedup slot, then fixes the counters.
    fn drop(&mut self) {
        self.op_manager
            .active_relay_update_txs
            .remove(&self.incoming_tx);
        RELAY_UPDATE_INFLIGHT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
        RELAY_UPDATE_COMPLETED_TOTAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Task body for a relayed `RequestUpdate`: keeps the inflight guard alive
/// for the driver's whole run and logs (rather than propagates) errors.
async fn run_relay_request_update(
    guard: RelayUpdateInflightGuard,
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    key: ContractKey,
    related_contracts: RelatedContracts<'static>,
    value: WrappedState,
    sender_addr: SocketAddr,
) {
    // Hold the guard until this function returns; dropping it releases the
    // dedup slot and updates the metrics.
    let _guard = guard;
    let result = drive_relay_request_update(
        &op_manager,
        incoming_tx,
        key,
        related_contracts,
        value,
        sender_addr,
    )
    .await;
    if let Err(err) = result {
        tracing::warn!(
            tx = %incoming_tx,
            %key,
            error = %err,
            phase = "relay_update_request_error",
            "UPDATE relay (task-per-tx): RequestUpdate driver returned error"
        );
    }
}
/// Task body for a relayed `BroadcastTo`: keeps the inflight guard alive for
/// the driver's whole run and logs (rather than propagates) errors.
async fn run_relay_broadcast_to(
    guard: RelayUpdateInflightGuard,
    op_manager: Arc<OpManager>,
    incoming_tx: Transaction,
    key: ContractKey,
    payload: DeltaOrFullState,
    sender_summary_bytes: Vec<u8>,
    sender_addr: SocketAddr,
) {
    // Hold the guard until this function returns.
    let _guard = guard;
    match drive_relay_broadcast_to(
        &op_manager,
        incoming_tx,
        key,
        payload,
        sender_summary_bytes,
        sender_addr,
    )
    .await
    {
        Ok(()) => {}
        Err(err) => {
            tracing::warn!(
                tx = %incoming_tx,
                %key,
                error = %err,
                phase = "relay_update_broadcast_error",
                "UPDATE relay (task-per-tx): BroadcastTo driver returned error"
            );
        }
    }
}
/// Drives one relayed `RequestUpdate`.
///
/// If the contract's state is available locally, the update is applied via
/// the contract executor and the operation ends here. Otherwise the request
/// is forwarded, fire-and-forget, to the closest peer that might host the
/// contract; with no candidate peer at all the operation fails with
/// `RingError::NoHostingPeers`.
async fn drive_relay_request_update(
    op_manager: &OpManager,
    incoming_tx: Transaction,
    key: ContractKey,
    related_contracts: RelatedContracts<'static>,
    value: WrappedState,
    sender_addr: SocketAddr,
) -> Result<(), OpError> {
    let executing_addr = op_manager
        .ring
        .connection_manager
        .own_location()
        .socket_addr();
    tracing::debug!(
        tx = %incoming_tx,
        %key,
        executing_peer = ?executing_addr,
        request_sender = %sender_addr,
        "UPDATE relay (task-per-tx): processing RequestUpdate"
    );
    // Probe the local store for the contract's current state; `None` means
    // this node does not have it and must forward instead.
    let state_before = match op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await
    {
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse { state: Some(s), .. }),
            ..
        }) => Some(s),
        _ => None,
    };
    if state_before.is_some() {
        let hash_before = state_before.as_ref().map(state_hash_full);
        // This branch returns below, so `value` and `related_contracts` can
        // be moved here without cloning.
        let UpdateExecution {
            value: updated_value,
            summary: _,
            changed,
            ..
        } = super::update_contract(
            op_manager,
            key,
            UpdateData::State(State::from(value)),
            related_contracts,
        )
        .await?;
        let hash_after = Some(state_hash_full(&updated_value));
        // Telemetry is only emitted when the requester is still connected.
        if let Some(requester_pkl) = op_manager
            .ring
            .connection_manager
            .get_peer_by_addr(sender_addr)
        {
            if let Some(event) = NetEventLog::update_success(
                &incoming_tx,
                &op_manager.ring,
                key,
                requester_pkl,
                hash_before,
                hash_after,
                Some(updated_value.len()),
            ) {
                op_manager.ring.register_events(Either::Left(event)).await;
            }
        }
        if !changed {
            tracing::debug!(
                tx = %incoming_tx,
                %key,
                "UPDATE relay (task-per-tx): yielded no state change, skipping broadcast"
            );
        } else {
            tracing::debug!(
                tx = %incoming_tx,
                %key,
                "UPDATE relay (task-per-tx): RequestUpdate succeeded, state changed"
            );
        }
        return Ok(());
    }
    // Forwarding path: skip ourselves and the peer that sent us the request.
    let self_addr = op_manager.ring.connection_manager.peer_addr()?;
    let skip_list = vec![self_addr, sender_addr];
    let next_target = op_manager
        .ring
        .closest_potentially_hosting(&key, skip_list.as_slice());
    let forward_target = match next_target {
        Some(t) => t,
        None => {
            // Dead end: log nearby candidates (abbreviated addresses) for
            // diagnostics, record the failure, and bail out.
            let candidates = op_manager
                .ring
                .k_closest_potentially_hosting(&key, skip_list.as_slice(), 5)
                .into_iter()
                .filter_map(|loc| loc.socket_addr())
                .map(|addr| format!("{:.8}", addr))
                .collect::<Vec<_>>();
            let connection_count = op_manager.ring.connection_manager.num_connections();
            tracing::error!(
                tx = %incoming_tx,
                contract = %key,
                ?candidates,
                connection_count,
                peer_addr = %sender_addr,
                phase = "error",
                "UPDATE relay (task-per-tx): contract not local and no peers to forward to"
            );
            if let Some(event) = NetEventLog::update_failure(
                &incoming_tx,
                &op_manager.ring,
                key,
                OperationFailure::NoPeersAvailable,
            ) {
                op_manager.ring.register_events(Either::Left(event)).await;
            }
            return Err(OpError::RingError(RingError::NoHostingPeers(*key.id())));
        }
    };
    let forward_addr = match forward_target.socket_addr() {
        Some(addr) => addr,
        None => {
            tracing::error!(
                tx = %incoming_tx,
                %key,
                target_pub_key = %forward_target.pub_key(),
                "UPDATE relay (task-per-tx): forward target has no socket address"
            );
            return Err(OpError::RingError(RingError::NoHostingPeers(*key.id())));
        }
    };
    tracing::debug!(
        tx = %incoming_tx,
        %key,
        next_peer = %forward_addr,
        "UPDATE relay (task-per-tx): forwarding to peer that might have contract"
    );
    // `forward_target` is moved here: it is not referenced after this call.
    if let Some(event) =
        NetEventLog::update_request(&incoming_tx, &op_manager.ring, key, forward_target)
    {
        op_manager.ring.register_events(Either::Left(event)).await;
    }
    let request = NetMessage::from(UpdateMsg::RequestUpdate {
        id: incoming_tx,
        key,
        related_contracts,
        value,
    });
    let mut ctx = op_manager.op_ctx(incoming_tx);
    if let Err(err) = ctx.send_fire_and_forget(forward_addr, request).await {
        tracing::warn!(
            tx = %incoming_tx,
            %key,
            target = %forward_addr,
            error = %err,
            "UPDATE relay (task-per-tx): forward send failed"
        );
        return Err(err);
    }
    Ok(())
}
/// Drives one relayed `BroadcastTo`: applies a peer-broadcast delta or full
/// state to the local contract and, on an actual state change, spawns a
/// proactive summary notification to keep propagation going.
async fn drive_relay_broadcast_to(
    op_manager: &OpManager,
    incoming_tx: Transaction,
    key: ContractKey,
    payload: DeltaOrFullState,
    sender_summary_bytes: Vec<u8>,
    sender_addr: SocketAddr,
) -> Result<(), OpError> {
    let self_location = op_manager.ring.connection_manager.own_location();
    let sender_summary = StateSummary::from(sender_summary_bytes);
    // Record the summary the sender reported for this contract, keyed by the
    // sender's peer key (only possible while the sender is still connected).
    if let Some(sender_pkl) = op_manager
        .ring
        .connection_manager
        .get_peer_by_addr(sender_addr)
    {
        let sender_key = crate::ring::PeerKey::from(sender_pkl.pub_key().clone());
        op_manager
            .interest_manager
            .update_peer_summary(&key, &sender_key, Some(sender_summary));
    }
    // Normalize the wire payload into executor input plus the raw bytes used
    // below for dedup and telemetry.
    let (update_data, payload_bytes) = match &payload {
        DeltaOrFullState::Delta(bytes) => {
            tracing::debug!(
                contract = %key,
                delta_size = bytes.len(),
                "UPDATE relay (task-per-tx): received delta broadcast"
            );
            (
                UpdateData::Delta(StateDelta::from(bytes.clone())),
                bytes.clone(),
            )
        }
        DeltaOrFullState::FullState(bytes) => {
            tracing::debug!(
                contract = %key,
                state_size = bytes.len(),
                "UPDATE relay (task-per-tx): received full state broadcast"
            );
            (UpdateData::State(State::from(bytes.clone())), bytes.clone())
        }
    };
    let is_delta = matches!(payload, DeltaOrFullState::Delta(_));
    let state_for_telemetry = WrappedState::from(payload_bytes.clone());
    if let Some(requester_pkl) = op_manager
        .ring
        .connection_manager
        .get_peer_by_addr(sender_addr)
    {
        if let Some(event) = NetEventLog::update_broadcast_received(
            &incoming_tx,
            &op_manager.ring,
            key,
            requester_pkl,
            state_for_telemetry.clone(),
        ) {
            op_manager.ring.register_events(Either::Left(event)).await;
        }
    }
    // Dedup BEFORE the merge: a repeated payload must not re-run the
    // contract executor. A `true` return means "seen before".
    if op_manager.broadcast_dedup_cache.check_and_insert(
        &key,
        &payload_bytes,
        is_delta,
        op_manager.interest_manager.now(),
    ) {
        tracing::debug!(
            tx = %incoming_tx,
            %key,
            "UPDATE relay (task-per-tx): BroadcastTo skipped — duplicate payload (dedup hit)"
        );
        return Ok(());
    }
    let update_result =
        super::update_contract(op_manager, key, update_data, RelatedContracts::default()).await;
    let UpdateExecution {
        value: updated_value,
        summary: update_summary,
        changed,
        ..
    } = match update_result {
        Ok(result) => result,
        Err(err) => {
            if is_delta {
                // A delta that fails to apply suggests our state diverged
                // from the sender's: drop their recorded summary and ask the
                // sender for a full resync.
                tracing::warn!(
                    tx = %incoming_tx,
                    contract = %key,
                    sender = %sender_addr,
                    error = %err,
                    event = "delta_apply_failed",
                    "UPDATE relay (task-per-tx): delta apply failed, sending ResyncRequest"
                );
                if let Some(sender_pkl) = op_manager
                    .ring
                    .connection_manager
                    .get_peer_by_addr(sender_addr)
                {
                    let sender_key = crate::ring::PeerKey::from(sender_pkl.pub_key().clone());
                    op_manager
                        .interest_manager
                        .update_peer_summary(&key, &sender_key, None);
                }
                tracing::info!(
                    tx = %incoming_tx,
                    contract = %key,
                    target = %sender_addr,
                    event = "resync_request_sent",
                    "UPDATE relay (task-per-tx): sending ResyncRequest after delta failure"
                );
                // Resync failures are logged but do not mask the original
                // apply error (returned below).
                if let Err(e) = op_manager
                    .notify_node_event(NodeEvent::SendInterestMessage {
                        target: sender_addr,
                        message: InterestMessage::ResyncRequest { key },
                    })
                    .await
                {
                    tracing::warn!(
                        tx = %incoming_tx,
                        error = %e,
                        "UPDATE relay (task-per-tx): failed to send ResyncRequest"
                    );
                }
            } else if !err.is_contract_exec_rejection() {
                // Full-state apply failed and the contract did not simply
                // reject it: try fetching the contract from the sender.
                op_manager.try_auto_fetch_contract(&key, sender_addr);
            }
            return Err(err);
        }
    };
    tracing::debug!(
        tx = %incoming_tx,
        %key,
        "UPDATE relay (task-per-tx): BroadcastTo applied"
    );
    if let Some(event) = NetEventLog::update_broadcast_applied(
        &incoming_tx,
        &op_manager.ring,
        key,
        &state_for_telemetry,
        &updated_value,
        changed,
    ) {
        op_manager.ring.register_events(Either::Left(event)).await;
    }
    // No state change: end propagation here (see log message).
    if !changed {
        tracing::debug!(
            tx = %incoming_tx,
            %key,
            "UPDATE relay (task-per-tx): BroadcastTo produced no change, ending propagation"
        );
        return Ok(());
    }
    tracing::debug!(
        "UPDATE relay (task-per-tx): contract {} @ {:?} updated via BroadcastTo",
        key,
        self_location.location()
    );
    // Notify interested peers off-task; the sender address is passed along,
    // presumably so the origin can be excluded from the fan-out — TODO
    // confirm against send_proactive_summary_notification.
    let op_mgr = op_manager.clone();
    let summary = update_summary.clone();
    GlobalExecutor::spawn(async move {
        super::send_proactive_summary_notification(&op_mgr, &key, sender_addr, summary).await;
    });
    Ok(())
}
#[cfg(test)]
mod tests {
    //! Source-text regression tests: they pin structural invariants of this
    //! file (and of `client_events.rs`) by scanning the compiled-in source
    //! with `include_str!`, so refactors that break the task-per-tx design
    //! fail loudly at test time.
    //!
    //! Fix applied: `&section` had been corrupted into `§ion` (an HTML
    //! `&sect;` entity rendering), which did not compile.
    #[test]
    fn client_events_calls_start_client_update() {
        let src = include_str!("../../client_events.rs");
        assert!(
            src.contains("op_ctx_task::start_client_update"),
            "client_events.rs must call update::op_ctx_task::start_client_update \
             for client-initiated UPDATEs (not the legacy request_update path)"
        );
        // Isolate the ContractRequest::Update handler and ensure it does not
        // fall back to the legacy entry point.
        let update_section = src
            .split("ContractRequest::Update")
            .nth(1)
            .expect("client_events.rs must contain a ContractRequest::Update handler");
        let update_section = update_section
            .split("ContractRequest::")
            .next()
            .unwrap_or(update_section);
        assert!(
            !update_section.contains("request_update("),
            "client_events.rs UPDATE handler must NOT call request_update() directly \
             — it should use op_ctx_task::start_client_update instead"
        );
    }
    #[test]
    fn driver_uses_send_client_result_not_raw_try_send() {
        let src = include_str!("op_ctx_task.rs");
        assert!(
            src.contains("send_client_result"),
            "driver must use op_manager.send_client_result() for result delivery"
        );
        let deliver_fn = src
            .find("fn deliver_outcome(")
            .expect("deliver_outcome must exist");
        let deliver_body = &src[deliver_fn..];
        let end = deliver_body
            .find("#[cfg(test)]")
            .unwrap_or(deliver_body.len());
        let deliver_body = &deliver_body[..end];
        assert!(
            deliver_body.contains("send_client_result("),
            "deliver_outcome must call send_client_result"
        );
    }
    #[test]
    fn driver_calls_completed() {
        let src = include_str!("op_ctx_task.rs");
        assert!(
            src.contains("op_manager.completed("),
            "driver must call op_manager.completed() for cleanup"
        );
    }
    #[test]
    fn driver_calls_update_contract() {
        let src = include_str!("op_ctx_task.rs");
        assert!(
            src.contains("update_contract("),
            "driver must call update_contract() to preserve WASM merge + \
             BroadcastStateChange + persistence side effects"
        );
    }
    #[test]
    fn driver_sends_updated_value_in_request() {
        let src = include_str!("op_ctx_task.rs");
        assert!(
            src.contains("value: updated_value"),
            "RequestUpdate must use the post-merge updated_value, not the original \
             client update_data (mirrors legacy update.rs:2055)"
        );
    }
    #[test]
    fn relay_drivers_never_call_send_and_await() {
        let src = include_str!("op_ctx_task.rs");
        let relay_start = src
            .find("pub(crate) async fn start_relay_request_update(")
            .expect("start_relay_request_update not found");
        let relay_end = src
            .find("#[cfg(test)]")
            .expect("test module marker not found");
        let relay_src = &src[relay_start..relay_end];
        assert!(
            !relay_src.contains(".send_and_await("),
            "Relay UPDATE drivers MUST NOT call send_and_await. UPDATE is \
             fire-and-forget; introducing a wait reintroduces phase-5 GET's \
             amplifier surface."
        );
    }
    #[test]
    fn relay_request_update_uses_fire_and_forget() {
        let src = include_str!("op_ctx_task.rs");
        let driver_start = src
            .find("async fn drive_relay_request_update(")
            .expect("drive_relay_request_update not found");
        let driver_end_marker = src[driver_start..]
            .find("\n}")
            .expect("driver body end not found");
        let driver_src = &src[driver_start..driver_start + driver_end_marker];
        assert!(
            driver_src.contains("send_fire_and_forget"),
            "drive_relay_request_update must use send_fire_and_forget for the \
             forward path"
        );
    }
    #[test]
    fn broadcast_to_dedup_runs_before_merge() {
        let src = include_str!("op_ctx_task.rs");
        let driver_start = src
            .find("async fn drive_relay_broadcast_to(")
            .expect("drive_relay_broadcast_to not found");
        let driver_end_marker = src[driver_start..]
            .find("\n}")
            .expect("driver body end not found");
        let driver_src = &src[driver_start..driver_start + driver_end_marker];
        let dedup_pos = driver_src
            .find("broadcast_dedup_cache.check_and_insert")
            .expect("dedup cache check missing in BroadcastTo driver");
        let merge_pos = driver_src
            .find("super::update_contract(")
            .expect("update_contract call missing in BroadcastTo driver");
        assert!(
            dedup_pos < merge_pos,
            "broadcast_dedup_cache.check_and_insert MUST appear before \
             update_contract() in drive_relay_broadcast_to. If the order \
             flips, duplicate broadcasts re-run WASM merge and amplify cost."
        );
    }
    #[test]
    fn broadcast_to_sends_resync_on_delta_failure() {
        let src = include_str!("op_ctx_task.rs");
        let driver_start = src
            .find("async fn drive_relay_broadcast_to(")
            .expect("drive_relay_broadcast_to not found");
        let driver_src = &src[driver_start..];
        assert!(
            driver_src.contains("InterestMessage::ResyncRequest"),
            "drive_relay_broadcast_to must emit InterestMessage::ResyncRequest \
             when a delta payload fails to apply (mirrors legacy update.rs:745-758)."
        );
    }
    #[test]
    fn broadcast_to_triggers_auto_fetch_on_full_state_failure() {
        let src = include_str!("op_ctx_task.rs");
        let driver_start = src
            .find("async fn drive_relay_broadcast_to(")
            .expect("drive_relay_broadcast_to not found");
        let driver_src = &src[driver_start..];
        assert!(
            driver_src.contains("try_auto_fetch_contract"),
            "drive_relay_broadcast_to must call try_auto_fetch_contract on \
             non-rejection full-state failures (mirrors legacy update.rs:764)."
        );
    }
    #[test]
    fn broadcast_to_spawns_proactive_summary() {
        let src = include_str!("op_ctx_task.rs");
        let driver_start = src
            .find("async fn drive_relay_broadcast_to(")
            .expect("drive_relay_broadcast_to not found");
        let driver_src = &src[driver_start..];
        assert!(
            driver_src.contains("send_proactive_summary_notification"),
            "drive_relay_broadcast_to must spawn send_proactive_summary_notification \
             after a successful state change (mirrors legacy update.rs:806-819)."
        );
    }
    #[test]
    fn relay_drivers_use_per_node_dedup_gate() {
        let src = include_str!("op_ctx_task.rs");
        let request_start = src
            .find("pub(crate) async fn start_relay_request_update(")
            .expect("start_relay_request_update not found");
        let broadcast_start = src
            .find("pub(crate) async fn start_relay_broadcast_to(")
            .expect("start_relay_broadcast_to not found");
        let req_body = &src[request_start..request_start + 1500];
        let bc_body = &src[broadcast_start..broadcast_start + 1500];
        assert!(
            req_body.contains("active_relay_update_txs.insert"),
            "start_relay_request_update must check active_relay_update_txs"
        );
        assert!(
            bc_body.contains("active_relay_update_txs.insert"),
            "start_relay_broadcast_to must check active_relay_update_txs"
        );
    }
    #[test]
    fn dedup_rejection_increments_counter() {
        let src = include_str!("op_ctx_task.rs");
        let relay_start = src
            .find("pub(crate) async fn start_relay_request_update(")
            .expect("start_relay_request_update not found");
        let relay_end = src
            .find("#[cfg(test)]")
            .expect("test module marker not found");
        let relay_src = &src[relay_start..relay_end];
        let sites: Vec<&str> = relay_src
            .split("active_relay_update_txs.insert(incoming_tx)")
            .skip(1)
            .collect();
        assert_eq!(
            sites.len(),
            2,
            "expected exactly two dedup sites (RequestUpdate + BroadcastTo)"
        );
        for (idx, section) in sites.iter().enumerate() {
            // Look at a short window right after each dedup gate so the
            // counter bump has to stay adjacent to the rejection branch.
            let window = &section[..500.min(section.len())];
            assert!(
                window.contains("RELAY_UPDATE_DEDUP_REJECTS.fetch_add"),
                "dedup gate site #{idx} does not increment RELAY_UPDATE_DEDUP_REJECTS"
            );
        }
    }
    #[test]
    fn raii_guard_clears_dedup_set_on_drop() {
        let src = include_str!("op_ctx_task.rs");
        let drop_start = src
            .find("impl Drop for RelayUpdateInflightGuard")
            .expect("RelayUpdateInflightGuard Drop impl not found");
        let drop_body = &src[drop_start..drop_start + 600];
        assert!(
            drop_body.contains("active_relay_update_txs"),
            "RelayUpdateInflightGuard::drop must remove from active_relay_update_txs"
        );
        assert!(
            drop_body.contains("RELAY_UPDATE_INFLIGHT.fetch_sub"),
            "RelayUpdateInflightGuard::drop must decrement RELAY_UPDATE_INFLIGHT"
        );
        assert!(
            drop_body.contains("RELAY_UPDATE_COMPLETED_TOTAL.fetch_add"),
            "RelayUpdateInflightGuard::drop must increment RELAY_UPDATE_COMPLETED_TOTAL"
        );
    }
    #[test]
    fn slice_a_does_not_touch_streaming_variants() {
        let src = include_str!("op_ctx_task.rs");
        let relay_start = src
            .find("pub(crate) async fn start_relay_request_update(")
            .expect("start_relay_request_update not found");
        let relay_end = src
            .find("#[cfg(test)]")
            .expect("test module marker not found");
        let relay_src = &src[relay_start..relay_end];
        assert!(
            !relay_src.contains("RequestUpdateStreaming"),
            "slice A must not handle RequestUpdateStreaming (deferred to slice B)"
        );
        assert!(
            !relay_src.contains("BroadcastToStreaming"),
            "slice A must not handle BroadcastToStreaming (deferred to slice B)"
        );
    }
    #[test]
    fn deprecated_broadcasting_variant_stays_legacy() {
        let src = include_str!("op_ctx_task.rs");
        let relay_start = src
            .find("pub(crate) async fn start_relay_request_update(")
            .expect("start_relay_request_update not found");
        let relay_end = src
            .find("#[cfg(test)]")
            .expect("test module marker not found");
        let relay_src = &src[relay_start..relay_end];
        assert!(
            !relay_src.contains("UpdateMsg::Broadcasting"),
            "Driver must not handle UpdateMsg::Broadcasting — deprecated wire \
             variant; legacy no-op handler stays in place."
        );
    }
}