1use std::io::{self, Write};
8use std::sync::Arc;
9
10use std::sync::atomic::{AtomicBool, Ordering};
11
12use tokio::sync::mpsc;
13
14use crate::agent::driver::{LlmDriver, Message, StreamEvent};
15use crate::agent::memory::MemorySubstrate;
16use crate::agent::result::AgentLoopResult;
17use crate::agent::session::SessionStore;
18use crate::agent::tool::ToolRegistry;
19use crate::agent::AgentManifest;
20use crate::ansi_colors::Colorize;
21use crate::serve::context::TokenEstimator;
22
/// A REPL command entered as a line that begins with `/`.
///
/// Produced by [`SlashCommand::parse`]; only the first whitespace-separated
/// token of the line selects the variant.
#[derive(PartialEq, Debug)]
enum SlashCommand {
    /// `/help`, `/h`, `/?`
    Help,
    /// `/quit`, `/q`, `/exit`
    Quit,
    /// `/cost`
    Cost,
    /// `/context`, `/ctx`
    Context,
    /// `/model`
    Model,
    /// `/compact`
    Compact,
    /// `/clear`
    Clear,
    /// `/session`
    Session,
    /// `/sessions`
    Sessions,
    /// `/test`
    Test,
    /// `/quality`
    Quality,
    /// `/mcp`
    Mcp,
    /// `/config`, `/cfg`
    Config,
    /// `/review`
    Review,
    /// `/memory`
    Memory,
    /// `/permissions`, `/perms`
    Permissions,
    /// `/hooks`
    Hooks,
    /// `/init`
    Init,
    /// `/resume`
    Resume,
    /// `/add-dir`, `/adddir`
    AddDir,
    /// `/agents`
    Agents,
    /// `/debug`, `/dbg`
    Debug,
    /// `/rename`
    Rename,
    /// `/upgrade`
    Upgrade,
    /// Any other `/token`; holds the token exactly as typed, leading `/` included.
    Unknown(String),
}

impl SlashCommand {
    /// Interprets one line of REPL input as a slash command.
    ///
    /// Returns `None` when the trimmed line does not start with `/`, signalling
    /// that it should be treated as an ordinary prompt. Matching is
    /// case-sensitive and ignores everything after the first token.
    fn parse(input: &str) -> Option<Self> {
        let line = input.trim();
        line.starts_with('/').then(|| {
            // A trimmed line starting with '/' always yields a token; the
            // fallback only keeps this expression total.
            let token = line.split_whitespace().next().unwrap_or("");
            match token {
                "/help" | "/h" | "/?" => Self::Help,
                "/quit" | "/q" | "/exit" => Self::Quit,
                "/cost" => Self::Cost,
                "/context" | "/ctx" => Self::Context,
                "/model" => Self::Model,
                "/compact" => Self::Compact,
                "/clear" => Self::Clear,
                "/session" => Self::Session,
                "/sessions" => Self::Sessions,
                "/test" => Self::Test,
                "/quality" => Self::Quality,
                "/mcp" => Self::Mcp,
                "/config" | "/cfg" => Self::Config,
                "/review" => Self::Review,
                "/memory" => Self::Memory,
                "/permissions" | "/perms" => Self::Permissions,
                "/hooks" => Self::Hooks,
                "/init" => Self::Init,
                "/resume" => Self::Resume,
                "/add-dir" | "/adddir" => Self::AddDir,
                "/agents" => Self::Agents,
                "/debug" | "/dbg" => Self::Debug,
                "/rename" => Self::Rename,
                "/upgrade" => Self::Upgrade,
                unrecognised => Self::Unknown(unrecognised.to_string()),
            }
        })
    }
}
102
/// Fraction of the model's context window (as estimated from the history)
/// at or above which the REPL compacts the conversation after a turn.
const AUTO_COMPACT_THRESHOLD: f64 = 0.8;
105
106pub(super) struct ReplSession {
108 pub(super) turn_count: u32,
109 pub(super) total_input_tokens: u64,
110 pub(super) total_output_tokens: u64,
111 pub(super) total_tool_calls: u32,
112 pub(super) estimated_cost_usd: f64,
113 pub(super) store: Option<SessionStore>,
115 pub(super) context_window: usize,
117}
118
119impl ReplSession {
120 fn new(agent_name: &str, context_window: usize) -> Self {
121 let store = SessionStore::create(agent_name).ok();
122 Self {
123 turn_count: 0,
124 total_input_tokens: 0,
125 total_output_tokens: 0,
126 total_tool_calls: 0,
127 estimated_cost_usd: 0.0,
128 store,
129 context_window,
130 }
131 }
132
133 fn record_turn(&mut self, result: &AgentLoopResult, cost: f64) {
134 self.turn_count += 1;
135 self.total_input_tokens += result.usage.input_tokens;
136 self.total_output_tokens += result.usage.output_tokens;
137 self.total_tool_calls += result.tool_calls;
138 self.estimated_cost_usd += cost;
139 if let Some(ref mut store) = self.store {
141 let _ = store.record_turn();
142 }
143 }
144
145 fn persist_messages(&self, history: &[Message], prev_len: usize) {
147 if let Some(ref store) = self.store {
148 let new_msgs = &history[prev_len..];
149 if !new_msgs.is_empty() {
150 let _ = store.append_messages(new_msgs);
151 }
152 }
153 }
154
155 pub(crate) fn session_id(&self) -> Option<&str> {
156 self.store.as_ref().map(|s| s.id())
157 }
158
159 fn estimate_history_tokens(history: &[Message]) -> usize {
161 let estimator = TokenEstimator::new();
162 let chat_msgs: Vec<_> = history.iter().map(Message::to_chat_message).collect();
163 estimator.estimate_messages(&chat_msgs)
164 }
165
166 pub(super) fn context_usage(&self, history: &[Message]) -> f64 {
168 if self.context_window == 0 {
169 return 0.0;
170 }
171 Self::estimate_history_tokens(history) as f64 / self.context_window as f64
172 }
173
174 fn auto_compact_if_needed(&self, history: &mut Vec<Message>) -> bool {
177 let usage = self.context_usage(history);
178 if usage >= AUTO_COMPACT_THRESHOLD {
179 let before = history.len();
180 compact_history(history);
181 let after = history.len();
182 if after < before {
183 println!(
184 " {} Auto-compacted: {} → {} messages ({:.0}% context)",
185 "⚙".dimmed(),
186 before,
187 after,
188 self.context_usage(history) * 100.0
189 );
190 return true;
191 }
192 }
193 false
194 }
195}
196
197fn resume_or_new(
199 resume_id: Option<&str>,
200 agent_name: &str,
201 ctx_window: usize,
202) -> (ReplSession, Vec<Message>) {
203 if let Some(id) = resume_id {
204 if let Ok(store) = SessionStore::resume(id) {
205 let msgs = store.load_messages().unwrap_or_default();
206 let turns = store.manifest.turns;
207 println!(" {} Resumed {} ({turns} turns, {} msgs)", "✓".green(), id, msgs.len());
208 let s = ReplSession {
209 turn_count: turns,
210 total_input_tokens: 0,
211 total_output_tokens: 0,
212 total_tool_calls: 0,
213 estimated_cost_usd: 0.0,
214 store: Some(store),
215 context_window: ctx_window,
216 };
217 return (s, msgs);
218 }
219 println!(" {} Could not resume session: {id}", "⚠".bright_yellow());
220 }
221 let s = ReplSession::new(agent_name, ctx_window);
222 if let Some(id) = s.session_id() {
223 println!(" {} {}", "Session:".dimmed(), id.dimmed());
224 }
225 (s, Vec::new())
226}
227
228pub fn run_repl(
236 manifest: &AgentManifest,
237 driver: &dyn LlmDriver,
238 tools: &ToolRegistry,
239 memory: &dyn MemorySubstrate,
240 max_turns: u32,
241 budget_usd: f64,
242 resume_id: Option<&str>,
243) -> anyhow::Result<()> {
244 let rt = tokio::runtime::Builder::new_current_thread()
245 .enable_all()
246 .build()
247 .map_err(|e| anyhow::anyhow!("tokio runtime: {e}"))?;
248
249 print_welcome(manifest, driver);
250
251 let ctx_window = driver.context_window();
252
253 let (mut session, mut history) = resume_or_new(resume_id, &manifest.name, ctx_window);
254
255 let stdin = io::stdin();
256 let mut line_buf = String::new();
257
258 loop {
259 if session.turn_count >= max_turns {
261 println!(
262 "\n{} Max turns ({}) reached. Session complete.",
263 "⚠".bright_yellow(),
264 max_turns
265 );
266 break;
267 }
268 if session.estimated_cost_usd >= budget_usd {
269 println!(
270 "\n{} Budget (${:.2}) exhausted. Session complete.",
271 "⚠".bright_yellow(),
272 budget_usd
273 );
274 break;
275 }
276
277 let input = match read_input(&stdin, &mut line_buf, &session, budget_usd, &mut history) {
279 InputResult::Prompt(s) => s,
280 InputResult::SlashHandled => continue,
281 InputResult::Exit => break,
282 InputResult::Empty => continue,
283 };
284
285 let cancel = Arc::new(AtomicBool::new(false));
287 let cancel_clone = Arc::clone(&cancel);
288
289 rt.block_on(async {
290 let flag = cancel_clone;
291 tokio::spawn(async move {
292 if tokio::signal::ctrl_c().await.is_ok() {
293 flag.store(true, Ordering::SeqCst);
294 }
295 });
296 });
297
298 let (tx, rx) = mpsc::channel::<StreamEvent>(64);
299
300 println!();
301
302 let history_len_before = history.len();
303 let result = rt.block_on(run_turn_streaming(
304 manifest,
305 &input,
306 driver,
307 tools,
308 memory,
309 &mut history,
310 tx,
311 rx,
312 &cancel,
313 ));
314
315 match result {
316 Ok(r) => {
317 let cost = driver.estimate_cost(&r.usage);
318 session.record_turn(&r, cost);
319 session.persist_messages(&history, history_len_before);
321 session.auto_compact_if_needed(&mut history);
323 print_turn_footer(&r, cost, &session, budget_usd);
324 }
325 Err(e) => {
326 if cancel.load(Ordering::SeqCst) {
327 println!("\n{} Generation cancelled.", "⚠".bright_yellow());
328 } else {
329 println!("\n{} Error: {e}", "✗".bright_red());
330 }
331 }
332 }
333 }
334
335 print_session_summary(&session);
336 Ok(())
337}
338
/// Outcome of reading one line at the REPL prompt.
enum InputResult {
    /// Text to send to the agent, with `@path` references already expanded.
    Prompt(String),
    /// Blank line; simply show the prompt again.
    Empty,
    /// A slash command or shell escape that has already been handled locally.
    SlashHandled,
    /// Stop the REPL (EOF, `/quit`, or a read error).
    Exit,
}
346
347fn read_input(
349 stdin: &io::Stdin,
350 buf: &mut String,
351 session: &ReplSession,
352 budget: f64,
353 history: &mut Vec<Message>,
354) -> InputResult {
355 let cost_str = if session.estimated_cost_usd > 0.0 {
356 format!(" ${:.3}", session.estimated_cost_usd)
357 } else {
358 String::new()
359 };
360 print!(
361 "\n{}{} ",
362 format!("[{}/{}{}]", session.turn_count + 1, "?", cost_str).dimmed(),
363 " >".bright_green().bold(),
364 );
365 io::stdout().flush().ok();
366
367 buf.clear();
368 let bytes = match stdin.read_line(buf) {
369 Ok(b) => b,
370 Err(_) => return InputResult::Exit,
371 };
372 if bytes == 0 {
373 println!();
374 return InputResult::Exit;
375 }
376
377 let trimmed = buf.trim();
378 if trimmed.is_empty() {
379 return InputResult::Empty;
380 }
381
382 if let Some(shell_cmd) = super::repl_directives::parse_bang_command(trimmed) {
386 match super::repl_directives::execute_bang_command(shell_cmd) {
387 Ok((code, out)) => {
388 if !out.is_empty() {
389 println!("{out}");
390 }
391 if code != 0 {
392 eprintln!("(exit {code})");
393 }
394 }
395 Err(e) => eprintln!("! shell error: {e}"),
396 }
397 return InputResult::SlashHandled;
398 }
399
400 if let Some(cmd) = SlashCommand::parse(trimmed) {
402 handle_slash_command(&cmd, session, budget, history);
403 return match cmd {
404 SlashCommand::Quit => InputResult::Exit,
405 _ => InputResult::SlashHandled,
406 };
407 }
408
409 let mut warnings = Vec::new();
414 let expanded = super::repl_directives::expand_at_paths(trimmed, &mut warnings);
415 for w in &warnings {
416 eprintln!("⚠ @-expansion: {w}");
417 }
418
419 InputResult::Prompt(expanded)
420}
421
422fn handle_slash_command(
424 cmd: &SlashCommand,
425 session: &ReplSession,
426 budget: f64,
427 history: &mut Vec<Message>,
428) {
429 match cmd {
430 SlashCommand::Help => print_help(),
431 SlashCommand::Quit => println!("{} Goodbye.", "✓".green()),
432 SlashCommand::Cost => {
433 if session.estimated_cost_usd < 0.0001 {
435 println!(" Cost: {} (local inference)", "free".green());
436 } else {
437 println!(
438 " Cost: ${:.4} / ${:.2} ({:.1}%)",
439 session.estimated_cost_usd,
440 budget,
441 (session.estimated_cost_usd / budget * 100.0).min(100.0)
442 );
443 }
444 println!(
445 " Tokens: {} in / {} out",
446 session.total_input_tokens, session.total_output_tokens
447 );
448 println!(" Turns: {}, Tool calls: {}", session.turn_count, session.total_tool_calls);
449 }
450 SlashCommand::Context => {
451 let user_msgs = history.iter().filter(|m| matches!(m, Message::User(_))).count();
452 let asst_msgs = history.iter().filter(|m| matches!(m, Message::Assistant(_))).count();
453 let tool_msgs = history
454 .iter()
455 .filter(|m| matches!(m, Message::AssistantToolUse(_) | Message::ToolResult(_)))
456 .count();
457 let usage_pct = session.context_usage(history) * 100.0;
458 let est_tokens = ReplSession::estimate_history_tokens(history);
459 println!(
460 " History: {} messages ({} user, {} assistant, {} tool)",
461 history.len(),
462 user_msgs,
463 asst_msgs,
464 tool_msgs
465 );
466 println!(
467 " Context: ~{} / {} tokens ({:.0}%)",
468 est_tokens, session.context_window, usage_pct
469 );
470 if usage_pct >= 80.0 {
471 println!(" {} Near context limit — /compact to free space", "⚠".bright_yellow());
472 }
473 println!(" Turns: {}", session.turn_count);
474 }
475 SlashCommand::Model => {
476 println!(" Model switching not yet implemented.");
477 }
478 SlashCommand::Compact => {
479 let before = history.len();
480 compact_history(history);
481 println!(" Compacted: {} -> {} messages", before, history.len());
482 }
483 SlashCommand::Clear => {
484 history.clear();
485 print!("\x1B[2J\x1B[1;1H");
486 io::stdout().flush().ok();
487 println!(" Screen and conversation history cleared.");
488 }
489 SlashCommand::Session => {
490 if let Some(id) = session.session_id() {
491 println!(" Session: {id}");
492 println!(" Turns: {}, Messages: {}", session.turn_count, history.len());
493 } else {
494 println!(" No active session (persistence disabled).");
495 }
496 }
497 SlashCommand::Sessions => {
498 list_recent_sessions();
499 }
500 SlashCommand::Test => {
501 println!(" Running tests...");
502 let _ = io::stdout().flush();
503 run_shell_shortcut("cargo test --lib 2>&1 | tail -5");
504 }
505 SlashCommand::Quality => {
506 println!(" Running quality gate...");
507 let _ = io::stdout().flush();
508 run_shell_shortcut("cargo clippy -- -D warnings 2>&1 | tail -3 && cargo test --lib --quiet 2>&1 | tail -3");
509 }
510 SlashCommand::Mcp => {
515 println!(
516 " MCP servers are configured under {} in the AgentManifest TOML.",
517 "mcp_servers[]".bright_yellow()
518 );
519 println!(" Project-root .mcp.json loader: PMAT-CODE-MCP-JSON-LOADER-001 (P2).");
520 }
521 SlashCommand::Config => {
522 println!(
523 " Config source: {} (TOML). User-global ladder tracked in PMAT-CODE-CONFIG-LADDER-001.",
524 "AgentManifest".bright_yellow()
525 );
526 }
527 SlashCommand::Review => {
528 println!(" /review not yet implemented — tracked by PMAT-CODE-REVIEW-001.");
529 }
530 SlashCommand::Memory => {
531 println!(
532 " Use the {} tool for CRUD on project memory; /memory TUI: PMAT-CODE-MEMORY-TUI-001.",
533 "memory".bright_yellow()
534 );
535 }
536 SlashCommand::Permissions => {
537 println!(
538 " Permission modes not yet implemented — tracked by PMAT-CODE-PERMISSIONS-001."
539 );
540 }
541 SlashCommand::Hooks => {
542 println!(" Hooks not yet implemented — tracked by PMAT-CODE-HOOKS-001.");
543 }
544 SlashCommand::Init => {
545 println!(" /init scaffold not yet implemented — tracked by PMAT-CODE-INIT-001.");
546 }
547 SlashCommand::Resume => {
548 println!(" REPL-scope /resume not yet implemented — CLI `apr code --resume [id]` works today.");
549 }
550 SlashCommand::AddDir => {
551 println!(" /add-dir not yet implemented — tracked by PMAT-CODE-ADDDIR-001.");
552 }
553 SlashCommand::Agents => {
554 println!(
555 " Custom agents not yet implemented — tracked by PMAT-CODE-CUSTOM-AGENTS-001."
556 );
557 }
558 SlashCommand::Debug => {
563 println!(
564 " /debug not yet implemented — interactive trace inspection \
565 tracked by PMAT-CODE-SLASH-DEBUG-001 (P2). \
566 Use `apr trace --json --payload` for non-interactive tracing."
567 );
568 }
569 SlashCommand::Rename => {
570 println!(
571 " /rename not yet implemented — session rename \
572 tracked by PMAT-CODE-SLASH-RENAME-001 (P2). \
573 Sessions are currently identified by their UUIDv7 id under \
574 ~/.apr/sessions/<id>/."
575 );
576 }
577 SlashCommand::Upgrade => {
578 println!(
579 " /upgrade not yet implemented — version-check + self-upgrade \
580 tracked by PMAT-CODE-SLASH-UPGRADE-001 (P2). \
581 Run `cargo install aprender --force` for now."
582 );
583 }
584 SlashCommand::Unknown(name) => {
585 println!(" {} Unknown command: {name}. Type /help for commands.", "?".bright_yellow());
586 }
587 }
588}
589
590#[allow(clippy::too_many_arguments)]
592async fn run_turn_streaming(
593 manifest: &AgentManifest,
594 prompt: &str,
595 driver: &dyn LlmDriver,
596 tools: &ToolRegistry,
597 memory: &dyn MemorySubstrate,
598 history: &mut Vec<Message>,
599 tx: mpsc::Sender<StreamEvent>,
600 mut rx: mpsc::Receiver<StreamEvent>,
601 cancel: &Arc<AtomicBool>,
602) -> Result<AgentLoopResult, crate::agent::result::AgentError> {
603 let drain = tokio::spawn(async move {
605 while let Some(event) = rx.recv().await {
606 print_stream_event_repl(&event);
607 }
608 });
609
610 let result = crate::agent::runtime::run_agent_turn(
611 manifest,
612 history,
613 prompt,
614 driver,
615 tools,
616 memory,
617 Some(tx),
618 )
619 .await;
620
621 if cancel.load(Ordering::SeqCst) && result.is_err() {
623 return Err(crate::agent::result::AgentError::CircuitBreak("cancelled by user".into()));
624 }
625
626 let _ = drain.await;
628 result
629}
630
631use super::repl_display::{
633 compact_history, list_recent_sessions, print_help, print_session_summary,
634 print_stream_event_repl, print_turn_footer, print_welcome, run_shell_shortcut,
635};
636
637#[cfg(test)]
638#[path = "repl_tests.rs"]
639mod tests;