use agent_phone::session::HandlerOutput;
use agent_phone::{
connect, create_server, encode_did_key, generate_key_pair, ClientOptions, Handler,
ServerOptions,
};
use futures_util::{stream, StreamExt};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
/// Builds a handler that answers every request with a unary response
/// carrying the request payload back unchanged.
fn echo_handler() -> Handler {
    Arc::new(|payload: Value| {
        Box::pin(async move {
            let reply = HandlerOutput::Unary(payload);
            Ok(reply)
        })
    })
}
/// Starts a server on an OS-assigned loopback port with the given `handlers`
/// and connects a fresh client to it.
///
/// Both sides get newly generated key pairs; the client is told the server's
/// DID and is given no explicit responder public key.
///
/// # Panics
///
/// Panics if DID encoding, listening, address lookup, or connecting fails.
async fn pair(handlers: HashMap<String, Handler>) -> (agent_phone::Server, agent_phone::Client) {
    let responder_keys = generate_key_pair();
    let initiator_keys = generate_key_pair();
    let responder_did = encode_did_key(&responder_keys.public_key).unwrap();
    let initiator_did = encode_did_key(&initiator_keys.public_key).unwrap();

    let server_opts = ServerOptions {
        did: responder_did.clone(),
        private_key: responder_keys.private_key,
        handlers,
    };
    let mut server = create_server(server_opts);
    // Port 0 lets the OS choose a free port; read the chosen one back.
    server.listen(0, "127.0.0.1").await.unwrap();
    let bound_port = server.address().unwrap().port();

    let client_opts = ClientOptions {
        url: format!("ws://127.0.0.1:{bound_port}"),
        did: initiator_did,
        private_key: initiator_keys.private_key,
        responder_did,
        responder_public_key: None,
    };
    let client = connect(client_opts).await.unwrap();

    (server, client)
}
/// A single unary call to the `echo` handler returns the payload it was sent.
#[tokio::test]
async fn end_to_end_unary_echo() {
    let handlers = HashMap::from([(String::from("echo"), echo_handler())]);
    let (mut server, client) = pair(handlers).await;

    let payload = serde_json::json!({"message": "hi"});
    let reply = client.call("echo", Some(payload.clone())).await.unwrap();
    assert_eq!(reply, payload);

    client.close().await;
    server.close().await;
}
/// C1: the client must reject a server whose private key does not match the
/// DID it advertises, and must do so in under two seconds.
///
/// The server is configured with the DID derived from `real`'s public key but
/// with `fake`'s private key; the client expects `real`'s DID.
#[tokio::test]
async fn c1_handshake_did_binding() {
    let real = generate_key_pair();
    let fake = generate_key_pair();
    let real_did = encode_did_key(&real.public_key).unwrap();

    // Impostor setup: advertised DID and signing key come from different pairs.
    let impostor_opts = ServerOptions {
        did: real_did.clone(),
        private_key: fake.private_key,
        handlers: HashMap::new(),
    };
    let mut server = create_server(impostor_opts);
    server.listen(0, "127.0.0.1").await.unwrap();
    let port = server.address().unwrap().port();

    let init = generate_key_pair();
    let started = Instant::now();
    let client_opts = ClientOptions {
        url: format!("ws://127.0.0.1:{port}"),
        did: encode_did_key(&init.public_key).unwrap(),
        private_key: init.private_key,
        responder_did: real_did,
        responder_public_key: None,
    };
    let result = connect(client_opts).await;
    let elapsed = started.elapsed();

    assert!(result.is_err(), "C1: initiator accepted impostor");
    assert!(
        elapsed < std::time::Duration::from_secs(2),
        "C1: too slow ({elapsed:?})"
    );

    server.close().await;
}
/// C2: a 10,000-item stream is delivered completely and in order.
///
/// The third argument to `client.stream` (8) is presumably the flow-control
/// window that makes this a backpressure test — TODO confirm against the
/// `agent_phone` API.
#[tokio::test]
async fn c2_streaming_backpressure() {
    let n: u64 = 10_000;

    // The handler streams the integers 0..n as JSON values.
    let torrent: Handler = Arc::new(move |_params: Value| {
        Box::pin(async move {
            let chunks = (0..n).map(Value::from);
            Ok(HandlerOutput::Stream(Box::pin(stream::iter(chunks))))
        })
    });
    let handlers: HashMap<String, Handler> = HashMap::from([(String::from("torrent"), torrent)]);
    let (mut server, client) = pair(handlers).await;

    let mut incoming = client.stream("torrent", None, 8).await.unwrap();
    let mut received: Vec<u64> = Vec::with_capacity(n as usize);
    while let Some(chunk) = incoming.next().await {
        received.push(chunk.as_u64().unwrap());
    }

    assert_eq!(received.len(), n as usize, "C2: not all chunks delivered");
    // Every chunk must equal its position in the sequence.
    for (actual, expected) in received.iter().copied().zip(0u64..) {
        assert_eq!(actual, expected);
    }

    client.close().await;
    server.close().await;
}
/// C3: cancelling an in-flight stream must leave the session usable.
///
/// Reads ten chunks from an endless stream, cancels and drops it, then checks
/// that a unary call on the same client still succeeds.
#[tokio::test]
async fn c3_graceful_cancel() {
    let mut handlers: HashMap<String, Handler> = HashMap::new();
    handlers.insert(
        "infinite".into(),
        Arc::new(|_p: Value| {
            Box::pin(async move {
                // Unbounded source: only a cancel can end this stream.
                let s = stream::iter(0u64..).map(Value::from);
                Ok(HandlerOutput::Stream(Box::pin(s)))
            })
        }),
    );
    handlers.insert("ping".into(), echo_handler());
    let (mut server, client) = pair(handlers).await;

    let mut s = client.stream("infinite", None, 8).await.unwrap();
    // Verify the chunks actually arrive before cancelling; otherwise the test
    // would also pass when the stream never produced anything.
    for expected in 0u64..10 {
        let chunk = s
            .next()
            .await
            .expect("C3: stream ended before it was cancelled");
        assert_eq!(
            chunk.as_u64(),
            Some(expected),
            "C3: unexpected chunk before cancel"
        );
    }
    s.cancel().await;
    drop(s);

    // Short pause before reusing the session; presumably lets the cancel
    // propagate to the server — TODO confirm whether this is still required.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let r = client
        .call("ping", Some(serde_json::json!({"still": "alive"})))
        .await
        .unwrap();
    assert_eq!(r, serde_json::json!({"still": "alive"}));

    client.close().await;
    server.close().await;
}
/// Calling a method with no registered handler yields `Error::Rpc`.
#[tokio::test]
async fn unknown_method_rejects() {
    // Only `echo` is registered, so `no_such_method` has no handler.
    let handlers = HashMap::from([(String::from("echo"), echo_handler())]);
    let (mut server, client) = pair(handlers).await;

    let pending = client.call("no_such_method", None);
    let r = pending.await;
    assert!(matches!(r, Err(agent_phone::Error::Rpc { .. })));

    client.close().await;
    server.close().await;
}
/// Two consecutive unary calls on one client each get their own payload back.
#[tokio::test]
async fn two_calls_on_same_session() {
    let handlers = HashMap::from([(String::from("echo"), echo_handler())]);
    let (mut server, client) = pair(handlers).await;

    // Issue the calls one after another on the same client.
    for n in [1, 2] {
        let payload = serde_json::json!({"n": n});
        let reply = client.call("echo", Some(payload.clone())).await.unwrap();
        assert_eq!(reply, payload);
    }

    client.close().await;
    server.close().await;
}