use std::sync::Arc;
use either::Either;
use freenet_stdlib::prelude::*;
mod executor;
mod fair_queue;
pub(crate) mod governance;
mod handler;
pub mod storages;
pub(crate) mod user_input;
pub(crate) use executor::{
ContractExecutor, ExecutorTransactionStream, MAX_CREATED_DELEGATES_PER_NODE,
MAX_DELEGATE_CREATION_DEPTH, MAX_DELEGATE_CREATIONS_PER_CALL,
SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE, UpsertOutcome, UpsertResult, mock_runtime::MockRuntime,
};
pub use executor::mock_runtime::{clear_crdt_contracts, is_crdt_contract, register_crdt_contract};
pub(crate) use handler::{
ClientResponsesReceiver, ClientResponsesSender, ContractHandler, ContractHandlerChannel,
ContractHandlerEvent, NetworkContractHandler, SenderHalve, SessionMessage, StashedResponder,
StoreResponse, WaitingResolution, WaitingTransaction, client_responses_channel,
contract_handler_channel,
in_memory::{
MemoryContractHandler, MockWasmContractHandler, MockWasmHandlerBuilder,
SimulationContractHandler, SimulationHandlerBuilder,
},
};
pub use executor::{ContractQueueFull, Executor, ExecutorError, OperationMode};
pub use handler::reset_event_id_counter;
use freenet_stdlib::client_api::DelegateRequest;
use tracing::Instrument;
use self::executor::DelegateNotificationReceiver;
use self::user_input::{CallerIdentity, UserInputPrompter};
use crate::config::GlobalExecutor;
/// Upper bound on the number of delegate executions performed by one call to
/// `handle_delegate_with_contract_requests` before its loop gives up and
/// returns the messages accumulated so far.
const MAX_CONTRACT_REQUEST_ITERATIONS: usize = 100;
/// Maximum number of delegate notifications handled per pass of the
/// `contract_handling` loop.
const MAX_DELEGATE_DRAIN_BATCH: usize = 16;
/// Overall time limit applied by `fetch_related_off_loop` to the combined
/// related-contract fetch: `OPERATION_TTL` (in whole seconds) plus two seconds.
const DEFERRED_RELATED_FETCH_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(crate::config::OPERATION_TTL.as_secs() + 2);
/// Maximum number of stashed responders (in-flight deferrals) allowed in a
/// `DeferralCtx`; checked by `DeferralCtx::at_capacity`.
const MAX_INFLIGHT_DEFERRALS: usize = 256;
/// Maximum number of deferred resumes handled per pass of the
/// `contract_handling` loop.
const MAX_RESUME_DRAIN_BATCH: usize = 16;
/// Message sent over the resume channel once an off-loop related-contract
/// fetch has finished; consumed by `handle_deferred_resume`.
struct DeferredResume {
// Key under which the responder was stashed in `DeferralCtx::stashed`.
deferral_id: u64,
// Contract whose upsert was deferred.
key: ContractKey,
// Original upsert input: full state (`Left`) or delta (`Right`).
update: Either<WrappedState, StateDelta<'static>>,
// Related contracts supplied with the original request; fetched states are
// injected into this set before the upsert is retried.
related_contracts: RelatedContracts<'static>,
// Optional contract container forwarded to the retried upsert.
code: Option<ContractContainer>,
// `true` produces a `PutResponse`, `false` an `UpdateResponse`.
is_put: bool,
// Outcome of the off-loop fetch: fetched `(id, state)` pairs or an error.
fetched: Result<Vec<(ContractInstanceId, WrappedState)>, ExecutorError>,
}
/// Guard that ensures a `DeferredResume` is sent for a deferral: either
/// explicitly through `send`, or from `Drop` if the guard is dropped while it
/// still holds its payload.
struct ResumeGuard {
// `Some` until the payload has been delivered; taken by `send` or `drop`.
payload: Option<ResumePayload>,
}
/// Everything needed to build a `DeferredResume` except the fetch result,
/// together with the channel on which to send it.
struct ResumePayload {
// Sender half of the resume channel read by the contract-handling loop.
resume_tx: tokio::sync::mpsc::UnboundedSender<DeferredResume>,
// Identifier of the stashed responder this resume belongs to.
deferral_id: u64,
// Contract whose upsert was deferred.
key: ContractKey,
// Original upsert input: full state (`Left`) or delta (`Right`).
update: Either<WrappedState, StateDelta<'static>>,
// Related contracts supplied with the original request.
related_contracts: RelatedContracts<'static>,
// Optional contract container supplied with the original request.
code: Option<ContractContainer>,
// Whether the deferred operation is a PUT (`true`) or an UPDATE (`false`).
is_put: bool,
}
impl ResumeGuard {
    /// Wraps `payload` in an armed guard.
    fn new(payload: ResumePayload) -> Self {
        let payload = Some(payload);
        Self { payload }
    }

    /// Delivers the resume with the given fetch result and disarms the guard,
    /// so the `Drop` impl that runs afterwards finds no payload left.
    fn send(mut self, fetched: Result<Vec<(ContractInstanceId, WrappedState)>, ExecutorError>) {
        let Some(payload) = self.payload.take() else {
            return;
        };
        Self::deliver(payload, fetched);
    }

    /// Builds the `DeferredResume` from `p` and `fetched` and sends it on the
    /// payload's channel. A closed channel is only logged at debug level.
    fn deliver(
        p: ResumePayload,
        fetched: Result<Vec<(ContractInstanceId, WrappedState)>, ExecutorError>,
    ) {
        let key = p.key;
        let resume = DeferredResume {
            deferral_id: p.deferral_id,
            key,
            update: p.update,
            related_contracts: p.related_contracts,
            code: p.code,
            is_put: p.is_put,
            fetched,
        };
        let channel_closed = p.resume_tx.send(resume).is_err();
        if channel_closed {
            tracing::debug!(
                contract = %key,
                "Deferred resume channel closed; contract-handling loop gone"
            );
        }
    }
}
impl Drop for ResumeGuard {
    /// If the guard still holds its payload (i.e. `send` was never called),
    /// logs a warning and delivers a resume carrying a `missing_related`
    /// error for the contract's own instance id.
    fn drop(&mut self) {
        let Some(payload) = self.payload.take() else {
            // Already delivered through `send`; nothing left to do.
            return;
        };
        let key_id = *payload.key.id();
        tracing::warn!(
            contract = %payload.key,
            deferral_id = payload.deferral_id,
            "Off-loop waiter dropped before sending — delivering MissingRelated \
            resume so the deferral terminates and the client is answered (#4391)"
        );
        let fallback = Err(ExecutorError::missing_related(key_id));
        Self::deliver(payload, fallback);
    }
}
/// State used by the contract-handling loop to track deferred upserts.
struct DeferralCtx {
// Sender cloned into each `ResumePayload` so off-loop tasks can report back.
resume_tx: tokio::sync::mpsc::UnboundedSender<DeferredResume>,
// Responders of deferred requests, keyed by deferral id.
stashed: std::collections::HashMap<u64, StashedResponder>,
// Id handed to the next deferral; incremented with wrapping arithmetic.
next_deferral_id: u64,
}
impl DeferralCtx {
    /// Creates an empty context whose deferral ids start at zero.
    fn new(resume_tx: tokio::sync::mpsc::UnboundedSender<DeferredResume>) -> Self {
        let stashed = std::collections::HashMap::new();
        Self {
            resume_tx,
            stashed,
            next_deferral_id: 0,
        }
    }

    /// Returns `true` once the number of stashed responders has reached
    /// `MAX_INFLIGHT_DEFERRALS`.
    fn at_capacity(&self) -> bool {
        let inflight = self.stashed.len();
        inflight >= MAX_INFLIGHT_DEFERRALS
    }
}
/// Test-only stub signature for replacing `fetch_related_off_loop`: given the
/// list of missing contract ids, returns a boxed future resolving to the
/// fetched `(id, state)` pairs or an error.
#[cfg(test)]
pub(crate) type OffLoopFetchStub = Arc<
dyn Fn(
Vec<ContractInstanceId>,
) -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<Vec<(ContractInstanceId, WrappedState)>, ExecutorError>,
> + Send,
>,
> + Send
+ Sync,
>;
/// Test-only global holding the stub that `fetch_related_off_loop` consults
/// before doing any real work; `None` means no override is installed.
#[cfg(test)]
static OFF_LOOP_FETCH_OVERRIDE: std::sync::Mutex<Option<OffLoopFetchStub>> =
std::sync::Mutex::new(None);
/// Installs (`Some`) or clears (`None`) the test-only override consulted by
/// `fetch_related_off_loop`. Panics if the override mutex is poisoned.
#[cfg(test)]
pub(crate) fn set_off_loop_fetch_override(stub: Option<OffLoopFetchStub>) {
    let mut slot = OFF_LOOP_FETCH_OVERRIDE.lock().unwrap();
    *slot = stub;
}
/// Converts the outcome of a sub-operation GET into the fetched state.
///
/// `Found` yields a `WrappedState` built from a copy of the returned state
/// bytes; `NotFound` and `Infra` both map to a `missing_related` error for `id`.
fn map_sub_op_outcome(
    outcome: crate::operations::get::op_ctx_task::SubOpGetOutcome,
    id: ContractInstanceId,
) -> Result<WrappedState, ExecutorError> {
    use crate::operations::get::op_ctx_task::SubOpGetOutcome;
    match outcome {
        SubOpGetOutcome::NotFound(_) | SubOpGetOutcome::Infra(_) => {
            Err(ExecutorError::missing_related(id))
        }
        SubOpGetOutcome::Found(get_result) => {
            let bytes = get_result.state.as_ref().to_vec();
            Ok(WrappedState::from(bytes))
        }
    }
}
/// Fetches the states of all `missing` related contracts concurrently.
///
/// In test builds an installed `OFF_LOOP_FETCH_OVERRIDE` stub takes over
/// completely. Otherwise one sub-operation GET is started per id and all of
/// them are awaited together under `DEFERRED_RELATED_FETCH_TIMEOUT`.
///
/// Returns the `(id, state)` pairs in the order of `missing`, or the first
/// per-contract error in that order. When no `op_manager` is available, or the
/// timeout elapses, the error is `missing_related` for the first requested id
/// (an all-zero id if `missing` is empty).
async fn fetch_related_off_loop(
    op_manager: Option<Arc<crate::node::OpManager>>,
    missing: Vec<ContractInstanceId>,
) -> Result<Vec<(ContractInstanceId, WrappedState)>, ExecutorError> {
    #[cfg(test)]
    {
        // The lock guard is released at the end of this statement, so it is
        // never held across the await below.
        let installed = OFF_LOOP_FETCH_OVERRIDE.lock().unwrap().clone();
        if let Some(stub) = installed {
            return stub(missing).await;
        }
    }

    // Id reported when the failure is not attributable to one specific fetch.
    let fallback_id = missing
        .first()
        .copied()
        .unwrap_or_else(|| ContractInstanceId::new([0u8; 32]));

    let Some(op_manager) = op_manager else {
        return Err(ExecutorError::missing_related(fallback_id));
    };

    let fetch_all = async {
        let per_contract = missing.iter().copied().map(|id| {
            let op_manager = op_manager.clone();
            async move {
                // `_tx` stays bound so it lives until the receiver resolves.
                let (_tx, rx) = crate::operations::get::op_ctx_task::start_sub_op_get(
                    &op_manager,
                    id,
                    false,
                );
                let state = rx
                    .await
                    .map_err(|_| ExecutorError::missing_related(id))
                    .and_then(|outcome| map_sub_op_outcome(outcome, id));
                (id, state)
            }
        });
        let results: Vec<(ContractInstanceId, Result<WrappedState, ExecutorError>)> =
            futures::future::join_all(per_contract).await;
        // Collecting into `Result` stops at the first error, in request order.
        results
            .into_iter()
            .map(|(id, res)| res.map(|state| (id, state)))
            .collect::<Result<Vec<_>, ExecutorError>>()
    };

    tokio::time::timeout(DEFERRED_RELATED_FETCH_TIMEOUT, fetch_all)
        .await
        .unwrap_or_else(|_elapsed| Err(ExecutorError::missing_related(fallback_id)))
}
/// Runs a delegate request and keeps re-invoking the delegate until it stops
/// emitting messages that this function services itself.
///
/// Each pass executes the current request through the executor's
/// `execute_delegate_request`, then sorts the returned `OutboundDelegateMsg`s:
/// `ApplicationMessage` and `ContextUpdated` are accumulated for the caller,
/// while contract GET / PUT / UPDATE / SUBSCRIBE requests, delegate-to-delegate
/// messages and user-input requests are handled here. The responses produced
/// for the delegate are fed back as the `inbound` of a new
/// `ApplicationMessages` request that reuses the initial request's parameters.
///
/// Returns the accumulated messages when a pass yields nothing to service,
/// when execution fails or returns an unexpected response type, or after
/// `MAX_CONTRACT_REQUEST_ITERATIONS` executions.
async fn handle_delegate_with_contract_requests<CH, P>(
contract_handler: &mut CH,
initial_req: DelegateRequest<'static>,
origin_contract: Option<&ContractInstanceId>,
delegate_key: &DelegateKey,
prompter: &P,
) -> Vec<OutboundDelegateMsg>
where
CH: ContractHandler + Send + 'static,
P: UserInputPrompter,
{
// Only `ApplicationMessages` carries parameters; every other request kind
// falls back to empty parameters for the follow-up requests.
let initial_params = match &initial_req {
DelegateRequest::ApplicationMessages { params, .. } => params.clone(),
DelegateRequest::RegisterDelegate { .. } | DelegateRequest::UnregisterDelegate(_) | _ => {
Parameters::from(Vec::new())
}
};
let mut current_req = initial_req;
let current_params = initial_params;
let mut iterations = 0;
let mut accumulated_messages: Vec<OutboundDelegateMsg> = Vec::new();
loop {
// Guard against a delegate that keeps emitting requests forever.
iterations += 1;
if iterations > MAX_CONTRACT_REQUEST_ITERATIONS {
tracing::error!(
delegate_key = %delegate_key,
iterations = iterations,
"Exceeded maximum contract request iterations, possible infinite loop"
);
return accumulated_messages;
}
// Execute the delegate; any failure ends the loop with what we have so far.
let values = match contract_handler
.executor()
.execute_delegate_request(current_req, origin_contract, None)
.await
{
Ok(freenet_stdlib::client_api::HostResponse::DelegateResponse { key: _, values }) => {
values
}
Ok(freenet_stdlib::client_api::HostResponse::Ok) => Vec::new(),
Ok(_other) => {
tracing::error!(
delegate_key = %delegate_key,
phase = "unexpected_response",
"Unexpected response type from delegate request"
);
return accumulated_messages;
}
Err(err) => {
// A missing delegate is logged at warn level, everything else at error.
if err.is_missing_delegate() {
tracing::warn!(
delegate_key = %delegate_key,
"Delegate not found in store (expected for migration probes)"
);
} else {
tracing::error!(
delegate_key = %delegate_key,
error = %err,
phase = "execution_failed",
"Failed executing delegate request"
);
}
return accumulated_messages;
}
};
// Sort the outbound messages by kind; pass-through kinds go straight to
// the accumulated output.
let mut get_requests: Vec<GetContractRequest> = Vec::new();
let mut put_requests: Vec<PutContractRequest> = Vec::new();
let mut update_requests: Vec<UpdateContractRequest> = Vec::new();
let mut subscribe_requests: Vec<SubscribeContractRequest> = Vec::new();
let mut delegate_messages: Vec<DelegateMessage> = Vec::new();
let mut user_input_requests: Vec<UserInputRequest<'static>> = Vec::new();
for msg in values {
match msg {
OutboundDelegateMsg::GetContractRequest(req) => {
get_requests.push(req);
}
OutboundDelegateMsg::PutContractRequest(req) => {
put_requests.push(req);
}
OutboundDelegateMsg::UpdateContractRequest(req) => {
update_requests.push(req);
}
OutboundDelegateMsg::SubscribeContractRequest(req) => {
subscribe_requests.push(req);
}
OutboundDelegateMsg::SendDelegateMessage(msg) => {
delegate_messages.push(msg);
}
OutboundDelegateMsg::RequestUserInput(req) => {
user_input_requests.push(req);
}
other @ OutboundDelegateMsg::ApplicationMessage(_)
| other @ OutboundDelegateMsg::ContextUpdated(_) => {
accumulated_messages.push(other);
}
}
}
// Nothing left to service: the delegate is done.
if get_requests.is_empty()
&& put_requests.is_empty()
&& update_requests.is_empty()
&& subscribe_requests.is_empty()
&& delegate_messages.is_empty()
&& user_input_requests.is_empty()
{
return accumulated_messages;
}
// Responses collected here become the delegate's next inbound batch.
let mut inbound_responses: Vec<InboundDelegateMsg<'static>> = Vec::new();
// PUT requests: upsert the full state together with the contract container.
if !put_requests.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = put_requests.len(),
"Processing PutContractRequest messages from delegate"
);
for req in put_requests {
let contract_key = req.contract.key();
let context = req.context;
let result = contract_handler
.executor()
.upsert_contract_state(
contract_key,
Either::Left(req.state),
req.related_contracts,
Some(req.contract),
)
.await;
let put_result = match result {
Ok(_) => Ok(()),
Err(err) => {
tracing::warn!(
contract = %contract_key,
error = %err,
"Failed to upsert contract for delegate PutContractRequest"
);
Err(format!("{err}"))
}
};
inbound_responses.push(InboundDelegateMsg::PutContractResponse(
PutContractResponse {
contract_id: *contract_key.id(),
result: put_result,
context,
},
));
}
}
// GET requests: answered from the executor's `lookup_key` / `fetch_contract`;
// a lookup miss or fetch error yields a response with `state: None`.
if !get_requests.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = get_requests.len(),
"Processing GetContractRequest messages from delegate"
);
for req in get_requests {
let contract_id = req.contract_id;
let context = req.context;
let state = match contract_handler.executor().lookup_key(&contract_id) {
Some(full_key) => {
match contract_handler
.executor()
.fetch_contract(full_key, false)
.await
{
Ok((state, _)) => state,
Err(err) => {
tracing::warn!(
contract = %contract_id,
error = %err,
"Failed to fetch contract for delegate GetContractRequest"
);
None
}
}
}
None => {
tracing::debug!(
contract = %contract_id,
"Contract not found locally for delegate GetContractRequest"
);
None
}
};
inbound_responses.push(InboundDelegateMsg::GetContractResponse(
GetContractResponse {
contract_id,
state,
context,
},
));
}
}
// UPDATE requests: only `State` and `Delta` payloads are accepted, and the
// contract key must resolve through `lookup_key`.
if !update_requests.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = update_requests.len(),
"Processing UpdateContractRequest messages from delegate"
);
for req in update_requests {
let contract_id = req.contract_id;
let context = req.context;
let result = match contract_handler.executor().lookup_key(&contract_id) {
Some(full_key) => {
let update_value: Either<WrappedState, StateDelta<'static>> = match req
.update
{
freenet_stdlib::prelude::UpdateData::State(state) => {
Either::Left(WrappedState::from(state.into_bytes()))
}
freenet_stdlib::prelude::UpdateData::Delta(delta) => {
Either::Right(delta)
}
// Every other variant (including any not listed explicitly) is
// rejected with an error response; processing moves to the next request.
other @ freenet_stdlib::prelude::UpdateData::StateAndDelta {
..
}
| other @ freenet_stdlib::prelude::UpdateData::RelatedState { .. }
| other @ freenet_stdlib::prelude::UpdateData::RelatedDelta { .. }
| other @ freenet_stdlib::prelude::UpdateData::RelatedStateAndDelta {
..
}
| other => {
tracing::warn!(
contract = %contract_id,
variant = ?std::mem::discriminant(&other),
"Unsupported UpdateData variant in delegate UpdateContractRequest \
(only State and Delta are supported)"
);
inbound_responses.push(InboundDelegateMsg::UpdateContractResponse(
UpdateContractResponse {
contract_id,
result: Err("Unsupported UpdateData variant".to_string()),
context,
},
));
continue;
}
};
contract_handler
.executor()
.upsert_contract_state(
full_key,
update_value,
RelatedContracts::default(),
None,
)
.await
}
None => {
tracing::debug!(
contract = %contract_id,
"Contract not found locally for delegate UpdateContractRequest"
);
inbound_responses.push(InboundDelegateMsg::UpdateContractResponse(
UpdateContractResponse {
contract_id,
result: Err("Contract not found".to_string()),
context,
},
));
continue;
}
};
let update_result = match result {
Ok(_) => Ok(()),
Err(err) => {
tracing::warn!(
contract = %contract_id,
error = %err,
"Failed to update contract for delegate UpdateContractRequest"
);
Err(format!("{err}"))
}
};
inbound_responses.push(InboundDelegateMsg::UpdateContractResponse(
UpdateContractResponse {
contract_id,
result: update_result,
context,
},
));
}
}
// SUBSCRIBE requests: if `lookup_key` resolves the contract, record this
// delegate in `DELEGATE_SUBSCRIPTIONS` under that contract id.
if !subscribe_requests.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = subscribe_requests.len(),
"Processing SubscribeContractRequest messages from delegate"
);
for req in subscribe_requests {
let contract_id = req.contract_id;
let context = req.context;
let result = if contract_handler
.executor()
.lookup_key(&contract_id)
.is_some()
{
crate::wasm_runtime::DELEGATE_SUBSCRIPTIONS
.entry(contract_id)
.or_default()
.insert(delegate_key.clone());
Ok(())
} else {
tracing::debug!(
contract = %contract_id,
"Contract not found locally for delegate SubscribeContractRequest"
);
Err("Contract not found".to_string())
};
inbound_responses.push(InboundDelegateMsg::SubscribeContractResponse(
SubscribeContractResponse {
contract_id,
result,
context,
},
));
}
}
// Delegate-to-delegate messages: executed once against the target delegate.
// No response is fed back to the sender; the target's outbound messages
// (except further `SendDelegateMessage`s, which are discarded) are accumulated.
if !delegate_messages.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = delegate_messages.len(),
"Delivering delegate-to-delegate messages"
);
for msg in delegate_messages {
let target_key = msg.target.clone();
let inbound = vec![InboundDelegateMsg::DelegateMessage(msg)];
let target_req = DelegateRequest::ApplicationMessages {
key: target_key.clone(),
params: Parameters::from(Vec::new()),
inbound,
};
match contract_handler
.executor()
.execute_delegate_request(target_req, None, Some(delegate_key))
.await
{
Ok(freenet_stdlib::client_api::HostResponse::DelegateResponse {
values,
..
}) => {
for value in values {
if !matches!(value, OutboundDelegateMsg::SendDelegateMessage(_)) {
accumulated_messages.push(value);
}
}
}
Ok(_) => {}
Err(err) => {
tracing::warn!(
target_delegate = %target_key,
error = %err,
"Failed to deliver delegate message (fire-and-forget)"
);
}
}
}
}
// User-input requests: forwarded to the prompter; when it returns `None`
// the delegate receives an empty `ClientResponse`.
if !user_input_requests.is_empty() {
tracing::debug!(
delegate_key = %delegate_key,
count = user_input_requests.len(),
"Processing UserInputRequest messages from delegate"
);
let caller = caller_identity_from_origin(origin_contract);
let delegate_key_str = delegate_key.to_string();
for req in user_input_requests {
let request_id = req.request_id;
let response = match prompter
.prompt(&req, &delegate_key_str, caller.clone())
.await
{
Some((_, response)) => response,
None => {
tracing::warn!(
request_id,
delegate = %delegate_key,
"User input request timed out or was denied"
);
ClientResponse::new(Vec::new())
}
};
inbound_responses.push(InboundDelegateMsg::UserResponse(UserInputResponse {
request_id,
response,
context: DelegateContext::default(),
}));
}
}
// Feed all collected responses back to the delegate on the next pass.
current_req = DelegateRequest::ApplicationMessages {
key: delegate_key.clone(),
inbound: inbound_responses,
params: current_params.clone(),
};
}
}
/// Waits for the next delegate notification.
///
/// If there is no receiver, or the receiver's channel has closed, this future
/// never resolves, so a `select!` branch built on it simply never fires.
async fn recv_delegate_notification(
    rx: &mut Option<DelegateNotificationReceiver>,
) -> executor::DelegateNotification {
    if let Some(receiver) = rx.as_mut() {
        if let Some(notification) = receiver.recv().await {
            return notification;
        }
    }
    std::future::pending().await
}
/// Non-blocking counterpart of `recv_delegate_notification`: returns a
/// notification if a receiver exists and `try_recv` succeeds, else `None`.
fn try_recv_delegate_notification(
    rx: &mut Option<DelegateNotificationReceiver>,
) -> Option<executor::DelegateNotification> {
    rx.as_mut()?.try_recv().ok()
}
/// Maps an optional originating contract id to a `CallerIdentity`:
/// `WebApp` carrying the id's string form when present, `None` otherwise.
fn caller_identity_from_origin(origin: Option<&ContractInstanceId>) -> CallerIdentity {
    origin.map_or(CallerIdentity::None, |id| {
        CallerIdentity::WebApp(id.to_string())
    })
}
/// Stores `state` as the related state for `related_to` inside
/// `related_contracts`.
///
/// The id is first declared through `missing`, then the first slot yielded by
/// `update()` whose id matches receives the state. In debug builds an
/// assertion checks that such a slot was found.
fn inject_related_state(
    related_contracts: &mut RelatedContracts<'static>,
    related_to: ContractInstanceId,
    state: freenet_stdlib::prelude::State<'static>,
) {
    related_contracts.missing(vec![related_to]);
    let mut pending = Some(state);
    for (id, slot) in related_contracts.update() {
        if *id != related_to {
            continue;
        }
        *slot = pending.take();
        break;
    }
    debug_assert!(
        pending.is_none(),
        "RelatedContracts::missing() must register the slot it just declared, \
        so update() should always have a slot for the requested id"
    );
}
/// Main contract-handling loop.
///
/// Each pass, in order:
/// 1. handles up to `MAX_RESUME_DRAIN_BATCH` pending deferred resumes;
/// 2. moves up to `fair_queue::MAX_DRAIN_BATCH` incoming events into the fair
///    queue (handling `ClientDisconnect` immediately and answering rejected
///    events with a queue-full response);
/// 3. handles up to `MAX_DELEGATE_DRAIN_BATCH` delegate notifications;
/// 4. pops one event from the fair queue and handles it, then starts over.
///
/// Only when the fair queue is empty does the loop block, waiting on whichever
/// of the event channel, the resume channel, or the delegate-notification
/// channel produces something first.
///
/// The loop has no normal exit; it returns only when one of the `?`
/// operators below propagates an error.
pub(crate) async fn contract_handling<CH, P>(
mut contract_handler: CH,
prompter: P,
) -> Result<(), ContractError>
where
CH: ContractHandler + Send + 'static,
P: UserInputPrompter,
{
let mut delegate_rx = contract_handler.executor().take_delegate_notification_rx();
let mut fair_queue = fair_queue::FairEventQueue::new();
// `deferral_ctx` keeps a sender alive, so `resume_rx` does not close while
// this function runs.
let (resume_tx, mut resume_rx) = tokio::sync::mpsc::unbounded_channel::<DeferredResume>();
let mut deferral_ctx = DeferralCtx::new(resume_tx);
loop {
// (1) Bounded drain of finished off-loop fetches.
for _ in 0..MAX_RESUME_DRAIN_BATCH {
match resume_rx.try_recv() {
Ok(resume) => {
handle_deferred_resume(&mut contract_handler, &mut deferral_ctx, resume)
.await?;
}
Err(_) => break,
}
}
// (2) Bounded drain of incoming events into the fair queue.
for _ in 0..fair_queue::MAX_DRAIN_BATCH {
match contract_handler.channel().try_recv_from_sender()? {
Some((id, event)) => {
// Disconnects bypass the queue and are processed right away.
if let ContractHandlerEvent::ClientDisconnect { client_id } = &event {
let client_id = *client_id;
contract_handler.executor().remove_client(client_id);
contract_handler.channel().drop_waiting_response(id);
continue;
}
if let Err(rejected) = fair_queue.try_push(id, event) {
track_pending_reclamation_if_evict(&mut contract_handler, &rejected);
send_queue_full_response(contract_handler.channel(), rejected).await;
}
}
None => break,
}
}
// (3) Bounded drain of delegate notifications.
for _ in 0..MAX_DELEGATE_DRAIN_BATCH {
match try_recv_delegate_notification(&mut delegate_rx) {
Some(notification) => {
handle_delegate_notification(&mut contract_handler, notification, &prompter)
.await;
}
None => break,
}
}
// (4) Handle one queued event, then loop so the drains above run again.
if let Some((id, event)) = fair_queue.pop() {
handle_contract_event(
&mut contract_handler,
id,
event,
&prompter,
Some(&mut deferral_ctx),
)
.await?;
continue;
}
// Nothing queued: block until any of the three sources has work.
tokio::select! {
result = contract_handler.channel().recv_from_sender() => {
let (id, event) = result?;
// NOTE(review): unlike the drain loop above, a `ClientDisconnect`
// received here is pushed onto the fair queue rather than handled
// immediately — confirm this difference is intended.
if let Err(rejected) = fair_queue.try_push(id, event) {
track_pending_reclamation_if_evict(&mut contract_handler, &rejected);
send_queue_full_response(contract_handler.channel(), rejected).await;
}
}
Some(resume) = resume_rx.recv() => {
handle_deferred_resume(&mut contract_handler, &mut deferral_ctx, resume).await?;
}
notification = recv_delegate_notification(&mut delegate_rx) => {
handle_delegate_notification(&mut contract_handler, notification, &prompter).await;
}
}
}
}
/// Completes a deferred PUT/UPDATE once its off-loop fetch has reported back.
///
/// Removes the stashed responder for `resume.deferral_id`, then:
/// - if the fetch succeeded, injects every fetched state into the related
///   contracts and retries the upsert through
///   `upsert_contract_state_deferrable`; a second `DeferRelated` outcome is
///   turned into a `missing_related` error instead of deferring again;
/// - if the fetch failed, builds an error `PutResponse` / `UpdateResponse`.
///
/// The resulting event is sent to the responder when one was stashed.
///
/// # Errors
/// Propagates the error returned by `put_response_from_result` /
/// `update_response_from_result`; in that case no response is sent.
async fn handle_deferred_resume<CH>(
contract_handler: &mut CH,
deferral_ctx: &mut DeferralCtx,
resume: DeferredResume,
) -> Result<(), ContractError>
where
CH: ContractHandler + Send + 'static,
{
let DeferredResume {
deferral_id,
key,
update,
mut related_contracts,
code,
is_put,
fetched,
} = resume;
// Removing the entry here also frees deferral capacity.
let responder = deferral_ctx.stashed.remove(&deferral_id);
if responder.is_none() {
tracing::debug!(contract = %key, "Deferred resume has no stashed responder");
}
let event_result = match fetched {
Ok(states) => {
for (id, state) in states {
inject_related_state(
&mut related_contracts,
id,
freenet_stdlib::prelude::State::from(state.as_ref().to_vec()),
);
}
// Keep a copy of a full-state input: a PUT that ends in `NoChange`
// echoes the incoming state back (see `put_response_from_result`).
let incoming_state = match &update {
Either::Left(state) => Some(state.clone()),
Either::Right(_) => None,
};
let result = match contract_handler
.executor()
.upsert_contract_state_deferrable(key, update, related_contracts, code)
.instrument(tracing::info_span!("upsert_contract_state_resumed", %key))
.await
{
Ok(UpsertOutcome::Completed(result)) => Ok(result),
Ok(UpsertOutcome::DeferRelated(missing)) => {
tracing::debug!(
contract = %key,
missing = missing.len(),
"Resumed upsert still needs a related contract — surfacing \
MissingRelated (one-deferral cap, #4391)"
);
Err(ExecutorError::missing_related(
missing.first().copied().unwrap_or_else(|| *key.id()),
))
}
Err(err) => Err(err),
};
if is_put {
// Falls back to an empty state when the input was a delta.
// NOTE(review): the visible PUT caller always passes a full state,
// so this fallback is presumably unreachable — confirm.
let incoming_state =
incoming_state.unwrap_or_else(|| WrappedState::new(Vec::new()));
put_response_from_result(contract_handler, key, incoming_state, result).await?
} else {
update_response_from_result(contract_handler, key, result).await?
}
}
Err(err) => {
tracing::debug!(
contract = %key,
error = %err,
"Deferred related-contract fetch failed; surfacing to client"
);
if is_put {
ContractHandlerEvent::PutResponse {
new_value: Err(err),
state_changed: false,
}
} else {
ContractHandlerEvent::UpdateResponse {
new_value: Err(err),
state_changed: false,
}
}
}
};
if let Some(responder) = responder {
if let Err(error) = responder.respond(event_result) {
tracing::debug!(
error = %error,
contract = %key,
"Failed to deliver deferred upsert response (client may have disconnected)"
);
}
}
Ok(())
}
/// Outcome of `maybe_defer_upsert`.
enum DeferStep {
// The upsert finished (successfully or not); the caller must respond.
Completed(Result<UpsertResult, ExecutorError>),
// The upsert was handed to an off-loop task; the response is sent later
// by `handle_deferred_resume`.
Deferred,
}
/// Performs an upsert, deferring it when related contracts must be fetched.
///
/// Without a `DeferralCtx` the upsert runs inline through
/// `upsert_contract_state`. With one, `upsert_contract_state_deferrable` is
/// tried first; if it reports `DeferRelated`, the waiting responder for `id`
/// is stashed, a task is spawned to fetch the missing contracts off-loop, and
/// `DeferStep::Deferred` is returned. The upsert is then completed later by
/// `handle_deferred_resume`.
///
/// Deferral is skipped when the context is at capacity (a `missing_related`
/// error is returned) or when no waiting responder exists for `id` (the
/// upsert is run inline through `upsert_contract_state`).
#[allow(clippy::too_many_arguments)]
async fn maybe_defer_upsert<CH>(
contract_handler: &mut CH,
deferral: Option<&mut DeferralCtx>,
id: &handler::EventId,
key: ContractKey,
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
is_put: bool,
) -> DeferStep
where
CH: ContractHandler + Send + 'static,
{
let Some(deferral) = deferral else {
return DeferStep::Completed(
contract_handler
.executor()
.upsert_contract_state(key, update, related_contracts, code)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await,
);
};
// Inputs are cloned because the originals are still needed if the upsert
// has to be deferred or retried below.
let outcome = contract_handler
.executor()
.upsert_contract_state_deferrable(
key,
update.clone(),
related_contracts.clone(),
code.clone(),
)
.instrument(tracing::info_span!("upsert_contract_state_deferrable", %key))
.await;
let missing = match outcome {
Ok(UpsertOutcome::Completed(result)) => return DeferStep::Completed(Ok(result)),
Ok(UpsertOutcome::DeferRelated(missing)) => missing,
Err(err) => return DeferStep::Completed(Err(err)),
};
// Bound the number of in-flight deferrals: fail fast instead of stashing more.
if deferral.at_capacity() {
tracing::warn!(
contract = %key,
inflight = deferral.stashed.len(),
limit = MAX_INFLIGHT_DEFERRALS,
"Deferral capacity reached — failing fast with MissingRelated (not deferring)"
);
let id = missing.first().copied().unwrap_or_else(|| *key.id());
return DeferStep::Completed(Err(ExecutorError::missing_related(id)));
}
// Without a responder to stash there is nobody to answer later, so run
// the upsert inline instead.
let Some(responder) = contract_handler.channel().take_waiting_response(id) else {
return DeferStep::Completed(
contract_handler
.executor()
.upsert_contract_state(key, update, related_contracts, code)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await,
);
};
let deferral_id = deferral.next_deferral_id;
deferral.next_deferral_id = deferral.next_deferral_id.wrapping_add(1);
deferral.stashed.insert(deferral_id, responder);
let op_manager = contract_handler.executor().op_manager_handle();
let resume_tx = deferral.resume_tx.clone();
tracing::debug!(
contract = %key,
missing = missing.len(),
deferral_id,
"Off-loading related-contract fetch from the contract-handling loop (#4391)"
);
// The guard sends a resume even if the spawned task is dropped before
// reaching `guard.send` (see `Drop for ResumeGuard`).
let guard = ResumeGuard::new(ResumePayload {
resume_tx,
deferral_id,
key,
update,
related_contracts,
code,
is_put,
});
GlobalExecutor::spawn(async move {
let fetched = fetch_related_off_loop(op_manager, missing).await;
guard.send(fetched);
});
DeferStep::Deferred
}
/// Translates the result of a PUT upsert into a `PutResponse` event.
///
/// - `NoChange` echoes `incoming_state` with `state_changed: false`.
/// - `Updated` returns the new state with `state_changed: true`.
/// - `CurrentWon` returns the current state with `state_changed: false`.
/// - A non-fatal error is carried inside the response.
///
/// # Errors
/// Returns `ContractError::FatalExecutorError` when the executor error
/// reports `is_fatal()`.
async fn put_response_from_result<CH>(
    contract_handler: &mut CH,
    key: ContractKey,
    incoming_state: WrappedState,
    put_result: Result<UpsertResult, ExecutorError>,
) -> Result<ContractHandlerEvent, ContractError>
where
    CH: ContractHandler + Send + 'static,
{
    // The handler is accepted but not used by this function.
    let _ = contract_handler;

    let (new_value, state_changed) = match put_result {
        Ok(UpsertResult::Updated(new_state)) => (Ok(new_state), true),
        Ok(UpsertResult::CurrentWon(current_state)) => (Ok(current_state), false),
        Ok(UpsertResult::NoChange) => (Ok(incoming_state), false),
        Err(err) if err.is_fatal() => {
            tracing::error!(
                contract = %key,
                error = %err,
                phase = "fatal_error",
                "Fatal executor error during put query"
            );
            return Err(ContractError::FatalExecutorError { key, error: err });
        }
        Err(err) => (Err(err), false),
    };

    Ok(ContractHandlerEvent::PutResponse {
        new_value,
        state_changed,
    })
}
/// Translates the result of an UPDATE upsert into a response event.
///
/// - `Updated` returns the new state with `state_changed: true`.
/// - `CurrentWon` returns the current state with `state_changed: false`.
/// - `NoChange` re-fetches the current state and returns it with
///   `state_changed: false`; if nothing is stored or the fetch fails, the
///   event is `UpdateNoChange` instead.
/// - A non-fatal error is carried inside an `UpdateResponse`.
///
/// # Errors
/// Returns `ContractError::FatalExecutorError` when the executor error
/// reports `is_fatal()`.
async fn update_response_from_result<CH>(
    contract_handler: &mut CH,
    key: ContractKey,
    update_result: Result<UpsertResult, ExecutorError>,
) -> Result<ContractHandlerEvent, ContractError>
where
    CH: ContractHandler + Send + 'static,
{
    let (new_value, state_changed) = match update_result {
        Ok(UpsertResult::Updated(state)) => (Ok(state), true),
        Ok(UpsertResult::CurrentWon(current_state)) => (Ok(current_state), false),
        Ok(UpsertResult::NoChange) => {
            tracing::debug!(
                contract = %key,
                phase = "update_no_change",
                "UPDATE resulted in NoChange, fetching current state to return UpdateResponse"
            );
            let fetched = contract_handler.executor().fetch_contract(key, false).await;
            match fetched {
                Ok((Some(current_state), _)) => {
                    tracing::debug!(
                        contract = %key,
                        phase = "fetch_complete",
                        "Successfully fetched current state for NoChange update"
                    );
                    (Ok(current_state), false)
                }
                Ok((None, _)) => {
                    tracing::warn!(
                        contract = %key,
                        phase = "fetch_failed",
                        "No state found when fetching for NoChange update"
                    );
                    return Ok(ContractHandlerEvent::UpdateNoChange { key });
                }
                Err(err) => {
                    tracing::error!(
                        contract = %key,
                        error = %err,
                        phase = "fetch_error",
                        "Error fetching state for NoChange update"
                    );
                    return Ok(ContractHandlerEvent::UpdateNoChange { key });
                }
            }
        }
        Err(err) if err.is_fatal() => {
            tracing::error!(
                contract = %key,
                error = %err,
                phase = "fatal_error",
                "Fatal executor error during update query"
            );
            return Err(ContractError::FatalExecutorError { key, error: err });
        }
        Err(err) => (Err(err), false),
    };

    Ok(ContractHandlerEvent::UpdateResponse {
        new_value,
        state_changed,
    })
}
/// If the rejected event is an `EvictContract`, forwards its key and expected
/// generation to the executor's `track_pending_reclamation`; any other event
/// is ignored.
fn track_pending_reclamation_if_evict<CH>(
    contract_handler: &mut CH,
    rejected: &fair_queue::RejectedEvent,
) where
    CH: ContractHandler + Send + 'static,
{
    let ContractHandlerEvent::EvictContract {
        key,
        expected_generation,
    } = &rejected.event
    else {
        return;
    };
    contract_handler
        .executor()
        .track_pending_reclamation(*key, *expected_generation);
}
/// Answers an event that the fair queue refused to accept.
///
/// Query events (PUT, UPDATE, GET, GET-summary, GET-delta) receive the
/// matching response carrying a `ContractQueueFull` error. For every other
/// event kind the waiting response entry is dropped and nothing is sent.
async fn send_queue_full_response(
    channel: &mut handler::ContractHandlerChannel<handler::ContractHandlerHalve>,
    rejected: Box<fair_queue::RejectedEvent>,
) {
    tracing::debug!(
        event = %rejected.event,
        "Rejected event due to per-contract queue capacity limit"
    );
    let queue_full = || ExecutorError::other(ContractQueueFull);
    // `Some(reply)` for events that expect an answer, `None` otherwise.
    let reply = match &rejected.event {
        ContractHandlerEvent::PutQuery { .. } => Some(ContractHandlerEvent::PutResponse {
            new_value: Err(queue_full()),
            state_changed: false,
        }),
        ContractHandlerEvent::UpdateQuery { .. } => Some(ContractHandlerEvent::UpdateResponse {
            new_value: Err(queue_full()),
            state_changed: false,
        }),
        ContractHandlerEvent::GetQuery { .. } => Some(ContractHandlerEvent::GetResponse {
            key: None,
            response: Err(queue_full()),
        }),
        ContractHandlerEvent::GetSummaryQuery { key, .. } => {
            Some(ContractHandlerEvent::GetSummaryResponse {
                key: *key,
                summary: Err(queue_full()),
            })
        }
        ContractHandlerEvent::GetDeltaQuery { key, .. } => {
            Some(ContractHandlerEvent::GetDeltaResponse {
                key: *key,
                delta: Err(queue_full()),
            })
        }
        ContractHandlerEvent::DelegateRequest { .. }
        | ContractHandlerEvent::DelegateResponse(_)
        | ContractHandlerEvent::PutResponse { .. }
        | ContractHandlerEvent::GetResponse { .. }
        | ContractHandlerEvent::UpdateResponse { .. }
        | ContractHandlerEvent::UpdateNoChange { .. }
        | ContractHandlerEvent::RegisterSubscriberListener { .. }
        | ContractHandlerEvent::RegisterSubscriberListenerResponse
        | ContractHandlerEvent::QuerySubscriptions { .. }
        | ContractHandlerEvent::QuerySubscriptionsResponse
        | ContractHandlerEvent::GetSummaryResponse { .. }
        | ContractHandlerEvent::GetDeltaResponse { .. }
        | ContractHandlerEvent::ClientDisconnect { .. }
        | ContractHandlerEvent::EvictContract { .. } => None,
    };
    let Some(reply) = reply else {
        channel.drop_waiting_response(rejected.id);
        return;
    };
    if let Err(error) = channel.send_to_sender(rejected.id, reply).await {
        tracing::warn!(
            error = %error,
            "Failed to send queue-full response (client may have disconnected)"
        );
    }
}
/// Delivers a contract-state notification to a subscribed delegate.
///
/// The notification is wrapped in an `ApplicationMessages` request with empty
/// parameters and run through `handle_delegate_with_contract_requests`.
/// Every message the delegate returns is logged at warn level and discarded.
async fn handle_delegate_notification<CH, P>(
    contract_handler: &mut CH,
    notification: executor::DelegateNotification,
    prompter: &P,
) where
    CH: ContractHandler + Send + 'static,
    P: UserInputPrompter,
{
    let executor::DelegateNotification {
        delegate_key,
        contract_id,
        new_state,
    } = notification;
    tracing::debug!(
        delegate = %delegate_key,
        contract = %contract_id,
        "Delivering contract notification to delegate"
    );

    // Take ownership of the state without copying when this is the only
    // reference; otherwise clone the shared value.
    let new_state = match Arc::try_unwrap(new_state) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    let notification_msg = InboundDelegateMsg::ContractNotification(ContractNotification {
        contract_id,
        new_state,
        context: DelegateContext::default(),
    });
    let req = DelegateRequest::ApplicationMessages {
        key: delegate_key.clone(),
        params: Parameters::from(vec![]),
        inbound: vec![notification_msg],
    };

    let outbound = handle_delegate_with_contract_requests(
        contract_handler,
        req,
        None,
        &delegate_key,
        prompter,
    )
    .await;

    for msg in outbound.iter() {
        match msg {
            OutboundDelegateMsg::RequestUserInput(_)
            | OutboundDelegateMsg::ContextUpdated(_)
            | OutboundDelegateMsg::GetContractRequest(_)
            | OutboundDelegateMsg::PutContractRequest(_)
            | OutboundDelegateMsg::UpdateContractRequest(_)
            | OutboundDelegateMsg::SubscribeContractRequest(_)
            | OutboundDelegateMsg::SendDelegateMessage(_) => {
                tracing::warn!(
                    delegate = %delegate_key,
                    msg_type = ?std::mem::discriminant(msg),
                    "Delegate produced unexpected outbound message from contract notification — dropped"
                );
            }
            OutboundDelegateMsg::ApplicationMessage(app_msg) => {
                tracing::warn!(
                    delegate = %delegate_key,
                    payload_len = app_msg.payload.len(),
                    "Delegate produced ApplicationMessage from contract notification \
                    but no client routing is available yet — message dropped"
                );
            }
        }
    }
}
async fn handle_contract_event<CH, P>(
contract_handler: &mut CH,
id: handler::EventId,
event: ContractHandlerEvent,
prompter: &P,
deferral: Option<&mut DeferralCtx>,
) -> Result<(), ContractError>
where
CH: ContractHandler + Send + 'static,
P: UserInputPrompter,
{
tracing::debug!(
event = %event,
"Received contract handling event"
);
match event {
ContractHandlerEvent::GetQuery {
instance_id,
return_contract_code,
} => {
let key = contract_handler.executor().lookup_key(&instance_id);
match key {
Some(key) => {
match contract_handler
.executor()
.fetch_contract(key, return_contract_code)
.instrument(
tracing::info_span!("fetch_contract", %key, %return_contract_code),
)
.await
{
Ok((state, contract)) => {
tracing::debug!(
contract = %key,
with_contract_code = return_contract_code,
has_contract = contract.is_some(),
has_state = state.is_some(),
phase = "get_complete",
"Fetched contract"
);
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetResponse {
key: Some(key),
response: Ok(StoreResponse { state, contract }),
},
)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GET response (client may have disconnected)"
);
}
}
Err(err) => {
tracing::warn!(
contract = %key,
error = %err,
phase = "get_failed",
"Error executing get contract query"
);
if err.is_fatal() {
tracing::error!(
contract = %key,
error = %err,
phase = "fatal_error",
"Fatal executor error during get query"
);
return Err(ContractError::FatalExecutorError { key, error: err });
}
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetResponse {
key: Some(key),
response: Err(err),
},
)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GET error response (client may have disconnected)"
);
}
}
}
}
None => {
tracing::debug!(
instance_id = %instance_id,
phase = "not_found",
"Contract not found in local store"
);
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetResponse {
key: None,
response: Ok(StoreResponse {
state: None,
contract: None,
}),
},
)
.await
{
tracing::debug!(
error = %error,
instance_id = %instance_id,
"Failed to send GET not-found response (client may have disconnected)"
);
}
}
}
}
ContractHandlerEvent::PutQuery {
key,
state,
related_contracts,
contract,
} => {
if let Some(ref contract_container) = contract {
tracing::debug!(
contract = %key,
key_code_hash = ?key.code_hash(),
container_key = %contract_container.key(),
container_code_hash = ?contract_container.key().code_hash(),
data_len = contract_container.data().len(),
phase = "put_query_debug",
"DEBUG PUT: In PutQuery handler with contract"
);
} else {
tracing::debug!(
contract = %key,
phase = "put_query_debug",
"DEBUG PUT: In PutQuery handler - contract is None"
);
}
let put_result = match maybe_defer_upsert(
contract_handler,
deferral,
&id,
key,
Either::Left(state.clone()),
related_contracts,
contract,
true,
)
.await
{
DeferStep::Deferred => return Ok(()),
DeferStep::Completed(result) => result,
};
let event_result =
put_response_from_result(contract_handler, key, state, put_result).await?;
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, event_result)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send PUT response (client may have disconnected)"
);
}
}
ContractHandlerEvent::UpdateQuery {
key,
data,
mut related_contracts,
} => {
let update_value: Either<WrappedState, StateDelta<'static>> = match data {
freenet_stdlib::prelude::UpdateData::State(state) => {
Either::Left(WrappedState::from(state.into_bytes()))
}
freenet_stdlib::prelude::UpdateData::Delta(delta) => Either::Right(delta),
freenet_stdlib::prelude::UpdateData::StateAndDelta { state, .. } => {
Either::Left(WrappedState::from(state.into_bytes()))
}
freenet_stdlib::prelude::UpdateData::RelatedStateAndDelta {
related_to,
state,
delta,
} => {
inject_related_state(&mut related_contracts, related_to, state);
Either::Right(delta)
}
freenet_stdlib::prelude::UpdateData::RelatedState { .. }
| freenet_stdlib::prelude::UpdateData::RelatedDelta { .. } => {
let err = ExecutorError::other(anyhow::anyhow!(
"RelatedState / RelatedDelta UpdateData variants are not \
accepted directly via ContractRequest::Update — they are \
reserved for the runtime's request-related orchestration"
));
if let Err(send_err) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::UpdateResponse {
new_value: Err(err),
state_changed: false,
},
)
.await
{
tracing::debug!(error = %send_err, contract = %key, "Failed to send rejection");
}
return Ok(());
}
_ => {
let err = ExecutorError::other(anyhow::anyhow!(
"Unknown UpdateData variant reached \
ContractHandlerEvent::UpdateQuery; add explicit \
handling before landing the new variant"
));
if let Err(send_err) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::UpdateResponse {
new_value: Err(err),
state_changed: false,
},
)
.await
{
tracing::debug!(error = %send_err, contract = %key, "Failed to send rejection");
}
return Ok(());
}
};
let update_result = match maybe_defer_upsert(
contract_handler,
deferral,
&id,
key,
update_value,
related_contracts,
None,
false,
)
.await
{
DeferStep::Deferred => return Ok(()),
DeferStep::Completed(result) => result,
};
let event_result =
update_response_from_result(contract_handler, key, update_result).await?;
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, event_result)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send UPDATE response (client may have disconnected)"
);
}
}
ContractHandlerEvent::DelegateRequest {
req,
origin_contract,
} => {
let delegate_key = req.key().clone();
tracing::debug!(
delegate_key = %delegate_key,
?origin_contract,
"Processing delegate request"
);
let response = handle_delegate_with_contract_requests(
contract_handler,
req,
origin_contract.as_ref(),
&delegate_key,
prompter,
)
.await;
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::DelegateResponse(response))
.await
{
tracing::debug!(
error = %error,
delegate_key = %delegate_key,
"Failed to send DELEGATE response (client may have disconnected)"
);
}
}
ContractHandlerEvent::RegisterSubscriberListener {
key,
client_id,
summary,
subscriber_listener,
} => {
if let Err(err) = contract_handler.executor().register_contract_notifier(
key,
client_id,
subscriber_listener,
summary,
) {
tracing::warn!(
contract = %key,
client = %client_id,
error = %err,
phase = "registration_failed",
"Error registering subscriber listener"
);
}
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send RegisterSubscriberListener response (client may have disconnected)"
);
}
}
ContractHandlerEvent::QuerySubscriptions { callback } => {
let subscriptions = contract_handler.executor().get_subscription_info();
let connections = vec![]; let network_debug = crate::message::NetworkDebugInfo {
application_subscriptions: subscriptions,
network_subscriptions: vec![], connected_peers: connections,
};
if let Err(e) = callback
.send(crate::message::QueryResult::NetworkDebug(network_debug))
.await
{
tracing::debug!(error = %e, "failed to send network debug info via callback");
}
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::QuerySubscriptionsResponse)
.await
{
tracing::debug!(
error = %error,
"Failed to send QuerySubscriptions response (client may have disconnected)"
);
}
}
ContractHandlerEvent::GetSummaryQuery { key } => {
let summary = contract_handler
.executor()
.summarize_contract_state(key)
.instrument(tracing::info_span!("summarize_contract_state", %key))
.await;
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetSummaryResponse { key, summary },
)
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GetSummary response (client may have disconnected)"
);
}
}
ContractHandlerEvent::GetDeltaQuery { key, their_summary } => {
let delta = contract_handler
.executor()
.get_contract_state_delta(key, their_summary)
.instrument(tracing::info_span!("get_contract_state_delta", %key))
.await;
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::GetDeltaResponse { key, delta })
.await
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GetDelta response (client may have disconnected)"
);
}
}
ContractHandlerEvent::ClientDisconnect { client_id } => {
contract_handler.executor().remove_client(client_id);
contract_handler.channel().drop_waiting_response(id);
}
ContractHandlerEvent::EvictContract {
key,
expected_generation,
} => {
match contract_handler
.executor()
.remove_contract(&key, expected_generation)
.await
{
Ok(()) => {
tracing::info!(contract = %key, "Reclaimed on-disk storage for evicted contract");
}
Err(error) => {
tracing::warn!(
contract = %key,
error = %error,
"Failed to reclaim on-disk storage for evicted contract"
);
}
}
contract_handler.channel().drop_waiting_response(id);
}
ContractHandlerEvent::DelegateResponse(_)
| ContractHandlerEvent::PutResponse { .. }
| ContractHandlerEvent::GetResponse { .. }
| ContractHandlerEvent::UpdateResponse { .. }
| ContractHandlerEvent::UpdateNoChange { .. }
| ContractHandlerEvent::RegisterSubscriberListenerResponse
| ContractHandlerEvent::QuerySubscriptionsResponse
| ContractHandlerEvent::GetSummaryResponse { .. }
| ContractHandlerEvent::GetDeltaResponse { .. } => {
unreachable!("response events should not be received by the handler")
}
}
Ok(())
}
/// Name prefix that marks a WASM custom section as a debug section.
const DEBUG_SECTION_PREFIX: &str = ".debug_";

/// Lists the names of all custom sections in `wasm` whose name starts with
/// [`DEBUG_SECTION_PREFIX`], in the order the parser yields them.
///
/// If the parser reports an error for any payload, the result is an empty
/// vector, including when matching sections were already seen before the
/// error.
pub(crate) fn debug_sections(wasm: &[u8]) -> Vec<String> {
    wasmparser::Parser::new(0)
        .parse_all(wasm)
        .filter_map(|payload| match payload {
            Ok(wasmparser::Payload::CustomSection(section)) => {
                let name = section.name();
                name.starts_with(DEBUG_SECTION_PREFIX)
                    .then(|| Ok(name.to_string()))
            }
            Ok(_) => None,
            Err(parse_error) => Some(Err(parse_error)),
        })
        // Collecting into `Result` stops at the first parse error, which is
        // then mapped to the empty list.
        .collect::<Result<Vec<String>, _>>()
        .unwrap_or_default()
}
/// Returns `true` when [`debug_sections`] reports at least one section name
/// for `wasm`.
pub(crate) fn contains_debug_sections(wasm: &[u8]) -> bool {
    let debug_section_names = debug_sections(wasm);
    !debug_section_names.is_empty()
}
/// Errors surfaced by the contract-handling layer.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ContractError {
/// The handler channel was dropped. Carries a boxed `ContractHandlerEvent`
/// (presumably the event that could not be delivered; confirm at the
/// construction sites).
#[error("handler channel dropped")]
ChannelDropped(Box<ContractHandlerEvent>),
/// Wraps a standard I/O error; `#[from]` lets `?` convert a
/// `std::io::Error` into this variant.
#[error("{0}")]
IOError(#[from] std::io::Error),
/// No response was received from the handler.
#[error("no response received from handler")]
NoEvHandlerResponse,
/// An executor error for `key` that was classified as fatal. The event
/// handler returns this from its GET path when `ExecutorError::is_fatal`
/// is true, instead of sending an error response to the requester.
#[error("fatal executor error for contract {key}: {error}")]
FatalExecutorError {
key: ContractKey,
error: ExecutorError,
},
/// The contract WASM was rejected because it looks like a debug build.
/// `sections` is interpolated verbatim into the message (presumably the
/// names of the offending custom sections; confirm at the construction
/// site).
#[error(
"contract appears to be compiled in debug mode \
(contains {sections} section(s)). Debug WASM is typically \
10-100x larger than release builds and may exceed message-size \
limits. Recompile the contract with `--release` before publishing."
)]
DebugWasmRejected { sections: String },
}
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
use super::*;
use crate::config::GlobalExecutor;
use std::time::Duration;
/// Source-pinning test: the log statement that carries the queue-capacity
/// rejection message must be emitted through the `debug` logging macro.
///
/// The test reads `src/contract.rs` from disk, locates the first occurrence
/// of the message text, finds the closest logging-macro path before it, and
/// asserts that the macro name is `debug`.
#[test]
fn send_queue_full_response_logs_at_debug_not_warn_pin_test() {
// Resolve this crate's own source file relative to the manifest directory.
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/contract.rs");
let source = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("must read own source at {}: {e}", path.display()));
let needle = "Rejected event due to per-contract queue capacity limit";
// `find` returns the FIRST occurrence of the message in the file.
let idx = source
.find(needle)
.expect("rejection log message must still exist in source");
// Everything before the message; the macro invocation must be in here.
let preceding = &source[..idx];
// Closest macro path before the message (searching backwards).
let macro_idx = preceding
.rfind("tracing::")
.expect("a tracing macro must precede the rejection log site");
// Guard against a false match: on its own line the macro path may only be
// preceded by whitespace, otherwise it sits inside a comment or a string.
let line_start = preceding[..macro_idx].rfind('\n').map_or(0, |n| n + 1);
let line_prefix = &preceding[line_start..macro_idx];
assert!(
line_prefix.chars().all(char::is_whitespace),
"rfind matched `tracing::` inside a string literal or comment, \
not a macro invocation. Prefix on its line: {line_prefix:?}"
);
// The macro name is the text between the path prefix and the first `!`.
let after_macro = &preceding[macro_idx + "tracing::".len()..];
let macro_name = after_macro.split('!').next().unwrap_or("");
// Start of the failure-message excerpt: the first char boundary that lies
// within the final 200 bytes of `preceding`, so the slice below never
// splits a UTF-8 character.
let tail_start = preceding
.char_indices()
.map(|(i, _)| i)
.find(|&i| preceding.len() - i <= 200)
.unwrap_or(0);
let context = &preceding[tail_start..];
assert_eq!(
macro_name, "debug",
"Rejected-event log site must be at DEBUG, not WARN/INFO \
(closest preceding macro is `tracing::{macro_name}!`). \
Re-promotion restores the issue #4251 log-volume regression.\n\
Preceding source (last 200 bytes):\n{context}"
);
}
/// Source-pinning test: the PUT and UPDATE arms of the contract event
/// handler must both mention the deferral helper named in the assertions
/// below.
///
/// The test works purely on the text of `src/contract.rs`: it slices the
/// file on literal marker strings and checks each slice with `contains`.
/// NOTE(review): any text (including comments) that repeats one of the
/// marker strings earlier in the file changes where the slices begin and end.
#[test]
fn put_update_arms_route_through_maybe_defer_upsert_pin_test() {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/contract.rs");
let source = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("must read own source at {}: {e}", path.display()));
// Text between the first and second occurrence of the handler marker.
let body = source
.split("async fn handle_contract_event")
.nth(1)
.expect("handle_contract_event must exist");
// PUT arm: starts after the first PUT marker in `body` ...
let put_arm = body
.split("ContractHandlerEvent::PutQuery {")
.nth(1)
.expect("PutQuery arm must exist");
// ... and is cut off where the UPDATE marker first appears.
let put_arm = put_arm
.split("ContractHandlerEvent::UpdateQuery {")
.next()
.expect("PutQuery arm bounded by UpdateQuery arm");
assert!(
put_arm.contains("maybe_defer_upsert"),
"PUT arm must route its upsert through maybe_defer_upsert (#4391); \
re-inlining upsert_contract_state restores the HoL stall"
);
// UPDATE arm: starts after the first UPDATE marker in `body` ...
let update_arm = body
.split("ContractHandlerEvent::UpdateQuery {")
.nth(1)
.expect("UpdateQuery arm must exist");
// ... and is cut off where the delegate-request marker first appears.
let update_arm = update_arm
.split("ContractHandlerEvent::DelegateRequest {")
.next()
.expect("UpdateQuery arm bounded by DelegateRequest arm");
assert!(
update_arm.contains("maybe_defer_upsert"),
"UPDATE arm must route its upsert through maybe_defer_upsert (#4391)"
);
}
/// Builds a [`ContractKey`] for tests from fixed inputs: 32 bytes of `42` as
/// the contract code and 8 bytes of `7` as the parameters.
fn make_contract_key() -> ContractKey {
    let code = ContractCode::from(vec![42u8; 32]);
    let params = Parameters::from(vec![7u8; 8]);
    // Both arguments are passed by reference; the first one had been mangled
    // into `¶ms` (an HTML-entity artefact of `&params`), which does not compile.
    ContractKey::from_params_and_code(&params, &code)
}
/// `caller_identity_from_origin` maps a missing origin to
/// `CallerIdentity::None` and a contract instance id to
/// `CallerIdentity::WebApp` carrying the id's string form.
#[test]
fn test_caller_identity_from_origin() {
    // No origin at all.
    let absent = caller_identity_from_origin(None);
    assert_eq!(absent, CallerIdentity::None);

    // An origin contract id becomes a non-empty `WebApp` label.
    let instance_id = *make_contract_key().id();
    let identity = caller_identity_from_origin(Some(&instance_id));
    assert_eq!(identity, CallerIdentity::WebApp(instance_id.to_string()));
    match identity {
        CallerIdentity::WebApp(label) => assert!(!label.is_empty()),
        unexpected => panic!("expected WebApp, got {unexpected:?}"),
    }
}
/// Calling `inject_related_state` on an empty `RelatedContracts` must leave
/// exactly one entry, keyed by the given id and holding the supplied state.
#[test]
fn inject_related_state_writes_inline_state_to_slot() {
    let inline_state = freenet_stdlib::prelude::State::from(vec![1, 2, 3, 4]);
    let related_id = *make_contract_key().id();
    let mut related = RelatedContracts::default();

    inject_related_state(&mut related, related_id, inline_state.clone());

    let entries: Vec<_> = related.states().collect();
    assert_eq!(entries.len(), 1, "expected exactly one related entry");
    let (entry_id, entry_slot) = entries[0];
    assert_eq!(entry_id, &related_id);
    let stored = entry_slot
        .as_ref()
        .expect("slot must hold the inline state");
    assert_eq!(stored.as_ref(), inline_state.as_ref());
}
/// When the related id already has a state in its slot,
/// `inject_related_state` must replace it with the newly supplied state.
#[test]
fn inject_related_state_overrides_existing_slot() {
    let related_id = *make_contract_key().id();
    let prior = freenet_stdlib::prelude::State::from(vec![9, 9]);
    let inline = freenet_stdlib::prelude::State::from(vec![1, 2, 3]);

    // Start from a map that already holds `prior` for the same id.
    let seeded = std::collections::HashMap::from([(related_id, Some(prior))]);
    let mut related = RelatedContracts::from(seeded);

    inject_related_state(&mut related, related_id, inline.clone());

    let entries: Vec<_> = related.states().collect();
    assert_eq!(entries.len(), 1);
    let (_, slot) = entries[0];
    let stored = slot.as_ref().expect("slot must be Some");
    assert_eq!(
        stored.as_ref(),
        inline.as_ref(),
        "inline state must override pre-existing slot"
    );
}
/// Test helper: sends `event` through `send_halve` on a spawned task, pulls
/// it out of `rcv_halve`, and repackages it as a `RejectedEvent`.
///
/// Returns the boxed rejected event together with the join handle of the
/// sending task; that task resolves to the response the sender receives, or
/// to the send error converted into an `anyhow::Error`.
///
/// Panics if no event arrives within 200 ms or the channel is closed.
async fn setup_rejected_event(
    send_halve: handler::ContractHandlerChannel<handler::SenderHalve>,
    rcv_halve: &mut handler::ContractHandlerChannel<handler::ContractHandlerHalve>,
    event: ContractHandlerEvent,
) -> (
    Box<fair_queue::RejectedEvent>,
    tokio::task::JoinHandle<Result<ContractHandlerEvent, anyhow::Error>>,
) {
    // The sender waits for its response on a separate task.
    let sender_task = GlobalExecutor::spawn(async move {
        let outcome = send_halve.send_to_handler(event).await;
        outcome.map_err(|e| anyhow::anyhow!("{e}"))
    });

    let incoming = tokio::time::timeout(Duration::from_millis(200), rcv_halve.recv_from_sender());
    let (id, event) = incoming
        .await
        .expect("timeout waiting for event")
        .expect("channel should be open");

    (Box::new(fair_queue::RejectedEvent { id, event }), sender_task)
}
/// A rejected `PutQuery` must be answered with a `PutResponse` that carries
/// an error and reports `state_changed == false`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_put_query() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let query = ContractHandlerEvent::PutQuery {
        key: make_contract_key(),
        state: WrappedState::new(vec![1, 2, 3]),
        related_contracts: RelatedContracts::default(),
        contract: None,
    };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::PutResponse {
            new_value,
            state_changed,
        } => {
            assert!(new_value.is_err(), "should be an error response");
            assert!(!state_changed);
        }
        unexpected => panic!("expected PutResponse, got {unexpected}"),
    }
}
/// A rejected `GetQuery` must be answered with a `GetResponse` whose
/// `response` field is an error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_get_query() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let query = ContractHandlerEvent::GetQuery {
        instance_id: *make_contract_key().id(),
        return_contract_code: false,
    };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::GetResponse { response, .. } => {
            assert!(response.is_err(), "should be an error response");
        }
        unexpected => panic!("expected GetResponse, got {unexpected}"),
    }
}
/// A rejected `UpdateQuery` must be answered with an `UpdateResponse` that
/// carries an error and reports `state_changed == false`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_update_query() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let query = ContractHandlerEvent::UpdateQuery {
        key: make_contract_key(),
        data: UpdateData::Delta(StateDelta::from(vec![1])),
        related_contracts: RelatedContracts::default(),
    };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::UpdateResponse {
            new_value,
            state_changed,
        } => {
            assert!(new_value.is_err(), "should be an error response");
            assert!(!state_changed);
        }
        unexpected => panic!("expected UpdateResponse, got {unexpected}"),
    }
}
/// A rejected `GetSummaryQuery` must be answered with a `GetSummaryResponse`
/// that echoes the queried key and carries an error as its summary.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_get_summary_query() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let key = make_contract_key();
    let query = ContractHandlerEvent::GetSummaryQuery { key };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::GetSummaryResponse {
            key: resp_key,
            summary,
        } => {
            assert_eq!(resp_key, key);
            assert!(summary.is_err(), "should be an error response");
        }
        unexpected => panic!("expected GetSummaryResponse, got {unexpected}"),
    }
}
/// A rejected `GetDeltaQuery` must be answered with a `GetDeltaResponse`
/// that echoes the queried key and carries an error as its delta.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_get_delta_query() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let key = make_contract_key();
    let query = ContractHandlerEvent::GetDeltaQuery {
        key,
        their_summary: StateSummary::from(vec![1, 2]),
    };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::GetDeltaResponse {
            key: resp_key,
            delta,
        } => {
            assert_eq!(resp_key, key);
            assert!(delta.is_err(), "should be an error response");
        }
        unexpected => panic!("expected GetDeltaResponse, got {unexpected}"),
    }
}
/// The error inside the rejection response for an `UpdateQuery` must be
/// recognised by `is_contract_queue_full` (see issue #4251 in the assertion
/// message).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_carries_typed_queue_full_marker() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let query = ContractHandlerEvent::UpdateQuery {
        key: make_contract_key(),
        data: UpdateData::Delta(StateDelta::from(vec![1])),
        related_contracts: RelatedContracts::default(),
    };
    let (rejected, sender_task) = setup_rejected_event(send_halve, &mut rcv_halve, query).await;

    send_queue_full_response(&mut rcv_halve, rejected).await;

    // The sender task must finish promptly with the rejection response.
    let joined = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout");
    let reply = joined
        .expect("task should complete")
        .expect("should get response");
    match reply {
        ContractHandlerEvent::UpdateResponse { new_value, .. } => {
            let err = new_value.expect_err("UpdateResponse should carry an error");
            assert!(
                err.is_contract_queue_full(),
                "queue-full response MUST be classified by is_contract_queue_full; \
                 got err = {err:?}. If you removed this classification, you have also \
                 re-enabled the auto-fetch / ResyncRequest amplification storm — see \
                 issue #4251."
            );
        }
        unexpected => panic!("expected UpdateResponse, got {unexpected}"),
    }
}
/// Rejecting a `RegisterSubscriberListener` event must remove its entry from
/// the receiver's waiting-response bookkeeping, and the sending task must end
/// with an error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_queue_full_response_fire_and_forget_cleans_waiting_response() {
    let (send_halve, mut rcv_halve, _) = handler::contract_handler_channel();
    let registration = ContractHandlerEvent::RegisterSubscriberListener {
        key: *make_contract_key().id(),
        client_id: crate::client_events::ClientId::next(),
        summary: None,
        subscriber_listener: tokio::sync::mpsc::channel(64).0,
    };
    let (rejected, sender_task) =
        setup_rejected_event(send_halve, &mut rcv_halve, registration).await;

    // Before the rejection the event is still tracked as awaiting a response.
    assert!(
        rcv_halve.has_waiting_response(&rejected.id),
        "waiting_response should contain the event"
    );
    // Rebuild the id up front: `rejected` is moved into the call below.
    let rejected_id = handler::EventId { id: rejected.id.id };

    send_queue_full_response(&mut rcv_halve, rejected).await;

    assert!(
        !rcv_halve.has_waiting_response(&rejected_id),
        "waiting_response should be cleaned up after rejection"
    );
    let sender_outcome = tokio::time::timeout(Duration::from_millis(200), sender_task)
        .await
        .expect("timeout")
        .expect("task should complete");
    assert!(
        sender_outcome.is_err(),
        "fire-and-forget rejection should produce an error"
    );
}
/// Test helper: sends one `UpdateQuery` through `send_halve`, runs it through
/// `handle_contract_event` on `handler`, and returns the response the sender
/// receives.
///
/// Both halves are driven concurrently on the current task via
/// `tokio::join!`. Panics if the channel is closed, the handler returns an
/// error, or the sender gets no response.
async fn dispatch_update_query_e2e(
    handler: &mut MemoryContractHandler,
    send_halve: &handler::ContractHandlerChannel<handler::SenderHalve>,
    key: ContractKey,
    data: UpdateData<'static>,
    related_contracts: RelatedContracts<'static>,
) -> ContractHandlerEvent {
    let sender_side = send_halve.send_to_handler(ContractHandlerEvent::UpdateQuery {
        key,
        data,
        related_contracts,
    });
    let handler_side = async {
        let (event_id, query) = handler
            .channel()
            .recv_from_sender()
            .await
            .expect("handler channel should be open");
        let outcome = handle_contract_event(
            handler,
            event_id,
            query,
            &user_input::AutoApprovePrompter,
            None,
        )
        .await;
        outcome.expect("dispatch must not error for a well-formed UpdateQuery");
    };
    let (reply, ()) = tokio::join!(sender_side, handler_side);
    reply.expect("sender must receive a response")
}
/// Test helper: stores `initial` as the state of `contract` by sending a
/// `PutQuery` (with the contract code attached) through the handler.
///
/// Panics unless the handler answers with a `PutResponse` whose `new_value`
/// is `Ok`.
async fn seed_contract_state(
    handler: &mut MemoryContractHandler,
    send_halve: &handler::ContractHandlerChannel<handler::SenderHalve>,
    contract: ContractContainer,
    initial: WrappedState,
) {
    let seed_query = ContractHandlerEvent::PutQuery {
        key: contract.key(),
        state: initial,
        related_contracts: RelatedContracts::default(),
        contract: Some(contract),
    };
    let sender_side = send_halve.send_to_handler(seed_query);
    let handler_side = async {
        let (event_id, query) = handler
            .channel()
            .recv_from_sender()
            .await
            .expect("handler channel should be open");
        let outcome = handle_contract_event(
            handler,
            event_id,
            query,
            &user_input::AutoApprovePrompter,
            None,
        )
        .await;
        outcome.expect("seed PutQuery must not error");
    };
    let (reply, ()) = tokio::join!(sender_side, handler_side);
    match reply.expect("seed PutQuery must respond") {
        ContractHandlerEvent::PutResponse { new_value, .. } => {
            new_value.expect("seed PutQuery must store state");
        }
        unexpected => panic!("expected PutResponse from seed, got {unexpected}"),
    }
}
/// End-to-end check that an `UpdateData::StateAndDelta` update is applied
/// using its full state: the stored result must equal the supplied state,
/// not the seed combined with the delta.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn update_query_state_and_delta_dispatch_prefers_full_state_e2e() {
    let (send_halve, rcv_halve, _) = handler::contract_handler_channel();
    let mut handler = MemoryContractHandler::new(
        rcv_halve,
        None,
        "update_query_state_and_delta_dispatch_prefers_full_state",
    )
    .await;
    let contract = executor::mock_runtime::test::create_test_contract(b"sad_e2e");
    let key = contract.key();

    // Seed with whichever candidate has the smaller blake3 hash and send the
    // other one as the update (presumably the mock contract's merge keeps the
    // state with the larger hash; confirm against the mock runtime).
    let candidate_a = WrappedState::new(vec![0xAA; 16]);
    let candidate_b = WrappedState::new(vec![0xBB; 16]);
    let hash_a = blake3::hash(candidate_a.as_ref());
    let hash_b = blake3::hash(candidate_b.as_ref());
    let (seed_state, winning_full_state) = if hash_a.as_bytes() < hash_b.as_bytes() {
        (candidate_a, candidate_b)
    } else {
        (candidate_b, candidate_a)
    };
    seed_contract_state(&mut handler, &send_halve, contract, seed_state.clone()).await;

    // The delta is deliberately unrelated to the full state.
    let update = UpdateData::StateAndDelta {
        state: freenet_stdlib::prelude::State::from(winning_full_state.as_ref().to_vec()),
        delta: StateDelta::from(vec![0xCC; 8]),
    };
    let reply = dispatch_update_query_e2e(
        &mut handler,
        &send_halve,
        key,
        update,
        RelatedContracts::default(),
    )
    .await;

    match reply {
        ContractHandlerEvent::UpdateResponse {
            new_value,
            state_changed,
        } => {
            let stored = new_value.expect("StateAndDelta update must succeed");
            assert_eq!(
                stored.as_ref(),
                winning_full_state.as_ref(),
                "dispatch must store the full state, not seed++delta — \
                 a regression here means the conversion arm routed the \
                 delta instead of the authoritative full state"
            );
            assert!(
                state_changed,
                "full state differs from seed, so the merge changed state"
            );
        }
        unexpected => panic!("expected UpdateResponse, got {unexpected}"),
    }
}
/// End-to-end check that an `UpdateData::RelatedStateAndDelta` update is
/// accepted and applied through its delta: the stored state must end with
/// the delta bytes.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn update_query_related_state_and_delta_dispatch_routes_through_helper_e2e() {
    let (send_halve, rcv_halve, _) = handler::contract_handler_channel();
    let mut handler = MemoryContractHandler::new(
        rcv_halve,
        None,
        "update_query_related_state_and_delta_dispatch_routes_through_helper",
    )
    .await;
    let contract = executor::mock_runtime::test::create_test_contract(b"rsad_e2e");
    let key = contract.key();
    let related_contract = executor::mock_runtime::test::create_test_contract(b"rsad_related");
    let related_id = *related_contract.key().id();

    let seed_state = WrappedState::new(vec![1, 2, 3]);
    seed_contract_state(&mut handler, &send_halve, contract, seed_state.clone()).await;

    // Pick the first 4-byte delta (all bytes equal) for which seed ++ delta
    // hashes strictly above the seed alone.
    let seed_hash = *blake3::hash(seed_state.as_ref()).as_bytes();
    let delta_bytes = (0u8..=255)
        .map(|byte| vec![byte; 4])
        .find(|candidate| {
            let mut concatenated = seed_state.as_ref().to_vec();
            concatenated.extend_from_slice(candidate);
            *blake3::hash(&concatenated).as_bytes() > seed_hash
        })
        .expect("some 4-byte delta must produce a winning concatenation hash");

    let update = UpdateData::RelatedStateAndDelta {
        related_to: related_id,
        state: freenet_stdlib::prelude::State::from(vec![9, 9, 9]),
        delta: StateDelta::from(delta_bytes.clone()),
    };
    let reply = dispatch_update_query_e2e(
        &mut handler,
        &send_halve,
        key,
        update,
        RelatedContracts::default(),
    )
    .await;

    match reply {
        ContractHandlerEvent::UpdateResponse { new_value, .. } => {
            let stored = new_value.expect(
                "RelatedStateAndDelta must reach the executor and apply the \
                 delta — an Err here (or a panic) means the dispatch arm \
                 regressed to rejecting/unreachable!() the variant",
            );
            assert!(
                stored.as_ref().ends_with(&delta_bytes),
                "stored state {:?} must end with the delta bytes {:?}, \
                 proving the delta arm of RelatedStateAndDelta was used",
                stored.as_ref(),
                delta_bytes
            );
        }
        unexpected => panic!("expected UpdateResponse, got {unexpected}"),
    }
}
/// End-to-end check that `UpdateData::RelatedState` and
/// `UpdateData::RelatedDelta` sent as an `UpdateQuery` are answered with an
/// `UpdateResponse` carrying an error and `state_changed == false`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn update_query_rejection_arms_return_structured_error_e2e() {
    let (send_halve, rcv_halve, _) = handler::contract_handler_channel();
    let mut handler = MemoryContractHandler::new(
        rcv_halve,
        None,
        "update_query_rejection_arms_return_structured_error",
    )
    .await;
    let contract = executor::mock_runtime::test::create_test_contract(b"reject_e2e");
    let key = contract.key();
    let related_id = *executor::mock_runtime::test::create_test_contract(b"reject_related")
        .key()
        .id();
    let initial_state = WrappedState::new(vec![7, 7, 7]);
    seed_contract_state(&mut handler, &send_halve, contract, initial_state).await;

    let rejection_variants = [
        UpdateData::RelatedState {
            related_to: related_id,
            state: freenet_stdlib::prelude::State::from(vec![1, 1]),
        },
        UpdateData::RelatedDelta {
            related_to: related_id,
            delta: StateDelta::from(vec![2, 2]),
        },
    ];
    for data in rejection_variants {
        // Capture the debug form first; `data` is moved into the dispatch.
        let label = format!("{data:?}");
        let reply = dispatch_update_query_e2e(
            &mut handler,
            &send_halve,
            key,
            data,
            RelatedContracts::default(),
        )
        .await;
        match reply {
            ContractHandlerEvent::UpdateResponse {
                new_value,
                state_changed,
            } => {
                assert!(
                    new_value.is_err(),
                    "{label} must be rejected with a structured error, \
                     not applied as an update"
                );
                assert!(
                    !state_changed,
                    "{label} rejection must report state_changed = false"
                );
            }
            unexpected => panic!("expected UpdateResponse for {label}, got {unexpected}"),
        }
    }
}
}
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
#[allow(clippy::let_underscore_must_use)]
mod hol_4391_tests {
use super::*;
use crate::config::GlobalExecutor;
use crate::contract::executor::mock_wasm_runtime::ValidateOverride;
use std::sync::Arc;
use std::time::Duration;
/// Async mutex that the tests in this module lock as their first step, so
/// they run one at a time (presumably because the off-loop fetch override
/// they install is shared process-wide state; confirm at its definition).
static TEST_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Scope guard for the off-loop fetch override: created by
/// [`OverrideGuard::install`], it resets the override to `None` when dropped.
struct OverrideGuard;

impl OverrideGuard {
    /// Installs `stub` as the off-loop fetch override and returns the guard
    /// whose drop removes it again.
    fn install(stub: OffLoopFetchStub) -> Self {
        let active = Some(stub);
        set_off_loop_fetch_override(active);
        Self
    }
}

impl Drop for OverrideGuard {
    /// Clears the override by setting it back to `None`.
    fn drop(&mut self) {
        set_off_loop_fetch_override(None);
    }
}
/// Wraps `code_bytes` as a V1 WASM `ContractContainer` with empty parameters.
fn make_contract(code_bytes: &[u8]) -> ContractContainer {
    let wrapped = WrappedContract::new(
        Arc::new(ContractCode::from(code_bytes.to_vec())),
        Parameters::from(vec![]),
    );
    ContractContainer::Wasm(ContractWasmAPIVersion::V1(wrapped))
}
/// Creates a fresh handler channel plus a `MockWasmContractHandler` (test
/// constructor, identifier `"hol_4391"`) and registers every
/// `(contract id, ValidateOverride)` pair on the handler's runtime.
///
/// Returns the handler together with the sender half of the channel.
async fn build_handler(
    overrides: Vec<(ContractInstanceId, ValidateOverride)>,
) -> (
    MockWasmContractHandler,
    handler::ContractHandlerChannel<handler::SenderHalve>,
) {
    let (send_halve, rcv_halve, _) = handler::contract_handler_channel();
    let mut handler = MockWasmContractHandler::new_test(rcv_halve, None, "hol_4391").await;
    overrides
        .into_iter()
        .for_each(|(contract_id, validate_override)| {
            handler
                .runtime_mut()
                .validate_overrides
                .insert(contract_id, validate_override);
        });
    (handler, send_halve)
}
/// Sends a `PutQuery` for `contract` (code attached, no related contracts)
/// with the given `state` and returns the handler's response.
///
/// Panics if the send resolves to an error.
async fn put_local(
    send: &handler::ContractHandlerChannel<handler::SenderHalve>,
    contract: ContractContainer,
    state: WrappedState,
) -> ContractHandlerEvent {
    // Read the key before `contract` is moved into the event.
    let key = contract.key();
    let query = ContractHandlerEvent::PutQuery {
        key,
        state,
        related_contracts: RelatedContracts::default(),
        contract: Some(contract),
    };
    send.send_to_handler(query).await.expect("PUT must respond")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deferred_related_fetch_does_not_block_local_get() {
let _guard = TEST_GUARD.lock().await;
let contract_a = make_contract(b"hol_a_blocking");
let key_a = contract_a.key();
let id_b = *make_contract(b"hol_b_missing").key().id();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![id_b]),
)])
.await;
let gate = Arc::new(tokio::sync::Notify::new());
let gate_for_stub = gate.clone();
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
let gate = gate_for_stub.clone();
Box::pin(async move {
gate.notified().await;
Err(ExecutorError::missing_related(missing[0]))
})
})));
let contract_c = make_contract(b"hol_c_cached");
let key_c = contract_c.key();
let send = Arc::new(send);
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let put_c = put_local(
send.as_ref(),
contract_c,
WrappedState::new(b"c_state".to_vec()),
)
.await;
assert!(
matches!(
put_c,
ContractHandlerEvent::PutResponse {
new_value: Ok(_),
..
}
),
"seed PUT for C must succeed, got {put_c}"
);
let send_a = send.clone();
let a_state = WrappedState::new(b"a_state".to_vec());
let a_task: tokio::task::JoinHandle<Result<ContractHandlerEvent, ContractError>> =
GlobalExecutor::spawn(async move {
send_a
.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: a_state,
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
})
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let get_c = tokio::time::timeout(
Duration::from_secs(2),
send.send_to_handler(ContractHandlerEvent::GetQuery {
instance_id: *key_c.id(),
return_contract_code: false,
}),
)
.await
.expect("GET for cached C must NOT block behind A's deferred network fetch (#4391)")
.expect("GET for C must respond");
match get_c {
ContractHandlerEvent::GetResponse { response, .. } => {
let store = response.expect("GET for C must succeed");
assert_eq!(
store.state.expect("C has state").as_ref(),
b"c_state",
"GET must return C's stored state"
);
}
other => panic!("expected GetResponse for C, got {other}"),
}
gate.notify_one();
let a_resp = tokio::time::timeout(Duration::from_secs(5), a_task)
.await
.expect("A's PUT must eventually resolve after the fetch completes")
.expect("A task join")
.expect("A's PUT must respond");
match a_resp {
ContractHandlerEvent::PutResponse { new_value, .. } => {
assert!(
new_value.is_err(),
"A's PUT must surface MissingRelated once the off-loop fetch fails"
);
}
other => panic!("expected PutResponse for A, got {other}"),
}
set_off_loop_fetch_override(None);
handle.abort();
}
/// `DEFERRED_RELATED_FETCH_TIMEOUT` must never be shorter than
/// `crate::config::OPERATION_TTL`.
#[test]
fn off_loop_related_fetch_budget_is_at_least_operation_ttl() {
    let fetch_budget = DEFERRED_RELATED_FETCH_TIMEOUT;
    let operation_ttl = crate::config::OPERATION_TTL;
    assert!(
        fetch_budget >= operation_ttl,
        "off-loop related-fetch budget {:?} must be >= GET OPERATION_TTL {:?} \
         so a slow-but-successful related GET is not failed prematurely (#4391)",
        fetch_budget,
        operation_ttl,
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_same_key_deferrals_both_clients_answered() {
let _guard = TEST_GUARD.lock().await;
let key_seed = b"concurrent_a";
let key_a = make_contract(key_seed).key();
let id_b = *make_contract(b"concurrent_b").key().id();
let b_state = WrappedState::new(b"concurrent_b_state".to_vec());
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![id_b]),
)])
.await;
let sem = Arc::new(tokio::sync::Semaphore::new(0));
let sem_for_stub = sem.clone();
let b_for_stub = b_state.clone();
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
let sem = sem_for_stub.clone();
let b = b_for_stub.clone();
Box::pin(async move {
let _permit = sem.acquire_owned().await.expect("sem open");
Ok(vec![(missing[0], b.clone())])
})
})));
let send = Arc::new(send);
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let send1 = send.clone();
let contract1 = make_contract(key_seed);
let put1 = GlobalExecutor::spawn(async move {
send1
.send_to_handler(ContractHandlerEvent::PutQuery {
key: contract1.key(),
state: WrappedState::new(b"a_state_1".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract1),
})
.await
});
let send2 = send.clone();
let contract2 = make_contract(key_seed);
let put2 = GlobalExecutor::spawn(async move {
send2
.send_to_handler(ContractHandlerEvent::PutQuery {
key: contract2.key(),
state: WrappedState::new(b"a_state_2".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract2),
})
.await
});
tokio::time::sleep(Duration::from_millis(200)).await;
sem.add_permits(2);
let r1 = tokio::time::timeout(Duration::from_secs(5), put1)
.await
.expect("put1 must not hang")
.expect("put1 join")
.expect("put1 responds");
let r2 = tokio::time::timeout(Duration::from_secs(5), put2)
.await
.expect("put2 must not hang")
.expect("put2 join")
.expect("put2 responds");
assert!(
matches!(r1, ContractHandlerEvent::PutResponse { .. }),
"put1 client must get its own PutResponse, got {r1}"
);
assert!(
matches!(r2, ContractHandlerEvent::PutResponse { .. }),
"put2 client must get its own PutResponse, got {r2}"
);
set_off_loop_fetch_override(None);
handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn same_key_get_during_deferred_put_runs_promptly() {
let _guard = TEST_GUARD.lock().await;
let contract_a = make_contract(b"reorder_a");
let key_a = contract_a.key();
let id_b = *make_contract(b"reorder_b").key().id();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![id_b]),
)])
.await;
let gate = Arc::new(tokio::sync::Notify::new());
let gate_for_stub = gate.clone();
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
let gate = gate_for_stub.clone();
Box::pin(async move {
gate.notified().await;
Err(ExecutorError::missing_related(missing[0]))
})
})));
let send = Arc::new(send);
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let send_a = send.clone();
let a_task: tokio::task::JoinHandle<Result<ContractHandlerEvent, ContractError>> =
GlobalExecutor::spawn(async move {
send_a
.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: WrappedState::new(b"a_state".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
})
.await
});
tokio::time::sleep(Duration::from_millis(100)).await;
let get_a = tokio::time::timeout(
Duration::from_secs(2),
send.send_to_handler(ContractHandlerEvent::GetQuery {
instance_id: *key_a.id(),
return_contract_code: false,
}),
)
.await
.expect("same-key GET must NOT be held behind the deferring PUT (#4391 round 5)")
.expect("GET for A must respond");
match get_a {
ContractHandlerEvent::GetResponse { .. } => {
}
other => panic!("expected GetResponse for A, got {other}"),
}
gate.notify_one();
let a_resp = tokio::time::timeout(Duration::from_secs(5), a_task)
.await
.expect("A's PUT must resolve after the fetch completes")
.expect("A task join")
.expect("A's PUT must respond");
assert!(
matches!(a_resp, ContractHandlerEvent::PutResponse { .. }),
"A's PUT must still receive its own response on resume"
);
set_off_loop_fetch_override(None);
handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deferred_related_fetch_success_resumes_and_completes() {
let _guard = TEST_GUARD.lock().await;
let contract_a = make_contract(b"hol_resume_a");
let key_a = contract_a.key();
let id_b = *make_contract(b"hol_resume_b").key().id();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![id_b]),
)])
.await;
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
Box::pin(async move {
Ok(missing
.into_iter()
.map(|id| (id, WrappedState::new(b"fetched_b".to_vec())))
.collect())
})
})));
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let resp = tokio::time::timeout(
Duration::from_secs(5),
send.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: WrappedState::new(b"a_state".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
}),
)
.await
.expect("deferred PUT must resume and respond")
.expect("PUT must respond");
match resp {
ContractHandlerEvent::PutResponse { new_value, .. } => {
new_value.expect("deferred PUT must succeed once B is supplied off-loop");
}
other => panic!("expected PutResponse, got {other}"),
}
set_off_loop_fetch_override(None);
handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deferred_related_fetch_failure_surfaces_missing_related() {
let _guard = TEST_GUARD.lock().await;
let contract_a = make_contract(b"hol_fail_a");
let key_a = contract_a.key();
let id_b = *make_contract(b"hol_fail_b").key().id();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![id_b]),
)])
.await;
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
Box::pin(async move { Err(ExecutorError::missing_related(missing[0])) })
})));
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let resp = tokio::time::timeout(
Duration::from_secs(5),
send.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: WrappedState::new(b"a_state".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
}),
)
.await
.expect("deferred PUT must resolve (not hang) when the fetch fails")
.expect("PUT must respond");
match resp {
ContractHandlerEvent::PutResponse { new_value, .. } => {
assert!(
new_value.is_err(),
"a failed off-loop fetch must surface an error to the client"
);
}
other => panic!("expected PutResponse, got {other}"),
}
set_off_loop_fetch_override(None);
handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn all_related_local_does_not_defer() {
let _guard = TEST_GUARD.lock().await;
let contract_b = make_contract(b"hol_local_b");
let key_b = contract_b.key();
let contract_a = make_contract(b"hol_local_a");
let key_a = contract_a.key();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::RequestRelated(vec![*key_b.id()]),
)])
.await;
set_off_loop_fetch_override(Some(Arc::new(|_missing| {
Box::pin(async move {
panic!("off-loop fetch must NOT be invoked when all related contracts are local");
})
})));
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let put_b = put_local(&send, contract_b, WrappedState::new(b"b_state".to_vec())).await;
assert!(
matches!(
put_b,
ContractHandlerEvent::PutResponse {
new_value: Ok(_),
..
}
),
"seed PUT for B must succeed, got {put_b}"
);
let resp = tokio::time::timeout(
Duration::from_secs(5),
send.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: WrappedState::new(b"a_state".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
}),
)
.await
.expect("local-resolvable PUT must complete inline")
.expect("PUT must respond");
match resp {
ContractHandlerEvent::PutResponse { new_value, .. } => {
new_value.expect("A must store successfully when B is already local");
}
other => panic!("expected PutResponse, got {other}"),
}
set_off_loop_fetch_override(None);
handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn second_related_request_after_resume_does_not_defer_again() {
let _guard = TEST_GUARD.lock().await;
let contract_a = make_contract(b"hol_cap_a");
let key_a = contract_a.key();
let id_b = *make_contract(b"hol_cap_b").key().id();
let (handler, send) = build_handler(vec![(
*key_a.id(),
ValidateOverride::AlwaysRequestRelated(vec![id_b]),
)])
.await;
let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let calls_stub = calls.clone();
set_off_loop_fetch_override(Some(Arc::new(move |missing| {
calls_stub.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Box::pin(async move {
Ok(missing
.into_iter()
.map(|id| (id, WrappedState::new(b"fetched".to_vec())))
.collect())
})
})));
let handle = GlobalExecutor::spawn(contract_handling(
handler,
crate::contract::user_input::AutoApprovePrompter,
));
let resp = tokio::time::timeout(
Duration::from_secs(5),
send.send_to_handler(ContractHandlerEvent::PutQuery {
key: key_a,
state: WrappedState::new(b"a_state".to_vec()),
related_contracts: RelatedContracts::default(),
contract: Some(contract_a),
}),
)
.await
.expect("deferred-then-resumed PUT must resolve, not loop forever")
.expect("PUT must respond");
match resp {
ContractHandlerEvent::PutResponse { new_value, .. } => {
assert!(
new_value.is_err(),
"a contract that keeps requesting related contracts must be \
rejected after one deferral (depth=1 / one-deferral cap)"
);
}
other => panic!("expected PutResponse, got {other}"),
}
assert_eq!(
calls.load(std::sync::atomic::Ordering::SeqCst),
1,
"the off-loop fetch must run exactly once — the resume must NOT \
defer again (#4391 one-deferral cap)"
);
set_off_loop_fetch_override(None);
handle.abort();
}
/// An UPDATE on a contract whose update override is
/// `UpdateOverride::RequiresRelated` must be answered within 5 seconds when
/// the stubbed off-loop fetch supplies the related state. Either a successful
/// `UpdateResponse` or an `UpdateNoChange` is accepted.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn update_path_deferral_resumes_and_applies() {
    use crate::contract::executor::mock_wasm_runtime::UpdateOverride;
    let _guard = TEST_GUARD.lock().await;

    let contract = make_contract(b"upd_defer_k");
    let key = contract.key();
    let related_id = *make_contract(b"upd_defer_related").key().id();

    let (sender, receiver, _) = handler::contract_handler_channel();
    let mut handler = MockWasmContractHandler::new_test(receiver, None, "upd_defer").await;
    handler
        .runtime_mut()
        .update_overrides
        .insert(*key.id(), UpdateOverride::RequiresRelated(vec![related_id]));
    let send = Arc::new(sender);

    // Stubbed fetch: answer every requested id with the same canned state.
    let _override =
        OverrideGuard::install(Arc::new(move |missing: Vec<ContractInstanceId>| {
            Box::pin(async move {
                let fetched = missing
                    .into_iter()
                    .map(|id| (id, WrappedState::new(b"b_state".to_vec())))
                    .collect();
                Ok(fetched)
            })
        }));

    let handle = GlobalExecutor::spawn(contract_handling(
        handler,
        crate::contract::user_input::AutoApprovePrompter,
    ));

    // Store the contract first so there is state for the UPDATE to act on.
    let put = put_local(send.as_ref(), contract, WrappedState::new(vec![0])).await;
    assert!(matches!(
        put,
        ContractHandlerEvent::PutResponse {
            new_value: Ok(_),
            ..
        }
    ));

    let update = send.send_to_handler(ContractHandlerEvent::UpdateQuery {
        key,
        data: freenet_stdlib::prelude::UpdateData::Delta(StateDelta::from(vec![1])),
        related_contracts: RelatedContracts::default(),
    });
    let resp = tokio::time::timeout(Duration::from_secs(5), update)
        .await
        .expect("deferred UPDATE must resume and respond")
        .expect("UPDATE responds");
    match resp {
        ContractHandlerEvent::UpdateNoChange { .. } => {}
        ContractHandlerEvent::UpdateResponse { new_value, .. } => {
            new_value.expect("UPDATE must succeed once B is supplied off-loop");
        }
        other => panic!("expected UpdateResponse, got {other}"),
    }
    handle.abort();
}
/// An UPDATE on a contract whose update override is
/// `UpdateOverride::RequiresRelated` must resolve with an error
/// `UpdateResponse` (rather than hang) when the stubbed off-loop fetch fails.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn update_path_deferral_failure_surfaces_update_error() {
    use crate::contract::executor::mock_wasm_runtime::UpdateOverride;
    let _guard = TEST_GUARD.lock().await;

    let contract = make_contract(b"upd_fail_k");
    let key = contract.key();
    let related_id = *make_contract(b"upd_fail_related").key().id();

    let (sender, receiver, _) = handler::contract_handler_channel();
    let mut handler = MockWasmContractHandler::new_test(receiver, None, "upd_fail").await;
    handler
        .runtime_mut()
        .update_overrides
        .insert(*key.id(), UpdateOverride::RequiresRelated(vec![related_id]));
    let send = Arc::new(sender);

    // Stubbed fetch: always report the first requested id as missing.
    let _override =
        OverrideGuard::install(Arc::new(move |missing: Vec<ContractInstanceId>| {
            Box::pin(async move {
                let first = missing[0];
                Err(ExecutorError::missing_related(first))
            })
        }));

    let handle = GlobalExecutor::spawn(contract_handling(
        handler,
        crate::contract::user_input::AutoApprovePrompter,
    ));

    // Store the contract first so there is state for the UPDATE to act on.
    let put = put_local(send.as_ref(), contract, WrappedState::new(vec![0])).await;
    assert!(matches!(
        put,
        ContractHandlerEvent::PutResponse {
            new_value: Ok(_),
            ..
        }
    ));

    let update = send.send_to_handler(ContractHandlerEvent::UpdateQuery {
        key,
        data: freenet_stdlib::prelude::UpdateData::Delta(StateDelta::from(vec![1])),
        related_contracts: RelatedContracts::default(),
    });
    let resp = tokio::time::timeout(Duration::from_secs(5), update)
        .await
        .expect("deferred UPDATE must resolve (not hang) when the fetch fails")
        .expect("UPDATE responds");
    let new_value = match resp {
        ContractHandlerEvent::UpdateResponse { new_value, .. } => new_value,
        other => panic!("expected UpdateResponse error, got {other}"),
    };
    assert!(
        new_value.is_err(),
        "a failed off-loop fetch must surface an UpdateResponse error"
    );
    handle.abort();
}
/// Exercises `map_sub_op_outcome` for each `SubOpGetOutcome` variant built
/// here: `Found` must yield the fetched state, while `NotFound` and `Infra`
/// must yield an error.
#[test]
fn off_loop_sub_op_outcome_mapping() {
    use crate::operations::get::op_ctx_task::SubOpGetOutcome;
    let id = *make_contract(b"map_id").key().id();

    // Success case: the state carried by `Found` comes back unchanged.
    let found = SubOpGetOutcome::Found(crate::operations::get::GetResult::new(
        WrappedState::new(b"fetched".to_vec()),
        None,
    ));
    let mapped = map_sub_op_outcome(found, id);
    match mapped {
        Err(e) => panic!("Found must map to the fetched state, got {e}"),
        Ok(state) => assert_eq!(state.as_ref(), b"fetched"),
    }

    // Failure cases: only `is_err()` is checked, not the error variant.
    let failing = [
        (
            SubOpGetOutcome::NotFound("nope".to_string()),
            "NotFound must map to MissingRelated",
        ),
        (
            SubOpGetOutcome::Infra(crate::operations::OpError::UnexpectedOpState),
            "Infra must map to MissingRelated",
        ),
    ];
    for (outcome, message) in failing {
        assert!(map_sub_op_outcome(outcome, id).is_err(), "{}", message);
    }
}
/// A client issues a PUT that needs a related contract and gives up after
/// 50 ms, while the stubbed fetch takes 200 ms. A later GET for the same key
/// must still be answered with a `GetResponse` instead of hanging.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn client_disconnect_during_deferral_does_not_wedge() {
    let _guard = TEST_GUARD.lock().await;
    let contract = make_contract(b"disc_k");
    let key = contract.key();
    let related_id = *make_contract(b"disc_related").key().id();
    let (handler, send) = build_handler(vec![(
        *key.id(),
        ValidateOverride::RequestRelated(vec![related_id]),
    )])
    .await;

    // Stubbed fetch: succeed, but only after the client has already given up.
    let _override =
        OverrideGuard::install(Arc::new(move |missing: Vec<ContractInstanceId>| {
            Box::pin(async move {
                tokio::time::sleep(Duration::from_millis(200)).await;
                let fetched = missing
                    .into_iter()
                    .map(|id| (id, WrappedState::new(b"b_state".to_vec())))
                    .collect();
                Ok(fetched)
            })
        }));

    let send = Arc::new(send);
    let handle = GlobalExecutor::spawn(contract_handling(
        handler,
        crate::contract::user_input::AutoApprovePrompter,
    ));

    // The PUT future is dropped when its 50 ms timeout elapses; the outcome is
    // deliberately ignored.
    let put_sender = send.clone();
    let disconnect_task = GlobalExecutor::spawn(async move {
        let put = put_sender.send_to_handler(ContractHandlerEvent::PutQuery {
            key,
            state: WrappedState::new(vec![9]),
            related_contracts: RelatedContracts::default(),
            contract: Some(contract),
        });
        let _ = tokio::time::timeout(Duration::from_millis(50), put).await;
    });
    let _ = disconnect_task.await;

    // Wait past the stub's 200 ms delay before probing the same key.
    tokio::time::sleep(Duration::from_millis(400)).await;

    let get = send.send_to_handler(ContractHandlerEvent::GetQuery {
        instance_id: *key.id(),
        return_contract_code: false,
    });
    let resp = tokio::time::timeout(
        DEFERRED_RELATED_FETCH_TIMEOUT + Duration::from_secs(2),
        get,
    )
    .await
    .expect("same-key op after a disconnected deferral must not hang")
    .expect("GET responds");
    assert!(
        matches!(resp, ContractHandlerEvent::GetResponse { .. }),
        "expected a GetResponse"
    );
    handle.abort();
}
/// Starts `MAX_INFLIGHT_DEFERRALS` PUTs whose stubbed fetches stay blocked,
/// then issues one more related-needing PUT. That extra PUT must be answered
/// with an error `PutResponse` within 3 seconds instead of waiting behind the
/// blocked ones.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn deferral_cap_fails_fast_with_missing_related() {
    let _guard = TEST_GUARD.lock().await;

    // Stubbed fetch: poll the `released` flag every 10 ms, then fail.
    let released = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let released_stub = released.clone();
    let _override =
        OverrideGuard::install(Arc::new(move |missing: Vec<ContractInstanceId>| {
            let released = released_stub.clone();
            Box::pin(async move {
                loop {
                    if released.load(std::sync::atomic::Ordering::SeqCst) {
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                Err(ExecutorError::missing_related(missing[0]))
            })
        }));

    // Build n + 1 contracts: n to occupy the in-flight slots and one extra
    // (index n) for the over-cap PUT.
    let n = MAX_INFLIGHT_DEFERRALS;
    let mut overrides = Vec::with_capacity(n + 1);
    let mut put_keys = Vec::with_capacity(n + 1);
    let mut put_contracts = Vec::with_capacity(n + 1);
    for i in 0..=n {
        let contract = make_contract(format!("cap_{i}").as_bytes());
        let related = *make_contract(format!("cap_rel_{i}").as_bytes()).key().id();
        let key = contract.key();
        overrides.push((*key.id(), ValidateOverride::RequestRelated(vec![related])));
        put_keys.push(key);
        put_contracts.push(contract);
    }

    let (handler, send) = build_handler(overrides).await;
    let send = Arc::new(send);
    let handle = GlobalExecutor::spawn(contract_handling(
        handler,
        crate::contract::user_input::AutoApprovePrompter,
    ));

    // Launch the first n PUTs; their responses are not inspected.
    let tasks: Vec<_> = put_keys
        .iter()
        .zip(&put_contracts)
        .take(n)
        .enumerate()
        .map(|(i, (&key, contract))| {
            let sender = send.clone();
            let contract = contract.clone();
            GlobalExecutor::spawn(async move {
                let _ = sender
                    .send_to_handler(ContractHandlerEvent::PutQuery {
                        key,
                        state: WrappedState::new(vec![i as u8]),
                        related_contracts: RelatedContracts::default(),
                        contract: Some(contract),
                    })
                    .await;
            })
        })
        .collect();

    // Give the handler time to receive the first n PUTs.
    tokio::time::sleep(Duration::from_millis(800)).await;

    let over_cap_put = send.send_to_handler(ContractHandlerEvent::PutQuery {
        key: put_keys[n],
        state: WrappedState::new(vec![0xFF]),
        related_contracts: RelatedContracts::default(),
        contract: Some(put_contracts[n].clone()),
    });
    let over = tokio::time::timeout(Duration::from_secs(3), over_cap_put)
        .await
        .expect("over-cap PUT must FAIL FAST, not block behind the gated cap")
        .expect("over-cap PUT responds");
    let new_value = match over {
        ContractHandlerEvent::PutResponse { new_value, .. } => new_value,
        other => panic!("expected PutResponse error at cap, got {other}"),
    };
    assert!(
        new_value.is_err(),
        "over-cap related-needing PUT must surface MissingRelated"
    );

    // Unblock the stubbed fetches and let the background PUTs wind down.
    released.store(true, std::sync::atomic::Ordering::SeqCst);
    for task in tasks {
        let _ = tokio::time::timeout(Duration::from_secs(10), task).await;
    }
    handle.abort();
}
/// Returns the `ContractInstanceId` of the contract that `make_contract`
/// builds from `seed`.
fn instance_id(seed: &[u8]) -> ContractInstanceId {
    let key = make_contract(seed).key();
    *key.id()
}
/// Dropping a `ResumeGuard` without calling `send` must enqueue exactly one
/// `DeferredResume` carrying a fetch error. Feeding that resume to
/// `handle_deferred_resume` must consume the stashed responder and answer the
/// client with an error `PutResponse`.
#[tokio::test]
async fn dropped_waiter_guard_answers_client_missing_related() {
    let (resume_tx, mut resume_rx) = tokio::sync::mpsc::unbounded_channel::<DeferredResume>();
    let mut ctx = DeferralCtx::new(resume_tx.clone());
    let mut wasm_handler =
        MockWasmContractHandler::new_test(handler::contract_handler_channel().1, None, "drop")
            .await;
    let contract = make_contract(b"drop_guard_k");
    let key = contract.key();

    // Register a responder for deferral id 7 so the resume has a client to
    // answer.
    let (client_tx, client_rx) = tokio::sync::oneshot::channel();
    ctx.stashed
        .insert(7, StashedResponder::for_test(7, client_tx));

    // Build the guard and drop it immediately, without sending a result.
    drop(ResumeGuard::new(ResumePayload {
        resume_tx,
        deferral_id: 7,
        key,
        update: Either::Left(WrappedState::new(vec![1])),
        related_contracts: RelatedContracts::default(),
        code: Some(contract),
        is_put: true,
    }));

    let resume = resume_rx.try_recv().expect("Drop must deliver one resume");
    assert_eq!(resume.deferral_id, 7);
    assert!(
        resume.fetched.is_err(),
        "dropped-waiter resume must carry a MissingRelated fetch error"
    );
    assert!(
        resume_rx.try_recv().is_err(),
        "exactly one resume — no extra"
    );

    handle_deferred_resume(&mut wasm_handler, &mut ctx, resume)
        .await
        .expect("resume handled");
    assert!(
        !ctx.stashed.contains_key(&7),
        "the stashed responder must be consumed (no leak)"
    );

    let (_id, ev) = client_rx.await.expect("client must be answered");
    let new_value = match ev {
        ContractHandlerEvent::PutResponse { new_value, .. } => new_value,
        other => panic!("expected PutResponse error, got {other}"),
    };
    assert!(
        new_value.is_err(),
        "dropped-waiter PUT must surface MissingRelated"
    );
}
/// Calling `send` on a `ResumeGuard` must enqueue exactly one
/// `DeferredResume` carrying the supplied `Ok` result, and nothing more.
#[tokio::test]
async fn resume_guard_success_sends_exactly_one_resume() {
    let (resume_tx, mut resume_rx) = tokio::sync::mpsc::unbounded_channel::<DeferredResume>();
    let payload = ResumePayload {
        resume_tx,
        deferral_id: 3,
        key: make_contract(b"once_k").key(),
        update: Either::Left(WrappedState::new(vec![1])),
        related_contracts: RelatedContracts::default(),
        code: None,
        is_put: false,
    };
    let guard = ResumeGuard::new(payload);
    guard.send(Ok(vec![]));

    let resume = resume_rx
        .try_recv()
        .expect("success path must deliver one resume");
    assert_eq!(resume.deferral_id, 3);
    assert!(
        resume.fetched.is_ok(),
        "success resume must carry the real fetch result"
    );
    // The queue must now be empty: no second resume may follow the first.
    let drained = resume_rx.try_recv().is_err();
    assert!(
        drained,
        "exactly one resume — Drop must NOT send again after a success send"
    );
}
/// With no override installed and `None` passed as the first argument,
/// `fetch_related_off_loop` must return an error.
#[tokio::test]
async fn fetch_related_off_loop_no_op_manager_is_missing_related() {
    let _guard = TEST_GUARD.lock().await;
    // Clear any override so the real code path is exercised.
    set_off_loop_fetch_override(None);
    let missing = vec![instance_id(b"off_loop_none")];
    let result = fetch_related_off_loop(None, missing).await;
    assert!(
        result.is_err(),
        "no op_manager must surface MissingRelated, got {result:?}"
    );
}
}