use std::borrow::Cow;
use std::io::{self, IsTerminal, Write};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use rustyline::completion::Completer;
use rustyline::config::Config;
use rustyline::error::ReadlineError;
use rustyline::highlight::Highlighter;
use rustyline::hint::Hinter;
use rustyline::validate::Validator;
use rustyline::{
Cmd as ReadlineCmd, CompletionType, ConditionalEventHandler, Editor, Event, EventContext,
EventHandler, Helper, KeyCode, KeyEvent, Modifiers, RepeatCount,
};
use termimad::MadSkin;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::agent::truncate_for_preview;
use crate::bootstrap::ironclaw_base_dir;
use crate::channels::{Channel, IncomingMessage, MessageStream, OutgoingResponse, StatusUpdate};
use crate::cli::fmt;
use crate::error::ChannelError;
const CLI_TOOL_RESULT_MAX: usize = 200;
const CLI_STATUS_MAX: usize = 200;
const SLASH_COMMANDS: &[&str] = &[
"/help",
"/quit",
"/exit",
"/debug",
"/model",
"/undo",
"/redo",
"/clear",
"/compact",
"/new",
"/interrupt",
"/version",
"/tools",
"/ping",
"/job",
"/status",
"/cancel",
"/list",
"/heartbeat",
"/summarize",
"/suggest",
"/thread",
"/resume",
"/reasoning",
];
struct ReplHelper;
impl Completer for ReplHelper {
type Candidate = String;
fn complete(
&self,
line: &str,
pos: usize,
_ctx: &rustyline::Context<'_>,
) -> rustyline::Result<(usize, Vec<String>)> {
if !line.starts_with('/') {
return Ok((0, vec![]));
}
let prefix = &line[..pos];
let matches: Vec<String> = SLASH_COMMANDS
.iter()
.filter(|cmd| cmd.starts_with(prefix))
.map(|cmd| cmd.to_string())
.collect();
Ok((0, matches))
}
}
impl Hinter for ReplHelper {
type Hint = String;
fn hint(&self, line: &str, pos: usize, _ctx: &rustyline::Context<'_>) -> Option<String> {
if !line.starts_with('/') || pos < line.len() {
return None;
}
SLASH_COMMANDS
.iter()
.find(|cmd| cmd.starts_with(line) && **cmd != line)
.map(|cmd| cmd[line.len()..].to_string())
}
}
impl Highlighter for ReplHelper {
fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
Cow::Owned(format!("{}{hint}{}", fmt::dim(), fmt::reset()))
}
}
impl Validator for ReplHelper {}
impl Helper for ReplHelper {}
struct EscInterruptHandler {
triggered: Arc<AtomicBool>,
}
impl ConditionalEventHandler for EscInterruptHandler {
fn handle(
&self,
_evt: &Event,
_n: RepeatCount,
_positive: bool,
_ctx: &EventContext,
) -> Option<ReadlineCmd> {
self.triggered.store(true, Ordering::Relaxed);
Some(ReadlineCmd::Interrupt)
}
}
#[derive(Clone, Copy)]
enum ApprovalAction {
Approve,
Always,
Deny,
}
impl std::fmt::Display for ApprovalAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Approve => write!(f, "Approve (y)"),
Self::Always => write!(f, "Always approve (a)"),
Self::Deny => write!(f, "Deny (n)"),
}
}
}
impl ApprovalAction {
fn as_input(self) -> &'static str {
match self {
Self::Approve => "y",
Self::Always => "a",
Self::Deny => "n",
}
}
}
fn run_approval_selector(allow_always: bool) -> Option<&'static str> {
use crossterm::{
cursor,
event::{self, Event as CtEvent, KeyCode as CtKeyCode, KeyEventKind},
execute,
terminal::{self, ClearType},
};
let options: Vec<ApprovalAction> = if allow_always {
vec![
ApprovalAction::Approve,
ApprovalAction::Always,
ApprovalAction::Deny,
]
} else {
vec![ApprovalAction::Approve, ApprovalAction::Deny]
};
let num = options.len();
let mut sel: usize = 0;
let total_lines = (num + 1) as u16;
let render = |sel: usize| {
let mut w = io::stderr();
let pipe = format!("{}│{}", fmt::accent(), fmt::reset());
for (i, opt) in options.iter().enumerate() {
if i == sel {
let _ = write!(w, " {pipe} {}● {opt}{}\r\n", fmt::bold(), fmt::reset());
} else {
let _ = write!(w, " {pipe} {}○ {opt}{}\r\n", fmt::dim(), fmt::reset());
}
}
let _ = write!(
w,
" {}└{} {}↑↓ enter to select{}\r\n",
fmt::accent(),
fmt::reset(),
fmt::dim(),
fmt::reset()
);
let _ = w.flush();
};
let _ = terminal::enable_raw_mode();
render(sel);
let result = loop {
let Ok(evt) = event::read() else { break None };
if let CtEvent::Key(key) = evt {
if key.kind != KeyEventKind::Press {
continue;
}
match key.code {
CtKeyCode::Up | CtKeyCode::Char('k') => {
sel = if sel == 0 { num - 1 } else { sel - 1 };
}
CtKeyCode::Down | CtKeyCode::Char('j') => {
sel = (sel + 1) % num;
}
CtKeyCode::Enter => break Some(options[sel].as_input()),
CtKeyCode::Char('y') | CtKeyCode::Char('Y') => break Some("y"),
CtKeyCode::Char('a') | CtKeyCode::Char('A') if allow_always => break Some("a"),
CtKeyCode::Char('n') | CtKeyCode::Char('N') => break Some("n"),
CtKeyCode::Esc => break None,
_ => continue,
}
let mut w = io::stderr();
let _ = execute!(w, cursor::MoveUp(total_lines));
let _ = execute!(w, terminal::Clear(ClearType::FromCursorDown));
render(sel);
}
};
let _ = terminal::disable_raw_mode();
let mut w = io::stderr();
let _ = execute!(w, cursor::MoveUp(total_lines));
let _ = execute!(w, terminal::Clear(ClearType::FromCursorDown));
let (label, color) = if let Some(action) = result {
let l = options
.iter()
.find(|o| o.as_input() == action)
.unwrap_or(&options[0]);
let c = if action == "n" {
fmt::error()
} else {
fmt::success()
};
(l.to_string(), c)
} else {
(ApprovalAction::Deny.to_string(), fmt::error())
};
let _ = writeln!(
w,
" {}└{} {color}● {label}{}",
fmt::accent(),
fmt::reset(),
fmt::reset()
);
result
}
fn make_skin() -> MadSkin {
let mut skin = MadSkin::default();
skin.set_headers_fg(crossterm::style::Color::Yellow);
skin.bold.set_fg(crossterm::style::Color::White);
skin.italic.set_fg(crossterm::style::Color::Magenta);
skin.inline_code.set_fg(crossterm::style::Color::Green);
skin.code_block.set_fg(crossterm::style::Color::Green);
skin.code_block.left_margin = 2;
skin
}
fn smart_truncate(s: &str, max_chars: usize) -> Cow<'_, str> {
let char_count = s.chars().count();
if char_count <= max_chars {
return Cow::Borrowed(s);
}
let budget = max_chars.saturating_sub(3);
let head_len = budget / 2;
let tail_len = budget - head_len;
let head: String = s.chars().take(head_len).collect();
let tail: String = s
.chars()
.skip(char_count.saturating_sub(tail_len))
.collect();
Cow::Owned(format!("{head}...{tail}"))
}
fn format_json_params(params: &serde_json::Value, indent: &str) -> String {
let max_val_len = fmt::term_width().saturating_sub(8);
match params {
serde_json::Value::Object(map) => {
let mut lines = Vec::new();
for (key, value) in map {
let val_str = match value {
serde_json::Value::String(s) => {
let display = smart_truncate(s, max_val_len);
format!("{}\"{display}\"{}", fmt::success(), fmt::reset())
}
other => {
let rendered = other.to_string();
smart_truncate(&rendered, max_val_len).into_owned()
}
};
lines.push(format!(
"{indent}{}{key}{}: {val_str}",
fmt::accent(),
fmt::reset()
));
}
lines.join("\n")
}
other => {
let pretty = serde_json::to_string_pretty(other).unwrap_or_else(|_| other.to_string());
let truncated = smart_truncate(&pretty, 300);
truncated
.lines()
.map(|l| format!("{indent}{}{l}{}", fmt::dim(), fmt::reset()))
.collect::<Vec<_>>()
.join("\n")
}
}
}
pub struct ReplChannel {
user_id: String,
single_message: Option<String>,
debug_mode: Arc<AtomicBool>,
is_streaming: Arc<AtomicBool>,
suppress_banner: Arc<AtomicBool>,
msg_tx: Arc<Mutex<Option<mpsc::Sender<IncomingMessage>>>>,
stdin_locked: Arc<AtomicBool>,
transient_lines: std::sync::atomic::AtomicU8,
}
impl ReplChannel {
pub fn new() -> Self {
Self::with_user_id("default")
}
pub fn with_user_id(user_id: impl Into<String>) -> Self {
Self {
user_id: user_id.into(),
single_message: None,
debug_mode: Arc::new(AtomicBool::new(false)),
is_streaming: Arc::new(AtomicBool::new(false)),
suppress_banner: Arc::new(AtomicBool::new(false)),
msg_tx: Arc::new(Mutex::new(None)),
stdin_locked: Arc::new(AtomicBool::new(false)),
transient_lines: std::sync::atomic::AtomicU8::new(0),
}
}
pub fn with_message(message: String) -> Self {
Self::with_message_for_user("default", message)
}
pub fn with_message_for_user(user_id: impl Into<String>, message: String) -> Self {
Self {
user_id: user_id.into(),
single_message: Some(message),
debug_mode: Arc::new(AtomicBool::new(false)),
is_streaming: Arc::new(AtomicBool::new(false)),
suppress_banner: Arc::new(AtomicBool::new(false)),
msg_tx: Arc::new(Mutex::new(None)),
stdin_locked: Arc::new(AtomicBool::new(false)),
transient_lines: std::sync::atomic::AtomicU8::new(0),
}
}
pub fn suppress_banner(&self) {
self.suppress_banner.store(true, Ordering::Relaxed);
}
fn is_debug(&self) -> bool {
self.debug_mode.load(Ordering::Relaxed)
}
fn clear_transient(&self) {
use crossterm::{cursor, execute, terminal};
let n = self.transient_lines.swap(0, Ordering::Relaxed);
if n > 0 {
let mut stderr = io::stderr();
let _ = execute!(stderr, cursor::MoveUp(n as u16));
let _ = execute!(stderr, terminal::Clear(terminal::ClearType::FromCursorDown));
}
}
async fn finish_single_message_turn(&self) {
if self.single_message.is_none() {
return;
}
let tx = self.msg_tx.lock().ok().and_then(|mut guard| guard.take());
if let Some(tx) = tx {
let msg = IncomingMessage::new("repl", &self.user_id, "/quit");
let _ = tx.send(msg).await;
}
}
}
impl Default for ReplChannel {
fn default() -> Self {
Self::new()
}
}
fn print_help() {
let h = fmt::bold();
let c = fmt::bold_accent();
let d = fmt::dim();
let r = fmt::reset();
let hi = fmt::hint();
println!();
println!(" {h}IronClaw REPL{r}");
println!();
println!(" {h}Quick start{r}");
println!(" {c}/new{r} {hi}Start a new thread{r}");
println!(" {c}/compact{r} {hi}Compress context window{r}");
println!(" {c}/quit{r} {hi}Exit{r}");
println!();
println!(" {h}All commands{r}");
println!(
" {d}Conversation{r} {c}/new{r} {c}/clear{r} {c}/compact{r} {c}/undo{r} {c}/redo{r} {c}/summarize{r} {c}/suggest{r}"
);
println!(" {d}Threads{r} {c}/thread{r} {c}/resume{r} {c}/list{r}");
println!(" {d}Execution{r} {c}/interrupt{r} {d}(esc){r} {c}/cancel{r}");
println!(
" {d}System{r} {c}/tools{r} {c}/model{r} {c}/version{r} {c}/status{r} {c}/debug{r} {c}/heartbeat{r}"
);
println!(" {d}Session{r} {c}/help{r} {c}/quit{r}");
println!();
}
fn history_path() -> std::path::PathBuf {
ironclaw_base_dir().join("history")
}
#[async_trait]
impl Channel for ReplChannel {
fn name(&self) -> &str {
"repl"
}
async fn start(&self) -> Result<MessageStream, ChannelError> {
let (tx, rx) = mpsc::channel(32);
if let Ok(mut guard) = self.msg_tx.lock() {
*guard = Some(tx.clone());
}
let single_message = self.single_message.clone();
let user_id = self.user_id.clone();
let debug_mode = Arc::clone(&self.debug_mode);
let suppress_banner = Arc::clone(&self.suppress_banner);
let stdin_locked = Arc::clone(&self.stdin_locked);
let esc_interrupt_triggered_for_thread = Arc::new(AtomicBool::new(false));
std::thread::spawn(move || {
let sys_tz = crate::timezone::detect_system_timezone().name().to_string();
if let Some(msg) = single_message {
let incoming = IncomingMessage::new("repl", &user_id, &msg)
.with_metadata(serde_json::json!({ "single_message_mode": true }))
.with_timezone(&sys_tz);
let _ = tx.blocking_send(incoming);
return;
}
let config = Config::builder()
.history_ignore_dups(true)
.expect("valid config")
.auto_add_history(true)
.completion_type(CompletionType::List)
.build();
let mut rl = match Editor::with_config(config) {
Ok(editor) => editor,
Err(e) => {
eprintln!("Failed to initialize line editor: {e}");
return;
}
};
rl.set_helper(Some(ReplHelper));
rl.bind_sequence(
KeyEvent(KeyCode::Esc, Modifiers::NONE),
EventHandler::Conditional(Box::new(EscInterruptHandler {
triggered: Arc::clone(&esc_interrupt_triggered_for_thread),
})),
);
let hist_path = history_path();
if let Some(parent) = hist_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = rl.load_history(&hist_path);
if !suppress_banner.load(Ordering::Relaxed) {
println!(
"{}IronClaw{} /help for commands, /quit to exit",
fmt::bold(),
fmt::reset()
);
println!();
}
loop {
while stdin_locked.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(50));
}
let prompt = if debug_mode.load(Ordering::Relaxed) {
format!(
"{}[debug]{} {}\u{203A}{} ",
fmt::warning(),
fmt::reset(),
fmt::bold_accent(),
fmt::reset()
)
} else {
format!("{}\u{203A}{} ", fmt::bold_accent(), fmt::reset())
};
match rl.readline(&prompt) {
Ok(line) => {
let line = line.trim();
if line.is_empty() {
continue;
}
match line.to_lowercase().as_str() {
"/quit" | "/exit" => {
let msg = IncomingMessage::new("repl", &user_id, "/quit")
.with_timezone(&sys_tz);
let _ = tx.blocking_send(msg);
break;
}
"/help" => {
print_help();
continue;
}
"/debug" => {
let current = debug_mode.load(Ordering::Relaxed);
debug_mode.store(!current, Ordering::Relaxed);
if !current {
println!("{}debug mode on{}", fmt::dim(), fmt::reset());
} else {
println!("{}debug mode off{}", fmt::dim(), fmt::reset());
}
continue;
}
_ => {}
}
let msg =
IncomingMessage::new("repl", &user_id, line).with_timezone(&sys_tz);
stdin_locked.store(true, Ordering::Relaxed);
if tx.blocking_send(msg).is_err() {
stdin_locked.store(false, Ordering::Relaxed);
break;
}
}
Err(ReadlineError::Interrupted) => {
if esc_interrupt_triggered_for_thread.swap(false, Ordering::Relaxed) {
let msg = IncomingMessage::new("repl", &user_id, "/interrupt")
.with_timezone(&sys_tz);
if tx.blocking_send(msg).is_err() {
break;
}
} else {
let msg = IncomingMessage::new("repl", &user_id, "/quit")
.with_timezone(&sys_tz);
let _ = tx.blocking_send(msg);
break;
}
}
Err(ReadlineError::Eof) => {
if std::io::stdin().is_terminal() {
let msg = IncomingMessage::new("repl", &user_id, "/quit")
.with_timezone(&sys_tz);
let _ = tx.blocking_send(msg);
}
break;
}
Err(e) => {
eprintln!("Input error: {e}");
break;
}
}
}
let _ = rl.save_history(&history_path());
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
async fn respond(
&self,
_msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
let width = fmt::term_width();
if self.is_streaming.swap(false, Ordering::Relaxed) {
println!();
println!();
self.stdin_locked.store(false, Ordering::Relaxed);
self.finish_single_message_turn().await;
return Ok(());
}
self.clear_transient();
let sep_width = width.min(80);
eprintln!("{}", fmt::separator(sep_width));
let skin = make_skin();
let text = termimad::FmtText::from(&skin, &response.content, Some(width));
print!("{text}");
println!();
self.stdin_locked.store(false, Ordering::Relaxed);
self.finish_single_message_turn().await;
Ok(())
}
async fn send_status(
&self,
status: StatusUpdate,
_metadata: &serde_json::Value,
) -> Result<(), ChannelError> {
let debug = self.is_debug();
match status {
StatusUpdate::Thinking(msg) => {
self.clear_transient();
let display = truncate_for_preview(&msg, CLI_STATUS_MAX);
eprintln!(" {}\u{25CB} {display}{}", fmt::dim(), fmt::reset());
self.transient_lines.store(1, Ordering::Relaxed);
}
StatusUpdate::ToolStarted { name } => {
self.clear_transient();
eprintln!(" {}\u{25CB} {name}{}", fmt::dim(), fmt::reset());
self.transient_lines.store(1, Ordering::Relaxed);
}
StatusUpdate::ToolCompleted { name, success, .. } => {
self.clear_transient();
if success {
eprintln!(" {}\u{25CF} {name}{}", fmt::success(), fmt::reset());
} else {
eprintln!(" {}\u{2717} {name} (failed){}", fmt::error(), fmt::reset());
}
}
StatusUpdate::ToolResult { name: _, preview } => {
let display = truncate_for_preview(&preview, CLI_TOOL_RESULT_MAX);
eprintln!(" {}{display}{}", fmt::dim(), fmt::reset());
}
StatusUpdate::StreamChunk(chunk) => {
if !self.is_streaming.swap(true, Ordering::Relaxed) {
self.clear_transient();
let sep_width = fmt::term_width().min(80);
eprintln!("{}", fmt::separator(sep_width));
}
print!("{chunk}");
let _ = io::stdout().flush();
}
StatusUpdate::JobStarted {
job_id,
title,
browse_url,
} => {
eprintln!(
" {}[job]{} {title} {}({job_id}){} {}{browse_url}{}",
fmt::accent(),
fmt::reset(),
fmt::dim(),
fmt::reset(),
fmt::link(),
fmt::reset()
);
}
StatusUpdate::Status(msg) => {
if debug || msg.contains("approval") || msg.contains("Approval") {
let display = truncate_for_preview(&msg, CLI_STATUS_MAX);
eprintln!(" {}{display}{}", fmt::dim(), fmt::reset());
}
}
StatusUpdate::ApprovalNeeded {
request_id: _,
tool_name,
description: _,
parameters,
allow_always,
} => {
self.clear_transient();
let pipe = format!("{}│{}", fmt::accent(), fmt::reset());
eprintln!();
eprintln!(
" {}\u{25C6} {}{tool_name}{} requires approval",
fmt::accent(),
fmt::bold(),
fmt::reset()
);
let param_lines = format_json_params(¶meters, &format!(" {pipe} "));
if !param_lines.is_empty() {
eprintln!(" {pipe}");
for line in param_lines.lines() {
eprintln!("{line}");
}
}
eprintln!(" {pipe}");
let msg_tx = Arc::clone(&self.msg_tx);
let user_id = self.user_id.clone();
let lock_flag = Arc::clone(&self.stdin_locked);
let single_message_mode = self.single_message.is_some();
tokio::task::spawn_blocking(move || {
let action = run_approval_selector(allow_always).unwrap_or("n");
lock_flag.store(false, Ordering::Relaxed);
let Ok(guard) = msg_tx.lock() else {
return;
};
if let Some(tx) = guard.as_ref() {
let msg = if single_message_mode {
IncomingMessage::new("repl", &user_id, action)
.with_metadata(serde_json::json!({ "single_message_mode": true }))
} else {
IncomingMessage::new("repl", &user_id, action)
};
let _ = tx.blocking_send(msg);
}
});
}
StatusUpdate::AuthRequired {
extension_name,
instructions,
setup_url,
..
} => {
eprintln!();
eprintln!(
"{} Authentication required for {extension_name}{}",
fmt::warning(),
fmt::reset()
);
if let Some(ref instr) = instructions {
eprintln!(" {instr}");
}
if let Some(ref url) = setup_url {
eprintln!(" {}{url}{}", fmt::link(), fmt::reset());
}
eprintln!();
}
StatusUpdate::AuthCompleted {
extension_name,
success,
message,
} => {
if success {
eprintln!(
"{} {extension_name}: {message}{}",
fmt::success(),
fmt::reset()
);
} else {
eprintln!(
"{} {extension_name}: {message}{}",
fmt::error(),
fmt::reset()
);
}
}
StatusUpdate::ImageGenerated { path, .. } => {
if let Some(ref p) = path {
eprintln!("{} [image] {p}{}", fmt::accent(), fmt::reset());
} else {
eprintln!("{} [image generated]{}", fmt::accent(), fmt::reset());
}
}
StatusUpdate::Suggestions { .. } => {
}
StatusUpdate::ReasoningUpdate {
narrative,
decisions,
} => {
if !narrative.is_empty() {
let display = truncate_for_preview(&narrative, CLI_STATUS_MAX);
eprintln!(" \x1b[94m\u{25B6} {display}\x1b[0m");
}
for d in &decisions {
let display = truncate_for_preview(&d.rationale, CLI_STATUS_MAX);
eprintln!(" \x1b[90m\u{2192} {}: {display}\x1b[0m", d.tool_name);
}
}
StatusUpdate::TurnCost { .. } => {
}
}
Ok(())
}
async fn broadcast(
&self,
_user_id: &str,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
let skin = make_skin();
let width = fmt::term_width();
eprintln!("{}\u{25CF}{} notification", fmt::accent(), fmt::reset());
let text = termimad::FmtText::from(&skin, &response.content, Some(width));
eprint!("{text}");
eprintln!();
Ok(())
}
async fn health_check(&self) -> Result<(), ChannelError> {
Ok(())
}
async fn shutdown(&self) -> Result<(), ChannelError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use tokio::time::{Duration, timeout};
use super::*;
#[tokio::test]
async fn single_message_mode_sends_message_then_quit() {
let repl = ReplChannel::with_message("hi".to_string());
let mut stream = repl.start().await.expect("repl start should succeed");
let first = timeout(Duration::from_secs(1), stream.next())
.await
.expect("timed out waiting for first message")
.expect("first message missing");
assert_eq!(first.channel, "repl");
assert_eq!(first.content, "hi");
assert!(
timeout(Duration::from_millis(100), stream.next())
.await
.is_err(),
"single-message mode should wait for the turn to finish before quitting"
);
repl.respond(&first, OutgoingResponse::text("done"))
.await
.expect("respond should succeed");
let second = timeout(Duration::from_secs(1), stream.next())
.await
.expect("timed out waiting for quit message")
.expect("quit message missing");
assert_eq!(second.channel, "repl");
assert_eq!(second.content, "/quit");
assert!(
timeout(Duration::from_secs(1), stream.next())
.await
.expect("timed out waiting for stream to close")
.is_none(),
"stream should end after /quit"
);
}
}