use phantom_protocol::runtime::{BoxFuture, Runtime, SpawnHandle, TokioRuntime};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
/// Test double that wraps a real `TokioRuntime` and tallies how often the
/// `Runtime` trait's `spawn` and `sleep` entry points are invoked on it.
///
/// `Default` yields a default `TokioRuntime` with both counters at zero.
#[derive(Default)]
struct CountingRuntime {
/// The real runtime that every `Runtime` call is forwarded to.
inner: TokioRuntime,
/// Incremented once per `Runtime::spawn` call made through this wrapper.
spawns: AtomicUsize,
/// Incremented once per `Runtime::sleep` call made through this wrapper.
sleeps: AtomicUsize,
}
impl CountingRuntime {
fn spawns(&self) -> usize {
self.spawns.load(Ordering::SeqCst)
}
fn sleeps(&self) -> usize {
self.sleeps.load(Ordering::SeqCst)
}
}
impl Runtime for CountingRuntime {
fn spawn(&self, fut: BoxFuture<()>) -> SpawnHandle {
self.spawns.fetch_add(1, Ordering::SeqCst);
self.inner.spawn(fut)
}
fn sleep(&self, duration: Duration) -> BoxFuture<()> {
self.sleeps.fetch_add(1, Ordering::SeqCst);
self.inner.sleep(duration)
}
fn now_monotonic(&self) -> Instant {
self.inner.now_monotonic()
}
fn now_wall_clock(&self) -> SystemTime {
self.inner.now_wall_clock()
}
}
/// Checks that a caller-supplied `Runtime` is the one the listener and the
/// client session actually spawn work on.
///
/// Flow: bind a listener on a loopback port chosen by the OS with its own
/// `CountingRuntime`, run a one-shot echo task on the server side, connect a
/// client session over TCP with a second `CountingRuntime`, round-trip
/// `b"ping"`, and finally assert that each runtime saw at least one spawn.
#[tokio::test]
async fn runtime_substitution_drives_session_and_listener() {
    use phantom_protocol::api::{PhantomListener, PhantomSession, TcpSessionTransport};
    use phantom_protocol::crypto::hybrid_sign::HybridVerifyingKey;
    use tokio::net::TcpStream;

    // One counting runtime per side so the two spawn tallies stay independent.
    let server_runtime: Arc<CountingRuntime> = Arc::new(CountingRuntime::default());
    let client_runtime: Arc<CountingRuntime> = Arc::new(CountingRuntime::default());

    // --- Server side -------------------------------------------------------
    let bind_addr = "127.0.0.1:0".to_string();
    let server_runtime_dyn = server_runtime.clone() as Arc<dyn Runtime>;
    let listener = PhantomListener::bind_with_runtime(bind_addr, server_runtime_dyn)
        .await
        .expect("bind");

    // Port 0 was requested, so the real address has to be read back from the listener.
    let server_addr = listener
        .local_addr()
        .parse::<std::net::SocketAddr>()
        .expect("addr");
    let key_bytes = listener.verifying_key_bytes();
    let server_key = HybridVerifyingKey::from_bytes(&key_bytes).expect("key");

    // Echo task: accept one session, read one message, send it back, then disconnect.
    let acceptor = listener.clone();
    let echo_task = tokio::spawn(async move {
        let session = acceptor.accept().await.expect("accept").session();
        let payload = session.recv().await.expect("recv");
        session.send(payload).await.expect("echo");
        // NOTE(review): presumably gives the echoed message time to reach the
        // client before the session is torn down — confirm.
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Best-effort teardown; a disconnect error is deliberately ignored here.
        let _ = session.disconnect().await;
    });

    // --- Client side -------------------------------------------------------
    let socket = TcpStream::connect(server_addr).await.expect("connect");
    let transport = TcpSessionTransport::new(socket);
    let target = server_addr.to_string();
    let client_runtime_dyn = client_runtime.clone() as Arc<dyn Runtime>;
    // This constructor is neither awaited nor unwrapped.
    // NOTE(review): the fixed pause below presumably lets background session
    // setup finish before the first send — confirm against the session API.
    let client = PhantomSession::connect_with_transport_with_runtime(
        &target,
        transport,
        server_key,
        client_runtime_dyn,
    );
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Round trip: whatever we send must come back unchanged.
    client.send(b"ping".to_vec()).await.expect("send");
    let reply = client.recv().await.expect("recv");
    assert_eq!(reply, b"ping");

    // Surface any panic from the echo task before closing the client side.
    echo_task.await.expect("server task");
    client.disconnect().await.expect("close");

    // --- Runtime substitution checks ---------------------------------------
    let listener_spawns = server_runtime.spawns();
    assert!(
        listener_spawns >= 1,
        "listener runtime should have observed at least one spawn, got {}",
        listener_spawns
    );
    let client_spawns = client_runtime.spawns();
    assert!(
        client_spawns >= 1,
        "client runtime should have observed at least one spawn, got {}",
        client_spawns
    );
    // The sleep tally is read but intentionally not asserted on.
    let _ = client_runtime.sleeps();
}