Skip to main content

oven_cli/cli/
look.rs

1use std::{
2    fmt::Write as _,
3    path::{Path, PathBuf},
4};
5
6use anyhow::{Context, Result};
7use tokio::io::{AsyncBufReadExt, BufReader};
8use tokio_util::sync::CancellationToken;
9
10use super::{GlobalOpts, LookArgs};
11use crate::db::{self, AgentRun, ReviewFinding, Run};
12
13pub async fn run(args: LookArgs, _global: &GlobalOpts) -> Result<()> {
14    let project_dir = std::env::current_dir().context("getting current directory")?;
15
16    if args.stream {
17        return show_stream(&project_dir, args.agent.as_deref());
18    }
19
20    let logs_root = project_dir.join(".oven").join("logs");
21
22    let log_dir = if let Some(ref run_id) = args.run_id {
23        let dir = logs_root.join(run_id);
24        if !dir.exists() {
25            anyhow::bail!("no log directory found for run {run_id}");
26        }
27        dir
28    } else {
29        find_latest_log_dir(&logs_root)?.context("no log directories found in .oven/logs/")?
30    };
31
32    let log_file = log_dir.join("pipeline.log");
33    if !log_file.exists() {
34        anyhow::bail!("no pipeline.log found in {}", log_dir.display());
35    }
36
37    let is_active = is_oven_running(&project_dir);
38
39    if is_active {
40        tail_log(&log_file, args.agent.as_deref()).await?;
41    } else {
42        dump_log(&log_file, args.agent.as_deref()).await?;
43    }
44
45    Ok(())
46}
47
48/// Query the database and display agent progress for active (or recent) runs.
49fn show_stream(project_dir: &Path, agent_filter: Option<&str>) -> Result<()> {
50    let db_path = project_dir.join(".oven").join("oven.db");
51    if !db_path.exists() {
52        anyhow::bail!("no database found at {}", db_path.display());
53    }
54    let conn = db::open(&db_path)?;
55
56    let mut runs = db::runs::get_active_runs(&conn)?;
57    if runs.is_empty() {
58        // Fall back to the most recent run
59        if let Some(latest) = db::runs::get_latest_run(&conn)? {
60            runs.push(latest);
61        } else {
62            println!("no runs found");
63            return Ok(());
64        }
65    }
66
67    for run in &runs {
68        let agents = db::agent_runs::get_agent_runs_for_run(&conn, &run.id)?;
69        let findings = collect_run_findings(&conn, &agents)?;
70        print_run_status(run, &agents, &findings, agent_filter);
71    }
72
73    Ok(())
74}
75
76/// Collect unresolved findings across all reviewer agent runs for a pipeline run.
77fn collect_run_findings(
78    conn: &rusqlite::Connection,
79    agents: &[AgentRun],
80) -> Result<Vec<ReviewFinding>> {
81    let mut findings = Vec::new();
82    for ar in agents {
83        if ar.agent == "reviewer" {
84            let mut f = db::agent_runs::get_findings_for_agent_run(conn, ar.id)?;
85            findings.append(&mut f);
86        }
87    }
88    Ok(findings)
89}
90
91fn print_run_status(
92    run: &Run,
93    agents: &[AgentRun],
94    findings: &[ReviewFinding],
95    agent_filter: Option<&str>,
96) {
97    let branch = run.branch.as_deref().unwrap_or("--");
98    let pr = run.pr_number.map_or_else(|| "--".to_string(), |n| format!("#{n}"));
99    println!(
100        "issue #{:<6} {} {:>14}  ${:.2}  {}",
101        run.issue_number, pr, run.status, run.cost_usd, branch,
102    );
103
104    for ar in agents {
105        if let Some(filter) = agent_filter {
106            if ar.agent != filter {
107                continue;
108            }
109        }
110        let status_icon = match ar.status.as_str() {
111            "complete" => "done",
112            "running" => "...",
113            "failed" => "FAIL",
114            _ => &ar.status,
115        };
116        let summary =
117            ar.output_summary.as_deref().map(|s| truncate_line(s, 80)).unwrap_or_default();
118        println!(
119            "  {:<14} cycle {:<2} {:<6} {:>3} turns  ${:.2}  {}",
120            ar.agent, ar.cycle, status_icon, ar.turns, ar.cost_usd, summary,
121        );
122    }
123
124    let unresolved: Vec<_> = findings.iter().filter(|f| !f.resolved).collect();
125    if !unresolved.is_empty() {
126        let mut buf = String::new();
127        let _ = writeln!(buf, "  findings ({} unresolved):", unresolved.len());
128        for f in &unresolved {
129            let loc = match (&f.file_path, f.line_number) {
130                (Some(path), Some(line)) => format!(" {path}:{line}"),
131                (Some(path), None) => format!(" {path}"),
132                _ => String::new(),
133            };
134            let _ = writeln!(buf, "    {}/{}{} -- {}", f.severity, f.category, loc, f.message);
135        }
136        print!("{buf}");
137    }
138
139    println!();
140}
141
/// Reduce `s` to its first line, limited to at most `max` bytes.
///
/// When the first line fits it is returned unchanged. Otherwise it is cut so
/// that the text plus a trailing `"..."` fits in `max` bytes. The cut always
/// lands on a UTF-8 character boundary, so multi-byte input never panics (the
/// result may therefore be slightly shorter than `max`).
///
/// If `max` is smaller than the ellipsis itself, the ellipsis is clipped to
/// `max` bytes so the length limit is honoured even for tiny widths.
fn truncate_line(s: &str, max: usize) -> String {
    const ELLIPSIS: &str = "...";

    let line = s.lines().next().unwrap_or("");
    if line.len() <= max {
        return line.to_string();
    }

    // No room for any text plus the marker: emit as much of the marker as fits.
    if max < ELLIPSIS.len() {
        return ELLIPSIS[..max].to_string();
    }

    // Walk back to the nearest char boundary; index 0 is always a boundary,
    // so the loop terminates.
    let mut end = max - ELLIPSIS.len();
    while !line.is_char_boundary(end) {
        end -= 1;
    }

    let mut out = String::with_capacity(end + ELLIPSIS.len());
    out.push_str(&line[..end]);
    out.push_str(ELLIPSIS);
    out
}
156
157/// Find the most recently modified log directory in `.oven/logs/`.
158fn find_latest_log_dir(logs_root: &Path) -> Result<Option<PathBuf>> {
159    let entries = match std::fs::read_dir(logs_root) {
160        Ok(e) => e,
161        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
162        Err(e) => return Err(e).context("reading log directory"),
163    };
164
165    let mut dirs: Vec<_> = entries
166        .filter_map(std::result::Result::ok)
167        .filter(|e| e.file_type().ok().is_some_and(|t| t.is_dir()))
168        .collect();
169
170    dirs.sort_by(|a, b| {
171        let ma = a.metadata().ok().and_then(|m| m.modified().ok());
172        let mb = b.metadata().ok().and_then(|m| m.modified().ok());
173        mb.cmp(&ma)
174    });
175
176    Ok(dirs.first().map(std::fs::DirEntry::path))
177}
178
/// Check whether an oven process is currently running via PID file.
///
/// Reads `.oven/oven.pid` under `project_dir` and probes the recorded PID
/// with `kill -0`, which tests for the process without sending it a signal.
///
/// Returns `false` when the PID file is missing, unreadable, not a positive
/// integer, when `kill` cannot be spawned, or when the probe reports failure.
fn is_oven_running(project_dir: &Path) -> bool {
    let pid_path = project_dir.join(".oven").join("oven.pid");
    let Ok(contents) = std::fs::read_to_string(&pid_path) else {
        return false;
    };
    let Ok(pid) = contents.trim().parse::<u32>() else {
        return false;
    };
    // PID 0 is not a real process: `kill -0 0` targets the caller's own
    // process group and would always succeed.
    if pid == 0 {
        return false;
    }

    // Only the exit status matters; keep `kill`'s "No such process" chatter
    // out of the user's terminal.
    std::process::Command::new("kill")
        .args(["-0", &pid.to_string()])
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .is_ok_and(|status| status.success())
}
193
194async fn dump_log(path: &Path, agent_filter: Option<&str>) -> Result<()> {
195    let file = tokio::fs::File::open(path).await.context("reading log file")?;
196    let reader = BufReader::new(file);
197    let mut lines = reader.lines();
198
199    while let Some(line) = lines.next_line().await.context("reading log line")? {
200        if let Some(formatted) = format_log_line(&line, agent_filter) {
201            println!("{formatted}");
202        }
203    }
204
205    Ok(())
206}
207
208async fn tail_log(path: &Path, agent_filter: Option<&str>) -> Result<()> {
209    let cancel = CancellationToken::new();
210    let cancel_for_signal = cancel.clone();
211
212    tokio::spawn(async move {
213        if tokio::signal::ctrl_c().await.is_ok() {
214            cancel_for_signal.cancel();
215        }
216    });
217
218    let file = tokio::fs::File::open(path).await.context("opening log file")?;
219    let mut reader = BufReader::new(file);
220    let mut line = String::new();
221
222    loop {
223        tokio::select! {
224            () = cancel.cancelled() => break,
225            result = reader.read_line(&mut line) => {
226                match result {
227                    Ok(0) => {
228                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
229                    }
230                    Ok(_) => {
231                        let trimmed = line.trim_end();
232                        if let Some(formatted) = format_log_line(trimmed, agent_filter) {
233                            println!("{formatted}");
234                        }
235                        line.clear();
236                    }
237                    Err(e) => return Err(e).context("reading log file"),
238                }
239            }
240        }
241    }
242
243    Ok(())
244}
245
246/// Messages to suppress in formatted output (noisy polling lines).
247const SUPPRESSED_MESSAGES: &[&str] = &["no actionable issues, waiting"];
248
249/// Format a JSON tracing log line into a compact human-readable string.
250///
251/// Returns `None` if the line should be suppressed (noisy polling messages)
252/// or doesn't match the agent filter.
253fn format_log_line(line: &str, agent_filter: Option<&str>) -> Option<String> {
254    let v: serde_json::Value = serde_json::from_str(line).ok()?;
255
256    let fields = v.get("fields")?;
257    let message = fields.get("message").and_then(|m| m.as_str()).unwrap_or("");
258
259    // Suppress noisy polling messages
260    if SUPPRESSED_MESSAGES.contains(&message) {
261        return None;
262    }
263
264    // Agent filter
265    if let Some(filter) = agent_filter {
266        let agent = fields.get("agent").and_then(|a| a.as_str()).unwrap_or("");
267        if !agent.is_empty() && agent != filter {
268            return None;
269        }
270    }
271
272    // Extract timestamp -- just HH:MM:SS from the ISO string
273    let timestamp = v
274        .get("timestamp")
275        .and_then(|t| t.as_str())
276        .and_then(|t| t.find('T').map(|i| &t[i + 1..]))
277        .map_or("??:??:??", |t| if t.len() > 8 { &t[..8] } else { t });
278
279    let level = v.get("level").and_then(|l| l.as_str()).unwrap_or("INFO");
280
281    let level_tag = match level {
282        "ERROR" => "ERR ",
283        "WARN" => "WARN",
284        "DEBUG" => "DBG ",
285        _ => "    ",
286    };
287
288    // Collect extra fields (skip "message" since we already have it)
289    let mut extras = Vec::new();
290    if let Some(obj) = fields.as_object() {
291        for (k, val) in obj {
292            if k == "message" {
293                continue;
294            }
295            let val_str = match val {
296                serde_json::Value::String(s) => s.clone(),
297                other => other.to_string(),
298            };
299            extras.push(format!("{k}={val_str}"));
300        }
301    }
302
303    let extra_str =
304        if extras.is_empty() { String::new() } else { format!("  {}", extras.join(" ")) };
305
306    Some(format!("{timestamp} {level_tag} {message}{extra_str}"))
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::RunStatus;

    /// Builds one JSON tracing line with the given message and optional agent field.
    fn json_log(msg: &str, agent: Option<&str>) -> String {
        let agent_field = match agent {
            Some(a) => format!(r#","agent":"{a}""#),
            None => String::new(),
        };
        format!(
            r#"{{"timestamp":"2026-03-17T09:53:34.350926Z","level":"INFO","fields":{{"message":"{msg}"{agent_field}}},"target":"oven_cli::test"}}"#
        )
    }

    /// A run in the reviewing state, shared by the `print_run_status` tests.
    fn sample_run() -> Run {
        Run {
            id: "abc12345".to_string(),
            issue_number: 42,
            status: RunStatus::Reviewing,
            pr_number: Some(10),
            branch: Some("oven/issue-42".to_string()),
            worktree_path: None,
            cost_usd: 2.34,
            auto_merge: false,
            started_at: "2026-03-15T10:00:00".to_string(),
            finished_at: None,
            error_message: None,
            complexity: "full".to_string(),
            issue_source: "github".to_string(),
        }
    }

    /// A completed implementer agent run carrying the given output summary.
    fn implementer_run(summary: &str) -> AgentRun {
        AgentRun {
            id: 1,
            run_id: "abc12345".to_string(),
            agent: "implementer".to_string(),
            cycle: 1,
            status: "complete".to_string(),
            cost_usd: 1.50,
            turns: 12,
            started_at: "2026-03-15T10:00:00".to_string(),
            finished_at: Some("2026-03-15T10:05:00".to_string()),
            output_summary: Some(summary.to_string()),
            error_message: None,
            raw_output: None,
        }
    }

    /// A reviewer agent run that is still in progress.
    fn reviewer_run() -> AgentRun {
        AgentRun {
            id: 2,
            run_id: "abc12345".to_string(),
            agent: "reviewer".to_string(),
            cycle: 1,
            status: "running".to_string(),
            cost_usd: 0.84,
            turns: 5,
            started_at: "2026-03-15T10:05:00".to_string(),
            finished_at: None,
            output_summary: None,
            error_message: None,
            raw_output: None,
        }
    }

    #[test]
    fn format_log_line_basic() {
        let line = json_log("agent starting", Some("reviewer"));
        let formatted = format_log_line(&line, None).unwrap();
        for expected in ["09:53:34", "agent starting", "agent=reviewer"] {
            assert!(formatted.contains(expected));
        }
        // Should NOT contain full module path
        assert!(!formatted.contains("oven_cli"));
    }

    #[test]
    fn format_log_line_suppresses_waiting() {
        let line = json_log("no actionable issues, waiting", None);
        assert!(format_log_line(&line, None).is_none());
    }

    #[test]
    fn format_log_line_agent_filter() {
        let line = json_log("starting", Some("implementer"));
        // A filter for another agent excludes the line...
        assert!(format_log_line(&line, Some("reviewer")).is_none());
        // ...while a filter for the same agent keeps it.
        assert!(format_log_line(&line, Some("implementer")).is_some());
    }

    #[test]
    fn format_log_line_no_agent_passes_filter() {
        // Lines without an agent field should always pass the agent filter
        let line = json_log("pipeline started", None);
        assert!(format_log_line(&line, Some("reviewer")).is_some());
    }

    #[test]
    fn format_log_line_non_json_returns_none() {
        assert!(format_log_line("not json at all", None).is_none());
    }

    #[test]
    fn format_log_line_error_level() {
        let line = r#"{"timestamp":"2026-03-17T10:00:00Z","level":"ERROR","fields":{"message":"pipeline failed"},"target":"test"}"#;
        let formatted = format_log_line(line, None).unwrap();
        assert!(formatted.contains("ERR"));
    }

    #[test]
    fn find_latest_log_dir_missing_root_returns_none() {
        let found = find_latest_log_dir(Path::new("/nonexistent/path/logs")).unwrap();
        assert!(found.is_none());
    }

    #[test]
    fn find_latest_log_dir_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let found = find_latest_log_dir(tmp.path()).unwrap();
        assert!(found.is_none());
    }

    #[test]
    fn find_latest_log_dir_picks_newest() {
        let tmp = tempfile::tempdir().unwrap();
        let older = tmp.path().join("aaaa1111");
        let newer = tmp.path().join("bbbb2222");
        std::fs::create_dir(&older).unwrap();
        // Small sleep to ensure different mtimes
        std::thread::sleep(std::time::Duration::from_millis(50));
        std::fs::create_dir(&newer).unwrap();

        let latest = find_latest_log_dir(tmp.path()).unwrap().unwrap();
        assert_eq!(latest.file_name().unwrap(), "bbbb2222");
    }

    #[test]
    fn is_oven_running_returns_false_when_no_pid_file() {
        let tmp = tempfile::tempdir().unwrap();
        assert!(!is_oven_running(tmp.path()));
    }

    #[test]
    fn is_oven_running_returns_false_for_stale_pid() {
        let tmp = tempfile::tempdir().unwrap();
        let oven_dir = tmp.path().join(".oven");
        std::fs::create_dir_all(&oven_dir).unwrap();
        // PID 99999999 almost certainly doesn't exist
        std::fs::write(oven_dir.join("oven.pid"), "99999999").unwrap();
        assert!(!is_oven_running(tmp.path()));
    }

    #[test]
    fn truncate_line_short() {
        assert_eq!(truncate_line("hello", 10), "hello");
    }

    #[test]
    fn truncate_line_long() {
        let long = "a".repeat(100);
        let cut = truncate_line(&long, 20);
        assert_eq!(cut.len(), 20);
        assert!(cut.ends_with("..."));
    }

    #[test]
    fn truncate_line_multibyte_does_not_panic() {
        // Each emoji is 4 bytes, so a naive byte cut would land inside a character
        let s = "😀😀😀😀😀😀";
        let cut = truncate_line(s, 10);
        assert!(cut.ends_with("..."));
        assert!(cut.len() <= 10);
    }

    #[test]
    fn truncate_line_multiline_uses_first() {
        assert_eq!(truncate_line("first\nsecond\nthird", 80), "first");
    }

    #[test]
    fn print_run_status_formats_correctly() {
        let run = sample_run();
        let agents = vec![implementer_run("Added auth flow"), reviewer_run()];
        // Smoke test: should not panic
        print_run_status(&run, &agents, &[], None);
    }

    #[test]
    fn print_run_status_with_agent_filter() {
        let run = sample_run();
        let agents = vec![implementer_run("ok")];
        // Filter to reviewer (which doesn't exist) -- should not panic
        print_run_status(&run, &agents, &[], Some("reviewer"));
    }

    #[test]
    fn show_stream_no_database() {
        let tmp = tempfile::tempdir().unwrap();
        let result = show_stream(tmp.path(), None);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("no database"));
    }

    #[test]
    fn show_stream_empty_database() {
        let tmp = tempfile::tempdir().unwrap();
        let oven_dir = tmp.path().join(".oven");
        std::fs::create_dir_all(&oven_dir).unwrap();
        let db_path = oven_dir.join("oven.db");
        // Open and immediately close to create the DB with migrations applied
        drop(db::open(&db_path).unwrap());

        // Should print "no runs found" and succeed
        show_stream(tmp.path(), None).unwrap();
    }
}
542}