Skip to main content

rlqt_lib/rel_db/
node_log_entry.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::entry_metadata::labels::{LABEL_NAMES, LogEntryLabels};
15use crate::parser::ParsedLogEntry;
16use crate::rel_db::DatabaseConnection;
17use crate::rel_db::presets::QueryPreset;
18use chrono::{DateTime, Utc};
19use duckdb::types::Value;
20use duckdb::{Error as DuckDbError, params};
21use serde::{Deserialize, Serialize};
22use std::fmt::Display;
23use std::io::Error as IoError;
24
25fn pool_error_to_duckdb(e: impl Display) -> DuckDbError {
26    DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(format!(
27        "Connection pool error: {}",
28        e
29    ))))
30}
31
/// Stateless handle whose associated functions read from and write to the
/// `node_log_entries` table.
pub struct NodeLogEntry;

/// Row cap applied by `NodeLogEntry::query` when the query context does not
/// set an explicit limit.
const DEFAULT_MAX_QUERY_LIMIT: u64 = 10_000;

/// Number of entries appended per appender flush in
/// `NodeLogEntry::insert_parsed_entries`.
const DB_INSERT_BATCH_SIZE: usize = 3_000;
36
37#[derive(Debug, Default, Clone)]
38pub struct QueryContext {
39    pub(crate) since_time: Option<DateTime<Utc>>,
40    pub(crate) to_time: Option<DateTime<Utc>>,
41    pub(crate) severity: Option<String>,
42    pub(crate) erlang_pid: Option<String>,
43    pub(crate) node: Option<String>,
44    pub(crate) subsystem: Option<String>,
45    pub(crate) labels: Vec<String>,
46    pub(crate) matching_all_labels: bool,
47    pub(crate) limit: Option<u64>,
48    pub(crate) has_resolution_or_discussion_url: bool,
49    pub(crate) has_doc_url: bool,
50    pub(crate) preset: Option<QueryPreset>,
51    pub(crate) raw_where_clauses: Vec<String>,
52}
53
54impl QueryContext {
55    #[must_use]
56    pub fn since(mut self, time: DateTime<Utc>) -> Self {
57        self.since_time = Some(time);
58        self
59    }
60
61    #[must_use]
62    pub fn to(mut self, time: DateTime<Utc>) -> Self {
63        self.to_time = Some(time);
64        self
65    }
66
67    #[must_use]
68    pub fn severity(mut self, sev: impl Into<String>) -> Self {
69        self.severity = Some(sev.into());
70        self
71    }
72
73    #[must_use]
74    pub fn erlang_pid(mut self, pid: impl Into<String>) -> Self {
75        self.erlang_pid = Some(pid.into());
76        self
77    }
78
79    #[must_use]
80    pub fn node(mut self, n: impl Into<String>) -> Self {
81        self.node = Some(n.into());
82        self
83    }
84
85    #[must_use]
86    pub fn subsystem(mut self, sub: impl Into<String>) -> Self {
87        self.subsystem = Some(sub.into());
88        self
89    }
90
91    #[must_use]
92    pub fn labels(mut self, labels: Vec<String>) -> Self {
93        self.labels = labels;
94        self
95    }
96
97    #[must_use]
98    pub fn add_label(mut self, label: impl Into<String>) -> Self {
99        self.labels.push(label.into());
100        self
101    }
102
103    #[must_use]
104    pub fn matching_all_labels(mut self, match_all: bool) -> Self {
105        self.matching_all_labels = match_all;
106        self
107    }
108
109    #[must_use]
110    pub fn limit(mut self, l: u64) -> Self {
111        self.limit = Some(l);
112        self
113    }
114
115    #[must_use]
116    pub fn has_resolution_or_discussion_url(mut self, has: bool) -> Self {
117        self.has_resolution_or_discussion_url = has;
118        self
119    }
120
121    #[must_use]
122    pub fn has_doc_url(mut self, has: bool) -> Self {
123        self.has_doc_url = has;
124        self
125    }
126
127    #[must_use]
128    pub fn preset(mut self, preset: QueryPreset) -> Self {
129        self.preset = Some(preset);
130        self
131    }
132
133    /// Adds a raw SQL WHERE clause fragment.
134    ///
135    /// ## Safety
136    ///
137    /// Only use with trusted, pre-validated input such as output from the QL compiler.
138    /// User input must be properly escaped before being included in the clause.
139    ///
140    /// The QL compiler (`rlqt_ql::compiler`) uses `escape_sql_string()` and
141    /// `escape_like_pattern()` to sanitize user-provided values before generating
142    /// SQL fragments that are passed to this method.
143    #[must_use]
144    pub fn raw_where(mut self, clause: impl Into<String>) -> Self {
145        self.raw_where_clauses.push(clause.into());
146        self
147    }
148
149    /// Sets multiple raw SQL WHERE clause fragments.
150    ///
151    /// ## Safety
152    ///
153    /// **This method accepts raw SQL that is concatenated directly into the query.**
154    /// See [`raw_where`](Self::raw_where).
155    #[must_use]
156    pub fn raw_where_clauses(mut self, clauses: Vec<String>) -> Self {
157        self.raw_where_clauses = clauses;
158        self
159    }
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
163pub struct Model {
164    pub id: i64,
165    pub node: String,
166    pub timestamp: DateTime<Utc>,
167    pub severity: String,
168    pub erlang_pid: String,
169    pub subsystem_id: Option<i16>,
170    pub message: String,
171    pub labels: i64,
172    pub resolution_or_discussion_url_id: Option<i16>,
173    pub doc_url_id: Option<i16>,
174}
175
176impl Model {
177    #[inline]
178    pub fn is_multiline(&self) -> bool {
179        self.message.contains('\n')
180    }
181
182    #[inline]
183    pub fn format_labels(&self) -> String {
184        let labels = LogEntryLabels::from_bits_i64(self.labels);
185        let mut result = String::new();
186        for (i, label_name) in LABEL_NAMES.iter().enumerate() {
187            if labels.bits() & (1u64 << i) != 0 {
188                if !result.is_empty() {
189                    result.push('\n');
190                }
191                result.push_str(label_name);
192            }
193        }
194        result
195    }
196
197    #[inline]
198    pub fn get_labels(&self) -> LogEntryLabels {
199        LogEntryLabels::from_bits_i64(self.labels)
200    }
201}
202
203impl NodeLogEntry {
204    pub fn count_all(db: &DatabaseConnection) -> Result<u64, DuckDbError> {
205        let conn = db.get().map_err(pool_error_to_duckdb)?;
206
207        let mut stmt = conn.prepare("SELECT COUNT(*) FROM node_log_entries")?;
208        let count: i64 = stmt.query_row([], |row| row.get(0))?;
209        Ok(count as u64)
210    }
211
212    pub fn max_entry_id(db: &DatabaseConnection) -> Result<i64, DuckDbError> {
213        let conn = db.get().map_err(pool_error_to_duckdb)?;
214
215        let max_id: Option<i64> = conn
216            .query_row("SELECT MAX(id) FROM node_log_entries", [], |row| row.get(0))
217            .ok()
218            .flatten();
219        Ok(max_id.unwrap_or(0))
220    }
221
222    pub fn query(db: &DatabaseConnection, ctx: &QueryContext) -> Result<Vec<Model>, DuckDbError> {
223        let conn = db.get().map_err(pool_error_to_duckdb)?;
224
225        let mut conditions = Vec::new();
226        let mut params: Vec<Value> = Vec::new();
227
228        if let Some(since) = ctx.since_time {
229            conditions.push("timestamp >= ?".to_string());
230            params.push(Value::Timestamp(
231                duckdb::types::TimeUnit::Microsecond,
232                since.timestamp_micros(),
233            ));
234        }
235
236        if let Some(to) = ctx.to_time {
237            conditions.push("timestamp <= ?".to_string());
238            params.push(Value::Timestamp(
239                duckdb::types::TimeUnit::Microsecond,
240                to.timestamp_micros(),
241            ));
242        }
243
244        if let Some(ref preset) = ctx.preset {
245            let mut or_parts = Vec::new();
246
247            if let Some(sev) = preset.severity() {
248                or_parts.push("severity = ?".to_string());
249                params.push(Value::Text(sev.to_string()));
250            }
251
252            let label_mask = preset.labels().bits();
253            if label_mask != 0 {
254                or_parts.push("(labels & ?) != 0".to_string());
255                params.push(Value::BigInt(label_mask as i64));
256            }
257
258            if !or_parts.is_empty() {
259                conditions.push(format!("({})", or_parts.join(" OR ")));
260            }
261        } else {
262            if let Some(ref sev) = ctx.severity {
263                conditions.push("severity = ?".to_string());
264                params.push(Value::Text(sev.clone()));
265            }
266
267            if !ctx.labels.is_empty() {
268                let mut combined_mask: u64 = 0;
269                for label in &ctx.labels {
270                    if let Some(bit) = LogEntryLabels::bit_for_label(label) {
271                        combined_mask |= bit;
272                    }
273                }
274                if combined_mask != 0 {
275                    if ctx.matching_all_labels {
276                        conditions.push("(labels & ?) = ?".to_string());
277                        params.push(Value::BigInt(combined_mask as i64));
278                        params.push(Value::BigInt(combined_mask as i64));
279                    } else {
280                        conditions.push("(labels & ?) != 0".to_string());
281                        params.push(Value::BigInt(combined_mask as i64));
282                    }
283                }
284            }
285        }
286
287        if let Some(ref pid) = ctx.erlang_pid {
288            conditions.push("erlang_pid = ?".to_string());
289            params.push(Value::Text(pid.clone()));
290        }
291
292        if let Some(ref n) = ctx.node {
293            conditions.push("node = ?".to_string());
294            params.push(Value::Text(n.clone()));
295        }
296
297        if let Some(ref sub) = ctx.subsystem
298            && let Ok(subsystem) = sub.parse::<crate::entry_metadata::subsystems::Subsystem>()
299        {
300            conditions.push("subsystem_id = ?".to_string());
301            params.push(Value::SmallInt(subsystem.to_id()));
302        }
303
304        if ctx.has_resolution_or_discussion_url {
305            conditions.push("resolution_or_discussion_url_id IS NOT NULL".to_string());
306        }
307
308        if ctx.has_doc_url {
309            conditions.push("doc_url_id IS NOT NULL".to_string());
310        }
311
312        for clause in &ctx.raw_where_clauses {
313            conditions.push(clause.clone());
314        }
315
316        let effective_limit = ctx.limit.unwrap_or(DEFAULT_MAX_QUERY_LIMIT);
317
318        let where_clause = if conditions.is_empty() {
319            String::new()
320        } else {
321            format!("WHERE {}", conditions.join(" AND "))
322        };
323
324        let sql = format!(
325            "SELECT id, node, timestamp, severity, erlang_pid, subsystem_id, message, labels, resolution_or_discussion_url_id, doc_url_id
326             FROM node_log_entries
327             {}
328             ORDER BY timestamp ASC
329             LIMIT {}",
330            where_clause, effective_limit
331        );
332
333        let mut stmt = conn.prepare(&sql)?;
334        let params_slice: Vec<&dyn duckdb::ToSql> =
335            params.iter().map(|p| p as &dyn duckdb::ToSql).collect();
336
337        let rows = stmt.query_map(params_slice.as_slice(), |row| {
338            let timestamp_micros: i64 = row.get(2)?;
339            let timestamp = DateTime::from_timestamp_micros(timestamp_micros)
340                .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
341
342            Ok(Model {
343                id: row.get(0)?,
344                node: row.get(1)?,
345                timestamp,
346                severity: row.get(3)?,
347                erlang_pid: row.get(4)?,
348                subsystem_id: row.get(5)?,
349                message: row.get(6)?,
350                labels: row.get(7)?,
351                resolution_or_discussion_url_id: row.get(8)?,
352                doc_url_id: row.get(9)?,
353            })
354        })?;
355
356        let mut results = Vec::new();
357        for row_result in rows {
358            results.push(row_result?);
359        }
360        Ok(results)
361    }
362
363    pub fn insert_parsed_entries(
364        db: &DatabaseConnection,
365        entries: &[ParsedLogEntry],
366        node: &str,
367    ) -> Result<(), DuckDbError> {
368        if entries.is_empty() {
369            return Ok(());
370        }
371
372        let conn = db.get().map_err(pool_error_to_duckdb)?;
373
374        let needs_auto_id = entries.iter().any(|e| e.explicit_id.is_none());
375        let mut running_id = if needs_auto_id {
376            let max_id: Option<i64> = conn
377                .query_row("SELECT MAX(id) FROM node_log_entries", [], |row| row.get(0))
378                .ok()
379                .flatten();
380            max_id.unwrap_or(0) + 1
381        } else {
382            0
383        };
384
385        for chunk in entries.chunks(DB_INSERT_BATCH_SIZE) {
386            let mut appender = conn.appender("node_log_entries")?;
387
388            for entry in chunk {
389                let id = entry.explicit_id.unwrap_or_else(|| {
390                    let id = running_id;
391                    running_id += 1;
392                    id
393                });
394                let timestamp_micros = entry.timestamp.timestamp_micros();
395
396                appender.append_row(params![
397                    id,
398                    node,
399                    Value::Timestamp(duckdb::types::TimeUnit::Microsecond, timestamp_micros),
400                    entry.severity.to_string(),
401                    entry.process_id,
402                    entry.subsystem_id,
403                    entry.message,
404                    entry.labels.to_bits_i64(),
405                    entry.resolution_or_discussion_url_id,
406                    entry.doc_url_id,
407                ])?;
408            }
409
410            appender.flush()?;
411        }
412
413        Ok(())
414    }
415
416    pub fn find_all(db: &DatabaseConnection) -> Result<Vec<Model>, DuckDbError> {
417        let conn = db.get().map_err(pool_error_to_duckdb)?;
418
419        let mut stmt = conn.prepare(
420            "SELECT id, node, timestamp, severity, erlang_pid, subsystem_id, message, labels, resolution_or_discussion_url_id, doc_url_id
421             FROM node_log_entries
422             ORDER BY timestamp ASC",
423        )?;
424
425        let rows = stmt.query_map([], |row| {
426            let timestamp_micros: i64 = row.get(2)?;
427            let timestamp = DateTime::from_timestamp_micros(timestamp_micros)
428                .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
429
430            Ok(Model {
431                id: row.get(0)?,
432                node: row.get(1)?,
433                timestamp,
434                severity: row.get(3)?,
435                erlang_pid: row.get(4)?,
436                subsystem_id: row.get(5)?,
437                message: row.get(6)?,
438                labels: row.get(7)?,
439                resolution_or_discussion_url_id: row.get(8)?,
440                doc_url_id: row.get(9)?,
441            })
442        })?;
443
444        let mut results = Vec::new();
445        for row_result in rows {
446            results.push(row_result?);
447        }
448        Ok(results)
449    }
450
451    pub fn get_node_counts(db: &DatabaseConnection) -> Result<Vec<(String, i64)>, DuckDbError> {
452        let conn = db.get().map_err(pool_error_to_duckdb)?;
453
454        let mut stmt = conn.prepare(
455            "SELECT node, COUNT(*) as count FROM node_log_entries GROUP BY node ORDER BY node ASC",
456        )?;
457
458        let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
459
460        let mut results = Vec::new();
461        for row_result in rows {
462            results.push(row_result?);
463        }
464        Ok(results)
465    }
466}