use std::io::IsTerminal;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason, TextContent};
use crossterm::event::{Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
use defect_agent::event::AgentEvent;
use defect_agent::llm::{Message, MessageContent, Role};
use defect_agent::session::{AgentCore, TurnError};
use futures::{FutureExt, StreamExt};
use owo_colors::OwoColorize;
use tokio::io::{AsyncWriteExt, Stdout};
use tokio::sync::mpsc;
use crate::session_open::open_session;
/// Runs the interactive REPL until the user quits or input ends.
///
/// Opens (or resumes) a session through `open_session`, prints a banner and
/// any resumed history, then loops: obtain one line, run it as a turn via
/// `run_user_turn`, and report the outcome. Lines submitted while a turn was
/// running come back from `run_user_turn` and are replayed, in order, before
/// new input is read.
///
/// The loop ends on `:q` / `:quit` / `:exit`, on `KeyMsg::Eof`, or when the
/// key channel closes.
///
/// # Errors
///
/// Returns an error if the session cannot be opened or if any terminal write
/// fails. A failed turn is printed and does not end the REPL.
pub async fn run(
    agent: Arc<dyn AgentCore>,
    cwd: PathBuf,
    resume: Option<SessionId>,
) -> anyhow::Result<()> {
    let mut out = tokio::io::stdout();
    let session = open_session(&agent, &cwd, resume).await?;

    let banner = format!(
        "defect repl — {} @ {}\n\
         type a prompt and hit enter; :q or Ctrl-D to quit.\n",
        session.current_model(),
        cwd.display(),
    );
    write(&mut out, &banner.dimmed().to_string()).await?;

    let history = session.history_snapshot();
    if !history.is_empty() {
        write(
            &mut out,
            &format!("— resumed {} message(s) —\n", history.len())
                .dimmed()
                .to_string(),
        )
        .await?;
        for message in &history {
            render_history_message(&mut out, message).await?;
        }
    }

    let mut events = session.subscribe();
    let (key_tx, mut key_rx) = mpsc::channel::<KeyMsg>(64);
    // Bound to a named variable so the reader (and its raw-mode guard) stays
    // alive until `run` returns.
    let _input = InputReader::spawn(key_tx);
    let mut editor = LineEditor::new("› ".cyan().bold().to_string());
    editor.redraw(&mut out).await?;

    // Lines submitted while a turn was running, waiting to be replayed.
    let mut pending: std::collections::VecDeque<String> = std::collections::VecDeque::new();
    // Cleared once the event stream yields `None`, so a finished stream is not
    // polled again (that would turn the idle loop below into a busy loop).
    let mut events_open = true;

    'session: loop {
        let line = if let Some(queued) = pending.pop_front() {
            editor.echo_submitted(&mut out, &queued).await?;
            queued
        } else {
            // Idle: wait for a submitted line while still rendering any
            // events that arrive between turns.
            loop {
                tokio::select! {
                    key = key_rx.recv() => match key {
                        Some(KeyMsg::Line(text)) => break text,
                        Some(KeyMsg::Edit(key)) => {
                            if let Some(text) = editor.on_key(key, &mut out).await? {
                                break text;
                            }
                        }
                        Some(KeyMsg::Interrupt) => editor.clear_line(&mut out).await?,
                        Some(KeyMsg::Eof) | None => break 'session,
                    },
                    ev = events.next(), if events_open => match ev {
                        Some(ev) => editor.render_event(&mut out, ev).await?,
                        None => events_open = false,
                    },
                }
            }
        };

        let prompt_text = line.trim();
        if prompt_text.is_empty() {
            editor.redraw(&mut out).await?;
            continue;
        }
        if matches!(prompt_text, ":q" | ":quit" | ":exit") {
            break;
        }

        let (stop, queued) = run_user_turn(
            session.as_ref(),
            prompt_text.to_owned(),
            &mut events,
            &mut key_rx,
            &mut editor,
            &mut out,
        )
        .await?;
        pending.extend(queued);

        match stop {
            Ok(_) => {
                editor.ensure_idle(&mut out).await?;
            }
            Err(e) => {
                editor
                    .print_error(&mut out, &format!("{} {e}", "turn error:".red().bold()))
                    .await?;
            }
        }
    }

    write(&mut out, &"\r\nbye.\r\n".dimmed().to_string()).await?;
    // Flush explicitly so the farewell is not left sitting in the async
    // stdout buffer when the caller shuts down.
    out.flush().await?;
    Ok(())
}
/// Runs one user prompt as a turn, keeping the terminal responsive meanwhile.
///
/// While the turn future is pending, agent events are rendered and key
/// messages are handled through `handle_key_during_turn`. If the session
/// answers `TurnError::TurnInProgress`, the prompt is retried after a short
/// back-off, up to `MAX_RETRIES` times; events and keys are still serviced
/// during the back-off.
///
/// Returns the final turn result together with every line the user submitted
/// while the turn was running, in submission order.
///
/// # Errors
///
/// The outer `Err` is returned only when rendering to the terminal fails;
/// turn failures are reported in the inner `Result`.
async fn run_user_turn(
    session: &dyn defect_agent::session::Session,
    prompt_text: String,
    events: &mut defect_agent::session::EventStream,
    key_rx: &mut mpsc::Receiver<KeyMsg>,
    editor: &mut LineEditor,
    out: &mut Stdout,
) -> anyhow::Result<(Result<StopReason, TurnError>, Vec<String>)> {
    const MAX_RETRIES: u32 = 100;
    const BACKOFF: Duration = Duration::from_millis(20);

    let mut queued: Vec<String> = Vec::new();
    // Each flag is cleared once its source reports end-of-stream, which
    // disables the matching `select!` branch. This matters most for the event
    // stream: the select below is `biased` with events first, so a finished
    // stream that keeps yielding `None` would otherwise win every poll and the
    // turn future would never be driven.
    let mut keys_open = true;
    let mut events_open = true;
    let mut attempt = 0u32;

    let result = loop {
        // `run_turn` consumes the blocks, so they are rebuilt for every attempt.
        let prompt_blocks = vec![ContentBlock::Text(TextContent::new(prompt_text.clone()))];
        let turn = session.run_turn(prompt_blocks);
        tokio::pin!(turn);

        let result = loop {
            tokio::select! {
                // Events are polled first so output already produced is
                // rendered before the turn's completion is observed.
                biased;
                ev = events.next(), if events_open => match ev {
                    Some(ev) => editor.render_event(out, ev).await?,
                    None => events_open = false,
                },
                key = key_rx.recv(), if keys_open => match key {
                    Some(msg) => {
                        queued.extend(handle_key_during_turn(session, msg, editor, out).await?);
                    }
                    None => keys_open = false,
                },
                r = &mut turn => break r,
            }
        };

        // Render whatever is already buffered, without waiting for more.
        while events_open {
            match events.next().now_or_never() {
                Some(Some(ev)) => editor.render_event(out, ev).await?,
                Some(None) => events_open = false,
                None => break,
            }
        }

        match result {
            Err(TurnError::TurnInProgress) if attempt < MAX_RETRIES => {
                attempt += 1;
                let sleep = tokio::time::sleep(BACKOFF);
                tokio::pin!(sleep);
                loop {
                    tokio::select! {
                        () = &mut sleep => break,
                        ev = events.next(), if events_open => match ev {
                            Some(ev) => editor.render_event(out, ev).await?,
                            None => events_open = false,
                        },
                        key = key_rx.recv(), if keys_open => match key {
                            Some(msg) => {
                                queued.extend(
                                    handle_key_during_turn(session, msg, editor, out).await?,
                                );
                            }
                            None => keys_open = false,
                        },
                    }
                }
            }
            other => break other,
        }
    };

    Ok((result, queued))
}
/// Handles one key message that arrives while a turn is running.
///
/// Returns `Some(line)` when the message completes a line (a cooked-mode line,
/// or an edit key for which the editor reports a submission); otherwise
/// `None`. An interrupt cancels the running turn and clears the input line.
/// `Eof` is ignored here.
async fn handle_key_during_turn(
    session: &dyn defect_agent::session::Session,
    msg: KeyMsg,
    editor: &mut LineEditor,
    out: &mut Stdout,
) -> anyhow::Result<Option<String>> {
    let submitted = match msg {
        KeyMsg::Line(text) => Some(text),
        KeyMsg::Edit(key) => editor.on_key(key, out).await?,
        KeyMsg::Interrupt => {
            session.cancel_turn();
            editor.clear_line(out).await?;
            None
        }
        KeyMsg::Eof => None,
    };
    Ok(submitted)
}
/// Message sent from the input reader thread to the async REPL loop.
enum KeyMsg {
/// A key event to be applied by the line editor. In raw mode this is sent
/// for Enter, Backspace and character keys pressed without Ctrl.
Edit(KeyEvent),
/// A complete line read in cooked (non-terminal) mode, with trailing
/// `\r` / `\n` removed.
Line(String),
/// Ctrl-C was pressed.
Interrupt,
/// Input ended: Ctrl-D on an empty line in raw mode, or end-of-input / a
/// read error on stdin.
Eof,
}
struct InputReader {
handle: Option<std::thread::JoinHandle<()>>,
_raw: Option<RawMode>,
}
impl InputReader {
fn spawn(tx: mpsc::Sender<KeyMsg>) -> Self {
let tty = std::io::stdin().is_terminal();
let raw = if tty { RawMode::enable().ok() } else { None };
let handle = std::thread::spawn(move || {
if tty {
read_keys_raw(&tx);
} else {
read_lines_cooked(&tx);
}
});
Self {
handle: Some(handle),
_raw: raw,
}
}
}
impl Drop for InputReader {
fn drop(&mut self) {
if let Some(h) = self.handle.take() {
drop(h);
}
}
}
/// Blocking loop that turns raw terminal key events into `KeyMsg`s.
///
/// Keeps a count of the characters on the current input line so that Ctrl-D
/// means end-of-input only when the line is empty. Returns when reading an
/// event fails (after sending `KeyMsg::Eof`) or when the receiver is gone.
fn read_keys_raw(tx: &mpsc::Sender<KeyMsg>) {
    // Number of characters typed on the current line, as seen from here.
    let mut typed = 0usize;
    loop {
        let key = match crossterm::event::read() {
            Ok(Event::Key(key)) => key,
            // Non-key events are ignored.
            Ok(_) => continue,
            Err(_) => {
                let _ = tx.blocking_send(KeyMsg::Eof);
                return;
            }
        };
        if matches!(key.kind, KeyEventKind::Release) {
            continue;
        }

        let ctrl = key.modifiers.contains(KeyModifiers::CONTROL);
        let msg = match key.code {
            KeyCode::Enter => {
                typed = 0;
                KeyMsg::Edit(key)
            }
            KeyCode::Backspace => {
                typed = typed.saturating_sub(1);
                KeyMsg::Edit(key)
            }
            KeyCode::Char('c') if ctrl => {
                typed = 0;
                KeyMsg::Interrupt
            }
            KeyCode::Char('d') if ctrl => {
                // Ctrl-D is end-of-input only on an empty line; otherwise it
                // is dropped.
                if typed != 0 {
                    continue;
                }
                KeyMsg::Eof
            }
            KeyCode::Char(_) if !ctrl => {
                typed += 1;
                KeyMsg::Edit(key)
            }
            _ => continue,
        };

        // A send error means the receiver was dropped: stop reading.
        if tx.blocking_send(msg).is_err() {
            return;
        }
    }
}
/// Blocking loop that forwards stdin line by line as `KeyMsg::Line`.
///
/// Trailing `\r` / `\n` characters are stripped from each line. End of input
/// and read errors both send `KeyMsg::Eof` and end the loop; the loop also
/// ends when the receiver is gone.
fn read_lines_cooked(tx: &mpsc::Sender<KeyMsg>) {
    use std::io::BufRead;

    let input = std::io::stdin();
    let mut buf = String::new();
    loop {
        buf.clear();
        // A read error is treated the same as end of input (zero bytes).
        let bytes_read = input.lock().read_line(&mut buf).unwrap_or(0);
        if bytes_read == 0 {
            let _ = tx.blocking_send(KeyMsg::Eof);
            return;
        }
        let text = buf.trim_end_matches(['\r', '\n']).to_owned();
        if tx.blocking_send(KeyMsg::Line(text)).is_err() {
            return;
        }
    }
}
/// Guard for terminal raw mode: created by enabling it, disables it on drop.
struct RawMode;

impl RawMode {
    /// Enables raw mode and returns the guard, or the error from enabling it.
    fn enable() -> std::io::Result<Self> {
        enable_raw_mode().map(|()| Self).map_err(Into::into)
    }
}

impl Drop for RawMode {
    fn drop(&mut self) {
        // Best effort: an error while restoring the terminal is ignored.
        let _ = disable_raw_mode();
    }
}
struct LineEditor {
prompt: String,
buf: String,
tty: bool,
streaming: bool,
at_line_start: bool,
}
impl LineEditor {
fn new(prompt: String) -> Self {
Self {
prompt,
buf: String::new(),
tty: std::io::stdin().is_terminal(),
streaming: false,
at_line_start: true,
}
}
async fn redraw(&self, out: &mut Stdout) -> anyhow::Result<()> {
if self.tty {
write(out, &format!("\r\x1b[K{}{}", self.prompt, self.buf)).await?;
} else {
write(out, &self.prompt).await?;
}
out.flush().await?;
Ok(())
}
async fn echo_submitted(&mut self, out: &mut Stdout, line: &str) -> anyhow::Result<()> {
self.buf.clear();
if self.tty {
write(out, &format!("\r\x1b[K{}{}\r\n", self.prompt, line)).await?;
} else {
write(out, &format!("{}{}\n", self.prompt, line)).await?;
}
out.flush().await?;
Ok(())
}
async fn on_key(&mut self, key: KeyEvent, out: &mut Stdout) -> anyhow::Result<Option<String>> {
let ctrl = key.modifiers.contains(KeyModifiers::CONTROL);
match key.code {
KeyCode::Enter => {
if !self.streaming {
write(out, "\r\n").await?;
out.flush().await?;
}
return Ok(Some(std::mem::take(&mut self.buf)));
}
KeyCode::Backspace => {
let changed = self.buf.pop().is_some();
if changed && !self.streaming {
self.redraw(out).await?;
}
}
KeyCode::Char(c) if !ctrl => {
self.buf.push(c);
if !self.streaming {
self.redraw(out).await?;
}
}
_ => {}
}
Ok(None)
}
async fn clear_line(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
self.buf.clear();
if !self.streaming {
self.redraw(out).await?;
}
Ok(())
}
async fn enter_streaming(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
if !self.streaming {
if self.tty {
write(out, "\r\x1b[K").await?; }
self.streaming = true;
self.at_line_start = true;
}
Ok(())
}
async fn end_streaming(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
if self.streaming {
if !self.at_line_start {
write(out, if self.tty { "\r\n" } else { "\n" }).await?;
}
self.streaming = false;
self.redraw(out).await?;
}
Ok(())
}
async fn render_event(&mut self, out: &mut Stdout, event: AgentEvent) -> anyhow::Result<()> {
match event {
AgentEvent::AssistantText { content } => {
if let Some(text) = block_text(&content) {
self.stream_text(out, &text).await?;
}
}
AgentEvent::AssistantThought { content } => {
if let Some(text) = block_text(&content) {
self.stream_text(out, &text.dimmed().italic().to_string())
.await?;
}
}
AgentEvent::ToolCallStarted { name, fields, .. } => {
let title = fields.title.unwrap_or(name);
self.stream_text(out, &format!("\n{} {}\n", "⚙".yellow(), title.yellow()))
.await?;
}
AgentEvent::ToolCallFinished { fields, .. } => {
if let Some(status) = fields.status {
self.stream_text(out, &format!("{} {status:?}\n", " ↳".dimmed()))
.await?;
}
}
AgentEvent::TurnEnded { .. } => {
self.end_streaming(out).await?;
}
_ => {}
}
Ok(())
}
async fn stream_text(&mut self, out: &mut Stdout, text: &str) -> anyhow::Result<()> {
if text.is_empty() {
return Ok(());
}
self.enter_streaming(out).await?;
write(out, &nl(text, self.tty)).await?;
out.flush().await?;
self.at_line_start = text.ends_with('\n');
Ok(())
}
async fn ensure_idle(&mut self, out: &mut Stdout) -> anyhow::Result<()> {
if self.streaming {
self.end_streaming(out).await?;
}
Ok(())
}
async fn print_error(&mut self, out: &mut Stdout, text: &str) -> anyhow::Result<()> {
if self.streaming && !self.at_line_start {
write(out, if self.tty { "\r\n" } else { "\n" }).await?;
}
self.streaming = false;
if self.tty {
write(out, &format!("\r\x1b[K{text}\r\n")).await?;
} else {
write(out, &format!("{text}\n")).await?;
}
self.redraw(out).await?;
Ok(())
}
}
/// Prepares text for output: on a terminal (`tty == true`) every line feed is
/// preceded by a carriage return; otherwise the text is returned unchanged.
///
/// A `\n` that already follows a `\r` is left alone, so input that is already
/// CRLF-terminated does not end up with a doubled `\r`.
fn nl(s: &str, tty: bool) -> String {
    if !tty {
        return s.to_owned();
    }
    let mut converted = String::with_capacity(s.len() + s.matches('\n').count());
    let mut prev_was_cr = false;
    for ch in s.chars() {
        if ch == '\n' && !prev_was_cr {
            converted.push('\r');
        }
        converted.push(ch);
        prev_was_cr = ch == '\r';
    }
    converted
}
/// Returns a copy of the text of a `ContentBlock::Text` block, or `None` for
/// any other kind of block.
fn block_text(block: &ContentBlock) -> Option<String> {
    let ContentBlock::Text(inner) = block else {
        return None;
    };
    Some(inner.text.clone())
}
async fn write(out: &mut Stdout, s: &str) -> anyhow::Result<()> {
out.write_all(s.as_bytes()).await?;
Ok(())
}
/// Prints one message from resumed history, one output line per content part.
///
/// Text and thinking parts carry a role prefix (`user> ` / `asst> `); tool
/// uses and tool results are shown as short markers. Other content kinds are
/// skipped.
async fn render_history_message(out: &mut Stdout, message: &Message) -> anyhow::Result<()> {
    let prefix = match message.role {
        Role::User => "user> ".cyan().bold().to_string(),
        Role::Assistant => "asst> ".green().bold().to_string(),
    };

    for part in message.content.iter() {
        // Build the line first, then emit it with a single write.
        let rendered = match part {
            MessageContent::Text { text } => format!("{prefix}{text}\n"),
            MessageContent::Thinking { text, .. } => {
                format!("{prefix}{}\n", text.dimmed().italic())
            }
            MessageContent::ToolUse { name, .. } => {
                format!(" {} {}\n", "⚙".yellow(), name.yellow())
            }
            MessageContent::ToolResult { is_error, .. } => {
                let label = if *is_error { "error" } else { "ok" };
                format!(" {} {label}\n", "↳".dimmed())
            }
            _ => continue,
        };
        write(out, &rendered).await?;
    }
    Ok(())
}