1use crate::entry_metadata::labels::{LABEL_NAMES, LogEntryLabels};
15use crate::parser::ParsedLogEntry;
16use crate::rel_db::DatabaseConnection;
17use crate::rel_db::presets::QueryPreset;
18use chrono::{DateTime, Utc};
19use duckdb::types::Value;
20use duckdb::{Error as DuckDbError, params};
21use serde::{Deserialize, Serialize};
22use std::fmt::Display;
23use std::io::Error as IoError;
24
25fn pool_error_to_duckdb(e: impl Display) -> DuckDbError {
26 DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(format!(
27 "Connection pool error: {}",
28 e
29 ))))
30}
31
32pub struct NodeLogEntry;
33
34const DEFAULT_MAX_QUERY_LIMIT: u64 = 10_000;
35const DB_INSERT_BATCH_SIZE: usize = 3000;
36
37#[derive(Debug, Default, Clone)]
38pub struct QueryContext {
39 pub(crate) since_time: Option<DateTime<Utc>>,
40 pub(crate) to_time: Option<DateTime<Utc>>,
41 pub(crate) severity: Option<String>,
42 pub(crate) erlang_pid: Option<String>,
43 pub(crate) node: Option<String>,
44 pub(crate) subsystem: Option<String>,
45 pub(crate) labels: Vec<String>,
46 pub(crate) matching_all_labels: bool,
47 pub(crate) limit: Option<u64>,
48 pub(crate) has_resolution_or_discussion_url: bool,
49 pub(crate) has_doc_url: bool,
50 pub(crate) preset: Option<QueryPreset>,
51 pub(crate) raw_where_clauses: Vec<String>,
52}
53
54impl QueryContext {
55 #[must_use]
56 pub fn since(mut self, time: DateTime<Utc>) -> Self {
57 self.since_time = Some(time);
58 self
59 }
60
61 #[must_use]
62 pub fn to(mut self, time: DateTime<Utc>) -> Self {
63 self.to_time = Some(time);
64 self
65 }
66
67 #[must_use]
68 pub fn severity(mut self, sev: impl Into<String>) -> Self {
69 self.severity = Some(sev.into());
70 self
71 }
72
73 #[must_use]
74 pub fn erlang_pid(mut self, pid: impl Into<String>) -> Self {
75 self.erlang_pid = Some(pid.into());
76 self
77 }
78
79 #[must_use]
80 pub fn node(mut self, n: impl Into<String>) -> Self {
81 self.node = Some(n.into());
82 self
83 }
84
85 #[must_use]
86 pub fn subsystem(mut self, sub: impl Into<String>) -> Self {
87 self.subsystem = Some(sub.into());
88 self
89 }
90
91 #[must_use]
92 pub fn labels(mut self, labels: Vec<String>) -> Self {
93 self.labels = labels;
94 self
95 }
96
97 #[must_use]
98 pub fn add_label(mut self, label: impl Into<String>) -> Self {
99 self.labels.push(label.into());
100 self
101 }
102
103 #[must_use]
104 pub fn matching_all_labels(mut self, match_all: bool) -> Self {
105 self.matching_all_labels = match_all;
106 self
107 }
108
109 #[must_use]
110 pub fn limit(mut self, l: u64) -> Self {
111 self.limit = Some(l);
112 self
113 }
114
115 #[must_use]
116 pub fn has_resolution_or_discussion_url(mut self, has: bool) -> Self {
117 self.has_resolution_or_discussion_url = has;
118 self
119 }
120
121 #[must_use]
122 pub fn has_doc_url(mut self, has: bool) -> Self {
123 self.has_doc_url = has;
124 self
125 }
126
127 #[must_use]
128 pub fn preset(mut self, preset: QueryPreset) -> Self {
129 self.preset = Some(preset);
130 self
131 }
132
133 #[must_use]
144 pub fn raw_where(mut self, clause: impl Into<String>) -> Self {
145 self.raw_where_clauses.push(clause.into());
146 self
147 }
148
149 #[must_use]
156 pub fn raw_where_clauses(mut self, clauses: Vec<String>) -> Self {
157 self.raw_where_clauses = clauses;
158 self
159 }
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
163pub struct Model {
164 pub id: i64,
165 pub node: String,
166 pub timestamp: DateTime<Utc>,
167 pub severity: String,
168 pub erlang_pid: String,
169 pub subsystem_id: Option<i16>,
170 pub message: String,
171 pub labels: i64,
172 pub resolution_or_discussion_url_id: Option<i16>,
173 pub doc_url_id: Option<i16>,
174}
175
176impl Model {
177 #[inline]
178 pub fn is_multiline(&self) -> bool {
179 self.message.contains('\n')
180 }
181
182 #[inline]
183 pub fn format_labels(&self) -> String {
184 let labels = LogEntryLabels::from_bits_i64(self.labels);
185 let mut result = String::new();
186 for (i, label_name) in LABEL_NAMES.iter().enumerate() {
187 if labels.bits() & (1u64 << i) != 0 {
188 if !result.is_empty() {
189 result.push('\n');
190 }
191 result.push_str(label_name);
192 }
193 }
194 result
195 }
196
197 #[inline]
198 pub fn get_labels(&self) -> LogEntryLabels {
199 LogEntryLabels::from_bits_i64(self.labels)
200 }
201}
202
203impl NodeLogEntry {
204 pub fn count_all(db: &DatabaseConnection) -> Result<u64, DuckDbError> {
205 let conn = db.get().map_err(pool_error_to_duckdb)?;
206
207 let mut stmt = conn.prepare("SELECT COUNT(*) FROM node_log_entries")?;
208 let count: i64 = stmt.query_row([], |row| row.get(0))?;
209 Ok(count as u64)
210 }
211
212 pub fn max_entry_id(db: &DatabaseConnection) -> Result<i64, DuckDbError> {
213 let conn = db.get().map_err(pool_error_to_duckdb)?;
214
215 let max_id: Option<i64> = conn
216 .query_row("SELECT MAX(id) FROM node_log_entries", [], |row| row.get(0))
217 .ok()
218 .flatten();
219 Ok(max_id.unwrap_or(0))
220 }
221
222 pub fn query(db: &DatabaseConnection, ctx: &QueryContext) -> Result<Vec<Model>, DuckDbError> {
223 let conn = db.get().map_err(pool_error_to_duckdb)?;
224
225 let mut conditions = Vec::new();
226 let mut params: Vec<Value> = Vec::new();
227
228 if let Some(since) = ctx.since_time {
229 conditions.push("timestamp >= ?".to_string());
230 params.push(Value::Timestamp(
231 duckdb::types::TimeUnit::Microsecond,
232 since.timestamp_micros(),
233 ));
234 }
235
236 if let Some(to) = ctx.to_time {
237 conditions.push("timestamp <= ?".to_string());
238 params.push(Value::Timestamp(
239 duckdb::types::TimeUnit::Microsecond,
240 to.timestamp_micros(),
241 ));
242 }
243
244 if let Some(ref preset) = ctx.preset {
245 let mut or_parts = Vec::new();
246
247 if let Some(sev) = preset.severity() {
248 or_parts.push("severity = ?".to_string());
249 params.push(Value::Text(sev.to_string()));
250 }
251
252 let label_mask = preset.labels().bits();
253 if label_mask != 0 {
254 or_parts.push("(labels & ?) != 0".to_string());
255 params.push(Value::BigInt(label_mask as i64));
256 }
257
258 if !or_parts.is_empty() {
259 conditions.push(format!("({})", or_parts.join(" OR ")));
260 }
261 } else {
262 if let Some(ref sev) = ctx.severity {
263 conditions.push("severity = ?".to_string());
264 params.push(Value::Text(sev.clone()));
265 }
266
267 if !ctx.labels.is_empty() {
268 let mut combined_mask: u64 = 0;
269 for label in &ctx.labels {
270 if let Some(bit) = LogEntryLabels::bit_for_label(label) {
271 combined_mask |= bit;
272 }
273 }
274 if combined_mask != 0 {
275 if ctx.matching_all_labels {
276 conditions.push("(labels & ?) = ?".to_string());
277 params.push(Value::BigInt(combined_mask as i64));
278 params.push(Value::BigInt(combined_mask as i64));
279 } else {
280 conditions.push("(labels & ?) != 0".to_string());
281 params.push(Value::BigInt(combined_mask as i64));
282 }
283 }
284 }
285 }
286
287 if let Some(ref pid) = ctx.erlang_pid {
288 conditions.push("erlang_pid = ?".to_string());
289 params.push(Value::Text(pid.clone()));
290 }
291
292 if let Some(ref n) = ctx.node {
293 conditions.push("node = ?".to_string());
294 params.push(Value::Text(n.clone()));
295 }
296
297 if let Some(ref sub) = ctx.subsystem
298 && let Ok(subsystem) = sub.parse::<crate::entry_metadata::subsystems::Subsystem>()
299 {
300 conditions.push("subsystem_id = ?".to_string());
301 params.push(Value::SmallInt(subsystem.to_id()));
302 }
303
304 if ctx.has_resolution_or_discussion_url {
305 conditions.push("resolution_or_discussion_url_id IS NOT NULL".to_string());
306 }
307
308 if ctx.has_doc_url {
309 conditions.push("doc_url_id IS NOT NULL".to_string());
310 }
311
312 for clause in &ctx.raw_where_clauses {
313 conditions.push(clause.clone());
314 }
315
316 let effective_limit = ctx.limit.unwrap_or(DEFAULT_MAX_QUERY_LIMIT);
317
318 let where_clause = if conditions.is_empty() {
319 String::new()
320 } else {
321 format!("WHERE {}", conditions.join(" AND "))
322 };
323
324 let sql = format!(
325 "SELECT id, node, timestamp, severity, erlang_pid, subsystem_id, message, labels, resolution_or_discussion_url_id, doc_url_id
326 FROM node_log_entries
327 {}
328 ORDER BY timestamp ASC
329 LIMIT {}",
330 where_clause, effective_limit
331 );
332
333 let mut stmt = conn.prepare(&sql)?;
334 let params_slice: Vec<&dyn duckdb::ToSql> =
335 params.iter().map(|p| p as &dyn duckdb::ToSql).collect();
336
337 let rows = stmt.query_map(params_slice.as_slice(), |row| {
338 let timestamp_micros: i64 = row.get(2)?;
339 let timestamp = DateTime::from_timestamp_micros(timestamp_micros)
340 .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
341
342 Ok(Model {
343 id: row.get(0)?,
344 node: row.get(1)?,
345 timestamp,
346 severity: row.get(3)?,
347 erlang_pid: row.get(4)?,
348 subsystem_id: row.get(5)?,
349 message: row.get(6)?,
350 labels: row.get(7)?,
351 resolution_or_discussion_url_id: row.get(8)?,
352 doc_url_id: row.get(9)?,
353 })
354 })?;
355
356 let mut results = Vec::new();
357 for row_result in rows {
358 results.push(row_result?);
359 }
360 Ok(results)
361 }
362
363 pub fn insert_parsed_entries(
364 db: &DatabaseConnection,
365 entries: &[ParsedLogEntry],
366 node: &str,
367 ) -> Result<(), DuckDbError> {
368 if entries.is_empty() {
369 return Ok(());
370 }
371
372 let conn = db.get().map_err(pool_error_to_duckdb)?;
373
374 let needs_auto_id = entries.iter().any(|e| e.explicit_id.is_none());
375 let mut running_id = if needs_auto_id {
376 let max_id: Option<i64> = conn
377 .query_row("SELECT MAX(id) FROM node_log_entries", [], |row| row.get(0))
378 .ok()
379 .flatten();
380 max_id.unwrap_or(0) + 1
381 } else {
382 0
383 };
384
385 for chunk in entries.chunks(DB_INSERT_BATCH_SIZE) {
386 let mut appender = conn.appender("node_log_entries")?;
387
388 for entry in chunk {
389 let id = entry.explicit_id.unwrap_or_else(|| {
390 let id = running_id;
391 running_id += 1;
392 id
393 });
394 let timestamp_micros = entry.timestamp.timestamp_micros();
395
396 appender.append_row(params![
397 id,
398 node,
399 Value::Timestamp(duckdb::types::TimeUnit::Microsecond, timestamp_micros),
400 entry.severity.to_string(),
401 entry.process_id,
402 entry.subsystem_id,
403 entry.message,
404 entry.labels.to_bits_i64(),
405 entry.resolution_or_discussion_url_id,
406 entry.doc_url_id,
407 ])?;
408 }
409
410 appender.flush()?;
411 }
412
413 Ok(())
414 }
415
416 pub fn find_all(db: &DatabaseConnection) -> Result<Vec<Model>, DuckDbError> {
417 let conn = db.get().map_err(pool_error_to_duckdb)?;
418
419 let mut stmt = conn.prepare(
420 "SELECT id, node, timestamp, severity, erlang_pid, subsystem_id, message, labels, resolution_or_discussion_url_id, doc_url_id
421 FROM node_log_entries
422 ORDER BY timestamp ASC",
423 )?;
424
425 let rows = stmt.query_map([], |row| {
426 let timestamp_micros: i64 = row.get(2)?;
427 let timestamp = DateTime::from_timestamp_micros(timestamp_micros)
428 .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
429
430 Ok(Model {
431 id: row.get(0)?,
432 node: row.get(1)?,
433 timestamp,
434 severity: row.get(3)?,
435 erlang_pid: row.get(4)?,
436 subsystem_id: row.get(5)?,
437 message: row.get(6)?,
438 labels: row.get(7)?,
439 resolution_or_discussion_url_id: row.get(8)?,
440 doc_url_id: row.get(9)?,
441 })
442 })?;
443
444 let mut results = Vec::new();
445 for row_result in rows {
446 results.push(row_result?);
447 }
448 Ok(results)
449 }
450
451 pub fn get_node_counts(db: &DatabaseConnection) -> Result<Vec<(String, i64)>, DuckDbError> {
452 let conn = db.get().map_err(pool_error_to_duckdb)?;
453
454 let mut stmt = conn.prepare(
455 "SELECT node, COUNT(*) as count FROM node_log_entries GROUP BY node ORDER BY node ASC",
456 )?;
457
458 let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
459
460 let mut results = Vec::new();
461 for row_result in rows {
462 results.push(row_result?);
463 }
464 Ok(results)
465 }
466}