use std::{convert::Infallible, sync::Arc};
use askama::Template;
use askama_web::WebTemplate;
use axum::{
extract::State,
response::{
IntoResponse, Sse,
sse::{Event, KeepAlive},
},
};
use datastar::{
consts::ElementPatchMode,
prelude::{PatchElements, PatchSignals},
};
use tokio::sync::broadcast::error::RecvError;
use crate::{
resolver::pipeline::LogAction,
telemetry::{QueryEvent, Stats},
web::{AppState, Chrome, auth::CurrentUser, render::DomainDisplay},
};
impl AppState {
    /// Renders the query-log page.
    ///
    /// The events returned by `live_log.recent()` are walked in reverse and
    /// each one is rendered to a row of HTML; rows whose template fails to
    /// render are dropped. The page chrome is built for the `"log"` section
    /// and the given user.
    pub async fn query_log(user: CurrentUser, State(state): State<AppState>) -> impl IntoResponse {
        // Scoped so the value returned by `recent()` is released before the
        // `.await` below, as it was when it lived only as a temporary.
        let rows: Vec<String> = {
            let recent = state.telemetry.live_log.recent();
            let mut rendered = Vec::new();
            for ev in recent.iter().rev() {
                if let Ok(html) = LogRowView::from_event(ev).render() {
                    rendered.push(html);
                }
            }
            rendered
        };
        let chrome = state.chrome("log", &user).await;
        LogPageTemplate { chrome, rows }
    }

    /// Server-sent-events endpoint feeding the live log.
    ///
    /// The stream starts with one counters event, then for every event
    /// received from the live-log subscription it emits a row patch followed
    /// by a fresh counters event. A `Lagged` receive error is skipped and the
    /// loop keeps receiving; a `Closed` error ends the stream.
    ///
    /// NOTE(review): `_user` is not used in the body; presumably the
    /// `CurrentUser` extractor is there to require authentication — confirm.
    pub async fn events(_user: CurrentUser, State(state): State<AppState>) -> impl IntoResponse {
        let mut rx = state.telemetry.live_log.subscribe();
        let stats = Arc::clone(&state.telemetry.stats);
        let stream = async_stream::stream! {
            // Initial counters so the client has values before any query arrives.
            yield Ok::<Event, Infallible>(counters_event(&stats));
            loop {
                let ev = match rx.recv().await {
                    Ok(ev) => ev,
                    // Missed messages are not replayed; just carry on with the next one.
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => break,
                };
                yield Ok(row_event(&ev));
                yield Ok(counters_event(&stats));
            }
        };
        Sse::new(stream).keep_alive(KeepAlive::default())
    }
}
/// Builds the SSE event that prepends one rendered log row to `#log-body`.
///
/// If the row template fails to render, the patch carries an empty string.
fn row_event(ev: &QueryEvent) -> Event {
    let view = LogRowView::from_event(ev);
    let markup = view.render().unwrap_or_default();
    let patch = PatchElements::new(markup)
        .selector("#log-body")
        .mode(ElementPatchMode::Prepend);
    patch.write_as_axum_sse_event()
}
/// Builds the SSE event that patches the counter signals
/// (`queries`, `blocked`, `cached`, `forwarded`) from a stats snapshot.
fn counters_event(stats: &Stats) -> Event {
    // NOTE(review): the meaning of the `0` passed to `snapshot` is not visible
    // from this file — confirm against `Stats::snapshot`.
    let snap = stats.snapshot(0);
    // The JSON object is assembled by hand from the four snapshot fields.
    let payload = format!(
        r#"{{"queries":{},"blocked":{},"cached":{},"forwarded":{}}}"#,
        snap.total, snap.blocked, snap.cached, snap.forwarded
    );
    let patch = PatchSignals::new(payload);
    patch.write_as_axum_sse_event()
}
/// View model for a single query-log row, rendered through `log_row.html`.
///
/// Instances are built by `LogRowView::from_event`; the field names are what
/// the template sees, so renaming one requires a matching template change.
#[derive(Template, WebTemplate)]
#[template(path = "log_row.html")]
struct LogRowView {
/// Client address as text, taken from `ev.client.ip()`.
client: String,
/// Queried name after `display_domain()` has been applied.
qname: String,
/// Query type, formatted with its `Debug` representation.
qtype: String,
/// Outcome rendered via its `to_string()`.
outcome_label: String,
/// Value of `ev.outcome.category()`.
/// NOTE(review): presumably feeds a CSS badge class in the template — confirm.
outcome_cat: &'static str,
/// Query latency in whole milliseconds.
latency_ms: u64,
/// Lower-cased `qname`, a space, then `client`.
/// NOTE(review): presumably the haystack for client-side filtering — confirm.
search: String,
/// Pre-rendered `ActionButton` markup; empty if that render failed.
action_html: String,
}
impl LogRowView {
    /// Builds the view model for one log row from a telemetry event.
    ///
    /// * `client` is the event's client IP rendered as text.
    /// * `qname` is the queried name passed through `display_domain()`.
    /// * `search` is the lower-cased `qname`, a space, then `client`.
    /// * `action_html` is the rendered [`ActionButton`] for the outcome's
    ///   `log_action()`; a failed render yields an empty string instead of
    ///   an error, so the row itself can still be shown.
    /// * `latency_ms` is the latency in whole milliseconds, saturating at
    ///   `u64::MAX`.
    fn from_event(ev: &QueryEvent) -> Self {
        let qname = ev.qname.to_string().display_domain().to_owned();
        let client = ev.client.ip().to_string();
        let search = format!("{} {}", qname.to_lowercase(), client);
        // The button owns its domain string, hence the clone; `qname` is
        // still needed for the row itself.
        let action_html = ActionButton::for_outcome(ev.outcome.log_action(), qname.clone())
            .render()
            .unwrap_or_default();
        // Assuming `latency` is a `std::time::Duration`, `as_millis` returns
        // a `u128`. Convert with a checked narrowing and saturate, rather
        // than letting an `as` cast silently discard the high bits.
        let latency_ms = u64::try_from(ev.latency.as_millis()).unwrap_or(u64::MAX);
        Self {
            client,
            qname,
            qtype: format!("{:?}", ev.qtype),
            outcome_label: ev.outcome.to_string(),
            outcome_cat: ev.outcome.category(),
            latency_ms,
            search,
            action_html,
        }
    }
}
/// View model for the per-row action button, rendered through
/// `log_action.html`.
///
/// Built by `ActionButton::for_outcome`. When no action applies, `label`,
/// `endpoint` and `class` are all empty strings.
#[derive(Template, WebTemplate)]
#[template(path = "log_action.html")]
pub(crate) struct ActionButton {
/// Button caption: `"Block"`, `"Unblock"`, or empty.
label: &'static str,
/// Target path: `"/log/block"`, `"/log/unblock"`, or empty.
endpoint: &'static str,
/// Extra class value: `"secondary"` for the block button, otherwise empty.
/// NOTE(review): presumably emitted as a CSS class by the template — confirm.
class: &'static str,
/// Domain the action applies to.
domain: String,
}
impl ActionButton {
    /// Picks the button shown for a log row.
    ///
    /// * `Some(LogAction::Block)` gives a "Block" button posting to
    ///   `/log/block`, with class `secondary`.
    /// * `Some(LogAction::Unblock)` gives an "Unblock" button posting to
    ///   `/log/unblock`, with no extra class.
    /// * `None` leaves label, endpoint and class empty.
    ///
    /// `domain` is stored unchanged in every case.
    fn for_outcome(action: Option<LogAction>, domain: String) -> Self {
        // Small local constructor so each arm reads as one line of data.
        let build = |label: &'static str, endpoint: &'static str, class: &'static str| Self {
            label,
            endpoint,
            class,
            domain,
        };
        match action {
            Some(LogAction::Block) => build("Block", "/log/block", "secondary"),
            Some(LogAction::Unblock) => build("Unblock", "/log/unblock", ""),
            None => build("", "", ""),
        }
    }
}
/// Full query-log page, rendered through `log.html`.
#[derive(Template, WebTemplate)]
#[template(path = "log.html")]
struct LogPageTemplate {
/// Shared page chrome, obtained from `state.chrome("log", &user)`.
chrome: Chrome,
/// Pre-rendered HTML for each log row, in display order.
rows: Vec<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::{message::Qtype, name::Name};
    use crate::resolver::pipeline::Outcome;
    use std::time::Duration;

    /// Fixture: an `A` query for `domain` from 192.168.1.5:5000 with the
    /// given outcome and a fixed 7 ms latency.
    fn event(domain: &str, outcome: Outcome) -> QueryEvent {
        let client = "192.168.1.5:5000".parse().unwrap();
        let qname: Name = domain.parse().unwrap();
        QueryEvent::new(client, qname, Qtype::A, outcome).with_latency(Duration::from_millis(7))
    }

    /// Shorthand for the row view of a fixture event.
    fn row(domain: &str, outcome: Outcome) -> LogRowView {
        let ev = event(domain, outcome);
        LogRowView::from_event(&ev)
    }

    /// A blocked row shows the domain without a trailing dot, the client IP,
    /// the blocked badge, the filter hook, the latency and an unblock action.
    #[test]
    fn row_renders_outcome_filter_and_search() {
        let html = row("ads.example.com", Outcome::BlockedByBlocklist)
            .render()
            .expect("render row");
        assert!(html.contains("ads.example.com"));
        assert!(!html.contains("ads.example.com."));
        assert!(html.contains("192.168.1.5"));
        assert!(html.contains("sgt-badge--blocked"));
        assert!(html.contains("blocked"));
        assert!(html.contains("data-show"));
        assert!(html.contains("7 ms"));
        assert!(html.contains("Unblock"));
        assert!(html.contains("/log/unblock?domain=ads.example.com"));
    }

    /// Answered queries offer "Block", blocked queries offer "Unblock", and
    /// local or failed queries offer no action at all.
    #[test]
    fn row_action_is_keyed_on_outcome() {
        let blockable = [Outcome::Cached, Outcome::Forwarded];
        for o in blockable {
            let html = row("good.example.com", o).render().unwrap();
            assert!(html.contains("Block"), "{o:?} must offer block");
            assert!(html.contains("/log/block?domain=good.example.com"));
        }

        let unblockable = [Outcome::BlockedByAdmin, Outcome::BlockedByBlocklist];
        for o in unblockable {
            let html = row("ads.example.com", o).render().unwrap();
            assert!(html.contains("Unblock"), "{o:?} must offer unblock");
            assert!(html.contains("/log/unblock?domain=ads.example.com"));
        }

        let actionless = [Outcome::Local, Outcome::LocalNoData, Outcome::Servfail];
        for o in actionless {
            let html = row("x.example.com", o).render().unwrap();
            assert!(!html.contains("Block"), "{o:?} must offer no action");
            assert!(!html.contains("Unblock"), "{o:?} must offer no action");
        }
    }

    /// The counters event, once debug-formatted, mentions the `queries` key.
    #[test]
    fn counters_event_is_valid_signal_json() {
        let stats = Stats::new();
        let recorded = event("a.test", Outcome::Forwarded);
        stats.record(&recorded);
        let rendered = format!("{:?}", counters_event(&stats));
        assert!(rendered.contains("queries"));
    }
}