1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5 ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{debug, error, instrument};
14
15use crate::migrations::MigrationRunner;
16
/// Durable-orchestration storage provider backed by PostgreSQL.
///
/// All state lives in a single schema (default `public`); every operation
/// delegates to stored procedures installed by the migration runner.
pub struct PostgresProvider {
    // Shared connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Postgres schema holding all tables and stored procedures.
    schema_name: String,
}
43
44impl PostgresProvider {
45 pub async fn new(database_url: &str) -> Result<Self> {
46 Self::new_with_schema(database_url, None).await
47 }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
76 #[instrument(skip(self), target = "duroxide::providers::postgres")]
77 pub async fn initialize_schema(&self) -> Result<()> {
78 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81 migration_runner.migrate().await?;
82 Ok(())
83 }
84
85 fn now_millis() -> i64 {
87 SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64
91 }
92
93 fn table_name(&self, table: &str) -> String {
95 format!("{}.{}", self.schema_name, table)
96 }
97
98 pub fn pool(&self) -> &PgPool {
100 &self.pool
101 }
102
103 pub fn schema_name(&self) -> &str {
105 &self.schema_name
106 }
107
108 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110 match e {
111 SqlxError::Database(ref db_err) => {
112 let code_opt = db_err.code();
114 let code = code_opt.as_deref();
115 if code == Some("40P01") {
116 ProviderError::retryable(operation, format!("Deadlock detected: {}", e))
118 } else if code == Some("40001") {
119 ProviderError::permanent(operation, format!("Serialization failure: {}", e))
121 } else if code == Some("23505") {
122 ProviderError::permanent(operation, format!("Duplicate detected: {}", e))
124 } else if code == Some("23503") {
125 ProviderError::permanent(operation, format!("Foreign key violation: {}", e))
127 } else {
128 ProviderError::permanent(operation, format!("Database error: {}", e))
129 }
130 }
131 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132 ProviderError::retryable(operation, format!("Connection pool error: {}", e))
133 }
134 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {}", e)),
135 _ => ProviderError::permanent(operation, format!("Unexpected error: {}", e)),
136 }
137 }
138
139 pub async fn cleanup_schema(&self) -> Result<()> {
144 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146 .execute(&*self.pool)
147 .await?;
148
149 if self.schema_name != "public" {
152 sqlx::query(&format!(
153 "DROP SCHEMA IF EXISTS {} CASCADE",
154 self.schema_name
155 ))
156 .execute(&*self.pool)
157 .await?;
158 } else {
159 }
162
163 Ok(())
164 }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
    /// Atomically claims the next ready orchestration instance, returning its
    /// full history, pending messages, and a lock token.
    ///
    /// Delegates to the `fetch_orchestration_item($now_ms, $lock_timeout_ms)`
    /// stored procedure. When nothing is available, retries up to
    /// `MAX_RETRIES` times with a short sleep before returning `Ok(None)`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
    ) -> Result<Option<OrchestrationItem>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 10;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Tuple shape mirrors the stored procedure's OUT columns:
            // (instance_id, name, version, execution_id, history, messages, lock_token).
            let row: Option<(
                String,
                String,
                String,
                i64,
                serde_json::Value,
                serde_json::Value,
                String,
            )> = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .fetch_optional(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
            )) = row
            {
                // A corrupt history/message payload is unrecoverable — fail permanently.
                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
                    ProviderError::permanent(
                        "fetch_orchestration_item",
                        &format!("Failed to deserialize history: {}", e),
                    )
                })?;

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            &format!("Failed to deserialize messages: {}", e),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                return Ok(Some(OrchestrationItem {
                    instance: instance_id,
                    orchestration_name,
                    execution_id: execution_id as u64,
                    version: orchestration_version,
                    history,
                    messages,
                    lock_token,
                }));
            }

            // Queue appeared empty; back off briefly before the next attempt.
            if attempt < MAX_RETRIES {
                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
            }
        }

        Ok(None)
    }
    /// Commits the results of one orchestration turn under `lock_token`:
    /// appends `history_delta`, enqueues follow-up worker and orchestrator
    /// items, and updates instance metadata in a single stored-procedure call.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // Event ids are assigned by the runtime; 0 means it failed to do so.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    &format!("Failed to serialize event: {}", e),
                )
            })?;

            // Derive the enum variant name from the Debug representation
            // (text before the first '{').
            let event_type = format!("{:?}", event)
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                &format!("Failed to serialize worker items: {}", e),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                &format!("Failed to serialize orchestrator items: {}", e),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        match sqlx::query(&format!(
            "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(execution_id as i64)
        .bind(history_delta_json)
        .bind(worker_items_json)
        .bind(orchestrator_items_json)
        .bind(metadata_json)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {}
            Err(e) => {
                // The stored procedure raises an error containing
                // "Invalid lock token" when the lock was lost or expired.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "ack_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error("ack_orchestration_item", e));
            }
        }

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_orchestration_item",
            execution_id = execution_id,
            history_count = history_delta.len(),
            worker_items_count = worker_items.len(),
            orchestrator_items_count = orchestrator_items.len(),
            duration_ms = duration_ms,
            "Acknowledged orchestration item via stored procedure"
        );

        Ok(())
    }
    /// Releases the lock on a claimed orchestration item so it can be
    /// re-fetched, optionally delaying its visibility by `delay`.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The stored procedure returns the instance id that was unlocked.
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(delay_param)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Distinguish a lost/expired lock from other database failures.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
424
425 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
426 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
427 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
428 "SELECT out_event_data FROM {}.fetch_history($1)",
429 self.schema_name
430 ))
431 .bind(instance)
432 .fetch_all(&*self.pool)
433 .await
434 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
435
436 Ok(event_data_rows
437 .into_iter()
438 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
439 .collect())
440 }
441
    /// Appends `new_events` to the history of `instance`/`execution_id` via
    /// the `append_history` stored procedure. An empty batch is a no-op;
    /// events whose `event_id` was never assigned by the runtime are rejected.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            // Event ids are assigned by the runtime; 0 means it failed to do so.
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    &format!("Failed to serialize event: {}", e),
                )
            })?;

            // Derive the enum variant name from the Debug representation
            // (text before the first '{').
            let event_type = format!("{:?}", event)
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
515
516 #[instrument(skip(self), target = "duroxide::providers::postgres")]
517 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
518 let work_item = serde_json::to_string(&item).map_err(|e| {
519 ProviderError::permanent(
520 "enqueue_worker_work",
521 &format!("Failed to serialize work item: {}", e),
522 )
523 })?;
524
525 sqlx::query(&format!(
526 "SELECT {}.enqueue_worker_work($1)",
527 self.schema_name
528 ))
529 .bind(work_item)
530 .execute(&*self.pool)
531 .await
532 .map_err(|e| {
533 error!(
534 target = "duroxide::providers::postgres",
535 operation = "enqueue_worker_work",
536 error_type = "database_error",
537 error = %e,
538 "Failed to enqueue worker work"
539 );
540 Self::sqlx_to_provider_error("enqueue_worker_work", e)
541 })?;
542
543 Ok(())
544 }
545
    /// Claims the next available worker (activity) queue item, locking it for
    /// `lock_timeout`. Returns the item plus its lock token, or `Ok(None)`
    /// when the queue is empty.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(&self, lock_timeout: Duration) -> Result<Option<(WorkItem, String)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row is (serialized work item JSON, lock token).
        let row = match sqlx::query_as::<_, (String, String)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json)
            .map_err(|e| ProviderError::permanent("fetch_work_item", &format!("Failed to deserialize worker item: {}", e)))?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract the target instance id purely for logging; sub-orchestration
        // results are attributed to their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted { parent_instance, .. } => parent_instance.as_str(),
            WorkItem::SubOrchFailed { parent_instance, .. } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token)))
    }
602
    /// Acknowledges a claimed worker item by `token` and atomically enqueues
    /// its completion back to the orchestrator via the `ack_worker` stored
    /// procedure. `completion` must be `ActivityCompleted` or `ActivityFailed`.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Only activity outcomes are valid completions for the worker queue.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent(
                "ack_worker",
                &format!("Failed to serialize completion: {}", e),
            )
        })?;

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            // Token no longer maps to a queued item: lock expired and the item
            // was reclaimed, or this ack is a duplicate.
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
671
672 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
673 async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
674 let start = std::time::Instant::now();
675
676 let now_ms = Self::now_millis();
678
679 let extend_secs = extend_for.as_secs() as i64;
681
682 match sqlx::query(&format!(
683 "SELECT {}.renew_work_item_lock($1, $2, $3)",
684 self.schema_name
685 ))
686 .bind(token)
687 .bind(now_ms)
688 .bind(extend_secs)
689 .execute(&*self.pool)
690 .await
691 {
692 Ok(_) => {
693 let duration_ms = start.elapsed().as_millis() as u64;
694 debug!(
695 target = "duroxide::providers::postgres",
696 operation = "renew_work_item_lock",
697 token = %token,
698 extend_for_secs = extend_secs,
699 duration_ms = duration_ms,
700 "Work item lock renewed successfully"
701 );
702 Ok(())
703 }
704 Err(e) => {
705 if let SqlxError::Database(db_err) = &e {
706 if db_err.message().contains("Lock token invalid") {
707 return Err(ProviderError::permanent(
708 "renew_work_item_lock",
709 "Lock token invalid, expired, or already acked",
710 ));
711 }
712 } else if e.to_string().contains("Lock token invalid") {
713 return Err(ProviderError::permanent(
714 "renew_work_item_lock",
715 "Lock token invalid, expired, or already acked",
716 ));
717 }
718
719 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
720 }
721 }
722 }
723
724 #[instrument(skip(self), target = "duroxide::providers::postgres")]
725 async fn enqueue_for_orchestrator(
726 &self,
727 item: WorkItem,
728 delay: Option<Duration>,
729 ) -> Result<(), ProviderError> {
730 let work_item = serde_json::to_string(&item).map_err(|e| {
731 ProviderError::permanent(
732 "enqueue_orchestrator_work",
733 &format!("Failed to serialize work item: {}", e),
734 )
735 })?;
736
737 let instance_id = match &item {
739 WorkItem::StartOrchestration { instance, .. }
740 | WorkItem::ActivityCompleted { instance, .. }
741 | WorkItem::ActivityFailed { instance, .. }
742 | WorkItem::TimerFired { instance, .. }
743 | WorkItem::ExternalRaised { instance, .. }
744 | WorkItem::CancelInstance { instance, .. }
745 | WorkItem::ContinueAsNew { instance, .. } => instance,
746 WorkItem::SubOrchCompleted {
747 parent_instance, ..
748 }
749 | WorkItem::SubOrchFailed {
750 parent_instance, ..
751 } => parent_instance,
752 WorkItem::ActivityExecute { .. } => {
753 return Err(ProviderError::permanent(
754 "enqueue_orchestrator_work",
755 "ActivityExecute should go to worker queue, not orchestrator queue",
756 ));
757 }
758 };
759
760 let now_ms = Self::now_millis();
762
763 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
764 if *fire_at_ms > 0 {
765 if let Some(delay) = delay {
767 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
768 } else {
769 *fire_at_ms
770 }
771 } else {
772 delay.map(|d| now_ms as u64 + d.as_millis() as u64).unwrap_or(now_ms as u64)
774 }
775 } else {
776 delay.map(|d| now_ms as u64 + d.as_millis() as u64).unwrap_or(now_ms as u64)
778 };
779
780 let visible_at = Utc
781 .timestamp_millis_opt(visible_at_ms as i64)
782 .single()
783 .ok_or_else(|| {
784 ProviderError::permanent(
785 "enqueue_orchestrator_work",
786 "Invalid visible_at timestamp",
787 )
788 })?;
789
790 sqlx::query(&format!(
795 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
796 self.schema_name
797 ))
798 .bind(instance_id)
799 .bind(&work_item)
800 .bind(visible_at)
801 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
805 .await
806 .map_err(|e| {
807 error!(
808 target = "duroxide::providers::postgres",
809 operation = "enqueue_orchestrator_work",
810 error_type = "database_error",
811 error = %e,
812 instance_id = %instance_id,
813 "Failed to enqueue orchestrator work"
814 );
815 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
816 })?;
817
818 debug!(
819 target = "duroxide::providers::postgres",
820 operation = "enqueue_orchestrator_work",
821 instance_id = %instance_id,
822 delay_ms = delay.map(|d| d.as_millis() as u64),
823 "Enqueued orchestrator work"
824 );
825
826 Ok(())
827 }
828
829 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
830 async fn read_with_execution(
831 &self,
832 instance: &str,
833 execution_id: u64,
834 ) -> Result<Vec<Event>, ProviderError> {
835 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
836 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
837 self.table_name("history")
838 ))
839 .bind(instance)
840 .bind(execution_id as i64)
841 .fetch_all(&*self.pool)
842 .await
843 .ok()
844 .unwrap_or_default();
845
846 Ok(event_data_rows
847 .into_iter()
848 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
849 .collect())
850 }
851
    /// Exposes the admin/management API; always available for this provider.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
855}
856
857#[async_trait::async_trait]
858impl ProviderAdmin for PostgresProvider {
859 #[instrument(skip(self), target = "duroxide::providers::postgres")]
860 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
861 sqlx::query_scalar(&format!(
862 "SELECT instance_id FROM {}.list_instances()",
863 self.schema_name
864 ))
865 .fetch_all(&*self.pool)
866 .await
867 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
868 }
869
870 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
871 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
872 sqlx::query_scalar(&format!(
873 "SELECT instance_id FROM {}.list_instances_by_status($1)",
874 self.schema_name
875 ))
876 .bind(status)
877 .fetch_all(&*self.pool)
878 .await
879 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
880 }
881
882 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
883 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
884 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
885 "SELECT execution_id FROM {}.list_executions($1)",
886 self.schema_name
887 ))
888 .bind(instance)
889 .fetch_all(&*self.pool)
890 .await
891 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
892
893 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
894 }
895
896 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
897 async fn read_history_with_execution_id(
898 &self,
899 instance: &str,
900 execution_id: u64,
901 ) -> Result<Vec<Event>, ProviderError> {
902 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
903 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
904 self.schema_name
905 ))
906 .bind(instance)
907 .bind(execution_id as i64)
908 .fetch_all(&*self.pool)
909 .await
910 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
911
912 event_data_rows
913 .into_iter()
914 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
915 .collect::<Vec<Event>>()
916 .into_iter()
917 .map(Ok)
918 .collect()
919 }
920
921 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
922 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
923 let execution_id = self.latest_execution_id(instance).await?;
924 self.read_history_with_execution_id(instance, execution_id)
925 .await
926 }
927
    /// Returns the most recent execution id for `instance`, or a permanent
    /// error when the instance does not exist.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT {}.latest_execution_id($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
        .map(|id: i64| id as u64)
        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
    }
941
    /// Returns summary metadata for `instance` via the `get_instance_info`
    /// stored procedure; permanent error when the instance is unknown.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        // Tuple shape mirrors the stored procedure's OUT columns:
        // (instance_id, name, version, current_execution_id, created_at,
        //  updated_at, status, output).
        let row: Option<(
            String,
            String,
            String,
            i64,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            Option<String>,
            Option<String>,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            // A NULL status column is reported as "Running".
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            // Fall back to created_at when the row was never updated.
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
        })
    }
987
    /// Returns summary metadata for one execution of `instance` via the
    /// `get_execution_info` stored procedure; permanent error when not found.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn get_execution_info(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<ExecutionInfo, ProviderError> {
        // Tuple shape mirrors the stored procedure's OUT columns:
        // (execution_id, status, output, started_at, completed_at, event_count).
        let row: Option<(
            i64,
            String,
            Option<String>,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            i64,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_execution_info($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;

        let (exec_id, status, output, started_at, completed_at, event_count) = row
            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;

        Ok(ExecutionInfo {
            execution_id: exec_id as u64,
            status,
            output,
            started_at: started_at.timestamp_millis() as u64,
            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
            event_count: event_count as usize,
        })
    }
1023
    /// Returns aggregate counters across all instances via the
    /// `get_system_metrics` stored procedure.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
        // Tuple shape mirrors the stored procedure's OUT columns:
        // (total_instances, total_executions, running, completed, failed,
        //  total_events).
        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_system_metrics()",
            self.schema_name
        ))
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;

        let (
            total_instances,
            total_executions,
            running_instances,
            completed_instances,
            failed_instances,
            total_events,
        ) = row.ok_or_else(|| {
            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
        })?;

        Ok(SystemMetrics {
            total_instances: total_instances as u64,
            total_executions: total_executions as u64,
            running_instances: running_instances as u64,
            completed_instances: completed_instances as u64,
            failed_instances: failed_instances as u64,
            total_events: total_events as u64,
        })
    }
1054
    /// Returns the number of currently visible items in the orchestrator and
    /// worker queues via the `get_queue_depths` stored procedure.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
        let now_ms = Self::now_millis();

        // Tuple is (orchestrator queue depth, worker queue depth).
        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_queue_depths($1)",
            self.schema_name
        ))
        .bind(now_ms)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;

        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
        })?;

        Ok(QueueDepths {
            orchestrator_queue: orchestrator_queue as usize,
            worker_queue: worker_queue as usize,
            // This provider keeps no separate timer queue, so 0 is reported.
            timer_queue: 0,
        })
    }
1078}