use std::{convert::Infallible, sync::Arc};
use askama::Template;
use askama_web::WebTemplate;
use axum::{
extract::{Query, State},
response::{
IntoResponse, Response, Sse,
sse::{Event, KeepAlive},
},
};
use datastar::{
consts::ElementPatchMode,
prelude::{PatchElements, PatchSignals},
};
use serde::Deserialize;
use tokio::sync::broadcast::error::RecvError;
use crate::{
resolver::pipeline::LogAction,
storage::query_log::{QueryLogRecord, QueryLogRepository},
telemetry::{QueryEvent, Stats},
web::{
AppState, Chrome,
auth::CurrentUser,
render::{DomainDisplay, WebError, datastar_response},
},
};
/// Number of records requested from the query-log repository per page, used by
/// both `AppState::query_log` and `AppState::query_log_older`.
const LOG_PAGE_SIZE: i64 = 100;
impl AppState {
    /// Renders the full query-log page.
    ///
    /// Requests the first page of records (no cursor, `LOG_PAGE_SIZE` rows),
    /// renders each record to row markup and passes the rows to
    /// `LogPageTemplate` together with the id of the last record returned
    /// (`0` when the page is empty). A repository error is converted into a
    /// `WebError` response; a row whose template fails to render is skipped.
    pub async fn query_log(user: CurrentUser, State(state): State<AppState>) -> Response {
        let page = state.db.query_log().page(None, LOG_PAGE_SIZE).await;
        let records = match page {
            Err(e) => return WebError::from(e).into_response(),
            Ok(records) => records,
        };
        let oldest = records.last().map_or(0, |last| last.id);

        let mut rows = Vec::with_capacity(records.len());
        for record in &records {
            let label = state.client_label(&record.client).await;
            // A failed render yields `None`, which adds nothing to `rows`.
            rows.extend(LogRowView::from_record(record, label).render().ok());
        }

        let chrome = state.chrome("log", &user).await;
        LogPageTemplate {
            chrome,
            rows,
            oldest,
        }
        .into_response()
    }

    /// Returns the next page of log rows as Datastar events.
    ///
    /// Requests a page using `q.before` as the cursor, concatenates the
    /// rendered rows and, when at least one row rendered, emits a patch that
    /// appends them to `#log-body`. A signals patch carrying the new `oldest`
    /// value (id of the last record returned, `0` if none) is always emitted
    /// last. A repository error is converted into a `WebError` response.
    pub async fn query_log_older(
        _user: CurrentUser,
        State(state): State<AppState>,
        Query(q): Query<OlderQuery>,
    ) -> Response {
        let cursor = Some(q.before);
        let records = match state.db.query_log().page(cursor, LOG_PAGE_SIZE).await {
            Err(e) => return WebError::from(e).into_response(),
            Ok(records) => records,
        };
        let new_oldest = records.last().map_or(0, |last| last.id);

        let mut html = String::new();
        for record in &records {
            let label = state.client_label(&record.client).await;
            // Rows that fail to render contribute nothing to the markup.
            html.extend(LogRowView::from_record(record, label).render().ok());
        }

        let cursor_signal =
            PatchSignals::new(format!(r#"{{"oldest":{new_oldest}}}"#)).write_as_axum_sse_event();
        let events = if html.is_empty() {
            vec![cursor_signal]
        } else {
            let appended_rows = PatchElements::new(html)
                .selector("#log-body")
                .mode(ElementPatchMode::Append)
                .write_as_axum_sse_event();
            vec![appended_rows, cursor_signal]
        };
        datastar_response(events)
    }

    /// Streams live log updates as server-sent events.
    ///
    /// The stream starts with one counters event. After that, every event
    /// received from the live-log broadcast channel produces a row event
    /// followed by a fresh counters event. A `Lagged` receive error is
    /// ignored and the loop keeps receiving; a `Closed` error ends the stream.
    pub async fn events(_user: CurrentUser, State(state): State<AppState>) -> impl IntoResponse {
        let mut rx = state.telemetry.live_log.subscribe();
        let stats = Arc::clone(&state.telemetry.stats);

        let stream = async_stream::stream! {
            yield Ok::<Event, Infallible>(counters_event(&stats));
            loop {
                let ev = match rx.recv().await {
                    Ok(ev) => ev,
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => break,
                };
                let client = state.client_label_ip(ev.client.ip()).await;
                yield Ok(row_event(&ev, client));
                yield Ok(counters_event(&stats));
            }
        };

        Sse::new(stream).keep_alive(KeepAlive::default())
    }
}
/// Builds the SSE event that prepends one live query row to `#log-body`.
///
/// If the row template fails to render, the patch carries an empty string
/// instead of markup.
fn row_event(ev: &QueryEvent, client: String) -> Event {
    let view = LogRowView::from_event(ev, client);
    let markup = view.render().unwrap_or_default();
    let patch = PatchElements::new(markup)
        .selector("#log-body")
        .mode(ElementPatchMode::Prepend);
    patch.write_as_axum_sse_event()
}
/// Builds the SSE signals patch carrying the current query counters.
///
/// The payload is a JSON object with the keys `queries`, `blocked`, `cached`
/// and `forwarded`, filled from the snapshot's `total`, `blocked`, `cached`
/// and `forwarded` fields.
///
/// NOTE(review): the meaning of the `0` argument to `Stats::snapshot` is not
/// visible from this file — confirm against `telemetry::Stats`.
fn counters_event(stats: &Stats) -> Event {
    let snapshot = stats.snapshot(0);
    let payload = format!(
        r#"{{"queries":{},"blocked":{},"cached":{},"forwarded":{}}}"#,
        snapshot.total, snapshot.blocked, snapshot.cached, snapshot.forwarded
    );
    let signals = PatchSignals::new(payload);
    signals.write_as_axum_sse_event()
}
/// View model for one row of the query-log table, rendered with `log_row.html`.
///
/// Instances are created through `LogRowView::from_event` (live events) and
/// `LogRowView::from_record` (stored records).
#[derive(Template, WebTemplate)]
#[template(path = "log_row.html")]
struct LogRowView {
/// Record id for rows built from stored records; `0` for rows built from live events.
id: i64,
/// Client label supplied by the caller.
client: String,
/// Queried name after passing through `display_domain`.
qname: String,
/// Query type as text.
qtype: String,
/// Outcome rendered via its `to_string()`.
outcome_label: String,
/// Outcome category as returned by the outcome's `category()`.
outcome_cat: &'static str,
/// Query latency in milliseconds.
latency_ms: u64,
/// Search key: the lowercased `qname`, a space, then `client` as given.
search: String,
/// Pre-rendered `ActionButton` markup; empty if that template failed to render.
action_html: String,
}
impl LogRowView {
    /// Builds a row from a live telemetry event.
    ///
    /// The row's `id` is always `0` on this path. `client` is the label to
    /// display for the querying client.
    fn from_event(ev: &QueryEvent, client: String) -> Self {
        let qname = ev.qname.to_string().display_domain().to_owned();
        // The millisecond count can be wider than `u64` (`Duration::as_millis`
        // returns `u128`), so saturate instead of silently truncating with `as`.
        let latency_ms = u64::try_from(ev.latency.as_millis()).unwrap_or(u64::MAX);
        Self::build(
            0,
            client,
            qname,
            ev.qtype.to_string(),
            ev.outcome.to_string(),
            ev.outcome.category(),
            ev.outcome.log_action(),
            latency_ms,
        )
    }

    /// Builds a row from a stored query-log record.
    ///
    /// The row keeps the record's `id`. `client` is the label to display for
    /// the querying client.
    fn from_record(record: &QueryLogRecord, client: String) -> Self {
        let qname = record.qname.display_domain().to_owned();
        // A negative stored latency cannot be represented; clamp it to zero.
        let latency_ms = u64::try_from(record.latency_ms).unwrap_or(0);
        Self::build(
            record.id,
            client,
            qname,
            record.qtype.clone(),
            record.outcome.to_string(),
            record.outcome.category(),
            record.outcome.log_action(),
            latency_ms,
        )
    }

    /// Assembles the view from already-converted parts.
    ///
    /// Derives two fields from the inputs: `search` (lowercased `qname`, a
    /// space, then `client`) and `action_html` (the rendered `ActionButton`
    /// for `action`, or an empty string if rendering fails).
    // Private constructor shared by `from_event` and `from_record`; each
    // argument maps one-to-one onto a template field, so a parameter struct
    // would only duplicate `LogRowView` itself.
    #[allow(clippy::too_many_arguments)]
    fn build(
        id: i64,
        client: String,
        qname: String,
        qtype: String,
        outcome_label: String,
        outcome_cat: &'static str,
        action: Option<LogAction>,
        latency_ms: u64,
    ) -> Self {
        // NOTE(review): only `qname` is lowercased here; a client label with
        // capitals may not match a lowercased filter term — confirm against
        // the filtering logic in `log_row.html`.
        let search = format!("{} {}", qname.to_lowercase(), client);
        // `qname` is cloned because the button takes ownership of its domain
        // while the row still needs the name for its own column.
        let action_html = ActionButton::for_outcome(action, qname.clone())
            .render()
            .unwrap_or_default();
        Self {
            id,
            client,
            qname,
            qtype,
            outcome_label,
            outcome_cat,
            latency_ms,
            search,
            action_html,
        }
    }
}
/// Query-string parameters accepted by `AppState::query_log_older`.
#[derive(Debug, Deserialize)]
pub struct OlderQuery {
/// Cursor forwarded to the repository's `page` call as `Some(before)`.
/// NOTE(review): presumably only records with an id below this value are
/// returned — confirm against `QueryLogRepository::page`.
before: i64,
}
/// View model for a log row's action control, rendered with `log_action.html`.
///
/// Built by `ActionButton::for_outcome`.
#[derive(Template, WebTemplate)]
#[template(path = "log_action.html")]
pub(crate) struct ActionButton {
/// Button caption: `"Block"`, `"Unblock"`, or empty when there is no action.
label: &'static str,
/// Request path for the action: `"/log/block"`, `"/log/unblock"`, or empty.
endpoint: &'static str,
/// Class string handed to the template (presumably a CSS class):
/// `"secondary"` for Block, empty otherwise.
class: &'static str,
/// Domain the action applies to.
domain: String,
}
impl ActionButton {
    /// Picks the button's caption, endpoint and class for `action`.
    ///
    /// `Block` and `Unblock` each get their own caption and endpoint; `None`
    /// produces a button whose caption, endpoint and class are all empty.
    /// `domain` is stored unchanged in every case.
    fn for_outcome(action: Option<LogAction>, domain: String) -> Self {
        match action {
            Some(LogAction::Block) => Self {
                label: "Block",
                endpoint: "/log/block",
                class: "secondary",
                domain,
            },
            Some(LogAction::Unblock) => Self {
                label: "Unblock",
                endpoint: "/log/unblock",
                class: "",
                domain,
            },
            None => Self {
                label: "",
                endpoint: "",
                class: "",
                domain,
            },
        }
    }
}
/// Full query-log page, rendered with `log.html`.
#[derive(Template, WebTemplate)]
#[template(path = "log.html")]
struct LogPageTemplate {
/// Shared page chrome obtained from `AppState::chrome`.
chrome: Chrome,
/// Pre-rendered row markup, one string per successfully rendered record,
/// in the order the repository returned the records.
rows: Vec<String>,
/// Id of the last record on the page, or `0` when the page is empty.
oldest: i64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::{message::Qtype, name::Name};
    use crate::resolver::pipeline::Outcome;
    use std::time::Duration;

    /// Creates an `A` query event for `domain` from a fixed client address,
    /// with a latency of 7 ms and the given outcome.
    fn event(domain: &str, outcome: Outcome) -> QueryEvent {
        let client = "192.168.1.5:5000".parse().unwrap();
        let qname: Name = domain.parse().unwrap();
        let base = QueryEvent::new(client, qname, Qtype::A, outcome);
        base.with_latency(Duration::from_millis(7))
    }

    /// Builds the row view for a live event with the given client label.
    fn row(domain: &str, outcome: Outcome, client: &str) -> LogRowView {
        let ev = event(domain, outcome);
        LogRowView::from_event(&ev, client.to_owned())
    }

    /// A blocked query renders its name, client, badge, filter hook, latency
    /// and the unblock action.
    #[test]
    fn row_renders_outcome_filter_and_search() {
        let view = row("ads.example.com", Outcome::BlockedByBlocklist, "192.168.1.5");
        let html = view.render().expect("render row");

        assert!(html.contains("ads.example.com"));
        assert!(!html.contains("ads.example.com."));
        assert!(html.contains("192.168.1.5"));
        assert!(html.contains("sgt-badge--blocked"));
        assert!(html.contains("blocked"));
        assert!(html.contains("data-show"));
        assert!(html.contains("7 ms"));
        assert!(html.contains("Unblock"));
        assert!(html.contains("/log/unblock?domain=ads.example.com"));
    }

    /// The offered action depends on the outcome: block for answered queries,
    /// unblock for blocked ones, nothing for the remaining outcomes.
    #[test]
    fn row_action_is_keyed_on_outcome() {
        let blockable = [Outcome::Cached, Outcome::Forwarded];
        for o in blockable {
            let html = row("good.example.com", o, "10.0.0.1").render().unwrap();
            assert!(html.contains("Block"), "{o:?} must offer block");
            assert!(html.contains("/log/block?domain=good.example.com"));
        }

        let unblockable = [Outcome::BlockedByAdmin, Outcome::BlockedByBlocklist];
        for o in unblockable {
            let html = row("ads.example.com", o, "10.0.0.1").render().unwrap();
            assert!(html.contains("Unblock"), "{o:?} must offer unblock");
            assert!(html.contains("/log/unblock?domain=ads.example.com"));
        }

        let no_action = [Outcome::Local, Outcome::LocalNoData, Outcome::Servfail];
        for o in no_action {
            let html = row("x.example.com", o, "10.0.0.1").render().unwrap();
            assert!(!html.contains("Block"), "{o:?} must offer no action");
            assert!(!html.contains("Unblock"), "{o:?} must offer no action");
        }
    }

    /// The counters event's debug output mentions the `queries` signal after
    /// one event has been recorded.
    #[test]
    fn counters_event_is_valid_signal_json() {
        let ev = {
            let stats = Stats::new();
            stats.record(&event("a.test", Outcome::Forwarded));
            counters_event(&stats)
        };
        let rendered = format!("{ev:?}");
        assert!(rendered.contains("queries"));
    }
}