rlqt-ui 0.19.0

Web UI for RabbitMQ Log Querying Tools
Documentation
// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::errors::ServerError;
use crate::server::AppState;
use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Utc};
use rlqt_lib::entry_metadata::labels::LABEL_NAMES;
use rlqt_lib::rel_db::node_log_entry::Model;
use rlqt_lib::rel_db::presets::QueryPreset;
use rlqt_lib::{NodeLogEntry, QueryContext};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Error as IoError;

/// Query-string parameters accepted by [`query_logs`].
///
/// Every field is optional. An absent parameter (or a boolean flag that is
/// absent or `false`) leaves the corresponding filter untouched on the
/// query context.
#[derive(Debug, Deserialize)]
pub struct LogQueryParams {
    /// Lower time bound; parsed with `parse_datetime_flexible` before use.
    since_time: Option<String>,
    /// Upper time bound; parsed with `parse_datetime_flexible` before use.
    to_time: Option<String>,
    /// Severity filter, passed to the query context as is.
    severity: Option<String>,
    /// Erlang pid filter, passed to the query context as is.
    erlang_pid: Option<String>,
    /// Node filter, passed to the query context as is.
    node: Option<String>,
    /// Subsystem filter, passed to the query context as is.
    subsystem: Option<String>,
    /// Comma-separated label names. Each name is trimmed, and `election`
    /// is treated as `elections`.
    labels: Option<String>,
    /// When `true`, `matching_all_labels(true)` is set on the query context
    /// (presumably requiring every listed label rather than any of them).
    matching_all_labels: Option<bool>,
    /// Result limit; applied only when greater than zero.
    limit: Option<u64>,
    /// When `true`, `has_resolution_or_discussion_url(true)` is set on the query context.
    has_resolution_or_discussion_url: Option<bool>,
    /// When `true`, `has_doc_url(true)` is set on the query context.
    has_doc_url: Option<bool>,
    /// When `true`, the `unlabelled` label is added to the query.
    unlabelled: Option<bool>,
}

/// JSON body returned by the log query handlers in this module.
#[derive(Debug, Serialize)]
pub struct LogQueryResponse {
    /// The entries produced by the query, converted to their JSON form.
    entries: Vec<LogEntry>,
    /// Number of rows returned by the query; the handlers set it to the
    /// length of the result set they convert into `entries`.
    total: usize,
}

/// JSON representation of a single log entry, built from a database `Model`
/// via the `From<Model>` implementation.
#[derive(Debug, Serialize)]
pub struct LogEntry {
    /// Copied from the model's `id`.
    id: i64,
    /// Copied from the model's `node`.
    node: String,
    /// The model's timestamp rendered as an RFC 3339 string.
    timestamp: String,
    /// Copied from the model's `severity`.
    severity: String,
    /// Copied from the model's `erlang_pid`.
    erlang_pid: String,
    /// Copied from the model's `message`.
    message: String,
    /// Subsystem name resolved from the model's subsystem id, when one is set
    /// and can be resolved.
    subsystem: Option<String>,
    /// Names of the labels set on the entry, each mapped to `true`.
    /// Labels that are not set are omitted rather than mapped to `false`.
    labels: HashMap<String, bool>,
    /// Documentation URL resolved from the model's doc URL id, if any.
    doc_url: Option<String>,
    /// Resolution or discussion URL resolved from the model's corresponding id, if any.
    resolution_or_discussion_url: Option<String>,
}

/// Converts a database row into its JSON-facing representation.
impl From<Model> for LogEntry {
    fn from(model: Model) -> Self {
        // The model stores labels as a bit set; bit `i` corresponds to `LABEL_NAMES[i]`.
        let bits = model.labels as u64;
        let labels: HashMap<String, bool> = LABEL_NAMES
            .iter()
            .enumerate()
            .filter(|&(idx, _)| bits & (1u64 << idx) != 0)
            .map(|(_, name)| (name.to_string(), true))
            .collect();

        // Numeric ids are resolved to their textual values; an id that is
        // absent or cannot be resolved yields `None`.
        let subsystem = model
            .subsystem_id
            .and_then(rlqt_lib::entry_metadata::subsystems::Subsystem::from_id)
            .map(|s| s.to_string());
        let doc_url = model
            .doc_url_id
            .and_then(rlqt_lib::constants::doc_url_from_id)
            .map(String::from);
        let resolution_or_discussion_url = model
            .resolution_or_discussion_url_id
            .and_then(rlqt_lib::constants::resolution_or_discussion_url_from_id)
            .map(String::from);

        let timestamp = model.timestamp.to_rfc3339();

        Self {
            id: model.id,
            node: model.node,
            timestamp,
            severity: model.severity,
            erlang_pid: model.erlang_pid,
            message: model.message,
            subsystem,
            labels,
            doc_url,
            resolution_or_discussion_url,
        }
    }
}

impl IntoResponse for ServerError {
    fn into_response(self) -> Response {
        let (status, message) = match self {
            ServerError::Database(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
            ServerError::Library(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
            ServerError::Io(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
            ServerError::Serialization(ref e) => (StatusCode::BAD_REQUEST, e.to_string()),
            ServerError::DateTimeParse(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
            ServerError::InvalidPreset(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
            ServerError::InvalidQuery(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
        };

        (status, Json(serde_json::json!({ "error": message }))).into_response()
    }
}

/// Handles a log query whose filters are supplied as individual query-string
/// parameters.
///
/// Builds a `QueryContext` from `params`, runs the query on a blocking worker
/// via `tokio::task::spawn_blocking`, and returns the resulting entries as JSON
/// together with their count.
///
/// # Errors
///
/// Returns `ServerError::DateTimeParse` when `since_time` or `to_time` cannot
/// be parsed, `ServerError::Io` when the blocking task cannot be joined, and
/// the converted query error when the query itself fails.
pub async fn query_logs(
    State(state): State<AppState>,
    Query(params): Query<LogQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
    let mut ctx = QueryContext::default();

    if let Some(since) = params
        .since_time
        .as_ref()
        .map(|s| parse_datetime_flexible(s))
        .transpose()?
    {
        ctx = ctx.since(since);
    }

    if let Some(to) = params
        .to_time
        .as_ref()
        .map(|s| parse_datetime_flexible(s))
        .transpose()?
    {
        ctx = ctx.to(to);
    }

    if let Some(sev) = params.severity.as_ref() {
        ctx = ctx.severity(sev);
    }

    if let Some(pid) = params.erlang_pid.as_ref() {
        ctx = ctx.erlang_pid(pid);
    }

    if let Some(n) = params.node.as_ref() {
        ctx = ctx.node(n);
    }

    if let Some(sub) = params.subsystem.as_ref() {
        ctx = ctx.subsystem(sub);
    }

    if let Some(labels_str) = params.labels.as_ref() {
        // Blank segments (from `labels=`, `a,,b` or a trailing comma) are not
        // label names, so they are skipped instead of being forwarded as an
        // empty label.
        let label_names = labels_str
            .split(',')
            .map(str::trim)
            .filter(|name| !name.is_empty());
        for label in label_names {
            // Accept the singular `election` as an alias for `elections`.
            let normalized_label = if label == "election" {
                "elections"
            } else {
                label
            };
            ctx = ctx.add_label(normalized_label);
        }
    }

    if params.matching_all_labels.unwrap_or(false) {
        ctx = ctx.matching_all_labels(true);
    }

    // A limit of zero is treated the same as no limit being given.
    if let Some(l) = params.limit.filter(|&l| l > 0) {
        ctx = ctx.limit(l);
    }

    if params.has_resolution_or_discussion_url.unwrap_or(false) {
        ctx = ctx.has_resolution_or_discussion_url(true);
    }

    if params.has_doc_url.unwrap_or(false) {
        ctx = ctx.has_doc_url(true);
    }

    if params.unlabelled.unwrap_or(false) {
        ctx = ctx.add_label("unlabelled");
    }

    // The query call is synchronous, so it is moved off the async executor.
    // The first `?` surfaces a join failure, the second the query's own error.
    let db = state.db.clone();
    let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
        .await
        .map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;

    let total = models.len();
    let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();

    Ok(Json(LogQueryResponse { entries, total }))
}

/// Parses `s` with `rlqt_lib::datetime::parse_datetime_flexible`, wrapping a
/// parse failure in `ServerError::DateTimeParse`.
fn parse_datetime_flexible(s: &str) -> Result<DateTime<Utc>, ServerError> {
    match rlqt_lib::datetime::parse_datetime_flexible(s) {
        Ok(parsed) => Ok(parsed),
        Err(reason) => Err(ServerError::DateTimeParse(reason)),
    }
}

/// Query-string parameters accepted by [`query_logs_by_preset`].
///
/// These are applied on top of the query context derived from the preset.
#[derive(Debug, Deserialize)]
pub struct PresetQueryParams {
    /// Lower time bound; parsed with `parse_datetime_flexible` before use.
    since_time: Option<String>,
    /// Upper time bound; parsed with `parse_datetime_flexible` before use.
    to_time: Option<String>,
    /// Node filter, passed to the query context as is.
    node: Option<String>,
    /// Result limit; applied only when greater than zero.
    limit: Option<u64>,
}

/// Handles a log query that starts from a named preset.
///
/// The preset named in the path is parsed into a `QueryPreset` and converted
/// into a `QueryContext`; the optional time range, node and limit from the
/// query string are then applied on top of it.
///
/// # Errors
///
/// Returns `ServerError::InvalidPreset` when the preset name cannot be parsed,
/// `ServerError::DateTimeParse` for an unparsable time bound,
/// `ServerError::Io` when the blocking task cannot be joined, and the
/// converted query error when the query itself fails.
pub async fn query_logs_by_preset(
    State(state): State<AppState>,
    axum::extract::Path(preset_name): axum::extract::Path<String>,
    Query(params): Query<PresetQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
    let preset = preset_name
        .parse::<QueryPreset>()
        .map_err(ServerError::InvalidPreset)?;

    let mut ctx = QueryContext::from(preset);

    if let Some(raw_since) = params.since_time.as_deref() {
        ctx = ctx.since(parse_datetime_flexible(raw_since)?);
    }

    if let Some(raw_to) = params.to_time.as_deref() {
        ctx = ctx.to(parse_datetime_flexible(raw_to)?);
    }

    if let Some(node_name) = &params.node {
        ctx = ctx.node(node_name);
    }

    // A limit of zero is treated the same as no limit being given.
    if let Some(max_rows) = params.limit.filter(|&max_rows| max_rows > 0) {
        ctx = ctx.limit(max_rows);
    }

    // Run the synchronous query off the async executor.
    let db = state.db.clone();
    let joined = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx)).await;
    let models = match joined {
        Ok(query_result) => query_result?,
        Err(join_err) => {
            return Err(ServerError::Io(IoError::other(format!(
                "Task join error: {}",
                join_err
            ))));
        }
    };

    let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
    let total = entries.len();

    Ok(Json(LogQueryResponse { entries, total }))
}

/// Query-string parameters accepted by [`query_logs_by_ql`].
#[derive(Debug, Deserialize)]
pub struct QLQueryParams {
    /// Required query text; converted into a query context with
    /// `rlqt_ql::to_query_context`.
    query: String,
    /// Result limit; applied only when greater than zero.
    limit: Option<u64>,
}

/// Handles a log query expressed in the query language understood by `rlqt_ql`.
///
/// The `query` parameter is converted into a `QueryContext`, an optional
/// positive `limit` is applied on top of it, and the query is run on a
/// blocking worker.
///
/// # Errors
///
/// Returns `ServerError::InvalidQuery` when the query text cannot be
/// converted, `ServerError::Io` when the blocking task cannot be joined, and
/// the converted query error when the query itself fails.
pub async fn query_logs_by_ql(
    State(state): State<AppState>,
    Query(params): Query<QLQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
    let mut ctx = match rlqt_ql::to_query_context(&params.query) {
        Ok(parsed) => parsed,
        Err(err) => return Err(ServerError::InvalidQuery(err.to_string())),
    };

    // A limit of zero is treated the same as no limit being given.
    if let Some(max_rows) = params.limit.filter(|&max_rows| max_rows > 0) {
        ctx = ctx.limit(max_rows);
    }

    // Run the synchronous query off the async executor.
    let db = state.db.clone();
    let joined = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx)).await;
    let models = match joined {
        Ok(query_result) => query_result?,
        Err(join_err) => {
            return Err(ServerError::Io(IoError::other(format!(
                "Task join error: {}",
                join_err
            ))));
        }
    };

    let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
    let total = entries.len();

    Ok(Json(LogQueryResponse { entries, total }))
}