use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use atomr_agents_coding_cli_isolator::ProcessHandle;
use crate::session::{SessionEvent, SessionTransport};
const PTY_BROADCAST_CAPACITY: usize = 512;
const PTY_INPUT_CAPACITY: usize = 64;
pub(crate) struct PumpHandles {
pub events_tx: broadcast::Sender<SessionEvent>,
pub input_tx: mpsc::Sender<SessionTransport>,
pub closed: Arc<parking_lot::Mutex<bool>>,
}
pub(crate) fn spawn(mut handle: Box<dyn ProcessHandle>) -> PumpHandles {
let (events_tx, _events_rx) = broadcast::channel(PTY_BROADCAST_CAPACITY);
let (input_tx, mut input_rx) = mpsc::channel::<SessionTransport>(PTY_INPUT_CAPACITY);
let closed = Arc::new(parking_lot::Mutex::new(false));
let stdout_rx = handle.take_stdout();
let stdin_tx = handle.take_stdin();
if let Some(mut rx) = stdout_rx {
let events_tx = events_tx.clone();
tokio::spawn(async move {
while let Some(chunk) = rx.recv().await {
if events_tx.send(SessionEvent::Bytes(chunk)).is_err() {
}
}
});
}
let closed_for_task = closed.clone();
let events_tx_for_exit = events_tx.clone();
tokio::spawn(async move {
let mut handle = handle; while let Some(frame) = input_rx.recv().await {
match frame {
SessionTransport::Stdin(bytes) => {
if let Some(tx) = &stdin_tx {
if tx.send(bytes).await.is_err() {
break;
}
}
}
SessionTransport::Resize { cols, rows } => {
if let Err(e) = handle.resize_pty(cols, rows).await {
warn!(error = %e, "pty resize failed");
}
}
SessionTransport::Detach => {
break;
}
}
}
let status = handle.wait().await;
*closed_for_task.lock() = true;
let _ = events_tx_for_exit.send(SessionEvent::Exited {
code: status.ok().and_then(|s| s.code),
});
});
PumpHandles {
events_tx,
input_tx,
closed,
}
}