use crate::{
events::{Engine, Event},
tmux,
types::SessionId,
};
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::mpsc;
fn fifo_path(session_id: &str) -> String {
format!("/tmp/ninox-{session_id}.fifo")
}
pub async fn start_streaming(
engine: Arc<Engine>,
session_id: SessionId,
tmux_id: &str,
cols: u16,
rows: u16,
) -> Result<()> {
let cancel_rx = engine.register_stream(session_id.clone()).await;
let path = fifo_path(&session_id);
let _ = std::fs::remove_file(&path);
let status = tokio::process::Command::new("mkfifo")
.arg(&path)
.status()
.await?;
anyhow::ensure!(status.success(), "mkfifo {path} failed");
let fifo_file = {
use std::os::unix::fs::OpenOptionsExt;
std::fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(&path)?
};
tmux::pipe_pane(tmux_id, &path).await?;
let bounce = cols.saturating_sub(1).max(2);
let _ = tmux::resize_window(tmux_id, bounce, rows).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let _ = tmux::resize_window(tmux_id, cols, rows).await;
let engine_out = engine.clone();
let sid_out = session_id.clone();
let path_out = path.clone();
tokio::spawn(async move {
use tokio::io::{unix::AsyncFd, Interest};
let async_fd =
match AsyncFd::with_interest(fifo_file, Interest::READABLE) {
Ok(fd) => fd,
Err(e) => {
tracing::error!("AsyncFd setup for {sid_out}: {e}");
return;
}
};
let mut buf = vec![0u8; 4096];
tokio::pin!(cancel_rx);
loop {
tokio::select! {
biased;
_ = &mut cancel_rx => break,
guard_result = async_fd.readable() => {
let mut guard = match guard_result {
Ok(g) => g,
Err(_) => break,
};
let result = guard.try_io(|inner| {
use std::io::Read;
inner.get_ref().read(&mut buf)
});
match result {
Ok(Ok(0)) => break,
Ok(Ok(n)) => {
engine_out.emit(Event::TerminalOutput {
session_id: sid_out.clone(),
bytes: buf[..n].to_vec(),
});
}
Ok(Err(e)) => {
tracing::error!("FIFO read {sid_out}: {e}");
break;
}
Err(_would_block) => {}
}
}
}
}
let _ = std::fs::remove_file(&path_out);
tracing::info!("PTY stream ended for {sid_out}");
});
let (input_tx, mut input_rx) = mpsc::unbounded_channel::<Vec<u8>>();
engine.register_pty_writer(session_id.clone(), input_tx).await;
let tmux_id_input = tmux_id.to_string();
tokio::spawn(async move {
let mut counter = 0u64;
while let Some(bytes) = input_rx.recv().await {
let buf = format!("ath-in-{}-{counter}", &tmux_id_input);
let tmp = format!("/tmp/ath-in-{}-{counter}.tmp", &tmux_id_input);
counter += 1;
if std::fs::write(&tmp, &bytes).is_err() { continue; }
let _ = tokio::process::Command::new("tmux")
.args(["load-buffer", "-b", &buf, &tmp, ";",
"paste-buffer", "-b", &buf, "-t", &tmux_id_input, "-d"])
.kill_on_drop(true)
.output()
.await;
let _ = std::fs::remove_file(&tmp);
}
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{events::Engine, store::Store, tmux};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::time::{sleep, Duration};
fn tmux_available() -> bool {
std::process::Command::new("tmux")
.args(["-V"])
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
fn unique_id() -> String {
format!(
"pt-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
)
}
fn test_engine() -> Arc<Engine> {
let store =
Arc::new(Store::open(tempdir().unwrap().keep().join("t.db")).unwrap());
Engine::new(store)
}
async fn collect_output(
rx: &mut tokio::sync::broadcast::Receiver<crate::events::Event>,
session_id: &str,
timeout_ms: u64,
) -> Vec<u8> {
let sid = session_id.to_string();
let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut all = Vec::new();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() { break; }
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(crate::events::Event::TerminalOutput { session_id, bytes }))
if session_id == sid =>
{
all.extend_from_slice(&bytes);
}
Ok(_) => {}
Err(_) => break, }
}
all
}
#[tokio::test]
async fn streaming_round_trip() {
if !tmux_available() { return; }
let id = unique_id();
let engine = test_engine();
let mut rx = engine.subscribe();
tmux::create_session(&id, "/tmp", "bash", &[]).await.unwrap();
sleep(Duration::from_millis(300)).await;
start_streaming(engine.clone(), id.clone(), &id, 80, 24).await.unwrap();
sleep(Duration::from_millis(200)).await;
if let Some(w) = engine.get_pty_writer(&id).await {
let _ = w.send(b"echo ninox-test\r".to_vec());
}
let out = collect_output(&mut rx, &id, 3000).await;
tmux::kill_session(&id).await.unwrap();
assert!(out.windows(10).any(|w| w == b"ninox-test"), "output: {:?}", String::from_utf8_lossy(&out));
}
#[tokio::test]
async fn tui_input_interaction() {
if !tmux_available() { return; }
let id = unique_id();
let engine = test_engine();
let mut rx = engine.subscribe();
let cmd = "bash -c 'select x in yes no; do echo \"chose:$x\"; break; done'";
tmux::create_session(&id, "/tmp", cmd, &[]).await.unwrap();
sleep(Duration::from_millis(400)).await;
start_streaming(engine.clone(), id.clone(), &id, 80, 24).await.unwrap();
sleep(Duration::from_millis(300)).await;
if let Some(w) = engine.get_pty_writer(&id).await {
let _ = w.send(b"1\r".to_vec());
}
let out = collect_output(&mut rx, &id, 4000).await;
tmux::kill_session(&id).await.unwrap();
let text = String::from_utf8_lossy(&out);
assert!(text.contains("chose:yes"), "TUI selection via paste-buffer failed. output: {text}");
}
}