1mod view;
5mod worker;
6
7use crate::core::event::{Event, SessionRecord, SessionStatus};
8use crate::metrics::report;
9use crate::metrics::types::MetricsReport;
10use crate::store::span_tree::SpanNode;
11use crate::store::{SessionFilter, Store};
12use crate::ui::theme;
13use anyhow::Result;
14use arc_swap::ArcSwapOption;
15use crossterm::{
16 event::{self as cxev, Event as CxEvent, KeyCode, KeyEvent, KeyEventKind},
17 execute,
18 terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
19};
20use notify::{EventKind as NotifyEventKind, RecommendedWatcher, RecursiveMode, Watcher};
21use ratatui::{
22 Terminal,
23 backend::CrosstermBackend,
24 layout::{Constraint, Direction, Layout},
25 style::{Color, Style},
26 text::{Line, Span},
27 widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap},
28};
29use std::collections::HashMap;
30use std::path::{Path, PathBuf};
31use std::sync::{
32 Arc,
33 atomic::{AtomicBool, Ordering},
34};
35use time::OffsetDateTime;
36use tokio::sync::{Notify, mpsc};
37use tokio::task::JoinHandle;
38use tokio::time::{Duration, Instant, sleep_until};
39use view::{DetailData, DetailState, EventView, SessionView};
40use worker::{StoreRequest, StoreResponse, spawn_store_worker};
41
/// Thirty days in seconds; `time_ago_label` prints an absolute date for anything older.
const THIRTY_DAYS_SEC: u64 = 30 * 24 * 60 * 60;
/// Timestamps below this value are treated by `time_ago_label` as seconds instead of milliseconds.
const MS_HEURISTIC_THRESHOLD: u64 = 1_000_000_000_000;
/// Minimum spacing, in milliseconds, between refreshes triggered by WAL change notifications.
const WAL_REFRESH_COALESCE_MS: u64 = 100;
/// Minimum spacing, in milliseconds, between two metrics report rebuilds.
const REPORT_REFRESH_MIN_MS: u64 = 2_000;
/// Row count assumed for both list viewports until a frame reports the real terminal height.
const DEFAULT_VIEWPORT_HEIGHT: usize = 32;
48
49struct App {
50 sessions: SessionView,
51 events: EventView,
52 detail_state: DetailState,
53 agent_filter: String,
54 filter_mode: bool,
55 filter_buf: String,
56 clipboard_note: String,
57 left_focus: bool,
58 show_help: bool,
59 detail: bool,
60 show_metrics: bool,
61 metrics: Option<MetricsReport>,
62 metrics_cache: Arc<ArcSwapOption<MetricsReport>>,
63 report_dirty: Arc<AtomicBool>,
64 report_notify: Arc<Notify>,
65 pulse: bool,
66 workspace: String,
67 store_tx: mpsc::UnboundedSender<StoreRequest>,
68 store_rx: mpsc::UnboundedReceiver<StoreResponse>,
69 feedback_scores: HashMap<String, u8>,
70 feedback_token: u64,
71 detail_token: u64,
72 last_session_id: Option<String>,
73 session_viewport_height: usize,
74 event_viewport_height: usize,
75 error_note: String,
76}
77
78impl App {
79 fn open(
80 workspace: &Path,
81 metrics_cache: Arc<ArcSwapOption<MetricsReport>>,
82 report_dirty: Arc<AtomicBool>,
83 report_notify: Arc<Notify>,
84 ) -> Result<Self> {
85 let db = crate::core::workspace::db_path(workspace)?;
86 Store::open(&db)?;
87 let (store_tx, store_rx) = spawn_store_worker(db);
88 let ws = workspace.to_string_lossy().to_string();
89 let metrics = metrics_cache.load_full().as_deref().cloned();
90 let app = Self {
91 sessions: SessionView::new(),
92 events: EventView::new(),
93 detail_state: DetailState::Idle,
94 agent_filter: String::new(),
95 filter_mode: false,
96 filter_buf: String::new(),
97 clipboard_note: String::new(),
98 left_focus: true,
99 show_help: false,
100 detail: false,
101 show_metrics: false,
102 metrics,
103 metrics_cache,
104 report_dirty,
105 report_notify,
106 pulse: false,
107 workspace: ws,
108 store_tx,
109 store_rx,
110 feedback_scores: HashMap::new(),
111 feedback_token: 0,
112 detail_token: 0,
113 last_session_id: None,
114 session_viewport_height: DEFAULT_VIEWPORT_HEIGHT,
115 event_viewport_height: DEFAULT_VIEWPORT_HEIGHT,
116 error_note: String::new(),
117 };
118 app.mark_report_dirty();
119 let mut app = app;
120 app.request_session_pages();
121 app.request_feedback_for_viewport();
122 Ok(app)
123 }
124
125 fn sync_metrics_cache(&mut self) {
126 self.metrics = self.metrics_cache.load_full().as_deref().cloned();
127 }
128
129 fn mark_report_dirty(&self) {
130 self.report_dirty.store(true, Ordering::Release);
131 self.report_notify.notify_one();
132 }
133
134 fn refresh_full(&mut self) -> Result<()> {
135 self.sessions.reset();
136 self.events.clear();
137 self.detail_state = DetailState::Idle;
138 self.last_session_id = None;
139 self.pulse = !self.pulse;
140 self.request_session_pages();
141 self.sync_metrics_cache();
142 self.mark_report_dirty();
143 Ok(())
144 }
145
146 fn refresh(&mut self) -> Result<()> {
147 self.pulse = !self.pulse;
148 self.request_session_pages();
149 self.request_selected_detail();
150 self.request_event_pages();
151 self.sync_metrics_cache();
152 self.mark_report_dirty();
153 Ok(())
154 }
155
156 fn filter(&self) -> SessionFilter {
157 SessionFilter {
158 agent_prefix: Some(self.agent_filter.trim().to_lowercase()).filter(|s| !s.is_empty()),
159 status: None,
160 since_ms: None,
161 }
162 }
163
164 fn request_session_pages(&mut self) {
165 let offsets = self
166 .sessions
167 .needed_page_offsets(self.session_viewport_height);
168 for offset in offsets {
169 if self.sessions.request_page(offset) {
170 let _ = self.store_tx.send(StoreRequest::SessionsPage {
171 token: self.sessions.generation(),
172 workspace: self.workspace.clone(),
173 offset,
174 limit: self.sessions.page_size,
175 filter: self.filter(),
176 });
177 }
178 }
179 }
180
181 fn request_feedback_for_viewport(&mut self) {
182 let ids: Vec<String> = self
183 .sessions
184 .visible_rows(self.session_viewport_height)
185 .into_iter()
186 .filter_map(|(_, row)| row.map(|s| s.id.clone()))
187 .filter(|id| !self.feedback_scores.contains_key(id))
188 .collect();
189 if ids.is_empty() {
190 return;
191 }
192 self.feedback_token = self.feedback_token.wrapping_add(1);
193 let _ = self.store_tx.send(StoreRequest::Feedback {
194 token: self.feedback_token,
195 ids,
196 });
197 }
198
199 fn request_selected_detail(&mut self) {
200 let Some(id) = self.selected_id().map(str::to_string) else {
201 self.detail_state = DetailState::Idle;
202 self.events.clear();
203 self.last_session_id = None;
204 return;
205 };
206 if self.last_session_id.as_deref() != Some(&id) {
207 self.events.reset_for(&id);
208 self.detail_token = self.detail_token.wrapping_add(1);
209 self.detail_state = DetailState::Loading {
210 token: self.detail_token,
211 session_id: id.clone(),
212 };
213 let _ = self.store_tx.send(StoreRequest::Detail {
214 token: self.detail_token,
215 session_id: id.clone(),
216 });
217 self.last_session_id = Some(id);
218 }
219 self.request_event_pages();
220 }
221
222 fn request_event_pages(&mut self) {
223 let Some(session_id) = self.events.session_id().map(str::to_string) else {
224 return;
225 };
226 for after_seq in self.events.needed_after_seq(self.event_viewport_height) {
227 if self.events.request_page(after_seq) {
228 let _ = self.store_tx.send(StoreRequest::EventsPage {
229 token: self.events.generation(),
230 session_id: session_id.clone(),
231 after_seq,
232 limit: self.events.page_size,
233 });
234 }
235 }
236 }
237
238 fn apply_store_response(&mut self, response: StoreResponse) {
239 match response {
240 StoreResponse::SessionsPage {
241 token,
242 offset,
243 result,
244 } => self.apply_session_page(token, offset, result),
245 StoreResponse::EventsPage {
246 token,
247 session_id,
248 after_seq,
249 result,
250 } => self.apply_event_page(token, &session_id, after_seq, result),
251 StoreResponse::Detail {
252 token,
253 session_id,
254 result,
255 } => self.apply_detail(token, &session_id, result),
256 StoreResponse::Feedback { token, result } => self.apply_feedback(token, result),
257 }
258 }
259
260 fn apply_session_page(
261 &mut self,
262 token: u64,
263 offset: usize,
264 result: Result<crate::store::SessionPage, String>,
265 ) {
266 if token != self.sessions.generation() {
267 return;
268 }
269 match result {
270 Ok(page) => {
271 self.sessions.finish_page(offset, page.rows, page.total);
272 self.error_note.clear();
273 self.request_feedback_for_viewport();
274 self.request_selected_detail();
275 }
276 Err(err) => {
277 self.sessions.finish_error(offset);
278 self.error_note = err;
279 }
280 }
281 }
282
283 fn apply_event_page(
284 &mut self,
285 token: u64,
286 session_id: &str,
287 after_seq: u64,
288 result: Result<Vec<Event>, String>,
289 ) {
290 if token != self.events.generation() || self.events.session_id() != Some(session_id) {
291 return;
292 }
293 match result {
294 Ok(rows) => self.events.finish_page(after_seq, rows),
295 Err(err) => {
296 self.events.finish_error(after_seq);
297 self.error_note = err;
298 }
299 }
300 }
301
302 fn apply_detail(&mut self, token: u64, session_id: &str, result: Result<DetailData, String>) {
303 match &self.detail_state {
304 DetailState::Loading {
305 token: active,
306 session_id: active_id,
307 } if *active == token && active_id == session_id => {}
308 _ => return,
309 }
310 self.detail_state = match result {
311 Ok(data) => DetailState::Ready(data),
312 Err(err) => DetailState::Error(err),
313 };
314 }
315
316 fn apply_feedback(&mut self, token: u64, result: Result<HashMap<String, u8>, String>) {
317 if token != self.feedback_token {
318 return;
319 }
320 if let Ok(scores) = result {
321 self.feedback_scores.extend(scores);
322 }
323 }
324
325 fn set_viewport_height(&mut self, height: usize) {
326 let h = height.saturating_sub(4);
327 self.session_viewport_height = h.max(1);
328 self.event_viewport_height = h.max(1);
329 self.sessions
330 .set_viewport_height(self.session_viewport_height);
331 self.events.set_viewport_height(self.event_viewport_height);
332 }
333
334 fn after_session_cursor_move(&mut self) {
335 self.request_session_pages();
336 self.request_feedback_for_viewport();
337 self.request_selected_detail();
338 }
339
340 fn selected_session(&self) -> Option<&SessionRecord> {
341 self.sessions.selected()
342 }
343
344 fn selected_id(&self) -> Option<&str> {
345 self.selected_session().map(|s| s.id.as_str())
346 }
347
348 fn selected_event(&self) -> Option<&Event> {
349 self.events.selected()
350 }
351}
352
353fn time_ago_label(now_ms: u64, ts_ms: u64) -> String {
355 if ts_ms == 0 {
356 return "?".to_string();
357 }
358 let mut ts = ts_ms;
359 if ts < MS_HEURISTIC_THRESHOLD && now_ms >= MS_HEURISTIC_THRESHOLD {
360 ts = ts.saturating_mul(1000);
361 }
362 let diff_sec = now_ms.saturating_sub(ts) / 1000;
363 if diff_sec > THIRTY_DAYS_SEC {
364 return abs_ts_label(ts);
365 }
366 match diff_sec {
367 0 => "just now".to_string(),
368 s if s < 60 => format!("{s}s"),
369 s if s < 3600 => format!("{}m", s / 60),
370 s if s < 86_400 => format!("{}h", s / 3600),
371 s => format!("{}d", s / 86_400),
372 }
373}
374
375fn abs_ts_label(ts_ms: u64) -> String {
376 let Ok(dt) = OffsetDateTime::from_unix_timestamp((ts_ms / 1000) as i64) else {
377 return "?".to_string();
378 };
379 format!(
380 "{:04}-{:02}-{:02} {:02}:{:02}",
381 dt.year(),
382 u8::from(dt.month()),
383 dt.day(),
384 dt.hour(),
385 dt.minute()
386 )
387}
388
/// Returns the longest prefix of `s` containing at most `max` characters.
///
/// The cut is always made on a character boundary, so multi-byte text is
/// safe. Strings that already fit are returned unchanged; `max == 0` yields
/// the empty string.
fn truncate(s: &str, max: usize) -> &str {
    // The byte offset of the character at index `max` (if any) is exactly the
    // end of the first `max` characters. One pass, no separate length count.
    match s.char_indices().nth(max) {
        Some((byte_end, _)) => &s[..byte_end],
        None => s,
    }
}
398
399fn model_suffix(model: &Option<String>) -> String {
400 const MAX: usize = 20;
401 match model {
402 Some(m) if !m.is_empty() => format!(" {}", truncate(m, MAX)),
403 _ => " —".to_string(),
404 }
405}
406
407fn session_status_letter(s: &SessionRecord) -> char {
408 match s.status {
409 SessionStatus::Running => 'R',
410 SessionStatus::Waiting => 'W',
411 SessionStatus::Idle => 'I',
412 SessionStatus::Done => 'D',
413 }
414}
415
416fn format_event_tokens(e: &Event) -> Option<String> {
417 let mut out = String::new();
418 match (e.tokens_in, e.tokens_out) {
419 (Some(a), Some(b)) => out = format!("{a}/{b}"),
420 (Some(a), None) => out = a.to_string(),
421 (None, Some(b)) => out = b.to_string(),
422 (None, None) => {}
423 }
424 if let Some(r) = e.reasoning_tokens {
425 if out.is_empty() {
426 out = format!("r{r}");
427 } else {
428 out = format!("{out}+r{r}");
429 }
430 }
431 if out.is_empty() { None } else { Some(out) }
432}
433
434fn event_row_text(now_ms: u64, e: &Event, lead: &HashMap<String, u64>) -> String {
435 let age = time_ago_label(now_ms, e.ts_ms);
436 let tool = e.tool.as_deref().unwrap_or("-");
437 let lead_s = e
438 .tool_call_id
439 .as_ref()
440 .and_then(|id| lead.get(id).copied())
441 .map(|ms| format!("{ms}ms"))
442 .unwrap_or_else(|| "—".to_string());
443 let tok = format_event_tokens(e)
444 .map(|s| format!(" tok={s}"))
445 .unwrap_or_default();
446 format!("{age} {kind:?} {tool}{tok} {lead_s}", kind = e.kind)
447}
448
449fn draw(f: &mut ratatui::Frame, app: &App) {
450 if app.show_help {
451 draw_help(f);
452 return;
453 }
454 let chunks = Layout::default()
455 .direction(Direction::Vertical)
456 .constraints([Constraint::Min(1), Constraint::Length(1)])
457 .split(f.area());
458
459 let panes = Layout::default()
460 .direction(Direction::Horizontal)
461 .constraints([Constraint::Percentage(35), Constraint::Percentage(65)])
462 .split(chunks[0]);
463
464 draw_sessions(f, app, panes[0]);
465 draw_events(f, app, panes[1]);
466 draw_statusbar(f, app, chunks[1]);
467}
468
469fn draw_sessions(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
470 let border_color = if app.left_focus {
471 theme::BORDER_ACTIVE
472 } else {
473 theme::BORDER_INACTIVE
474 };
475 let title = if app.agent_filter.is_empty() {
476 format!("Sessions ({})", app.sessions.total)
477 } else {
478 format!(
479 "Sessions {} (agent prefix: {:?})",
480 app.sessions.total, app.agent_filter
481 )
482 };
483 let block = Block::default()
484 .title(title)
485 .borders(Borders::ALL)
486 .border_style(Style::default().fg(border_color));
487 let now = now_ms();
488 let items: Vec<ListItem> = app
489 .sessions
490 .visible_rows(area.height.saturating_sub(2) as usize)
491 .into_iter()
492 .map(|(idx, row)| {
493 row.map(|s| session_item(now, s, &app.feedback_scores))
494 .unwrap_or_else(|| ListItem::new(format!("{idx:>6} loading...")))
495 })
496 .collect();
497 let mut state = ListState::default();
498 state.select(
499 app.sessions
500 .selected_local_index(area.height.saturating_sub(2) as usize),
501 );
502 f.render_stateful_widget(
503 ratatui::widgets::List::new(items)
504 .block(block)
505 .highlight_style(Style::default().bg(Color::Blue).fg(Color::White)),
506 area,
507 &mut state,
508 );
509}
510
511fn session_item<'a>(
512 now: u64,
513 s: &'a SessionRecord,
514 feedback_scores: &HashMap<String, u8>,
515) -> ListItem<'a> {
516 let st = format!("{:?}", s.status);
517 let st_color = theme::status_color(&st);
518 let age = time_ago_label(now, s.started_at_ms);
519 let tag = session_status_letter(s);
520 let m = model_suffix(&s.model);
521 let score_span = feedback_scores.get(&s.id).map(|&sc| {
522 let color = match sc {
523 1..=2 => Color::Red,
524 3 => Color::Yellow,
525 _ => Color::Green,
526 };
527 Span::styled(format!("★{sc}"), Style::default().fg(color))
528 });
529 let mut spans = vec![
530 Span::styled(format!("{:.10}", s.id), Style::default().fg(Color::Gray)),
531 Span::raw(" "),
532 Span::styled(
533 format!("{:.7}", s.agent),
534 Style::default().fg(theme::agent_color(&s.agent)),
535 ),
536 Span::raw(" "),
537 Span::styled(format!("{tag}"), Style::default().fg(st_color)),
538 Span::raw(" "),
539 Span::styled(age, Style::default().fg(Color::White)),
540 Span::styled(m, Style::default().fg(Color::Gray)),
541 ];
542 if let Some(s) = score_span {
543 spans.push(Span::raw(" "));
544 spans.push(s);
545 }
546 ListItem::new(Line::from(spans))
547}
548
549fn draw_events(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
550 if app.show_metrics {
551 draw_metrics(f, app, area);
552 return;
553 }
554 let id = app.selected_id().unwrap_or("-");
555 let model = app
556 .selected_session()
557 .and_then(|s| s.model.as_deref().filter(|m| !m.is_empty()))
558 .map(|m| truncate(m, 24).to_string())
559 .unwrap_or_else(|| "—".to_string());
560 let border_color = if !app.left_focus {
561 theme::BORDER_ACTIVE
562 } else {
563 theme::BORDER_INACTIVE
564 };
565 let title = format!("Events — {:.18} — {}", id, model);
566 let now = now_ms();
567 let leads = app.detail_state.leads();
568 let spans = app.detail_state.spans();
569 if app.detail
570 && let Some(ev) = app.selected_event()
571 {
572 let split = Layout::default()
573 .direction(Direction::Vertical)
574 .constraints([Constraint::Min(2), Constraint::Length(10)])
575 .split(area);
576 render_event_list(f, app, split[0], title.clone(), border_color, now);
577 let detail = event_detail_text(ev, leads);
578 let det_block = Block::default()
579 .title("Detail")
580 .borders(Borders::ALL)
581 .border_style(Style::default().fg(border_color));
582 f.render_widget(
583 Paragraph::new(detail)
584 .block(det_block)
585 .wrap(Wrap { trim: true }),
586 split[1],
587 );
588 return;
589 }
590 if !spans.is_empty() {
591 let max_depth: u32 = spans.iter().map(|n| n.span.depth).max().unwrap_or(0);
592 let strip_h = (max_depth + 3).min(8) as u16;
593 let split = Layout::default()
594 .direction(Direction::Vertical)
595 .constraints([Constraint::Min(2), Constraint::Length(strip_h)])
596 .split(area);
597 render_event_list(f, app, split[0], title, border_color, now);
598 let span_text: Vec<Line> = span_depth_lines(spans);
599 let span_block = Block::default()
600 .title("Span tree")
601 .borders(Borders::ALL)
602 .border_style(Style::default().fg(theme::BORDER_INACTIVE));
603 f.render_widget(
604 Paragraph::new(span_text)
605 .block(span_block)
606 .wrap(Wrap { trim: false }),
607 split[1],
608 );
609 return;
610 }
611 if matches!(app.detail_state, DetailState::Loading { .. }) && app.events.total_loaded == 0 {
612 let block = Block::default()
613 .title(title)
614 .borders(Borders::ALL)
615 .border_style(Style::default().fg(border_color));
616 f.render_widget(Paragraph::new("loading...").block(block), area);
617 return;
618 }
619 if let Some(err) = app.detail_state.error_message()
620 && app.events.total_loaded == 0
621 {
622 let block = Block::default()
623 .title(title)
624 .borders(Borders::ALL)
625 .border_style(Style::default().fg(border_color));
626 f.render_widget(Paragraph::new(err.to_string()).block(block), area);
627 return;
628 }
629 render_event_list(f, app, area, title, border_color, now);
630}
631
632fn render_event_list(
633 f: &mut ratatui::Frame,
634 app: &App,
635 area: ratatui::layout::Rect,
636 title: String,
637 border_color: Color,
638 now: u64,
639) {
640 let block = Block::default()
641 .title(title)
642 .borders(Borders::ALL)
643 .border_style(Style::default().fg(border_color));
644 let items: Vec<ListItem> = app
645 .events
646 .visible_rows(area.height.saturating_sub(2) as usize)
647 .into_iter()
648 .map(|(idx, row)| {
649 row.map(|e| ListItem::new(event_row_text(now, e, app.detail_state.leads())))
650 .unwrap_or_else(|| ListItem::new(format!("{idx:>6} loading...")))
651 })
652 .collect();
653 let mut state = ListState::default();
654 state.select(
655 app.events
656 .selected_local_index(area.height.saturating_sub(2) as usize),
657 );
658 f.render_stateful_widget(
659 List::new(items)
660 .block(block)
661 .highlight_style(Style::default().bg(Color::Blue).fg(Color::White)),
662 area,
663 &mut state,
664 );
665}
666
667fn span_depth_lines(nodes: &[SpanNode]) -> Vec<Line<'static>> {
668 let mut lines = Vec::new();
669 for n in nodes {
670 push_span_line(&mut lines, n, 0);
671 }
672 lines
673}
674
675fn push_span_line(lines: &mut Vec<Line<'static>>, node: &SpanNode, depth: u32) {
676 let indent = " ".repeat(depth as usize);
677 let prefix = if depth == 0 { "┌ " } else { "├ " };
678 let cost = node
679 .span
680 .subtree_cost_usd_e6
681 .map(|c| format!(" ${:.4}", c as f64 / 1_000_000.0))
682 .unwrap_or_default();
683 let text = format!("{}{}{}{}", indent, prefix, node.span.tool, cost);
684 lines.push(Line::from(Span::raw(text)));
685 for child in &node.children {
686 push_span_line(lines, child, depth + 1);
687 }
688}
689
690fn event_detail_text(ev: &Event, lead: &HashMap<String, u64>) -> String {
691 let lead_s = ev
692 .tool_call_id
693 .as_ref()
694 .and_then(|id| lead.get(id).copied())
695 .map(|ms| format!("{ms}ms"))
696 .unwrap_or_else(|| "—".to_string());
697 let head = format!(
698 "seq={} kind={:?} tool={} call_id={} in={:?} out={:?} r={:?} cost_e6={:?} lead={}\n",
699 ev.seq,
700 ev.kind,
701 ev.tool.as_deref().unwrap_or("-"),
702 ev.tool_call_id.as_deref().unwrap_or("—"),
703 ev.tokens_in,
704 ev.tokens_out,
705 ev.reasoning_tokens,
706 ev.cost_usd_e6,
707 lead_s
708 );
709 let json = serde_json::to_string_pretty(&ev.payload).unwrap_or_else(|_| ev.payload.to_string());
710 head + &json
711}
712
713fn draw_metrics(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
714 let block = Block::default()
715 .title("Metrics")
716 .borders(Borders::ALL)
717 .border_style(Style::default().fg(theme::BORDER_ACTIVE));
718 let empty = app.metrics.is_none()
719 || app
720 .metrics
721 .as_ref()
722 .is_some_and(|m| m.slowest_tools.is_empty() && m.hottest_files.is_empty());
723 let text = if empty {
724 "(No metrics in this window yet. Run `kaizen metrics` in a shell, or `r` here after a repo is indexed.)\n\nMetrics need a successful snapshot + events for tool spans — see docs/telemetry-journey.md."
725 .to_string()
726 } else {
727 let mut lines = vec!["Slow tools".to_string()];
728 if let Some(metrics) = &app.metrics {
729 for row in metrics.slowest_tools.iter().take(4) {
730 let p95 = row
731 .p95_ms
732 .map(|v| format!("{v}ms"))
733 .unwrap_or_else(|| "-".into());
734 lines.push(format!("{} p95={} tok={}", row.tool, p95, row.total_tokens));
735 }
736 lines.push(String::new());
737 lines.push("Hot files".into());
738 for row in metrics.hottest_files.iter().take(4) {
739 lines.push(format!("{} {}", row.value, row.path));
740 }
741 }
742 lines.join("\n")
743 };
744 f.render_widget(Paragraph::new(text).block(block), area);
745}
746
747fn draw_statusbar(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
748 let pulse = if app.pulse { "●" } else { "○" };
749 let text = if app.filter_mode {
750 format!(
751 "FILTER type agent substring | Enter apply | Esc cancel | buffer: {}",
752 app.filter_buf
753 )
754 } else {
755 let note = if app.clipboard_note.is_empty() {
756 String::new()
757 } else {
758 format!(" | {}", app.clipboard_note)
759 };
760 format!(
761 "LIVE {pulse} j/k Tab m metrics / filter y copy id Enter detail ? help q quit{note}"
762 )
763 };
764 f.render_widget(Paragraph::new(text), area);
765}
766
767fn draw_help(f: &mut ratatui::Frame) {
768 let text = "j/k ↑/↓ move in focused pane | g/G first/last | Tab switch pane\n\
769 Enter toggle event detail | Esc back | r refresh | q quit\n\
770 / filter sessions by agent substring | y copy session id (left pane)";
771 let block = Block::default().title("Help").borders(Borders::ALL);
772 f.render_widget(Paragraph::new(text).block(block), f.area());
773}
774
/// Current wall-clock time as milliseconds since the Unix epoch, or 0 when the
/// system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
781
782fn spawn_key_reader(stop: Arc<AtomicBool>) -> mpsc::UnboundedReceiver<KeyEvent> {
783 let (tx, rx) = mpsc::unbounded_channel();
784 tokio::task::spawn_blocking(move || {
785 while !stop.load(Ordering::Acquire) {
786 match cxev::poll(Duration::from_millis(250)) {
787 Ok(true) => match cxev::read() {
788 Ok(CxEvent::Key(k)) if k.kind == KeyEventKind::Press => {
789 if tx.send(k).is_err() {
790 break;
791 }
792 }
793 Ok(_) => {}
794 Err(_) => break,
795 },
796 Ok(false) => {}
797 Err(_) => break,
798 }
799 }
800 });
801 rx
802}
803
804fn spawn_wal_watcher(
805 wal_path: &Path,
806 dirty: Arc<AtomicBool>,
807) -> Result<(RecommendedWatcher, mpsc::UnboundedReceiver<()>)> {
808 let (tx, rx) = mpsc::unbounded_channel();
809 let watched_wal = wal_path.to_path_buf();
810 let callback_wal = watched_wal.clone();
811 let mut watcher = RecommendedWatcher::new(
812 move |res: notify::Result<notify::Event>| {
813 if let Ok(event) = res
814 && !matches!(event.kind, NotifyEventKind::Access(_))
815 && event.paths.iter().any(|p| p == &callback_wal)
816 {
817 dirty.store(true, Ordering::Release);
818 let _ = tx.send(());
819 }
820 },
821 notify::Config::default(),
822 )?;
823 watcher.watch(
824 watched_wal.parent().unwrap_or_else(|| Path::new(".")),
825 RecursiveMode::NonRecursive,
826 )?;
827 Ok((watcher, rx))
828}
829
830fn spawn_report_worker(
831 db_path: PathBuf,
832 workspace: String,
833 cache: Arc<ArcSwapOption<MetricsReport>>,
834 dirty: Arc<AtomicBool>,
835 notify: Arc<Notify>,
836 ui_tx: mpsc::UnboundedSender<()>,
837) -> JoinHandle<()> {
838 tokio::spawn(async move {
839 let mut last_run = Instant::now() - Duration::from_millis(REPORT_REFRESH_MIN_MS);
840 loop {
841 notify.notified().await;
842 let ready_at = last_run + Duration::from_millis(REPORT_REFRESH_MIN_MS);
843 let now = Instant::now();
844 if now < ready_at {
845 sleep_until(ready_at).await;
846 }
847 if !dirty.swap(false, Ordering::AcqRel) {
848 continue;
849 }
850 let db = db_path.clone();
851 let ws = workspace.clone();
852 let next = tokio::task::spawn_blocking(move || {
853 let store = Store::open_read_only(&db)?;
854 report::build_report(&store, &ws, 7)
855 })
856 .await;
857 last_run = Instant::now();
858 if let Ok(Ok(report)) = next {
859 cache.store(Some(Arc::new(report)));
860 let _ = ui_tx.send(());
861 }
862 }
863 })
864}
865
866struct BackgroundWorkers {
867 report: JoinHandle<()>,
868}
869
870impl BackgroundWorkers {
871 fn shutdown(self) {
872 self.report.abort();
873 }
874}
875
876async fn wait_for_deadline(deadline: Option<Instant>) {
877 match deadline {
878 Some(deadline) => sleep_until(deadline).await,
879 None => std::future::pending::<()>().await,
880 }
881}
882
883pub async fn run(workspace: &Path) -> Result<()> {
885 let workspace_buf = resolved_workspace_path(workspace);
886 let workspace = workspace_buf.as_path();
887 let db_path = crate::core::workspace::db_path(workspace)?;
888 let metrics_cache = Arc::new(ArcSwapOption::from(None));
889 let report_dirty = Arc::new(AtomicBool::new(true));
890 let report_notify = Arc::new(Notify::new());
891 let (report_ui_tx, mut report_ui_rx) = mpsc::unbounded_channel();
892 let workers = BackgroundWorkers {
893 report: spawn_report_worker(
894 db_path.clone(),
895 workspace.to_string_lossy().to_string(),
896 metrics_cache.clone(),
897 report_dirty.clone(),
898 report_notify.clone(),
899 report_ui_tx,
900 ),
901 };
902 let mut app = App::open(
903 workspace,
904 metrics_cache,
905 report_dirty.clone(),
906 report_notify.clone(),
907 )?;
908 enable_raw_mode()?;
909 let mut stdout = std::io::stdout();
910 execute!(stdout, EnterAlternateScreen)?;
911 let backend = CrosstermBackend::new(stdout);
912 let mut terminal = Terminal::new(backend)?;
913
914 std::panic::set_hook(Box::new(|_| {
915 let _ = disable_raw_mode();
916 let _ = execute!(std::io::stdout(), LeaveAlternateScreen);
917 }));
918
919 let wal_dirty = Arc::new(AtomicBool::new(false));
920 let (_watcher, mut wal_rx) =
921 match spawn_wal_watcher(&db_path.with_extension("db-wal"), wal_dirty.clone()) {
922 Ok((watcher, rx)) => (Some(watcher), rx),
923 Err(_) => {
924 let (_tx, rx) = mpsc::unbounded_channel();
925 (None, rx)
926 }
927 };
928 let input_stop = Arc::new(AtomicBool::new(false));
929 let mut key_rx = spawn_key_reader(input_stop.clone());
930 let mut needs_draw = true;
931 let mut pending_refresh = false;
932 let mut refresh_deadline = None;
933 let mut last_refresh = Instant::now() - Duration::from_millis(WAL_REFRESH_COALESCE_MS);
934
935 loop {
936 if needs_draw {
937 app.sync_metrics_cache();
938 terminal.draw(|f| {
939 app.set_viewport_height(f.area().height as usize);
940 app.request_session_pages();
941 app.request_event_pages();
942 draw(f, &app);
943 })?;
944 needs_draw = false;
945 }
946 tokio::select! {
947 Some(response) = app.store_rx.recv() => {
948 app.apply_store_response(response);
949 needs_draw = true;
950 }
951 _ = wait_for_deadline(refresh_deadline), if refresh_deadline.is_some() => {
952 refresh_deadline = None;
953 if pending_refresh {
954 pending_refresh = false;
955 if app.refresh().is_ok() {
956 last_refresh = Instant::now();
957 needs_draw = true;
958 }
959 }
960 }
961 Some(_) = wal_rx.recv() => {
962 if wal_dirty.swap(false, Ordering::AcqRel) {
963 let ready_at = last_refresh + Duration::from_millis(WAL_REFRESH_COALESCE_MS);
964 let now = Instant::now();
965 if now >= ready_at {
966 if app.refresh().is_ok() {
967 last_refresh = now;
968 needs_draw = true;
969 }
970 } else {
971 pending_refresh = true;
972 refresh_deadline = Some(ready_at);
973 }
974 }
975 }
976 Some(_) = report_ui_rx.recv() => {
977 app.sync_metrics_cache();
978 needs_draw = true;
979 }
980 Some(k) = key_rx.recv() => {
981 if app.filter_mode {
982 match k.code {
983 KeyCode::Enter => {
984 app.agent_filter = app.filter_buf.trim().to_string();
985 app.filter_mode = false;
986 let _ = app.refresh_full();
987 needs_draw = true;
988 }
989 KeyCode::Esc => {
990 app.filter_mode = false;
991 app.filter_buf.clear();
992 needs_draw = true;
993 }
994 KeyCode::Backspace => {
995 app.filter_buf.pop();
996 needs_draw = true;
997 }
998 KeyCode::Char(c) => {
999 app.filter_buf.push(c);
1000 needs_draw = true;
1001 }
1002 _ => {}
1003 }
1004 continue;
1005 }
1006 match k.code {
1007 KeyCode::Char('/') => {
1008 app.filter_mode = true;
1009 app.filter_buf.clone_from(&app.agent_filter);
1010 needs_draw = true;
1011 }
1012 KeyCode::Char('y') if app.left_focus => {
1013 if let Some(id) = app.selected_id() {
1014 match arboard::Clipboard::new() {
1015 Ok(mut cb) => {
1016 if cb.set_text(id).is_ok() {
1017 app.clipboard_note = "copied session id".to_string();
1018 } else {
1019 app.clipboard_note = "clipboard write failed".to_string();
1020 }
1021 }
1022 Err(_) => app.clipboard_note = "no clipboard".to_string(),
1023 }
1024 needs_draw = true;
1025 }
1026 }
1027 KeyCode::Char('q') | KeyCode::Esc if !app.detail && !app.show_help => break,
1028 KeyCode::Char('q') if app.show_help => { app.show_help = false; needs_draw = true; }
1029 KeyCode::Char('q') => { app.detail = false; app.show_help = false; needs_draw = true; }
1030 KeyCode::Esc | KeyCode::Backspace => {
1031 app.detail = false;
1032 app.show_help = false;
1033 needs_draw = true;
1034 }
1035 KeyCode::Char('?') => { app.show_help = !app.show_help; needs_draw = true; }
1036 KeyCode::Char('m') => { app.show_metrics = !app.show_metrics; needs_draw = true; }
1037 KeyCode::Tab => {
1038 app.left_focus = !app.left_focus;
1039 needs_draw = true;
1040 }
1041 KeyCode::Char('r') => { let _ = app.refresh_full(); needs_draw = true; }
1042 KeyCode::Char('j') | KeyCode::Down => {
1043 if app.show_metrics || app.left_focus {
1044 app.sessions.move_by(1);
1045 app.after_session_cursor_move();
1046 needs_draw = true;
1047 } else {
1048 app.events.move_by(1);
1049 app.request_event_pages();
1050 needs_draw = true;
1051 }
1052 }
1053 KeyCode::Char('k') | KeyCode::Up => {
1054 if app.show_metrics || app.left_focus {
1055 app.sessions.move_by(-1);
1056 app.after_session_cursor_move();
1057 needs_draw = true;
1058 } else {
1059 app.events.move_by(-1);
1060 app.request_event_pages();
1061 needs_draw = true;
1062 }
1063 }
1064 KeyCode::Char('g') => {
1065 if app.show_metrics || app.left_focus {
1066 app.sessions.cursor = 0;
1067 app.after_session_cursor_move();
1068 } else {
1069 app.events.cursor = 0;
1070 app.request_event_pages();
1071 }
1072 needs_draw = true;
1073 }
1074 KeyCode::Char('G') => {
1075 if app.show_metrics || app.left_focus {
1076 app.sessions.jump_last();
1077 app.after_session_cursor_move();
1078 } else {
1079 app.events.jump_last_loaded();
1080 app.request_event_pages();
1081 }
1082 needs_draw = true;
1083 }
1084 KeyCode::Enter if app.selected_event().is_some() && !app.show_metrics => {
1085 app.detail = !app.detail;
1086 needs_draw = true;
1087 }
1088 _ => {}
1089 }
1090 }
1091 else => {}
1092 }
1093 }
1094
1095 input_stop.store(true, Ordering::Release);
1096 workers.shutdown();
1097 drop(app);
1098 disable_raw_mode()?;
1099 execute!(terminal.backend_mut(), LeaveAlternateScreen)?;
1100 Ok(())
1101}
1102
1103fn resolved_workspace_path(workspace: &Path) -> PathBuf {
1104 crate::core::workspace::canonical(workspace)
1105}
1106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::event::{EventKind, EventSource};

    /// Passing the same value for both arguments yields the "just now" label.
    #[test]
    fn time_ago_just_now() {
        let instant = 10_000u64;
        assert_eq!(time_ago_label(instant, instant), "just now");
    }

    /// A second argument far smaller than the first (epoch-seconds scale vs.
    /// epoch-milliseconds scale) must still produce a label without a `?`.
    #[test]
    fn time_ago_treats_small_ts_as_seconds() {
        let now_ms: u64 = 1_700_000_000_000;
        let ts_in_seconds: u64 = 1_700_000_000;

        let label = time_ago_label(now_ms, ts_in_seconds);
        let has_question_mark = label.contains('?');

        assert!(
            !has_question_mark,
            "expected relative label, got {label:?}"
        );
    }

    /// A timestamp forty days before "now" must render as a label containing
    /// a `-`, i.e. something date-like.
    #[test]
    fn time_ago_old_uses_absolute() {
        const FORTY_DAYS_MS: u64 = 40 * 24 * 3600 * 1000;
        let now = 1_700_000_000_000u64;

        let label = time_ago_label(now, now - FORTY_DAYS_MS);

        assert!(label.contains('-'), "expected date-like label: {label}");
    }

    /// Checks three things on a fresh `SessionView`: only offset 0 is needed,
    /// a repeated `request_page(0)` is refused, and `move_by(99)` on a
    /// two-record page leaves the cursor on the last record.
    #[test]
    fn session_view_clamps_cursor_and_suppresses_double_load() {
        let mut sessions = view::SessionView::new();
        sessions.set_viewport_height(4);
        assert_eq!(sessions.needed_page_offsets(4), vec![0]);

        // The first request for a page is accepted; the duplicate is not.
        assert!(sessions.request_page(0));
        assert!(!sessions.request_page(0));

        let first_page = vec![session("a", 3), session("b", 2)];
        sessions.finish_page(0, first_page, 2);

        sessions.move_by(99);
        assert_eq!(sessions.cursor, 1);
        let picked = sessions.selected().unwrap();
        assert_eq!(picked.id, "b");
    }

    /// Finishes single-record pages at offsets 0, 2, ..., 20 while moving the
    /// cursor along; afterwards the window must still hold offset 20 and must
    /// no longer hold offset 0.
    #[test]
    fn session_view_eviction_keeps_cursor_page() {
        let mut sessions = view::SessionView::new();
        sessions.page_size = 2;

        for offset in (0..=20).step_by(2) {
            sessions.cursor = offset;
            let page = vec![session(&format!("s{offset}"), offset as u64)];
            sessions.finish_page(offset, page, 30);
        }

        let window = &sessions.window;
        assert!(window.contains_key(&20));
        assert!(!window.contains_key(&0));
    }

    /// With a page size of 2, the needed sequence list begins `[0, 2]`, a
    /// duplicate `request_page(0)` is refused, and `reset_for` with another
    /// session changes the generation and empties the window.
    #[test]
    fn event_view_paginates_from_zero_and_resets_generation() {
        let mut events = view::EventView::new();
        events.page_size = 2;
        events.reset_for("s1");
        let generation_before = events.generation();

        assert!(events.needed_after_seq(4).starts_with(&[0, 2]));
        assert!(events.request_page(0));
        assert!(!events.request_page(0));

        let first_page = vec![event("s1", 0), event("s1", 1)];
        events.finish_page(0, first_page);

        events.reset_for("s2");
        assert_ne!(events.generation(), generation_before);
        assert!(events.window.is_empty());
    }

    /// Building a runtime, spawning a 200 ms blocking sleep and calling
    /// `shutdown_timeout` with 10 ms must complete in under 150 ms overall.
    #[test]
    fn runtime_shutdown_timeout_bounds_blocking_task() {
        use std::time::{Duration, Instant};

        const BLOCKING_SLEEP: Duration = Duration::from_millis(200);
        const SHUTDOWN_BUDGET: Duration = Duration::from_millis(10);
        const UPPER_BOUND: Duration = Duration::from_millis(150);

        // The clock starts before the runtime is built, so construction time
        // counts against the bound as well.
        let started = Instant::now();
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.spawn_blocking(|| std::thread::sleep(BLOCKING_SLEEP));
        runtime.shutdown_timeout(SHUTDOWN_BUDGET);

        assert!(started.elapsed() < UPPER_BOUND);
    }

    /// A symlink to a directory must resolve to the same path that
    /// `std::fs::canonicalize` reports for the directory itself.
    #[cfg(unix)]
    #[test]
    fn resolved_workspace_path_follows_symlink() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let (real, link) = (root.join("real"), root.join("link"));

        std::fs::create_dir_all(&real).unwrap();
        std::os::unix::fs::symlink(&real, &link).unwrap();

        let resolved = resolved_workspace_path(&link);
        let expected = std::fs::canonicalize(real).unwrap();
        assert_eq!(resolved, expected);
    }

    /// Builds a `SessionRecord` fixture with the given id and start time;
    /// the remaining fields are fixed strings, `SessionStatus::Done`, or `None`.
    fn session(id: &str, started_at_ms: u64) -> SessionRecord {
        SessionRecord {
            id: id.to_owned(),
            agent: String::from("cursor"),
            model: None,
            workspace: String::from("/ws"),
            started_at_ms,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: String::from("/trace"),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds an `Event` fixture for `session_id`; `seq` doubles as the
    /// timestamp, the kind is `ToolCall`, the source is `Tail`, every optional
    /// field is `None`, and the payload is JSON null.
    fn event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_owned(),
            seq,
            ts_ms: seq,
            ts_exact: true,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: serde_json::Value::Null,
        }
    }
}