1pub mod file_metadata;
15pub mod node_log_entry;
16pub mod presets;
17
18pub use file_metadata::FileMetadata;
19pub use node_log_entry::{NodeLogEntry, QueryContext};
20pub use presets::QueryPreset;
21
22use duckdb::Error as DuckDbError;
23use r2d2::{Pool, PooledConnection};
24use r2d2_duckdb::DuckDbConnectionManager;
25use std::io::Error as IoError;
26use std::path::Path;
27use std::sync::Arc;
28use std::time::Duration;
29
30mod r2d2_duckdb {
31 use duckdb::{Connection, Error as DuckDbError};
32 use r2d2::ManageConnection;
33 use std::path::PathBuf;
34
35 pub struct DuckDbConnectionManager {
36 path: PathBuf,
37 }
38
39 impl DuckDbConnectionManager {
40 pub fn file(path: PathBuf) -> Self {
41 Self { path }
42 }
43 }
44
45 impl ManageConnection for DuckDbConnectionManager {
46 type Connection = Connection;
47 type Error = DuckDbError;
48
49 fn connect(&self) -> Result<Self::Connection, Self::Error> {
50 Connection::open(&self.path)
51 }
52
53 fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
54 conn.execute_batch("SELECT 1")?;
55 Ok(())
56 }
57
58 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
59 false
60 }
61 }
62}
63
64pub type DbPool = Pool<DuckDbConnectionManager>;
65pub type DbConnection = PooledConnection<DuckDbConnectionManager>;
66
67#[derive(Clone)]
68pub struct DatabaseConnection {
69 pool: Arc<DbPool>,
70}
71
72impl DatabaseConnection {
73 pub fn get(&self) -> Result<DbConnection, r2d2::Error> {
74 self.pool.get()
75 }
76}
77
78pub fn create_database(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
79 create_database_with_options(db_path, false)
80}
81
82pub fn create_database_for_bulk_import(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
83 create_database_with_options(db_path, true)
84}
85
86fn create_database_with_options(
87 db_path: &Path,
88 _fast_import: bool,
89) -> Result<DatabaseConnection, DuckDbError> {
90 let manager = DuckDbConnectionManager::file(db_path.to_path_buf());
91 let pool = Pool::builder()
92 .max_size(4)
93 .connection_timeout(Duration::from_secs(1))
94 .build(manager)
95 .map_err(|e| {
96 DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
97 })?;
98
99 let conn = pool.get().map_err(|e| {
100 DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
101 })?;
102
103 conn.execute_batch(
104 "
105 CREATE TABLE IF NOT EXISTS node_log_entries (
106 id BIGINT PRIMARY KEY,
107 node VARCHAR NOT NULL,
108 timestamp TIMESTAMPTZ NOT NULL,
109 severity VARCHAR NOT NULL,
110 erlang_pid VARCHAR NOT NULL,
111 subsystem_id SMALLINT,
112 message VARCHAR NOT NULL,
113 labels BIGINT NOT NULL DEFAULT 0,
114 resolution_or_discussion_url_id SMALLINT,
115 doc_url_id SMALLINT
116 );
117
118 CREATE TABLE IF NOT EXISTS file_metadata (
119 file_path VARCHAR PRIMARY KEY,
120 rabbitmq_versions VARCHAR NOT NULL DEFAULT '[]',
121 erlang_versions VARCHAR NOT NULL DEFAULT '[]',
122 tls_library VARCHAR,
123 oldest_entry_at TIMESTAMPTZ,
124 most_recent_entry_at TIMESTAMPTZ,
125 total_lines BIGINT NOT NULL,
126 total_entries BIGINT NOT NULL,
127 nodes VARCHAR NOT NULL DEFAULT '[]',
128 subsystems VARCHAR NOT NULL DEFAULT '[]',
129 labels VARCHAR NOT NULL DEFAULT '[]',
130 enabled_plugins VARCHAR NOT NULL DEFAULT '[]'
131 );
132 ",
133 )?;
134
135 Ok(DatabaseConnection {
136 pool: Arc::new(pool),
137 })
138}
139
140pub fn finalize_bulk_import(_db: &DatabaseConnection) -> Result<(), DuckDbError> {
141 Ok(())
142}
143
144pub fn post_insertion_operations(db: &DatabaseConnection) -> Result<(), DuckDbError> {
145 let conn = db.get().map_err(|e| {
146 DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
147 })?;
148
149 conn.execute_batch(
150 "
151 CREATE INDEX IF NOT EXISTS idx_node_log_entries_node ON node_log_entries(node);
152 CREATE INDEX IF NOT EXISTS idx_node_log_entries_timestamp ON node_log_entries(timestamp);
153 CREATE INDEX IF NOT EXISTS idx_node_log_entries_severity ON node_log_entries(severity);
154 CREATE INDEX IF NOT EXISTS idx_node_log_entries_erlang_pid ON node_log_entries(erlang_pid);
155 CREATE INDEX IF NOT EXISTS idx_node_log_entries_subsystem_id ON node_log_entries(subsystem_id);
156 CREATE INDEX IF NOT EXISTS idx_node_timestamp ON node_log_entries(node, timestamp);
157 CREATE INDEX IF NOT EXISTS idx_timestamp_severity ON node_log_entries(timestamp, severity);
158 CREATE INDEX IF NOT EXISTS idx_timestamp_subsystem_id ON node_log_entries(timestamp, subsystem_id);
159 CREATE INDEX IF NOT EXISTS idx_resolution_or_discussion_url_id ON node_log_entries(resolution_or_discussion_url_id);
160 CREATE INDEX IF NOT EXISTS idx_doc_url_id ON node_log_entries(doc_url_id);
161 CREATE INDEX IF NOT EXISTS idx_timestamp_doc_url_id ON node_log_entries(timestamp, doc_url_id);
162 CREATE INDEX IF NOT EXISTS idx_timestamp_resolution_url_id ON node_log_entries(timestamp, resolution_or_discussion_url_id);
163 CREATE INDEX IF NOT EXISTS idx_severity_doc_url_id ON node_log_entries(severity, doc_url_id);
164 CREATE INDEX IF NOT EXISTS idx_node_timestamp_doc_url_id ON node_log_entries(node, timestamp, doc_url_id);
165 ",
166 )?;
167
168 Ok(())
169}
170
171pub fn open_database(db_path: &Path) -> Result<DatabaseConnection, DuckDbError> {
172 let manager = DuckDbConnectionManager::file(db_path.to_path_buf());
173 let pool = Pool::builder()
174 .max_size(4)
175 .connection_timeout(Duration::from_secs(1))
176 .build(manager)
177 .map_err(|e| {
178 DuckDbError::ToSqlConversionFailure(Box::new(IoError::other(e.to_string())))
179 })?;
180
181 Ok(DatabaseConnection {
182 pool: Arc::new(pool),
183 })
184}