use std::sync::Arc;
use anyhow::{Result, anyhow};
use cap_rs::core::{AgentEvent, ClientFrame, Content};
use cap_rs::driver::Driver;
use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
use super::{bridge, permission};
use rsclaw_types::OutboundMessage;
use rsclaw_i18n as i18n;
pub(crate) const TURN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AgentKind {
Claudecode,
Openclaude,
Opencode,
Codex,
Qoder,
}
impl AgentKind {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"claudecode" | "claude" => Some(Self::Claudecode),
"openclaude" => Some(Self::Openclaude),
"opencode" => Some(Self::Opencode),
"codex" => Some(Self::Codex),
"qoder" => Some(Self::Qoder),
_ => None,
}
}
pub fn as_str(self) -> &'static str {
match self {
Self::Claudecode => "claudecode",
Self::Openclaude => "openclaude",
Self::Opencode => "opencode",
Self::Codex => "codex",
Self::Qoder => "qoder",
}
}
pub fn display_name(self) -> &'static str {
match self {
Self::Claudecode => "Claude Code",
Self::Openclaude => "OpenClaude",
Self::Opencode => "OpenCode",
Self::Codex => "Codex",
Self::Qoder => "Qoder",
}
}
}
#[derive(Clone)]
pub struct NotifTarget {
pub tx: broadcast::Sender<OutboundMessage>,
pub target_id: String,
pub is_group: bool,
pub channel: String,
pub lang: &'static str,
}
#[derive(Clone)]
pub struct InboxTarget {
pub session_key: String,
pub channel: String,
pub peer_id: String,
pub chat_id: String,
}
pub struct Submitted {
pub session_id: String,
}
pub enum ToolCapRequest {
Prompt {
task: String,
notif: Option<NotifTarget>,
inbox: Option<InboxTarget>,
reply: oneshot::Sender<Result<Submitted>>,
},
}
type Slot = Arc<RwLock<Option<mpsc::Sender<ToolCapRequest>>>>;
pub struct CapAgentManager {
claudecode: Slot,
openclaude: Slot,
opencode: Slot,
codex: Slot,
qoder: Slot,
bus: broadcast::Sender<rsclaw_events::AgentEvent>,
}
impl CapAgentManager {
pub fn new(bus: broadcast::Sender<rsclaw_events::AgentEvent>) -> Self {
Self {
claudecode: Arc::new(RwLock::new(None)),
openclaude: Arc::new(RwLock::new(None)),
opencode: Arc::new(RwLock::new(None)),
codex: Arc::new(RwLock::new(None)),
qoder: Arc::new(RwLock::new(None)),
bus,
}
}
fn slot(&self, kind: AgentKind) -> Slot {
match kind {
AgentKind::Claudecode => Arc::clone(&self.claudecode),
AgentKind::Openclaude => Arc::clone(&self.openclaude),
AgentKind::Opencode => Arc::clone(&self.opencode),
AgentKind::Codex => Arc::clone(&self.codex),
AgentKind::Qoder => Arc::clone(&self.qoder),
}
}
pub async fn dispatch_async(
&self,
kind: AgentKind,
task: String,
cwd: std::path::PathBuf,
notif: Option<NotifTarget>,
inbox: Option<InboxTarget>,
) -> Result<Submitted> {
let tx = self.ensure_actor(kind, cwd).await?;
let (reply_tx, reply_rx) = oneshot::channel();
tx.send(ToolCapRequest::Prompt {
task,
notif,
inbox,
reply: reply_tx,
})
.await
.map_err(|_| anyhow!("cap actor for {} closed", kind.as_str()))?;
reply_rx.await.map_err(|_| anyhow!("cap actor dropped reply"))?
}
async fn ensure_actor(
&self,
kind: AgentKind,
cwd: std::path::PathBuf,
) -> Result<mpsc::Sender<ToolCapRequest>> {
let slot = self.slot(kind);
{
let g = slot.read().await;
if let Some(tx) = g.as_ref() {
return Ok(tx.clone());
}
}
let mut g = slot.write().await;
if let Some(tx) = g.as_ref() {
return Ok(tx.clone());
}
let driver = spawn_driver(kind, &cwd).await?;
let (tx, rx) = mpsc::channel::<ToolCapRequest>(8);
let bus = self.bus.clone();
let slot_for_actor = Arc::clone(&slot);
tokio::spawn(actor_loop(kind, cwd, driver, rx, bus, slot_for_actor));
*g = Some(tx.clone());
Ok(tx)
}
}
pub async fn spawn_driver(
kind: AgentKind,
cwd: &std::path::Path,
) -> Result<Box<dyn Driver>> {
spawn_driver_inner(kind, cwd, ResumeMode::None).await
}
pub async fn spawn_driver_resume(
kind: AgentKind,
cwd: &std::path::Path,
agent_session_id: &str,
) -> Result<Box<dyn Driver>> {
spawn_driver_inner(kind, cwd, ResumeMode::ById(agent_session_id)).await
}
pub async fn spawn_driver_continue_last(
kind: AgentKind,
cwd: &std::path::Path,
) -> Result<Box<dyn Driver>> {
spawn_driver_inner(kind, cwd, ResumeMode::ContinueLast).await
}
pub async fn spawn_driver_acp(
kind: AgentKind,
cwd: &std::path::Path,
) -> Result<Box<dyn Driver>> {
match kind {
AgentKind::Opencode => Ok(Box::new(
cap_rs::driver::acp::AcpDriver::opencode(&cwd)
.await
.map_err(|e| anyhow!("cap opencode ACP spawn: {e}"))?,
)),
other => spawn_driver(other, cwd).await,
}
}
#[derive(Copy, Clone)]
enum ResumeMode<'a> {
None,
ById(&'a str),
ContinueLast,
}
async fn codex_supports_stream_json() -> bool {
let bin = std::env::var("CODEX_BIN").unwrap_or_else(|_| "codex".to_string());
let mut cmd = tokio::process::Command::new(&bin);
cmd.arg("exec").arg("--help");
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x08000000); }
match cmd.output().await {
Ok(out) => {
let help = String::from_utf8_lossy(&out.stdout);
help.contains("--input-format")
}
Err(_) => false,
}
}
async fn spawn_codex_mcp(
cwd: &std::path::Path,
why: &str,
) -> Result<cap_rs::driver::codex_mcp::CodexMcpDriver> {
cap_rs::driver::codex_mcp::CodexMcpDriver::builder(cwd)
.approval_policy("never")
.spawn()
.await
.map_err(|e| anyhow!("cap codex spawn (stream-json: {why}, MCP: {e})"))
}
async fn spawn_driver_inner(
kind: AgentKind,
cwd: &std::path::Path,
resume_mode: ResumeMode<'_>,
) -> Result<Box<dyn Driver>> {
use cap_rs::driver::stream_json::ClaudeCodeDriver;
fn apply_resume_mode(
b: cap_rs::driver::stream_json::ClaudeCodeDriverBuilder,
mode: ResumeMode<'_>,
) -> cap_rs::driver::stream_json::ClaudeCodeDriverBuilder {
match mode {
ResumeMode::None => b,
ResumeMode::ById(rid) => b.resume(rid),
ResumeMode::ContinueLast => b.continue_last(true),
}
}
let driver: Box<dyn Driver> = match kind {
AgentKind::Claudecode => {
let mut b = ClaudeCodeDriver::builder(cwd).dangerously_skip_permissions(true);
b = apply_resume_mode(b, resume_mode);
Box::new(
b.spawn()
.await
.map_err(|e| anyhow!("cap claudecode spawn: {e}"))?,
)
}
AgentKind::Openclaude => {
let mut b = ClaudeCodeDriver::builder(cwd)
.bin("openclaude")
.dangerously_skip_permissions(true);
b = apply_resume_mode(b, resume_mode);
Box::new(
b.spawn()
.await
.map_err(|e| anyhow!("cap openclaude spawn: {e}"))?,
)
}
AgentKind::Opencode => {
let mut b = ClaudeCodeDriver::opencode_builder(cwd);
b = apply_resume_mode(b, resume_mode);
match b.spawn().await {
Ok(d) => Box::new(d),
Err(e) => {
tracing::info!(
target: "cap",
error = %e,
"opencode stream-json spawn failed, falling back to ACP"
);
Box::new(
cap_rs::driver::acp::AcpDriver::opencode(&cwd)
.await
.map_err(|e2| anyhow!("cap opencode spawn (stream-json: {e}, ACP: {e2})"))?,
)
}
}
}
AgentKind::Codex => {
if codex_supports_stream_json().await {
let mut b =
ClaudeCodeDriver::codex_builder(cwd).dangerously_skip_permissions(true);
b = apply_resume_mode(b, resume_mode);
match b.spawn().await {
Ok(d) => Box::new(d),
Err(e) => {
tracing::info!(
target: "cap",
error = %e,
"codex stream-json spawn failed, falling back to MCP"
);
Box::new(spawn_codex_mcp(cwd, &e.to_string()).await?)
}
}
} else {
tracing::info!(
target: "cap",
"codex binary lacks stream-json exec flags; using MCP driver"
);
Box::new(spawn_codex_mcp(cwd, "stream-json flags unsupported").await?)
}
}
AgentKind::Qoder => {
let mut b = ClaudeCodeDriver::builder(cwd)
.bin("qodercli")
.dangerously_skip_permissions(true);
b = apply_resume_mode(b, resume_mode);
Box::new(
b.spawn()
.await
.map_err(|e| anyhow!("cap qoder spawn: {e}"))?,
)
}
};
Ok(driver)
}
pub fn push_notif(target: &NotifTarget, text: String) {
let msg = OutboundMessage {
target_id: target.target_id.clone(),
is_group: target.is_group,
text,
reply_to: None,
images: Vec::new(),
files: Vec::new(),
channel: Some(target.channel.clone()),
account: None,
};
if let Err(e) = target.tx.send(msg) {
tracing::warn!(target: "cap", err = %e, "cap notif send failed");
}
}
async fn respawn_cap_driver(
kind: AgentKind,
cwd: &std::path::Path,
driver: &mut Box<dyn Driver>,
session_id: &str,
reason: &str,
) -> bool {
match spawn_driver(kind, cwd).await {
Ok(fresh) => {
if let Err(e) = driver.shutdown().await {
tracing::debug!(target: "cap", error = %e, "best-effort shutdown of dead driver");
}
*driver = fresh;
tracing::info!(
target: "cap",
session_id,
agent = kind.as_str(),
reason,
"cap respawned driver after death; retrying prompt once"
);
true
}
Err(e) => {
tracing::warn!(
target: "cap",
session_id,
agent = kind.as_str(),
reason,
error = %e,
"cap driver respawn failed; surfacing error"
);
false
}
}
}
async fn actor_loop(
kind: AgentKind,
cwd: std::path::PathBuf,
mut driver: Box<dyn Driver>,
mut rx: mpsc::Receiver<ToolCapRequest>,
bus: broadcast::Sender<rsclaw_events::AgentEvent>,
slot: Slot,
) {
let agent_id = kind.as_str();
let display = kind.display_name();
while let Some(req) = rx.recv().await {
match req {
ToolCapRequest::Prompt {
task,
notif,
inbox,
reply,
} => {
let session_id = format!("cap-{agent_id}-{}", uuid::Uuid::new_v4());
let _ = reply.send(Ok(Submitted {
session_id: session_id.clone(),
}));
let mut reply_buf = String::new();
let mut attempt = 0u8;
let outcome = loop {
reply_buf.clear();
if let Err(e) = driver
.send(ClientFrame::Prompt {
content: vec![Content::text(task.clone())],
})
.await
{
if attempt == 0
&& respawn_cap_driver(
kind,
&cwd,
&mut driver,
&session_id,
"send failed",
)
.await
{
attempt += 1;
continue;
}
break Err(anyhow!("cap send: {e}"));
}
let turn = match tokio::time::timeout(
TURN_TIMEOUT,
run_turn(
driver.as_mut(),
&bus,
&session_id,
agent_id,
notif.as_ref(),
&mut reply_buf,
),
)
.await
{
Ok(r) => r,
Err(_) => Err(anyhow!(
"cap {display}: turn timed out after {}s (driver hang?)",
TURN_TIMEOUT.as_secs()
)),
};
match turn {
Ok(()) => break Ok(()),
Err(e) => {
if attempt == 0
&& respawn_cap_driver(
kind,
&cwd,
&mut driver,
&session_id,
"exited mid-turn",
)
.await
{
attempt += 1;
continue;
}
break Err(e);
}
}
};
match &outcome {
Ok(()) => {
if let Some(n) = ¬if {
let body = if reply_buf.is_empty() {
i18n::t_fmt(
"acp_done_empty",
n.lang,
&[("status", "✅"), ("name", display)],
)
} else {
i18n::t_fmt(
"acp_done_summary",
n.lang,
&[
("status", "✅"),
("name", display),
("count", "0"),
("summary", reply_buf.as_str()),
],
)
};
push_notif(n, body);
}
let _ = &inbox;
}
Err(e) => {
if let Some(n) = ¬if {
push_notif(
n,
i18n::t_fmt(
"acp_error",
n.lang,
&[("name", display), ("error", &e.to_string())],
),
);
}
}
}
if outcome.is_err() {
break;
}
}
}
}
if let Err(e) = driver.shutdown().await {
tracing::debug!(target: "cap", error = %e, "best-effort shutdown of dead driver");
}
let mut g = slot.write().await;
*g = None;
}
pub async fn run_turn(
driver: &mut dyn Driver,
bus: &broadcast::Sender<rsclaw_events::AgentEvent>,
session_id: &str,
agent_id: &str,
notif: Option<&NotifTarget>,
reply_buf: &mut String,
) -> Result<()> {
loop {
let Some(event) = driver.next_event().await else {
return Err(anyhow!("cap driver exited mid-turn"));
};
if let AgentEvent::PermissionRequest {
req_id,
tool,
risk_level,
..
} = &event
{
let resp = permission::auto_approve(req_id, tool, *risk_level);
if let Err(e) = driver.send(resp).await {
return Err(anyhow!("cap permission send: {e}"));
}
continue;
}
if let AgentEvent::AskUser { ask_id, .. } = &event {
let resp = ClientFrame::AskUserAnswer {
ask_id: ask_id.clone(),
value: serde_json::json!("cancelled"),
};
if let Err(e) = driver.send(resp).await {
return Err(anyhow!("cap ask_user cancel: {e}"));
}
continue;
}
let mut sinks = bridge::Sinks {
notif,
agent_event: Some(bus),
reply: Some(reply_buf),
session_id,
agent_id,
};
let done = bridge::dispatch(&event, &mut sinks);
if done {
return Ok(());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use cap_rs::core::{RiskLevel, StopReason, TextChannel, Usage};
use cap_rs::driver::DriverError;
use std::collections::VecDeque;
#[test]
fn agent_kind_aliases() {
assert_eq!(AgentKind::from_str("claude"), Some(AgentKind::Claudecode));
assert_eq!(AgentKind::from_str("claudecode"), Some(AgentKind::Claudecode));
assert_eq!(AgentKind::from_str("code"), None);
assert_eq!(AgentKind::from_str("openclaude"), Some(AgentKind::Openclaude));
assert_eq!(AgentKind::from_str("opencode"), Some(AgentKind::Opencode));
assert_eq!(AgentKind::from_str("codex"), Some(AgentKind::Codex));
assert_eq!(AgentKind::from_str("qoder"), Some(AgentKind::Qoder));
assert_eq!(AgentKind::from_str("unknown"), None);
}
#[tokio::test]
#[ignore = "spawns real coding-agent CLIs; run manually"]
async fn cap_live_invoke_all() {
let cwd = std::env::temp_dir();
let (bus, _rx) = broadcast::channel(64);
let mut failures = Vec::new();
for kind in [
AgentKind::Claudecode,
AgentKind::Openclaude,
AgentKind::Opencode,
AgentKind::Codex,
AgentKind::Qoder,
] {
let name = kind.as_str();
let mut driver = match spawn_driver(kind, &cwd).await {
Ok(d) => d,
Err(e) => {
eprintln!("[skip] {name}: spawn failed: {e}");
continue;
}
};
if let Err(e) = driver
.send(ClientFrame::Prompt {
content: vec![Content::text(
"Reply with exactly the word OK and nothing else.".to_string(),
)],
})
.await
{
failures.push(format!("{name}: send: {e}"));
if let Err(e) = driver.shutdown().await {
tracing::debug!(target: "cap", error = %e, "best-effort shutdown of dead driver");
}
continue;
}
let mut reply = String::new();
let outcome = tokio::time::timeout(
std::time::Duration::from_secs(120),
run_turn(driver.as_mut(), &bus, "smoke", name, None, &mut reply),
)
.await;
if let Err(e) = driver.shutdown().await {
tracing::debug!(target: "cap", error = %e, "best-effort shutdown of dead driver");
}
match outcome {
Ok(Ok(())) if reply.trim().is_empty() => {
failures.push(format!("{name}: empty reply"));
eprintln!("[FAIL] {name}: empty reply");
}
Ok(Ok(())) => eprintln!("[ok] {name}: {:?}", reply.trim()),
Ok(Err(e)) => {
failures.push(format!("{name}: run_turn error: {e}"));
eprintln!("[FAIL] {name}: {e}");
}
Err(_) => {
failures.push(format!("{name}: timed out after 120s"));
eprintln!("[FAIL] {name}: timed out");
}
}
}
assert!(failures.is_empty(), "cap agent failures: {failures:?}");
}
#[tokio::test]
#[ignore = "spawns real opencode ACP server; run manually"]
async fn opencode_acp_fallback() {
let cwd = std::env::temp_dir();
let (bus, _rx) = broadcast::channel(64);
let mut driver: Box<dyn Driver> = match cap_rs::driver::acp::AcpDriver::opencode(&cwd).await
{
Ok(d) => Box::new(d),
Err(e) => panic!("opencode ACP spawn failed: {e}"),
};
driver
.send(ClientFrame::Prompt {
content: vec![Content::text(
"Reply with exactly the word OK and nothing else.".to_string(),
)],
})
.await
.expect("opencode ACP send");
let mut reply = String::new();
let outcome = tokio::time::timeout(
std::time::Duration::from_secs(120),
run_turn(driver.as_mut(), &bus, "smoke", "opencode", None, &mut reply),
)
.await;
if let Err(e) = driver.shutdown().await {
tracing::debug!(target: "cap", error = %e, "best-effort shutdown of dead driver");
}
match outcome {
Ok(Ok(())) => {
assert!(!reply.trim().is_empty(), "opencode ACP: empty reply");
eprintln!("[ok] opencode (ACP fallback): {:?}", reply.trim());
}
Ok(Err(e)) => panic!("opencode ACP run_turn error: {e}"),
Err(_) => panic!("opencode ACP timed out after 120s"),
}
}
struct FakeDriver {
events: VecDeque<AgentEvent>,
}
impl FakeDriver {
fn new(events: Vec<AgentEvent>) -> Self {
Self {
events: events.into(),
}
}
}
#[async_trait]
impl Driver for FakeDriver {
async fn send(&mut self, _frame: ClientFrame) -> Result<(), DriverError> {
Ok(())
}
async fn next_event(&mut self) -> Option<AgentEvent> {
self.events.pop_front()
}
async fn shutdown(&mut self) -> Result<(), DriverError> {
Ok(())
}
}
fn done() -> AgentEvent {
AgentEvent::Done {
stop_reason: StopReason::EndTurn,
usage: Usage::default(),
}
}
fn text(t: &str) -> AgentEvent {
AgentEvent::TextChunk {
msg_id: "m".into(),
text: t.into(),
channel: TextChannel::Assistant,
}
}
#[tokio::test]
async fn run_turn_collects_text_until_done() {
let mut driver = FakeDriver::new(vec![text("Hello "), text("world"), done()]);
let (bus, _rx) = broadcast::channel(8);
let mut reply = String::new();
run_turn(&mut driver, &bus, "sess", "claudecode", None, &mut reply)
.await
.unwrap();
assert_eq!(reply, "Hello world");
}
#[tokio::test]
async fn run_turn_auto_approves_permission() {
let mut driver = FakeDriver::new(vec![
AgentEvent::PermissionRequest {
req_id: "p1".into(),
tool: "shell".into(),
intent: serde_json::json!({}),
scope: cap_rs::core::PermissionScope::Execute,
risk_level: RiskLevel::Low,
},
text("ok"),
done(),
]);
let (bus, _rx) = broadcast::channel(8);
let mut reply = String::new();
run_turn(&mut driver, &bus, "sess", "claudecode", None, &mut reply)
.await
.unwrap();
assert_eq!(reply, "ok");
}
#[tokio::test]
async fn run_turn_cancels_ask_user() {
use cap_rs::core::AskKind;
let mut driver = FakeDriver::new(vec![
AgentEvent::AskUser {
ask_id: "q1".into(),
prompt: "Continue?".into(),
ask_kind: AskKind::YesNo,
options: vec![],
timeout_seconds: None,
},
text("ok"),
done(),
]);
let (bus, _rx) = broadcast::channel(8);
let mut reply = String::new();
run_turn(&mut driver, &bus, "sess", "claudecode", None, &mut reply)
.await
.unwrap();
assert_eq!(reply, "ok");
}
#[tokio::test]
async fn run_turn_surfaces_mid_turn_exit() {
let mut driver = FakeDriver::new(vec![text("partial")]);
let (bus, _rx) = broadcast::channel(8);
let mut reply = String::new();
let err = run_turn(&mut driver, &bus, "sess", "claudecode", None, &mut reply)
.await
.unwrap_err();
assert!(err.to_string().contains("exited mid-turn"));
}
#[tokio::test]
async fn run_turn_pushes_tool_call_progress_to_notif() {
let mut driver = FakeDriver::new(vec![
AgentEvent::ToolCallStart {
call_id: "c1".into(),
name: "read_file".into(),
input: serde_json::json!({"path": "/etc/hosts"}),
},
text("done reading"),
done(),
]);
let (bus, _rx) = broadcast::channel(8);
let (notif_tx, mut notif_rx) = broadcast::channel(8);
let notif = NotifTarget {
tx: notif_tx,
target_id: "user@feishu".into(),
is_group: false,
channel: "feishu".into(),
lang: "en",
};
let mut reply = String::new();
run_turn(
&mut driver,
&bus,
"sess",
"claudecode",
Some(¬if),
&mut reply,
)
.await
.unwrap();
let m = notif_rx.try_recv().expect("expected tool-call notif");
assert!(
m.text.contains("read_file"),
"got notif: {:?}",
m.text
);
}
}