execkit 0.1.2

Stateful, structured, safe shell sessions for AI agents on real infrastructure.
Documentation
// SPDX-License-Identifier: Apache-2.0
//! A persistent session: frame each command with unguessable start/end sentinels
//! that carry exit code + cwd, and dump the command's stderr back *through the
//! channel* between them - so the framing is identical for local and remote
//! transports (no local-filesystem dependency). Then apply policy, redaction,
//! bounding, and audit.

use std::io::Read;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::audit::AuditLog;
use crate::error::{Error, Result};
use crate::exec::ExecResult;
use crate::output::{bound, clean};
use crate::policy::Policy;
use crate::redact::redact;
use crate::transport::{self, local::LocalPty, Transport};

/// ASCII unit separator (`0x1f`), the field delimiter inside the result frame.
const US: u8 = b'\x1f';

/// A live, stateful shell session.
///
/// Each [`Session::exec`] call frames one command with sentinels derived from
/// `token`, sends it over `io`, and parses the framed reply into a result.
pub struct Session {
    /// Byte channel to the shell: `exec` writes the framed command to it and
    /// reads the reply back from it.
    io: Box<dyn Transport>,
    /// Per-session token embedded in the start/end sentinels and in the name
    /// of the fallback stderr temp file.
    token: String,
    /// Optional policy consulted before a command is sent.
    policy: Option<Policy>,
    /// Optional audit log that receives every successfully parsed result.
    audit: Option<AuditLog>,
    /// How long `exec` waits for the end sentinel before giving up.
    timeout: Duration,
    /// Size cap applied to the returned stdout and stderr; also drives the
    /// in-memory accumulation limit inside `exec`.
    max_output: usize,
    /// Set after a timeout: the prior command is still running and would desync
    /// framing, so the session refuses further commands.
    poisoned: bool,
}

impl Session {
    /// Open a session backed by a local `bash` PTY.
    ///
    /// `bash` is started with `--norc --noprofile`, so no startup files are read.
    ///
    /// # Errors
    /// Propagates any failure from spawning the PTY or from the readiness
    /// handshake.
    pub fn local() -> Result<Self> {
        let pty = LocalPty::spawn("bash", &["--norc", "--noprofile"])?;
        Self::from_transport(Box::new(pty))
    }

    /// Open a session over SSH.
    ///
    /// # Errors
    /// Propagates any failure from connecting or from the readiness handshake.
    #[cfg(feature = "ssh")]
    pub fn ssh(config: crate::transport::ssh::SshConfig) -> Result<Self> {
        let t = crate::transport::ssh::SshTransport::connect(config)?;
        Self::from_transport(Box::new(t))
    }

    /// Build a session over any transport: run the readiness handshake and set
    /// up the per-session sentinel token.
    ///
    /// Defaults: no policy, no audit log, a 30 second timeout and a
    /// 100 000 output cap.
    fn from_transport(mut io: Box<dyn Transport>) -> Result<Self> {
        transport::shell_init(io.as_mut())?;
        Ok(Self {
            io,
            token: unique_token(),
            policy: None,
            audit: None,
            timeout: Duration::from_secs(30),
            max_output: 100_000,
            poisoned: false,
        })
    }

    /// Attach an advisory policy (checked before each command runs).
    pub fn with_policy(mut self, policy: Policy) -> Self {
        self.policy = Some(policy);
        self
    }

    /// Attach an audit log (every result is appended).
    pub fn with_audit(mut self, audit: AuditLog) -> Self {
        self.audit = Some(audit);
        self
    }

    /// Set the per-command completion timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Cap the size of returned stdout/stderr; also bounds in-memory
    /// accumulation so a flooding command can't exhaust RAM.
    ///
    /// The raw accumulation limit is `max(2 * max, 65_536)` bytes.
    pub fn with_max_output(mut self, max: usize) -> Self {
        self.max_output = max;
        self
    }

    /// True if a prior timeout left the session unusable.
    pub fn is_poisoned(&self) -> bool {
        self.poisoned
    }

    /// Run a command and return a structured [`ExecResult`].
    ///
    /// The command is wrapped so that its stderr goes to a temp file on the
    /// shell's side, and a trailer carrying exit code, cwd and that stderr is
    /// printed between the start and end sentinels. Output is then cleaned,
    /// redacted and bounded, and the result is recorded in the audit log if
    /// one is attached (an audit write failure is reported on stderr and does
    /// not fail the call).
    ///
    /// `truncated` is set when either stream was bounded or when raw output
    /// had to be dropped to respect the accumulation limit.
    ///
    /// # Errors
    /// - [`Error::SessionPoisoned`] if an earlier call timed out.
    /// - [`Error::PolicyDenied`] if the attached policy rejects `command`;
    ///   nothing is sent to the shell in that case.
    /// - Any error from writing the command to the transport.
    /// - [`Error::StillRunning`] if the end sentinel does not arrive within
    ///   the timeout; the session is poisoned and later calls fail.
    pub fn exec(&mut self, command: &str) -> Result<ExecResult> {
        if self.poisoned {
            return Err(Error::SessionPoisoned);
        }
        if let Some(p) = &self.policy {
            if let Err(reason) = p.check(command) {
                return Err(Error::PolicyDenied(reason));
            }
        }

        let start_m = format!("__EXECKIT_{}__", self.token);
        let end_m = format!("__EXECKITEND_{}__", self.token);
        // stderr -> a remote temp file (mktemp = 0600), cat back through the
        // channel between the start/end sentinels, then removed. Works
        // identically for local and SSH; nothing touches the local filesystem.
        // Layout on the wire: <stdout>\n<START>\x1f<exit>\x1f<cwd>\x1f<stderr><END>\n
        // The fallback temp file (no `mktemp`) is created with umask 077 INSIDE
        // the command substitution, so it's 0600 and the umask never leaks into
        // the user's command.
        //
        // NOTE(review): `{cmd}` shares one physical line with the trailer, so a
        // command ending in an unquoted `#` comment comments out the sentinels
        // and the call runs into the timeout. A newline before the closing `}`
        // would avoid that, but whether multi-line input is safe depends on how
        // the transport / `shell_init` handle echo and PS2 - confirm first.
        let payload = format!(
            "__E=$(umask 077; mktemp 2>/dev/null||{{ f=/tmp/execkitE_{tok}; : >\"$f\"; echo \"$f\"; }}); \
{{ {cmd} ; }} 2>\"$__E\"; \
printf '\\n{start}\\037%d\\037%s\\037' \"$?\" \"$PWD\"; cat \"$__E\" 2>/dev/null; \
printf '{end}\\n'; rm -f \"$__E\"\n",
            tok = self.token,
            cmd = command,
            start = start_m,
            end = end_m,
        );

        let started = Instant::now();
        self.io.write_all(payload.as_bytes())?;

        let start_b = start_m.as_bytes();
        let end_b = end_m.as_bytes();
        // Bound in-memory accumulation: once the buffer exceeds `max_acc`, only
        // a head and a tail of `keep` bytes each (plus the frame header, see
        // `compact`) are retained.
        let max_acc = self.max_output.saturating_mul(2).max(65_536);
        let keep = max_acc / 2;
        let mut acc: Vec<u8> = Vec::new();
        let mut overflowed = false;
        let deadline = Instant::now() + self.timeout;

        loop {
            let now = Instant::now();
            if now >= deadline {
                self.poisoned = true;
                return Err(Error::StillRunning);
            }
            // NOTE(review): `None` is treated as a timeout; if the transport
            // also returns `None` when the channel is closed, that case is
            // reported as `StillRunning` too - confirm against `Transport`.
            let Some(chunk) = self.io.recv_timeout(deadline - now) else {
                self.poisoned = true;
                return Err(Error::StillRunning);
            };
            acc.extend_from_slice(&chunk);

            if acc.len() > max_acc {
                let compacted = Self::compact(&acc, start_b, keep);
                overflowed |= compacted.len() < acc.len();
                acc = compacted;
            }

            // Completion = the END sentinel has arrived.
            let Some(end_pos) = find(&acc, end_b) else {
                continue;
            };
            let Some(start_pos) = find(&acc[..end_pos], start_b) else {
                continue;
            };
            let frame = &acc[start_pos + start_b.len()..end_pos];
            // Only the first three separators delimit fields; anything after
            // the third is stderr and may itself contain separator bytes.
            let Some([s0, s1, s2]) = Self::first_three_seps(frame) else {
                continue;
            };
            // An unparseable exit field is reported as -1.
            let exit_code: i32 = String::from_utf8_lossy(&frame[s0 + 1..s1])
                .trim()
                .parse()
                .unwrap_or(-1);
            let cwd = String::from_utf8_lossy(&frame[s1 + 1..s2]).into_owned();
            let raw_err = clean(&String::from_utf8_lossy(&frame[s2 + 1..]));
            let raw_out = clean(&String::from_utf8_lossy(&acc[..start_pos]));
            let (stdout, t1) = bound(&redact(&raw_out), self.max_output);
            let (stderr, t2) = bound(&redact(&raw_err), self.max_output);

            let result = ExecResult {
                command: command.to_string(),
                stdout,
                stderr,
                exit_code,
                // Saturate instead of silently truncating the u128 millisecond count.
                duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
                cwd,
                truncated: t1 || t2 || overflowed,
            };
            if let Some(a) = &self.audit {
                if let Err(e) = a.record(&result) {
                    eprintln!("execkit: audit write failed: {e}");
                }
            }
            return Ok(result);
        }
    }

    /// Offsets within `frame` of its first three unit separators, or `None`
    /// if fewer than three are present.
    fn first_three_seps(frame: &[u8]) -> Option<[usize; 3]> {
        let mut seps = frame
            .iter()
            .enumerate()
            .filter(|(_, b)| **b == US)
            .map(|(i, _)| i);
        Some([seps.next()?, seps.next()?, seps.next()?])
    }

    /// Shrink an oversized accumulator to its first and last `keep` bytes
    /// without ever cutting the frame header.
    ///
    /// Normally the result is `acc[..keep]` followed by `acc[len - keep..]`.
    /// If a complete header (START through the third separator) has arrived
    /// and is not wholly inside either kept region - the command flooded both
    /// stdout and stderr - the header is kept verbatim between a stdout head
    /// and a stderr tail. Without this, END would later arrive with no START
    /// and a finished command would be reported as a timeout.
    ///
    /// The caller must ensure `acc.len() > 2 * keep`.
    fn compact(acc: &[u8], start_b: &[u8], keep: usize) -> Vec<u8> {
        let tail_start = acc.len() - keep;
        // (start of START, one past the third separator) for a complete header.
        let header = find(acc, start_b).and_then(|start_pos| {
            let body = start_pos + start_b.len();
            let seps = Self::first_three_seps(&acc[body..])?;
            Some((start_pos, body + seps[2] + 1))
        });

        let mut out = Vec::with_capacity(keep * 2);
        match header {
            // The header would be cut or dropped by a plain head+tail keep.
            // The length guard keeps the retained size bounded even for an
            // implausibly long header.
            Some((start_pos, header_end))
                if header_end > keep
                    && start_pos < tail_start
                    && header_end - start_pos <= keep =>
            {
                out.extend_from_slice(&acc[..start_pos.min(keep)]);
                out.extend_from_slice(&acc[start_pos..header_end]);
                out.extend_from_slice(&acc[header_end.max(tail_start)..]);
            }
            _ => {
                out.extend_from_slice(&acc[..keep]);
                out.extend_from_slice(&acc[tail_start..]);
            }
        }
        out
    }
}

/// Return the offset of the first occurrence of `needle` inside `hay`.
///
/// Yields `None` when `needle` is empty, when it is longer than `hay`, or
/// when it does not occur at all.
fn find(hay: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || width > hay.len() {
        return None;
    }
    // Try every offset at which a full-width candidate still fits.
    let last_offset = hay.len() - width;
    (0..=last_offset).find(|&at| &hay[at..at + width] == needle)
}

/// Build the per-session sentinel token.
///
/// The token is lowercase hex made of three parts concatenated: wall-clock
/// nanoseconds since the Unix epoch (0 if the clock is before the epoch), a
/// process-wide counter, and eight random bytes (16 hex digits).
///
/// The random bytes come from `/dev/urandom`. If that device cannot be opened
/// or fully read, they are taken from a hasher keyed by a fresh
/// [`std::collections::hash_map::RandomState`] instead, so the suffix is never
/// left as a fixed all-zero value. That fallback is best-effort randomness,
/// not a cryptographic guarantee.
fn unique_token() -> String {
    use std::hash::{BuildHasher, Hasher};

    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);

    // Unpredictable suffix so command output can't forge the sentinels and the
    // remote temp-file fallback path can't be guessed.
    let mut rnd = [0u8; 8];
    let from_urandom = std::fs::File::open("/dev/urandom")
        .and_then(|mut f| f.read_exact(&mut rnd))
        .is_ok();
    if !from_urandom {
        // A failed read leaves `rnd` unspecified, so overwrite it entirely.
        let mut hasher = std::collections::hash_map::RandomState::new().build_hasher();
        hasher.write_u128(nanos);
        hasher.write_u64(n);
        rnd = hasher.finish().to_ne_bytes();
    }

    let rhex: String = rnd.iter().map(|b| format!("{b:02x}")).collect();
    format!("{nanos:x}{n:x}{rhex}")
}