#![allow(
clippy::expect_used,
clippy::unwrap_used,
clippy::panic,
clippy::missing_panics_doc,
clippy::similar_names
)]
use std::time::Duration;
use arcp::auth::BearerAuthenticator;
use arcp::envelope::Envelope;
use arcp::messages::{
AuthScheme, CancelPayload, CancelTargetKind, Capabilities, ClientIdentity, Credentials,
MessageType, PingPayload, SessionAckPayload, SessionListJobsPayload, SessionPingPayload,
};
use arcp::runtime::ARCPRuntime;
use arcp::transport::{paired, Transport};
use arcp::ARCPClient;
async fn handshake_and_get_session_id(
client: ARCPClient<arcp::transport::MemoryTransport>,
) -> arcp::ids::SessionId {
let session = client
.open()
.expect("open")
.authenticate(
Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
Capabilities::default(),
)
.await
.expect("auth");
session.id().await.expect("session id")
}
async fn spawn_pair() -> (
arcp::transport::MemoryTransport,
arcp::transport::MemoryTransport,
) {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let (extra_a, extra_b) = paired();
drop(extra_a);
drop(extra_b);
(server_t_dummy(), client_t)
}
fn server_t_dummy() -> arcp::transport::MemoryTransport {
let (a, _b) = paired();
a
}
#[tokio::test]
async fn ping_dispatched_to_pong_after_handshake() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let open_id = arcp::ids::MessageId::new();
let mut open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
open.id = open_id.clone();
client_t.send(open).await.expect("send open");
let accept = client_t.recv().await.expect("recv").expect("present");
assert!(matches!(accept.payload, MessageType::SessionAccepted(_)));
let session_id = match accept.payload {
MessageType::SessionAccepted(p) => p.session_id,
_ => unreachable!(),
};
let mut ping = Envelope::new(MessageType::Ping(PingPayload::default()));
ping.session_id = Some(session_id);
let ping_id = ping.id.clone();
client_t.send(ping).await.expect("send ping");
let pong = tokio::time::timeout(Duration::from_millis(200), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
assert!(matches!(pong.payload, MessageType::Pong(_)));
assert_eq!(pong.correlation_id.as_ref(), Some(&ping_id));
}
#[tokio::test]
async fn job_submit_with_unknown_agent_version_yields_error() {
use arcp::error::{ARCPError, ErrorCode};
use arcp::messages::{AgentInventory, AgentInventoryEntry, ToolInvokePayload};
use arcp::runtime::tools::{ToolHandler, ToolRegistryBuilder};
use async_trait::async_trait;
use std::sync::Arc;
struct Echo;
#[async_trait]
impl ToolHandler for Echo {
fn name(&self) -> &'static str {
"echo"
}
async fn invoke(
&self,
arguments: serde_json::Value,
_ctx: arcp::runtime::context::ToolContext,
) -> Result<serde_json::Value, ARCPError> {
Ok(arguments)
}
}
let caps = Capabilities {
agents: Some(AgentInventory::Rich(vec![AgentInventoryEntry {
name: "echo".into(),
versions: vec!["1.0.0".into()],
default: Some("1.0.0".into()),
}])),
..Capabilities::default()
};
let tools = ToolRegistryBuilder::new().with(Arc::new(Echo)).build();
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.with_tools(tools)
.with_capabilities(caps)
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let mut open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
open.id = arcp::ids::MessageId::new();
client_t.send(open).await.expect("send open");
let accepted = client_t.recv().await.expect("recv").expect("present");
let session_id = match accepted.payload {
MessageType::SessionAccepted(p) => p.session_id,
other => panic!("expected accepted, got {other:?}"),
};
let mut invoke = Envelope::new(MessageType::ToolInvoke(ToolInvokePayload::new(
"echo@2.0.0",
serde_json::json!({}),
)));
invoke.session_id = Some(session_id);
client_t.send(invoke).await.expect("send invoke");
let _ = client_t.recv().await.expect("recv").expect("job.accepted");
let next = tokio::time::timeout(Duration::from_millis(500), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
match next.payload {
MessageType::JobFailed(p) => {
assert_eq!(p.code, ErrorCode::AgentVersionNotAvailable);
assert_eq!(p.retryable, Some(false));
}
other => panic!("expected job.failed, got {other:?}"),
}
}
#[tokio::test]
async fn session_list_jobs_returns_visible_jobs() {
use arcp::error::ARCPError;
use arcp::messages::ToolInvokePayload;
use arcp::runtime::tools::{ToolHandler, ToolRegistryBuilder};
use async_trait::async_trait;
use std::sync::Arc;
struct SleepTool;
#[async_trait]
impl ToolHandler for SleepTool {
fn name(&self) -> &'static str {
"sleep"
}
async fn invoke(
&self,
_arguments: serde_json::Value,
ctx: arcp::runtime::context::ToolContext,
) -> Result<serde_json::Value, ARCPError> {
tokio::select! {
() = ctx.cancel.cancelled() => Err(ARCPError::Cancelled { reason: "x".into() }),
() = tokio::time::sleep(Duration::from_secs(5)) => Ok(serde_json::json!(null)),
}
}
}
let tools = ToolRegistryBuilder::new().with(Arc::new(SleepTool)).build();
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.with_tools(tools)
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let mut open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
open.id = arcp::ids::MessageId::new();
client_t.send(open).await.expect("send open");
let accept = client_t.recv().await.expect("recv").expect("present");
let session_id = match accept.payload {
MessageType::SessionAccepted(p) => p.session_id,
other => panic!("expected accepted, got {other:?}"),
};
let mut invoke = Envelope::new(MessageType::ToolInvoke(ToolInvokePayload::new(
"sleep",
serde_json::json!({}),
)));
invoke.session_id = Some(session_id.clone());
client_t.send(invoke).await.expect("send invoke");
let _accepted = tokio::time::timeout(Duration::from_millis(300), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
let mut list = Envelope::new(MessageType::SessionListJobs(SessionListJobsPayload {
filter: None,
limit: None,
cursor: None,
}));
list.session_id = Some(session_id);
let list_id = list.id.clone();
client_t.send(list).await.expect("send list");
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
let response = loop {
let env = tokio::time::timeout_at(deadline, client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
if let MessageType::SessionJobs(p) = env.payload {
assert_eq!(env.correlation_id.as_ref(), Some(&list_id));
break p;
}
};
assert_eq!(response.jobs.len(), 1);
assert_eq!(response.jobs[0].agent, "sleep");
assert!(response.next_cursor.is_none());
}
#[tokio::test]
async fn session_ack_unblocks_writer_when_window_exhausted() {
use arcp::error::ARCPError;
use arcp::messages::ToolInvokePayload;
use arcp::runtime::tools::{ToolHandler, ToolRegistryBuilder};
use async_trait::async_trait;
use std::sync::Arc;
struct EchoTool;
#[async_trait]
impl ToolHandler for EchoTool {
fn name(&self) -> &'static str {
"echo"
}
async fn invoke(
&self,
arguments: serde_json::Value,
_ctx: arcp::runtime::context::ToolContext,
) -> Result<serde_json::Value, ARCPError> {
Ok(arguments)
}
}
let tools = ToolRegistryBuilder::new().with(Arc::new(EchoTool)).build();
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.with_tools(tools)
.with_ack_window(1)
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let mut open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
open.id = arcp::ids::MessageId::new();
client_t.send(open).await.expect("send open");
let accept = client_t.recv().await.expect("recv").expect("present");
let session_id = match accept.payload {
MessageType::SessionAccepted(p) => p.session_id,
other => panic!("expected accepted, got {other:?}"),
};
let mut invoke = Envelope::new(MessageType::ToolInvoke(ToolInvokePayload::new(
"echo",
serde_json::json!({"hello": "world"}),
)));
invoke.session_id = Some(session_id.clone());
client_t.send(invoke).await.expect("send invoke");
let first = tokio::time::timeout(Duration::from_millis(300), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
assert!(matches!(first.payload, MessageType::JobAccepted(_)));
let result = tokio::time::timeout(Duration::from_millis(150), client_t.recv()).await;
assert!(
result.is_err(),
"writer should be paused, but got: {result:?}"
);
let mut ack = Envelope::new(MessageType::SessionAck(SessionAckPayload {
last_processed_seq: 1,
}));
ack.session_id = Some(session_id);
client_t.send(ack).await.expect("send ack");
let next = tokio::time::timeout(Duration::from_secs(1), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
assert!(
matches!(
next.payload,
MessageType::JobStarted(_) | MessageType::JobCompleted(_)
),
"expected job.started/completed, got {:?}",
next.payload
);
}
#[tokio::test]
async fn session_ping_dispatched_to_session_pong() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let open_id = arcp::ids::MessageId::new();
let mut open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
open.id = open_id.clone();
client_t.send(open).await.expect("send open");
let accept = client_t.recv().await.expect("recv").expect("present");
let session_id = match accept.payload {
MessageType::SessionAccepted(p) => p.session_id,
other => panic!("expected accepted, got {other:?}"),
};
let nonce = "p_01J".to_owned();
let mut ping = Envelope::new(MessageType::SessionPing(SessionPingPayload {
nonce: nonce.clone(),
sent_at: chrono::Utc::now(),
}));
ping.session_id = Some(session_id);
let ping_id = ping.id.clone();
client_t.send(ping).await.expect("send session.ping");
let pong = tokio::time::timeout(Duration::from_millis(200), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
match pong.payload {
MessageType::SessionPong(p) => {
assert_eq!(p.ping_nonce, nonce);
}
other => panic!("expected session.pong, got {other:?}"),
}
assert_eq!(pong.correlation_id.as_ref(), Some(&ping_id));
}
#[tokio::test]
async fn cancel_for_unknown_job_yields_cancel_refused() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let client = ARCPClient::new(client_t.clone());
let _session = handshake_and_get_session_id(client).await;
let mut env = Envelope::new(MessageType::Cancel(CancelPayload {
target: CancelTargetKind::Job,
target_id: "job_DOES_NOT_EXIST_01ABCDEFGHJKMNPQRSTVWXYZ".into(),
reason: Some("test".into()),
deadline_ms: Some(1000),
}));
env.session_id = Some(arcp::ids::SessionId::new()); let cancel_id = env.id.clone();
client_t.send(env).await.expect("send cancel");
let _ = cancel_id; tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn cancel_with_malformed_target_id_yields_cancel_refused() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let client = ARCPClient::new(client_t.clone());
let _session = handshake_and_get_session_id(client).await;
let mut env = Envelope::new(MessageType::Cancel(CancelPayload {
target: CancelTargetKind::Job,
target_id: "not-a-valid-id".into(),
reason: None,
deadline_ms: None,
}));
env.session_id = Some(arcp::ids::SessionId::new());
client_t.send(env).await.expect("send cancel");
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn pre_acceptance_non_handshake_message_is_dropped() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let ping = Envelope::new(MessageType::Ping(PingPayload::default()));
client_t.send(ping).await.expect("send ping");
let open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
client_t.send(open).await.expect("send open");
let accept = tokio::time::timeout(Duration::from_millis(500), client_t.recv())
.await
.expect("timely")
.expect("recv")
.expect("present");
assert!(matches!(accept.payload, MessageType::SessionAccepted(_)));
}
#[tokio::test]
async fn duplicate_envelope_id_is_silently_ignored() {
let runtime = ARCPRuntime::builder()
.with_authenticator(Box::new(BearerAuthenticator::new().with_token("t", "p")))
.build()
.await
.expect("build");
let (server_t, client_t) = paired();
let _h = runtime.serve_connection(server_t);
let open = Envelope::new(MessageType::SessionOpen(
arcp::messages::SessionOpenPayload {
auth: Credentials {
scheme: AuthScheme::Bearer,
token: Some("t".into()),
},
client: ClientIdentity {
kind: "test".into(),
version: "0".into(),
fingerprint: None,
principal: None,
},
capabilities: Capabilities::default(),
},
));
let mut replay = open.clone();
replay.id = open.id.clone();
client_t.send(open).await.expect("send first");
let _accept = client_t.recv().await.expect("recv").expect("present");
client_t.send(replay).await.expect("send replay");
let timed_out = tokio::time::timeout(Duration::from_millis(80), client_t.recv())
.await
.is_err();
assert!(timed_out, "runtime must not respond twice to a replayed id");
let _ = spawn_pair().await; }