pub(crate) mod op_ctx_task;
pub(crate) use self::messages::{PutMsg, PutStreamingPayload};
use freenet_stdlib::prelude::*;
use super::OpError;
use crate::{
contract::ContractHandlerEvent, message::Transaction, node::OpManager, ring::PeerKeyLocation,
tracing::NetEventLog,
};
use either::Either;
/// Maximum size, in bytes, of a cause string returned by [`bound_cause`].
pub(crate) const PUT_TERMINAL_CAUSE_MAX_BYTES: usize = 2048;

/// Caps `cause` at [`PUT_TERMINAL_CAUSE_MAX_BYTES`] bytes.
///
/// Strings at or below the cap are returned unchanged. Longer strings are cut
/// at the nearest UTF-8 character boundary that still leaves room for the
/// `...[truncated]` marker, and the marker is appended, so the result never
/// exceeds the cap.
pub(crate) fn bound_cause(cause: String) -> String {
    const SUFFIX: &str = "...[truncated]";

    if cause.len() > PUT_TERMINAL_CAUSE_MAX_BYTES {
        let budget = PUT_TERMINAL_CAUSE_MAX_BYTES.saturating_sub(SUFFIX.len());
        let ceiling = budget.min(cause.len());
        // Search downwards for the closest character boundary. Index 0 is
        // always a boundary, so the fallback is never actually taken.
        let keep = (0..=ceiling)
            .rev()
            .find(|&idx| cause.is_char_boundary(idx))
            .unwrap_or(0);
        [&cause[..keep], SUFFIX].concat()
    } else {
        cause
    }
}
/// Textual cause of a terminal PUT failure, stored in bounded form.
///
/// The only constructor routes the text through `bound_cause`, so the stored
/// string never exceeds `PUT_TERMINAL_CAUSE_MAX_BYTES` bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PutTerminalError {
    /// Cause text after bounding.
    cause: String,
}

impl PutTerminalError {
    /// Wraps `cause`, truncating it via `bound_cause` if it is oversize.
    // NOTE(review): presumably fed from the `cause` of a `PutMsg::Error`
    // received from a peer — confirm against callers.
    pub(crate) fn from_wire(cause: String) -> Self {
        let cause = bound_cause(cause);
        Self { cause }
    }

    /// Borrows the bounded cause text.
    pub(crate) fn as_str(&self) -> &str {
        self.cause.as_str()
    }

    /// Consumes the error and yields the bounded cause text.
    pub(crate) fn into_string(self) -> String {
        let Self { cause } = self;
        cause
    }
}
/// Renders the error as its bounded cause text, with no decoration.
impl std::fmt::Display for PutTerminalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = &self.cause;
        f.write_str(text)
    }
}
/// Telemetry values consumed by `finalize_put_at_originator`, which forwards
/// every field unchanged to `NetEventLog::put_success`.
pub(super) struct PutFinalizationData {
    /// Peer reported as the `sender` of the success event.
    // NOTE(review): presumably the peer that delivered the PUT response — confirm.
    pub sender: PeerKeyLocation,
    /// Hop count to report with the event, if one is available.
    pub hop_count: Option<usize>,
    /// Hash of the stored state to report with the event, if one is available.
    // NOTE(review): the hash format is not visible in this file — confirm with producers.
    pub state_hash: Option<String>,
    /// Size of the stored state to report with the event, if one is available.
    // NOTE(review): presumably in bytes — confirm.
    pub state_size: Option<usize>,
}
pub(super) async fn finalize_put_at_originator(
op_manager: &OpManager,
id: Transaction,
key: ContractKey,
telemetry: PutFinalizationData,
subscribe: bool,
blocking_subscribe: bool,
) {
if let Some(event) = NetEventLog::put_success(
&id,
&op_manager.ring,
key,
telemetry.sender,
telemetry.hop_count,
telemetry.state_hash,
telemetry.state_size,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
op_manager.ring.mark_local_client_access(&key);
if subscribe {
start_subscription_after_put(op_manager, id, key, blocking_subscribe).await;
}
}
/// Starts a subscription request for `key` as a follow-up to the PUT
/// identified by `parent_tx`, then logs both transaction ids at debug level.
async fn start_subscription_after_put(
    op_manager: &OpManager,
    parent_tx: Transaction,
    key: ContractKey,
    blocking_subscription: bool,
) {
    let subscription_tx =
        super::start_subscription_request(op_manager, parent_tx, key, blocking_subscription);

    tracing::debug!(
        tx = %parent_tx,
        child_tx = %subscription_tx,
        contract = %key,
        blocking = blocking_subscription,
        phase = "subscribe",
        "Started subscription after PUT"
    );
}
/// Stores `state` for `contract` through the contract handler.
///
/// The contract code is checked for debug sections first; a debug-compiled
/// contract is rejected before the handler is ever contacted.
///
/// # Returns
/// The state reported by the handler and a flag saying whether it changed.
///
/// # Errors
/// - `OpError::ContractError(DebugWasmRejected)` when debug sections are found.
/// - The handler's own error, converted to `OpError`, when the PUT is refused
///   (a saturated per-contract queue is logged at debug level, anything else
///   at error level).
/// - The converted transport error when the handler cannot be reached.
/// - `OpError::UnexpectedOpState` when the handler replies with any event
///   other than a PUT response.
pub(super) async fn put_contract(
    op_manager: &OpManager,
    key: ContractKey,
    state: WrappedState,
    related_contracts: RelatedContracts<'static>,
    contract: &ContractContainer,
    priority: crate::contract::Priority,
) -> Result<(WrappedState, bool), OpError> {
    // This guard has to run before anything is handed to the handler.
    if crate::contract::contains_debug_sections(contract.data()) {
        let sections = crate::contract::debug_sections(contract.data()).join(", ");
        tracing::warn!(
            contract = %key,
            sections = %sections,
            "Rejecting debug-compiled contract at storage time (#2257)"
        );
        let rejection = crate::contract::ContractError::DebugWasmRejected { sections };
        return Err(OpError::ContractError(rejection));
    }

    let put_query = ContractHandlerEvent::PutQuery {
        key,
        state,
        related_contracts,
        contract: Some(contract.clone()),
    };
    let reply = op_manager
        .notify_contract_handler_prioritized(put_query, priority)
        .await;

    // First peel off everything that is not a PUT response.
    let (outcome, state_changed) = match reply {
        Ok(ContractHandlerEvent::PutResponse {
            new_value,
            state_changed,
        }) => (new_value, state_changed),
        Ok(_) => return Err(OpError::UnexpectedOpState),
        Err(err) => return Err(err.into()),
    };

    match outcome {
        Ok(new_val) => {
            op_manager.notify_contract_stored(&key);
            debug_assert!(
                new_val.size() > 0,
                "put_contract: stored state must be non-empty after successful PUT for contract {key}"
            );
            Ok((new_val, state_changed))
        }
        Err(err) => {
            if err.is_contract_queue_full() {
                tracing::debug!(
                    contract = %key,
                    error = %err,
                    event = "queue_full",
                    "PUT skipped: per-contract queue saturated"
                );
            } else {
                tracing::error!(contract = %key, error = %err, phase = "error", "Failed to update contract value");
            }
            Err(OpError::from(err))
        }
    }
}
mod messages {
use std::{collections::HashSet, fmt::Display};
use freenet_stdlib::prelude::*;
use serde::{Deserialize, Serialize};
use crate::message::{InnerMessage, Transaction};
use crate::ring::Location;
use crate::transport::peer_connection::StreamId;
/// Serializable bundle of a contract, its related contracts and a state value.
///
/// Field order is part of the serialized layout; do not reorder.
// NOTE(review): presumably the body sent over the stream announced by
// `PutMsg::RequestStreaming` — confirm against the streaming sender/receiver.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PutStreamingPayload {
    /// Contract container being put.
    pub contract: ContractContainer,
    /// Related contracts, decoded with the stdlib's dedicated deserializer.
    #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
    pub related_contracts: RelatedContracts<'static>,
    /// State value to store for the contract.
    pub value: WrappedState,
}
/// Messages exchanged for the PUT operation.
///
/// Variant order is wire format: the test
/// `put_msg_wire_format_variant_tags_are_stable` in this file pins the encoded
/// tags to `Request = 0`, `Response = 1`, `RequestStreaming = 2`,
/// `ResponseStreaming = 3`, `ForwardingAck = 4`, `Error = 5`. Add new variants
/// at the end and never reorder existing ones.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[non_exhaustive]
pub(crate) enum PutMsg {
    /// PUT request carrying the contract, related contracts and state inline.
    Request {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Contract container being put.
        contract: ContractContainer,
        /// Related contracts, decoded with the stdlib's dedicated deserializer.
        #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
        related_contracts: RelatedContracts<'static>,
        /// State value to store.
        value: WrappedState,
        /// Shown as `htl` by `Display`.
        // NOTE(review): presumably remaining hops-to-live — confirm.
        htl: usize,
        /// Socket addresses carried along with the request.
        // NOTE(review): presumably peers to skip when forwarding — confirm.
        skip_list: HashSet<std::net::SocketAddr>,
    },
    /// Response to a `Request`, identifying the contract by key.
    Response {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Key of the contract the response refers to.
        key: ContractKey,
        /// Hop count reported with the response; `#[serde(default)]` lets
        /// formats that support missing fields fall back to `0`.
        #[serde(default)]
        hop_count: usize,
    },
    /// PUT request whose payload is referenced by stream rather than inlined.
    RequestStreaming {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Identifier of the stream associated with this request.
        stream_id: StreamId,
        /// Key of the contract being put.
        contract_key: ContractKey,
        /// Shown as `size` by `Display`.
        // NOTE(review): presumably the payload size in bytes — confirm.
        total_size: u64,
        /// Shown as `htl` by `Display`.
        // NOTE(review): presumably remaining hops-to-live — confirm.
        htl: usize,
        /// Socket addresses carried along with the request.
        // NOTE(review): presumably peers to skip when forwarding — confirm.
        skip_list: HashSet<std::net::SocketAddr>,
        /// Subscription flag carried with the request.
        // NOTE(review): presumably asks for a subscription after the PUT — confirm.
        subscribe: bool,
    },
    /// Response to a `RequestStreaming`.
    ResponseStreaming {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Key of the contract the response refers to.
        key: ContractKey,
        /// Shown as `continue` by `Display`.
        // NOTE(review): exact forwarding semantics are not visible here — confirm.
        continue_forwarding: bool,
        /// Hop count reported with the response; `#[serde(default)]` lets
        /// formats that support missing fields fall back to `0`.
        #[serde(default)]
        hop_count: usize,
    },
    /// Forwarding acknowledgement for a contract key.
    ///
    /// A test in this file asserts that senders of this variant are not
    /// reintroduced; the variant itself stays so the tags after it keep
    /// their values.
    ForwardingAck {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Key of the contract being acknowledged.
        contract_key: ContractKey,
    },
    /// Failure report carrying a free-form cause string.
    Error {
        /// Transaction this message belongs to.
        id: Transaction,
        /// Human-readable cause. The wire type places no bound on its length.
        cause: String,
    },
}
impl InnerMessage for PutMsg {
    /// Returns the transaction id, which every variant carries.
    fn id(&self) -> &Transaction {
        let (Self::Request { id, .. }
        | Self::Response { id, .. }
        | Self::RequestStreaming { id, .. }
        | Self::ResponseStreaming { id, .. }
        | Self::ForwardingAck { id, .. }
        | Self::Error { id, .. }) = self;
        id
    }

    /// Returns the location derived from the contract the message refers to,
    /// or `None` for `Error`, which names no contract.
    fn requested_location(&self) -> Option<Location> {
        match self {
            Self::Error { .. } => None,
            Self::Request { contract, .. } => Some(Location::from(contract.id())),
            Self::Response { key, .. } | Self::ResponseStreaming { key, .. } => {
                Some(Location::from(key.id()))
            }
            Self::RequestStreaming { contract_key, .. }
            | Self::ForwardingAck { contract_key, .. } => {
                Some(Location::from(contract_key.id()))
            }
        }
    }
}
/// One-line rendering of each variant: a `Put…` tag followed by its
/// identifying fields. Payload fields (contract code, state, skip lists) are
/// left out.
impl Display for PutMsg {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Request {
                id: tx,
                contract,
                htl: hops,
                ..
            } => {
                let key = contract.key();
                write!(f, "PutRequest(id: {}, key: {}, htl: {})", tx, key, hops)
            }
            Self::Response { id: tx, key, .. } => {
                write!(f, "PutResponse(id: {}, key: {})", tx, key)
            }
            Self::RequestStreaming {
                id: tx,
                stream_id: stream,
                contract_key: key,
                total_size: size,
                htl: hops,
                ..
            } => write!(
                f,
                "PutRequestStreaming(id: {}, key: {}, stream: {}, size: {}, htl: {})",
                tx, key, stream, size, hops
            ),
            Self::ResponseStreaming {
                id: tx,
                key,
                continue_forwarding: keep_going,
                ..
            } => write!(
                f,
                "PutResponseStreaming(id: {}, key: {}, continue: {})",
                tx, key, keep_going
            ),
            Self::ForwardingAck {
                id: tx,
                contract_key: key,
            } => write!(f, "PutForwardingAck(id: {}, key: {})", tx, key),
            Self::Error { id: tx, cause } => {
                write!(f, "PutError(id: {}, cause: {})", tx, cause)
            }
        }
    }
}
}
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
use super::*;
use crate::message::{InnerMessage, Transaction};
use crate::operations::test_utils::make_contract_key;
/// `InnerMessage::id` must return the transaction stored in the message.
#[test]
fn put_msg_id_returns_transaction() {
    let expected = Transaction::new::<PutMsg>();
    let response = PutMsg::Response {
        id: expected,
        key: make_contract_key(1),
        hop_count: 0,
    };

    let reported = *response.id();
    assert_eq!(reported, expected, "id() should return the transaction ID");
}
/// The `Display` output of a response must contain its `PutResponse` tag.
#[test]
fn put_msg_display_formats_correctly() {
    let response = PutMsg::Response {
        id: Transaction::new::<PutMsg>(),
        key: make_contract_key(1),
        hop_count: 0,
    };

    let rendered = response.to_string();
    assert!(
        rendered.contains("PutResponse"),
        "Display should contain message type name"
    );
}
/// `Error` must expose its transaction id and render both its tag and cause.
#[test]
fn put_msg_error_id_and_display() {
    let tx = Transaction::new::<PutMsg>();
    let failure = PutMsg::Error {
        id: tx,
        cause: String::from("contract rejected: version must be higher"),
    };
    assert_eq!(*failure.id(), tx, "id() should return the transaction ID");

    let rendered = failure.to_string();
    for (fragment, what) in [
        ("PutError", "Display tag"),
        ("version must be higher", "Display cause"),
    ] {
        assert!(rendered.contains(fragment), "{what}");
    }
}
/// An `Error` message must survive a bincode encode/decode cycle unchanged.
#[test]
fn put_msg_error_serde_roundtrip() {
    let tx = Transaction::new::<PutMsg>();
    let cause = String::from("execution error: invalid contract update");

    let encoded = bincode::serialize(&PutMsg::Error {
        id: tx,
        cause: cause.clone(),
    })
    .expect("serialize");
    let decoded: PutMsg = bincode::deserialize(&encoded).expect("deserialize");

    match decoded {
        PutMsg::Error {
            id: decoded_id,
            cause: decoded_cause,
        } => {
            assert_eq!(decoded_id, tx);
            assert_eq!(decoded_cause, cause);
        }
        other => panic!("Expected Error, got {other}"),
    }
}
/// `Error` names no contract, so it must report no requested location.
#[test]
fn put_msg_error_has_no_requested_location() {
    let failure = PutMsg::Error {
        id: Transaction::new::<PutMsg>(),
        cause: String::from("x"),
    };

    let location = failure.requested_location();
    assert!(location.is_none());
}
/// Pins the encoded variant tag of every `PutMsg` variant.
///
/// Each sample is serialized with bincode and its first four bytes are read
/// as a little-endian `u32`, which must equal the expected tag. Reordering or
/// inserting variants anywhere but the end changes these tags and fails here.
#[test]
fn put_msg_wire_format_variant_tags_are_stable() {
    let tx = Transaction::new::<PutMsg>();
    let key = make_contract_key(0);
    // One minimal sample per variant, paired with the tag it must encode to.
    // The expected tags are deliberately literal: they are the contract.
    let samples: [(u32, PutMsg); 6] = [
        (
            0,
            PutMsg::Request {
                id: tx,
                contract: crate::operations::test_utils::make_test_contract(&[]),
                related_contracts: RelatedContracts::default(),
                value: WrappedState::new(vec![]),
                htl: 0,
                skip_list: Default::default(),
            },
        ),
        (
            1,
            PutMsg::Response {
                id: tx,
                key,
                hop_count: 0,
            },
        ),
        (
            2,
            PutMsg::RequestStreaming {
                id: tx,
                stream_id: crate::transport::StreamId::next(),
                contract_key: key,
                total_size: 0,
                htl: 0,
                skip_list: Default::default(),
                subscribe: false,
            },
        ),
        (
            3,
            PutMsg::ResponseStreaming {
                id: tx,
                key,
                continue_forwarding: false,
                hop_count: 0,
            },
        ),
        (
            4,
            PutMsg::ForwardingAck {
                id: tx,
                contract_key: key,
            },
        ),
        (
            5,
            PutMsg::Error {
                id: tx,
                cause: String::new(),
            },
        ),
    ];
    for (expected_tag, msg) in samples {
        let bytes = bincode::serialize(&msg).expect("serialize");
        // Guard the slice below so a short encoding fails with a clear message.
        assert!(
            bytes.len() >= 4,
            "encoded PutMsg too short to carry a tag: {msg}"
        );
        // The leading four bytes are interpreted as the variant tag.
        let actual_tag = u32::from_le_bytes(bytes[..4].try_into().expect("first 4 bytes"));
        assert_eq!(
            actual_tag, expected_tag,
            "PutMsg wire tag for `{msg}` shifted — reordering variants is a wire-format \
            break. If you intentionally renumbered, bump the freenet-stdlib major and \
            coordinate the upgrade."
        );
    }
}
/// A cause below the cap must come back unchanged.
#[test]
fn bound_cause_short_string_passes_through() {
    let input = "execution error: contract rejected";
    let bounded = bound_cause(input.to_string());
    assert_eq!(bounded, input);
}
/// An oversize ASCII cause must be cut to fit the cap and carry the marker.
#[test]
fn bound_cause_truncates_oversize_at_char_boundary() {
    let bounded = bound_cause("a".repeat(PUT_TERMINAL_CAUSE_MAX_BYTES * 2));

    let fits_cap = bounded.len() <= PUT_TERMINAL_CAUSE_MAX_BYTES;
    assert!(fits_cap);
    assert!(bounded.ends_with("...[truncated]"));
}
/// Truncation must back off to a codepoint boundary instead of cutting a
/// multi-byte character in half.
///
/// The input consists solely of a 4-byte codepoint, and the raw byte budget is
/// not a multiple of 4, so the walk-back inside `bound_cause` has to run.
///
/// The previous version asserted `bounded.is_char_boundary(bounded.len())`,
/// which is true for every `String`, and called `chars().count()` without
/// checking the result. Both are replaced by checks on what was actually kept:
/// its exact length and its content.
#[test]
fn bound_cause_never_splits_utf8_codepoint() {
    const SUFFIX: &str = "...[truncated]";
    const FOUR_BYTE: &str = "𝄞";
    assert_eq!(FOUR_BYTE.len(), 4);
    let n_codepoints = 600;
    let s: String = FOUR_BYTE.repeat(n_codepoints);
    assert_eq!(s.len(), 4 * n_codepoints);
    let raw_budget = PUT_TERMINAL_CAUSE_MAX_BYTES.saturating_sub(SUFFIX.len());
    assert!(
        !s.is_char_boundary(raw_budget),
        "test invariant: raw budget {raw_budget} must land mid-codepoint \
        for the walk-back loop to be exercised — pick a codepoint width \
        coprime with the budget"
    );

    let bounded = bound_cause(s);

    let kept = bounded
        .strip_suffix(SUFFIX)
        .expect("truncated output must end with the suffix marker");
    // The closest boundary at or below the raw budget is the budget rounded
    // down to a whole number of codepoints.
    let expected_kept_len = raw_budget - raw_budget % FOUR_BYTE.len();
    assert_eq!(
        kept.len(),
        expected_kept_len,
        "walk-back must stop at the nearest codepoint boundary below the raw budget"
    );
    assert!(
        kept.chars().all(|c| c == '𝄞'),
        "retained prefix must consist of whole, unmodified codepoints"
    );
    assert!(
        bounded.len() < PUT_TERMINAL_CAUSE_MAX_BYTES,
        "walk-back must have trimmed at least one byte (cap={}, \
        actual bounded.len()={})",
        PUT_TERMINAL_CAUSE_MAX_BYTES,
        bounded.len()
    );
}
/// The cap is inclusive: exactly-cap input passes through, cap+1 is truncated.
#[test]
fn bound_cause_cap_boundary_is_inclusive() {
    const MARKER: &str = "...[truncated]";

    // Exactly at the cap: nothing may change.
    let at_cap = "a".repeat(PUT_TERMINAL_CAUSE_MAX_BYTES);
    assert_eq!(
        bound_cause(at_cap.clone()),
        at_cap,
        "input of exactly PUT_TERMINAL_CAUSE_MAX_BYTES bytes must pass \
        through verbatim — the cap is INCLUSIVE"
    );

    // One byte over: marker appended, and the result still fits the cap.
    let over_cap = bound_cause("a".repeat(PUT_TERMINAL_CAUSE_MAX_BYTES + 1));
    assert!(
        over_cap.ends_with(MARKER),
        "input of PUT_TERMINAL_CAUSE_MAX_BYTES + 1 bytes MUST be \
        truncated with the suffix marker"
    );
    assert!(
        over_cap.len() <= PUT_TERMINAL_CAUSE_MAX_BYTES,
        "truncated output MUST fit within the cap"
    );
}
/// `PutTerminalError::from_wire` must apply the same bound as `bound_cause`.
#[test]
fn put_terminal_error_from_wire_truncates() {
    let err = PutTerminalError::from_wire("x".repeat(PUT_TERMINAL_CAUSE_MAX_BYTES * 3));

    let text = err.as_str();
    assert!(text.len() <= PUT_TERMINAL_CAUSE_MAX_BYTES);
    assert!(text.ends_with("...[truncated]"));
}
/// A `ForwardingAck` must survive a bincode encode/decode cycle unchanged.
#[test]
fn test_forwarding_ack_serde_roundtrip() {
    let tx = Transaction::new::<PutMsg>();
    let key = make_contract_key(42);

    let encoded = bincode::serialize(&PutMsg::ForwardingAck {
        id: tx,
        contract_key: key,
    })
    .expect("serialize");
    let decoded: PutMsg = bincode::deserialize(&encoded).expect("deserialize");

    match decoded {
        PutMsg::ForwardingAck {
            id: decoded_id,
            contract_key: decoded_key,
        } => {
            assert_eq!(decoded_id, tx);
            assert_eq!(decoded_key, key);
        }
        other => panic!("Expected ForwardingAck, got {other}"),
    }
}
/// `hop_count` must round-trip through bincode for both response variants,
/// across a spread of representative values.
#[test]
fn test_put_msg_response_hop_count_roundtrip() {
    let key = make_contract_key(7);

    // Encode then decode one message, labelling any failure with the case name.
    let roundtrip = |msg: &PutMsg, label: &str| -> PutMsg {
        let bytes = bincode::serialize(msg).expect(label);
        bincode::deserialize(&bytes).expect(label)
    };

    let cases: &[(&str, usize)] = &[
        ("zero", 0),
        ("one", 1),
        ("mid", 4),
        ("htl", 10),
        ("large", 64),
    ];
    for &(label, hop_count) in cases {
        let plain = PutMsg::Response {
            id: Transaction::new::<PutMsg>(),
            key,
            hop_count,
        };
        match roundtrip(&plain, label) {
            PutMsg::Response { hop_count: hc, .. } => {
                assert_eq!(hc, hop_count, "Response.hop_count must roundtrip ({label})")
            }
            _ => panic!("expected Response for {label}"),
        }

        let streamed = PutMsg::ResponseStreaming {
            id: Transaction::new::<PutMsg>(),
            key,
            continue_forwarding: false,
            hop_count,
        };
        match roundtrip(&streamed, label) {
            PutMsg::ResponseStreaming { hop_count: hc, .. } => assert_eq!(
                hc, hop_count,
                "ResponseStreaming.hop_count must roundtrip ({label})"
            ),
            _ => panic!("expected ResponseStreaming for {label}"),
        }
    }
}
/// Source-level guard: the removed PUT retry bookkeeping must not reappear in
/// the op state manager.
#[test]
fn put_gc_speculative_retry_block_must_stay_deleted() {
    let src = include_str!("../node/op_state_manager.rs");

    let removed_identifiers = [
        (
            "put_retry_candidates",
            "`put_retry_candidates` accumulator must stay deleted",
        ),
        (
            "put_retried",
            "`put_retried` retry-count map must stay deleted",
        ),
    ];
    for (identifier, message) in removed_identifiers {
        assert!(!src.contains(identifier), "{message}");
    }
}
/// Source-level guard: `OpManager::completed` must not reference any of the
/// per-operation maps under `self.ops`.
#[test]
fn completed_must_not_touch_per_op_dashmaps() {
    let src = include_str!("../node/op_state_manager.rs");

    let start = src
        .find("pub fn completed(&self, id: Transaction)")
        .expect("OpManager::completed not found");
    // The method is taken to end at the first indented closing-brace line
    // after its signature; this relies on the scanned file's formatting.
    let length = src[start..]
        .find("\n }\n")
        .expect("OpManager::completed closing brace not found");
    let body = &src[start..start + length];

    let forbidden_refs = [
        "self.ops.connect",
        "self.ops.put",
        "self.ops.get",
        "self.ops.subscribe",
        "self.ops.update",
    ];
    for forbidden in forbidden_refs {
        assert!(
            !body.contains(forbidden),
            "OpManager::completed must not reference `{forbidden}`"
        );
    }
}
/// Source-level guard: `put.rs` must not contain code that wraps a
/// `ForwardingAck` into an outgoing network message.
#[test]
fn put_forwarding_ack_senders_must_stay_deleted() {
    let src = include_str!("put.rs");
    // The needle is assembled at runtime rather than written as one literal.
    // NOTE(review): presumably because the scanned file contains this test, so
    // a single literal would make the test match itself — keep it split.
    let needle = format!("NetMessage::from({}::ForwardingAck", "PutMsg",);
    assert!(
        !src.contains(&needle),
        "ForwardingAck senders must not be reintroduced"
    );
}
/// Source-level guard: the body of `finalize_put_at_originator` must contain
/// an executable call to `ring.mark_local_client_access(&key)`. Text after a
/// `//` on any line is ignored, so a comment mention does not satisfy it.
#[test]
fn finalize_put_at_originator_marks_local_client_access() {
    const SOURCE: &str = include_str!("put.rs");

    let signature_at = SOURCE
        .find("pub(super) async fn finalize_put_at_originator(")
        .expect("finalize_put_at_originator definition not found");
    let open_at = signature_at
        + SOURCE[signature_at..]
            .find('{')
            .expect("function body opening brace not found");
    // A top-level function ends at the first column-0 closing brace.
    let close_at = open_at
        + SOURCE[open_at..]
            .find("\n}\n")
            .expect("function body closing brace not found");

    // Rebuild the body with everything after `//` dropped from each line.
    let mut executable = String::new();
    for (index, line) in SOURCE[open_at..close_at].lines().enumerate() {
        if index > 0 {
            executable.push('\n');
        }
        let code = line.split_once("//").map_or(line, |(code, _)| code);
        executable.push_str(code);
    }

    assert!(
        executable.contains("ring.mark_local_client_access(&key)"),
        "finalize_put_at_originator MUST call \
        `op_manager.ring.mark_local_client_access(&key)` (executable \
        code, not just a comment mention) so self-hosted contracts \
        get the local-client flag set. Without this call, the \
        freenet-stdlib mirror demo 180s cold-GET timeout (2026-05-14) \
        returns. See contracts_needing_renewal at hosting.rs:971-991 \
        for the downstream gate that depends on this flag."
    );
}
/// Source-level guard: inside `put_contract`, the debug-section check must
/// appear before the contract handler is notified. Text after a `//` on any
/// line is ignored.
#[test]
fn put_contract_rejects_debug_wasm_before_storing() {
    const SOURCE: &str = include_str!("put.rs");

    let signature_at = SOURCE
        .find("pub(super) async fn put_contract(")
        .expect("put_contract definition not found");
    let open_at = signature_at
        + SOURCE[signature_at..]
            .find('{')
            .expect("put_contract body opening brace not found");
    // A top-level function ends at the first column-0 closing brace.
    let close_at = open_at
        + SOURCE[open_at..]
            .find("\n}\n")
            .expect("put_contract body closing brace not found");

    // Rebuild the body with everything after `//` dropped from each line.
    let mut executable = String::new();
    for (index, line) in SOURCE[open_at..close_at].lines().enumerate() {
        if index > 0 {
            executable.push('\n');
        }
        let code = line.split_once("//").map_or(line, |(code, _)| code);
        executable.push_str(code);
    }

    let guard_pos = executable
        .find("contains_debug_sections")
        .expect("put_contract must call contains_debug_sections (executable code)");
    let store_pos = executable
        .find("notify_contract_handler")
        .expect("put_contract must call notify_contract_handler");
    assert!(
        guard_pos < store_pos,
        "the debug-WASM guard MUST run BEFORE notify_contract_handler so a \
        relayed debug contract is rejected before this node stores it"
    );
}
}