Skip to main content

rustvello_sqlite/
db.rs

1use rusqlite::Connection;
2use std::path::{Path, PathBuf};
3use std::sync::Mutex;
4
5use rustvello_core::error::{RustvelloError, RustvelloResult};
6
7/// Shared SQLite database connection wrapper with schema initialization.
8///
9/// Data is isolated by `app_id`: for file-based databases the effective
10/// path is `{stem}_{app_id}{ext}` (e.g. `data.db` with app_id `"a1"`
11/// opens `data_a1.db`). In-memory databases are inherently isolated
12/// because each instance gets its own private database.
13pub struct Database {
14    pub(crate) conn: Mutex<Connection>,
15}
16
/// Compute the effective file path by injecting `app_id` before the extension.
///
/// `"path/data.db"` + `"my_app"` → `"path/data_my_app.db"`;
/// a base without an extension just gets the suffix (`"data"` → `"data_my_app"`).
///
/// The new file name is assembled as an `OsString`, so stems and extensions
/// that are not valid UTF-8 are preserved byte-for-byte. A lossy `String`
/// round-trip would replace them with U+FFFD and silently open a different file.
///
/// NOTE(review): `app_id` is inserted verbatim; an id containing a path
/// separator would place the database in a subdirectory of `parent`.
fn effective_path(base: impl AsRef<Path>, app_id: &str) -> PathBuf {
    let base = base.as_ref();

    let mut file_name = std::ffi::OsString::new();
    if let Some(stem) = base.file_stem() {
        file_name.push(stem);
    }
    file_name.push("_");
    file_name.push(app_id);
    if let Some(ext) = base.extension() {
        file_name.push(".");
        file_name.push(ext);
    }

    let parent = base.parent().unwrap_or_else(|| Path::new(""));
    parent.join(file_name)
}
30
31impl Database {
32    /// Open or create a SQLite database at the given path.
33    ///
34    /// The `app_id` is embedded in the filename so that different
35    /// applications using the same base path get separate database files.
36    pub fn open(path: impl AsRef<Path>, app_id: &str) -> RustvelloResult<Self> {
37        let actual = effective_path(path, app_id);
38        let conn = Connection::open(&actual).map_err(|e| {
39            RustvelloError::state_backend(format!("failed to open SQLite database: {}", e))
40        })?;
41        let db = Self {
42            conn: Mutex::new(conn),
43        };
44        db.initialize_schema()?;
45        Ok(db)
46    }
47
48    /// Create an in-memory SQLite database (for testing).
49    ///
50    /// In-memory databases are inherently isolated (each `Connection` owns
51    /// a private database), so `app_id` is accepted for API consistency
52    /// but does not affect behavior.
53    pub fn in_memory() -> RustvelloResult<Self> {
54        let conn = Connection::open_in_memory().map_err(|e| {
55            RustvelloError::state_backend(format!("failed to open in-memory SQLite: {}", e))
56        })?;
57        let db = Self {
58            conn: Mutex::new(conn),
59        };
60        db.initialize_schema()?;
61        Ok(db)
62    }
63
64    fn initialize_schema(&self) -> RustvelloResult<()> {
65        let conn = self
66            .conn
67            .lock()
68            .map_err(|e| RustvelloError::state_backend(format!("lock poisoned: {}", e)))?;
69
70        // PRAGMAs must be set BEFORE schema creation so that journal_mode and
71        // synchronous settings are active for the DDL statements.
72        //
73        // Durability trade-offs:
74        //  - `journal_mode=WAL`: enables concurrent readers & writers; produces
75        //    `-wal` / `-shm` side-car files alongside the `.db` file.
76        //  - `synchronous=NORMAL`: in WAL mode this means the WAL is fsynced on
77        //    checkpoints but not on every commit, so the last committed transaction
78        //    could be lost on a sudden power failure. For a task queue where
79        //    idempotent retry is the recovery mechanism this is an acceptable
80        //    trade-off (FULL sync is ~2-3× slower). Override by setting the PRAGMA
81        //    manually after `Database::open()` if stricter durability is required.
82        conn.pragma_update(None, "journal_mode", "WAL")
83            .map_err(|e| {
84                RustvelloError::state_backend(format!("PRAGMA journal_mode failed: {}", e))
85            })?;
86        conn.pragma_update(None, "synchronous", "NORMAL")
87            .map_err(|e| {
88                RustvelloError::state_backend(format!("PRAGMA synchronous failed: {}", e))
89            })?;
90
91        conn.execute_batch(
92            "
93            -- Broker queue
94            CREATE TABLE IF NOT EXISTS broker_queue (
95                id INTEGER PRIMARY KEY AUTOINCREMENT,
96                invocation_id TEXT NOT NULL,
97                created_at TEXT NOT NULL DEFAULT (datetime('now'))
98            );
99            CREATE INDEX IF NOT EXISTS idx_broker_queue_created
100                ON broker_queue(created_at);
101
102            -- Invocations
103            CREATE TABLE IF NOT EXISTS invocations (
104                invocation_id TEXT PRIMARY KEY,
105                task_id TEXT NOT NULL,
106                call_id TEXT NOT NULL,
107                status TEXT NOT NULL,
108                created_at TEXT NOT NULL,
109                updated_at TEXT NOT NULL,
110                parent_invocation_id TEXT,
111                workflow_id TEXT,
112                workflow_type TEXT,
113                workflow_depth INTEGER
114            );
115            CREATE INDEX IF NOT EXISTS idx_invocations_task
116                ON invocations(task_id);
117            CREATE INDEX IF NOT EXISTS idx_invocations_call
118                ON invocations(call_id);
119            CREATE INDEX IF NOT EXISTS idx_invocations_status
120                ON invocations(status);
121            CREATE INDEX IF NOT EXISTS idx_invocations_workflow
122                ON invocations(workflow_id);
123            CREATE INDEX IF NOT EXISTS idx_invocations_parent
124                ON invocations(parent_invocation_id);
125
126            -- Calls (arguments)
127            CREATE TABLE IF NOT EXISTS calls (
128                call_id TEXT PRIMARY KEY,
129                task_id TEXT NOT NULL,
130                serialized_arguments TEXT NOT NULL
131            );
132
133            -- Results
134            CREATE TABLE IF NOT EXISTS results (
135                invocation_id TEXT PRIMARY KEY,
136                result TEXT NOT NULL
137            );
138
139            -- Errors
140            CREATE TABLE IF NOT EXISTS errors (
141                invocation_id TEXT PRIMARY KEY,
142                error_type TEXT NOT NULL,
143                message TEXT NOT NULL,
144                traceback TEXT
145            );
146
147            -- Status history
148            CREATE TABLE IF NOT EXISTS history (
149                id INTEGER PRIMARY KEY AUTOINCREMENT,
150                invocation_id TEXT NOT NULL,
151                status TEXT NOT NULL,
152                runner_id TEXT,
153                timestamp TEXT NOT NULL,
154                message TEXT,
155                history_timestamp TEXT
156            );
157            CREATE INDEX IF NOT EXISTS idx_history_invocation
158                ON history(invocation_id);
159
160            -- Status records (current status with runner ownership)
161            CREATE TABLE IF NOT EXISTS status_records (
162                invocation_id TEXT PRIMARY KEY,
163                status TEXT NOT NULL,
164                runner_id TEXT,
165                timestamp TEXT NOT NULL
166            );
167
168            -- Waiting-for relationships (blocking control)
169            CREATE TABLE IF NOT EXISTS waiting_for (
170                waiter_id TEXT NOT NULL,
171                waited_on_id TEXT NOT NULL,
172                PRIMARY KEY (waiter_id, waited_on_id)
173            );
174            CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
175                ON waiting_for(waited_on_id);
176
177            -- Concurrency control: per-argument-pair index
178            CREATE TABLE IF NOT EXISTS cc_arg_pairs (
179                invocation_id TEXT NOT NULL,
180                task_id TEXT NOT NULL,
181                arg_key TEXT NOT NULL,
182                arg_value TEXT NOT NULL,
183                PRIMARY KEY (invocation_id, arg_key, arg_value)
184            );
185            CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
186                ON cc_arg_pairs(task_id, arg_key, arg_value);
187
188            -- Client data store
189            CREATE TABLE IF NOT EXISTS client_data (
190                data_key TEXT PRIMARY KEY,
191                data_value TEXT NOT NULL,
192                created_at TEXT NOT NULL DEFAULT (datetime('now'))
193            );
194
195            -- Runner heartbeats
196            CREATE TABLE IF NOT EXISTS runner_heartbeats (
197                runner_id TEXT PRIMARY KEY,
198                creation_time TEXT NOT NULL,
199                last_heartbeat TEXT NOT NULL,
200                can_run_atomic_service INTEGER NOT NULL DEFAULT 0,
201                last_service_start TEXT,
202                last_service_end TEXT
203            );
204
205            -- Invocation retry counters
206            CREATE TABLE IF NOT EXISTS retries (
207                invocation_id TEXT PRIMARY KEY,
208                retry_count INTEGER NOT NULL DEFAULT 0
209            );
210
211            -- Trigger conditions
212            CREATE TABLE IF NOT EXISTS trg_conditions (
213                condition_id TEXT PRIMARY KEY,
214                condition_type TEXT NOT NULL DEFAULT '',
215                event_code TEXT,
216                condition_json TEXT NOT NULL
217            );
218            CREATE INDEX IF NOT EXISTS idx_trg_cond_type
219                ON trg_conditions(condition_type);
220            CREATE INDEX IF NOT EXISTS idx_trg_cond_event
221                ON trg_conditions(condition_type, event_code);
222
223            -- Trigger definitions
224            CREATE TABLE IF NOT EXISTS trg_triggers (
225                trigger_id TEXT PRIMARY KEY,
226                task_id TEXT NOT NULL,
227                logic TEXT NOT NULL,
228                argument_template TEXT
229            );
230
231            -- Condition-to-trigger mapping (many-to-many)
232            CREATE TABLE IF NOT EXISTS trg_condition_triggers (
233                condition_id TEXT NOT NULL,
234                trigger_id TEXT NOT NULL,
235                PRIMARY KEY (condition_id, trigger_id)
236            );
237            CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
238                ON trg_condition_triggers(trigger_id);
239
240            -- Valid conditions (pending evaluation)
241            CREATE TABLE IF NOT EXISTS trg_valid_conditions (
242                valid_condition_id TEXT PRIMARY KEY,
243                condition_id TEXT NOT NULL,
244                context_json TEXT NOT NULL
245            );
246            CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
247                ON trg_valid_conditions(condition_id);
248
249            -- Source task → condition mapping (for fast lookup)
250            CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
251                task_id TEXT NOT NULL,
252                condition_id TEXT NOT NULL,
253                PRIMARY KEY (task_id, condition_id)
254            );
255
256            -- Cron execution tracking
257            CREATE TABLE IF NOT EXISTS trg_cron_executions (
258                condition_id TEXT PRIMARY KEY,
259                last_execution TEXT NOT NULL
260            );
261
262            -- Trigger run claims (dedup)
263            CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
264                trigger_run_id TEXT PRIMARY KEY,
265                claimed_at TEXT NOT NULL
266            );
267
268            -- Workflow runs (discovery + tracking)
269            CREATE TABLE IF NOT EXISTS workflow_runs (
270                workflow_id TEXT PRIMARY KEY,
271                workflow_type TEXT NOT NULL,
272                parent_workflow_id TEXT,
273                depth INTEGER NOT NULL DEFAULT 0
274            );
275            CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
276                ON workflow_runs(workflow_type);
277
278            -- Workflow key-value data store
279            CREATE TABLE IF NOT EXISTS workflow_data (
280                workflow_id TEXT NOT NULL,
281                data_key TEXT NOT NULL,
282                data_value TEXT NOT NULL,
283                PRIMARY KEY (workflow_id, data_key)
284            );
285
286            -- App info storage (opaque JSON per app_id)
287            CREATE TABLE IF NOT EXISTS app_infos (
288                app_id TEXT PRIMARY KEY,
289                info_json TEXT NOT NULL
290            );
291
292            -- Workflow sub-invocation tracking
293            CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
294                workflow_id TEXT NOT NULL,
295                sub_invocation_id TEXT NOT NULL,
296                PRIMARY KEY (workflow_id, sub_invocation_id)
297            );
298            CREATE INDEX IF NOT EXISTS idx_wf_sub_inv_workflow
299                ON workflow_sub_invocations(workflow_id);
300
301            -- Runner execution contexts
302            CREATE TABLE IF NOT EXISTS runner_contexts (
303                runner_id TEXT PRIMARY KEY,
304                runner_cls TEXT NOT NULL,
305                pid INTEGER NOT NULL,
306                hostname TEXT NOT NULL,
307                thread_id INTEGER NOT NULL,
308                started_at TEXT NOT NULL,
309                parent_runner_id TEXT,
310                parent_runner_cls TEXT
311            );
312            CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
313                ON runner_contexts(parent_runner_id);
314
315            -- Auto-purge schedule
316            CREATE TABLE IF NOT EXISTS auto_purge_schedule (
317                invocation_id TEXT PRIMARY KEY,
318                scheduled_at TEXT NOT NULL
319            );
320
321            -- Index for time-range queries on history (COALESCE for history_timestamp fallback)
322            CREATE INDEX IF NOT EXISTS idx_history_timestamp
323                ON history(COALESCE(history_timestamp, timestamp));
324            ",
325        )
326        .map_err(|e| RustvelloError::state_backend(format!("schema init failed: {}", e)))?;
327
328        // Migration: add event_code column for existing databases that lack it.
329        // ALTER TABLE … ADD COLUMN is a no-op if the column already exists in
330        // SQLite (returns error which we ignore).
331        let _ = conn.execute_batch("ALTER TABLE trg_conditions ADD COLUMN event_code TEXT");
332
333        Ok(())
334    }
335}
336
337pub(crate) fn lock_err(e: impl std::fmt::Display) -> RustvelloError {
338    RustvelloError::state_backend(format!("lock poisoned: {}", e))
339}
340
341pub(crate) fn sql_err(e: rusqlite::Error) -> RustvelloError {
342    RustvelloError::state_backend(format!("SQLite error: {}", e))
343}
344
345/// Run a blocking closure on the tokio blocking thread pool.
346///
347/// All SQLite operations must go through this helper to avoid blocking the
348/// tokio worker threads (see NB_10 Finding 3).
349pub(crate) async fn blocking<F, T>(f: F) -> RustvelloResult<T>
350where
351    F: FnOnce() -> RustvelloResult<T> + Send + 'static,
352    T: Send + 'static,
353{
354    tokio::task::spawn_blocking(f)
355        .await
356        .map_err(|e| RustvelloError::Internal {
357            message: format!("spawn_blocking join error: {e}"),
358        })?
359}
360
361pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
362    s.parse::<rustvello_proto::status::InvocationStatus>()
363        .map_err(RustvelloError::state_backend)
364}
365
366pub(crate) fn parse_timestamp(s: &str) -> RustvelloResult<chrono::DateTime<chrono::Utc>> {
367    chrono::DateTime::parse_from_rfc3339(s)
368        .map(|dt| dt.with_timezone(&chrono::Utc))
369        .map_err(|e| RustvelloError::state_backend(format!("invalid timestamp in database: {e}")))
370}