pub(crate) mod op_ctx_task;
pub(crate) use self::messages::{PutMsg, PutStreamingPayload};
use freenet_stdlib::prelude::*;
use super::OpError;
use crate::{
contract::ContractHandlerEvent, message::Transaction, node::OpManager, ring::PeerKeyLocation,
tracing::NetEventLog,
};
use either::Either;
/// Telemetry collected for a finished PUT, consumed by
/// [`finalize_put_at_originator`].
///
/// Every field is forwarded unchanged to `NetEventLog::put_success` when the
/// success event is built.
pub(super) struct PutFinalizationData {
    /// Peer passed as the `sender` argument of the success event.
    pub sender: PeerKeyLocation,
    /// Hop count reported with the success event, if one is available.
    pub hop_count: Option<usize>,
    /// String form of the stored state's hash, if one is available.
    pub state_hash: Option<String>,
    /// Size of the stored state, if available (presumably in bytes — TODO confirm).
    pub state_size: Option<usize>,
}
pub(super) async fn finalize_put_at_originator(
op_manager: &OpManager,
id: Transaction,
key: ContractKey,
telemetry: PutFinalizationData,
subscribe: bool,
blocking_subscribe: bool,
) {
if let Some(event) = NetEventLog::put_success(
&id,
&op_manager.ring,
key,
telemetry.sender,
telemetry.hop_count,
telemetry.state_hash,
telemetry.state_size,
) {
op_manager.ring.register_events(Either::Left(event)).await;
}
if subscribe {
start_subscription_after_put(op_manager, id, key, blocking_subscribe).await;
}
}
/// Starts a subscription request for `key` as a child of the PUT transaction
/// `parent_tx` and logs both transaction ids at debug level.
///
/// `blocking_subscription` is handed to `super::start_subscription_request`
/// as-is and echoed in the log record.
async fn start_subscription_after_put(
    op_manager: &OpManager,
    parent_tx: Transaction,
    key: ContractKey,
    blocking_subscription: bool,
) {
    let subscription_tx =
        super::start_subscription_request(op_manager, parent_tx, key, blocking_subscription);

    tracing::debug!(
        tx = %parent_tx,
        child_tx = %subscription_tx,
        contract = %key,
        blocking = blocking_subscription,
        phase = "subscribe",
        "Started subscription after PUT"
    );
}
/// Sends a `PutQuery` for `key` to the contract handler and interprets the reply.
///
/// The query carries `state`, `related_contracts` and a clone of `contract`.
///
/// # Returns
/// On a successful `PutResponse`, the new state value together with the
/// handler's `state_changed` flag. `notify_contract_stored` is called for `key`
/// before returning.
///
/// # Errors
/// * The handler's own error (converted with `OpError::from`) when the
///   `PutResponse` carries a failure; this case is also logged.
/// * The error from `notify_contract_handler` itself, converted into `OpError`.
/// * `OpError::UnexpectedOpState` when the handler replies with any other event.
pub(super) async fn put_contract(
    op_manager: &OpManager,
    key: ContractKey,
    state: WrappedState,
    related_contracts: RelatedContracts<'static>,
    contract: &ContractContainer,
) -> Result<(WrappedState, bool), OpError> {
    let query = ContractHandlerEvent::PutQuery {
        key,
        state,
        related_contracts,
        contract: Some(contract.clone()),
    };
    let outcome = op_manager.notify_contract_handler(query).await;

    match outcome {
        Ok(ContractHandlerEvent::PutResponse {
            new_value,
            state_changed,
        }) => match new_value {
            Ok(stored) => {
                op_manager.notify_contract_stored(&key);
                // Debug-build sanity check only; release builds skip it.
                debug_assert!(
                    stored.size() > 0,
                    "put_contract: stored state must be non-empty after successful PUT for contract {key}"
                );
                Ok((stored, state_changed))
            }
            Err(err) => {
                tracing::error!(contract = %key, error = %err, phase = "error", "Failed to update contract value");
                Err(OpError::from(err))
            }
        },
        // Any reply other than a PutResponse is a protocol violation for this query.
        Ok(_) => Err(OpError::UnexpectedOpState),
        Err(err) => Err(err.into()),
    }
}
/// Message types exchanged for the PUT operation, plus their
/// `InnerMessage` and `Display` implementations.
mod messages {
    use std::collections::HashSet;
    use std::fmt;
    use std::net::SocketAddr;

    use freenet_stdlib::prelude::*;
    use serde::{Deserialize, Serialize};

    use crate::message::{InnerMessage, Transaction};
    use crate::ring::Location;
    use crate::transport::peer_connection::StreamId;

    /// Contract, related contracts and state value bundled as one serializable
    /// unit (presumably the body sent over a stream for `RequestStreaming` —
    /// confirm against the streaming code path).
    #[derive(Debug, Serialize, Deserialize)]
    pub(crate) struct PutStreamingPayload {
        /// The contract being put.
        pub contract: ContractContainer,
        /// Related contracts; deserialized through a custom helper.
        #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
        pub related_contracts: RelatedContracts<'static>,
        /// The state value to store.
        pub value: WrappedState,
    }

    /// Messages of the PUT operation.
    ///
    /// NOTE(review): this enum is serialized (the test module round-trips it
    /// through bincode), so variant and field order should be treated as part
    /// of the encoding and left stable.
    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub(crate) enum PutMsg {
        /// PUT request that carries contract, related contracts and value inline.
        Request {
            id: Transaction,
            contract: ContractContainer,
            #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
            related_contracts: RelatedContracts<'static>,
            value: WrappedState,
            /// Presumably the remaining hops-to-live — confirm against the routing code.
            htl: usize,
            /// Socket addresses attached to the request (presumably peers to
            /// skip while routing — confirm).
            skip_list: HashSet<SocketAddr>,
        },
        /// Reply to `Request`, naming the contract key.
        Response { id: Transaction, key: ContractKey },
        /// PUT request that references a stream instead of carrying the data inline.
        RequestStreaming {
            id: Transaction,
            stream_id: StreamId,
            contract_key: ContractKey,
            /// Declared total size (presumably bytes of the streamed payload — confirm).
            total_size: u64,
            htl: usize,
            skip_list: HashSet<SocketAddr>,
            subscribe: bool,
        },
        /// Reply to `RequestStreaming`.
        ResponseStreaming {
            id: Transaction,
            key: ContractKey,
            continue_forwarding: bool,
        },
        /// Forwarding acknowledgement for a contract key.
        ForwardingAck {
            id: Transaction,
            contract_key: ContractKey,
        },
    }

    impl InnerMessage for PutMsg {
        /// Returns the transaction id, which every variant carries.
        fn id(&self) -> &Transaction {
            // Irrefutable: the or-pattern lists every variant.
            let (Self::Request { id, .. }
            | Self::Response { id, .. }
            | Self::RequestStreaming { id, .. }
            | Self::ResponseStreaming { id, .. }
            | Self::ForwardingAck { id, .. }) = self;
            id
        }

        /// Returns the location derived from the contract id the message
        /// refers to; every variant yields `Some`.
        fn requested_location(&self) -> Option<Location> {
            let location = match self {
                Self::Request { contract, .. } => Location::from(contract.id()),
                Self::Response { key, .. } | Self::ResponseStreaming { key, .. } => {
                    Location::from(key.id())
                }
                Self::RequestStreaming { contract_key, .. }
                | Self::ForwardingAck { contract_key, .. } => Location::from(contract_key.id()),
            };
            Some(location)
        }
    }

    impl fmt::Display for PutMsg {
        /// Writes a one-line summary: variant label, transaction id, contract
        /// key, and the variant-specific fields shown below.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Request {
                    id, contract, htl, ..
                } => {
                    let key = contract.key();
                    write!(f, "PutRequest(id: {}, key: {}, htl: {})", id, key, htl)
                }
                Self::Response { id, key } => write!(f, "PutResponse(id: {}, key: {})", id, key),
                Self::RequestStreaming {
                    id,
                    stream_id,
                    contract_key,
                    total_size,
                    htl,
                    ..
                } => write!(
                    f,
                    "PutRequestStreaming(id: {}, key: {}, stream: {}, size: {}, htl: {})",
                    id, contract_key, stream_id, total_size, htl
                ),
                Self::ResponseStreaming {
                    id,
                    key,
                    continue_forwarding,
                } => write!(
                    f,
                    "PutResponseStreaming(id: {}, key: {}, continue: {})",
                    id, key, continue_forwarding
                ),
                Self::ForwardingAck { id, contract_key } => {
                    write!(f, "PutForwardingAck(id: {}, key: {})", id, contract_key)
                }
            }
        }
    }
}
#[cfg(test)]
// The serde round-trip test uses a catch-all arm to report unexpected variants.
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
    use super::*;
    use crate::message::{InnerMessage, Transaction};
    use crate::operations::test_utils::make_contract_key;

    /// Builds the `PutMsg::Response` fixture shared by the accessor and
    /// display tests.
    fn sample_response(tx: Transaction) -> PutMsg {
        PutMsg::Response {
            id: tx,
            key: make_contract_key(1),
        }
    }

    /// `id()` hands back the transaction the message was built with.
    #[test]
    fn put_msg_id_returns_transaction() {
        let tx = Transaction::new::<PutMsg>();
        let msg = sample_response(tx);
        assert_eq!(*msg.id(), tx, "id() should return the transaction ID");
    }

    /// The `Display` output names the message type.
    #[test]
    fn put_msg_display_formats_correctly() {
        let msg = sample_response(Transaction::new::<PutMsg>());
        let display = msg.to_string();
        assert!(
            display.contains("PutResponse"),
            "Display should contain message type name"
        );
    }

    /// `ForwardingAck` survives a bincode serialize/deserialize round trip
    /// with both fields intact.
    #[test]
    fn test_forwarding_ack_serde_roundtrip() {
        let tx = Transaction::new::<PutMsg>();
        let key = make_contract_key(42);
        let original = PutMsg::ForwardingAck {
            id: tx,
            contract_key: key,
        };

        let bytes = bincode::serialize(&original).expect("serialize");
        match bincode::deserialize::<PutMsg>(&bytes).expect("deserialize") {
            PutMsg::ForwardingAck { id, contract_key } => {
                assert_eq!(id, tx);
                assert_eq!(contract_key, key);
            }
            other => panic!("Expected ForwardingAck, got {other}"),
        }
    }

    /// Source-level guard: the listed identifiers must not reappear in
    /// `op_state_manager.rs`.
    #[test]
    fn put_gc_speculative_retry_block_must_stay_deleted() {
        let src = include_str!("../node/op_state_manager.rs");
        for (needle, why) in [
            (
                "put_retry_candidates",
                "`put_retry_candidates` accumulator must stay deleted",
            ),
            (
                "put_retried",
                "`put_retried` retry-count map must stay deleted",
            ),
        ] {
            assert!(!src.contains(needle), "{why}");
        }
    }

    /// Source-level guard: the text of `OpManager::completed`, taken from its
    /// signature up to the first closing-brace line matched below, must not
    /// mention any of the per-operation `self.ops.*` fields.
    #[test]
    fn completed_must_not_touch_per_op_dashmaps() {
        let src = include_str!("../node/op_state_manager.rs");
        let start = src
            .find("pub fn completed(&self, id: Transaction)")
            .expect("OpManager::completed not found");
        let tail = &src[start..];
        let body_len = tail
            .find("\n }\n")
            .expect("OpManager::completed closing brace not found");
        let body = &tail[..body_len];

        let banned = [
            "self.ops.connect",
            "self.ops.put",
            "self.ops.get",
            "self.ops.subscribe",
            "self.ops.update",
        ];
        // Report the first banned reference, in list order.
        if let Some(forbidden) = banned.iter().copied().find(|needle| body.contains(*needle)) {
            panic!("OpManager::completed must not reference `{forbidden}`");
        }
    }

    /// Source-level guard against reintroducing code that sends a
    /// `ForwardingAck`.
    #[test]
    fn put_forwarding_ack_senders_must_stay_deleted() {
        let own_source = include_str!("put.rs");
        // The needle is assembled at runtime so that the scanned file never
        // contains the contiguous text itself; otherwise this test would
        // match its own source.
        let needle = format!("NetMessage::from({}::ForwardingAck", "PutMsg",);
        assert!(
            !own_source.contains(&needle),
            "ForwardingAck senders must not be reintroduced"
        );
    }
}