1mod view;
5mod worker;
6
7use crate::core::event::{Event, SessionRecord, SessionStatus};
8use crate::metrics::types::MetricsReport;
9use crate::metrics::{index, report};
10use crate::store::span_tree::SpanNode;
11use crate::store::{SessionFilter, Store};
12use crate::ui::theme;
13use anyhow::Result;
14use arc_swap::ArcSwapOption;
15use crossterm::{
16 event::{self as cxev, Event as CxEvent, KeyCode, KeyEvent, KeyEventKind},
17 execute,
18 terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
19};
20use notify::{EventKind as NotifyEventKind, RecommendedWatcher, RecursiveMode, Watcher};
21use ratatui::{
22 Terminal,
23 backend::CrosstermBackend,
24 layout::{Constraint, Direction, Layout},
25 style::{Color, Style},
26 text::{Line, Span},
27 widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap},
28};
29use std::collections::HashMap;
30use std::path::{Path, PathBuf};
31use std::sync::{
32 Arc,
33 atomic::{AtomicBool, Ordering},
34};
35use time::OffsetDateTime;
36use tokio::sync::{Notify, mpsc};
37use tokio::time::{Duration, Instant, sleep_until};
38use view::{DetailData, DetailState, EventView, SessionView};
39use worker::{StoreRequest, StoreResponse, spawn_store_worker};
40
41const THIRTY_DAYS_SEC: u64 = 30 * 24 * 3600;
42const MS_HEURISTIC_THRESHOLD: u64 = 1_000_000_000_000;
44const WAL_REFRESH_COALESCE_MS: u64 = 100;
45const REPORT_REFRESH_MIN_MS: u64 = 2_000;
46const DEFAULT_VIEWPORT_HEIGHT: usize = 32;
47
48struct App {
49 sessions: SessionView,
50 events: EventView,
51 detail_state: DetailState,
52 agent_filter: String,
53 filter_mode: bool,
54 filter_buf: String,
55 clipboard_note: String,
56 left_focus: bool,
57 show_help: bool,
58 detail: bool,
59 show_metrics: bool,
60 metrics: Option<MetricsReport>,
61 metrics_cache: Arc<ArcSwapOption<MetricsReport>>,
62 report_dirty: Arc<AtomicBool>,
63 report_notify: Arc<Notify>,
64 pulse: bool,
65 workspace: String,
66 store_tx: mpsc::UnboundedSender<StoreRequest>,
67 store_rx: mpsc::UnboundedReceiver<StoreResponse>,
68 feedback_scores: HashMap<String, u8>,
69 feedback_token: u64,
70 detail_token: u64,
71 last_session_id: Option<String>,
72 session_viewport_height: usize,
73 event_viewport_height: usize,
74 error_note: String,
75}
76
77impl App {
78 fn open(
79 workspace: &Path,
80 metrics_cache: Arc<ArcSwapOption<MetricsReport>>,
81 report_dirty: Arc<AtomicBool>,
82 report_notify: Arc<Notify>,
83 ) -> Result<Self> {
84 let db = workspace.join(".kaizen/kaizen.db");
85 Store::open(&db)?;
86 let (store_tx, store_rx) = spawn_store_worker(db);
87 let ws = workspace.to_string_lossy().to_string();
88 let metrics = metrics_cache.load_full().as_deref().cloned();
89 let app = Self {
90 sessions: SessionView::new(),
91 events: EventView::new(),
92 detail_state: DetailState::Idle,
93 agent_filter: String::new(),
94 filter_mode: false,
95 filter_buf: String::new(),
96 clipboard_note: String::new(),
97 left_focus: true,
98 show_help: false,
99 detail: false,
100 show_metrics: false,
101 metrics,
102 metrics_cache,
103 report_dirty,
104 report_notify,
105 pulse: false,
106 workspace: ws,
107 store_tx,
108 store_rx,
109 feedback_scores: HashMap::new(),
110 feedback_token: 0,
111 detail_token: 0,
112 last_session_id: None,
113 session_viewport_height: DEFAULT_VIEWPORT_HEIGHT,
114 event_viewport_height: DEFAULT_VIEWPORT_HEIGHT,
115 error_note: String::new(),
116 };
117 app.mark_report_dirty();
118 let mut app = app;
119 app.request_session_pages();
120 app.request_feedback_for_viewport();
121 Ok(app)
122 }
123
124 fn sync_metrics_cache(&mut self) {
125 self.metrics = self.metrics_cache.load_full().as_deref().cloned();
126 }
127
128 fn mark_report_dirty(&self) {
129 self.report_dirty.store(true, Ordering::Release);
130 self.report_notify.notify_one();
131 }
132
133 fn refresh_full(&mut self) -> Result<()> {
134 self.sessions.reset();
135 self.events.clear();
136 self.detail_state = DetailState::Idle;
137 self.last_session_id = None;
138 self.pulse = !self.pulse;
139 self.request_session_pages();
140 self.sync_metrics_cache();
141 self.mark_report_dirty();
142 Ok(())
143 }
144
145 fn refresh(&mut self) -> Result<()> {
146 self.pulse = !self.pulse;
147 self.request_session_pages();
148 self.request_selected_detail();
149 self.request_event_pages();
150 self.sync_metrics_cache();
151 self.mark_report_dirty();
152 Ok(())
153 }
154
155 fn filter(&self) -> SessionFilter {
156 SessionFilter {
157 agent_prefix: Some(self.agent_filter.trim().to_lowercase()).filter(|s| !s.is_empty()),
158 status: None,
159 since_ms: None,
160 }
161 }
162
163 fn request_session_pages(&mut self) {
164 let offsets = self
165 .sessions
166 .needed_page_offsets(self.session_viewport_height);
167 for offset in offsets {
168 if self.sessions.request_page(offset) {
169 let _ = self.store_tx.send(StoreRequest::SessionsPage {
170 token: self.sessions.generation(),
171 workspace: self.workspace.clone(),
172 offset,
173 limit: self.sessions.page_size,
174 filter: self.filter(),
175 });
176 }
177 }
178 }
179
180 fn request_feedback_for_viewport(&mut self) {
181 let ids: Vec<String> = self
182 .sessions
183 .visible_rows(self.session_viewport_height)
184 .into_iter()
185 .filter_map(|(_, row)| row.map(|s| s.id.clone()))
186 .filter(|id| !self.feedback_scores.contains_key(id))
187 .collect();
188 if ids.is_empty() {
189 return;
190 }
191 self.feedback_token = self.feedback_token.wrapping_add(1);
192 let _ = self.store_tx.send(StoreRequest::Feedback {
193 token: self.feedback_token,
194 ids,
195 });
196 }
197
198 fn request_selected_detail(&mut self) {
199 let Some(id) = self.selected_id().map(str::to_string) else {
200 self.detail_state = DetailState::Idle;
201 self.events.clear();
202 self.last_session_id = None;
203 return;
204 };
205 if self.last_session_id.as_deref() != Some(&id) {
206 self.events.reset_for(&id);
207 self.detail_token = self.detail_token.wrapping_add(1);
208 self.detail_state = DetailState::Loading {
209 token: self.detail_token,
210 session_id: id.clone(),
211 };
212 let _ = self.store_tx.send(StoreRequest::Detail {
213 token: self.detail_token,
214 session_id: id.clone(),
215 });
216 self.last_session_id = Some(id);
217 }
218 self.request_event_pages();
219 }
220
221 fn request_event_pages(&mut self) {
222 let Some(session_id) = self.events.session_id().map(str::to_string) else {
223 return;
224 };
225 for after_seq in self.events.needed_after_seq(self.event_viewport_height) {
226 if self.events.request_page(after_seq) {
227 let _ = self.store_tx.send(StoreRequest::EventsPage {
228 token: self.events.generation(),
229 session_id: session_id.clone(),
230 after_seq,
231 limit: self.events.page_size,
232 });
233 }
234 }
235 }
236
237 fn apply_store_response(&mut self, response: StoreResponse) {
238 match response {
239 StoreResponse::SessionsPage {
240 token,
241 offset,
242 result,
243 } => self.apply_session_page(token, offset, result),
244 StoreResponse::EventsPage {
245 token,
246 session_id,
247 after_seq,
248 result,
249 } => self.apply_event_page(token, &session_id, after_seq, result),
250 StoreResponse::Detail {
251 token,
252 session_id,
253 result,
254 } => self.apply_detail(token, &session_id, result),
255 StoreResponse::Feedback { token, result } => self.apply_feedback(token, result),
256 }
257 }
258
259 fn apply_session_page(
260 &mut self,
261 token: u64,
262 offset: usize,
263 result: Result<crate::store::SessionPage, String>,
264 ) {
265 if token != self.sessions.generation() {
266 return;
267 }
268 match result {
269 Ok(page) => {
270 self.sessions.finish_page(offset, page.rows, page.total);
271 self.error_note.clear();
272 self.request_feedback_for_viewport();
273 self.request_selected_detail();
274 }
275 Err(err) => {
276 self.sessions.finish_error(offset);
277 self.error_note = err;
278 }
279 }
280 }
281
282 fn apply_event_page(
283 &mut self,
284 token: u64,
285 session_id: &str,
286 after_seq: u64,
287 result: Result<Vec<Event>, String>,
288 ) {
289 if token != self.events.generation() || self.events.session_id() != Some(session_id) {
290 return;
291 }
292 match result {
293 Ok(rows) => self.events.finish_page(after_seq, rows),
294 Err(err) => {
295 self.events.finish_error(after_seq);
296 self.error_note = err;
297 }
298 }
299 }
300
301 fn apply_detail(&mut self, token: u64, session_id: &str, result: Result<DetailData, String>) {
302 match &self.detail_state {
303 DetailState::Loading {
304 token: active,
305 session_id: active_id,
306 } if *active == token && active_id == session_id => {}
307 _ => return,
308 }
309 self.detail_state = match result {
310 Ok(data) => DetailState::Ready(data),
311 Err(err) => DetailState::Error(err),
312 };
313 }
314
315 fn apply_feedback(&mut self, token: u64, result: Result<HashMap<String, u8>, String>) {
316 if token != self.feedback_token {
317 return;
318 }
319 if let Ok(scores) = result {
320 self.feedback_scores.extend(scores);
321 }
322 }
323
324 fn set_viewport_height(&mut self, height: usize) {
325 let h = height.saturating_sub(4);
326 self.session_viewport_height = h.max(1);
327 self.event_viewport_height = h.max(1);
328 self.sessions
329 .set_viewport_height(self.session_viewport_height);
330 self.events.set_viewport_height(self.event_viewport_height);
331 }
332
333 fn after_session_cursor_move(&mut self) {
334 self.request_session_pages();
335 self.request_feedback_for_viewport();
336 self.request_selected_detail();
337 }
338
339 fn selected_session(&self) -> Option<&SessionRecord> {
340 self.sessions.selected()
341 }
342
343 fn selected_id(&self) -> Option<&str> {
344 self.selected_session().map(|s| s.id.as_str())
345 }
346
347 fn selected_event(&self) -> Option<&Event> {
348 self.events.selected()
349 }
350}
351
352fn time_ago_label(now_ms: u64, ts_ms: u64) -> String {
354 if ts_ms == 0 {
355 return "?".to_string();
356 }
357 let mut ts = ts_ms;
358 if ts < MS_HEURISTIC_THRESHOLD && now_ms >= MS_HEURISTIC_THRESHOLD {
359 ts = ts.saturating_mul(1000);
360 }
361 let diff_sec = now_ms.saturating_sub(ts) / 1000;
362 if diff_sec > THIRTY_DAYS_SEC {
363 return abs_ts_label(ts);
364 }
365 match diff_sec {
366 0 => "just now".to_string(),
367 s if s < 60 => format!("{s}s"),
368 s if s < 3600 => format!("{}m", s / 60),
369 s if s < 86_400 => format!("{}h", s / 3600),
370 s => format!("{}d", s / 86_400),
371 }
372}
373
374fn abs_ts_label(ts_ms: u64) -> String {
375 let Ok(dt) = OffsetDateTime::from_unix_timestamp((ts_ms / 1000) as i64) else {
376 return "?".to_string();
377 };
378 format!(
379 "{:04}-{:02}-{:02} {:02}:{:02}",
380 dt.year(),
381 u8::from(dt.month()),
382 dt.day(),
383 dt.hour(),
384 dt.minute()
385 )
386}
387
388fn truncate(s: &str, max: usize) -> &str {
389 if s.chars().count() <= max {
390 return s;
391 }
392 s.char_indices()
393 .nth(max.saturating_sub(1))
394 .map(|(i, _)| &s[..i])
395 .unwrap_or(s)
396}
397
398fn model_suffix(model: &Option<String>) -> String {
399 const MAX: usize = 20;
400 match model {
401 Some(m) if !m.is_empty() => format!(" {}", truncate(m, MAX)),
402 _ => " —".to_string(),
403 }
404}
405
406fn session_status_letter(s: &SessionRecord) -> char {
407 match s.status {
408 SessionStatus::Running => 'R',
409 SessionStatus::Waiting => 'W',
410 SessionStatus::Idle => 'I',
411 SessionStatus::Done => 'D',
412 }
413}
414
415fn format_event_tokens(e: &Event) -> Option<String> {
416 let mut out = String::new();
417 match (e.tokens_in, e.tokens_out) {
418 (Some(a), Some(b)) => out = format!("{a}/{b}"),
419 (Some(a), None) => out = a.to_string(),
420 (None, Some(b)) => out = b.to_string(),
421 (None, None) => {}
422 }
423 if let Some(r) = e.reasoning_tokens {
424 if out.is_empty() {
425 out = format!("r{r}");
426 } else {
427 out = format!("{out}+r{r}");
428 }
429 }
430 if out.is_empty() { None } else { Some(out) }
431}
432
433fn event_row_text(now_ms: u64, e: &Event, lead: &HashMap<String, u64>) -> String {
434 let age = time_ago_label(now_ms, e.ts_ms);
435 let tool = e.tool.as_deref().unwrap_or("-");
436 let lead_s = e
437 .tool_call_id
438 .as_ref()
439 .and_then(|id| lead.get(id).copied())
440 .map(|ms| format!("{ms}ms"))
441 .unwrap_or_else(|| "—".to_string());
442 let tok = format_event_tokens(e)
443 .map(|s| format!(" tok={s}"))
444 .unwrap_or_default();
445 format!("{age} {kind:?} {tool}{tok} {lead_s}", kind = e.kind)
446}
447
448fn draw(f: &mut ratatui::Frame, app: &App) {
449 if app.show_help {
450 draw_help(f);
451 return;
452 }
453 let chunks = Layout::default()
454 .direction(Direction::Vertical)
455 .constraints([Constraint::Min(1), Constraint::Length(1)])
456 .split(f.area());
457
458 let panes = Layout::default()
459 .direction(Direction::Horizontal)
460 .constraints([Constraint::Percentage(35), Constraint::Percentage(65)])
461 .split(chunks[0]);
462
463 draw_sessions(f, app, panes[0]);
464 draw_events(f, app, panes[1]);
465 draw_statusbar(f, app, chunks[1]);
466}
467
468fn draw_sessions(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
469 let border_color = if app.left_focus {
470 theme::BORDER_ACTIVE
471 } else {
472 theme::BORDER_INACTIVE
473 };
474 let title = if app.agent_filter.is_empty() {
475 format!("Sessions ({})", app.sessions.total)
476 } else {
477 format!(
478 "Sessions {} (agent prefix: {:?})",
479 app.sessions.total, app.agent_filter
480 )
481 };
482 let block = Block::default()
483 .title(title)
484 .borders(Borders::ALL)
485 .border_style(Style::default().fg(border_color));
486 let now = now_ms();
487 let items: Vec<ListItem> = app
488 .sessions
489 .visible_rows(area.height.saturating_sub(2) as usize)
490 .into_iter()
491 .map(|(idx, row)| {
492 row.map(|s| session_item(now, s, &app.feedback_scores))
493 .unwrap_or_else(|| ListItem::new(format!("{idx:>6} loading...")))
494 })
495 .collect();
496 let mut state = ListState::default();
497 state.select(
498 app.sessions
499 .selected_local_index(area.height.saturating_sub(2) as usize),
500 );
501 f.render_stateful_widget(
502 ratatui::widgets::List::new(items)
503 .block(block)
504 .highlight_style(Style::default().bg(Color::Blue).fg(Color::White)),
505 area,
506 &mut state,
507 );
508}
509
510fn session_item<'a>(
511 now: u64,
512 s: &'a SessionRecord,
513 feedback_scores: &HashMap<String, u8>,
514) -> ListItem<'a> {
515 let st = format!("{:?}", s.status);
516 let st_color = theme::status_color(&st);
517 let age = time_ago_label(now, s.started_at_ms);
518 let tag = session_status_letter(s);
519 let m = model_suffix(&s.model);
520 let score_span = feedback_scores.get(&s.id).map(|&sc| {
521 let color = match sc {
522 1..=2 => Color::Red,
523 3 => Color::Yellow,
524 _ => Color::Green,
525 };
526 Span::styled(format!("★{sc}"), Style::default().fg(color))
527 });
528 let mut spans = vec![
529 Span::styled(format!("{:.10}", s.id), Style::default().fg(Color::Gray)),
530 Span::raw(" "),
531 Span::styled(
532 format!("{:.7}", s.agent),
533 Style::default().fg(theme::agent_color(&s.agent)),
534 ),
535 Span::raw(" "),
536 Span::styled(format!("{tag}"), Style::default().fg(st_color)),
537 Span::raw(" "),
538 Span::styled(age, Style::default().fg(Color::White)),
539 Span::styled(m, Style::default().fg(Color::Gray)),
540 ];
541 if let Some(s) = score_span {
542 spans.push(Span::raw(" "));
543 spans.push(s);
544 }
545 ListItem::new(Line::from(spans))
546}
547
548fn draw_events(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
549 if app.show_metrics {
550 draw_metrics(f, app, area);
551 return;
552 }
553 let id = app.selected_id().unwrap_or("-");
554 let model = app
555 .selected_session()
556 .and_then(|s| s.model.as_deref().filter(|m| !m.is_empty()))
557 .map(|m| truncate(m, 24).to_string())
558 .unwrap_or_else(|| "—".to_string());
559 let border_color = if !app.left_focus {
560 theme::BORDER_ACTIVE
561 } else {
562 theme::BORDER_INACTIVE
563 };
564 let title = format!("Events — {:.18} — {}", id, model);
565 let now = now_ms();
566 let leads = app.detail_state.leads();
567 let spans = app.detail_state.spans();
568 if app.detail
569 && let Some(ev) = app.selected_event()
570 {
571 let split = Layout::default()
572 .direction(Direction::Vertical)
573 .constraints([Constraint::Min(2), Constraint::Length(10)])
574 .split(area);
575 render_event_list(f, app, split[0], title.clone(), border_color, now);
576 let detail = event_detail_text(ev, leads);
577 let det_block = Block::default()
578 .title("Detail")
579 .borders(Borders::ALL)
580 .border_style(Style::default().fg(border_color));
581 f.render_widget(
582 Paragraph::new(detail)
583 .block(det_block)
584 .wrap(Wrap { trim: true }),
585 split[1],
586 );
587 return;
588 }
589 if !spans.is_empty() {
590 let max_depth: u32 = spans.iter().map(|n| n.span.depth).max().unwrap_or(0);
591 let strip_h = (max_depth + 3).min(8) as u16;
592 let split = Layout::default()
593 .direction(Direction::Vertical)
594 .constraints([Constraint::Min(2), Constraint::Length(strip_h)])
595 .split(area);
596 render_event_list(f, app, split[0], title, border_color, now);
597 let span_text: Vec<Line> = span_depth_lines(spans);
598 let span_block = Block::default()
599 .title("Span tree")
600 .borders(Borders::ALL)
601 .border_style(Style::default().fg(theme::BORDER_INACTIVE));
602 f.render_widget(
603 Paragraph::new(span_text)
604 .block(span_block)
605 .wrap(Wrap { trim: false }),
606 split[1],
607 );
608 return;
609 }
610 if matches!(app.detail_state, DetailState::Loading { .. }) && app.events.total_loaded == 0 {
611 let block = Block::default()
612 .title(title)
613 .borders(Borders::ALL)
614 .border_style(Style::default().fg(border_color));
615 f.render_widget(Paragraph::new("loading...").block(block), area);
616 return;
617 }
618 if let Some(err) = app.detail_state.error_message()
619 && app.events.total_loaded == 0
620 {
621 let block = Block::default()
622 .title(title)
623 .borders(Borders::ALL)
624 .border_style(Style::default().fg(border_color));
625 f.render_widget(Paragraph::new(err.to_string()).block(block), area);
626 return;
627 }
628 render_event_list(f, app, area, title, border_color, now);
629}
630
631fn render_event_list(
632 f: &mut ratatui::Frame,
633 app: &App,
634 area: ratatui::layout::Rect,
635 title: String,
636 border_color: Color,
637 now: u64,
638) {
639 let block = Block::default()
640 .title(title)
641 .borders(Borders::ALL)
642 .border_style(Style::default().fg(border_color));
643 let items: Vec<ListItem> = app
644 .events
645 .visible_rows(area.height.saturating_sub(2) as usize)
646 .into_iter()
647 .map(|(idx, row)| {
648 row.map(|e| ListItem::new(event_row_text(now, e, app.detail_state.leads())))
649 .unwrap_or_else(|| ListItem::new(format!("{idx:>6} loading...")))
650 })
651 .collect();
652 let mut state = ListState::default();
653 state.select(
654 app.events
655 .selected_local_index(area.height.saturating_sub(2) as usize),
656 );
657 f.render_stateful_widget(
658 List::new(items)
659 .block(block)
660 .highlight_style(Style::default().bg(Color::Blue).fg(Color::White)),
661 area,
662 &mut state,
663 );
664}
665
666fn span_depth_lines(nodes: &[SpanNode]) -> Vec<Line<'static>> {
667 let mut lines = Vec::new();
668 for n in nodes {
669 push_span_line(&mut lines, n, 0);
670 }
671 lines
672}
673
674fn push_span_line(lines: &mut Vec<Line<'static>>, node: &SpanNode, depth: u32) {
675 let indent = " ".repeat(depth as usize);
676 let prefix = if depth == 0 { "┌ " } else { "├ " };
677 let cost = node
678 .span
679 .subtree_cost_usd_e6
680 .map(|c| format!(" ${:.4}", c as f64 / 1_000_000.0))
681 .unwrap_or_default();
682 let text = format!("{}{}{}{}", indent, prefix, node.span.tool, cost);
683 lines.push(Line::from(Span::raw(text)));
684 for child in &node.children {
685 push_span_line(lines, child, depth + 1);
686 }
687}
688
689fn event_detail_text(ev: &Event, lead: &HashMap<String, u64>) -> String {
690 let lead_s = ev
691 .tool_call_id
692 .as_ref()
693 .and_then(|id| lead.get(id).copied())
694 .map(|ms| format!("{ms}ms"))
695 .unwrap_or_else(|| "—".to_string());
696 let head = format!(
697 "seq={} kind={:?} tool={} call_id={} in={:?} out={:?} r={:?} cost_e6={:?} lead={}\n",
698 ev.seq,
699 ev.kind,
700 ev.tool.as_deref().unwrap_or("-"),
701 ev.tool_call_id.as_deref().unwrap_or("—"),
702 ev.tokens_in,
703 ev.tokens_out,
704 ev.reasoning_tokens,
705 ev.cost_usd_e6,
706 lead_s
707 );
708 let json = serde_json::to_string_pretty(&ev.payload).unwrap_or_else(|_| ev.payload.to_string());
709 head + &json
710}
711
712fn draw_metrics(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
713 let block = Block::default()
714 .title("Metrics")
715 .borders(Borders::ALL)
716 .border_style(Style::default().fg(theme::BORDER_ACTIVE));
717 let empty = app.metrics.is_none()
718 || app
719 .metrics
720 .as_ref()
721 .is_some_and(|m| m.slowest_tools.is_empty() && m.hottest_files.is_empty());
722 let text = if empty {
723 "(No metrics in this window yet. Run `kaizen metrics` in a shell, or `r` here after a repo is indexed.)\n\nMetrics need a successful snapshot + events for tool spans — see docs/telemetry-journey.md."
724 .to_string()
725 } else {
726 let mut lines = vec!["Slow tools".to_string()];
727 if let Some(metrics) = &app.metrics {
728 for row in metrics.slowest_tools.iter().take(4) {
729 let p95 = row
730 .p95_ms
731 .map(|v| format!("{v}ms"))
732 .unwrap_or_else(|| "-".into());
733 lines.push(format!("{} p95={} tok={}", row.tool, p95, row.total_tokens));
734 }
735 lines.push(String::new());
736 lines.push("Hot files".into());
737 for row in metrics.hottest_files.iter().take(4) {
738 lines.push(format!("{} {}", row.value, row.path));
739 }
740 }
741 lines.join("\n")
742 };
743 f.render_widget(Paragraph::new(text).block(block), area);
744}
745
746fn draw_statusbar(f: &mut ratatui::Frame, app: &App, area: ratatui::layout::Rect) {
747 let pulse = if app.pulse { "●" } else { "○" };
748 let text = if app.filter_mode {
749 format!(
750 "FILTER type agent substring | Enter apply | Esc cancel | buffer: {}",
751 app.filter_buf
752 )
753 } else {
754 let note = if app.clipboard_note.is_empty() {
755 String::new()
756 } else {
757 format!(" | {}", app.clipboard_note)
758 };
759 format!(
760 "LIVE {pulse} j/k Tab m metrics / filter y copy id Enter detail ? help q quit{note}"
761 )
762 };
763 f.render_widget(Paragraph::new(text), area);
764}
765
766fn draw_help(f: &mut ratatui::Frame) {
767 let text = "j/k ↑/↓ move in focused pane | g/G first/last | Tab switch pane\n\
768 Enter toggle event detail | Esc back | r refresh | q quit\n\
769 / filter sessions by agent substring | y copy session id (left pane)";
770 let block = Block::default().title("Help").borders(Borders::ALL);
771 f.render_widget(Paragraph::new(text).block(block), f.area());
772}
773
774fn now_ms() -> u64 {
775 std::time::SystemTime::now()
776 .duration_since(std::time::UNIX_EPOCH)
777 .unwrap_or_default()
778 .as_millis() as u64
779}
780
781fn spawn_key_reader(stop: Arc<AtomicBool>) -> mpsc::UnboundedReceiver<KeyEvent> {
782 let (tx, rx) = mpsc::unbounded_channel();
783 tokio::task::spawn_blocking(move || {
784 while !stop.load(Ordering::Acquire) {
785 match cxev::poll(Duration::from_millis(250)) {
786 Ok(true) => match cxev::read() {
787 Ok(CxEvent::Key(k)) if k.kind == KeyEventKind::Press => {
788 if tx.send(k).is_err() {
789 break;
790 }
791 }
792 Ok(_) => {}
793 Err(_) => break,
794 },
795 Ok(false) => {}
796 Err(_) => break,
797 }
798 }
799 });
800 rx
801}
802
803fn spawn_wal_watcher(
804 wal_path: &Path,
805 dirty: Arc<AtomicBool>,
806) -> Result<(RecommendedWatcher, mpsc::UnboundedReceiver<()>)> {
807 let (tx, rx) = mpsc::unbounded_channel();
808 let watched_wal = wal_path.to_path_buf();
809 let callback_wal = watched_wal.clone();
810 let mut watcher = RecommendedWatcher::new(
811 move |res: notify::Result<notify::Event>| {
812 if let Ok(event) = res
813 && !matches!(event.kind, NotifyEventKind::Access(_))
814 && event.paths.iter().any(|p| p == &callback_wal)
815 {
816 dirty.store(true, Ordering::Release);
817 let _ = tx.send(());
818 }
819 },
820 notify::Config::default(),
821 )?;
822 watcher.watch(
823 watched_wal.parent().unwrap_or_else(|| Path::new(".")),
824 RecursiveMode::NonRecursive,
825 )?;
826 Ok((watcher, rx))
827}
828
829fn spawn_report_worker(
830 db_path: PathBuf,
831 workspace: String,
832 cache: Arc<ArcSwapOption<MetricsReport>>,
833 dirty: Arc<AtomicBool>,
834 notify: Arc<Notify>,
835 ui_tx: mpsc::UnboundedSender<()>,
836) {
837 tokio::spawn(async move {
838 let mut last_run = Instant::now() - Duration::from_millis(REPORT_REFRESH_MIN_MS);
839 loop {
840 notify.notified().await;
841 let ready_at = last_run + Duration::from_millis(REPORT_REFRESH_MIN_MS);
842 let now = Instant::now();
843 if now < ready_at {
844 sleep_until(ready_at).await;
845 }
846 if !dirty.swap(false, Ordering::AcqRel) {
847 continue;
848 }
849 let db = db_path.clone();
850 let ws = workspace.clone();
851 let next = tokio::task::spawn_blocking(move || {
852 let store = Store::open_read_only(&db)?;
853 report::build_report(&store, &ws, 7)
854 })
855 .await;
856 last_run = Instant::now();
857 if let Ok(Ok(report)) = next {
858 cache.store(Some(Arc::new(report)));
859 let _ = ui_tx.send(());
860 }
861 }
862 });
863}
864
865fn spawn_tui_setup_worker(
866 workspace: PathBuf,
867 db_path: PathBuf,
868 report_dirty: Arc<AtomicBool>,
869 report_notify: Arc<Notify>,
870) {
871 tokio::task::spawn_blocking(move || {
872 if let Ok(store) = Store::open(&db_path) {
873 let _ = index::ensure_indexed(&store, &workspace, false);
874 report_dirty.store(true, Ordering::Release);
875 report_notify.notify_one();
876 }
877 });
878}
879
880async fn wait_for_deadline(deadline: Option<Instant>) {
881 match deadline {
882 Some(deadline) => sleep_until(deadline).await,
883 None => std::future::pending::<()>().await,
884 }
885}
886
887pub async fn run(workspace: &Path) -> Result<()> {
889 let workspace_buf = resolved_workspace_path(workspace);
890 let workspace = workspace_buf.as_path();
891 let db_path = workspace.join(".kaizen/kaizen.db");
892 let metrics_cache = Arc::new(ArcSwapOption::from(None));
893 let report_dirty = Arc::new(AtomicBool::new(true));
894 let report_notify = Arc::new(Notify::new());
895 let (report_ui_tx, mut report_ui_rx) = mpsc::unbounded_channel();
896 spawn_report_worker(
897 db_path.clone(),
898 workspace.to_string_lossy().to_string(),
899 metrics_cache.clone(),
900 report_dirty.clone(),
901 report_notify.clone(),
902 report_ui_tx,
903 );
904 let mut app = App::open(
905 workspace,
906 metrics_cache,
907 report_dirty.clone(),
908 report_notify.clone(),
909 )?;
910 spawn_tui_setup_worker(
911 workspace.to_path_buf(),
912 db_path.clone(),
913 report_dirty.clone(),
914 report_notify.clone(),
915 );
916 enable_raw_mode()?;
917 let mut stdout = std::io::stdout();
918 execute!(stdout, EnterAlternateScreen)?;
919 let backend = CrosstermBackend::new(stdout);
920 let mut terminal = Terminal::new(backend)?;
921
922 std::panic::set_hook(Box::new(|_| {
923 let _ = disable_raw_mode();
924 let _ = execute!(std::io::stdout(), LeaveAlternateScreen);
925 }));
926
927 let wal_dirty = Arc::new(AtomicBool::new(false));
928 let (_watcher, mut wal_rx) =
929 match spawn_wal_watcher(&db_path.with_extension("db-wal"), wal_dirty.clone()) {
930 Ok((watcher, rx)) => (Some(watcher), rx),
931 Err(_) => {
932 let (_tx, rx) = mpsc::unbounded_channel();
933 (None, rx)
934 }
935 };
936 let input_stop = Arc::new(AtomicBool::new(false));
937 let mut key_rx = spawn_key_reader(input_stop.clone());
938 let mut needs_draw = true;
939 let mut pending_refresh = false;
940 let mut refresh_deadline = None;
941 let mut last_refresh = Instant::now() - Duration::from_millis(WAL_REFRESH_COALESCE_MS);
942
943 loop {
944 if needs_draw {
945 app.sync_metrics_cache();
946 terminal.draw(|f| {
947 app.set_viewport_height(f.area().height as usize);
948 app.request_session_pages();
949 app.request_event_pages();
950 draw(f, &app);
951 })?;
952 needs_draw = false;
953 }
954 tokio::select! {
955 Some(response) = app.store_rx.recv() => {
956 app.apply_store_response(response);
957 needs_draw = true;
958 }
959 _ = wait_for_deadline(refresh_deadline), if refresh_deadline.is_some() => {
960 refresh_deadline = None;
961 if pending_refresh {
962 pending_refresh = false;
963 if app.refresh().is_ok() {
964 last_refresh = Instant::now();
965 needs_draw = true;
966 }
967 }
968 }
969 Some(_) = wal_rx.recv() => {
970 if wal_dirty.swap(false, Ordering::AcqRel) {
971 let ready_at = last_refresh + Duration::from_millis(WAL_REFRESH_COALESCE_MS);
972 let now = Instant::now();
973 if now >= ready_at {
974 if app.refresh().is_ok() {
975 last_refresh = now;
976 needs_draw = true;
977 }
978 } else {
979 pending_refresh = true;
980 refresh_deadline = Some(ready_at);
981 }
982 }
983 }
984 Some(_) = report_ui_rx.recv() => {
985 app.sync_metrics_cache();
986 needs_draw = true;
987 }
988 Some(k) = key_rx.recv() => {
989 if app.filter_mode {
990 match k.code {
991 KeyCode::Enter => {
992 app.agent_filter = app.filter_buf.trim().to_string();
993 app.filter_mode = false;
994 let _ = app.refresh_full();
995 needs_draw = true;
996 }
997 KeyCode::Esc => {
998 app.filter_mode = false;
999 app.filter_buf.clear();
1000 needs_draw = true;
1001 }
1002 KeyCode::Backspace => {
1003 app.filter_buf.pop();
1004 needs_draw = true;
1005 }
1006 KeyCode::Char(c) => {
1007 app.filter_buf.push(c);
1008 needs_draw = true;
1009 }
1010 _ => {}
1011 }
1012 continue;
1013 }
1014 match k.code {
1015 KeyCode::Char('/') => {
1016 app.filter_mode = true;
1017 app.filter_buf.clone_from(&app.agent_filter);
1018 needs_draw = true;
1019 }
1020 KeyCode::Char('y') if app.left_focus => {
1021 if let Some(id) = app.selected_id() {
1022 match arboard::Clipboard::new() {
1023 Ok(mut cb) => {
1024 if cb.set_text(id).is_ok() {
1025 app.clipboard_note = "copied session id".to_string();
1026 } else {
1027 app.clipboard_note = "clipboard write failed".to_string();
1028 }
1029 }
1030 Err(_) => app.clipboard_note = "no clipboard".to_string(),
1031 }
1032 needs_draw = true;
1033 }
1034 }
1035 KeyCode::Char('q') | KeyCode::Esc if !app.detail && !app.show_help => break,
1036 KeyCode::Char('q') if app.show_help => { app.show_help = false; needs_draw = true; }
1037 KeyCode::Char('q') => { app.detail = false; app.show_help = false; needs_draw = true; }
1038 KeyCode::Esc | KeyCode::Backspace => {
1039 app.detail = false;
1040 app.show_help = false;
1041 needs_draw = true;
1042 }
1043 KeyCode::Char('?') => { app.show_help = !app.show_help; needs_draw = true; }
1044 KeyCode::Char('m') => { app.show_metrics = !app.show_metrics; needs_draw = true; }
1045 KeyCode::Tab => {
1046 app.left_focus = !app.left_focus;
1047 needs_draw = true;
1048 }
1049 KeyCode::Char('r') => { let _ = app.refresh_full(); needs_draw = true; }
1050 KeyCode::Char('j') | KeyCode::Down => {
1051 if app.show_metrics || app.left_focus {
1052 app.sessions.move_by(1);
1053 app.after_session_cursor_move();
1054 needs_draw = true;
1055 } else {
1056 app.events.move_by(1);
1057 app.request_event_pages();
1058 needs_draw = true;
1059 }
1060 }
1061 KeyCode::Char('k') | KeyCode::Up => {
1062 if app.show_metrics || app.left_focus {
1063 app.sessions.move_by(-1);
1064 app.after_session_cursor_move();
1065 needs_draw = true;
1066 } else {
1067 app.events.move_by(-1);
1068 app.request_event_pages();
1069 needs_draw = true;
1070 }
1071 }
1072 KeyCode::Char('g') => {
1073 if app.show_metrics || app.left_focus {
1074 app.sessions.cursor = 0;
1075 app.after_session_cursor_move();
1076 } else {
1077 app.events.cursor = 0;
1078 app.request_event_pages();
1079 }
1080 needs_draw = true;
1081 }
1082 KeyCode::Char('G') => {
1083 if app.show_metrics || app.left_focus {
1084 app.sessions.jump_last();
1085 app.after_session_cursor_move();
1086 } else {
1087 app.events.jump_last_loaded();
1088 app.request_event_pages();
1089 }
1090 needs_draw = true;
1091 }
1092 KeyCode::Enter if app.selected_event().is_some() && !app.show_metrics => {
1093 app.detail = !app.detail;
1094 needs_draw = true;
1095 }
1096 _ => {}
1097 }
1098 }
1099 else => {}
1100 }
1101 }
1102
1103 input_stop.store(true, Ordering::Release);
1104 disable_raw_mode()?;
1105 execute!(terminal.backend_mut(), LeaveAlternateScreen)?;
1106 Ok(())
1107}
1108
1109fn resolved_workspace_path(workspace: &Path) -> PathBuf {
1110 crate::core::workspace::canonical(workspace)
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115 use super::*;
1116 use crate::core::event::{EventKind, EventSource};
1117
1118 #[test]
1119 fn time_ago_just_now() {
1120 assert_eq!(time_ago_label(10_000, 10_000), "just now");
1121 }
1122
1123 #[test]
1124 fn time_ago_treats_small_ts_as_seconds() {
1125 let now = 1_700_000_000_000u64;
1126 let ts_sec = 1_700_000_000u64;
1127 let label = time_ago_label(now, ts_sec);
1128 assert!(
1129 !label.contains('?'),
1130 "expected relative label, got {label:?}"
1131 );
1132 }
1133
1134 #[test]
1135 fn time_ago_old_uses_absolute() {
1136 let now = 1_700_000_000_000u64;
1137 let old = now - (40u64 * 24 * 3600 * 1000);
1138 let label = time_ago_label(now, old);
1139 assert!(label.contains('-'), "expected date-like label: {label}");
1140 }
1141
1142 #[test]
1143 fn session_view_clamps_cursor_and_suppresses_double_load() {
1144 let mut view = view::SessionView::new();
1145 view.set_viewport_height(4);
1146 assert_eq!(view.needed_page_offsets(4), vec![0]);
1147 assert!(view.request_page(0));
1148 assert!(!view.request_page(0));
1149 view.finish_page(0, vec![session("a", 3), session("b", 2)], 2);
1150 view.move_by(99);
1151 assert_eq!(view.cursor, 1);
1152 assert_eq!(view.selected().unwrap().id, "b");
1153 }
1154
1155 #[test]
1156 fn session_view_eviction_keeps_cursor_page() {
1157 let mut view = view::SessionView::new();
1158 view.page_size = 2;
1159 for offset in (0..=20).step_by(2) {
1160 view.cursor = offset;
1161 view.finish_page(
1162 offset,
1163 vec![session(&format!("s{offset}"), offset as u64)],
1164 30,
1165 );
1166 }
1167 assert!(view.window.contains_key(&20));
1168 assert!(!view.window.contains_key(&0));
1169 }
1170
1171 #[test]
1172 fn event_view_paginates_from_zero_and_resets_generation() {
1173 let mut view = view::EventView::new();
1174 view.page_size = 2;
1175 view.reset_for("s1");
1176 let token = view.generation();
1177 assert!(view.needed_after_seq(4).starts_with(&[0, 2]));
1178 assert!(view.request_page(0));
1179 assert!(!view.request_page(0));
1180 view.finish_page(0, vec![event("s1", 0), event("s1", 1)]);
1181 view.reset_for("s2");
1182 assert_ne!(view.generation(), token);
1183 assert!(view.window.is_empty());
1184 }
1185
1186 #[cfg(unix)]
1187 #[test]
1188 fn resolved_workspace_path_follows_symlink() {
1189 let tmp = tempfile::tempdir().unwrap();
1190 let real = tmp.path().join("real");
1191 let link = tmp.path().join("link");
1192 std::fs::create_dir_all(&real).unwrap();
1193 std::os::unix::fs::symlink(&real, &link).unwrap();
1194 assert_eq!(
1195 resolved_workspace_path(&link),
1196 std::fs::canonicalize(real).unwrap()
1197 );
1198 }
1199
1200 fn session(id: &str, started_at_ms: u64) -> SessionRecord {
1201 SessionRecord {
1202 id: id.to_string(),
1203 agent: "cursor".to_string(),
1204 model: None,
1205 workspace: "/ws".to_string(),
1206 started_at_ms,
1207 ended_at_ms: None,
1208 status: SessionStatus::Done,
1209 trace_path: "/trace".to_string(),
1210 start_commit: None,
1211 end_commit: None,
1212 branch: None,
1213 dirty_start: None,
1214 dirty_end: None,
1215 repo_binding_source: None,
1216 prompt_fingerprint: None,
1217 parent_session_id: None,
1218 agent_version: None,
1219 os: None,
1220 arch: None,
1221 repo_file_count: None,
1222 repo_total_loc: None,
1223 }
1224 }
1225
1226 fn event(session_id: &str, seq: u64) -> Event {
1227 Event {
1228 session_id: session_id.to_string(),
1229 seq,
1230 ts_ms: seq,
1231 ts_exact: true,
1232 kind: EventKind::ToolCall,
1233 source: EventSource::Tail,
1234 tool: None,
1235 tool_call_id: None,
1236 tokens_in: None,
1237 tokens_out: None,
1238 reasoning_tokens: None,
1239 cost_usd_e6: None,
1240 stop_reason: None,
1241 latency_ms: None,
1242 ttft_ms: None,
1243 retry_count: None,
1244 context_used_tokens: None,
1245 context_max_tokens: None,
1246 cache_creation_tokens: None,
1247 cache_read_tokens: None,
1248 system_prompt_tokens: None,
1249 payload: serde_json::Value::Null,
1250 }
1251 }
1252}