use std::path::PathBuf;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AcpConfig {
pub endpoint: String,
#[serde(default = "default_timeout_secs", rename = "timeoutSecs")]
pub timeout_secs: u64,
#[serde(default, rename = "agentType")]
pub agent_type: AcpAgentType,
}
fn default_timeout_secs() -> u64 {
300
}
impl Default for AcpConfig {
fn default() -> Self {
Self {
endpoint: "http://localhost:3000/acp".to_string(),
timeout_secs: default_timeout_secs(),
agent_type: AcpAgentType::default(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum AcpAgentType {
ClaudeCode,
Codex,
Custom {
name: String,
},
}
impl Default for AcpAgentType {
fn default() -> Self {
Self::ClaudeCode
}
}
impl AcpAgentType {
pub fn display_name(&self) -> &str {
match self {
AcpAgentType::ClaudeCode => "Claude Code",
AcpAgentType::Codex => "Codex",
AcpAgentType::Custom { name } => name,
}
}
}
#[derive(Debug, Error)]
pub enum AcpError {
#[error("ACP endpoint unreachable at {endpoint}: {reason}")]
EndpointUnreachable { endpoint: String, reason: String },
#[error("ACP task timed out after {timeout_secs}s (agent: {agent_type})")]
Timeout {
timeout_secs: u64,
agent_type: String,
},
#[error("ACP endpoint returned error (status {status}): {message}")]
EndpointError { status: u16, message: String },
#[error("Failed to build ACP request: {0}")]
RequestBuildError(String),
#[error("Failed to parse ACP response: {0}")]
ResponseParseError(String),
#[error("Progress channel closed")]
ProgressChannelClosed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpRequest {
pub task: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub file_context: Option<Vec<PathBuf>>,
pub agent_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpResult {
pub success: bool,
pub output: String,
#[serde(default)]
pub modified_files: Vec<PathBuf>,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpProgress {
pub message: String,
pub percent_complete: Option<u8>,
}
pub struct AcpTool {
config: AcpConfig,
client: reqwest::Client,
}
impl AcpTool {
pub fn new(config: AcpConfig) -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(config.timeout_secs))
.connect_timeout(Duration::from_secs(10))
.build()
.unwrap_or_else(|_| reqwest::Client::new());
Self { config, client }
}
pub async fn execute(
&self,
task: &str,
file_context: Option<&[PathBuf]>,
progress_tx: mpsc::Sender<String>,
) -> Result<AcpResult, AcpError> {
info!(
agent_type = self.config.agent_type.display_name(),
endpoint = %self.config.endpoint,
"Delegating task to ACP agent"
);
let request = AcpRequest {
task: task.to_string(),
file_context: file_context.map(|f| f.to_vec()),
agent_type: self.config.agent_type.display_name().to_string(),
};
let _ = progress_tx
.send(format!(
"🔄 Delegating task to {} via ACP...",
self.config.agent_type.display_name()
))
.await;
let progress_tx_clone = progress_tx.clone();
let agent_name = self.config.agent_type.display_name().to_string();
let timeout_secs = self.config.timeout_secs;
let progress_handle = tokio::spawn(async move {
Self::send_periodic_progress(progress_tx_clone, &agent_name, timeout_secs).await;
});
let result = self.do_execute(&request).await;
progress_handle.abort();
match &result {
Ok(r) => {
let _ = progress_tx
.send(format!(
"✅ {} completed task ({}ms)",
self.config.agent_type.display_name(),
r.duration_ms
))
.await;
}
Err(e) => {
let _ = progress_tx
.send(format!(
"❌ {} task failed: {}",
self.config.agent_type.display_name(),
e
))
.await;
}
}
result
}
async fn do_execute(&self, request: &AcpRequest) -> Result<AcpResult, AcpError> {
let response = self
.client
.post(&self.config.endpoint)
.json(request)
.send()
.await
.map_err(|e| {
if e.is_timeout() {
AcpError::Timeout {
timeout_secs: self.config.timeout_secs,
agent_type: self.config.agent_type.display_name().to_string(),
}
} else if e.is_connect() {
AcpError::EndpointUnreachable {
endpoint: self.config.endpoint.clone(),
reason: e.to_string(),
}
} else {
AcpError::EndpointUnreachable {
endpoint: self.config.endpoint.clone(),
reason: e.to_string(),
}
}
})?;
let status = response.status();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
error!(
status = status.as_u16(),
body = %body,
"ACP endpoint returned error"
);
return Err(AcpError::EndpointError {
status: status.as_u16(),
message: body,
});
}
let result: AcpResult = response.json().await.map_err(|e| {
AcpError::ResponseParseError(format!("Failed to parse ACP response JSON: {}", e))
})?;
debug!(
success = result.success,
duration_ms = result.duration_ms,
modified_files = result.modified_files.len(),
"ACP task completed"
);
Ok(result)
}
async fn send_periodic_progress(
progress_tx: mpsc::Sender<String>,
agent_name: &str,
timeout_secs: u64,
) {
let interval = Duration::from_secs(30);
let mut elapsed_secs: u64 = 0;
loop {
tokio::time::sleep(interval).await;
elapsed_secs += 30;
if elapsed_secs >= timeout_secs {
let _ = progress_tx
.send(format!(
"⏳ {} is approaching timeout ({}/{}s)...",
agent_name, elapsed_secs, timeout_secs
))
.await;
break;
}
let msg = format!(
"⏳ {} is still working... ({}s elapsed)",
agent_name, elapsed_secs
);
if progress_tx.send(msg).await.is_err() {
break;
}
}
}
pub fn config(&self) -> &AcpConfig {
&self.config
}
pub fn agent_type_name(&self) -> &str {
self.config.agent_type.display_name()
}
}
pub struct AcpToolRegistry {
tools: std::collections::HashMap<String, Vec<AcpTool>>,
}
impl AcpToolRegistry {
pub fn new() -> Self {
Self {
tools: std::collections::HashMap::new(),
}
}
pub fn register_for_agent(&mut self, agent_id: &str, configs: Vec<AcpConfig>) {
let tools: Vec<AcpTool> = configs.into_iter().map(AcpTool::new).collect();
if !tools.is_empty() {
info!(
agent_id = agent_id,
tool_count = tools.len(),
"Registered ACP tools for agent"
);
}
self.tools.insert(agent_id.to_string(), tools);
}
pub fn tools_for_agent(&self, agent_id: &str) -> Option<&[AcpTool]> {
self.tools.get(agent_id).map(|v| v.as_slice())
}
pub fn has_tools_for_agent(&self, agent_id: &str) -> bool {
self.tools
.get(agent_id)
.map(|v| !v.is_empty())
.unwrap_or(false)
}
pub fn total_tool_count(&self) -> usize {
self.tools.values().map(|v| v.len()).sum()
}
}
impl Default for AcpToolRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct AgentAcpConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default)]
pub agents: Vec<AcpConfig>,
}
impl AgentAcpConfig {
pub fn from_value(value: &serde_json::Value) -> Result<Self, serde_json::Error> {
serde_json::from_value(value.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_acp_agent_type_display_name() {
assert_eq!(AcpAgentType::ClaudeCode.display_name(), "Claude Code");
assert_eq!(AcpAgentType::Codex.display_name(), "Codex");
assert_eq!(
AcpAgentType::Custom {
name: "MyAgent".to_string()
}
.display_name(),
"MyAgent"
);
}
#[test]
fn test_acp_config_defaults() {
let config = AcpConfig::default();
assert_eq!(config.timeout_secs, 300);
assert_eq!(config.agent_type, AcpAgentType::ClaudeCode);
assert_eq!(config.endpoint, "http://localhost:3000/acp");
}
#[test]
fn test_acp_config_serialization() {
let config = AcpConfig {
endpoint: "http://example.com/acp".to_string(),
timeout_secs: 600,
agent_type: AcpAgentType::Codex,
};
let json = serde_json::to_string(&config).unwrap();
let deserialized: AcpConfig = serde_json::from_str(&json).unwrap();
assert_eq!(config, deserialized);
}
#[test]
fn test_acp_config_custom_agent_serialization() {
let config = AcpConfig {
endpoint: "http://custom.local:8080/acp".to_string(),
timeout_secs: 120,
agent_type: AcpAgentType::Custom {
name: "CustomCoder".to_string(),
},
};
let json = serde_json::to_string(&config).unwrap();
let deserialized: AcpConfig = serde_json::from_str(&json).unwrap();
assert_eq!(config, deserialized);
}
#[test]
fn test_acp_tool_registry_new() {
let registry = AcpToolRegistry::new();
assert_eq!(registry.total_tool_count(), 0);
assert!(!registry.has_tools_for_agent("test-agent"));
}
#[test]
fn test_acp_tool_registry_register() {
let mut registry = AcpToolRegistry::new();
let configs = vec![
AcpConfig {
endpoint: "http://localhost:3000/acp".to_string(),
timeout_secs: 300,
agent_type: AcpAgentType::ClaudeCode,
},
AcpConfig {
endpoint: "http://localhost:3001/acp".to_string(),
timeout_secs: 600,
agent_type: AcpAgentType::Codex,
},
];
registry.register_for_agent("agent-1", configs);
assert!(registry.has_tools_for_agent("agent-1"));
assert!(!registry.has_tools_for_agent("agent-2"));
assert_eq!(registry.total_tool_count(), 2);
assert_eq!(registry.tools_for_agent("agent-1").unwrap().len(), 2);
}
#[test]
fn test_acp_tool_registry_multiple_agents() {
let mut registry = AcpToolRegistry::new();
registry.register_for_agent(
"agent-1",
vec![AcpConfig {
endpoint: "http://localhost:3000/acp".to_string(),
timeout_secs: 300,
agent_type: AcpAgentType::ClaudeCode,
}],
);
registry.register_for_agent(
"agent-2",
vec![AcpConfig {
endpoint: "http://localhost:3001/acp".to_string(),
timeout_secs: 600,
agent_type: AcpAgentType::Codex,
}],
);
assert_eq!(registry.total_tool_count(), 2);
assert!(registry.has_tools_for_agent("agent-1"));
assert!(registry.has_tools_for_agent("agent-2"));
}
#[test]
fn test_acp_tool_creation() {
let config = AcpConfig {
endpoint: "http://localhost:3000/acp".to_string(),
timeout_secs: 300,
agent_type: AcpAgentType::ClaudeCode,
};
let tool = AcpTool::new(config.clone());
assert_eq!(tool.config(), &config);
assert_eq!(tool.agent_type_name(), "Claude Code");
}
#[test]
fn test_acp_error_display() {
let err = AcpError::EndpointUnreachable {
endpoint: "http://localhost:3000/acp".to_string(),
reason: "connection refused".to_string(),
};
assert!(err.to_string().contains("unreachable"));
assert!(err.to_string().contains("localhost:3000"));
let err = AcpError::Timeout {
timeout_secs: 300,
agent_type: "Claude Code".to_string(),
};
assert!(err.to_string().contains("timed out"));
assert!(err.to_string().contains("300s"));
let err = AcpError::EndpointError {
status: 500,
message: "Internal Server Error".to_string(),
};
assert!(err.to_string().contains("500"));
}
#[test]
fn test_acp_request_serialization() {
let request = AcpRequest {
task: "Fix the bug in auth.rs".to_string(),
file_context: Some(vec![PathBuf::from("src/auth.rs")]),
agent_type: "Claude Code".to_string(),
};
let json = serde_json::to_value(&request).unwrap();
assert_eq!(json["task"], "Fix the bug in auth.rs");
assert_eq!(json["file_context"][0], "src/auth.rs");
assert_eq!(json["agent_type"], "Claude Code");
}
#[test]
fn test_acp_result_deserialization() {
let json = serde_json::json!({
"success": true,
"output": "Fixed the authentication bug",
"modified_files": ["src/auth.rs", "tests/auth_test.rs"],
"duration_ms": 5000
});
let result: AcpResult = serde_json::from_value(json).unwrap();
assert!(result.success);
assert_eq!(result.output, "Fixed the authentication bug");
assert_eq!(result.modified_files.len(), 2);
assert_eq!(result.duration_ms, 5000);
}
#[test]
fn test_agent_acp_config_default() {
let config = AgentAcpConfig::default();
assert!(!config.enabled);
assert!(config.agents.is_empty());
}
#[tokio::test]
async fn test_acp_tool_execute_unreachable_endpoint() {
let config = AcpConfig {
endpoint: "http://127.0.0.1:1/acp".to_string(), timeout_secs: 5,
agent_type: AcpAgentType::ClaudeCode,
};
let tool = AcpTool::new(config);
let (tx, mut rx) = mpsc::channel(10);
let result = tool.execute("test task", None, tx).await;
assert!(result.is_err());
let err = result.unwrap_err();
match &err {
AcpError::EndpointUnreachable { endpoint, .. } => {
assert!(endpoint.contains("127.0.0.1:1"));
}
AcpError::Timeout { .. } => {
}
other => panic!("Expected EndpointUnreachable or Timeout, got: {:?}", other),
}
let mut messages = Vec::new();
while let Ok(msg) = rx.try_recv() {
messages.push(msg);
}
assert!(!messages.is_empty(), "Should have received progress messages");
assert!(
messages[0].contains("Delegating task"),
"First message should indicate delegation"
);
}
#[tokio::test]
async fn test_acp_tool_execute_with_file_context() {
let config = AcpConfig {
endpoint: "http://127.0.0.1:1/acp".to_string(),
timeout_secs: 2,
agent_type: AcpAgentType::Codex,
};
let tool = AcpTool::new(config);
let (tx, _rx) = mpsc::channel(10);
let files = vec![PathBuf::from("src/main.rs"), PathBuf::from("src/lib.rs")];
let result = tool.execute("refactor code", Some(&files), tx).await;
assert!(result.is_err());
}
#[test]
fn test_acp_agent_type_serde_roundtrip() {
let types = vec![
AcpAgentType::ClaudeCode,
AcpAgentType::Codex,
AcpAgentType::Custom {
name: "TestAgent".to_string(),
},
];
for agent_type in types {
let json = serde_json::to_string(&agent_type).unwrap();
let deserialized: AcpAgentType = serde_json::from_str(&json).unwrap();
assert_eq!(agent_type, deserialized);
}
}
}