Skip to main content

rlqt_lib/rel_db/
mod.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14pub mod file_metadata;
15pub mod node_log_entry;
16pub mod presets;
17
18pub use file_metadata::FileMetadata;
19pub use node_log_entry::{NodeLogEntry, QueryContext};
20pub use presets::QueryPreset;
21
22use duckdb::Error as DuckDbError;
23use r2d2::{Pool, PooledConnection};
24use r2d2_duckdb::DuckDbConnectionManager;
25use std::io::Error as IoError;
26use std::path::Path;
27use std::sync::Arc;
28use std::time::Duration;
29
30mod r2d2_duckdb {
31    use duckdb::{Connection, Error as DuckDbError};
32    use r2d2::ManageConnection;
33    use std::path::PathBuf;
34
35    pub struct DuckDbConnectionManager {
36        path: PathBuf,
37    }
38
39    impl DuckDbConnectionManager {
40        pub fn file(path: PathBuf) -> Self {
41            Self { path }
42        }
43    }
44
45    impl ManageConnection for DuckDbConnectionManager {
46        type Connection = Connection;
47        type Error = DuckDbError;
48
49        fn connect(&self) -> Result<Self::Connection, Self::Error> {
50            Connection::open(&self.path)
51        }
52
53        fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
54            conn.execute_batch("SELECT 1")?;
55            Ok(())
56        }
57
58        fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
59            false
60        }
61    }
62}
63
64pub type DbPool = Pool<DuckDbConnectionManager>;
65pub type DbConnection = PooledConnection<DuckDbConnectionManager>;
66
67#[derive(Clone)]
68pub struct DatabaseConnection {
69    pool: Arc<DbPool>,
70}
71
72impl DatabaseConnection {
73    pub fn get(&self) -> Result<DbConnection, r2d2::Error> {
74        self.pool.get()
75    }
76}
77
78pub fn create_database(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
79    create_database_with_options(db_path, false)
80}
81
82pub fn create_database_for_bulk_import(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
83    create_database_with_options(db_path, true)
84}
85
86fn create_database_with_options(
87    db_path: &Path,
88    _fast_import: bool,
89) -> Result<DatabaseConnection, DuckDbError> {
90    let manager = DuckDbConnectionManager::file(db_path.to_path_buf());
91    let pool = Pool::builder()
92        .max_size(4)
93        .connection_timeout(Duration::from_secs(1))
94        .build(manager)
95        .map_err(|e| {
96            DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
97        })?;
98
99    let conn = pool.get().map_err(|e| {
100        DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
101    })?;
102
103    conn.execute_batch(
104        "
105        CREATE TABLE IF NOT EXISTS node_log_entries (
106            id BIGINT PRIMARY KEY,
107            node VARCHAR NOT NULL,
108            timestamp TIMESTAMPTZ NOT NULL,
109            severity VARCHAR NOT NULL,
110            erlang_pid VARCHAR NOT NULL,
111            subsystem_id SMALLINT,
112            message VARCHAR NOT NULL,
113            labels BIGINT NOT NULL DEFAULT 0,
114            resolution_or_discussion_url_id SMALLINT,
115            doc_url_id SMALLINT
116        );
117
118        CREATE TABLE IF NOT EXISTS file_metadata (
119            file_path VARCHAR PRIMARY KEY,
120            rabbitmq_versions VARCHAR NOT NULL DEFAULT '[]',
121            erlang_versions VARCHAR NOT NULL DEFAULT '[]',
122            tls_library VARCHAR,
123            oldest_entry_at TIMESTAMPTZ,
124            most_recent_entry_at TIMESTAMPTZ,
125            total_lines BIGINT NOT NULL,
126            total_entries BIGINT NOT NULL,
127            nodes VARCHAR NOT NULL DEFAULT '[]',
128            subsystems VARCHAR NOT NULL DEFAULT '[]',
129            labels VARCHAR NOT NULL DEFAULT '[]',
130            enabled_plugins VARCHAR NOT NULL DEFAULT '[]'
131        );
132        ",
133    )?;
134
135    Ok(DatabaseConnection {
136        pool: Arc::new(pool),
137    })
138}
139
140pub fn finalize_bulk_import(_db: &DatabaseConnection) -> Result<(), DuckDbError> {
141    Ok(())
142}
143
144pub fn post_insertion_operations(db: &DatabaseConnection) -> Result<(), DuckDbError> {
145    let conn = db.get().map_err(|e| {
146        DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
147    })?;
148
149    conn.execute_batch(
150        "
151        CREATE INDEX IF NOT EXISTS idx_node_log_entries_node ON node_log_entries(node);
152        CREATE INDEX IF NOT EXISTS idx_node_log_entries_timestamp ON node_log_entries(timestamp);
153        CREATE INDEX IF NOT EXISTS idx_node_log_entries_severity ON node_log_entries(severity);
154        CREATE INDEX IF NOT EXISTS idx_node_log_entries_erlang_pid ON node_log_entries(erlang_pid);
155        CREATE INDEX IF NOT EXISTS idx_node_log_entries_subsystem_id ON node_log_entries(subsystem_id);
156        CREATE INDEX IF NOT EXISTS idx_node_timestamp ON node_log_entries(node, timestamp);
157        CREATE INDEX IF NOT EXISTS idx_timestamp_severity ON node_log_entries(timestamp, severity);
158        CREATE INDEX IF NOT EXISTS idx_timestamp_subsystem_id ON node_log_entries(timestamp, subsystem_id);
159        CREATE INDEX IF NOT EXISTS idx_resolution_or_discussion_url_id ON node_log_entries(resolution_or_discussion_url_id);
160        CREATE INDEX IF NOT EXISTS idx_doc_url_id ON node_log_entries(doc_url_id);
161        CREATE INDEX IF NOT EXISTS idx_timestamp_doc_url_id ON node_log_entries(timestamp, doc_url_id);
162        CREATE INDEX IF NOT EXISTS idx_timestamp_resolution_url_id ON node_log_entries(timestamp, resolution_or_discussion_url_id);
163        CREATE INDEX IF NOT EXISTS idx_severity_doc_url_id ON node_log_entries(severity, doc_url_id);
164        CREATE INDEX IF NOT EXISTS idx_node_timestamp_doc_url_id ON node_log_entries(node, timestamp, doc_url_id);
165        ",
166    )?;
167
168    Ok(())
169}
170
171pub fn open_database(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
172    let manager = DuckDbConnectionManager::file(db_path.to_path_buf());
173    let pool = Pool::builder()
174        .max_size(4)
175        .connection_timeout(Duration::from_secs(1))
176        .build(manager)
177        .map_err(|e| {
178            DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
179        })?;
180
181    Ok(DatabaseConnection {
182        pool: Arc::new(pool),
183    })
184}