mod common;
use agent_os_client::{AgentOs, ClientError, CreateSessionOptions};
async fn try_create_session(os: &AgentOs) -> Option<String> {
match os
.create_session("pi", CreateSessionOptions::default())
.await
{
Ok(session) => Some(session.session_id),
Err(error) => {
eprintln!(
"skipping session e2e: create_session unavailable in this environment ({error})"
);
None
}
}
}
#[tokio::test]
async fn session_surface_create_prompt_events_close() {
if !common::sidecar_available() {
eprintln!("skipping session_surface_create_prompt_events_close: sidecar binary not built");
return;
}
let os = common::new_vm().await;
assert!(os.list_sessions().is_empty(), "a fresh VM has no sessions");
let agents = os.list_agents();
assert_eq!(agents.len(), 5, "the five built-in agents must be listed");
assert!(
agents
.iter()
.any(|a| a.id == "pi" && a.acp_adapter == "@rivet-dev/agent-os-pi"),
"list_agents must include the pi agent config"
);
assert!(
matches!(os.resume_session("nope"), Err(ClientError::SessionNotFound(_))),
"resume_session(unknown) must return SessionNotFound"
);
assert!(
matches!(os.close_session("nope"), Err(ClientError::SessionNotFound(_))),
"close_session(unknown) must return SessionNotFound"
);
assert!(
os.prompt("nope", "x")
.await
.unwrap_err()
.downcast_ref::<ClientError>()
.map(|error| matches!(error, ClientError::SessionNotFound(_)))
.unwrap_or(false),
"prompt(unknown) must return SessionNotFound"
);
let session_id = match try_create_session(&os).await {
Some(id) => id,
None => {
os.shutdown().await.expect("shutdown");
return;
}
};
assert!(
os.list_sessions().iter().any(|s| s.session_id == session_id),
"created session must appear in list_sessions"
);
let (mut events, _sub) = os
.on_session_event(&session_id)
.expect("on_session_event for live session");
let result = os
.prompt(&session_id, "Say the word PONG and nothing else.")
.await
.expect("prompt");
assert_eq!(result.response.jsonrpc, "2.0");
let saw_update = tokio::time::timeout(std::time::Duration::from_secs(5), async {
use futures::StreamExt;
events.next().await.map(|n| n.method == "session/update")
})
.await
.ok()
.flatten()
.unwrap_or(false);
assert!(
saw_update || !result.text.is_empty(),
"prompt should surface agent activity either via the event stream or accumulated text"
);
let recorded = os
.get_session_events(&session_id, Default::default())
.expect("get_session_events");
assert!(
!recorded.is_empty(),
"prompting should have recorded at least one sequenced event"
);
os.close_session(&session_id).expect("close_session");
let gone = tokio::time::timeout(std::time::Duration::from_secs(5), async {
loop {
if matches!(
os.prompt(&session_id, "ignored").await,
Err(error) if error.downcast_ref::<ClientError>()
.map(|e| matches!(e, ClientError::SessionNotFound(_)))
.unwrap_or(false)
) {
return true;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
})
.await
.unwrap_or(false);
assert!(
gone,
"after close_session, prompting the session must report SessionNotFound"
);
os.shutdown().await.expect("shutdown");
}