Skip to main content

rustvello_postgres/
db.rs

1//! PostgreSQL database wrapper with connection pooling and schema initialization.
2
3use deadpool_postgres::Pool;
4use tokio_postgres::NoTls;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult};
7
/// Shared PostgreSQL database connection pool with schema initialization.
///
/// Data is isolated by `app_id`: each app gets its own PostgreSQL schema
/// (`CREATE SCHEMA IF NOT EXISTS "{app_id}"`), and every connection
/// obtained through [`conn()`](Self::conn) has `search_path` set to that
/// schema.  Two `Database` instances with different `app_id` values
/// sharing the same physical database will not see each other's tables.
pub struct Database {
    /// Underlying deadpool connection pool (crate-visible).
    pub(crate) pool: Pool,
    /// Application identifier; after double-quote escaping it is used as the
    /// name of the PostgreSQL schema that holds this app's tables.
    app_id: String,
}
19
20impl Database {
21    /// Create a new database from a connection string.
22    ///
23    /// The connection string should be in the format:
24    /// `host=localhost user=postgres password=secret dbname=rustvello`
25    /// or a URI: `postgres://user:pass@host/dbname`
26    ///
27    /// The `app_id` is used to create a dedicated PostgreSQL schema so
28    /// that multiple applications can share the same database without
29    /// interference.
30    ///
31    /// **Note:** This method uses `NoTls`, so all data is sent in cleartext.
32    /// For encrypted connections, use [`connect_tls()`](Self::connect_tls)
33    /// (requires the `tls` feature) or [`from_pool()`](Self::from_pool)
34    /// with a custom pool configuration.
35    pub async fn connect(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
36        Self::connect_with_pool_size(connection_string, app_id, None).await
37    }
38
39    /// Like [`connect()`](Self::connect) but with a configurable maximum pool size.
40    ///
41    /// When `max_size` is `None`, deadpool's default (CPU count × 4) is used.
42    pub async fn connect_with_pool_size(
43        connection_string: &str,
44        app_id: &str,
45        max_size: Option<usize>,
46    ) -> RustvelloResult<Self> {
47        let pg_config: tokio_postgres::Config =
48            connection_string
49                .parse()
50                .map_err(|e: tokio_postgres::Error| {
51                    RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
52                })?;
53
54        let mgr = deadpool_postgres::Manager::new(pg_config, NoTls);
55        let mut builder = Pool::builder(mgr);
56        if let Some(size) = max_size {
57            builder = builder.max_size(size);
58        }
59        let pool = builder
60            .build()
61            .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
62
63        let db = Self {
64            pool,
65            app_id: app_id.to_string(),
66        };
67        db.initialize_schema().await?;
68        Ok(db)
69    }
70
71    /// Create a new database with TLS encryption from a connection string.
72    ///
73    /// Uses the system CA trust store to verify server certificates.
74    /// Requires the `tls` feature.
75    ///
76    /// The connection string should be in the format:
77    /// `host=localhost user=postgres password=secret dbname=rustvello sslmode=require`
78    /// or a URI: `postgres://user:pass@host/dbname?sslmode=require`
79    #[cfg(feature = "tls")]
80    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
81    pub async fn connect_tls(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
82        Self::connect_tls_with_pool_size(connection_string, app_id, None).await
83    }
84
85    /// Like [`connect_tls()`](Self::connect_tls) but with a configurable maximum pool size.
86    #[cfg(feature = "tls")]
87    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
88    pub async fn connect_tls_with_pool_size(
89        connection_string: &str,
90        app_id: &str,
91        max_size: Option<usize>,
92    ) -> RustvelloResult<Self> {
93        let pg_config: tokio_postgres::Config =
94            connection_string
95                .parse()
96                .map_err(|e: tokio_postgres::Error| {
97                    RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
98                })?;
99
100        let tls_connector = native_tls::TlsConnector::new().map_err(|e| {
101            RustvelloError::state_backend(format!("failed to create TLS connector: {e}"))
102        })?;
103        let pg_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
104
105        let mgr = deadpool_postgres::Manager::new(pg_config, pg_tls);
106        let mut builder = Pool::builder(mgr);
107        if let Some(size) = max_size {
108            builder = builder.max_size(size);
109        }
110        let pool = builder
111            .build()
112            .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
113
114        let db = Self {
115            pool,
116            app_id: app_id.to_string(),
117        };
118        db.initialize_schema().await?;
119        Ok(db)
120    }
121
122    /// Create a database from an existing deadpool pool.
123    pub async fn from_pool(pool: Pool, app_id: &str) -> RustvelloResult<Self> {
124        let db = Self {
125            pool,
126            app_id: app_id.to_string(),
127        };
128        db.initialize_schema().await?;
129        Ok(db)
130    }
131
132    /// Get a connection from the pool.
133    ///
134    /// Every connection has its `search_path` set to the app-specific
135    /// schema so that all subsequent SQL operates in the correct namespace.
136    pub(crate) async fn conn(&self) -> RustvelloResult<deadpool_postgres::Client> {
137        let client = self
138            .pool
139            .get()
140            .await
141            .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
142        // Double-quote escaping prevents SQL injection in identifiers.
143        let escaped = self.app_id.replace('"', "\"\"");
144        client
145            .execute(&format!("SET search_path TO \"{escaped}\""), &[])
146            .await
147            .map_err(|e| RustvelloError::state_backend(format!("SET search_path failed: {e}")))?;
148        Ok(client)
149    }
150
151    async fn initialize_schema(&self) -> RustvelloResult<()> {
152        // Use a raw pool connection (without SET search_path) so we can
153        // bootstrap the schema itself.
154        let client = self
155            .pool
156            .get()
157            .await
158            .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
159
160        // Create the app-specific schema and switch to it.
161        let escaped = self.app_id.replace('"', "\"\"");
162        client
163            .batch_execute(&format!(
164                "CREATE SCHEMA IF NOT EXISTS \"{escaped}\"; SET search_path TO \"{escaped}\";"
165            ))
166            .await
167            .map_err(|e| RustvelloError::state_backend(format!("schema creation failed: {e}")))?;
168
169        client
170            .batch_execute(
171                "
172            -- Broker queue
173            CREATE TABLE IF NOT EXISTS broker_queue (
174                id BIGSERIAL PRIMARY KEY,
175                invocation_id TEXT NOT NULL,
176                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
177            );
178            CREATE INDEX IF NOT EXISTS idx_broker_queue_created
179                ON broker_queue(created_at);
180
181            -- Invocations
182            CREATE TABLE IF NOT EXISTS invocations (
183                invocation_id TEXT PRIMARY KEY,
184                task_id TEXT NOT NULL,
185                call_id TEXT NOT NULL,
186                status TEXT NOT NULL,
187                created_at TIMESTAMPTZ NOT NULL,
188                updated_at TIMESTAMPTZ NOT NULL,
189                parent_invocation_id TEXT,
190                workflow_id TEXT,
191                workflow_type TEXT,
192                workflow_depth INTEGER
193            );
194            CREATE INDEX IF NOT EXISTS idx_invocations_task
195                ON invocations(task_id);
196            CREATE INDEX IF NOT EXISTS idx_invocations_call
197                ON invocations(call_id);
198            CREATE INDEX IF NOT EXISTS idx_invocations_status
199                ON invocations(status);
200            CREATE INDEX IF NOT EXISTS idx_invocations_workflow
201                ON invocations(workflow_id);
202            CREATE INDEX IF NOT EXISTS idx_invocations_parent
203                ON invocations(parent_invocation_id);
204
205            -- Calls (arguments)
206            CREATE TABLE IF NOT EXISTS calls (
207                call_id TEXT PRIMARY KEY,
208                task_id TEXT NOT NULL,
209                serialized_arguments TEXT NOT NULL
210            );
211
212            -- Results
213            CREATE TABLE IF NOT EXISTS results (
214                invocation_id TEXT PRIMARY KEY,
215                result TEXT NOT NULL
216            );
217
218            -- Errors
219            CREATE TABLE IF NOT EXISTS errors (
220                invocation_id TEXT PRIMARY KEY,
221                error_type TEXT NOT NULL,
222                message TEXT NOT NULL,
223                traceback TEXT
224            );
225
226            -- Status history
227            CREATE TABLE IF NOT EXISTS history (
228                id BIGSERIAL PRIMARY KEY,
229                invocation_id TEXT NOT NULL,
230                status TEXT NOT NULL,
231                runner_id TEXT,
232                timestamp TIMESTAMPTZ NOT NULL,
233                message TEXT
234            );
235            CREATE INDEX IF NOT EXISTS idx_history_invocation
236                ON history(invocation_id);
237
238            -- Status records (current status with runner ownership)
239            CREATE TABLE IF NOT EXISTS status_records (
240                invocation_id TEXT PRIMARY KEY,
241                status TEXT NOT NULL,
242                runner_id TEXT,
243                timestamp TIMESTAMPTZ NOT NULL
244            );
245
246            -- Waiting-for relationships (blocking control)
247            CREATE TABLE IF NOT EXISTS waiting_for (
248                waiter_id TEXT NOT NULL,
249                waited_on_id TEXT NOT NULL,
250                PRIMARY KEY (waiter_id, waited_on_id)
251            );
252            CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
253                ON waiting_for(waited_on_id);
254
255            -- Concurrency control: per-argument-pair index
256            CREATE TABLE IF NOT EXISTS cc_arg_pairs (
257                invocation_id TEXT NOT NULL,
258                task_id TEXT NOT NULL,
259                arg_key TEXT NOT NULL,
260                arg_value TEXT NOT NULL,
261                PRIMARY KEY (invocation_id, arg_key, arg_value)
262            );
263            CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
264                ON cc_arg_pairs(task_id, arg_key, arg_value);
265
266            -- Client data store
267            CREATE TABLE IF NOT EXISTS client_data (
268                data_key TEXT PRIMARY KEY,
269                data_value TEXT NOT NULL,
270                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
271            );
272
273            -- Runner heartbeats
274            CREATE TABLE IF NOT EXISTS runner_heartbeats (
275                runner_id TEXT PRIMARY KEY,
276                last_heartbeat TIMESTAMPTZ NOT NULL
277            );
278
279            -- Invocation retry counts
280            CREATE TABLE IF NOT EXISTS retries (
281                invocation_id TEXT PRIMARY KEY,
282                count INTEGER NOT NULL DEFAULT 0
283            );
284
285            -- Trigger conditions
286            CREATE TABLE IF NOT EXISTS trg_conditions (
287                condition_id TEXT PRIMARY KEY,
288                condition_type TEXT NOT NULL DEFAULT '',
289                condition_json TEXT NOT NULL,
290                event_code TEXT
291            );
292            CREATE INDEX IF NOT EXISTS idx_trg_cond_type
293                ON trg_conditions(condition_type);
294            CREATE INDEX IF NOT EXISTS idx_trg_cond_event_code
295                ON trg_conditions(event_code);
296
297            -- Trigger definitions
298            CREATE TABLE IF NOT EXISTS trg_triggers (
299                trigger_id TEXT PRIMARY KEY,
300                task_id TEXT NOT NULL,
301                logic TEXT NOT NULL,
302                argument_template TEXT
303            );
304
305            -- Condition-to-trigger mapping (many-to-many)
306            CREATE TABLE IF NOT EXISTS trg_condition_triggers (
307                condition_id TEXT NOT NULL,
308                trigger_id TEXT NOT NULL,
309                PRIMARY KEY (condition_id, trigger_id)
310            );
311            CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
312                ON trg_condition_triggers(trigger_id);
313
314            -- Valid conditions (pending evaluation)
315            CREATE TABLE IF NOT EXISTS trg_valid_conditions (
316                valid_condition_id TEXT PRIMARY KEY,
317                condition_id TEXT NOT NULL,
318                context_json TEXT NOT NULL
319            );
320            CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
321                ON trg_valid_conditions(condition_id);
322
323            -- Source task → condition mapping (for fast lookup)
324            CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
325                task_id TEXT NOT NULL,
326                condition_id TEXT NOT NULL,
327                PRIMARY KEY (task_id, condition_id)
328            );
329
330            -- Cron execution tracking
331            CREATE TABLE IF NOT EXISTS trg_cron_executions (
332                condition_id TEXT PRIMARY KEY,
333                last_execution TIMESTAMPTZ NOT NULL
334            );
335
336            -- Trigger run claims (dedup)
337            CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
338                trigger_run_id TEXT PRIMARY KEY,
339                claimed_at TIMESTAMPTZ NOT NULL
340            );
341
342            -- Workflow runs (discovery + tracking)
343            CREATE TABLE IF NOT EXISTS workflow_runs (
344                workflow_id TEXT PRIMARY KEY,
345                workflow_type TEXT NOT NULL,
346                parent_workflow_id TEXT,
347                depth INTEGER NOT NULL DEFAULT 0
348            );
349            CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
350                ON workflow_runs(workflow_type);
351
352            -- Workflow key-value data store
353            CREATE TABLE IF NOT EXISTS workflow_data (
354                workflow_id TEXT NOT NULL,
355                data_key TEXT NOT NULL,
356                data_value TEXT NOT NULL,
357                PRIMARY KEY (workflow_id, data_key)
358            );
359
360            -- App info storage
361            CREATE TABLE IF NOT EXISTS app_infos (
362                app_id TEXT PRIMARY KEY,
363                info_json TEXT NOT NULL
364            );
365
366            -- Workflow sub-invocation tracking
367            CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
368                workflow_id TEXT NOT NULL,
369                sub_invocation_id TEXT NOT NULL,
370                PRIMARY KEY (workflow_id, sub_invocation_id)
371            );
372
373            -- Runner execution contexts
374            CREATE TABLE IF NOT EXISTS runner_contexts (
375                runner_id TEXT PRIMARY KEY,
376                runner_cls TEXT NOT NULL,
377                pid INTEGER NOT NULL,
378                hostname TEXT NOT NULL,
379                thread_id BIGINT NOT NULL,
380                started_at TIMESTAMPTZ NOT NULL,
381                parent_runner_id TEXT,
382                parent_runner_cls TEXT
383            );
384            CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
385                ON runner_contexts(parent_runner_id);
386
387            -- Add history_timestamp for time-range queries
388            ALTER TABLE history ADD COLUMN IF NOT EXISTS history_timestamp TIMESTAMPTZ;
389            CREATE INDEX IF NOT EXISTS idx_history_runner
390                ON history(runner_id);
391            ",
392            )
393            .await
394            .map_err(|e| RustvelloError::state_backend(format!("schema init failed: {e}")))?;
395
396        Ok(())
397    }
398}
399
400pub(crate) fn pg_err(e: tokio_postgres::Error) -> RustvelloError {
401    RustvelloError::state_backend(format!("Postgres error: {e}"))
402}
403
404pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
405    s.parse::<rustvello_proto::status::InvocationStatus>()
406        .map_err(RustvelloError::state_backend)
407}