use std::str::FromStr;
use std::sync::Arc;
use agent_client_protocol::schema::{
ContentBlock, InitializeRequest, ProtocolVersion, RequestPermissionOutcome,
RequestPermissionRequest, RequestPermissionResponse, SelectedPermissionOutcome,
SessionNotification, SessionUpdate,
};
use agent_client_protocol::{Agent, Client, ConnectionTo};
use agent_client_protocol_tokio::AcpAgent;
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::connection::AcpAgentConfig;
use crate::error::{AcpError, Result};
use crate::permissions::{
PermissionDecision, PermissionOption, PermissionPolicy, PermissionRequest,
};
use crate::status::{AgentStatus, StatusTracker};
/// One event delivered to the consumer while a prompt is being streamed.
///
/// Values are sent over the channel returned by `stream_prompt`, in the order
/// the corresponding agent events are handled.
#[derive(Clone, Debug)]
pub enum OutputChunk {
    /// Text taken from an agent message chunk whose content is a text block.
    Text(String),

    /// Text taken from an agent thought chunk whose content is a text block.
    Thought(String),

    /// The agent reported a tool call.
    ToolCall {
        /// Title of the tool call as reported in the session update.
        title: String,
    },

    /// A tool call finished.
    ///
    /// NOTE(review): nothing in the visible code sends this variant — confirm
    /// whether another producer exists or whether it is still to be wired up.
    ToolCallComplete {
        /// Title of the completed tool call.
        title: String,
    },

    /// The agent asked for permission and the permission policy answered.
    PermissionRequested {
        /// Label shown for the request; `stream_prompt` fills it with the name
        /// of the first offered option, or `"Unknown"` when there are none.
        title: String,
        /// `true` when the policy decision was an allow.
        approved: bool,
    },

    /// The prompt's response was read to the end without error.
    Done,

    /// The session ended with an error; carries the error's display text.
    Error(String),
}
/// Receiving half of the channel on which `stream_prompt` delivers
/// [`OutputChunk`] values.
pub type OutputStream = mpsc::Receiver<OutputChunk>;
pub async fn stream_prompt(
config: &AcpAgentConfig,
prompt: &str,
policy: Arc<PermissionPolicy>,
status: StatusTracker,
) -> Result<OutputStream> {
info!(command = %config.command, "starting streaming ACP prompt");
let agent = AcpAgent::from_str(&config.command).map_err(|e| {
AcpError::InvalidConfig(format!("invalid command '{}': {e}", config.command))
})?;
let (chunk_tx, chunk_rx) = mpsc::channel::<OutputChunk>(64);
let prompt_text = prompt.to_string();
let working_dir = config.working_dir.clone();
status.set(AgentStatus::Starting);
tokio::spawn(async move {
let chunk_tx_err = chunk_tx.clone();
let status_inner = status.clone();
let policy_clone = policy.clone();
let chunk_tx_perm = chunk_tx.clone();
let outcome = Client
.builder()
.on_receive_notification(
{
let tx = chunk_tx.clone();
async move |notif: SessionNotification, _cx: ConnectionTo<Agent>| {
match notif.update {
SessionUpdate::AgentMessageChunk(chunk) => {
if let ContentBlock::Text(text_content) = chunk.content {
let _ = tx
.send(OutputChunk::Text(text_content.text.to_string()))
.await;
}
}
SessionUpdate::AgentThoughtChunk(chunk) => {
if let ContentBlock::Text(text_content) = chunk.content {
let _ = tx
.send(OutputChunk::Thought(text_content.text.to_string()))
.await;
}
}
SessionUpdate::ToolCall(tool_call) => {
let _ = tx
.send(OutputChunk::ToolCall {
title: tool_call.title.to_string(),
})
.await;
}
_ => {}
}
Ok(())
}
},
agent_client_protocol::on_receive_notification!(),
)
.on_receive_request(
{
let status = status_inner.clone();
async move |request: RequestPermissionRequest,
responder,
_cx: ConnectionTo<Agent>| {
status.set(AgentStatus::WaitingPermission);
let title = request
.options
.first()
.map(|o| o.name.to_string())
.unwrap_or_else(|| "Unknown".to_string());
let perm_request = PermissionRequest {
title: title.clone(),
options: request
.options
.iter()
.map(|o| PermissionOption {
id: o.option_id.to_string(),
name: o.name.to_string(),
})
.collect(),
};
let decision = policy_clone.decide(&perm_request);
let approved = matches!(decision, PermissionDecision::Allow(_));
let _ = chunk_tx_perm
.send(OutputChunk::PermissionRequested {
title: title.clone(),
approved,
})
.await;
status.set(AgentStatus::Running);
match decision {
PermissionDecision::Allow(id) => responder.respond(
RequestPermissionResponse::new(RequestPermissionOutcome::Selected(
SelectedPermissionOutcome::new(id),
)),
),
PermissionDecision::Deny => responder.respond(
RequestPermissionResponse::new(RequestPermissionOutcome::Cancelled),
),
}
}
},
agent_client_protocol::on_receive_request!(),
)
.connect_with(agent, {
let status = status_inner.clone();
let tx = chunk_tx.clone();
|connection: ConnectionTo<Agent>| async move {
status.set(AgentStatus::Starting);
connection
.send_request(InitializeRequest::new(ProtocolVersion::V1))
.block_task()
.await?;
status.set(AgentStatus::Running);
connection
.build_session(&working_dir)
.block_task()
.run_until(async |mut session| {
session.send_prompt(&prompt_text)?;
let _ = session.read_to_string().await?;
let _ = tx.send(OutputChunk::Done).await;
Ok(())
})
.await?;
status.set(AgentStatus::Idle);
Ok(())
}
})
.await;
if let Err(e) = outcome {
warn!(error = %e, "streaming ACP session ended with error");
let _ = chunk_tx_err.send(OutputChunk::Error(e.to_string())).await;
}
status_inner.set(AgentStatus::Stopped);
});
Ok(chunk_rx)
}