Skip to main content

cyril_core/protocol/
transport.rs

1use anyhow::{Context, Result, bail};
2use tokio::io::AsyncReadExt;
3use tokio::process::{Child, Command};
4use tokio::sync::mpsc;
5use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
6
7pub type CompatStdin = tokio_util::compat::Compat<tokio::process::ChildStdin>;
8pub type CompatStdout = tokio_util::compat::Compat<tokio::process::ChildStdout>;
9
10/// Wraps the agent subprocess and its compat-wrapped pipes.
11/// On Windows, spawns via `wsl kiro-cli acp`; on Linux, runs `kiro-cli acp` directly.
12pub struct AgentProcess {
13    _child: Child,
14    stdin: Option<CompatStdin>,
15    stdout: Option<CompatStdout>,
16    stderr_rx: mpsc::UnboundedReceiver<String>,
17}
18
19impl AgentProcess {
20    /// Spawn the kiro-cli ACP subprocess and return compat-wrapped stdin/stdout
21    /// suitable for passing to `ClientSideConnection::new`.
22    /// On Windows, runs via `wsl kiro-cli acp`; on Linux, runs `kiro-cli acp` directly.
23    /// If `agent` is provided, passes `--agent <name>` to kiro-cli.
24    pub fn spawn(agent: Option<&str>) -> Result<Self> {
25        let mut cmd = if cfg!(target_os = "windows") {
26            let mut c = Command::new("wsl");
27            c.arg("kiro-cli");
28            c
29        } else {
30            Command::new("kiro-cli")
31        };
32
33        cmd.arg("acp");
34
35        if let Some(name) = agent {
36            cmd.args(["--agent", name]);
37        }
38
39        let mut child = cmd
40            .stdin(std::process::Stdio::piped())
41            .stdout(std::process::Stdio::piped())
42            .stderr(std::process::Stdio::piped())
43            .kill_on_drop(true)
44            .spawn()
45            .context(if cfg!(target_os = "windows") {
46                "Failed to spawn `wsl kiro-cli acp`. Is WSL installed and kiro-cli available?"
47            } else {
48                "Failed to spawn `kiro-cli acp`. Is kiro-cli installed and on PATH?"
49            })?;
50
51        let stdin = child
52            .stdin
53            .take()
54            .context("Failed to capture agent stdin")?
55            .compat_write();
56
57        let stdout = child
58            .stdout
59            .take()
60            .context("Failed to capture agent stdout")?
61            .compat();
62
63        // Capture stderr in background so we can surface auth/startup errors
64        let stderr = child
65            .stderr
66            .take()
67            .context("Failed to capture agent stderr")?;
68
69        let (stderr_tx, stderr_rx) = mpsc::unbounded_channel();
70        tokio::spawn(async move {
71            let mut stderr = stderr;
72            let mut buf = [0u8; 4096];
73            loop {
74                match stderr.read(&mut buf).await {
75                    Ok(0) => break,
76                    Ok(n) => {
77                        let s = String::from_utf8_lossy(&buf[..n]).into_owned();
78                        if stderr_tx.send(s).is_err() {
79                            break;
80                        }
81                    }
82                    Err(e) => {
83                        tracing::warn!("Stderr read error: {e}");
84                        break;
85                    }
86                }
87            }
88        });
89
90        Ok(Self {
91            _child: child,
92            stdin: Some(stdin),
93            stdout: Some(stdout),
94            stderr_rx,
95        })
96    }
97
98    /// Take the stdin pipe (can only be called once).
99    pub fn take_stdin(&mut self) -> Result<CompatStdin> {
100        self.stdin.take().context("stdin already taken")
101    }
102
103    /// Take the stdout pipe (can only be called once).
104    pub fn take_stdout(&mut self) -> Result<CompatStdout> {
105        self.stdout.take().context("stdout already taken")
106    }
107
108    /// Drain any stderr output collected so far.
109    pub fn drain_stderr(&mut self) -> String {
110        let mut output = String::new();
111        while let Ok(chunk) = self.stderr_rx.try_recv() {
112            output.push_str(&chunk);
113        }
114        output
115    }
116
117    /// Check if the process has already exited (non-blocking).
118    pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
119        self._child.try_wait().context("Failed to check agent process status")
120    }
121
122    /// Wait briefly for the process to start, returning an error if it exits
123    /// immediately (e.g. due to auth failure).
124    pub async fn check_startup(&mut self) -> Result<()> {
125        // Give the process a moment to fail — 1s accommodates slower WSL startup
126        tokio::time::sleep(std::time::Duration::from_millis(1_000)).await;
127
128        // Surface any early stderr even if the process is still running
129        let early_stderr = self.drain_stderr();
130        if !early_stderr.is_empty() {
131            tracing::warn!("Agent early stderr: {early_stderr}");
132        }
133
134        if let Some(status) = self.try_wait()? {
135            let extra_stderr = self.drain_stderr();
136            let stderr = if extra_stderr.is_empty() {
137                early_stderr
138            } else {
139                format!("{early_stderr}{extra_stderr}")
140            };
141
142            if stderr.contains("not logged in") || stderr.contains("please log in") {
143                let login_cmd = if cfg!(target_os = "windows") {
144                    "wsl kiro-cli login"
145                } else {
146                    "kiro-cli login"
147                };
148                bail!(
149                    "kiro-cli requires authentication.\n\
150                     Run `{login_cmd}` first, then try again.\n\n\
151                     Agent stderr: {stderr}"
152                );
153            }
154            bail!(
155                "Agent process exited immediately with {status}.\n\
156                 Agent stderr: {stderr}"
157            );
158        }
159        Ok(())
160    }
161}