1use facet::Facet;
2use moire_types::ConnectionId;
3use rusqlite::Connection;
4use rusqlite_facet::ConnectionFacetExt;
5
6use crate::db::Db;
7
/// Schema version this build expects to find in SQLite's `user_version` pragma.
///
/// `init_sqlite` rejects a database whose stored value is larger than this,
/// and drops and recreates the managed tables when the stored value is smaller.
const DB_SCHEMA_VERSION: i64 = 6;
9
/// Empty parameter set handed to `facet_query_one_ref` by the queries in this
/// file, none of which bind any values.
#[derive(Facet)]
struct NoParams;
12
/// Result row of the `user_version` query issued by `init_sqlite`.
#[derive(Facet)]
struct UserVersionRow {
    /// Mirrors the `user_version` column alias selected by that query.
    user_version: i64,
}
17
/// Result row of the "largest stored connection id" query issued by
/// `load_next_connection_id`.
#[derive(Facet)]
struct MaxConnIdRow {
    /// Mirrors the `max_conn_id` column alias; the query coalesces an empty
    /// `connections` table to `0`.
    max_conn_id: i64,
}
22
23pub fn init_sqlite(db: &Db) -> Result<(), String> {
24 let conn = db.open()?;
25 conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
26 .map_err(|error| format!("init sqlite pragmas: {error}"))?;
27
28 let user_version = conn
29 .facet_query_one_ref::<UserVersionRow, _>(
30 "SELECT user_version AS user_version FROM pragma_user_version",
31 &NoParams,
32 )
33 .map_err(|error| format!("read sqlite user_version: {error}"))?
34 .user_version;
35
36 if user_version > DB_SCHEMA_VERSION {
37 return Err(format!(
38 "database schema version {} is newer than supported {}",
39 user_version, DB_SCHEMA_VERSION
40 ));
41 }
42
43 if user_version < DB_SCHEMA_VERSION {
44 reset_managed_schema(&conn)?;
45 conn.pragma_update(None, "user_version", DB_SCHEMA_VERSION)
46 .map_err(|error| format!("set sqlite user_version: {error}"))?;
47 }
48
49 conn.execute_batch(managed_schema_sql())
50 .map_err(|error| format!("ensure schema: {error}"))?;
51 Ok(())
52}
53
54pub fn load_next_connection_id(db: &Db) -> Result<ConnectionId, String> {
55 let conn = db.open()?;
56 let max_conn_id = conn
57 .facet_query_one_ref::<MaxConnIdRow, _>(
58 "SELECT COALESCE(MAX(conn_id), 0) AS max_conn_id FROM connections",
59 &NoParams,
60 )
61 .map_err(|error| format!("read max conn_id: {error}"))?
62 .max_conn_id;
63 if max_conn_id < 0 {
64 return Err(format!(
65 "invariant violated: negative conn_id in storage ({max_conn_id})"
66 ));
67 }
68 let max_conn_id = u64::try_from(max_conn_id)
69 .map_err(|error| format!("convert max conn_id to u64: {error}"))?;
70 let next = max_conn_id
71 .checked_add(1)
72 .ok_or_else(|| String::from("invariant violated: conn_id overflow"))?;
73 Ok(ConnectionId::new(next))
74}
75
76fn reset_managed_schema(conn: &Connection) -> Result<(), String> {
77 conn.execute_batch(
78 "
79 DROP TABLE IF EXISTS events;
80 DROP TABLE IF EXISTS edges;
81 DROP TABLE IF EXISTS entities;
82 DROP TABLE IF EXISTS scopes;
83 DROP TABLE IF EXISTS entity_scope_links;
84 DROP TABLE IF EXISTS delta_batches;
85 DROP TABLE IF EXISTS stream_cursors;
86 DROP TABLE IF EXISTS cut_acks;
87 DROP TABLE IF EXISTS cuts;
88 DROP TABLE IF EXISTS top_application_frames;
89 DROP TABLE IF EXISTS symbolicated_frames;
90 DROP TABLE IF EXISTS symbolication_cache;
91 DROP TABLE IF EXISTS backtrace_frames;
92 DROP TABLE IF EXISTS backtraces;
93 DROP TABLE IF EXISTS connection_modules;
94 DROP TABLE IF EXISTS connections;
95 ",
96 )
97 .map_err(|error| format!("reset schema: {error}"))
98}
99
/// Returns the SQL batch that creates every managed table and index.
///
/// Each statement uses `IF NOT EXISTS`, so the batch can be executed against a
/// database that already holds some or all of these objects.
///
/// NOTE(review): `idx_symbolicated_frames_backtrace` is declared on the same
/// columns as the primary key of `symbolicated_frames` — confirm whether the
/// extra index is still wanted.
fn managed_schema_sql() -> &'static str {
    const MANAGED_SCHEMA_SQL: &str = "
        CREATE TABLE IF NOT EXISTS connections (
            conn_id INTEGER PRIMARY KEY,
            process_id TEXT NOT NULL,
            process_name TEXT NOT NULL,
            pid INTEGER NOT NULL,
            connected_at_ns INTEGER NOT NULL,
            disconnected_at_ns INTEGER
        );
        CREATE INDEX IF NOT EXISTS idx_connections_process_id
            ON connections (process_id);

        CREATE TABLE IF NOT EXISTS connection_modules (
            process_id TEXT NOT NULL,
            module_id INTEGER NOT NULL,
            module_index INTEGER NOT NULL,
            module_path TEXT NOT NULL,
            module_identity TEXT NOT NULL,
            arch TEXT NOT NULL,
            runtime_base INTEGER NOT NULL,
            PRIMARY KEY (process_id, module_index),
            UNIQUE (process_id, module_id)
        );

        CREATE TABLE IF NOT EXISTS backtraces (
            process_id TEXT NOT NULL,
            backtrace_id INTEGER NOT NULL,
            frame_count INTEGER NOT NULL,
            received_at_ns INTEGER NOT NULL,
            PRIMARY KEY (backtrace_id)
        );

        CREATE TABLE IF NOT EXISTS backtrace_frames (
            process_id TEXT NOT NULL,
            backtrace_id INTEGER NOT NULL,
            frame_index INTEGER NOT NULL,
            module_path TEXT NOT NULL,
            module_identity TEXT NOT NULL,
            rel_pc INTEGER NOT NULL,
            PRIMARY KEY (backtrace_id, frame_index)
        );
        CREATE INDEX IF NOT EXISTS idx_backtrace_frames_identity_pc
            ON backtrace_frames (module_identity, rel_pc);

        CREATE TABLE IF NOT EXISTS symbolication_cache (
            module_identity TEXT NOT NULL,
            rel_pc INTEGER NOT NULL,
            status TEXT NOT NULL CHECK(status IN ('resolved', 'unresolved')),
            function_name TEXT,
            crate_name TEXT,
            crate_module_path TEXT,
            source_file_path TEXT,
            source_line INTEGER,
            source_col INTEGER,
            unresolved_reason TEXT,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (module_identity, rel_pc)
        );

        CREATE TABLE IF NOT EXISTS symbolicated_frames (
            process_id TEXT NOT NULL,
            backtrace_id INTEGER NOT NULL,
            frame_index INTEGER NOT NULL,
            module_path TEXT NOT NULL,
            rel_pc INTEGER NOT NULL,
            status TEXT NOT NULL CHECK(status IN ('resolved', 'unresolved')),
            function_name TEXT,
            crate_name TEXT,
            crate_module_path TEXT,
            source_file_path TEXT,
            source_line INTEGER,
            source_col INTEGER,
            unresolved_reason TEXT,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (backtrace_id, frame_index)
        );
        CREATE INDEX IF NOT EXISTS idx_symbolicated_frames_backtrace
            ON symbolicated_frames (backtrace_id, frame_index);

        CREATE TABLE IF NOT EXISTS top_application_frames (
            process_id TEXT NOT NULL,
            backtrace_id INTEGER NOT NULL,
            frame_index INTEGER NOT NULL,
            function_name TEXT,
            crate_name TEXT NOT NULL,
            crate_module_path TEXT,
            source_file_path TEXT,
            source_line INTEGER,
            source_col INTEGER,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (backtrace_id)
        );

        CREATE TABLE IF NOT EXISTS cuts (
            cut_id TEXT PRIMARY KEY,
            requested_at_ns INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS cut_acks (
            cut_id TEXT NOT NULL,
            process_id TEXT NOT NULL,
            next_seq_no INTEGER NOT NULL,
            received_at_ns INTEGER NOT NULL,
            PRIMARY KEY (cut_id, process_id)
        );

        CREATE TABLE IF NOT EXISTS stream_cursors (
            process_id TEXT NOT NULL,
            next_seq_no INTEGER NOT NULL,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (process_id)
        );

        CREATE TABLE IF NOT EXISTS delta_batches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            process_id TEXT NOT NULL,
            from_seq_no INTEGER NOT NULL,
            next_seq_no INTEGER NOT NULL,
            truncated INTEGER NOT NULL,
            compacted_before_seq_no INTEGER,
            change_count INTEGER NOT NULL,
            payload_json TEXT NOT NULL,
            received_at_ns INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS entities (
            process_id TEXT NOT NULL,
            entity_id TEXT NOT NULL,
            entity_json TEXT NOT NULL,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (entity_id)
        );

        CREATE TABLE IF NOT EXISTS scopes (
            process_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            scope_json TEXT NOT NULL,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (scope_id)
        );

        CREATE TABLE IF NOT EXISTS entity_scope_links (
            process_id TEXT NOT NULL,
            entity_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (entity_id, scope_id)
        );

        CREATE TABLE IF NOT EXISTS edges (
            process_id TEXT NOT NULL,
            src_id TEXT NOT NULL,
            dst_id TEXT NOT NULL,
            kind_json TEXT NOT NULL,
            edge_json TEXT NOT NULL,
            updated_at_ns INTEGER NOT NULL,
            PRIMARY KEY (src_id, dst_id, kind_json)
        );

        CREATE TABLE IF NOT EXISTS events (
            process_id TEXT NOT NULL,
            seq_no INTEGER NOT NULL,
            event_id TEXT NOT NULL,
            event_json TEXT NOT NULL,
            at_ms INTEGER NOT NULL,
            PRIMARY KEY (event_id),
            UNIQUE (process_id, seq_no)
        );
        ";

    MANAGED_SCHEMA_SQL
}