use crate::{CliError, CliResult};
use clap::Args;
use rusqlite::Connection;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
// Command-line options for the local dashboard server started by `serve`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help/about text, which would change the CLI output.
#[derive(Debug, Args)]
pub struct DbServeArgs {
// Address to bind. `serve` rejects every value other than "127.0.0.1".
#[arg(long, default_value = "127.0.0.1")]
pub host: String,
// TCP port to listen on.
#[arg(long, default_value_t = 8787)]
pub port: u16,
// When set, `serve` handles exactly one connection and then returns.
#[arg(long, help = "serve one request, then exit")]
pub once: bool,
}
/// One session as shown in the sidebar and the status panel.
///
/// Populated by `load_sessions`, which joins each `sessions` row with that
/// session's most recent `status_events` row.
struct DashboardSession {
session_id: String,
title: String,
phase: String,
mode: String,
workflow_status: String,
// `load_sessions` substitutes the literal 'unknown' when these two columns are NULL.
lead_agent_id: String,
artifact_ref: String,
updated_at: String,
// The remaining fields come from the latest status event; `load_sessions`
// substitutes 'unknown' when there is no event or the column is NULL.
next_action: String,
blockers: String,
asks_for_zevs: String,
risk_or_residual_uncertainty: String,
expected_wait: String,
}
/// One row of the `messages` table together with its delivery proofs,
/// as assembled by `load_messages`.
struct DashboardMessage {
session_id: String,
mid: String,
message_type: String,
// Read from the `latest_delivery_status` column.
delivery_status: String,
// Read from the `latest_verified_by` column.
verified_by: String,
// 'unknown' when the `latest_audit_id` column is NULL.
latest_audit_id: String,
timestamp: String,
// Filled in by `load_message_proofs`; may be empty.
proofs: Vec<DeliveryProof>,
}
/// One `audit_records` row attached to a message (see `load_message_proofs`).
struct DeliveryProof {
delivery_status: String,
verified_by: String,
audit_id: String,
}
/// One `status_events` row as loaded by `load_status_events` for the timeline.
struct StatusEvent {
session_id: String,
timestamp: String,
phase: String,
mode: String,
workflow_status: String,
next_action: String,
}
/// A single item in the merged timeline: either a status event or a message.
enum TimelineEntry {
Status(StatusEvent),
Message(DashboardMessage),
}
/// The parts of an HTTP request line that the dashboard acts on
/// (produced by `parse_request_target`).
struct RequestTarget {
// First whitespace-separated token of the request line, e.g. "GET".
method: String,
// Request target up to (not including) the first `?`.
route: String,
// Percent-decoded value of the `session` query parameter, if present.
selected_session_id: Option<String>,
}
pub fn serve(path: &Path, args: DbServeArgs) -> CliResult<()> {
if args.host != "127.0.0.1" {
return Err(CliError::usage(
"db dashboard server binds only to 127.0.0.1 in v0.2",
));
}
crate::db::open_database(path)?;
let listener = TcpListener::bind((args.host.as_str(), args.port)).map_err(|error| {
CliError::failure(format!(
"failed to bind dashboard server on {}:{}: {error}",
args.host, args.port
))
})?;
let address = listener.local_addr().map_err(|error| {
CliError::failure(format!(
"failed to read dashboard listener address: {error}"
))
})?;
println!("listening on http://{address}/");
std::io::stdout()
.flush()
.map_err(|error| CliError::failure(format!("failed to flush dashboard URL: {error}")))?;
if args.once {
let (stream, _) = listener.accept().map_err(|error| {
CliError::failure(format!("failed to accept dashboard request: {error}"))
})?;
return handle_connection(stream, path);
}
for stream in listener.incoming() {
let stream = stream.map_err(|error| {
CliError::failure(format!("failed to accept dashboard request: {error}"))
})?;
handle_connection(stream, path)?;
}
Ok(())
}
fn handle_connection(mut stream: TcpStream, path: &Path) -> CliResult<()> {
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.map_err(|error| CliError::failure(format!("failed to set read timeout: {error}")))?;
let request = read_http_request(&mut stream)?;
let target = parse_request_target(&request);
let (status, content_type, body) = if !matches!(target.method.as_str(), "GET" | "HEAD") {
(
"405 Method Not Allowed",
"text/plain; charset=utf-8",
"method not allowed\n".to_string(),
)
} else if matches!(target.route.as_str(), "/" | "/index.html") {
let connection = crate::db::open_read_database(path)?;
(
"200 OK",
"text/html; charset=utf-8",
render_dashboard(&connection, target.selected_session_id.as_deref())?,
)
} else {
(
"404 Not Found",
"text/plain; charset=utf-8",
"not found\n".to_string(),
)
};
write_response(&mut stream, status, content_type, &body)
}
/// Extracts method, route and the optional `session` query parameter from
/// the first line of a raw HTTP request.
///
/// Missing pieces fall back to `GET` and `/`; an entirely empty request is
/// treated as `GET / HTTP/1.1`.
fn parse_request_target(request: &str) -> RequestTarget {
    let request_line = match request.lines().next() {
        Some(line) => line,
        None => "GET / HTTP/1.1",
    };
    let mut parts = request_line.split_whitespace();
    let method = String::from(parts.next().unwrap_or("GET"));
    let raw_target = parts.next().unwrap_or("/");

    // Everything after the first `?` is the query string.
    let (route, query) = match raw_target.split_once('?') {
        Some((route, query)) => (route, query),
        None => (raw_target, ""),
    };

    RequestTarget {
        method,
        route: String::from(route),
        selected_session_id: query_param(query, "session"),
    }
}
/// Returns the percent-decoded value of the first `name=value` pair in
/// `query` whose key equals `name`. Parts without `=` are ignored.
fn query_param(query: &str, name: &str) -> Option<String> {
    for pair in query.split('&') {
        if let Some((key, raw_value)) = pair.split_once('=') {
            if key == name {
                return Some(percent_decode(raw_value));
            }
        }
    }
    None
}
/// Reads from `stream` until the end of the HTTP header section
/// (`\r\n\r\n`), end of stream, or until more than 8192 bytes have been
/// collected, and returns what was read as lossily decoded UTF-8.
///
/// # Errors
/// Fails when a read on the stream returns an error.
fn read_http_request(stream: &mut TcpStream) -> CliResult<String> {
    const HEADER_END: &[u8] = b"\r\n\r\n";
    const MAX_REQUEST_BYTES: usize = 8192;

    let mut chunk = [0_u8; 1024];
    let mut received: Vec<u8> = Vec::new();
    loop {
        let read = match stream.read(&mut chunk) {
            Ok(0) => break,
            Ok(read) => read,
            Err(error) => {
                return Err(CliError::failure(format!(
                    "failed to read dashboard request: {error}"
                )));
            }
        };
        received.extend_from_slice(&chunk[..read]);

        let headers_complete = received
            .windows(HEADER_END.len())
            .any(|candidate| candidate == HEADER_END);
        if headers_complete || received.len() > MAX_REQUEST_BYTES {
            break;
        }
    }
    Ok(String::from_utf8_lossy(&received).into_owned())
}
fn write_response(
stream: &mut TcpStream,
status: &str,
content_type: &str,
body: &str,
) -> CliResult<()> {
let response = format!(
"HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
stream
.write_all(response.as_bytes())
.map_err(|error| CliError::failure(format!("failed to write dashboard response: {error}")))
}
/// Renders the whole dashboard page as one HTML string.
///
/// The selected session is the one whose id equals `selected_session_id`;
/// when no id is given or none matches, the first loaded session is used.
/// Status events and messages are loaded for that session (or unfiltered
/// when there are no sessions at all) and merged into one timeline.
///
/// # Errors
/// Propagates any failure from the underlying database loaders.
fn render_dashboard(
    connection: &Connection,
    selected_session_id: Option<&str>,
) -> CliResult<String> {
    let sessions = load_sessions(connection)?;

    let requested = match selected_session_id {
        Some(wanted) => sessions
            .iter()
            .find(|candidate| candidate.session_id == wanted),
        None => None,
    };
    let selected = match requested {
        Some(found) => Some(found),
        None => sessions.first(),
    };
    let selected_id = selected.map(|session| session.session_id.as_str());

    let events = load_status_events(connection, selected_id)?;
    let messages = load_messages(connection, selected_id)?;
    let timeline = timeline_entries(events, messages);

    let mut html = String::new();

    // Document head and inline stylesheet.
    html += concat!(
        "<!doctype html><html lang=\"en\"><head><meta charset=\"utf-8\">",
        "<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\">",
        "<title>zynk dashboard</title><style>",
    );
    html += STYLES;
    html += concat!(
        "</style></head><body>",
        "<div class=\"app-shell\">",
        "<aside class=\"sidebar\"><div class=\"brand\">zynk</div><nav>",
    );

    // Sidebar: one link per session.
    if sessions.is_empty() {
        html += "<p class=\"empty\">No sessions in this database.</p>";
    }
    for session in &sessions {
        let href = escape_url_component(&session.session_id);
        let label = escape_html(&session.session_id);
        let status_class = escape_class(&session.workflow_status);
        let status_text = escape_html(&session.workflow_status);
        html += &format!(
            "<a class=\"session-link\" href=\"/?session={href}\"><span>{label}</span><span class=\"badge status-{status_class}\">{status_text}</span></a>"
        );
    }
    html += "</nav></aside>";

    // Main column: header plus merged timeline.
    html += concat!(
        "<main class=\"timeline\"><header class=\"timeline-header\">",
        "<div><h1>Timeline</h1>",
    );
    if let Some(session) = selected {
        let id = escape_html(&session.session_id);
        let phase = escape_html(&session.phase);
        let mode = escape_html(&session.mode);
        html += &format!("<p>{id} / {phase} / {mode}</p>");
    }
    html += "</div></header>";
    if timeline.is_empty() {
        html += "<section class=\"empty-state\">No timeline records yet.</section>";
    } else {
        render_timeline(&mut html, &timeline);
    }
    html += "</main>";

    // Detail panel: label/value pairs for the selected session.
    html += "<aside class=\"detail-panel\"><h2>Status</h2>";
    if let Some(session) = selected {
        let rows: [(&str, &str); 10] = [
            ("Session", &session.title),
            ("State", &session.workflow_status),
            ("Next", &session.next_action),
            ("Ask", &session.asks_for_zevs),
            ("Blockers", &session.blockers),
            ("Risk", &session.risk_or_residual_uncertainty),
            ("Expected wait", &session.expected_wait),
            ("Artifact", &session.artifact_ref),
            ("Lead", &session.lead_agent_id),
            ("Updated", &session.updated_at),
        ];
        html += "<dl>";
        for &(label, value) in rows.iter() {
            html += &format!("<dt>{label}</dt><dd>{}</dd>", escape_html(value));
        }
        html += "</dl>";
    }
    html += "</aside></div></body></html>";

    Ok(html)
}
/// Appends the HTML for every timeline entry to `html`, in slice order.
fn render_timeline(html: &mut String, entries: &[TimelineEntry]) {
    entries.iter().for_each(|entry| match entry {
        TimelineEntry::Status(event) => render_status_event(html, event),
        TimelineEntry::Message(message) => render_message(html, message),
    });
}
/// Appends one status event to `html` as an `<article>` card; every field
/// is HTML-escaped before insertion.
fn render_status_event(html: &mut String, event: &StatusEvent) {
    let timestamp = escape_html(&event.timestamp);
    let session_id = escape_html(&event.session_id);
    let phase = escape_html(&event.phase);
    let mode = escape_html(&event.mode);
    let workflow_status = escape_html(&event.workflow_status);
    let next_action = escape_html(&event.next_action);
    let card = format!(
        "<article class=\"timeline-item status-event\"><div class=\"timestamp\">{timestamp}</div><h2>{session_id}</h2><p>{phase} / {mode} / {workflow_status}</p><p>{next_action}</p></article>"
    );
    html.push_str(&card);
}
/// Appends one message to `html` as an `<article>` card followed by a strip
/// of delivery proofs.
///
/// When the message has no audit proofs, a single badge built from the
/// message's own delivery status and verifier is shown instead.
fn render_message(html: &mut String, message: &DashboardMessage) {
    let timestamp = escape_html(&message.timestamp);
    let mid = escape_html(&message.mid);
    let session_id = escape_html(&message.session_id);
    let message_type = escape_html(&message.message_type);
    let latest_audit_id = escape_html(&message.latest_audit_id);
    *html += &format!(
        "<article class=\"timeline-item message\"><div class=\"timestamp\">{timestamp}</div><h2>{mid}</h2><p>{session_id} / {message_type} / latest {latest_audit_id}</p><div class=\"proof-strip\">"
    );

    if message.proofs.is_empty() {
        let class = escape_class(&message.delivery_status);
        let status = escape_html(&message.delivery_status);
        let verifier = escape_html(&message.verified_by);
        *html += &format!("<span class=\"proof proof-{class}\">{status} / {verifier}</span>");
    }
    for proof in message.proofs.iter() {
        let class = escape_class(&proof.delivery_status);
        let audit = escape_attr(&proof.audit_id);
        let status = escape_html(&proof.delivery_status);
        let verifier = escape_html(&proof.verified_by);
        *html += &format!(
            "<span class=\"proof proof-{class}\" title=\"audit {audit}\">{status} / {verifier}</span>"
        );
    }

    *html += "</div></article>";
}
fn load_sessions(connection: &Connection) -> CliResult<Vec<DashboardSession>> {
let mut statement = connection
.prepare(
"SELECT
s.session_id,
s.title,
s.phase,
s.mode,
s.workflow_status,
COALESCE(s.lead_agent_id, 'unknown'),
COALESCE(s.artifact_ref, 'unknown'),
s.updated_at,
COALESCE(se.next_action, 'unknown'),
COALESCE(se.blockers, 'unknown'),
COALESCE(se.asks_for_zevs, 'unknown'),
COALESCE(se.risk_or_residual_uncertainty, 'unknown'),
COALESCE(se.expected_wait, 'unknown')
FROM sessions AS s
LEFT JOIN status_events AS se
ON se.status_event_id = (
SELECT status_event_id
FROM status_events
WHERE session_id = s.session_id
ORDER BY timestamp DESC, status_event_id DESC
LIMIT 1
)
ORDER BY s.updated_at DESC, s.session_id",
)
.map_err(|error| {
CliError::failure(format!("failed to query dashboard sessions: {error}"))
})?;
let sessions = statement
.query_map([], |row| {
Ok(DashboardSession {
session_id: row.get(0)?,
title: row.get(1)?,
phase: row.get(2)?,
mode: row.get(3)?,
workflow_status: row.get(4)?,
lead_agent_id: row.get(5)?,
artifact_ref: row.get(6)?,
updated_at: row.get(7)?,
next_action: row.get(8)?,
blockers: row.get(9)?,
asks_for_zevs: row.get(10)?,
risk_or_residual_uncertainty: row.get(11)?,
expected_wait: row.get(12)?,
})
})
.map_err(|error| CliError::failure(format!("failed to read dashboard sessions: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| {
CliError::failure(format!("failed to read dashboard sessions: {error}"))
})?;
Ok(sessions)
}
/// Loads up to 50 status events, newest first.
///
/// With `Some(id)` only events of that session are returned; with `None`
/// the SQL filter is disabled and events of all sessions qualify.
///
/// # Errors
/// Fails when the statement cannot be prepared or a row cannot be read.
fn load_status_events(
    connection: &Connection,
    selected_session_id: Option<&str>,
) -> CliResult<Vec<StatusEvent>> {
    // Continuation lines stay flush-left so the statement text is unchanged.
    const STATUS_EVENTS_SQL: &str =
        "SELECT session_id, timestamp, phase, mode, workflow_status, next_action
FROM status_events
WHERE (?1 IS NULL OR session_id = ?1)
ORDER BY timestamp DESC, status_event_id DESC
LIMIT 50";

    let mut statement = match connection.prepare(STATUS_EVENTS_SQL) {
        Ok(statement) => statement,
        Err(error) => {
            return Err(CliError::failure(format!(
                "failed to query status events: {error}"
            )));
        }
    };

    let rows = statement
        .query_map([selected_session_id], |row| {
            Ok(StatusEvent {
                session_id: row.get(0)?,
                timestamp: row.get(1)?,
                phase: row.get(2)?,
                mode: row.get(3)?,
                workflow_status: row.get(4)?,
                next_action: row.get(5)?,
            })
        })
        .map_err(|error| CliError::failure(format!("failed to read status events: {error}")))?;

    let mut events = Vec::new();
    for row in rows {
        let event = row
            .map_err(|error| CliError::failure(format!("failed to read status events: {error}")))?;
        events.push(event);
    }
    Ok(events)
}
fn load_messages(
connection: &Connection,
selected_session_id: Option<&str>,
) -> CliResult<Vec<DashboardMessage>> {
let mut statement = connection
.prepare(
"SELECT session_id, mid, message_type, latest_delivery_status,
latest_verified_by, COALESCE(latest_audit_id, 'unknown'), timestamp
FROM messages
WHERE (?1 IS NULL OR session_id = ?1)
ORDER BY timestamp DESC, mid DESC
LIMIT 50",
)
.map_err(|error| CliError::failure(format!("failed to query messages: {error}")))?;
let mut messages = statement
.query_map([selected_session_id], |row| {
Ok(DashboardMessage {
session_id: row.get(0)?,
mid: row.get(1)?,
message_type: row.get(2)?,
delivery_status: row.get(3)?,
verified_by: row.get(4)?,
latest_audit_id: row.get(5)?,
timestamp: row.get(6)?,
proofs: Vec::new(),
})
})
.map_err(|error| CliError::failure(format!("failed to read messages: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read messages: {error}")))?;
for message in &mut messages {
message.proofs = load_message_proofs(connection, &message.session_id, &message.mid)?;
}
Ok(messages)
}
/// Merges status events and messages into one list ordered by timestamp,
/// newest first.
///
/// Timestamps are compared as strings. The sort is stable, so entries with
/// equal timestamps keep their input order, with status events ahead of
/// messages.
fn timeline_entries(
    events: Vec<StatusEvent>,
    messages: Vec<DashboardMessage>,
) -> Vec<TimelineEntry> {
    let mut merged: Vec<TimelineEntry> = Vec::with_capacity(events.len() + messages.len());
    merged.extend(events.into_iter().map(TimelineEntry::Status));
    merged.extend(messages.into_iter().map(TimelineEntry::Message));
    merged.sort_by(|left, right| left.timestamp().cmp(right.timestamp()).reverse());
    merged
}
impl TimelineEntry {
fn timestamp(&self) -> &str {
match self {
TimelineEntry::Status(event) => &event.timestamp,
TimelineEntry::Message(message) => &message.timestamp,
}
}
}
fn load_message_proofs(
connection: &Connection,
session_id: &str,
mid: &str,
) -> CliResult<Vec<DeliveryProof>> {
let mut statement = connection
.prepare(
"SELECT delivery_status, verified_by, audit_id
FROM audit_records
WHERE session_id = ?1 AND mid = ?2
ORDER BY timestamp, audit_id",
)
.map_err(|error| CliError::failure(format!("failed to query message proofs: {error}")))?;
let proofs = statement
.query_map([session_id, mid], |row| {
Ok(DeliveryProof {
delivery_status: row.get(0)?,
verified_by: row.get(1)?,
audit_id: row.get(2)?,
})
})
.map_err(|error| CliError::failure(format!("failed to read message proofs: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read message proofs: {error}")))?;
Ok(proofs)
}
/// Escapes `value` for safe inclusion in HTML text and quoted attribute
/// values.
///
/// The five characters `&`, `<`, `>`, `"` and `'` are replaced by the entity
/// references `&amp;`, `&lt;`, `&gt;`, `&quot;` and `&#39;`; everything else
/// is copied unchanged. The input is processed in one pass, so an ampersand
/// produced by one replacement is never escaped a second time.
fn escape_html(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&#39;"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Escapes `value` for use inside a quoted HTML attribute.
///
/// Currently identical to `escape_html`; kept as a separate name so call
/// sites state which context they are escaping for.
fn escape_attr(value: &str) -> String {
escape_html(value)
}
/// Turns `value` into a token that is safe to append to a CSS class name.
///
/// ASCII letters, digits, `-` and `_` are kept; every other character is
/// replaced by a single `-` (so `waiting for operator` becomes
/// `waiting-for-operator`).
///
/// The raw value is sanitised directly. HTML-escaping it first would only
/// leak entity names into the class (`a&b` would become `a-amp-b`), and the
/// output alphabet is already safe inside an attribute.
fn escape_class(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
                ch
            } else {
                '-'
            }
        })
        .collect()
}
/// Percent-encodes `value` for use as a URL query component.
///
/// ASCII letters, digits and `-`, `.`, `_`, `~` are kept as they are; every
/// other byte of the UTF-8 encoding is written as `%XX` with uppercase hex
/// digits.
fn escape_url_component(value: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::new();
    for &byte in value.as_bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
            continue;
        }
        encoded.push('%');
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
    }
    encoded
}
/// Decodes `%XX` escapes in `value`.
///
/// A `%` is decoded only when it is followed by exactly two ASCII hex
/// digits; any other `%` (truncated, or followed by non-hex characters such
/// as a sign) is copied through literally. The decoded bytes are interpreted
/// as UTF-8, with invalid sequences replaced by U+FFFD.
fn percent_decode(value: &str) -> String {
    /// Numeric value of one ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        // `to_digit(16)` yields 0..=15, so the narrowing cast cannot truncate.
        char::from(byte).to_digit(16).map(|digit| digit as u8)
    }

    let bytes = value.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut index = 0;
    while index < bytes.len() {
        if bytes[index] == b'%' {
            let high = bytes.get(index + 1).copied().and_then(hex_value);
            let low = bytes.get(index + 2).copied().and_then(hex_value);
            // Checking each digit explicitly avoids integer parsers that
            // also accept a leading `+`, which is not a valid escape.
            if let (Some(high), Some(low)) = (high, low) {
                decoded.push((high << 4) | low);
                index += 3;
                continue;
            }
        }
        decoded.push(bytes[index]);
        index += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
/// Inline stylesheet embedded in the `<style>` element of the dashboard page
/// by `render_dashboard`. The `status-*` and `proof-*` selectors are the
/// targets of the class names produced with `escape_class`.
const STYLES: &str = r#"
:root { color-scheme: light; --ink: #1c2024; --muted: #667085; --line: #d6dbe1; --panel: #f7f8fa; --accent: #0f766e; --warn: #9a3412; --ok: #166534; }
* { box-sizing: border-box; }
body { margin: 0; font: 14px/1.45 system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; color: var(--ink); background: #ffffff; letter-spacing: 0; }
.app-shell { min-height: 100vh; display: grid; grid-template-columns: minmax(220px, 18vw) minmax(0, 1fr) minmax(260px, 22vw); }
.sidebar, .detail-panel { background: var(--panel); border-color: var(--line); padding: 18px; overflow: auto; }
.sidebar { border-right: 1px solid var(--line); }
.detail-panel { border-left: 1px solid var(--line); }
.brand { font-weight: 700; font-size: 18px; margin-bottom: 18px; }
.session-link { display: grid; grid-template-columns: minmax(0, 1fr) auto; gap: 8px; align-items: center; color: inherit; text-decoration: none; padding: 9px 0; border-bottom: 1px solid var(--line); }
.badge, .proof { display: inline-flex; align-items: center; min-height: 24px; padding: 3px 8px; border: 1px solid var(--line); border-radius: 6px; background: #fff; font-size: 12px; white-space: nowrap; }
.status-working, .proof-observed { border-color: #86efac; color: var(--ok); }
.status-blocked, .status-waiting-for-operator, .proof-failed { border-color: #fdba74; color: var(--warn); }
.proof-sent { border-color: #5eead4; color: var(--accent); }
.status-idle, .status-done, .proof-drafted, .proof-unknown { border-color: #d0d5dd; color: var(--muted); }
.timeline { padding: 20px clamp(18px, 3vw, 42px); overflow: auto; }
.timeline-header { display: flex; justify-content: space-between; align-items: end; border-bottom: 1px solid var(--line); margin-bottom: 18px; padding-bottom: 12px; }
h1 { font-size: 24px; margin: 0; }
h2 { font-size: 15px; margin: 4px 0; }
p { color: var(--muted); margin: 4px 0; }
.timeline-item { max-width: 860px; border: 1px solid var(--line); border-radius: 8px; padding: 14px 16px; margin: 0 0 12px; background: #fff; }
.timestamp { color: var(--muted); font-size: 12px; }
.proof-strip { display: flex; flex-wrap: wrap; gap: 6px; margin-top: 10px; }
dl { display: grid; grid-template-columns: 96px minmax(0, 1fr); gap: 9px 12px; margin: 0; }
dt { color: var(--muted); }
dd { margin: 0; overflow-wrap: anywhere; }
.empty, .empty-state { color: var(--muted); }
@media (max-width: 900px) { .app-shell { grid-template-columns: 1fr; } .sidebar, .detail-panel { border: 0; border-bottom: 1px solid var(--line); } }
"#;