#[cfg(debug_assertions)]
use std::backtrace::Backtrace as StdTrace;
use std::{pin::Pin, time::Duration};
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use futures::Future;
use tokio::sync::mpsc::error::SendError;
use std::net::SocketAddr;
use crate::{
client_events::HostResult,
config::GlobalExecutor,
contract::{ContractError, ExecutorError},
message::{InnerMessage, MessageStats, NetMessage, NetMessageV1, Transaction, TransactionType},
node::{ConnectionError, NetworkBridge, OpManager, OpNotAvailable},
ring::{Location, PeerKeyLocation, RingError},
};
pub(crate) mod connect;
pub(crate) mod get;
pub(crate) mod op_ctx;
pub(crate) mod orphan_streams;
pub(crate) mod put;
pub(crate) mod subscribe;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod update;
pub(crate) mod visited_peers;
pub(crate) use op_ctx::OpCtx;
pub(crate) use visited_peers::VisitedPeers;
/// State-machine contract implemented by every network operation
/// (connect, get, put, subscribe, update).
pub(crate) trait Operation
where
    Self: Sized,
{
    /// Wire message type this operation consumes.
    type Message: InnerMessage + std::fmt::Display;
    /// Final value produced by the operation.
    type Result;
    /// Loads existing op state for the message's transaction, or creates
    /// fresh state when this is the first message seen for that tx.
    fn load_or_init<'a>(
        op_manager: &'a OpManager,
        msg: &'a Self::Message,
        source_addr: Option<SocketAddr>,
    ) -> impl Future<Output = Result<OpInitialization<Self>, OpError>> + 'a;
    /// Transaction id identifying this operation instance.
    fn id(&self) -> &Transaction;
    /// Consumes the current state and processes one incoming message,
    /// yielding the next [`OperationResult`] to act on.
    #[allow(clippy::type_complexity)]
    fn process_message<'a, CB: NetworkBridge>(
        self,
        conn_manager: &'a mut CB,
        op_manager: &'a OpManager,
        input: &'a Self::Message,
        source_addr: Option<SocketAddr>,
    ) -> Pin<Box<dyn Future<Output = Result<OperationResult, OpError>> + Send + 'a>>;
}
/// Outcome of a single `Operation::process_message` step, telling
/// `handle_op_result` how to advance the state machine.
#[must_use]
#[allow(clippy::large_enum_variant)]
pub(crate) enum OperationResult {
    /// Operation finished; no state to keep and nothing to send.
    Completed,
    /// Keep (or finalize) the given state without sending a message.
    ContinueOp(OpEnum),
    /// Send `msg` (plus optional stream payload) and keep driving the op.
    SendAndContinue {
        msg: NetMessage,
        // Peer to forward to; `None` means finalize locally without sending.
        next_hop: Option<SocketAddr>,
        state: OpEnum,
        // Optional large payload streamed separately from the message.
        stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
    },
    /// Send `msg` and mark the operation completed (no retained state).
    SendAndComplete {
        msg: NetMessage,
        next_hop: Option<SocketAddr>,
        stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
    },
}
/// Result of `Operation::load_or_init`: the op state plus the address the
/// triggering message came from (if any).
pub(crate) struct OpInitialization<Op> {
    // Kept for diagnostics/symmetry; not every call site reads it.
    #[allow(dead_code)]
    pub source_addr: Option<SocketAddr>,
    pub op: Op,
}
/// Loads (or initializes) the operation state for `msg`, drives one
/// message-processing step, and routes the outcome through
/// [`handle_op_result`].
pub(crate) async fn handle_op_request<Op, NB>(
    op_manager: &OpManager,
    network_bridge: &mut NB,
    msg: &Op::Message,
    source_addr: Option<SocketAddr>,
) -> Result<Option<OpEnum>, OpError>
where
    Op: Operation,
    NB: NetworkBridge,
{
    let tx = *msg.id();
    // Resolve the op state for this transaction, then let it consume the
    // incoming message; the raw step result is interpreted below.
    let op = Op::load_or_init(op_manager, msg, source_addr).await?.op;
    let step_result = op
        .process_message(network_bridge, op_manager, msg, source_addr)
        .await;
    handle_op_result(op_manager, network_bridge, step_result, tx, source_addr).await
}
/// Interprets the result of one operation step: persists or completes the
/// op state and sends any outgoing message.
///
/// Ordering invariant: for in-progress ops the updated state is pushed to
/// the op manager BEFORE the message is sent, so a fast reply cannot race
/// the state store (see `ordering_invariant_tests` below).
#[inline(always)]
async fn handle_op_result<CB>(
    op_manager: &OpManager,
    network_bridge: &mut CB,
    result: Result<OperationResult, OpError>,
    tx_id: Transaction,
    source_addr: Option<SocketAddr>,
) -> Result<Option<OpEnum>, OpError>
where
    CB: NetworkBridge,
{
    match result {
        // The op already pushed its own state; nothing more to do here.
        Err(OpError::StatePushed) => {
            tracing::debug!("entered in state pushed to continue with op");
            return Ok(None);
        }
        // Duplicate/late message for a tx that was already handled.
        Err(OpError::OpNotPresent(tx)) => {
            tracing::debug!(
                tx = %tx,
                "Ignoring duplicate message for already-handled operation"
            );
            return Ok(None);
        }
        // Genuine failure: best-effort notify the sender, then propagate.
        Err(err) => {
            tracing::error!(
                tx = %tx_id,
                error = %err,
                error_debug = ?err,
                source = ?source_addr,
                "handle_op_result: sending Aborted due to operation error"
            );
            if let Some(addr) = source_addr {
                network_bridge
                    .send(addr, NetMessage::V1(NetMessageV1::Aborted(tx_id)))
                    .await?;
            }
            return Err(err);
        }
        Ok(OperationResult::Completed) => {
            op_manager.completed(tx_id);
        }
        Ok(OperationResult::ContinueOp(state)) => {
            if state.finalized() {
                tracing::debug!(%tx_id, "operation complete");
                op_manager.completed(tx_id);
                // Hand the final state back to the caller.
                return Ok(Some(state));
            } else {
                let id = *state.id();
                op_manager.push(id, state).await?;
            }
        }
        Ok(OperationResult::SendAndContinue {
            msg,
            next_hop,
            state: updated_state,
            stream_data,
        }) => {
            if updated_state.finalized() {
                let id = *msg.id();
                tracing::debug!(%id, "operation finalized with outgoing message");
                op_manager.completed(id);
                if let Some(target) = next_hop {
                    tracing::debug!(%id, ?target, "sending final message to target");
                    send_with_stream(network_bridge, target, msg, stream_data).await?;
                }
                return Ok(Some(updated_state));
            } else {
                let id = *msg.id();
                tracing::debug!(%id, "operation in progress");
                // An in-progress op MUST have somewhere to forward to.
                let target = next_hop.ok_or_else(|| {
                    OpError::UnexpectedOpState
                })?;
                tracing::debug!(%id, ?target, "sending updated op state");
                // Push state first, then send — see invariant note above.
                op_manager.push(id, updated_state).await?;
                send_with_stream(network_bridge, target, msg, stream_data).await?;
            }
        }
        Ok(OperationResult::SendAndComplete {
            msg,
            next_hop,
            stream_data,
        }) => {
            if let Some(target) = next_hop {
                tracing::debug!(%tx_id, ?target, "sending back message to target");
                match send_with_stream(network_bridge, target, msg, stream_data).await {
                    Ok(()) => {
                        op_manager.completed(tx_id);
                    }
                    Err(e) => {
                        // Do NOT mark completed: the originator's retry
                        // path needs the tx to still be outstanding.
                        tracing::warn!(
                            %tx_id, %target, error = %e,
                            "Response send failed — originator will retry via speculative path"
                        );
                        return Err(e);
                    }
                }
            } else {
                op_manager.completed(tx_id);
            }
        }
    }
    Ok(None)
}
/// Sends `msg` to `target`, then — when a stream payload is attached —
/// follows up with the stream data.
///
/// When stream data is present the message is serialized up front so a
/// copy can travel as stream metadata; a serialization failure only drops
/// the metadata (logged), it never fails the send itself.
async fn send_with_stream<CB: NetworkBridge>(
    network_bridge: &mut CB,
    target: SocketAddr,
    msg: NetMessage,
    stream_data: Option<(crate::transport::peer_connection::StreamId, bytes::Bytes)>,
) -> Result<(), OpError> {
    let id = *msg.id();
    // Capture metadata before `msg` is moved into `send` below.
    let mut metadata = None;
    if stream_data.is_some() {
        match bincode::serialize(&msg) {
            Ok(raw) => metadata = Some(bytes::Bytes::from(raw)),
            Err(e) => {
                tracing::warn!(%id, error = %e, "Failed to serialize metadata for embedding");
            }
        }
    }
    network_bridge.send(target, msg).await?;
    if let Some((stream_id, data)) = stream_data {
        tracing::debug!(%id, %stream_id, ?target, "sending stream data");
        network_bridge
            .send_stream(target, stream_id, data, metadata)
            .await?;
    }
    Ok(())
}
/// Type-erased operation state as stored by the op manager.
#[must_use]
#[allow(clippy::large_enum_variant)]
pub(crate) enum OpEnum {
    // Boxed: connect state is large relative to the other variants.
    Connect(Box<connect::ConnectOp>),
    Subscribe(subscribe::SubscribeOp),
}
impl OpEnum {
    // Forward the common accessors to whichever concrete op is held.
    delegate::delegate! {
        to match self {
            OpEnum::Connect(op) => op,
            OpEnum::Subscribe(op) => op,
        } {
            pub fn id(&self) -> &Transaction;
            pub fn outcome(&self) -> OpOutcome<'_>;
            pub fn finalized(&self) -> bool;
            pub fn to_host_result(&self) -> HostResult;
        }
    }
    /// True when this is a SUBSCRIBE op renewing an existing subscription
    /// rather than establishing a new one.
    pub fn is_subscription_renewal(&self) -> bool {
        matches!(self, OpEnum::Subscribe(op) if op.is_renewal())
    }
}
/// Generates a `TryFrom<OpEnum>` impl for one concrete operation type,
/// failing with `OpError::IncorrectTxType` (expected, actual) when the
/// enum holds a different variant.
macro_rules! try_from_op_enum {
    ($op_enum:path, $op_type:ty, $transaction_type:expr) => {
        impl TryFrom<OpEnum> for $op_type {
            type Error = OpError;
            fn try_from(value: OpEnum) -> Result<Self, Self::Error> {
                match value {
                    $op_enum(op) => Ok(op),
                    other => Err(OpError::IncorrectTxType(
                        $transaction_type,
                        other.id().transaction_type(),
                    )),
                }
            }
        }
    };
}
try_from_op_enum!(
    OpEnum::Subscribe,
    subscribe::SubscribeOp,
    TransactionType::Subscribe
);
/// Routing-relevant outcome of an operation, fed to the router for
/// performance estimation.
#[derive(Debug)]
pub(crate) enum OpOutcome<'a> {
    /// Contract op succeeded and transfer timings were measured.
    ContractOpSuccess {
        target_peer: &'a PeerKeyLocation,
        contract_location: Location,
        first_response_time: Duration,
        payload_size: usize,
        payload_transfer_time: Duration,
    },
    /// Contract op succeeded but no timing data is available.
    ContractOpSuccessUntimed {
        target_peer: &'a PeerKeyLocation,
        contract_location: Location,
    },
    /// Contract op failed at the given peer.
    ContractOpFailure {
        target_peer: &'a PeerKeyLocation,
        contract_location: Location,
    },
    /// Operation has not finished yet; nothing to record.
    Incomplete,
    /// Operation type carries no routing signal.
    Irrelevant,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OpError {
#[error(transparent)]
ConnError(#[from] ConnectionError),
#[error(transparent)]
RingError(#[from] RingError),
#[error(transparent)]
ContractError(#[from] ContractError),
#[error(transparent)]
ExecutorError(#[from] ExecutorError),
#[error("unexpected operation state")]
UnexpectedOpState,
#[error(
"cannot perform a state transition from the current state with the provided input (tx: {tx})"
)]
InvalidStateTransition {
tx: Transaction,
#[cfg(debug_assertions)]
state: Option<Box<dyn std::fmt::Debug + Send + Sync>>,
#[cfg(debug_assertions)]
trace: StdTrace,
},
#[error("failed notifying, channel closed")]
NotificationError,
#[error("notification channel error: {0}")]
NotificationChannelError(String),
#[error("unspected transaction type, trying to get a {0:?} from a {1:?}")]
IncorrectTxType(TransactionType, TransactionType),
#[error("op not present: {0}")]
OpNotPresent(Transaction),
#[error("op not available")]
OpNotAvailable(#[from] OpNotAvailable),
#[error("stream was cancelled")]
StreamCancelled,
#[error("failed to claim orphan stream")]
OrphanStreamClaimFailed,
#[error("early push of state into the op stack")]
StatePushed,
}
impl OpError {
    /// Builds an `InvalidStateTransition` for `tx`; debug builds also
    /// capture a backtrace at the point of failure.
    pub fn invalid_transition(tx: Transaction) -> Self {
        Self::InvalidStateTransition {
            tx,
            #[cfg(debug_assertions)]
            state: None,
            #[cfg(debug_assertions)]
            trace: StdTrace::force_capture(),
        }
    }
    /// True when the wrapped executor error is a contract-side rejection
    /// of execution.
    pub fn is_contract_exec_rejection(&self) -> bool {
        matches!(self, Self::ExecutorError(e) if e.is_contract_exec_rejection())
    }
    /// True when the wrapped executor error is a missing-parameters error.
    pub fn is_missing_contract_parameters(&self) -> bool {
        matches!(self, Self::ExecutorError(e) if e.is_missing_contract_parameters())
    }
    /// True when the wrapped executor error rejected an invalid update.
    pub fn is_invalid_update_rejection(&self) -> bool {
        matches!(self, Self::ExecutorError(e) if e.is_invalid_update_rejection())
    }
}
// Channel send failures collapse into `NotificationError`; the unsent
// payload carries no extra information worth preserving.
impl<T> From<SendError<T>> for OpError {
    fn from(_: SendError<T>) -> OpError {
        OpError::NotificationError
    }
}
/// Broadcasts a "contract hosted" announcement to neighbors, if the
/// neighbor-hosting tracker decides one is due for `key`.
///
/// Broadcast failures are logged and swallowed — announcing is
/// best-effort.
pub(crate) async fn announce_contract_hosted(op_manager: &OpManager, key: &ContractKey) {
    // The tracker decides whether an announcement is warranted at all.
    let Some(announcement) = op_manager.neighbor_hosting.on_contract_hosted(key) else {
        return;
    };
    tracing::debug!(
        %key,
        "NEIGHBOR_HOSTING: Announcing contract hosted to neighbors"
    );
    let event = crate::message::NodeEvent::BroadcastHostingUpdate {
        message: announcement,
    };
    if let Err(err) = op_manager.notify_node_event(event).await {
        tracing::warn!(
            contract = %key,
            error = %err,
            phase = "error",
            "NEIGHBOR_HOSTING: Failed to broadcast hosting announcement"
        );
    }
}
/// Wires up subscription forwarding at a relay node when a subscription
/// was piggybacked on a GET response: registers interest toward the
/// upstream responder (when it is a known ring peer) and registers the
/// downstream requester as a subscriber.
#[allow(dead_code)]
pub(crate) async fn setup_subscription_forwarding_at_relay(
    op_manager: &OpManager,
    key: &ContractKey,
    tx: &crate::message::Transaction,
    upstream_response_addr: std::net::SocketAddr,
    downstream_requester_addr: std::net::SocketAddr,
) {
    // Upstream side: only register interest if the responder is a peer we
    // actually know in the ring.
    let upstream_peer = op_manager
        .ring
        .connection_manager
        .get_peer_by_addr(upstream_response_addr);
    match upstream_peer {
        Some(upstream_pkl) => {
            let peer_key = crate::ring::interest::PeerKey::from(upstream_pkl.pub_key.clone());
            op_manager
                .interest_manager
                .register_peer_interest(key, peer_key, None, true);
        }
        None => {
            tracing::debug!(
                tx = %tx,
                contract = %key,
                upstream = %upstream_response_addr,
                "Piggyback relay: upstream peer not in ring, skipping interest registration"
            );
        }
    }
    // Downstream side: always register the requester as a subscriber.
    subscribe::register_downstream_subscriber(
        op_manager,
        key,
        downstream_requester_addr,
        None,
        None,
        tx,
        " (relay, piggybacked on GET response)",
    )
    .await;
    tracing::debug!(
        tx = %tx,
        contract = %key,
        upstream = %upstream_response_addr,
        downstream = %downstream_requester_addr,
        "Set up subscription forwarding at relay via GET piggyback"
    );
}
/// Finalizes a subscription that rode piggyback on a GET response:
/// activates it locally, broadcasts interest changes on first local
/// client, announces hosting, and links the upstream peer into the
/// subscription tree when known.
pub(crate) async fn complete_piggyback_subscription(
    op_manager: &OpManager,
    key: &ContractKey,
    tx: &crate::message::Transaction,
    sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
) {
    // Mark the local subscription as active.
    op_manager.ring.subscribe(*key);
    op_manager.ring.complete_subscription_request(key, true);
    // The first local client for this contract flips our interest state —
    // tell the neighbors about it.
    if op_manager.interest_manager.add_local_client(key) {
        broadcast_change_interests(op_manager, vec![*key], vec![]).await;
    }
    announce_contract_hosted(op_manager, key).await;
    match sender_from_addr {
        Some(upstream_pkl) => {
            let peer_key = crate::ring::interest::PeerKey::from(upstream_pkl.pub_key.clone());
            op_manager
                .interest_manager
                .register_peer_interest(key, peer_key, None, true);
            tracing::debug!(tx = %tx, contract = %key, "Subscription completed via GET piggyback");
        }
        None => {
            tracing::warn!(
                tx = %tx,
                contract = %key,
                "GET piggyback: upstream peer not in ring, subscription tree incomplete -- renewal will heal"
            );
        }
    }
}
/// After a GET response, completes the subscription either via the
/// piggyback fast path (when the request asked for it) or by spawning a
/// standalone child SUBSCRIBE as a fallback.
pub(crate) async fn auto_subscribe_on_get_response(
    op_manager: &OpManager,
    key: &ContractKey,
    tx: &crate::message::Transaction,
    sender_from_addr: &Option<crate::ring::PeerKeyLocation>,
    subscribe_requested: bool,
    blocking_sub: bool,
    path_label: &str,
) {
    // Fallback path: no piggyback was requested, so run a full child
    // subscribe operation instead.
    if !subscribe_requested {
        let child_tx = start_subscription_request(op_manager, *tx, *key, blocking_sub);
        tracing::debug!(tx = %tx, %child_tx, blocking = %blocking_sub, "started subscription ({path_label}, fallback)");
        return;
    }
    complete_piggyback_subscription(op_manager, key, tx, sender_from_addr).await;
}
/// Broadcasts a `ChangeInterests` event to neighbors describing which
/// contracts this node became / stopped being interested in. No-op when
/// both lists are empty; broadcast failures are logged and swallowed.
pub(crate) async fn broadcast_change_interests(
    op_manager: &OpManager,
    added: Vec<ContractKey>,
    removed: Vec<ContractKey>,
) {
    use crate::ring::interest::contract_hash;
    if added.is_empty() && removed.is_empty() {
        return;
    }
    // Neighbors exchange compact hashes, not full contract keys.
    let to_hashes = |keys: &[ContractKey]| -> Vec<u32> { keys.iter().map(contract_hash).collect() };
    let added_hashes = to_hashes(&added);
    let removed_hashes = to_hashes(&removed);
    tracing::debug!(
        added_count = added_hashes.len(),
        removed_count = removed_hashes.len(),
        "Broadcasting ChangeInterests to neighbors"
    );
    let event = crate::message::NodeEvent::BroadcastChangeInterests {
        added: added_hashes,
        removed: removed_hashes,
    };
    if let Err(err) = op_manager.notify_node_event(event).await {
        tracing::warn!(
            error = %err,
            "Failed to broadcast ChangeInterests"
        );
    }
}
/// Spawns a detached child SUBSCRIBE operation derived from `parent_tx`
/// and returns the child transaction id immediately (fire-and-forget).
///
/// The child runs on the task-per-transaction driver
/// (`subscribe::run_client_subscribe`); failures surface through that
/// driver's own host result, not through the parent operation. The body of
/// this function is pinned by `sub_op_subscribe_migration_pin_tests`.
pub(super) fn start_subscription_request(
    op_manager: &OpManager,
    parent_tx: Transaction,
    key: ContractKey,
    blocking: bool,
) -> Transaction {
    let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&parent_tx);
    tracing::debug!(
        %parent_tx,
        %child_tx,
        %key,
        blocking,
        "spawning child subscription operation (task-per-tx driver)"
    );
    // The spawned task must own its op-manager handle outright.
    let op_manager_arc = std::sync::Arc::new(op_manager.clone());
    let instance_id = *key.id();
    GlobalExecutor::spawn(async move {
        subscribe::run_client_subscribe(op_manager_arc, instance_id, child_tx).await;
    });
    child_tx
}
/// Asks the contract handler whether `instance_id` is stored locally WITH
/// state; returns the full key when it is, `None` otherwise.
async fn has_contract(
    op_manager: &OpManager,
    instance_id: ContractInstanceId,
) -> Result<Option<ContractKey>, OpError> {
    match op_manager
        .notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
            instance_id,
            // We only need presence, not the contract code itself.
            return_contract_code: false,
        })
        .await?
    {
        // Present only when state exists; code-only entries don't count.
        crate::contract::ContractHandlerEvent::GetResponse {
            key,
            response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
        } => Ok(key),
        // Deliberately exhaustive (no `_` arm) so adding a new handler
        // event forces this match to be revisited.
        crate::contract::ContractHandlerEvent::DelegateRequest { .. }
        | crate::contract::ContractHandlerEvent::DelegateResponse(_)
        | crate::contract::ContractHandlerEvent::PutQuery { .. }
        | crate::contract::ContractHandlerEvent::PutResponse { .. }
        | crate::contract::ContractHandlerEvent::GetQuery { .. }
        | crate::contract::ContractHandlerEvent::GetResponse { .. }
        | crate::contract::ContractHandlerEvent::UpdateQuery { .. }
        | crate::contract::ContractHandlerEvent::UpdateResponse { .. }
        | crate::contract::ContractHandlerEvent::UpdateNoChange { .. }
        | crate::contract::ContractHandlerEvent::RegisterSubscriberListener { .. }
        | crate::contract::ContractHandlerEvent::RegisterSubscriberListenerResponse
        | crate::contract::ContractHandlerEvent::QuerySubscriptions { .. }
        | crate::contract::ContractHandlerEvent::QuerySubscriptionsResponse
        | crate::contract::ContractHandlerEvent::GetSummaryQuery { .. }
        | crate::contract::ContractHandlerEvent::GetSummaryResponse { .. }
        | crate::contract::ContractHandlerEvent::GetDeltaQuery { .. }
        | crate::contract::ContractHandlerEvent::GetDeltaResponse { .. }
        | crate::contract::ContractHandlerEvent::NotifySubscriptionError { .. }
        | crate::contract::ContractHandlerEvent::NotifySubscriptionErrorResponse
        | crate::contract::ContractHandlerEvent::ClientDisconnect { .. } => Ok(None),
    }
}
/// Returns `true` when `payload_size` strictly exceeds
/// `streaming_threshold`, i.e. the payload should travel via streaming
/// rather than inline in the message.
pub(crate) fn should_use_streaming(streaming_threshold: usize, payload_size: usize) -> bool {
    streaming_threshold < payload_size
}
// Conservative lower bound on transfer throughput (20 KiB/s) used to size
// the extra drain time granted to streaming transfers.
const STREAMING_THROUGHPUT_FLOOR_BPS: usize = 20 * 1024;
// Minimum extra drain time for any streaming-eligible payload; also
// covers the integer-division truncation gap just above the threshold.
const STREAMING_MIN_DRAIN_SECS: u64 = 30;
// Hard ceiling so huge payloads cannot stretch the timeout unboundedly.
const STREAMING_ATTEMPT_TIMEOUT_CAP: std::time::Duration = std::time::Duration::from_secs(600);
/// Attempt timeout for an operation carrying `payload_size` bytes:
/// the plain `OPERATION_TTL` for inline payloads, or TTL plus a
/// size-proportional drain allowance (capped) for streaming payloads.
/// Exact arithmetic is pinned by `streaming_tests` below (issue #4001).
pub(crate) fn streaming_aware_attempt_timeout(
    streaming_threshold: usize,
    payload_size: usize,
) -> std::time::Duration {
    if !should_use_streaming(streaming_threshold, payload_size) {
        return crate::config::OPERATION_TTL;
    }
    // Truncating division is compensated by the MIN_DRAIN floor.
    let drain_secs =
        ((payload_size / STREAMING_THROUGHPUT_FLOOR_BPS) as u64).max(STREAMING_MIN_DRAIN_SECS);
    let total = crate::config::OPERATION_TTL + std::time::Duration::from_secs(drain_secs);
    total.min(STREAMING_ATTEMPT_TIMEOUT_CAP)
}
/// Records a routing outcome for a relayed operation in the router's
/// event history (used for future route selection).
///
/// In test builds it also bumps a per-op-type counter so tests can verify
/// that relay paths actually record route events.
pub(crate) fn record_relay_route_event(
    op_manager: &OpManager,
    next_hop: PeerKeyLocation,
    contract_location: Location,
    outcome: crate::router::RouteOutcome,
    op_type: crate::node::network_status::OpType,
) {
    #[cfg(any(test, feature = "testing"))]
    {
        use std::sync::atomic::Ordering;
        let counter = match op_type {
            crate::node::network_status::OpType::Get => &RELAY_GET_ROUTE_EVENT_COUNT,
            crate::node::network_status::OpType::Put => &RELAY_PUT_ROUTE_EVENT_COUNT,
            crate::node::network_status::OpType::Update => &RELAY_UPDATE_ROUTE_EVENT_COUNT,
            crate::node::network_status::OpType::Subscribe => &RELAY_SUBSCRIBE_ROUTE_EVENT_COUNT,
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }
    op_manager
        .ring
        .router
        .write()
        .add_event(crate::router::RouteEvent {
            peer: next_hop,
            contract_location,
            outcome,
            op_type: Some(op_type),
        });
}
// Test-only counters of relay route events per op type; incremented by
// `record_relay_route_event`, read by integration tests.
#[cfg(any(test, feature = "testing"))]
pub static RELAY_GET_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
#[cfg(any(test, feature = "testing"))]
pub static RELAY_PUT_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
#[cfg(any(test, feature = "testing"))]
pub static RELAY_UPDATE_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
#[cfg(any(test, feature = "testing"))]
pub static RELAY_SUBSCRIBE_ROUTE_EVENT_COUNT: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
// Tests around the push-before-send ordering invariant enforced by
// `handle_op_result`.
#[cfg(test)]
mod ordering_invariant_tests {
    use super::test_utils::MockNetworkBridge;
    use crate::message::{NetMessage, NetMessageV1, Transaction};
    use crate::node::NetworkBridge;
    use crate::operations::connect::ConnectMsg;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    // The mock bridge must preserve send order, otherwise it cannot be
    // used to assert ordering invariants.
    #[tokio::test]
    async fn mock_network_bridge_records_send_ordering() {
        let bridge = MockNetworkBridge::new();
        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5000);
        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5001);
        let tx1 = Transaction::new::<ConnectMsg>();
        let tx2 = Transaction::new::<ConnectMsg>();
        bridge
            .send(addr1, NetMessage::V1(NetMessageV1::Aborted(tx1)))
            .await
            .unwrap();
        bridge
            .send(addr2, NetMessage::V1(NetMessageV1::Aborted(tx2)))
            .await
            .unwrap();
        let sent = bridge.sent_messages();
        assert_eq!(sent.len(), 2);
        assert_eq!(sent[0].0, addr1, "First send should be to addr1");
        assert_eq!(sent[1].0, addr2, "Second send should be to addr2");
    }
    #[test]
    fn push_before_send_invariant_is_documented() {
        // Intentionally empty: the invariant (op state is pushed via
        // `op_manager.push` BEFORE the outgoing message is sent) lives in
        // `handle_op_result`; this placeholder keeps the invariant visible
        // in the test inventory.
    }
}
// Pins the exact behavior of `should_use_streaming` and
// `streaming_aware_attempt_timeout` (regression tests for issue #4001).
#[cfg(test)]
mod streaming_tests {
    use super::{
        STREAMING_ATTEMPT_TIMEOUT_CAP, should_use_streaming, streaming_aware_attempt_timeout,
    };
    use crate::config::OPERATION_TTL;
    use std::time::Duration;
    // 64 KiB — the production default streaming threshold.
    const DEFAULT_THRESHOLD: usize = 64 * 1024;
    #[test]
    fn test_streaming_respects_threshold() {
        assert!(!should_use_streaming(DEFAULT_THRESHOLD, 0));
        assert!(!should_use_streaming(DEFAULT_THRESHOLD, 1000));
        // Equality does NOT trigger streaming — strictly greater only.
        assert!(!should_use_streaming(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD));
        assert!(should_use_streaming(
            DEFAULT_THRESHOLD,
            DEFAULT_THRESHOLD + 1
        ));
        assert!(should_use_streaming(DEFAULT_THRESHOLD, 1024 * 1024));
    }
    #[test]
    fn test_streaming_custom_threshold() {
        let custom_threshold = 128 * 1024;
        assert!(!should_use_streaming(custom_threshold, 64 * 1024));
        assert!(!should_use_streaming(custom_threshold, custom_threshold));
        assert!(should_use_streaming(custom_threshold, custom_threshold + 1));
    }
    #[test]
    fn test_streaming_zero_threshold() {
        assert!(!should_use_streaming(0, 0));
        assert!(should_use_streaming(0, 1));
        assert!(should_use_streaming(0, 100));
    }
    // Inline (non-streaming) payloads must keep the plain OPERATION_TTL.
    #[test]
    fn non_streaming_payload_uses_operation_ttl() {
        assert_eq!(
            streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 0),
            OPERATION_TTL
        );
        assert_eq!(
            streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 1024),
            OPERATION_TTL
        );
        assert_eq!(
            streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD),
            OPERATION_TTL
        );
    }
    #[test]
    fn website_payload_attempt_timeout_exceeds_observed_completion() {
        // Real-world payload size from the #4001 report.
        let website_payload_size = 2_460_242;
        let timeout = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, website_payload_size);
        let observed_completion = Duration::from_secs(63);
        assert!(
            timeout > observed_completion,
            "streaming-aware timeout ({timeout:?}) must exceed observed \
             completion time ({observed_completion:?}) so the retry loop \
             does not fire while the original streaming PUT is still in \
             flight (issue #4001)"
        );
        assert!(
            timeout > OPERATION_TTL,
            "streaming-aware timeout ({timeout:?}) must exceed OPERATION_TTL \
             ({OPERATION_TTL:?}); otherwise the fix is a no-op for the bug \
             reported in #4001"
        );
    }
    #[test]
    fn streaming_timeout_scales_with_payload_size() {
        let small_streaming = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 1_000_000);
        let medium_streaming = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 10_000_000);
        assert!(
            small_streaming < medium_streaming,
            "1 MB timeout ({small_streaming:?}) must be smaller than \
             10 MB timeout ({medium_streaming:?})"
        );
        assert!(small_streaming > OPERATION_TTL);
        assert!(medium_streaming > OPERATION_TTL);
    }
    #[test]
    fn streaming_timeout_capped_at_ceiling() {
        // 1 GiB would otherwise imply a multi-hour timeout.
        let huge = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, 1024 * 1024 * 1024);
        assert_eq!(
            huge, STREAMING_ATTEMPT_TIMEOUT_CAP,
            "huge payloads must clamp to the cap"
        );
    }
    #[test]
    fn streaming_timeout_jumps_above_threshold_boundary() {
        let at_threshold = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD);
        let just_above = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD + 1);
        // 19 KiB over the threshold sits inside the integer-division
        // truncation gap (payload / FLOOR_BPS rounds down to 0..=3 secs).
        let in_truncation_gap =
            streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD + 19 * 1024);
        assert_eq!(at_threshold, OPERATION_TTL);
        assert!(
            just_above > OPERATION_TTL,
            "just-above-threshold timeout {just_above:?} must STRICTLY \
             exceed OPERATION_TTL ({OPERATION_TTL:?}); a fix that lets \
             this equal OPERATION_TTL is a no-op for the size range \
             (threshold, threshold + 20 KiB) — exactly the truncation \
             gap STREAMING_MIN_DRAIN_SECS exists to close (#4001 \
             skeptical review)"
        );
        assert!(
            in_truncation_gap > OPERATION_TTL,
            "payload in the truncation gap (threshold + 19 KiB) must \
             exceed OPERATION_TTL — STREAMING_MIN_DRAIN_SECS guarantees it"
        );
    }
    #[test]
    fn streaming_timeout_min_drain_floor() {
        let just_above = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, DEFAULT_THRESHOLD + 1);
        assert_eq!(
            just_above,
            OPERATION_TTL + Duration::from_secs(super::STREAMING_MIN_DRAIN_SECS),
            "streaming-eligible payloads must get at least \
             OPERATION_TTL + STREAMING_MIN_DRAIN_SECS"
        );
    }
    #[test]
    fn streaming_timeout_cap_boundary() {
        // Mirrors STREAMING_THROUGHPUT_FLOOR_BPS in the module under test.
        const FLOOR_BPS: usize = 20 * 1024;
        // Largest payload whose scaled timeout still lands exactly at the cap.
        let scaling_max_bytes =
            (STREAMING_ATTEMPT_TIMEOUT_CAP - OPERATION_TTL).as_secs() as usize * FLOOR_BPS;
        let just_below_cap = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, scaling_max_bytes);
        let at_cap = streaming_aware_attempt_timeout(DEFAULT_THRESHOLD, scaling_max_bytes + 1);
        assert_eq!(just_below_cap, STREAMING_ATTEMPT_TIMEOUT_CAP);
        assert_eq!(at_cap, STREAMING_ATTEMPT_TIMEOUT_CAP);
    }
}
// Source-level pin tests: scan the text of this file to guarantee
// `start_subscription_request` stays on the task-per-tx subscribe driver
// and does not regress to the pre-#1454 machinery.
#[cfg(test)]
mod sub_op_subscribe_migration_pin_tests {
    // Extracts the full source text of `start_subscription_request`
    // (signature through matching closing brace) via brace counting.
    fn extract_start_subscription_request_body() -> &'static str {
        let src = include_str!("operations.rs");
        // Built by concatenation so this test file's own literals don't
        // accidentally match the search pattern.
        let head = ["fn ", "start_subscription_request("].concat();
        let start = src
            .find(&head)
            .expect("`fn start_subscription_request(` must exist in operations.rs");
        let body_open = src[start..]
            .find('{')
            .map(|off| start + off)
            .expect("expected `{` after function signature");
        // Walk forward counting braces until the function body closes.
        let mut depth: i32 = 0;
        let mut end = body_open;
        for (i, ch) in src[body_open..].char_indices() {
            match ch {
                '{' => depth += 1,
                '}' => {
                    depth -= 1;
                    if depth == 0 {
                        end = body_open + i + 1;
                        break;
                    }
                }
                _ => {}
            }
        }
        assert!(
            end > body_open,
            "failed to find matching `}}` for start_subscription_request"
        );
        &src[start..end]
    }
    #[test]
    fn start_subscription_request_does_not_call_legacy_request_subscribe() {
        let body = extract_start_subscription_request_body();
        assert!(
            !body.contains("subscribe::request_subscribe"),
            "`start_subscription_request` must NOT route through the \
             legacy `subscribe::request_subscribe` driver — sub-op \
             SUBSCRIBE migrated to `subscribe::run_client_subscribe` \
             (see #1454 follow-up)."
        );
    }
    #[test]
    fn start_subscription_request_does_not_register_with_sub_op_tracker() {
        let body = extract_start_subscription_request_body();
        assert!(
            !body.contains("expect_and_register_sub_operation"),
            "`start_subscription_request` must NOT call \
             `expect_and_register_sub_operation` — `SubOperationTracker` \
             was deleted in #1454 Phase 3c. Reintroducing it would be a \
             regression."
        );
        assert!(
            !body.contains("sub_operation_failed"),
            "`start_subscription_request` must NOT propagate failures \
             via `sub_operation_failed` — the task-per-tx subscribe \
             driver publishes its own `HostResult::Err`."
        );
    }
    #[test]
    fn start_subscription_request_spawns_task_per_tx_driver() {
        let body = extract_start_subscription_request_body();
        assert!(
            body.contains("subscribe::run_client_subscribe"),
            "`start_subscription_request` must spawn the task-per-tx \
             subscribe driver `subscribe::run_client_subscribe` — \
             matches the `maybe_subscribe_child` pattern in \
             `put/op_ctx_task.rs` and `get/op_ctx_task.rs`."
        );
    }
}