// rlqt_lib/rel_db/node_log_entry.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::entry_metadata::labels::LABEL_NAMES;
15use crate::parser::ParsedLogEntry;
16use sea_orm::entity::prelude::*;
17use sea_orm::sea_query::Expr;
18use sea_orm::{ActiveValue, Condition, QueryOrder, QuerySelect, TransactionTrait};
19use serde::{Deserialize, Serialize};
20use serde_json::{Map, Value, to_value};
21
/// Convenient domain-oriented alias for the SeaORM-generated [`Entity`].
pub type NodeLogEntry = Entity;

/// Default maximum number of entries returned by filtering queries.
/// Can be overridden by specifying an explicit limit in the query context.
const DEFAULT_MAX_QUERY_LIMIT: u64 = 10_000;

/// Number of entries to insert in a single database transaction.
/// Batching provides a 10-15% entry insertion speedup.
///
/// Values higher hit the law of diminishing returns.
const DB_INSERT_BATCH_SIZE: usize = 2000;
33
/// Query context for filtering log entries.
///
/// Built up via the chainable setters on this type; every filter is optional,
/// and an unset filter does not narrow the query.
#[derive(Debug, Default, Clone)]
pub struct QueryContext {
    // Inclusive lower bound on entry timestamps (compared with `>=`).
    pub(crate) since_time: Option<DateTimeUtc>,
    // Inclusive upper bound on entry timestamps (compared with `<=`).
    pub(crate) to_time: Option<DateTimeUtc>,
    // Exact severity string to match (e.g. "warning").
    pub(crate) severity: Option<String>,
    // Exact Erlang PID to match (e.g. "<0.208.0>").
    pub(crate) erlang_pid: Option<String>,
    // Exact node name to match.
    pub(crate) node: Option<String>,
    // Subsystem name; silently ignored by the query if it does not parse
    // into a known subsystem.
    pub(crate) subsystem: Option<String>,
    // Label names to filter on; names not in LABEL_NAMES are skipped.
    pub(crate) labels: Vec<String>,
    // When true, an entry must carry every requested label (AND);
    // otherwise carrying any one of them suffices (OR).
    pub(crate) matching_all_labels: bool,
    // Maximum number of rows to return; DEFAULT_MAX_QUERY_LIMIT when unset.
    pub(crate) limit: Option<u64>,
    // When true, only entries with a resolution/discussion URL id are returned.
    pub(crate) has_resolution_or_discussion_url: bool,
    // When true, only entries with a documentation URL id are returned.
    pub(crate) has_doc_url: bool,
}
49
50impl QueryContext {
51    #[must_use]
52    pub fn since(mut self, time: DateTimeUtc) -> Self {
53        self.since_time = Some(time);
54        self
55    }
56
57    #[must_use]
58    pub fn to(mut self, time: DateTimeUtc) -> Self {
59        self.to_time = Some(time);
60        self
61    }
62
63    #[must_use]
64    pub fn severity(mut self, sev: impl Into<String>) -> Self {
65        self.severity = Some(sev.into());
66        self
67    }
68
69    #[must_use]
70    pub fn erlang_pid(mut self, pid: impl Into<String>) -> Self {
71        self.erlang_pid = Some(pid.into());
72        self
73    }
74
75    #[must_use]
76    pub fn node(mut self, n: impl Into<String>) -> Self {
77        self.node = Some(n.into());
78        self
79    }
80
81    #[must_use]
82    pub fn subsystem(mut self, sub: impl Into<String>) -> Self {
83        self.subsystem = Some(sub.into());
84        self
85    }
86
87    #[must_use]
88    pub fn labels(mut self, labels: Vec<String>) -> Self {
89        self.labels = labels;
90        self
91    }
92
93    #[must_use]
94    pub fn add_label(mut self, label: impl Into<String>) -> Self {
95        self.labels.push(label.into());
96        self
97    }
98
99    #[must_use]
100    pub fn matching_all_labels(mut self, match_all: bool) -> Self {
101        self.matching_all_labels = match_all;
102        self
103    }
104
105    #[must_use]
106    pub fn limit(mut self, l: u64) -> Self {
107        self.limit = Some(l);
108        self
109    }
110
111    #[must_use]
112    pub fn has_resolution_or_discussion_url(mut self, has: bool) -> Self {
113        self.has_resolution_or_discussion_url = has;
114        self
115    }
116
117    #[must_use]
118    pub fn has_doc_url(mut self, has: bool) -> Self {
119        self.has_doc_url = has;
120        self
121    }
122}
123
/// A RabbitMQ log entry stored in the database.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "node_log_entries")]
pub struct Model {
    /// Unique numerical ID (primary key, auto-incremented by the database
    /// unless an explicit id was carried by the parsed entry)
    #[sea_orm(primary_key, auto_increment = true)]
    pub id: i64,

    /// Node name the entry originated from
    #[sea_orm(indexed)]
    pub node: String,

    /// Timestamp of the log entry (UTC)
    #[sea_orm(indexed)]
    pub timestamp: DateTimeUtc,

    /// Log severity level (debug, info, notice, warning, error, critical)
    #[sea_orm(indexed)]
    pub severity: String,

    /// Erlang PID (e.g., "<0.208.0>")
    #[sea_orm(indexed)]
    pub erlang_pid: String,

    /// Subsystem identifier; `None` when no subsystem could be attributed
    #[sea_orm(indexed)]
    pub subsystem_id: Option<i16>,

    /// Log message content (can be multiline)
    pub message: String,

    /// Labels attached to this log entry (stored as JSONB; an object
    /// mapping label names to booleans)
    #[sea_orm(column_type = "JsonBinary")]
    pub labels: Json,

    /// ID of related resolution or discussion URL, if any
    pub resolution_or_discussion_url_id: Option<i16>,

    /// ID of relevant documentation URL, if any
    pub doc_url_id: Option<i16>,
}
165
/// This entity defines no relations to other entities.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

// No custom lifecycle hooks: rely on SeaORM's default save/delete behavior.
impl ActiveModelBehavior for ActiveModel {}
170
171impl Model {
172    /// Check if this log entry is multiline
173    #[inline]
174    pub fn is_multiline(&self) -> bool {
175        self.message.contains('\n')
176    }
177
178    /// Format labels as a newline-separated list of set labels
179    #[inline]
180    pub fn format_labels(&self) -> String {
181        if let Some(obj) = self.labels.as_object() {
182            LABEL_NAMES
183                .iter()
184                .filter(|&label| obj.get(*label).and_then(|v| v.as_bool()).unwrap_or(false))
185                .copied()
186                .collect::<Vec<_>>()
187                .join("\n")
188        } else {
189            String::new()
190        }
191    }
192}
193
194impl ActiveModel {
195    fn from_parsed(entry: &ParsedLogEntry, node: &str) -> Self {
196        let labels_json = to_value(entry.labels).unwrap_or_else(|_| Value::Object(Map::new()));
197
198        let id_value = if let Some(explicit_id) = entry.explicit_id {
199            ActiveValue::Set(explicit_id)
200        } else {
201            ActiveValue::NotSet
202        };
203
204        Self {
205            id: id_value,
206            node: ActiveValue::Set(node.to_string()),
207            timestamp: ActiveValue::Set(entry.timestamp),
208            severity: ActiveValue::Set(entry.severity.to_string()),
209            erlang_pid: ActiveValue::Set(entry.process_id.clone()),
210            subsystem_id: ActiveValue::Set(entry.subsystem_id),
211            message: ActiveValue::Set(entry.message.clone()),
212            labels: ActiveValue::Set(labels_json),
213            resolution_or_discussion_url_id: ActiveValue::Set(
214                entry.resolution_or_discussion_url_id,
215            ),
216            doc_url_id: ActiveValue::Set(entry.doc_url_id),
217        }
218    }
219}
220
221impl Entity {
222    /// Count all log entries
223    pub async fn count_all(db: &DatabaseConnection) -> Result<u64, DbErr> {
224        Self::find().count(db).await
225    }
226
227    /// Query log entries with optional filters
228    ///
229    /// Default limit is 10,000 entries to prevent memory exhaustion.
230    /// Specify a limit explicitly to override this.
231    pub async fn query(db: &DatabaseConnection, ctx: &QueryContext) -> Result<Vec<Model>, DbErr> {
232        let mut query = Self::find();
233
234        if let Some(since) = ctx.since_time {
235            query = query.filter(Column::Timestamp.gte(since));
236        }
237
238        if let Some(to) = ctx.to_time {
239            query = query.filter(Column::Timestamp.lte(to));
240        }
241
242        if let Some(ref sev) = ctx.severity {
243            query = query.filter(Column::Severity.eq(sev));
244        }
245
246        if let Some(ref pid) = ctx.erlang_pid {
247            query = query.filter(Column::ErlangPid.eq(pid));
248        }
249
250        if let Some(ref n) = ctx.node {
251            query = query.filter(Column::Node.eq(n));
252        }
253
254        if let Some(ref sub) = ctx.subsystem
255            && let Ok(subsystem) = sub.parse::<crate::entry_metadata::subsystems::Subsystem>()
256        {
257            query = query.filter(Column::SubsystemId.eq(subsystem.to_id()));
258        }
259
260        if !ctx.labels.is_empty() {
261            let mut label_condition = if ctx.matching_all_labels {
262                Condition::all()
263            } else {
264                Condition::any()
265            };
266            for label in &ctx.labels {
267                if LABEL_NAMES.contains(&label.as_str()) {
268                    let json_path = format!("$.{}", label);
269                    label_condition = label_condition.add(
270                        Expr::cust_with_values("json_extract(labels, ?)", [json_path]).eq(true),
271                    );
272                }
273            }
274            query = query.filter(label_condition);
275        }
276
277        if ctx.has_resolution_or_discussion_url {
278            query = query.filter(Column::ResolutionOrDiscussionUrlId.is_not_null());
279        }
280
281        if ctx.has_doc_url {
282            query = query.filter(Column::DocUrlId.is_not_null());
283        }
284
285        query = query.order_by_asc(Column::Timestamp);
286
287        let effective_limit = ctx.limit.unwrap_or(DEFAULT_MAX_QUERY_LIMIT);
288        query = query.limit(effective_limit);
289
290        query.all(db).await
291    }
292
293    async fn insert_batch(db: &DatabaseConnection, entries: Vec<ActiveModel>) -> Result<(), DbErr> {
294        if entries.is_empty() {
295            return Ok(());
296        }
297
298        let txn = db.begin().await?;
299        Entity::insert_many(entries).exec(&txn).await?;
300        txn.commit().await?;
301        Ok(())
302    }
303
304    pub async fn insert_parsed_entries(
305        db: &DatabaseConnection,
306        entries: &[ParsedLogEntry],
307        node: &str,
308    ) -> Result<(), DbErr> {
309        let active_models: Vec<ActiveModel> = entries
310            .iter()
311            .map(|entry| ActiveModel::from_parsed(entry, node))
312            .collect();
313        Self::insert_batch(db, active_models).await
314    }
315
316    pub async fn insert_parsed_entries_bulk(
317        db: &DatabaseConnection,
318        entries: &[ParsedLogEntry],
319        node: &str,
320    ) -> Result<(), DbErr> {
321        if entries.is_empty() {
322            return Ok(());
323        }
324
325        let txn = db.begin().await?;
326        for chunk in entries.chunks(DB_INSERT_BATCH_SIZE) {
327            let active_models: Vec<ActiveModel> = chunk
328                .iter()
329                .map(|entry| ActiveModel::from_parsed(entry, node))
330                .collect();
331            Entity::insert_many(active_models).exec(&txn).await?;
332        }
333        txn.commit().await?;
334        Ok(())
335    }
336}