use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Stdout;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crossterm::{
event::{self, Event, KeyCode, KeyModifiers},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Layout},
style::{Color, Modifier, Style},
text::{Line, Span},
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},
Frame, Terminal,
};
use tokio_stream::StreamExt;
use crate::canonical::Role;
use crate::logging::{LogEntry, RouterEvent};
/// Maximum number of completed entries kept in the feed; once the limit is
/// exceeded the oldest entry is dropped.
const MAX_EVENTS: usize = 200;
/// Running aggregates over every completed request ingested so far.
#[derive(Default)]
struct Stats {
/// Number of entries ingested.
total: u32,
/// Number of ingested entries that carried an error.
errors: u32,
/// Sum of `duration_ms` over all ingested entries.
total_duration_ms: u64,
/// Ingested-entry count keyed by provider name.
by_provider: HashMap<String, u32>,
/// Occurrence count keyed by tag; request tags and response tags share this map.
by_tag: HashMap<String, u32>,
/// Per provider, the `ts_ms` of the most recently ingested entry without an error.
last_ok: HashMap<String, u64>,
}
impl Stats {
    /// Folds one completed entry into the aggregates.
    ///
    /// Every entry bumps the request count, the accumulated duration and the
    /// per-provider counter. Entries with an error bump `errors`; all others
    /// refresh the provider's last-success timestamp. Request tags and
    /// response tags (when a response is present) are both counted in `by_tag`.
    fn ingest(&mut self, e: &LogEntry) {
        self.total += 1;
        match &e.error {
            Some(_) => self.errors += 1,
            None => {
                self.last_ok.insert(e.provider.clone(), e.ts_ms);
            }
        }
        self.total_duration_ms += e.duration_ms;
        *self.by_provider.entry(e.provider.clone()).or_default() += 1;

        // Request tags first, then the response tags if there is a response.
        let response_tags = e.response.iter().flat_map(|resp| resp.tags.iter());
        for tag in e.tags.iter().chain(response_tags) {
            *self.by_tag.entry(tag.clone()).or_default() += 1;
        }
    }

    /// Mean duration in milliseconds over all ingested entries (integer
    /// division); `0` when nothing has been ingested yet.
    fn avg_latency_ms(&self) -> u64 {
        self.total_duration_ms
            .checked_div(u64::from(self.total))
            .unwrap_or(0)
    }
}
/// Which pane currently receives keyboard input.
#[derive(PartialEq, Clone, Copy)]
enum Focus {
/// The event feed: scroll and quit keys are active.
Feed,
/// The chat input box: typed characters are appended to the input line.
Chat,
}
/// All mutable dashboard state, shared between the UI loop and background tasks.
struct AppState {
/// Completed entries, newest at the front, capped at `MAX_EVENTS`.
events: VecDeque<LogEntry>,
/// Requests that have started but whose completion has not been received yet.
pending: Vec<PendingEntry>,
/// Aggregates over every completed entry.
stats: Stats,
/// Number of newest completed entries skipped when rendering the feed.
scroll: usize,
/// Pane that currently receives keyboard input.
focus: Focus,
/// Text typed into the chat input box so far.
input: String,
/// Model name sent with chat requests; changed with `:model <name>`.
model: String,
/// Chat messages as JSON objects with `role` and `content` keys.
chat_history: Vec<serde_json::Value>,
/// Assistant text accumulated from the stream of the in-flight chat request.
stream_buf: String,
/// Routing details of the most recently completed entry, if any.
last_routed: Option<RoutedMeta>,
/// `true` while a chat request is in flight.
sending: bool,
/// Status text for the event-stream connection, shown in the feed title.
sse_status: String,
/// Base URL the chat requests are sent to.
base_url: String,
}
/// A request that has started but not yet completed.
struct PendingEntry {
/// Identifier used to match later events to this request.
id: u64,
/// Start timestamp in milliseconds; rendered as a clock time and used to compute elapsed time.
ts_ms: u64,
/// Model name reported by the start event.
model: String,
/// Tags reported by the classification event; empty until it arrives.
tags: Vec<String>,
/// `(provider, model)` reported by the routing event; `None` until it arrives.
routed: Option<(String, String)>, }
/// Routing details of a completed entry, shown as the title of the conversation pane.
struct RoutedMeta {
/// Provider recorded on the entry.
provider: String,
/// The entry's `sent_model` value.
sent_model: String,
/// Tags recorded on the entry itself.
tags: Vec<String>,
/// Tags recorded on the entry's response; empty when there was no response.
response_tags: Vec<String>,
}
impl AppState {
    /// Creates the initial state: empty feed, feed focused, default model,
    /// and the event-stream status set to "connecting…".
    fn new(base_url: &str) -> Self {
        AppState {
            events: VecDeque::new(),
            pending: Vec::new(),
            stats: Stats::default(),
            scroll: 0,
            focus: Focus::Feed,
            input: String::new(),
            model: "llama3.1:8b".to_string(),
            chat_history: Vec::new(),
            stream_buf: String::new(),
            last_routed: None,
            sending: false,
            sse_status: "connecting…".to_string(),
            base_url: base_url.to_string(),
        }
    }

    /// Records a newly started request as pending, with no tags or route yet.
    fn on_start(&mut self, id: u64, ts_ms: u64, model: String) {
        self.pending.push(PendingEntry { id, ts_ms, model, tags: vec![], routed: None });
    }

    /// Attaches classification tags to the pending request `id`; ignored if
    /// no such request is pending.
    fn on_classified(&mut self, id: u64, tags: Vec<String>) {
        if let Some(e) = self.pending.iter_mut().find(|e| e.id == id) {
            e.tags = tags;
        }
    }

    /// Attaches the chosen provider/model to the pending request `id`;
    /// ignored if no such request is pending.
    fn on_routed(&mut self, id: u64, provider: String, model: String) {
        if let Some(e) = self.pending.iter_mut().find(|e| e.id == id) {
            e.routed = Some((provider, model));
        }
    }

    /// Moves request `id` from pending to the completed feed and updates the
    /// aggregates and the last-routed metadata.
    fn on_complete(&mut self, id: u64, entry: LogEntry) {
        self.pending.retain(|e| e.id != id);
        self.stats.ingest(&entry);
        self.last_routed = Some(RoutedMeta {
            provider: entry.provider.clone(),
            sent_model: entry.sent_model.clone(),
            tags: entry.tags.clone(),
            response_tags: entry.response.as_ref().map(|r| r.tags.clone()).unwrap_or_default(),
        });
        self.events.push_front(entry);
        // Drop the oldest entries beyond the cap (truncate keeps the front).
        self.events.truncate(MAX_EVENTS);
        if self.scroll > 0 {
            // The user has scrolled away from the top: shift the offset so the
            // same entries stay in view. Clamp to the last valid index, because
            // when the feed is full the entry the offset pointed at may just
            // have been evicted and an unclamped offset would skip everything.
            let last_index = self.events.len().saturating_sub(1);
            self.scroll = (self.scroll + 1).min(last_index);
        }
    }
}
pub async fn run(base_url: &str) -> anyhow::Result<()> {
let state = Arc::new(Mutex::new(AppState::new(base_url)));
{
let state = state.clone();
let url = format!("{}/dashboard/events", base_url.trim_end_matches('/'));
tokio::spawn(async move { read_sse(url, state).await });
}
enable_raw_mode()?;
let mut stdout = std::io::stdout();
execute!(stdout, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let result = event_loop(&mut terminal, &state).await;
let _ = disable_raw_mode();
let _ = execute!(terminal.backend_mut(), LeaveAlternateScreen);
let _ = terminal.show_cursor();
result
}
/// Follows the server-sent-event stream at `url` and applies each decoded
/// `RouterEvent` to the shared state.
///
/// The connection status is published through `sse_status`: "live" once a
/// successful response arrives, "error: …" if the request fails or the server
/// answers with a non-success HTTP status, and "disconnected" (optionally
/// with the error) when the stream ends.
///
/// Only lines starting with `data: ` are considered; payloads that do not
/// parse as a `RouterEvent` are skipped.
async fn read_sse(url: String, state: Arc<Mutex<AppState>>) {
    let client = reqwest::Client::new();
    let resp = match client
        .get(&url)
        .header("Accept", "text/event-stream")
        .send()
        .await
    {
        Ok(r) if r.status().is_success() => {
            state.lock().unwrap().sse_status = "live".to_string();
            r
        }
        Ok(r) => {
            // An error page is not an event stream; do not report it as live.
            state.lock().unwrap().sse_status = format!("error: HTTP {}", r.status());
            return;
        }
        Err(e) => {
            state.lock().unwrap().sse_status = format!("error: {e}");
            return;
        }
    };

    let mut stream = resp.bytes_stream();
    // Raw bytes are buffered and decoded one complete line at a time, so a
    // multi-byte UTF-8 character split across two network chunks is not
    // turned into replacement characters.
    let mut buf: Vec<u8> = Vec::new();
    while let Some(chunk) = stream.next().await {
        let chunk = match chunk {
            Ok(c) => c,
            Err(e) => {
                state.lock().unwrap().sse_status = format!("disconnected: {e}");
                return;
            }
        };
        buf.extend_from_slice(&chunk);
        while let Some(nl) = buf.iter().position(|&b| b == b'\n') {
            // Remove the line including its terminator, then decode without it.
            let raw: Vec<u8> = buf.drain(..=nl).collect();
            let line = String::from_utf8_lossy(&raw[..nl]);
            let line = line.trim_end_matches('\r');
            let Some(data) = line.strip_prefix("data: ") else { continue };
            match serde_json::from_str::<RouterEvent>(data) {
                Ok(RouterEvent::Start { id, ts_ms, model, .. }) => {
                    state.lock().unwrap().on_start(id, ts_ms, model);
                }
                Ok(RouterEvent::Classified { id, tags, .. }) => {
                    state.lock().unwrap().on_classified(id, tags);
                }
                Ok(RouterEvent::Routed { id, provider, model, .. }) => {
                    state.lock().unwrap().on_routed(id, provider, model);
                }
                Ok(RouterEvent::Complete { id, entry }) => {
                    state.lock().unwrap().on_complete(id, entry);
                }
                // Unparseable payloads are skipped on purpose.
                Err(_) => {}
            }
        }
    }
    state.lock().unwrap().sse_status = "disconnected".to_string();
}
/// Drives the UI: every 50 ms it handles all queued terminal events and then
/// redraws the screen from the shared state.
///
/// Returns `Ok(())` as soon as a key handler asks to quit.
///
/// # Errors
/// Propagates failures from polling or reading terminal events, from key
/// handling, and from drawing.
async fn event_loop(
    terminal: &mut Terminal<CrosstermBackend<Stdout>>,
    state: &Arc<Mutex<AppState>>,
) -> anyhow::Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_millis(50));
    loop {
        ticker.tick().await;

        // Drain everything that is already queued; a zero timeout never blocks.
        loop {
            if !event::poll(Duration::ZERO)? {
                break;
            }
            let Event::Key(key) = event::read()? else { continue };
            if handle_key(key, state).await? {
                return Ok(());
            }
        }

        // The lock is held only for the duration of one draw call.
        {
            let guard = state.lock().unwrap();
            terminal.draw(|frame| render(frame, &guard))?;
        }
    }
}
/// Applies one key event to the shared state.
///
/// Returns `Ok(true)` when the application should exit (Ctrl-C anywhere, or
/// `q` while the feed is focused) and `Ok(false)` otherwise.
///
/// Feed focus: `Tab`/`i` switches to chat, `Up`/`k` and `Down`/`j` scroll.
/// Chat focus: characters edit the input line, `Esc` clears it and returns to
/// the feed, `Tab` returns to the feed, and `Enter` either runs a command
/// (`:model <name>`, `:clear`) or sends the message in a background task.
async fn handle_key(
    key: event::KeyEvent,
    state: &Arc<Mutex<AppState>>,
) -> anyhow::Result<bool> {
    // Some platforms (notably Windows) report a release event for every key
    // in addition to the press; handling both would apply each key twice.
    if key.kind == event::KeyEventKind::Release {
        return Ok(false);
    }
    if key.modifiers.contains(KeyModifiers::CONTROL) && key.code == KeyCode::Char('c') {
        return Ok(true);
    }
    let mut s = state.lock().unwrap();
    match s.focus {
        Focus::Feed => match key.code {
            KeyCode::Char('q') => return Ok(true),
            KeyCode::Tab | KeyCode::Char('i') => s.focus = Focus::Chat,
            KeyCode::Up | KeyCode::Char('k') => {
                // Never scroll past the last completed entry.
                let max = s.events.len().saturating_sub(1);
                if s.scroll < max {
                    s.scroll += 1;
                }
            }
            KeyCode::Down | KeyCode::Char('j') => {
                s.scroll = s.scroll.saturating_sub(1);
            }
            _ => {}
        },
        Focus::Chat => match key.code {
            KeyCode::Esc => {
                s.input.clear();
                s.focus = Focus::Feed;
            }
            KeyCode::Tab => {
                s.focus = Focus::Feed;
            }
            KeyCode::Backspace => {
                s.input.pop();
            }
            KeyCode::Enter => {
                if s.input.is_empty() || s.sending {
                    // Nothing to submit, or a reply is still streaming.
                } else if let Some(name) = s.input.strip_prefix(":model ") {
                    s.model = name.trim().to_string();
                    s.input.clear();
                } else if s.input.trim() == ":clear" {
                    s.chat_history.clear();
                    s.input.clear();
                } else {
                    let msg = std::mem::take(&mut s.input);
                    s.chat_history.push(serde_json::json!({"role": "user", "content": msg}));
                    let history = s.chat_history.clone();
                    let model = s.model.clone();
                    let url = s.base_url.clone();
                    s.sending = true;
                    // Release the lock before spawning: the chat task locks
                    // the same mutex.
                    drop(s);
                    let state2 = state.clone();
                    tokio::spawn(async move {
                        stream_chat(state2, &url, &model, &history).await;
                    });
                    return Ok(false);
                }
            }
            KeyCode::Char(c) => s.input.push(c),
            _ => {}
        },
    }
    Ok(false)
}
/// Sends `history` to `{base_url}/v1/chat/completions` as a streaming request
/// and accumulates the streamed reply.
///
/// Text deltas are appended to `stream_buf` as they arrive. When the stream
/// finishes (a `[DONE]` marker, end of body, or a mid-stream error) the
/// accumulated text is moved into `chat_history` as an assistant message and
/// `sending` is reset.
///
/// If the request fails, the server answers with a non-success HTTP status,
/// or the reply is empty, the last history entry is removed and replaced by
/// an assistant message describing the problem.
async fn stream_chat(
    state: Arc<Mutex<AppState>>,
    base_url: &str,
    model: &str,
    history: &[serde_json::Value],
) {
    let client = reqwest::Client::new();
    let result = client
        .post(format!("{}/v1/chat/completions", base_url.trim_end_matches('/')))
        .json(&serde_json::json!({
            "model": model,
            "messages": history,
            "stream": true,
        }))
        .send()
        .await;
    let resp = match result {
        Ok(r) if r.status().is_success() => r,
        failed => {
            // Transport failure or an HTTP error status: report it instead of
            // reading an error body as if it were an event stream.
            let reason = match failed {
                Ok(r) => format!("error: HTTP {}", r.status()),
                Err(e) => format!("error: {e}"),
            };
            let mut st = state.lock().unwrap();
            st.chat_history.pop();
            st.chat_history.push(serde_json::json!({"role": "assistant", "content": reason}));
            st.sending = false;
            return;
        }
    };

    let mut byte_stream = resp.bytes_stream();
    // Raw bytes are buffered and decoded one complete line at a time, so a
    // multi-byte UTF-8 character split across two network chunks is not
    // turned into replacement characters.
    let mut buf: Vec<u8> = Vec::new();
    while let Some(chunk) = byte_stream.next().await {
        let bytes = match chunk {
            Ok(b) => b,
            Err(e) => {
                let mut st = state.lock().unwrap();
                // Keep whatever was streamed so far; fall back to the error text.
                let partial = std::mem::take(&mut st.stream_buf);
                let content = if partial.is_empty() {
                    format!("error: {e}")
                } else {
                    partial
                };
                st.chat_history.push(serde_json::json!({"role": "assistant", "content": content}));
                st.sending = false;
                return;
            }
        };
        buf.extend_from_slice(&bytes);
        while let Some(nl) = buf.iter().position(|&b| b == b'\n') {
            // Remove the line including its terminator, then decode without it.
            let raw: Vec<u8> = buf.drain(..=nl).collect();
            let line = String::from_utf8_lossy(&raw[..nl]);
            let line = line.trim_end_matches('\r');
            let Some(data) = line.strip_prefix("data: ") else { continue };
            if data == "[DONE]" {
                let mut st = state.lock().unwrap();
                let content = std::mem::take(&mut st.stream_buf);
                st.chat_history.push(serde_json::json!({"role": "assistant", "content": content}));
                st.sending = false;
                return;
            }
            if let Ok(v) = serde_json::from_str::<serde_json::Value>(data) {
                if let Some(text) = v["choices"][0]["delta"]["content"].as_str() {
                    state.lock().unwrap().stream_buf.push_str(text);
                }
            }
        }
    }

    // The body ended without a `[DONE]` marker.
    let mut st = state.lock().unwrap();
    let content = std::mem::take(&mut st.stream_buf);
    if !content.is_empty() {
        st.chat_history.push(serde_json::json!({"role": "assistant", "content": content}));
    } else {
        st.chat_history.pop();
        st.chat_history.push(serde_json::json!({"role": "assistant", "content": "(empty response)"}));
    }
    st.sending = false;
}
/// Lays out the whole screen: the feed takes the top 60 %, and the bottom
/// 40 % is split into stats (35 %) and chat (65 %).
fn render(f: &mut Frame, s: &AppState) {
    let rows = Layout::default()
        .direction(Direction::Vertical)
        .constraints([Constraint::Percentage(60), Constraint::Percentage(40)])
        .split(f.area());
    let (feed_area, lower_area) = (rows[0], rows[1]);

    let columns = Layout::default()
        .direction(Direction::Horizontal)
        .constraints([Constraint::Percentage(35), Constraint::Percentage(65)])
        .split(lower_area);
    let (stats_area, chat_area) = (columns[0], columns[1]);

    render_feed(f, s, feed_area);
    render_stats(f, s, stats_area);
    render_chat(f, s, chat_area);
}
fn render_feed(f: &mut Frame, s: &AppState, area: ratatui::layout::Rect) {
let focused = s.focus == Focus::Feed;
let border_style = focus_border(focused);
let in_flight = s.pending.len();
let flight_suffix = if in_flight > 0 {
format!(" {} in flight", in_flight)
} else {
String::new()
};
let title = format!(
" Pipeline {} {} events{} ",
s.sse_status,
s.events.len(),
flight_suffix,
);
let hint = if focused {
" ↑/↓ scroll Tab/i → chat q quit "
} else {
" Tab/i to focus "
};
let block = Block::default()
.title(title)
.title_bottom(hint)
.borders(Borders::ALL)
.border_style(border_style);
let inner = block.inner(area);
f.render_widget(block, area);
let pending_items: Vec<ListItem> = s
.pending
.iter()
.rev()
.map(|e| pending_to_list_item(e))
.collect();
let done_items: Vec<ListItem> = s
.events
.iter()
.skip(s.scroll)
.map(entry_to_list_item)
.collect();
let all_items: Vec<ListItem> = pending_items.into_iter().chain(done_items).collect();
f.render_widget(List::new(all_items), inner);
}
fn pending_to_list_item(e: &PendingEntry) -> ListItem<'static> {
let secs = e.ts_ms / 1000;
let time = format!("{:02}:{:02}:{:02}", (secs / 3600) % 24, (secs / 60) % 60, secs % 60);
let elapsed_ms = LogEntry::now_ms().saturating_sub(e.ts_ms);
let mut spans = vec![
Span::styled(time, Style::default().fg(Color::DarkGray)),
Span::raw(" "),
Span::styled("⋯", Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)),
Span::raw(" "),
];
if let Some((provider, model)) = &e.routed {
spans.push(Span::styled(provider.clone(), Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)));
spans.push(Span::raw(" "));
spans.push(Span::styled(model.clone(), Style::default().fg(Color::Blue)));
} else {
spans.push(Span::styled(e.model.clone(), Style::default().fg(Color::DarkGray)));
}
for tag in &e.tags {
spans.push(Span::raw(" "));
spans.push(Span::styled(format!("[{tag}]"), Style::default().fg(Color::Magenta)));
}
spans.push(Span::styled(
format!(" {}ms", elapsed_ms),
Style::default().fg(Color::DarkGray),
));
ListItem::new(vec![Line::from(spans), Line::from("")])
}
/// Builds the four-line feed row for a completed entry.
///
/// Line 1 is a header: clock time, provider, model (shown as
/// `requested → sent` when they differ), request tags, plugins, response tags
/// with token counts, and the duration (red when the entry has an error).
/// Line 2 is the last user message, line 3 the error or reply text (both
/// truncated to 90 characters), and line 4 is blank.
fn entry_to_list_item(e: &LogEntry) -> ListItem<'static> {
    let dim = Style::default().fg(Color::DarkGray);

    let total_secs = e.ts_ms / 1000;
    let (hh, mm, ss) = ((total_secs / 3600) % 24, (total_secs / 60) % 60, total_secs % 60);
    let clock = format!("{:02}:{:02}:{:02}", hh, mm, ss);

    let mut header: Vec<Span<'static>> = Vec::new();
    header.push(Span::styled(clock, dim));
    header.push(Span::raw(" "));
    header.push(Span::styled(
        e.provider.clone(),
        Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD),
    ));
    header.push(Span::raw(" "));

    // Show the requested model only when routing changed it.
    if e.requested_model != e.sent_model {
        header.push(Span::styled(e.requested_model.clone(), dim));
        header.push(Span::styled(" → ", dim));
    }
    header.push(Span::styled(e.sent_model.clone(), Style::default().fg(Color::Blue)));

    for tag in &e.tags {
        header.push(Span::raw(" "));
        header.push(Span::styled(format!("[{tag}]"), Style::default().fg(Color::Yellow)));
    }
    for plugin in &e.plugins {
        header.push(Span::raw(" "));
        header.push(Span::styled(format!("({plugin})"), Style::default().fg(Color::Magenta)));
    }
    if let Some(resp) = &e.response {
        for tag in &resp.tags {
            header.push(Span::raw(" "));
            header.push(Span::styled(format!("<{tag}>"), Style::default().fg(Color::Red)));
        }
        header.push(Span::raw(" "));
        header.push(Span::styled(
            format!("↑{}", resp.usage.input_tokens),
            Style::default().fg(Color::Green),
        ));
        header.push(Span::raw(" "));
        header.push(Span::styled(
            format!("↓{}", resp.usage.output_tokens),
            Style::default().fg(Color::Magenta),
        ));
    }

    let duration_style = match e.error {
        Some(_) => Style::default().fg(Color::Red),
        None => dim,
    };
    header.push(Span::styled(format!(" {}ms", e.duration_ms), duration_style));

    // Most recent user message, flattened onto one line.
    let last_user = e
        .messages
        .iter()
        .rfind(|m| m.role == Role::User)
        .map_or_else(String::new, |m| m.text_content().replace('\n', " "));

    // An error takes precedence over a reply.
    let body = match (&e.error, &e.response) {
        (Some(err), _) => Line::from(vec![
            Span::styled(" error: ", dim),
            Span::styled(trunc(err, 90), Style::default().fg(Color::Red)),
        ]),
        (None, Some(resp)) => {
            let flat_reply = resp.content.replace('\n', " ");
            Line::from(vec![
                Span::styled(" reply: ", dim),
                Span::styled(trunc(&flat_reply, 90), Style::default().fg(Color::White)),
            ])
        }
        (None, None) => Line::from(""),
    };

    let prompt = Line::from(vec![
        Span::styled(" prompt:", dim),
        Span::raw(" "),
        Span::raw(trunc(&last_user, 90)),
    ]);

    ListItem::new(vec![Line::from(header), prompt, body, Line::from("")])
}
/// Draws the stats pane: request count, error count, average latency, the
/// in-flight count (when non-zero), and per-provider and per-tag breakdowns.
///
/// Breakdown rows are ordered by descending count, with ties broken by name
/// so the order does not depend on hash-map iteration order.
fn render_stats(f: &mut Frame, s: &AppState, area: ratatui::layout::Rect) {
    let mut lines: Vec<Line> = vec![
        Line::from(vec![
            Span::styled(
                s.stats.total.to_string(),
                Style::default().fg(Color::White).add_modifier(Modifier::BOLD),
            ),
            Span::styled(" requests", Style::default().fg(Color::DarkGray)),
        ]),
        Line::from(vec![
            Span::styled(
                s.stats.errors.to_string(),
                Style::default().fg(if s.stats.errors > 0 { Color::Red } else { Color::DarkGray }),
            ),
            Span::styled(" errors", Style::default().fg(Color::DarkGray)),
        ]),
        Line::from(vec![
            Span::styled(
                format!("{}ms", s.stats.avg_latency_ms()),
                Style::default().fg(Color::White),
            ),
            Span::styled(" avg", Style::default().fg(Color::DarkGray)),
        ]),
    ];
    if !s.pending.is_empty() {
        lines.push(Line::from(vec![
            Span::styled(
                s.pending.len().to_string(),
                Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
            ),
            Span::styled(" in flight", Style::default().fg(Color::DarkGray)),
        ]));
    }
    if !s.stats.by_provider.is_empty() {
        lines.push(Line::from(""));
        lines.push(Line::from(Span::styled(
            "providers",
            Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC),
        )));
        let mut providers: Vec<(&String, &u32)> = s.stats.by_provider.iter().collect();
        // Highest count first; equal counts fall back to alphabetical order.
        providers.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
        for (name, count) in providers {
            // Widen before multiplying so very large counts cannot overflow.
            let pct = if s.stats.total > 0 {
                u64::from(*count) * 100 / u64::from(s.stats.total)
            } else {
                0
            };
            // A provider with no successful entry yet is highlighted in red.
            let (ago, ago_color) = match s.stats.last_ok.get(name) {
                Some(&ts) => (ago_str(ts), Color::DarkGray),
                None => ("never".to_string(), Color::Red),
            };
            lines.push(Line::from(vec![
                Span::styled(format!(" {name}"), Style::default().fg(Color::Cyan)),
                Span::styled(format!(" {count} ({pct}%)"), Style::default().fg(Color::DarkGray)),
                Span::styled(format!(" last {ago}"), Style::default().fg(ago_color)),
            ]));
        }
    }
    if !s.stats.by_tag.is_empty() {
        lines.push(Line::from(""));
        lines.push(Line::from(Span::styled(
            "tags",
            Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC),
        )));
        let mut tags: Vec<(&String, &u32)> = s.stats.by_tag.iter().collect();
        // Highest count first; equal counts fall back to alphabetical order.
        tags.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
        for (tag, count) in tags {
            lines.push(Line::from(vec![
                Span::styled(format!(" [{tag}]"), Style::default().fg(Color::Yellow)),
                Span::styled(format!(" {count}"), Style::default().fg(Color::DarkGray)),
            ]));
        }
    }
    f.render_widget(
        Paragraph::new(lines).block(
            Block::default()
                .title(" Stats ")
                .borders(Borders::ALL)
                .border_style(Style::default().fg(Color::DarkGray)),
        ),
        area,
    );
}
fn render_chat(f: &mut Frame, s: &AppState, area: ratatui::layout::Rect) {
let focused = s.focus == Focus::Chat;
let border_style = focus_border(focused);
let hint = if focused {
" Enter=send Esc=back :model <name>=change "
} else {
" Tab/i to focus "
};
let block = Block::default()
.title(format!(" Chat model: {} ", s.model))
.title_bottom(hint)
.borders(Borders::ALL)
.border_style(border_style);
let inner = block.inner(area);
f.render_widget(block, area);
let split = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(1), Constraint::Length(3)])
.split(inner);
let route_title = if let Some(meta) = &s.last_routed {
let mut t = format!(" {} / {} ", meta.provider, meta.sent_model);
for tag in &meta.tags {
t.push_str(&format!("[{tag}] "));
}
for tag in &meta.response_tags {
t.push_str(&format!("<{tag}> "));
}
t
} else {
" conversation ".to_string()
};
let conv_lines: Vec<Line> = if s.chat_history.is_empty() && !s.sending {
vec![Line::from(Span::styled(
"(no messages yet — type below and press Enter)",
Style::default().fg(Color::DarkGray),
))]
} else {
let mut lines: Vec<Line> = Vec::new();
for msg in &s.chat_history {
let role = msg["role"].as_str().unwrap_or("");
let content = msg["content"].as_str().unwrap_or("");
match role {
"user" => {
lines.push(Line::from(vec![
Span::styled("you: ", Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)),
Span::styled(content.to_string(), Style::default().fg(Color::White)),
]));
}
"assistant" => {
for line in content.split('\n') {
lines.push(Line::from(vec![
Span::raw(" "),
Span::styled(line.to_string(), Style::default().fg(Color::White)),
]));
}
lines.push(Line::from(""));
}
_ => {}
}
}
if s.sending {
if s.stream_buf.is_empty() {
lines.push(Line::from(Span::styled(" …", Style::default().fg(Color::DarkGray))));
} else {
for line in s.stream_buf.split('\n') {
lines.push(Line::from(vec![
Span::raw(" "),
Span::styled(line.to_string(), Style::default().fg(Color::White)),
]));
}
}
}
lines
};
let pane_height = split[0].height.saturating_sub(2) as usize;
let scroll_offset = conv_lines.len().saturating_sub(pane_height) as u16;
f.render_widget(
Paragraph::new(conv_lines)
.wrap(Wrap { trim: false })
.scroll((scroll_offset, 0))
.block(
Block::default()
.title(route_title)
.borders(Borders::BOTTOM)
.border_style(Style::default().fg(Color::DarkGray)),
),
split[0],
);
let cursor = if focused && !s.sending { "█" } else { "" };
f.render_widget(
Paragraph::new(format!("{}{}", s.input, cursor))
.style(Style::default().fg(Color::White))
.block(
Block::default()
.title(" ▸ ")
.borders(Borders::ALL)
.border_style(if focused {
Style::default().fg(Color::Cyan)
} else {
Style::default().fg(Color::DarkGray)
}),
),
split[1],
);
}
/// Border style for a pane: cyan when it has focus, dark gray otherwise.
fn focus_border(focused: bool) -> Style {
    let color = if focused { Color::Cyan } else { Color::DarkGray };
    Style::default().fg(color)
}
/// Returns `s` limited to at most `max` characters, with "…" appended when
/// anything was cut off. Counts `char`s, so the cut never lands inside a
/// multi-byte character.
fn trunc(s: &str, max: usize) -> String {
    // A character at index `max` exists exactly when `s` has more than `max`
    // characters; its byte offset is where the kept prefix ends.
    match s.char_indices().nth(max) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_owned(),
    }
}
/// Formats how long ago `ts_ms` (milliseconds since the Unix epoch) was,
/// relative to the current system time, as "Ns ago", "Nm ago" or "Nh ago".
///
/// Timestamps in the future are treated as "0s ago". If the system clock
/// reads earlier than the Unix epoch, the current time is taken as 0.
fn ago_str(ts_ms: u64) -> String {
    let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    };
    let elapsed_secs = now_ms.saturating_sub(ts_ms) / 1000;
    match elapsed_secs {
        0..=59 => format!("{elapsed_secs}s ago"),
        60..=3599 => format!("{}m ago", elapsed_secs / 60),
        _ => format!("{}h ago", elapsed_secs / 3600),
    }
}