use std::collections::HashMap;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{Row, SqlitePool};
use tracing::warn;
/// Maximum number of `web_visit` rows scanned per request when ranking `top_sites`;
/// rows are read newest-first, so only the most recent visits in the window count.
const MAX_VISIT_ROWS: i64 = 10_000;
/// Minutes credited per active `presence` sample when estimating `est_active_minutes`.
/// NOTE(review): presumably equals the agent's presence sampling interval — confirm.
const PRESENCE_SAMPLE_MINUTES: i64 = 5;
/// Minutes credited per `app_sample` when estimating per-app `est_minutes`.
/// NOTE(review): presumably equals the agent's app sampling interval — confirm.
const APP_SAMPLE_MINUTES: i64 = 2;
/// Foreground app name excluded from the `top_apps` ranking.
/// NOTE(review): presumably the Windows lock-screen process — confirm.
const LOCK_SCREEN_APP: &str = "LockApp";
/// Query-string parameters selecting the reporting window for [`get`].
#[derive(Deserialize)]
pub struct WindowQuery {
/// Inclusive window start; defaults to `to - 24h`.
pub from: Option<DateTime<Utc>>,
/// Exclusive window end; defaults to the current time.
pub to: Option<DateTime<Utc>>,
/// Client UTC offset in minutes, used only to bucket `timeline` by local hour of day.
/// Defaults to 0 and is clamped to ±900 (±15h).
pub tz_offset_minutes: Option<i64>,
}
/// Aggregate of `presence` samples within the requested window.
#[derive(Serialize)]
pub struct ActiveSummary {
/// Number of `presence` samples in the window.
pub total_samples: i64,
/// Samples whose payload has `active = 1`.
pub active_samples: i64,
/// `active_samples / total_samples`, or `0.0` when there are no samples.
pub active_ratio: f64,
/// Timestamp of the earliest active sample, if any.
pub first_active: Option<DateTime<Utc>>,
/// Timestamp of the latest active sample, if any.
pub last_active: Option<DateTime<Utc>>,
/// Estimate computed as `active_samples * PRESENCE_SAMPLE_MINUTES`.
pub est_active_minutes: i64,
}
/// One entry of the top-foreground-apps ranking.
#[derive(Serialize)]
pub struct AppCount {
/// Foreground app name taken from the sample payload (`$.foreground.app`).
pub app: String,
/// Number of `app_sample` events with this app in the foreground.
pub samples: i64,
/// Estimate computed as `samples * APP_SAMPLE_MINUTES`.
pub est_minutes: i64,
}
/// One entry of the top-visited-sites ranking.
#[derive(Serialize)]
pub struct SiteCount {
/// Lowercased host extracted from the visited URL (scheme, userinfo and port removed).
pub host: String,
/// Number of scanned `web_visit` events attributed to this host.
pub visits: i64,
}
/// `presence` sample counts for one local hour of day.
#[derive(Serialize)]
pub struct HourBucket {
/// Hour of day (0–23) after applying the requested UTC offset. Samples from different
/// days of the window that fall in the same hour share one bucket.
pub hour: i64,
/// Number of `presence` samples in this hour.
pub total: i64,
/// Samples in this hour whose payload has `active = 1`.
pub active: i64,
}
/// Response body of [`get`]: utilization summary for one PC over `[from, to)`.
#[derive(Serialize)]
pub struct UtilizationResponse {
/// PC identifier taken from the request path.
pub pc_id: String,
/// Effective window start (inclusive).
pub from: DateTime<Utc>,
/// Effective window end (exclusive).
pub to: DateTime<Utc>,
/// Presence-based activity summary.
pub active: ActiveSummary,
/// Up to 10 most-sampled foreground apps, most frequent first (lock screen excluded).
pub top_apps: Vec<AppCount>,
/// Up to 10 most-visited hosts, most visits first (ties broken by host name).
pub top_sites: Vec<SiteCount>,
/// `true` when the window may hold more visits than the `MAX_VISIT_ROWS` scanned, in
/// which case `top_sites` reflects only the most recent ones.
pub site_visits_capped: bool,
/// Hour-of-day presence histogram in ascending hour order; hours without samples are
/// omitted.
pub timeline: Vec<HourBucket>,
}
/// Extracts the lowercase host from a visited URL, for grouping visits by site.
///
/// The scheme, any `user:pass@` userinfo, the port, and everything from the first `/`,
/// `?` or `#` onward are dropped. A trailing root dot (`example.com.`) is removed so it
/// groups with `example.com`. Bracketed IPv6 literals keep their brackets (`[::1]`).
///
/// Returns `None` for browser-internal / non-network schemes (`about:`, `data:`,
/// `chrome-extension:`, `file:`, …) and when no host remains.
fn host_of(url: &str) -> Option<String> {
    const IGNORED_SCHEMES: &[&str] = &[
        "about:",
        "data:",
        "javascript:",
        "chrome:",
        "chrome-extension:",
        "edge:",
        "brave:",
        "view-source:",
        "file:",
    ];

    /// RFC 3986 scheme syntax: `ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )`.
    fn is_scheme(s: &str) -> bool {
        let mut chars = s.chars();
        matches!(chars.next(), Some(c) if c.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    }

    let url = url.trim_start();
    let lower = url.to_ascii_lowercase();
    if IGNORED_SCHEMES.iter().any(|s| lower.starts_with(s)) {
        return None;
    }

    // Only a genuine scheme prefix counts as the separator: a `://` inside the query of a
    // scheme-less URL (`example.com/r?u=https://other.org`) must not redirect the visit
    // to the embedded host.
    let rest = match url.split_once("://") {
        Some((scheme, rest)) if is_scheme(scheme) => rest,
        _ => url,
    };
    let authority = rest.split(['/', '?', '#']).next()?;
    // Userinfo cannot contain a raw `@`, so the host follows the last one.
    let no_userinfo = authority.rsplit('@').next().unwrap_or(authority);
    let host = if no_userinfo.starts_with('[') {
        // IPv6 literal: keep everything up to and including `]`, dropping any `:port`.
        match no_userinfo.rfind(']') {
            Some(i) => &no_userinfo[..=i],
            None => no_userinfo,
        }
    } else {
        no_userinfo.split(':').next().unwrap_or(no_userinfo)
    };
    let host = host.trim().trim_end_matches('.');
    if host.is_empty() {
        None
    } else {
        Some(host.to_lowercase())
    }
}
pub async fn get(
State(pool): State<SqlitePool>,
Path(pc_id): Path<String>,
Query(q): Query<WindowQuery>,
) -> Result<Json<UtilizationResponse>, StatusCode> {
let to = q.to.unwrap_or_else(Utc::now);
let from = q.from.unwrap_or_else(|| to - Duration::hours(24));
if from >= to {
return Err(StatusCode::BAD_REQUEST);
}
let tz_off = q.tz_offset_minutes.unwrap_or(0).clamp(-900, 900);
let tz_mod = format!("{tz_off:+} minutes");
let active_row = sqlx::query(
"SELECT \
COUNT(*) AS total, \
COALESCE(SUM(CASE WHEN json_extract(payload, '$.active') = 1 THEN 1 ELSE 0 END), 0) AS active, \
MIN(CASE WHEN json_extract(payload, '$.active') = 1 THEN at END) AS first_active, \
MAX(CASE WHEN json_extract(payload, '$.active') = 1 THEN at END) AS last_active \
FROM obs_events \
WHERE pc_id = ? AND kind = 'presence' AND at >= ? AND at < ?",
)
.bind(&pc_id)
.bind(from)
.bind(to)
.fetch_one(&pool)
.await
.map_err(|e| {
warn!(error = %e, "utilization: presence aggregate");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let total_samples: i64 = active_row.try_get("total").unwrap_or(0);
let active_samples: i64 = active_row.try_get("active").unwrap_or(0);
let active = ActiveSummary {
total_samples,
active_samples,
active_ratio: if total_samples > 0 {
active_samples as f64 / total_samples as f64
} else {
0.0
},
first_active: active_row
.try_get::<Option<DateTime<Utc>>, _>("first_active")
.unwrap_or(None),
last_active: active_row
.try_get::<Option<DateTime<Utc>>, _>("last_active")
.unwrap_or(None),
est_active_minutes: active_samples * PRESENCE_SAMPLE_MINUTES,
};
let app_rows = sqlx::query(
"SELECT json_extract(payload, '$.foreground.app') AS app, COUNT(*) AS n \
FROM obs_events \
WHERE pc_id = ? AND kind = 'app_sample' AND at >= ? AND at < ? \
AND json_extract(payload, '$.foreground.app') IS NOT NULL \
AND json_extract(payload, '$.foreground.app') <> '' \
AND json_extract(payload, '$.foreground.app') <> ? \
GROUP BY json_extract(payload, '$.foreground.app') ORDER BY n DESC LIMIT 10",
)
.bind(&pc_id)
.bind(from)
.bind(to)
.bind(LOCK_SCREEN_APP)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "utilization: app_sample aggregate");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let top_apps = app_rows
.into_iter()
.filter_map(|r| {
let app: String = r.try_get("app").ok()?;
let samples: i64 = r.try_get("n").unwrap_or(0);
Some(AppCount {
app,
samples,
est_minutes: samples * APP_SAMPLE_MINUTES,
})
})
.collect();
let visit_rows = sqlx::query(
"SELECT json_extract(payload, '$.url') AS url \
FROM obs_events \
WHERE pc_id = ? AND kind = 'web_visit' AND at >= ? AND at < ? \
AND json_extract(payload, '$.url') IS NOT NULL \
ORDER BY at DESC LIMIT ?",
)
.bind(&pc_id)
.bind(from)
.bind(to)
.bind(MAX_VISIT_ROWS)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "utilization: web_visit fetch");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let site_visits_capped = visit_rows.len() as i64 >= MAX_VISIT_ROWS;
let mut host_counts: HashMap<String, i64> = HashMap::new();
for r in visit_rows {
let url: String = match r.try_get("url") {
Ok(u) => u,
Err(_) => continue,
};
if let Some(h) = host_of(&url) {
*host_counts.entry(h).or_insert(0) += 1;
}
}
let mut top_sites: Vec<SiteCount> = host_counts
.into_iter()
.map(|(host, visits)| SiteCount { host, visits })
.collect();
top_sites.sort_by(|a, b| b.visits.cmp(&a.visits).then_with(|| a.host.cmp(&b.host)));
top_sites.truncate(10);
let hour_rows = sqlx::query(
"SELECT CAST(strftime('%H', at, ?) AS INTEGER) AS hour, \
COUNT(*) AS total, \
COALESCE(SUM(CASE WHEN json_extract(payload, '$.active') = 1 THEN 1 ELSE 0 END), 0) AS active \
FROM obs_events \
WHERE pc_id = ? AND kind = 'presence' AND at >= ? AND at < ? \
GROUP BY hour ORDER BY hour",
)
.bind(&tz_mod)
.bind(&pc_id)
.bind(from)
.bind(to)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "utilization: timeline aggregate");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let timeline = hour_rows
.into_iter()
.filter_map(|r| {
let hour: i64 = r.try_get("hour").ok()?;
let total: i64 = r.try_get("total").unwrap_or(0);
let active: i64 = r.try_get("active").unwrap_or(0);
Some(HourBucket {
hour,
total,
active,
})
})
.collect();
Ok(Json(UtilizationResponse {
pc_id,
from,
to,
active,
top_apps,
top_sites,
site_visits_capped,
timeline,
}))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `host_of` keeps only the lowercased host of web URLs (dropping scheme, userinfo,
    /// port, path and query) and yields `None` for non-web schemes or an empty authority.
    #[test]
    fn host_of_extracts_authority() {
        let cases: [(&str, Option<&str>); 10] = [
            ("https://github.com/yukimemi/kanade", Some("github.com")),
            ("http://localhost:8080/events?kind=x", Some("localhost")),
            ("https://user:pw@mail.google.com/chat/", Some("mail.google.com")),
            ("HTTPS://Example.COM/", Some("example.com")),
            ("http://[::1]:8080/", Some("[::1]")),
            ("about:blank", None),
            ("data:text/html,hi", None),
            ("chrome-extension://abc/page.html", None),
            ("", None),
            ("https://", None),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(host_of(input).as_deref(), expected, "host_of({:?})", input);
        }
    }
}