use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use adk_acp::{
AcpAgentConfig, OutputChunk, PermissionPolicy,
StatusTracker, UsageTracker, AcpUsage, stream_prompt,
status::AgentStatus,
};
use tokio::sync::mpsc;
use tracing::{info, warn};
use super::config::{AgentTransport, CodingAgentInstanceConfig};
use super::models::{TaskError, TaskRequest, TaskResult};
/// Incremental progress event emitted while a coding agent works on a task.
///
/// Values are produced by the background task spawned in
/// `StreamingAcpClient::execute_streaming` and delivered through a
/// `CodingAgentUpdateStream`.
#[derive(Debug, Clone)]
pub enum CodingAgentUpdate {
/// Status change accompanied by a human-readable description.
Status(AgentStatusUpdate),
/// Chunk of agent response text; these chunks are concatenated into the final output.
Text(String),
/// Chunk of agent reasoning; forwarded to consumers but not added to the final output.
Thought(String),
/// The agent stream reported the start of a tool call.
ToolCallStarted {
/// Title of the tool call as reported by the agent stream.
title: String,
},
/// The agent stream reported the completion of a tool call.
ToolCallCompleted {
/// Title of the tool call as reported by the agent stream.
title: String,
},
/// A permission request was reported together with its outcome.
PermissionRequested {
/// Title of the permission request as reported by the agent stream.
title: String,
/// `true` when the request was approved, `false` when it was denied.
approved: bool,
},
/// Final update sent once the streaming task has finished.
Done {
/// All `Text` chunks received during the run, concatenated in arrival order.
output: String,
/// Time elapsed between the start of the streaming task and its completion.
duration: Duration,
/// `false` when the stream failed to start or reported an error chunk.
success: bool,
/// Error message accompanying a failed run, if one was captured.
error: Option<String>,
},
}
/// Payload of a `CodingAgentUpdate::Status` event.
#[derive(Debug, Clone)]
pub struct AgentStatusUpdate {
/// Coarse state the agent is being reported in.
pub status: AgentStatusKind,
/// Human-readable text shown next to the status (e.g. "Agent is working...").
pub description: String,
}
/// Coarse agent state exposed to consumers of the update stream.
///
/// Converted from `adk_acp::status::AgentStatus` via `From`; that conversion maps both
/// `AgentStatus::Stopping` and `AgentStatus::Stopped` to `Stopped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatusKind {
/// Reported right before the agent stream is opened.
Starting,
/// Reported once the agent stream has been opened successfully.
Running,
/// Reported alongside a denied permission request.
WaitingPermission,
/// Counterpart of `AgentStatus::Idle`.
Idle,
/// Counterpart of `AgentStatus::Error`.
Error,
/// Counterpart of `AgentStatus::Stopping` and `AgentStatus::Stopped`.
Stopped,
}
impl From<AgentStatus> for AgentStatusKind {
fn from(status: AgentStatus) -> Self {
match status {
AgentStatus::Starting => Self::Starting,
AgentStatus::Running => Self::Running,
AgentStatus::WaitingPermission => Self::WaitingPermission,
AgentStatus::Idle => Self::Idle,
AgentStatus::Error => Self::Error,
AgentStatus::Stopping | AgentStatus::Stopped => Self::Stopped,
}
}
}
/// Receiving half of the bounded channel on which a streaming task publishes its
/// `CodingAgentUpdate`s.
pub type CodingAgentUpdateStream = mpsc::Receiver<CodingAgentUpdate>;
/// Client that runs coding-agent tasks over ACP and streams their progress.
pub struct StreamingAcpClient {
/// Collects one usage record per streamed task; read via `usage_stats`.
usage_tracker: UsageTracker,
/// Permission policy handed to `stream_prompt` for every task.
policy: Arc<PermissionPolicy>,
}
impl StreamingAcpClient {
pub fn new() -> Self {
Self {
usage_tracker: UsageTracker::new(),
policy: Arc::new(PermissionPolicy::AutoApprove),
}
}
pub fn with_policy(policy: PermissionPolicy) -> Self {
Self {
usage_tracker: UsageTracker::new(),
policy: Arc::new(policy),
}
}
pub async fn execute_streaming(
&self,
agent_id: &str,
config: &CodingAgentInstanceConfig,
request: &TaskRequest,
) -> Result<CodingAgentUpdateStream, TaskError> {
let transport = config.transport.as_ref().ok_or_else(|| {
TaskError::ExecutionError {
message: format!("Agent '{}' has no transport configured", agent_id),
partial_output: None,
}
})?;
let AgentTransport::Stdio { command, args, env } = transport else {
return Err(TaskError::ExecutionError {
message: format!("Agent '{}' has non-stdio transport — streaming requires stdio", agent_id),
partial_output: None,
});
};
let full_command = if args.is_empty() {
command.clone()
} else {
format!("{} {}", command, args.join(" "))
};
let working_dir = request.workspace.clone()
.or_else(|| config.workspaces.first().cloned())
.unwrap_or_else(|| PathBuf::from("."));
let mut acp_config = AcpAgentConfig::new(&full_command)
.working_dir(&working_dir);
for (key, val) in env {
acp_config = acp_config.env(key, val);
}
let prompt = build_prompt(request);
let (update_tx, update_rx) = mpsc::channel::<CodingAgentUpdate>(64);
let status_tracker = StatusTracker::new();
let policy = self.policy.clone();
let usage_tracker = self.usage_tracker.clone();
let agent_id_owned = agent_id.to_string();
let prompt_len = prompt.len();
tokio::spawn(async move {
let start = Instant::now();
let mut full_output = String::new();
let mut success = true;
let mut error_msg: Option<String> = None;
let _ = update_tx.send(CodingAgentUpdate::Status(AgentStatusUpdate {
status: AgentStatusKind::Starting,
description: "Starting coding agent...".to_string(),
})).await;
match stream_prompt(&acp_config, &prompt, policy, status_tracker.clone()).await {
Ok(mut stream) => {
let _ = update_tx.send(CodingAgentUpdate::Status(AgentStatusUpdate {
status: AgentStatusKind::Running,
description: "Agent is working...".to_string(),
})).await;
while let Some(chunk) = stream.recv().await {
match chunk {
OutputChunk::Text(text) => {
full_output.push_str(&text);
let _ = update_tx.send(CodingAgentUpdate::Text(text)).await;
}
OutputChunk::Thought(thought) => {
let _ = update_tx.send(CodingAgentUpdate::Thought(thought)).await;
}
OutputChunk::ToolCall { title } => {
let _ = update_tx.send(CodingAgentUpdate::ToolCallStarted {
title: title.clone(),
}).await;
}
OutputChunk::ToolCallComplete { title } => {
let _ = update_tx.send(CodingAgentUpdate::ToolCallCompleted {
title,
}).await;
}
OutputChunk::PermissionRequested { title, approved } => {
if !approved {
let _ = update_tx.send(CodingAgentUpdate::Status(AgentStatusUpdate {
status: AgentStatusKind::WaitingPermission,
description: format!("Permission denied: {}", title),
})).await;
}
let _ = update_tx.send(CodingAgentUpdate::PermissionRequested {
title,
approved,
}).await;
}
OutputChunk::Done => {
break;
}
OutputChunk::Error(err) => {
success = false;
error_msg = Some(err.clone());
warn!(agent_id = %agent_id_owned, error = %err, "ACP stream error");
break;
}
}
}
}
Err(e) => {
success = false;
error_msg = Some(e.to_string());
warn!(agent_id = %agent_id_owned, error = %e, "Failed to start ACP stream");
}
}
let duration = start.elapsed();
usage_tracker.record(&AcpUsage {
tool_name: agent_id_owned.clone(),
prompt_chars: prompt_len,
response_chars: full_output.len(),
duration,
success,
permission_requests: 0, permissions_denied: 0,
});
let _ = update_tx.send(CodingAgentUpdate::Done {
output: full_output,
duration,
success,
error: error_msg,
}).await;
info!(
agent_id = %agent_id_owned,
duration_ms = duration.as_millis() as u64,
success = success,
"Streaming ACP task completed"
);
});
Ok(update_rx)
}
pub async fn execute_task(
&self,
agent_id: &str,
config: &CodingAgentInstanceConfig,
request: &TaskRequest,
) -> Result<TaskResult, TaskError> {
let mut stream = self.execute_streaming(agent_id, config, request).await?;
let mut output = String::new();
let mut duration = Duration::ZERO;
let mut error: Option<String> = None;
while let Some(update) = stream.recv().await {
match update {
CodingAgentUpdate::Text(text) => {
output.push_str(&text);
}
CodingAgentUpdate::Done { output: final_output, duration: d, success, error: e } => {
output = final_output;
duration = d;
if !success {
error = e;
}
break;
}
_ => {
}
}
}
if let Some(err) = error {
return Err(TaskError::ExecutionError {
message: err,
partial_output: if output.is_empty() { None } else { Some(output) },
});
}
Ok(TaskResult {
output,
modified_files: vec![], duration_ms: duration.as_millis() as u64,
token_usage: None,
})
}
pub fn usage_stats(&self) -> adk_acp::AcpUsageStats {
self.usage_tracker.stats()
}
pub fn reset_stats(&self) {
self.usage_tracker.reset();
}
}
impl Default for StreamingAcpClient {
fn default() -> Self {
Self::new()
}
}
/// Builds the prompt text sent to the agent for `request`.
///
/// Layout: an optional `Working directory: <path>` header followed by a blank line, the
/// task description, and — when `file_context` holds at least one path — a blank line
/// and a `Relevant files:` section listing one path per line.
fn build_prompt(request: &TaskRequest) -> String {
    let mut text = match &request.workspace {
        Some(dir) => format!(
            "Working directory: {}\n\n{}",
            dir.display(),
            request.description
        ),
        None => request.description.clone(),
    };

    // An empty file list is treated the same as no list at all.
    let relevant = request
        .file_context
        .as_ref()
        .filter(|paths| !paths.is_empty());
    if let Some(paths) = relevant {
        let listing = paths
            .iter()
            .map(|path| path.display().to_string())
            .collect::<Vec<String>>()
            .join("\n");
        text.push_str("\n\nRelevant files:\n");
        text.push_str(&listing);
    }

    text
}
pub fn format_update_for_display(update: &CodingAgentUpdate) -> Option<String> {
match update {
CodingAgentUpdate::Status(status) => {
let emoji = match status.status {
AgentStatusKind::Starting => "🚀",
AgentStatusKind::Running => "⚙️",
AgentStatusKind::WaitingPermission => "🔐",
AgentStatusKind::Idle => "💤",
AgentStatusKind::Error => "❌",
AgentStatusKind::Stopped => "🛑",
};
Some(format!("{} {}", emoji, status.description))
}
CodingAgentUpdate::ToolCallStarted { title } => {
Some(format!("🔧 {}", title))
}
CodingAgentUpdate::ToolCallCompleted { title } => {
Some(format!("✅ {}", title))
}
CodingAgentUpdate::PermissionRequested { title, approved } => {
if *approved {
Some(format!("🔓 Approved: {}", title))
} else {
Some(format!("🔒 Denied: {}", title))
}
}
CodingAgentUpdate::Done { duration, success, error, .. } => {
if *success {
Some(format!("✅ Completed in {:.1}s", duration.as_secs_f64()))
} else {
Some(format!("❌ Failed after {:.1}s: {}",
duration.as_secs_f64(),
error.as_deref().unwrap_or("unknown error")
))
}
}
CodingAgentUpdate::Text(_) | CodingAgentUpdate::Thought(_) => None,
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the minimal `TaskRequest` shared by the prompt tests; only `workspace` varies.
    fn request_with_workspace(workspace: Option<PathBuf>) -> TaskRequest {
        TaskRequest {
            description: "Fix the bug".to_string(),
            trigger: super::super::models::TaskTrigger::ControlPanel {
                user_id: "test".to_string(),
            },
            workspace,
            file_context: None,
            reply_to: super::super::models::ReplyTarget {
                channel_type: "telegram".to_string(),
                channel_id: "123".to_string(),
                message_id: None,
            },
        }
    }

    /// Formats `update`, panicking if it has no display line.
    fn display_line(update: &CodingAgentUpdate) -> String {
        let formatted = format_update_for_display(update);
        assert!(formatted.is_some());
        formatted.unwrap()
    }

    #[test]
    fn test_format_update_status() {
        let line = display_line(&CodingAgentUpdate::Status(AgentStatusUpdate {
            status: AgentStatusKind::Running,
            description: "Agent is working...".to_string(),
        }));
        assert!(line.contains("⚙️"));
    }

    #[test]
    fn test_format_update_tool_call() {
        let line = display_line(&CodingAgentUpdate::ToolCallStarted {
            title: "Reading file src/main.rs".to_string(),
        });
        assert!(line.contains("🔧"));
        assert!(line.contains("Reading file"));
    }

    #[test]
    fn test_format_update_done_success() {
        let line = display_line(&CodingAgentUpdate::Done {
            output: "Done".to_string(),
            duration: Duration::from_secs(5),
            success: true,
            error: None,
        });
        assert!(line.contains("✅"));
        assert!(line.contains("5.0s"));
    }

    #[test]
    fn test_format_update_done_failure() {
        let line = display_line(&CodingAgentUpdate::Done {
            output: "".to_string(),
            duration: Duration::from_secs(3),
            success: false,
            error: Some("Connection lost".to_string()),
        });
        assert!(line.contains("❌"));
        assert!(line.contains("Connection lost"));
    }

    #[test]
    fn test_format_update_text_returns_none() {
        let update = CodingAgentUpdate::Text("Hello".to_string());
        assert!(format_update_for_display(&update).is_none());
    }

    #[test]
    fn test_build_prompt_basic() {
        let prompt = build_prompt(&request_with_workspace(None));
        assert_eq!(prompt, "Fix the bug");
    }

    #[test]
    fn test_build_prompt_with_workspace() {
        let request = request_with_workspace(Some(PathBuf::from("/home/user/project")));
        let prompt = build_prompt(&request);
        assert!(prompt.contains("Working directory: /home/user/project"));
        assert!(prompt.contains("Fix the bug"));
    }
}