Skip to main content

batuta/agent/
repl.rs

1//! Interactive REPL for `apr code`.
2//!
3//! Terminal UI: user types prompts, agent streams responses token-by-token.
4//! Crossterm raw mode input, tokio agent loop, mpsc streaming events.
5//! See: apr-code.md §3, agent-and-playbook.md §7
6
7use std::io::{self, Write};
8use std::sync::Arc;
9
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use tokio::sync::mpsc;
13
14use crate::agent::driver::{LlmDriver, Message, StreamEvent};
15use crate::agent::memory::MemorySubstrate;
16use crate::agent::result::AgentLoopResult;
17use crate::agent::session::SessionStore;
18use crate::agent::tool::ToolRegistry;
19use crate::agent::AgentManifest;
20use crate::ansi_colors::Colorize;
21use crate::serve::context::TokenEstimator;
22
/// Commands the REPL accepts when an input line begins with `/`.
///
/// PMAT-CODE-SLASH-PARITY-001 and PMAT-CODE-SLASH-EXTENDED-001 grew this
/// set to mirror Claude Code's built-in slash commands. Many of the newer
/// variants are stubs: the parser recognizes them so that `/help` can list
/// them and the handler can print a deliberate "not yet implemented"
/// message instead of falling through to [`SlashCommand::Unknown`].
#[derive(Debug, PartialEq)]
enum SlashCommand {
    Help,
    Quit,
    Cost,
    Context,
    Model,
    Compact,
    Clear,
    Session,
    Sessions,
    Test,
    Quality,
    // PMAT-CODE-SLASH-PARITY-001: ten Claude-Code-parity commands.
    // What each one prints is documented next to its handler arm.
    Mcp,
    Config,
    Review,
    Memory,
    Permissions,
    Hooks,
    Init,
    Resume,
    AddDir,
    Agents,
    // PMAT-CODE-SLASH-EXTENDED-001: the last three built-ins. All are
    // placeholder stubs for now.
    Debug,
    Rename,
    Upgrade,
    /// Any other `/token`; carries the token exactly as typed.
    Unknown(String),
}

impl SlashCommand {
    /// Parse one input line into a slash command.
    ///
    /// Surrounding whitespace is ignored. Returns `None` when the line
    /// does not start with `/`. Otherwise the first whitespace-delimited
    /// token selects the command (matching is case-sensitive) and any
    /// trailing arguments are ignored; unrecognized tokens become
    /// [`SlashCommand::Unknown`].
    fn parse(input: &str) -> Option<Self> {
        // After trimming, the line starts with '/' exactly when its first
        // token does, so one lookup covers both the guard and the token.
        let token = match input.trim().split_whitespace().next() {
            Some(first) if first.starts_with('/') => first,
            _ => return None,
        };

        let command = match token {
            "/help" | "/h" | "/?" => Self::Help,
            "/quit" | "/q" | "/exit" => Self::Quit,
            "/cost" => Self::Cost,
            "/context" | "/ctx" => Self::Context,
            "/model" => Self::Model,
            "/compact" => Self::Compact,
            "/clear" => Self::Clear,
            "/session" => Self::Session,
            "/sessions" => Self::Sessions,
            "/test" => Self::Test,
            "/quality" => Self::Quality,
            // PMAT-CODE-SLASH-PARITY-001
            "/mcp" => Self::Mcp,
            "/config" | "/cfg" => Self::Config,
            "/review" => Self::Review,
            "/memory" => Self::Memory,
            "/permissions" | "/perms" => Self::Permissions,
            "/hooks" => Self::Hooks,
            "/init" => Self::Init,
            "/resume" => Self::Resume,
            "/add-dir" | "/adddir" => Self::AddDir,
            "/agents" => Self::Agents,
            // PMAT-CODE-SLASH-EXTENDED-001
            "/debug" | "/dbg" => Self::Debug,
            "/rename" => Self::Rename,
            "/upgrade" => Self::Upgrade,
            unrecognized => Self::Unknown(unrecognized.to_owned()),
        };
        Some(command)
    }
}
102
/// Fraction of the context window (0.8 = 80%) at or above which the
/// conversation history is compacted automatically. See apr-code.md §7.3.
const AUTO_COMPACT_THRESHOLD: f64 = 0.8;
105
106/// Session state tracked across turns.
107pub(super) struct ReplSession {
108    pub(super) turn_count: u32,
109    pub(super) total_input_tokens: u64,
110    pub(super) total_output_tokens: u64,
111    pub(super) total_tool_calls: u32,
112    pub(super) estimated_cost_usd: f64,
113    /// Persistent session store (JSONL). None if persistence failed to init.
114    pub(super) store: Option<SessionStore>,
115    /// Context window size in tokens (from driver).
116    pub(super) context_window: usize,
117}
118
119impl ReplSession {
120    fn new(agent_name: &str, context_window: usize) -> Self {
121        let store = SessionStore::create(agent_name).ok();
122        Self {
123            turn_count: 0,
124            total_input_tokens: 0,
125            total_output_tokens: 0,
126            total_tool_calls: 0,
127            estimated_cost_usd: 0.0,
128            store,
129            context_window,
130        }
131    }
132
133    fn record_turn(&mut self, result: &AgentLoopResult, cost: f64) {
134        self.turn_count += 1;
135        self.total_input_tokens += result.usage.input_tokens;
136        self.total_output_tokens += result.usage.output_tokens;
137        self.total_tool_calls += result.tool_calls;
138        self.estimated_cost_usd += cost;
139        // Persist turn count
140        if let Some(ref mut store) = self.store {
141            let _ = store.record_turn();
142        }
143    }
144
145    /// Persist new messages from this turn to JSONL.
146    fn persist_messages(&self, history: &[Message], prev_len: usize) {
147        if let Some(ref store) = self.store {
148            let new_msgs = &history[prev_len..];
149            if !new_msgs.is_empty() {
150                let _ = store.append_messages(new_msgs);
151            }
152        }
153    }
154
155    pub(crate) fn session_id(&self) -> Option<&str> {
156        self.store.as_ref().map(|s| s.id())
157    }
158
159    /// Estimate total tokens used by conversation history.
160    fn estimate_history_tokens(history: &[Message]) -> usize {
161        let estimator = TokenEstimator::new();
162        let chat_msgs: Vec<_> = history.iter().map(Message::to_chat_message).collect();
163        estimator.estimate_messages(&chat_msgs)
164    }
165
166    /// Context usage as fraction (0.0–1.0).
167    pub(super) fn context_usage(&self, history: &[Message]) -> f64 {
168        if self.context_window == 0 {
169            return 0.0;
170        }
171        Self::estimate_history_tokens(history) as f64 / self.context_window as f64
172    }
173
174    /// Auto-compact if history exceeds 80% of context window.
175    /// Returns true if compaction was triggered.
176    fn auto_compact_if_needed(&self, history: &mut Vec<Message>) -> bool {
177        let usage = self.context_usage(history);
178        if usage >= AUTO_COMPACT_THRESHOLD {
179            let before = history.len();
180            compact_history(history);
181            let after = history.len();
182            if after < before {
183                println!(
184                    "  {} Auto-compacted: {} → {} messages ({:.0}% context)",
185                    "⚙".dimmed(),
186                    before,
187                    after,
188                    self.context_usage(history) * 100.0
189                );
190                return true;
191            }
192        }
193        false
194    }
195}
196
197/// Resume an existing session or create a new one.
198fn resume_or_new(
199    resume_id: Option<&str>,
200    agent_name: &str,
201    ctx_window: usize,
202) -> (ReplSession, Vec<Message>) {
203    if let Some(id) = resume_id {
204        if let Ok(store) = SessionStore::resume(id) {
205            let msgs = store.load_messages().unwrap_or_default();
206            let turns = store.manifest.turns;
207            println!("  {} Resumed {} ({turns} turns, {} msgs)", "✓".green(), id, msgs.len());
208            let s = ReplSession {
209                turn_count: turns,
210                total_input_tokens: 0,
211                total_output_tokens: 0,
212                total_tool_calls: 0,
213                estimated_cost_usd: 0.0,
214                store: Some(store),
215                context_window: ctx_window,
216            };
217            return (s, msgs);
218        }
219        println!("  {} Could not resume session: {id}", "⚠".bright_yellow());
220    }
221    let s = ReplSession::new(agent_name, ctx_window);
222    if let Some(id) = s.session_id() {
223        println!("  {} {}", "Session:".dimmed(), id.dimmed());
224    }
225    (s, Vec::new())
226}
227
228/// Run the interactive REPL.
229///
230/// This is the main entry point for `apr code` interactive mode.
231/// Returns when the user types `/quit` or Ctrl+D.
232///
233/// If `resume_id` is provided, loads conversation history from
234/// the corresponding session in `~/.apr/sessions/`.
235pub fn run_repl(
236    manifest: &AgentManifest,
237    driver: &dyn LlmDriver,
238    tools: &ToolRegistry,
239    memory: &dyn MemorySubstrate,
240    max_turns: u32,
241    budget_usd: f64,
242    resume_id: Option<&str>,
243) -> anyhow::Result<()> {
244    let rt = tokio::runtime::Builder::new_current_thread()
245        .enable_all()
246        .build()
247        .map_err(|e| anyhow::anyhow!("tokio runtime: {e}"))?;
248
249    print_welcome(manifest, driver);
250
251    let ctx_window = driver.context_window();
252
253    let (mut session, mut history) = resume_or_new(resume_id, &manifest.name, ctx_window);
254
255    let stdin = io::stdin();
256    let mut line_buf = String::new();
257
258    loop {
259        // Check turn budget
260        if session.turn_count >= max_turns {
261            println!(
262                "\n{} Max turns ({}) reached. Session complete.",
263                "⚠".bright_yellow(),
264                max_turns
265            );
266            break;
267        }
268        if session.estimated_cost_usd >= budget_usd {
269            println!(
270                "\n{} Budget (${:.2}) exhausted. Session complete.",
271                "⚠".bright_yellow(),
272                budget_usd
273            );
274            break;
275        }
276
277        // Read input
278        let input = match read_input(&stdin, &mut line_buf, &session, budget_usd, &mut history) {
279            InputResult::Prompt(s) => s,
280            InputResult::SlashHandled => continue,
281            InputResult::Exit => break,
282            InputResult::Empty => continue,
283        };
284
285        // Execute turn with streaming
286        let cancel = Arc::new(AtomicBool::new(false));
287        let cancel_clone = Arc::clone(&cancel);
288
289        rt.block_on(async {
290            let flag = cancel_clone;
291            tokio::spawn(async move {
292                if tokio::signal::ctrl_c().await.is_ok() {
293                    flag.store(true, Ordering::SeqCst);
294                }
295            });
296        });
297
298        let (tx, rx) = mpsc::channel::<StreamEvent>(64);
299
300        println!();
301
302        let history_len_before = history.len();
303        let result = rt.block_on(run_turn_streaming(
304            manifest,
305            &input,
306            driver,
307            tools,
308            memory,
309            &mut history,
310            tx,
311            rx,
312            &cancel,
313        ));
314
315        match result {
316            Ok(r) => {
317                let cost = driver.estimate_cost(&r.usage);
318                session.record_turn(&r, cost);
319                // Persist new messages to JSONL
320                session.persist_messages(&history, history_len_before);
321                // Auto-compact at 80% context window (spec §7.3)
322                session.auto_compact_if_needed(&mut history);
323                print_turn_footer(&r, cost, &session, budget_usd);
324            }
325            Err(e) => {
326                if cancel.load(Ordering::SeqCst) {
327                    println!("\n{} Generation cancelled.", "⚠".bright_yellow());
328                } else {
329                    println!("\n{} Error: {e}", "✗".bright_red());
330                }
331            }
332        }
333    }
334
335    print_session_summary(&session);
336    Ok(())
337}
338
/// What one call to `read_input` produced.
enum InputResult {
    /// A prompt to send to the agent.
    Prompt(String),
    /// The line was consumed locally (slash command or `!` shell
    /// directive); nothing goes to the agent.
    SlashHandled,
    /// The REPL should stop.
    Exit,
    /// The line contained only whitespace.
    Empty,
}
346
347/// Read one line of input, handling slash commands inline.
348fn read_input(
349    stdin: &io::Stdin,
350    buf: &mut String,
351    session: &ReplSession,
352    budget: f64,
353    history: &mut Vec<Message>,
354) -> InputResult {
355    let cost_str = if session.estimated_cost_usd > 0.0 {
356        format!(" ${:.3}", session.estimated_cost_usd)
357    } else {
358        String::new()
359    };
360    print!(
361        "\n{}{} ",
362        format!("[{}/{}{}]", session.turn_count + 1, "?", cost_str).dimmed(),
363        " >".bright_green().bold(),
364    );
365    io::stdout().flush().ok();
366
367    buf.clear();
368    let bytes = match stdin.read_line(buf) {
369        Ok(b) => b,
370        Err(_) => return InputResult::Exit,
371    };
372    if bytes == 0 {
373        println!();
374        return InputResult::Exit;
375    }
376
377    let trimmed = buf.trim();
378    if trimmed.is_empty() {
379        return InputResult::Empty;
380    }
381
382    // PMAT-CODE-REPL-PHASE2-001: handle `!<cmd>` shell directive
383    // BEFORE slash-command parsing — `!` is a peer to `/` in the
384    // Claude-Code interactive surface.
385    if let Some(shell_cmd) = super::repl_directives::parse_bang_command(trimmed) {
386        match super::repl_directives::execute_bang_command(shell_cmd) {
387            Ok((code, out)) => {
388                if !out.is_empty() {
389                    println!("{out}");
390                }
391                if code != 0 {
392                    eprintln!("(exit {code})");
393                }
394            }
395            Err(e) => eprintln!("! shell error: {e}"),
396        }
397        return InputResult::SlashHandled;
398    }
399
400    // Handle slash commands
401    if let Some(cmd) = SlashCommand::parse(trimmed) {
402        handle_slash_command(&cmd, session, budget, history);
403        return match cmd {
404            SlashCommand::Quit => InputResult::Exit,
405            _ => InputResult::SlashHandled,
406        };
407    }
408
409    // PMAT-CODE-REPL-PHASE2-001: expand `@<path>` tokens inline
410    // before handing the prompt to the agent. Missing files print a
411    // stderr warning and leave the token verbatim (Poka-Yoke — no
412    // silent partial expansion).
413    let mut warnings = Vec::new();
414    let expanded = super::repl_directives::expand_at_paths(trimmed, &mut warnings);
415    for w in &warnings {
416        eprintln!("⚠ @-expansion: {w}");
417    }
418
419    InputResult::Prompt(expanded)
420}
421
422/// Handle a slash command.
423fn handle_slash_command(
424    cmd: &SlashCommand,
425    session: &ReplSession,
426    budget: f64,
427    history: &mut Vec<Message>,
428) {
429    match cmd {
430        SlashCommand::Help => print_help(),
431        SlashCommand::Quit => println!("{} Goodbye.", "✓".green()),
432        SlashCommand::Cost => {
433            // PMAT-169: local inference is free — show tokens, not misleading dollars
434            if session.estimated_cost_usd < 0.0001 {
435                println!("  Cost: {} (local inference)", "free".green());
436            } else {
437                println!(
438                    "  Cost: ${:.4} / ${:.2} ({:.1}%)",
439                    session.estimated_cost_usd,
440                    budget,
441                    (session.estimated_cost_usd / budget * 100.0).min(100.0)
442                );
443            }
444            println!(
445                "  Tokens: {} in / {} out",
446                session.total_input_tokens, session.total_output_tokens
447            );
448            println!("  Turns: {}, Tool calls: {}", session.turn_count, session.total_tool_calls);
449        }
450        SlashCommand::Context => {
451            let user_msgs = history.iter().filter(|m| matches!(m, Message::User(_))).count();
452            let asst_msgs = history.iter().filter(|m| matches!(m, Message::Assistant(_))).count();
453            let tool_msgs = history
454                .iter()
455                .filter(|m| matches!(m, Message::AssistantToolUse(_) | Message::ToolResult(_)))
456                .count();
457            let usage_pct = session.context_usage(history) * 100.0;
458            let est_tokens = ReplSession::estimate_history_tokens(history);
459            println!(
460                "  History: {} messages ({} user, {} assistant, {} tool)",
461                history.len(),
462                user_msgs,
463                asst_msgs,
464                tool_msgs
465            );
466            println!(
467                "  Context: ~{} / {} tokens ({:.0}%)",
468                est_tokens, session.context_window, usage_pct
469            );
470            if usage_pct >= 80.0 {
471                println!("  {} Near context limit — /compact to free space", "⚠".bright_yellow());
472            }
473            println!("  Turns: {}", session.turn_count);
474        }
475        SlashCommand::Model => {
476            println!("  Model switching not yet implemented.");
477        }
478        SlashCommand::Compact => {
479            let before = history.len();
480            compact_history(history);
481            println!("  Compacted: {} -> {} messages", before, history.len());
482        }
483        SlashCommand::Clear => {
484            history.clear();
485            print!("\x1B[2J\x1B[1;1H");
486            io::stdout().flush().ok();
487            println!("  Screen and conversation history cleared.");
488        }
489        SlashCommand::Session => {
490            if let Some(id) = session.session_id() {
491                println!("  Session: {id}");
492                println!("  Turns: {}, Messages: {}", session.turn_count, history.len());
493            } else {
494                println!("  No active session (persistence disabled).");
495            }
496        }
497        SlashCommand::Sessions => {
498            list_recent_sessions();
499        }
500        SlashCommand::Test => {
501            println!("  Running tests...");
502            let _ = io::stdout().flush();
503            run_shell_shortcut("cargo test --lib 2>&1 | tail -5");
504        }
505        SlashCommand::Quality => {
506            println!("  Running quality gate...");
507            let _ = io::stdout().flush();
508            run_shell_shortcut("cargo clippy -- -D warnings 2>&1 | tail -3 && cargo test --lib --quiet 2>&1 | tail -3");
509        }
510        // PMAT-CODE-SLASH-PARITY-001: 10 new Claude-Code-parity variants.
511        // Kept as minimal stubs so the parser + /help advertise them; each
512        // points to its closure ticket so users see a deliberate message
513        // instead of "Unknown command".
514        SlashCommand::Mcp => {
515            println!(
516                "  MCP servers are configured under {} in the AgentManifest TOML.",
517                "mcp_servers[]".bright_yellow()
518            );
519            println!("  Project-root .mcp.json loader: PMAT-CODE-MCP-JSON-LOADER-001 (P2).");
520        }
521        SlashCommand::Config => {
522            println!(
523                "  Config source: {} (TOML). User-global ladder tracked in PMAT-CODE-CONFIG-LADDER-001.",
524                "AgentManifest".bright_yellow()
525            );
526        }
527        SlashCommand::Review => {
528            println!("  /review not yet implemented — tracked by PMAT-CODE-REVIEW-001.");
529        }
530        SlashCommand::Memory => {
531            println!(
532                "  Use the {} tool for CRUD on project memory; /memory TUI: PMAT-CODE-MEMORY-TUI-001.",
533                "memory".bright_yellow()
534            );
535        }
536        SlashCommand::Permissions => {
537            println!(
538                "  Permission modes not yet implemented — tracked by PMAT-CODE-PERMISSIONS-001."
539            );
540        }
541        SlashCommand::Hooks => {
542            println!("  Hooks not yet implemented — tracked by PMAT-CODE-HOOKS-001.");
543        }
544        SlashCommand::Init => {
545            println!("  /init scaffold not yet implemented — tracked by PMAT-CODE-INIT-001.");
546        }
547        SlashCommand::Resume => {
548            println!("  REPL-scope /resume not yet implemented — CLI `apr code --resume [id]` works today.");
549        }
550        SlashCommand::AddDir => {
551            println!("  /add-dir not yet implemented — tracked by PMAT-CODE-ADDDIR-001.");
552        }
553        SlashCommand::Agents => {
554            println!(
555                "  Custom agents not yet implemented — tracked by PMAT-CODE-CUSTOM-AGENTS-001."
556            );
557        }
558        // PMAT-CODE-SLASH-EXTENDED-001: stub handlers for the final 3
559        // Claude-Code-parity variants. Each prints a deliberate placeholder
560        // so the operator sees "recognized, not yet wired" instead of
561        // "Unknown command" — same pattern the 2026-04-18 batch used.
562        SlashCommand::Debug => {
563            println!(
564                "  /debug not yet implemented — interactive trace inspection \
565                 tracked by PMAT-CODE-SLASH-DEBUG-001 (P2). \
566                 Use `apr trace --json --payload` for non-interactive tracing."
567            );
568        }
569        SlashCommand::Rename => {
570            println!(
571                "  /rename not yet implemented — session rename \
572                 tracked by PMAT-CODE-SLASH-RENAME-001 (P2). \
573                 Sessions are currently identified by their UUIDv7 id under \
574                 ~/.apr/sessions/<id>/."
575            );
576        }
577        SlashCommand::Upgrade => {
578            println!(
579                "  /upgrade not yet implemented — version-check + self-upgrade \
580                 tracked by PMAT-CODE-SLASH-UPGRADE-001 (P2). \
581                 Run `cargo install aprender --force` for now."
582            );
583        }
584        SlashCommand::Unknown(name) => {
585            println!("  {} Unknown command: {name}. Type /help for commands.", "?".bright_yellow());
586        }
587    }
588}
589
590/// Execute one turn with streaming output and multi-turn history.
591#[allow(clippy::too_many_arguments)]
592async fn run_turn_streaming(
593    manifest: &AgentManifest,
594    prompt: &str,
595    driver: &dyn LlmDriver,
596    tools: &ToolRegistry,
597    memory: &dyn MemorySubstrate,
598    history: &mut Vec<Message>,
599    tx: mpsc::Sender<StreamEvent>,
600    mut rx: mpsc::Receiver<StreamEvent>,
601    cancel: &Arc<AtomicBool>,
602) -> Result<AgentLoopResult, crate::agent::result::AgentError> {
603    // Drain task: print streaming events as they arrive
604    let drain = tokio::spawn(async move {
605        while let Some(event) = rx.recv().await {
606            print_stream_event_repl(&event);
607        }
608    });
609
610    let result = crate::agent::runtime::run_agent_turn(
611        manifest,
612        history,
613        prompt,
614        driver,
615        tools,
616        memory,
617        Some(tx),
618    )
619    .await;
620
621    // If cancelled, wrap the error
622    if cancel.load(Ordering::SeqCst) && result.is_err() {
623        return Err(crate::agent::result::AgentError::CircuitBreak("cancelled by user".into()));
624    }
625
626    // Ensure drain task finishes
627    let _ = drain.await;
628    result
629}
630
631// Display functions extracted to repl_display.rs for file size compliance.
632use super::repl_display::{
633    compact_history, list_recent_sessions, print_help, print_session_summary,
634    print_stream_event_repl, print_turn_footer, print_welcome, run_shell_shortcut,
635};
636
637#[cfg(test)]
638#[path = "repl_tests.rs"]
639mod tests;