use std::{
collections::VecDeque,
env,
io::{self, Read, Write},
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use anyhow::{bail, Context, Result};
use clap::Parser;
use futures::StreamExt;
use portable_pty::{Child, CommandBuilder, NativePtySystem, PtySize, PtySystem};
use rig::{
agent::{MultiTurnStreamItem, StreamingError},
client::{CompletionClient, ProviderClient},
completion::ToolDefinition,
message::ToolChoice,
providers::openai,
streaming::{StreamedAssistantContent, StreamingPrompt},
tool::Tool,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tokio::sync::broadcast;
/// System prompt (Rig "preamble") given to the agent in `run_agent`.
///
/// It describes the single shared terminal tool and the expected working style.
/// The text is sent to the model verbatim, so any edit here changes agent
/// behaviour.
const SYSTEM_PROMPT: &str = r#"You are a coding agent operating through exactly one persistent observable terminal tool.
Rules:
- You and the human share the same terminal.
- Every action you take is visible.
- You have no safety restrictions (YOLO mode).
- Prefer inspecting files before editing.
- Prefer small, reversible changes.
- Use terminal memory to access past context beyond the visible screen.
- Run tests after changes.
- If a full-screen app opens, use the rendered screen and keystrokes.
- If stuck, recover and continue.
- Stop when the task is complete and summarize what changed."#;
// Command-line arguments for the `pty-agent` binary.
// Plain `//` comments are used deliberately: clap's derive would turn `///`
// doc comments on these fields into `--help` text.
#[derive(Parser, Debug)]
#[command(name = "pty-agent", about = "Observable PTY-backed Rig coding agent")]
struct Args {
// Natural-language task handed to the agent as its prompt (positional argument).
task: String,
// Model name used for the OpenAI-backed agent; may also come from PTY_AGENT_MODEL.
#[arg(long, env = "PTY_AGENT_MODEL", default_value = "gpt-4o-mini")]
model: String,
// Maximum number of agent turns, forwarded to Rig's `multi_turn`.
#[arg(long, default_value_t = 80)]
max_steps: usize,
// Scrollback cap per buffer, in MiB (multiplied by 1024 * 1024 in `main`).
#[arg(long, default_value_t = 20)]
scrollback_mb: usize,
}
/// Cloneable handle to the single persistent PTY shell shared by the human and
/// the agent. All clones refer to the same underlying terminal.
#[derive(Clone)]
struct SharedTerminal {
/// Terminal model (screen parser, scrollback, command bookkeeping). Updated by
/// the PTY reader thread and by tool actions.
state: Arc<Mutex<TerminalState>>,
/// Writer for the PTY master side; keystrokes sent by the agent go here.
writer: Arc<Mutex<Box<dyn Write + Send>>>,
/// Handle to the spawned shell process (killed in `shutdown`).
child: Arc<Mutex<Box<dyn Child + Send + Sync>>>,
/// Pinged by the reader thread after each ingested output chunk; subscribers
/// use it to wait for the terminal to go quiet.
notify_tx: broadcast::Sender<()>,
}
/// Mutable model of the terminal session, guarded by `SharedTerminal::state`.
struct TerminalState {
/// vt100 emulator fed with every output chunk; provides the rendered screen.
parser: vt100::Parser,
/// Bounded copy of the raw PTY output, escape sequences included.
raw_scrollback: BoundedBytes,
/// Bounded copy of the PTY output after `strip_ansi`.
clean_scrollback: BoundedBytes,
/// Finished commands, oldest first (trimmed by `finish_command`).
command_history: VecDeque<CommandRecord>,
/// Condensed notes about commands evicted from `command_history`.
memory_summary: String,
/// Command started by the most recent agent Enter and not yet finished.
current_command: Option<InFlightCommand>,
/// Text the agent has typed since its last Enter (only agent actions are
/// tracked here, not the human's keystrokes).
pending_line: String,
/// Counter used to mint `cmd-N` identifiers.
next_command_id: u64,
/// Exit code passed to the most recent `finish_command` call.
last_exit_code: Option<i32>,
}
/// A command the agent has submitted that has not yet been recorded as finished.
#[derive(Debug)]
struct InFlightCommand {
/// Identifier of the form `cmd-N`.
command_id: String,
/// Trimmed command line as typed by the agent.
command: String,
/// `clean_scrollback.len_seen()` at the moment Enter was recorded; marks where
/// this command's output begins.
started_at_clean_len: usize,
}
/// A finished command stored in `TerminalState::command_history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandRecord {
/// Identifier of the form `cmd-N`.
command_id: String,
/// Trimmed command line as typed by the agent.
command: String,
/// This process's current directory when the record was made (not a
/// directory tracked from the shell).
cwd: Option<String>,
/// Exit code, if known (`Some(130)` after Ctrl-C, otherwise `None` in the
/// visible code paths).
exit_code: Option<i32>,
/// Last ~8 000 bytes of `full_output`.
output_excerpt: String,
/// Keyword-filtered lines from the output, if any matched.
output_summary: Option<String>,
/// ANSI-stripped output since the command started (tail-capped).
full_output: String,
}
/// A FIFO byte buffer that keeps only the most recent `limit` bytes while
/// counting every byte ever pushed, so callers can address positions in the
/// overall stream even after older data has been evicted.
#[derive(Debug)]
struct BoundedBytes {
    /// Retained bytes, oldest first; never longer than `limit`.
    bytes: VecDeque<u8>,
    /// Maximum number of bytes retained.
    limit: usize,
    /// Total number of bytes ever pushed (retained or evicted).
    total_seen: usize,
}

impl BoundedBytes {
    /// Creates an empty buffer that retains at most `limit` bytes.
    fn new(limit: usize) -> Self {
        Self {
            bytes: VecDeque::new(),
            limit,
            total_seen: 0,
        }
    }

    /// Appends `data`, evicting the oldest bytes so at most `limit` remain.
    ///
    /// `total_seen` always grows by `data.len()`, including bytes that are
    /// evicted immediately. A `limit` of zero retains nothing.
    fn push(&mut self, data: &[u8]) {
        self.total_seen += data.len();
        // Only the newest `limit` bytes of `data` could survive anyway.
        let data = &data[data.len().saturating_sub(self.limit)..];
        // Evict just enough old bytes to make room. `overflow` never exceeds
        // `self.bytes.len()` because `data.len() <= limit`.
        let overflow = (self.bytes.len() + data.len()).saturating_sub(self.limit);
        self.bytes.drain(..overflow);
        self.bytes.extend(data.iter().copied());
    }

    /// Total number of bytes ever pushed; usable as a stream position.
    fn len_seen(&self) -> usize {
        self.total_seen
    }

    /// Returns up to the last `max_bytes` retained bytes, decoded lossily as UTF-8.
    fn suffix_string(&self, max_bytes: usize) -> String {
        let start = self.bytes.len().saturating_sub(max_bytes);
        let bytes = self.bytes.range(start..).copied().collect::<Vec<_>>();
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// Returns the retained bytes pushed since stream position `seen_at_start`
    /// (a value previously obtained from [`BoundedBytes::len_seen`]).
    ///
    /// The bytes are decoded lossily and trimmed to at most `max_bytes` from the
    /// end at a char boundary. If part of that range was already evicted, only
    /// the retained part is returned.
    fn since_seen(&self, seen_at_start: usize, max_bytes: usize) -> String {
        let retained_start = self.total_seen.saturating_sub(self.bytes.len());
        let offset = seen_at_start
            .saturating_sub(retained_start)
            .min(self.bytes.len());
        let bytes = self.bytes.range(offset..).copied().collect::<Vec<_>>();
        tail_utf8(&String::from_utf8_lossy(&bytes), max_bytes)
    }
}
/// Rig tool wrapper that exposes the shared PTY terminal to the agent as the
/// `terminal` tool.
#[derive(Clone)]
struct TerminalTool {
/// The terminal every tool call acts on.
terminal: SharedTerminal,
}
// Arguments of one `terminal` tool call, deserialized from the model's JSON.
// (`//` comments are used so the JsonSchema derive's descriptions are unchanged.)
#[derive(Debug, Deserialize, JsonSchema)]
struct TerminalInput {
// What to do in the terminal.
action: TerminalAction,
// When set, `SharedTerminal::act` waits up to this long for output to settle.
// When unset, non-observe actions wait up to 350 ms and observe does not wait.
timeout_ms: Option<u64>,
}
// A single terminal action. Serde tagging gives JSON such as
// {"type": "type_and_enter", "text": "ls"}; the tag values are the snake_case
// variant names ("type", "enter", "type_and_enter", "ctrl_c", "key", "observe").
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TerminalAction {
// Send `text` without pressing Enter.
Type {
text: String,
},
// Send a carriage return.
Enter,
// Send `text` followed by a carriage return.
TypeAndEnter {
text: String,
},
// Send byte 0x03 (Ctrl-C).
CtrlC,
// Send a named key or literal text (translated by `key_to_bytes`).
Key {
key: String,
},
// Read state without sending input; `max_bytes` caps the returned output text.
Observe {
mode: ObserveMode,
max_bytes: Option<usize>,
},
}
// What the agent asked to observe. In the visible code, only `CommandOutput`
// changes the result (it replaces `recent_output` with that command's recorded
// output); every other mode returns the same full snapshot.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ObserveMode {
Screen,
RecentOutput,
CommandHistory,
// Output of a finished command, looked up by its `cmd-N` id.
CommandOutput { command_id: String },
MemorySummary,
FullState,
}
/// Snapshot returned to the model after every terminal tool call.
#[derive(Debug, Serialize)]
struct TerminalOutput {
/// Current rendered screen contents from the vt100 parser.
screen: String,
/// Tail of the ANSI-stripped scrollback, or a specific command's output for
/// `observe` with `command_output`.
recent_output: String,
/// True while an agent-entered command has not been marked finished. Commands
/// are only finished by the next Enter or by Ctrl-C, so this can stay true after
/// the shell is back at a prompt.
running: bool,
/// Exit code from the most recently finished command, if known.
exit_code: Option<i32>,
/// This process's current directory (not tracked from the shell).
cwd: Option<String>,
/// Up to 20 most recent finished commands, newest first, as `id exit=.. cmd`.
command_history_excerpt: Vec<String>,
/// Condensed notes about evicted commands, or `None` if there are none yet.
memory_summary: Option<String>,
/// Whether the terminal is showing the alternate screen (full-screen apps).
in_alternate_screen: bool,
}
/// Errors surfaced to Rig from the `terminal` tool.
#[derive(Debug, Error)]
enum TerminalToolError {
/// The terminal state or writer mutex was poisoned.
#[error("terminal lock poisoned")]
Lock,
/// Writing to or flushing the PTY failed; carries the I/O error text.
#[error("terminal io error: {0}")]
Io(String),
}
/// Entry point: parses arguments, validates OpenAI configuration, starts the
/// shared PTY shell, runs the agent, and then shuts the shell down.
///
/// The shell is shut down even when the agent run fails; the agent's error is
/// then propagated so the process exits with a failure status.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();
    ensure_openai_api_key()?;
    status(&format!("task: {}", args.task));
    status("starting persistent shell PTY");
    // Saturate instead of overflowing on absurd `--scrollback-mb` values.
    let scrollback_limit = args.scrollback_mb.saturating_mul(1024 * 1024);
    let terminal = SharedTerminal::spawn(scrollback_limit)
        .context("failed to launch persistent PTY shell")?;
    status("agent thinking");
    let outcome = run_agent(args, terminal.clone()).await;
    if outcome.is_ok() {
        status("finished");
    }
    // Always tear the shell down, even if the agent failed.
    terminal.shutdown();
    outcome
}
impl SharedTerminal {
fn spawn(scrollback_limit: usize) -> Result<Self> {
let size = terminal_size();
let pair = NativePtySystem::default().openpty(size)?;
let mut cmd = CommandBuilder::new(shell_path());
cmd.env(
"TERM",
env::var("TERM").unwrap_or_else(|_| "xterm-256color".to_string()),
);
let child = pair.slave.spawn_command(cmd)?;
drop(pair.slave);
let writer = pair.master.take_writer()?;
let mut reader = pair.master.try_clone_reader()?;
let (notify_tx, _) = broadcast::channel(256);
let state = Arc::new(Mutex::new(TerminalState {
parser: vt100::Parser::new(size.rows, size.cols, 0),
raw_scrollback: BoundedBytes::new(scrollback_limit),
clean_scrollback: BoundedBytes::new(scrollback_limit),
command_history: VecDeque::new(),
memory_summary: String::new(),
current_command: None,
pending_line: String::new(),
next_command_id: 1,
last_exit_code: None,
}));
let terminal = Self {
state: state.clone(),
writer: Arc::new(Mutex::new(writer)),
child: Arc::new(Mutex::new(child)),
notify_tx: notify_tx.clone(),
};
let reader_terminal = terminal.clone();
thread::spawn(move || {
let mut stdout = io::stdout();
let mut buf = [0_u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let chunk = &buf[..n];
let _ = stdout.write_all(chunk);
let _ = stdout.flush();
if reader_terminal.ingest(chunk).is_err() {
break;
}
let _ = notify_tx.send(());
}
Err(_) => break,
}
}
});
Ok(terminal)
}
fn ingest(&self, bytes: &[u8]) -> Result<()> {
let mut state = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("lock poisoned"))?;
state.parser.process(bytes);
state.raw_scrollback.push(bytes);
state.clean_scrollback.push(strip_ansi(bytes).as_bytes());
Ok(())
}
async fn act(
&self,
action: TerminalAction,
timeout_ms: Option<u64>,
) -> Result<TerminalOutput, TerminalToolError> {
match &action {
TerminalAction::Type { text } => {
status(&format!("agent typing: {}", visible(text)));
self.append_pending(text)?;
self.write_bytes(text.as_bytes())?;
}
TerminalAction::Enter => {
status("agent presses Enter");
self.record_enter()?;
self.write_bytes(b"\r")?;
}
TerminalAction::TypeAndEnter { text } => {
status(&format!("agent enters: {}", visible(text)));
self.append_pending(text)?;
self.write_bytes(text.as_bytes())?;
self.record_enter()?;
self.write_bytes(b"\r")?;
}
TerminalAction::CtrlC => {
status("agent sends Ctrl-C");
self.finish_current_command(Some(130))?;
self.write_bytes(&[3])?;
}
TerminalAction::Key { key } => {
status(&format!("agent key: {key}"));
self.write_bytes(&key_to_bytes(key))?;
}
TerminalAction::Observe { mode, .. } => {
status(&format!("agent observes: {mode:?}"));
}
}
if let Some(ms) = timeout_ms {
self.wait_for_quiet(Duration::from_millis(ms)).await;
} else if !matches!(action, TerminalAction::Observe { .. }) {
self.wait_for_quiet(Duration::from_millis(350)).await;
}
let max_bytes = match &action {
TerminalAction::Observe { max_bytes, .. } => *max_bytes,
_ => None,
};
self.output_for(action, max_bytes)
}
fn write_bytes(&self, bytes: &[u8]) -> Result<(), TerminalToolError> {
let mut writer = self.writer.lock().map_err(|_| TerminalToolError::Lock)?;
writer
.write_all(bytes)
.map_err(|error| TerminalToolError::Io(error.to_string()))?;
writer
.flush()
.map_err(|error| TerminalToolError::Io(error.to_string()))?;
Ok(())
}
fn append_pending(&self, text: &str) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
state.pending_line.push_str(text);
Ok(())
}
fn record_enter(&self) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
let command = state.pending_line.trim().to_string();
state.pending_line.clear();
if command.is_empty() {
return Ok(());
}
if let Some(previous) = state.current_command.take() {
state.finish_command(previous, None);
}
let command_id = format!("cmd-{}", state.next_command_id);
state.next_command_id += 1;
let started_at_clean_len = state.clean_scrollback.len_seen();
state.current_command = Some(InFlightCommand {
command_id,
command,
started_at_clean_len,
});
Ok(())
}
fn finish_current_command(&self, exit_code: Option<i32>) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
if let Some(command) = state.current_command.take() {
state.finish_command(command, exit_code);
}
Ok(())
}
async fn wait_for_quiet(&self, max_wait: Duration) {
status("agent waiting for PTY output");
let started = Instant::now();
let mut last_output = Instant::now();
let mut rx = self.notify_tx.subscribe();
while started.elapsed() < max_wait {
match tokio::time::timeout(Duration::from_millis(120), rx.recv()).await {
Ok(Ok(_)) => last_output = Instant::now(),
Ok(Err(_)) => break,
Err(_) if last_output.elapsed() > Duration::from_millis(180) => break,
Err(_) => {}
}
}
}
fn output_for(
&self,
action: TerminalAction,
max_bytes: Option<usize>,
) -> Result<TerminalOutput, TerminalToolError> {
let state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
let max = max_bytes.unwrap_or(24_000);
let mut output = state.output(max);
if let TerminalAction::Observe {
mode: ObserveMode::CommandOutput { command_id },
..
} = action
{
output.recent_output = state
.command_history
.iter()
.find(|record| record.command_id == command_id)
.map(|record| tail_utf8(&record.full_output, max))
.unwrap_or_else(|| format!("No command output found for command_id={command_id}"));
}
Ok(output)
}
fn shutdown(&self) {
let _ = self.write_bytes(b"exit\r");
if let Ok(mut child) = self.child.lock() {
let _ = child.kill();
}
}
}
impl TerminalState {
    /// Maximum number of finished commands kept verbatim in `command_history`;
    /// older ones are condensed into `memory_summary`.
    const HISTORY_LIMIT: usize = 80;

    /// Byte budget for `memory_summary`. The summary is attached to every tool
    /// result, so without a cap it would keep growing for the whole session and
    /// inflate the model's context. The oldest text is dropped first.
    const MEMORY_SUMMARY_LIMIT: usize = 16_000;

    /// Builds the snapshot returned to the model by the terminal tool.
    ///
    /// - `recent_output` is the tail of the ANSI-stripped scrollback, capped at
    ///   `max_bytes`.
    /// - `command_history_excerpt` lists up to 20 finished commands, newest first.
    /// - `cwd` is this process's current directory, not one tracked from the
    ///   shell, so it does not follow `cd` commands typed into the terminal.
    fn output(&self, max_bytes: usize) -> TerminalOutput {
        TerminalOutput {
            screen: self.parser.screen().contents(),
            recent_output: self.clean_scrollback.suffix_string(max_bytes),
            running: self.current_command.is_some(),
            exit_code: self.last_exit_code,
            cwd: env::current_dir()
                .ok()
                .map(|path| path.display().to_string()),
            command_history_excerpt: self
                .command_history
                .iter()
                .rev()
                .take(20)
                .map(|record| {
                    format!(
                        "{} exit={:?} {}",
                        record.command_id, record.exit_code, record.command
                    )
                })
                .collect(),
            memory_summary: (!self.memory_summary.is_empty()).then(|| self.memory_summary.clone()),
            in_alternate_screen: self.parser.screen().alternate_screen(),
        }
    }

    /// Moves `command` into `command_history`.
    ///
    /// The record captures:
    /// - everything written to the clean scrollback since the command started,
    ///   tail-capped at 512 000 bytes;
    /// - an 8 000-byte excerpt;
    /// - a keyword-based summary.
    ///
    /// `exit_code` also becomes `last_exit_code`. Records beyond
    /// `HISTORY_LIMIT` are condensed into `memory_summary`, which is itself
    /// trimmed to `MEMORY_SUMMARY_LIMIT` bytes.
    fn finish_command(&mut self, command: InFlightCommand, exit_code: Option<i32>) {
        let full_output = self
            .clean_scrollback
            .since_seen(command.started_at_clean_len, 512_000);
        let output_excerpt = tail_utf8(&full_output, 8_000);
        let output_summary = summarize_output(&full_output);
        self.last_exit_code = exit_code;
        self.command_history.push_back(CommandRecord {
            command_id: command.command_id,
            command: command.command,
            cwd: env::current_dir()
                .ok()
                .map(|path| path.display().to_string()),
            exit_code,
            output_excerpt,
            output_summary,
            full_output,
        });
        while self.command_history.len() > Self::HISTORY_LIMIT {
            let Some(old) = self.command_history.pop_front() else {
                break;
            };
            let summary = old
                .output_summary
                .unwrap_or_else(|| tail_utf8(&old.output_excerpt, 600));
            self.memory_summary.push_str(&format!(
                "\n{}: `{}` exit={:?}\n{}\n",
                old.command_id, old.command, old.exit_code, summary
            ));
        }
        if self.memory_summary.len() > Self::MEMORY_SUMMARY_LIMIT {
            self.memory_summary = tail_utf8(&self.memory_summary, Self::MEMORY_SUMMARY_LIMIT);
        }
    }
}
/// Rig `Tool` implementation exposing the shared PTY as the `terminal` tool.
impl Tool for TerminalTool {
/// Tool name the model uses when calling this tool.
const NAME: &'static str = "terminal";
type Error = TerminalToolError;
type Args = TerminalInput;
type Output = TerminalOutput;
/// Returns the tool's name, description and JSON parameter schema.
///
/// The schema is hand-written rather than generated from `TerminalInput`'s
/// `JsonSchema` derive. It must stay in sync with the serde tagging on
/// `TerminalAction` / `ObserveMode` (a `type` tag with snake_case variant names),
/// or the model's calls will fail to deserialize.
async fn definition(&self, _prompt: String) -> ToolDefinition {
ToolDefinition {
name: Self::NAME.to_string(),
description: "Interact with and observe the single persistent observable PTY terminal shared with the human. This is your only tool. Use type_and_enter for shell commands, explicit key presses for full-screen apps, and observe actions to inspect screen, scrollback, command history, command output, or memory.".to_string(),
parameters: json!({
"type": "object",
"properties": {
"action": {
"oneOf": [
{"type": "object", "properties": {"type": {"type": "string", "enum": ["type"]}, "text": {"type": "string"}}, "required": ["type", "text"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["enter"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["type_and_enter"]}, "text": {"type": "string"}}, "required": ["type", "text"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["ctrl_c"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["key"]}, "key": {"type": "string", "description": "Named key: escape, tab, backspace, up, down, left, right, home, end, page_up, page_down, ctrl-d, ctrl-l, ctrl-z, or a literal character."}}, "required": ["type", "key"], "additionalProperties": false},
{"type": "object", "properties": {
"type": {"type": "string", "enum": ["observe"]},
"mode": {
"oneOf": [
{"type": "object", "properties": {"type": {"type": "string", "enum": ["screen"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["recent_output"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["command_history"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["command_output"]}, "command_id": {"type": "string"}}, "required": ["type", "command_id"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["memory_summary"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["full_state"]}}, "required": ["type"], "additionalProperties": false}
]
},
"max_bytes": {"type": "integer", "minimum": 1}
}, "required": ["type", "mode"], "additionalProperties": false}
]
},
"timeout_ms": {"type": "integer", "minimum": 0}
},
"required": ["action"],
"additionalProperties": false
}),
}
}
/// Executes one tool call by forwarding it to `SharedTerminal::act`.
async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
self.terminal.act(args.action, args.timeout_ms).await
}
}
async fn run_agent(args: Args, terminal: SharedTerminal) -> Result<()> {
let agent = openai::Client::from_env()
.agent(args.model)
.preamble(SYSTEM_PROMPT)
.temperature(0.0)
.tool_choice(ToolChoice::Required)
.tool(TerminalTool { terminal })
.build();
let stream = agent
.stream_prompt(args.task)
.multi_turn(args.max_steps)
.await;
tokio::pin!(stream);
while let Some(item) = stream.next().await {
match item {
Ok(MultiTurnStreamItem::StreamAssistantItem(content)) => {
if matches!(content, StreamedAssistantContent::ToolCall { .. }) {
status("agent thinking -> terminal tool");
}
}
Ok(MultiTurnStreamItem::FinalResponse(response)) => {
status(&format!("agent final: {}", response.response()));
break;
}
Ok(_) => {}
Err(error) => {
status(&format!(
"agent error: {}",
describe_streaming_error(&error)
));
break;
}
}
}
Ok(())
}
/// Validates the OpenAI environment before any client is built.
///
/// A blank `OPENAI_BASE_URL` is removed from the environment, so the client
/// falls back to its default endpoint instead of an empty URL.
///
/// # Errors
/// Fails if `OPENAI_API_KEY` is unset or not valid Unicode, or if it is blank.
fn ensure_openai_api_key() -> Result<()> {
    if let Ok(base_url) = env::var("OPENAI_BASE_URL") {
        if base_url.trim().is_empty() {
            env::remove_var("OPENAI_BASE_URL");
        }
    }
    match env::var("OPENAI_API_KEY").context("missing OPENAI_API_KEY")? {
        key if key.trim().is_empty() => bail!("OPENAI_API_KEY is empty"),
        _ => Ok(()),
    }
}
/// Chooses the shell to run in the PTY.
///
/// Uses `$SHELL` when it is set and not blank. Otherwise uses `/bin/bash` if
/// it exists, and `/bin/sh` as the last resort.
fn shell_path() -> String {
    env::var("SHELL")
        .ok()
        .filter(|path| !path.trim().is_empty())
        .unwrap_or_else(|| {
            if std::path::Path::new("/bin/bash").exists() {
                "/bin/bash".to_string()
            } else {
                "/bin/sh".to_string()
            }
        })
}
/// PTY size taken from the `COLUMNS`/`LINES` environment variables.
///
/// Defaults to 100x30. Missing, unparsable, or zero values fall back to the
/// default: a zero-sized PTY/vt100 screen is not usable.
fn terminal_size() -> PtySize {
    /// Reads a positive `u16` dimension from `var`, or returns `default`.
    fn dimension(var: &str, default: u16) -> u16 {
        env::var(var)
            .ok()
            .and_then(|value| value.trim().parse::<u16>().ok())
            .filter(|&value| value > 0)
            .unwrap_or(default)
    }
    PtySize {
        rows: dimension("LINES", 30),
        cols: dimension("COLUMNS", 100),
        pixel_width: 0,
        pixel_height: 0,
    }
}
/// Prints a `[pty-agent]` progress line to stderr.
///
/// The leading `\r\n` starts the message on a fresh line even if PTY output left
/// the cursor mid-line. Write errors are ignored.
fn status(message: &str) {
    let mut stderr = io::stderr();
    let _ = write!(stderr, "\r\n[pty-agent] {}\n", message);
}
/// Translates a key name from the `key` tool action into bytes for the PTY.
///
/// - Named keys are matched case-insensitively. They cover escape, tab,
///   backspace, enter, space, delete, the arrows, home/end, page up/down, and
///   `ctrl-<letter>`.
/// - Anything else is sent verbatim, preserving case, so literal keys such as
///   `G` or `Q` reach full-screen apps exactly as written.
fn key_to_bytes(key: &str) -> Vec<u8> {
    let named = key.to_ascii_lowercase();
    match named.as_str() {
        "escape" | "esc" => vec![0x1b],
        "tab" => vec![b'\t'],
        "backspace" => vec![0x7f],
        "enter" | "return" => vec![b'\r'],
        "space" => vec![b' '],
        "delete" => b"\x1b[3~".to_vec(),
        "up" => b"\x1b[A".to_vec(),
        "down" => b"\x1b[B".to_vec(),
        "right" => b"\x1b[C".to_vec(),
        "left" => b"\x1b[D".to_vec(),
        "home" => b"\x1b[H".to_vec(),
        "end" => b"\x1b[F".to_vec(),
        "page_up" | "pageup" => b"\x1b[5~".to_vec(),
        "page_down" | "pagedown" => b"\x1b[6~".to_vec(),
        other => match other.strip_prefix("ctrl-").map(str::as_bytes) {
            // ctrl-a .. ctrl-z map to the C0 control codes 0x01 .. 0x1a.
            Some([letter @ b'a'..=b'z']) => vec![letter - b'a' + 1],
            // Not a named key: send the caller's text unchanged (original case).
            _ => key.as_bytes().to_vec(),
        },
    }
}
/// Removes terminal escape sequences and carriage returns from a chunk of PTY
/// output, returning the remaining text (decoded lossily as UTF-8).
///
/// Recognised sequences:
/// - CSI (`ESC [` … final byte `0x40..=0x7e`);
/// - OSC/DCS/SOS/PM/APC strings (`ESC ] P X ^ _` …), terminated by BEL or
///   `ESC \`;
/// - two-byte and intermediate escapes such as `ESC =`, `ESC 7`, `ESC ( B`.
///
/// Parsing state does not carry over between calls, so a sequence split across
/// two chunks is not fully removed.
fn strip_ansi(bytes: &[u8]) -> String {
    #[derive(Clone, Copy)]
    enum State {
        /// Plain text.
        Ground,
        /// Just saw ESC.
        Escape,
        /// ESC followed by intermediate bytes (e.g. the `(` in `ESC ( B`).
        EscapeIntermediate,
        /// Inside a CSI sequence.
        Csi,
        /// Inside an OSC/DCS/SOS/PM/APC string, waiting for BEL or `ESC \`.
        ControlString,
    }

    let text = String::from_utf8_lossy(bytes);
    let mut output = String::with_capacity(text.len());
    let mut state = State::Ground;
    for ch in text.chars() {
        state = match state {
            State::Ground => match ch {
                '\x1b' => State::Escape,
                '\r' => State::Ground,
                _ => {
                    output.push(ch);
                    State::Ground
                }
            },
            State::Escape => match ch {
                '[' => State::Csi,
                ']' | 'P' | 'X' | '^' | '_' => State::ControlString,
                '\x20'..='\x2f' => State::EscapeIntermediate,
                // Any other byte (including `\` of ST) completes the escape.
                _ => State::Ground,
            },
            State::EscapeIntermediate => match ch {
                '\x20'..='\x2f' => State::EscapeIntermediate,
                _ => State::Ground,
            },
            State::Csi => match ch {
                '\x1b' => State::Escape,
                '\x40'..='\x7e' => State::Ground,
                _ => State::Csi,
            },
            State::ControlString => match ch {
                '\x07' => State::Ground,
                // Possibly the start of ST (`ESC \`); the Escape state finishes it.
                '\x1b' => State::Escape,
                _ => State::ControlString,
            },
        };
    }
    output
}
/// Picks out the "interesting" lines of a command's output.
///
/// A line is kept when its ASCII-lowercased form contains a marker such as
/// "error", "failed", "panic", "test result", "passing", "compil" or
/// "modified". At most the first 12 matches are kept, joined with `\n`.
/// Returns `None` when nothing matches.
fn summarize_output(output: &str) -> Option<String> {
    const MARKERS: [&str; 7] = [
        "error",
        "failed",
        "panic",
        "test result",
        "passing",
        "compil",
        "modified",
    ];
    const MAX_LINES: usize = 12;

    let mut picked: Vec<&str> = Vec::new();
    for line in output.lines() {
        if picked.len() == MAX_LINES {
            break;
        }
        let lowered = line.to_ascii_lowercase();
        if MARKERS.iter().any(|marker| lowered.contains(marker)) {
            picked.push(line);
        }
    }
    if picked.is_empty() {
        None
    } else {
        Some(picked.join("\n"))
    }
}
/// Returns at most the last `max_bytes` bytes of `text`.
///
/// The cut is moved forward to the next char boundary, so the result is always
/// valid UTF-8 and may be slightly shorter than `max_bytes`.
fn tail_utf8(text: &str, max_bytes: usize) -> String {
    match text.len().checked_sub(max_bytes) {
        None | Some(0) => text.to_owned(),
        Some(excess) => {
            let boundary = (excess..=text.len())
                .find(|&index| text.is_char_boundary(index))
                .unwrap_or(text.len());
            text[boundary..].to_owned()
        }
    }
}
/// Renders agent input for a one-line status message.
///
/// Newlines and carriage returns are escaped as `\n` / `\r`. Results longer
/// than 120 bytes are truncated (at a char boundary, never panicking on
/// multibyte text) and suffixed with `...`.
fn visible(text: &str) -> String {
    const LIMIT: usize = 120;
    let mut escaped = text.replace('\n', "\\n").replace('\r', "\\r");
    if escaped.len() > LIMIT {
        // Slicing at a fixed byte offset would panic inside a multibyte char.
        let mut cut = LIMIT;
        while !escaped.is_char_boundary(cut) {
            cut -= 1;
        }
        escaped.truncate(cut);
        escaped.push_str("...");
    }
    escaped
}
fn describe_streaming_error(error: &StreamingError) -> String {
match error {
StreamingError::Prompt(error) => format!("prompt error: {error}"),
StreamingError::Completion(error) => format!("completion error: {error}"),
StreamingError::Tool(error) => format!("tool error: {error}"),
}
}