Skip to main content

moire_web/db/
schema.rs

1use facet::Facet;
2use moire_types::ConnectionId;
3use rusqlite::Connection;
4use rusqlite_facet::ConnectionFacetExt;
5
6use crate::db::Db;
7
/// Schema version this build expects to find in SQLite's `user_version`.
///
/// `init_sqlite` compares the stored `user_version` against this value: a
/// lower stored version causes the managed tables to be dropped and recreated,
/// and a higher one is rejected with an error.
const DB_SCHEMA_VERSION: i64 = 6;
9
/// Empty parameter set, passed as `&NoParams` to the facet query helpers for
/// statements that contain no bound placeholders.
#[derive(Facet)]
struct NoParams;
12
/// Result row for the `pragma_user_version` query issued by `init_sqlite`.
#[derive(Facet)]
struct UserVersionRow {
    // Named after the `user_version` column alias in the query; presumably the
    // facet layer maps columns to fields by name — TODO confirm.
    user_version: i64,
}
17
/// Result row for the `MAX(conn_id)` query issued by `load_next_connection_id`.
#[derive(Facet)]
struct MaxConnIdRow {
    // Named after the `max_conn_id` column alias in the query; the query
    // COALESCEs to 0, so the value is never NULL.
    max_conn_id: i64,
}
22
23pub fn init_sqlite(db: &Db) -> Result<(), String> {
24    let conn = db.open()?;
25    conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
26        .map_err(|error| format!("init sqlite pragmas: {error}"))?;
27
28    let user_version = conn
29        .facet_query_one_ref::<UserVersionRow, _>(
30            "SELECT user_version AS user_version FROM pragma_user_version",
31            &NoParams,
32        )
33        .map_err(|error| format!("read sqlite user_version: {error}"))?
34        .user_version;
35
36    if user_version > DB_SCHEMA_VERSION {
37        return Err(format!(
38            "database schema version {} is newer than supported {}",
39            user_version, DB_SCHEMA_VERSION
40        ));
41    }
42
43    if user_version < DB_SCHEMA_VERSION {
44        reset_managed_schema(&conn)?;
45        conn.pragma_update(None, "user_version", DB_SCHEMA_VERSION)
46            .map_err(|error| format!("set sqlite user_version: {error}"))?;
47    }
48
49    conn.execute_batch(managed_schema_sql())
50        .map_err(|error| format!("ensure schema: {error}"))?;
51    Ok(())
52}
53
54pub fn load_next_connection_id(db: &Db) -> Result<ConnectionId, String> {
55    let conn = db.open()?;
56    let max_conn_id = conn
57        .facet_query_one_ref::<MaxConnIdRow, _>(
58            "SELECT COALESCE(MAX(conn_id), 0) AS max_conn_id FROM connections",
59            &NoParams,
60        )
61        .map_err(|error| format!("read max conn_id: {error}"))?
62        .max_conn_id;
63    if max_conn_id < 0 {
64        return Err(format!(
65            "invariant violated: negative conn_id in storage ({max_conn_id})"
66        ));
67    }
68    let max_conn_id = u64::try_from(max_conn_id)
69        .map_err(|error| format!("convert max conn_id to u64: {error}"))?;
70    let next = max_conn_id
71        .checked_add(1)
72        .ok_or_else(|| String::from("invariant violated: conn_id overflow"))?;
73    Ok(ConnectionId::new(next))
74}
75
76fn reset_managed_schema(conn: &Connection) -> Result<(), String> {
77    conn.execute_batch(
78        "
79        DROP TABLE IF EXISTS events;
80        DROP TABLE IF EXISTS edges;
81        DROP TABLE IF EXISTS entities;
82        DROP TABLE IF EXISTS scopes;
83        DROP TABLE IF EXISTS entity_scope_links;
84        DROP TABLE IF EXISTS delta_batches;
85        DROP TABLE IF EXISTS stream_cursors;
86        DROP TABLE IF EXISTS cut_acks;
87        DROP TABLE IF EXISTS cuts;
88        DROP TABLE IF EXISTS top_application_frames;
89        DROP TABLE IF EXISTS symbolicated_frames;
90        DROP TABLE IF EXISTS symbolication_cache;
91        DROP TABLE IF EXISTS backtrace_frames;
92        DROP TABLE IF EXISTS backtraces;
93        DROP TABLE IF EXISTS connection_modules;
94        DROP TABLE IF EXISTS connections;
95        ",
96    )
97    .map_err(|error| format!("reset schema: {error}"))
98}
99
/// Returns the SQL batch that creates every managed table and index.
///
/// Each statement uses `IF NOT EXISTS`, so the batch is safe to execute on
/// every startup regardless of which objects already exist.
///
/// NOTE(review): `idx_symbolicated_frames_backtrace` covers the same columns,
/// in the same order, as the `symbolicated_frames` primary key, so it looks
/// redundant — confirm before removing, since dropping it is a schema change.
fn managed_schema_sql() -> &'static str {
    const MANAGED_SCHEMA: &str = "
    CREATE TABLE IF NOT EXISTS connections (
        conn_id INTEGER PRIMARY KEY,
        process_id TEXT NOT NULL,
        process_name TEXT NOT NULL,
        pid INTEGER NOT NULL,
        connected_at_ns INTEGER NOT NULL,
        disconnected_at_ns INTEGER
    );
    CREATE INDEX IF NOT EXISTS idx_connections_process_id
        ON connections (process_id);

    CREATE TABLE IF NOT EXISTS connection_modules (
        process_id TEXT NOT NULL,
        module_id INTEGER NOT NULL,
        module_index INTEGER NOT NULL,
        module_path TEXT NOT NULL,
        module_identity TEXT NOT NULL,
        arch TEXT NOT NULL,
        runtime_base INTEGER NOT NULL,
        PRIMARY KEY (process_id, module_index),
        UNIQUE (process_id, module_id)
    );

    CREATE TABLE IF NOT EXISTS backtraces (
        process_id TEXT NOT NULL,
        backtrace_id INTEGER NOT NULL,
        frame_count INTEGER NOT NULL,
        received_at_ns INTEGER NOT NULL,
        PRIMARY KEY (backtrace_id)
    );

    CREATE TABLE IF NOT EXISTS backtrace_frames (
        process_id TEXT NOT NULL,
        backtrace_id INTEGER NOT NULL,
        frame_index INTEGER NOT NULL,
        module_path TEXT NOT NULL,
        module_identity TEXT NOT NULL,
        rel_pc INTEGER NOT NULL,
        PRIMARY KEY (backtrace_id, frame_index)
    );
    CREATE INDEX IF NOT EXISTS idx_backtrace_frames_identity_pc
        ON backtrace_frames (module_identity, rel_pc);

    CREATE TABLE IF NOT EXISTS symbolication_cache (
        module_identity TEXT NOT NULL,
        rel_pc INTEGER NOT NULL,
        status TEXT NOT NULL CHECK(status IN ('resolved', 'unresolved')),
        function_name TEXT,
        crate_name TEXT,
        crate_module_path TEXT,
        source_file_path TEXT,
        source_line INTEGER,
        source_col INTEGER,
        unresolved_reason TEXT,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (module_identity, rel_pc)
    );

    CREATE TABLE IF NOT EXISTS symbolicated_frames (
        process_id TEXT NOT NULL,
        backtrace_id INTEGER NOT NULL,
        frame_index INTEGER NOT NULL,
        module_path TEXT NOT NULL,
        rel_pc INTEGER NOT NULL,
        status TEXT NOT NULL CHECK(status IN ('resolved', 'unresolved')),
        function_name TEXT,
        crate_name TEXT,
        crate_module_path TEXT,
        source_file_path TEXT,
        source_line INTEGER,
        source_col INTEGER,
        unresolved_reason TEXT,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (backtrace_id, frame_index)
    );
    CREATE INDEX IF NOT EXISTS idx_symbolicated_frames_backtrace
        ON symbolicated_frames (backtrace_id, frame_index);

    CREATE TABLE IF NOT EXISTS top_application_frames (
        process_id TEXT NOT NULL,
        backtrace_id INTEGER NOT NULL,
        frame_index INTEGER NOT NULL,
        function_name TEXT,
        crate_name TEXT NOT NULL,
        crate_module_path TEXT,
        source_file_path TEXT,
        source_line INTEGER,
        source_col INTEGER,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (backtrace_id)
    );

    CREATE TABLE IF NOT EXISTS cuts (
        cut_id TEXT PRIMARY KEY,
        requested_at_ns INTEGER NOT NULL
    );

    CREATE TABLE IF NOT EXISTS cut_acks (
        cut_id TEXT NOT NULL,
        process_id TEXT NOT NULL,
        next_seq_no INTEGER NOT NULL,
        received_at_ns INTEGER NOT NULL,
        PRIMARY KEY (cut_id, process_id)
    );

    CREATE TABLE IF NOT EXISTS stream_cursors (
        process_id TEXT NOT NULL,
        next_seq_no INTEGER NOT NULL,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (process_id)
    );

    CREATE TABLE IF NOT EXISTS delta_batches (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        process_id TEXT NOT NULL,
        from_seq_no INTEGER NOT NULL,
        next_seq_no INTEGER NOT NULL,
        truncated INTEGER NOT NULL,
        compacted_before_seq_no INTEGER,
        change_count INTEGER NOT NULL,
        payload_json TEXT NOT NULL,
        received_at_ns INTEGER NOT NULL
    );

    CREATE TABLE IF NOT EXISTS entities (
        process_id TEXT NOT NULL,
        entity_id TEXT NOT NULL,
        entity_json TEXT NOT NULL,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (entity_id)
    );

    CREATE TABLE IF NOT EXISTS scopes (
        process_id TEXT NOT NULL,
        scope_id TEXT NOT NULL,
        scope_json TEXT NOT NULL,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (scope_id)
    );

    CREATE TABLE IF NOT EXISTS entity_scope_links (
        process_id TEXT NOT NULL,
        entity_id TEXT NOT NULL,
        scope_id TEXT NOT NULL,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (entity_id, scope_id)
    );

    CREATE TABLE IF NOT EXISTS edges (
        process_id TEXT NOT NULL,
        src_id TEXT NOT NULL,
        dst_id TEXT NOT NULL,
        kind_json TEXT NOT NULL,
        edge_json TEXT NOT NULL,
        updated_at_ns INTEGER NOT NULL,
        PRIMARY KEY (src_id, dst_id, kind_json)
    );

    CREATE TABLE IF NOT EXISTS events (
        process_id TEXT NOT NULL,
        seq_no INTEGER NOT NULL,
        event_id TEXT NOT NULL,
        event_json TEXT NOT NULL,
        at_ms INTEGER NOT NULL,
        PRIMARY KEY (event_id),
        UNIQUE (process_id, seq_no)
    );
    ";

    MANAGED_SCHEMA
}