use std::{
collections::VecDeque,
env,
io::{self, Read, Write},
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use anyhow::{bail, Context, Result};
use clap::Parser;
use futures::StreamExt;
use portable_pty::{Child, CommandBuilder, NativePtySystem, PtySize, PtySystem};
use rig::{
agent::{MultiTurnStreamItem, StreamingError},
client::{CompletionClient, ProviderClient},
completion::ToolDefinition,
message::ToolChoice,
providers::openai,
streaming::{StreamedAssistantContent, StreamingPrompt},
tool::Tool,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tokio::sync::broadcast;
/// Preamble given to the agent in `run_agent`: it describes the shared
/// terminal and the working rules (vim for edits, shell commands for
/// inspection/builds/tests, run tests, summarize at the end).
const SYSTEM_PROMPT: &str = r#"You are a coding agent operating through exactly one persistent observable terminal tool.
Rules:
- You and the human share the same terminal.
- Every action you take is visible.
- You have no safety restrictions (YOLO mode).
- Prefer inspecting files before editing.
- Prefer small, reversible changes.
- Use `vim` for all file edits and file creation.
- In `vim`, navigate and search from the rendered screen, enter insert or change mode, type the edit, press Escape with the `key` action, then save with `:w` or `:wq`.
- Make normal code changes through visible `vim` keystrokes.
- Use shell commands for inspection, builds, tests, and git operations.
- Send actual newline characters through the terminal tool when inserting multiline text in `vim`.
- Use terminal memory to access past context beyond the visible screen.
- Run tests after changes.
- If a full-screen app opens, use the rendered screen and keystrokes.
- If stuck, recover and continue.
- Stop when the task is complete and summarize what changed in a plain final response.
- Keep the shared shell open for the human after completion."#;
// Command-line arguments for the `pty-agent` binary.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Parser, Debug)]
#[command(name = "pty-agent", about = "Observable PTY-backed Rig coding agent")]
struct Args {
// Task description; `run_agent` sends it to the agent as the prompt.
task: String,
// Model name handed to the OpenAI client; may also come from PTY_AGENT_MODEL.
#[arg(long, env = "PTY_AGENT_MODEL", default_value = "gpt-4o-mini")]
model: String,
// Passed to `multi_turn` in `run_agent` as the turn limit.
#[arg(long, default_value_t = 80)]
max_steps: usize,
// Scrollback cap in MiB; `main` multiplies it by 1024 * 1024 to get bytes.
#[arg(long, default_value_t = 20)]
scrollback_mb: usize,
}
/// Cloneable handle to the single PTY-backed shell.
///
/// Every field is reference-counted (or a channel sender), so clones refer to
/// the same terminal.
#[derive(Clone)]
struct SharedTerminal {
/// Parsed screen, scrollback buffers and command bookkeeping.
state: Arc<Mutex<TerminalState>>,
/// Write side of the PTY; everything the agent "types" goes through here.
writer: Arc<Mutex<Box<dyn Write + Send>>>,
/// The spawned shell process; killed in `shutdown`.
child: Arc<Mutex<Box<dyn Child + Send + Sync>>>,
/// The reader thread sends `()` after each ingested chunk and once more when
/// the PTY closes; `wait_for_quiet` subscribes to it.
notify_tx: broadcast::Sender<()>,
}
/// Mutable terminal bookkeeping, guarded by the mutex in `SharedTerminal`.
struct TerminalState {
/// vt100 emulator fed with every PTY chunk; source of the rendered screen.
parser: vt100::Parser,
/// PTY output exactly as received, size-capped.
raw_scrollback: BoundedBytes,
/// PTY output after `strip_ansi`, size-capped.
clean_scrollback: BoundedBytes,
/// Finished commands, oldest first; `finish_command` keeps at most 80.
command_history: VecDeque<CommandRecord>,
/// Text condensed from records evicted out of `command_history`.
memory_summary: String,
/// Command submitted with Enter that has not been finished yet, if any.
current_command: Option<InFlightCommand>,
/// Text typed by the agent since the last Enter.
pending_line: String,
/// Counter used to build the next `cmd-N` identifier.
next_command_id: u64,
/// Exit code passed to the most recent `finish_command` call.
last_exit_code: Option<i32>,
/// Set by `mark_closed` once the PTY reader loop has ended.
closed: bool,
}
/// A command that was submitted with Enter and not yet recorded as finished.
#[derive(Debug)]
struct InFlightCommand {
/// Identifier of the form `cmd-N`.
command_id: String,
/// The trimmed text that was pending when Enter was recorded.
command: String,
/// `clean_scrollback.len_seen()` at submission time; used later to slice out
/// the output produced since then.
started_at_clean_len: usize,
}
/// History entry created by `TerminalState::finish_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandRecord {
/// Identifier of the form `cmd-N`.
command_id: String,
/// The command text as typed by the agent.
command: String,
/// Working directory of the agent process (from `env::current_dir`) when the
/// record was created.
cwd: Option<String>,
/// Exit code supplied by the caller of `finish_command`; often `None`.
exit_code: Option<i32>,
/// Last 8,000 bytes of `full_output`.
output_excerpt: String,
/// Lines selected by `summarize_output`, if any matched.
output_summary: Option<String>,
/// Cleaned output captured since the command started (at most 512,000 bytes).
full_output: String,
}
/// Byte buffer that keeps only the most recent `limit` bytes while counting
/// every byte ever pushed.
#[derive(Debug)]
struct BoundedBytes {
    /// Retained bytes, oldest first; never longer than `limit`.
    bytes: VecDeque<u8>,
    /// Maximum number of bytes retained.
    limit: usize,
    /// Total number of bytes pushed so far, including discarded ones.
    total_seen: usize,
}

impl BoundedBytes {
    /// Creates an empty buffer that retains at most `limit` bytes.
    fn new(limit: usize) -> Self {
        Self {
            bytes: VecDeque::new(),
            limit,
            total_seen: 0,
        }
    }

    /// Appends `data`, discarding the oldest bytes so that at most `limit`
    /// bytes remain.
    ///
    /// A `limit` of zero retains nothing; the bytes are still counted in
    /// `total_seen`.
    fn push(&mut self, data: &[u8]) {
        self.total_seen += data.len();
        if data.len() >= self.limit {
            // The new chunk alone fills the buffer: keep only its tail.
            self.bytes.clear();
            self.bytes.extend(&data[data.len() - self.limit..]);
            return;
        }
        let overflow = (self.bytes.len() + data.len()).saturating_sub(self.limit);
        drop(self.bytes.drain(..overflow));
        self.bytes.extend(data);
    }

    /// Returns the total number of bytes ever pushed.
    fn len_seen(&self) -> usize {
        self.total_seen
    }

    /// Returns the last `max_bytes` retained bytes, decoded lossily as UTF-8.
    fn suffix_string(&self, max_bytes: usize) -> String {
        let start = self.bytes.len().saturating_sub(max_bytes);
        let tail: Vec<u8> = self.bytes.range(start..).copied().collect();
        String::from_utf8_lossy(&tail).into_owned()
    }

    /// Returns what was pushed after the stream position `seen_at_start`
    /// (a value previously obtained from `len_seen`), limited to the last
    /// `max_bytes` bytes of the decoded text.
    ///
    /// Bytes that were already discarded are silently missing from the result.
    fn since_seen(&self, seen_at_start: usize, max_bytes: usize) -> String {
        // Stream position of the oldest byte that is still retained.
        let retained_start = self.total_seen.saturating_sub(self.bytes.len());
        let offset = seen_at_start.saturating_sub(retained_start);
        let pending: Vec<u8> = self.bytes.iter().skip(offset).copied().collect();
        tail_utf8(&String::from_utf8_lossy(&pending), max_bytes)
    }
}
/// Adapter that exposes the shared terminal to the agent through the `Tool`
/// trait.
#[derive(Clone)]
struct TerminalTool {
/// Handle to the terminal that every tool call acts on.
terminal: SharedTerminal,
}
// Arguments of one `terminal` tool call.
// (`//` comments only: this type derives `JsonSchema`, and `///` doc comments
// would presumably end up as descriptions in the derived schema.)
#[derive(Debug, Deserialize, JsonSchema)]
struct TerminalInput {
// What to do with the terminal.
action: TerminalAction,
// Upper bound, in milliseconds, for the wait-for-quiet phase after the
// action. When absent, non-observe actions wait up to 350 ms and observe
// actions do not wait at all.
timeout_ms: Option<u64>,
}
// One terminal interaction, deserialized from an object whose `type` field
// holds the snake_case variant name (e.g. `type_and_enter`, `ctrl_c`).
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TerminalAction {
// Write `text` to the PTY without pressing Enter.
Type {
text: String,
},
// Send a carriage return and record the pending line as a command.
Enter,
// `Type` followed by `Enter`.
TypeAndEnter {
text: String,
},
// Send byte 0x03 and mark the current command as finished with exit code 130.
CtrlC,
// Send a named key (see `key_to_bytes`) or literal text.
Key {
key: String,
},
// Read terminal state without sending any input.
Observe {
mode: ObserveMode,
// Cap for the returned `recent_output`; 24,000 bytes when absent.
max_bytes: Option<usize>,
},
}
// What an `Observe` action asks for, tagged by a snake_case `type` field.
// NOTE(review): in this file only `CommandOutput` changes the result (it
// replaces `recent_output` with that command's stored output in `output_for`);
// every other mode returns the same `TerminalOutput`.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ObserveMode {
Screen,
RecentOutput,
CommandHistory,
// Look up a finished command by its `cmd-N` identifier.
CommandOutput { command_id: String },
MemorySummary,
FullState,
}
/// Snapshot returned to the agent after every tool call.
#[derive(Debug, Serialize)]
struct TerminalOutput {
/// Text contents of the emulated screen.
screen: String,
/// Tail of the ANSI-stripped scrollback, or one command's stored output for
/// a `CommandOutput` observation.
recent_output: String,
/// True while the PTY is open and a submitted command has not been finished.
running: bool,
/// Exit code passed to the most recent `finish_command` call, if any.
exit_code: Option<i32>,
/// Working directory of the agent process (from `env::current_dir`).
cwd: Option<String>,
/// Up to 20 most recent history entries, newest first.
command_history_excerpt: Vec<String>,
/// Accumulated summary of evicted history entries; `None` when empty.
memory_summary: Option<String>,
/// Whether the emulated terminal is on the alternate screen.
in_alternate_screen: bool,
}
/// Errors a `terminal` tool call can report back to the agent.
#[derive(Debug, Error)]
enum TerminalToolError {
/// One of the terminal's mutexes was poisoned.
#[error("terminal lock poisoned")]
Lock,
/// Writing to or flushing the PTY failed; carries the I/O error text.
#[error("terminal io error: {0}")]
Io(String),
}
/// Entry point: parses arguments, checks the API key, starts the shared PTY
/// shell, runs the agent and finally shuts the shell down.
///
/// # Errors
/// Returns an error when the API key is missing or empty, when the PTY shell
/// cannot be launched, or when `run_agent` fails.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();
    ensure_openai_api_key()?;
    status(&format!("task: {}", args.task));
    status("starting persistent shell PTY");
    // Saturate instead of overflowing on absurdly large `--scrollback-mb`.
    let scrollback_limit = args.scrollback_mb.saturating_mul(1024 * 1024);
    let terminal = SharedTerminal::spawn(scrollback_limit)
        .context("failed to launch persistent PTY shell")?;
    status("agent thinking");
    let outcome = run_agent(args, terminal.clone()).await;
    if outcome.is_ok() {
        status("finished");
    }
    // Shut the shell down on the error path as well.
    terminal.shutdown();
    outcome
}
impl SharedTerminal {
fn spawn(scrollback_limit: usize) -> Result<Self> {
let size = terminal_size();
let pair = NativePtySystem::default().openpty(size)?;
let mut cmd = CommandBuilder::new(shell_path());
cmd.env(
"TERM",
env::var("TERM").unwrap_or_else(|_| "xterm-256color".to_string()),
);
let child = pair.slave.spawn_command(cmd)?;
drop(pair.slave);
let writer = pair.master.take_writer()?;
let mut reader = pair.master.try_clone_reader()?;
let (notify_tx, _) = broadcast::channel(256);
let state = Arc::new(Mutex::new(TerminalState {
parser: vt100::Parser::new(size.rows, size.cols, 0),
raw_scrollback: BoundedBytes::new(scrollback_limit),
clean_scrollback: BoundedBytes::new(scrollback_limit),
command_history: VecDeque::new(),
memory_summary: String::new(),
current_command: None,
pending_line: String::new(),
next_command_id: 1,
last_exit_code: None,
closed: false,
}));
let terminal = Self {
state: state.clone(),
writer: Arc::new(Mutex::new(writer)),
child: Arc::new(Mutex::new(child)),
notify_tx: notify_tx.clone(),
};
let reader_terminal = terminal.clone();
thread::spawn(move || {
let mut stdout = io::stdout();
let mut buf = [0_u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let chunk = &buf[..n];
let _ = stdout.write_all(chunk);
let _ = stdout.flush();
if reader_terminal.ingest(chunk).is_err() {
break;
}
let _ = notify_tx.send(());
}
Err(_) => break,
}
}
reader_terminal.mark_closed();
let _ = notify_tx.send(());
status("shared PTY closed; exiting pty-agent");
std::process::exit(0);
});
Ok(terminal)
}
fn ingest(&self, bytes: &[u8]) -> Result<()> {
let mut state = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("lock poisoned"))?;
state.parser.process(bytes);
state.raw_scrollback.push(bytes);
state.clean_scrollback.push(strip_ansi(bytes).as_bytes());
Ok(())
}
async fn act(
&self,
action: TerminalAction,
timeout_ms: Option<u64>,
) -> Result<TerminalOutput, TerminalToolError> {
if self.is_closed()? && !matches!(action, TerminalAction::Observe { .. }) {
status("terminal is closed; exiting pty-agent");
std::process::exit(0);
}
match &action {
TerminalAction::Type { text } => {
let text = normalize_terminal_text(text);
status(&format!("agent typing: {}", visible(&text)));
self.append_pending(&text)?;
self.write_bytes(text.as_bytes())?;
}
TerminalAction::Enter => {
status("agent presses Enter");
self.record_enter()?;
self.write_bytes(b"\r")?;
}
TerminalAction::TypeAndEnter { text } => {
let text = normalize_terminal_text(text);
status(&format!("agent enters: {}", visible(&text)));
self.append_pending(&text)?;
self.write_bytes(text.as_bytes())?;
self.record_enter()?;
self.write_bytes(b"\r")?;
}
TerminalAction::CtrlC => {
status("agent sends Ctrl-C");
self.finish_current_command(Some(130))?;
self.write_bytes(&[3])?;
}
TerminalAction::Key { key } => {
status(&format!("agent key: {key}"));
self.write_bytes(&key_to_bytes(key))?;
}
TerminalAction::Observe { mode, .. } => {
status(&format!("agent observes: {mode:?}"));
}
}
if let Some(ms) = timeout_ms {
self.wait_for_quiet(Duration::from_millis(ms)).await;
} else if !matches!(action, TerminalAction::Observe { .. }) {
self.wait_for_quiet(Duration::from_millis(350)).await;
}
let max_bytes = match &action {
TerminalAction::Observe { max_bytes, .. } => *max_bytes,
_ => None,
};
self.output_for(action, max_bytes)
}
fn write_bytes(&self, bytes: &[u8]) -> Result<(), TerminalToolError> {
if self.is_closed()? {
return Ok(());
}
let mut writer = self.writer.lock().map_err(|_| TerminalToolError::Lock)?;
writer
.write_all(bytes)
.map_err(|error| TerminalToolError::Io(error.to_string()))?;
writer
.flush()
.map_err(|error| TerminalToolError::Io(error.to_string()))?;
Ok(())
}
fn is_closed(&self) -> Result<bool, TerminalToolError> {
let state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
Ok(state.closed)
}
fn mark_closed(&self) {
if let Ok(mut state) = self.state.lock() {
state.closed = true;
if let Some(command) = state.current_command.take() {
let exit_code = state.last_exit_code;
state.finish_command(command, exit_code);
}
}
status("shared PTY closed");
}
fn append_pending(&self, text: &str) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
state.pending_line.push_str(text);
Ok(())
}
fn record_enter(&self) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
let command = state.pending_line.trim().to_string();
state.pending_line.clear();
if command.is_empty() {
return Ok(());
}
if let Some(previous) = state.current_command.take() {
state.finish_command(previous, None);
}
let command_id = format!("cmd-{}", state.next_command_id);
state.next_command_id += 1;
let started_at_clean_len = state.clean_scrollback.len_seen();
state.current_command = Some(InFlightCommand {
command_id,
command,
started_at_clean_len,
});
Ok(())
}
fn finish_current_command(&self, exit_code: Option<i32>) -> Result<(), TerminalToolError> {
let mut state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
if let Some(command) = state.current_command.take() {
state.finish_command(command, exit_code);
}
Ok(())
}
async fn wait_for_quiet(&self, max_wait: Duration) {
status("agent waiting for PTY output");
let started = Instant::now();
let mut last_output = Instant::now();
let mut rx = self.notify_tx.subscribe();
while started.elapsed() < max_wait {
match tokio::time::timeout(Duration::from_millis(120), rx.recv()).await {
Ok(Ok(_)) => last_output = Instant::now(),
Ok(Err(_)) => break,
Err(_) if last_output.elapsed() > Duration::from_millis(180) => break,
Err(_) => {}
}
}
}
fn output_for(
&self,
action: TerminalAction,
max_bytes: Option<usize>,
) -> Result<TerminalOutput, TerminalToolError> {
let state = self.state.lock().map_err(|_| TerminalToolError::Lock)?;
let max = max_bytes.unwrap_or(24_000);
let mut output = state.output(max);
if let TerminalAction::Observe {
mode: ObserveMode::CommandOutput { command_id },
..
} = action
{
output.recent_output = state
.command_history
.iter()
.find(|record| record.command_id == command_id)
.map(|record| tail_utf8(&record.full_output, max))
.unwrap_or_else(|| format!("No command output found for command_id={command_id}"));
}
Ok(output)
}
fn shutdown(&self) {
if !self.is_closed().unwrap_or(true) {
let _ = self.write_bytes(b"exit\r");
}
if let Ok(mut child) = self.child.lock() {
let _ = child.kill();
}
}
}
impl TerminalState {
fn output(&self, max_bytes: usize) -> TerminalOutput {
TerminalOutput {
screen: self.parser.screen().contents(),
recent_output: self.clean_scrollback.suffix_string(max_bytes),
running: !self.closed && self.current_command.is_some(),
exit_code: self.last_exit_code,
cwd: env::current_dir()
.ok()
.map(|path| path.display().to_string()),
command_history_excerpt: self
.command_history
.iter()
.rev()
.take(20)
.map(|record| {
format!(
"{} exit={:?} {}",
record.command_id, record.exit_code, record.command
)
})
.collect(),
memory_summary: (!self.memory_summary.is_empty()).then(|| self.memory_summary.clone()),
in_alternate_screen: self.parser.screen().alternate_screen(),
}
}
fn finish_command(&mut self, command: InFlightCommand, exit_code: Option<i32>) {
let full_output = self
.clean_scrollback
.since_seen(command.started_at_clean_len, 512_000);
let output_excerpt = tail_utf8(&full_output, 8_000);
let output_summary = summarize_output(&full_output);
self.last_exit_code = exit_code;
self.command_history.push_back(CommandRecord {
command_id: command.command_id,
command: command.command,
cwd: env::current_dir()
.ok()
.map(|path| path.display().to_string()),
exit_code,
output_excerpt,
output_summary,
full_output,
});
while self.command_history.len() > 80 {
if let Some(old) = self.command_history.pop_front() {
let summary = old
.output_summary
.unwrap_or_else(|| tail_utf8(&old.output_excerpt, 600));
self.memory_summary.push_str(&format!(
"\n{}: `{}` exit={:?}\n{}\n",
old.command_id, old.command, old.exit_code, summary
));
}
}
}
}
/// Exposes the shared terminal to the agent as a tool.
impl Tool for TerminalTool {
/// Tool name advertised to the model.
const NAME: &'static str = "terminal";
type Error = TerminalToolError;
type Args = TerminalInput;
type Output = TerminalOutput;
/// Returns the tool description and a hand-written JSON schema for the
/// arguments. The schema mirrors the serde layout of `TerminalInput`: both
/// `action` and the observe `mode` are objects tagged by a `type` string.
async fn definition(&self, _prompt: String) -> ToolDefinition {
ToolDefinition {
name: Self::NAME.to_string(),
description: "Interact with and observe the single persistent observable PTY terminal shared with the human. This is your only tool. Use type_and_enter for shell commands, explicit key presses for full-screen apps, and observe actions to inspect screen, scrollback, command history, command output, or memory. Use vim for file edits and file creation. For multiline insertion in vim, send actual newline characters through the terminal tool.".to_string(),
parameters: json!({
"type": "object",
"properties": {
"action": {
"oneOf": [
{"type": "object", "properties": {"type": {"type": "string", "enum": ["type"]}, "text": {"type": "string"}}, "required": ["type", "text"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["enter"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["type_and_enter"]}, "text": {"type": "string", "description": "Text to type into the PTY followed by Enter. For multiline shell constructs, include actual newline characters in this string."}}, "required": ["type", "text"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["ctrl_c"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["key"]}, "key": {"type": "string", "description": "Named key: escape, tab, backspace, up, down, left, right, home, end, page_up, page_down, ctrl-d, ctrl-l, ctrl-z, or a literal character."}}, "required": ["type", "key"], "additionalProperties": false},
{"type": "object", "properties": {
"type": {"type": "string", "enum": ["observe"]},
"mode": {
"oneOf": [
{"type": "object", "properties": {"type": {"type": "string", "enum": ["screen"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["recent_output"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["command_history"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["command_output"]}, "command_id": {"type": "string"}}, "required": ["type", "command_id"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["memory_summary"]}}, "required": ["type"], "additionalProperties": false},
{"type": "object", "properties": {"type": {"type": "string", "enum": ["full_state"]}}, "required": ["type"], "additionalProperties": false}
]
},
"max_bytes": {"type": "integer", "minimum": 1}
}, "required": ["type", "mode"], "additionalProperties": false}
]
},
"timeout_ms": {"type": "integer", "minimum": 0}
},
"required": ["action"],
"additionalProperties": false
}),
}
}
/// Runs one tool call by forwarding the action and optional timeout to
/// `SharedTerminal::act`.
async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
self.terminal.act(args.action, args.timeout_ms).await
}
}
async fn run_agent(args: Args, terminal: SharedTerminal) -> Result<()> {
let agent = openai::Client::from_env()
.agent(args.model)
.preamble(SYSTEM_PROMPT)
.temperature(0.0)
.tool_choice(ToolChoice::Required)
.tool(TerminalTool { terminal })
.build();
let stream = agent
.stream_prompt(args.task)
.multi_turn(args.max_steps)
.await;
tokio::pin!(stream);
while let Some(item) = stream.next().await {
match item {
Ok(MultiTurnStreamItem::StreamAssistantItem(content)) => {
if matches!(content, StreamedAssistantContent::ToolCall { .. }) {
status("agent thinking -> terminal tool");
}
}
Ok(MultiTurnStreamItem::FinalResponse(response)) => {
status(&format!("agent final: {}", response.response()));
break;
}
Ok(_) => {}
Err(error) => {
status(&format!(
"agent error: {}",
describe_streaming_error(&error)
));
break;
}
}
}
Ok(())
}
/// Validates the OpenAI-related environment before the client is built.
///
/// A blank `OPENAI_BASE_URL` is removed from the environment.
///
/// # Errors
/// Fails when `OPENAI_API_KEY` is unset, not valid Unicode, or blank.
fn ensure_openai_api_key() -> Result<()> {
    let blank_base_url = match env::var("OPENAI_BASE_URL") {
        Ok(url) => url.trim().is_empty(),
        Err(_) => false,
    };
    if blank_base_url {
        env::remove_var("OPENAI_BASE_URL");
    }

    let key = env::var("OPENAI_API_KEY").context("missing OPENAI_API_KEY")?;
    match key.trim() {
        "" => bail!("OPENAI_API_KEY is empty"),
        _ => Ok(()),
    }
}
/// Picks the shell to launch inside the PTY.
///
/// Uses `$SHELL` when it is set and not blank; otherwise `/bin/bash` if that
/// path exists, and `/bin/sh` as the last resort.
fn shell_path() -> String {
    if let Ok(shell) = env::var("SHELL") {
        if !shell.trim().is_empty() {
            return shell;
        }
    }
    let fallback = if std::path::Path::new("/bin/bash").exists() {
        "/bin/bash"
    } else {
        "/bin/sh"
    };
    fallback.to_string()
}
/// Determines the PTY dimensions from the `COLUMNS` and `LINES` environment
/// variables, falling back to 100 columns by 30 rows when a variable is unset
/// or does not parse. Pixel dimensions are reported as zero.
fn terminal_size() -> PtySize {
    /// Reads one dimension from the environment, or returns `fallback`.
    fn dimension(var: &str, fallback: u16) -> u16 {
        match env::var(var) {
            Ok(raw) => raw.parse().unwrap_or(fallback),
            Err(_) => fallback,
        }
    }

    let cols = dimension("COLUMNS", 100);
    let rows = dimension("LINES", 30);
    PtySize {
        rows,
        cols,
        pixel_width: 0,
        pixel_height: 0,
    }
}
/// Prints a `[pty-agent]` status line to stderr, ignoring write failures.
///
/// The line starts with `\r\n` so that it begins on a fresh line even when
/// the shared terminal has left the cursor mid-line.
fn status(message: &str) {
    let mut stderr = io::stderr();
    let _ = stderr.write_fmt(format_args!("\r\n[pty-agent] {message}\n"));
}
/// Translates a key name from the agent into the bytes sent to the PTY.
///
/// Named keys (`escape`, `tab`, `up`, `page_down`, ...) and `ctrl-<letter>`
/// combinations are matched case-insensitively. Anything else is sent
/// verbatim with its case preserved, so a literal `G` is not turned into `g`.
fn key_to_bytes(key: &str) -> Vec<u8> {
    let name = key.to_ascii_lowercase();
    let sequence: &[u8] = match name.as_str() {
        "escape" | "esc" => b"\x1b",
        "tab" => b"\t",
        "backspace" => b"\x7f",
        "up" => b"\x1b[A",
        "down" => b"\x1b[B",
        "right" => b"\x1b[C",
        "left" => b"\x1b[D",
        "home" => b"\x1b[H",
        "end" => b"\x1b[F",
        "page_up" | "pageup" => b"\x1b[5~",
        "page_down" | "pagedown" => b"\x1b[6~",
        other => {
            // `ctrl-<letter>` maps to the control code 0x01..=0x1a.
            if let Some(&[letter]) = other.strip_prefix("ctrl-").map(str::as_bytes) {
                if letter.is_ascii_lowercase() {
                    return vec![letter & 0x1f];
                }
            }
            // Literal input: use the caller's original text, not the
            // lowercased copy used for name matching.
            key.as_bytes()
        }
    };
    sequence.to_vec()
}
/// Converts escaped newlines (the two characters `\` and `n`) into real
/// newlines when the text contains at least two of them.
///
/// Text with zero or one escaped newline is returned unchanged.
fn normalize_terminal_text(text: &str) -> String {
    const ESCAPED_NEWLINE: &str = "\\n";
    match text.matches(ESCAPED_NEWLINE).count() {
        0 | 1 => text.to_owned(),
        _ => text.replace(ESCAPED_NEWLINE, "\n"),
    }
}
/// Removes terminal escape sequences and carriage returns from one chunk of
/// PTY output, decoding the rest lossily as UTF-8.
///
/// Recognised sequence shapes (ECMA-48 / xterm):
/// - CSI: `ESC [` ... final byte in `@`..=`~`
/// - OSC/DCS/SOS/PM/APC: `ESC ]`, `ESC P`, `ESC X`, `ESC ^`, `ESC _` ...
///   terminated by BEL or by `ESC \`
/// - `ESC` + intermediate bytes (space..=`/`) + one final byte, e.g. `ESC ( B`
/// - two-character escapes such as `ESC 7`, `ESC =`, `ESC M`
///
/// The function keeps no state between calls, so a sequence split across two
/// chunks is only partially removed.
fn strip_ansi(bytes: &[u8]) -> String {
    enum State {
        /// Ordinary text.
        Ground,
        /// Just saw ESC.
        Escape,
        /// Inside `ESC` + intermediates, waiting for the final byte.
        Intermediate,
        /// Inside a CSI sequence, waiting for the final byte.
        Csi,
        /// Inside an OSC/DCS/... string, waiting for BEL or `ESC \`.
        ControlString,
    }

    let text = String::from_utf8_lossy(bytes);
    let mut output = String::with_capacity(text.len());
    let mut state = State::Ground;
    for ch in text.chars() {
        state = match state {
            State::Ground => match ch {
                '\x1b' => State::Escape,
                '\r' => State::Ground,
                _ => {
                    output.push(ch);
                    State::Ground
                }
            },
            State::Escape => match ch {
                '[' => State::Csi,
                ']' | 'P' | 'X' | '^' | '_' => State::ControlString,
                ' '..='/' => State::Intermediate,
                // Single final byte; this also consumes the `\` of `ESC \`.
                '0'..='~' => State::Ground,
                '\x1b' => State::Escape,
                // Not a valid escape: keep the character as ordinary text.
                other => {
                    if other != '\r' {
                        output.push(other);
                    }
                    State::Ground
                }
            },
            State::Intermediate => match ch {
                ' '..='/' => State::Intermediate,
                '\x1b' => State::Escape,
                _ => State::Ground,
            },
            State::Csi => match ch {
                '@'..='~' => State::Ground,
                '\x1b' => State::Escape,
                _ => State::Csi,
            },
            State::ControlString => match ch {
                '\x07' => State::Ground,
                // Either the string terminator `ESC \` or a new sequence.
                '\x1b' => State::Escape,
                _ => State::ControlString,
            },
        };
    }
    output
}
/// Extracts up to 12 lines of `output` that look significant: lines that
/// contain, case-insensitively, one of a fixed set of markers such as
/// `error`, `failed` or `test result`.
///
/// Returns `None` when no line matches; otherwise the matching lines joined
/// with newlines, in their original order and original case.
fn summarize_output(output: &str) -> Option<String> {
    const MARKERS: [&str; 7] = [
        "error",
        "failed",
        "panic",
        "test result",
        "passing",
        "compil",
        "modified",
    ];
    const MAX_LINES: usize = 12;

    let mut picked: Vec<&str> = Vec::new();
    for line in output.lines() {
        if picked.len() == MAX_LINES {
            break;
        }
        let lowered = line.to_ascii_lowercase();
        if MARKERS.iter().any(|marker| lowered.contains(*marker)) {
            picked.push(line);
        }
    }

    if picked.is_empty() {
        None
    } else {
        Some(picked.join("\n"))
    }
}
/// Returns the last `max_bytes` bytes of `text` as an owned string.
///
/// When the cut would fall inside a multi-byte character, it moves forward to
/// the next character boundary, so the result can be slightly shorter than
/// `max_bytes`. Text that already fits is returned whole.
fn tail_utf8(text: &str, max_bytes: usize) -> String {
    let cut = match text.len().checked_sub(max_bytes) {
        None | Some(0) => return text.to_string(),
        Some(cut) => cut,
    };
    // `text.len()` is always a boundary, so the search cannot fail.
    let start = (cut..=text.len())
        .find(|&index| text.is_char_boundary(index))
        .unwrap_or(text.len());
    text[start..].to_string()
}
/// Makes `text` safe for a single-line status message.
///
/// Newlines and carriage returns are shown as `\n` and `\r`. Text longer than
/// 120 bytes is cut and suffixed with `...`; the cut backs up to the previous
/// character boundary so multi-byte characters never cause a panic.
fn visible(text: &str) -> String {
    const MAX_BYTES: usize = 120;
    let escaped = text.replace('\n', "\\n").replace('\r', "\\r");
    if escaped.len() <= MAX_BYTES {
        return escaped;
    }
    let mut end = MAX_BYTES;
    // Index 0 is always a boundary, so this loop terminates.
    while !escaped.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &escaped[..end])
}
/// Formats a streaming error as `<stage> error: <details>`, where the stage
/// is `prompt`, `completion` or `tool`.
fn describe_streaming_error(error: &StreamingError) -> String {
    let (stage, details) = match error {
        StreamingError::Prompt(inner) => ("prompt", inner.to_string()),
        StreamingError::Completion(inner) => ("completion", inner.to_string()),
        StreamingError::Tool(inner) => ("tool", inner.to_string()),
    };
    format!("{stage} error: {details}")
}