use std::collections::HashSet;
use std::sync::Arc;
use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use crate::client_events::HostResult;
use crate::config::GlobalExecutor;
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::operations::op_ctx::{
AdvanceOutcome, AttemptOutcome, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use super::{PutFinalizationData, PutMsg};
/// Entry point for a client-initiated PUT that is handled by its own task.
///
/// Emits a debug log line, schedules [`run_client_put`] on the
/// [`GlobalExecutor`] with all arguments forwarded unchanged, and returns
/// without waiting for the task to make progress.
///
/// # Returns
///
/// `Ok(client_tx)` — the same transaction id that was passed in. The visible
/// body has no failing path; the `Result` is part of the signature callers
/// rely on.
// Every parameter is forwarded one-for-one down the driver call chain, so the
// long argument list is intentional.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_put(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    contract: ContractContainer,
    related: RelatedContracts<'static>,
    value: WrappedState,
    htl: usize,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
    tracing::debug!(
        tx = %client_tx,
        contract = %contract.key(),
        "put (task-per-tx): spawning client-initiated task"
    );

    // Build the task future first, then hand it to the executor; nothing is
    // awaited here, so the caller gets its transaction id back immediately.
    let task = run_client_put(
        op_manager,
        client_tx,
        contract,
        related,
        value,
        htl,
        subscribe,
        blocking_subscribe,
    );
    GlobalExecutor::spawn(task);

    Ok(client_tx)
}
/// Body of the spawned PUT task.
///
/// Runs [`drive_client_put`] to completion and then passes whatever it
/// produced to [`deliver_outcome`], so the client always receives exactly one
/// result for `client_tx` from this task.
// Parameters mirror `start_client_put` and are forwarded unchanged.
#[allow(clippy::too_many_arguments)]
async fn run_client_put(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    contract: ContractContainer,
    related: RelatedContracts<'static>,
    value: WrappedState,
    htl: usize,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // The driver takes its own handle; ours is kept for the delivery step.
    let driver_handle = Arc::clone(&op_manager);
    let result = drive_client_put(
        driver_handle,
        client_tx,
        contract,
        related,
        value,
        htl,
        subscribe,
        blocking_subscribe,
    )
    .await;

    deliver_outcome(&op_manager, client_tx, result);
}
/// Result of driving one client PUT, as consumed by `deliver_outcome`.
#[derive(Debug)]
enum DriverOutcome {
/// A client-facing result (success or error) that `deliver_outcome`
/// forwards to the client unchanged.
Publish(HostResult),
/// An internal failure. `deliver_outcome` logs it and sends the client a
/// synthesized `OperationError` whose cause embeds this error's text.
InfrastructureError(OpError),
}
/// Infallible wrapper around [`drive_client_put_inner`].
///
/// Any `Err(OpError)` from the inner driver is folded into
/// [`DriverOutcome::InfrastructureError`], so the caller always has a
/// `DriverOutcome` to deliver.
// Parameters are forwarded unchanged to the inner driver.
#[allow(clippy::too_many_arguments)]
async fn drive_client_put(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    contract: ContractContainer,
    related: RelatedContracts<'static>,
    value: WrappedState,
    htl: usize,
    subscribe: bool,
    blocking_subscribe: bool,
) -> DriverOutcome {
    drive_client_put_inner(
        &op_manager,
        client_tx,
        contract,
        related,
        value,
        htl,
        subscribe,
        blocking_subscribe,
    )
    .await
    .unwrap_or_else(DriverOutcome::InfrastructureError)
}
#[allow(clippy::too_many_arguments)]
async fn drive_client_put_inner(
op_manager: &Arc<OpManager>,
client_tx: Transaction,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
subscribe: bool,
blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
let key = contract.key();
let mut tried: Vec<std::net::SocketAddr> = Vec::new();
if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
tried.push(own_addr);
}
let initial_target = op_manager
.ring
.closest_potentially_hosting(&key, tried.as_slice());
let current_target = match initial_target {
Some(peer) => {
if let Some(addr) = peer.socket_addr() {
tried.push(addr);
}
peer
}
None => op_manager.ring.connection_manager.own_location(),
};
struct PutRetryDriver<'a> {
op_manager: &'a OpManager,
key: ContractKey,
contract: ContractContainer,
related: RelatedContracts<'static>,
value: WrappedState,
htl: usize,
tried: Vec<std::net::SocketAddr>,
retries: usize,
current_target: PeerKeyLocation,
}
impl RetryDriver for PutRetryDriver<'_> {
type Terminal = ContractKey;
fn new_attempt_tx(&mut self) -> Transaction {
Transaction::new::<PutMsg>()
}
fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
NetMessage::from(PutMsg::Request {
id: attempt_tx,
contract: self.contract.clone(),
related_contracts: self.related.clone(),
value: self.value.clone(),
htl: self.htl,
skip_list: self
.op_manager
.ring
.connection_manager
.get_own_addr()
.into_iter()
.collect::<HashSet<_>>(),
})
}
fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<ContractKey> {
match classify_reply(&reply) {
ReplyClass::Stored { key } | ReplyClass::LocalCompletion { key } => {
AttemptOutcome::Terminal(key)
}
ReplyClass::Unexpected => AttemptOutcome::Unexpected,
}
}
fn advance(&mut self) -> AdvanceOutcome {
match advance_to_next_peer(
self.op_manager,
&self.key,
&mut self.tried,
&mut self.retries,
) {
Some((next_target, _next_addr)) => {
self.current_target = next_target;
AdvanceOutcome::Next
}
None => AdvanceOutcome::Exhausted,
}
}
}
let mut driver = PutRetryDriver {
op_manager,
key,
contract,
related,
value,
htl,
tried,
retries: 0,
current_target,
};
let loop_result = drive_retry_loop(op_manager, client_tx, "put", &mut driver).await;
match loop_result {
RetryLoopOutcome::Done(reply_key) => {
op_manager.completed(client_tx);
let contract_location = Location::from(&reply_key);
let route_event = RouteEvent {
peer: driver.current_target.clone(),
contract_location,
outcome: RouteOutcome::SuccessUntimed,
op_type: Some(crate::node::network_status::OpType::Put),
};
if let Some(log_event) =
crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
{
op_manager
.ring
.register_events(either::Either::Left(log_event))
.await;
}
op_manager.ring.routing_finished(route_event);
crate::node::network_status::record_op_result(
crate::node::network_status::OpType::Put,
true,
);
super::finalize_put_at_originator(
op_manager,
client_tx,
reply_key,
PutFinalizationData {
sender: driver.current_target,
hop_count: None,
state_hash: None,
state_size: None,
},
false,
false,
)
.await;
maybe_subscribe_child(
op_manager,
client_tx,
reply_key,
subscribe,
blocking_subscribe,
)
.await;
Ok(DriverOutcome::Publish(Ok(HostResponse::ContractResponse(
ContractResponse::PutResponse { key: reply_key },
))))
}
RetryLoopOutcome::Exhausted(cause) => {
Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
cause: cause.into(),
}
.into())))
}
RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
RetryLoopOutcome::InfraError(err) => Err(err),
}
}
/// Coarse classification of a reply message, produced by `classify_reply`.
#[derive(Debug)]
enum ReplyClass {
/// The reply was a `PutMsg::Response` or `PutMsg::ResponseStreaming`;
/// `key` is copied from that message.
Stored {
key: ContractKey,
},
/// The "reply" was itself a `PutMsg::Request`; `key` is derived from the
/// request's contract. NOTE(review): presumably this is the request being
/// handed back after completing locally — confirm against the retry loop.
LocalCompletion {
key: ContractKey,
},
/// Any other message, including non-PUT messages and other PUT variants.
Unexpected,
}
/// Maps a reply message onto a [`ReplyClass`].
///
/// * `PutMsg::Response` / `PutMsg::ResponseStreaming` → `Stored` with the
///   message's key.
/// * `PutMsg::Request` → `LocalCompletion` with the key of the request's
///   contract.
/// * Everything else (other PUT variants, non-PUT messages) → `Unexpected`.
fn classify_reply(msg: &NetMessage) -> ReplyClass {
    // Peel off the envelope first; anything that is not a PUT is unexpected.
    let NetMessage::V1(NetMessageV1::Put(put_msg)) = msg else {
        return ReplyClass::Unexpected;
    };

    match put_msg {
        PutMsg::Response { key, .. } | PutMsg::ResponseStreaming { key, .. } => {
            ReplyClass::Stored { key: *key }
        }
        PutMsg::Request { contract, .. } => ReplyClass::LocalCompletion {
            key: contract.key(),
        },
        _ => ReplyClass::Unexpected,
    }
}
/// Upper bound on how many times [`advance_to_next_peer`] will try to pick a
/// new peer for one PUT.
const MAX_RETRIES: usize = 3;

/// Picks the next peer to try for `key`, skipping every address in `tried`.
///
/// The retry counter is incremented as soon as budget is available, i.e.
/// before the peer lookup, so a lookup that finds nothing still consumes one
/// retry.
///
/// # Returns
///
/// `Some((peer, addr))` with `addr` appended to `tried`, or `None` when
/// * `*retries` has already reached [`MAX_RETRIES`] (nothing is modified),
/// * no candidate peer is returned, or
/// * the candidate has no socket address.
fn advance_to_next_peer(
    op_manager: &OpManager,
    key: &ContractKey,
    tried: &mut Vec<std::net::SocketAddr>,
    retries: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
    let budget_left = *retries < MAX_RETRIES;
    if !budget_left {
        return None;
    }
    *retries += 1;

    let candidate = op_manager
        .ring
        .closest_potentially_hosting(key, tried.as_slice())?;

    candidate.socket_addr().map(|addr| {
        tried.push(addr);
        (candidate, addr)
    })
}
/// Starts a child subscribe operation for `key` when `subscribe` is set.
///
/// A child transaction of `client_tx` is created and registered with the
/// operation manager as an expected sub-operation. The subscribe task is then
/// either awaited in place (`blocking_subscribe == true`) or spawned on the
/// [`GlobalExecutor`] and left to run on its own.
///
/// Does nothing when `subscribe` is `false`.
async fn maybe_subscribe_child(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // The module shares its name with the `subscribe` flag; they live in
    // different namespaces, so both are usable below.
    use crate::operations::subscribe;

    if subscribe {
        let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);
        op_manager.expect_and_register_sub_operation(client_tx, child_tx);

        // Same task either way; only how it is driven differs.
        let task = subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx);
        if blocking_subscribe {
            task.await;
        } else {
            GlobalExecutor::spawn(task);
        }
    }
}
/// Sends the final result for `client_tx` to the client.
///
/// A [`DriverOutcome::Publish`] payload is forwarded as-is. A
/// [`DriverOutcome::InfrastructureError`] is logged at `warn` level and
/// replaced by an `OperationError` whose cause is `"PUT failed: <error>"`.
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
    let result: HostResult = match outcome {
        DriverOutcome::Publish(result) => result,
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                error = %err,
                "put (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            Err(ErrorKind::OperationError {
                cause: format!("PUT failed: {err}").into(),
            }
            .into())
        }
    };

    op_manager.send_client_result(client_tx, result);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed, arbitrary contract key for tests that only need *a* key.
    fn dummy_key() -> ContractKey {
        ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
    }

    /// Fresh PUT transaction id.
    fn dummy_tx() -> Transaction {
        Transaction::new::<PutMsg>()
    }

    /// A plain `Response` is a terminal "stored" reply.
    #[test]
    fn classify_reply_response_is_stored() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Response { id: tx, key }));
        assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
    }

    /// A `ResponseStreaming` is classified the same way as a `Response`.
    #[test]
    fn classify_reply_response_streaming_is_stored() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ResponseStreaming {
            id: tx,
            key,
            continue_forwarding: false,
        }));
        assert!(matches!(classify_reply(&msg), ReplyClass::Stored { .. }));
    }

    /// A `ForwardingAck` must never end the retry loop.
    #[test]
    fn classify_reply_forwarding_ack_is_unexpected() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::ForwardingAck {
            id: tx,
            contract_key: key,
        }));
        assert!(
            matches!(classify_reply(&msg), ReplyClass::Unexpected),
            "ForwardingAck must NOT be classified as terminal (Phase 2b bug 2)"
        );
    }

    /// Source-level guard: every finalizer call in this file must pass
    /// hardcoded `false` for the arguments that follow the finalization
    /// struct, so the subscription is started in exactly one place.
    ///
    /// The test scans this file's own text. For each call it inspects the
    /// text between the first `},` after the call marker and the next
    /// `.await`, and fails if that text mentions `subscribe`.
    #[test]
    fn finalize_put_at_originator_never_subscribes_from_driver() {
        const SOURCE: &str = include_str!("op_ctx_task.rs");
        // Assembled with `concat!` on purpose: this test is part of SOURCE,
        // so a verbatim marker literal here would match itself and make the
        // `call_count` guard at the bottom impossible to fail.
        const CALL_MARKER: &str = concat!("finalize_put_at_originator", "(");
        // Window length used only when no `.await` follows a call.
        const FALLBACK_WINDOW: usize = 500;

        let mut offset = 0;
        let mut call_count = 0;
        while let Some(pos) = SOURCE[offset..].find(CALL_MARKER) {
            let abs_pos = offset + pos;
            let window_end = match SOURCE[abs_pos..].find(".await") {
                Some(rel) => abs_pos + rel,
                None => {
                    // Clamp to a char boundary: the file contains multi-byte
                    // characters, and slicing inside one would panic.
                    let mut end = SOURCE.len().min(abs_pos + FALLBACK_WINDOW);
                    while !SOURCE.is_char_boundary(end) {
                        end -= 1;
                    }
                    end
                }
            };
            let window = &SOURCE[abs_pos..window_end];
            let after_struct = window.find("},").map(|p| &window[p..]);
            if let Some(tail) = after_struct {
                assert!(
                    !tail.contains("subscribe"),
                    "finalize_put_at_originator call in driver passes subscribe \
                     arguments that reference the `subscribe` variable instead of \
                     hardcoded `false`. This would cause double-subscription — \
                     subscriptions must be handled exclusively by maybe_subscribe_child. \
                     See commit 494a3c69 for the original fix."
                );
            }
            call_count += 1;
            offset = abs_pos + CALL_MARKER.len();
        }
        assert!(
            call_count >= 1,
            "Expected at least 1 finalize_put_at_originator call in the driver, \
             found {call_count}"
        );
    }

    /// Models the retry counter only: `MAX_RETRIES` increments are allowed,
    /// after which the `>= MAX_RETRIES` guard trips. It does not call
    /// `advance_to_next_peer`, which needs a live `OpManager`.
    #[test]
    fn max_retries_boundary_exhausts_at_limit() {
        let mut retries: usize = 0;
        for _ in 0..MAX_RETRIES {
            assert!(retries < MAX_RETRIES, "should not exhaust before limit");
            retries += 1;
        }
        assert!(
            retries >= MAX_RETRIES,
            "should exhaust at MAX_RETRIES={MAX_RETRIES}"
        );
    }

    /// Non-PUT messages are never terminal.
    #[test]
    fn classify_reply_unexpected_for_non_put_message() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
        assert!(matches!(classify_reply(&msg), ReplyClass::Unexpected));
    }

    /// Mirrors the driver's `Exhausted` arm: exhaustion must surface to the
    /// client as an error result.
    #[test]
    fn driver_outcome_exhausted_produces_client_error() {
        let cause = "PUT to contract failed after 3 attempts".to_string();
        let outcome: DriverOutcome = match RetryLoopOutcome::<ContractKey>::Exhausted(cause) {
            RetryLoopOutcome::Exhausted(cause) => {
                DriverOutcome::Publish(Err(ErrorKind::OperationError {
                    cause: cause.into(),
                }
                .into()))
            }
            RetryLoopOutcome::Done(_)
            | RetryLoopOutcome::Unexpected
            | RetryLoopOutcome::InfraError(_) => unreachable!(),
        };
        assert!(
            matches!(outcome, DriverOutcome::Publish(Err(_))),
            "Exhaustion must produce a client error, not be swallowed"
        );
    }

    /// A `Request` seen as a reply is classified as local completion.
    #[test]
    fn classify_reply_request_is_local_completion() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Put(PutMsg::Request {
            id: tx,
            contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
                Arc::new(ContractCode::from(vec![0u8])),
                Parameters::from(vec![]),
            ))),
            related_contracts: RelatedContracts::default(),
            value: WrappedState::new(vec![1u8]),
            htl: 5,
            skip_list: HashSet::new(),
        }));
        assert!(matches!(
            classify_reply(&msg),
            ReplyClass::LocalCompletion { .. }
        ));
    }
}