Skip to main content

rustvello_postgres/
db.rs

1//! PostgreSQL database wrapper with connection pooling and schema initialization.
2
3use deadpool_postgres::Pool;
4use tokio_postgres::NoTls;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult};
7
8/// Format a `tokio_postgres::Error` with full `DbError` details when available.
9///
10/// `tokio_postgres::Error::Display` only writes the kind string (e.g. `"db error"`)
11/// and does NOT include the server message. This helper extracts the `DbError`
12/// fields so we get actionable diagnostics.
13fn fmt_pg(e: &tokio_postgres::Error) -> String {
14    if let Some(db) = e.as_db_error() {
15        use std::fmt::Write;
16        let mut msg = format!(
17            "{}: {} (code: {})",
18            db.severity(),
19            db.message(),
20            db.code().code()
21        );
22        if let Some(detail) = db.detail() {
23            let _ = write!(msg, " detail={detail}");
24        }
25        if let Some(hint) = db.hint() {
26            let _ = write!(msg, " hint={hint}");
27        }
28        msg
29    } else {
30        e.to_string()
31    }
32}
33
34/// Shared PostgreSQL database connection pool with schema initialization.
35///
36/// Data is isolated by `app_id`: each app gets its own PostgreSQL schema
37/// (`CREATE SCHEMA IF NOT EXISTS "{app_id}"`), and every connection
38/// obtained through [`conn()`](Self::conn) has `search_path` set to that
39/// schema.  Two `Database` instances with different `app_id` values
40/// sharing the same physical database will not see each other's tables.
41pub struct Database {
42    pub(crate) pool: Pool,
43    app_id: String,
44}
45
46impl Database {
47    /// Create a new database from a connection string.
48    ///
49    /// The connection string should be in the format:
50    /// `host=localhost user=postgres password=secret dbname=rustvello`
51    /// or a URI: `postgres://user:pass@host/dbname`
52    ///
53    /// The `app_id` is used to create a dedicated PostgreSQL schema so
54    /// that multiple applications can share the same database without
55    /// interference.
56    ///
57    /// **Note:** This method uses `NoTls`, so all data is sent in cleartext.
58    /// For encrypted connections, use [`connect_tls()`](Self::connect_tls)
59    /// (requires the `tls` feature) or [`from_pool()`](Self::from_pool)
60    /// with a custom pool configuration.
61    pub async fn connect(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
62        Self::connect_with_pool_size(connection_string, app_id, None).await
63    }
64
65    /// Like [`connect()`](Self::connect) but with a configurable maximum pool size.
66    ///
67    /// When `max_size` is `None`, deadpool's default (CPU count × 4) is used.
68    pub async fn connect_with_pool_size(
69        connection_string: &str,
70        app_id: &str,
71        max_size: Option<usize>,
72    ) -> RustvelloResult<Self> {
73        let pg_config: tokio_postgres::Config =
74            connection_string
75                .parse()
76                .map_err(|e: tokio_postgres::Error| {
77                    RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
78                })?;
79
80        let mgr = deadpool_postgres::Manager::new(pg_config, NoTls);
81        let mut builder = Pool::builder(mgr);
82        if let Some(size) = max_size {
83            builder = builder.max_size(size);
84        }
85        let pool = builder
86            .build()
87            .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
88
89        let db = Self {
90            pool,
91            app_id: app_id.to_string(),
92        };
93        db.initialize_schema().await?;
94        Ok(db)
95    }
96
97    /// Create a new database with TLS encryption from a connection string.
98    ///
99    /// Uses the system CA trust store to verify server certificates.
100    /// Requires the `tls` feature.
101    ///
102    /// The connection string should be in the format:
103    /// `host=localhost user=postgres password=secret dbname=rustvello sslmode=require`
104    /// or a URI: `postgres://user:pass@host/dbname?sslmode=require`
105    #[cfg(feature = "tls")]
106    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
107    pub async fn connect_tls(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
108        Self::connect_tls_with_pool_size(connection_string, app_id, None).await
109    }
110
111    /// Like [`connect_tls()`](Self::connect_tls) but with a configurable maximum pool size.
112    #[cfg(feature = "tls")]
113    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
114    pub async fn connect_tls_with_pool_size(
115        connection_string: &str,
116        app_id: &str,
117        max_size: Option<usize>,
118    ) -> RustvelloResult<Self> {
119        let pg_config: tokio_postgres::Config =
120            connection_string
121                .parse()
122                .map_err(|e: tokio_postgres::Error| {
123                    RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
124                })?;
125
126        let tls_connector = native_tls::TlsConnector::new().map_err(|e| {
127            RustvelloError::state_backend(format!("failed to create TLS connector: {e}"))
128        })?;
129        let pg_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
130
131        let mgr = deadpool_postgres::Manager::new(pg_config, pg_tls);
132        let mut builder = Pool::builder(mgr);
133        if let Some(size) = max_size {
134            builder = builder.max_size(size);
135        }
136        let pool = builder
137            .build()
138            .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
139
140        let db = Self {
141            pool,
142            app_id: app_id.to_string(),
143        };
144        db.initialize_schema().await?;
145        Ok(db)
146    }
147
148    /// Create a database from an existing deadpool pool.
149    pub async fn from_pool(pool: Pool, app_id: &str) -> RustvelloResult<Self> {
150        let db = Self {
151            pool,
152            app_id: app_id.to_string(),
153        };
154        db.initialize_schema().await?;
155        Ok(db)
156    }
157
158    /// Get a connection from the pool.
159    ///
160    /// Every connection has its `search_path` set to the app-specific
161    /// schema so that all subsequent SQL operates in the correct namespace.
162    pub(crate) async fn conn(&self) -> RustvelloResult<deadpool_postgres::Client> {
163        let client = self
164            .pool
165            .get()
166            .await
167            .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
168        // Double-quote escaping prevents SQL injection in identifiers.
169        let escaped = self.app_id.replace('"', "\"\"");
170        client
171            .execute(&format!("SET search_path TO \"{escaped}\""), &[])
172            .await
173            .map_err(|e| {
174                RustvelloError::state_backend(format!("SET search_path failed: {}", fmt_pg(&e)))
175            })?;
176        Ok(client)
177    }
178
179    async fn initialize_schema(&self) -> RustvelloResult<()> {
180        // Use a raw pool connection (without SET search_path) so we can
181        // bootstrap the schema itself.
182        let client = self
183            .pool
184            .get()
185            .await
186            .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
187
188        // Create the app-specific schema and switch to it.
189        let escaped = self.app_id.replace('"', "\"\"");
190        client
191            .batch_execute(&format!(
192                "CREATE SCHEMA IF NOT EXISTS \"{escaped}\"; SET search_path TO \"{escaped}\";"
193            ))
194            .await
195            .map_err(|e| {
196                RustvelloError::state_backend(format!("schema creation failed: {}", fmt_pg(&e)))
197            })?;
198
199        client
200            .batch_execute(
201                "
202            -- Broker queue
203            CREATE TABLE IF NOT EXISTS broker_queue (
204                id BIGSERIAL PRIMARY KEY,
205                invocation_id TEXT NOT NULL,
206                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
207            );
208            CREATE INDEX IF NOT EXISTS idx_broker_queue_created
209                ON broker_queue(created_at);
210
211            -- Invocations
212            CREATE TABLE IF NOT EXISTS invocations (
213                invocation_id TEXT PRIMARY KEY,
214                task_id TEXT NOT NULL,
215                call_id TEXT NOT NULL,
216                status TEXT NOT NULL,
217                created_at TIMESTAMPTZ NOT NULL,
218                updated_at TIMESTAMPTZ NOT NULL,
219                parent_invocation_id TEXT,
220                workflow_id TEXT,
221                workflow_type TEXT,
222                workflow_depth INTEGER
223            );
224            CREATE INDEX IF NOT EXISTS idx_invocations_task
225                ON invocations(task_id);
226            CREATE INDEX IF NOT EXISTS idx_invocations_call
227                ON invocations(call_id);
228            CREATE INDEX IF NOT EXISTS idx_invocations_status
229                ON invocations(status);
230            CREATE INDEX IF NOT EXISTS idx_invocations_workflow
231                ON invocations(workflow_id);
232            CREATE INDEX IF NOT EXISTS idx_invocations_parent
233                ON invocations(parent_invocation_id);
234
235            -- Calls (arguments)
236            CREATE TABLE IF NOT EXISTS calls (
237                call_id TEXT PRIMARY KEY,
238                task_id TEXT NOT NULL,
239                serialized_arguments TEXT NOT NULL
240            );
241
242            -- Results
243            CREATE TABLE IF NOT EXISTS results (
244                invocation_id TEXT PRIMARY KEY,
245                result TEXT NOT NULL
246            );
247
248            -- Errors
249            CREATE TABLE IF NOT EXISTS errors (
250                invocation_id TEXT PRIMARY KEY,
251                error_type TEXT NOT NULL,
252                message TEXT NOT NULL,
253                traceback TEXT
254            );
255
256            -- Status history
257            CREATE TABLE IF NOT EXISTS history (
258                id BIGSERIAL PRIMARY KEY,
259                invocation_id TEXT NOT NULL,
260                status TEXT NOT NULL,
261                runner_id TEXT,
262                timestamp TIMESTAMPTZ NOT NULL,
263                message TEXT
264            );
265            CREATE INDEX IF NOT EXISTS idx_history_invocation
266                ON history(invocation_id);
267
268            -- Status records (current status with runner ownership)
269            CREATE TABLE IF NOT EXISTS status_records (
270                invocation_id TEXT PRIMARY KEY,
271                status TEXT NOT NULL,
272                runner_id TEXT,
273                timestamp TIMESTAMPTZ NOT NULL
274            );
275
276            -- Waiting-for relationships (blocking control)
277            CREATE TABLE IF NOT EXISTS waiting_for (
278                waiter_id TEXT NOT NULL,
279                waited_on_id TEXT NOT NULL,
280                PRIMARY KEY (waiter_id, waited_on_id)
281            );
282            CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
283                ON waiting_for(waited_on_id);
284
285            -- Concurrency control: per-argument-pair index
286            CREATE TABLE IF NOT EXISTS cc_arg_pairs (
287                invocation_id TEXT NOT NULL,
288                task_id TEXT NOT NULL,
289                arg_key TEXT NOT NULL,
290                arg_value TEXT NOT NULL,
291                PRIMARY KEY (invocation_id, arg_key, arg_value)
292            );
293            CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
294                ON cc_arg_pairs(task_id, arg_key, arg_value);
295
296            -- Client data store
297            CREATE TABLE IF NOT EXISTS client_data (
298                data_key TEXT PRIMARY KEY,
299                data_value TEXT NOT NULL,
300                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
301            );
302
303            -- Runner heartbeats
304            CREATE TABLE IF NOT EXISTS runner_heartbeats (
305                runner_id TEXT PRIMARY KEY,
306                last_heartbeat TIMESTAMPTZ NOT NULL
307            );
308
309            -- Invocation retry counts
310            CREATE TABLE IF NOT EXISTS retries (
311                invocation_id TEXT PRIMARY KEY,
312                count INTEGER NOT NULL DEFAULT 0
313            );
314
315            -- Trigger conditions
316            CREATE TABLE IF NOT EXISTS trg_conditions (
317                condition_id TEXT PRIMARY KEY,
318                condition_type TEXT NOT NULL DEFAULT '',
319                condition_json TEXT NOT NULL,
320                event_code TEXT
321            );
322            CREATE INDEX IF NOT EXISTS idx_trg_cond_type
323                ON trg_conditions(condition_type);
324            CREATE INDEX IF NOT EXISTS idx_trg_cond_event_code
325                ON trg_conditions(event_code);
326
327            -- Trigger definitions
328            CREATE TABLE IF NOT EXISTS trg_triggers (
329                trigger_id TEXT PRIMARY KEY,
330                task_id TEXT NOT NULL,
331                logic TEXT NOT NULL,
332                argument_template TEXT
333            );
334
335            -- Condition-to-trigger mapping (many-to-many)
336            CREATE TABLE IF NOT EXISTS trg_condition_triggers (
337                condition_id TEXT NOT NULL,
338                trigger_id TEXT NOT NULL,
339                PRIMARY KEY (condition_id, trigger_id)
340            );
341            CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
342                ON trg_condition_triggers(trigger_id);
343
344            -- Valid conditions (pending evaluation)
345            CREATE TABLE IF NOT EXISTS trg_valid_conditions (
346                valid_condition_id TEXT PRIMARY KEY,
347                condition_id TEXT NOT NULL,
348                context_json TEXT NOT NULL
349            );
350            CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
351                ON trg_valid_conditions(condition_id);
352
353            -- Source task → condition mapping (for fast lookup)
354            CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
355                task_id TEXT NOT NULL,
356                condition_id TEXT NOT NULL,
357                PRIMARY KEY (task_id, condition_id)
358            );
359
360            -- Cron execution tracking
361            CREATE TABLE IF NOT EXISTS trg_cron_executions (
362                condition_id TEXT PRIMARY KEY,
363                last_execution TIMESTAMPTZ NOT NULL
364            );
365
366            -- Trigger run claims (dedup)
367            CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
368                trigger_run_id TEXT PRIMARY KEY,
369                claimed_at TIMESTAMPTZ NOT NULL
370            );
371
372            -- Workflow runs (discovery + tracking)
373            CREATE TABLE IF NOT EXISTS workflow_runs (
374                workflow_id TEXT PRIMARY KEY,
375                workflow_type TEXT NOT NULL,
376                parent_workflow_id TEXT,
377                depth INTEGER NOT NULL DEFAULT 0
378            );
379            CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
380                ON workflow_runs(workflow_type);
381
382            -- Workflow key-value data store
383            CREATE TABLE IF NOT EXISTS workflow_data (
384                workflow_id TEXT NOT NULL,
385                data_key TEXT NOT NULL,
386                data_value TEXT NOT NULL,
387                PRIMARY KEY (workflow_id, data_key)
388            );
389
390            -- App info storage
391            CREATE TABLE IF NOT EXISTS app_infos (
392                app_id TEXT PRIMARY KEY,
393                info_json TEXT NOT NULL
394            );
395
396            -- Workflow sub-invocation tracking
397            CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
398                workflow_id TEXT NOT NULL,
399                sub_invocation_id TEXT NOT NULL,
400                PRIMARY KEY (workflow_id, sub_invocation_id)
401            );
402
403            -- Runner execution contexts
404            CREATE TABLE IF NOT EXISTS runner_contexts (
405                runner_id TEXT PRIMARY KEY,
406                runner_cls TEXT NOT NULL,
407                pid INTEGER NOT NULL,
408                hostname TEXT NOT NULL,
409                thread_id BIGINT NOT NULL,
410                started_at TIMESTAMPTZ NOT NULL,
411                parent_runner_id TEXT,
412                parent_runner_cls TEXT
413            );
414            CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
415                ON runner_contexts(parent_runner_id);
416
417            -- Add history_timestamp for time-range queries
418            ALTER TABLE history ADD COLUMN IF NOT EXISTS history_timestamp TIMESTAMPTZ;
419            CREATE INDEX IF NOT EXISTS idx_history_runner
420                ON history(runner_id);
421            ",
422            )
423            .await
424            .map_err(|e| {
425                RustvelloError::state_backend(format!("schema init failed: {}", fmt_pg(&e)))
426            })?;
427
428        Ok(())
429    }
430}
431
432pub(crate) fn pg_err(e: tokio_postgres::Error) -> RustvelloError {
433    RustvelloError::state_backend(format!("Postgres error: {}", fmt_pg(&e)))
434}
435
436pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
437    s.parse::<rustvello_proto::status::InvocationStatus>()
438        .map_err(RustvelloError::state_backend)
439}