1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// A Duroxide [`Provider`] backed by PostgreSQL.
///
/// All persistent state lives in a single Postgres schema; every operation is
/// delegated to stored procedures installed by the crate's `MigrationRunner`.
pub struct PostgresProvider {
    // Shared connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Schema that qualifies every table/function reference (default "public").
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143 match filter {
144 TagFilter::DefaultOnly => ("default_only", vec![]),
145 TagFilter::Tags(set) => {
146 let mut tags: Vec<String> = set.iter().cloned().collect();
147 tags.sort();
148 ("tags", tags)
149 }
150 TagFilter::DefaultAnd(set) => {
151 let mut tags: Vec<String> = set.iter().cloned().collect();
152 tags.sort();
153 ("default_and", tags)
154 }
155 TagFilter::Any => ("any", vec![]),
156 TagFilter::None => ("none", vec![]),
157 }
158 }
159
160 pub async fn cleanup_schema(&self) -> Result<()> {
165 const MAX_RETRIES: u32 = 5;
166 const BASE_RETRY_DELAY_MS: u64 = 50;
167
168 for attempt in 0..=MAX_RETRIES {
169 let cleanup_result = async {
170 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172 .execute(&*self.pool)
173 .await?;
174
175 if self.schema_name != "public" {
178 sqlx::query(&format!(
179 "DROP SCHEMA IF EXISTS {} CASCADE",
180 self.schema_name
181 ))
182 .execute(&*self.pool)
183 .await?;
184 } else {
185 }
188
189 Ok::<(), SqlxError>(())
190 }
191 .await;
192
193 match cleanup_result {
194 Ok(()) => return Ok(()),
195 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196 if attempt < MAX_RETRIES {
197 warn!(
198 target = "duroxide::providers::postgres",
199 schema = %self.schema_name,
200 attempt = attempt + 1,
201 "Deadlock during cleanup_schema, retrying"
202 );
203 sleep(Duration::from_millis(BASE_RETRY_DELAY_MS * (attempt as u64 + 1)))
204 .await;
205 continue;
206 }
207 return Err(anyhow::anyhow!(db_err.to_string()));
208 }
209 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
210 }
211 }
212
213 Ok(())
214 }
215}
216
217#[async_trait::async_trait]
218impl Provider for PostgresProvider {
    /// Short, stable identifier for this provider implementation.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
222
    /// Provider version — this crate's version, baked in at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
226
    /// Claims the next available orchestration work item.
    ///
    /// Calls the `fetch_orchestration_item` stored procedure, which locks an
    /// instance for `lock_timeout` and returns its name/version, execution id,
    /// full history, pending messages, a lock token, and the attempt count.
    /// Retryable database errors are retried up to `MAX_RETRIES` times with
    /// linear backoff. `_poll_timeout` is unused — this provider returns
    /// `Ok(None)` immediately when there is nothing to do rather than polling.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack the dispatcher's supported duroxide version range into two
        // comparable integers (major * 1_000_000 + minor * 1_000 + patch) for
        // the SQL function. NOTE(review): only the FIRST range in
        // `supported_duroxide_versions` is honored — confirm the stored
        // procedure is single-range by design. A filter with an empty range
        // list can match nothing, so bail out immediately.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        // The loop repeats only on retryable DB errors; a successful call
        // (with or without a row) returns from inside the body.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape: (instance_id, name, version, execution_id,
            // history_json, messages_json, lock_token, attempt_count).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // History deserialization is tolerant: on failure the item is
                // still returned with an empty history and `history_error`
                // set, letting the runtime decide how to handle corruption.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Messages, by contrast, must deserialize — failure here is
                // a permanent error.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // An instance with no history, an unknown name, and nothing
                // but QueueMessage items was never actually started: drop the
                // orphan messages by acking with empty deltas and report no
                // work available.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No work available right now.
            return Ok(None);
        }

        // Unreachable in practice: every iteration returns or continues, and
        // the final attempt always returns.
        Ok(None)
    }
    /// Atomically commits the results of one orchestration turn.
    ///
    /// Under the lock identified by `lock_token`, the `ack_orchestration_item`
    /// stored procedure appends `history_delta`, enqueues `worker_items` and
    /// `orchestrator_items`, applies `metadata`, cancels any
    /// `cancelled_activities`, and releases the lock. Retryable database
    /// errors are retried up to `MAX_RETRIES` times with linear backoff; an
    /// invalid or expired lock token is reported as a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the history payload: one {event_id, event_type, event_data}
        // object per event. An event_id of 0 means the runtime failed to
        // assign an id — rejected outright.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name is derived from the Debug representation: the text
            // before the first '{'.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // The LAST CustomStatusUpdated event in the delta wins:
        //   Some(Some(s)) => set the custom status to s,
        //   Some(None)    => clear it,
        //   None          => leave the stored status untouched.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        // Retry loop: only retryable DB errors iterate; success and permanent
        // errors return immediately.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // "Invalid lock token" is raised by the stored procedure
                    // when the lock expired or was already released — never
                    // worth retrying.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the final loop iteration always returns. Kept to
        // satisfy the compiler's control-flow analysis.
        Ok(())
    }
    /// Releases an orchestration lock without committing any results,
    /// optionally delaying redelivery by `delay`. When `ignore_attempt` is
    /// set, the stored procedure leaves the attempt counter untouched (e.g.
    /// for a graceful shutdown rather than a processing failure).
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The stored procedure returns the affected instance id (used only
        // for logging below) or raises on an unknown/expired lock token.
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Lock-token errors are permanent; everything else goes
                // through the standard classifier.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
649
650 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
651 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
652 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
653 "SELECT out_event_data FROM {}.fetch_history($1)",
654 self.schema_name
655 ))
656 .bind(instance)
657 .fetch_all(&*self.pool)
658 .await
659 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
660
661 Ok(event_data_rows
662 .into_iter()
663 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
664 .collect())
665 }
666
667 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
668 async fn append_with_execution(
669 &self,
670 instance: &str,
671 execution_id: u64,
672 new_events: Vec<Event>,
673 ) -> Result<(), ProviderError> {
674 if new_events.is_empty() {
675 return Ok(());
676 }
677
678 let mut events_payload = Vec::with_capacity(new_events.len());
679 for event in &new_events {
680 if event.event_id() == 0 {
681 error!(
682 target = "duroxide::providers::postgres",
683 operation = "append_with_execution",
684 error_type = "validation_error",
685 instance_id = %instance,
686 execution_id = execution_id,
687 "event_id must be set by runtime"
688 );
689 return Err(ProviderError::permanent(
690 "append_with_execution",
691 "event_id must be set by runtime",
692 ));
693 }
694
695 let event_json = serde_json::to_string(event).map_err(|e| {
696 ProviderError::permanent(
697 "append_with_execution",
698 format!("Failed to serialize event: {e}"),
699 )
700 })?;
701
702 let event_type = format!("{event:?}")
703 .split('{')
704 .next()
705 .unwrap_or("Unknown")
706 .trim()
707 .to_string();
708
709 events_payload.push(serde_json::json!({
710 "event_id": event.event_id(),
711 "event_type": event_type,
712 "event_data": event_json,
713 }));
714 }
715
716 let events_json = serde_json::Value::Array(events_payload);
717
718 sqlx::query(&format!(
719 "SELECT {}.append_history($1, $2, $3)",
720 self.schema_name
721 ))
722 .bind(instance)
723 .bind(execution_id as i64)
724 .bind(events_json)
725 .execute(&*self.pool)
726 .await
727 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
728
729 debug!(
730 target = "duroxide::providers::postgres",
731 operation = "append_with_execution",
732 instance_id = %instance,
733 execution_id = execution_id,
734 event_count = new_events.len(),
735 "Appended history events via stored procedure"
736 );
737
738 Ok(())
739 }
740
741 #[instrument(skip(self), target = "duroxide::providers::postgres")]
742 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
743 let work_item = serde_json::to_string(&item).map_err(|e| {
744 ProviderError::permanent(
745 "enqueue_worker_work",
746 format!("Failed to serialize work item: {e}"),
747 )
748 })?;
749
750 let now_ms = Self::now_millis();
751
752 let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
754 WorkItem::ActivityExecute {
755 instance,
756 execution_id,
757 id,
758 session_id,
759 tag,
760 ..
761 } => (
762 Some(instance.clone()),
763 Some(*execution_id as i64),
764 Some(*id as i64),
765 session_id.clone(),
766 tag.clone(),
767 ),
768 _ => (None, None, None, None, None),
769 };
770
771 sqlx::query(&format!(
772 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
773 self.schema_name
774 ))
775 .bind(work_item)
776 .bind(now_ms)
777 .bind(&instance_id)
778 .bind(execution_id)
779 .bind(activity_id)
780 .bind(&session_id)
781 .bind(&tag)
782 .execute(&*self.pool)
783 .await
784 .map_err(|e| {
785 error!(
786 target = "duroxide::providers::postgres",
787 operation = "enqueue_worker_work",
788 error_type = "database_error",
789 error = %e,
790 "Failed to enqueue worker work"
791 );
792 Self::sqlx_to_provider_error("enqueue_worker_work", e)
793 })?;
794
795 Ok(())
796 }
797
    /// Claims the next available worker (activity) item matching `tag_filter`.
    ///
    /// `TagFilter::None` short-circuits to `Ok(None)` without a database
    /// round trip. When `session` is provided, the stored procedure also
    /// establishes session affinity for `owner_id` with its own lock timeout.
    /// Returns the deserialized work item, its lock token, and the attempt
    /// count. `_poll_timeout` is unused — no long polling here.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // A `None` filter matches nothing by definition — skip the query.
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Row shape: (work_item_json, lock_token, attempt_count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Resolve the instance id for logging; sub-orchestration completions
        // are attributed to their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
891
    /// Acknowledges a locked worker item.
    ///
    /// With `completion = None` (e.g. a cancelled activity) the item is
    /// removed without enqueuing anything. Otherwise `completion` must be an
    /// `ActivityCompleted` or `ActivityFailed` item, which the stored
    /// procedure forwards to the orchestrator queue in the same transaction.
    /// An unknown/already-processed token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Path 1: no completion — ack with NULL instance/completion params.
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Path 2: completion supplied — only activity outcomes are legal.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
998
    /// Extends the lock on an in-flight worker item by `extend_for`.
    ///
    /// NOTE(review): `extend_for.as_secs()` truncates sub-second durations to
    /// zero — confirm callers always pass whole-second extensions. An
    /// invalid, expired, or already-acked token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // Whole seconds only — the stored procedure takes seconds.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token problems raised by the stored procedure are permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
1054
    /// Releases a worker item's lock without completing it, optionally
    /// delaying redelivery by `delay`. `ignore_attempt` tells the stored
    /// procedure not to bump the attempt counter. An invalid or already-acked
    /// token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Token problems raised by the stored procedure are permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
1113
    /// Extends the lock on an in-flight orchestration item by `extend_for`.
    ///
    /// NOTE(review): `extend_for.as_secs()` truncates sub-second durations to
    /// zero — confirm callers always pass whole-second extensions. Invalid,
    /// expired, or already-released tokens are permanent errors.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // Whole seconds only — the stored procedure takes seconds.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token problems raised by the stored procedure are permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1178
1179 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1180 async fn enqueue_for_orchestrator(
1181 &self,
1182 item: WorkItem,
1183 delay: Option<Duration>,
1184 ) -> Result<(), ProviderError> {
1185 let work_item = serde_json::to_string(&item).map_err(|e| {
1186 ProviderError::permanent(
1187 "enqueue_orchestrator_work",
1188 format!("Failed to serialize work item: {e}"),
1189 )
1190 })?;
1191
1192 let instance_id = match &item {
1194 WorkItem::StartOrchestration { instance, .. }
1195 | WorkItem::ActivityCompleted { instance, .. }
1196 | WorkItem::ActivityFailed { instance, .. }
1197 | WorkItem::TimerFired { instance, .. }
1198 | WorkItem::ExternalRaised { instance, .. }
1199 | WorkItem::CancelInstance { instance, .. }
1200 | WorkItem::ContinueAsNew { instance, .. }
1201 | WorkItem::QueueMessage { instance, .. } => instance,
1202 WorkItem::SubOrchCompleted {
1203 parent_instance, ..
1204 }
1205 | WorkItem::SubOrchFailed {
1206 parent_instance, ..
1207 } => parent_instance,
1208 WorkItem::ActivityExecute { .. } => {
1209 return Err(ProviderError::permanent(
1210 "enqueue_orchestrator_work",
1211 "ActivityExecute should go to worker queue, not orchestrator queue",
1212 ));
1213 }
1214 };
1215
1216 let now_ms = Self::now_millis();
1218
1219 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1220 if *fire_at_ms > 0 {
1221 if let Some(delay) = delay {
1223 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1224 } else {
1225 *fire_at_ms
1226 }
1227 } else {
1228 delay
1230 .map(|d| now_ms as u64 + d.as_millis() as u64)
1231 .unwrap_or(now_ms as u64)
1232 }
1233 } else {
1234 delay
1236 .map(|d| now_ms as u64 + d.as_millis() as u64)
1237 .unwrap_or(now_ms as u64)
1238 };
1239
1240 let visible_at = Utc
1241 .timestamp_millis_opt(visible_at_ms as i64)
1242 .single()
1243 .ok_or_else(|| {
1244 ProviderError::permanent(
1245 "enqueue_orchestrator_work",
1246 "Invalid visible_at timestamp",
1247 )
1248 })?;
1249
1250 sqlx::query(&format!(
1255 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1256 self.schema_name
1257 ))
1258 .bind(instance_id)
1259 .bind(&work_item)
1260 .bind(visible_at)
1261 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1265 .await
1266 .map_err(|e| {
1267 error!(
1268 target = "duroxide::providers::postgres",
1269 operation = "enqueue_orchestrator_work",
1270 error_type = "database_error",
1271 error = %e,
1272 instance_id = %instance_id,
1273 "Failed to enqueue orchestrator work"
1274 );
1275 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1276 })?;
1277
1278 debug!(
1279 target = "duroxide::providers::postgres",
1280 operation = "enqueue_orchestrator_work",
1281 instance_id = %instance_id,
1282 delay_ms = delay.map(|d| d.as_millis() as u64),
1283 "Enqueued orchestrator work"
1284 );
1285
1286 Ok(())
1287 }
1288
1289 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1290 async fn read_with_execution(
1291 &self,
1292 instance: &str,
1293 execution_id: u64,
1294 ) -> Result<Vec<Event>, ProviderError> {
1295 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1296 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1297 self.table_name("history")
1298 ))
1299 .bind(instance)
1300 .bind(execution_id as i64)
1301 .fetch_all(&*self.pool)
1302 .await
1303 .ok()
1304 .unwrap_or_default();
1305
1306 Ok(event_data_rows
1307 .into_iter()
1308 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1309 .collect())
1310 }
1311
1312 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1313 async fn renew_session_lock(
1314 &self,
1315 owner_ids: &[&str],
1316 extend_for: Duration,
1317 idle_timeout: Duration,
1318 ) -> Result<usize, ProviderError> {
1319 if owner_ids.is_empty() {
1320 return Ok(0);
1321 }
1322
1323 let now_ms = Self::now_millis();
1324 let extend_ms = extend_for.as_millis() as i64;
1325 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1326 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1327
1328 let result = sqlx::query_scalar::<_, i64>(&format!(
1329 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1330 self.schema_name
1331 ))
1332 .bind(&owner_ids_vec)
1333 .bind(now_ms)
1334 .bind(extend_ms)
1335 .bind(idle_timeout_ms)
1336 .fetch_one(&*self.pool)
1337 .await
1338 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1339
1340 debug!(
1341 target = "duroxide::providers::postgres",
1342 operation = "renew_session_lock",
1343 owner_count = owner_ids.len(),
1344 sessions_renewed = result,
1345 "Session locks renewed"
1346 );
1347
1348 Ok(result as usize)
1349 }
1350
1351 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1352 async fn cleanup_orphaned_sessions(
1353 &self,
1354 _idle_timeout: Duration,
1355 ) -> Result<usize, ProviderError> {
1356 let now_ms = Self::now_millis();
1357
1358 let result = sqlx::query_scalar::<_, i64>(&format!(
1359 "SELECT {}.cleanup_orphaned_sessions($1)",
1360 self.schema_name
1361 ))
1362 .bind(now_ms)
1363 .fetch_one(&*self.pool)
1364 .await
1365 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1366
1367 debug!(
1368 target = "duroxide::providers::postgres",
1369 operation = "cleanup_orphaned_sessions",
1370 sessions_cleaned = result,
1371 "Orphaned sessions cleaned up"
1372 );
1373
1374 Ok(result as usize)
1375 }
1376
    /// Expose the admin/management surface; this provider always supports it
    /// (see the `ProviderAdmin` impl below in this file).
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1380
1381 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1382 async fn get_custom_status(
1383 &self,
1384 instance: &str,
1385 last_seen_version: u64,
1386 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1387 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1388 "SELECT * FROM {}.get_custom_status($1, $2)",
1389 self.schema_name
1390 ))
1391 .bind(instance)
1392 .bind(last_seen_version as i64)
1393 .fetch_optional(&*self.pool)
1394 .await
1395 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1396
1397 match row {
1398 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1399 None => Ok(None),
1400 }
1401 }
1402}
1403
1404#[async_trait::async_trait]
1405impl ProviderAdmin for PostgresProvider {
1406 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1407 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1408 sqlx::query_scalar(&format!(
1409 "SELECT instance_id FROM {}.list_instances()",
1410 self.schema_name
1411 ))
1412 .fetch_all(&*self.pool)
1413 .await
1414 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1415 }
1416
1417 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1418 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1419 sqlx::query_scalar(&format!(
1420 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1421 self.schema_name
1422 ))
1423 .bind(status)
1424 .fetch_all(&*self.pool)
1425 .await
1426 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1427 }
1428
1429 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1430 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1431 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1432 "SELECT execution_id FROM {}.list_executions($1)",
1433 self.schema_name
1434 ))
1435 .bind(instance)
1436 .fetch_all(&*self.pool)
1437 .await
1438 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1439
1440 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1441 }
1442
1443 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1444 async fn read_history_with_execution_id(
1445 &self,
1446 instance: &str,
1447 execution_id: u64,
1448 ) -> Result<Vec<Event>, ProviderError> {
1449 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1450 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1451 self.schema_name
1452 ))
1453 .bind(instance)
1454 .bind(execution_id as i64)
1455 .fetch_all(&*self.pool)
1456 .await
1457 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1458
1459 event_data_rows
1460 .into_iter()
1461 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1462 .collect::<Vec<Event>>()
1463 .into_iter()
1464 .map(Ok)
1465 .collect()
1466 }
1467
1468 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1469 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1470 let execution_id = self.latest_execution_id(instance).await?;
1471 self.read_history_with_execution_id(instance, execution_id)
1472 .await
1473 }
1474
1475 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1476 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1477 sqlx::query_scalar(&format!(
1478 "SELECT {}.latest_execution_id($1)",
1479 self.schema_name
1480 ))
1481 .bind(instance)
1482 .fetch_optional(&*self.pool)
1483 .await
1484 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1485 .map(|id: i64| id as u64)
1486 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1487 }
1488
1489 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1490 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1491 let row: Option<(
1492 String,
1493 String,
1494 String,
1495 i64,
1496 chrono::DateTime<Utc>,
1497 Option<chrono::DateTime<Utc>>,
1498 Option<String>,
1499 Option<String>,
1500 Option<String>,
1501 )> = sqlx::query_as(&format!(
1502 "SELECT * FROM {}.get_instance_info($1)",
1503 self.schema_name
1504 ))
1505 .bind(instance)
1506 .fetch_optional(&*self.pool)
1507 .await
1508 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1509
1510 let (
1511 instance_id,
1512 orchestration_name,
1513 orchestration_version,
1514 current_execution_id,
1515 created_at,
1516 updated_at,
1517 status,
1518 output,
1519 parent_instance_id,
1520 ) =
1521 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1522
1523 Ok(InstanceInfo {
1524 instance_id,
1525 orchestration_name,
1526 orchestration_version,
1527 current_execution_id: current_execution_id as u64,
1528 status: status.unwrap_or_else(|| "Running".to_string()),
1529 output,
1530 created_at: created_at.timestamp_millis() as u64,
1531 updated_at: updated_at
1532 .map(|dt| dt.timestamp_millis() as u64)
1533 .unwrap_or(created_at.timestamp_millis() as u64),
1534 parent_instance_id,
1535 })
1536 }
1537
1538 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1539 async fn get_execution_info(
1540 &self,
1541 instance: &str,
1542 execution_id: u64,
1543 ) -> Result<ExecutionInfo, ProviderError> {
1544 let row: Option<(
1545 i64,
1546 String,
1547 Option<String>,
1548 chrono::DateTime<Utc>,
1549 Option<chrono::DateTime<Utc>>,
1550 i64,
1551 )> = sqlx::query_as(&format!(
1552 "SELECT * FROM {}.get_execution_info($1, $2)",
1553 self.schema_name
1554 ))
1555 .bind(instance)
1556 .bind(execution_id as i64)
1557 .fetch_optional(&*self.pool)
1558 .await
1559 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1560
1561 let (exec_id, status, output, started_at, completed_at, event_count) = row
1562 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1563
1564 Ok(ExecutionInfo {
1565 execution_id: exec_id as u64,
1566 status,
1567 output,
1568 started_at: started_at.timestamp_millis() as u64,
1569 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1570 event_count: event_count as usize,
1571 })
1572 }
1573
1574 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1575 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1576 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1577 "SELECT * FROM {}.get_system_metrics()",
1578 self.schema_name
1579 ))
1580 .fetch_optional(&*self.pool)
1581 .await
1582 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1583
1584 let (
1585 total_instances,
1586 total_executions,
1587 running_instances,
1588 completed_instances,
1589 failed_instances,
1590 total_events,
1591 ) = row.ok_or_else(|| {
1592 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1593 })?;
1594
1595 Ok(SystemMetrics {
1596 total_instances: total_instances as u64,
1597 total_executions: total_executions as u64,
1598 running_instances: running_instances as u64,
1599 completed_instances: completed_instances as u64,
1600 failed_instances: failed_instances as u64,
1601 total_events: total_events as u64,
1602 })
1603 }
1604
1605 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1606 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1607 let now_ms = Self::now_millis();
1608
1609 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1610 "SELECT * FROM {}.get_queue_depths($1)",
1611 self.schema_name
1612 ))
1613 .bind(now_ms)
1614 .fetch_optional(&*self.pool)
1615 .await
1616 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1617
1618 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1619 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1620 })?;
1621
1622 Ok(QueueDepths {
1623 orchestrator_queue: orchestrator_queue as usize,
1624 worker_queue: worker_queue as usize,
1625 timer_queue: 0, })
1627 }
1628
1629 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1632 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1633 sqlx::query_scalar(&format!(
1634 "SELECT child_instance_id FROM {}.list_children($1)",
1635 self.schema_name
1636 ))
1637 .bind(instance_id)
1638 .fetch_all(&*self.pool)
1639 .await
1640 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1641 }
1642
1643 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1644 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1645 let result: Result<Option<String>, _> =
1648 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1649 .bind(instance_id)
1650 .fetch_one(&*self.pool)
1651 .await;
1652
1653 match result {
1654 Ok(parent_id) => Ok(parent_id),
1655 Err(e) => {
1656 let err_str = e.to_string();
1657 if err_str.contains("Instance not found") {
1658 Err(ProviderError::permanent(
1659 "get_parent_id",
1660 format!("Instance not found: {}", instance_id),
1661 ))
1662 } else {
1663 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1664 }
1665 }
1666 }
1667 }
1668
1669 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1672 async fn delete_instances_atomic(
1673 &self,
1674 ids: &[String],
1675 force: bool,
1676 ) -> Result<DeleteInstanceResult, ProviderError> {
1677 if ids.is_empty() {
1678 return Ok(DeleteInstanceResult::default());
1679 }
1680
1681 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1682 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1683 self.schema_name
1684 ))
1685 .bind(ids)
1686 .bind(force)
1687 .fetch_optional(&*self.pool)
1688 .await
1689 .map_err(|e| {
1690 let err_str = e.to_string();
1691 if err_str.contains("is Running") {
1692 ProviderError::permanent("delete_instances_atomic", err_str)
1693 } else if err_str.contains("Orphan detected") {
1694 ProviderError::permanent("delete_instances_atomic", err_str)
1695 } else {
1696 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1697 }
1698 })?;
1699
1700 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1701 row.unwrap_or((0, 0, 0, 0));
1702
1703 debug!(
1704 target = "duroxide::providers::postgres",
1705 operation = "delete_instances_atomic",
1706 instances_deleted = instances_deleted,
1707 executions_deleted = executions_deleted,
1708 events_deleted = events_deleted,
1709 queue_messages_deleted = queue_messages_deleted,
1710 "Deleted instances atomically"
1711 );
1712
1713 Ok(DeleteInstanceResult {
1714 instances_deleted: instances_deleted as u64,
1715 executions_deleted: executions_deleted as u64,
1716 events_deleted: events_deleted as u64,
1717 queue_messages_deleted: queue_messages_deleted as u64,
1718 })
1719 }
1720
1721 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1722 async fn delete_instance_bulk(
1723 &self,
1724 filter: InstanceFilter,
1725 ) -> Result<DeleteInstanceResult, ProviderError> {
1726 let mut sql = format!(
1728 r#"
1729 SELECT i.instance_id
1730 FROM {}.instances i
1731 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1732 AND i.current_execution_id = e.execution_id
1733 WHERE i.parent_instance_id IS NULL
1734 AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1735 "#,
1736 self.schema_name, self.schema_name
1737 );
1738
1739 if let Some(ref ids) = filter.instance_ids {
1741 if ids.is_empty() {
1742 return Ok(DeleteInstanceResult::default());
1743 }
1744 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1745 sql.push_str(&format!(
1746 " AND i.instance_id IN ({})",
1747 placeholders.join(", ")
1748 ));
1749 }
1750
1751 if filter.completed_before.is_some() {
1753 let param_num = filter
1754 .instance_ids
1755 .as_ref()
1756 .map(|ids| ids.len())
1757 .unwrap_or(0)
1758 + 1;
1759 sql.push_str(&format!(
1760 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1761 param_num
1762 ));
1763 }
1764
1765 let limit = filter.limit.unwrap_or(1000);
1767 let limit_param_num = filter
1768 .instance_ids
1769 .as_ref()
1770 .map(|ids| ids.len())
1771 .unwrap_or(0)
1772 + if filter.completed_before.is_some() {
1773 1
1774 } else {
1775 0
1776 }
1777 + 1;
1778 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1779
1780 let mut query = sqlx::query_scalar::<_, String>(&sql);
1782 if let Some(ref ids) = filter.instance_ids {
1783 for id in ids {
1784 query = query.bind(id);
1785 }
1786 }
1787 if let Some(completed_before) = filter.completed_before {
1788 query = query.bind(completed_before as i64);
1789 }
1790 query = query.bind(limit as i64);
1791
1792 let instance_ids: Vec<String> = query
1793 .fetch_all(&*self.pool)
1794 .await
1795 .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1796
1797 if instance_ids.is_empty() {
1798 return Ok(DeleteInstanceResult::default());
1799 }
1800
1801 let mut result = DeleteInstanceResult::default();
1803
1804 for instance_id in &instance_ids {
1805 let tree = self.get_instance_tree(instance_id).await?;
1807
1808 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1810 result.instances_deleted += delete_result.instances_deleted;
1811 result.executions_deleted += delete_result.executions_deleted;
1812 result.events_deleted += delete_result.events_deleted;
1813 result.queue_messages_deleted += delete_result.queue_messages_deleted;
1814 }
1815
1816 debug!(
1817 target = "duroxide::providers::postgres",
1818 operation = "delete_instance_bulk",
1819 instances_deleted = result.instances_deleted,
1820 executions_deleted = result.executions_deleted,
1821 events_deleted = result.events_deleted,
1822 queue_messages_deleted = result.queue_messages_deleted,
1823 "Bulk deleted instances"
1824 );
1825
1826 Ok(result)
1827 }
1828
1829 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1832 async fn prune_executions(
1833 &self,
1834 instance_id: &str,
1835 options: PruneOptions,
1836 ) -> Result<PruneResult, ProviderError> {
1837 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1838 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1839
1840 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1841 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1842 self.schema_name
1843 ))
1844 .bind(instance_id)
1845 .bind(keep_last)
1846 .bind(completed_before_ms)
1847 .fetch_optional(&*self.pool)
1848 .await
1849 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1850
1851 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1852
1853 debug!(
1854 target = "duroxide::providers::postgres",
1855 operation = "prune_executions",
1856 instance_id = %instance_id,
1857 instances_processed = instances_processed,
1858 executions_deleted = executions_deleted,
1859 events_deleted = events_deleted,
1860 "Pruned executions"
1861 );
1862
1863 Ok(PruneResult {
1864 instances_processed: instances_processed as u64,
1865 executions_deleted: executions_deleted as u64,
1866 events_deleted: events_deleted as u64,
1867 })
1868 }
1869
1870 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1871 async fn prune_executions_bulk(
1872 &self,
1873 filter: InstanceFilter,
1874 options: PruneOptions,
1875 ) -> Result<PruneResult, ProviderError> {
1876 let mut sql = format!(
1881 r#"
1882 SELECT i.instance_id
1883 FROM {}.instances i
1884 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1885 AND i.current_execution_id = e.execution_id
1886 WHERE 1=1
1887 "#,
1888 self.schema_name, self.schema_name
1889 );
1890
1891 if let Some(ref ids) = filter.instance_ids {
1893 if ids.is_empty() {
1894 return Ok(PruneResult::default());
1895 }
1896 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1897 sql.push_str(&format!(
1898 " AND i.instance_id IN ({})",
1899 placeholders.join(", ")
1900 ));
1901 }
1902
1903 if filter.completed_before.is_some() {
1905 let param_num = filter
1906 .instance_ids
1907 .as_ref()
1908 .map(|ids| ids.len())
1909 .unwrap_or(0)
1910 + 1;
1911 sql.push_str(&format!(
1912 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1913 param_num
1914 ));
1915 }
1916
1917 let limit = filter.limit.unwrap_or(1000);
1919 let limit_param_num = filter
1920 .instance_ids
1921 .as_ref()
1922 .map(|ids| ids.len())
1923 .unwrap_or(0)
1924 + if filter.completed_before.is_some() {
1925 1
1926 } else {
1927 0
1928 }
1929 + 1;
1930 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1931
1932 let mut query = sqlx::query_scalar::<_, String>(&sql);
1934 if let Some(ref ids) = filter.instance_ids {
1935 for id in ids {
1936 query = query.bind(id);
1937 }
1938 }
1939 if let Some(completed_before) = filter.completed_before {
1940 query = query.bind(completed_before as i64);
1941 }
1942 query = query.bind(limit as i64);
1943
1944 let instance_ids: Vec<String> = query
1945 .fetch_all(&*self.pool)
1946 .await
1947 .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1948
1949 let mut result = PruneResult::default();
1951
1952 for instance_id in &instance_ids {
1953 let single_result = self.prune_executions(instance_id, options.clone()).await?;
1954 result.instances_processed += single_result.instances_processed;
1955 result.executions_deleted += single_result.executions_deleted;
1956 result.events_deleted += single_result.events_deleted;
1957 }
1958
1959 debug!(
1960 target = "duroxide::providers::postgres",
1961 operation = "prune_executions_bulk",
1962 instances_processed = result.instances_processed,
1963 executions_deleted = result.executions_deleted,
1964 events_deleted = result.events_deleted,
1965 "Bulk pruned executions"
1966 );
1967
1968 Ok(result)
1969 }
1970}