1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo,
5 ExecutionMetadata, InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
6 ProviderError, PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier,
7 SessionFetchConfig, SystemMetrics, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-backed duroxide provider.
///
/// All persistent state lives in Postgres; every SQL statement is qualified
/// with `schema_name`, so multiple providers can share one database.
pub struct PostgresProvider {
    // Shared connection pool; `Arc` so the migration runner can hold a clone.
    pool: Arc<PgPool>,
    // Postgres schema owning all tables/functions (defaults to "public").
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95
96
97 fn table_name(&self, table: &str) -> String {
99 format!("{}.{}", self.schema_name, table)
100 }
101
102 pub fn pool(&self) -> &PgPool {
104 &self.pool
105 }
106
107 pub fn schema_name(&self) -> &str {
109 &self.schema_name
110 }
111
112 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114 match e {
115 SqlxError::Database(ref db_err) => {
116 let code_opt = db_err.code();
118 let code = code_opt.as_deref();
119 if code == Some("40P01") {
120 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122 } else if code == Some("40001") {
123 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125 } else if code == Some("23505") {
126 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128 } else if code == Some("23503") {
129 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131 } else {
132 ProviderError::permanent(operation, format!("Database error: {e}"))
133 }
134 }
135 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137 }
138 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140 }
141 }
142
143 pub async fn cleanup_schema(&self) -> Result<()> {
148 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150 .execute(&*self.pool)
151 .await?;
152
153 if self.schema_name != "public" {
156 sqlx::query(&format!(
157 "DROP SCHEMA IF EXISTS {} CASCADE",
158 self.schema_name
159 ))
160 .execute(&*self.pool)
161 .await?;
162 } else {
163 }
166
167 Ok(())
168 }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
    /// Provider identifier reported to the runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
176
    /// Crate version, baked in at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
180
181 #[instrument(skip(self), target = "duroxide::providers::postgres")]
182 async fn fetch_orchestration_item(
183 &self,
184 lock_timeout: Duration,
185 _poll_timeout: Duration,
186 filter: Option<&DispatcherCapabilityFilter>,
187 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
188 let start = std::time::Instant::now();
189
190 const MAX_RETRIES: u32 = 3;
191 const RETRY_DELAY_MS: u64 = 50;
192
193 let lock_timeout_ms = lock_timeout.as_millis() as i64;
195 let mut _last_error: Option<ProviderError> = None;
196
197 let (min_packed, max_packed) = if let Some(f) = filter {
199 if let Some(range) = f.supported_duroxide_versions.first() {
200 let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
201 let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
202 (Some(min), Some(max))
203 } else {
204 return Ok(None);
206 }
207 } else {
208 (None, None)
209 };
210
211 for attempt in 0..=MAX_RETRIES {
212 let now_ms = Self::now_millis();
213
214 let result: Result<
215 Option<(
216 String,
217 String,
218 String,
219 i64,
220 serde_json::Value,
221 serde_json::Value,
222 String,
223 i32,
224 )>,
225 SqlxError,
226 > = sqlx::query_as(&format!(
227 "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
228 self.schema_name
229 ))
230 .bind(now_ms)
231 .bind(lock_timeout_ms)
232 .bind(min_packed)
233 .bind(max_packed)
234 .fetch_optional(&*self.pool)
235 .await;
236
237 let row = match result {
238 Ok(r) => r,
239 Err(e) => {
240 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
241 if provider_err.is_retryable() && attempt < MAX_RETRIES {
242 warn!(
243 target = "duroxide::providers::postgres",
244 operation = "fetch_orchestration_item",
245 attempt = attempt + 1,
246 error = %provider_err,
247 "Retryable error, will retry"
248 );
249 _last_error = Some(provider_err);
250 sleep(std::time::Duration::from_millis(
251 RETRY_DELAY_MS * (attempt as u64 + 1),
252 ))
253 .await;
254 continue;
255 }
256 return Err(provider_err);
257 }
258 };
259
260 if let Some((
261 instance_id,
262 orchestration_name,
263 orchestration_version,
264 execution_id,
265 history_json,
266 messages_json,
267 lock_token,
268 attempt_count,
269 )) = row
270 {
271 let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
272 Ok(h) => (h, None),
273 Err(e) => {
274 let error_msg = format!("Failed to deserialize history: {e}");
275 warn!(
276 target = "duroxide::providers::postgres",
277 instance = %instance_id,
278 error = %error_msg,
279 "History deserialization failed, returning item with history_error"
280 );
281 (vec![], Some(error_msg))
282 }
283 };
284
285 let messages: Vec<WorkItem> =
286 serde_json::from_value(messages_json).map_err(|e| {
287 ProviderError::permanent(
288 "fetch_orchestration_item",
289 format!("Failed to deserialize messages: {e}"),
290 )
291 })?;
292
293 let duration_ms = start.elapsed().as_millis() as u64;
294 debug!(
295 target = "duroxide::providers::postgres",
296 operation = "fetch_orchestration_item",
297 instance_id = %instance_id,
298 execution_id = execution_id,
299 message_count = messages.len(),
300 history_count = history.len(),
301 attempt_count = attempt_count,
302 duration_ms = duration_ms,
303 attempts = attempt + 1,
304 "Fetched orchestration item via stored procedure"
305 );
306
307 return Ok(Some((
308 OrchestrationItem {
309 instance: instance_id,
310 orchestration_name,
311 execution_id: execution_id as u64,
312 version: orchestration_version,
313 history,
314 messages,
315 history_error,
316 },
317 lock_token,
318 attempt_count as u32,
319 )));
320 }
321
322 return Ok(None);
325 }
326
327 Ok(None)
328 }
    /// Atomically commits the results of one orchestration turn via the
    /// `ack_orchestration_item` stored procedure: appends `history_delta`,
    /// enqueues worker/orchestrator items, records metadata and cancelled
    /// activities, and releases the lock identified by `lock_token`.
    ///
    /// Retries up to `MAX_RETRIES` times with linear backoff on retryable
    /// database errors. An invalid/expired lock token is always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Pre-serialize the history delta. An event_id of 0 means the
        // runtime forgot to assign ids — reject before touching the DB.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag derived from the Debug representation: the
            // text before the first '{' (e.g. "ActivityScheduled").
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // The LAST CustomStatusUpdated event in the delta wins:
        // Some(status) -> "set", None payload -> "clear", absent -> no-op.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // "Invalid lock token" raised by the procedure is always
                    // permanent, however the error surfaces.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: the final attempt always returns above.
        Ok(())
    }
    /// Releases an orchestration lock without committing results, making the
    /// item visible again (optionally only after `delay`).
    ///
    /// `ignore_attempt` asks the stored procedure not to count this attempt
    /// against the item. An unknown/expired `lock_token` is permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The SQL function returns the affected instance id (used below for
        // the debug log only).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // "Invalid lock token" is permanent regardless of how the
                // error surfaces (database error vs generic).
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
567
568 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
569 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
570 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
571 "SELECT out_event_data FROM {}.fetch_history($1)",
572 self.schema_name
573 ))
574 .bind(instance)
575 .fetch_all(&*self.pool)
576 .await
577 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
578
579 Ok(event_data_rows
580 .into_iter()
581 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
582 .collect())
583 }
584
    /// Appends `new_events` to the history of `instance`/`execution_id` via
    /// the `append_history` stored procedure. No-op for an empty batch.
    ///
    /// Every event must carry a non-zero `event_id` assigned by the runtime;
    /// a zero id is a caller bug and is rejected as permanent.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag from the Debug representation: the text before
            // the first '{' (e.g. "ActivityScheduled").
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
658
    /// Serializes `item` and enqueues it on the worker queue via the
    /// `enqueue_worker_work` stored procedure.
    ///
    /// For `ActivityExecute` the routing fields (instance, execution,
    /// activity id, session id) are passed as separate parameters alongside
    /// the JSON payload — presumably so the SQL side can filter/route
    /// without parsing JSON (confirm against the stored procedure). Other
    /// variants pass NULLs.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_worker_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        let now_ms = Self::now_millis();

        // Only ActivityExecute carries routing metadata.
        let (instance_id, execution_id, activity_id, session_id) = match &item {
            WorkItem::ActivityExecute {
                instance,
                execution_id,
                id,
                session_id,
                ..
            } => (
                Some(instance.clone()),
                Some(*execution_id as i64),
                Some(*id as i64),
                session_id.clone(),
            ),
            _ => (None, None, None, None),
        };

        sqlx::query(&format!(
            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(work_item)
        .bind(now_ms)
        .bind(&instance_id)
        .bind(execution_id)
        .bind(activity_id)
        .bind(&session_id)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_worker_work",
                error_type = "database_error",
                error = %e,
                "Failed to enqueue worker work"
            );
            Self::sqlx_to_provider_error("enqueue_worker_work", e)
        })?;

        Ok(())
    }
712
    /// Claims the next available worker item via the `fetch_work_item`
    /// stored procedure, returning (item, lock token, attempt count), or
    /// `None` when nothing is ready.
    ///
    /// When `session` is provided, the owner id and session lock timeout are
    /// forwarded so the procedure can honor session affinity.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Row shape: (work_item_json, lock_token, attempt_count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        // A payload that does not deserialize is permanently poisoned.
        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Instance id extracted for the debug log only; the exhaustive match
        // forces an update here if a WorkItem variant is added.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((
            work_item,
            lock_token,
            attempt_count as u32,
        )))
    }
799
    /// Acknowledges the worker-queue item locked by `token`.
    ///
    /// With `completion == None` the item is removed without enqueuing a
    /// result (the cancellation path). Otherwise `completion` must be
    /// `ActivityCompleted` or `ActivityFailed`, and the `ack_worker` stored
    /// procedure atomically deletes the item and enqueues the completion for
    /// the orchestrator.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: ack with NULL payload (cancellation path).
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions here.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
906
    /// Extends the lock on a claimed worker item by `extend_for`.
    ///
    /// NOTE(review): `as_secs` drops sub-second precision, so an extension
    /// under one second becomes 0 — confirm the `renew_work_item_lock` SQL
    /// function expects whole seconds (most other durations in this file are
    /// passed as milliseconds).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A stale/unknown token is permanent regardless of how the
                // error surfaces.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
962
963 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
964 async fn abandon_work_item(
965 &self,
966 token: &str,
967 delay: Option<Duration>,
968 ignore_attempt: bool,
969 ) -> Result<(), ProviderError> {
970 let start = std::time::Instant::now();
971 let now_ms = Self::now_millis();
972 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
973
974 match sqlx::query(&format!(
975 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
976 self.schema_name
977 ))
978 .bind(token)
979 .bind(now_ms)
980 .bind(delay_param)
981 .bind(ignore_attempt)
982 .execute(&*self.pool)
983 .await
984 {
985 Ok(_) => {
986 let duration_ms = start.elapsed().as_millis() as u64;
987 debug!(
988 target = "duroxide::providers::postgres",
989 operation = "abandon_work_item",
990 token = %token,
991 delay_ms = delay.map(|d| d.as_millis() as u64),
992 ignore_attempt = ignore_attempt,
993 duration_ms = duration_ms,
994 "Abandoned work item via stored procedure"
995 );
996 Ok(())
997 }
998 Err(e) => {
999 if let SqlxError::Database(db_err) = &e {
1000 if db_err.message().contains("Invalid lock token")
1001 || db_err.message().contains("already acked")
1002 {
1003 return Err(ProviderError::permanent(
1004 "abandon_work_item",
1005 "Invalid lock token or already acked",
1006 ));
1007 }
1008 } else if e.to_string().contains("Invalid lock token")
1009 || e.to_string().contains("already acked")
1010 {
1011 return Err(ProviderError::permanent(
1012 "abandon_work_item",
1013 "Invalid lock token or already acked",
1014 ));
1015 }
1016
1017 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1018 }
1019 }
1020 }
1021
    /// Extends the lock on a claimed orchestration item by `extend_for`.
    ///
    /// NOTE(review): like `renew_work_item_lock`, `as_secs` drops sub-second
    /// precision (an extension under one second becomes 0) — confirm the SQL
    /// function expects whole seconds.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A stale/unknown/released token is permanent regardless of
                // how the error surfaces.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1086
1087 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1088 async fn enqueue_for_orchestrator(
1089 &self,
1090 item: WorkItem,
1091 delay: Option<Duration>,
1092 ) -> Result<(), ProviderError> {
1093 let work_item = serde_json::to_string(&item).map_err(|e| {
1094 ProviderError::permanent(
1095 "enqueue_orchestrator_work",
1096 format!("Failed to serialize work item: {e}"),
1097 )
1098 })?;
1099
1100 let instance_id = match &item {
1102 WorkItem::StartOrchestration { instance, .. }
1103 | WorkItem::ActivityCompleted { instance, .. }
1104 | WorkItem::ActivityFailed { instance, .. }
1105 | WorkItem::TimerFired { instance, .. }
1106 | WorkItem::ExternalRaised { instance, .. }
1107 | WorkItem::CancelInstance { instance, .. }
1108 | WorkItem::ContinueAsNew { instance, .. }
1109 | WorkItem::QueueMessage { instance, .. } => instance,
1110 WorkItem::SubOrchCompleted {
1111 parent_instance, ..
1112 }
1113 | WorkItem::SubOrchFailed {
1114 parent_instance, ..
1115 } => parent_instance,
1116 WorkItem::ActivityExecute { .. } => {
1117 return Err(ProviderError::permanent(
1118 "enqueue_orchestrator_work",
1119 "ActivityExecute should go to worker queue, not orchestrator queue",
1120 ));
1121 }
1122 };
1123
1124 let now_ms = Self::now_millis();
1126
1127 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1128 if *fire_at_ms > 0 {
1129 if let Some(delay) = delay {
1131 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1132 } else {
1133 *fire_at_ms
1134 }
1135 } else {
1136 delay
1138 .map(|d| now_ms as u64 + d.as_millis() as u64)
1139 .unwrap_or(now_ms as u64)
1140 }
1141 } else {
1142 delay
1144 .map(|d| now_ms as u64 + d.as_millis() as u64)
1145 .unwrap_or(now_ms as u64)
1146 };
1147
1148 let visible_at = Utc
1149 .timestamp_millis_opt(visible_at_ms as i64)
1150 .single()
1151 .ok_or_else(|| {
1152 ProviderError::permanent(
1153 "enqueue_orchestrator_work",
1154 "Invalid visible_at timestamp",
1155 )
1156 })?;
1157
1158 sqlx::query(&format!(
1163 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1164 self.schema_name
1165 ))
1166 .bind(instance_id)
1167 .bind(&work_item)
1168 .bind(visible_at)
1169 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1173 .await
1174 .map_err(|e| {
1175 error!(
1176 target = "duroxide::providers::postgres",
1177 operation = "enqueue_orchestrator_work",
1178 error_type = "database_error",
1179 error = %e,
1180 instance_id = %instance_id,
1181 "Failed to enqueue orchestrator work"
1182 );
1183 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1184 })?;
1185
1186 debug!(
1187 target = "duroxide::providers::postgres",
1188 operation = "enqueue_orchestrator_work",
1189 instance_id = %instance_id,
1190 delay_ms = delay.map(|d| d.as_millis() as u64),
1191 "Enqueued orchestrator work"
1192 );
1193
1194 Ok(())
1195 }
1196
1197 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1198 async fn read_with_execution(
1199 &self,
1200 instance: &str,
1201 execution_id: u64,
1202 ) -> Result<Vec<Event>, ProviderError> {
1203 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1204 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1205 self.table_name("history")
1206 ))
1207 .bind(instance)
1208 .bind(execution_id as i64)
1209 .fetch_all(&*self.pool)
1210 .await
1211 .ok()
1212 .unwrap_or_default();
1213
1214 Ok(event_data_rows
1215 .into_iter()
1216 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1217 .collect())
1218 }
1219
1220 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1221 async fn renew_session_lock(
1222 &self,
1223 owner_ids: &[&str],
1224 extend_for: Duration,
1225 idle_timeout: Duration,
1226 ) -> Result<usize, ProviderError> {
1227 if owner_ids.is_empty() {
1228 return Ok(0);
1229 }
1230
1231 let now_ms = Self::now_millis();
1232 let extend_ms = extend_for.as_millis() as i64;
1233 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1234 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1235
1236 let result = sqlx::query_scalar::<_, i64>(&format!(
1237 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1238 self.schema_name
1239 ))
1240 .bind(&owner_ids_vec)
1241 .bind(now_ms)
1242 .bind(extend_ms)
1243 .bind(idle_timeout_ms)
1244 .fetch_one(&*self.pool)
1245 .await
1246 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1247
1248 debug!(
1249 target = "duroxide::providers::postgres",
1250 operation = "renew_session_lock",
1251 owner_count = owner_ids.len(),
1252 sessions_renewed = result,
1253 "Session locks renewed"
1254 );
1255
1256 Ok(result as usize)
1257 }
1258
1259 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1260 async fn cleanup_orphaned_sessions(
1261 &self,
1262 _idle_timeout: Duration,
1263 ) -> Result<usize, ProviderError> {
1264 let now_ms = Self::now_millis();
1265
1266 let result = sqlx::query_scalar::<_, i64>(&format!(
1267 "SELECT {}.cleanup_orphaned_sessions($1)",
1268 self.schema_name
1269 ))
1270 .bind(now_ms)
1271 .fetch_one(&*self.pool)
1272 .await
1273 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1274
1275 debug!(
1276 target = "duroxide::providers::postgres",
1277 operation = "cleanup_orphaned_sessions",
1278 sessions_cleaned = result,
1279 "Orphaned sessions cleaned up"
1280 );
1281
1282 Ok(result as usize)
1283 }
1284
    /// Exposes this provider's management surface. `PostgresProvider`
    /// implements `ProviderAdmin` directly, so it hands back itself.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1288
1289 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1290 async fn get_custom_status(
1291 &self,
1292 instance: &str,
1293 last_seen_version: u64,
1294 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1295 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1296 "SELECT * FROM {}.get_custom_status($1, $2)",
1297 self.schema_name
1298 ))
1299 .bind(instance)
1300 .bind(last_seen_version as i64)
1301 .fetch_optional(&*self.pool)
1302 .await
1303 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1304
1305 match row {
1306 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1307 None => Ok(None),
1308 }
1309 }
1310}
1311
1312#[async_trait::async_trait]
1313impl ProviderAdmin for PostgresProvider {
1314 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1315 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1316 sqlx::query_scalar(&format!(
1317 "SELECT instance_id FROM {}.list_instances()",
1318 self.schema_name
1319 ))
1320 .fetch_all(&*self.pool)
1321 .await
1322 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1323 }
1324
1325 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1326 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1327 sqlx::query_scalar(&format!(
1328 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1329 self.schema_name
1330 ))
1331 .bind(status)
1332 .fetch_all(&*self.pool)
1333 .await
1334 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1335 }
1336
1337 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1338 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1339 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1340 "SELECT execution_id FROM {}.list_executions($1)",
1341 self.schema_name
1342 ))
1343 .bind(instance)
1344 .fetch_all(&*self.pool)
1345 .await
1346 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1347
1348 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1349 }
1350
1351 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1352 async fn read_history_with_execution_id(
1353 &self,
1354 instance: &str,
1355 execution_id: u64,
1356 ) -> Result<Vec<Event>, ProviderError> {
1357 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1358 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1359 self.schema_name
1360 ))
1361 .bind(instance)
1362 .bind(execution_id as i64)
1363 .fetch_all(&*self.pool)
1364 .await
1365 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1366
1367 event_data_rows
1368 .into_iter()
1369 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1370 .collect::<Vec<Event>>()
1371 .into_iter()
1372 .map(Ok)
1373 .collect()
1374 }
1375
1376 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1377 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1378 let execution_id = self.latest_execution_id(instance).await?;
1379 self.read_history_with_execution_id(instance, execution_id)
1380 .await
1381 }
1382
1383 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1384 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1385 sqlx::query_scalar(&format!(
1386 "SELECT {}.latest_execution_id($1)",
1387 self.schema_name
1388 ))
1389 .bind(instance)
1390 .fetch_optional(&*self.pool)
1391 .await
1392 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1393 .map(|id: i64| id as u64)
1394 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1395 }
1396
1397 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1398 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1399 let row: Option<(
1400 String,
1401 String,
1402 String,
1403 i64,
1404 chrono::DateTime<Utc>,
1405 Option<chrono::DateTime<Utc>>,
1406 Option<String>,
1407 Option<String>,
1408 Option<String>,
1409 )> = sqlx::query_as(&format!(
1410 "SELECT * FROM {}.get_instance_info($1)",
1411 self.schema_name
1412 ))
1413 .bind(instance)
1414 .fetch_optional(&*self.pool)
1415 .await
1416 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1417
1418 let (
1419 instance_id,
1420 orchestration_name,
1421 orchestration_version,
1422 current_execution_id,
1423 created_at,
1424 updated_at,
1425 status,
1426 output,
1427 parent_instance_id,
1428 ) =
1429 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1430
1431 Ok(InstanceInfo {
1432 instance_id,
1433 orchestration_name,
1434 orchestration_version,
1435 current_execution_id: current_execution_id as u64,
1436 status: status.unwrap_or_else(|| "Running".to_string()),
1437 output,
1438 created_at: created_at.timestamp_millis() as u64,
1439 updated_at: updated_at
1440 .map(|dt| dt.timestamp_millis() as u64)
1441 .unwrap_or(created_at.timestamp_millis() as u64),
1442 parent_instance_id,
1443 })
1444 }
1445
1446 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1447 async fn get_execution_info(
1448 &self,
1449 instance: &str,
1450 execution_id: u64,
1451 ) -> Result<ExecutionInfo, ProviderError> {
1452 let row: Option<(
1453 i64,
1454 String,
1455 Option<String>,
1456 chrono::DateTime<Utc>,
1457 Option<chrono::DateTime<Utc>>,
1458 i64,
1459 )> = sqlx::query_as(&format!(
1460 "SELECT * FROM {}.get_execution_info($1, $2)",
1461 self.schema_name
1462 ))
1463 .bind(instance)
1464 .bind(execution_id as i64)
1465 .fetch_optional(&*self.pool)
1466 .await
1467 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1468
1469 let (exec_id, status, output, started_at, completed_at, event_count) = row
1470 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1471
1472 Ok(ExecutionInfo {
1473 execution_id: exec_id as u64,
1474 status,
1475 output,
1476 started_at: started_at.timestamp_millis() as u64,
1477 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1478 event_count: event_count as usize,
1479 })
1480 }
1481
1482 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1483 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1484 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1485 "SELECT * FROM {}.get_system_metrics()",
1486 self.schema_name
1487 ))
1488 .fetch_optional(&*self.pool)
1489 .await
1490 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1491
1492 let (
1493 total_instances,
1494 total_executions,
1495 running_instances,
1496 completed_instances,
1497 failed_instances,
1498 total_events,
1499 ) = row.ok_or_else(|| {
1500 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1501 })?;
1502
1503 Ok(SystemMetrics {
1504 total_instances: total_instances as u64,
1505 total_executions: total_executions as u64,
1506 running_instances: running_instances as u64,
1507 completed_instances: completed_instances as u64,
1508 failed_instances: failed_instances as u64,
1509 total_events: total_events as u64,
1510 })
1511 }
1512
1513 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1514 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1515 let now_ms = Self::now_millis();
1516
1517 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1518 "SELECT * FROM {}.get_queue_depths($1)",
1519 self.schema_name
1520 ))
1521 .bind(now_ms)
1522 .fetch_optional(&*self.pool)
1523 .await
1524 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1525
1526 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1527 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1528 })?;
1529
1530 Ok(QueueDepths {
1531 orchestrator_queue: orchestrator_queue as usize,
1532 worker_queue: worker_queue as usize,
1533 timer_queue: 0, })
1535 }
1536
1537 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1540 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1541 sqlx::query_scalar(&format!(
1542 "SELECT child_instance_id FROM {}.list_children($1)",
1543 self.schema_name
1544 ))
1545 .bind(instance_id)
1546 .fetch_all(&*self.pool)
1547 .await
1548 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1549 }
1550
1551 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1552 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1553 let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1556 "SELECT {}.get_parent_id($1)",
1557 self.schema_name
1558 ))
1559 .bind(instance_id)
1560 .fetch_one(&*self.pool)
1561 .await;
1562
1563 match result {
1564 Ok(parent_id) => Ok(parent_id),
1565 Err(e) => {
1566 let err_str = e.to_string();
1567 if err_str.contains("Instance not found") {
1568 Err(ProviderError::permanent(
1569 "get_parent_id",
1570 format!("Instance not found: {}", instance_id),
1571 ))
1572 } else {
1573 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1574 }
1575 }
1576 }
1577 }
1578
1579 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1582 async fn delete_instances_atomic(
1583 &self,
1584 ids: &[String],
1585 force: bool,
1586 ) -> Result<DeleteInstanceResult, ProviderError> {
1587 if ids.is_empty() {
1588 return Ok(DeleteInstanceResult::default());
1589 }
1590
1591 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1592 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1593 self.schema_name
1594 ))
1595 .bind(ids)
1596 .bind(force)
1597 .fetch_optional(&*self.pool)
1598 .await
1599 .map_err(|e| {
1600 let err_str = e.to_string();
1601 if err_str.contains("is Running") {
1602 ProviderError::permanent(
1603 "delete_instances_atomic",
1604 err_str,
1605 )
1606 } else if err_str.contains("Orphan detected") {
1607 ProviderError::permanent(
1608 "delete_instances_atomic",
1609 err_str,
1610 )
1611 } else {
1612 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1613 }
1614 })?;
1615
1616 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1617 row.unwrap_or((0, 0, 0, 0));
1618
1619 debug!(
1620 target = "duroxide::providers::postgres",
1621 operation = "delete_instances_atomic",
1622 instances_deleted = instances_deleted,
1623 executions_deleted = executions_deleted,
1624 events_deleted = events_deleted,
1625 queue_messages_deleted = queue_messages_deleted,
1626 "Deleted instances atomically"
1627 );
1628
1629 Ok(DeleteInstanceResult {
1630 instances_deleted: instances_deleted as u64,
1631 executions_deleted: executions_deleted as u64,
1632 events_deleted: events_deleted as u64,
1633 queue_messages_deleted: queue_messages_deleted as u64,
1634 })
1635 }
1636
1637 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1638 async fn delete_instance_bulk(
1639 &self,
1640 filter: InstanceFilter,
1641 ) -> Result<DeleteInstanceResult, ProviderError> {
1642 let mut sql = format!(
1644 r#"
1645 SELECT i.instance_id
1646 FROM {}.instances i
1647 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1648 AND i.current_execution_id = e.execution_id
1649 WHERE i.parent_instance_id IS NULL
1650 AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1651 "#,
1652 self.schema_name, self.schema_name
1653 );
1654
1655 if let Some(ref ids) = filter.instance_ids {
1657 if ids.is_empty() {
1658 return Ok(DeleteInstanceResult::default());
1659 }
1660 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1661 sql.push_str(&format!(
1662 " AND i.instance_id IN ({})",
1663 placeholders.join(", ")
1664 ));
1665 }
1666
1667 if filter.completed_before.is_some() {
1669 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1670 sql.push_str(&format!(
1671 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1672 param_num
1673 ));
1674 }
1675
1676 let limit = filter.limit.unwrap_or(1000);
1678 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1679 + if filter.completed_before.is_some() { 1 } else { 0 }
1680 + 1;
1681 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1682
1683 let mut query = sqlx::query_scalar::<_, String>(&sql);
1685 if let Some(ref ids) = filter.instance_ids {
1686 for id in ids {
1687 query = query.bind(id);
1688 }
1689 }
1690 if let Some(completed_before) = filter.completed_before {
1691 query = query.bind(completed_before as i64);
1692 }
1693 query = query.bind(limit as i64);
1694
1695 let instance_ids: Vec<String> = query
1696 .fetch_all(&*self.pool)
1697 .await
1698 .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1699
1700 if instance_ids.is_empty() {
1701 return Ok(DeleteInstanceResult::default());
1702 }
1703
1704 let mut result = DeleteInstanceResult::default();
1706
1707 for instance_id in &instance_ids {
1708 let tree = self.get_instance_tree(instance_id).await?;
1710
1711 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1713 result.instances_deleted += delete_result.instances_deleted;
1714 result.executions_deleted += delete_result.executions_deleted;
1715 result.events_deleted += delete_result.events_deleted;
1716 result.queue_messages_deleted += delete_result.queue_messages_deleted;
1717 }
1718
1719 debug!(
1720 target = "duroxide::providers::postgres",
1721 operation = "delete_instance_bulk",
1722 instances_deleted = result.instances_deleted,
1723 executions_deleted = result.executions_deleted,
1724 events_deleted = result.events_deleted,
1725 queue_messages_deleted = result.queue_messages_deleted,
1726 "Bulk deleted instances"
1727 );
1728
1729 Ok(result)
1730 }
1731
1732 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1735 async fn prune_executions(
1736 &self,
1737 instance_id: &str,
1738 options: PruneOptions,
1739 ) -> Result<PruneResult, ProviderError> {
1740 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1741 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1742
1743 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1744 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1745 self.schema_name
1746 ))
1747 .bind(instance_id)
1748 .bind(keep_last)
1749 .bind(completed_before_ms)
1750 .fetch_optional(&*self.pool)
1751 .await
1752 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1753
1754 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1755
1756 debug!(
1757 target = "duroxide::providers::postgres",
1758 operation = "prune_executions",
1759 instance_id = %instance_id,
1760 instances_processed = instances_processed,
1761 executions_deleted = executions_deleted,
1762 events_deleted = events_deleted,
1763 "Pruned executions"
1764 );
1765
1766 Ok(PruneResult {
1767 instances_processed: instances_processed as u64,
1768 executions_deleted: executions_deleted as u64,
1769 events_deleted: events_deleted as u64,
1770 })
1771 }
1772
1773 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1774 async fn prune_executions_bulk(
1775 &self,
1776 filter: InstanceFilter,
1777 options: PruneOptions,
1778 ) -> Result<PruneResult, ProviderError> {
1779 let mut sql = format!(
1784 r#"
1785 SELECT i.instance_id
1786 FROM {}.instances i
1787 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1788 AND i.current_execution_id = e.execution_id
1789 WHERE 1=1
1790 "#,
1791 self.schema_name, self.schema_name
1792 );
1793
1794 if let Some(ref ids) = filter.instance_ids {
1796 if ids.is_empty() {
1797 return Ok(PruneResult::default());
1798 }
1799 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1800 sql.push_str(&format!(
1801 " AND i.instance_id IN ({})",
1802 placeholders.join(", ")
1803 ));
1804 }
1805
1806 if filter.completed_before.is_some() {
1808 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1809 sql.push_str(&format!(
1810 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1811 param_num
1812 ));
1813 }
1814
1815 let limit = filter.limit.unwrap_or(1000);
1817 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1818 + if filter.completed_before.is_some() { 1 } else { 0 }
1819 + 1;
1820 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1821
1822 let mut query = sqlx::query_scalar::<_, String>(&sql);
1824 if let Some(ref ids) = filter.instance_ids {
1825 for id in ids {
1826 query = query.bind(id);
1827 }
1828 }
1829 if let Some(completed_before) = filter.completed_before {
1830 query = query.bind(completed_before as i64);
1831 }
1832 query = query.bind(limit as i64);
1833
1834 let instance_ids: Vec<String> = query
1835 .fetch_all(&*self.pool)
1836 .await
1837 .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1838
1839 let mut result = PruneResult::default();
1841
1842 for instance_id in &instance_ids {
1843 let single_result = self.prune_executions(instance_id, options.clone()).await?;
1844 result.instances_processed += single_result.instances_processed;
1845 result.executions_deleted += single_result.executions_deleted;
1846 result.events_deleted += single_result.events_deleted;
1847 }
1848
1849 debug!(
1850 target = "duroxide::providers::postgres",
1851 operation = "prune_executions_bulk",
1852 instances_processed = result.instances_processed,
1853 executions_deleted = result.executions_deleted,
1854 events_deleted = result.events_deleted,
1855 "Bulk pruned executions"
1856 );
1857
1858 Ok(result)
1859 }
1860}