1use rusqlite::Connection;
2use std::path::{Path, PathBuf};
3use std::sync::Mutex;
4
5use rustvello_core::error::{RustvelloError, RustvelloResult};
6
7pub struct Database {
14 pub(crate) conn: Mutex<Connection>,
15}
16
17fn effective_path(base: impl AsRef<Path>, app_id: &str) -> PathBuf {
21 let base = base.as_ref();
22 let stem = base.file_stem().unwrap_or_default().to_string_lossy();
23 let ext = base
24 .extension()
25 .map(|e| format!(".{}", e.to_string_lossy()))
26 .unwrap_or_default();
27 let parent = base.parent().unwrap_or_else(|| Path::new(""));
28 parent.join(format!("{stem}_{app_id}{ext}"))
29}
30
31impl Database {
32 pub fn open(path: impl AsRef<Path>, app_id: &str) -> RustvelloResult<Self> {
37 let actual = effective_path(path, app_id);
38 let conn = Connection::open(&actual).map_err(|e| {
39 RustvelloError::state_backend(format!("failed to open SQLite database: {}", e))
40 })?;
41 let db = Self {
42 conn: Mutex::new(conn),
43 };
44 db.initialize_schema()?;
45 Ok(db)
46 }
47
48 pub fn in_memory() -> RustvelloResult<Self> {
54 let conn = Connection::open_in_memory().map_err(|e| {
55 RustvelloError::state_backend(format!("failed to open in-memory SQLite: {}", e))
56 })?;
57 let db = Self {
58 conn: Mutex::new(conn),
59 };
60 db.initialize_schema()?;
61 Ok(db)
62 }
63
64 fn initialize_schema(&self) -> RustvelloResult<()> {
65 let conn = self
66 .conn
67 .lock()
68 .map_err(|e| RustvelloError::state_backend(format!("lock poisoned: {}", e)))?;
69
70 conn.pragma_update(None, "journal_mode", "WAL")
83 .map_err(|e| {
84 RustvelloError::state_backend(format!("PRAGMA journal_mode failed: {}", e))
85 })?;
86 conn.pragma_update(None, "synchronous", "NORMAL")
87 .map_err(|e| {
88 RustvelloError::state_backend(format!("PRAGMA synchronous failed: {}", e))
89 })?;
90
91 conn.execute_batch(
92 "
93 -- Broker queue
94 CREATE TABLE IF NOT EXISTS broker_queue (
95 id INTEGER PRIMARY KEY AUTOINCREMENT,
96 invocation_id TEXT NOT NULL,
97 created_at TEXT NOT NULL DEFAULT (datetime('now'))
98 );
99 CREATE INDEX IF NOT EXISTS idx_broker_queue_created
100 ON broker_queue(created_at);
101
102 -- Invocations
103 CREATE TABLE IF NOT EXISTS invocations (
104 invocation_id TEXT PRIMARY KEY,
105 task_id TEXT NOT NULL,
106 call_id TEXT NOT NULL,
107 status TEXT NOT NULL,
108 created_at TEXT NOT NULL,
109 updated_at TEXT NOT NULL,
110 parent_invocation_id TEXT,
111 workflow_id TEXT,
112 workflow_type TEXT,
113 workflow_depth INTEGER
114 );
115 CREATE INDEX IF NOT EXISTS idx_invocations_task
116 ON invocations(task_id);
117 CREATE INDEX IF NOT EXISTS idx_invocations_call
118 ON invocations(call_id);
119 CREATE INDEX IF NOT EXISTS idx_invocations_status
120 ON invocations(status);
121 CREATE INDEX IF NOT EXISTS idx_invocations_workflow
122 ON invocations(workflow_id);
123 CREATE INDEX IF NOT EXISTS idx_invocations_parent
124 ON invocations(parent_invocation_id);
125
126 -- Calls (arguments)
127 CREATE TABLE IF NOT EXISTS calls (
128 call_id TEXT PRIMARY KEY,
129 task_id TEXT NOT NULL,
130 serialized_arguments TEXT NOT NULL
131 );
132
133 -- Results
134 CREATE TABLE IF NOT EXISTS results (
135 invocation_id TEXT PRIMARY KEY,
136 result TEXT NOT NULL
137 );
138
139 -- Errors
140 CREATE TABLE IF NOT EXISTS errors (
141 invocation_id TEXT PRIMARY KEY,
142 error_type TEXT NOT NULL,
143 message TEXT NOT NULL,
144 traceback TEXT
145 );
146
147 -- Status history
148 CREATE TABLE IF NOT EXISTS history (
149 id INTEGER PRIMARY KEY AUTOINCREMENT,
150 invocation_id TEXT NOT NULL,
151 status TEXT NOT NULL,
152 runner_id TEXT,
153 timestamp TEXT NOT NULL,
154 message TEXT,
155 history_timestamp TEXT
156 );
157 CREATE INDEX IF NOT EXISTS idx_history_invocation
158 ON history(invocation_id);
159
160 -- Status records (current status with runner ownership)
161 CREATE TABLE IF NOT EXISTS status_records (
162 invocation_id TEXT PRIMARY KEY,
163 status TEXT NOT NULL,
164 runner_id TEXT,
165 timestamp TEXT NOT NULL
166 );
167
168 -- Waiting-for relationships (blocking control)
169 CREATE TABLE IF NOT EXISTS waiting_for (
170 waiter_id TEXT NOT NULL,
171 waited_on_id TEXT NOT NULL,
172 PRIMARY KEY (waiter_id, waited_on_id)
173 );
174 CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
175 ON waiting_for(waited_on_id);
176
177 -- Concurrency control: per-argument-pair index
178 CREATE TABLE IF NOT EXISTS cc_arg_pairs (
179 invocation_id TEXT NOT NULL,
180 task_id TEXT NOT NULL,
181 arg_key TEXT NOT NULL,
182 arg_value TEXT NOT NULL,
183 PRIMARY KEY (invocation_id, arg_key, arg_value)
184 );
185 CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
186 ON cc_arg_pairs(task_id, arg_key, arg_value);
187
188 -- Client data store
189 CREATE TABLE IF NOT EXISTS client_data (
190 data_key TEXT PRIMARY KEY,
191 data_value TEXT NOT NULL,
192 created_at TEXT NOT NULL DEFAULT (datetime('now'))
193 );
194
195 -- Runner heartbeats
196 CREATE TABLE IF NOT EXISTS runner_heartbeats (
197 runner_id TEXT PRIMARY KEY,
198 creation_time TEXT NOT NULL,
199 last_heartbeat TEXT NOT NULL,
200 can_run_atomic_service INTEGER NOT NULL DEFAULT 0,
201 last_service_start TEXT,
202 last_service_end TEXT
203 );
204
205 -- Invocation retry counters
206 CREATE TABLE IF NOT EXISTS retries (
207 invocation_id TEXT PRIMARY KEY,
208 retry_count INTEGER NOT NULL DEFAULT 0
209 );
210
211 -- Trigger conditions
212 CREATE TABLE IF NOT EXISTS trg_conditions (
213 condition_id TEXT PRIMARY KEY,
214 condition_type TEXT NOT NULL DEFAULT '',
215 event_code TEXT,
216 condition_json TEXT NOT NULL
217 );
218 CREATE INDEX IF NOT EXISTS idx_trg_cond_type
219 ON trg_conditions(condition_type);
220 CREATE INDEX IF NOT EXISTS idx_trg_cond_event
221 ON trg_conditions(condition_type, event_code);
222
223 -- Trigger definitions
224 CREATE TABLE IF NOT EXISTS trg_triggers (
225 trigger_id TEXT PRIMARY KEY,
226 task_id TEXT NOT NULL,
227 logic TEXT NOT NULL,
228 argument_template TEXT
229 );
230
231 -- Condition-to-trigger mapping (many-to-many)
232 CREATE TABLE IF NOT EXISTS trg_condition_triggers (
233 condition_id TEXT NOT NULL,
234 trigger_id TEXT NOT NULL,
235 PRIMARY KEY (condition_id, trigger_id)
236 );
237 CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
238 ON trg_condition_triggers(trigger_id);
239
240 -- Valid conditions (pending evaluation)
241 CREATE TABLE IF NOT EXISTS trg_valid_conditions (
242 valid_condition_id TEXT PRIMARY KEY,
243 condition_id TEXT NOT NULL,
244 context_json TEXT NOT NULL
245 );
246 CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
247 ON trg_valid_conditions(condition_id);
248
249 -- Source task → condition mapping (for fast lookup)
250 CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
251 task_id TEXT NOT NULL,
252 condition_id TEXT NOT NULL,
253 PRIMARY KEY (task_id, condition_id)
254 );
255
256 -- Cron execution tracking
257 CREATE TABLE IF NOT EXISTS trg_cron_executions (
258 condition_id TEXT PRIMARY KEY,
259 last_execution TEXT NOT NULL
260 );
261
262 -- Trigger run claims (dedup)
263 CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
264 trigger_run_id TEXT PRIMARY KEY,
265 claimed_at TEXT NOT NULL
266 );
267
268 -- Workflow runs (discovery + tracking)
269 CREATE TABLE IF NOT EXISTS workflow_runs (
270 workflow_id TEXT PRIMARY KEY,
271 workflow_type TEXT NOT NULL,
272 parent_workflow_id TEXT,
273 depth INTEGER NOT NULL DEFAULT 0
274 );
275 CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
276 ON workflow_runs(workflow_type);
277
278 -- Workflow key-value data store
279 CREATE TABLE IF NOT EXISTS workflow_data (
280 workflow_id TEXT NOT NULL,
281 data_key TEXT NOT NULL,
282 data_value TEXT NOT NULL,
283 PRIMARY KEY (workflow_id, data_key)
284 );
285
286 -- App info storage (opaque JSON per app_id)
287 CREATE TABLE IF NOT EXISTS app_infos (
288 app_id TEXT PRIMARY KEY,
289 info_json TEXT NOT NULL
290 );
291
292 -- Workflow sub-invocation tracking
293 CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
294 workflow_id TEXT NOT NULL,
295 sub_invocation_id TEXT NOT NULL,
296 PRIMARY KEY (workflow_id, sub_invocation_id)
297 );
298 CREATE INDEX IF NOT EXISTS idx_wf_sub_inv_workflow
299 ON workflow_sub_invocations(workflow_id);
300
301 -- Runner execution contexts
302 CREATE TABLE IF NOT EXISTS runner_contexts (
303 runner_id TEXT PRIMARY KEY,
304 runner_cls TEXT NOT NULL,
305 pid INTEGER NOT NULL,
306 hostname TEXT NOT NULL,
307 thread_id INTEGER NOT NULL,
308 started_at TEXT NOT NULL,
309 parent_runner_id TEXT,
310 parent_runner_cls TEXT
311 );
312 CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
313 ON runner_contexts(parent_runner_id);
314
315 -- Auto-purge schedule
316 CREATE TABLE IF NOT EXISTS auto_purge_schedule (
317 invocation_id TEXT PRIMARY KEY,
318 scheduled_at TEXT NOT NULL
319 );
320
321 -- Index for time-range queries on history (COALESCE for history_timestamp fallback)
322 CREATE INDEX IF NOT EXISTS idx_history_timestamp
323 ON history(COALESCE(history_timestamp, timestamp));
324 ",
325 )
326 .map_err(|e| RustvelloError::state_backend(format!("schema init failed: {}", e)))?;
327
328 let _ = conn.execute_batch("ALTER TABLE trg_conditions ADD COLUMN event_code TEXT");
332
333 Ok(())
334 }
335}
336
337pub(crate) fn lock_err(e: impl std::fmt::Display) -> RustvelloError {
338 RustvelloError::state_backend(format!("lock poisoned: {}", e))
339}
340
341pub(crate) fn sql_err(e: rusqlite::Error) -> RustvelloError {
342 RustvelloError::state_backend(format!("SQLite error: {}", e))
343}
344
345pub(crate) async fn blocking<F, T>(f: F) -> RustvelloResult<T>
350where
351 F: FnOnce() -> RustvelloResult<T> + Send + 'static,
352 T: Send + 'static,
353{
354 tokio::task::spawn_blocking(f)
355 .await
356 .map_err(|e| RustvelloError::Internal {
357 message: format!("spawn_blocking join error: {e}"),
358 })?
359}
360
361pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
362 s.parse::<rustvello_proto::status::InvocationStatus>()
363 .map_err(RustvelloError::state_backend)
364}
365
366pub(crate) fn parse_timestamp(s: &str) -> RustvelloResult<chrono::DateTime<chrono::Utc>> {
367 chrono::DateTime::parse_from_rfc3339(s)
368 .map(|dt| dt.with_timezone(&chrono::Utc))
369 .map_err(|e| RustvelloError::state_backend(format!("invalid timestamp in database: {e}")))
370}