#[cfg(debug_assertions)]
use std::backtrace::Backtrace as StdTrace;
use std::{pin::Pin, time::Duration};
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use futures::Future;
use tokio::sync::mpsc::error::SendError;
use std::net::SocketAddr;
use crate::{
client_events::HostResult,
config::GlobalExecutor,
contract::{ContractError, ExecutorError},
message::{InnerMessage, MessageStats, NetMessage, NetMessageV1, Transaction, TransactionType},
node::{ConnectionError, NetworkBridge, OpManager, OpNotAvailable},
ring::{Location, PeerKeyLocation, RingError},
};
pub(crate) mod connect;
pub(crate) mod get;
pub(crate) mod orphan_streams;
pub(crate) mod put;
pub(crate) mod subscribe;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod update;
pub(crate) mod visited_peers;
pub(crate) use visited_peers::VisitedPeers;
pub(crate) trait Operation
where
Self: Sized,
{
type Message: InnerMessage + std::fmt::Display;
type Result;
fn load_or_init<'a>(
op_manager: &'a OpManager,
msg: &'a Self::Message,
source_addr: Option<SocketAddr>,
) -> impl Future<Output = Result<OpInitialization<Self>, OpError>> + 'a;
fn id(&self) -> &Transaction;
#[allow(clippy::type_complexity)]
fn process_message<'a, CB: NetworkBridge>(
self,
conn_manager: &'a mut CB,
op_manager: &'a OpManager,
input: &'a Self::Message,
source_addr: Option<SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<OperationResult, OpError>> + Send + 'a>>;
}
#[must_use]
#[allow(clippy::large_enum_variant)] pub(crate) enum OperationResult {
Completed,
ContinueOp(OpEnum),
SendAndContinue {
msg: NetMessage,
next_hop: Option<SocketAddr>,
state: OpEnum,
stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
},
SendAndComplete {
msg: NetMessage,
next_hop: Option<SocketAddr>,
stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
},
}
pub(crate) struct OpInitialization<Op> {
#[allow(dead_code)]
pub source_addr: Option<SocketAddr>,
pub op: Op,
}
pub(crate) async fn handle_op_request<Op, NB>(
op_manager: &OpManager,
network_bridge: &mut NB,
msg: &Op::Message,
source_addr: Option<SocketAddr>,
) -> Result<Option<OpEnum>, OpError>
where
Op: Operation,
NB: NetworkBridge,
{
let tx = *msg.id();
let result = {
let OpInitialization { source_addr: _, op } =
Op::load_or_init(op_manager, msg, source_addr).await?;
op.process_message(network_bridge, op_manager, msg, source_addr)
.await
};
handle_op_result(op_manager, network_bridge, result, tx, source_addr).await
}
#[inline(always)]
async fn handle_op_result<CB>(
op_manager: &OpManager,
network_bridge: &mut CB,
result: Result<OperationResult, OpError>,
tx_id: Transaction,
source_addr: Option<SocketAddr>,
) -> Result<Option<OpEnum>, OpError>
where
CB: NetworkBridge,
{
match result {
Err(OpError::StatePushed) => {
tracing::debug!("entered in state pushed to continue with op");
return Ok(None);
}
Err(OpError::OpNotPresent(tx)) => {
tracing::debug!(
tx = %tx,
"Ignoring duplicate message for already-handled operation"
);
return Ok(None);
}
Err(err) => {
tracing::error!(
tx = %tx_id,
error = %err,
error_debug = ?err,
source = ?source_addr,
"handle_op_result: sending Aborted due to operation error"
);
if let Some(addr) = source_addr {
network_bridge
.send(addr, NetMessage::V1(NetMessageV1::Aborted(tx_id)))
.await?;
}
return Err(err);
}
Ok(OperationResult::Completed) => {
op_manager.completed(tx_id);
}
Ok(OperationResult::ContinueOp(state)) => {
if state.finalized() {
if op_manager.failed_parents().remove(&tx_id).is_some() {
tracing::warn!(
tx = %tx_id,
phase = "error",
"Operation reached finalized state after a sub-operation failure; dropping client response"
);
op_manager.completed(tx_id);
return Ok(None);
}
if op_manager.all_sub_operations_completed(tx_id) {
tracing::debug!(%tx_id, "operation complete");
op_manager.completed(tx_id);
return Ok(Some(state));
} else {
let pending_count = op_manager.count_pending_sub_operations(tx_id);
tracing::debug!(
%tx_id,
pending_count,
"root operation awaiting child completion"
);
op_manager.root_ops_awaiting_sub_ops().insert(tx_id, state);
tracing::info!(tx = %tx_id, phase = "wait_sub_ops", "root operation registered as awaiting sub-ops");
return Ok(None);
}
} else {
let id = *state.id();
op_manager.push(id, state).await?;
}
}
Ok(OperationResult::SendAndContinue {
msg,
next_hop,
state: updated_state,
stream_data,
}) => {
if updated_state.finalized() {
let id = *msg.id();
tracing::debug!(%id, "operation finalized with outgoing message");
op_manager.completed(id);
if let Some(target) = next_hop {
tracing::debug!(%id, ?target, "sending final message to target");
send_with_stream(network_bridge, target, msg, stream_data).await?;
}
return Ok(Some(updated_state));
} else {
let id = *msg.id();
tracing::debug!(%id, "operation in progress");
if let Some(target) = next_hop {
tracing::debug!(%id, ?target, "sending updated op state");
op_manager.push(id, updated_state).await?;
send_with_stream(network_bridge, target, msg, stream_data).await?;
} else {
tracing::debug!(%id, "queueing op state for local processing");
debug_assert!(
matches!(
msg,
NetMessage::V1(NetMessageV1::Update(
crate::operations::update::UpdateMsg::Broadcasting { .. }
))
),
"Only Update::Broadcasting messages should be re-queued locally"
);
op_manager.notify_op_change(msg, updated_state).await?;
return Err(OpError::StatePushed);
}
}
}
Ok(OperationResult::SendAndComplete {
msg,
next_hop,
stream_data,
}) => {
if let Some(target) = next_hop {
tracing::debug!(%tx_id, ?target, "sending back message to target");
match send_with_stream(network_bridge, target, msg, stream_data).await {
Ok(()) => {
op_manager.completed(tx_id);
}
Err(e) => {
tracing::warn!(
%tx_id, %target, error = %e,
"Response send failed — originator will retry via speculative path"
);
return Err(e);
}
}
} else {
op_manager.completed(tx_id);
}
}
}
Ok(None)
}
async fn send_with_stream<CB: NetworkBridge>(
network_bridge: &mut CB,
target: SocketAddr,
msg: NetMessage,
stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
) -> Result<(), OpError> {
let id = *msg.id();
let metadata = if stream_data.is_some() {
match bincode::serialize(&msg) {
Ok(bytes) => Some(bytes::Bytes::from(bytes)),
Err(e) => {
tracing::warn!(%id, error = %e, "Failed to serialize metadata for embedding");
None
}
}
} else {
None
};
network_bridge.send(target, msg).await?;
if let Some((stream_id, data)) = stream_data {
tracing::debug!(%id, %stream_id, ?target, "sending stream data");
network_bridge
.send_stream(target, stream_id, data, metadata)
.await?;
}
Ok(())
}
#[must_use]
#[allow(clippy::large_enum_variant)]
pub(crate) enum OpEnum {
Connect(Box<connect::ConnectOp>),
Put(put::PutOp),
Get(get::GetOp),
Subscribe(subscribe::SubscribeOp),
Update(update::UpdateOp),
}
impl OpEnum {
delegate::delegate! {
to match self {
OpEnum::Connect(op) => op,
OpEnum::Put(op) => op,
OpEnum::Get(op) => op,
OpEnum::Subscribe(op) => op,
OpEnum::Update(op) => op,
} {
pub fn id(&self) -> &Transaction;
pub fn outcome(&self) -> OpOutcome<'_>;
pub fn finalized(&self) -> bool;
pub fn to_host_result(&self) -> HostResult;
}
}
pub fn is_subscription_renewal(&self) -> bool {
matches!(self, OpEnum::Subscribe(op) if op.is_renewal())
}
}
macro_rules! try_from_op_enum {
($op_enum:path, $op_type:ty, $transaction_type:expr) => {
impl TryFrom<OpEnum> for $op_type {
type Error = OpError;
fn try_from(value: OpEnum) -> Result<Self, Self::Error> {
match value {
$op_enum(op) => Ok(op),
other => Err(OpError::IncorrectTxType(
$transaction_type,
other.id().transaction_type(),
)),
}
}
}
};
}
try_from_op_enum!(OpEnum::Put, put::PutOp, TransactionType::Put);
try_from_op_enum!(OpEnum::Get, get::GetOp, TransactionType::Get);
try_from_op_enum!(
OpEnum::Subscribe,
subscribe::SubscribeOp,
TransactionType::Subscribe
);
try_from_op_enum!(OpEnum::Update, update::UpdateOp, TransactionType::Update);
#[derive(Debug)]
pub(crate) enum OpOutcome<'a> {
ContractOpSuccess {
target_peer: &'a PeerKeyLocation,
contract_location: Location,
first_response_time: Duration,
payload_size: usize,
payload_transfer_time: Duration,
},
ContractOpSuccessUntimed {
target_peer: &'a PeerKeyLocation,
contract_location: Location,
},
ContractOpFailure {
target_peer: &'a PeerKeyLocation,
contract_location: Location,
},
Incomplete,
Irrelevant,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OpError {
#[error(transparent)]
ConnError(#[from] ConnectionError),
#[error(transparent)]
RingError(#[from] RingError),
#[error(transparent)]
ContractError(#[from] ContractError),
#[error(transparent)]
ExecutorError(#[from] ExecutorError),
#[error("unexpected operation state")]
UnexpectedOpState,
#[error(
"cannot perform a state transition from the current state with the provided input (tx: {tx})"
)]
InvalidStateTransition {
tx: Transaction,
#[cfg(debug_assertions)]
state: Option<Box<dyn std::fmt::Debug + Send + Sync>>,
#[cfg(debug_assertions)]
trace: StdTrace,
},
#[error("failed notifying, channel closed")]
NotificationError,
#[error("notification channel error: {0}")]
NotificationChannelError(String),
#[error("unspected transaction type, trying to get a {0:?} from a {1:?}")]
IncorrectTxType(TransactionType, TransactionType),
#[error("op not present: {0}")]
OpNotPresent(Transaction),
#[error("op not available")]
OpNotAvailable(#[from] OpNotAvailable),
#[error("stream was cancelled")]
StreamCancelled,
#[error("failed to claim orphan stream")]
OrphanStreamClaimFailed,
#[error("early push of state into the op stack")]
StatePushed,
}
impl OpError {
pub fn invalid_transition(tx: Transaction) -> Self {
Self::InvalidStateTransition {
tx,
#[cfg(debug_assertions)]
state: None,
#[cfg(debug_assertions)]
trace: StdTrace::force_capture(),
}
}
pub fn invalid_transition_with_state(
tx: Transaction,
state: Box<dyn std::fmt::Debug + Send + Sync>,
) -> Self {
#[cfg(not(debug_assertions))]
{
drop(state);
}
Self::InvalidStateTransition {
tx,
#[cfg(debug_assertions)]
state: Some(state),
#[cfg(debug_assertions)]
trace: StdTrace::force_capture(),
}
}
pub fn is_contract_exec_rejection(&self) -> bool {
matches!(self, Self::ExecutorError(e) if e.is_contract_exec_rejection())
}
}
impl<T> From<SendError<T>> for OpError {
fn from(_: SendError<T>) -> OpError {
OpError::NotificationError
}
}
pub(crate) async fn announce_contract_hosted(op_manager: &OpManager, key: &ContractKey) {
if let Some(announcement) = op_manager.neighbor_hosting.on_contract_hosted(key) {
tracing::debug!(
%key,
"NEIGHBOR_HOSTING: Announcing contract hosted to neighbors"
);
if let Err(err) = op_manager
.notify_node_event(crate::message::NodeEvent::BroadcastHostingUpdate {
message: announcement,
})
.await
{
tracing::warn!(
contract = %key,
error = %err,
phase = "error",
"NEIGHBOR_HOSTING: Failed to broadcast hosting announcement"
);
}
}
}
pub(crate) async fn setup_subscription_forwarding_at_relay(
op_manager: &OpManager,
key: &ContractKey,
tx: &crate::message::Transaction,
upstream_response_addr: std::net::SocketAddr,
downstream_requester_addr: std::net::SocketAddr,
) {
if let Some(upstream_pkl) = op_manager
.ring
.connection_manager
.get_peer_by_addr(upstream_response_addr)
{
let peer_key = crate::ring::interest::PeerKey::from(upstream_pkl.pub_key.clone());
op_manager
.interest_manager
.register_peer_interest(key, peer_key, None, true);
} else {
tracing::debug!(
tx = %tx,
contract = %key,
upstream = %upstream_response_addr,
"Piggyback relay: upstream peer not in ring, skipping interest registration"
);
}
subscribe::register_downstream_subscriber(
op_manager,
key,
downstream_requester_addr,
None,
None,
tx,
" (relay, piggybacked on GET response)",
)
.await;
tracing::debug!(
tx = %tx,
contract = %key,
upstream = %upstream_response_addr,
downstream = %downstream_requester_addr,
"Set up subscription forwarding at relay via GET piggyback"
);
}
pub(crate) async fn complete_piggyback_subscription(
op_manager: &OpManager,
key: &ContractKey,
tx: &crate::message::Transaction,
sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
) {
op_manager.ring.subscribe(*key);
op_manager.ring.complete_subscription_request(key, true);
let became_interested = op_manager.interest_manager.add_local_client(key);
if became_interested {
broadcast_change_interests(op_manager, vec![*key], vec![]).await;
}
announce_contract_hosted(op_manager, key).await;
if let Some(upstream_pkl) = sender_from_addr.as_ref() {
let peer_key = crate::ring::interest::PeerKey::from(upstream_pkl.pub_key.clone());
op_manager
.interest_manager
.register_peer_interest(key, peer_key, None, true);
tracing::debug!(tx = %tx, contract = %key, "Subscription completed via GET piggyback");
} else {
tracing::warn!(
tx = %tx,
contract = %key,
"GET piggyback: upstream peer not in ring, subscription tree incomplete -- renewal will heal"
);
}
}
pub(crate) async fn auto_subscribe_on_get_response(
op_manager: &OpManager,
key: &ContractKey,
tx: &crate::message::Transaction,
sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
subscribe_requested: bool,
blocking_sub: bool,
path_label: &str,
) {
if subscribe_requested {
complete_piggyback_subscription(op_manager, key, tx, sender_from_addr).await;
} else {
let child_tx = start_subscription_request(op_manager, *tx, *key, blocking_sub);
tracing::debug!(tx = %tx, %child_tx, blocking = %blocking_sub, "started subscription ({path_label}, fallback)");
}
}
pub(crate) async fn broadcast_change_interests(
op_manager: &OpManager,
added: Vec<ContractKey>,
removed: Vec<ContractKey>,
) {
use crate::ring::interest::contract_hash;
if added.is_empty() && removed.is_empty() {
return;
}
let added_hashes: Vec<u32> = added.iter().map(contract_hash).collect();
let removed_hashes: Vec<u32> = removed.iter().map(contract_hash).collect();
tracing::debug!(
added_count = added_hashes.len(),
removed_count = removed_hashes.len(),
"Broadcasting ChangeInterests to neighbors"
);
if let Err(err) = op_manager
.notify_node_event(crate::message::NodeEvent::BroadcastChangeInterests {
added: added_hashes,
removed: removed_hashes,
})
.await
{
tracing::warn!(
error = %err,
"Failed to broadcast ChangeInterests"
);
}
}
fn start_subscription_request_async(
op_manager: &OpManager,
parent_tx: Transaction,
key: ContractKey,
) -> Transaction {
start_subscription_request_internal(op_manager, parent_tx, key, false)
}
fn start_subscription_request_blocking(
op_manager: &OpManager,
parent_tx: Transaction,
key: ContractKey,
) -> Transaction {
start_subscription_request_internal(op_manager, parent_tx, key, true)
}
pub(super) fn start_subscription_request(
op_manager: &OpManager,
parent_tx: Transaction,
key: ContractKey,
blocking: bool,
) -> Transaction {
if blocking {
start_subscription_request_blocking(op_manager, parent_tx, key)
} else {
start_subscription_request_async(op_manager, parent_tx, key)
}
}
fn start_subscription_request_internal(
op_manager: &OpManager,
parent_tx: Transaction,
key: ContractKey,
track_parent: bool,
) -> Transaction {
let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&parent_tx);
if track_parent {
op_manager.expect_and_register_sub_operation(parent_tx, child_tx);
}
tracing::debug!(
%parent_tx,
%child_tx,
%key,
"created child subscription operation"
);
let op_manager_cloned = op_manager.clone();
GlobalExecutor::spawn(async move {
tokio::task::yield_now().await;
let sub_op = subscribe::start_op_with_id(*key.id(), child_tx, false);
match subscribe::request_subscribe(&op_manager_cloned, sub_op).await {
Ok(_) => {
tracing::debug!(%child_tx, %parent_tx, "child subscription completed");
}
Err(error) => {
let error_msg = format!("{}", error);
tracing::error!(tx = %parent_tx, child_tx = %child_tx, error = error_msg, phase = "error", "child subscription failed");
if let Err(e) = op_manager_cloned
.sub_operation_failed(child_tx, &error_msg)
.await
{
tracing::error!(tx = %parent_tx, child_tx = %child_tx, error = %e, phase = "error", "failed to propagate failure");
}
if !track_parent {
let instance_id = *key.id();
if let Err(e) = op_manager_cloned
.notify_contract_handler(
crate::contract::ContractHandlerEvent::NotifySubscriptionError {
key: instance_id,
reason: format!("Subscription failed: {}", error_msg),
},
)
.await
{
tracing::debug!(
contract = %instance_id,
error = %e,
"Failed to send subscription error to notification channels"
);
}
}
}
}
});
child_tx
}
async fn has_contract(
op_manager: &OpManager,
instance_id: ContractInstanceId,
) -> Result<Option<ContractKey>, OpError> {
match op_manager
.notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
instance_id,
return_contract_code: false,
})
.await?
{
crate::contract::ContractHandlerEvent::GetResponse {
key,
response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
} => Ok(key),
crate::contract::ContractHandlerEvent::DelegateRequest { .. }
| crate::contract::ContractHandlerEvent::DelegateResponse(_)
| crate::contract::ContractHandlerEvent::PutQuery { .. }
| crate::contract::ContractHandlerEvent::PutResponse { .. }
| crate::contract::ContractHandlerEvent::GetQuery { .. }
| crate::contract::ContractHandlerEvent::GetResponse { .. }
| crate::contract::ContractHandlerEvent::UpdateQuery { .. }
| crate::contract::ContractHandlerEvent::UpdateResponse { .. }
| crate::contract::ContractHandlerEvent::UpdateNoChange { .. }
| crate::contract::ContractHandlerEvent::RegisterSubscriberListener { .. }
| crate::contract::ContractHandlerEvent::RegisterSubscriberListenerResponse
| crate::contract::ContractHandlerEvent::QuerySubscriptions { .. }
| crate::contract::ContractHandlerEvent::QuerySubscriptionsResponse
| crate::contract::ContractHandlerEvent::GetSummaryQuery { .. }
| crate::contract::ContractHandlerEvent::GetSummaryResponse { .. }
| crate::contract::ContractHandlerEvent::GetDeltaQuery { .. }
| crate::contract::ContractHandlerEvent::GetDeltaResponse { .. }
| crate::contract::ContractHandlerEvent::NotifySubscriptionError { .. }
| crate::contract::ContractHandlerEvent::NotifySubscriptionErrorResponse
| crate::contract::ContractHandlerEvent::ClientDisconnect { .. } => Ok(None),
}
}
pub(crate) fn should_use_streaming(streaming_threshold: usize, payload_size: usize) -> bool {
payload_size > streaming_threshold
}
#[cfg(test)]
mod ordering_invariant_tests {
use super::test_utils::MockNetworkBridge;
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::NetworkBridge;
use crate::operations::connect::ConnectMsg;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[tokio::test]
async fn mock_network_bridge_records_send_ordering() {
let bridge = MockNetworkBridge::new();
let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5000);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5001);
let tx1 = Transaction::new::<ConnectMsg>();
let tx2 = Transaction::new::<ConnectMsg>();
bridge
.send(addr1, NetMessage::V1(NetMessageV1::Aborted(tx1)))
.await
.unwrap();
bridge
.send(addr2, NetMessage::V1(NetMessageV1::Aborted(tx2)))
.await
.unwrap();
let sent = bridge.sent_messages();
assert_eq!(sent.len(), 2);
assert_eq!(sent[0].0, addr1, "First send should be to addr1");
assert_eq!(sent[1].0, addr2, "Second send should be to addr2");
}
#[test]
fn push_before_send_invariant_is_documented() {
}
}
#[cfg(test)]
mod streaming_tests {
use super::should_use_streaming;
const DEFAULT_THRESHOLD: usize = 64 * 1024;
#[test]
fn test_streaming_respects_threshold() {
assert!(!should_use_streaming(DEFAULT_THRESHOLD, 0));
assert!(!should_use_streaming(DEFAULT_THRESHOLD, 1000));
assert!(!should_use_streaming(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD)); assert!(should_use_streaming(
DEFAULT_THRESHOLD,
DEFAULT_THRESHOLD + 1
)); assert!(should_use_streaming(DEFAULT_THRESHOLD, 1024 * 1024)); }
#[test]
fn test_streaming_custom_threshold() {
let custom_threshold = 128 * 1024; assert!(!should_use_streaming(custom_threshold, 64 * 1024));
assert!(!should_use_streaming(custom_threshold, custom_threshold));
assert!(should_use_streaming(custom_threshold, custom_threshold + 1));
}
#[test]
fn test_streaming_zero_threshold() {
assert!(!should_use_streaming(0, 0));
assert!(should_use_streaming(0, 1));
assert!(should_use_streaming(0, 100));
}
}