use super::agent::acp_agent_builder;
use super::agent_key::AgentKey;
use super::agent_runtime::{AgentRuntime, RuntimeEvent, RuntimeFactory};
use super::error::SessionError;
use super::fake_prompt_mcp::FakePromptMcp;
use super::model_config::{Modes, ValidatedMode};
use super::session_actor::{SessionActor, SessionActorInit};
use super::session_config_state::SessionConfigState;
use super::session_factory::InitialSessionSelection;
use super::state::{AcpState, AcpStateConfig};
use crate::acp::session_store::{SessionMeta, SessionStore};
use crate::error::CliError;
use crate::settings_args::SettingsSourceArgs;
use acp_utils::notifications::McpNotification;
use acp_utils::testing::{TestPeer, duplex_pair};
use aether_auth::OAuthCredentialStorage;
use aether_core::agent_spec::{AgentSpec, AgentSpecExposure};
use aether_core::context::ext::{SessionControlEvent, SessionEvent, UserEvent, last_agent_from_events};
use aether_core::core::{AgentBuilder, AgentHandle, Prompt};
use aether_core::events::{AgentMessage, Command};
use aether_core::mcp::McpSpawnResult;
use aether_core::mcp::mcp;
use agent_client_protocol::schema::{SessionId, SessionUpdate};
use agent_client_protocol::{Agent, Client, ConnectionTo};
use llm::ProviderConnectionOverrides;
use llm::testing::FakeLlmProvider;
use llm::{ChatMessage, Context, LlmResponse, StreamingModelProvider};
use mcp_utils::client::{McpServer, McpTransport};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn_local;
const PLANNER_REPLY: &str = "planner reply";
const CODER_REPLY: &str = "coder reply";
pub struct AcpTestHarness {
pub client_cx: ConnectionTo<Agent>,
pub peer: TestPeer,
agent_cx: ConnectionTo<Client>,
state: Arc<AcpState>,
session_store: Arc<SessionStore>,
_tmp: tempfile::TempDir,
}
pub struct FakeAgentSwitchingSession {
session_id: SessionId,
planner: FakeAcpAgent,
coder: FakeAcpAgent,
}
#[derive(Clone)]
pub struct FakeAcpAgent {
name: String,
captured_contexts: Arc<Mutex<Vec<Context>>>,
}
impl AcpTestHarness {
pub async fn start() -> Self {
let tmp = tempfile::tempdir().expect("tempdir for session store");
let session_store = Arc::new(SessionStore::from_path(tmp.path().to_path_buf()));
let state = Arc::new(AcpState::new(AcpStateConfig {
session_store: session_store.clone(),
oauth_credential_store: fake_oauth_store(),
initial_selection: InitialSessionSelection::default(),
settings_source: SettingsSourceArgs::default(),
provider_connections: ProviderConnectionOverrides::default(),
}));
let (peer, client_builder) = TestPeer::new();
let (agent_transport, client_transport) = duplex_pair();
let (agent_cx_tx, agent_cx_rx) = oneshot::channel::<ConnectionTo<Client>>();
let (client_cx_tx, client_cx_rx) = oneshot::channel::<ConnectionTo<Agent>>();
let server_state = state.clone();
spawn_local(async move {
let _ = acp_agent_builder(server_state)
.connect_with(agent_transport, async move |cx: ConnectionTo<Client>| {
let _ = agent_cx_tx.send(cx);
std::future::pending::<()>().await;
Ok(())
})
.await;
});
spawn_local(async move {
let _ = client_builder
.connect_with(client_transport, async move |cx: ConnectionTo<Agent>| {
let _ = client_cx_tx.send(cx);
std::future::pending::<()>().await;
Ok(())
})
.await;
});
let agent_cx = agent_cx_rx.await.expect("agent side connect_with produced a ConnectionTo");
let client_cx = client_cx_rx.await.expect("client side connect_with produced a ConnectionTo");
Self { client_cx, peer, agent_cx, state, session_store, _tmp: tmp }
}
pub async fn insert_agent_switching_session(&self) -> FakeAgentSwitchingSession {
self.insert_switching_session(
SessionId::new("agent-switching-session"),
Vec::new(),
Some("Planner".to_string()),
false,
)
.await
}
pub async fn insert_agent_switching_session_with_serverless_coder(&self) -> FakeAgentSwitchingSession {
self.insert_switching_session(
SessionId::new("agent-switching-serverless-session"),
Vec::new(),
Some("Planner".to_string()),
true,
)
.await
}
pub async fn insert_loaded_agent_switching_session(&self, session_id: &str) -> FakeAgentSwitchingSession {
let events = self.session_store.load(session_id).map(|(_, events)| events).unwrap_or_default();
let selected_mode = last_agent_from_events(Some("Planner".to_string()), &events);
self.insert_switching_session(SessionId::new(session_id), events, selected_mode, false).await
}
pub async fn expect_mcp_server_status(&mut self, expected: &[&str]) {
assert_server_status(self.peer.next_mcp_notification().await, expected);
}
pub async fn expect_mcp_server_status_exact(&mut self, expected: &[&str]) {
assert_server_status_exact(self.peer.next_mcp_notification().await, expected);
}
pub async fn expect_available_commands(&mut self, expected: &[&str], unexpected: &[&str]) {
loop {
let update = self.peer.next_session_notification().await.update;
if matches!(update, SessionUpdate::AvailableCommandsUpdate(_)) {
assert_available_commands(update, expected, unexpected);
return;
}
}
}
pub fn append_agent_switch(&self, session_id: &str, from: Option<&str>, to: Option<&str>) {
self.append_stored_event(
session_id,
&SessionEvent::Control(SessionControlEvent::AgentSwitched {
from: from.map(str::to_string),
to: to.map(str::to_string),
}),
);
}
pub async fn insert_stub_session(
&self,
agent_tx: mpsc::Sender<Command>,
agent_rx: mpsc::Receiver<AgentMessage>,
agent_handle: AgentHandle,
id: SessionId,
model: &str,
) {
let model_spec: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
let spec = AgentSpec::default_spec(&model_spec, None, Vec::new());
let factory = Arc::new(StubRuntimeFactory {
cwd: PathBuf::from("/tmp"),
agent_parts: Mutex::new(Some(StubAgentParts { tx: agent_tx, rx: agent_rx, handle: agent_handle })),
});
let handle = SessionActor::spawn(SessionActorInit {
session_id: id.clone(),
connection: self.agent_cx.clone(),
repository: self.session_store.clone(),
oauth_credential_store: fake_oauth_store(),
active_agent: AgentKey::Default,
specs: HashMap::from([(AgentKey::Default, spec)]),
runtime_factory: factory,
transcript: Vec::new(),
modes: Modes::default(),
config: SessionConfigState::with_selection(model.to_string(), None, None),
})
.await
.expect("stub session actor spawns");
self.state.register_session(&id, handle).await;
}
pub fn append_stored_session(&self, session_id: &str, created_at: &str) {
let meta = SessionMeta {
session_id: session_id.to_string(),
cwd: PathBuf::from("/tmp"),
model: "test-model".to_string(),
selected_mode: None,
created_at: created_at.to_string(),
};
self.session_store.append_meta(session_id, &meta).expect("stored session meta appends");
}
pub fn append_stored_prompt(&self, session_id: &str, prompt: &str) {
self.append_stored_event(
session_id,
&SessionEvent::User(UserEvent::Message { content: vec![llm::ContentBlock::text(prompt)] }),
);
}
pub fn append_stored_user_blocks(&self, session_id: &str, blocks: Vec<llm::ContentBlock>) {
self.append_stored_event(session_id, &SessionEvent::User(UserEvent::Message { content: blocks }));
}
pub fn append_stored_agent_text(&self, session_id: &str, text: &str) {
self.append_stored_event(
session_id,
&SessionEvent::Agent(AgentMessage::Text {
message_id: "msg".to_string(),
chunk: text.to_string(),
is_complete: true,
model_name: "test".to_string(),
}),
);
}
async fn insert_switching_session(
&self,
acp_session_id: SessionId,
events: Vec<SessionEvent>,
selected_mode: Option<String>,
serverless_coder: bool,
) -> FakeAgentSwitchingSession {
let (planner_def, planner) = fake_agent("Planner", "planner-mcp", "plan", PLANNER_REPLY);
let (mut coder_def, coder) = fake_agent("Coder", "coder-mcp", "edit", CODER_REPLY);
if serverless_coder {
coder_def.mcp = None;
}
let mut specs = HashMap::new();
let mut agents = HashMap::new();
for def in [planner_def, coder_def] {
specs.insert(AgentKey::Named(def.spec.name.clone()), def.spec.clone());
agents.insert(def.spec.name.clone(), def);
}
let factory = Arc::new(FakeRuntimeFactory { cwd: PathBuf::from("/tmp"), agents });
let initial_agent = selected_mode.clone().unwrap_or_else(|| "Planner".to_string());
let handle = SessionActor::spawn(SessionActorInit {
session_id: acp_session_id.clone(),
connection: self.agent_cx.clone(),
repository: self.session_store.clone(),
oauth_credential_store: fake_oauth_store(),
active_agent: AgentKey::Named(initial_agent),
specs,
runtime_factory: factory,
transcript: events,
modes: switching_modes(),
config: SessionConfigState::with_selection("anthropic:claude-sonnet-4-5".to_string(), selected_mode, None),
})
.await
.expect("fake agent switching session actor spawns");
self.state.register_session(&acp_session_id, handle).await;
FakeAgentSwitchingSession { session_id: acp_session_id, planner, coder }
}
fn append_stored_event(&self, session_id: &str, event: &SessionEvent) {
self.session_store.append_event(session_id, event).expect("stored session event appends");
}
}
impl FakeAgentSwitchingSession {
pub fn session_id(&self) -> &SessionId {
&self.session_id
}
pub fn planner(&self) -> &FakeAcpAgent {
&self.planner
}
pub fn coder(&self) -> &FakeAcpAgent {
&self.coder
}
pub fn agent(&self, name: &str) -> &FakeAcpAgent {
match name {
"Planner" => &self.planner,
"Coder" => &self.coder,
other => panic!("unknown fake ACP agent {other:?}"),
}
}
}
impl FakeAcpAgent {
pub fn name(&self) -> &str {
&self.name
}
pub fn assert_saw(&self, expected: &[&str]) {
let seen = self.latest_conversation();
for text in expected {
assert!(seen.iter().any(|m| m == text), "{} should have seen {text:?}; saw {seen:?}", self.name);
}
}
pub fn assert_saw_exactly(&self, expected: &[&str]) {
let seen = self.latest_conversation();
let expected: Vec<String> = expected.iter().map(|t| (*t).to_string()).collect();
assert_eq!(seen, expected, "{} conversation mismatch", self.name);
}
pub fn assert_never_ran(&self) {
let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
assert!(contexts.is_empty(), "{} should not have run; captured {} context(s)", self.name, contexts.len());
}
fn latest_conversation(&self) -> Vec<String> {
let contexts = self.captured_contexts.lock().expect("captured contexts lock is healthy");
let latest = contexts.last().unwrap_or_else(|| panic!("{} should have run a turn", self.name));
conversation_texts(latest)
}
}
struct FakeRuntimeFactory {
cwd: PathBuf,
agents: HashMap<String, FakeAgentDef>,
}
struct FakeAgentDef {
spec: AgentSpec,
provider: Mutex<Option<Arc<dyn StreamingModelProvider>>>,
mcp: Option<(String, String)>,
}
#[async_trait::async_trait]
impl RuntimeFactory for FakeRuntimeFactory {
async fn spawn(
&self,
agent: AgentKey,
spec: &AgentSpec,
initial_messages: Vec<ChatMessage>,
runtime_event_tx: mpsc::Sender<RuntimeEvent>,
) -> Result<AgentRuntime, SessionError> {
let def = self.agents.get(&spec.name).ok_or_else(|| SessionError::AgentNotFound(spec.name.clone()))?;
let provider = def
.provider
.lock()
.expect("fake provider lock is healthy")
.take()
.expect("fake agent runtime spawned more than once");
let servers = def.mcp.as_ref().map_or_else(Vec::new, |(server_name, prompt_name)| {
vec![McpServer::new(
server_name.clone(),
McpTransport::InMemory { server: FakePromptMcp::new(prompt_name).into_dyn() },
false,
)]
});
let mut spawn = mcp(&self.cwd)
.with_servers(servers)
.spawn()
.await
.map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
let snapshot = spawn
.block_until_ready()
.await
.ok_or_else(|| SessionError::McpOperation("fake MCP bootstrap aborted".to_string()))?;
let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
let filtered_tools = spec.tools.apply(snapshot.tool_definitions.clone());
let mut builder = AgentBuilder::new(provider).max_auto_continues(0);
for prompt in &spec.prompts {
builder = builder.system_prompt(prompt.clone());
}
let (agent_tx, agent_rx, agent_handle) = builder
.tools(mcp_tx.clone(), filtered_tools)
.messages(initial_messages)
.spawn()
.await
.map_err(|e| SessionError::Build(CliError::AgentError(e.to_string())))?;
Ok(AgentRuntime::new(
agent,
spec,
agent_tx,
agent_rx,
Some(agent_handle),
mcp_tx,
event_rx,
mcp_handle,
snapshot,
runtime_event_tx,
))
}
}
struct StubRuntimeFactory {
cwd: PathBuf,
agent_parts: Mutex<Option<StubAgentParts>>,
}
struct StubAgentParts {
tx: mpsc::Sender<Command>,
rx: mpsc::Receiver<AgentMessage>,
handle: AgentHandle,
}
#[async_trait::async_trait]
impl RuntimeFactory for StubRuntimeFactory {
async fn spawn(
&self,
agent: AgentKey,
spec: &AgentSpec,
_initial_messages: Vec<ChatMessage>,
runtime_event_tx: mpsc::Sender<RuntimeEvent>,
) -> Result<AgentRuntime, SessionError> {
let parts = self
.agent_parts
.lock()
.expect("stub agent parts lock is healthy")
.take()
.expect("stub runtime spawned more than once");
let mut spawn =
mcp(&self.cwd).spawn().await.map_err(|e| SessionError::Build(CliError::McpError(e.to_string())))?;
let snapshot = spawn
.block_until_ready()
.await
.ok_or_else(|| SessionError::McpOperation("stub MCP bootstrap aborted".to_string()))?;
let McpSpawnResult { command_tx: mcp_tx, event_rx, handle: mcp_handle } = spawn;
Ok(AgentRuntime::new(
agent,
spec,
parts.tx,
parts.rx,
Some(parts.handle),
mcp_tx,
event_rx,
mcp_handle,
snapshot,
runtime_event_tx,
))
}
}
fn fake_agent(name: &str, server_name: &str, prompt_name: &str, reply: &str) -> (FakeAgentDef, FakeAcpAgent) {
let provider =
FakeLlmProvider::new(vec![vec![LlmResponse::start("msg"), LlmResponse::text(reply), LlmResponse::done()]])
.with_display_name(name);
let captured_contexts = provider.captured_contexts();
let def = FakeAgentDef {
spec: fake_agent_spec(name),
provider: Mutex::new(Some(Arc::new(provider))),
mcp: Some((server_name.to_string(), prompt_name.to_string())),
};
let observer = FakeAcpAgent { name: name.to_string(), captured_contexts };
(def, observer)
}
fn fake_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
Arc::new(aether_auth::FakeOAuthCredentialStore::new())
}
fn switching_modes() -> Modes {
Modes::new(vec![
ValidatedMode {
name: "Planner".to_string(),
model: "anthropic:claude-sonnet-4-5".to_string(),
reasoning_effort: None,
},
ValidatedMode {
name: "Coder".to_string(),
model: "deepseek:deepseek-chat".to_string(),
reasoning_effort: None,
},
])
}
fn fake_agent_spec(name: &str) -> AgentSpec {
let model: llm::catalog::LlmModel = "anthropic:claude-sonnet-4-5".parse().expect("test model parses");
let mut spec = AgentSpec::default_spec(&model, None, vec![Prompt::text(&format!("{name} system prompt"))]);
spec.name = name.to_string();
spec.description = format!("{name} test agent");
spec.exposure = AgentSpecExposure::user_only();
spec
}
fn assert_available_commands(update: SessionUpdate, expected: &[&str], unexpected: &[&str]) {
let SessionUpdate::AvailableCommandsUpdate(commands) = update else {
panic!("expected available commands update");
};
let names = commands.available_commands.iter().map(|command| command.name.as_str()).collect::<Vec<_>>();
for name in expected {
assert!(names.contains(name), "expected command /{name} in {names:?}");
}
for name in unexpected {
assert!(!names.contains(name), "did not expect command /{name} in {names:?}");
}
}
fn assert_server_status(notification: McpNotification, expected: &[&str]) {
let McpNotification::ServerStatus { servers } = notification else {
panic!("expected server status notification");
};
let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
for server_name in expected {
assert!(names.contains(server_name), "expected server {server_name} in {names:?}");
}
}
fn assert_server_status_exact(notification: McpNotification, expected: &[&str]) {
let McpNotification::ServerStatus { servers } = notification else {
panic!("expected server status notification");
};
let names = servers.iter().map(|server| server.name.as_str()).collect::<Vec<_>>();
assert_eq!(names, expected);
}
fn conversation_texts(context: &Context) -> Vec<String> {
context
.messages()
.iter()
.filter_map(|message| match message {
ChatMessage::User { content, .. } => llm::ContentBlock::first_text(content).map(str::to_string),
ChatMessage::Assistant { content, .. } if !content.is_empty() => Some(content.clone()),
_ => None,
})
.collect()
}