1use deadpool_postgres::Pool;
4use tokio_postgres::NoTls;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult};
7
8pub struct Database {
16 pub(crate) pool: Pool,
17 app_id: String,
18}
19
20impl Database {
21 pub async fn connect(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
36 Self::connect_with_pool_size(connection_string, app_id, None).await
37 }
38
39 pub async fn connect_with_pool_size(
43 connection_string: &str,
44 app_id: &str,
45 max_size: Option<usize>,
46 ) -> RustvelloResult<Self> {
47 let pg_config: tokio_postgres::Config =
48 connection_string
49 .parse()
50 .map_err(|e: tokio_postgres::Error| {
51 RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
52 })?;
53
54 let mgr = deadpool_postgres::Manager::new(pg_config, NoTls);
55 let mut builder = Pool::builder(mgr);
56 if let Some(size) = max_size {
57 builder = builder.max_size(size);
58 }
59 let pool = builder
60 .build()
61 .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;
62
63 let db = Self {
64 pool,
65 app_id: app_id.to_string(),
66 };
67 db.initialize_schema().await?;
68 Ok(db)
69 }
70
/// Connects to PostgreSQL over TLS, leaving the pool size at the builder's
/// default.
///
/// Equivalent to calling [`Database::connect_tls_with_pool_size`] with
/// `max_size = None`. Only compiled when the `tls` feature is enabled.
///
/// # Errors
///
/// Returns whatever error [`Database::connect_tls_with_pool_size`] produces.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub async fn connect_tls(connection_string: &str, app_id: &str) -> RustvelloResult<Self> {
    // `None` means "do not override the pool's default maximum size".
    let default_pool_size = None;
    Self::connect_tls_with_pool_size(connection_string, app_id, default_pool_size).await
}
84
/// Connects to PostgreSQL over TLS and initializes the application schema.
///
/// The TLS connector is created with `native_tls::TlsConnector::new()`, i.e.
/// with that crate's default settings. Only compiled when the `tls` feature
/// is enabled.
///
/// # Parameters
///
/// * `connection_string` - parsed into a `tokio_postgres::Config`.
/// * `app_id` - application identifier, used as the schema name.
/// * `max_size` - optional upper bound on pooled connections; `None` keeps
///   the pool builder's default. Must not be `Some(0)`.
///
/// # Errors
///
/// Returns a state-backend error when:
/// * the connection string cannot be parsed,
/// * the TLS connector cannot be created,
/// * `max_size` is `Some(0)`,
/// * the pool cannot be built, or
/// * schema initialization fails (including failure to obtain a connection).
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub async fn connect_tls_with_pool_size(
    connection_string: &str,
    app_id: &str,
    max_size: Option<usize>,
) -> RustvelloResult<Self> {
    let pg_config: tokio_postgres::Config =
        connection_string
            .parse()
            .map_err(|e: tokio_postgres::Error| {
                RustvelloError::state_backend(format!("invalid Postgres config: {e}"))
            })?;

    let tls_connector = native_tls::TlsConnector::new().map_err(|e| {
        RustvelloError::state_backend(format!("failed to create TLS connector: {e}"))
    })?;
    let pg_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);

    let mgr = deadpool_postgres::Manager::new(pg_config, pg_tls);
    let mut builder = Pool::builder(mgr);
    match max_size {
        // A zero-capacity pool can never hand out a connection, and no wait
        // timeout is configured on this builder, so the `pool.get()` performed
        // by schema initialization below would never complete. Fail fast
        // with a descriptive error instead of hanging the caller.
        Some(0) => {
            return Err(RustvelloError::state_backend(String::from(
                "failed to create pool: max_size must be greater than zero",
            )));
        }
        Some(size) => builder = builder.max_size(size),
        None => {}
    }
    let pool = builder
        .build()
        .map_err(|e| RustvelloError::state_backend(format!("failed to create pool: {e}")))?;

    let db = Self {
        pool,
        app_id: app_id.to_string(),
    };
    // Creates the schema and tables if they do not exist yet.
    db.initialize_schema().await?;
    Ok(db)
}
121
122 pub async fn from_pool(pool: Pool, app_id: &str) -> RustvelloResult<Self> {
124 let db = Self {
125 pool,
126 app_id: app_id.to_string(),
127 };
128 db.initialize_schema().await?;
129 Ok(db)
130 }
131
132 pub(crate) async fn conn(&self) -> RustvelloResult<deadpool_postgres::Client> {
137 let client = self
138 .pool
139 .get()
140 .await
141 .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
142 let escaped = self.app_id.replace('"', "\"\"");
144 client
145 .execute(&format!("SET search_path TO \"{escaped}\""), &[])
146 .await
147 .map_err(|e| RustvelloError::state_backend(format!("SET search_path failed: {e}")))?;
148 Ok(client)
149 }
150
151 async fn initialize_schema(&self) -> RustvelloResult<()> {
152 let client = self
155 .pool
156 .get()
157 .await
158 .map_err(|e| RustvelloError::state_backend(format!("pool error: {e}")))?;
159
160 let escaped = self.app_id.replace('"', "\"\"");
162 client
163 .batch_execute(&format!(
164 "CREATE SCHEMA IF NOT EXISTS \"{escaped}\"; SET search_path TO \"{escaped}\";"
165 ))
166 .await
167 .map_err(|e| RustvelloError::state_backend(format!("schema creation failed: {e}")))?;
168
169 client
170 .batch_execute(
171 "
172 -- Broker queue
173 CREATE TABLE IF NOT EXISTS broker_queue (
174 id BIGSERIAL PRIMARY KEY,
175 invocation_id TEXT NOT NULL,
176 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
177 );
178 CREATE INDEX IF NOT EXISTS idx_broker_queue_created
179 ON broker_queue(created_at);
180
181 -- Invocations
182 CREATE TABLE IF NOT EXISTS invocations (
183 invocation_id TEXT PRIMARY KEY,
184 task_id TEXT NOT NULL,
185 call_id TEXT NOT NULL,
186 status TEXT NOT NULL,
187 created_at TIMESTAMPTZ NOT NULL,
188 updated_at TIMESTAMPTZ NOT NULL,
189 parent_invocation_id TEXT,
190 workflow_id TEXT,
191 workflow_type TEXT,
192 workflow_depth INTEGER
193 );
194 CREATE INDEX IF NOT EXISTS idx_invocations_task
195 ON invocations(task_id);
196 CREATE INDEX IF NOT EXISTS idx_invocations_call
197 ON invocations(call_id);
198 CREATE INDEX IF NOT EXISTS idx_invocations_status
199 ON invocations(status);
200 CREATE INDEX IF NOT EXISTS idx_invocations_workflow
201 ON invocations(workflow_id);
202 CREATE INDEX IF NOT EXISTS idx_invocations_parent
203 ON invocations(parent_invocation_id);
204
205 -- Calls (arguments)
206 CREATE TABLE IF NOT EXISTS calls (
207 call_id TEXT PRIMARY KEY,
208 task_id TEXT NOT NULL,
209 serialized_arguments TEXT NOT NULL
210 );
211
212 -- Results
213 CREATE TABLE IF NOT EXISTS results (
214 invocation_id TEXT PRIMARY KEY,
215 result TEXT NOT NULL
216 );
217
218 -- Errors
219 CREATE TABLE IF NOT EXISTS errors (
220 invocation_id TEXT PRIMARY KEY,
221 error_type TEXT NOT NULL,
222 message TEXT NOT NULL,
223 traceback TEXT
224 );
225
226 -- Status history
227 CREATE TABLE IF NOT EXISTS history (
228 id BIGSERIAL PRIMARY KEY,
229 invocation_id TEXT NOT NULL,
230 status TEXT NOT NULL,
231 runner_id TEXT,
232 timestamp TIMESTAMPTZ NOT NULL,
233 message TEXT
234 );
235 CREATE INDEX IF NOT EXISTS idx_history_invocation
236 ON history(invocation_id);
237
238 -- Status records (current status with runner ownership)
239 CREATE TABLE IF NOT EXISTS status_records (
240 invocation_id TEXT PRIMARY KEY,
241 status TEXT NOT NULL,
242 runner_id TEXT,
243 timestamp TIMESTAMPTZ NOT NULL
244 );
245
246 -- Waiting-for relationships (blocking control)
247 CREATE TABLE IF NOT EXISTS waiting_for (
248 waiter_id TEXT NOT NULL,
249 waited_on_id TEXT NOT NULL,
250 PRIMARY KEY (waiter_id, waited_on_id)
251 );
252 CREATE INDEX IF NOT EXISTS idx_waiting_for_waited_on
253 ON waiting_for(waited_on_id);
254
255 -- Concurrency control: per-argument-pair index
256 CREATE TABLE IF NOT EXISTS cc_arg_pairs (
257 invocation_id TEXT NOT NULL,
258 task_id TEXT NOT NULL,
259 arg_key TEXT NOT NULL,
260 arg_value TEXT NOT NULL,
261 PRIMARY KEY (invocation_id, arg_key, arg_value)
262 );
263 CREATE INDEX IF NOT EXISTS idx_cc_arg_lookup
264 ON cc_arg_pairs(task_id, arg_key, arg_value);
265
266 -- Client data store
267 CREATE TABLE IF NOT EXISTS client_data (
268 data_key TEXT PRIMARY KEY,
269 data_value TEXT NOT NULL,
270 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
271 );
272
273 -- Runner heartbeats
274 CREATE TABLE IF NOT EXISTS runner_heartbeats (
275 runner_id TEXT PRIMARY KEY,
276 last_heartbeat TIMESTAMPTZ NOT NULL
277 );
278
279 -- Invocation retry counts
280 CREATE TABLE IF NOT EXISTS retries (
281 invocation_id TEXT PRIMARY KEY,
282 count INTEGER NOT NULL DEFAULT 0
283 );
284
285 -- Trigger conditions
286 CREATE TABLE IF NOT EXISTS trg_conditions (
287 condition_id TEXT PRIMARY KEY,
288 condition_type TEXT NOT NULL DEFAULT '',
289 condition_json TEXT NOT NULL,
290 event_code TEXT
291 );
292 CREATE INDEX IF NOT EXISTS idx_trg_cond_type
293 ON trg_conditions(condition_type);
294 CREATE INDEX IF NOT EXISTS idx_trg_cond_event_code
295 ON trg_conditions(event_code);
296
297 -- Trigger definitions
298 CREATE TABLE IF NOT EXISTS trg_triggers (
299 trigger_id TEXT PRIMARY KEY,
300 task_id TEXT NOT NULL,
301 logic TEXT NOT NULL,
302 argument_template TEXT
303 );
304
305 -- Condition-to-trigger mapping (many-to-many)
306 CREATE TABLE IF NOT EXISTS trg_condition_triggers (
307 condition_id TEXT NOT NULL,
308 trigger_id TEXT NOT NULL,
309 PRIMARY KEY (condition_id, trigger_id)
310 );
311 CREATE INDEX IF NOT EXISTS idx_trg_ct_trigger
312 ON trg_condition_triggers(trigger_id);
313
314 -- Valid conditions (pending evaluation)
315 CREATE TABLE IF NOT EXISTS trg_valid_conditions (
316 valid_condition_id TEXT PRIMARY KEY,
317 condition_id TEXT NOT NULL,
318 context_json TEXT NOT NULL
319 );
320 CREATE INDEX IF NOT EXISTS idx_trg_vc_condition
321 ON trg_valid_conditions(condition_id);
322
323 -- Source task → condition mapping (for fast lookup)
324 CREATE TABLE IF NOT EXISTS trg_source_task_conditions (
325 task_id TEXT NOT NULL,
326 condition_id TEXT NOT NULL,
327 PRIMARY KEY (task_id, condition_id)
328 );
329
330 -- Cron execution tracking
331 CREATE TABLE IF NOT EXISTS trg_cron_executions (
332 condition_id TEXT PRIMARY KEY,
333 last_execution TIMESTAMPTZ NOT NULL
334 );
335
336 -- Trigger run claims (dedup)
337 CREATE TABLE IF NOT EXISTS trg_trigger_run_claims (
338 trigger_run_id TEXT PRIMARY KEY,
339 claimed_at TIMESTAMPTZ NOT NULL
340 );
341
342 -- Workflow runs (discovery + tracking)
343 CREATE TABLE IF NOT EXISTS workflow_runs (
344 workflow_id TEXT PRIMARY KEY,
345 workflow_type TEXT NOT NULL,
346 parent_workflow_id TEXT,
347 depth INTEGER NOT NULL DEFAULT 0
348 );
349 CREATE INDEX IF NOT EXISTS idx_workflow_runs_type
350 ON workflow_runs(workflow_type);
351
352 -- Workflow key-value data store
353 CREATE TABLE IF NOT EXISTS workflow_data (
354 workflow_id TEXT NOT NULL,
355 data_key TEXT NOT NULL,
356 data_value TEXT NOT NULL,
357 PRIMARY KEY (workflow_id, data_key)
358 );
359
360 -- App info storage
361 CREATE TABLE IF NOT EXISTS app_infos (
362 app_id TEXT PRIMARY KEY,
363 info_json TEXT NOT NULL
364 );
365
366 -- Workflow sub-invocation tracking
367 CREATE TABLE IF NOT EXISTS workflow_sub_invocations (
368 workflow_id TEXT NOT NULL,
369 sub_invocation_id TEXT NOT NULL,
370 PRIMARY KEY (workflow_id, sub_invocation_id)
371 );
372
373 -- Runner execution contexts
374 CREATE TABLE IF NOT EXISTS runner_contexts (
375 runner_id TEXT PRIMARY KEY,
376 runner_cls TEXT NOT NULL,
377 pid INTEGER NOT NULL,
378 hostname TEXT NOT NULL,
379 thread_id BIGINT NOT NULL,
380 started_at TIMESTAMPTZ NOT NULL,
381 parent_runner_id TEXT,
382 parent_runner_cls TEXT
383 );
384 CREATE INDEX IF NOT EXISTS idx_runner_contexts_parent
385 ON runner_contexts(parent_runner_id);
386
387 -- Add history_timestamp for time-range queries
388 ALTER TABLE history ADD COLUMN IF NOT EXISTS history_timestamp TIMESTAMPTZ;
389 CREATE INDEX IF NOT EXISTS idx_history_runner
390 ON history(runner_id);
391 ",
392 )
393 .await
394 .map_err(|e| RustvelloError::state_backend(format!("schema init failed: {e}")))?;
395
396 Ok(())
397 }
398}
399
400pub(crate) fn pg_err(e: tokio_postgres::Error) -> RustvelloError {
401 RustvelloError::state_backend(format!("Postgres error: {e}"))
402}
403
404pub(crate) fn parse_status(s: &str) -> RustvelloResult<rustvello_proto::status::InvocationStatus> {
405 s.parse::<rustvello_proto::status::InvocationStatus>()
406 .map_err(RustvelloError::state_backend)
407}