agent-phone 0.1.0

Minimal sync RPC between two AI agents (Rust port of @p-vbordei/agent-phone). Self-custody keys, Noise-framework handshake, DID-bound WebSocket.
Documentation
//! End-to-end tests + conformance vectors C1, C2, C3.

use agent_phone::session::HandlerOutput;
use agent_phone::{
    connect, create_server, encode_did_key, generate_key_pair, ClientOptions, Handler,
    ServerOptions,
};
use futures_util::{stream, StreamExt};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

fn echo_handler() -> Handler {
    Arc::new(|p: Value| Box::pin(async move { Ok(HandlerOutput::Unary(p)) }))
}

async fn pair(handlers: HashMap<String, Handler>) -> (agent_phone::Server, agent_phone::Client) {
    let resp_kp = generate_key_pair();
    let init_kp = generate_key_pair();
    let resp_did = encode_did_key(&resp_kp.public_key).unwrap();
    let init_did = encode_did_key(&init_kp.public_key).unwrap();

    let mut server = create_server(ServerOptions {
        did: resp_did.clone(),
        private_key: resp_kp.private_key,
        handlers,
    });
    server.listen(0, "127.0.0.1").await.unwrap();
    let port = server.address().unwrap().port();

    let client = connect(ClientOptions {
        url: format!("ws://127.0.0.1:{port}"),
        did: init_did,
        private_key: init_kp.private_key,
        responder_did: resp_did,
        responder_public_key: None,
    })
    .await
    .unwrap();
    (server, client)
}

#[tokio::test]
async fn end_to_end_unary_echo() {
    let mut handlers = HashMap::new();
    handlers.insert("echo".into(), echo_handler());
    let (mut server, client) = pair(handlers).await;
    let res = client
        .call("echo", Some(serde_json::json!({"message": "hi"})))
        .await
        .unwrap();
    assert_eq!(res, serde_json::json!({"message": "hi"}));
    client.close().await;
    server.close().await;
}

/// C1 — handshake DID-binding: swap responder's static key → initiator aborts.
#[tokio::test]
async fn c1_handshake_did_binding() {
    let real = generate_key_pair();
    let fake = generate_key_pair();
    let real_did = encode_did_key(&real.public_key).unwrap();
    let mut server = create_server(ServerOptions {
        did: real_did.clone(),
        private_key: fake.private_key, // the lie
        handlers: HashMap::new(),
    });
    server.listen(0, "127.0.0.1").await.unwrap();
    let port = server.address().unwrap().port();
    let init = generate_key_pair();
    let t0 = Instant::now();
    let result = connect(ClientOptions {
        url: format!("ws://127.0.0.1:{port}"),
        did: encode_did_key(&init.public_key).unwrap(),
        private_key: init.private_key,
        responder_did: real_did,
        responder_public_key: None,
    })
    .await;
    let elapsed = t0.elapsed();
    assert!(result.is_err(), "C1: initiator accepted impostor");
    assert!(elapsed.as_secs() < 2, "C1: too slow ({elapsed:?})");
    server.close().await;
}

/// C2 — streaming backpressure: client grants 8 at a time, server stays bounded.
#[tokio::test]
async fn c2_streaming_backpressure() {
    let n: u64 = 10_000;
    let mut handlers: HashMap<String, Handler> = HashMap::new();
    handlers.insert(
        "torrent".into(),
        Arc::new(move |_p: Value| {
            Box::pin(async move {
                let s = stream::iter((0..n).map(Value::from));
                Ok(HandlerOutput::Stream(Box::pin(s)))
            })
        }),
    );
    let (mut server, client) = pair(handlers).await;
    let mut s = client.stream("torrent", None, 8).await.unwrap();
    let mut got: Vec<u64> = Vec::with_capacity(n as usize);
    while let Some(v) = s.next().await {
        got.push(v.as_u64().unwrap());
    }
    assert_eq!(got.len(), n as usize, "C2: not all chunks delivered");
    for (i, v) in got.iter().enumerate() {
        assert_eq!(*v, i as u64);
    }
    client.close().await;
    server.close().await;
}

/// C3 — graceful cancel: cancel mid-stream, session stays usable.
#[tokio::test]
async fn c3_graceful_cancel() {
    let mut handlers: HashMap<String, Handler> = HashMap::new();
    handlers.insert(
        "infinite".into(),
        Arc::new(|_p: Value| {
            Box::pin(async move {
                let s = stream::iter(0u64..).map(Value::from);
                Ok(HandlerOutput::Stream(Box::pin(s)))
            })
        }),
    );
    handlers.insert("ping".into(), echo_handler());
    let (mut server, client) = pair(handlers).await;

    let mut s = client.stream("infinite", None, 8).await.unwrap();
    for _ in 0..10 {
        let _ = s.next().await;
    }
    s.cancel().await;
    drop(s);
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    let r = client
        .call("ping", Some(serde_json::json!({"still": "alive"})))
        .await
        .unwrap();
    assert_eq!(r, serde_json::json!({"still": "alive"}));
    client.close().await;
    server.close().await;
}

#[tokio::test]
async fn unknown_method_rejects() {
    let mut handlers = HashMap::new();
    handlers.insert("echo".into(), echo_handler());
    let (mut server, client) = pair(handlers).await;
    let r = client.call("no_such_method", None).await;
    assert!(matches!(r, Err(agent_phone::Error::Rpc { .. })));
    client.close().await;
    server.close().await;
}

#[tokio::test]
async fn two_calls_on_same_session() {
    let mut handlers = HashMap::new();
    handlers.insert("echo".into(), echo_handler());
    let (mut server, client) = pair(handlers).await;
    assert_eq!(
        client
            .call("echo", Some(serde_json::json!({"n": 1})))
            .await
            .unwrap(),
        serde_json::json!({"n": 1})
    );
    assert_eq!(
        client
            .call("echo", Some(serde_json::json!({"n": 2})))
            .await
            .unwrap(),
        serde_json::json!({"n": 2})
    );
    client.close().await;
    server.close().await;
}