1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5 ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-backed duroxide provider.
///
/// Bundles the shared connection pool with the name of the schema that
/// qualifies every table and stored-procedure reference issued by this type.
pub struct PostgresProvider {
    /// Shared sqlx connection pool used for every query.
    pool: Arc<PgPool>,
    /// Schema used to qualify SQL identifiers (`"public"` when the provider is
    /// built without an explicit schema).
    schema_name: String,
}
43
44impl PostgresProvider {
45 pub async fn new(database_url: &str) -> Result<Self> {
46 Self::new_with_schema(database_url, None).await
47 }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
76 #[instrument(skip(self), target = "duroxide::providers::postgres")]
77 pub async fn initialize_schema(&self) -> Result<()> {
78 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81 migration_runner.migrate().await?;
82 Ok(())
83 }
84
85 fn now_millis() -> i64 {
87 SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_millis() as i64
91 }
92
93 fn table_name(&self, table: &str) -> String {
95 format!("{}.{}", self.schema_name, table)
96 }
97
98 pub fn pool(&self) -> &PgPool {
100 &self.pool
101 }
102
103 pub fn schema_name(&self) -> &str {
105 &self.schema_name
106 }
107
108 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110 match e {
111 SqlxError::Database(ref db_err) => {
112 let code_opt = db_err.code();
114 let code = code_opt.as_deref();
115 if code == Some("40P01") {
116 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118 } else if code == Some("40001") {
119 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121 } else if code == Some("23505") {
122 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124 } else if code == Some("23503") {
125 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127 } else {
128 ProviderError::permanent(operation, format!("Database error: {e}"))
129 }
130 }
131 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133 }
134 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136 }
137 }
138
139 pub async fn cleanup_schema(&self) -> Result<()> {
144 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146 .execute(&*self.pool)
147 .await?;
148
149 if self.schema_name != "public" {
152 sqlx::query(&format!(
153 "DROP SCHEMA IF EXISTS {} CASCADE",
154 self.schema_name
155 ))
156 .execute(&*self.pool)
157 .await?;
158 } else {
159 }
162
163 Ok(())
164 }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169 fn name(&self) -> &str {
170 "duroxide-pg"
171 }
172
173 fn version(&self) -> &str {
174 env!("CARGO_PKG_VERSION")
175 }
176
177 #[instrument(skip(self), target = "duroxide::providers::postgres")]
178 async fn fetch_orchestration_item(
179 &self,
180 lock_timeout: Duration,
181 _poll_timeout: Duration,
182 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
183 let start = std::time::Instant::now();
184
185 const MAX_RETRIES: u32 = 3;
186 const RETRY_DELAY_MS: u64 = 50;
187
188 let lock_timeout_ms = lock_timeout.as_millis() as i64;
190 let mut _last_error: Option<ProviderError> = None;
191
192 for attempt in 0..=MAX_RETRIES {
193 let now_ms = Self::now_millis();
194
195 let result: Result<
196 Option<(
197 String,
198 String,
199 String,
200 i64,
201 serde_json::Value,
202 serde_json::Value,
203 String,
204 i32,
205 )>,
206 SqlxError,
207 > = sqlx::query_as(&format!(
208 "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
209 self.schema_name
210 ))
211 .bind(now_ms)
212 .bind(lock_timeout_ms)
213 .fetch_optional(&*self.pool)
214 .await;
215
216 let row = match result {
217 Ok(r) => r,
218 Err(e) => {
219 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
220 if provider_err.is_retryable() && attempt < MAX_RETRIES {
221 warn!(
222 target = "duroxide::providers::postgres",
223 operation = "fetch_orchestration_item",
224 attempt = attempt + 1,
225 error = %provider_err,
226 "Retryable error, will retry"
227 );
228 _last_error = Some(provider_err);
229 sleep(std::time::Duration::from_millis(
230 RETRY_DELAY_MS * (attempt as u64 + 1),
231 ))
232 .await;
233 continue;
234 }
235 return Err(provider_err);
236 }
237 };
238
239 if let Some((
240 instance_id,
241 orchestration_name,
242 orchestration_version,
243 execution_id,
244 history_json,
245 messages_json,
246 lock_token,
247 attempt_count,
248 )) = row
249 {
250 let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
251 ProviderError::permanent(
252 "fetch_orchestration_item",
253 format!("Failed to deserialize history: {e}"),
254 )
255 })?;
256
257 let messages: Vec<WorkItem> =
258 serde_json::from_value(messages_json).map_err(|e| {
259 ProviderError::permanent(
260 "fetch_orchestration_item",
261 format!("Failed to deserialize messages: {e}"),
262 )
263 })?;
264
265 let duration_ms = start.elapsed().as_millis() as u64;
266 debug!(
267 target = "duroxide::providers::postgres",
268 operation = "fetch_orchestration_item",
269 instance_id = %instance_id,
270 execution_id = execution_id,
271 message_count = messages.len(),
272 history_count = history.len(),
273 attempt_count = attempt_count,
274 duration_ms = duration_ms,
275 attempts = attempt + 1,
276 "Fetched orchestration item via stored procedure"
277 );
278
279 return Ok(Some((
280 OrchestrationItem {
281 instance: instance_id,
282 orchestration_name,
283 execution_id: execution_id as u64,
284 version: orchestration_version,
285 history,
286 messages,
287 },
288 lock_token,
289 attempt_count as u32,
290 )));
291 }
292
293 if attempt < MAX_RETRIES {
294 sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
295 }
296 }
297
298 Ok(None)
299 }
    /// Acknowledges a locked orchestration item through the
    /// `ack_orchestration_item` stored procedure.
    ///
    /// The history delta, worker items, orchestrator items and execution
    /// metadata are serialized to JSON and passed in a single call. The call is
    /// attempted up to `MAX_RETRIES + 1` times; only errors classified as
    /// retryable are retried, with a linearly growing delay.
    ///
    /// # Errors
    /// * permanent error if any event in `history_delta` has `event_id() == 0`;
    /// * permanent error if any payload fails to serialize;
    /// * permanent `"Invalid lock token"` error when the database reports it
    ///   (never retried);
    /// * otherwise the mapped database error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Validate and serialize every event before touching the database.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // The event type label is the Debug output up to the first `{`.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // An invalid lock token is checked first and is never retried.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50 ms, 100 ms, 150 ms.
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Every loop iteration returns or continues; this satisfies the return type.
        Ok(())
    }
437 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
438 async fn abandon_orchestration_item(
439 &self,
440 lock_token: &str,
441 delay: Option<Duration>,
442 ignore_attempt: bool,
443 ) -> Result<(), ProviderError> {
444 let start = std::time::Instant::now();
445 let now_ms = Self::now_millis();
446 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
447
448 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
449 "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
450 self.schema_name
451 ))
452 .bind(lock_token)
453 .bind(now_ms)
454 .bind(delay_param)
455 .bind(ignore_attempt)
456 .fetch_one(&*self.pool)
457 .await
458 {
459 Ok(instance_id) => instance_id,
460 Err(e) => {
461 if let SqlxError::Database(db_err) = &e {
462 if db_err.message().contains("Invalid lock token") {
463 return Err(ProviderError::permanent(
464 "abandon_orchestration_item",
465 "Invalid lock token",
466 ));
467 }
468 } else if e.to_string().contains("Invalid lock token") {
469 return Err(ProviderError::permanent(
470 "abandon_orchestration_item",
471 "Invalid lock token",
472 ));
473 }
474
475 return Err(Self::sqlx_to_provider_error(
476 "abandon_orchestration_item",
477 e,
478 ));
479 }
480 };
481
482 let duration_ms = start.elapsed().as_millis() as u64;
483 debug!(
484 target = "duroxide::providers::postgres",
485 operation = "abandon_orchestration_item",
486 instance_id = %instance_id,
487 delay_ms = delay.map(|d| d.as_millis() as u64),
488 ignore_attempt = ignore_attempt,
489 duration_ms = duration_ms,
490 "Abandoned orchestration item via stored procedure"
491 );
492
493 Ok(())
494 }
495
496 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
497 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
498 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
499 "SELECT out_event_data FROM {}.fetch_history($1)",
500 self.schema_name
501 ))
502 .bind(instance)
503 .fetch_all(&*self.pool)
504 .await
505 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
506
507 Ok(event_data_rows
508 .into_iter()
509 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
510 .collect())
511 }
512
513 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
514 async fn append_with_execution(
515 &self,
516 instance: &str,
517 execution_id: u64,
518 new_events: Vec<Event>,
519 ) -> Result<(), ProviderError> {
520 if new_events.is_empty() {
521 return Ok(());
522 }
523
524 let mut events_payload = Vec::with_capacity(new_events.len());
525 for event in &new_events {
526 if event.event_id() == 0 {
527 error!(
528 target = "duroxide::providers::postgres",
529 operation = "append_with_execution",
530 error_type = "validation_error",
531 instance_id = %instance,
532 execution_id = execution_id,
533 "event_id must be set by runtime"
534 );
535 return Err(ProviderError::permanent(
536 "append_with_execution",
537 "event_id must be set by runtime",
538 ));
539 }
540
541 let event_json = serde_json::to_string(event).map_err(|e| {
542 ProviderError::permanent(
543 "append_with_execution",
544 format!("Failed to serialize event: {e}"),
545 )
546 })?;
547
548 let event_type = format!("{event:?}")
549 .split('{')
550 .next()
551 .unwrap_or("Unknown")
552 .trim()
553 .to_string();
554
555 events_payload.push(serde_json::json!({
556 "event_id": event.event_id(),
557 "event_type": event_type,
558 "event_data": event_json,
559 }));
560 }
561
562 let events_json = serde_json::Value::Array(events_payload);
563
564 sqlx::query(&format!(
565 "SELECT {}.append_history($1, $2, $3)",
566 self.schema_name
567 ))
568 .bind(instance)
569 .bind(execution_id as i64)
570 .bind(events_json)
571 .execute(&*self.pool)
572 .await
573 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
574
575 debug!(
576 target = "duroxide::providers::postgres",
577 operation = "append_with_execution",
578 instance_id = %instance,
579 execution_id = execution_id,
580 event_count = new_events.len(),
581 "Appended history events via stored procedure"
582 );
583
584 Ok(())
585 }
586
587 #[instrument(skip(self), target = "duroxide::providers::postgres")]
588 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
589 let work_item = serde_json::to_string(&item).map_err(|e| {
590 ProviderError::permanent(
591 "enqueue_worker_work",
592 format!("Failed to serialize work item: {e}"),
593 )
594 })?;
595
596 let now_ms = Self::now_millis();
597
598 sqlx::query(&format!(
599 "SELECT {}.enqueue_worker_work($1, $2)",
600 self.schema_name
601 ))
602 .bind(work_item)
603 .bind(now_ms)
604 .execute(&*self.pool)
605 .await
606 .map_err(|e| {
607 error!(
608 target = "duroxide::providers::postgres",
609 operation = "enqueue_worker_work",
610 error_type = "database_error",
611 error = %e,
612 "Failed to enqueue worker work"
613 );
614 Self::sqlx_to_provider_error("enqueue_worker_work", e)
615 })?;
616
617 Ok(())
618 }
619
620 #[instrument(skip(self), target = "duroxide::providers::postgres")]
621 async fn fetch_work_item(
622 &self,
623 lock_timeout: Duration,
624 _poll_timeout: Duration,
625 ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
626 let start = std::time::Instant::now();
627
628 let lock_timeout_ms = lock_timeout.as_millis() as i64;
630
631 let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
632 "SELECT * FROM {}.fetch_work_item($1, $2)",
633 self.schema_name
634 ))
635 .bind(Self::now_millis())
636 .bind(lock_timeout_ms)
637 .fetch_optional(&*self.pool)
638 .await
639 {
640 Ok(row) => row,
641 Err(e) => {
642 return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
643 }
644 };
645
646 let (work_item_json, lock_token, attempt_count) = match row {
647 Some(row) => row,
648 None => return Ok(None),
649 };
650
651 let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
652 ProviderError::permanent(
653 "fetch_work_item",
654 format!("Failed to deserialize worker item: {e}"),
655 )
656 })?;
657
658 let duration_ms = start.elapsed().as_millis() as u64;
659
660 let instance_id = match &work_item {
662 WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
663 WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
664 WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
665 WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
666 WorkItem::TimerFired { instance, .. } => instance.as_str(),
667 WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
668 WorkItem::CancelInstance { instance, .. } => instance.as_str(),
669 WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
670 WorkItem::SubOrchCompleted {
671 parent_instance, ..
672 } => parent_instance.as_str(),
673 WorkItem::SubOrchFailed {
674 parent_instance, ..
675 } => parent_instance.as_str(),
676 };
677
678 debug!(
679 target = "duroxide::providers::postgres",
680 operation = "fetch_work_item",
681 instance_id = %instance_id,
682 attempt_count = attempt_count,
683 duration_ms = duration_ms,
684 "Fetched activity work item via stored procedure"
685 );
686
687 Ok(Some((work_item, lock_token, attempt_count as u32)))
688 }
689
690 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
691 async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
692 let start = std::time::Instant::now();
693
694 let instance_id = match &completion {
696 WorkItem::ActivityCompleted { instance, .. }
697 | WorkItem::ActivityFailed { instance, .. } => instance,
698 _ => {
699 error!(
700 target = "duroxide::providers::postgres",
701 operation = "ack_worker",
702 error_type = "invalid_completion_type",
703 "Invalid completion work item type"
704 );
705 return Err(ProviderError::permanent(
706 "ack_worker",
707 "Invalid completion work item type",
708 ));
709 }
710 };
711
712 let completion_json = serde_json::to_string(&completion).map_err(|e| {
713 ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
714 })?;
715
716 let now_ms = Self::now_millis();
717
718 sqlx::query(&format!(
720 "SELECT {}.ack_worker($1, $2, $3, $4)",
721 self.schema_name
722 ))
723 .bind(token)
724 .bind(instance_id)
725 .bind(completion_json)
726 .bind(now_ms)
727 .execute(&*self.pool)
728 .await
729 .map_err(|e| {
730 if e.to_string().contains("Worker queue item not found") {
731 error!(
732 target = "duroxide::providers::postgres",
733 operation = "ack_worker",
734 error_type = "worker_item_not_found",
735 token = %token,
736 "Worker queue item not found or already processed"
737 );
738 ProviderError::permanent(
739 "ack_worker",
740 "Worker queue item not found or already processed",
741 )
742 } else {
743 Self::sqlx_to_provider_error("ack_worker", e)
744 }
745 })?;
746
747 let duration_ms = start.elapsed().as_millis() as u64;
748 debug!(
749 target = "duroxide::providers::postgres",
750 operation = "ack_worker",
751 instance_id = %instance_id,
752 duration_ms = duration_ms,
753 "Acknowledged worker and enqueued completion"
754 );
755
756 Ok(())
757 }
758
759 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
760 async fn renew_work_item_lock(
761 &self,
762 token: &str,
763 extend_for: Duration,
764 ) -> Result<(), ProviderError> {
765 let start = std::time::Instant::now();
766
767 let now_ms = Self::now_millis();
769
770 let extend_secs = extend_for.as_secs() as i64;
772
773 match sqlx::query(&format!(
774 "SELECT {}.renew_work_item_lock($1, $2, $3)",
775 self.schema_name
776 ))
777 .bind(token)
778 .bind(now_ms)
779 .bind(extend_secs)
780 .execute(&*self.pool)
781 .await
782 {
783 Ok(_) => {
784 let duration_ms = start.elapsed().as_millis() as u64;
785 debug!(
786 target = "duroxide::providers::postgres",
787 operation = "renew_work_item_lock",
788 token = %token,
789 extend_for_secs = extend_secs,
790 duration_ms = duration_ms,
791 "Work item lock renewed successfully"
792 );
793 Ok(())
794 }
795 Err(e) => {
796 if let SqlxError::Database(db_err) = &e {
797 if db_err.message().contains("Lock token invalid") {
798 return Err(ProviderError::permanent(
799 "renew_work_item_lock",
800 "Lock token invalid, expired, or already acked",
801 ));
802 }
803 } else if e.to_string().contains("Lock token invalid") {
804 return Err(ProviderError::permanent(
805 "renew_work_item_lock",
806 "Lock token invalid, expired, or already acked",
807 ));
808 }
809
810 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
811 }
812 }
813 }
814
815 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
816 async fn abandon_work_item(
817 &self,
818 token: &str,
819 delay: Option<Duration>,
820 ignore_attempt: bool,
821 ) -> Result<(), ProviderError> {
822 let start = std::time::Instant::now();
823 let now_ms = Self::now_millis();
824 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
825
826 match sqlx::query(&format!(
827 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
828 self.schema_name
829 ))
830 .bind(token)
831 .bind(now_ms)
832 .bind(delay_param)
833 .bind(ignore_attempt)
834 .execute(&*self.pool)
835 .await
836 {
837 Ok(_) => {
838 let duration_ms = start.elapsed().as_millis() as u64;
839 debug!(
840 target = "duroxide::providers::postgres",
841 operation = "abandon_work_item",
842 token = %token,
843 delay_ms = delay.map(|d| d.as_millis() as u64),
844 ignore_attempt = ignore_attempt,
845 duration_ms = duration_ms,
846 "Abandoned work item via stored procedure"
847 );
848 Ok(())
849 }
850 Err(e) => {
851 if let SqlxError::Database(db_err) = &e {
852 if db_err.message().contains("Invalid lock token")
853 || db_err.message().contains("already acked")
854 {
855 return Err(ProviderError::permanent(
856 "abandon_work_item",
857 "Invalid lock token or already acked",
858 ));
859 }
860 } else if e.to_string().contains("Invalid lock token")
861 || e.to_string().contains("already acked")
862 {
863 return Err(ProviderError::permanent(
864 "abandon_work_item",
865 "Invalid lock token or already acked",
866 ));
867 }
868
869 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
870 }
871 }
872 }
873
874 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
875 async fn renew_orchestration_item_lock(
876 &self,
877 token: &str,
878 extend_for: Duration,
879 ) -> Result<(), ProviderError> {
880 let start = std::time::Instant::now();
881
882 let now_ms = Self::now_millis();
884
885 let extend_secs = extend_for.as_secs() as i64;
887
888 match sqlx::query(&format!(
889 "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
890 self.schema_name
891 ))
892 .bind(token)
893 .bind(now_ms)
894 .bind(extend_secs)
895 .execute(&*self.pool)
896 .await
897 {
898 Ok(_) => {
899 let duration_ms = start.elapsed().as_millis() as u64;
900 debug!(
901 target = "duroxide::providers::postgres",
902 operation = "renew_orchestration_item_lock",
903 token = %token,
904 extend_for_secs = extend_secs,
905 duration_ms = duration_ms,
906 "Orchestration item lock renewed successfully"
907 );
908 Ok(())
909 }
910 Err(e) => {
911 if let SqlxError::Database(db_err) = &e {
912 if db_err.message().contains("Lock token invalid")
913 || db_err.message().contains("expired")
914 || db_err.message().contains("already released")
915 {
916 return Err(ProviderError::permanent(
917 "renew_orchestration_item_lock",
918 "Lock token invalid, expired, or already released",
919 ));
920 }
921 } else if e.to_string().contains("Lock token invalid")
922 || e.to_string().contains("expired")
923 || e.to_string().contains("already released")
924 {
925 return Err(ProviderError::permanent(
926 "renew_orchestration_item_lock",
927 "Lock token invalid, expired, or already released",
928 ));
929 }
930
931 Err(Self::sqlx_to_provider_error(
932 "renew_orchestration_item_lock",
933 e,
934 ))
935 }
936 }
937 }
938
939 #[instrument(skip(self), target = "duroxide::providers::postgres")]
940 async fn enqueue_for_orchestrator(
941 &self,
942 item: WorkItem,
943 delay: Option<Duration>,
944 ) -> Result<(), ProviderError> {
945 let work_item = serde_json::to_string(&item).map_err(|e| {
946 ProviderError::permanent(
947 "enqueue_orchestrator_work",
948 format!("Failed to serialize work item: {e}"),
949 )
950 })?;
951
952 let instance_id = match &item {
954 WorkItem::StartOrchestration { instance, .. }
955 | WorkItem::ActivityCompleted { instance, .. }
956 | WorkItem::ActivityFailed { instance, .. }
957 | WorkItem::TimerFired { instance, .. }
958 | WorkItem::ExternalRaised { instance, .. }
959 | WorkItem::CancelInstance { instance, .. }
960 | WorkItem::ContinueAsNew { instance, .. } => instance,
961 WorkItem::SubOrchCompleted {
962 parent_instance, ..
963 }
964 | WorkItem::SubOrchFailed {
965 parent_instance, ..
966 } => parent_instance,
967 WorkItem::ActivityExecute { .. } => {
968 return Err(ProviderError::permanent(
969 "enqueue_orchestrator_work",
970 "ActivityExecute should go to worker queue, not orchestrator queue",
971 ));
972 }
973 };
974
975 let now_ms = Self::now_millis();
977
978 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
979 if *fire_at_ms > 0 {
980 if let Some(delay) = delay {
982 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
983 } else {
984 *fire_at_ms
985 }
986 } else {
987 delay
989 .map(|d| now_ms as u64 + d.as_millis() as u64)
990 .unwrap_or(now_ms as u64)
991 }
992 } else {
993 delay
995 .map(|d| now_ms as u64 + d.as_millis() as u64)
996 .unwrap_or(now_ms as u64)
997 };
998
999 let visible_at = Utc
1000 .timestamp_millis_opt(visible_at_ms as i64)
1001 .single()
1002 .ok_or_else(|| {
1003 ProviderError::permanent(
1004 "enqueue_orchestrator_work",
1005 "Invalid visible_at timestamp",
1006 )
1007 })?;
1008
1009 sqlx::query(&format!(
1014 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1015 self.schema_name
1016 ))
1017 .bind(instance_id)
1018 .bind(&work_item)
1019 .bind(visible_at)
1020 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1024 .await
1025 .map_err(|e| {
1026 error!(
1027 target = "duroxide::providers::postgres",
1028 operation = "enqueue_orchestrator_work",
1029 error_type = "database_error",
1030 error = %e,
1031 instance_id = %instance_id,
1032 "Failed to enqueue orchestrator work"
1033 );
1034 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1035 })?;
1036
1037 debug!(
1038 target = "duroxide::providers::postgres",
1039 operation = "enqueue_orchestrator_work",
1040 instance_id = %instance_id,
1041 delay_ms = delay.map(|d| d.as_millis() as u64),
1042 "Enqueued orchestrator work"
1043 );
1044
1045 Ok(())
1046 }
1047
1048 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1049 async fn read_with_execution(
1050 &self,
1051 instance: &str,
1052 execution_id: u64,
1053 ) -> Result<Vec<Event>, ProviderError> {
1054 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1055 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1056 self.table_name("history")
1057 ))
1058 .bind(instance)
1059 .bind(execution_id as i64)
1060 .fetch_all(&*self.pool)
1061 .await
1062 .ok()
1063 .unwrap_or_default();
1064
1065 Ok(event_data_rows
1066 .into_iter()
1067 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1068 .collect())
1069 }
1070
1071 fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1072 Some(self)
1073 }
1074}
1075
1076#[async_trait::async_trait]
1077impl ProviderAdmin for PostgresProvider {
1078 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1079 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1080 sqlx::query_scalar(&format!(
1081 "SELECT instance_id FROM {}.list_instances()",
1082 self.schema_name
1083 ))
1084 .fetch_all(&*self.pool)
1085 .await
1086 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1087 }
1088
1089 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1090 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1091 sqlx::query_scalar(&format!(
1092 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1093 self.schema_name
1094 ))
1095 .bind(status)
1096 .fetch_all(&*self.pool)
1097 .await
1098 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1099 }
1100
1101 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1102 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1103 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1104 "SELECT execution_id FROM {}.list_executions($1)",
1105 self.schema_name
1106 ))
1107 .bind(instance)
1108 .fetch_all(&*self.pool)
1109 .await
1110 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1111
1112 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1113 }
1114
1115 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1116 async fn read_history_with_execution_id(
1117 &self,
1118 instance: &str,
1119 execution_id: u64,
1120 ) -> Result<Vec<Event>, ProviderError> {
1121 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1122 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1123 self.schema_name
1124 ))
1125 .bind(instance)
1126 .bind(execution_id as i64)
1127 .fetch_all(&*self.pool)
1128 .await
1129 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1130
1131 event_data_rows
1132 .into_iter()
1133 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1134 .collect::<Vec<Event>>()
1135 .into_iter()
1136 .map(Ok)
1137 .collect()
1138 }
1139
1140 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1141 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1142 let execution_id = self.latest_execution_id(instance).await?;
1143 self.read_history_with_execution_id(instance, execution_id)
1144 .await
1145 }
1146
1147 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1148 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1149 sqlx::query_scalar(&format!(
1150 "SELECT {}.latest_execution_id($1)",
1151 self.schema_name
1152 ))
1153 .bind(instance)
1154 .fetch_optional(&*self.pool)
1155 .await
1156 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1157 .map(|id: i64| id as u64)
1158 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1159 }
1160
1161 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1162 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1163 let row: Option<(
1164 String,
1165 String,
1166 String,
1167 i64,
1168 chrono::DateTime<Utc>,
1169 Option<chrono::DateTime<Utc>>,
1170 Option<String>,
1171 Option<String>,
1172 )> = sqlx::query_as(&format!(
1173 "SELECT * FROM {}.get_instance_info($1)",
1174 self.schema_name
1175 ))
1176 .bind(instance)
1177 .fetch_optional(&*self.pool)
1178 .await
1179 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1180
1181 let (
1182 instance_id,
1183 orchestration_name,
1184 orchestration_version,
1185 current_execution_id,
1186 created_at,
1187 updated_at,
1188 status,
1189 output,
1190 ) =
1191 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1192
1193 Ok(InstanceInfo {
1194 instance_id,
1195 orchestration_name,
1196 orchestration_version,
1197 current_execution_id: current_execution_id as u64,
1198 status: status.unwrap_or_else(|| "Running".to_string()),
1199 output,
1200 created_at: created_at.timestamp_millis() as u64,
1201 updated_at: updated_at
1202 .map(|dt| dt.timestamp_millis() as u64)
1203 .unwrap_or(created_at.timestamp_millis() as u64),
1204 })
1205 }
1206
1207 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1208 async fn get_execution_info(
1209 &self,
1210 instance: &str,
1211 execution_id: u64,
1212 ) -> Result<ExecutionInfo, ProviderError> {
1213 let row: Option<(
1214 i64,
1215 String,
1216 Option<String>,
1217 chrono::DateTime<Utc>,
1218 Option<chrono::DateTime<Utc>>,
1219 i64,
1220 )> = sqlx::query_as(&format!(
1221 "SELECT * FROM {}.get_execution_info($1, $2)",
1222 self.schema_name
1223 ))
1224 .bind(instance)
1225 .bind(execution_id as i64)
1226 .fetch_optional(&*self.pool)
1227 .await
1228 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1229
1230 let (exec_id, status, output, started_at, completed_at, event_count) = row
1231 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1232
1233 Ok(ExecutionInfo {
1234 execution_id: exec_id as u64,
1235 status,
1236 output,
1237 started_at: started_at.timestamp_millis() as u64,
1238 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1239 event_count: event_count as usize,
1240 })
1241 }
1242
1243 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1244 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1245 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1246 "SELECT * FROM {}.get_system_metrics()",
1247 self.schema_name
1248 ))
1249 .fetch_optional(&*self.pool)
1250 .await
1251 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1252
1253 let (
1254 total_instances,
1255 total_executions,
1256 running_instances,
1257 completed_instances,
1258 failed_instances,
1259 total_events,
1260 ) = row.ok_or_else(|| {
1261 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1262 })?;
1263
1264 Ok(SystemMetrics {
1265 total_instances: total_instances as u64,
1266 total_executions: total_executions as u64,
1267 running_instances: running_instances as u64,
1268 completed_instances: completed_instances as u64,
1269 failed_instances: failed_instances as u64,
1270 total_events: total_events as u64,
1271 })
1272 }
1273
1274 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1275 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1276 let now_ms = Self::now_millis();
1277
1278 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1279 "SELECT * FROM {}.get_queue_depths($1)",
1280 self.schema_name
1281 ))
1282 .bind(now_ms)
1283 .fetch_optional(&*self.pool)
1284 .await
1285 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1286
1287 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1288 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1289 })?;
1290
1291 Ok(QueueDepths {
1292 orchestrator_queue: orchestrator_queue as usize,
1293 worker_queue: worker_queue as usize,
1294 timer_queue: 0, })
1296 }
1297}