1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-backed implementation of the duroxide `Provider` trait.
///
/// All durable state lives behind the shared connection pool; queue and
/// history operations are delegated to stored procedures owned by
/// `schema_name`.
pub struct PostgresProvider {
    // Shared connection pool; `Arc` so helpers (e.g. the migration runner)
    // can hold their own handle.
    pool: Arc<PgPool>,
    // Postgres schema owning all duroxide tables and stored procedures
    // ("public" by default); interpolated into every generated query.
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 pub async fn cleanup_schema(&self) -> Result<()> {
146 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
148 .execute(&*self.pool)
149 .await?;
150
151 if self.schema_name != "public" {
154 sqlx::query(&format!(
155 "DROP SCHEMA IF EXISTS {} CASCADE",
156 self.schema_name
157 ))
158 .execute(&*self.pool)
159 .await?;
160 } else {
161 }
164
165 Ok(())
166 }
167}
168
169#[async_trait::async_trait]
170impl Provider for PostgresProvider {
    /// Stable provider identifier used in diagnostics and registration.
    fn name(&self) -> &str {
        "duroxide-pg"
    }

    /// Crate version, captured from Cargo metadata at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
178
    /// Claim the next ready orchestration item via the
    /// `fetch_orchestration_item` stored procedure.
    ///
    /// Returns `(item, lock_token, attempt_count)` for the claimed instance,
    /// or `Ok(None)` when nothing is ready (or the capability filter matches
    /// nothing). Transient database errors are retried up to `MAX_RETRIES`
    /// times with linearly increasing backoff. `_poll_timeout` is unused:
    /// this implementation performs a single non-blocking poll.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // Kept only for debugging; the final error is returned directly.
        let mut _last_error: Option<ProviderError> = None;

        // Pack the supported duroxide version range into two i64 values
        // (major * 1_000_000 + minor * 1_000 + patch) so the stored
        // procedure can filter with plain integer comparisons. Only the
        // first declared range is honored; a filter with an empty range
        // list matches no work at all.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                // No compatible versions declared: nothing can be dispatched.
                return Ok(None);
            }
        } else {
            (None, None)
        };

        // The loop only repeats on retryable database errors; every other
        // path returns during the first iteration.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape: (instance_id, orchestration_name,
            // orchestration_version, execution_id, history_json,
            // messages_json, lock_token, attempt_count).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    // Linear backoff: 50ms, 100ms, 150ms between attempts.
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // Corrupt history is not fatal at this layer: return the
                // item with an empty history plus `history_error` so the
                // runtime can decide how to fail the instance.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Unlike history, undecodable queued messages are a hard
                // (permanent) error.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan case: only QueueMessage items exist for an instance
                // with no history and no known orchestration. Ack them away
                // with empty deltas (which consumes the messages) so they
                // don't poison-loop through the dispatcher.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Empty fetch: nothing ready right now.
            return Ok(None);
        }

        // Unreachable in practice: every loop iteration returns unless it
        // retries, and the final attempt always returns.
        Ok(None)
    }
    /// Atomically commit the results of one orchestration turn, guarded by
    /// `lock_token`: append the history delta, enqueue worker and
    /// orchestrator items, apply execution metadata / custom-status changes,
    /// and cancel the named activities — all in a single stored-procedure
    /// call.
    ///
    /// Transient database errors are retried up to `MAX_RETRIES` times with
    /// linearly increasing backoff; an unknown or expired lock token is a
    /// permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize all payloads once, outside the retry loop.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // Event ids are assigned by the runtime; 0 means the caller
            // forgot, and such an event must not be persisted.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // The variant name is recovered from the Debug representation:
            // everything before the first '{' is the enum variant name.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // The last CustomStatusUpdated event in the delta wins:
        // Some(Some(_)) => set the status, Some(None) => clear it,
        // None => leave the stored status untouched.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Execution metadata forwarded to the stored procedure; the pinned
        // duroxide version is exploded into components for SQL filtering.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        // Retry loop: only retryable DB errors repeat; everything else
        // returns on the first pass.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The stored procedure raises "Invalid lock token" when
                    // the token is unknown or expired — never retryable.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: every loop iteration either returns or
        // retries, and the final attempt always returns.
        Ok(())
    }
543 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
544 async fn abandon_orchestration_item(
545 &self,
546 lock_token: &str,
547 delay: Option<Duration>,
548 ignore_attempt: bool,
549 ) -> Result<(), ProviderError> {
550 let start = std::time::Instant::now();
551 let now_ms = Self::now_millis();
552 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
553
554 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
555 "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
556 self.schema_name
557 ))
558 .bind(lock_token)
559 .bind(now_ms)
560 .bind(delay_param)
561 .bind(ignore_attempt)
562 .fetch_one(&*self.pool)
563 .await
564 {
565 Ok(instance_id) => instance_id,
566 Err(e) => {
567 if let SqlxError::Database(db_err) = &e {
568 if db_err.message().contains("Invalid lock token") {
569 return Err(ProviderError::permanent(
570 "abandon_orchestration_item",
571 "Invalid lock token",
572 ));
573 }
574 } else if e.to_string().contains("Invalid lock token") {
575 return Err(ProviderError::permanent(
576 "abandon_orchestration_item",
577 "Invalid lock token",
578 ));
579 }
580
581 return Err(Self::sqlx_to_provider_error(
582 "abandon_orchestration_item",
583 e,
584 ));
585 }
586 };
587
588 let duration_ms = start.elapsed().as_millis() as u64;
589 debug!(
590 target = "duroxide::providers::postgres",
591 operation = "abandon_orchestration_item",
592 instance_id = %instance_id,
593 delay_ms = delay.map(|d| d.as_millis() as u64),
594 ignore_attempt = ignore_attempt,
595 duration_ms = duration_ms,
596 "Abandoned orchestration item via stored procedure"
597 );
598
599 Ok(())
600 }
601
602 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
603 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
604 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
605 "SELECT out_event_data FROM {}.fetch_history($1)",
606 self.schema_name
607 ))
608 .bind(instance)
609 .fetch_all(&*self.pool)
610 .await
611 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
612
613 Ok(event_data_rows
614 .into_iter()
615 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
616 .collect())
617 }
618
619 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
620 async fn append_with_execution(
621 &self,
622 instance: &str,
623 execution_id: u64,
624 new_events: Vec<Event>,
625 ) -> Result<(), ProviderError> {
626 if new_events.is_empty() {
627 return Ok(());
628 }
629
630 let mut events_payload = Vec::with_capacity(new_events.len());
631 for event in &new_events {
632 if event.event_id() == 0 {
633 error!(
634 target = "duroxide::providers::postgres",
635 operation = "append_with_execution",
636 error_type = "validation_error",
637 instance_id = %instance,
638 execution_id = execution_id,
639 "event_id must be set by runtime"
640 );
641 return Err(ProviderError::permanent(
642 "append_with_execution",
643 "event_id must be set by runtime",
644 ));
645 }
646
647 let event_json = serde_json::to_string(event).map_err(|e| {
648 ProviderError::permanent(
649 "append_with_execution",
650 format!("Failed to serialize event: {e}"),
651 )
652 })?;
653
654 let event_type = format!("{event:?}")
655 .split('{')
656 .next()
657 .unwrap_or("Unknown")
658 .trim()
659 .to_string();
660
661 events_payload.push(serde_json::json!({
662 "event_id": event.event_id(),
663 "event_type": event_type,
664 "event_data": event_json,
665 }));
666 }
667
668 let events_json = serde_json::Value::Array(events_payload);
669
670 sqlx::query(&format!(
671 "SELECT {}.append_history($1, $2, $3)",
672 self.schema_name
673 ))
674 .bind(instance)
675 .bind(execution_id as i64)
676 .bind(events_json)
677 .execute(&*self.pool)
678 .await
679 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
680
681 debug!(
682 target = "duroxide::providers::postgres",
683 operation = "append_with_execution",
684 instance_id = %instance,
685 execution_id = execution_id,
686 event_count = new_events.len(),
687 "Appended history events via stored procedure"
688 );
689
690 Ok(())
691 }
692
693 #[instrument(skip(self), target = "duroxide::providers::postgres")]
694 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
695 let work_item = serde_json::to_string(&item).map_err(|e| {
696 ProviderError::permanent(
697 "enqueue_worker_work",
698 format!("Failed to serialize work item: {e}"),
699 )
700 })?;
701
702 let now_ms = Self::now_millis();
703
704 let (instance_id, execution_id, activity_id, session_id) = match &item {
706 WorkItem::ActivityExecute {
707 instance,
708 execution_id,
709 id,
710 session_id,
711 ..
712 } => (
713 Some(instance.clone()),
714 Some(*execution_id as i64),
715 Some(*id as i64),
716 session_id.clone(),
717 ),
718 _ => (None, None, None, None),
719 };
720
721 sqlx::query(&format!(
722 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
723 self.schema_name
724 ))
725 .bind(work_item)
726 .bind(now_ms)
727 .bind(&instance_id)
728 .bind(execution_id)
729 .bind(activity_id)
730 .bind(&session_id)
731 .execute(&*self.pool)
732 .await
733 .map_err(|e| {
734 error!(
735 target = "duroxide::providers::postgres",
736 operation = "enqueue_worker_work",
737 error_type = "database_error",
738 error = %e,
739 "Failed to enqueue worker work"
740 );
741 Self::sqlx_to_provider_error("enqueue_worker_work", e)
742 })?;
743
744 Ok(())
745 }
746
747 #[instrument(skip(self), target = "duroxide::providers::postgres")]
748 async fn fetch_work_item(
749 &self,
750 lock_timeout: Duration,
751 _poll_timeout: Duration,
752 session: Option<&SessionFetchConfig>,
753 ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
754 let start = std::time::Instant::now();
755
756 let lock_timeout_ms = lock_timeout.as_millis() as i64;
758
759 let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
761 Some(config) => (
762 Some(&config.owner_id),
763 Some(config.lock_timeout.as_millis() as i64),
764 ),
765 None => (None, None),
766 };
767
768 let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
769 "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
770 self.schema_name
771 ))
772 .bind(Self::now_millis())
773 .bind(lock_timeout_ms)
774 .bind(owner_id)
775 .bind(session_lock_timeout_ms)
776 .fetch_optional(&*self.pool)
777 .await
778 {
779 Ok(row) => row,
780 Err(e) => {
781 return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
782 }
783 };
784
785 let (work_item_json, lock_token, attempt_count) = match row {
786 Some(row) => row,
787 None => return Ok(None),
788 };
789
790 let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
791 ProviderError::permanent(
792 "fetch_work_item",
793 format!("Failed to deserialize worker item: {e}"),
794 )
795 })?;
796
797 let duration_ms = start.elapsed().as_millis() as u64;
798
799 let instance_id = match &work_item {
801 WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
802 WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
803 WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
804 WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
805 WorkItem::TimerFired { instance, .. } => instance.as_str(),
806 WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
807 WorkItem::CancelInstance { instance, .. } => instance.as_str(),
808 WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
809 WorkItem::SubOrchCompleted {
810 parent_instance, ..
811 } => parent_instance.as_str(),
812 WorkItem::SubOrchFailed {
813 parent_instance, ..
814 } => parent_instance.as_str(),
815 WorkItem::QueueMessage { instance, .. } => instance.as_str(),
816 };
817
818 debug!(
819 target = "duroxide::providers::postgres",
820 operation = "fetch_work_item",
821 instance_id = %instance_id,
822 attempt_count = attempt_count,
823 duration_ms = duration_ms,
824 "Fetched activity work item via stored procedure"
825 );
826
827 Ok(Some((work_item, lock_token, attempt_count as u32)))
828 }
829
    /// Acknowledge a claimed worker item identified by `token`.
    ///
    /// With `completion = None` (a cancelled activity) the queue row is
    /// simply consumed. Otherwise `completion` must be `ActivityCompleted`
    /// or `ActivityFailed`; the ack and the enqueue of the completion are
    /// performed by one `ack_worker` stored-procedure call. An already-acked
    /// or unknown token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: ack-only path (activity was cancelled).
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for the worker queue.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Ack + enqueue the completion in one stored-procedure call.
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
936
937 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
938 async fn renew_work_item_lock(
939 &self,
940 token: &str,
941 extend_for: Duration,
942 ) -> Result<(), ProviderError> {
943 let start = std::time::Instant::now();
944
945 let now_ms = Self::now_millis();
947
948 let extend_secs = extend_for.as_secs() as i64;
950
951 match sqlx::query(&format!(
952 "SELECT {}.renew_work_item_lock($1, $2, $3)",
953 self.schema_name
954 ))
955 .bind(token)
956 .bind(now_ms)
957 .bind(extend_secs)
958 .execute(&*self.pool)
959 .await
960 {
961 Ok(_) => {
962 let duration_ms = start.elapsed().as_millis() as u64;
963 debug!(
964 target = "duroxide::providers::postgres",
965 operation = "renew_work_item_lock",
966 token = %token,
967 extend_for_secs = extend_secs,
968 duration_ms = duration_ms,
969 "Work item lock renewed successfully"
970 );
971 Ok(())
972 }
973 Err(e) => {
974 if let SqlxError::Database(db_err) = &e {
975 if db_err.message().contains("Lock token invalid") {
976 return Err(ProviderError::permanent(
977 "renew_work_item_lock",
978 "Lock token invalid, expired, or already acked",
979 ));
980 }
981 } else if e.to_string().contains("Lock token invalid") {
982 return Err(ProviderError::permanent(
983 "renew_work_item_lock",
984 "Lock token invalid, expired, or already acked",
985 ));
986 }
987
988 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
989 }
990 }
991 }
992
993 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
994 async fn abandon_work_item(
995 &self,
996 token: &str,
997 delay: Option<Duration>,
998 ignore_attempt: bool,
999 ) -> Result<(), ProviderError> {
1000 let start = std::time::Instant::now();
1001 let now_ms = Self::now_millis();
1002 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1003
1004 match sqlx::query(&format!(
1005 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1006 self.schema_name
1007 ))
1008 .bind(token)
1009 .bind(now_ms)
1010 .bind(delay_param)
1011 .bind(ignore_attempt)
1012 .execute(&*self.pool)
1013 .await
1014 {
1015 Ok(_) => {
1016 let duration_ms = start.elapsed().as_millis() as u64;
1017 debug!(
1018 target = "duroxide::providers::postgres",
1019 operation = "abandon_work_item",
1020 token = %token,
1021 delay_ms = delay.map(|d| d.as_millis() as u64),
1022 ignore_attempt = ignore_attempt,
1023 duration_ms = duration_ms,
1024 "Abandoned work item via stored procedure"
1025 );
1026 Ok(())
1027 }
1028 Err(e) => {
1029 if let SqlxError::Database(db_err) = &e {
1030 if db_err.message().contains("Invalid lock token")
1031 || db_err.message().contains("already acked")
1032 {
1033 return Err(ProviderError::permanent(
1034 "abandon_work_item",
1035 "Invalid lock token or already acked",
1036 ));
1037 }
1038 } else if e.to_string().contains("Invalid lock token")
1039 || e.to_string().contains("already acked")
1040 {
1041 return Err(ProviderError::permanent(
1042 "abandon_work_item",
1043 "Invalid lock token or already acked",
1044 ));
1045 }
1046
1047 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1048 }
1049 }
1050 }
1051
    /// Extend the lock on a claimed orchestration item by `extend_for`.
    ///
    /// NOTE(review): like `renew_work_item_lock`, the duration is truncated
    /// to whole seconds before being passed to the stored procedure —
    /// sub-second extensions become 0.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token-related failures raised by the stored procedure are
                // permanent: the claim is gone and renewing cannot succeed.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                // Anything else goes through the shared classifier.
                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1116
1117 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1118 async fn enqueue_for_orchestrator(
1119 &self,
1120 item: WorkItem,
1121 delay: Option<Duration>,
1122 ) -> Result<(), ProviderError> {
1123 let work_item = serde_json::to_string(&item).map_err(|e| {
1124 ProviderError::permanent(
1125 "enqueue_orchestrator_work",
1126 format!("Failed to serialize work item: {e}"),
1127 )
1128 })?;
1129
1130 let instance_id = match &item {
1132 WorkItem::StartOrchestration { instance, .. }
1133 | WorkItem::ActivityCompleted { instance, .. }
1134 | WorkItem::ActivityFailed { instance, .. }
1135 | WorkItem::TimerFired { instance, .. }
1136 | WorkItem::ExternalRaised { instance, .. }
1137 | WorkItem::CancelInstance { instance, .. }
1138 | WorkItem::ContinueAsNew { instance, .. }
1139 | WorkItem::QueueMessage { instance, .. } => instance,
1140 WorkItem::SubOrchCompleted {
1141 parent_instance, ..
1142 }
1143 | WorkItem::SubOrchFailed {
1144 parent_instance, ..
1145 } => parent_instance,
1146 WorkItem::ActivityExecute { .. } => {
1147 return Err(ProviderError::permanent(
1148 "enqueue_orchestrator_work",
1149 "ActivityExecute should go to worker queue, not orchestrator queue",
1150 ));
1151 }
1152 };
1153
1154 let now_ms = Self::now_millis();
1156
1157 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1158 if *fire_at_ms > 0 {
1159 if let Some(delay) = delay {
1161 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1162 } else {
1163 *fire_at_ms
1164 }
1165 } else {
1166 delay
1168 .map(|d| now_ms as u64 + d.as_millis() as u64)
1169 .unwrap_or(now_ms as u64)
1170 }
1171 } else {
1172 delay
1174 .map(|d| now_ms as u64 + d.as_millis() as u64)
1175 .unwrap_or(now_ms as u64)
1176 };
1177
1178 let visible_at = Utc
1179 .timestamp_millis_opt(visible_at_ms as i64)
1180 .single()
1181 .ok_or_else(|| {
1182 ProviderError::permanent(
1183 "enqueue_orchestrator_work",
1184 "Invalid visible_at timestamp",
1185 )
1186 })?;
1187
1188 sqlx::query(&format!(
1193 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1194 self.schema_name
1195 ))
1196 .bind(instance_id)
1197 .bind(&work_item)
1198 .bind(visible_at)
1199 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1203 .await
1204 .map_err(|e| {
1205 error!(
1206 target = "duroxide::providers::postgres",
1207 operation = "enqueue_orchestrator_work",
1208 error_type = "database_error",
1209 error = %e,
1210 instance_id = %instance_id,
1211 "Failed to enqueue orchestrator work"
1212 );
1213 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1214 })?;
1215
1216 debug!(
1217 target = "duroxide::providers::postgres",
1218 operation = "enqueue_orchestrator_work",
1219 instance_id = %instance_id,
1220 delay_ms = delay.map(|d| d.as_millis() as u64),
1221 "Enqueued orchestrator work"
1222 );
1223
1224 Ok(())
1225 }
1226
1227 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1228 async fn read_with_execution(
1229 &self,
1230 instance: &str,
1231 execution_id: u64,
1232 ) -> Result<Vec<Event>, ProviderError> {
1233 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1234 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1235 self.table_name("history")
1236 ))
1237 .bind(instance)
1238 .bind(execution_id as i64)
1239 .fetch_all(&*self.pool)
1240 .await
1241 .ok()
1242 .unwrap_or_default();
1243
1244 Ok(event_data_rows
1245 .into_iter()
1246 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1247 .collect())
1248 }
1249
1250 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1251 async fn renew_session_lock(
1252 &self,
1253 owner_ids: &[&str],
1254 extend_for: Duration,
1255 idle_timeout: Duration,
1256 ) -> Result<usize, ProviderError> {
1257 if owner_ids.is_empty() {
1258 return Ok(0);
1259 }
1260
1261 let now_ms = Self::now_millis();
1262 let extend_ms = extend_for.as_millis() as i64;
1263 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1264 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1265
1266 let result = sqlx::query_scalar::<_, i64>(&format!(
1267 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1268 self.schema_name
1269 ))
1270 .bind(&owner_ids_vec)
1271 .bind(now_ms)
1272 .bind(extend_ms)
1273 .bind(idle_timeout_ms)
1274 .fetch_one(&*self.pool)
1275 .await
1276 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1277
1278 debug!(
1279 target = "duroxide::providers::postgres",
1280 operation = "renew_session_lock",
1281 owner_count = owner_ids.len(),
1282 sessions_renewed = result,
1283 "Session locks renewed"
1284 );
1285
1286 Ok(result as usize)
1287 }
1288
1289 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1290 async fn cleanup_orphaned_sessions(
1291 &self,
1292 _idle_timeout: Duration,
1293 ) -> Result<usize, ProviderError> {
1294 let now_ms = Self::now_millis();
1295
1296 let result = sqlx::query_scalar::<_, i64>(&format!(
1297 "SELECT {}.cleanup_orphaned_sessions($1)",
1298 self.schema_name
1299 ))
1300 .bind(now_ms)
1301 .fetch_one(&*self.pool)
1302 .await
1303 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1304
1305 debug!(
1306 target = "duroxide::providers::postgres",
1307 operation = "cleanup_orphaned_sessions",
1308 sessions_cleaned = result,
1309 "Orphaned sessions cleaned up"
1310 );
1311
1312 Ok(result as usize)
1313 }
1314
    /// Expose this provider's administrative interface.
    ///
    /// `PostgresProvider` implements `ProviderAdmin` directly (see the impl
    /// later in this file), so it simply hands out a reference to itself.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1318
1319 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1320 async fn get_custom_status(
1321 &self,
1322 instance: &str,
1323 last_seen_version: u64,
1324 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1325 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1326 "SELECT * FROM {}.get_custom_status($1, $2)",
1327 self.schema_name
1328 ))
1329 .bind(instance)
1330 .bind(last_seen_version as i64)
1331 .fetch_optional(&*self.pool)
1332 .await
1333 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1334
1335 match row {
1336 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1337 None => Ok(None),
1338 }
1339 }
1340}
1341
1342#[async_trait::async_trait]
1343impl ProviderAdmin for PostgresProvider {
1344 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1345 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1346 sqlx::query_scalar(&format!(
1347 "SELECT instance_id FROM {}.list_instances()",
1348 self.schema_name
1349 ))
1350 .fetch_all(&*self.pool)
1351 .await
1352 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1353 }
1354
1355 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1356 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1357 sqlx::query_scalar(&format!(
1358 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1359 self.schema_name
1360 ))
1361 .bind(status)
1362 .fetch_all(&*self.pool)
1363 .await
1364 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1365 }
1366
1367 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1368 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1369 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1370 "SELECT execution_id FROM {}.list_executions($1)",
1371 self.schema_name
1372 ))
1373 .bind(instance)
1374 .fetch_all(&*self.pool)
1375 .await
1376 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1377
1378 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1379 }
1380
1381 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1382 async fn read_history_with_execution_id(
1383 &self,
1384 instance: &str,
1385 execution_id: u64,
1386 ) -> Result<Vec<Event>, ProviderError> {
1387 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1388 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1389 self.schema_name
1390 ))
1391 .bind(instance)
1392 .bind(execution_id as i64)
1393 .fetch_all(&*self.pool)
1394 .await
1395 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1396
1397 event_data_rows
1398 .into_iter()
1399 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1400 .collect::<Vec<Event>>()
1401 .into_iter()
1402 .map(Ok)
1403 .collect()
1404 }
1405
1406 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1407 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1408 let execution_id = self.latest_execution_id(instance).await?;
1409 self.read_history_with_execution_id(instance, execution_id)
1410 .await
1411 }
1412
1413 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1414 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1415 sqlx::query_scalar(&format!(
1416 "SELECT {}.latest_execution_id($1)",
1417 self.schema_name
1418 ))
1419 .bind(instance)
1420 .fetch_optional(&*self.pool)
1421 .await
1422 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1423 .map(|id: i64| id as u64)
1424 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1425 }
1426
1427 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1428 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1429 let row: Option<(
1430 String,
1431 String,
1432 String,
1433 i64,
1434 chrono::DateTime<Utc>,
1435 Option<chrono::DateTime<Utc>>,
1436 Option<String>,
1437 Option<String>,
1438 Option<String>,
1439 )> = sqlx::query_as(&format!(
1440 "SELECT * FROM {}.get_instance_info($1)",
1441 self.schema_name
1442 ))
1443 .bind(instance)
1444 .fetch_optional(&*self.pool)
1445 .await
1446 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1447
1448 let (
1449 instance_id,
1450 orchestration_name,
1451 orchestration_version,
1452 current_execution_id,
1453 created_at,
1454 updated_at,
1455 status,
1456 output,
1457 parent_instance_id,
1458 ) =
1459 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1460
1461 Ok(InstanceInfo {
1462 instance_id,
1463 orchestration_name,
1464 orchestration_version,
1465 current_execution_id: current_execution_id as u64,
1466 status: status.unwrap_or_else(|| "Running".to_string()),
1467 output,
1468 created_at: created_at.timestamp_millis() as u64,
1469 updated_at: updated_at
1470 .map(|dt| dt.timestamp_millis() as u64)
1471 .unwrap_or(created_at.timestamp_millis() as u64),
1472 parent_instance_id,
1473 })
1474 }
1475
1476 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1477 async fn get_execution_info(
1478 &self,
1479 instance: &str,
1480 execution_id: u64,
1481 ) -> Result<ExecutionInfo, ProviderError> {
1482 let row: Option<(
1483 i64,
1484 String,
1485 Option<String>,
1486 chrono::DateTime<Utc>,
1487 Option<chrono::DateTime<Utc>>,
1488 i64,
1489 )> = sqlx::query_as(&format!(
1490 "SELECT * FROM {}.get_execution_info($1, $2)",
1491 self.schema_name
1492 ))
1493 .bind(instance)
1494 .bind(execution_id as i64)
1495 .fetch_optional(&*self.pool)
1496 .await
1497 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1498
1499 let (exec_id, status, output, started_at, completed_at, event_count) = row
1500 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1501
1502 Ok(ExecutionInfo {
1503 execution_id: exec_id as u64,
1504 status,
1505 output,
1506 started_at: started_at.timestamp_millis() as u64,
1507 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1508 event_count: event_count as usize,
1509 })
1510 }
1511
1512 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1513 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1514 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1515 "SELECT * FROM {}.get_system_metrics()",
1516 self.schema_name
1517 ))
1518 .fetch_optional(&*self.pool)
1519 .await
1520 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1521
1522 let (
1523 total_instances,
1524 total_executions,
1525 running_instances,
1526 completed_instances,
1527 failed_instances,
1528 total_events,
1529 ) = row.ok_or_else(|| {
1530 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1531 })?;
1532
1533 Ok(SystemMetrics {
1534 total_instances: total_instances as u64,
1535 total_executions: total_executions as u64,
1536 running_instances: running_instances as u64,
1537 completed_instances: completed_instances as u64,
1538 failed_instances: failed_instances as u64,
1539 total_events: total_events as u64,
1540 })
1541 }
1542
1543 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1544 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1545 let now_ms = Self::now_millis();
1546
1547 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1548 "SELECT * FROM {}.get_queue_depths($1)",
1549 self.schema_name
1550 ))
1551 .bind(now_ms)
1552 .fetch_optional(&*self.pool)
1553 .await
1554 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1555
1556 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1557 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1558 })?;
1559
1560 Ok(QueueDepths {
1561 orchestrator_queue: orchestrator_queue as usize,
1562 worker_queue: worker_queue as usize,
1563 timer_queue: 0, })
1565 }
1566
1567 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1570 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1571 sqlx::query_scalar(&format!(
1572 "SELECT child_instance_id FROM {}.list_children($1)",
1573 self.schema_name
1574 ))
1575 .bind(instance_id)
1576 .fetch_all(&*self.pool)
1577 .await
1578 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1579 }
1580
1581 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1582 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1583 let result: Result<Option<String>, _> =
1586 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1587 .bind(instance_id)
1588 .fetch_one(&*self.pool)
1589 .await;
1590
1591 match result {
1592 Ok(parent_id) => Ok(parent_id),
1593 Err(e) => {
1594 let err_str = e.to_string();
1595 if err_str.contains("Instance not found") {
1596 Err(ProviderError::permanent(
1597 "get_parent_id",
1598 format!("Instance not found: {}", instance_id),
1599 ))
1600 } else {
1601 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1602 }
1603 }
1604 }
1605 }
1606
1607 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1610 async fn delete_instances_atomic(
1611 &self,
1612 ids: &[String],
1613 force: bool,
1614 ) -> Result<DeleteInstanceResult, ProviderError> {
1615 if ids.is_empty() {
1616 return Ok(DeleteInstanceResult::default());
1617 }
1618
1619 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1620 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1621 self.schema_name
1622 ))
1623 .bind(ids)
1624 .bind(force)
1625 .fetch_optional(&*self.pool)
1626 .await
1627 .map_err(|e| {
1628 let err_str = e.to_string();
1629 if err_str.contains("is Running") {
1630 ProviderError::permanent("delete_instances_atomic", err_str)
1631 } else if err_str.contains("Orphan detected") {
1632 ProviderError::permanent("delete_instances_atomic", err_str)
1633 } else {
1634 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1635 }
1636 })?;
1637
1638 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1639 row.unwrap_or((0, 0, 0, 0));
1640
1641 debug!(
1642 target = "duroxide::providers::postgres",
1643 operation = "delete_instances_atomic",
1644 instances_deleted = instances_deleted,
1645 executions_deleted = executions_deleted,
1646 events_deleted = events_deleted,
1647 queue_messages_deleted = queue_messages_deleted,
1648 "Deleted instances atomically"
1649 );
1650
1651 Ok(DeleteInstanceResult {
1652 instances_deleted: instances_deleted as u64,
1653 executions_deleted: executions_deleted as u64,
1654 events_deleted: events_deleted as u64,
1655 queue_messages_deleted: queue_messages_deleted as u64,
1656 })
1657 }
1658
    /// Bulk-delete completed root instances matching `filter`.
    ///
    /// Candidates are selected with one dynamically-built SQL query; each
    /// matched root is then deleted together with its whole child tree via
    /// `delete_instances_atomic` with `force = true`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Base query: top-level instances only (no parent) whose *current*
        // execution has reached a terminal status.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
                AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Optional allowlist of instance ids, appended as $1..$N placeholders.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty allowlist matches nothing.
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completion cutoff. Its placeholder number follows the
        // allowlist placeholders (if any); the bound value is epoch millis,
        // converted to a timestamp inside SQL.
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT placeholder always comes last: after the allowlist parameters
        // plus the optional cutoff parameter. Default batch size is 1000.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order their placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        // Delete each root's full tree and accumulate the per-tree counts.
        let mut result = DeleteInstanceResult::default();

        for instance_id in &instance_ids {
            // Resolve the root plus all (transitive) children.
            let tree = self.get_instance_tree(instance_id).await?;

            // NOTE(review): force=true bypasses the SQL function's running /
            // orphan checks — presumably safe because the whole tree is
            // deleted together; confirm against the SQL function's contract.
            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1766
1767 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1770 async fn prune_executions(
1771 &self,
1772 instance_id: &str,
1773 options: PruneOptions,
1774 ) -> Result<PruneResult, ProviderError> {
1775 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1776 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1777
1778 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1779 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1780 self.schema_name
1781 ))
1782 .bind(instance_id)
1783 .bind(keep_last)
1784 .bind(completed_before_ms)
1785 .fetch_optional(&*self.pool)
1786 .await
1787 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1788
1789 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1790
1791 debug!(
1792 target = "duroxide::providers::postgres",
1793 operation = "prune_executions",
1794 instance_id = %instance_id,
1795 instances_processed = instances_processed,
1796 executions_deleted = executions_deleted,
1797 events_deleted = events_deleted,
1798 "Pruned executions"
1799 );
1800
1801 Ok(PruneResult {
1802 instances_processed: instances_processed as u64,
1803 executions_deleted: executions_deleted as u64,
1804 events_deleted: events_deleted as u64,
1805 })
1806 }
1807
    /// Prune historical executions for every instance matching `filter`.
    ///
    /// Candidate selection mirrors `delete_instance_bulk`'s dynamic SQL, but
    /// without the root/terminal-status restriction (`WHERE 1=1`): any
    /// instance can be pruned. Each match is then pruned individually via
    /// `prune_executions` and the counts are summed.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // Base query joins each instance to its current execution so the
        // completed_before filter below can reference e.completed_at.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Optional allowlist of instance ids, appended as $1..$N placeholders.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty allowlist matches nothing.
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completion cutoff; its placeholder number follows the
        // allowlist placeholders (if any). Bound as epoch millis, converted
        // to a timestamp inside SQL.
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT placeholder always comes last: after the allowlist parameters
        // plus the optional cutoff parameter. Default batch size is 1000.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order their placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        // Prune each matched instance and accumulate the totals.
        let mut result = PruneResult::default();

        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
1908}