rlqt_lib/rel_db/
node_log_entry.rs1use crate::entry_metadata::labels::LABEL_NAMES;
15use crate::parser::ParsedLogEntry;
16use sea_orm::entity::prelude::*;
17use sea_orm::sea_query::Expr;
18use sea_orm::{ActiveValue, Condition, QueryOrder, QuerySelect, TransactionTrait};
19use serde::{Deserialize, Serialize};
20use serde_json::{Map, Value, to_value};
21
22pub type NodeLogEntry = Entity;
23
24const DEFAULT_MAX_QUERY_LIMIT: u64 = 10_000;
27
28const DB_INSERT_BATCH_SIZE: usize = 2000;
33
34#[derive(Debug, Default, Clone)]
36pub struct QueryContext {
37 pub(crate) since_time: Option<DateTimeUtc>,
38 pub(crate) to_time: Option<DateTimeUtc>,
39 pub(crate) severity: Option<String>,
40 pub(crate) erlang_pid: Option<String>,
41 pub(crate) node: Option<String>,
42 pub(crate) subsystem: Option<String>,
43 pub(crate) labels: Vec<String>,
44 pub(crate) matching_all_labels: bool,
45 pub(crate) limit: Option<u64>,
46 pub(crate) has_resolution_or_discussion_url: bool,
47 pub(crate) has_doc_url: bool,
48}
49
50impl QueryContext {
51 #[must_use]
52 pub fn since(mut self, time: DateTimeUtc) -> Self {
53 self.since_time = Some(time);
54 self
55 }
56
57 #[must_use]
58 pub fn to(mut self, time: DateTimeUtc) -> Self {
59 self.to_time = Some(time);
60 self
61 }
62
63 #[must_use]
64 pub fn severity(mut self, sev: impl Into<String>) -> Self {
65 self.severity = Some(sev.into());
66 self
67 }
68
69 #[must_use]
70 pub fn erlang_pid(mut self, pid: impl Into<String>) -> Self {
71 self.erlang_pid = Some(pid.into());
72 self
73 }
74
75 #[must_use]
76 pub fn node(mut self, n: impl Into<String>) -> Self {
77 self.node = Some(n.into());
78 self
79 }
80
81 #[must_use]
82 pub fn subsystem(mut self, sub: impl Into<String>) -> Self {
83 self.subsystem = Some(sub.into());
84 self
85 }
86
87 #[must_use]
88 pub fn labels(mut self, labels: Vec<String>) -> Self {
89 self.labels = labels;
90 self
91 }
92
93 #[must_use]
94 pub fn add_label(mut self, label: impl Into<String>) -> Self {
95 self.labels.push(label.into());
96 self
97 }
98
99 #[must_use]
100 pub fn matching_all_labels(mut self, match_all: bool) -> Self {
101 self.matching_all_labels = match_all;
102 self
103 }
104
105 #[must_use]
106 pub fn limit(mut self, l: u64) -> Self {
107 self.limit = Some(l);
108 self
109 }
110
111 #[must_use]
112 pub fn has_resolution_or_discussion_url(mut self, has: bool) -> Self {
113 self.has_resolution_or_discussion_url = has;
114 self
115 }
116
117 #[must_use]
118 pub fn has_doc_url(mut self, has: bool) -> Self {
119 self.has_doc_url = has;
120 self
121 }
122}
123
124#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
126#[sea_orm(table_name = "node_log_entries")]
127pub struct Model {
128 #[sea_orm(primary_key, auto_increment = true)]
130 pub id: i64,
131
132 #[sea_orm(indexed)]
134 pub node: String,
135
136 #[sea_orm(indexed)]
138 pub timestamp: DateTimeUtc,
139
140 #[sea_orm(indexed)]
142 pub severity: String,
143
144 #[sea_orm(indexed)]
146 pub erlang_pid: String,
147
148 #[sea_orm(indexed)]
150 pub subsystem_id: Option<i16>,
151
152 pub message: String,
154
155 #[sea_orm(column_type = "JsonBinary")]
157 pub labels: Json,
158
159 pub resolution_or_discussion_url_id: Option<i16>,
161
162 pub doc_url_id: Option<i16>,
164}
165
166#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
167pub enum Relation {}
168
169impl ActiveModelBehavior for ActiveModel {}
170
171impl Model {
172 #[inline]
174 pub fn is_multiline(&self) -> bool {
175 self.message.contains('\n')
176 }
177
178 #[inline]
180 pub fn format_labels(&self) -> String {
181 if let Some(obj) = self.labels.as_object() {
182 LABEL_NAMES
183 .iter()
184 .filter(|&label| obj.get(*label).and_then(|v| v.as_bool()).unwrap_or(false))
185 .copied()
186 .collect::<Vec<_>>()
187 .join("\n")
188 } else {
189 String::new()
190 }
191 }
192}
193
194impl ActiveModel {
195 fn from_parsed(entry: &ParsedLogEntry, node: &str) -> Self {
196 let labels_json = to_value(entry.labels).unwrap_or_else(|_| Value::Object(Map::new()));
197
198 let id_value = if let Some(explicit_id) = entry.explicit_id {
199 ActiveValue::Set(explicit_id)
200 } else {
201 ActiveValue::NotSet
202 };
203
204 Self {
205 id: id_value,
206 node: ActiveValue::Set(node.to_string()),
207 timestamp: ActiveValue::Set(entry.timestamp),
208 severity: ActiveValue::Set(entry.severity.to_string()),
209 erlang_pid: ActiveValue::Set(entry.process_id.clone()),
210 subsystem_id: ActiveValue::Set(entry.subsystem_id),
211 message: ActiveValue::Set(entry.message.clone()),
212 labels: ActiveValue::Set(labels_json),
213 resolution_or_discussion_url_id: ActiveValue::Set(
214 entry.resolution_or_discussion_url_id,
215 ),
216 doc_url_id: ActiveValue::Set(entry.doc_url_id),
217 }
218 }
219}
220
221impl Entity {
222 pub async fn count_all(db: &DatabaseConnection) -> Result<u64, DbErr> {
224 Self::find().count(db).await
225 }
226
227 pub async fn query(db: &DatabaseConnection, ctx: &QueryContext) -> Result<Vec<Model>, DbErr> {
232 let mut query = Self::find();
233
234 if let Some(since) = ctx.since_time {
235 query = query.filter(Column::Timestamp.gte(since));
236 }
237
238 if let Some(to) = ctx.to_time {
239 query = query.filter(Column::Timestamp.lte(to));
240 }
241
242 if let Some(ref sev) = ctx.severity {
243 query = query.filter(Column::Severity.eq(sev));
244 }
245
246 if let Some(ref pid) = ctx.erlang_pid {
247 query = query.filter(Column::ErlangPid.eq(pid));
248 }
249
250 if let Some(ref n) = ctx.node {
251 query = query.filter(Column::Node.eq(n));
252 }
253
254 if let Some(ref sub) = ctx.subsystem
255 && let Ok(subsystem) = sub.parse::<crate::entry_metadata::subsystems::Subsystem>()
256 {
257 query = query.filter(Column::SubsystemId.eq(subsystem.to_id()));
258 }
259
260 if !ctx.labels.is_empty() {
261 let mut label_condition = if ctx.matching_all_labels {
262 Condition::all()
263 } else {
264 Condition::any()
265 };
266 for label in &ctx.labels {
267 if LABEL_NAMES.contains(&label.as_str()) {
268 let json_path = format!("$.{}", label);
269 label_condition = label_condition.add(
270 Expr::cust_with_values("json_extract(labels, ?)", [json_path]).eq(true),
271 );
272 }
273 }
274 query = query.filter(label_condition);
275 }
276
277 if ctx.has_resolution_or_discussion_url {
278 query = query.filter(Column::ResolutionOrDiscussionUrlId.is_not_null());
279 }
280
281 if ctx.has_doc_url {
282 query = query.filter(Column::DocUrlId.is_not_null());
283 }
284
285 query = query.order_by_asc(Column::Timestamp);
286
287 let effective_limit = ctx.limit.unwrap_or(DEFAULT_MAX_QUERY_LIMIT);
288 query = query.limit(effective_limit);
289
290 query.all(db).await
291 }
292
293 async fn insert_batch(db: &DatabaseConnection, entries: Vec<ActiveModel>) -> Result<(), DbErr> {
294 if entries.is_empty() {
295 return Ok(());
296 }
297
298 let txn = db.begin().await?;
299 Entity::insert_many(entries).exec(&txn).await?;
300 txn.commit().await?;
301 Ok(())
302 }
303
304 pub async fn insert_parsed_entries(
305 db: &DatabaseConnection,
306 entries: &[ParsedLogEntry],
307 node: &str,
308 ) -> Result<(), DbErr> {
309 let active_models: Vec<ActiveModel> = entries
310 .iter()
311 .map(|entry| ActiveModel::from_parsed(entry, node))
312 .collect();
313 Self::insert_batch(db, active_models).await
314 }
315
316 pub async fn insert_parsed_entries_bulk(
317 db: &DatabaseConnection,
318 entries: &[ParsedLogEntry],
319 node: &str,
320 ) -> Result<(), DbErr> {
321 if entries.is_empty() {
322 return Ok(());
323 }
324
325 let txn = db.begin().await?;
326 for chunk in entries.chunks(DB_INSERT_BATCH_SIZE) {
327 let active_models: Vec<ActiveModel> = chunk
328 .iter()
329 .map(|entry| ActiveModel::from_parsed(entry, node))
330 .collect();
331 Entity::insert_many(active_models).exec(&txn).await?;
332 }
333 txn.commit().await?;
334 Ok(())
335 }
336}