use std::sync::Arc;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::GlobalExecutor;
use crate::contract::{ContractHandlerEvent, StoreResponse};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::operations::VisitedPeers;
use crate::operations::op_ctx::{
AdvanceOutcome, AttemptOutcome, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use crate::transport::peer_connection::StreamId;
use super::{GetMsg, GetMsgResult, GetStreamingPayload};
use crate::operations::orphan_streams::{OrphanStreamError, STREAM_CLAIM_TIMEOUT};
/// Test-only counter: incremented once per `start_client_get` call so tests
/// can assert the task-per-tx GET driver path was actually exercised.
#[cfg(any(test, feature = "testing"))]
pub static DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
/// Entry point for a client-initiated GET.
///
/// Spawns the long-running driver task on the global executor and immediately
/// returns the client transaction id; the actual result is published
/// asynchronously via `deliver_outcome`.
///
/// Note: currently infallible in practice — the `Result` return type is kept
/// for interface stability with callers.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
    #[cfg(any(test, feature = "testing"))]
    DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    tracing::debug!(
        tx = %client_tx,
        contract = %instance_id,
        "get (task-per-tx): spawning client-initiated task"
    );
    // Build the task future first, then hand it off to the executor.
    let task = run_client_get(
        op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    );
    GlobalExecutor::spawn(task);
    Ok(client_tx)
}
/// Task body for a client GET: drives the operation to completion, then
/// publishes the outcome (success, client-visible error, or synthesized
/// infrastructure error) back to the requesting client.
async fn run_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // Keep a handle for result delivery; the driver consumes its own clone.
    let driver_manager = op_manager.clone();
    let outcome = drive_client_get(
        driver_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await;
    deliver_outcome(&op_manager, client_tx, outcome);
}
/// Final disposition of a GET driver task.
#[derive(Debug)]
enum DriverOutcome {
    /// A result (success or client-visible error) ready to publish to the client.
    Publish(HostResult),
    /// An internal failure; `deliver_outcome` synthesizes a client error from it.
    InfrastructureError(OpError),
}
/// Thin wrapper around `drive_client_get_inner` that folds infrastructure
/// errors into the `DriverOutcome` enum so the task body has a single
/// delivery path.
async fn drive_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> DriverOutcome {
    drive_client_get_inner(
        &op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await
    .unwrap_or_else(DriverOutcome::InfrastructureError)
}
/// Core GET driver: picks an initial target peer, runs the shared retry loop,
/// then performs all terminal-side effects (local caching, auto-subscribe,
/// routing telemetry, optional child SUBSCRIBE) before yielding the outcome
/// to publish to the client.
///
/// Returns `Ok(DriverOutcome)` for anything that should reach the client, or
/// `Err` for infrastructure failures the caller wraps.
async fn drive_client_get_inner(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
    let htl = op_manager.ring.max_hops_to_live;
    // Seed the visited list with our own address so routing never picks us.
    let mut tried: Vec<std::net::SocketAddr> = Vec::new();
    if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
        tried.push(own_addr);
    }
    let initial_target = op_manager
        .ring
        .k_closest_potentially_hosting(&instance_id, tried.as_slice(), 1)
        .into_iter()
        .next();
    // No remote candidate → target ourselves; the request is then answered
    // locally (see Terminal::LocalCompletion).
    let current_target = match initial_target {
        Some(peer) => {
            if let Some(addr) = peer.socket_addr() {
                tried.push(addr);
            }
            peer
        }
        None => op_manager.ring.connection_manager.own_location(),
    };
    let mut driver = GetRetryDriver {
        op_manager,
        instance_id,
        htl,
        tried,
        retries: 0,
        current_target,
        attempt_visited: VisitedPeers::new(&client_tx),
    };
    let loop_result = drive_retry_loop(op_manager, client_tx, "get", &mut driver).await;
    match loop_result {
        RetryLoopOutcome::Done(terminal) => {
            op_manager.completed(client_tx);
            // Resolve the ContractKey to reply with; each terminal kind also
            // performs its own local-caching side effect here.
            let reply_key = match &terminal {
                Terminal::InlineFound {
                    key,
                    state,
                    contract,
                } => {
                    cache_contract_locally(op_manager, *key, state.clone(), contract.clone()).await;
                    *key
                }
                Terminal::Streaming {
                    key,
                    stream_id,
                    includes_contract,
                } => {
                    // Stream assembly failures are non-fatal: the wire-level
                    // GET still succeeded, only local caching is skipped.
                    if let Some(peer_addr) = driver.current_target.socket_addr() {
                        if let Err(e) = assemble_and_cache_stream(
                            op_manager,
                            peer_addr,
                            *stream_id,
                            *key,
                            *includes_contract,
                        )
                        .await
                        {
                            tracing::warn!(
                                %key,
                                error = %e,
                                "get (task-per-tx): stream assembly failed — \
                                 state will not be cached locally"
                            );
                        }
                    } else {
                        tracing::warn!(
                            %key,
                            "get (task-per-tx): current_target has no socket_addr; \
                             cannot claim orphan stream"
                        );
                    }
                    *key
                }
                Terminal::LocalCompletion => {
                    // Served from the local store; fall back to a synthetic
                    // key (zero code hash) if the store lookup yields none.
                    match lookup_stored_key(op_manager, &instance_id).await {
                        Some(k) => k,
                        None => synthetic_key(&instance_id),
                    }
                }
            };
            let host_result =
                build_host_response(op_manager, &instance_id, return_contract_code).await;
            // Fallback subscription for clients that did NOT ask to subscribe
            // (explicit subscribe requests are handled by maybe_subscribe_child).
            if host_result.is_ok()
                && !subscribe
                && crate::ring::AUTO_SUBSCRIBE_ON_GET
                && !op_manager.ring.is_subscribed(&reply_key)
            {
                let path_label = match &terminal {
                    Terminal::Streaming { .. } => "streaming (task-per-tx)",
                    Terminal::InlineFound { .. } | Terminal::LocalCompletion => {
                        "non-streaming (task-per-tx)"
                    }
                };
                crate::operations::auto_subscribe_on_get_response(
                    op_manager,
                    &reply_key,
                    &client_tx,
                    &Some(driver.current_target.clone()),
                    false,
                    blocking_subscribe,
                    path_label,
                )
                .await;
            }
            // Routing telemetry: outcome tracks the client-visible result.
            let contract_location = Location::from(&reply_key);
            let route_event = RouteEvent {
                peer: driver.current_target.clone(),
                contract_location,
                outcome: if host_result.is_ok() {
                    RouteOutcome::SuccessUntimed
                } else {
                    RouteOutcome::Failure
                },
                op_type: Some(crate::node::network_status::OpType::Get),
            };
            if let Some(log_event) =
                crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
            {
                op_manager
                    .ring
                    .register_events(either::Either::Left(log_event))
                    .await;
            }
            op_manager.ring.routing_finished(route_event);
            crate::node::network_status::record_op_result(
                crate::node::network_status::OpType::Get,
                host_result.is_ok(),
            );
            // Client-requested subscription (subscribe=true) runs as a child op.
            maybe_subscribe_child(
                op_manager,
                client_tx,
                reply_key,
                subscribe,
                blocking_subscribe,
            )
            .await;
            Ok(DriverOutcome::Publish(host_result))
        }
        // Retries exhausted: surface a client-visible error, never swallow it.
        RetryLoopOutcome::Exhausted(cause) => {
            Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: cause.into(),
            }
            .into())))
        }
        RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
        RetryLoopOutcome::InfraError(err) => Err(err),
    }
}
/// Per-operation state threaded through `drive_retry_loop` for GET.
struct GetRetryDriver<'a> {
    op_manager: &'a OpManager,
    /// Contract being fetched.
    instance_id: ContractInstanceId,
    /// Hops-to-live stamped on every outgoing request.
    htl: usize,
    /// Addresses already targeted (includes our own), excluded from routing.
    tried: Vec<std::net::SocketAddr>,
    /// Attempts made so far; bounded by MAX_RETRIES.
    retries: usize,
    /// Peer the current attempt is addressed to.
    current_target: PeerKeyLocation,
    /// Visited-peers set, reset per attempt transaction.
    attempt_visited: VisitedPeers,
}
/// Terminal states a GET attempt can reach (success-side classification).
#[derive(Debug)]
enum Terminal {
    /// State (and optionally contract code) arrived inline in the response.
    InlineFound {
        key: ContractKey,
        state: WrappedState,
        contract: Option<ContractContainer>,
    },
    /// Large payload arrives via a separate stream that must be claimed
    /// and assembled from the orphan-stream registry.
    Streaming {
        key: ContractKey,
        stream_id: StreamId,
        includes_contract: bool,
    },
    /// The request was answered from the local store (request echoed back).
    LocalCompletion,
}
fn classify(reply: NetMessage) -> AttemptOutcome<Terminal> {
match reply {
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result:
GetMsgResult::Found {
key,
value:
StoreResponse {
state: Some(state),
contract,
},
},
..
})) => AttemptOutcome::Terminal(Terminal::InlineFound {
key,
state,
contract,
}),
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result: GetMsgResult::Found { value, .. },
..
})) => {
tracing::warn!(
?value,
"get (task-per-tx): Response{{Found}} arrived without state"
);
AttemptOutcome::Unexpected
}
NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
result: GetMsgResult::NotFound,
..
})) => AttemptOutcome::Retry,
NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
key,
stream_id,
includes_contract,
..
})) => AttemptOutcome::Terminal(Terminal::Streaming {
key,
stream_id,
includes_contract,
}),
NetMessage::V1(NetMessageV1::Get(GetMsg::Request { .. })) => {
AttemptOutcome::Terminal(Terminal::LocalCompletion)
}
NetMessage::V1(NetMessageV1::Get(
GetMsg::ForwardingAck { .. } | GetMsg::ResponseStreamingAck { .. },
)) => AttemptOutcome::Unexpected,
_ => AttemptOutcome::Unexpected,
}
}
impl RetryDriver for GetRetryDriver<'_> {
    type Terminal = Terminal;
    /// Mints a fresh attempt transaction and resets the visited set for it.
    fn new_attempt_tx(&mut self) -> Transaction {
        let tx = Transaction::new::<GetMsg>();
        self.attempt_visited = VisitedPeers::new(&tx);
        tx
    }
    /// Builds the outgoing GET request for the current attempt.
    fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
        NetMessage::from(GetMsg::Request {
            id: attempt_tx,
            instance_id: self.instance_id,
            // Always fetch the contract code: the node needs the WASM for
            // local validation/hosting regardless of the client's
            // return_contract_code preference (issue #3757).
            fetch_contract: true,
            htl: self.htl,
            visited: self.attempt_visited.clone(),
            subscribe: false,
        })
    }
    fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<Terminal> {
        classify(reply)
    }
    /// Selects the next candidate peer, or signals exhaustion when the retry
    /// budget is spent or no further candidates exist.
    fn advance(&mut self) -> AdvanceOutcome {
        match advance_to_next_peer(
            self.op_manager,
            &self.instance_id,
            &mut self.tried,
            &mut self.retries,
        ) {
            Some((next_target, _next_addr)) => {
                self.current_target = next_target;
                AdvanceOutcome::Next
            }
            None => AdvanceOutcome::Exhausted,
        }
    }
}
/// Builds the client-facing GET response by reading back from the LOCAL
/// store (the wire result was cached beforehand). If the local lookup has no
/// state, a client error is synthesized — the wire success alone is not
/// reported as success.
async fn build_host_response(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    return_contract_code: bool,
) -> HostResult {
    let query = ContractHandlerEvent::GetQuery {
        instance_id: *instance_id,
        return_contract_code,
    };
    if let Ok(ContractHandlerEvent::GetResponse {
        key: Some(resolved_key),
        response:
            Ok(StoreResponse {
                state: Some(state),
                contract,
            }),
    }) = op_manager.notify_contract_handler(query).await
    {
        // Only hand the contract code back if the client asked for it.
        let client_contract = if return_contract_code { contract } else { None };
        return Ok(HostResponse::ContractResponse(
            ContractResponse::GetResponse {
                key: resolved_key,
                contract: client_contract,
                state,
            },
        ));
    }
    tracing::warn!(
        contract = %instance_id,
        "get (task-per-tx): terminal reply classified success but local \
         store lookup returned no state; synthesizing client error"
    );
    Err(ErrorKind::OperationError {
        cause: format!(
            "GET succeeded on wire but local store lookup failed for {instance_id}"
        )
        .into(),
    }
    .into())
}
/// Fallback ContractKey for replies when the real code hash is unknown:
/// pairs the instance id with an all-zero code hash.
fn synthetic_key(instance_id: &ContractInstanceId) -> ContractKey {
    let zero_code = CodeHash::new([0u8; 32]);
    ContractKey::from_id_and_code(*instance_id, zero_code)
}
/// Caches a fetched contract state in the local store and runs the hosting
/// side effects (LRU/TTL refresh, interest announcements).
///
/// Idempotency short-circuit: the state is compared byte-for-byte against
/// the locally stored one before issuing a PutQuery, so contracts enforcing
/// idempotency in update_state() are not re-invoked on identical state
/// (issue #2018). The record_get_access / interest side effects run
/// unconditionally on any wire-level GET success, even if the local
/// PutQuery is rejected.
async fn cache_contract_locally(
    op_manager: &OpManager,
    key: ContractKey,
    state: WrappedState,
    contract: Option<ContractContainer>,
) {
    let state_size = state.size() as u64;
    // Read current local state first to detect a redundant write.
    let local_state = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await;
    let state_matches = matches!(
        &local_state,
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse {
                state: Some(local),
                ..
            }),
            ..
        }) if local.as_ref() == state.as_ref(),
    );
    // Tracks whether the state actually landed in the local store.
    let put_persisted = if state_matches {
        tracing::debug!(
            %key,
            "get (task-per-tx): local state matches, skipping redundant PutQuery"
        );
        false
    } else if let Some(contract_code) = contract {
        match op_manager
            .notify_contract_handler(ContractHandlerEvent::PutQuery {
                key,
                state,
                related_contracts: RelatedContracts::default(),
                contract: Some(contract_code),
            })
            .await
        {
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Ok(_), ..
            }) => true,
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Err(err),
                ..
            }) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery rejected by executor"
                );
                false
            }
            Ok(other) => {
                tracing::warn!(
                    %key,
                    ?other,
                    "get (task-per-tx): PutQuery returned unexpected event"
                );
                false
            }
            Err(err) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery failed"
                );
                false
            }
        }
    } else {
        // Without the contract code we cannot host; skip the local write.
        tracing::debug!(
            %key,
            "get (task-per-tx): skipping local cache — contract code missing"
        );
        false
    };
    // Side effects below run regardless of whether the put persisted, so
    // hosting LRU/TTL refresh on any successful wire-level GET.
    let access_result = op_manager.ring.record_get_access(key, state_size);
    op_manager.ring.mark_local_client_access(&key);
    let mut removed_contracts = Vec::new();
    for evicted_key in &access_result.evicted {
        if op_manager
            .interest_manager
            .unregister_local_hosting(evicted_key)
        {
            removed_contracts.push(*evicted_key);
        }
    }
    // Only announce hosting for contracts that are new AND actually stored.
    if access_result.is_new && put_persisted {
        crate::operations::announce_contract_hosted(op_manager, &key).await;
        let became_interested = op_manager.interest_manager.register_local_hosting(&key);
        let added = if became_interested { vec![key] } else { vec![] };
        if !added.is_empty() || !removed_contracts.is_empty() {
            crate::operations::broadcast_change_interests(op_manager, added, removed_contracts)
                .await;
        }
    } else if !removed_contracts.is_empty() {
        crate::operations::broadcast_change_interests(op_manager, vec![], removed_contracts).await;
    }
}
/// Claims a streamed GET payload from the orphan-stream registry, assembles
/// and decodes it, verifies the key, and delegates local caching to
/// `cache_contract_locally`.
///
/// A stream already claimed by another task is treated as success (dedup).
/// All other failures are reported as a `String` error for the caller to log.
async fn assemble_and_cache_stream(
    op_manager: &OpManager,
    peer_addr: std::net::SocketAddr,
    stream_id: StreamId,
    expected_key: ContractKey,
    includes_contract: bool,
) -> Result<(), String> {
    let handle = match op_manager
        .orphan_stream_registry()
        .claim_or_wait(peer_addr, stream_id, STREAM_CLAIM_TIMEOUT)
        .await
    {
        Ok(h) => h,
        Err(OrphanStreamError::AlreadyClaimed) => {
            tracing::debug!(
                %peer_addr,
                %stream_id,
                "stream already claimed (dedup)"
            );
            return Ok(());
        }
        Err(e) => return Err(format!("claim_or_wait: {e}")),
    };
    let bytes = handle
        .assemble()
        .await
        .map_err(|e| format!("stream assembly: {e}"))?;
    let payload: GetStreamingPayload =
        bincode::deserialize(&bytes).map_err(|e| format!("deserialize: {e}"))?;
    // A key mismatch would silently cache the wrong contract under the
    // expected key — reject it here.
    if payload.key != expected_key {
        return Err(format!(
            "stream key mismatch: expected {expected_key}, got {}",
            payload.key
        ));
    }
    let Some(state) = payload.value.state else {
        return Err("stream payload has no state".into());
    };
    // Only trust the contract code if the sender declared it was included.
    let contract = if includes_contract {
        payload.value.contract
    } else {
        None
    };
    cache_contract_locally(op_manager, payload.key, state, contract).await;
    Ok(())
}
/// Resolves the full ContractKey for an instance id from the local store,
/// if present (no contract code is fetched).
async fn lookup_stored_key(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
) -> Option<ContractKey> {
    let query = ContractHandlerEvent::GetQuery {
        instance_id: *instance_id,
        return_contract_code: false,
    };
    match op_manager.notify_contract_handler(query).await {
        // Pass the (possibly absent) key straight through on success.
        Ok(ContractHandlerEvent::GetResponse {
            key,
            response: Ok(_),
        }) => key,
        _ => None,
    }
}
/// Upper bound on remote attempts for one client GET.
const MAX_RETRIES: usize = 3;

/// Picks the next candidate peer for a retry.
///
/// Returns `None` once the retry budget is spent, when no untried candidate
/// exists, or when the candidate has no reachable address. Note the retry
/// counter is consumed even if peer selection then fails — an attempt slot
/// is spent either way.
fn advance_to_next_peer(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    tried: &mut Vec<std::net::SocketAddr>,
    retries: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
    if *retries >= MAX_RETRIES {
        return None;
    }
    *retries += 1;
    let candidates = op_manager
        .ring
        .k_closest_potentially_hosting(instance_id, tried.as_slice(), 1);
    let peer = candidates.into_iter().next()?;
    let addr = peer.socket_addr()?;
    tried.push(addr);
    Some((peer, addr))
}
/// Spawns (or, for blocking clients, awaits) a child SUBSCRIBE operation
/// after a successful GET — but only when the client explicitly asked for it.
/// The !subscribe guard must stay ahead of sub-op registration so we never
/// register a spurious sub-op.
async fn maybe_subscribe_child(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    if !subscribe {
        return;
    }
    use crate::operations::subscribe;
    let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);
    op_manager.expect_and_register_sub_operation(client_tx, child_tx);
    // Build the child task once; only the execution mode differs.
    let child = subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx);
    if blocking_subscribe {
        child.await;
    } else {
        GlobalExecutor::spawn(child);
    }
}
/// Publishes the driver's outcome to the client, converting infrastructure
/// errors into a synthesized client-visible error first.
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
    let result: HostResult = match outcome {
        DriverOutcome::Publish(result) => result,
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                error = %err,
                "get (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            Err(ErrorKind::OperationError {
                cause: format!("GET failed: {err}").into(),
            }
            .into())
        }
    };
    // Single exit point: every outcome reaches the client.
    op_manager.send_client_result(client_tx, result);
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Stable dummy key for classification tests.
    fn dummy_key() -> ContractKey {
        ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
    }

    fn dummy_tx() -> Transaction {
        Transaction::new::<GetMsg>()
    }

    // ---- classify(): behavioral unit tests ------------------------------

    #[test]
    fn classify_response_found_is_inline_terminal() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::Found {
                key,
                value: StoreResponse {
                    state: Some(WrappedState::new(vec![1u8])),
                    contract: None,
                },
            },
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::InlineFound { .. })
        ));
    }

    #[test]
    fn classify_response_notfound_is_retry() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::NotFound,
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Retry));
    }

    #[test]
    fn classify_response_streaming_is_streaming_terminal() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
            id: tx,
            instance_id: *key.id(),
            stream_id: crate::transport::peer_connection::StreamId::next(),
            key,
            total_size: 1024,
            includes_contract: true,
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::Streaming { .. })
        ));
    }

    #[test]
    fn classify_forwarding_ack_is_unexpected() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ForwardingAck {
            id: tx,
            instance_id: *key.id(),
        }));
        assert!(
            matches!(classify(msg), AttemptOutcome::Unexpected),
            "ForwardingAck must NOT be classified as terminal (Phase 2b bug 2)"
        );
    }

    #[test]
    fn classify_response_streaming_ack_is_unexpected() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreamingAck {
            id: tx,
            stream_id: crate::transport::peer_connection::StreamId::next(),
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    #[test]
    fn classify_request_echo_is_local_completion() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Request {
            id: tx,
            instance_id: *key.id(),
            fetch_contract: true,
            htl: 5,
            visited: VisitedPeers::new(&tx),
            subscribe: false,
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::LocalCompletion)
        ));
    }

    #[test]
    fn classify_response_found_without_state_is_unexpected() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::Found {
                key,
                value: StoreResponse {
                    state: None,
                    contract: None,
                },
            },
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    #[test]
    fn classify_unexpected_for_non_get_message() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    // ---- retry bookkeeping ----------------------------------------------

    #[test]
    fn max_retries_boundary_exhausts_at_limit() {
        let mut retries: usize = 0;
        for _ in 0..MAX_RETRIES {
            assert!(retries < MAX_RETRIES, "should not exhaust before limit");
            retries += 1;
        }
        assert!(
            retries >= MAX_RETRIES,
            "should exhaust at MAX_RETRIES={MAX_RETRIES}"
        );
    }

    #[test]
    fn driver_outcome_exhausted_produces_client_error() {
        let cause = "GET to contract failed after 3 attempts".to_string();
        let outcome: DriverOutcome = match RetryLoopOutcome::<()>::Exhausted(cause) {
            RetryLoopOutcome::Exhausted(cause) => {
                DriverOutcome::Publish(Err(ErrorKind::OperationError {
                    cause: cause.into(),
                }
                .into()))
            }
            RetryLoopOutcome::Done(_)
            | RetryLoopOutcome::Unexpected
            | RetryLoopOutcome::InfraError(_) => unreachable!(),
        };
        assert!(
            matches!(outcome, DriverOutcome::Publish(Err(_))),
            "Exhaustion must produce a client error, not be swallowed"
        );
    }

    // ---- structural (source-scanning) tests -----------------------------
    // These inspect this file's own production source via include_str! to
    // pin invariants that behavioral tests cannot reach without a network.

    /// Returns this file's source up to (excluding) the test module.
    fn production_source() -> &'static str {
        const FULL: &str = include_str!("op_ctx_task.rs");
        let cutoff = FULL
            .find("#[cfg(test)]")
            .expect("file must have a #[cfg(test)] section");
        #[allow(clippy::manual_unwrap_or_default)]
        {
            &FULL[..cutoff]
        }
    }

    /// Extracts the body of the function whose signature starts with
    /// `signature_prefix` by brace matching. NOTE(review): byte-level brace
    /// counting ignores braces inside string literals — acceptable here only
    /// because the scanned bodies keep braces balanced inside strings.
    fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
        let start = source
            .find(signature_prefix)
            .unwrap_or_else(|| panic!("could not find {signature_prefix}"));
        let brace = source[start..].find('{').expect("fn sig must have body");
        let body_start = start + brace + 1;
        let bytes = source.as_bytes();
        let mut depth: i32 = 1;
        let mut i = body_start;
        while i < bytes.len() {
            match bytes[i] {
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        return &source[body_start..i];
                    }
                }
                _ => {}
            }
            i += 1;
        }
        panic!("unterminated fn body for {signature_prefix}");
    }

    #[test]
    fn cache_contract_locally_has_state_matches_short_circuit() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn cache_contract_locally(");
        let get_pos = body
            .find("ContractHandlerEvent::GetQuery")
            .unwrap_or(usize::MAX);
        let put_pos = body
            .find("ContractHandlerEvent::PutQuery")
            .unwrap_or(usize::MAX);
        let has_byte_compare = body.contains("as_ref() ==") || body.contains("state_matches");
        let has_short_circuit = get_pos < put_pos && has_byte_compare;
        assert!(
            has_short_circuit,
            "cache_contract_locally is missing the state_matches idempotency \
             short-circuit from the legacy Response{{Found}} branch \
             (get.rs:2218-2241). Without it the driver re-invokes PutQuery \
             on identical state — regressing issue #2018 for contracts \
             that enforce idempotency in update_state()."
        );
    }

    #[test]
    fn cache_contract_locally_runs_side_effects_on_put_error() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn cache_contract_locally(");
        let err_arm = body
            .find("new_value: Err(")
            .expect("PutResponse Err arm must exist");
        let side_effect = body
            .find("record_get_access")
            .expect("record_get_access must be called");
        assert!(
            side_effect > err_arm,
            "record_get_access must run AFTER the PutResponse match \
             (outside both Ok and Err arms) so hosting LRU/TTL refresh on \
             any successful wire-level GET — including when the local \
             executor rejects the PutQuery. The legacy branch at \
             get.rs:2420-2435 continues these side effects on error; \
             the driver must match."
        );
    }

    #[test]
    fn driver_calls_auto_subscribe_on_get_response() {
        let src = production_source();
        assert!(
            src.contains("auto_subscribe_on_get_response"),
            "The driver must invoke `auto_subscribe_on_get_response` on \
             successful GET terminal paths (AUTO_SUBSCRIBE_ON_GET = true in \
             ring.rs:60). The legacy branch does this at get.rs:2313/2408/3136/3185; \
             the driver must mirror it so client GETs with subscribe=false \
             still register the fallback subscription."
        );
    }

    #[test]
    fn record_op_result_reflects_host_result_outcome() {
        const SOURCE: &str = include_str!("op_ctx_task.rs");
        let done_arm_start = SOURCE
            .find("RetryLoopOutcome::Done(")
            .expect("Done arm must exist");
        let next_arm = SOURCE[done_arm_start..]
            .find("RetryLoopOutcome::Exhausted")
            .expect("Exhausted arm must follow");
        let arm = &SOURCE[done_arm_start..done_arm_start + next_arm];
        let call_pos = arm
            .find("record_op_result")
            .expect("record_op_result must be called in Done arm");
        let tail = &arm[call_pos..];
        let call_window = &tail[..tail.len().min(200)];
        let looks_unconditional = call_window.contains("true,") && !call_window.contains("is_ok()");
        assert!(
            !looks_unconditional,
            "record_op_result in the Done arm is passed an unconditional \
             `true`. The success flag must track `host_result.is_ok()` so \
             telemetry does not diverge from the client-visible outcome. \
             Call window: {call_window}"
        );
    }

    #[test]
    fn driver_hardcodes_fetch_contract_true_per_issue_3757() {
        let src = production_source();
        let build_body = extract_fn_body(
            src,
            "fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {",
        );
        assert!(
            build_body.contains("fetch_contract: true,"),
            "GetMsg::Request.fetch_contract must stay hard-coded `true` — \
             the node needs WASM for local validation/hosting regardless of \
             the client's return_contract_code preference (issue #3757 / \
             get.rs:52-55)."
        );
    }

    #[test]
    fn streaming_terminal_calls_assemble_and_cache_stream() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn drive_client_get_inner(");
        let arm = body
            .find("Terminal::Streaming {")
            .expect("Done arm must handle Terminal::Streaming");
        let tail = &body[arm..];
        let arm_end = tail[1..]
            .find("Terminal::")
            .map(|p| p + 1)
            .unwrap_or(tail.len());
        let arm_body = &tail[..arm_end];
        assert!(
            arm_body.contains("assemble_and_cache_stream"),
            "Terminal::Streaming arm of drive_client_get_inner must call \
             `assemble_and_cache_stream`. Without this, cold-cache streaming \
             GETs return OperationError because nothing writes the local \
             store. See bug #1 in PR #3884 review."
        );
    }

    // ---- streaming payload encoding -------------------------------------

    #[test]
    fn streaming_payload_round_trips_via_bincode() {
        let key = dummy_key();
        let state_bytes = vec![0x42u8; 512];
        let payload = GetStreamingPayload {
            key,
            value: StoreResponse {
                state: Some(WrappedState::new(state_bytes.clone())),
                contract: None,
            },
        };
        let encoded = bincode::serialize(&payload).expect("bincode encode");
        let decoded: GetStreamingPayload = bincode::deserialize(&encoded).expect("bincode decode");
        assert_eq!(decoded.key, key);
        assert_eq!(
            decoded.value.state.as_ref().map(|s| s.as_ref().to_vec()),
            Some(state_bytes),
            "state bytes must round-trip through the streaming payload"
        );
    }

    #[test]
    fn assemble_and_cache_stream_performs_claim_assemble_key_check() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn assemble_and_cache_stream(");
        assert!(
            body.contains("orphan_stream_registry") && body.contains("claim_or_wait"),
            "assemble_and_cache_stream must claim the stream via \
             orphan_stream_registry().claim_or_wait()"
        );
        assert!(
            body.contains(".assemble()") && body.contains(".await"),
            "assemble_and_cache_stream must await stream assembly"
        );
        assert!(
            body.contains("GetStreamingPayload") && body.contains("bincode::deserialize"),
            "assemble_and_cache_stream must deserialize the payload \
             as GetStreamingPayload"
        );
        assert!(
            body.contains("payload.key != expected_key"),
            "assemble_and_cache_stream must verify the stream payload's \
             key matches the expected ContractKey — a mismatch would \
             silently cache the wrong contract under the expected key"
        );
        assert!(
            body.contains("cache_contract_locally"),
            "assemble_and_cache_stream must delegate the actual write \
             and hosting side effects to cache_contract_locally"
        );
    }

    #[test]
    fn maybe_subscribe_child_short_circuits_on_false() {
        const SOURCE: &str = include_str!("op_ctx_task.rs");
        let fn_start = SOURCE
            .find("async fn maybe_subscribe_child(")
            .expect("maybe_subscribe_child must exist");
        let body = &SOURCE[fn_start..];
        let early_return = body
            .find("if !subscribe {")
            .expect("maybe_subscribe_child must short-circuit on !subscribe");
        let register_call = body
            .find("expect_and_register_sub_operation")
            .expect("maybe_subscribe_child must register sub-operation");
        assert!(
            early_return < register_call,
            "The !subscribe short-circuit must come BEFORE the \
             expect_and_register_sub_operation call — otherwise we'd \
             register a spurious sub-op for a client who didn't ask for \
             one. See PUT 3a commit 494a3c69 for the analogous bug class."
        );
    }
}