1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, ExecutionState, InstanceInfo, OrchestrationItem, Provider,
5 ProviderAdmin, ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// Durable-orchestration storage provider backed by PostgreSQL.
///
/// All state transitions are delegated to stored procedures installed by
/// `MigrationRunner`, namespaced under `schema_name`.
pub struct PostgresProvider {
    // Shared sqlx connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Postgres schema holding the duroxide tables and stored procedures.
    schema_name: String,
}
43
44impl PostgresProvider {
    /// Connects using `database_url` and runs migrations in the default
    /// `public` schema. See [`Self::new_with_schema`] for details.
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_schema(database_url, None).await
    }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
76 #[instrument(skip(self), target = "duroxide::providers::postgres")]
77 pub async fn initialize_schema(&self) -> Result<()> {
78 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81 migration_runner.migrate().await?;
82 Ok(())
83 }
84
85 fn now_millis() -> i64 {
87 SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64
91 }
92
93 fn parse_execution_state(state_str: &str) -> ExecutionState {
96 if state_str == "Running" {
97 ExecutionState::Running
98 } else if state_str == "Missing" {
99 ExecutionState::Missing
100 } else if let Some(status) = state_str.strip_prefix("Terminal:") {
101 ExecutionState::Terminal {
102 status: status.to_string(),
103 }
104 } else {
105 warn!(
107 target = "duroxide::providers::postgres",
108 state = %state_str,
109 "Unrecognized execution state, defaulting to Running"
110 );
111 ExecutionState::Running
112 }
113 }
114
115 fn table_name(&self, table: &str) -> String {
117 format!("{}.{}", self.schema_name, table)
118 }
119
    /// Borrows the underlying sqlx connection pool (e.g. for ad-hoc queries).
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
124
    /// Name of the Postgres schema this provider operates in.
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }
129
130 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
132 match e {
133 SqlxError::Database(ref db_err) => {
134 let code_opt = db_err.code();
136 let code = code_opt.as_deref();
137 if code == Some("40P01") {
138 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
140 } else if code == Some("40001") {
141 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
143 } else if code == Some("23505") {
144 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
146 } else if code == Some("23503") {
147 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
149 } else {
150 ProviderError::permanent(operation, format!("Database error: {e}"))
151 }
152 }
153 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
154 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
155 }
156 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
157 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
158 }
159 }
160
161 pub async fn cleanup_schema(&self) -> Result<()> {
166 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
168 .execute(&*self.pool)
169 .await?;
170
171 if self.schema_name != "public" {
174 sqlx::query(&format!(
175 "DROP SCHEMA IF EXISTS {} CASCADE",
176 self.schema_name
177 ))
178 .execute(&*self.pool)
179 .await?;
180 } else {
181 }
184
185 Ok(())
186 }
187}
188
189#[async_trait::async_trait]
190impl Provider for PostgresProvider {
    /// Stable provider identifier reported to the runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
194
    /// Crate version, captured at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
198
    /// Atomically fetches and locks the next ready orchestration item.
    ///
    /// Delegates to the `fetch_orchestration_item` stored procedure, which
    /// selects an instance, locks it for `lock_timeout`, and returns its
    /// history plus pending messages. Transient database errors are retried
    /// up to `MAX_RETRIES` times with linearly increasing delay.
    ///
    /// Returns `Ok(None)` when no work is available. `_poll_timeout` is
    /// unused here; polling cadence is handled by the caller.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Tuple mirrors the procedure's row: (instance_id, orchestration
            // name, version, execution_id, history JSON, messages JSON,
            // lock_token, attempt_count).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    // Only transient failures are retried, with backoff that
                    // grows linearly in the attempt number.
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // Corrupt history/messages are permanent errors: retrying
                // would re-read the same bad rows.
                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
                    ProviderError::permanent(
                        "fetch_orchestration_item",
                        format!("Failed to deserialize history: {e}"),
                    )
                })?;

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No row available: short pause before probing again within this call.
            if attempt < MAX_RETRIES {
                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
            }
        }

        Ok(None)
    }
    /// Acknowledges a locked orchestration item: persists `history_delta`,
    /// enqueues follow-up worker and orchestrator items, and records
    /// execution metadata — all in one `ack_orchestration_item` stored
    /// procedure call. Transient database errors are retried with linear
    /// backoff; an invalid lock token is reported as a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize the history delta once, outside the retry loop.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime must assign event ids before persistence.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name derived from the Debug output (text before '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // A rejected lock token can never succeed on retry.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: every iteration returns or continues, and
        // the final iteration always returns.
        Ok(())
    }
    /// Releases a locked orchestration item without acking it, making it
    /// visible again — optionally only after `delay`. `ignore_attempt` keeps
    /// the attempt counter unchanged.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The stored procedure returns the instance id owning the lock token
        // (used only for logging below).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // A rejected lock token can never succeed on retry.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
517
518 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
519 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
520 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
521 "SELECT out_event_data FROM {}.fetch_history($1)",
522 self.schema_name
523 ))
524 .bind(instance)
525 .fetch_all(&*self.pool)
526 .await
527 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
528
529 Ok(event_data_rows
530 .into_iter()
531 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
532 .collect())
533 }
534
    /// Appends `new_events` to the history of `instance`/`execution_id` via
    /// the `append_history` stored procedure. Empty batches are a no-op.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            // The runtime must assign event ids before persistence.
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name derived from the Debug output (text before '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
608
609 #[instrument(skip(self), target = "duroxide::providers::postgres")]
610 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
611 let work_item = serde_json::to_string(&item).map_err(|e| {
612 ProviderError::permanent(
613 "enqueue_worker_work",
614 format!("Failed to serialize work item: {e}"),
615 )
616 })?;
617
618 let now_ms = Self::now_millis();
619
620 sqlx::query(&format!(
621 "SELECT {}.enqueue_worker_work($1, $2)",
622 self.schema_name
623 ))
624 .bind(work_item)
625 .bind(now_ms)
626 .execute(&*self.pool)
627 .await
628 .map_err(|e| {
629 error!(
630 target = "duroxide::providers::postgres",
631 operation = "enqueue_worker_work",
632 error_type = "database_error",
633 error = %e,
634 "Failed to enqueue worker work"
635 );
636 Self::sqlx_to_provider_error("enqueue_worker_work", e)
637 })?;
638
639 Ok(())
640 }
641
    /// Fetches and locks the next available worker-queue item.
    ///
    /// Returns the item, its lock token, its attempt count, and the current
    /// execution state of the owning orchestration (so callers can skip work
    /// for terminated instances). `Ok(None)` when the queue is empty.
    /// `_poll_timeout` is unused; polling cadence is handled by the caller.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(WorkItem, String, u32, ExecutionState)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row: (work_item JSON, lock_token, attempt_count, execution_state).
        let row = match sqlx::query_as::<_, (String, String, i32, String)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count, execution_state_str) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let execution_state = Self::parse_execution_state(&execution_state_str);

        let duration_ms = start.elapsed().as_millis() as u64;

        // Instance id used for logging; sub-orchestration results are
        // attributed to the parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            execution_state = ?execution_state,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((
            work_item,
            lock_token,
            attempt_count as u32,
            execution_state,
        )))
    }
720
    /// Acknowledges a locked worker-queue item.
    ///
    /// With `completion == None` the item is simply removed (used when the
    /// activity was cancelled). Otherwise `completion` must be
    /// `ActivityCompleted` or `ActivityFailed`; the ack and the enqueue of
    /// the completion for the orchestrator happen in one procedure call.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: the single-argument ack just removes the queue row.
        let Some(completion) = completion else {
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1)",
                self.schema_name
            ))
            .bind(token)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for a worker item.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Four-argument ack removes the row and enqueues the completion for
        // the orchestrator.
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
825
    /// Extends a worker item's lock by `extend_for` and returns the owning
    /// orchestration's current execution state. A rejected token is a
    /// permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<ExecutionState, ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): whole-second granularity — a sub-second `extend_for`
        // truncates to 0 here; confirm that is intended.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query_as::<_, (String,)>(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .fetch_one(&*self.pool)
        .await
        {
            Ok((execution_state_str,)) => {
                let execution_state = Self::parse_execution_state(&execution_state_str);
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    execution_state = ?execution_state,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(execution_state)
            }
            Err(e) => {
                // A rejected token cannot succeed on retry.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
883
    /// Releases a locked worker item without completing it so it becomes
    /// visible again, optionally only after `delay`. `ignore_attempt` keeps
    /// the attempt counter unchanged.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // A rejected token cannot succeed on retry.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
942
    /// Extends the lock on a fetched orchestration item by `extend_for`.
    /// A rejected token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): whole-second granularity — a sub-second `extend_for`
        // truncates to 0 here; confirm that is intended.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A rejected token cannot succeed on retry.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1007
1008 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1009 async fn enqueue_for_orchestrator(
1010 &self,
1011 item: WorkItem,
1012 delay: Option<Duration>,
1013 ) -> Result<(), ProviderError> {
1014 let work_item = serde_json::to_string(&item).map_err(|e| {
1015 ProviderError::permanent(
1016 "enqueue_orchestrator_work",
1017 format!("Failed to serialize work item: {e}"),
1018 )
1019 })?;
1020
1021 let instance_id = match &item {
1023 WorkItem::StartOrchestration { instance, .. }
1024 | WorkItem::ActivityCompleted { instance, .. }
1025 | WorkItem::ActivityFailed { instance, .. }
1026 | WorkItem::TimerFired { instance, .. }
1027 | WorkItem::ExternalRaised { instance, .. }
1028 | WorkItem::CancelInstance { instance, .. }
1029 | WorkItem::ContinueAsNew { instance, .. } => instance,
1030 WorkItem::SubOrchCompleted {
1031 parent_instance, ..
1032 }
1033 | WorkItem::SubOrchFailed {
1034 parent_instance, ..
1035 } => parent_instance,
1036 WorkItem::ActivityExecute { .. } => {
1037 return Err(ProviderError::permanent(
1038 "enqueue_orchestrator_work",
1039 "ActivityExecute should go to worker queue, not orchestrator queue",
1040 ));
1041 }
1042 };
1043
1044 let now_ms = Self::now_millis();
1046
1047 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1048 if *fire_at_ms > 0 {
1049 if let Some(delay) = delay {
1051 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1052 } else {
1053 *fire_at_ms
1054 }
1055 } else {
1056 delay
1058 .map(|d| now_ms as u64 + d.as_millis() as u64)
1059 .unwrap_or(now_ms as u64)
1060 }
1061 } else {
1062 delay
1064 .map(|d| now_ms as u64 + d.as_millis() as u64)
1065 .unwrap_or(now_ms as u64)
1066 };
1067
1068 let visible_at = Utc
1069 .timestamp_millis_opt(visible_at_ms as i64)
1070 .single()
1071 .ok_or_else(|| {
1072 ProviderError::permanent(
1073 "enqueue_orchestrator_work",
1074 "Invalid visible_at timestamp",
1075 )
1076 })?;
1077
1078 sqlx::query(&format!(
1083 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1084 self.schema_name
1085 ))
1086 .bind(instance_id)
1087 .bind(&work_item)
1088 .bind(visible_at)
1089 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1093 .await
1094 .map_err(|e| {
1095 error!(
1096 target = "duroxide::providers::postgres",
1097 operation = "enqueue_orchestrator_work",
1098 error_type = "database_error",
1099 error = %e,
1100 instance_id = %instance_id,
1101 "Failed to enqueue orchestrator work"
1102 );
1103 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1104 })?;
1105
1106 debug!(
1107 target = "duroxide::providers::postgres",
1108 operation = "enqueue_orchestrator_work",
1109 instance_id = %instance_id,
1110 delay_ms = delay.map(|d| d.as_millis() as u64),
1111 "Enqueued orchestrator work"
1112 );
1113
1114 Ok(())
1115 }
1116
1117 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1118 async fn read_with_execution(
1119 &self,
1120 instance: &str,
1121 execution_id: u64,
1122 ) -> Result<Vec<Event>, ProviderError> {
1123 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1124 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1125 self.table_name("history")
1126 ))
1127 .bind(instance)
1128 .bind(execution_id as i64)
1129 .fetch_all(&*self.pool)
1130 .await
1131 .ok()
1132 .unwrap_or_default();
1133
1134 Ok(event_data_rows
1135 .into_iter()
1136 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1137 .collect())
1138 }
1139
    /// Exposes the admin API; this provider always implements `ProviderAdmin`.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1143}
1144
1145#[async_trait::async_trait]
1146impl ProviderAdmin for PostgresProvider {
1147 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1148 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1149 sqlx::query_scalar(&format!(
1150 "SELECT instance_id FROM {}.list_instances()",
1151 self.schema_name
1152 ))
1153 .fetch_all(&*self.pool)
1154 .await
1155 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1156 }
1157
1158 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1159 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1160 sqlx::query_scalar(&format!(
1161 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1162 self.schema_name
1163 ))
1164 .bind(status)
1165 .fetch_all(&*self.pool)
1166 .await
1167 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1168 }
1169
1170 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1171 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1172 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1173 "SELECT execution_id FROM {}.list_executions($1)",
1174 self.schema_name
1175 ))
1176 .bind(instance)
1177 .fetch_all(&*self.pool)
1178 .await
1179 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1180
1181 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1182 }
1183
1184 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1185 async fn read_history_with_execution_id(
1186 &self,
1187 instance: &str,
1188 execution_id: u64,
1189 ) -> Result<Vec<Event>, ProviderError> {
1190 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1191 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1192 self.schema_name
1193 ))
1194 .bind(instance)
1195 .bind(execution_id as i64)
1196 .fetch_all(&*self.pool)
1197 .await
1198 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1199
1200 event_data_rows
1201 .into_iter()
1202 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1203 .collect::<Vec<Event>>()
1204 .into_iter()
1205 .map(Ok)
1206 .collect()
1207 }
1208
1209 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1210 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1211 let execution_id = self.latest_execution_id(instance).await?;
1212 self.read_history_with_execution_id(instance, execution_id)
1213 .await
1214 }
1215
1216 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1217 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1218 sqlx::query_scalar(&format!(
1219 "SELECT {}.latest_execution_id($1)",
1220 self.schema_name
1221 ))
1222 .bind(instance)
1223 .fetch_optional(&*self.pool)
1224 .await
1225 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1226 .map(|id: i64| id as u64)
1227 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1228 }
1229
1230 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1231 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1232 let row: Option<(
1233 String,
1234 String,
1235 String,
1236 i64,
1237 chrono::DateTime<Utc>,
1238 Option<chrono::DateTime<Utc>>,
1239 Option<String>,
1240 Option<String>,
1241 )> = sqlx::query_as(&format!(
1242 "SELECT * FROM {}.get_instance_info($1)",
1243 self.schema_name
1244 ))
1245 .bind(instance)
1246 .fetch_optional(&*self.pool)
1247 .await
1248 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1249
1250 let (
1251 instance_id,
1252 orchestration_name,
1253 orchestration_version,
1254 current_execution_id,
1255 created_at,
1256 updated_at,
1257 status,
1258 output,
1259 ) =
1260 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1261
1262 Ok(InstanceInfo {
1263 instance_id,
1264 orchestration_name,
1265 orchestration_version,
1266 current_execution_id: current_execution_id as u64,
1267 status: status.unwrap_or_else(|| "Running".to_string()),
1268 output,
1269 created_at: created_at.timestamp_millis() as u64,
1270 updated_at: updated_at
1271 .map(|dt| dt.timestamp_millis() as u64)
1272 .unwrap_or(created_at.timestamp_millis() as u64),
1273 })
1274 }
1275
1276 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1277 async fn get_execution_info(
1278 &self,
1279 instance: &str,
1280 execution_id: u64,
1281 ) -> Result<ExecutionInfo, ProviderError> {
1282 let row: Option<(
1283 i64,
1284 String,
1285 Option<String>,
1286 chrono::DateTime<Utc>,
1287 Option<chrono::DateTime<Utc>>,
1288 i64,
1289 )> = sqlx::query_as(&format!(
1290 "SELECT * FROM {}.get_execution_info($1, $2)",
1291 self.schema_name
1292 ))
1293 .bind(instance)
1294 .bind(execution_id as i64)
1295 .fetch_optional(&*self.pool)
1296 .await
1297 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1298
1299 let (exec_id, status, output, started_at, completed_at, event_count) = row
1300 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1301
1302 Ok(ExecutionInfo {
1303 execution_id: exec_id as u64,
1304 status,
1305 output,
1306 started_at: started_at.timestamp_millis() as u64,
1307 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1308 event_count: event_count as usize,
1309 })
1310 }
1311
1312 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1313 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1314 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1315 "SELECT * FROM {}.get_system_metrics()",
1316 self.schema_name
1317 ))
1318 .fetch_optional(&*self.pool)
1319 .await
1320 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1321
1322 let (
1323 total_instances,
1324 total_executions,
1325 running_instances,
1326 completed_instances,
1327 failed_instances,
1328 total_events,
1329 ) = row.ok_or_else(|| {
1330 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1331 })?;
1332
1333 Ok(SystemMetrics {
1334 total_instances: total_instances as u64,
1335 total_executions: total_executions as u64,
1336 running_instances: running_instances as u64,
1337 completed_instances: completed_instances as u64,
1338 failed_instances: failed_instances as u64,
1339 total_events: total_events as u64,
1340 })
1341 }
1342
1343 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1344 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1345 let now_ms = Self::now_millis();
1346
1347 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1348 "SELECT * FROM {}.get_queue_depths($1)",
1349 self.schema_name
1350 ))
1351 .bind(now_ms)
1352 .fetch_optional(&*self.pool)
1353 .await
1354 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1355
1356 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1357 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1358 })?;
1359
1360 Ok(QueueDepths {
1361 orchestrator_queue: orchestrator_queue as usize,
1362 worker_queue: worker_queue as usize,
1363 timer_queue: 0, })
1365 }
1366}