1use deadpool_postgres::Pool;
4use tokio_postgres::NoTls;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult};
7
8fn fmt_pg(e: &tokio_postgres::Error) -> String {
14 if let Some(db) = e.as_db_error() {
15 use std::fmt::Write;
16 let mut msg = format!(
17 "{}: {} (code: {})",
18 db.severity(),
19 db.message(),
20 db.code().code()
21 );
22 if let Some(detail) = db.detail() {
23 let _ = write!(msg, " detail={detail}");
24 }
25 if let Some(hint) = db.hint() {
26 let _ = write!(msg, " hint={hint}");
27 }
28 msg
29 } else {
30 e.to_string()
31 }
32}
33
34pub struct Database {
42 pub(crate) pool: Pool,
43 app_id: String,
44}
45
46impl Database {
47 pub async fn connect(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
62 Self::connect_with_pool_size(connection_string, app_id, None).await
63 }
64
65 pub async fn connect_with_pool_size(
69 connection_string: &str,
70 app_id: &str,
71 max_size: Option<usize>,
72 ) -> RustvelloResult<Self> {
73 let pg_config: tokio_postgres::Config =
74 connection_string
75 .parse()
76 .map_err(|e: tokio_postgres::Error| {
77 RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
78 })?;
79
80 let mgr = deadpool_postgres::Manager::new(pg_config, NoTls);
81 let mut builder = Pool::builder(mgr);
82 if let Some(size) = max_size {
83 builder = builder.max_size(size);
84 }
85 let pool = builder
86 .build()
87 .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
88
89 let db = Self {
90 pool,
91 app_id: app_id.to_string(),
92 };
93 db.initialize_schema().await?;
94 Ok(db)
95 }
96
97 #[cfg(feature = "tls")]
106 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
107 pub async fn connect_tls(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
108 Self::connect_tls_with_pool_size(connection_string, app_id, None).await
109 }
110
111 #[cfg(feature = "tls")]
113 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
114 pub async fn connect_tls_with_pool_size(
115 connection_string: &str,
116 app_id: &str,
117 max_size: Option<usize>,
118 ) -> RustvelloResult<Self> {
119 let pg_config: tokio_postgres::Config =
120 connection_string
121 .parse()
122 .map_err(|e: tokio_postgres::Error| {
123 RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
124 })?;
125
126 let tls_connector = native_tls::TlsConnector::new().map_err(|e| {
127 RustvelloError::state_backend(format!("failed to create TLS connector: {e}"))
128 })?;
129 let pg_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
130
131 let mgr = deadpool_postgres::Manager::new(pg_config, pg_tls);
132 let mut builder = Pool::builder(mgr);
133 if let Some(size) = max_size {
134 builder = builder.max_size(size);
135 }
136 let pool = builder
137 .build()
138 .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
139
140 let db = Self {
141 pool,
142 app_id: app_id.to_string(),
143 };
144 db.initialize_schema().await?;
145 Ok(db)
146 }
147
148 pub async fn from_pool(pool: Pool, app_id: &str) -> RustvelloResult<Self> {
150 let db = Self {
151 pool,
152 app_id: app_id.to_string(),
153 };
154 db.initialize_schema().await?;
155 Ok(db)
156 }
157
158 pub(crate) async fn conn(&self) -> RustvelloResult<deadpool_postgres::Client> {
163 let client = self
164 .pool
165 .get()
166 .await
167 .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
168 let escaped = self.app_id.replace('"', "\"\"");
170 client
171 .execute(&format!("SET search_path TO \"{escaped}\""), &[])
172 .await
173 .map_err(|e| {
174 RustvelloError::state_backend(format!("SET search_path failed: {}", fmt_pg(&e)))
175 })?;
176 Ok(client)
177 }
178
179 async fn initialize_schema(&self) -> RustvelloResult<()> {
180 let client = self
183 .pool
184 .get()
185 .await
186 .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
187
188 let escaped = self.app_id.replace('"', "\"\"");
190 client
191 .batch_execute(&format!(
192 "CREATE SCHEMA IF NOT EXISTS \"{escaped}\"; SET search_path TO \"{escaped}\";"
193 ))
194 .await
195 .map_err(|e| {
196 RustvelloError::state_backend(format!("schema creation failed: {}", fmt_pg(&e)))
197 })?;
198
199 client
200 .batch_execute(
201 "
202 -- Broker queue
203 CREATE TABLE IF NOT EXISTS broker_queue (
204 id BIGSERIAL PRIMARY KEY,
205 invocation_id TEXT NOT NULL,
206 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
207 );
208 CREATE INDEX IF NOT EXISTS idx_broker_queue_created
209 ON broker_queue(created_at);
210
211 -- Invocations
212 CREATE TABLE IF NOT EXISTS invocations (
213 invocation_id TEXT PRIMARY KEY,
214 task_id TEXT NOT NULL,
215 call_id TEXT NOT NULL,
216 status TEXT NOT NULL,
217 created_at TIMESTAMPTZ NOT NULL,
218 updated_at TIMESTAMPTZ NOT NULL,
219 parent_invocation_id TEXT,
220 workflow_id TEXT,
221 workflow_type TEXT,
222 workflow_depth INTEGER
223 );
224 CREATE INDEX IF NOT EXISTS idx_invocations_task
225 ON invocations(task_id);
226 CREATE INDEX IF NOT EXISTS idx_invocations_call
227 ON invocations(call_id);
228 CREATE INDEX IF NOT EXISTS idx_invocations_status
229 ON invocations(status);
230 CREATE INDEX IF NOT EXISTS idx_invocations_workflow
231 ON invocations(workflow_id);
232 CREATE INDEX IF NOT EXISTS idx_invocations_parent
233 ON invocations(parent_invocation_id);
234
235 -- Calls (arguments)
236 CREATE TABLE IF NOT EXISTS calls (
237 call_id TEXT PRIMARY KEY,
238 task_id TEXT NOT NULL,
239 serialized_arguments TEXT NOT NULL
240 );
241
242 -- Results
243 CREATE TABLE IF NOT EXISTS results (
244 invocation_id TEXT PRIMARY KEY,
245 result TEXT NOT NULL
246 );
247
248 -- Errors
249 CREATE TABLE IF NOT EXISTS errors (
250 invocation_id TEXT PRIMARY KEY,
251 error_type TEXT NOT NULL,
252 message TEXT NOT NULL,
253 traceback TEXT
254 );
255
256 -- Status history
257 CREATE TABLE IF NOT EXISTS history (
258 id BIGSERIAL PRIMARY KEY,
259 invocation_id TEXT NOT NULL,
260 status TEXT NOT NULL,
261 runner_id TEXT,
262 timestamp TIMESTAMPTZ NOT NULL,
263 message TEXT
264 );
265 CREATE INDEX IF NOT EXISTS idx_history_invocation
266 ON history(invocation_id);
267
268 -- Status records (current status with runner ownership)
269 CREATE TABLE IF NOT EXISTS status_records (
270 invocation_id TEXT PRIMARY KEY,
271 status TEXT NOT NULL,
272 runner_id TEXT,
273 timestamp TIMESTAMPTZ NOT NULL
274 );
275
276 -- Waiting-for relationships (blocking control)
277 CREATE TABLE IF NOT EXISTS waiting_for (
278 waiter_id TEXT NOT NULL,
279 waited_on_id TEXT NOT NULL,
280 PRIMARY KEY (waiter_id, waited_on_id)
281 );
282 CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
283 ON waiting_for(waited_on_id);
284
285 -- Concurrency control: per-argument-pair index
286 CREATE TABLE IF NOT EXISTS cc_arg_pairs (
287 invocation_id TEXT NOT NULL,
288 task_id TEXT NOT NULL,
289 arg_key TEXT NOT NULL,
290 arg_value TEXT NOT NULL,
291 PRIMARY KEY (invocation_id, arg_key, arg_value)
292 );
293 CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
294 ON cc_arg_pairs(task_id, arg_key, arg_value);
295
296 -- Client data store
297 CREATE TABLE IF NOT EXISTS client_data (
298 data_key TEXT PRIMARY KEY,
299 data_value TEXT NOT NULL,
300 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
301 );
302
303 -- Runner heartbeats
304 CREATE TABLE IF NOT EXISTS runner_heartbeats (
305 runner_id TEXT PRIMARY KEY,
306 last_heartbeat TIMESTAMPTZ NOT NULL
307 );
308
309 -- Invocation retry counts
310 CREATE TABLE IF NOT EXISTS retries (
311 invocation_id TEXT PRIMARY KEY,
312 count INTEGER NOT NULL DEFAULT 0
313 );
314
315 -- Trigger conditions
316 CREATE TABLE IF NOT EXISTS trg_conditions (
317 condition_id TEXT PRIMARY KEY,
318 condition_type TEXT NOT NULL DEFAULT '',
319 condition_json TEXT NOT NULL,
320 event_code TEXT
321 );
322 CREATE INDEX IF NOT EXISTS idx_trg_cond_type
323 ON trg_conditions(condition_type);
324 CREATE INDEX IF NOT EXISTS idx_trg_cond_event_code
325 ON trg_conditions(event_code);
326
327 -- Trigger definitions
328 CREATE TABLE IF NOT EXISTS trg_triggers (
329 trigger_id TEXT PRIMARY KEY,
330 task_id TEXT NOT NULL,
331 logic TEXT NOT NULL,
332 argument_template TEXT
333 );
334
335 -- Condition-to-trigger mapping (many-to-many)
336 CREATE TABLE IF NOT EXISTS trg_condition_triggers (
337 condition_id TEXT NOT NULL,
338 trigger_id TEXT NOT NULL,
339 PRIMARY KEY (condition_id, trigger_id)
340 );
341 CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
342 ON trg_condition_triggers(trigger_id);
343
344 -- Valid conditions (pending evaluation)
345 CREATE TABLE IF NOT EXISTS trg_valid_conditions (
346 valid_condition_id TEXT PRIMARY KEY,
347 condition_id TEXT NOT NULL,
348 context_json TEXT NOT NULL
349 );
350 CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
351 ON trg_valid_conditions(condition_id);
352
353 -- Source task → condition mapping (for fast lookup)
354 CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
355 task_id TEXT NOT NULL,
356 condition_id TEXT NOT NULL,
357 PRIMARY KEY (task_id, condition_id)
358 );
359
360 -- Cron execution tracking
361 CREATE TABLE IF NOT EXISTS trg_cron_executions (
362 condition_id TEXT PRIMARY KEY,
363 last_execution TIMESTAMPTZ NOT NULL
364 );
365
366 -- Trigger run claims (dedup)
367 CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
368 trigger_run_id TEXT PRIMARY KEY,
369 claimed_at TIMESTAMPTZ NOT NULL
370 );
371
372 -- Workflow runs (discovery + tracking)
373 CREATE TABLE IF NOT EXISTS workflow_runs (
374 workflow_id TEXT PRIMARY KEY,
375 workflow_type TEXT NOT NULL,
376 parent_workflow_id TEXT,
377 depth INTEGER NOT NULL DEFAULT 0
378 );
379 CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
380 ON workflow_runs(workflow_type);
381
382 -- Workflow key-value data store
383 CREATE TABLE IF NOT EXISTS workflow_data (
384 workflow_id TEXT NOT NULL,
385 data_key TEXT NOT NULL,
386 data_value TEXT NOT NULL,
387 PRIMARY KEY (workflow_id, data_key)
388 );
389
390 -- App info storage
391 CREATE TABLE IF NOT EXISTS app_infos (
392 app_id TEXT PRIMARY KEY,
393 info_json TEXT NOT NULL
394 );
395
396 -- Workflow sub-invocation tracking
397 CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
398 workflow_id TEXT NOT NULL,
399 sub_invocation_id TEXT NOT NULL,
400 PRIMARY KEY (workflow_id, sub_invocation_id)
401 );
402
403 -- Runner execution contexts
404 CREATE TABLE IF NOT EXISTS runner_contexts (
405 runner_id TEXT PRIMARY KEY,
406 runner_cls TEXT NOT NULL,
407 pid INTEGER NOT NULL,
408 hostname TEXT NOT NULL,
409 thread_id BIGINT NOT NULL,
410 started_at TIMESTAMPTZ NOT NULL,
411 parent_runner_id TEXT,
412 parent_runner_cls TEXT
413 );
414 CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
415 ON runner_contexts(parent_runner_id);
416
417 -- Add history_timestamp for time-range queries
418 ALTER TABLE history ADD COLUMN IF NOT EXISTS history_timestamp TIMESTAMPTZ;
419 CREATE INDEX IF NOT EXISTS idx_history_runner
420 ON history(runner_id);
421 ",
422 )
423 .await
424 .map_err(|e| {
425 RustvelloError::state_backend(format!("schema init failed: {}", fmt_pg(&e)))
426 })?;
427
428 Ok(())
429 }
430}
431
432pub(crate) fn pg_err(e: tokio_postgres::Error) -> RustvelloError {
433 RustvelloError::state_backend(format!("Postgres error: {}", fmt_pg(&e)))
434}
435
436pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
437 s.parse::<rustvello_proto::status::InvocationStatus>()
438 .map_err(RustvelloError::state_backend)
439}