use std::collections::{HashMap, VecDeque};
use std::io;
#[cfg(unix)]
use std::os::fd::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawHandle;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::pty::NativePtyProcess;
use crate::terminal_graphics::TerminalGraphicsCapabilities;
use tokio::sync::mpsc;
use tracing::{debug, warn};
use crate::daemon::telemetry::{
TeeEvent, TeeFileOptions, TeeHandle, TeeOptions, TeeRawOptions, TeeRegistry, TeeSnapshot,
TeeStatus, TeeStream,
};
pub const DEFAULT_BACKLOG_BYTES: usize = 1_048_576;
pub const STREAM_CHUNK_BYTES: usize = 64 * 1024;
pub struct RingBuffer {
capacity: usize,
data: VecDeque<u8>,
bytes_dropped: u64,
}
impl RingBuffer {
pub fn new(capacity: usize) -> Self {
Self {
capacity,
data: VecDeque::with_capacity(capacity.min(64 * 1024)),
bytes_dropped: 0,
}
}
pub fn push(&mut self, bytes: &[u8]) {
let (slice, extra_dropped) = if bytes.len() > self.capacity {
let drop_n = bytes.len() - self.capacity;
(&bytes[drop_n..], drop_n as u64)
} else {
(bytes, 0)
};
let overflow = (self.data.len() + slice.len()).saturating_sub(self.capacity);
if overflow > 0 {
self.data.drain(..overflow);
self.bytes_dropped += overflow as u64;
}
self.bytes_dropped += extra_dropped;
self.data.extend(slice);
}
pub fn drain(&mut self) -> (Vec<u8>, u64) {
let bytes: Vec<u8> = self.data.drain(..).collect();
(bytes, self.bytes_dropped)
}
pub fn snapshot(&self) -> (Vec<u8>, u64) {
(self.data.iter().copied().collect(), self.bytes_dropped)
}
pub fn dropped(&self) -> u64 {
self.bytes_dropped
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
#[derive(Debug, Clone)]
pub enum AttachmentEnded {
SessionExited,
Stolen,
Terminated,
Detached,
}
#[derive(Debug, Clone)]
pub enum OutboundFrame {
Output(Vec<u8>),
MissedBytes(u64),
Exit(i32),
Ended(AttachmentEnded),
}
pub struct AttachmentHandle {
pub receiver: mpsc::UnboundedReceiver<OutboundFrame>,
}
struct AttachedClient {
sender: mpsc::UnboundedSender<OutboundFrame>,
is_tty: bool,
term: String,
graphics_capabilities: TerminalGraphicsCapabilities,
}
#[derive(Debug, Clone)]
pub struct ExitState {
pub exit_code: i32,
pub exited_at_unix: f64,
pub outcome: TerminationOutcome,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminationOutcome {
Unspecified,
NaturalExit,
SoftExit,
HardKilled,
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct PendingTermination {
pub started_at_unix: f64,
pub grace_secs: f64,
}
pub struct OwnedPtySession {
pub id: String,
pub process: Arc<NativePtyProcess>,
pub pid: u32,
pub command: String,
pub cwd: String,
pub originator: String,
pub created_at_unix: f64,
pub rows: AtomicU16,
pub cols: AtomicU16,
backlog: Mutex<RingBuffer>,
tees: TeeRegistry,
attached: Mutex<Option<AttachedClient>>,
exit_state: Mutex<Option<ExitState>>,
pub(crate) pending_termination: Mutex<Option<PendingTermination>>,
hard_kill_fired: Arc<AtomicBool>,
reader_shutdown: Arc<AtomicBool>,
reader_thread: Mutex<Option<thread::JoinHandle<()>>>,
}
impl OwnedPtySession {
pub fn is_attached(&self) -> bool {
self.attached.lock().unwrap().is_some()
}
pub fn exit_state(&self) -> Option<ExitState> {
self.exit_state.lock().unwrap().clone()
}
pub fn rows(&self) -> u16 {
self.rows.load(Ordering::Acquire)
}
pub fn cols(&self) -> u16 {
self.cols.load(Ordering::Acquire)
}
pub fn backlog_snapshot(&self) -> (Vec<u8>, u64) {
self.backlog.lock().unwrap().snapshot()
}
pub fn tee_output_ring(&self, capacity: usize) -> TeeHandle {
self.tees.add_ring(TeeStream::PtyOutput, capacity)
}
pub fn tee_output_channel(&self, capacity: usize) -> (TeeHandle, Receiver<TeeEvent>) {
self.tee_output_channel_with_options(capacity, TeeOptions::default())
}
pub fn tee_output_channel_with_options(
&self,
capacity: usize,
options: TeeOptions,
) -> (TeeHandle, Receiver<TeeEvent>) {
self.tees
.add_channel_with_options(TeeStream::PtyOutput, capacity, options)
}
pub fn tee_output_callback<F>(&self, capacity: usize, callback: F) -> TeeHandle
where
F: FnMut(TeeEvent) + Send + 'static,
{
self.tee_output_callback_with_options(capacity, TeeOptions::default(), callback)
}
pub fn tee_output_callback_with_options<F>(
&self,
capacity: usize,
options: TeeOptions,
callback: F,
) -> TeeHandle
where
F: FnMut(TeeEvent) + Send + 'static,
{
self.tees
.add_callback_with_options(TeeStream::PtyOutput, capacity, options, callback)
}
pub fn tee_output_file<P>(&self, path: P, options: TeeFileOptions) -> io::Result<TeeHandle>
where
P: AsRef<Path>,
{
self.tees.add_file(TeeStream::PtyOutput, path, options)
}
#[cfg(unix)]
pub fn tee_output_raw_fd(&self, fd: RawFd, options: TeeRawOptions) -> TeeHandle {
self.tees.add_raw_fd(TeeStream::PtyOutput, fd, options)
}
#[cfg(windows)]
pub fn tee_output_raw_handle(&self, handle: RawHandle, options: TeeRawOptions) -> TeeHandle {
self.tees
.add_raw_handle(TeeStream::PtyOutput, handle, options)
}
pub fn tee_input_ring(&self, capacity: usize) -> TeeHandle {
self.tees.add_ring(TeeStream::Stdin, capacity)
}
pub fn tee_input_channel(&self, capacity: usize) -> (TeeHandle, Receiver<TeeEvent>) {
self.tee_input_channel_with_options(capacity, TeeOptions::default())
}
pub fn tee_input_channel_with_options(
&self,
capacity: usize,
options: TeeOptions,
) -> (TeeHandle, Receiver<TeeEvent>) {
self.tees
.add_channel_with_options(TeeStream::Stdin, capacity, options)
}
pub fn tee_input_callback<F>(&self, capacity: usize, callback: F) -> TeeHandle
where
F: FnMut(TeeEvent) + Send + 'static,
{
self.tee_input_callback_with_options(capacity, TeeOptions::default(), callback)
}
pub fn tee_input_callback_with_options<F>(
&self,
capacity: usize,
options: TeeOptions,
callback: F,
) -> TeeHandle
where
F: FnMut(TeeEvent) + Send + 'static,
{
self.tees
.add_callback_with_options(TeeStream::Stdin, capacity, options, callback)
}
pub fn tee_input_file<P>(&self, path: P, options: TeeFileOptions) -> io::Result<TeeHandle>
where
P: AsRef<Path>,
{
self.tees.add_file(TeeStream::Stdin, path, options)
}
#[cfg(unix)]
pub fn tee_input_raw_fd(&self, fd: RawFd, options: TeeRawOptions) -> TeeHandle {
self.tees.add_raw_fd(TeeStream::Stdin, fd, options)
}
#[cfg(windows)]
pub fn tee_input_raw_handle(&self, handle: RawHandle, options: TeeRawOptions) -> TeeHandle {
self.tees.add_raw_handle(TeeStream::Stdin, handle, options)
}
pub fn tee_snapshot(&self, handle: TeeHandle) -> Option<TeeSnapshot> {
self.tees.snapshot(handle)
}
pub fn tee_status(&self, handle: TeeHandle) -> Option<TeeStatus> {
self.tees.status(handle)
}
pub fn untee(&self, handle: TeeHandle) -> bool {
self.tees.remove(handle)
}
pub fn attached_is_tty(&self) -> bool {
self.attached
.lock()
.unwrap()
.as_ref()
.map(|c| c.is_tty)
.unwrap_or(false)
}
pub fn attached_term(&self) -> String {
self.attached
.lock()
.unwrap()
.as_ref()
.map(|c| c.term.clone())
.unwrap_or_default()
}
pub fn attached_graphics_capabilities(&self) -> TerminalGraphicsCapabilities {
self.attached
.lock()
.unwrap()
.as_ref()
.map(|c| c.graphics_capabilities.clone())
.unwrap_or_else(TerminalGraphicsCapabilities::unknown)
}
pub fn attach(
&self,
steal: bool,
rows: u16,
cols: u16,
) -> Result<(AttachmentHandle, Vec<u8>, u64), AttachError> {
self.attach_with_terminal_info(
steal,
rows,
cols,
true,
String::new(),
TerminalGraphicsCapabilities::unknown(),
)
}
pub fn attach_with_terminal_info(
&self,
steal: bool,
rows: u16,
cols: u16,
is_tty: bool,
term: String,
graphics_capabilities: TerminalGraphicsCapabilities,
) -> Result<(AttachmentHandle, Vec<u8>, u64), AttachError> {
if let Some(state) = self.exit_state() {
return Err(AttachError::SessionExited(state));
}
let mut attached = self.attached.lock().unwrap();
if attached.is_some() {
if !steal {
return Err(AttachError::AlreadyAttached);
}
if let Some(existing) = attached.take() {
let _ = existing
.sender
.send(OutboundFrame::Ended(AttachmentEnded::Stolen));
}
}
if is_tty {
self.rows.store(rows, Ordering::Release);
self.cols.store(cols, Ordering::Release);
if rows > 0 && cols > 0 {
if let Err(e) = self.process.resize_impl(rows, cols) {
warn!(session_id = %self.id, error = %e, "resize on attach failed");
}
}
}
let (tx, rx) = mpsc::unbounded_channel();
let (backlog, dropped) = self.backlog.lock().unwrap().drain();
*attached = Some(AttachedClient {
sender: tx,
is_tty,
term,
graphics_capabilities,
});
Ok((AttachmentHandle { receiver: rx }, backlog, dropped))
}
pub fn clear_attachment(&self) {
*self.attached.lock().unwrap() = None;
}
pub fn notify_attached(&self, frame: OutboundFrame) {
if let Some(client) = self.attached.lock().unwrap().as_ref() {
let _ = client.sender.send(frame);
}
}
pub fn write_input(&self, bytes: &[u8]) -> Result<(), crate::pty::PtyError> {
self.process.write_impl(bytes, false)?;
self.tees.write(TeeStream::Stdin, bytes);
Ok(())
}
pub fn resize(&self, rows: u16, cols: u16) -> Result<(), crate::pty::PtyError> {
self.rows.store(rows, Ordering::Release);
self.cols.store(cols, Ordering::Release);
self.process.resize_impl(rows, cols)
}
pub fn send_interrupt(&self) -> Result<(), crate::pty::PtyError> {
self.process.send_interrupt_impl()
}
pub fn terminate(&self, grace: Duration) -> Result<(), crate::pty::PtyError> {
*self.pending_termination.lock().unwrap() = Some(PendingTermination {
started_at_unix: unix_now(),
grace_secs: grace.as_secs_f64(),
});
self.process.terminate_tree_impl()?;
let process = Arc::clone(&self.process);
let hard_kill_fired = Arc::clone(&self.hard_kill_fired);
thread::spawn(move || {
thread::sleep(grace);
if process.wait_impl(Some(0.0)).is_err() {
hard_kill_fired.store(true, Ordering::Release);
let _ = process.kill_tree_impl();
}
});
Ok(())
}
pub(crate) fn classify_termination(&self, exited_at_unix: f64) -> TerminationOutcome {
match *self.pending_termination.lock().unwrap() {
None => TerminationOutcome::NaturalExit,
Some(p) => {
if self.hard_kill_fired.load(Ordering::Acquire) {
TerminationOutcome::HardKilled
} else if exited_at_unix - p.started_at_unix <= p.grace_secs + 0.25 {
TerminationOutcome::SoftExit
} else {
TerminationOutcome::HardKilled
}
}
}
}
}
#[derive(Debug)]
pub enum AttachError {
AlreadyAttached,
SessionExited(ExitState),
}
pub struct PtySessionRegistry {
sessions: Mutex<HashMap<String, Arc<OwnedPtySession>>>,
next_id: AtomicU64,
}
impl PtySessionRegistry {
pub fn new() -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(1),
}
}
pub fn get(&self, id: &str) -> Option<Arc<OwnedPtySession>> {
self.sessions.lock().unwrap().get(id).cloned()
}
pub fn list(&self) -> Vec<Arc<OwnedPtySession>> {
self.sessions.lock().unwrap().values().cloned().collect()
}
#[allow(clippy::too_many_arguments)]
pub fn spawn(
self: &Arc<Self>,
argv: Vec<String>,
cwd: Option<String>,
env: Option<Vec<(String, String)>>,
rows: u16,
cols: u16,
originator: String,
command_display: String,
) -> Result<Arc<OwnedPtySession>, SpawnError> {
if argv.is_empty() {
return Err(SpawnError::EmptyArgv);
}
let process = NativePtyProcess::new(argv, cwd.clone(), env, rows, cols, None)
.map_err(|e| SpawnError::Construct(e.to_string()))?;
process
.start_impl()
.map_err(|e| SpawnError::Spawn(e.to_string()))?;
let pid = pid_of(&process).unwrap_or(0);
let id = self.next_session_id();
let session = Arc::new(OwnedPtySession {
id: id.clone(),
process: Arc::new(process),
pid,
command: command_display,
cwd: cwd.unwrap_or_default(),
originator,
created_at_unix: unix_now(),
rows: AtomicU16::new(rows),
cols: AtomicU16::new(cols),
backlog: Mutex::new(RingBuffer::new(DEFAULT_BACKLOG_BYTES)),
tees: TeeRegistry::new(),
attached: Mutex::new(None),
exit_state: Mutex::new(None),
pending_termination: Mutex::new(None),
hard_kill_fired: Arc::new(AtomicBool::new(false)),
reader_shutdown: Arc::new(AtomicBool::new(false)),
reader_thread: Mutex::new(None),
});
let reader_session = Arc::clone(&session);
let handle = thread::spawn(move || reader_loop(reader_session));
*session.reader_thread.lock().unwrap() = Some(handle);
self.sessions
.lock()
.unwrap()
.insert(id, Arc::clone(&session));
Ok(session)
}
pub fn remove(&self, id: &str) -> Option<Arc<OwnedPtySession>> {
self.sessions.lock().unwrap().remove(id)
}
pub fn purge_exited(&self, originator: &str) -> usize {
let mut guard = self.sessions.lock().unwrap();
let to_remove: Vec<String> = guard
.iter()
.filter(|(_, s)| {
s.exit_state().is_some() && (originator.is_empty() || s.originator == originator)
})
.map(|(k, _)| k.clone())
.collect();
for k in &to_remove {
guard.remove(k);
}
to_remove.len()
}
fn next_session_id(&self) -> String {
let counter = self.next_id.fetch_add(1, Ordering::Relaxed);
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
format!("pty-{nanos:016x}-{counter:08x}")
}
}
impl Default for PtySessionRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum SpawnError {
EmptyArgv,
Construct(String),
Spawn(String),
}
impl std::fmt::Display for SpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SpawnError::EmptyArgv => write!(f, "argv must not be empty"),
SpawnError::Construct(s) => write!(f, "failed to build PTY process: {s}"),
SpawnError::Spawn(s) => write!(f, "failed to spawn PTY: {s}"),
}
}
}
impl std::error::Error for SpawnError {}
fn reader_loop(session: Arc<OwnedPtySession>) {
let read_timeout = Some(0.1_f64);
loop {
if session.reader_shutdown.load(Ordering::Acquire) {
break;
}
match session.process.read_chunk_impl(read_timeout) {
Ok(Some(bytes)) if !bytes.is_empty() => {
session.backlog.lock().unwrap().push(&bytes);
session.tees.write(TeeStream::PtyOutput, &bytes);
if let Some(client) = session.attached.lock().unwrap().as_ref() {
for slice in bytes.chunks(STREAM_CHUNK_BYTES) {
let _ = client.sender.send(OutboundFrame::Output(slice.to_vec()));
}
}
}
Ok(_) => {
}
Err(_e) => {
debug!(session_id = %session.id, "PTY reader closed; probing child status");
break;
}
}
}
match session.process.wait_impl(Some(5.0)) {
Ok(exit_code) => {
let exited_at_unix = unix_now();
let outcome = session.classify_termination(exited_at_unix);
let state = ExitState {
exit_code,
exited_at_unix,
outcome,
};
*session.exit_state.lock().unwrap() = Some(state.clone());
if let Some(client) = session.attached.lock().unwrap().take() {
let _ = client.sender.send(OutboundFrame::Exit(state.exit_code));
let _ = client
.sender
.send(OutboundFrame::Ended(AttachmentEnded::SessionExited));
}
}
Err(_) => {
debug!(
session_id = %session.id,
"PTY reader closed but child still alive; leaving exit_state=None"
);
*session.attached.lock().unwrap() = None;
}
}
}
fn unix_now() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0)
}
fn pid_of(process: &NativePtyProcess) -> Option<u32> {
let guard = process.handles.lock().unwrap();
guard.as_ref().map(|h| h.child.pid())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ring_buffer_drops_oldest_when_capacity_exceeded() {
let mut rb = RingBuffer::new(8);
rb.push(b"abcdefgh");
assert_eq!(rb.len(), 8);
assert_eq!(rb.dropped(), 0);
rb.push(b"ij");
assert_eq!(rb.len(), 8);
assert_eq!(rb.dropped(), 2);
let (bytes, dropped) = rb.drain();
assert_eq!(bytes, b"cdefghij");
assert_eq!(dropped, 2);
assert!(rb.is_empty());
}
#[test]
fn ring_buffer_handles_single_push_larger_than_capacity() {
let mut rb = RingBuffer::new(4);
rb.push(b"abcdefghij");
assert_eq!(rb.dropped(), 6);
let (bytes, _) = rb.drain();
assert_eq!(bytes, b"ghij");
}
#[test]
fn registry_assigns_unique_session_ids() {
let r = Arc::new(PtySessionRegistry::new());
let a = r.next_session_id();
let b = r.next_session_id();
assert_ne!(a, b);
assert!(a.starts_with("pty-"));
}
}