use std::collections::HashMap;
use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Duration, Utc};
use futures::StreamExt;
use kanade_shared::kv::{BUCKET_JOBS, BUCKET_VIEWS};
use kanade_shared::manifest::{
AggregateAgg, AggregateRender, AggregateScope, AggregateTimeBucket, AggregateTransform,
AggregateWidget, Manifest, View,
};
use serde::{Deserialize, Serialize};
use sqlx::{Row, SqlitePool};
use tracing::warn;
use super::AppState;
const DEFAULT_LIMIT: i64 = 10;
const MAX_RAW_ROWS: i64 = 10_000;
#[derive(Deserialize)]
pub struct AnalyticsQuery {
pub from: Option<DateTime<Utc>>,
pub to: Option<DateTime<Utc>>,
pub tz_offset_minutes: Option<i64>,
pub pc_id: Option<String>,
}
struct Ctx<'a> {
pool: &'a SqlitePool,
pc_id: Option<&'a str>,
from: DateTime<Utc>,
to: DateTime<Utc>,
tz_mod: &'a str,
}
#[derive(Serialize)]
pub struct BarRow {
pub label: String,
pub value: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub est_minutes: Option<i64>,
}
#[derive(Serialize)]
pub struct HourBucket {
pub hour: i64,
pub total: i64,
pub active: i64,
}
#[derive(Serialize)]
#[serde(tag = "render", rename_all = "lowercase")]
pub enum WidgetData {
Bar {
rows: Vec<BarRow>,
},
Gauge {
total: i64,
active: i64,
ratio: f64,
#[serde(skip_serializing_if = "Option::is_none")]
est_minutes: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
first: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
last: Option<DateTime<Utc>>,
},
Timeline {
buckets: Vec<HourBucket>,
},
Stat {
value: i64,
#[serde(skip_serializing_if = "Option::is_none")]
est_minutes: Option<i64>,
},
}
#[derive(Serialize)]
pub struct WidgetResult {
pub dashboard: String,
pub title: String,
pub scope: &'static str,
#[serde(flatten)]
pub data: WidgetData,
}
pub async fn get(
State(state): State<AppState>,
Query(q): Query<AnalyticsQuery>,
) -> Result<Json<Vec<WidgetResult>>, StatusCode> {
let to = q.to.unwrap_or_else(Utc::now);
let from = q.from.unwrap_or(to - Duration::hours(24));
if from >= to {
return Err(StatusCode::BAD_REQUEST);
}
let tz_off = q.tz_offset_minutes.unwrap_or(0).clamp(-900, 900);
let tz_mod = format!("{tz_off:+} minutes");
let want_scope = if q.pc_id.is_some() {
AggregateScope::Pc
} else {
AggregateScope::Fleet
};
let scope_str = if q.pc_id.is_some() { "pc" } else { "fleet" };
let ctx = Ctx {
pool: &state.pool,
pc_id: q.pc_id.as_deref(),
from,
to,
tz_mod: &tz_mod,
};
let mut widgets = load_widgets(&state.jetstream).await;
widgets.sort_by(|a, b| widget_sort_key(a).cmp(&widget_sort_key(b)));
let mut out = Vec::new();
for w in widgets {
if w.scope != want_scope {
continue;
}
match compute_widget(&ctx, &w).await {
Ok(Some(data)) => out.push(WidgetResult {
dashboard: w.dashboard,
title: w.title,
scope: scope_str,
data,
}),
Ok(None) => {}
Err(e) => {
warn!(error = %e, dashboard = %w.dashboard, title = %w.title, "analytics: widget compute")
}
}
}
Ok(Json(out))
}
fn widget_sort_key(w: &AggregateWidget) -> (i32, &str, &str) {
(w.order.unwrap_or(0), &w.dashboard, &w.title)
}
async fn load_widgets(jetstream: &async_nats::jetstream::Context) -> Vec<AggregateWidget> {
let mut out = Vec::new();
if let Ok(kv) = jetstream.get_key_value(BUCKET_JOBS).await
&& let Ok(mut keys) = kv.keys().await
{
while let Some(key) = keys.next().await {
let Ok(key) = key else { continue };
let Some(entry) = kv.get(&key).await.unwrap_or(None) else {
continue;
};
if let Ok(job) = serde_json::from_slice::<Manifest>(&entry)
&& let Some(widgets) = job.aggregate
{
out.extend(widgets);
}
}
}
if let Ok(kv) = jetstream.get_key_value(BUCKET_VIEWS).await
&& let Ok(mut keys) = kv.keys().await
{
while let Some(key) = keys.next().await {
let Ok(key) = key else { continue };
let Some(entry) = kv.get(&key).await.unwrap_or(None) else {
continue;
};
if let Ok(view) = serde_json::from_slice::<View>(&entry) {
out.extend(view.widgets);
}
}
}
out
}
async fn compute_widget(ctx: &Ctx<'_>, w: &AggregateWidget) -> anyhow::Result<Option<WidgetData>> {
if matches!(w.agg, AggregateAgg::Unknown)
|| matches!(w.render, AggregateRender::Unknown)
|| matches!(w.transform, Some(AggregateTransform::Unknown))
|| matches!(w.time_bucket, Some(AggregateTimeBucket::Unknown))
{
return Ok(None);
}
let exclude_json = if w.exclude.is_empty() {
None
} else {
Some(serde_json::to_string(&w.exclude)?)
};
let limit = w.limit.map(i64::from).unwrap_or(DEFAULT_LIMIT);
let sample = w.sample_minutes.map(i64::from);
if matches!(w.time_bucket, Some(AggregateTimeBucket::Hour)) {
return Ok(Some(timeline(ctx, w).await?));
}
let data = match w.agg {
AggregateAgg::Ratio => gauge(ctx, w, sample).await?,
AggregateAgg::Count => match &w.group_by {
Some(gb) if gb == "pc_id" => bar_count_pc(ctx, w, exclude_json, limit, sample).await?,
Some(gb) if matches!(w.transform, Some(AggregateTransform::Host)) => {
bar_host(ctx, w, gb, limit, sample).await?
}
Some(gb) => bar_count_path(ctx, w, gb, exclude_json, limit, sample).await?,
None => stat_count(ctx, w, sample).await?,
},
AggregateAgg::Sum => {
let vp = w.value_path.as_deref().unwrap_or_default();
match &w.group_by {
Some(gb) if gb == "pc_id" => sum_bar_pc(ctx, w, vp, exclude_json, limit).await?,
Some(gb) => sum_bar_path(ctx, w, gb, vp, exclude_json, limit).await?,
None => sum_stat(ctx, w, vp).await?,
}
}
_ => return Ok(None),
};
Ok(Some(data))
}
async fn bar_count_path(
ctx: &Ctx<'_>,
w: &AggregateWidget,
path: &str,
exclude_json: Option<String>,
limit: i64,
sample: Option<i64>,
) -> anyhow::Result<WidgetData> {
let rows = sqlx::query(
"SELECT json_extract(payload, '$.' || ?6) AS g, COUNT(*) AS n \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5 \
AND json_extract(payload, '$.' || ?6) IS NOT NULL \
AND json_extract(payload, '$.' || ?6) <> '' \
AND (?7 IS NULL OR json_extract(payload, '$.' || ?6) NOT IN (SELECT value FROM json_each(?7))) \
GROUP BY json_extract(payload, '$.' || ?6) ORDER BY n DESC LIMIT ?8",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(path)
.bind(exclude_json)
.bind(limit)
.fetch_all(ctx.pool)
.await?;
Ok(WidgetData::Bar {
rows: rows
.into_iter()
.filter_map(|r| bar_row(&r, sample))
.collect(),
})
}
async fn bar_count_pc(
ctx: &Ctx<'_>,
w: &AggregateWidget,
exclude_json: Option<String>,
limit: i64,
sample: Option<i64>,
) -> anyhow::Result<WidgetData> {
let rows = sqlx::query(
"SELECT pc_id AS g, COUNT(*) AS n \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5 \
AND (?6 IS NULL OR pc_id NOT IN (SELECT value FROM json_each(?6))) \
GROUP BY pc_id ORDER BY n DESC LIMIT ?7",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(exclude_json)
.bind(limit)
.fetch_all(ctx.pool)
.await?;
Ok(WidgetData::Bar {
rows: rows
.into_iter()
.filter_map(|r| bar_row(&r, sample))
.collect(),
})
}
async fn bar_host(
ctx: &Ctx<'_>,
w: &AggregateWidget,
path: &str,
limit: i64,
sample: Option<i64>,
) -> anyhow::Result<WidgetData> {
let rows = sqlx::query(
"SELECT json_extract(payload, '$.' || ?6) AS v \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5 \
AND json_extract(payload, '$.' || ?6) IS NOT NULL \
ORDER BY at DESC LIMIT ?7",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(path)
.bind(MAX_RAW_ROWS)
.fetch_all(ctx.pool)
.await?;
let excluded: std::collections::HashSet<&str> = w.exclude.iter().map(String::as_str).collect();
let mut counts: HashMap<String, i64> = HashMap::new();
for r in &rows {
let v: String = r.try_get("v").unwrap_or_default();
if let Some(host) = host_of(&v) {
if excluded.contains(host.as_str()) {
continue;
}
*counts.entry(host).or_insert(0) += 1;
}
}
let mut ranked: Vec<(String, i64)> = counts.into_iter().collect();
ranked.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
ranked.truncate(limit.max(0) as usize);
Ok(WidgetData::Bar {
rows: ranked
.into_iter()
.map(|(label, value)| BarRow {
label,
value,
est_minutes: sample.map(|m| value * m),
})
.collect(),
})
}
async fn stat_count(
ctx: &Ctx<'_>,
w: &AggregateWidget,
sample: Option<i64>,
) -> anyhow::Result<WidgetData> {
let row = sqlx::query(
"SELECT COUNT(*) AS n FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.fetch_one(ctx.pool)
.await?;
let value: i64 = row.try_get("n").unwrap_or(0);
Ok(WidgetData::Stat {
value,
est_minutes: sample.map(|m| value * m),
})
}
async fn gauge(
ctx: &Ctx<'_>,
w: &AggregateWidget,
sample: Option<i64>,
) -> anyhow::Result<WidgetData> {
let bool_path = w.bool_path.as_deref().unwrap_or_default();
let row = sqlx::query(
"SELECT COUNT(*) AS total, \
COALESCE(SUM(CASE WHEN json_extract(payload, '$.' || ?6) = 1 THEN 1 ELSE 0 END), 0) AS active, \
MIN(CASE WHEN json_extract(payload, '$.' || ?6) = 1 THEN at END) AS first_at, \
MAX(CASE WHEN json_extract(payload, '$.' || ?6) = 1 THEN at END) AS last_at \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(bool_path)
.fetch_one(ctx.pool)
.await?;
let total: i64 = row.try_get("total").unwrap_or(0);
let active: i64 = row.try_get("active").unwrap_or(0);
Ok(WidgetData::Gauge {
total,
active,
ratio: if total > 0 {
active as f64 / total as f64
} else {
0.0
},
est_minutes: sample.map(|m| active * m),
first: row
.try_get::<Option<DateTime<Utc>>, _>("first_at")
.unwrap_or(None),
last: row
.try_get::<Option<DateTime<Utc>>, _>("last_at")
.unwrap_or(None),
})
}
async fn timeline(ctx: &Ctx<'_>, w: &AggregateWidget) -> anyhow::Result<WidgetData> {
let rows = match w.bool_path.as_deref() {
Some(bool_path) => {
sqlx::query(
"SELECT CAST(strftime('%H', at, ?1) AS INTEGER) AS hour, \
COUNT(*) AS total, \
COALESCE(SUM(CASE WHEN json_extract(payload, '$.' || ?7) = 1 THEN 1 ELSE 0 END), 0) AS active \
FROM obs_events \
WHERE (?2 IS NULL OR pc_id = ?2) AND kind = ?3 AND (?4 IS NULL OR source = ?4) \
AND at >= ?5 AND at < ?6 \
GROUP BY hour ORDER BY hour",
)
.bind(ctx.tz_mod)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(bool_path)
.fetch_all(ctx.pool)
.await?
}
None => {
sqlx::query(
"SELECT CAST(strftime('%H', at, ?1) AS INTEGER) AS hour, \
COUNT(*) AS total, COUNT(*) AS active \
FROM obs_events \
WHERE (?2 IS NULL OR pc_id = ?2) AND kind = ?3 AND (?4 IS NULL OR source = ?4) \
AND at >= ?5 AND at < ?6 \
GROUP BY hour ORDER BY hour",
)
.bind(ctx.tz_mod)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.fetch_all(ctx.pool)
.await?
}
};
let buckets = rows
.into_iter()
.filter_map(|r| {
let hour: i64 = r.try_get("hour").ok()?;
let total: i64 = r.try_get("total").unwrap_or(0);
let active: i64 = r.try_get("active").unwrap_or(0);
Some(HourBucket {
hour,
total,
active,
})
})
.collect();
Ok(WidgetData::Timeline { buckets })
}
async fn sum_bar_path(
ctx: &Ctx<'_>,
w: &AggregateWidget,
path: &str,
value_path: &str,
exclude_json: Option<String>,
limit: i64,
) -> anyhow::Result<WidgetData> {
let rows = sqlx::query(
"SELECT json_extract(payload, '$.' || ?6) AS g, \
COALESCE(SUM(json_extract(payload, '$.' || ?7)), 0) AS s \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5 \
AND json_extract(payload, '$.' || ?6) IS NOT NULL \
AND (?8 IS NULL OR json_extract(payload, '$.' || ?6) NOT IN (SELECT value FROM json_each(?8))) \
GROUP BY json_extract(payload, '$.' || ?6) ORDER BY s DESC LIMIT ?9",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(path)
.bind(value_path)
.bind(exclude_json)
.bind(limit)
.fetch_all(ctx.pool)
.await?;
Ok(WidgetData::Bar {
rows: rows.into_iter().filter_map(|r| sum_row(&r)).collect(),
})
}
async fn sum_bar_pc(
ctx: &Ctx<'_>,
w: &AggregateWidget,
value_path: &str,
exclude_json: Option<String>,
limit: i64,
) -> anyhow::Result<WidgetData> {
let rows = sqlx::query(
"SELECT pc_id AS g, COALESCE(SUM(json_extract(payload, '$.' || ?6)), 0) AS s \
FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5 \
AND (?7 IS NULL OR pc_id NOT IN (SELECT value FROM json_each(?7))) \
GROUP BY pc_id ORDER BY s DESC LIMIT ?8",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(value_path)
.bind(exclude_json)
.bind(limit)
.fetch_all(ctx.pool)
.await?;
Ok(WidgetData::Bar {
rows: rows.into_iter().filter_map(|r| sum_row(&r)).collect(),
})
}
async fn sum_stat(
ctx: &Ctx<'_>,
w: &AggregateWidget,
value_path: &str,
) -> anyhow::Result<WidgetData> {
let row = sqlx::query(
"SELECT COALESCE(SUM(json_extract(payload, '$.' || ?6)), 0) AS s FROM obs_events \
WHERE (?1 IS NULL OR pc_id = ?1) AND kind = ?2 AND (?3 IS NULL OR source = ?3) \
AND at >= ?4 AND at < ?5",
)
.bind(ctx.pc_id)
.bind(&w.kind)
.bind(w.source.as_deref())
.bind(ctx.from)
.bind(ctx.to)
.bind(value_path)
.fetch_one(ctx.pool)
.await?;
Ok(WidgetData::Stat {
value: sum_value(&row),
est_minutes: None,
})
}
fn bar_row(r: &sqlx::sqlite::SqliteRow, sample: Option<i64>) -> Option<BarRow> {
let label: String = r.try_get("g").ok()?;
let value: i64 = r.try_get("n").unwrap_or(0);
Some(BarRow {
label,
value,
est_minutes: sample.map(|m| value * m),
})
}
fn sum_row(r: &sqlx::sqlite::SqliteRow) -> Option<BarRow> {
let label: String = r.try_get("g").ok()?;
Some(BarRow {
label,
value: sum_value(r),
est_minutes: None,
})
}
fn sum_value(r: &sqlx::sqlite::SqliteRow) -> i64 {
r.try_get::<i64, _>("s")
.or_else(|_| r.try_get::<f64, _>("s").map(|f| f.round() as i64))
.unwrap_or(0)
}
fn host_of(url: &str) -> Option<String> {
let lower = url.trim_start().to_ascii_lowercase();
for scheme in [
"about:",
"data:",
"javascript:",
"chrome:",
"chrome-extension:",
"edge:",
"brave:",
"view-source:",
"file:",
] {
if lower.starts_with(scheme) {
return None;
}
}
let rest = url.split_once("://").map(|(_, r)| r).unwrap_or(url);
let authority = rest.split(['/', '?', '#']).next()?;
let no_userinfo = authority.rsplit('@').next().unwrap_or(authority);
let host = if no_userinfo.starts_with('[') {
match no_userinfo.rfind(']') {
Some(i) => &no_userinfo[..=i],
None => no_userinfo,
}
} else {
no_userinfo.split(':').next().unwrap_or(no_userinfo)
};
let host = host.trim();
if host.is_empty() {
None
} else {
Some(host.to_lowercase())
}
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use super::*;
#[test]
fn host_of_extracts_authority() {
assert_eq!(
host_of("https://user:pw@Example.com:8443/x?y#z").as_deref(),
Some("example.com")
);
assert_eq!(host_of("http://[::1]:8080/").as_deref(), Some("[::1]"));
assert_eq!(host_of("github.com/foo").as_deref(), Some("github.com"));
assert_eq!(host_of("about:blank"), None);
assert_eq!(host_of("chrome-extension://abc/page"), None);
assert_eq!(host_of(" "), None);
}
async fn seeded_pool() -> SqlitePool {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
sqlx::query(
"CREATE TABLE obs_events ( \
id INTEGER PRIMARY KEY AUTOINCREMENT, pc_id TEXT NOT NULL, \
at TIMESTAMP NOT NULL, kind TEXT NOT NULL, source TEXT NOT NULL, \
event_record_id TEXT, payload TEXT )",
)
.execute(&pool)
.await
.unwrap();
let at = |h: u32| Utc.with_ymd_and_hms(2026, 6, 17, h, 0, 0).unwrap();
for (pc, t, kind, payload) in [
("p1", at(9), "presence", r#"{"active":true}"#),
("p1", at(10), "presence", r#"{"active":true}"#),
("p1", at(11), "presence", r#"{"active":false}"#),
("p1", at(12), "presence", r#"{"active":false}"#),
(
"p1",
at(9),
"app_sample",
r#"{"foreground":{"app":"brave"}}"#,
),
(
"p1",
at(10),
"app_sample",
r#"{"foreground":{"app":"brave"}}"#,
),
(
"p1",
at(11),
"app_sample",
r#"{"foreground":{"app":"brave"}}"#,
),
(
"p1",
at(12),
"app_sample",
r#"{"foreground":{"app":"code"}}"#,
),
(
"p1",
at(13),
"app_sample",
r#"{"foreground":{"app":"LockApp"}}"#,
),
(
"p1",
at(9),
"web_visit",
r#"{"url":"https://github.com/a"}"#,
),
(
"p1",
at(10),
"web_visit",
r#"{"url":"https://github.com/b"}"#,
),
(
"p1",
at(11),
"web_visit",
r#"{"url":"https://example.com/"}"#,
),
(
"p2",
at(9),
"app_sample",
r#"{"foreground":{"app":"brave"}}"#,
),
] {
sqlx::query(
"INSERT INTO obs_events (pc_id, at, kind, source, payload) VALUES (?,?,?,?,?)",
)
.bind(pc)
.bind(t)
.bind(kind)
.bind("test")
.bind(payload)
.execute(&pool)
.await
.unwrap();
}
pool
}
fn widget(kind: &str, agg: AggregateAgg, render: AggregateRender) -> AggregateWidget {
AggregateWidget {
dashboard: "D".into(),
title: "T".into(),
order: None,
scope: AggregateScope::Pc,
kind: kind.into(),
source: None,
agg,
group_by: None,
bool_path: None,
value_path: None,
transform: None,
sample_minutes: None,
exclude: Vec::new(),
time_bucket: None,
limit: None,
render,
}
}
fn ctx<'a>(pool: &'a SqlitePool, pc_id: Option<&'a str>) -> Ctx<'a> {
Ctx {
pool,
pc_id,
from: Utc.with_ymd_and_hms(2026, 6, 17, 0, 0, 0).unwrap(),
to: Utc.with_ymd_and_hms(2026, 6, 18, 0, 0, 0).unwrap(),
tz_mod: "+0 minutes",
}
}
#[test]
fn widget_sort_key_orders_by_order_then_alpha() {
let mut ws = [
{
let mut w = widget("k", AggregateAgg::Count, AggregateRender::Stat);
w.dashboard = "Utilization".into();
w.title = "B".into();
w
},
{
let mut w = widget("k", AggregateAgg::Count, AggregateRender::Stat);
w.dashboard = "Utilization".into();
w.title = "A".into();
w
},
{
let mut w = widget("k", AggregateAgg::Count, AggregateRender::Stat);
w.dashboard = "Zzz".into();
w.title = "Z".into();
w.order = Some(-1);
w
},
];
ws.sort_by(|a, b| widget_sort_key(a).cmp(&widget_sort_key(b)));
let got: Vec<(&str, &str)> = ws
.iter()
.map(|w| (w.dashboard.as_str(), w.title.as_str()))
.collect();
assert_eq!(
got,
[("Zzz", "Z"), ("Utilization", "A"), ("Utilization", "B")]
);
}
#[tokio::test]
async fn count_bar_groups_excludes_and_estimates_time() {
let pool = seeded_pool().await;
let mut w = widget("app_sample", AggregateAgg::Count, AggregateRender::Bar);
w.group_by = Some("foreground.app".into());
w.exclude = vec!["LockApp".into()];
w.sample_minutes = Some(2);
let data = compute_widget(&ctx(&pool, Some("p1")), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Bar { rows } = data else {
panic!("expected bar")
};
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].label, "brave");
assert_eq!(rows[0].value, 3);
assert_eq!(rows[0].est_minutes, Some(6));
assert_eq!(rows[1].label, "code");
}
#[tokio::test]
async fn ratio_gauge_counts_true_over_total() {
let pool = seeded_pool().await;
let mut w = widget("presence", AggregateAgg::Ratio, AggregateRender::Gauge);
w.bool_path = Some("active".into());
w.sample_minutes = Some(5);
let data = compute_widget(&ctx(&pool, Some("p1")), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Gauge {
total,
active,
ratio,
est_minutes,
..
} = data
else {
panic!("expected gauge")
};
assert_eq!(total, 4);
assert_eq!(active, 2);
assert!((ratio - 0.5).abs() < 1e-9);
assert_eq!(est_minutes, Some(10));
}
#[tokio::test]
async fn host_transform_folds_urls_in_rust() {
let pool = seeded_pool().await;
let mut w = widget("web_visit", AggregateAgg::Count, AggregateRender::Bar);
w.group_by = Some("url".into());
w.transform = Some(AggregateTransform::Host);
let data = compute_widget(&ctx(&pool, Some("p1")), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Bar { rows } = data else {
panic!("expected bar")
};
assert_eq!(rows[0].label, "github.com");
assert_eq!(rows[0].value, 2);
assert_eq!(rows[1].label, "example.com");
assert_eq!(rows[1].value, 1);
}
#[tokio::test]
async fn count_stat_is_grand_total() {
let pool = seeded_pool().await;
let w = widget("app_sample", AggregateAgg::Count, AggregateRender::Stat);
let data = compute_widget(&ctx(&pool, Some("p1")), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Stat { value, .. } = data else {
panic!("expected stat")
};
assert_eq!(value, 5);
}
#[tokio::test]
async fn fleet_pc_ranking_counts_all_pcs() {
let pool = seeded_pool().await;
let mut w = widget("app_sample", AggregateAgg::Count, AggregateRender::Bar);
w.scope = AggregateScope::Fleet;
w.group_by = Some("pc_id".into());
let data = compute_widget(&ctx(&pool, None), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Bar { rows } = data else {
panic!("expected bar")
};
assert_eq!(rows[0].label, "p1"); assert_eq!(rows[0].value, 5);
assert_eq!(rows[1].label, "p2"); }
#[tokio::test]
async fn ratio_timeline_buckets_by_local_hour() {
let pool = seeded_pool().await;
let mut w = widget("presence", AggregateAgg::Ratio, AggregateRender::Timeline);
w.bool_path = Some("active".into());
w.time_bucket = Some(AggregateTimeBucket::Hour);
let data = compute_widget(&ctx(&pool, Some("p1")), &w)
.await
.unwrap()
.unwrap();
let WidgetData::Timeline { buckets } = data else {
panic!("expected timeline")
};
let h9 = buckets.iter().find(|b| b.hour == 9).unwrap();
assert_eq!((h9.total, h9.active), (1, 1));
let h11 = buckets.iter().find(|b| b.hour == 11).unwrap();
assert_eq!((h11.total, h11.active), (1, 0));
}
#[tokio::test]
async fn timeline_shifts_into_local_hours() {
let pool = seeded_pool().await;
let mut c = ctx(&pool, Some("p1"));
c.tz_mod = "+540 minutes";
let mut w = widget("presence", AggregateAgg::Ratio, AggregateRender::Timeline);
w.bool_path = Some("active".into());
w.time_bucket = Some(AggregateTimeBucket::Hour);
let data = compute_widget(&c, &w).await.unwrap().unwrap();
let WidgetData::Timeline { buckets } = data else {
panic!("expected timeline")
};
assert!(buckets.iter().any(|b| b.hour == 18 && b.active == 1));
assert!(buckets.iter().all(|b| b.hour != 9));
}
}