cyril_core/protocol/
transport.rs1use anyhow::{Context, Result, bail};
2use tokio::io::AsyncReadExt;
3use tokio::process::{Child, Command};
4use tokio::sync::mpsc;
5use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
6
7pub type CompatStdin = tokio_util::compat::Compat<tokio::process::ChildStdin>;
8pub type CompatStdout = tokio_util::compat::Compat<tokio::process::ChildStdout>;
9
10pub struct AgentProcess {
13 _child: Child,
14 stdin: Option<CompatStdin>,
15 stdout: Option<CompatStdout>,
16 stderr_rx: mpsc::UnboundedReceiver<String>,
17}
18
19impl AgentProcess {
20 pub fn spawn(agent: Option<&str>) -> Result<Self> {
25 let mut cmd = if cfg!(target_os = "windows") {
26 let mut c = Command::new("wsl");
27 c.arg("kiro-cli");
28 c
29 } else {
30 Command::new("kiro-cli")
31 };
32
33 cmd.arg("acp");
34
35 if let Some(name) = agent {
36 cmd.args(["--agent", name]);
37 }
38
39 let mut child = cmd
40 .stdin(std::process::Stdio::piped())
41 .stdout(std::process::Stdio::piped())
42 .stderr(std::process::Stdio::piped())
43 .kill_on_drop(true)
44 .spawn()
45 .context(if cfg!(target_os = "windows") {
46 "Failed to spawn `wsl kiro-cli acp`. Is WSL installed and kiro-cli available?"
47 } else {
48 "Failed to spawn `kiro-cli acp`. Is kiro-cli installed and on PATH?"
49 })?;
50
51 let stdin = child
52 .stdin
53 .take()
54 .context("Failed to capture agent stdin")?
55 .compat_write();
56
57 let stdout = child
58 .stdout
59 .take()
60 .context("Failed to capture agent stdout")?
61 .compat();
62
63 let stderr = child
65 .stderr
66 .take()
67 .context("Failed to capture agent stderr")?;
68
69 let (stderr_tx, stderr_rx) = mpsc::unbounded_channel();
70 tokio::spawn(async move {
71 let mut stderr = stderr;
72 let mut buf = [0u8; 4096];
73 loop {
74 match stderr.read(&mut buf).await {
75 Ok(0) => break,
76 Ok(n) => {
77 let s = String::from_utf8_lossy(&buf[..n]).into_owned();
78 if stderr_tx.send(s).is_err() {
79 break;
80 }
81 }
82 Err(e) => {
83 tracing::warn!("Stderr read error: {e}");
84 break;
85 }
86 }
87 }
88 });
89
90 Ok(Self {
91 _child: child,
92 stdin: Some(stdin),
93 stdout: Some(stdout),
94 stderr_rx,
95 })
96 }
97
98 pub fn take_stdin(&mut self) -> Result<CompatStdin> {
100 self.stdin.take().context("stdin already taken")
101 }
102
103 pub fn take_stdout(&mut self) -> Result<CompatStdout> {
105 self.stdout.take().context("stdout already taken")
106 }
107
108 pub fn drain_stderr(&mut self) -> String {
110 let mut output = String::new();
111 while let Ok(chunk) = self.stderr_rx.try_recv() {
112 output.push_str(&chunk);
113 }
114 output
115 }
116
117 pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
119 self._child.try_wait().context("Failed to check agent process status")
120 }
121
122 pub async fn check_startup(&mut self) -> Result<()> {
125 tokio::time::sleep(std::time::Duration::from_millis(1_000)).await;
127
128 let early_stderr = self.drain_stderr();
130 if !early_stderr.is_empty() {
131 tracing::warn!("Agent early stderr: {early_stderr}");
132 }
133
134 if let Some(status) = self.try_wait()? {
135 let extra_stderr = self.drain_stderr();
136 let stderr = if extra_stderr.is_empty() {
137 early_stderr
138 } else {
139 format!("{early_stderr}{extra_stderr}")
140 };
141
142 if stderr.contains("not logged in") || stderr.contains("please log in") {
143 let login_cmd = if cfg!(target_os = "windows") {
144 "wsl kiro-cli login"
145 } else {
146 "kiro-cli login"
147 };
148 bail!(
149 "kiro-cli requires authentication.\n\
150 Run `{login_cmd}` first, then try again.\n\n\
151 Agent stderr: {stderr}"
152 );
153 }
154 bail!(
155 "Agent process exited immediately with {status}.\n\
156 Agent stderr: {stderr}"
157 );
158 }
159 Ok(())
160 }
161}