//! Daily analytics over `~/.netsky/meta.db`.
//!
//! Reads observability tables via DataFusion, aggregates by day, writes
//! `~/.netsky/analytics/<date>.json` + `<date>.html`, and optionally
//! emits a Zorto Markdown page under `website/content/analytics/`.
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use chrono::{Duration, NaiveDate, TimeZone, Utc};
use clap::Subcommand;
use netsky_core::Result;
use netsky_core::paths::home;
use netsky_db::arrow_array::{Array, Int64Array, StringArray};
use netsky_db::{ArrowRecordBatch as RecordBatch, Db};
use serde::Serialize;
use serde_json::json;
// CLI surface for `netsky analytics`.
//
// NOTE: the `///` comments on variants and fields below are picked up by
// clap's derive as `--help` text, i.e. they are user-facing strings. Use
// plain `//` comments (like this one) for maintainer-only notes.
#[derive(Subcommand, Debug)]
pub enum AnalyticsCommand {
/// Aggregate one day from meta.db and emit JSON + HTML.
Daily {
/// UTC date in YYYY-MM-DD form. Defaults to today.
#[arg(long = "for", value_name = "YYYY-MM-DD")]
for_date: Option<String>,
// `for` is a Rust keyword, so the field is `for_date` and the flag is
// renamed back to `--for`; the value is parsed by `parse_date`.
/// Emit a zorto markdown page into WEBSITE_DIR/content/analytics/<date>.md.
#[arg(long, value_name = "WEBSITE_DIR")]
website: Option<PathBuf>,
/// Override output dir (default: ~/.netsky/analytics/).
#[arg(long, value_name = "DIR")]
out: Option<PathBuf>,
/// Print a JSON envelope (suppresses the per-day text line).
#[arg(long)]
json: bool,
},
/// Run `daily` for each of the last N days (inclusive of today).
Backfill {
/// How many past days to backfill. Defaults to 7.
#[arg(long, default_value_t = 7)]
days: u32,
/// Emit a zorto markdown page for each day.
#[arg(long, value_name = "WEBSITE_DIR")]
website: Option<PathBuf>,
/// Override output dir (default: ~/.netsky/analytics/).
#[arg(long, value_name = "DIR")]
out: Option<PathBuf>,
/// Print one JSON envelope summarizing the backfill.
#[arg(long)]
json: bool,
},
}
pub fn run(cmd: AnalyticsCommand) -> Result<()> {
match cmd {
AnalyticsCommand::Daily {
for_date,
website,
out,
json,
} => {
let date = parse_date(for_date.as_deref())?;
let out_dir = out.unwrap_or_else(default_out_dir);
let report = run_daily(date, &out_dir, website.as_deref(), json)?;
if json {
emit_envelope(&out_dir, &[(date, &report)]);
}
}
AnalyticsCommand::Backfill {
days,
website,
out,
json,
} => {
let out_dir = out.unwrap_or_else(default_out_dir);
let today = Utc::now().date_naive();
let mut runs = Vec::new();
for offset in 0..days as i64 {
let date = today - Duration::days(offset);
let report = run_daily(date, &out_dir, website.as_deref(), json)?;
runs.push((date, report));
}
if json {
let entries: Vec<_> = runs.iter().map(|(d, r)| (*d, r)).collect();
emit_envelope(&out_dir, &entries);
}
}
}
Ok(())
}
/// Print the `--json` envelope for a set of finished daily runs.
///
/// Each `(date, report)` pair becomes one entry carrying the on-disk
/// JSON/HTML paths under `out_dir` and the headline counters. `status` is
/// always `"green"`: callers in this module only reach here after every
/// `run_daily` call has succeeded.
fn emit_envelope(out_dir: &Path, entries: &[(NaiveDate, &DailyReport)]) {
    let path_for =
        |day: &NaiveDate, ext: &str| out_dir.join(format!("{day}.{ext}")).display().to_string();

    let mut days = Vec::with_capacity(entries.len());
    for (day, report) in entries {
        days.push(json!({
            "date": day.to_string(),
            "json_path": path_for(day, "json"),
            "html_path": path_for(day, "html"),
            "sessions": report.sessions.count,
            "crashes": report.crashes.count,
            "clones_dispatched": report.clones.dispatched,
            "messages": report.messages.total,
            "commits_to_main": report.commits_to_main,
        }));
    }

    let summary = format!("aggregated {} day(s)", days.len());
    let envelope = json!({
        "command": "analytics",
        "status": "green",
        "summary": summary,
        "generated_at": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
        "data": {
            "out_dir": out_dir.display().to_string(),
            "days": days,
        },
    });
    let rendered = serde_json::to_string_pretty(&envelope).unwrap_or_else(|_| "{}".to_string());
    println!("{rendered}");
}
fn default_out_dir() -> PathBuf {
home().join(".netsky").join("analytics")
}
/// Resolve the `--for` argument.
///
/// Parses a `YYYY-MM-DD` string, or falls back to today's UTC date when no
/// value was given. A malformed date is reported as an error naming the input.
fn parse_date(input: Option<&str>) -> Result<NaiveDate> {
    input.map_or_else(
        || Ok(Utc::now().date_naive()),
        |s| {
            NaiveDate::parse_from_str(s, "%Y-%m-%d")
                .map_err(|e| netsky_core::anyhow!("parse --for date {s}: {e}"))
        },
    )
}
/// Aggregate one day and write its artifacts.
///
/// Opens and migrates meta.db (on every call), aggregates `date`, then writes
/// `<date>.json` and `<date>.html` into `out_dir` (created if missing). When
/// `website_dir` is set, it also writes
/// `<website_dir>/content/analytics/<date>.md`. Unless `quiet` is set, it
/// prints one line naming both output files. Returns the aggregated report.
fn run_daily(
    date: NaiveDate,
    out_dir: &Path,
    website_dir: Option<&Path>,
    quiet: bool,
) -> Result<DailyReport> {
    let db = Db::open().map_err(super::db_diag::wrap_open_error)?;
    db.migrate().map_err(super::db_diag::wrap_open_error)?;
    let report = aggregate(&db, date)?;

    let json_path = out_dir.join(format!("{date}.json"));
    let html_path = out_dir.join(format!("{date}.html"));

    fs::create_dir_all(out_dir)?;
    let serialized = serde_json::to_string_pretty(&report)
        .map_err(|e| netsky_core::anyhow!("serialize report: {e}"))?;
    fs::write(&json_path, serialized)?;
    fs::write(&html_path, render_standalone_html(&report))?;

    if let Some(site) = website_dir {
        let md_dir = site.join("content").join("analytics");
        fs::create_dir_all(&md_dir)?;
        fs::write(md_dir.join(format!("{date}.md")), render_markdown(&report))?;
    }

    if !quiet {
        println!(
            "analytics {date}: {} + {}",
            json_path.display(),
            html_path.display()
        );
    }
    Ok(report)
}
// ---- data model ----
/// Everything aggregated for one UTC day.
///
/// Serialized as-is to `<date>.json` (serde emits fields in declaration
/// order, so field order here is the JSON key order) and embedded,
/// pretty-printed, in the standalone HTML report.
#[derive(Debug, Serialize)]
struct DailyReport {
/// The day as `YYYY-MM-DD`.
date: String,
/// When the report was built: RFC 3339, UTC, whole seconds, `Z` suffix.
generated_at: String,
/// Session uptime within the day.
uptime: Uptime,
/// `up` events and session lengths.
sessions: SessionSummary,
/// Rows from the `crashes` table.
crashes: CrashSummary,
/// Clone dispatches and clone lifetime hours.
clones: CloneSummary,
/// Token usage and cost from `token_usage`.
tokens: TokenSummary,
/// Rows from the `messages` table.
messages: MessageSummary,
/// Commits on `main` in `~/netsky` during the day according to `git log`;
/// 0 when the repo is missing or `git` fails.
commits_to_main: i64,
/// Counters from `iroh_events`.
iroh: IrohSummary,
/// One entry per crash row, oldest first.
incidents: Vec<Incident>,
}
/// Seconds of session uptime inside the day, as computed by `uptime_for`.
#[derive(Debug, Serialize)]
struct Uptime {
/// Considering only sessions of `agent0`.
agent0_up_seconds: i64,
/// Considering sessions of every agent.
any_session_up_seconds: i64,
}
/// Session activity for the day (from the `sessions` table).
#[derive(Debug, Serialize)]
struct SessionSummary {
/// Number of `up` events in the day.
count: i64,
/// Median up->down length of sessions that came up in the day; `None` when
/// no such session has a matching `down`.
median_length_seconds: Option<i64>,
/// `up` events per agent, most active first.
by_agent: Vec<LabeledCount>,
}
/// Crash activity for the day (from the `crashes` table).
#[derive(Debug, Serialize)]
struct CrashSummary {
/// Number of crash rows in the day.
count: i64,
/// Median seconds from a crash to the next session `up` (within one hour);
/// `None` when no crash recovered within that window.
mttr_seconds: Option<i64>,
/// Crashes per `kind`, most frequent first.
by_kind: Vec<LabeledCount>,
}
/// Clone activity for the day.
#[derive(Debug, Serialize)]
struct CloneSummary {
/// `clone_dispatches` rows whose `ts_utc_start` falls in the day.
dispatched: i64,
/// Dispatches per runtime (NULL runtime reported as "unknown").
by_runtime: Vec<LabeledCount>,
/// Summed positive lifetimes, in hours, of `clone_lifetimes` rows that
/// started in the day and have an end timestamp.
clone_hours: f64,
}
/// Token usage for the day (from the `token_usage` table).
#[derive(Debug, Serialize)]
struct TokenSummary {
/// Summed `input_tokens` (NULL treated as 0).
total_input: i64,
/// Summed `output_tokens` (NULL treated as 0).
total_output: i64,
/// Summed `cached_input_tokens` (NULL treated as 0).
total_cached: i64,
/// Summed `cost_usd_micros`, converted to dollars.
total_cost_usd: f64,
/// Percentiles of input tokens per `token_usage` row; `None` when the day
/// has no rows.
p50_input_per_event: Option<i64>,
p90_input_per_event: Option<i64>,
p99_input_per_event: Option<i64>,
/// Per runtime; here `count` holds summed input + output tokens, not rows.
by_runtime: Vec<LabeledCount>,
}
/// Message volume for the day (from the `messages` table).
#[derive(Debug, Serialize)]
struct MessageSummary {
/// Number of message rows in the day.
total: i64,
/// Messages per `source`, busiest first.
by_source: Vec<LabeledCount>,
}
/// Counts of selected `iroh_events.event_type` values for the day; any other
/// event types are ignored.
#[derive(Debug, Serialize)]
struct IrohSummary {
connect: i64,
evict: i64,
reconnect: i64,
handshake_refused: i64,
}
/// One crash row, as listed in the report's incidents section.
#[derive(Debug, Serialize)]
struct Incident {
/// Crash timestamp exactly as stored in `crashes.ts_utc`.
ts_utc: String,
/// `crashes.kind`.
kind: String,
/// `crashes.agent`, `None` when NULL.
agent: Option<String>,
/// `crashes.detail_json` shortened to about 200 bytes (empty when NULL).
summary: String,
}
/// One bar of a breakdown chart: a label and its value.
#[derive(Debug, Serialize)]
struct LabeledCount {
/// Category label; `labeled_counts` reports a NULL label as "-".
label: String,
/// Value for the label (row count, or a summed quantity for some charts).
count: i64,
}
// ---- aggregation ----
/// Build the full [`DailyReport`] for `date`.
///
/// The day window is `[date 00:00:00Z, date+1 00:00:00Z)`, passed to every
/// query as RFC 3339 strings with second precision. Sections are queried in
/// a fixed order, and the first failing query aborts the whole report.
fn aggregate(db: &Db, date: NaiveDate) -> Result<DailyReport> {
    let midnight = date
        .and_hms_opt(0, 0, 0)
        .expect("00:00:00 is always a valid time of day");
    let day_open = Utc.from_utc_datetime(&midnight);
    let [lo, hi] = [day_open, day_open + Duration::days(1)]
        .map(|t| t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true));

    let sessions = sessions_summary(db, &lo, &hi)?;
    let crashes = crashes_summary(db, &lo, &hi)?;
    let clones = clones_summary(db, &lo, &hi)?;
    let tokens = tokens_summary(db, &lo, &hi)?;
    let messages = messages_summary(db, &lo, &hi)?;
    let iroh = iroh_summary(db, &lo, &hi)?;
    let uptime = uptime_summary(db, &lo, &hi, &lo, &hi)?;
    let incidents = incidents_summary(db, &lo, &hi)?;
    let commits_to_main = commits_to_main_count(date)?;

    let generated_at = Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
    Ok(DailyReport {
        date: date.to_string(),
        generated_at,
        uptime,
        sessions,
        crashes,
        clones,
        tokens,
        messages,
        commits_to_main,
        iroh,
        incidents,
    })
}
/// Run `sql` and return the first cell of the first non-empty batch whose
/// first column is Int64.
///
/// A NULL cell, an empty result, or no Int64 batch at all yields 0. Query
/// failures are returned as errors that include the SQL text.
fn scalar_i64(db: &Db, sql: &str) -> Result<i64> {
    let batches = db
        .query_batches(sql)
        .map_err(|e| netsky_core::anyhow!("scalar query `{sql}`: {e}"))?;
    let first = batches
        .into_iter()
        .filter(|batch| batch.num_rows() > 0)
        .find_map(|batch| {
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .map(|cells| if cells.is_null(0) { 0 } else { cells.value(0) })
        });
    Ok(first.unwrap_or(0))
}
/// Run a two-column `(label: Utf8, count: Int64)` query and collect its rows.
///
/// NULL labels become "-" and NULL counts become 0. It errors if the query
/// fails or if either column has an unexpected Arrow type.
fn labeled_counts(db: &Db, sql: &str) -> Result<Vec<LabeledCount>> {
    let batches = db
        .query_batches(sql)
        .map_err(|e| netsky_core::anyhow!("labeled counts `{sql}`: {e}"))?;
    let mut rows = Vec::new();
    for batch in batches {
        let label_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected Utf8 label column"))?;
        let count_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| netsky_core::anyhow!("expected Int64 count column"))?;
        rows.extend((0..batch.num_rows()).map(|i| LabeledCount {
            label: (!label_col.is_null(i))
                .then(|| label_col.value(i))
                .unwrap_or("-")
                .to_string(),
            count: if count_col.is_null(i) { 0 } else { count_col.value(i) },
        }));
    }
    Ok(rows)
}
/// Session counters for `[start, end)`: number of `up` events, `up` events
/// per agent (descending), and the median session length.
fn sessions_summary(db: &Db, start: &str, end: &str) -> Result<SessionSummary> {
    // Shared predicate: `up` events inside the window.
    let ups_in_window = format!("event = 'up' AND ts_utc >= '{start}' AND ts_utc < '{end}'");
    let count = scalar_i64(
        db,
        &format!("SELECT COUNT(*) FROM sessions WHERE {ups_in_window}"),
    )?;
    let by_agent = labeled_counts(
        db,
        &format!(
            "SELECT agent, COUNT(*) AS n FROM sessions \
             WHERE {ups_in_window} \
             GROUP BY agent ORDER BY n DESC"
        ),
    )?;
    let median_length_seconds = session_median_length(db, start, end)?;
    Ok(SessionSummary {
        count,
        median_length_seconds,
        by_agent,
    })
}
/// Median length in seconds of sessions that came up in `[start, end)`.
///
/// Each `up` in the window is LEFT JOINed with the earliest `down` for the
/// same `(agent, session_num)`; the `down` may fall on any day. Pairs
/// without a `down`, with unparseable timestamps, or with a negative
/// duration are skipped. Returns `None` when nothing remains. For an even
/// count, the upper of the two middle values is used.
fn session_median_length(db: &Db, start: &str, end: &str) -> Result<Option<i64>> {
    let sql = format!(
        "SELECT u.agent, u.session_num, u.ts_utc, d.ts_utc FROM \
         (SELECT agent, session_num, ts_utc FROM sessions \
         WHERE event = 'up' AND ts_utc >= '{start}' AND ts_utc < '{end}') u \
         LEFT JOIN \
         (SELECT agent, session_num, MIN(ts_utc) AS ts_utc FROM sessions \
         WHERE event = 'down' GROUP BY agent, session_num) d \
         ON u.agent = d.agent AND u.session_num = d.session_num"
    );
    let mut lengths: Vec<i64> = Vec::new();
    for batch in db
        .query_batches(&sql)
        .map_err(|e| netsky_core::anyhow!("median length: {e}"))?
    {
        let ups = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected up ts"))?;
        let downs = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected down ts"))?;
        lengths.extend((0..batch.num_rows()).filter_map(|i| {
            if ups.is_null(i) || downs.is_null(i) {
                return None;
            }
            let up = chrono::DateTime::parse_from_rfc3339(ups.value(i)).ok()?;
            let down = chrono::DateTime::parse_from_rfc3339(downs.value(i)).ok()?;
            let secs = (down - up).num_seconds();
            (secs >= 0).then_some(secs)
        }));
    }
    lengths.sort_unstable();
    // `get` on an empty vec yields None, which doubles as "no data".
    Ok(lengths.get(lengths.len() / 2).copied())
}
/// Crash counters for `[start, end)`: total crashes, crashes per `kind`
/// (descending), and the median time-to-recovery.
fn crashes_summary(db: &Db, start: &str, end: &str) -> Result<CrashSummary> {
    let in_window = format!("ts_utc >= '{start}' AND ts_utc < '{end}'");
    let count = scalar_i64(db, &format!("SELECT COUNT(*) FROM crashes WHERE {in_window}"))?;
    let by_kind = labeled_counts(
        db,
        &format!(
            "SELECT kind, COUNT(*) AS n FROM crashes \
             WHERE {in_window} \
             GROUP BY kind ORDER BY n DESC"
        ),
    )?;
    let mttr_seconds = crash_mttr(db, start, end)?;
    Ok(CrashSummary {
        count,
        mttr_seconds,
        by_kind,
    })
}
/// Median seconds from a crash in `[start, end)` to the first session `up`
/// (any agent) strictly after it.
///
/// Recoveries taking an hour or more, crashes with no later `up`, and
/// unparseable timestamps are ignored. Returns `None` when no crash
/// qualifies. For an even count, the upper of the two middle values is used.
fn crash_mttr(db: &Db, start: &str, end: &str) -> Result<Option<i64>> {
    let sql = format!(
        "SELECT c.ts_utc, MIN(s.ts_utc) FROM \
         (SELECT ts_utc FROM crashes \
         WHERE ts_utc >= '{start}' AND ts_utc < '{end}') c \
         LEFT JOIN \
         (SELECT ts_utc FROM sessions WHERE event = 'up') s \
         ON s.ts_utc > c.ts_utc \
         GROUP BY c.ts_utc"
    );
    let mut recoveries: Vec<i64> = Vec::new();
    for batch in db
        .query_batches(&sql)
        .map_err(|e| netsky_core::anyhow!("mttr: {e}"))?
    {
        let crashed_at = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected crash ts"))?;
        let back_up_at = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected up ts"))?;
        recoveries.extend((0..batch.num_rows()).filter_map(|i| {
            if crashed_at.is_null(i) || back_up_at.is_null(i) {
                return None;
            }
            let crash = chrono::DateTime::parse_from_rfc3339(crashed_at.value(i)).ok()?;
            let up = chrono::DateTime::parse_from_rfc3339(back_up_at.value(i)).ok()?;
            let secs = (up - crash).num_seconds();
            // Only count recoveries within the first hour.
            (0..3600).contains(&secs).then_some(secs)
        }));
    }
    recoveries.sort_unstable();
    Ok(recoveries.get(recoveries.len() / 2).copied())
}
/// Clone counters for dispatches/lifetimes starting in `[start, end)`:
/// number of dispatches, dispatches per runtime, and total clone-hours.
fn clones_summary(db: &Db, start: &str, end: &str) -> Result<CloneSummary> {
    let started_in_window = format!("ts_utc_start >= '{start}' AND ts_utc_start < '{end}'");
    let dispatched = scalar_i64(
        db,
        &format!("SELECT COUNT(*) FROM clone_dispatches WHERE {started_in_window}"),
    )?;
    let by_runtime = labeled_counts(
        db,
        &format!(
            "SELECT COALESCE(runtime, 'unknown') AS runtime, COUNT(*) AS n \
             FROM clone_dispatches \
             WHERE {started_in_window} \
             GROUP BY runtime ORDER BY n DESC"
        ),
    )?;
    let clone_hours = clone_hours_sum(db, start, end)?;
    Ok(CloneSummary {
        dispatched,
        by_runtime,
        clone_hours,
    })
}
/// Total hours of `clone_lifetimes` rows that started in `[start, end)`.
///
/// Rows without an end timestamp, with unparseable timestamps, or with a
/// non-positive duration contribute nothing. A lifetime that spills past
/// `end` is still counted in full for its start day.
fn clone_hours_sum(db: &Db, start: &str, end: &str) -> Result<f64> {
    let sql = format!(
        "SELECT ts_utc_start, ts_utc_end FROM clone_lifetimes \
         WHERE ts_utc_start >= '{start}' AND ts_utc_start < '{end}'"
    );
    let mut total_secs = 0_i64;
    for batch in db
        .query_batches(&sql)
        .map_err(|e| netsky_core::anyhow!("clone hours: {e}"))?
    {
        let began = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected start ts"))?;
        let ended = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("expected end ts"))?;
        total_secs += (0..batch.num_rows())
            .filter(|&i| !ended.is_null(i))
            .filter_map(|i| {
                let from = chrono::DateTime::parse_from_rfc3339(began.value(i)).ok()?;
                let to = chrono::DateTime::parse_from_rfc3339(ended.value(i)).ok()?;
                let secs = (to - from).num_seconds();
                (secs > 0).then_some(secs)
            })
            .sum::<i64>();
    }
    Ok(total_secs as f64 / 3600.0)
}
/// Token totals, cost, per-event input percentiles and per-runtime volume
/// for `token_usage` rows with `start <= ts_utc < end`.
///
/// * Totals treat NULL columns as 0. Cost is stored in micro-USD and is
///   reported in dollars.
/// * `by_runtime` counts input + output tokens per runtime. Each column is
///   COALESCEd on its own: a bare `input_tokens + output_tokens` is NULL
///   when either side is NULL, which would silently drop that row from SUM.
/// * Percentile samples are sorted in Rust because `percentile` needs a
///   sorted slice. A SQL `ORDER BY input_tokens` sorts the raw column, where
///   NULLs (reported as 0 via COALESCE) are not placed first, so the fetched
///   values were not guaranteed to be in order.
fn tokens_summary(db: &Db, start: &str, end: &str) -> Result<TokenSummary> {
    let in_window = format!("ts_utc >= '{start}' AND ts_utc < '{end}'");

    let totals_sql = format!(
        "SELECT \
         COALESCE(SUM(input_tokens), 0), \
         COALESCE(SUM(output_tokens), 0), \
         COALESCE(SUM(cached_input_tokens), 0), \
         COALESCE(SUM(cost_usd_micros), 0) \
         FROM token_usage \
         WHERE {in_window}"
    );
    let batches = db
        .query_batches(&totals_sql)
        .map_err(|e| netsky_core::anyhow!("tokens totals: {e}"))?;
    // The aggregate yields a single row; take it from the first non-empty batch.
    let (input, output, cached, cost_micros) =
        match batches.into_iter().find(|batch| batch.num_rows() > 0) {
            Some(batch) => (
                int_at(&batch, 0)?,
                int_at(&batch, 1)?,
                int_at(&batch, 2)?,
                int_at(&batch, 3)?,
            ),
            None => (0, 0, 0, 0),
        };

    let by_runtime = labeled_counts(
        db,
        &format!(
            "SELECT COALESCE(runtime, 'unknown') AS runtime, \
             COALESCE(SUM(COALESCE(input_tokens, 0) + COALESCE(output_tokens, 0)), 0) AS n \
             FROM token_usage \
             WHERE {in_window} \
             GROUP BY runtime ORDER BY n DESC"
        ),
    )?;

    let mut input_samples = sample_ints(
        db,
        &format!("SELECT COALESCE(input_tokens, 0) FROM token_usage WHERE {in_window}"),
    )?;
    input_samples.sort_unstable();

    Ok(TokenSummary {
        total_input: input,
        total_output: output,
        total_cached: cached,
        total_cost_usd: cost_micros as f64 / 1_000_000.0,
        p50_input_per_event: percentile(&input_samples, 0.5),
        p90_input_per_event: percentile(&input_samples, 0.9),
        p99_input_per_event: percentile(&input_samples, 0.99),
        by_runtime,
    })
}
/// Read row 0 of Int64 column `col` in `batch`, mapping NULL to 0.
///
/// Errors if the column is not Int64. Callers must ensure the batch has at
/// least one row.
fn int_at(batch: &RecordBatch, col: usize) -> Result<i64> {
    let values = batch
        .column(col)
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| netsky_core::anyhow!("expected int64 col {col}"))?;
    Ok(if values.is_null(0) { 0 } else { values.value(0) })
}
/// Run a single-Int64-column query and return its non-NULL values, in the
/// order the batches produced them.
fn sample_ints(db: &Db, sql: &str) -> Result<Vec<i64>> {
    let mut samples = Vec::new();
    for batch in db
        .query_batches(sql)
        .map_err(|e| netsky_core::anyhow!("sample: {e}"))?
    {
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| netsky_core::anyhow!("expected int64 sample"))?;
        samples.extend(
            (0..column.len())
                .filter(|&i| !column.is_null(i))
                .map(|i| column.value(i)),
        );
    }
    Ok(samples)
}
/// Nearest-rank style percentile of an ascending slice.
///
/// Picks the element at index `floor(len * p)`, clamped to the last element.
/// Returns `None` for an empty slice.
fn percentile(sorted: &[i64], p: f64) -> Option<i64> {
    let last = sorted.len().checked_sub(1)?;
    let rank = (sorted.len() as f64 * p).floor() as usize;
    sorted.get(rank.min(last)).copied()
}
/// Message counters for `[start, end)`: total rows and rows per `source`
/// (busiest first).
fn messages_summary(db: &Db, start: &str, end: &str) -> Result<MessageSummary> {
    let in_window = format!("ts_utc >= '{start}' AND ts_utc < '{end}'");
    let total = scalar_i64(db, &format!("SELECT COUNT(*) FROM messages WHERE {in_window}"))?;
    let by_source = labeled_counts(
        db,
        &format!(
            "SELECT source, COUNT(*) AS n FROM messages \
             WHERE {in_window} \
             GROUP BY source ORDER BY n DESC"
        ),
    )?;
    Ok(MessageSummary { total, by_source })
}
fn iroh_summary(db: &Db, start: &str, end: &str) -> Result<IrohSummary> {
let counts = labeled_counts(
db,
&format!(
"SELECT event_type, COUNT(*) AS n FROM iroh_events \
WHERE ts_utc >= '{start}' AND ts_utc < '{end}' \
GROUP BY event_type"
),
)?;
let mut connect = 0;
let mut evict = 0;
let mut reconnect = 0;
let mut handshake_refused = 0;
for c in counts {
match c.label.as_str() {
"connect" => connect = c.count,
"evict" => evict = c.count,
"reconnect" => reconnect = c.count,
"handshake_refused" => handshake_refused = c.count,
_ => {}
}
}
Ok(IrohSummary {
connect,
evict,
reconnect,
handshake_refused,
})
}
/// Uptime for the day: across all agents first, then for `agent0` alone.
/// Both figures come from `uptime_for` over the same window.
fn uptime_summary(
    db: &Db,
    start: &str,
    end: &str,
    day_start: &str,
    day_end: &str,
) -> Result<Uptime> {
    let any_session_up_seconds = uptime_for(db, None, start, end, day_start, day_end)?;
    let agent0_up_seconds = uptime_for(db, Some("agent0"), start, end, day_start, day_end)?;
    Ok(Uptime {
        agent0_up_seconds,
        any_session_up_seconds,
    })
}
fn uptime_for(
db: &Db,
agent: Option<&str>,
start: &str,
end: &str,
day_start: &str,
day_end: &str,
) -> Result<i64> {
// Union of [up, down-or-end] intervals clipped to [start, end].
let agent_filter = match agent {
Some(a) => format!(" AND agent = '{a}'"),
None => String::new(),
};
let sql = format!(
"SELECT agent, session_num, event, ts_utc FROM sessions \
WHERE ts_utc >= '{day_start}' AND ts_utc < '{day_end}'{agent_filter} \
ORDER BY ts_utc"
);
let _ = (start, end); // placeholders to avoid confusion
let batches = db
.query_batches(&sql)
.map_err(|e| netsky_core::anyhow!("uptime: {e}"))?;
let mut events: Vec<(String, i64, String, String)> = Vec::new();
for batch in batches {
let ag = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| netsky_core::anyhow!("agent col"))?;
let sn = batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| netsky_core::anyhow!("session_num col"))?;
let ev = batch
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| netsky_core::anyhow!("event col"))?;
let ts = batch
.column(3)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| netsky_core::anyhow!("ts col"))?;
for i in 0..batch.num_rows() {
events.push((
ag.value(i).to_string(),
sn.value(i),
ev.value(i).to_string(),
ts.value(i).to_string(),
));
}
}
let mut open: std::collections::HashMap<(String, i64), chrono::DateTime<Utc>> =
std::collections::HashMap::new();
let mut total = 0_i64;
let end_utc = chrono::DateTime::parse_from_rfc3339(day_end)
.map(|d| d.with_timezone(&Utc))
.map_err(|e| netsky_core::anyhow!("parse day end: {e}"))?;
for (ag, sn, event, ts) in events {
let ts = match chrono::DateTime::parse_from_rfc3339(&ts) {
Ok(t) => t.with_timezone(&Utc),
Err(_) => continue,
};
let key = (ag, sn);
match event.as_str() {
"up" => {
open.insert(key, ts);
}
"down" => {
if let Some(open_ts) = open.remove(&key) {
let secs = (ts - open_ts).num_seconds();
if secs > 0 {
total += secs;
}
}
}
_ => {}
}
}
for (_, open_ts) in open {
let secs = (end_utc - open_ts).num_seconds();
if secs > 0 {
total += secs;
}
}
Ok(total)
}
/// One `Incident` per `crashes` row in `[start, end)`, oldest first.
///
/// `summary` is the row's `detail_json` passed through `truncate(.., 200)`,
/// or empty when NULL. `agent` is `None` when the column is NULL.
fn incidents_summary(db: &Db, start: &str, end: &str) -> Result<Vec<Incident>> {
    let sql = format!(
        "SELECT ts_utc, kind, agent, detail_json FROM crashes \
         WHERE ts_utc >= '{start}' AND ts_utc < '{end}' \
         ORDER BY ts_utc"
    );
    let batches = db
        .query_batches(&sql)
        .map_err(|e| netsky_core::anyhow!("incidents: {e}"))?;
    let mut incidents = Vec::new();
    for batch in batches {
        let stamps = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("ts col"))?;
        let kinds = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("kind col"))?;
        let agents = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("agent col"))?;
        let details = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| netsky_core::anyhow!("detail col"))?;
        incidents.extend((0..batch.num_rows()).map(|i| {
            let summary = if details.is_null(i) {
                String::new()
            } else {
                truncate(details.value(i), 200)
            };
            Incident {
                ts_utc: stamps.value(i).to_string(),
                kind: kinds.value(i).to_string(),
                agent: (!agents.is_null(i)).then(|| agents.value(i).to_string()),
                summary,
            }
        }));
    }
    Ok(incidents)
}
/// Count commits on `main` in the `~/netsky` checkout between `date`
/// 00:00 UTC and the next day's 00:00 UTC, via `git log --since/--until`.
///
/// Best effort: a missing checkout, a missing `git` binary, or a non-zero
/// `git` exit all yield `Ok(0)` instead of an error.
fn commits_to_main_count(date: NaiveDate) -> Result<i64> {
    let repo = home().join("netsky");
    if !repo.join(".git").exists() {
        return Ok(0);
    }
    let next_day = date + Duration::days(1);
    let since = format!("{date} 00:00:00 +0000");
    let until = format!("{next_day} 00:00:00 +0000");
    let repo_arg = repo.to_string_lossy();
    let result = Command::new("git")
        .arg("-C")
        .arg(&*repo_arg)
        .args([
            "log",
            "main",
            "--format=%h",
            "--since",
            since.as_str(),
            "--until",
            until.as_str(),
        ])
        .output();
    let count = match result {
        Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).lines().count() as i64,
        _ => 0,
    };
    Ok(count)
}
/// Shorten `s` to at most `max` bytes, appending `…` when anything was cut.
///
/// The cut point moves back to the nearest UTF-8 character boundary.
/// Previously `&s[..max]` panicked whenever byte `max` fell inside a
/// multi-byte character (e.g. non-ASCII text in crash `detail_json`).
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // `max < s.len()` here, and index 0 is always a boundary, so this terminates.
    let mut cut = max;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}…", &s[..cut])
}
// ---- render ----
/// Render the zorto Markdown page for one day.
///
/// The page has TOML front matter (`title`, `date`, `[extra] plotly = true`),
/// a bullet-list summary, one Plotly shortcode per breakdown (or the
/// "no data" card), an input-token percentile chart, an incidents list when
/// there were crashes, and a closing iroh counters line.
fn render_markdown(r: &DailyReport) -> String {
    let tok = &r.tokens;
    let mut page = format!(
        "+++\n\
         title = \"analytics {date}\"\n\
         date = {date}\n\
         [extra]\nplotly = true\n\
         +++\n\n\
         *generated {generated}*\n\n\
         ## summary\n\n\
         - sessions: **{sessions}** (median {median})\n\
         - crashes: **{crashes}** (MTTR {mttr})\n\
         - clones dispatched: **{clones}** ({clone_hours:.1} clone-hours)\n\
         - tokens: **{t_in}** in / **{t_out}** out / **{t_cached}** cached (${cost:.2})\n\
         - messages: **{messages}**\n\
         - commits to main: **{commits}**\n\
         - uptime: agent0 {up0} / any session {upa}\n\n",
        date = r.date,
        generated = r.generated_at,
        sessions = r.sessions.count,
        median = fmt_opt_seconds(r.sessions.median_length_seconds),
        crashes = r.crashes.count,
        mttr = fmt_opt_seconds(r.crashes.mttr_seconds),
        clones = r.clones.dispatched,
        clone_hours = r.clones.clone_hours,
        t_in = tok.total_input,
        t_out = tok.total_output,
        t_cached = tok.total_cached,
        cost = tok.total_cost_usd,
        messages = r.messages.total,
        commits = r.commits_to_main,
        up0 = fmt_seconds(r.uptime.agent0_up_seconds),
        upa = fmt_seconds(r.uptime.any_session_up_seconds),
    );

    let breakdowns = [
        ("sessions by agent", "sessions-by-agent", r.sessions.by_agent.as_slice()),
        ("messages by source", "messages-by-source", r.messages.by_source.as_slice()),
        ("clones by runtime", "clones-by-runtime", r.clones.by_runtime.as_slice()),
        ("tokens by runtime", "tokens-by-runtime", tok.by_runtime.as_slice()),
    ];
    for (heading, id, rows) in breakdowns {
        page.push_str(&format!("## {heading}\n\n"));
        page.push_str(&plotly_bar(id, rows));
    }

    page.push_str("## token percentiles (input / event)\n\n");
    page.push_str(&plotly_percentiles(
        "token-percentiles",
        tok.p50_input_per_event,
        tok.p90_input_per_event,
        tok.p99_input_per_event,
    ));

    if !r.incidents.is_empty() {
        page.push_str("## incidents\n\n");
        for inc in &r.incidents {
            page.push_str(&format!(
                "- `{ts}` **{kind}** {who} — {summary}\n",
                ts = inc.ts_utc,
                kind = inc.kind,
                who = inc.agent.as_deref().unwrap_or("-"),
                summary = inc.summary,
            ));
        }
        page.push('\n');
    }

    let iroh = &r.iroh;
    page.push_str(&format!(
        "## iroh\n\n- connect: {}, evict: {}, reconnect: {}, handshake_refused: {}\n",
        iroh.connect, iroh.evict, iroh.reconnect, iroh.handshake_refused,
    ));
    page
}
/// Shared Plotly `layout` body for every bar chart, in the netsky.ai dark
/// theme. It is a bare JSON member list without the surrounding braces;
/// callers splice it inside `{...}`. The transparent paper/plot background
/// lets the page background show through, grid and axis lines use a muted
/// accent so they don't compete with the data, and the font matches the
/// site (Public Sans, slate-200 `#e2e8f0`).
const PLOT_LAYOUT: &str = r##""paper_bgcolor":"rgba(0,0,0,0)","plot_bgcolor":"rgba(0,0,0,0)","autosize":true,"margin":{"t":16,"r":16,"b":60,"l":48},"font":{"family":"Public Sans, -apple-system, system-ui, sans-serif","color":"#e2e8f0","size":13},"xaxis":{"gridcolor":"rgba(167,139,250,0.08)","linecolor":"rgba(167,139,250,0.25)","tickcolor":"rgba(167,139,250,0.25)","zeroline":false},"yaxis":{"gridcolor":"rgba(167,139,250,0.08)","linecolor":"rgba(167,139,250,0.25)","tickcolor":"rgba(167,139,250,0.25)","zeroline":false},"hoverlabel":{"bgcolor":"#16132a","bordercolor":"rgba(167,139,250,0.35)","font":{"color":"#e2e8f0"}}"##;
/// `"marker": {...}` member spliced into each bar trace: translucent accent
/// fill with a solid accent outline.
const BAR_MARKER: &str = r##""marker":{"color":"rgba(167,139,250,0.85)","line":{"color":"rgb(167,139,250)","width":1}}"##;
/// Inline-styled HTML placeholder emitted instead of a chart when there is
/// nothing to plot. The `analytics-empty` class is what the tests look for,
/// and the standalone HTML page also styles it.
const NO_DATA_CARD: &str = "<div class=\"analytics-empty\" style=\"padding:1.5rem;margin:0 0 1.2rem 0;border:1px dashed rgba(167,139,250,0.2);border-radius:8px;color:rgba(226,232,240,0.5);text-align:center;font-style:italic;font-family:system-ui,sans-serif;\">no data for this day</div>\n\n";
/// True when a breakdown has nothing worth plotting: no rows at all, or
/// only rows with a zero count.
fn is_empty(data: &[LabeledCount]) -> bool {
    !data.iter().any(|row| row.count != 0)
}
/// Zorto `plotly` shortcode for a single bar chart of `data`.
///
/// Emits the JSON figure spec (labels as JSON strings, counts as integers,
/// shared marker/layout, responsive config with no mode bar) wrapped in
/// `{% plotly(id=...) %} ... {% end %}`. Returns the "no data" card when
/// `data` is empty or all zero.
fn plotly_bar(id: &str, data: &[LabeledCount]) -> String {
    if is_empty(data) {
        return NO_DATA_CARD.to_owned();
    }
    let labels = data
        .iter()
        .map(|row| json_str(&row.label))
        .collect::<Vec<_>>()
        .join(",");
    let values = data
        .iter()
        .map(|row| row.count.to_string())
        .collect::<Vec<_>>()
        .join(",");
    format!(
        "{{% plotly(id=\"{id}\") %}}\n\
         {{\"data\":[{{\"type\":\"bar\",\"x\":[{labels}],\"y\":[{values}],{BAR_MARKER}}}],\
         \"layout\":{{{PLOT_LAYOUT}}},\
         \"config\":{{\"responsive\":true,\"displayModeBar\":false}}}}\n\
         {{% end %}}\n\n"
    )
}
/// Zorto bar chart of the p50/p90/p99 input-token percentiles.
///
/// Returns the "no data" card when all three are `None`. Otherwise a missing
/// percentile is drawn as 0, and `plotly_bar` still falls back to the card
/// if every value is 0.
fn plotly_percentiles(id: &str, p50: Option<i64>, p90: Option<i64>, p99: Option<i64>) -> String {
    let points = [("p50", p50), ("p90", p90), ("p99", p99)];
    if points.iter().all(|(_, value)| value.is_none()) {
        return NO_DATA_CARD.to_string();
    }
    let rows = points.map(|(label, value)| LabeledCount {
        label: label.into(),
        count: value.unwrap_or(0),
    });
    plotly_bar(id, &rows)
}
/// Encode `s` as a quoted JSON string literal for splicing into chart specs.
/// Falls back to `"?"` if serialization fails.
fn json_str(s: &str) -> String {
    match serde_json::to_string(s) {
        Ok(quoted) => quoted,
        Err(_) => String::from("\"?\""),
    }
}
/// Compact human duration.
///
/// Produces `"1h05m"` when at least an hour (seconds dropped), `"3m07s"`
/// when at least a minute, and `"42s"` otherwise. Zero and negative inputs
/// render as `"0s"`.
fn fmt_seconds(s: i64) -> String {
    if s <= 0 {
        return "0s".to_string();
    }
    let (hours, within_hour) = (s / 3600, s % 3600);
    let (mins, secs) = (within_hour / 60, within_hour % 60);
    match (hours, mins) {
        (0, 0) => format!("{secs}s"),
        (0, _) => format!("{mins}m{secs:02}s"),
        _ => format!("{hours}h{mins:02}m"),
    }
}
fn fmt_opt_seconds(s: Option<i64>) -> String {
s.map(fmt_seconds).unwrap_or_else(|| "—".to_string())
}
/// Render the self-contained `<date>.html` report for one day.
///
/// Produces a single HTML document that loads Plotly 2.35.2 from
/// cdn.plot.ly, shows the headline counters as a stats list, and adds one
/// `html_bar` section per breakdown (plus input-token percentiles). The
/// pretty-printed report JSON, passed through `html_escape`, goes in a
/// collapsible `<details>` block.
fn render_standalone_html(r: &DailyReport) -> String {
let mut charts = String::new();
charts.push_str(&html_bar(
"sessions-by-agent",
"sessions by agent",
&r.sessions.by_agent,
));
charts.push_str(&html_bar(
"messages-by-source",
"messages by source",
&r.messages.by_source,
));
charts.push_str(&html_bar(
"clones-by-runtime",
"clones by runtime",
&r.clones.by_runtime,
));
charts.push_str(&html_bar(
"tokens-by-runtime",
"tokens by runtime",
&r.tokens.by_runtime,
));
// Percentile chart: a missing percentile is plotted as 0.
let pct = [
LabeledCount {
label: "p50".into(),
count: r.tokens.p50_input_per_event.unwrap_or(0),
},
LabeledCount {
label: "p90".into(),
count: r.tokens.p90_input_per_event.unwrap_or(0),
},
LabeledCount {
label: "p99".into(),
count: r.tokens.p99_input_per_event.unwrap_or(0),
},
];
// Passing an empty slice when all three are None makes `html_bar` emit the
// "no data" card (it also does so when every value is 0).
let pct_slice: &[LabeledCount] = if r.tokens.p50_input_per_event.is_none()
&& r.tokens.p90_input_per_event.is_none()
&& r.tokens.p99_input_per_event.is_none()
{
&[]
} else {
&pct
};
charts.push_str(&html_bar(
"token-percentiles",
"token percentiles (input/event)",
pct_slice,
));
// Literal CSS braces are doubled (`{{ }}`) because this is a format! string.
format!(
"<!doctype html>\n\
<html lang=\"en\"><head><meta charset=\"utf-8\"><meta name=\"viewport\" content=\"width=device-width,initial-scale=1\"><title>netsky analytics {date}</title>\n\
<script src=\"https://cdn.plot.ly/plotly-2.35.2.min.js\" charset=\"utf-8\"></script>\n\
<style>\
:root{{--accent:rgb(167,139,250);--accent-secondary:rgb(34,211,238);--bg:#0f0d1a;--bg-raised:#16132a;--fg:#e2e8f0;--muted:rgba(226,232,240,.55);--border:rgba(167,139,250,.18);}}\
body{{font-family:'Public Sans',system-ui,sans-serif;background:var(--bg);color:var(--fg);max-width:960px;margin:0 auto;padding:2rem 1.2rem 4rem;line-height:1.5;}}\
h1{{color:var(--accent);font-weight:700;margin:0 0 .4rem;}}\
h2{{color:var(--fg);font-weight:500;font-size:1.1rem;margin:1.8rem 0 .4rem;border-bottom:1px solid var(--border);padding-bottom:.35rem;}}\
.meta{{color:var(--muted);font-style:italic;margin:0 0 2rem;}}\
ul.stats{{list-style:none;padding:0;display:grid;grid-template-columns:repeat(auto-fit,minmax(240px,1fr));gap:.5rem 1.2rem;margin:0 0 1.5rem;}}\
ul.stats li{{border-left:2px solid var(--accent);padding:.2rem 0 .2rem .75rem;color:var(--muted);}}\
ul.stats b{{color:var(--fg);}}\
.chart{{width:100%;min-height:320px;margin:.3rem 0 1.4rem;}}\
.analytics-empty{{padding:1.5rem;margin:.3rem 0 1.4rem;border:1px dashed rgba(167,139,250,.2);border-radius:8px;color:var(--muted);text-align:center;font-style:italic;}}\
details{{margin-top:2rem;color:var(--muted);}}\
summary{{cursor:pointer;color:var(--accent-secondary);}}\
pre{{background:var(--bg-raised);border:1px solid var(--border);border-radius:6px;padding:1rem;overflow-x:auto;color:var(--fg);}}\
a{{color:var(--accent);}}\
</style>\n\
</head><body>\n\
<h1>netsky analytics · {date}</h1>\n\
<p class=\"meta\">generated {generated}</p>\n\
<ul class=\"stats\">\n\
<li>sessions <b>{session_count}</b> (median {session_median})</li>\n\
<li>crashes <b>{crash_count}</b> (MTTR {mttr})</li>\n\
<li>clones dispatched <b>{clone_count}</b> ({clone_hours:.1} clone-hours)</li>\n\
<li>tokens <b>{t_in}</b> in / <b>{t_out}</b> out / <b>{t_cached}</b> cached (${cost:.2})</li>\n\
<li>messages <b>{msg}</b></li>\n\
<li>commits to main <b>{commits}</b></li>\n\
<li>uptime agent0 <b>{up0}</b> / any session <b>{upa}</b></li>\n\
<li>iroh connect {c} / evict {e} / reconnect {r} / refused {rr}</li>\n\
</ul>\n\
{charts}\n\
<details><summary>raw json</summary><pre>{raw}</pre></details>\n\
</body></html>\n",
date = r.date,
generated = r.generated_at,
session_count = r.sessions.count,
session_median = fmt_opt_seconds(r.sessions.median_length_seconds),
crash_count = r.crashes.count,
mttr = fmt_opt_seconds(r.crashes.mttr_seconds),
clone_count = r.clones.dispatched,
clone_hours = r.clones.clone_hours,
t_in = r.tokens.total_input,
t_out = r.tokens.total_output,
t_cached = r.tokens.total_cached,
cost = r.tokens.total_cost_usd,
msg = r.messages.total,
commits = r.commits_to_main,
up0 = fmt_seconds(r.uptime.agent0_up_seconds),
upa = fmt_seconds(r.uptime.any_session_up_seconds),
c = r.iroh.connect,
e = r.iroh.evict,
r = r.iroh.reconnect,
rr = r.iroh.handshake_refused,
charts = charts,
// The raw report can carry free text (e.g. crash details), so it is
// escaped before being placed inside <pre>.
raw = html_escape(&serde_json::to_string_pretty(r).unwrap_or_default()),
)
}
/// Standalone-HTML chart section.
///
/// Emits an `<h2>` heading, then either a `<div>` plus an inline
/// `Plotly.newPlot` script (shared marker/layout, responsive, no mode bar)
/// or the "no data" card when `data` is empty or all zero.
fn html_bar(id: &str, title: &str, data: &[LabeledCount]) -> String {
    let heading = format!("<h2>{title}</h2>\n");
    if is_empty(data) {
        return heading + NO_DATA_CARD;
    }
    let (labels, values): (Vec<String>, Vec<String>) = data
        .iter()
        .map(|row| (json_str(&row.label), row.count.to_string()))
        .unzip();
    let labels = labels.join(",");
    let values = values.join(",");
    heading
        + &format!(
            "<div id=\"{id}\" class=\"chart\"></div>\n\
             <script>Plotly.newPlot('{id}',[{{type:'bar',x:[{labels}],y:[{values}],{BAR_MARKER}}}],{{{PLOT_LAYOUT}}},{{responsive:true,displayModeBar:false}});</script>\n"
        )
}
/// Escape `&`, `<` and `>` so arbitrary text can be placed in HTML element
/// content (here, the raw-JSON `<pre>` block).
///
/// The previous version replaced each character with itself, making it a
/// no-op. Report text such as crash details containing `</pre><script>` was
/// therefore injected verbatim into the page. Each character is handled once
/// in a single pass, so `&` produced by an escape is never double-escaped.
fn html_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            other => out.push(other),
        }
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for one chart row.
    fn lc(label: &str, count: i64) -> LabeledCount {
        LabeledCount {
            label: label.to_string(),
            count,
        }
    }

    /// Two non-zero rows, enough to force a real chart.
    fn sample() -> [LabeledCount; 2] {
        [lc("agent0", 7), lc("agent1", 3)]
    }

    #[test]
    fn plotly_bar_renders_accent_bar_and_transparent_bg() {
        let out = plotly_bar("test", &sample());
        let expectations = [
            ("rgba(167,139,250,0.85)", "accent bar color"),
            ("paper_bgcolor\":\"rgba(0,0,0,0)", "transparent bg"),
            ("\"responsive\":true", "responsive config"),
        ];
        for (needle, what) in expectations {
            assert!(out.contains(needle), "{what}: {out}");
        }
        assert!(!out.contains("analytics-empty"));
    }

    #[test]
    fn plotly_bar_emits_empty_card_when_no_rows() {
        let out = plotly_bar("test", &[]);
        assert!(
            out.contains("analytics-empty"),
            "empty card for zero-row: {out}"
        );
        assert!(!out.contains("Plotly"));
    }

    #[test]
    fn plotly_bar_emits_empty_card_when_all_zero() {
        let out = plotly_bar("test", &[lc("a", 0), lc("b", 0)]);
        assert!(
            out.contains("analytics-empty"),
            "empty card for all-zero: {out}"
        );
    }

    #[test]
    fn plotly_percentiles_empty_when_all_none() {
        assert!(plotly_percentiles("p", None, None, None).contains("analytics-empty"));
    }

    #[test]
    fn html_bar_uses_accent_marker() {
        let out = html_bar("h", "t", &sample());
        for needle in ["rgba(167,139,250,0.85)", "paper_bgcolor"] {
            assert!(out.contains(needle));
        }
    }

    #[test]
    fn html_bar_empty_state() {
        let out = html_bar("h", "empty case", &[]);
        for needle in ["empty case", "analytics-empty"] {
            assert!(out.contains(needle));
        }
    }
}