use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use capo_agent::permissions::{Decision, SessionCache};
use capo_agent::{Command, PermissionChoice, UiEvent};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, oneshot};
struct PendingPermission {
tool: String,
args: serde_json::Value,
resolver: oneshot::Sender<Decision>,
}
pub struct PermissionBridge {
pending: Mutex<HashMap<String, PendingPermission>>,
session_cache: Option<Arc<SessionCache>>,
}
impl PermissionBridge {
pub fn new() -> Self {
Self {
pending: Mutex::new(HashMap::new()),
session_cache: None,
}
}
pub fn with_session_cache(session_cache: Arc<SessionCache>) -> Self {
Self {
pending: Mutex::new(HashMap::new()),
session_cache: Some(session_cache),
}
}
pub fn intercept(&self, event: UiEvent) -> String {
if let UiEvent::PermissionRequested {
tool,
args,
resolver,
} = event
{
let id = capo_agent::SessionId::new().into_string();
if let Ok(mut pending) = self.pending.lock() {
pending.insert(
id.clone(),
PendingPermission {
tool: tool.clone(),
args: args.clone(),
resolver,
},
);
}
let value = serde_json::json!({
"type": "permission_requested",
"id": id,
"payload": { "tool": tool, "args": args },
});
serde_json::to_string(&value).unwrap_or_else(|e| {
format!(r#"{{"type":"error","payload":"permission encode failed: {e}"}}"#)
})
} else {
capo_agent::protocol::encode_event(&event)
}
}
pub fn resolve(&self, id: &str, choice: PermissionChoice) {
let pending = match self.pending.lock() {
Ok(mut pending) => pending.remove(id),
Err(_) => return,
};
if let Some(pending) = pending {
let decision = match choice {
PermissionChoice::AllowOnce | PermissionChoice::AllowSession => Decision::Allowed,
PermissionChoice::Deny => Decision::Denied("denied by client".into()),
};
if matches!(choice, PermissionChoice::AllowSession) {
if let Some(cache) = &self.session_cache {
let key = SessionCache::key(&pending.tool, &pending.args);
cache.insert(key, Decision::Allowed);
}
}
let _ = pending.resolver.send(decision);
}
}
}
impl Default for PermissionBridge {
fn default() -> Self {
Self::new()
}
}
async fn write_event_line<W>(
out: &mut W,
bridge: &PermissionBridge,
event: UiEvent,
) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
let line = bridge.intercept(event);
out.write_all(line.as_bytes()).await?;
out.write_all(b"\n").await?;
out.flush().await
}
#[cfg_attr(not(test), allow(dead_code))]
pub async fn writer_task<W>(
mut out: W,
mut ui_rx: mpsc::Receiver<UiEvent>,
bridge: Arc<PermissionBridge>,
) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
while let Some(event) = ui_rx.recv().await {
write_event_line(&mut out, &bridge, event).await?;
}
Ok(())
}
pub async fn writer_task_until_shutdown<W>(
mut out: W,
mut ui_rx: mpsc::Receiver<UiEvent>,
bridge: Arc<PermissionBridge>,
mut shutdown: oneshot::Receiver<()>,
) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
loop {
tokio::select! {
event = ui_rx.recv() => {
let Some(event) = event else {
return Ok(());
};
write_event_line(&mut out, &bridge, event).await?;
}
_ = &mut shutdown => {
while let Ok(event) = ui_rx.try_recv() {
write_event_line(&mut out, &bridge, event).await?;
}
return Ok(());
}
}
}
}
pub async fn reader_task<R>(
mut input: BufReader<R>,
cmd_tx: mpsc::Sender<Command>,
ui_tx: mpsc::Sender<UiEvent>,
bridge: Arc<PermissionBridge>,
) -> io::Result<()>
where
R: AsyncRead + Unpin,
{
let mut line = String::new();
loop {
line.clear();
let n = input.read_line(&mut line).await?;
if n == 0 {
let _ = cmd_tx.send(Command::Quit).await;
return Ok(());
}
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() {
continue;
}
let value: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(e) => {
let _ = ui_tx
.send(UiEvent::Error(format!("invalid json: {e}")))
.await;
continue;
}
};
let type_tag = value.get("type").and_then(|v| v.as_str()).unwrap_or("");
if type_tag == "resolve_permission" {
let id = value.get("id").and_then(|v| v.as_str()).unwrap_or("");
let choice = match value.get("choice").and_then(|v| v.as_str()).unwrap_or("") {
"allow_once" => PermissionChoice::AllowOnce,
"allow_session" => PermissionChoice::AllowSession,
"deny" => PermissionChoice::Deny,
other => {
let _ = ui_tx
.send(UiEvent::Error(format!(
"resolve_permission: unknown choice '{other}'"
)))
.await;
continue;
}
};
if id.is_empty() {
let _ = ui_tx
.send(UiEvent::Error("resolve_permission: missing id".into()))
.await;
continue;
}
bridge.resolve(id, choice);
continue;
}
match capo_agent::protocol::decode_command(trimmed) {
Ok(cmd) => {
let is_quit = matches!(cmd, Command::Quit);
if cmd_tx.send(cmd).await.is_err() {
return Ok(());
}
if is_quit {
return Ok(());
}
tokio::task::yield_now().await;
}
Err(e) => {
let _ = ui_tx
.send(UiEvent::Error(format!("decode_command: {e}")))
.await;
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
#[tokio::test]
async fn intercept_passes_through_non_permission_events() {
let bridge = PermissionBridge::new();
let line = bridge.intercept(UiEvent::AgentThinking);
assert_eq!(line, r#"{"type":"agent_thinking"}"#);
}
#[tokio::test]
async fn intercept_emits_id_and_resolve_fulfills_the_resolver() {
let bridge = PermissionBridge::new();
let (tx, rx) = oneshot::channel::<Decision>();
let event = UiEvent::PermissionRequested {
tool: "bash".into(),
args: serde_json::json!({"command": "rm -rf /tmp/x"}),
resolver: tx,
};
let line = bridge.intercept(event);
let value: serde_json::Value =
serde_json::from_str(&line).expect("intercept line should be JSON");
assert_eq!(value["type"], "permission_requested");
let id = value["id"].as_str().expect("id present").to_string();
assert_eq!(value["payload"]["tool"], "bash");
assert!(value.get("resolver").is_none(), "resolver leaked: {line}");
bridge.resolve(&id, PermissionChoice::AllowOnce);
let decision = tokio::time::timeout(std::time::Duration::from_millis(100), rx)
.await
.expect("resolver not fulfilled")
.expect("resolver dropped");
assert!(matches!(decision, Decision::Allowed));
}
#[tokio::test]
async fn resolve_with_unknown_id_is_a_safe_noop() {
let bridge = PermissionBridge::new();
bridge.resolve("no-such-id", PermissionChoice::Deny);
}
#[tokio::test]
async fn deny_choice_maps_to_decision_denied() {
let bridge = PermissionBridge::new();
let (tx, rx) = oneshot::channel::<Decision>();
let line = bridge.intercept(UiEvent::PermissionRequested {
tool: "write".into(),
args: serde_json::json!({"path": "/etc/x"}),
resolver: tx,
});
let id = serde_json::from_str::<serde_json::Value>(&line).expect("json")["id"]
.as_str()
.expect("id")
.to_string();
bridge.resolve(&id, PermissionChoice::Deny);
let decision = rx.await.expect("resolver dropped");
assert!(matches!(decision, Decision::Denied(_)));
}
#[tokio::test]
async fn allow_session_choice_updates_session_cache_when_configured() {
let cache = Arc::new(capo_agent::permissions::SessionCache::new());
let bridge = PermissionBridge::with_session_cache(Arc::clone(&cache));
let args = serde_json::json!({"command": "ls /etc"});
let (tx, rx) = oneshot::channel::<Decision>();
let line = bridge.intercept(UiEvent::PermissionRequested {
tool: "bash".into(),
args: args.clone(),
resolver: tx,
});
let id = serde_json::from_str::<serde_json::Value>(&line).expect("json")["id"]
.as_str()
.expect("id")
.to_string();
bridge.resolve(&id, PermissionChoice::AllowSession);
let decision = rx.await.expect("resolver dropped");
assert!(matches!(decision, Decision::Allowed));
let key = capo_agent::permissions::SessionCache::key("bash", &args);
assert!(matches!(cache.get(&key), Some(Decision::Allowed)));
}
#[tokio::test]
async fn writer_task_writes_a_jsonl_line_per_event_via_the_bridge() {
let bridge = Arc::new(PermissionBridge::new());
let (ui_tx, ui_rx) = mpsc::channel::<UiEvent>(8);
let (mut client_read, server_write) = tokio::io::duplex(4096);
let writer = tokio::spawn(writer_task(server_write, ui_rx, Arc::clone(&bridge)));
ui_tx.send(UiEvent::AgentThinking).await.expect("send");
ui_tx
.send(UiEvent::AgentTextDelta("hi".into()))
.await
.expect("send");
drop(ui_tx);
writer
.await
.expect("writer task panicked")
.expect("writer io");
let mut buf = String::new();
client_read.read_to_string(&mut buf).await.expect("read");
let lines: Vec<&str> = buf.lines().collect();
assert_eq!(lines.len(), 2);
assert_eq!(lines[0], r#"{"type":"agent_thinking"}"#);
assert!(lines[1].contains(r#""type":"agent_text_delta""#));
}
#[tokio::test]
async fn reader_task_exits_after_forwarding_quit_even_if_stdin_stays_open() {
let bridge = Arc::new(PermissionBridge::new());
let (cmd_tx, mut cmd_rx) = mpsc::channel::<capo_agent::Command>(4);
let (ui_tx, _ui_rx) = mpsc::channel::<UiEvent>(4);
let (mut client_write, server_read) = tokio::io::duplex(4096);
let reader = tokio::spawn(reader_task(
BufReader::new(server_read),
cmd_tx,
ui_tx,
Arc::clone(&bridge),
));
client_write
.write_all(b"{\"type\":\"quit\"}\n")
.await
.expect("write");
tokio::time::timeout(std::time::Duration::from_millis(200), reader)
.await
.expect("reader did not exit after quit")
.expect("reader panicked")
.expect("reader io");
assert!(matches!(
cmd_rx.recv().await,
Some(capo_agent::Command::Quit)
));
}
#[tokio::test]
async fn reader_task_decodes_commands_to_the_command_channel() {
let bridge = Arc::new(PermissionBridge::new());
let (cmd_tx, mut cmd_rx) = mpsc::channel::<capo_agent::Command>(8);
let (ui_tx, mut ui_rx) = mpsc::channel::<UiEvent>(8);
let (client_write, server_read) = tokio::io::duplex(4096);
let reader = tokio::spawn(reader_task(
BufReader::new(server_read),
cmd_tx,
ui_tx,
Arc::clone(&bridge),
));
let mut client_write = client_write;
client_write
.write_all(b"{\"type\":\"cancel_agent\"}\n")
.await
.expect("write");
client_write
.write_all(b"{\"type\":\"send_user_message\",\"payload\":\"hi\"}\n")
.await
.expect("write");
drop(client_write);
reader
.await
.expect("reader task panicked")
.expect("reader io");
assert!(matches!(
cmd_rx.recv().await,
Some(capo_agent::Command::CancelAgent)
));
match cmd_rx.recv().await {
Some(capo_agent::Command::SendUserMessage(text)) => assert_eq!(text, "hi"),
other => panic!("expected SendUserMessage, got {other:?}"),
}
assert!(ui_rx.try_recv().is_err());
}
#[tokio::test]
async fn reader_task_routes_resolve_permission_to_the_bridge() {
let bridge = Arc::new(PermissionBridge::new());
let (resolver_tx, resolver_rx) = oneshot::channel::<Decision>();
let line = bridge.intercept(UiEvent::PermissionRequested {
tool: "bash".into(),
args: serde_json::json!({"command": "ls"}),
resolver: resolver_tx,
});
let id = serde_json::from_str::<serde_json::Value>(&line).expect("json")["id"]
.as_str()
.expect("id")
.to_string();
let (cmd_tx, mut cmd_rx) = mpsc::channel::<capo_agent::Command>(4);
let (ui_tx, _ui_rx) = mpsc::channel::<UiEvent>(4);
let (client_write, server_read) = tokio::io::duplex(4096);
let reader = tokio::spawn(reader_task(
BufReader::new(server_read),
cmd_tx,
ui_tx,
Arc::clone(&bridge),
));
let mut client_write = client_write;
let payload = format!(
"{{\"type\":\"resolve_permission\",\"id\":\"{id}\",\"choice\":\"allow_once\"}}\n"
);
client_write
.write_all(payload.as_bytes())
.await
.expect("write");
let decision = tokio::time::timeout(std::time::Duration::from_millis(100), resolver_rx)
.await
.expect("resolver not fulfilled")
.expect("resolver dropped");
assert!(matches!(decision, Decision::Allowed));
assert!(cmd_rx.try_recv().is_err());
drop(client_write);
reader.await.expect("reader panicked").expect("reader io");
assert!(matches!(cmd_rx.try_recv(), Ok(Command::Quit)));
}
#[tokio::test]
async fn reader_task_emits_error_event_on_bad_line_and_keeps_serving() {
let bridge = Arc::new(PermissionBridge::new());
let (cmd_tx, mut cmd_rx) = mpsc::channel::<capo_agent::Command>(4);
let (ui_tx, mut ui_rx) = mpsc::channel::<UiEvent>(4);
let (client_write, server_read) = tokio::io::duplex(4096);
let reader = tokio::spawn(reader_task(
BufReader::new(server_read),
cmd_tx,
ui_tx,
Arc::clone(&bridge),
));
let mut client_write = client_write;
client_write.write_all(b"not json\n").await.expect("write");
client_write
.write_all(b"{\"type\":\"cancel_agent\"}\n")
.await
.expect("write");
drop(client_write);
reader.await.expect("reader panicked").expect("reader io");
let evt = ui_rx.recv().await.expect("error event");
assert!(matches!(evt, UiEvent::Error(_)), "got {evt:?}");
assert!(matches!(
cmd_rx.recv().await,
Some(capo_agent::Command::CancelAgent)
));
}
}