1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider,
5 ProviderAdmin, ProviderError, QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-backed implementation of the duroxide `Provider` trait.
///
/// All durable state lives in the database; this struct only carries the
/// connection pool and the schema that qualifies every table/function call.
pub struct PostgresProvider {
    // Shared pool; also handed to `MigrationRunner` during construction.
    pool: Arc<PgPool>,
    // Schema used to qualify every query (defaults to "public").
    schema_name: String,
}
43
44impl PostgresProvider {
    /// Connects using the default `public` schema and runs migrations.
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_schema(database_url, None).await
    }
48
49 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51 .ok()
52 .and_then(|s| s.parse::<u32>().ok())
53 .unwrap_or(10);
54
55 let pool = PgPoolOptions::new()
56 .max_connections(max_connections)
57 .min_connections(1)
58 .acquire_timeout(std::time::Duration::from_secs(30))
59 .connect(database_url)
60 .await?;
61
62 let schema_name = schema_name.unwrap_or("public").to_string();
63
64 let provider = Self {
65 pool: Arc::new(pool),
66 schema_name: schema_name.clone(),
67 };
68
69 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71 migration_runner.migrate().await?;
72
73 Ok(provider)
74 }
75
    /// Re-runs the migration set against the configured schema; safe to call
    /// repeatedly (migrations are expected to be idempotent).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    pub async fn initialize_schema(&self) -> Result<()> {
        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
        migration_runner.migrate().await?;
        Ok(())
    }
84
    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// The `unwrap` can only panic if the host clock is set before 1970,
    /// which we treat as unrecoverable host misconfiguration.
    fn now_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64
    }
92
93
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
    /// Borrows the underlying connection pool (useful for tests and ad-hoc queries).
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
104
    /// The schema all queries are qualified with.
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 pub async fn cleanup_schema(&self) -> Result<()> {
146 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
148 .execute(&*self.pool)
149 .await?;
150
151 if self.schema_name != "public" {
154 sqlx::query(&format!(
155 "DROP SCHEMA IF EXISTS {} CASCADE",
156 self.schema_name
157 ))
158 .execute(&*self.pool)
159 .await?;
160 } else {
161 }
164
165 Ok(())
166 }
167}
168
169#[async_trait::async_trait]
170impl Provider for PostgresProvider {
    /// Stable provider identifier reported to the runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
174
    /// Provider version, taken from this crate's Cargo manifest at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
178
    /// Atomically claims the next ready orchestration item (full history plus
    /// pending messages) via the `fetch_orchestration_item` stored procedure.
    ///
    /// Returns `(item, lock_token, attempt_count)`; the token must later be
    /// handed to `ack_orchestration_item` or `abandon_orchestration_item`.
    /// `_poll_timeout` is unused: this implementation performs a single poll
    /// per call and relies on the caller's polling loop. Transient DB errors
    /// are retried up to `MAX_RETRIES` times with linear backoff.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // Currently write-only; kept for symmetry with the retry loop.
        let mut _last_error: Option<ProviderError> = None;

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape produced by the stored procedure:
            // (instance_id, orchestration_name, orchestration_version,
            //  execution_id, history JSONB, messages JSONB, lock_token,
            //  attempt_count)
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms.
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // Deserialization failures here are permanent: the row was
                // written by us, so a bad payload indicates corruption.
                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
                    ProviderError::permanent(
                        "fetch_orchestration_item",
                        format!("Failed to deserialize history: {e}"),
                    )
                })?;

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Queue empty: return immediately rather than retrying.
            return Ok(None);
        }

        // Unreachable: every iteration returns or continues, and the final
        // iteration always returns.
        Ok(None)
    }
    /// Commits the result of one orchestration turn in a single stored
    /// procedure call: appends `history_delta`, enqueues worker and
    /// orchestrator items, updates instance metadata, records cancelled
    /// activities, and releases `lock_token` — atomically on the DB side.
    ///
    /// Transient DB errors are retried with linear backoff; an invalid or
    /// expired lock token is always a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize all payloads once, up front, so every retry re-sends an
        // identical request.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id 0 means the runtime failed to assign an id; persisting
            // it would corrupt history ordering.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name extracted from the Debug representation (text
            // before the first '{'), stored as a queryable type tag.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The stored procedure signals a bad token via its error
                    // message; detect it by substring and never retry.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff between attempts.
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the final loop iteration always returns.
        Ok(())
    }
    /// Releases the lock identified by `lock_token` without committing,
    /// making the item visible again — after `delay` if one is given.
    /// `ignore_attempt` is forwarded to the stored procedure (presumably to
    /// skip incrementing the attempt counter — confirm against the SQL).
    ///
    /// The stored procedure returns the owning instance id, used for logging.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Bad tokens are signaled via the error message; permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
513
    /// Loads the stored history for `instance` via the `fetch_history` SQL
    /// function and deserializes each row into an `Event`.
    ///
    /// NOTE(review): rows whose JSON fails to deserialize are silently
    /// skipped by `filter_map` — corrupt history shrinks instead of erroring.
    /// Confirm this lossy behavior is intentional.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
            "SELECT out_event_data FROM {}.fetch_history($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;

        Ok(event_data_rows
            .into_iter()
            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
            .collect())
    }
530
    /// Appends `new_events` to the history of (`instance`, `execution_id`)
    /// via the `append_history` stored procedure. No-op on an empty batch.
    ///
    /// Every event must already carry a non-zero `event_id` assigned by the
    /// runtime; a zero id is rejected as a permanent error before any write.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name taken from the Debug representation (text before
            // the first '{'), stored as a queryable type tag.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
604
    /// Pushes a single item onto the worker queue via the
    /// `enqueue_worker_work` stored procedure. (The "enqueue_worker_work"
    /// operation label in errors/logs matches the SQL function name, not
    /// this method's name.)
    ///
    /// For `ActivityExecute` items the instance/execution/activity ids are
    /// additionally passed as separate columns; all other kinds pass NULLs.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_worker_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        let now_ms = Self::now_millis();

        let (instance_id, execution_id, activity_id) = match &item {
            WorkItem::ActivityExecute {
                instance,
                execution_id,
                id,
                ..
            } => (
                Some(instance.clone()),
                Some(*execution_id as i64),
                Some(*id as i64),
            ),
            _ => (None, None, None),
        };

        sqlx::query(&format!(
            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
            self.schema_name
        ))
        .bind(work_item)
        .bind(now_ms)
        .bind(&instance_id)
        .bind(execution_id)
        .bind(activity_id)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_worker_work",
                error_type = "database_error",
                error = %e,
                "Failed to enqueue worker work"
            );
            Self::sqlx_to_provider_error("enqueue_worker_work", e)
        })?;

        Ok(())
    }
655
    /// Claims the next visible worker (activity) item, returning the
    /// deserialized item, its lock token, and the attempt count.
    ///
    /// `_poll_timeout` is unused: one poll per call; the caller loops.
    /// Unlike `fetch_orchestration_item`, there is no retry loop here —
    /// transient errors surface to the caller immediately.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row shape: (work_item JSON text, lock_token, attempt_count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Owning instance id, extracted purely for the log line below.
        // Sub-orchestration results are attributed to the parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((
            work_item,
            lock_token,
            attempt_count as u32,
        )))
    }
729
    /// Acknowledges a locked worker item identified by `token`.
    ///
    /// With `completion == None` the item is acked without producing a
    /// result (the cancelled-activity path). Otherwise `completion` must be
    /// `ActivityCompleted` or `ActivityFailed`; the 4-argument form of the
    /// `ack_worker` stored procedure removes the worker row and enqueues the
    /// completion for the orchestrator in one call.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Ack-only path: no completion to deliver (e.g. activity cancelled).
        let Some(completion) = completion else {
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1)",
                self.schema_name
            ))
            .bind(token)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // Missing row means the lock expired or the item was already
                // acked — permanent either way.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for a worker item.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
834
    /// Extends the lock on a claimed worker item by `extend_for`.
    ///
    /// NOTE(review): the extension is passed as whole seconds (`as_secs`),
    /// so sub-second remainders are truncated — confirm this matches the
    /// stored procedure's contract. An invalid/expired token is permanent.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token problems are reported via the error message.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
890
    /// Releases a worker item's lock without completing it, making it
    /// visible again — after `delay` if one is given. `ignore_attempt` is
    /// forwarded to the stored procedure (presumably to skip the attempt
    /// counter — confirm against the SQL definition).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Token problems (invalid / already acked) are permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
949
    /// Extends the lock on a claimed orchestration item by `extend_for`.
    ///
    /// NOTE(review): like `renew_work_item_lock`, the extension is truncated
    /// to whole seconds (`as_secs`). Invalid/expired/released tokens are
    /// permanent errors; everything else goes through the shared mapper.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1014
    /// Pushes an item onto the orchestrator queue via the
    /// `enqueue_orchestrator_work` stored procedure, optionally delayed.
    ///
    /// `ActivityExecute` is rejected (it belongs on the worker queue).
    /// Sub-orchestration results are routed to the parent instance. Timer
    /// items carry their own fire time: an explicit `delay` can only push
    /// visibility later, never earlier than `fire_at_ms`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Resolve which instance the item is addressed to.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        let now_ms = Self::now_millis();

        // Compute when the item becomes visible to consumers.
        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                if let Some(delay) = delay {
                    // Honor whichever is later: the timer's fire time or the
                    // requested delay from now.
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                // fire_at_ms == 0: treat like an ordinary (optionally
                // delayed) item.
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // The SQL function takes six parameters; the trailing three are
        // unused by this call path and passed as NULLs.
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .bind::<Option<i64>>(None)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1123
1124 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1125 async fn read_with_execution(
1126 &self,
1127 instance: &str,
1128 execution_id: u64,
1129 ) -> Result<Vec<Event>, ProviderError> {
1130 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1131 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1132 self.table_name("history")
1133 ))
1134 .bind(instance)
1135 .bind(execution_id as i64)
1136 .fetch_all(&*self.pool)
1137 .await
1138 .ok()
1139 .unwrap_or_default();
1140
1141 Ok(event_data_rows
1142 .into_iter()
1143 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1144 .collect())
1145 }
1146
    /// Exposes the admin surface; this provider always supports it.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1150}
1151
1152#[async_trait::async_trait]
1153impl ProviderAdmin for PostgresProvider {
1154 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1155 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1156 sqlx::query_scalar(&format!(
1157 "SELECT instance_id FROM {}.list_instances()",
1158 self.schema_name
1159 ))
1160 .fetch_all(&*self.pool)
1161 .await
1162 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1163 }
1164
1165 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1166 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1167 sqlx::query_scalar(&format!(
1168 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1169 self.schema_name
1170 ))
1171 .bind(status)
1172 .fetch_all(&*self.pool)
1173 .await
1174 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1175 }
1176
1177 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1178 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1179 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1180 "SELECT execution_id FROM {}.list_executions($1)",
1181 self.schema_name
1182 ))
1183 .bind(instance)
1184 .fetch_all(&*self.pool)
1185 .await
1186 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1187
1188 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1189 }
1190
1191 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1192 async fn read_history_with_execution_id(
1193 &self,
1194 instance: &str,
1195 execution_id: u64,
1196 ) -> Result<Vec<Event>, ProviderError> {
1197 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1198 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1199 self.schema_name
1200 ))
1201 .bind(instance)
1202 .bind(execution_id as i64)
1203 .fetch_all(&*self.pool)
1204 .await
1205 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1206
1207 event_data_rows
1208 .into_iter()
1209 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1210 .collect::<Vec<Event>>()
1211 .into_iter()
1212 .map(Ok)
1213 .collect()
1214 }
1215
1216 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1217 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1218 let execution_id = self.latest_execution_id(instance).await?;
1219 self.read_history_with_execution_id(instance, execution_id)
1220 .await
1221 }
1222
1223 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1224 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1225 sqlx::query_scalar(&format!(
1226 "SELECT {}.latest_execution_id($1)",
1227 self.schema_name
1228 ))
1229 .bind(instance)
1230 .fetch_optional(&*self.pool)
1231 .await
1232 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1233 .map(|id: i64| id as u64)
1234 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1235 }
1236
1237 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1238 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1239 let row: Option<(
1240 String,
1241 String,
1242 String,
1243 i64,
1244 chrono::DateTime<Utc>,
1245 Option<chrono::DateTime<Utc>>,
1246 Option<String>,
1247 Option<String>,
1248 )> = sqlx::query_as(&format!(
1249 "SELECT * FROM {}.get_instance_info($1)",
1250 self.schema_name
1251 ))
1252 .bind(instance)
1253 .fetch_optional(&*self.pool)
1254 .await
1255 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1256
1257 let (
1258 instance_id,
1259 orchestration_name,
1260 orchestration_version,
1261 current_execution_id,
1262 created_at,
1263 updated_at,
1264 status,
1265 output,
1266 ) =
1267 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1268
1269 Ok(InstanceInfo {
1270 instance_id,
1271 orchestration_name,
1272 orchestration_version,
1273 current_execution_id: current_execution_id as u64,
1274 status: status.unwrap_or_else(|| "Running".to_string()),
1275 output,
1276 created_at: created_at.timestamp_millis() as u64,
1277 updated_at: updated_at
1278 .map(|dt| dt.timestamp_millis() as u64)
1279 .unwrap_or(created_at.timestamp_millis() as u64),
1280 })
1281 }
1282
1283 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1284 async fn get_execution_info(
1285 &self,
1286 instance: &str,
1287 execution_id: u64,
1288 ) -> Result<ExecutionInfo, ProviderError> {
1289 let row: Option<(
1290 i64,
1291 String,
1292 Option<String>,
1293 chrono::DateTime<Utc>,
1294 Option<chrono::DateTime<Utc>>,
1295 i64,
1296 )> = sqlx::query_as(&format!(
1297 "SELECT * FROM {}.get_execution_info($1, $2)",
1298 self.schema_name
1299 ))
1300 .bind(instance)
1301 .bind(execution_id as i64)
1302 .fetch_optional(&*self.pool)
1303 .await
1304 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1305
1306 let (exec_id, status, output, started_at, completed_at, event_count) = row
1307 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1308
1309 Ok(ExecutionInfo {
1310 execution_id: exec_id as u64,
1311 status,
1312 output,
1313 started_at: started_at.timestamp_millis() as u64,
1314 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1315 event_count: event_count as usize,
1316 })
1317 }
1318
1319 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1320 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1321 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1322 "SELECT * FROM {}.get_system_metrics()",
1323 self.schema_name
1324 ))
1325 .fetch_optional(&*self.pool)
1326 .await
1327 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1328
1329 let (
1330 total_instances,
1331 total_executions,
1332 running_instances,
1333 completed_instances,
1334 failed_instances,
1335 total_events,
1336 ) = row.ok_or_else(|| {
1337 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1338 })?;
1339
1340 Ok(SystemMetrics {
1341 total_instances: total_instances as u64,
1342 total_executions: total_executions as u64,
1343 running_instances: running_instances as u64,
1344 completed_instances: completed_instances as u64,
1345 failed_instances: failed_instances as u64,
1346 total_events: total_events as u64,
1347 })
1348 }
1349
1350 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1351 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1352 let now_ms = Self::now_millis();
1353
1354 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1355 "SELECT * FROM {}.get_queue_depths($1)",
1356 self.schema_name
1357 ))
1358 .bind(now_ms)
1359 .fetch_optional(&*self.pool)
1360 .await
1361 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1362
1363 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1364 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1365 })?;
1366
1367 Ok(QueueDepths {
1368 orchestrator_queue: orchestrator_queue as usize,
1369 worker_queue: worker_queue as usize,
1370 timer_queue: 0, })
1372 }
1373}