use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::error::{BosunError, Result};
use crate::tmux::control::{ControlParser, Notification};
pub const MONITOR_SESSION: &str = "__bosun_monitor";
pub struct ControlClient {
child: Child,
_reader: JoinHandle<()>,
}
impl ControlClient {
pub async fn spawn(
socket: Option<&str>,
) -> Result<(Self, mpsc::UnboundedReceiver<Notification>)> {
ensure_monitor_session(socket).await?;
let mut cmd = Command::new("tmux");
if let Some(s) = socket {
cmd.arg("-L").arg(s);
}
cmd.arg("-C")
.arg("attach-session")
.arg("-t")
.arg(MONITOR_SESSION)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null());
let mut child = cmd
.spawn()
.map_err(|e| BosunError::Tmux(format!("failed to spawn tmux -C monitor: {e}")))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| BosunError::Tmux("tmux -C monitor: missing stdout".into()))?;
let (tx, rx) = mpsc::unbounded_channel::<Notification>();
let reader = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
let mut parser = ControlParser::new();
loop {
match lines.next_line().await {
Ok(Some(line)) => {
if let Some(notif) = parser.feed(&line) {
if tx.send(notif).is_err() {
break;
}
}
}
Ok(None) => {
let _ = tx.send(Notification::Exit);
break;
}
Err(e) => {
tracing::warn!("tmux -C stdout read error: {}", e);
let _ = tx.send(Notification::Exit);
break;
}
}
}
});
Ok((
Self {
child,
_reader: reader,
},
rx,
))
}
}
impl Drop for ControlClient {
fn drop(&mut self) {
let _ = self.child.start_kill();
}
}
async fn ensure_monitor_session(socket: Option<&str>) -> Result<()> {
let mut cmd = Command::new("tmux");
if let Some(s) = socket {
cmd.arg("-L").arg(s);
}
cmd.args(["new-session", "-d", "-s", MONITOR_SESSION]);
let output = cmd
.output()
.await
.map_err(|e| BosunError::Tmux(format!("create monitor session: {e}")))?;
if output.status.success() {
return Ok(());
}
let stderr = String::from_utf8_lossy(&output.stderr);
if stderr.contains("already exists") || stderr.contains("duplicate session") {
return Ok(());
}
Err(BosunError::Tmux(format!(
"create monitor session: {}",
stderr.trim()
)))
}