aether_cli/acp/
testing.rs1use super::handlers::acp_agent_builder;
2use super::relay::spawn_relay;
3use super::session::Session;
4use super::session_manager::{InitialSessionSelection, SessionManager, SessionManagerConfig};
5use super::session_registry::SessionRegistry;
6use super::session_store::SessionStore;
7use acp_utils::testing::{TestPeer, duplex_pair};
8use aether_core::core::AgentHandle;
9use aether_core::events::{AgentMessage, UserMessage};
10use agent_client_protocol::schema::SessionId;
11use agent_client_protocol::{Agent, Client, ConnectionTo};
12use llm::oauth::OAuthCredentialStore;
13use std::sync::Arc;
14use tempfile::TempDir;
15use tokio::sync::{mpsc, oneshot};
16use tokio::task::spawn_local;
17
18pub struct AcpTestHarness {
24 pub client_cx: ConnectionTo<Agent>,
25 pub peer: TestPeer,
26 agent_cx: ConnectionTo<Client>,
27 registry: Arc<SessionRegistry>,
28 session_store: Arc<SessionStore>,
29 _tmp: TempDir,
30}
31
32impl AcpTestHarness {
33 pub async fn start() -> Self {
34 let tmp = tempfile::tempdir().expect("tempdir for session store");
35 let registry = Arc::new(SessionRegistry::new());
36 let session_store = Arc::new(SessionStore::from_path(tmp.path().to_path_buf()));
37 let manager = Arc::new(SessionManager::new(SessionManagerConfig {
38 registry: registry.clone(),
39 session_store: session_store.clone(),
40 has_oauth_credential: OAuthCredentialStore::has_credential,
41 initial_selection: InitialSessionSelection::default(),
42 settings_source: None,
43 }));
44
45 let (peer, client_builder) = TestPeer::new();
46 let (agent_transport, client_transport) = duplex_pair();
47 let (agent_cx_tx, agent_cx_rx) = oneshot::channel::<ConnectionTo<Client>>();
48 let (client_cx_tx, client_cx_rx) = oneshot::channel::<ConnectionTo<Agent>>();
49
50 spawn_local(async move {
51 let _ = acp_agent_builder(manager)
52 .connect_with(agent_transport, async move |cx: ConnectionTo<Client>| {
53 let _ = agent_cx_tx.send(cx);
54 std::future::pending::<()>().await;
55 Ok(())
56 })
57 .await;
58 });
59
60 spawn_local(async move {
61 let _ = client_builder
62 .connect_with(client_transport, async move |cx: ConnectionTo<Agent>| {
63 let _ = client_cx_tx.send(cx);
64 std::future::pending::<()>().await;
65 Ok(())
66 })
67 .await;
68 });
69
70 let agent_cx = agent_cx_rx.await.expect("agent side connect_with produced a ConnectionTo");
71 let client_cx = client_cx_rx.await.expect("client side connect_with produced a ConnectionTo");
72 Self { client_cx, peer, agent_cx, registry, session_store, _tmp: tmp }
73 }
74
75 pub async fn insert_stub_session(
81 &self,
82 agent_tx: mpsc::Sender<UserMessage>,
83 agent_rx: mpsc::Receiver<AgentMessage>,
84 agent_handle: AgentHandle,
85 id: SessionId,
86 model: &str,
87 ) {
88 let (mcp_tx, _mcp_rx) = mpsc::channel(1);
89 let (_event_tx, event_rx) = mpsc::channel(1);
90 let session = Session {
91 agent_tx,
92 agent_rx,
93 agent_handle,
94 _mcp_handle: tokio::spawn(async {}),
95 mcp_tx,
96 event_rx,
97 initial_server_statuses: vec![],
98 };
99 let relay = spawn_relay(session, self.agent_cx.clone(), id.clone(), self.session_store.clone());
100 self.registry.insert(id.0.to_string(), relay, model.to_string(), None, None, vec![]).await;
101 }
102}