1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5 ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// Durable-orchestration storage provider backed by PostgreSQL.
///
/// All data access is funneled through stored procedures/functions installed
/// into `schema_name` by the `MigrationRunner` at construction time.
pub struct PostgresProvider {
    // Shared connection pool; Arc so it can also be handed to the migration runner.
    pool: Arc<PgPool>,
    // Postgres schema holding the duroxide tables and functions (defaults to "public").
    schema_name: String,
}
43
44impl PostgresProvider {
45 pub async fn new(database_url: &str) -> Result<Self> {
46 Self::new_with_schema(database_url, None).await
47 }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
76 #[instrument(skip(self), target = "duroxide::providers::postgres")]
77 pub async fn initialize_schema(&self) -> Result<()> {
78 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81 migration_runner.migrate().await?;
82 Ok(())
83 }
84
85 fn now_millis() -> i64 {
87 SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64
91 }
92
93 fn table_name(&self, table: &str) -> String {
95 format!("{}.{}", self.schema_name, table)
96 }
97
98 pub fn pool(&self) -> &PgPool {
100 &self.pool
101 }
102
103 pub fn schema_name(&self) -> &str {
105 &self.schema_name
106 }
107
108 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110 match e {
111 SqlxError::Database(ref db_err) => {
112 let code_opt = db_err.code();
114 let code = code_opt.as_deref();
115 if code == Some("40P01") {
116 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118 } else if code == Some("40001") {
119 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121 } else if code == Some("23505") {
122 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124 } else if code == Some("23503") {
125 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127 } else {
128 ProviderError::permanent(operation, format!("Database error: {e}"))
129 }
130 }
131 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133 }
134 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136 }
137 }
138
139 pub async fn cleanup_schema(&self) -> Result<()> {
144 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146 .execute(&*self.pool)
147 .await?;
148
149 if self.schema_name != "public" {
152 sqlx::query(&format!(
153 "DROP SCHEMA IF EXISTS {} CASCADE",
154 self.schema_name
155 ))
156 .execute(&*self.pool)
157 .await?;
158 } else {
159 }
162
163 Ok(())
164 }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
    /// Claims the next ready orchestration item (history + pending messages)
    /// under a lock, via the `fetch_orchestration_item` stored procedure.
    ///
    /// Transient database errors are retried up to `MAX_RETRIES` with linear
    /// backoff; `Ok(None)` means nothing was ready. `_poll_timeout` is unused —
    /// long-polling cadence is the caller's concern.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<OrchestrationItem>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // Recorded for symmetry with the retry loop; the final error is returned directly.
        let mut _last_error: Option<ProviderError> = None;

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape mirrors the stored procedure's OUT parameters:
            // (instance_id, name, version, execution_id, history_json, messages_json, lock_token).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    // Retry transient failures with linearly increasing backoff.
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
            )) = row
            {
                // Corrupt history/messages are permanent: retrying cannot fix stored data.
                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
                    ProviderError::permanent(
                        "fetch_orchestration_item",
                        format!("Failed to deserialize history: {e}"),
                    )
                })?;

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                return Ok(Some(OrchestrationItem {
                    instance: instance_id,
                    orchestration_name,
                    execution_id: execution_id as u64,
                    version: orchestration_version,
                    history,
                    messages,
                    lock_token,
                }));
            }

            // No row ready: brief fixed pause before the next poll attempt.
            if attempt < MAX_RETRIES {
                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
            }
        }

        Ok(None)
    }
    /// Atomically commits one orchestration turn: appends `history_delta`,
    /// enqueues worker/orchestrator items, updates instance metadata, and
    /// releases the lock — all inside the `ack_orchestration_item` procedure.
    ///
    /// Transient database errors are retried up to `MAX_RETRIES` with linear
    /// backoff; an unknown `lock_token` is reported as a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the {event_id, event_type, event_data} payload the procedure expects.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime must assign ids before handing events to the provider.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant label derived from the Debug form: text before the first '{'.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Stale/unknown lock token: permanent caller error, never retried.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: every loop iteration either returns or continues,
        // and the final attempt always returns. Kept to satisfy the return type.
        Ok(())
    }
423 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
424 async fn abandon_orchestration_item(
425 &self,
426 lock_token: &str,
427 delay: Option<Duration>,
428 ) -> Result<(), ProviderError> {
429 let start = std::time::Instant::now();
430 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
431
432 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
433 "SELECT {}.abandon_orchestration_item($1, $2)",
434 self.schema_name
435 ))
436 .bind(lock_token)
437 .bind(delay_param)
438 .fetch_one(&*self.pool)
439 .await
440 {
441 Ok(instance_id) => instance_id,
442 Err(e) => {
443 if let SqlxError::Database(db_err) = &e {
444 if db_err.message().contains("Invalid lock token") {
445 return Err(ProviderError::permanent(
446 "abandon_orchestration_item",
447 "Invalid lock token",
448 ));
449 }
450 } else if e.to_string().contains("Invalid lock token") {
451 return Err(ProviderError::permanent(
452 "abandon_orchestration_item",
453 "Invalid lock token",
454 ));
455 }
456
457 return Err(Self::sqlx_to_provider_error(
458 "abandon_orchestration_item",
459 e,
460 ));
461 }
462 };
463
464 let duration_ms = start.elapsed().as_millis() as u64;
465 debug!(
466 target = "duroxide::providers::postgres",
467 operation = "abandon_orchestration_item",
468 instance_id = %instance_id,
469 delay_ms = delay.map(|d| d.as_millis() as u64),
470 duration_ms = duration_ms,
471 "Abandoned orchestration item via stored procedure"
472 );
473
474 Ok(())
475 }
476
477 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
478 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
479 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
480 "SELECT out_event_data FROM {}.fetch_history($1)",
481 self.schema_name
482 ))
483 .bind(instance)
484 .fetch_all(&*self.pool)
485 .await
486 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
487
488 Ok(event_data_rows
489 .into_iter()
490 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
491 .collect())
492 }
493
494 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
495 async fn append_with_execution(
496 &self,
497 instance: &str,
498 execution_id: u64,
499 new_events: Vec<Event>,
500 ) -> Result<(), ProviderError> {
501 if new_events.is_empty() {
502 return Ok(());
503 }
504
505 let mut events_payload = Vec::with_capacity(new_events.len());
506 for event in &new_events {
507 if event.event_id() == 0 {
508 error!(
509 target = "duroxide::providers::postgres",
510 operation = "append_with_execution",
511 error_type = "validation_error",
512 instance_id = %instance,
513 execution_id = execution_id,
514 "event_id must be set by runtime"
515 );
516 return Err(ProviderError::permanent(
517 "append_with_execution",
518 "event_id must be set by runtime",
519 ));
520 }
521
522 let event_json = serde_json::to_string(event).map_err(|e| {
523 ProviderError::permanent(
524 "append_with_execution",
525 format!("Failed to serialize event: {e}"),
526 )
527 })?;
528
529 let event_type = format!("{event:?}")
530 .split('{')
531 .next()
532 .unwrap_or("Unknown")
533 .trim()
534 .to_string();
535
536 events_payload.push(serde_json::json!({
537 "event_id": event.event_id(),
538 "event_type": event_type,
539 "event_data": event_json,
540 }));
541 }
542
543 let events_json = serde_json::Value::Array(events_payload);
544
545 sqlx::query(&format!(
546 "SELECT {}.append_history($1, $2, $3)",
547 self.schema_name
548 ))
549 .bind(instance)
550 .bind(execution_id as i64)
551 .bind(events_json)
552 .execute(&*self.pool)
553 .await
554 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
555
556 debug!(
557 target = "duroxide::providers::postgres",
558 operation = "append_with_execution",
559 instance_id = %instance,
560 execution_id = execution_id,
561 event_count = new_events.len(),
562 "Appended history events via stored procedure"
563 );
564
565 Ok(())
566 }
567
568 #[instrument(skip(self), target = "duroxide::providers::postgres")]
569 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
570 let work_item = serde_json::to_string(&item).map_err(|e| {
571 ProviderError::permanent(
572 "enqueue_worker_work",
573 format!("Failed to serialize work item: {e}"),
574 )
575 })?;
576
577 sqlx::query(&format!(
578 "SELECT {}.enqueue_worker_work($1)",
579 self.schema_name
580 ))
581 .bind(work_item)
582 .execute(&*self.pool)
583 .await
584 .map_err(|e| {
585 error!(
586 target = "duroxide::providers::postgres",
587 operation = "enqueue_worker_work",
588 error_type = "database_error",
589 error = %e,
590 "Failed to enqueue worker work"
591 );
592 Self::sqlx_to_provider_error("enqueue_worker_work", e)
593 })?;
594
595 Ok(())
596 }
597
    /// Claims the next ready worker (activity) item under a lock via the
    /// `fetch_work_item` stored procedure. Returns the item plus its lock token,
    /// or `Ok(None)` when nothing is ready. `_poll_timeout` is unused.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(WorkItem, String)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row shape: (work_item_json, lock_token).
        let row = match sqlx::query_as::<_, (String, String)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract the instance id for logging only; sub-orchestration results
        // are attributed to their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token)))
    }
666
    /// Completes a claimed worker item and atomically enqueues its completion
    /// (ActivityCompleted / ActivityFailed) for the orchestrator via the
    /// `ack_worker` stored procedure. Any other completion variant is rejected.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Only activity outcomes are valid completions here.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent(
                "ack_worker",
                format!("Failed to serialize completion: {e}"),
            )
        })?;

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            // Missing queue row: lock expired or the item was already acked.
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
735
    /// Extends the lock on a claimed worker item so a long-running activity
    /// is not re-dispatched. An invalid/expired token is a permanent error.
    ///
    /// NOTE(review): the third argument is passed in *seconds* while the other
    /// time parameters in this file are milliseconds — confirm this matches the
    /// `renew_work_item_lock` SQL function's signature.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Stale/unknown token: permanent caller error, not a DB fault.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
791
    /// Enqueues a work item for the orchestrator queue, computing its
    /// visibility time from the optional `delay` and, for timers, `fire_at_ms`.
    /// `ActivityExecute` items are rejected — they belong on the worker queue.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Route by variant; sub-orchestration results target the parent instance.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        let now_ms = Self::now_millis();

        // Visibility rules:
        //  - TimerFired with fire_at_ms > 0: not visible before fire_at_ms, and
        //    not before now + delay if a delay is also given (take the max).
        //  - Everything else (and zero fire_at_ms): now + delay, or now.
        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // NOTE(review): the three trailing NULL binds presumably map to optional
        // parameters of enqueue_orchestrator_work — confirm against the SQL function.
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
900
901 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
902 async fn read_with_execution(
903 &self,
904 instance: &str,
905 execution_id: u64,
906 ) -> Result<Vec<Event>, ProviderError> {
907 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
908 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
909 self.table_name("history")
910 ))
911 .bind(instance)
912 .bind(execution_id as i64)
913 .fetch_all(&*self.pool)
914 .await
915 .ok()
916 .unwrap_or_default();
917
918 Ok(event_data_rows
919 .into_iter()
920 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
921 .collect())
922 }
923
    /// Exposes the admin/management API; this provider always supports it.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
927}
928
929#[async_trait::async_trait]
930impl ProviderAdmin for PostgresProvider {
931 #[instrument(skip(self), target = "duroxide::providers::postgres")]
932 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
933 sqlx::query_scalar(&format!(
934 "SELECT instance_id FROM {}.list_instances()",
935 self.schema_name
936 ))
937 .fetch_all(&*self.pool)
938 .await
939 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
940 }
941
942 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
943 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
944 sqlx::query_scalar(&format!(
945 "SELECT instance_id FROM {}.list_instances_by_status($1)",
946 self.schema_name
947 ))
948 .bind(status)
949 .fetch_all(&*self.pool)
950 .await
951 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
952 }
953
954 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
955 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
956 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
957 "SELECT execution_id FROM {}.list_executions($1)",
958 self.schema_name
959 ))
960 .bind(instance)
961 .fetch_all(&*self.pool)
962 .await
963 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
964
965 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
966 }
967
968 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
969 async fn read_history_with_execution_id(
970 &self,
971 instance: &str,
972 execution_id: u64,
973 ) -> Result<Vec<Event>, ProviderError> {
974 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
975 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
976 self.schema_name
977 ))
978 .bind(instance)
979 .bind(execution_id as i64)
980 .fetch_all(&*self.pool)
981 .await
982 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
983
984 event_data_rows
985 .into_iter()
986 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
987 .collect::<Vec<Event>>()
988 .into_iter()
989 .map(Ok)
990 .collect()
991 }
992
993 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
994 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
995 let execution_id = self.latest_execution_id(instance).await?;
996 self.read_history_with_execution_id(instance, execution_id)
997 .await
998 }
999
1000 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1001 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1002 sqlx::query_scalar(&format!(
1003 "SELECT {}.latest_execution_id($1)",
1004 self.schema_name
1005 ))
1006 .bind(instance)
1007 .fetch_optional(&*self.pool)
1008 .await
1009 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1010 .map(|id: i64| id as u64)
1011 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1012 }
1013
    /// Fetches summary metadata for an instance via `get_instance_info`.
    /// A missing status defaults to "Running"; a missing updated_at falls back
    /// to created_at.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        // Row shape mirrors the stored function's OUT parameters:
        // (instance_id, name, version, current_execution_id, created_at,
        //  updated_at, status, output).
        let row: Option<(
            String,
            String,
            String,
            i64,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            Option<String>,
            Option<String>,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
        })
    }
1059
    /// Fetches summary metadata for one execution via `get_execution_info`.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn get_execution_info(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<ExecutionInfo, ProviderError> {
        // Row shape mirrors the stored function's OUT parameters:
        // (execution_id, status, output, started_at, completed_at, event_count).
        let row: Option<(
            i64,
            String,
            Option<String>,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            i64,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_execution_info($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;

        let (exec_id, status, output, started_at, completed_at, event_count) = row
            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;

        Ok(ExecutionInfo {
            execution_id: exec_id as u64,
            status,
            output,
            started_at: started_at.timestamp_millis() as u64,
            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
            event_count: event_count as usize,
        })
    }
1095
    /// Aggregated instance/execution/event counts via `get_system_metrics()`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
        // Row shape: (total_instances, total_executions, running, completed,
        // failed, total_events).
        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_system_metrics()",
            self.schema_name
        ))
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;

        let (
            total_instances,
            total_executions,
            running_instances,
            completed_instances,
            failed_instances,
            total_events,
        ) = row.ok_or_else(|| {
            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
        })?;

        Ok(SystemMetrics {
            total_instances: total_instances as u64,
            total_executions: total_executions as u64,
            running_instances: running_instances as u64,
            completed_instances: completed_instances as u64,
            failed_instances: failed_instances as u64,
            total_events: total_events as u64,
        })
    }
1126
1127 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1128 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1129 let now_ms = Self::now_millis();
1130
1131 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1132 "SELECT * FROM {}.get_queue_depths($1)",
1133 self.schema_name
1134 ))
1135 .bind(now_ms)
1136 .fetch_optional(&*self.pool)
1137 .await
1138 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1139
1140 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1141 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1142 })?;
1143
1144 Ok(QueueDepths {
1145 orchestrator_queue: orchestrator_queue as usize,
1146 worker_queue: worker_queue as usize,
1147 timer_queue: 0, })
1149 }
1150}