use std::ffi::OsStr;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{mpsc, Mutex};
use super::protocol::{InsertTextMode, SidecarCommand, SidecarFrame, SIDECAR_PROTOCOL_VERSION};
const EVENT_CHANNEL_CAPACITY: usize = 64;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SidecarLifecycleEvent {
Ready {
protocol_version: u16,
extension: String,
capabilities: Vec<String>,
},
StateChanged {
state: String,
label: Option<String>,
},
InsertText {
text: String,
mode: InsertTextMode,
},
Error(String),
Exited,
}
#[derive(Debug, thiserror::Error)]
pub enum SidecarError {
#[error("failed to spawn sidecar {bin}: {source}")]
Spawn {
bin: String,
#[source]
source: std::io::Error,
},
#[error("sidecar stdin/stdout was not captured")]
PipesUnavailable,
#[error("sidecar IO error: {0}")]
Io(#[from] std::io::Error),
#[error("sidecar process has already shut down")]
AlreadyShutDown,
#[error("failed to encode sidecar command: {0}")]
Encode(#[from] serde_json::Error),
#[error("sidecar protocol error: {0}")]
Protocol(String),
}
pub struct SidecarManager {
child: Option<Child>,
stdin: Arc<Mutex<Option<ChildStdin>>>,
rx: mpsc::Receiver<SidecarLifecycleEvent>,
reader_handle: Option<tokio::task::JoinHandle<()>>,
stderr_handle: Option<tokio::task::JoinHandle<()>>,
}
impl SidecarManager {
pub async fn spawn(
bin: &Path,
args: &[String],
config: serde_json::Value,
) -> Result<Self, SidecarError> {
let mut command = Command::new(bin);
command
.args(args.iter().map(OsStr::new))
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
let mut child = command.spawn().map_err(|source| SidecarError::Spawn {
bin: bin.display().to_string(),
source,
})?;
let stdin = child
.stdin
.take()
.ok_or(SidecarError::PipesUnavailable)?;
let stdout = child
.stdout
.take()
.ok_or(SidecarError::PipesUnavailable)?;
let stderr = child.stderr.take();
let (tx, rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
let stdin = Arc::new(Mutex::new(Some(stdin)));
let event_tx = tx.clone();
let reader_handle = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
let event = match serde_json::from_str::<SidecarFrame>(&line) {
Ok(ev) => ev,
Err(err) => {
let _ = event_tx
.send(SidecarLifecycleEvent::Error(format!(
"failed to parse sidecar line: {err}: {line}"
)))
.await;
continue;
}
};
let mapped = match event {
SidecarFrame::Hello {
protocol_version,
extension,
capabilities,
} => {
if protocol_version < SIDECAR_PROTOCOL_VERSION {
Some(SidecarLifecycleEvent::Error(format!(
"sidecar protocol v{protocol_version} is too old; host requires v{SIDECAR_PROTOCOL_VERSION}. Update the plugin via /plugins."
)))
} else {
Some(SidecarLifecycleEvent::Ready {
protocol_version,
extension,
capabilities,
})
}
}
SidecarFrame::Status { state, label, .. } => {
Some(SidecarLifecycleEvent::StateChanged { state, label })
}
SidecarFrame::InsertText { text, mode } => {
Some(SidecarLifecycleEvent::InsertText { text, mode })
}
SidecarFrame::Error { message } => Some(SidecarLifecycleEvent::Error(message)),
SidecarFrame::Custom => None,
};
if let Some(event) = mapped {
if event_tx.send(event).await.is_err() {
break;
}
}
}
let _ = event_tx.send(SidecarLifecycleEvent::Exited).await;
});
let stderr_handle = stderr.map(|stderr| {
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::debug!(target: "sidecar::manager", "{line}");
}
})
});
let mut manager = Self {
child: Some(child),
stdin,
rx,
reader_handle: Some(reader_handle),
stderr_handle,
};
let hello_timeout = tokio::time::timeout(
std::time::Duration::from_secs(10),
manager.rx.recv(),
)
.await
.map_err(|_| SidecarError::Protocol("sidecar did not send Hello within 10s".to_string()))?;
match hello_timeout {
Some(SidecarLifecycleEvent::Ready { .. }) => {
}
Some(SidecarLifecycleEvent::Error(e)) => {
return Err(SidecarError::Protocol(format!("sidecar Hello failed: {e}")));
}
Some(other) => {
return Err(SidecarError::Protocol(format!(
"expected Hello from sidecar, got: {:?}", other
)));
}
None => {
return Err(SidecarError::Protocol("sidecar exited before sending Hello".to_string()));
}
}
manager.send(SidecarCommand::Init { config }).await?;
Ok(manager)
}
pub async fn press(&mut self) -> Result<(), SidecarError> {
self.send(SidecarCommand::Trigger { name: "press".into(), payload: None }).await
}
pub async fn release(&mut self) -> Result<(), SidecarError> {
self.send(SidecarCommand::Trigger { name: "release".into(), payload: None }).await
}
pub async fn shutdown(&mut self) -> Result<(), SidecarError> {
let _ = self.send(SidecarCommand::Shutdown).await;
if let Some(mut stdin) = self.stdin.lock().await.take() {
let _ = stdin.shutdown().await;
}
if let Some(mut child) = self.child.take() {
let _ = child.wait().await;
}
if let Some(handle) = self.reader_handle.take() {
handle.abort();
}
if let Some(handle) = self.stderr_handle.take() {
handle.abort();
}
Ok(())
}
pub async fn next_event(&mut self) -> Option<SidecarLifecycleEvent> {
self.rx.recv().await
}
async fn send(&self, cmd: SidecarCommand) -> Result<(), SidecarError> {
let mut buf = serde_json::to_vec(&cmd)?;
buf.push(b'\n');
let mut guard = self.stdin.lock().await;
let stdin = guard.as_mut().ok_or(SidecarError::AlreadyShutDown)?;
stdin.write_all(&buf).await?;
stdin.flush().await?;
Ok(())
}
}
impl Drop for SidecarManager {
fn drop(&mut self) {
if let Some(handle) = self.reader_handle.take() {
handle.abort();
}
if let Some(handle) = self.stderr_handle.take() {
handle.abort();
}
}
}