use std::net::SocketAddr;
use tokio::sync::mpsc;
use crate::message::{MessageStats, NetMessage, Transaction};
use crate::node::OpExecutionPayload;
use crate::operations::OpError;
/// Per-transaction handle for pushing messages onto the op execution channel.
///
/// Pairs a [`Transaction`] with a sender of [`OpExecutionPayload`]s. The
/// methods on this type debug-assert that every outbound message carries the
/// same transaction id as the context.
pub(crate) struct OpCtx {
/// Transaction that every message sent through this context must carry.
tx: Transaction,
/// Sender for `(reply sender, message, optional target address)` payloads;
/// whoever owns the receiving half is responsible for delivering replies.
op_execution_sender: mpsc::Sender<OpExecutionPayload>,
}
impl OpCtx {
    /// Builds a context that sends messages for `tx` over `op_execution_sender`.
    pub(crate) fn new(
        tx: Transaction,
        op_execution_sender: mpsc::Sender<OpExecutionPayload>,
    ) -> Self {
        Self {
            tx,
            op_execution_sender,
        }
    }

    /// Returns the transaction this context is bound to.
    // NOTE(review): the allow presumably exists because not every build has a
    // caller for this accessor yet — confirm before removing it.
    #[allow(dead_code)]
    pub fn tx(&self) -> Transaction {
        self.tx
    }

    /// Sends `msg` without an explicit target address and waits for one reply.
    ///
    /// # Errors
    ///
    /// Returns [`OpError::NotificationError`] when the op execution channel is
    /// closed or when the reply sender is dropped before a reply arrives.
    // NOTE(review): same rationale as `tx` for the allow — confirm.
    #[allow(dead_code)]
    pub async fn send_and_await(&mut self, msg: NetMessage) -> Result<NetMessage, OpError> {
        self.send_and_await_inner(msg, None).await
    }

    /// Sends `msg` tagged with `target_addr` and waits for one reply.
    ///
    /// # Errors
    ///
    /// Returns [`OpError::NotificationError`] when the op execution channel is
    /// closed or when the reply sender is dropped before a reply arrives.
    pub async fn send_to_and_await(
        &mut self,
        target_addr: SocketAddr,
        msg: NetMessage,
    ) -> Result<NetMessage, OpError> {
        self.send_and_await_inner(msg, Some(target_addr)).await
    }

    /// Sends `msg` tagged with `target_addr` and returns as soon as the payload
    /// has been accepted by the op execution channel; no reply is awaited.
    ///
    /// The payload still carries a reply sender (the payload shape requires
    /// one), but its receiving half is dropped *before* the payload is
    /// enqueued, so the consumer always observes a closed reply channel.
    ///
    /// # Errors
    ///
    /// Returns [`OpError::NotificationError`] when the op execution channel is
    /// closed.
    pub async fn send_fire_and_forget(
        &mut self,
        target_addr: SocketAddr,
        msg: NetMessage,
    ) -> Result<(), OpError> {
        debug_assert_eq!(
            msg.id(),
            &self.tx,
            "OpCtx::send_fire_and_forget: msg.id must match ctx.tx"
        );

        let (response_sender, response_receiver) = mpsc::channel::<NetMessage>(1);
        // Drop the receiver now rather than at the end of the function: a
        // `_name` binding would stay alive across the `.await` below, letting
        // the consumer dequeue the payload while the reply channel still looks
        // open.
        drop(response_receiver);

        self.op_execution_sender
            .send((response_sender, msg, Some(target_addr)))
            .await
            .map_err(|_| OpError::NotificationError)
    }

    /// Shared implementation of the two awaiting send methods.
    ///
    /// Creates a capacity-1 reply channel, hands its sender to the op execution
    /// channel together with `msg` and `target_addr`, then waits for the first
    /// message on the reply channel.
    async fn send_and_await_inner(
        &mut self,
        msg: NetMessage,
        target_addr: Option<SocketAddr>,
    ) -> Result<NetMessage, OpError> {
        debug_assert_eq!(
            msg.id(),
            &self.tx,
            "OpCtx::send_and_await: msg.id must match ctx.tx"
        );

        let (response_sender, mut response_receiver) = mpsc::channel::<NetMessage>(1);
        self.op_execution_sender
            .send((response_sender, msg, target_addr))
            .await
            .map_err(|_| OpError::NotificationError)?;

        // `None` means every reply sender was dropped without sending.
        let reply = response_receiver
            .recv()
            .await
            .ok_or(OpError::NotificationError)?;
        debug_assert_eq!(
            reply.id(),
            &self.tx,
            "OpCtx::send_and_await: reply tx must match ctx.tx"
        );
        Ok(reply)
    }
}
use crate::config::OPERATION_TTL;
use crate::node::OpManager;
/// Verdict a [`RetryDriver`] gives for the reply to a single attempt.
// NOTE(review): the allow presumably covers variants that no driver constructs
// yet — confirm before removing it.
#[allow(dead_code)] pub(crate) enum AttemptOutcome<T> {
/// The attempt completed the operation; `drive_retry_loop` returns the value
/// as `RetryLoopOutcome::Done`.
Terminal(T),
/// Another attempt is wanted; `drive_retry_loop` logs at debug level and calls
/// `RetryDriver::advance`.
Retry,
/// Driver-defined "unexpected" reply; `drive_retry_loop` logs a warning and
/// returns `RetryLoopOutcome::Unexpected` without advancing.
Unexpected,
}
/// Result of `RetryDriver::advance`, telling `drive_retry_loop` whether another
/// attempt is possible.
pub(crate) enum AdvanceOutcome {
/// The loop starts a new attempt.
Next,
/// The loop stops and returns `RetryLoopOutcome::Exhausted`.
Exhausted,
}
/// Final result of `drive_retry_loop`.
// NOTE(review): the allow presumably exists because some variants or payloads
// are not read everywhere yet — confirm before removing it.
#[allow(dead_code)] pub(crate) enum RetryLoopOutcome<T> {
/// A reply was classified as `AttemptOutcome::Terminal`; carries its value.
Done(T),
/// The driver reported `AdvanceOutcome::Exhausted`; carries a human-readable
/// summary naming the operation label and the attempt count.
Exhausted(String),
/// A reply was classified as `AttemptOutcome::Unexpected`.
Unexpected,
/// Not constructed by `drive_retry_loop` in the code visible here; presumably
/// reserved for callers that wrap the loop — TODO confirm.
InfraError(OpError),
}
/// Operation-specific hooks consumed by `drive_retry_loop`.
///
/// Per attempt the loop calls `new_attempt_tx` (skipped on the first attempt,
/// which reuses the client transaction), then `build_request`, then either
/// `classify` (a reply arrived) or `advance` (send error or timeout).
/// `advance` is also called after `classify` returns `AttemptOutcome::Retry`.
pub(crate) trait RetryDriver {
/// Value carried by `AttemptOutcome::Terminal` and `RetryLoopOutcome::Done`.
type Terminal;
/// Supplies the transaction for every attempt after the first.
fn new_attempt_tx(&mut self) -> Transaction;
/// Builds the message to send for the attempt identified by `attempt_tx`.
fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage;
/// Interprets the reply received for the current attempt.
fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<Self::Terminal>;
/// Called after a failed or retry-classified attempt; reports whether a
/// further attempt should be made.
fn advance(&mut self) -> AdvanceOutcome;
}
pub(crate) async fn drive_retry_loop<D: RetryDriver>(
op_manager: &OpManager,
client_tx: Transaction,
op_label: &str,
driver: &mut D,
) -> RetryLoopOutcome<D::Terminal> {
let mut is_first_attempt = true;
let mut attempt_count: usize = 0;
loop {
let attempt_tx = if is_first_attempt {
client_tx
} else {
driver.new_attempt_tx()
};
is_first_attempt = false;
attempt_count += 1;
let request = driver.build_request(attempt_tx);
let mut ctx = op_manager.op_ctx(attempt_tx);
let round_trip = tokio::time::timeout(OPERATION_TTL, ctx.send_and_await(request)).await;
op_manager.release_pending_op_slot(attempt_tx).await;
let reply = match round_trip {
Ok(Ok(reply)) => reply,
Ok(Err(err)) => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
attempt = attempt_count,
outcome = "wire_error",
error = %err,
"{op_label} (task-per-tx): send_and_await failed; advancing"
);
match driver.advance() {
AdvanceOutcome::Next => continue,
AdvanceOutcome::Exhausted => {
return RetryLoopOutcome::Exhausted(format!(
"{op_label} failed after {attempt_count} attempts (last error: {err})"
));
}
}
}
Err(_) => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
attempt = attempt_count,
outcome = "timeout",
timeout_secs = OPERATION_TTL.as_secs(),
"{op_label} (task-per-tx): attempt timed out; advancing"
);
match driver.advance() {
AdvanceOutcome::Next => continue,
AdvanceOutcome::Exhausted => {
return RetryLoopOutcome::Exhausted(format!(
"{op_label} timed out after {attempt_count} attempts"
));
}
}
}
};
match driver.classify(reply) {
AttemptOutcome::Terminal(value) => {
return RetryLoopOutcome::Done(value);
}
AttemptOutcome::Retry => {
tracing::debug!(
tx = %client_tx,
attempt_tx = %attempt_tx,
attempt = attempt_count,
outcome = "retry",
"{op_label} (task-per-tx): peer indicated retry; advancing"
);
match driver.advance() {
AdvanceOutcome::Next => continue,
AdvanceOutcome::Exhausted => {
return RetryLoopOutcome::Exhausted(format!(
"{op_label} exhausted all peers after {attempt_count} attempts"
));
}
}
}
AttemptOutcome::Unexpected => {
tracing::warn!(
tx = %client_tx,
attempt_tx = %attempt_tx,
attempt = attempt_count,
"{op_label} (task-per-tx): unexpected terminal reply"
);
return RetryLoopOutcome::Unexpected;
}
}
}
}
// Compile-time check (test builds only): fails to build if `OpCtx` ever stops
// being `Send`. The closure is never called.
#[cfg(test)]
const _: fn() = || {
    fn require_send<T>()
    where
        T: Send,
    {
    }
    require_send::<OpCtx>();
};
// Unit tests for `OpCtx`. Each test plays the consumer side of the op
// execution channel by hand: it takes `op_execution_receiver` out of the
// notification receiver and inspects (or answers) the payloads the context
// sends.
#[cfg(test)]
mod tests {
use super::*;
use crate::message::NetMessageV1;
use crate::node::{EventLoopNotificationsReceiver, event_loop_notification_channel};
use crate::operations::connect::ConnectMsg;
use tokio::time::{Duration, timeout};
// Builds an `Aborted` message carrying `tx`. The tests use it both as the
// outbound message and as the reply, since only the transaction id matters.
fn dummy_reply_with_tx(tx: Transaction) -> NetMessage {
NetMessage::V1(NetMessageV1::Aborted(tx))
}
// Happy path: the payload arrives with the ctx's tx and no target address,
// and the reply pushed by the executor is returned to the caller.
#[tokio::test]
async fn send_and_await_returns_reply_on_completion() {
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
// Stand-in for the consumer of the op execution channel.
let executor = tokio::spawn(async move {
let (reply_sender, outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("outbound msg should be delivered");
assert_eq!(outbound.id(), &tx, "outbound msg tx must match the ctx tx");
assert_eq!(
target_addr, None,
"send_and_await should not specify a target"
);
reply_sender
.try_send(dummy_reply_with_tx(tx))
.expect("capacity-1 reply channel should accept the first send");
});
let outbound = dummy_reply_with_tx(tx);
let reply = timeout(Duration::from_secs(1), ctx.send_and_await(outbound))
.await
.expect("send_and_await should complete quickly")
.expect("send_and_await should return Ok");
assert_eq!(reply.id(), &tx, "reply tx must match ctx tx");
// Joining the executor surfaces any assertion failure raised inside it.
executor
.await
.expect("executor task should complete without panicking");
}
// The target address given to `send_to_and_await` must reach the executor
// unchanged as `Some(addr)`.
#[tokio::test]
async fn send_to_and_await_forwards_target_address() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let expected_target = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 42)), 31337);
let executor = tokio::spawn(async move {
let (reply_sender, outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("outbound msg should be delivered");
assert_eq!(outbound.id(), &tx, "outbound msg tx must match the ctx tx");
assert_eq!(
target_addr,
Some(expected_target),
"send_to_and_await must propagate the caller's target address \
through the op execution channel (regression for #3838)"
);
reply_sender
.try_send(dummy_reply_with_tx(tx))
.expect("capacity-1 reply channel should accept the first send");
});
let outbound = dummy_reply_with_tx(tx);
let reply = timeout(
Duration::from_secs(1),
ctx.send_to_and_await(expected_target, outbound),
)
.await
.expect("send_to_and_await should complete quickly")
.expect("send_to_and_await should return Ok");
assert_eq!(reply.id(), &tx, "reply tx must match ctx tx");
executor
.await
.expect("executor task should complete without panicking");
}
// If the executor drops the reply sender without answering, the caller must
// get `NotificationError` instead of waiting forever.
#[tokio::test]
async fn send_and_await_errors_on_dropped_receiver() {
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let executor = tokio::spawn(async move {
let (reply_sender, _outbound, _target_addr) = op_execution_receiver
.recv()
.await
.expect("outbound msg should be delivered");
// Closing the reply channel without sending is the condition under test.
drop(reply_sender);
});
let outbound = dummy_reply_with_tx(tx);
let result = timeout(Duration::from_secs(1), ctx.send_and_await(outbound))
.await
.expect("send_and_await should not hang when reply_sender is dropped");
assert!(
matches!(result, Err(OpError::NotificationError)),
"expected NotificationError on dropped reply_sender, got {result:?}"
);
executor
.await
.expect("executor task should complete without panicking");
}
// With the whole receiving side gone, the initial send itself must fail with
// `NotificationError`.
#[tokio::test]
async fn send_and_await_errors_on_closed_sender() {
let (receiver, sender) = event_loop_notification_channel();
// Dropping the receiver closes the op execution channel before any send.
drop(receiver);
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let outbound = dummy_reply_with_tx(tx);
let result = ctx.send_and_await(outbound).await;
assert!(
matches!(result, Err(OpError::NotificationError)),
"expected NotificationError on closed executor channel, got {result:?}"
);
}
// Fire-and-forget must deliver the payload with the target address, and the
// reply channel handed to the executor must already be closed.
#[tokio::test]
async fn send_fire_and_forget_delivers_message_with_target() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let expected_target = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 9000);
let executor = tokio::spawn(async move {
let (reply_sender, outbound, target_addr) = op_execution_receiver
.recv()
.await
.expect("outbound msg should be delivered");
assert_eq!(outbound.id(), &tx, "outbound msg tx must match the ctx tx");
assert_eq!(
target_addr,
Some(expected_target),
"send_fire_and_forget must propagate the target address"
);
// Nobody is listening for a reply, so the sender must report closed.
assert!(
reply_sender.is_closed(),
"response receiver should be dropped (callback reclaimable)"
);
});
let outbound = dummy_reply_with_tx(tx);
timeout(
Duration::from_secs(1),
ctx.send_fire_and_forget(expected_target, outbound),
)
.await
.expect("send_fire_and_forget should complete quickly")
.expect("send_fire_and_forget should return Ok");
executor
.await
.expect("executor task should complete without panicking");
}
// Fire-and-forget against a closed op execution channel must report
// `NotificationError`.
#[tokio::test]
async fn send_fire_and_forget_errors_on_closed_sender() {
let (receiver, sender) = event_loop_notification_channel();
drop(receiver);
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
let outbound = dummy_reply_with_tx(tx);
let result = ctx
.send_fire_and_forget(
std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
1234,
),
outbound,
)
.await;
assert!(
matches!(result, Err(OpError::NotificationError)),
"expected NotificationError on closed executor channel, got {result:?}"
);
}
// Pins what happens when the same ctx is used for a second round trip and the
// executor never answers it: the call stays pending until the timeout fires.
// Here the executor answers the first request, then holds the second reply
// sender open without sending until the test signals shutdown.
#[tokio::test]
async fn send_and_await_second_call_hangs_as_documented() {
use tokio::sync::oneshot;
let (receiver, sender) = event_loop_notification_channel();
let EventLoopNotificationsReceiver {
mut op_execution_receiver,
..
} = receiver;
let tx = Transaction::new::<ConnectMsg>();
let mut ctx = OpCtx::new(tx, sender.op_execution_sender.clone());
// Lets the test release the executor once the timeout has been observed.
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let executor = tokio::spawn(async move {
let (reply_sender_1, _first, _target_addr_1) = op_execution_receiver
.recv()
.await
.expect("first outbound delivered");
reply_sender_1
.try_send(dummy_reply_with_tx(tx))
.expect("first reply accepted");
let (reply_sender_2, _second, _target_addr_2) = op_execution_receiver
.recv()
.await
.expect("second outbound delivered");
// Keep the second reply sender alive, unanswered, until shutdown; the
// oneshot result itself is irrelevant.
drop(shutdown_rx.await);
drop(reply_sender_2);
});
let first = timeout(
Duration::from_secs(1),
ctx.send_and_await(dummy_reply_with_tx(tx)),
)
.await
.expect("first send_and_await should complete quickly")
.expect("first send_and_await should return Ok");
assert_eq!(first.id(), &tx);
let second = timeout(
Duration::from_millis(500),
ctx.send_and_await(dummy_reply_with_tx(tx)),
)
.await;
// `Err` here is the timeout elapsing, i.e. the call did not resolve.
assert!(
second.is_err(),
"second send_and_await should have elapsed per the single-use-per-tx doc; got {second:?}"
);
// Either outcome is fine: the only goal is to unblock the executor.
match shutdown_tx.send(()) {
Ok(()) | Err(()) => {}
}
executor
.await
.expect("executor task should complete without panicking");
}
}