1use crate::errors::ServerError;
16use crate::server::AppState;
17use axum::Json;
18use axum::extract::{Query, State};
19use axum::http::StatusCode;
20use axum::response::{IntoResponse, Response};
21use chrono::{DateTime, Utc};
22use rlqt_lib::entry_metadata::labels::LABEL_NAMES;
23use rlqt_lib::rel_db::node_log_entry::Model;
24use rlqt_lib::rel_db::presets::QueryPreset;
25use rlqt_lib::{NodeLogEntry, QueryContext};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::io::Error as IoError;
29
30#[derive(Debug, Deserialize)]
31pub struct LogQueryParams {
32 since_time: Option<String>,
33 to_time: Option<String>,
34 severity: Option<String>,
35 erlang_pid: Option<String>,
36 node: Option<String>,
37 subsystem: Option<String>,
38 labels: Option<String>,
39 matching_all_labels: Option<bool>,
40 limit: Option<u64>,
41 has_resolution_or_discussion_url: Option<bool>,
42 has_doc_url: Option<bool>,
43 unlabelled: Option<bool>,
44}
45
46#[derive(Debug, Serialize)]
47pub struct LogQueryResponse {
48 entries: Vec<LogEntry>,
49 total: usize,
50}
51
52#[derive(Debug, Serialize)]
53pub struct LogEntry {
54 id: i64,
55 node: String,
56 timestamp: String,
57 severity: String,
58 erlang_pid: String,
59 message: String,
60 subsystem: Option<String>,
61 labels: HashMap<String, bool>,
62 doc_url: Option<String>,
63 resolution_or_discussion_url: Option<String>,
64}
65
66impl From<Model> for LogEntry {
67 fn from(model: Model) -> Self {
68 let label_bits = model.labels as u64;
69 let mut labels = HashMap::new();
70 for (i, label_name) in LABEL_NAMES.iter().enumerate() {
71 if label_bits & (1u64 << i) != 0 {
72 labels.insert(label_name.to_string(), true);
73 }
74 }
75
76 let subsystem = model
77 .subsystem_id
78 .and_then(rlqt_lib::entry_metadata::subsystems::Subsystem::from_id)
79 .map(|s| s.to_string());
80
81 let doc_url = model
82 .doc_url_id
83 .and_then(rlqt_lib::constants::doc_url_from_id);
84
85 let resolution_or_discussion_url = model
86 .resolution_or_discussion_url_id
87 .and_then(rlqt_lib::constants::resolution_or_discussion_url_from_id);
88
89 Self {
90 id: model.id,
91 node: model.node,
92 timestamp: model.timestamp.to_rfc3339(),
93 severity: model.severity,
94 erlang_pid: model.erlang_pid,
95 message: model.message,
96 subsystem,
97 labels,
98 doc_url: doc_url.map(String::from),
99 resolution_or_discussion_url: resolution_or_discussion_url.map(String::from),
100 }
101 }
102}
103
104impl IntoResponse for ServerError {
105 fn into_response(self) -> Response {
106 let (status, message) = match self {
107 ServerError::Database(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
108 ServerError::Library(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
109 ServerError::Io(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
110 ServerError::Serialization(ref e) => (StatusCode::BAD_REQUEST, e.to_string()),
111 ServerError::DateTimeParse(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
112 ServerError::InvalidPreset(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
113 ServerError::InvalidQuery(ref e) => (StatusCode::BAD_REQUEST, e.clone()),
114 };
115
116 (status, Json(serde_json::json!({ "error": message }))).into_response()
117 }
118}
119
120pub async fn query_logs(
121 State(state): State<AppState>,
122 Query(params): Query<LogQueryParams>,
123) -> Result<Json<LogQueryResponse>, ServerError> {
124 let mut ctx = QueryContext::default();
125
126 if let Some(since) = params
127 .since_time
128 .as_ref()
129 .map(|s| parse_datetime_flexible(s))
130 .transpose()?
131 {
132 ctx = ctx.since(since);
133 }
134
135 if let Some(to) = params
136 .to_time
137 .as_ref()
138 .map(|s| parse_datetime_flexible(s))
139 .transpose()?
140 {
141 ctx = ctx.to(to);
142 }
143
144 if let Some(sev) = params.severity.as_ref() {
145 ctx = ctx.severity(sev);
146 }
147
148 if let Some(pid) = params.erlang_pid.as_ref() {
149 ctx = ctx.erlang_pid(pid);
150 }
151
152 if let Some(n) = params.node.as_ref() {
153 ctx = ctx.node(n);
154 }
155
156 if let Some(sub) = params.subsystem.as_ref() {
157 ctx = ctx.subsystem(sub);
158 }
159
160 if let Some(labels_str) = params.labels.as_ref() {
161 let labels: Vec<String> = labels_str
162 .split(',')
163 .map(|s| s.trim().to_string())
164 .collect();
165 for label in labels {
166 let normalized_label = if label == "election" {
167 "elections"
168 } else {
169 label.as_str()
170 };
171 ctx = ctx.add_label(normalized_label);
172 }
173 }
174
175 if params.matching_all_labels.unwrap_or(false) {
176 ctx = ctx.matching_all_labels(true);
177 }
178
179 if let Some(l) = params.limit
180 && l > 0
181 {
182 ctx = ctx.limit(l);
183 }
184
185 if params.has_resolution_or_discussion_url.unwrap_or(false) {
186 ctx = ctx.has_resolution_or_discussion_url(true);
187 }
188
189 if params.has_doc_url.unwrap_or(false) {
190 ctx = ctx.has_doc_url(true);
191 }
192
193 if params.unlabelled.unwrap_or(false) {
194 ctx = ctx.add_label("unlabelled");
195 }
196
197 let db = state.db.clone();
198 let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
199 .await
200 .map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
201
202 let total = models.len();
203 let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
204
205 Ok(Json(LogQueryResponse { entries, total }))
206}
207
208fn parse_datetime_flexible(s: &str) -> Result<DateTime<Utc>, ServerError> {
209 rlqt_lib::datetime::parse_datetime_flexible(s).map_err(ServerError::DateTimeParse)
210}
211
212#[derive(Debug, Deserialize)]
213pub struct PresetQueryParams {
214 since_time: Option<String>,
215 to_time: Option<String>,
216 node: Option<String>,
217 limit: Option<u64>,
218}
219
220pub async fn query_logs_by_preset(
221 State(state): State<AppState>,
222 axum::extract::Path(preset_name): axum::extract::Path<String>,
223 Query(params): Query<PresetQueryParams>,
224) -> Result<Json<LogQueryResponse>, ServerError> {
225 let preset: QueryPreset = preset_name
226 .parse()
227 .map_err(|e: String| ServerError::InvalidPreset(e))?;
228
229 let mut ctx = QueryContext::from(preset);
230
231 if let Some(since) = params
232 .since_time
233 .as_ref()
234 .map(|s| parse_datetime_flexible(s))
235 .transpose()?
236 {
237 ctx = ctx.since(since);
238 }
239
240 if let Some(to) = params
241 .to_time
242 .as_ref()
243 .map(|s| parse_datetime_flexible(s))
244 .transpose()?
245 {
246 ctx = ctx.to(to);
247 }
248
249 if let Some(n) = params.node.as_ref() {
250 ctx = ctx.node(n);
251 }
252
253 if let Some(l) = params.limit
254 && l > 0
255 {
256 ctx = ctx.limit(l);
257 }
258
259 let db = state.db.clone();
260 let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
261 .await
262 .map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
263
264 let total = models.len();
265 let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
266
267 Ok(Json(LogQueryResponse { entries, total }))
268}
269
270#[derive(Debug, Deserialize)]
271pub struct QLQueryParams {
272 query: String,
273 limit: Option<u64>,
274}
275
276pub async fn query_logs_by_ql(
277 State(state): State<AppState>,
278 Query(params): Query<QLQueryParams>,
279) -> Result<Json<LogQueryResponse>, ServerError> {
280 let mut ctx = rlqt_ql::to_query_context(¶ms.query)
281 .map_err(|e| ServerError::InvalidQuery(e.to_string()))?;
282
283 if let Some(l) = params.limit
284 && l > 0
285 {
286 ctx = ctx.limit(l);
287 }
288
289 let db = state.db.clone();
290 let models = tokio::task::spawn_blocking(move || NodeLogEntry::query(&db, &ctx))
291 .await
292 .map_err(|e| ServerError::Io(IoError::other(format!("Task join error: {}", e))))??;
293
294 let total = models.len();
295 let entries: Vec<LogEntry> = models.into_iter().map(LogEntry::from).collect();
296
297 Ok(Json(LogQueryResponse { entries, total }))
298}