use crate::client_events::{ClientId, OpenRequest, RequestId};
use crate::config::GlobalExecutor;
use crate::contract::{SessionMessage, contract_handler_channel};
use freenet_stdlib::client_api::ClientRequest;
use tokio::sync::mpsc;
/// Walks a single `OpenRequest` through the correlation workflow and then checks that a
/// batch of 100 requests for the same client all carry distinct request ids.
///
/// The test asserts that the `Display` output of an `OpenRequest` contains a `client: `
/// fragment and the `request_id: <id>` fragment for the id stored on the request.
#[tokio::test]
async fn test_end_to_end_correlation_workflow() {
    let client_id = ClientId::FIRST;
    let open_req = OpenRequest::new(
        client_id,
        Box::new(ClientRequest::Disconnect { cause: None }),
    );
    let correlation_id = open_req.request_id;

    // The rendered request must expose both the client and the correlation id.
    let rendered = format!("{open_req}");
    assert!(rendered.contains("client: "));
    let expected_fragment = format!("request_id: {correlation_id}");
    assert!(rendered.contains(&expected_fragment));

    // Reading the id a second time must yield the same value.
    let processed_request_id = open_req.request_id;
    assert_eq!(correlation_id, processed_request_id);

    // Build the whole batch first so every request is alive at the same time,
    // then pull the ids out of it.
    let mut batch = Vec::with_capacity(100);
    for _ in 0..100 {
        let payload = Box::new(ClientRequest::Disconnect { cause: None });
        batch.push(OpenRequest::new(client_id, payload));
    }

    let mut request_ids = Vec::with_capacity(batch.len());
    for pending in &batch {
        request_ids.push(pending.request_id);
    }

    // After sorting, `dedup` removes every repeated id; the length only stays at 100
    // when no id was handed out twice.
    request_ids.sort();
    request_ids.dedup();
    assert_eq!(request_ids.len(), 100, "All request IDs should be unique");
}
/// Smoke test: a session-message sender can be installed through `with_session_adapter`
/// on the first value returned by `contract_handler_channel`.
///
/// There are no assertions; the test passes when the call completes without panicking.
#[tokio::test]
async fn test_session_adapter_installation() {
    // The other values are bound to underscore-prefixed names (not `_`) so they stay
    // alive until the end of the test instead of being dropped immediately.
    let (mut outbound, _inbound, _event_waiter) = contract_handler_channel();
    let (adapter_tx, _adapter_rx) = mpsc::channel(100);

    outbound.with_session_adapter(adapter_tx);
}
/// Checks that a running `ResultRouter` forwards a `(Transaction, HostResult)` pair
/// received on its network channel to the session channel as
/// `SessionMessage::DeliverHostResponse`, carrying the same transaction and an `Ok` result.
#[tokio::test]
async fn test_result_router_receives_host_responses() {
    use crate::client_events::{HostResult, result_router::ResultRouter};
    use crate::message::Transaction;
    use crate::operations::put::PutMsg;
    use freenet_stdlib::client_api::{ContractResponse, HostResponse};
    use freenet_stdlib::prelude::{ContractCode, Parameters, WrappedContract};
    use std::sync::Arc;
    use tokio::time::{Duration, timeout};

    let (network_tx, network_rx) = mpsc::channel::<(Transaction, HostResult)>(100);
    let (session_tx, mut session_rx) = mpsc::channel::<SessionMessage>(100);

    // Run the router in the background for the duration of the test.
    let router = ResultRouter::new(network_rx, session_tx);
    let router_handle = GlobalExecutor::spawn(async move {
        router.run().await;
    });

    // Build a successful PUT response for a throwaway contract.
    let tx = Transaction::new::<PutMsg>();
    let code = Arc::new(ContractCode::from(vec![1, 2, 3]));
    let contract = WrappedContract::new(code, Parameters::from(vec![4u8, 5u8]));
    let success_response: HostResult =
        Ok(HostResponse::ContractResponse(ContractResponse::PutResponse {
            key: *contract.key(),
        }));

    network_tx.send((tx, success_response)).await.unwrap();

    // Give the router up to 100ms to hand the message over to the session side.
    let forwarded = timeout(Duration::from_millis(100), session_rx.recv()).await;
    let Ok(msg) = forwarded else {
        panic!("Router should forward message within timeout");
    };

    // Every variant is listed explicitly (no `_` arm) so that adding a new
    // `SessionMessage` variant forces this test to be revisited.
    match msg.unwrap() {
        SessionMessage::DeliverHostResponse {
            tx: received_tx,
            response,
        } => {
            assert_eq!(received_tx, tx);
            assert!((*response).is_ok());
        }
        other @ (SessionMessage::RegisterClient { .. }
        | SessionMessage::RegisterTransaction { .. }
        | SessionMessage::DeliverResult { .. }
        | SessionMessage::DeliverHostResponseWithRequestId { .. }
        | SessionMessage::ClientDisconnect { .. }) => {
            panic!("Expected DeliverHostResponse, got {:?}", other)
        }
    }

    // Tear down: close the network side and stop the background task.
    drop(network_tx);
    router_handle.abort();
}
/// Sends equivalent PUT responses through two plain channels, one keyed by `Transaction`
/// (the router path) and one keyed by `ClientId` (the legacy path), and checks that both
/// deliver their key unchanged with the same success/error state.
#[tokio::test]
async fn test_dual_path_identical_results() {
    use crate::client_events::{ClientId, HostResult};
    use crate::message::Transaction;
    use crate::operations::put::PutMsg;
    use freenet_stdlib::client_api::{ContractResponse, HostResponse};
    use freenet_stdlib::prelude::{ContractCode, Parameters, WrappedContract};
    use std::sync::Arc;
    use tokio::sync::mpsc;

    let (router_tx, mut router_rx) = mpsc::channel::<(Transaction, HostResult)>(100);
    let (legacy_tx, mut legacy_rx) = mpsc::channel::<(ClientId, HostResult)>(100);

    let tx = Transaction::new::<PutMsg>();
    let client_id = ClientId::FIRST;
    let code = Arc::new(ContractCode::from(vec![1, 2, 3]));
    let contract = WrappedContract::new(code, Parameters::from(vec![4u8, 5u8]));

    // Both paths get an independently built but equivalent successful PUT response.
    let put_response = || -> HostResult {
        Ok(HostResponse::ContractResponse(ContractResponse::PutResponse {
            key: *contract.key(),
        }))
    };
    let host_result_1 = put_response();
    let host_result_2 = put_response();

    router_tx.send((tx, host_result_1)).await.unwrap();
    legacy_tx.send((client_id, host_result_2)).await.unwrap();

    let (routed_tx, routed_outcome) = router_rx.recv().await.unwrap();
    let (legacy_client, legacy_outcome) = legacy_rx.recv().await.unwrap();

    assert_eq!(routed_tx, tx);
    // Either both paths succeeded or both failed; a mix is a mismatch.
    if routed_outcome.is_ok() != legacy_outcome.is_ok() {
        panic!("Results should have same success/error state");
    }
    assert_eq!(legacy_client, client_id);
}
/// Checks that a result pushed with `try_send` onto the router channel alone (no legacy
/// channel involved) is received with the same transaction and an `Ok` result.
#[tokio::test]
async fn test_router_receives_results_without_legacy_callback() {
    use crate::client_events::HostResult;
    use crate::message::Transaction;
    use freenet_stdlib::client_api::HostResponse;
    use tokio::sync::mpsc;
    let (router_tx, mut router_rx) = mpsc::channel::<(Transaction, HostResult)>(100);
    use crate::operations::put::PutMsg;
    use freenet_stdlib::prelude::{ContractCode, Parameters, WrappedContract};
    use std::sync::Arc;
    let tx = Transaction::new::<PutMsg>();
    let contract = WrappedContract::new(
        Arc::new(ContractCode::from(vec![1, 2, 3])),
        Parameters::from(vec![4u8, 5u8]),
    );
    let host_result = Ok(HostResponse::ContractResponse(
        freenet_stdlib::client_api::ContractResponse::PutResponse {
            key: *contract.key(),
        },
    ));
    // Fail loudly if the send is rejected. `router_tx` is still alive below, so a
    // silently dropped message would leave `recv().await` pending forever instead of
    // failing the test.
    assert!(
        router_tx.try_send((tx, host_result)).is_ok(),
        "try_send on an empty channel with a live receiver should succeed"
    );
    let router_result = router_rx.recv().await.unwrap();
    assert_eq!(router_result.0, tx);
    assert!(router_result.1.is_ok());
}
/// Creates 10,000 `OpenRequest`s, asserts that constructing them took less than 100ms,
/// and verifies that every request received a distinct request id.
#[tokio::test]
async fn test_zero_performance_overhead_correlation() {
    use std::time::Instant;

    const NUM_REQUESTS: usize = 10000;

    // Only the construction of the requests is timed.
    let start = Instant::now();
    let requests_with_correlation: Vec<_> = std::iter::repeat_with(|| {
        OpenRequest::new(
            ClientId::FIRST,
            Box::new(ClientRequest::Disconnect { cause: None }),
        )
    })
    .take(NUM_REQUESTS)
    .collect();
    let with_correlation = start.elapsed();

    assert_eq!(requests_with_correlation.len(), NUM_REQUESTS);

    let elapsed_ms = with_correlation.as_millis();
    assert!(
        elapsed_ms < 100,
        "Correlation should add minimal overhead, took {}ms",
        elapsed_ms
    );

    // Sort + dedup collapses repeated ids; the count is unchanged only if all are unique.
    let mut ids = Vec::with_capacity(requests_with_correlation.len());
    for request in &requests_with_correlation {
        ids.push(request.request_id);
    }
    ids.sort_unstable();
    ids.dedup();
    assert_eq!(ids.len(), NUM_REQUESTS, "All IDs should be unique at scale");
}
/// Generates `RequestId`s on 10 OS threads (1,000 per thread) and checks that no id is
/// ever produced twice, both per insertion and in the final total.
#[test]
fn test_correlation_thread_safety_comprehensive() {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};
    use std::thread;

    const NUM_THREADS: usize = 10;
    const REQUESTS_PER_THREAD: usize = 1000;

    // Shared set collecting every id produced by any thread.
    let all_ids = Arc::new(Mutex::new(HashSet::new()));

    // Start every worker before joining any of them.
    let mut workers = Vec::with_capacity(NUM_THREADS);
    for _ in 0..NUM_THREADS {
        let shared = Arc::clone(&all_ids);
        workers.push(thread::spawn(move || {
            // Generate the whole batch before taking the lock so id creation
            // runs concurrently across threads.
            let mut fresh = Vec::with_capacity(REQUESTS_PER_THREAD);
            for _ in 0..REQUESTS_PER_THREAD {
                fresh.push(RequestId::new());
            }

            let mut seen = shared.lock().unwrap();
            for id in fresh {
                // `insert` returns false when the id is already in the set.
                assert!(seen.insert(id), "Duplicate ID found: {}", id);
            }
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    let unique_total = all_ids.lock().unwrap().len();
    assert_eq!(
        unique_total,
        NUM_THREADS * REQUESTS_PER_THREAD,
        "All IDs should be unique across threads"
    );
}