Skip to main content

synaps_cli/sidecar/
manager.rs

1//! Sidecar lifecycle and IO.
2//!
3//! [`SidecarManager`] spawns a sidecar process, writes line-JSON
4//! [`SidecarCommand`] values to its stdin, and surfaces the
5//! deserialized [`SidecarFrame`] stream as higher-level
6//! [`SidecarLifecycleEvent`] values on an mpsc channel.
7//!
//! Modality-agnostic. Plugin-specific work lives in the plugin process;
//! this module is intentionally small, depending only on `tokio`,
//! `serde_json`, `thiserror`, and `tracing`.
11
12use std::ffi::OsStr;
13use std::path::Path;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::process::{Child, ChildStdin, Command};
19use tokio::sync::{mpsc, Mutex};
20
21use super::protocol::{InsertTextMode, SidecarCommand, SidecarFrame, SIDECAR_PROTOCOL_VERSION};
22
/// Capacity of the bounded `mpsc` channel that carries
/// [`SidecarLifecycleEvent`] values from the stdout reader task to the
/// manager. When the channel is full the reader task waits on `send`
/// until the consumer drains an event.
const EVENT_CHANNEL_CAPACITY: usize = 64;
24
/// High-level events emitted by the manager. This is a curated subset
/// of [`SidecarFrame`] tailored for chatui consumers; plugin-specific
/// frames that are not actionable by core are dropped.
///
/// Events are produced by the background reader task started in
/// [`SidecarManager::spawn`] and are read with
/// [`SidecarManager::next_event`]. Note that `spawn` itself consumes the
/// first event on the channel while it waits for the handshake.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SidecarLifecycleEvent {
    /// Sidecar handshake complete.
    ///
    /// Produced from a `Hello` frame whose `protocol_version` is not
    /// older than [`SIDECAR_PROTOCOL_VERSION`]; the fields are copied
    /// from that frame unchanged.
    Ready {
        protocol_version: u16,
        extension: String,
        capabilities: Vec<String>,
    },
    /// Sidecar reports a plugin-defined state transition.
    ///
    /// Produced from a `Status` frame; only `state` and `label` are
    /// forwarded, any other fields of the frame are dropped.
    StateChanged {
        state: String,
        label: Option<String>,
    },
    /// Sidecar wants text applied to the input buffer.
    InsertText {
        text: String,
        mode: InsertTextMode,
    },
    /// An error reported by the sidecar or detected by the reader task
    /// (for example a stdout line that failed to parse as a
    /// [`SidecarFrame`], or a `Hello` with a too-old protocol version).
    Error(String),
    /// Sidecar process exited (clean or otherwise).
    ///
    /// Sent by the reader task as its last event, once it has stopped
    /// reading the sidecar's stdout.
    Exited,
}
51
/// Errors surfaced by the manager.
///
/// Returned by [`SidecarManager::spawn`] and by the command-sending
/// methods of [`SidecarManager`].
#[derive(Debug, thiserror::Error)]
pub enum SidecarError {
    /// The sidecar executable could not be started. `bin` is the
    /// display form of the path that was passed to `spawn`.
    #[error("failed to spawn sidecar {bin}: {source}")]
    Spawn {
        bin: String,
        #[source]
        source: std::io::Error,
    },
    /// The spawned child did not expose a stdin or stdout handle.
    #[error("sidecar stdin/stdout was not captured")]
    PipesUnavailable,
    /// An IO operation on the sidecar's pipes failed (in this module:
    /// writing or flushing a command to its stdin).
    #[error("sidecar IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A command was sent after the stdin handle had been taken by
    /// `shutdown`.
    #[error("sidecar process has already shut down")]
    AlreadyShutDown,
    /// A [`SidecarCommand`] could not be serialized to JSON.
    #[error("failed to encode sidecar command: {0}")]
    Encode(#[from] serde_json::Error),
    /// The handshake failed: the sidecar did not send `Hello` in time,
    /// sent an error, or sent something else first.
    #[error("sidecar protocol error: {0}")]
    Protocol(String),
}
72
/// Supervises one sidecar process and its line-JSON streams.
///
/// Construct via [`SidecarManager::spawn`]; drive with [`press`],
/// [`release`], [`shutdown`]. Receive events with [`next_event`].
///
/// [`press`]: SidecarManager::press
/// [`release`]: SidecarManager::release
/// [`shutdown`]: SidecarManager::shutdown
/// [`next_event`]: SidecarManager::next_event
pub struct SidecarManager {
    // Child process handle; taken (left as `None`) by `shutdown`.
    child: Option<Child>,
    // Sidecar stdin; the inner `Option` becomes `None` once `shutdown`
    // has closed the pipe, after which sending fails.
    stdin: Arc<Mutex<Option<ChildStdin>>>,
    // Receiving end of the event channel fed by the stdout reader task.
    rx: mpsc::Receiver<SidecarLifecycleEvent>,
    // Background task parsing the sidecar's stdout; aborted on
    // shutdown and on drop.
    reader_handle: Option<tokio::task::JoinHandle<()>>,
    // Background task forwarding the sidecar's stderr to `tracing`;
    // `None` when stderr was not captured. Aborted on shutdown and on drop.
    stderr_handle: Option<tokio::task::JoinHandle<()>>,
}
84
85impl SidecarManager {
86    /// Spawn `bin` with `args`, send the [`Init`] handshake, and start
87    /// the background reader task.
88    ///
89    /// [`Init`]: SidecarCommand::Init
90    pub async fn spawn(
91        bin: &Path,
92        args: &[String],
93        config: serde_json::Value,
94    ) -> Result<Self, SidecarError> {
95        let mut command = Command::new(bin);
96        command
97            .args(args.iter().map(OsStr::new))
98            .stdin(Stdio::piped())
99            .stdout(Stdio::piped())
100            .stderr(Stdio::piped())
101            .kill_on_drop(true);
102
103        let mut child = command.spawn().map_err(|source| SidecarError::Spawn {
104            bin: bin.display().to_string(),
105            source,
106        })?;
107
108        let stdin = child
109            .stdin
110            .take()
111            .ok_or(SidecarError::PipesUnavailable)?;
112        let stdout = child
113            .stdout
114            .take()
115            .ok_or(SidecarError::PipesUnavailable)?;
116        let stderr = child.stderr.take();
117
118        let (tx, rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
119        let stdin = Arc::new(Mutex::new(Some(stdin)));
120
121        // Reader task: parse line-JSON events and forward as SidecarLifecycleEvent.
122        let event_tx = tx.clone();
123        let reader_handle = tokio::spawn(async move {
124            let mut lines = BufReader::new(stdout).lines();
125            while let Ok(Some(line)) = lines.next_line().await {
126                if line.trim().is_empty() {
127                    continue;
128                }
129                let event = match serde_json::from_str::<SidecarFrame>(&line) {
130                    Ok(ev) => ev,
131                    Err(err) => {
132                        let _ = event_tx
133                            .send(SidecarLifecycleEvent::Error(format!(
134                                "failed to parse sidecar line: {err}: {line}"
135                            )))
136                            .await;
137                        continue;
138                    }
139                };
140                let mapped = match event {
141                    SidecarFrame::Hello {
142                        protocol_version,
143                        extension,
144                        capabilities,
145                    } => {
146                        if protocol_version < SIDECAR_PROTOCOL_VERSION {
147                            Some(SidecarLifecycleEvent::Error(format!(
148                                "sidecar protocol v{protocol_version} is too old; host requires v{SIDECAR_PROTOCOL_VERSION}. Update the plugin via /plugins."
149                            )))
150                        } else {
151                            Some(SidecarLifecycleEvent::Ready {
152                                protocol_version,
153                                extension,
154                                capabilities,
155                            })
156                        }
157                    }
158                    SidecarFrame::Status { state, label, .. } => {
159                        Some(SidecarLifecycleEvent::StateChanged { state, label })
160                    }
161                    SidecarFrame::InsertText { text, mode } => {
162                        Some(SidecarLifecycleEvent::InsertText { text, mode })
163                    }
164                    SidecarFrame::Error { message } => Some(SidecarLifecycleEvent::Error(message)),
165                    SidecarFrame::Custom => None,
166                };
167                if let Some(event) = mapped {
168                    if event_tx.send(event).await.is_err() {
169                        // Receiver dropped — give up.
170                        break;
171                    }
172                }
173            }
174            let _ = event_tx.send(SidecarLifecycleEvent::Exited).await;
175        });
176
177        // Stderr task: forward sidecar stderr to tracing for diagnostics.
178        let stderr_handle = stderr.map(|stderr| {
179            tokio::spawn(async move {
180                let mut lines = BufReader::new(stderr).lines();
181                while let Ok(Some(line)) = lines.next_line().await {
182                    tracing::debug!(target: "sidecar::manager", "{line}");
183                }
184            })
185        });
186
187        let mut manager = Self {
188            child: Some(child),
189            stdin,
190            rx,
191            reader_handle: Some(reader_handle),
192            stderr_handle,
193        };
194
195        // Wait for the sidecar's Hello frame before sending Init.
196        // The sidecar must announce its protocol version first so we can
197        // reject incompatible versions before committing to the handshake.
198        // Timeout: 10s — if the sidecar can't say Hello in 10s, it's broken.
199        let hello_timeout = tokio::time::timeout(
200            std::time::Duration::from_secs(10),
201            manager.rx.recv(),
202        )
203        .await
204        .map_err(|_| SidecarError::Protocol("sidecar did not send Hello within 10s".to_string()))?;
205
206        match hello_timeout {
207            Some(SidecarLifecycleEvent::Ready { .. }) => {
208                // Hello received and protocol version is acceptable — proceed with Init
209            }
210            Some(SidecarLifecycleEvent::Error(e)) => {
211                return Err(SidecarError::Protocol(format!("sidecar Hello failed: {e}")));
212            }
213            Some(other) => {
214                return Err(SidecarError::Protocol(format!(
215                    "expected Hello from sidecar, got: {:?}", other
216                )));
217            }
218            None => {
219                return Err(SidecarError::Protocol("sidecar exited before sending Hello".to_string()));
220            }
221        }
222
223        manager.send(SidecarCommand::Init { config }).await?;
224        Ok(manager)
225    }
226
227    /// Send a trigger press command.
228    pub async fn press(&mut self) -> Result<(), SidecarError> {
229        self.send(SidecarCommand::Trigger { name: "press".into(), payload: None }).await
230    }
231
232    /// Send a trigger release command.
233    pub async fn release(&mut self) -> Result<(), SidecarError> {
234        self.send(SidecarCommand::Trigger { name: "release".into(), payload: None }).await
235    }
236
237    /// Send a graceful `shutdown` command and reap the child process.
238    pub async fn shutdown(&mut self) -> Result<(), SidecarError> {
239        let _ = self.send(SidecarCommand::Shutdown).await;
240        // Drop the stdin so the sidecar sees EOF if it ignored shutdown.
241        if let Some(mut stdin) = self.stdin.lock().await.take() {
242            let _ = stdin.shutdown().await;
243        }
244        if let Some(mut child) = self.child.take() {
245            let _ = child.wait().await;
246        }
247        if let Some(handle) = self.reader_handle.take() {
248            handle.abort();
249        }
250        if let Some(handle) = self.stderr_handle.take() {
251            handle.abort();
252        }
253        Ok(())
254    }
255
256    /// Receive the next high-level event, or `None` if the channel
257    /// closed (sidecar exited and reader task drained).
258    pub async fn next_event(&mut self) -> Option<SidecarLifecycleEvent> {
259        self.rx.recv().await
260    }
261
262    async fn send(&self, cmd: SidecarCommand) -> Result<(), SidecarError> {
263        let mut buf = serde_json::to_vec(&cmd)?;
264        buf.push(b'\n');
265        let mut guard = self.stdin.lock().await;
266        let stdin = guard.as_mut().ok_or(SidecarError::AlreadyShutDown)?;
267        stdin.write_all(&buf).await?;
268        stdin.flush().await?;
269        Ok(())
270    }
271}
272
273impl Drop for SidecarManager {
274    fn drop(&mut self) {
275        // Best-effort: kill the child if shutdown wasn't called.
276        if let Some(handle) = self.reader_handle.take() {
277            handle.abort();
278        }
279        if let Some(handle) = self.stderr_handle.take() {
280            handle.abort();
281        }
282    }
283}