use crate::errors::ServerError;
use crate::server::AppState;
use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Utc};
use rlqt_lib::entry_metadata::labels::LABEL_NAMES;
use rlqt_lib::rel_db::node_log_entry::Model;
use rlqt_lib::rel_db::presets::QueryPreset;
use rlqt_lib::{NodeLogEntry, QueryContext};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Error as IoError;
#[derive(Debug, Deserialize)]
pub struct LogQueryParams {
since_time: Option<String>,
to_time: Option<String>,
severity: Option<String>,
erlang_pid: Option<String>,
node: Option<String>,
subsystem: Option<String>,
labels: Option<String>,
matching_all_labels: Option<bool>,
limit: Option<u64>,
has_resolution_or_discussion_url: Option<bool>,
has_doc_url: Option<bool>,
unlabelled: Option<bool>,
}
#[derive(Debug, Serialize)]
pub struct LogQueryResponse {
entries: Vec<LogEntry>,
total: usize,
}
#[derive(Debug, Serialize)]
pub struct LogEntry {
id: i64,
node: String,
timestamp: String,
severity: String,
erlang_pid: String,
message: String,
subsystem: Option<String>,
labels: HashMap<String, bool>,
doc_url: Option<String>,
resolution_or_discussion_url: Option<String>,
}
impl From<Model> for LogEntry {
fn from(model: Model) -> Self {
let label_bits = model.labels as u64;
let mut labels = HashMap::new();
for (i, label_name) in LABEL_NAMES.iter().enumerate() {
if label_bits & (1u64 << i) != 0 {
labels.insert(label_name.to_string(), true);
}
}
let subsystem = model
.subsystem_id
.and_then(rlqt_lib::entry_metadata::subsystems::Subsystem::from_id)
.map(|s| s.to_string());
let doc_url = model
.doc_url_id
.and_then(rlqt_lib::constants::doc_url_from_id);
let resolution_or_discussion_url = model
.resolution_or_discussion_url_id
.and_then(rlqt_lib::constants::resolution_or_discussion_url_from_id);
Self {
id: model.id,
node: model.node,
timestamp: model.timestamp.to_rfc3339(),
severity: model.severity,
erlang_pid: model.erlang_pid,
message: model.message,
subsystem,
labels,
doc_url: doc_url.map(String::from),
resolution_or_discussion_url: resolution_or_discussion_url.map(String::from),
}
}
}
impl IntoResponse for ServerError {
fn into_response(self) -> Response {
let (status, message) = match self {
ServerError::Database(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
ServerError::Library(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
ServerError::Io(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
ServerError::Serialization(ref e) => (StatusCode::BAD_REQUEST, e.to_string()),
ServerError::DateTimeParse(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
ServerError::InvalidPreset(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
ServerError::InvalidQuery(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
};
(status, Json(serde_json::json!({ "error": message }))).into_response()
}
}
pub async fn query_logs(
State(state): State<AppState>,
Query(params): Query<LogQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
let mut ctx = QueryContext::default();
if let Some(since) = params
.since_time
.as_ref()
.map(|s| parse_datetime_flexible(s))
.transpose()?
{
ctx = ctx.since(since);
}
if let Some(to) = params
.to_time
.as_ref()
.map(|s| parse_datetime_flexible(s))
.transpose()?
{
ctx = ctx.to(to);
}
if let Some(sev) = params.severity.as_ref() {
ctx = ctx.severity(sev);
}
if let Some(pid) = params.erlang_pid.as_ref() {
ctx = ctx.erlang_pid(pid);
}
if let Some(n) = params.node.as_ref() {
ctx = ctx.node(n);
}
if let Some(sub) = params.subsystem.as_ref() {
ctx = ctx.subsystem(sub);
}
if let Some(labels_str) = params.labels.as_ref() {
let labels: Vec<String> = labels_str
.split(',')
.map(|s| s.trim().to_string())
.collect();
for label in labels {
let normalized_label = if label == "election" {
"elections"
} else {
label.as_str()
};
ctx = ctx.add_label(normalized_label);
}
}
if params.matching_all_labels.unwrap_or(false) {
ctx = ctx.matching_all_labels(true);
}
if let Some(l) = params.limit
&& l > 0
{
ctx = ctx.limit(l);
}
if params.has_resolution_or_discussion_url.unwrap_or(false) {
ctx = ctx.has_resolution_or_discussion_url(true);
}
if params.has_doc_url.unwrap_or(false) {
ctx = ctx.has_doc_url(true);
}
if params.unlabelled.unwrap_or(false) {
ctx = ctx.add_label("unlabelled");
}
let db = state.db.clone();
let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
.await
.map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
let total = models.len();
let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
Ok(Json(LogQueryResponse { entries, total }))
}
fn parse_datetime_flexible(s: &str) -> Result<DateTime<Utc>, ServerError> {
rlqt_lib::datetime::parse_datetime_flexible(s).map_err(ServerError::DateTimeParse)
}
#[derive(Debug, Deserialize)]
pub struct PresetQueryParams {
since_time: Option<String>,
to_time: Option<String>,
node: Option<String>,
limit: Option<u64>,
}
pub async fn query_logs_by_preset(
State(state): State<AppState>,
axum::extract::Path(preset_name): axum::extract::Path<String>,
Query(params): Query<PresetQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
let preset: QueryPreset = preset_name
.parse()
.map_err(|e: String| ServerError::InvalidPreset(e))?;
let mut ctx = QueryContext::from(preset);
if let Some(since) = params
.since_time
.as_ref()
.map(|s| parse_datetime_flexible(s))
.transpose()?
{
ctx = ctx.since(since);
}
if let Some(to) = params
.to_time
.as_ref()
.map(|s| parse_datetime_flexible(s))
.transpose()?
{
ctx = ctx.to(to);
}
if let Some(n) = params.node.as_ref() {
ctx = ctx.node(n);
}
if let Some(l) = params.limit
&& l > 0
{
ctx = ctx.limit(l);
}
let db = state.db.clone();
let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
.await
.map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
let total = models.len();
let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
Ok(Json(LogQueryResponse { entries, total }))
}
#[derive(Debug, Deserialize)]
pub struct QLQueryParams {
query: String,
limit: Option<u64>,
}
pub async fn query_logs_by_ql(
State(state): State<AppState>,
Query(params): Query<QLQueryParams>,
) -> Result<Json<LogQueryResponse>, ServerError> {
let mut ctx = rlqt_ql::to_query_context(¶ms.query)
.map_err(|e| ServerError::InvalidQuery(e.to_string()))?;
if let Some(l) = params.limit
&& l > 0
{
ctx = ctx.limit(l);
}
let db = state.db.clone();
let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
.await
.map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
let total = models.len();
let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
Ok(Json(LogQueryResponse { entries, total }))
}