1use super::agent::acp_agent_builder;
2use super::agent_key::AgentKey;
3use super::agent_runtime::{AgentRuntime, RuntimeEvent, RuntimeFactory};
4use super::error::SessionError;
5use super::fake_prompt_mcp::FakePromptMcp;
6use super::model_config::{Modes, ValidatedMode};
7use super::session_actor::{SessionActor, SessionActorInit};
8use super::session_config_state::SessionConfigState;
9use super::session_factory::InitialSessionSelection;
10use super::state::{AcpState, AcpStateConfig};
11use crate::acp::session_store::{SessionMeta, SessionStore};
12use crate::error::CliError;
13use crate::settings_args::SettingsSourceArgs;
14use acp_utils::notifications::McpNotification;
15use acp_utils::testing::{TestPeer, duplex_pair};
16use aether_auth::OAuthCredentialStorage;
17use aether_core::agent_spec::{AgentSpec, AgentSpecExposure};
18use aether_core::context::ext::{SessionControlEvent, SessionEvent, UserEvent, last_agent_from_events};
19use aether_core::core::{AgentBuilder, AgentHandle, Prompt};
20use aether_core::events::{AgentMessage, Command};
21use aether_core::mcp::McpSpawnResult;
22use aether_core::mcp::mcp;
23use agent_client_protocol::schema::{SessionId, SessionUpdate};
24use agent_client_protocol::{Agent, Client, ConnectionTo};
25use llm::ProviderConnectionOverrides;
26use llm::testing::FakeLlmProvider;
27use llm::{ChatMessage, Context, LlmResponse, StreamingModelProvider};
28use mcp_utils::client::{McpServer, McpTransport};
29use std::collections::HashMap;
30use std::path::PathBuf;
31use std::sync::{Arc, Mutex};
32use tokio::sync::{mpsc, oneshot};
33use tokio::task::spawn_local;
34
/// Reply text scripted for the fake "Planner" agent's LLM provider.
const PLANNER_REPLY: &str = "planner reply";
/// Reply text scripted for the fake "Coder" agent's LLM provider.
const CODER_REPLY: &str = "coder reply";
37
/// Test harness that connects an ACP agent and a test client over a
/// `duplex_pair` transport and keeps the pieces tests need to drive them.
pub struct AcpTestHarness {
    /// Client-side connection, as handed to the client builder's `connect_with` callback.
    pub client_cx: ConnectionTo<Agent>,
    /// Test peer created alongside the client builder; the harness reads
    /// MCP and session notifications from it.
    pub peer: TestPeer,
    /// Agent-side connection; passed to every session actor the harness spawns.
    agent_cx: ConnectionTo<Client>,
    /// Shared ACP state the agent builder was created with; sessions are registered on it.
    state: Arc<AcpState>,
    /// Session store rooted at the temporary directory owned by `_tmp`.
    session_store: Arc<SessionStore>,
    /// Owns the temporary directory behind `session_store`.
    /// NOTE(review): presumably held only so the directory is not removed while the harness is alive.
    _tmp: tempfile::TempDir,
}
51
/// Handle to a registered session that has a fake "Planner" and a fake "Coder" agent.
pub struct FakeAgentSwitchingSession {
    /// ACP id the session actor was registered under.
    session_id: SessionId,
    /// Observer for the fake "Planner" agent.
    planner: FakeAcpAgent,
    /// Observer for the fake "Coder" agent.
    coder: FakeAcpAgent,
}
57
/// Observer for one fake agent: its name plus the contexts captured by its
/// fake LLM provider.
#[derive(Clone)]
pub struct FakeAcpAgent {
    /// Agent name, used in assertion failure messages.
    name: String,
    /// Contexts shared with the fake provider (obtained via `captured_contexts()`).
    captured_contexts: Arc<Mutex<Vec<Context>>>,
}
63
64impl AcpTestHarness {
65 pub async fn start() -> Self {
66 let tmp = tempfile::tempdir().expect("tempdir for session store");
67 let session_store = Arc::new(SessionStore::from_path(tmp.path().to_path_buf()));
68 let state = Arc::new(AcpState::new(AcpStateConfig {
69 session_store: session_store.clone(),
70 oauth_credential_store: fake_oauth_store(),
71 initial_selection: InitialSessionSelection::default(),
72 settings_source: SettingsSourceArgs::default(),
73 provider_connections: ProviderConnectionOverrides::default(),
74 }));
75
76 let (peer, client_builder) = TestPeer::new();
77 let (agent_transport, client_transport) = duplex_pair();
78 let (agent_cx_tx, agent_cx_rx) = oneshot::channel::<ConnectionTo<Client>>();
79 let (client_cx_tx, client_cx_rx) = oneshot::channel::<ConnectionTo<Agent>>();
80
81 let server_state = state.clone();
82 spawn_local(async move {
83 let _ = acp_agent_builder(server_state)
84 .connect_with(agent_transport, async move |cx: ConnectionTo<Client>| {
85 let _ = agent_cx_tx.send(cx);
86 std::future::pending::<()>().await;
87 Ok(())
88 })
89 .await;
90 });
91
92 spawn_local(async move {
93 let _ = client_builder
94 .connect_with(client_transport, async move |cx: ConnectionTo<Agent>| {
95 let _ = client_cx_tx.send(cx);
96 std::future::pending::<()>().await;
97 Ok(())
98 })
99 .await;
100 });
101
102 let agent_cx = agent_cx_rx.await.expect("agent side connect_with produced a ConnectionTo");
103 let client_cx = client_cx_rx.await.expect("client side connect_with produced a ConnectionTo");
104 Self { client_cx, peer, agent_cx, state, session_store, _tmp: tmp }
105 }
106
107 pub async fn insert_agent_switching_session(&self) -> FakeAgentSwitchingSession {
108 self.insert_switching_session(
109 SessionId::new("agent-switching-session"),
110 Vec::new(),
111 Some("Planner".to_string()),
112 false,
113 )
114 .await
115 }
116
117 pub async fn insert_agent_switching_session_with_serverless_coder(&self) -> FakeAgentSwitchingSession {
118 self.insert_switching_session(
119 SessionId::new("agent-switching-serverless-session"),
120 Vec::new(),
121 Some("Planner".to_string()),
122 true,
123 )
124 .await
125 }
126
127 pub async fn insert_loaded_agent_switching_session(&self, session_id: &str) -> FakeAgentSwitchingSession {
128 let events = self.session_store.load(session_id).map(|(_, events)| events).unwrap_or_default();
129 let selected_mode = last_agent_from_events(Some("Planner".to_string()), &events);
130 self.insert_switching_session(SessionId::new(session_id), events, selected_mode, false).await
131 }
132
133 pub async fn expect_mcp_server_status(&mut self, expected: &[&str]) {
134 assert_server_status(self.peer.next_mcp_notification().await, expected);
135 }
136
137 pub async fn expect_mcp_server_status_exact(&mut self, expected: &[&str]) {
138 assert_server_status_exact(self.peer.next_mcp_notification().await, expected);
139 }
140
141 pub async fn expect_available_commands(&mut self, expected: &[&str], unexpected: &[&str]) {
142 loop {
143 let update = self.peer.next_session_notification().await.update;
144 if matches!(update, SessionUpdate::AvailableCommandsUpdate(_)) {
145 assert_available_commands(update, expected, unexpected);
146 return;
147 }
148 }
149 }
150
151 pub fn append_agent_switch(&self, session_id: &str, from: Option<&str>, to: Option<&str>) {
152 self.append_stored_event(
153 session_id,
154 &SessionEvent::Control(SessionControlEvent::AgentSwitched {
155 from: from.map(str::to_string),
156 to: to.map(str::to_string),
157 }),
158 );
159 }
160
161 pub async fn insert_stub_session(
167 &self,
168 agent_tx: mpsc::Sender<Command>,
169 agent_rx: mpsc::Receiver<AgentMessage>,
170 agent_handle: AgentHandle,
171 id: SessionId,
172 model: &str,
173 ) {
174 let model_spec: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
175 let spec = AgentSpec::default_spec(&model_spec, None, Vec::new());
176 let factory = Arc::new(StubRuntimeFactory {
177 cwd: PathBuf::from("/tmp"),
178 agent_parts: Mutex::new(Some(StubAgentParts { tx: agent_tx, rx: agent_rx, handle: agent_handle })),
179 });
180
181 let handle = SessionActor::spawn(SessionActorInit {
182 session_id: id.clone(),
183 connection: self.agent_cx.clone(),
184 repository: self.session_store.clone(),
185 oauth_credential_store: fake_oauth_store(),
186 active_agent: AgentKey::Default,
187 specs: HashMap::from([(AgentKey::Default, spec)]),
188 runtime_factory: factory,
189 transcript: Vec::new(),
190 modes: Modes::default(),
191 config: SessionConfigState::with_selection(model.to_string(), None, None),
192 })
193 .await
194 .expect("stub session actor spawns");
195 self.state.register_session(&id, handle).await;
196 }
197
198 pub fn append_stored_session(&self, session_id: &str, created_at: &str) {
199 let meta = SessionMeta {
200 session_id: session_id.to_string(),
201 cwd: PathBuf::from("/tmp"),
202 model: "test-model".to_string(),
203 selected_mode: None,
204 created_at: created_at.to_string(),
205 };
206
207 self.session_store.append_meta(session_id, &meta).expect("stored session meta appends");
208 }
209
210 pub fn append_stored_prompt(&self, session_id: &str, prompt: &str) {
211 self.append_stored_event(
212 session_id,
213 &SessionEvent::User(UserEvent::Message { content: vec![llm::ContentBlock::text(prompt)] }),
214 );
215 }
216
217 pub fn append_stored_user_blocks(&self, session_id: &str, blocks: Vec<llm::ContentBlock>) {
218 self.append_stored_event(session_id, &SessionEvent::User(UserEvent::Message { content: blocks }));
219 }
220
221 pub fn append_stored_agent_text(&self, session_id: &str, text: &str) {
222 self.append_stored_event(
223 session_id,
224 &SessionEvent::Agent(AgentMessage::Text {
225 message_id: "msg".to_string(),
226 chunk: text.to_string(),
227 is_complete: true,
228 model_name: "test".to_string(),
229 }),
230 );
231 }
232
233 async fn insert_switching_session(
234 &self,
235 acp_session_id: SessionId,
236 events: Vec<SessionEvent>,
237 selected_mode: Option<String>,
238 serverless_coder: bool,
239 ) -> FakeAgentSwitchingSession {
240 let (planner_def, planner) = fake_agent("Planner", "planner-mcp", "plan", PLANNER_REPLY);
241 let (mut coder_def, coder) = fake_agent("Coder", "coder-mcp", "edit", CODER_REPLY);
242 if serverless_coder {
243 coder_def.mcp = None;
244 }
245
246 let mut specs = HashMap::new();
247 let mut agents = HashMap::new();
248 for def in [planner_def, coder_def] {
249 specs.insert(AgentKey::Named(def.spec.name.clone()), def.spec.clone());
250 agents.insert(def.spec.name.clone(), def);
251 }
252
253 let factory = Arc::new(FakeRuntimeFactory { cwd: PathBuf::from("/tmp"), agents });
254 let initial_agent = selected_mode.clone().unwrap_or_else(|| "Planner".to_string());
255
256 let handle = SessionActor::spawn(SessionActorInit {
257 session_id: acp_session_id.clone(),
258 connection: self.agent_cx.clone(),
259 repository: self.session_store.clone(),
260 oauth_credential_store: fake_oauth_store(),
261 active_agent: AgentKey::Named(initial_agent),
262 specs,
263 runtime_factory: factory,
264 transcript: events,
265 modes: switching_modes(),
266 config: SessionConfigState::with_selection("anthropic:claude-sonnet-4-5".to_string(), selected_mode, None),
267 })
268 .await
269 .expect("fake agent switching session actor spawns");
270 self.state.register_session(&acp_session_id, handle).await;
271 FakeAgentSwitchingSession { session_id: acp_session_id, planner, coder }
272 }
273
274 fn append_stored_event(&self, session_id: &str, event: &SessionEvent) {
275 self.session_store.append_event(session_id, event).expect("stored session event appends");
276 }
277}
278
279impl FakeAgentSwitchingSession {
280 pub fn session_id(&self) -> &SessionId {
281 &self.session_id
282 }
283
284 pub fn planner(&self) -> &FakeAcpAgent {
285 &self.planner
286 }
287
288 pub fn coder(&self) -> &FakeAcpAgent {
289 &self.coder
290 }
291
292 pub fn agent(&self, name: &str) -> &FakeAcpAgent {
293 match name {
294 "Planner" => &self.planner,
295 "Coder" => &self.coder,
296 other => panic!("unknown fake ACP agent {other:?}"),
297 }
298 }
299}
300
301impl FakeAcpAgent {
302 pub fn name(&self) -> &str {
303 &self.name
304 }
305
306 pub fn assert_saw(&self, expected: &[&str]) {
309 let seen = self.latest_conversation();
310 for text in expected {
311 assert!(seen.iter().any(|m| m == text), "{} should have seen {text:?}; saw {seen:?}", self.name);
312 }
313 }
314
315 pub fn assert_saw_exactly(&self, expected: &[&str]) {
319 let seen = self.latest_conversation();
320 let expected: Vec<String> = expected.iter().map(|t| (*t).to_string()).collect();
321 assert_eq!(seen, expected, "{} conversation mismatch", self.name);
322 }
323
324 pub fn assert_never_ran(&self) {
326 let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
327 assert!(contexts.is_empty(), "{} should not have run; captured {} context(s)", self.name, contexts.len());
328 }
329
330 fn latest_conversation(&self) -> Vec<String> {
331 let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
332 let latest = contexts.last().unwrap_or_else(|| panic!("{} should have run a turn", self.name));
333 conversation_texts(latest)
334 }
335}
336
/// Runtime factory that serves runtimes for pre-registered fake agents.
struct FakeRuntimeFactory {
    /// Directory passed to the MCP builder when spawning.
    cwd: PathBuf,
    /// Fake agent definitions keyed by agent spec name.
    agents: HashMap<String, FakeAgentDef>,
}
344
/// Everything needed to spawn one fake agent runtime.
struct FakeAgentDef {
    /// Spec describing the agent.
    spec: AgentSpec,
    /// LLM provider for the agent; taken out on spawn, so it is usable only once.
    provider: Mutex<Option<Arc<dyn StreamingModelProvider>>>,
    /// Optional `(server name, prompt name)` for the agent's fake MCP server;
    /// `None` means the agent is spawned without any MCP server.
    mcp: Option<(String, String)>,
}
350
351#[async_trait::async_trait]
352impl RuntimeFactory for FakeRuntimeFactory {
353 async fn spawn(
354 &self,
355 agent: AgentKey,
356 spec: &AgentSpec,
357 initial_messages: Vec<ChatMessage>,
358 runtime_event_tx: mpsc::Sender<RuntimeEvent>,
359 ) -> Result<AgentRuntime, SessionError> {
360 let def = self.agents.get(&spec.name).ok_or_else(|| SessionError::AgentNotFound(spec.name.clone()))?;
361 let provider = def
362 .provider
363 .lock()
364 .expect("fake provider lock is healthy")
365 .take()
366 .expect("fake agent runtime spawned more than once");
367
368 let servers = def.mcp.as_ref().map_or_else(Vec::new, |(server_name, prompt_name)| {
369 vec![McpServer::new(
370 server_name.clone(),
371 McpTransport::InMemory { server: FakePromptMcp::new(prompt_name).into_dyn() },
372 false,
373 )]
374 });
375 let mut spawn = mcp(&self.cwd)
376 .with_servers(servers)
377 .spawn()
378 .await
379 .map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
380 let snapshot = spawn
381 .block_until_ready()
382 .await
383 .ok_or_else(|| SessionError::McpOperation("fake MCP bootstrap aborted".to_string()))?;
384 let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
385
386 let filtered_tools = spec.tools.apply(snapshot.tool_definitions.clone());
387 let mut builder = AgentBuilder::new(provider).max_auto_continues(0);
388 for prompt in &spec.prompts {
389 builder = builder.system_prompt(prompt.clone());
390 }
391 let (agent_tx, agent_rx, agent_handle) = builder
392 .tools(mcp_tx.clone(), filtered_tools)
393 .messages(initial_messages)
394 .spawn()
395 .await
396 .map_err(|e| SessionError::Build(CliError::AgentError(e.to_string())))?;
397
398 Ok(AgentRuntime::new(
399 agent,
400 spec,
401 agent_tx,
402 agent_rx,
403 Some(agent_handle),
404 mcp_tx,
405 event_rx,
406 mcp_handle,
407 snapshot,
408 runtime_event_tx,
409 ))
410 }
411}
412
/// Runtime factory that wraps caller-supplied agent channels instead of
/// building an agent.
struct StubRuntimeFactory {
    /// Directory passed to the MCP builder when spawning.
    cwd: PathBuf,
    /// Agent channels and handle to hand out; taken on spawn, so usable only once.
    agent_parts: Mutex<Option<StubAgentParts>>,
}
417
/// Pre-built agent endpoints that a stub runtime is assembled from.
struct StubAgentParts {
    /// Sender for commands addressed to the agent.
    tx: mpsc::Sender<Command>,
    /// Receiver for messages produced by the agent.
    rx: mpsc::Receiver<AgentMessage>,
    /// Handle of the agent; passed to the runtime as its agent handle.
    handle: AgentHandle,
}
423
424#[async_trait::async_trait]
425impl RuntimeFactory for StubRuntimeFactory {
426 async fn spawn(
427 &self,
428 agent: AgentKey,
429 spec: &AgentSpec,
430 _initial_messages: Vec<ChatMessage>,
431 runtime_event_tx: mpsc::Sender<RuntimeEvent>,
432 ) -> Result<AgentRuntime, SessionError> {
433 let parts = self
434 .agent_parts
435 .lock()
436 .expect("stub agent parts lock is healthy")
437 .take()
438 .expect("stub runtime spawned more than once");
439
440 let mut spawn =
441 mcp(&self.cwd).spawn().await.map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
442 let snapshot = spawn
443 .block_until_ready()
444 .await
445 .ok_or_else(|| SessionError::McpOperation("stub MCP bootstrap aborted".to_string()))?;
446 let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
447
448 Ok(AgentRuntime::new(
449 agent,
450 spec,
451 parts.tx,
452 parts.rx,
453 Some(parts.handle),
454 mcp_tx,
455 event_rx,
456 mcp_handle,
457 snapshot,
458 runtime_event_tx,
459 ))
460 }
461}
462
463fn fake_agent(name: &str, server_name: &str, prompt_name: &str, reply: &str) -> (FakeAgentDef, FakeAcpAgent) {
464 let provider =
465 FakeLlmProvider::new(vec![vec![LlmResponse::start("msg"), LlmResponse::text(reply), LlmResponse::done()]])
466 .with_display_name(name);
467 let captured_contexts = provider.captured_contexts();
468 let def = FakeAgentDef {
469 spec: fake_agent_spec(name),
470 provider: Mutex::new(Some(Arc::new(provider))),
471 mcp: Some((server_name.to_string(), prompt_name.to_string())),
472 };
473 let observer = FakeAcpAgent { name: name.to_string(), captured_contexts };
474 (def, observer)
475}
476
477fn fake_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
478 Arc::new(aether_auth::FakeOAuthCredentialStore::new())
479}
480
481fn switching_modes() -> Modes {
482 Modes::new(vec![
483 ValidatedMode {
484 name: "Planner".to_string(),
485 model: "anthropic:claude-sonnet-4-5".to_string(),
486 reasoning_effort: None,
487 },
488 ValidatedMode {
489 name: "Coder".to_string(),
490 model: "deepseek:deepseek-chat".to_string(),
491 reasoning_effort: None,
492 },
493 ])
494}
495
496fn fake_agent_spec(name: &str) -> AgentSpec {
497 let model: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
498 let mut spec = AgentSpec::default_spec(&model, None, vec![Prompt::text(&format!("{name} system prompt"))]);
499 spec.name = name.to_string();
500 spec.description = format!("{name} test agent");
501 spec.exposure = AgentSpecExposure::user_only();
502 spec
503}
504
505fn assert_available_commands(update: SessionUpdate, expected: &[&str], unexpected: &[&str]) {
506 let SessionUpdate::AvailableCommandsUpdate(commands) = update else {
507 panic!("expected available commands update");
508 };
509 let names = commands.available_commands.iter().map(|command| command.name.as_str()).collect::<Vec<_>>();
510 for name in expected {
511 assert!(names.contains(name), "expected command /{name} in {names:?}");
512 }
513 for name in unexpected {
514 assert!(!names.contains(name), "did not expect command /{name} in {names:?}");
515 }
516}
517
518fn assert_server_status(notification: McpNotification, expected: &[&str]) {
519 let McpNotification::ServerStatus { servers } = notification else {
520 panic!("expected server status notification");
521 };
522 let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
523 for server_name in expected {
524 assert!(names.contains(server_name), "expected server {server_name} in {names:?}");
525 }
526}
527
528fn assert_server_status_exact(notification: McpNotification, expected: &[&str]) {
529 let McpNotification::ServerStatus { servers } = notification else {
530 panic!("expected server status notification");
531 };
532 let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
533 assert_eq!(names, expected);
534}
535
536fn conversation_texts(context: &Context) -> Vec<String> {
537 context
538 .messages()
539 .iter()
540 .filter_map(|message| match message {
541 ChatMessage::User { content, .. } => llm::ContentBlock::first_text(content).map(str::to_string),
542 ChatMessage::Assistant { content, .. } if !content.is_empty() => Some(content.clone()),
543 _ => None,
544 })
545 .collect()
546}