1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5 ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-backed duroxide provider.
///
/// All durable state (history, queues, instance metadata) lives in Postgres
/// tables and stored procedures under `schema_name`; the struct itself holds
/// only the connection pool and the schema to target.
pub struct PostgresProvider {
    // Shared pool; Arc so helpers (e.g. the migration runner) can hold their
    // own handle without borrowing `self`.
    pool: Arc<PgPool>,
    // Postgres schema that owns every duroxide table and stored procedure.
    schema_name: String,
}
43
44impl PostgresProvider {
45 pub async fn new(database_url: &str) -> Result<Self> {
46 Self::new_with_schema(database_url, None).await
47 }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
76 #[instrument(skip(self), target = "duroxide::providers::postgres")]
77 pub async fn initialize_schema(&self) -> Result<()> {
78 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81 migration_runner.migrate().await?;
82 Ok(())
83 }
84
85 fn now_millis() -> i64 {
87 SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64
91 }
92
93 fn table_name(&self, table: &str) -> String {
95 format!("{}.{}", self.schema_name, table)
96 }
97
98 pub fn pool(&self) -> &PgPool {
100 &self.pool
101 }
102
103 pub fn schema_name(&self) -> &str {
105 &self.schema_name
106 }
107
108 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110 match e {
111 SqlxError::Database(ref db_err) => {
112 let code_opt = db_err.code();
114 let code = code_opt.as_deref();
115 if code == Some("40P01") {
116 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118 } else if code == Some("40001") {
119 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121 } else if code == Some("23505") {
122 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124 } else if code == Some("23503") {
125 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127 } else {
128 ProviderError::permanent(operation, format!("Database error: {e}"))
129 }
130 }
131 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133 }
134 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136 }
137 }
138
139 pub async fn cleanup_schema(&self) -> Result<()> {
144 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146 .execute(&*self.pool)
147 .await?;
148
149 if self.schema_name != "public" {
152 sqlx::query(&format!(
153 "DROP SCHEMA IF EXISTS {} CASCADE",
154 self.schema_name
155 ))
156 .execute(&*self.pool)
157 .await?;
158 } else {
159 }
162
163 Ok(())
164 }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169 #[instrument(skip(self), target = "duroxide::providers::postgres")]
170 async fn fetch_orchestration_item(
171 &self,
172 lock_timeout: Duration,
173 _poll_timeout: Duration,
174 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
175 let start = std::time::Instant::now();
176
177 const MAX_RETRIES: u32 = 3;
178 const RETRY_DELAY_MS: u64 = 50;
179
180 let lock_timeout_ms = lock_timeout.as_millis() as i64;
182 let mut _last_error: Option<ProviderError> = None;
183
184 for attempt in 0..=MAX_RETRIES {
185 let now_ms = Self::now_millis();
186
187 let result: Result<
188 Option<(
189 String,
190 String,
191 String,
192 i64,
193 serde_json::Value,
194 serde_json::Value,
195 String,
196 i32,
197 )>,
198 SqlxError,
199 > = sqlx::query_as(&format!(
200 "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
201 self.schema_name
202 ))
203 .bind(now_ms)
204 .bind(lock_timeout_ms)
205 .fetch_optional(&*self.pool)
206 .await;
207
208 let row = match result {
209 Ok(r) => r,
210 Err(e) => {
211 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
212 if provider_err.is_retryable() && attempt < MAX_RETRIES {
213 warn!(
214 target = "duroxide::providers::postgres",
215 operation = "fetch_orchestration_item",
216 attempt = attempt + 1,
217 error = %provider_err,
218 "Retryable error, will retry"
219 );
220 _last_error = Some(provider_err);
221 sleep(std::time::Duration::from_millis(
222 RETRY_DELAY_MS * (attempt as u64 + 1),
223 ))
224 .await;
225 continue;
226 }
227 return Err(provider_err);
228 }
229 };
230
231 if let Some((
232 instance_id,
233 orchestration_name,
234 orchestration_version,
235 execution_id,
236 history_json,
237 messages_json,
238 lock_token,
239 attempt_count,
240 )) = row
241 {
242 let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
243 ProviderError::permanent(
244 "fetch_orchestration_item",
245 format!("Failed to deserialize history: {e}"),
246 )
247 })?;
248
249 let messages: Vec<WorkItem> =
250 serde_json::from_value(messages_json).map_err(|e| {
251 ProviderError::permanent(
252 "fetch_orchestration_item",
253 format!("Failed to deserialize messages: {e}"),
254 )
255 })?;
256
257 let duration_ms = start.elapsed().as_millis() as u64;
258 debug!(
259 target = "duroxide::providers::postgres",
260 operation = "fetch_orchestration_item",
261 instance_id = %instance_id,
262 execution_id = execution_id,
263 message_count = messages.len(),
264 history_count = history.len(),
265 attempt_count = attempt_count,
266 duration_ms = duration_ms,
267 attempts = attempt + 1,
268 "Fetched orchestration item via stored procedure"
269 );
270
271 return Ok(Some((
272 OrchestrationItem {
273 instance: instance_id,
274 orchestration_name,
275 execution_id: execution_id as u64,
276 version: orchestration_version,
277 history,
278 messages,
279 },
280 lock_token,
281 attempt_count as u32,
282 )));
283 }
284
285 if attempt < MAX_RETRIES {
286 sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
287 }
288 }
289
290 Ok(None)
291 }
    /// Acknowledge a locked orchestration item: persist the history delta,
    /// enqueue follow-up worker/orchestrator items, and update execution
    /// metadata — all in one stored-procedure call.
    ///
    /// Serialization happens once up front; the stored-procedure call is
    /// retried with linear backoff on retryable database errors. An
    /// "Invalid lock token" error from the procedure is always permanent
    /// (the lock expired or was already released).
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload for the history delta; every event must
        // already carry a runtime-assigned, non-zero event_id.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse event-type label from the Debug representation
            // (the variant name is everything before the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Lock-token errors raised by the procedure are terminal;
                    // check both the database error message and the generic
                    // error string before classifying.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice (every loop iteration returns or
        // continues); kept to satisfy the compiler.
        Ok(())
    }
    /// Release a locked orchestration item without acknowledging it, so it
    /// becomes visible again (optionally after `delay`).
    ///
    /// `ignore_attempt` asks the stored procedure not to bump the attempt
    /// counter. An "Invalid lock token" error is permanent; everything else
    /// is classified by `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The procedure returns the instance id owning the lock (used for
        // logging only).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
485
486 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
487 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
488 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
489 "SELECT out_event_data FROM {}.fetch_history($1)",
490 self.schema_name
491 ))
492 .bind(instance)
493 .fetch_all(&*self.pool)
494 .await
495 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
496
497 Ok(event_data_rows
498 .into_iter()
499 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
500 .collect())
501 }
502
    /// Append `new_events` to the history of one execution via the
    /// `append_history` stored procedure.
    ///
    /// No-op for an empty batch. Every event must carry a runtime-assigned,
    /// non-zero `event_id`; a zero id is a caller bug and is rejected as a
    /// permanent error.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Build the JSON payload expected by append_history: one object per
        // event with id, coarse type label, and the serialized event.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name extracted from the Debug form (text before '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
576
577 #[instrument(skip(self), target = "duroxide::providers::postgres")]
578 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
579 let work_item = serde_json::to_string(&item).map_err(|e| {
580 ProviderError::permanent(
581 "enqueue_worker_work",
582 format!("Failed to serialize work item: {e}"),
583 )
584 })?;
585
586 sqlx::query(&format!(
587 "SELECT {}.enqueue_worker_work($1)",
588 self.schema_name
589 ))
590 .bind(work_item)
591 .execute(&*self.pool)
592 .await
593 .map_err(|e| {
594 error!(
595 target = "duroxide::providers::postgres",
596 operation = "enqueue_worker_work",
597 error_type = "database_error",
598 error = %e,
599 "Failed to enqueue worker work"
600 );
601 Self::sqlx_to_provider_error("enqueue_worker_work", e)
602 })?;
603
604 Ok(())
605 }
606
    /// Claim the next ready activity work item via the `fetch_work_item`
    /// stored procedure.
    ///
    /// Returns the item with its lock token and attempt count, or `Ok(None)`
    /// when the queue is empty. `_poll_timeout` is handled by the runtime.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row shape: (work_item_json, lock_token, attempt_count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        // Malformed stored JSON cannot be fixed by retrying — permanent.
        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract the instance id for logging; sub-orchestration completions
        // are attributed to the parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
676
    /// Acknowledge a completed activity: the `ack_worker` stored procedure
    /// removes the locked queue entry and enqueues `completion` for the
    /// orchestrator in one transaction.
    ///
    /// Only `ActivityCompleted`/`ActivityFailed` are valid completions;
    /// anything else is a caller bug (permanent error).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            // A missing queue row means the lock expired or the item was
            // already acked — not something a retry can fix.
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
742
    /// Extend the lock on a claimed work item by `extend_for`.
    ///
    /// Note: the stored procedure takes the extension in whole seconds
    /// (sub-second portions of `extend_for` are truncated). A "Lock token
    /// invalid" error is permanent — the lock expired or was already acked.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
798
    /// Release a claimed work item without completing it, making it visible
    /// again (optionally after `delay`).
    ///
    /// `ignore_attempt` asks the stored procedure not to bump the attempt
    /// counter. Invalid/already-acked lock tokens are permanent errors.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Check both the database message and the generic error
                // string for the procedure's lock-token failure modes.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
855
    /// Extend the lock on a claimed orchestration item by `extend_for`.
    ///
    /// Note: the stored procedure takes the extension in whole seconds
    /// (sub-second portions of `extend_for` are truncated). Invalid, expired,
    /// or already-released tokens are permanent errors.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Match the procedure's known terminal failure messages on
                // both the database error and the generic error string.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
920
921 #[instrument(skip(self), target = "duroxide::providers::postgres")]
922 async fn enqueue_for_orchestrator(
923 &self,
924 item: WorkItem,
925 delay: Option<Duration>,
926 ) -> Result<(), ProviderError> {
927 let work_item = serde_json::to_string(&item).map_err(|e| {
928 ProviderError::permanent(
929 "enqueue_orchestrator_work",
930 format!("Failed to serialize work item: {e}"),
931 )
932 })?;
933
934 let instance_id = match &item {
936 WorkItem::StartOrchestration { instance, .. }
937 | WorkItem::ActivityCompleted { instance, .. }
938 | WorkItem::ActivityFailed { instance, .. }
939 | WorkItem::TimerFired { instance, .. }
940 | WorkItem::ExternalRaised { instance, .. }
941 | WorkItem::CancelInstance { instance, .. }
942 | WorkItem::ContinueAsNew { instance, .. } => instance,
943 WorkItem::SubOrchCompleted {
944 parent_instance, ..
945 }
946 | WorkItem::SubOrchFailed {
947 parent_instance, ..
948 } => parent_instance,
949 WorkItem::ActivityExecute { .. } => {
950 return Err(ProviderError::permanent(
951 "enqueue_orchestrator_work",
952 "ActivityExecute should go to worker queue, not orchestrator queue",
953 ));
954 }
955 };
956
957 let now_ms = Self::now_millis();
959
960 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
961 if *fire_at_ms > 0 {
962 if let Some(delay) = delay {
964 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
965 } else {
966 *fire_at_ms
967 }
968 } else {
969 delay
971 .map(|d| now_ms as u64 + d.as_millis() as u64)
972 .unwrap_or(now_ms as u64)
973 }
974 } else {
975 delay
977 .map(|d| now_ms as u64 + d.as_millis() as u64)
978 .unwrap_or(now_ms as u64)
979 };
980
981 let visible_at = Utc
982 .timestamp_millis_opt(visible_at_ms as i64)
983 .single()
984 .ok_or_else(|| {
985 ProviderError::permanent(
986 "enqueue_orchestrator_work",
987 "Invalid visible_at timestamp",
988 )
989 })?;
990
991 sqlx::query(&format!(
996 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
997 self.schema_name
998 ))
999 .bind(instance_id)
1000 .bind(&work_item)
1001 .bind(visible_at)
1002 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1006 .await
1007 .map_err(|e| {
1008 error!(
1009 target = "duroxide::providers::postgres",
1010 operation = "enqueue_orchestrator_work",
1011 error_type = "database_error",
1012 error = %e,
1013 instance_id = %instance_id,
1014 "Failed to enqueue orchestrator work"
1015 );
1016 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1017 })?;
1018
1019 debug!(
1020 target = "duroxide::providers::postgres",
1021 operation = "enqueue_orchestrator_work",
1022 instance_id = %instance_id,
1023 delay_ms = delay.map(|d| d.as_millis() as u64),
1024 "Enqueued orchestrator work"
1025 );
1026
1027 Ok(())
1028 }
1029
1030 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1031 async fn read_with_execution(
1032 &self,
1033 instance: &str,
1034 execution_id: u64,
1035 ) -> Result<Vec<Event>, ProviderError> {
1036 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1037 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1038 self.table_name("history")
1039 ))
1040 .bind(instance)
1041 .bind(execution_id as i64)
1042 .fetch_all(&*self.pool)
1043 .await
1044 .ok()
1045 .unwrap_or_default();
1046
1047 Ok(event_data_rows
1048 .into_iter()
1049 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1050 .collect())
1051 }
1052
    /// Expose the admin/management API; this provider always supports it.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1056}
1057
1058#[async_trait::async_trait]
1059impl ProviderAdmin for PostgresProvider {
1060 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1061 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1062 sqlx::query_scalar(&format!(
1063 "SELECT instance_id FROM {}.list_instances()",
1064 self.schema_name
1065 ))
1066 .fetch_all(&*self.pool)
1067 .await
1068 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1069 }
1070
1071 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1072 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1073 sqlx::query_scalar(&format!(
1074 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1075 self.schema_name
1076 ))
1077 .bind(status)
1078 .fetch_all(&*self.pool)
1079 .await
1080 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1081 }
1082
1083 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1084 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1085 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1086 "SELECT execution_id FROM {}.list_executions($1)",
1087 self.schema_name
1088 ))
1089 .bind(instance)
1090 .fetch_all(&*self.pool)
1091 .await
1092 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1093
1094 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1095 }
1096
1097 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1098 async fn read_history_with_execution_id(
1099 &self,
1100 instance: &str,
1101 execution_id: u64,
1102 ) -> Result<Vec<Event>, ProviderError> {
1103 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1104 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1105 self.schema_name
1106 ))
1107 .bind(instance)
1108 .bind(execution_id as i64)
1109 .fetch_all(&*self.pool)
1110 .await
1111 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1112
1113 event_data_rows
1114 .into_iter()
1115 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1116 .collect::<Vec<Event>>()
1117 .into_iter()
1118 .map(Ok)
1119 .collect()
1120 }
1121
    /// Read the history of the latest execution of `instance`.
    ///
    /// Resolves the newest execution id first, then delegates; fails with a
    /// permanent error if the instance does not exist.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
        let execution_id = self.latest_execution_id(instance).await?;
        self.read_history_with_execution_id(instance, execution_id)
            .await
    }
1128
    /// Return the newest execution id for `instance`, or a permanent
    /// "Instance not found" error when the instance is unknown.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT {}.latest_execution_id($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
        .map(|id: i64| id as u64)
        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
    }
1142
    /// Fetch instance-level metadata via the `get_instance_info` function.
    ///
    /// A missing row maps to a permanent "Instance not found" error. A NULL
    /// status is reported as "Running", and a NULL `updated_at` falls back to
    /// the creation timestamp.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        // Row shape: (instance_id, name, version, current_execution_id,
        // created_at, updated_at?, status?, output?).
        let row: Option<(
            String,
            String,
            String,
            i64,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            Option<String>,
            Option<String>,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
        })
    }
1188
    /// Fetch metadata for one execution via the `get_execution_info`
    /// function; a missing row maps to a permanent "Execution not found"
    /// error.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn get_execution_info(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<ExecutionInfo, ProviderError> {
        // Row shape: (execution_id, status, output?, started_at,
        // completed_at?, event_count).
        let row: Option<(
            i64,
            String,
            Option<String>,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            i64,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_execution_info($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;

        let (exec_id, status, output, started_at, completed_at, event_count) = row
            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;

        Ok(ExecutionInfo {
            execution_id: exec_id as u64,
            status,
            output,
            started_at: started_at.timestamp_millis() as u64,
            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
            event_count: event_count as usize,
        })
    }
1224
    /// Fetch aggregate counters (instances, executions, events) via the
    /// `get_system_metrics` function.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
        // Row shape: (total_instances, total_executions, running, completed,
        // failed, total_events).
        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_system_metrics()",
            self.schema_name
        ))
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;

        let (
            total_instances,
            total_executions,
            running_instances,
            completed_instances,
            failed_instances,
            total_events,
        ) = row.ok_or_else(|| {
            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
        })?;

        Ok(SystemMetrics {
            total_instances: total_instances as u64,
            total_executions: total_executions as u64,
            running_instances: running_instances as u64,
            completed_instances: completed_instances as u64,
            failed_instances: failed_instances as u64,
            total_events: total_events as u64,
        })
    }
1255
    /// Report current queue depths as of now (items whose visibility time
    /// has passed), via the `get_queue_depths` function.
    ///
    /// Timers live on the orchestrator queue in this provider, so
    /// `timer_queue` is always reported as 0.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
        let now_ms = Self::now_millis();

        // Row shape: (orchestrator_queue_depth, worker_queue_depth).
        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_queue_depths($1)",
            self.schema_name
        ))
        .bind(now_ms)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;

        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
        })?;

        Ok(QueueDepths {
            orchestrator_queue: orchestrator_queue as usize,
            worker_queue: worker_queue as usize,
            timer_queue: 0, })
    }
1279}