Skip to main content

aether_cli/acp/
testing.rs

1use super::agent::acp_agent_builder;
2use super::agent_key::AgentKey;
3use super::agent_runtime::{AgentRuntime, RuntimeEvent, RuntimeFactory};
4use super::error::SessionError;
5use super::fake_prompt_mcp::FakePromptMcp;
6use super::model_config::{Modes, ValidatedMode};
7use super::session_actor::{SessionActor, SessionActorInit};
8use super::session_config_state::SessionConfigState;
9use super::session_factory::InitialSessionSelection;
10use super::state::{AcpState, AcpStateConfig};
11use crate::acp::session_store::{SessionMeta, SessionStore};
12use crate::error::CliError;
13use crate::settings_args::SettingsSourceArgs;
14use acp_utils::notifications::McpNotification;
15use acp_utils::testing::{TestPeer, duplex_pair};
16use aether_auth::OAuthCredentialStorage;
17use aether_core::agent_spec::{AgentSpec, AgentSpecExposure};
18use aether_core::context::ext::{SessionControlEvent, SessionEvent, UserEvent, last_agent_from_events};
19use aether_core::core::{AgentBuilder, AgentHandle, Prompt};
20use aether_core::events::{AgentMessage, Command};
21use aether_core::mcp::McpSpawnResult;
22use aether_core::mcp::mcp;
23use agent_client_protocol::schema::{SessionId, SessionUpdate};
24use agent_client_protocol::{Agent, Client, ConnectionTo};
25use llm::ProviderConnectionOverrides;
26use llm::testing::FakeLlmProvider;
27use llm::{ChatMessage, Context, LlmResponse, StreamingModelProvider};
28use mcp_utils::client::{McpServer, McpTransport};
29use std::collections::HashMap;
30use std::path::PathBuf;
31use std::sync::{Arc, Mutex};
32use tokio::sync::{mpsc, oneshot};
33use tokio::task::spawn_local;
34
/// Canned reply text scripted into the fake "Planner" agent's LLM provider.
const PLANNER_REPLY: &str = "planner reply";
/// Canned reply text scripted into the fake "Coder" agent's LLM provider.
const CODER_REPLY: &str = "coder reply";
37
38/// In-memory ACP harness running the real `acp_agent_builder` against a
39/// pre-wired test client. Created via [`AcpTestHarness::start`] inside a
40/// `LocalSet`. The harness owns an [`AcpState`] and a temp-dir-backed
41/// [`SessionStore`] so tests can register fake-driven sessions without
42/// going through `new_session`.
43pub struct AcpTestHarness {
44    pub client_cx: ConnectionTo<Agent>,
45    pub peer: TestPeer,
46    agent_cx: ConnectionTo<Client>,
47    state: Arc<AcpState>,
48    session_store: Arc<SessionStore>,
49    _tmp: tempfile::TempDir,
50}
51
52pub struct FakeAgentSwitchingSession {
53    session_id: SessionId,
54    planner: FakeAcpAgent,
55    coder: FakeAcpAgent,
56}
57
58#[derive(Clone)]
59pub struct FakeAcpAgent {
60    name: String,
61    captured_contexts: Arc<Mutex<Vec<Context>>>,
62}
63
64impl AcpTestHarness {
65    pub async fn start() -> Self {
66        let tmp = tempfile::tempdir().expect("tempdir for session store");
67        let session_store = Arc::new(SessionStore::from_path(tmp.path().to_path_buf()));
68        let state = Arc::new(AcpState::new(AcpStateConfig {
69            session_store: session_store.clone(),
70            oauth_credential_store: fake_oauth_store(),
71            initial_selection: InitialSessionSelection::default(),
72            settings_source: SettingsSourceArgs::default(),
73            provider_connections: ProviderConnectionOverrides::default(),
74        }));
75
76        let (peer, client_builder) = TestPeer::new();
77        let (agent_transport, client_transport) = duplex_pair();
78        let (agent_cx_tx, agent_cx_rx) = oneshot::channel::<ConnectionTo<Client>>();
79        let (client_cx_tx, client_cx_rx) = oneshot::channel::<ConnectionTo<Agent>>();
80
81        let server_state = state.clone();
82        spawn_local(async move {
83            let _ = acp_agent_builder(server_state)
84                .connect_with(agent_transport, async move |cx: ConnectionTo<Client>| {
85                    let _ = agent_cx_tx.send(cx);
86                    std::future::pending::<()>().await;
87                    Ok(())
88                })
89                .await;
90        });
91
92        spawn_local(async move {
93            let _ = client_builder
94                .connect_with(client_transport, async move |cx: ConnectionTo<Agent>| {
95                    let _ = client_cx_tx.send(cx);
96                    std::future::pending::<()>().await;
97                    Ok(())
98                })
99                .await;
100        });
101
102        let agent_cx = agent_cx_rx.await.expect("agent side connect_with produced a ConnectionTo");
103        let client_cx = client_cx_rx.await.expect("client side connect_with produced a ConnectionTo");
104        Self { client_cx, peer, agent_cx, state, session_store, _tmp: tmp }
105    }
106
107    pub async fn insert_agent_switching_session(&self) -> FakeAgentSwitchingSession {
108        self.insert_switching_session(
109            SessionId::new("agent-switching-session"),
110            Vec::new(),
111            Some("Planner".to_string()),
112            false,
113        )
114        .await
115    }
116
117    pub async fn insert_agent_switching_session_with_serverless_coder(&self) -> FakeAgentSwitchingSession {
118        self.insert_switching_session(
119            SessionId::new("agent-switching-serverless-session"),
120            Vec::new(),
121            Some("Planner".to_string()),
122            true,
123        )
124        .await
125    }
126
127    pub async fn insert_loaded_agent_switching_session(&self, session_id: &str) -> FakeAgentSwitchingSession {
128        let events = self.session_store.load(session_id).map(|(_, events)| events).unwrap_or_default();
129        let selected_mode = last_agent_from_events(Some("Planner".to_string()), &events);
130        self.insert_switching_session(SessionId::new(session_id), events, selected_mode, false).await
131    }
132
133    pub async fn expect_mcp_server_status(&mut self, expected: &[&str]) {
134        assert_server_status(self.peer.next_mcp_notification().await, expected);
135    }
136
137    pub async fn expect_mcp_server_status_exact(&mut self, expected: &[&str]) {
138        assert_server_status_exact(self.peer.next_mcp_notification().await, expected);
139    }
140
141    pub async fn expect_available_commands(&mut self, expected: &[&str], unexpected: &[&str]) {
142        loop {
143            let update = self.peer.next_session_notification().await.update;
144            if matches!(update, SessionUpdate::AvailableCommandsUpdate(_)) {
145                assert_available_commands(update, expected, unexpected);
146                return;
147            }
148        }
149    }
150
151    pub fn append_agent_switch(&self, session_id: &str, from: Option<&str>, to: Option<&str>) {
152        self.append_stored_event(
153            session_id,
154            &SessionEvent::Control(SessionControlEvent::AgentSwitched {
155                from: from.map(str::to_string),
156                to: to.map(str::to_string),
157            }),
158        );
159    }
160
161    /// Register a stub session built from a hand-spawned
162    /// `(agent_tx, agent_rx, agent_handle)` triple — typically from
163    /// `aether_core::core::agent(fake_llm).spawn().await`. Pairs the agent with a
164    /// real but empty in-memory MCP (no servers). The session is routable via
165    /// `state.route_prompt(id)` / `state.cancel(id)`.
166    pub async fn insert_stub_session(
167        &self,
168        agent_tx: mpsc::Sender<Command>,
169        agent_rx: mpsc::Receiver<AgentMessage>,
170        agent_handle: AgentHandle,
171        id: SessionId,
172        model: &str,
173    ) {
174        let model_spec: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
175        let spec = AgentSpec::default_spec(&model_spec, None, Vec::new());
176        let factory = Arc::new(StubRuntimeFactory {
177            cwd: PathBuf::from("/tmp"),
178            agent_parts: Mutex::new(Some(StubAgentParts { tx: agent_tx, rx: agent_rx, handle: agent_handle })),
179        });
180
181        let handle = SessionActor::spawn(SessionActorInit {
182            session_id: id.clone(),
183            connection: self.agent_cx.clone(),
184            repository: self.session_store.clone(),
185            oauth_credential_store: fake_oauth_store(),
186            active_agent: AgentKey::Default,
187            specs: HashMap::from([(AgentKey::Default, spec)]),
188            runtime_factory: factory,
189            transcript: Vec::new(),
190            modes: Modes::default(),
191            config: SessionConfigState::with_selection(model.to_string(), None, None),
192        })
193        .await
194        .expect("stub session actor spawns");
195        self.state.register_session(&id, handle).await;
196    }
197
198    pub fn append_stored_session(&self, session_id: &str, created_at: &str) {
199        let meta = SessionMeta {
200            session_id: session_id.to_string(),
201            cwd: PathBuf::from("/tmp"),
202            model: "test-model".to_string(),
203            selected_mode: None,
204            created_at: created_at.to_string(),
205        };
206
207        self.session_store.append_meta(session_id, &meta).expect("stored session meta appends");
208    }
209
210    pub fn append_stored_prompt(&self, session_id: &str, prompt: &str) {
211        self.append_stored_event(
212            session_id,
213            &SessionEvent::User(UserEvent::Message { content: vec![llm::ContentBlock::text(prompt)] }),
214        );
215    }
216
217    pub fn append_stored_user_blocks(&self, session_id: &str, blocks: Vec<llm::ContentBlock>) {
218        self.append_stored_event(session_id, &SessionEvent::User(UserEvent::Message { content: blocks }));
219    }
220
221    pub fn append_stored_agent_text(&self, session_id: &str, text: &str) {
222        self.append_stored_event(
223            session_id,
224            &SessionEvent::Agent(AgentMessage::Text {
225                message_id: "msg".to_string(),
226                chunk: text.to_string(),
227                is_complete: true,
228                model_name: "test".to_string(),
229            }),
230        );
231    }
232
233    async fn insert_switching_session(
234        &self,
235        acp_session_id: SessionId,
236        events: Vec<SessionEvent>,
237        selected_mode: Option<String>,
238        serverless_coder: bool,
239    ) -> FakeAgentSwitchingSession {
240        let (planner_def, planner) = fake_agent("Planner", "planner-mcp", "plan", PLANNER_REPLY);
241        let (mut coder_def, coder) = fake_agent("Coder", "coder-mcp", "edit", CODER_REPLY);
242        if serverless_coder {
243            coder_def.mcp = None;
244        }
245
246        let mut specs = HashMap::new();
247        let mut agents = HashMap::new();
248        for def in [planner_def, coder_def] {
249            specs.insert(AgentKey::Named(def.spec.name.clone()), def.spec.clone());
250            agents.insert(def.spec.name.clone(), def);
251        }
252
253        let factory = Arc::new(FakeRuntimeFactory { cwd: PathBuf::from("/tmp"), agents });
254        let initial_agent = selected_mode.clone().unwrap_or_else(|| "Planner".to_string());
255
256        let handle = SessionActor::spawn(SessionActorInit {
257            session_id: acp_session_id.clone(),
258            connection: self.agent_cx.clone(),
259            repository: self.session_store.clone(),
260            oauth_credential_store: fake_oauth_store(),
261            active_agent: AgentKey::Named(initial_agent),
262            specs,
263            runtime_factory: factory,
264            transcript: events,
265            modes: switching_modes(),
266            config: SessionConfigState::with_selection("anthropic:claude-sonnet-4-5".to_string(), selected_mode, None),
267        })
268        .await
269        .expect("fake agent switching session actor spawns");
270        self.state.register_session(&acp_session_id, handle).await;
271        FakeAgentSwitchingSession { session_id: acp_session_id, planner, coder }
272    }
273
274    fn append_stored_event(&self, session_id: &str, event: &SessionEvent) {
275        self.session_store.append_event(session_id, event).expect("stored session event appends");
276    }
277}
278
279impl FakeAgentSwitchingSession {
280    pub fn session_id(&self) -> &SessionId {
281        &self.session_id
282    }
283
284    pub fn planner(&self) -> &FakeAcpAgent {
285        &self.planner
286    }
287
288    pub fn coder(&self) -> &FakeAcpAgent {
289        &self.coder
290    }
291
292    pub fn agent(&self, name: &str) -> &FakeAcpAgent {
293        match name {
294            "Planner" => &self.planner,
295            "Coder" => &self.coder,
296            other => panic!("unknown fake ACP agent {other:?}"),
297        }
298    }
299}
300
301impl FakeAcpAgent {
302    pub fn name(&self) -> &str {
303        &self.name
304    }
305
306    /// Asserts the agent's most recent turn saw a conversation containing each
307    /// of `expected` (user or assistant text), in addition to anything else.
308    pub fn assert_saw(&self, expected: &[&str]) {
309        let seen = self.latest_conversation();
310        for text in expected {
311            assert!(seen.iter().any(|m| m == text), "{} should have seen {text:?}; saw {seen:?}", self.name);
312        }
313    }
314
315    /// Asserts the agent's most recent turn saw *exactly* `expected` and nothing
316    /// else — used to prove a freshly-activated agent started with no prior
317    /// transcript.
318    pub fn assert_saw_exactly(&self, expected: &[&str]) {
319        let seen = self.latest_conversation();
320        let expected: Vec<String> = expected.iter().map(|t| (*t).to_string()).collect();
321        assert_eq!(seen, expected, "{} conversation mismatch", self.name);
322    }
323
324    /// Asserts the agent never ran a turn (its LLM was never invoked).
325    pub fn assert_never_ran(&self) {
326        let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
327        assert!(contexts.is_empty(), "{} should not have run; captured {} context(s)", self.name, contexts.len());
328    }
329
330    fn latest_conversation(&self) -> Vec<String> {
331        let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
332        let latest = contexts.last().unwrap_or_else(|| panic!("{} should have run a turn", self.name));
333        conversation_texts(latest)
334    }
335}
336
337/// Spawns each agent's runtime through the real [`AgentRuntime`] wiring, but
338/// backed by a [`FakeLlmProvider`] and an in-memory MCP server instead of a
339/// network LLM and external MCP processes.
340struct FakeRuntimeFactory {
341    cwd: PathBuf,
342    agents: HashMap<String, FakeAgentDef>,
343}
344
345struct FakeAgentDef {
346    spec: AgentSpec,
347    provider: Mutex<Option<Arc<dyn StreamingModelProvider>>>,
348    mcp: Option<(String, String)>,
349}
350
351#[async_trait::async_trait]
352impl RuntimeFactory for FakeRuntimeFactory {
353    async fn spawn(
354        &self,
355        agent: AgentKey,
356        spec: &AgentSpec,
357        initial_messages: Vec<ChatMessage>,
358        runtime_event_tx: mpsc::Sender<RuntimeEvent>,
359    ) -> Result<AgentRuntime, SessionError> {
360        let def = self.agents.get(&spec.name).ok_or_else(|| SessionError::AgentNotFound(spec.name.clone()))?;
361        let provider = def
362            .provider
363            .lock()
364            .expect("fake provider lock is healthy")
365            .take()
366            .expect("fake agent runtime spawned more than once");
367
368        let servers = def.mcp.as_ref().map_or_else(Vec::new, |(server_name, prompt_name)| {
369            vec![McpServer::new(
370                server_name.clone(),
371                McpTransport::InMemory { server: FakePromptMcp::new(prompt_name).into_dyn() },
372                false,
373            )]
374        });
375        let mut spawn = mcp(&self.cwd)
376            .with_servers(servers)
377            .spawn()
378            .await
379            .map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
380        let snapshot = spawn
381            .block_until_ready()
382            .await
383            .ok_or_else(|| SessionError::McpOperation("fake MCP bootstrap aborted".to_string()))?;
384        let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
385
386        let filtered_tools = spec.tools.apply(snapshot.tool_definitions.clone());
387        let mut builder = AgentBuilder::new(provider).max_auto_continues(0);
388        for prompt in &spec.prompts {
389            builder = builder.system_prompt(prompt.clone());
390        }
391        let (agent_tx, agent_rx, agent_handle) = builder
392            .tools(mcp_tx.clone(), filtered_tools)
393            .messages(initial_messages)
394            .spawn()
395            .await
396            .map_err(|e| SessionError::Build(CliError::AgentError(e.to_string())))?;
397
398        Ok(AgentRuntime::new(
399            agent,
400            spec,
401            agent_tx,
402            agent_rx,
403            Some(agent_handle),
404            mcp_tx,
405            event_rx,
406            mcp_handle,
407            snapshot,
408            runtime_event_tx,
409        ))
410    }
411}
412
413struct StubRuntimeFactory {
414    cwd: PathBuf,
415    agent_parts: Mutex<Option<StubAgentParts>>,
416}
417
418struct StubAgentParts {
419    tx: mpsc::Sender<Command>,
420    rx: mpsc::Receiver<AgentMessage>,
421    handle: AgentHandle,
422}
423
424#[async_trait::async_trait]
425impl RuntimeFactory for StubRuntimeFactory {
426    async fn spawn(
427        &self,
428        agent: AgentKey,
429        spec: &AgentSpec,
430        _initial_messages: Vec<ChatMessage>,
431        runtime_event_tx: mpsc::Sender<RuntimeEvent>,
432    ) -> Result<AgentRuntime, SessionError> {
433        let parts = self
434            .agent_parts
435            .lock()
436            .expect("stub agent parts lock is healthy")
437            .take()
438            .expect("stub runtime spawned more than once");
439
440        let mut spawn =
441            mcp(&self.cwd).spawn().await.map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
442        let snapshot = spawn
443            .block_until_ready()
444            .await
445            .ok_or_else(|| SessionError::McpOperation("stub MCP bootstrap aborted".to_string()))?;
446        let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
447
448        Ok(AgentRuntime::new(
449            agent,
450            spec,
451            parts.tx,
452            parts.rx,
453            Some(parts.handle),
454            mcp_tx,
455            event_rx,
456            mcp_handle,
457            snapshot,
458            runtime_event_tx,
459        ))
460    }
461}
462
463fn fake_agent(name: &str, server_name: &str, prompt_name: &str, reply: &str) -> (FakeAgentDef, FakeAcpAgent) {
464    let provider =
465        FakeLlmProvider::new(vec![vec![LlmResponse::start("msg"), LlmResponse::text(reply), LlmResponse::done()]])
466            .with_display_name(name);
467    let captured_contexts = provider.captured_contexts();
468    let def = FakeAgentDef {
469        spec: fake_agent_spec(name),
470        provider: Mutex::new(Some(Arc::new(provider))),
471        mcp: Some((server_name.to_string(), prompt_name.to_string())),
472    };
473    let observer = FakeAcpAgent { name: name.to_string(), captured_contexts };
474    (def, observer)
475}
476
477fn fake_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
478    Arc::new(aether_auth::FakeOAuthCredentialStore::new())
479}
480
481fn switching_modes() -> Modes {
482    Modes::new(vec![
483        ValidatedMode {
484            name: "Planner".to_string(),
485            model: "anthropic:claude-sonnet-4-5".to_string(),
486            reasoning_effort: None,
487        },
488        ValidatedMode {
489            name: "Coder".to_string(),
490            model: "deepseek:deepseek-chat".to_string(),
491            reasoning_effort: None,
492        },
493    ])
494}
495
496fn fake_agent_spec(name: &str) -> AgentSpec {
497    let model: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
498    let mut spec = AgentSpec::default_spec(&model, None, vec![Prompt::text(&format!("{name} system prompt"))]);
499    spec.name = name.to_string();
500    spec.description = format!("{name} test agent");
501    spec.exposure = AgentSpecExposure::user_only();
502    spec
503}
504
505fn assert_available_commands(update: SessionUpdate, expected: &[&str], unexpected: &[&str]) {
506    let SessionUpdate::AvailableCommandsUpdate(commands) = update else {
507        panic!("expected available commands update");
508    };
509    let names = commands.available_commands.iter().map(|command| command.name.as_str()).collect::<Vec<_>>();
510    for name in expected {
511        assert!(names.contains(name), "expected command /{name} in {names:?}");
512    }
513    for name in unexpected {
514        assert!(!names.contains(name), "did not expect command /{name} in {names:?}");
515    }
516}
517
518fn assert_server_status(notification: McpNotification, expected: &[&str]) {
519    let McpNotification::ServerStatus { servers } = notification else {
520        panic!("expected server status notification");
521    };
522    let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
523    for server_name in expected {
524        assert!(names.contains(server_name), "expected server {server_name} in {names:?}");
525    }
526}
527
528fn assert_server_status_exact(notification: McpNotification, expected: &[&str]) {
529    let McpNotification::ServerStatus { servers } = notification else {
530        panic!("expected server status notification");
531    };
532    let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
533    assert_eq!(names, expected);
534}
535
536fn conversation_texts(context: &Context) -> Vec<String> {
537    context
538        .messages()
539        .iter()
540        .filter_map(|message| match message {
541            ChatMessage::User { content, .. } => llm::ContentBlock::first_text(content).map(str::to_string),
542            ChatMessage::Assistant { content, .. } if !content.is_empty() => Some(content.clone()),
543            _ => None,
544        })
545        .collect()
546}