synaps_cli/sidecar/
manager.rs1use std::ffi::OsStr;
13use std::path::Path;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::process::{Child, ChildStdin, Command};
19use tokio::sync::{mpsc, Mutex};
20
21use super::protocol::{InsertTextMode, SidecarCommand, SidecarFrame, SIDECAR_PROTOCOL_VERSION};
22
23const EVENT_CHANNEL_CAPACITY: usize = 64;
24
25#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum SidecarLifecycleEvent {
30 Ready {
32 protocol_version: u16,
33 extension: String,
34 capabilities: Vec<String>,
35 },
36 StateChanged {
38 state: String,
39 label: Option<String>,
40 },
41 InsertText {
43 text: String,
44 mode: InsertTextMode,
45 },
46 Error(String),
48 Exited,
50}
51
52#[derive(Debug, thiserror::Error)]
54pub enum SidecarError {
55 #[error("failed to spawn sidecar {bin}: {source}")]
56 Spawn {
57 bin: String,
58 #[source]
59 source: std::io::Error,
60 },
61 #[error("sidecar stdin/stdout was not captured")]
62 PipesUnavailable,
63 #[error("sidecar IO error: {0}")]
64 Io(#[from] std::io::Error),
65 #[error("sidecar process has already shut down")]
66 AlreadyShutDown,
67 #[error("failed to encode sidecar command: {0}")]
68 Encode(#[from] serde_json::Error),
69 #[error("sidecar protocol error: {0}")]
70 Protocol(String),
71}
72
73pub struct SidecarManager {
78 child: Option<Child>,
79 stdin: Arc<Mutex<Option<ChildStdin>>>,
80 rx: mpsc::Receiver<SidecarLifecycleEvent>,
81 reader_handle: Option<tokio::task::JoinHandle<()>>,
82 stderr_handle: Option<tokio::task::JoinHandle<()>>,
83}
84
85impl SidecarManager {
86 pub async fn spawn(
91 bin: &Path,
92 args: &[String],
93 config: serde_json::Value,
94 ) -> Result<Self, SidecarError> {
95 let mut command = Command::new(bin);
96 command
97 .args(args.iter().map(OsStr::new))
98 .stdin(Stdio::piped())
99 .stdout(Stdio::piped())
100 .stderr(Stdio::piped())
101 .kill_on_drop(true);
102
103 let mut child = command.spawn().map_err(|source| SidecarError::Spawn {
104 bin: bin.display().to_string(),
105 source,
106 })?;
107
108 let stdin = child
109 .stdin
110 .take()
111 .ok_or(SidecarError::PipesUnavailable)?;
112 let stdout = child
113 .stdout
114 .take()
115 .ok_or(SidecarError::PipesUnavailable)?;
116 let stderr = child.stderr.take();
117
118 let (tx, rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
119 let stdin = Arc::new(Mutex::new(Some(stdin)));
120
121 let event_tx = tx.clone();
123 let reader_handle = tokio::spawn(async move {
124 let mut lines = BufReader::new(stdout).lines();
125 while let Ok(Some(line)) = lines.next_line().await {
126 if line.trim().is_empty() {
127 continue;
128 }
129 let event = match serde_json::from_str::<SidecarFrame>(&line) {
130 Ok(ev) => ev,
131 Err(err) => {
132 let _ = event_tx
133 .send(SidecarLifecycleEvent::Error(format!(
134 "failed to parse sidecar line: {err}: {line}"
135 )))
136 .await;
137 continue;
138 }
139 };
140 let mapped = match event {
141 SidecarFrame::Hello {
142 protocol_version,
143 extension,
144 capabilities,
145 } => {
146 if protocol_version < SIDECAR_PROTOCOL_VERSION {
147 Some(SidecarLifecycleEvent::Error(format!(
148 "sidecar protocol v{protocol_version} is too old; host requires v{SIDECAR_PROTOCOL_VERSION}. Update the plugin via /plugins."
149 )))
150 } else {
151 Some(SidecarLifecycleEvent::Ready {
152 protocol_version,
153 extension,
154 capabilities,
155 })
156 }
157 }
158 SidecarFrame::Status { state, label, .. } => {
159 Some(SidecarLifecycleEvent::StateChanged { state, label })
160 }
161 SidecarFrame::InsertText { text, mode } => {
162 Some(SidecarLifecycleEvent::InsertText { text, mode })
163 }
164 SidecarFrame::Error { message } => Some(SidecarLifecycleEvent::Error(message)),
165 SidecarFrame::Custom => None,
166 };
167 if let Some(event) = mapped {
168 if event_tx.send(event).await.is_err() {
169 break;
171 }
172 }
173 }
174 let _ = event_tx.send(SidecarLifecycleEvent::Exited).await;
175 });
176
177 let stderr_handle = stderr.map(|stderr| {
179 tokio::spawn(async move {
180 let mut lines = BufReader::new(stderr).lines();
181 while let Ok(Some(line)) = lines.next_line().await {
182 tracing::debug!(target: "sidecar::manager", "{line}");
183 }
184 })
185 });
186
187 let mut manager = Self {
188 child: Some(child),
189 stdin,
190 rx,
191 reader_handle: Some(reader_handle),
192 stderr_handle,
193 };
194
195 let hello_timeout = tokio::time::timeout(
200 std::time::Duration::from_secs(10),
201 manager.rx.recv(),
202 )
203 .await
204 .map_err(|_| SidecarError::Protocol("sidecar did not send Hello within 10s".to_string()))?;
205
206 match hello_timeout {
207 Some(SidecarLifecycleEvent::Ready { .. }) => {
208 }
210 Some(SidecarLifecycleEvent::Error(e)) => {
211 return Err(SidecarError::Protocol(format!("sidecar Hello failed: {e}")));
212 }
213 Some(other) => {
214 return Err(SidecarError::Protocol(format!(
215 "expected Hello from sidecar, got: {:?}", other
216 )));
217 }
218 None => {
219 return Err(SidecarError::Protocol("sidecar exited before sending Hello".to_string()));
220 }
221 }
222
223 manager.send(SidecarCommand::Init { config }).await?;
224 Ok(manager)
225 }
226
227 pub async fn press(&mut self) -> Result<(), SidecarError> {
229 self.send(SidecarCommand::Trigger { name: "press".into(), payload: None }).await
230 }
231
232 pub async fn release(&mut self) -> Result<(), SidecarError> {
234 self.send(SidecarCommand::Trigger { name: "release".into(), payload: None }).await
235 }
236
237 pub async fn shutdown(&mut self) -> Result<(), SidecarError> {
239 let _ = self.send(SidecarCommand::Shutdown).await;
240 if let Some(mut stdin) = self.stdin.lock().await.take() {
242 let _ = stdin.shutdown().await;
243 }
244 if let Some(mut child) = self.child.take() {
245 let _ = child.wait().await;
246 }
247 if let Some(handle) = self.reader_handle.take() {
248 handle.abort();
249 }
250 if let Some(handle) = self.stderr_handle.take() {
251 handle.abort();
252 }
253 Ok(())
254 }
255
256 pub async fn next_event(&mut self) -> Option<SidecarLifecycleEvent> {
259 self.rx.recv().await
260 }
261
262 async fn send(&self, cmd: SidecarCommand) -> Result<(), SidecarError> {
263 let mut buf = serde_json::to_vec(&cmd)?;
264 buf.push(b'\n');
265 let mut guard = self.stdin.lock().await;
266 let stdin = guard.as_mut().ok_or(SidecarError::AlreadyShutDown)?;
267 stdin.write_all(&buf).await?;
268 stdin.flush().await?;
269 Ok(())
270 }
271}
272
273impl Drop for SidecarManager {
274 fn drop(&mut self) {
275 if let Some(handle) = self.reader_handle.take() {
277 handle.abort();
278 }
279 if let Some(handle) = self.stderr_handle.take() {
280 handle.abort();
281 }
282 }
283}