cnctd-service-ssh 0.1.8

SSH command execution service - library and MCP server
Documentation
//! SSH connection handling using portable-pty for interactive PTY sessions.
//!
//! Uses the system ssh command with PTY support for full terminal emulation.

use crate::service_error::ServiceError;
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use std::io::{Read, Write};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

/// Active SSH connection with PTY
///
/// Caller-side handle for an `ssh` process running inside a PTY. All PTY I/O
/// happens on background threads started by `connect`; this handle talks to
/// them only through the channels and the shared `alive` flag stored here.
pub struct SshConnection {
    /// Channel sender for writing to PTY (bytes queued here are written to the PTY by the background thread)
    write_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Channel receiver for reading from PTY (each message is one chunk read from the PTY)
    read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Flag indicating if connection is alive; shared with the background threads,
    /// which clear it on EOF or I/O errors
    alive: Arc<AtomicBool>,
    /// Target host
    pub host: String,
    /// Target port
    pub port: u16,
    /// Target user
    pub user: String,
    /// Current terminal width in columns, as last requested through this handle
    cols: u16,
    /// Current terminal height in rows, as last requested through this handle
    rows: u16,
    /// Handle to resize the PTY (via channel); messages are `(cols, rows)` pairs
    resize_tx: mpsc::UnboundedSender<(u16, u16)>,
}

impl SshConnection {
    /// Connect to an SSH server and open a PTY session
    pub async fn connect(
        host: &str,
        port: u16,
        user: &str,
        key_path: &Path,
        _key_passphrase: Option<&str>,
        cols: u16,
        rows: u16,
        shell: Option<&str>,
    ) -> Result<Self, ServiceError> {
        // Verify key file exists
        if !key_path.exists() {
            return Err(ServiceError::InvalidParams(format!(
                "SSH key not found: {:?}",
                key_path
            )));
        }

        // Create channels for communication
        let (write_tx, mut write_rx) = mpsc::unbounded_channel::<Vec<u8>>();
        let (read_tx, read_rx) = mpsc::unbounded_channel::<Vec<u8>>();
        let (resize_tx, mut resize_rx) = mpsc::unbounded_channel::<(u16, u16)>();

        let alive = Arc::new(AtomicBool::new(true));
        let alive_clone = Arc::clone(&alive);

        // Build command arguments
        let key_path_str = key_path.to_string_lossy().to_string();
        let target = format!("{}@{}", user, host);
        let port_str = port.to_string();
        let shell_cmd = shell.map(|s| s.to_string());

        // Spawn a blocking thread to handle the PTY
        thread::spawn(move || {
            // Create PTY system
            let pty_system = native_pty_system();

            // Configure PTY size
            let pty_size = PtySize {
                rows,
                cols,
                pixel_width: 0,
                pixel_height: 0,
            };

            // Open PTY pair
            let pair = match pty_system.openpty(pty_size) {
                Ok(p) => p,
                Err(e) => {
                    warn!("Failed to open PTY: {}", e);
                    alive_clone.store(false, Ordering::SeqCst);
                    return;
                }
            };

            // Build SSH command
            let mut cmd = CommandBuilder::new("ssh");
            // Set TERM for proper terminal emulation (enables alternate screen for TUI apps)
            cmd.env("TERM", "xterm-256color");
            cmd.arg("-i");
            cmd.arg(&key_path_str);
            cmd.arg("-p");
            cmd.arg(&port_str);
            cmd.arg("-o");
            cmd.arg("StrictHostKeyChecking=accept-new");
            cmd.arg("-o");
            cmd.arg("UserKnownHostsFile=/dev/null");
            // Request pseudo-terminal allocation explicitly
            cmd.arg("-t");
            cmd.arg(&target);

            if let Some(ref shell_cmd) = shell_cmd {
                cmd.arg(shell_cmd);
            }

            debug!("Spawning SSH command to {}", target);

            // Spawn the SSH process in the PTY
            let _child = match pair.slave.spawn_command(cmd) {
                Ok(c) => c,
                Err(e) => {
                    warn!("Failed to spawn SSH: {}", e);
                    alive_clone.store(false, Ordering::SeqCst);
                    return;
                }
            };

            // Drop the slave to close our handle to it
            drop(pair.slave);

            // Get reader and writer from master
            let mut reader = match pair.master.try_clone_reader() {
                Ok(r) => r,
                Err(e) => {
                    warn!("Failed to get PTY reader: {}", e);
                    alive_clone.store(false, Ordering::SeqCst);
                    return;
                }
            };

            let mut writer = match pair.master.take_writer() {
                Ok(w) => w,
                Err(e) => {
                    warn!("Failed to get PTY writer: {}", e);
                    alive_clone.store(false, Ordering::SeqCst);
                    return;
                }
            };

            info!("SSH PTY session established");

            // Use a thread for reading
            let read_alive = Arc::clone(&alive_clone);
            let read_thread = thread::spawn(move || {
                let mut buf = [0u8; 4096];
                loop {
                    if !read_alive.load(Ordering::SeqCst) {
                        break;
                    }

                    match reader.read(&mut buf) {
                        Ok(0) => {
                            // EOF
                            read_alive.store(false, Ordering::SeqCst);
                            break;
                        }
                        Ok(n) => {
                            if read_tx.send(buf[..n].to_vec()).is_err() {
                                break;
                            }
                        }
                        Err(e) => {
                            warn!("PTY read error: {}", e);
                            read_alive.store(false, Ordering::SeqCst);
                            break;
                        }
                    }
                }
            });

            // Handle writes and resizes in this thread
            loop {
                if !alive_clone.load(Ordering::SeqCst) {
                    break;
                }

                // Check for write data (non-blocking via try_recv simulation with timeout)
                if let Ok(data) = write_rx.try_recv() {
                    if let Err(e) = writer.write_all(&data) {
                        warn!("PTY write error: {}", e);
                        alive_clone.store(false, Ordering::SeqCst);
                        break;
                    }
                    let _ = writer.flush();
                }

                // Check for resize
                if let Ok((new_cols, new_rows)) = resize_rx.try_recv() {
                    let size = PtySize {
                        rows: new_rows,
                        cols: new_cols,
                        pixel_width: 0,
                        pixel_height: 0,
                    };
                    if let Err(e) = pair.master.resize(size) {
                        warn!("PTY resize error: {}", e);
                    }
                }

                // Small sleep to prevent busy loop
                thread::sleep(std::time::Duration::from_millis(10));
            }

            // Wait for read thread
            let _ = read_thread.join();
        });

        Ok(Self {
            write_tx,
            read_rx,
            alive,
            host: host.to_string(),
            port,
            user: user.to_string(),
            cols,
            rows,
            resize_tx,
        })
    }

    /// Send data to the PTY
    pub async fn send(&self, data: &[u8]) -> Result<(), ServiceError> {
        self.write_tx.send(data.to_vec()).map_err(|_| {
            ServiceError::Internal("Failed to send to PTY - channel closed".to_string())
        })
    }

    /// Try to receive data from the PTY (non-blocking)
    pub fn try_recv(&mut self) -> Option<Vec<u8>> {
        self.read_rx.try_recv().ok()
    }

    /// Receive data from PTY with timeout
    pub async fn recv_timeout(&mut self, timeout: std::time::Duration) -> Option<Vec<u8>> {
        tokio::time::timeout(timeout, self.read_rx.recv())
            .await
            .ok()
            .flatten()
    }

    /// Check if the connection is still alive
    pub fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }

    /// Mark connection as dead
    pub fn mark_dead(&self) {
        self.alive.store(false, Ordering::SeqCst);
    }

    /// Resize the PTY
    pub fn resize(&mut self, cols: u16, rows: u16) -> Result<(), ServiceError> {
        self.resize_tx.send((cols, rows)).map_err(|_| {
            ServiceError::Internal("Failed to resize PTY - channel closed".to_string())
        })?;
        self.cols = cols;
        self.rows = rows;
        Ok(())
    }

    /// Get current PTY size
    pub fn size(&self) -> (u16, u16) {
        (self.cols, self.rows)
    }

    /// Close the connection
    pub async fn close(self) -> Result<(), ServiceError> {
        self.alive.store(false, Ordering::SeqCst);
        Ok(())
    }
}