1use std::{
2 fmt::Write as _,
3 path::{Path, PathBuf},
4};
5
6use anyhow::{Context, Result};
7use tokio::io::{AsyncBufReadExt, BufReader};
8use tokio_util::sync::CancellationToken;
9
10use super::{GlobalOpts, LookArgs};
11use crate::db::{self, AgentRun, ReviewFinding, Run};
12
13pub async fn run(args: LookArgs, _global: &GlobalOpts) -> Result<()> {
14 let project_dir = std::env::current_dir().context("getting current directory")?;
15
16 if args.stream {
17 return show_stream(&project_dir, args.agent.as_deref());
18 }
19
20 let logs_root = project_dir.join(".oven").join("logs");
21
22 let log_dir = if let Some(ref run_id) = args.run_id {
23 let dir = logs_root.join(run_id);
24 if !dir.exists() {
25 anyhow::bail!("no log directory found for run {run_id}");
26 }
27 dir
28 } else {
29 find_latest_log_dir(&logs_root)?.context("no log directories found in .oven/logs/")?
30 };
31
32 let log_file = log_dir.join("pipeline.log");
33 if !log_file.exists() {
34 anyhow::bail!("no pipeline.log found in {}", log_dir.display());
35 }
36
37 let is_active = is_oven_running(&project_dir);
38
39 if is_active {
40 tail_log(&log_file, args.agent.as_deref()).await?;
41 } else {
42 dump_log(&log_file, args.agent.as_deref()).await?;
43 }
44
45 Ok(())
46}
47
48fn show_stream(project_dir: &Path, agent_filter: Option<&str>) -> Result<()> {
50 let db_path = project_dir.join(".oven").join("oven.db");
51 if !db_path.exists() {
52 anyhow::bail!("no database found at {}", db_path.display());
53 }
54 let conn = db::open(&db_path)?;
55
56 let mut runs = db::runs::get_active_runs(&conn)?;
57 if runs.is_empty() {
58 if let Some(latest) = db::runs::get_latest_run(&conn)? {
60 runs.push(latest);
61 } else {
62 println!("no runs found");
63 return Ok(());
64 }
65 }
66
67 for run in &runs {
68 let agents = db::agent_runs::get_agent_runs_for_run(&conn, &run.id)?;
69 let findings = collect_run_findings(&conn, &agents)?;
70 print_run_status(run, &agents, &findings, agent_filter);
71 }
72
73 Ok(())
74}
75
76fn collect_run_findings(
78 conn: &rusqlite::Connection,
79 agents: &[AgentRun],
80) -> Result<Vec<ReviewFinding>> {
81 let mut findings = Vec::new();
82 for ar in agents {
83 if ar.agent == "reviewer" {
84 let mut f = db::agent_runs::get_findings_for_agent_run(conn, ar.id)?;
85 findings.append(&mut f);
86 }
87 }
88 Ok(findings)
89}
90
91fn print_run_status(
92 run: &Run,
93 agents: &[AgentRun],
94 findings: &[ReviewFinding],
95 agent_filter: Option<&str>,
96) {
97 let branch = run.branch.as_deref().unwrap_or("--");
98 let pr = run.pr_number.map_or_else(|| "--".to_string(), |n| format!("#{n}"));
99 println!(
100 "issue #{:<6} {} {:>14} ${:.2} {}",
101 run.issue_number, pr, run.status, run.cost_usd, branch,
102 );
103
104 for ar in agents {
105 if let Some(filter) = agent_filter {
106 if ar.agent != filter {
107 continue;
108 }
109 }
110 let status_icon = match ar.status.as_str() {
111 "complete" => "done",
112 "running" => "...",
113 "failed" => "FAIL",
114 _ => &ar.status,
115 };
116 let summary =
117 ar.output_summary.as_deref().map(|s| truncate_line(s, 80)).unwrap_or_default();
118 println!(
119 " {:<14} cycle {:<2} {:<6} {:>3} turns ${:.2} {}",
120 ar.agent, ar.cycle, status_icon, ar.turns, ar.cost_usd, summary,
121 );
122 }
123
124 let unresolved: Vec<_> = findings.iter().filter(|f| !f.resolved).collect();
125 if !unresolved.is_empty() {
126 let mut buf = String::new();
127 let _ = writeln!(buf, " findings ({} unresolved):", unresolved.len());
128 for f in &unresolved {
129 let loc = match (&f.file_path, f.line_number) {
130 (Some(path), Some(line)) => format!(" {path}:{line}"),
131 (Some(path), None) => format!(" {path}"),
132 _ => String::new(),
133 };
134 let _ = writeln!(buf, " {}/{}{} -- {}", f.severity, f.category, loc, f.message);
135 }
136 print!("{buf}");
137 }
138
139 println!();
140}
141
142fn truncate_line(s: &str, max: usize) -> String {
145 let line = s.lines().next().unwrap_or("");
146 if line.len() <= max {
147 line.to_string()
148 } else {
149 let mut end = max.saturating_sub(3);
150 while end > 0 && !line.is_char_boundary(end) {
151 end -= 1;
152 }
153 format!("{}...", &line[..end])
154 }
155}
156
157fn find_latest_log_dir(logs_root: &Path) -> Result<Option<PathBuf>> {
159 let entries = match std::fs::read_dir(logs_root) {
160 Ok(e) => e,
161 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
162 Err(e) => return Err(e).context("reading log directory"),
163 };
164
165 let mut dirs: Vec<_> = entries
166 .filter_map(std::result::Result::ok)
167 .filter(|e| e.file_type().ok().is_some_and(|t| t.is_dir()))
168 .collect();
169
170 dirs.sort_by(|a, b| {
171 let ma = a.metadata().ok().and_then(|m| m.modified().ok());
172 let mb = b.metadata().ok().and_then(|m| m.modified().ok());
173 mb.cmp(&ma)
174 });
175
176 Ok(dirs.first().map(std::fs::DirEntry::path))
177}
178
179fn is_oven_running(project_dir: &Path) -> bool {
181 let pid_path = project_dir.join(".oven").join("oven.pid");
182 let Ok(contents) = std::fs::read_to_string(&pid_path) else {
183 return false;
184 };
185 let Ok(pid) = contents.trim().parse::<u32>() else {
186 return false;
187 };
188 std::process::Command::new("kill")
189 .args(["-0", &pid.to_string()])
190 .status()
191 .is_ok_and(|s| s.success())
192}
193
194async fn dump_log(path: &Path, agent_filter: Option<&str>) -> Result<()> {
195 let file = tokio::fs::File::open(path).await.context("reading log file")?;
196 let reader = BufReader::new(file);
197 let mut lines = reader.lines();
198
199 while let Some(line) = lines.next_line().await.context("reading log line")? {
200 if let Some(formatted) = format_log_line(&line, agent_filter) {
201 println!("{formatted}");
202 }
203 }
204
205 Ok(())
206}
207
208async fn tail_log(path: &Path, agent_filter: Option<&str>) -> Result<()> {
209 let cancel = CancellationToken::new();
210 let cancel_for_signal = cancel.clone();
211
212 tokio::spawn(async move {
213 if tokio::signal::ctrl_c().await.is_ok() {
214 cancel_for_signal.cancel();
215 }
216 });
217
218 let file = tokio::fs::File::open(path).await.context("opening log file")?;
219 let mut reader = BufReader::new(file);
220 let mut line = String::new();
221
222 loop {
223 tokio::select! {
224 () = cancel.cancelled() => break,
225 result = reader.read_line(&mut line) => {
226 match result {
227 Ok(0) => {
228 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
229 }
230 Ok(_) => {
231 let trimmed = line.trim_end();
232 if let Some(formatted) = format_log_line(trimmed, agent_filter) {
233 println!("{formatted}");
234 }
235 line.clear();
236 }
237 Err(e) => return Err(e).context("reading log file"),
238 }
239 }
240 }
241 }
242
243 Ok(())
244}
245
246const SUPPRESSED_MESSAGES: &[&str] = &["no actionable issues, waiting"];
248
249fn format_log_line(line: &str, agent_filter: Option<&str>) -> Option<String> {
254 let v: serde_json::Value = serde_json::from_str(line).ok()?;
255
256 let fields = v.get("fields")?;
257 let message = fields.get("message").and_then(|m| m.as_str()).unwrap_or("");
258
259 if SUPPRESSED_MESSAGES.contains(&message) {
261 return None;
262 }
263
264 if let Some(filter) = agent_filter {
266 let agent = fields.get("agent").and_then(|a| a.as_str()).unwrap_or("");
267 if !agent.is_empty() && agent != filter {
268 return None;
269 }
270 }
271
272 let timestamp = v
274 .get("timestamp")
275 .and_then(|t| t.as_str())
276 .and_then(|t| t.find('T').map(|i| &t[i + 1..]))
277 .map_or("??:??:??", |t| if t.len() > 8 { &t[..8] } else { t });
278
279 let level = v.get("level").and_then(|l| l.as_str()).unwrap_or("INFO");
280
281 let level_tag = match level {
282 "ERROR" => "ERR ",
283 "WARN" => "WARN",
284 "DEBUG" => "DBG ",
285 _ => " ",
286 };
287
288 let mut extras = Vec::new();
290 if let Some(obj) = fields.as_object() {
291 for (k, val) in obj {
292 if k == "message" {
293 continue;
294 }
295 let val_str = match val {
296 serde_json::Value::String(s) => s.clone(),
297 other => other.to_string(),
298 };
299 extras.push(format!("{k}={val_str}"));
300 }
301 }
302
303 let extra_str =
304 if extras.is_empty() { String::new() } else { format!(" {}", extras.join(" ")) };
305
306 Some(format!("{timestamp} {level_tag} {message}{extra_str}"))
307}
308
309#[cfg(test)]
310mod tests {
311 use super::*;
312 use crate::db::RunStatus;
313
314 fn json_log(msg: &str, agent: Option<&str>) -> String {
315 let agent_field = agent.map_or_else(String::new, |a| format!(r#","agent":"{a}""#));
316 format!(
317 r#"{{"timestamp":"2026-03-17T09:53:34.350926Z","level":"INFO","fields":{{"message":"{msg}"{agent_field}}},"target":"oven_cli::test"}}"#
318 )
319 }
320
321 #[test]
322 fn format_log_line_basic() {
323 let line = json_log("agent starting", Some("reviewer"));
324 let formatted = format_log_line(&line, None).unwrap();
325 assert!(formatted.contains("09:53:34"));
326 assert!(formatted.contains("agent starting"));
327 assert!(formatted.contains("agent=reviewer"));
328 assert!(!formatted.contains("oven_cli"));
330 }
331
332 #[test]
333 fn format_log_line_suppresses_waiting() {
334 let line = json_log("no actionable issues, waiting", None);
335 assert!(format_log_line(&line, None).is_none());
336 }
337
338 #[test]
339 fn format_log_line_agent_filter() {
340 let line = json_log("starting", Some("implementer"));
341 assert!(format_log_line(&line, Some("reviewer")).is_none());
343 assert!(format_log_line(&line, Some("implementer")).is_some());
345 }
346
347 #[test]
348 fn format_log_line_no_agent_passes_filter() {
349 let line = json_log("pipeline started", None);
351 assert!(format_log_line(&line, Some("reviewer")).is_some());
352 }
353
354 #[test]
355 fn format_log_line_non_json_returns_none() {
356 assert!(format_log_line("not json at all", None).is_none());
357 }
358
359 #[test]
360 fn format_log_line_error_level() {
361 let line = r#"{"timestamp":"2026-03-17T10:00:00Z","level":"ERROR","fields":{"message":"pipeline failed"},"target":"test"}"#;
362 let formatted = format_log_line(line, None).unwrap();
363 assert!(formatted.contains("ERR"));
364 }
365
366 #[test]
367 fn find_latest_log_dir_missing_root_returns_none() {
368 let result = find_latest_log_dir(Path::new("/nonexistent/path/logs")).unwrap();
369 assert!(result.is_none());
370 }
371
372 #[test]
373 fn find_latest_log_dir_empty_dir() {
374 let tmp = tempfile::tempdir().unwrap();
375 let result = find_latest_log_dir(tmp.path()).unwrap();
376 assert!(result.is_none());
377 }
378
379 #[test]
380 fn find_latest_log_dir_picks_newest() {
381 let tmp = tempfile::tempdir().unwrap();
382 let dir_a = tmp.path().join("aaaa1111");
383 let dir_b = tmp.path().join("bbbb2222");
384 std::fs::create_dir(&dir_a).unwrap();
385 std::thread::sleep(std::time::Duration::from_millis(50));
387 std::fs::create_dir(&dir_b).unwrap();
388
389 let result = find_latest_log_dir(tmp.path()).unwrap().unwrap();
390 assert_eq!(result.file_name().unwrap(), "bbbb2222");
391 }
392
393 #[test]
394 fn is_oven_running_returns_false_when_no_pid_file() {
395 let tmp = tempfile::tempdir().unwrap();
396 assert!(!is_oven_running(tmp.path()));
397 }
398
399 #[test]
400 fn is_oven_running_returns_false_for_stale_pid() {
401 let tmp = tempfile::tempdir().unwrap();
402 let oven_dir = tmp.path().join(".oven");
403 std::fs::create_dir_all(&oven_dir).unwrap();
404 std::fs::write(oven_dir.join("oven.pid"), "99999999").unwrap();
406 assert!(!is_oven_running(tmp.path()));
407 }
408
409 #[test]
410 fn truncate_line_short() {
411 assert_eq!(truncate_line("hello", 10), "hello");
412 }
413
414 #[test]
415 fn truncate_line_long() {
416 let long = "a".repeat(100);
417 let result = truncate_line(&long, 20);
418 assert_eq!(result.len(), 20);
419 assert!(result.ends_with("..."));
420 }
421
422 #[test]
423 fn truncate_line_multibyte_does_not_panic() {
424 let s = "😀😀😀😀😀😀";
426 let result = truncate_line(s, 10);
427 assert!(result.ends_with("..."));
428 assert!(result.len() <= 10);
429 }
430
431 #[test]
432 fn truncate_line_multiline_uses_first() {
433 assert_eq!(truncate_line("first\nsecond\nthird", 80), "first");
434 }
435
436 #[test]
437 fn print_run_status_formats_correctly() {
438 let run = Run {
439 id: "abc12345".to_string(),
440 issue_number: 42,
441 status: RunStatus::Reviewing,
442 pr_number: Some(10),
443 branch: Some("oven/issue-42".to_string()),
444 worktree_path: None,
445 cost_usd: 2.34,
446 auto_merge: false,
447 started_at: "2026-03-15T10:00:00".to_string(),
448 finished_at: None,
449 error_message: None,
450 complexity: "full".to_string(),
451 issue_source: "github".to_string(),
452 };
453 let agents = vec![
454 AgentRun {
455 id: 1,
456 run_id: "abc12345".to_string(),
457 agent: "implementer".to_string(),
458 cycle: 1,
459 status: "complete".to_string(),
460 cost_usd: 1.50,
461 turns: 12,
462 started_at: "2026-03-15T10:00:00".to_string(),
463 finished_at: Some("2026-03-15T10:05:00".to_string()),
464 output_summary: Some("Added auth flow".to_string()),
465 error_message: None,
466 raw_output: None,
467 },
468 AgentRun {
469 id: 2,
470 run_id: "abc12345".to_string(),
471 agent: "reviewer".to_string(),
472 cycle: 1,
473 status: "running".to_string(),
474 cost_usd: 0.84,
475 turns: 5,
476 started_at: "2026-03-15T10:05:00".to_string(),
477 finished_at: None,
478 output_summary: None,
479 error_message: None,
480 raw_output: None,
481 },
482 ];
483 print_run_status(&run, &agents, &[], None);
485 }
486
487 #[test]
488 fn print_run_status_with_agent_filter() {
489 let run = Run {
490 id: "abc12345".to_string(),
491 issue_number: 42,
492 status: RunStatus::Reviewing,
493 pr_number: Some(10),
494 branch: Some("oven/issue-42".to_string()),
495 worktree_path: None,
496 cost_usd: 2.34,
497 auto_merge: false,
498 started_at: "2026-03-15T10:00:00".to_string(),
499 finished_at: None,
500 error_message: None,
501 complexity: "full".to_string(),
502 issue_source: "github".to_string(),
503 };
504 let agents = vec![AgentRun {
505 id: 1,
506 run_id: "abc12345".to_string(),
507 agent: "implementer".to_string(),
508 cycle: 1,
509 status: "complete".to_string(),
510 cost_usd: 1.50,
511 turns: 12,
512 started_at: "2026-03-15T10:00:00".to_string(),
513 finished_at: Some("2026-03-15T10:05:00".to_string()),
514 output_summary: Some("ok".to_string()),
515 error_message: None,
516 raw_output: None,
517 }];
518 print_run_status(&run, &agents, &[], Some("reviewer"));
520 }
521
522 #[test]
523 fn show_stream_no_database() {
524 let tmp = tempfile::tempdir().unwrap();
525 let result = show_stream(tmp.path(), None);
526 assert!(result.is_err());
527 assert!(result.unwrap_err().to_string().contains("no database"));
528 }
529
530 #[test]
531 fn show_stream_empty_database() {
532 let tmp = tempfile::tempdir().unwrap();
533 let oven_dir = tmp.path().join(".oven");
534 std::fs::create_dir_all(&oven_dir).unwrap();
535 let db_path = oven_dir.join("oven.db");
536 drop(db::open(&db_path).unwrap());
538
539 show_stream(tmp.path(), None).unwrap();
541 }
542}