use crate::client_events::HostResult;
use crate::contract::SessionMessage;
use crate::message::Transaction;
use tokio::sync::mpsc;
/// Forwards results read from one channel to the session actor's channel.
///
/// Each `(Transaction, HostResult)` pair received on `network_results` is
/// wrapped in a `SessionMessage::DeliverHostResponse` and sent through
/// `session_actor_tx` (see `run`).
pub struct ResultRouter {
/// Receiving end of the channel that yields `(Transaction, HostResult)` pairs to route.
network_results: mpsc::Receiver<(Transaction, HostResult)>,
/// Sending end of the channel on which `SessionMessage`s are delivered to the session actor.
session_actor_tx: mpsc::Sender<SessionMessage>,
}
impl ResultRouter {
    /// Builds a router that reads from `network_results` and forwards every
    /// result through `session_actor_tx`.
    pub fn new(
        network_results: mpsc::Receiver<(Transaction, HostResult)>,
        session_actor_tx: mpsc::Sender<SessionMessage>,
    ) -> Self {
        Self {
            network_results,
            session_actor_tx,
        }
    }

    /// Upper bound on how long a single forward to the session actor may wait
    /// for channel capacity before the result is dropped.
    const SESSION_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    /// Runs the routing loop until one of the two channels goes away.
    ///
    /// For every `(Transaction, HostResult)` received, a
    /// `SessionMessage::DeliverHostResponse` is sent to the session actor:
    ///
    /// * If the send does not complete within [`Self::SESSION_SEND_TIMEOUT`],
    ///   the result is dropped with a warning and routing continues.
    /// * If the session actor channel is closed, the loop stops and the
    ///   failure is reported at error level.
    /// * If `network_results` is closed (all senders dropped), the loop ends
    ///   normally and only an informational shutdown message is logged.
    pub async fn run(mut self) {
        // Remembers why the loop ended so the shutdown log reflects the real cause
        // instead of always blaming the session actor.
        let mut session_actor_failed = false;

        while let Some((tx, host_result)) = self.network_results.recv().await {
            tracing::info!("ResultRouter received result for transaction: {}", tx);
            let msg = SessionMessage::DeliverHostResponse {
                tx,
                response: std::sync::Arc::new(host_result),
            };
            tracing::info!(
                "ResultRouter sending result to SessionActor for transaction: {}",
                tx
            );
            match tokio::time::timeout(Self::SESSION_SEND_TIMEOUT, self.session_actor_tx.send(msg))
                .await
            {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    tracing::error!(
                        transaction = %tx,
                        error = %e,
                        "CRITICAL: Session actor channel closed - result routing failed. \
                         Actor-based client delivery is broken."
                    );
                    session_actor_failed = true;
                    break;
                }
                Err(_) => {
                    // The timed-out send future is dropped here, taking `msg` with it;
                    // this result is lost but later results are still routed.
                    tracing::warn!(
                        transaction = %tx,
                        timeout_secs = Self::SESSION_SEND_TIMEOUT.as_secs(),
                        "ResultRouter dropping result: session actor channel full"
                    );
                }
            }
        }

        if session_actor_failed {
            tracing::error!(
                "CRITICAL: ResultRouter shutting down due to session actor failure. \
                 Dual-path delivery compromised. Consider restarting node."
            );
        } else {
            // `recv()` returned `None`: every sender of the network results channel
            // was dropped, which is an ordinary shutdown rather than a failure.
            tracing::info!("ResultRouter shutting down: network results channel closed");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::GlobalExecutor;
    use crate::operations::get::GetMsg;
    use freenet_stdlib::client_api::HostResponse;
    use std::time::Duration;

    /// Longest a test waits for a forwarded message or for the router task to finish.
    const WAIT_TIMEOUT: Duration = Duration::from_millis(100);

    /// Creates a fresh transaction for use as a routing key in tests.
    fn create_test_transaction() -> Transaction {
        Transaction::new::<GetMsg>()
    }

    /// Creates a successful host result to push through the router.
    fn create_test_host_result() -> HostResult {
        Ok(HostResponse::Ok)
    }

    /// Waits up to [`WAIT_TIMEOUT`] for the next message on the session channel.
    ///
    /// Panics if the wait times out or the channel is closed.
    async fn recv_session_message(
        session_rx: &mut mpsc::Receiver<SessionMessage>,
    ) -> SessionMessage {
        tokio::time::timeout(WAIT_TIMEOUT, session_rx.recv())
            .await
            .expect("Timeout waiting for session message")
            .expect("Channel closed unexpectedly")
    }

    /// Asserts that `received` is a `DeliverHostResponse` for `expected_tx`
    /// carrying a successful result.
    ///
    /// The other variants are listed explicitly (no wildcard) so that adding a
    /// new `SessionMessage` variant forces this helper to be revisited.
    #[track_caller]
    fn assert_host_response_delivered(received: SessionMessage, expected_tx: Transaction) {
        match received {
            SessionMessage::DeliverHostResponse { tx, response } => {
                assert_eq!(tx, expected_tx);
                assert!(response.is_ok(), "Expected success result");
            }
            SessionMessage::RegisterClient { .. }
            | SessionMessage::RegisterTransaction { .. }
            | SessionMessage::DeliverResult { .. }
            | SessionMessage::DeliverHostResponseWithRequestId { .. }
            | SessionMessage::ClientDisconnect { .. } => panic!("Unexpected message type"),
        }
    }

    /// A router can be constructed and dropped without ever being run.
    #[tokio::test]
    async fn test_result_router_creation() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, _session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        drop(router);
        drop(network_tx);
    }

    /// With no input, the router exits once the network channel closes and
    /// forwards nothing to the session actor.
    #[tokio::test]
    async fn test_result_routing() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, mut session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        let router_handle = GlobalExecutor::spawn(async move {
            router.run().await;
        });

        drop(network_tx);

        // Join the router instead of sleeping so the check below is deterministic.
        tokio::time::timeout(WAIT_TIMEOUT, router_handle)
            .await
            .expect("Router should exit when network channel closes")
            .expect("Router task should complete successfully");

        // The router owned the only session sender, so `None` means the channel
        // is closed and nothing was ever queued on it.
        assert!(
            session_rx.recv().await.is_none(),
            "No messages should be forwarded when no results were sent"
        );
    }

    /// A single result is forwarded with its transaction preserved.
    #[tokio::test]
    async fn test_single_result_delivery() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, mut session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        GlobalExecutor::spawn(async move {
            router.run().await;
        });

        let tx = create_test_transaction();
        network_tx
            .send((tx, create_test_host_result()))
            .await
            .expect("Failed to send result");

        let received = recv_session_message(&mut session_rx).await;
        assert_host_response_delivered(received, tx);
    }

    /// Several results are forwarded in the order they were submitted.
    #[tokio::test]
    async fn test_multiple_results_delivery() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, mut session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        GlobalExecutor::spawn(async move {
            router.run().await;
        });

        let transactions = [
            create_test_transaction(),
            create_test_transaction(),
            create_test_transaction(),
        ];
        for tx in transactions {
            network_tx
                .send((tx, create_test_host_result()))
                .await
                .unwrap();
        }

        for expected_tx in transactions {
            let received = recv_session_message(&mut session_rx).await;
            assert_host_response_delivered(received, expected_tx);
        }
    }

    /// The router task terminates when the session actor's receiver is gone.
    #[tokio::test]
    async fn test_session_actor_channel_closed() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        let router_handle = GlobalExecutor::spawn(async move {
            router.run().await;
        });

        // Closing the receiving side makes the router's next send fail.
        drop(session_rx);

        network_tx
            .send((create_test_transaction(), create_test_host_result()))
            .await
            .unwrap();

        tokio::time::timeout(WAIT_TIMEOUT, router_handle)
            .await
            .expect("Router should exit when session actor channel closes")
            .expect("Router task should complete successfully");
    }

    /// Results submitted from many tasks are all forwarded, with no extras.
    #[tokio::test]
    async fn test_concurrent_result_delivery() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, mut session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        let router_handle = GlobalExecutor::spawn(async move {
            router.run().await;
        });

        let mut handles = vec![];
        for _ in 0..10 {
            let tx_clone = network_tx.clone();
            let handle = GlobalExecutor::spawn(async move {
                let tx = create_test_transaction();
                let result = create_test_host_result();
                tx_clone.send((tx, result)).await.unwrap();
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.await.unwrap();
        }

        for _ in 0..10 {
            let received = recv_session_message(&mut session_rx).await;
            assert!(
                matches!(received, SessionMessage::DeliverHostResponse { .. }),
                "Expected DeliverHostResponse message"
            );
        }

        // All producer clones were dropped with their tasks; dropping the last
        // sender lets the router drain and exit, so the final check does not
        // depend on timing.
        drop(network_tx);
        tokio::time::timeout(WAIT_TIMEOUT, router_handle)
            .await
            .expect("Router should exit when network channel closes")
            .expect("Router task should complete successfully");
        assert!(
            session_rx.recv().await.is_none(),
            "No extra messages should be present"
        );
    }

    /// A result queued before the network channel closes is still delivered,
    /// after which the router task exits.
    #[tokio::test]
    async fn test_network_channel_closed() {
        let (network_tx, network_rx) = mpsc::channel(100);
        let (session_tx, mut session_rx) = mpsc::channel(100);
        let router = ResultRouter::new(network_rx, session_tx);
        let router_handle = GlobalExecutor::spawn(async move {
            router.run().await;
        });

        let tx = create_test_transaction();
        network_tx
            .send((tx, create_test_host_result()))
            .await
            .unwrap();
        drop(network_tx);

        let received = recv_session_message(&mut session_rx).await;
        assert_host_response_delivered(received, tx);

        tokio::time::timeout(WAIT_TIMEOUT, router_handle)
            .await
            .expect("Router should exit when network channel closes")
            .expect("Router task should complete successfully");
    }
}