1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 CustomStatusUpdate, DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo,
5 ExecutionMetadata, InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
6 ProviderError, PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier,
7 SessionFetchConfig, SystemMetrics, WorkItem,
8};
9use duroxide::Event;
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// Durable-task provider backed by PostgreSQL.
///
/// All state lives in tables and stored procedures inside `schema_name`;
/// every query goes through the shared connection `pool`.
pub struct PostgresProvider {
    // Shared connection pool; Arc so helpers (e.g. the migration runner) can hold a handle.
    pool: Arc<PgPool>,
    // Postgres schema that scopes every table and stored procedure ("public" by default).
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95
96
97 fn table_name(&self, table: &str) -> String {
99 format!("{}.{}", self.schema_name, table)
100 }
101
102 pub fn pool(&self) -> &PgPool {
104 &self.pool
105 }
106
107 pub fn schema_name(&self) -> &str {
109 &self.schema_name
110 }
111
112 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114 match e {
115 SqlxError::Database(ref db_err) => {
116 let code_opt = db_err.code();
118 let code = code_opt.as_deref();
119 if code == Some("40P01") {
120 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122 } else if code == Some("40001") {
123 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125 } else if code == Some("23505") {
126 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128 } else if code == Some("23503") {
129 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131 } else {
132 ProviderError::permanent(operation, format!("Database error: {e}"))
133 }
134 }
135 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137 }
138 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140 }
141 }
142
143 pub async fn cleanup_schema(&self) -> Result<()> {
148 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150 .execute(&*self.pool)
151 .await?;
152
153 if self.schema_name != "public" {
156 sqlx::query(&format!(
157 "DROP SCHEMA IF EXISTS {} CASCADE",
158 self.schema_name
159 ))
160 .execute(&*self.pool)
161 .await?;
162 } else {
163 }
166
167 Ok(())
168 }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
    /// Provider identifier reported to the runtime and diagnostics.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
176
    /// Crate version, captured at compile time from Cargo metadata.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
180
181 #[instrument(skip(self), target = "duroxide::providers::postgres")]
182 async fn fetch_orchestration_item(
183 &self,
184 lock_timeout: Duration,
185 _poll_timeout: Duration,
186 filter: Option<&DispatcherCapabilityFilter>,
187 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
188 let start = std::time::Instant::now();
189
190 const MAX_RETRIES: u32 = 3;
191 const RETRY_DELAY_MS: u64 = 50;
192
193 let lock_timeout_ms = lock_timeout.as_millis() as i64;
195 let mut _last_error: Option<ProviderError> = None;
196
197 let (min_packed, max_packed) = if let Some(f) = filter {
199 if let Some(range) = f.supported_duroxide_versions.first() {
200 let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
201 let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
202 (Some(min), Some(max))
203 } else {
204 return Ok(None);
206 }
207 } else {
208 (None, None)
209 };
210
211 for attempt in 0..=MAX_RETRIES {
212 let now_ms = Self::now_millis();
213
214 let result: Result<
215 Option<(
216 String,
217 String,
218 String,
219 i64,
220 serde_json::Value,
221 serde_json::Value,
222 String,
223 i32,
224 )>,
225 SqlxError,
226 > = sqlx::query_as(&format!(
227 "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
228 self.schema_name
229 ))
230 .bind(now_ms)
231 .bind(lock_timeout_ms)
232 .bind(min_packed)
233 .bind(max_packed)
234 .fetch_optional(&*self.pool)
235 .await;
236
237 let row = match result {
238 Ok(r) => r,
239 Err(e) => {
240 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
241 if provider_err.is_retryable() && attempt < MAX_RETRIES {
242 warn!(
243 target = "duroxide::providers::postgres",
244 operation = "fetch_orchestration_item",
245 attempt = attempt + 1,
246 error = %provider_err,
247 "Retryable error, will retry"
248 );
249 _last_error = Some(provider_err);
250 sleep(std::time::Duration::from_millis(
251 RETRY_DELAY_MS * (attempt as u64 + 1),
252 ))
253 .await;
254 continue;
255 }
256 return Err(provider_err);
257 }
258 };
259
260 if let Some((
261 instance_id,
262 orchestration_name,
263 orchestration_version,
264 execution_id,
265 history_json,
266 messages_json,
267 lock_token,
268 attempt_count,
269 )) = row
270 {
271 let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
272 Ok(h) => (h, None),
273 Err(e) => {
274 let error_msg = format!("Failed to deserialize history: {e}");
275 warn!(
276 target = "duroxide::providers::postgres",
277 instance = %instance_id,
278 error = %error_msg,
279 "History deserialization failed, returning item with history_error"
280 );
281 (vec![], Some(error_msg))
282 }
283 };
284
285 let messages: Vec<WorkItem> =
286 serde_json::from_value(messages_json).map_err(|e| {
287 ProviderError::permanent(
288 "fetch_orchestration_item",
289 format!("Failed to deserialize messages: {e}"),
290 )
291 })?;
292
293 let duration_ms = start.elapsed().as_millis() as u64;
294 debug!(
295 target = "duroxide::providers::postgres",
296 operation = "fetch_orchestration_item",
297 instance_id = %instance_id,
298 execution_id = execution_id,
299 message_count = messages.len(),
300 history_count = history.len(),
301 attempt_count = attempt_count,
302 duration_ms = duration_ms,
303 attempts = attempt + 1,
304 "Fetched orchestration item via stored procedure"
305 );
306
307 return Ok(Some((
308 OrchestrationItem {
309 instance: instance_id,
310 orchestration_name,
311 execution_id: execution_id as u64,
312 version: orchestration_version,
313 history,
314 messages,
315 history_error,
316 },
317 lock_token,
318 attempt_count as u32,
319 )));
320 }
321
322 return Ok(None);
325 }
326
327 Ok(None)
328 }
    /// Commits one orchestration turn under `lock_token` via the
    /// `ack_orchestration_item` stored procedure: appends `history_delta`,
    /// enqueues worker and orchestrator items, records execution metadata
    /// (name/version, status, output, parent, pinned version, custom status),
    /// and passes along the listed scheduled activities to cancel.
    ///
    /// Retryable database errors are retried up to MAX_RETRIES times with
    /// linearly increasing backoff; an invalid lock token is always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize history events as {event_id, event_type, event_data}.
        // An event_id of 0 means the runtime never assigned one — reject.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse event-type tag derived from the Debug output: everything
            // before the first '{', trimmed (e.g. `Foo { .. }` -> "Foo").
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Custom status is a tri-state: set to a value, clear, or leave untouched.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) =
            match &metadata.custom_status {
                Some(CustomStatusUpdate::Set(s)) => (Some("set"), Some(s.as_str())),
                Some(CustomStatusUpdate::Clear) => (Some("clear"), None),
                None => (None, None),
            };

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // A bad lock token is never retryable, whichever way the
                    // error surfaced (database error vs. other sqlx error).
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the final loop iteration always returns.
        Ok(())
    }
502 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
503 async fn abandon_orchestration_item(
504 &self,
505 lock_token: &str,
506 delay: Option<Duration>,
507 ignore_attempt: bool,
508 ) -> Result<(), ProviderError> {
509 let start = std::time::Instant::now();
510 let now_ms = Self::now_millis();
511 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
512
513 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
514 "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
515 self.schema_name
516 ))
517 .bind(lock_token)
518 .bind(now_ms)
519 .bind(delay_param)
520 .bind(ignore_attempt)
521 .fetch_one(&*self.pool)
522 .await
523 {
524 Ok(instance_id) => instance_id,
525 Err(e) => {
526 if let SqlxError::Database(db_err) = &e {
527 if db_err.message().contains("Invalid lock token") {
528 return Err(ProviderError::permanent(
529 "abandon_orchestration_item",
530 "Invalid lock token",
531 ));
532 }
533 } else if e.to_string().contains("Invalid lock token") {
534 return Err(ProviderError::permanent(
535 "abandon_orchestration_item",
536 "Invalid lock token",
537 ));
538 }
539
540 return Err(Self::sqlx_to_provider_error(
541 "abandon_orchestration_item",
542 e,
543 ));
544 }
545 };
546
547 let duration_ms = start.elapsed().as_millis() as u64;
548 debug!(
549 target = "duroxide::providers::postgres",
550 operation = "abandon_orchestration_item",
551 instance_id = %instance_id,
552 delay_ms = delay.map(|d| d.as_millis() as u64),
553 ignore_attempt = ignore_attempt,
554 duration_ms = duration_ms,
555 "Abandoned orchestration item via stored procedure"
556 );
557
558 Ok(())
559 }
560
561 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
562 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
563 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
564 "SELECT out_event_data FROM {}.fetch_history($1)",
565 self.schema_name
566 ))
567 .bind(instance)
568 .fetch_all(&*self.pool)
569 .await
570 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
571
572 Ok(event_data_rows
573 .into_iter()
574 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
575 .collect())
576 }
577
    /// Appends `new_events` to the history of a specific execution via the
    /// `append_history` stored procedure. No-op for an empty event list.
    ///
    /// Every event must already carry a non-zero `event_id` assigned by the
    /// runtime; otherwise the call fails with a permanent error.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Build the JSON payload: one {event_id, event_type, event_data}
        // object per event.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag from the Debug output: everything before the
            // first '{', trimmed (e.g. `Foo { .. }` -> "Foo").
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
651
652 #[instrument(skip(self), target = "duroxide::providers::postgres")]
653 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
654 let work_item = serde_json::to_string(&item).map_err(|e| {
655 ProviderError::permanent(
656 "enqueue_worker_work",
657 format!("Failed to serialize work item: {e}"),
658 )
659 })?;
660
661 let now_ms = Self::now_millis();
662
663 let (instance_id, execution_id, activity_id, session_id) = match &item {
665 WorkItem::ActivityExecute {
666 instance,
667 execution_id,
668 id,
669 session_id,
670 ..
671 } => (
672 Some(instance.clone()),
673 Some(*execution_id as i64),
674 Some(*id as i64),
675 session_id.clone(),
676 ),
677 _ => (None, None, None, None),
678 };
679
680 sqlx::query(&format!(
681 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
682 self.schema_name
683 ))
684 .bind(work_item)
685 .bind(now_ms)
686 .bind(&instance_id)
687 .bind(execution_id)
688 .bind(activity_id)
689 .bind(&session_id)
690 .execute(&*self.pool)
691 .await
692 .map_err(|e| {
693 error!(
694 target = "duroxide::providers::postgres",
695 operation = "enqueue_worker_work",
696 error_type = "database_error",
697 error = %e,
698 "Failed to enqueue worker work"
699 );
700 Self::sqlx_to_provider_error("enqueue_worker_work", e)
701 })?;
702
703 Ok(())
704 }
705
706 #[instrument(skip(self), target = "duroxide::providers::postgres")]
707 async fn fetch_work_item(
708 &self,
709 lock_timeout: Duration,
710 _poll_timeout: Duration,
711 session: Option<&SessionFetchConfig>,
712 ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
713 let start = std::time::Instant::now();
714
715 let lock_timeout_ms = lock_timeout.as_millis() as i64;
717
718 let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
720 Some(config) => (
721 Some(&config.owner_id),
722 Some(config.lock_timeout.as_millis() as i64),
723 ),
724 None => (None, None),
725 };
726
727 let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
728 "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
729 self.schema_name
730 ))
731 .bind(Self::now_millis())
732 .bind(lock_timeout_ms)
733 .bind(owner_id)
734 .bind(session_lock_timeout_ms)
735 .fetch_optional(&*self.pool)
736 .await
737 {
738 Ok(row) => row,
739 Err(e) => {
740 return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
741 }
742 };
743
744 let (work_item_json, lock_token, attempt_count) = match row {
745 Some(row) => row,
746 None => return Ok(None),
747 };
748
749 let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
750 ProviderError::permanent(
751 "fetch_work_item",
752 format!("Failed to deserialize worker item: {e}"),
753 )
754 })?;
755
756 let duration_ms = start.elapsed().as_millis() as u64;
757
758 let instance_id = match &work_item {
760 WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
761 WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
762 WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
763 WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
764 WorkItem::TimerFired { instance, .. } => instance.as_str(),
765 WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
766 WorkItem::CancelInstance { instance, .. } => instance.as_str(),
767 WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
768 WorkItem::SubOrchCompleted {
769 parent_instance, ..
770 } => parent_instance.as_str(),
771 WorkItem::SubOrchFailed {
772 parent_instance, ..
773 } => parent_instance.as_str(),
774 WorkItem::QueueMessage { instance, .. } => instance.as_str(),
775 };
776
777 debug!(
778 target = "duroxide::providers::postgres",
779 operation = "fetch_work_item",
780 instance_id = %instance_id,
781 attempt_count = attempt_count,
782 duration_ms = duration_ms,
783 "Fetched activity work item via stored procedure"
784 );
785
786 Ok(Some((
787 work_item,
788 lock_token,
789 attempt_count as u32,
790 )))
791 }
792
    /// Acknowledges a locked worker-queue item via the `ack_worker` stored
    /// procedure.
    ///
    /// With `completion == None` the item is acked with NULL instance/payload
    /// (the cancelled-activity path). Otherwise the completion must be
    /// `ActivityCompleted` or `ActivityFailed` and is passed along to be
    /// enqueued as the activity's completion.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Path 1: no completion — just remove the queue item.
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // Raised by the stored procedure — presumably when the token
                // no longer matches a locked row; treated as permanent.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Path 2: completion supplied — only activity outcomes are valid.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
899
900 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
901 async fn renew_work_item_lock(
902 &self,
903 token: &str,
904 extend_for: Duration,
905 ) -> Result<(), ProviderError> {
906 let start = std::time::Instant::now();
907
908 let now_ms = Self::now_millis();
910
911 let extend_secs = extend_for.as_secs() as i64;
913
914 match sqlx::query(&format!(
915 "SELECT {}.renew_work_item_lock($1, $2, $3)",
916 self.schema_name
917 ))
918 .bind(token)
919 .bind(now_ms)
920 .bind(extend_secs)
921 .execute(&*self.pool)
922 .await
923 {
924 Ok(_) => {
925 let duration_ms = start.elapsed().as_millis() as u64;
926 debug!(
927 target = "duroxide::providers::postgres",
928 operation = "renew_work_item_lock",
929 token = %token,
930 extend_for_secs = extend_secs,
931 duration_ms = duration_ms,
932 "Work item lock renewed successfully"
933 );
934 Ok(())
935 }
936 Err(e) => {
937 if let SqlxError::Database(db_err) = &e {
938 if db_err.message().contains("Lock token invalid") {
939 return Err(ProviderError::permanent(
940 "renew_work_item_lock",
941 "Lock token invalid, expired, or already acked",
942 ));
943 }
944 } else if e.to_string().contains("Lock token invalid") {
945 return Err(ProviderError::permanent(
946 "renew_work_item_lock",
947 "Lock token invalid, expired, or already acked",
948 ));
949 }
950
951 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
952 }
953 }
954 }
955
956 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
957 async fn abandon_work_item(
958 &self,
959 token: &str,
960 delay: Option<Duration>,
961 ignore_attempt: bool,
962 ) -> Result<(), ProviderError> {
963 let start = std::time::Instant::now();
964 let now_ms = Self::now_millis();
965 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
966
967 match sqlx::query(&format!(
968 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
969 self.schema_name
970 ))
971 .bind(token)
972 .bind(now_ms)
973 .bind(delay_param)
974 .bind(ignore_attempt)
975 .execute(&*self.pool)
976 .await
977 {
978 Ok(_) => {
979 let duration_ms = start.elapsed().as_millis() as u64;
980 debug!(
981 target = "duroxide::providers::postgres",
982 operation = "abandon_work_item",
983 token = %token,
984 delay_ms = delay.map(|d| d.as_millis() as u64),
985 ignore_attempt = ignore_attempt,
986 duration_ms = duration_ms,
987 "Abandoned work item via stored procedure"
988 );
989 Ok(())
990 }
991 Err(e) => {
992 if let SqlxError::Database(db_err) = &e {
993 if db_err.message().contains("Invalid lock token")
994 || db_err.message().contains("already acked")
995 {
996 return Err(ProviderError::permanent(
997 "abandon_work_item",
998 "Invalid lock token or already acked",
999 ));
1000 }
1001 } else if e.to_string().contains("Invalid lock token")
1002 || e.to_string().contains("already acked")
1003 {
1004 return Err(ProviderError::permanent(
1005 "abandon_work_item",
1006 "Invalid lock token or already acked",
1007 ));
1008 }
1009
1010 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1011 }
1012 }
1013 }
1014
1015 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1016 async fn renew_orchestration_item_lock(
1017 &self,
1018 token: &str,
1019 extend_for: Duration,
1020 ) -> Result<(), ProviderError> {
1021 let start = std::time::Instant::now();
1022
1023 let now_ms = Self::now_millis();
1025
1026 let extend_secs = extend_for.as_secs() as i64;
1028
1029 match sqlx::query(&format!(
1030 "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1031 self.schema_name
1032 ))
1033 .bind(token)
1034 .bind(now_ms)
1035 .bind(extend_secs)
1036 .execute(&*self.pool)
1037 .await
1038 {
1039 Ok(_) => {
1040 let duration_ms = start.elapsed().as_millis() as u64;
1041 debug!(
1042 target = "duroxide::providers::postgres",
1043 operation = "renew_orchestration_item_lock",
1044 token = %token,
1045 extend_for_secs = extend_secs,
1046 duration_ms = duration_ms,
1047 "Orchestration item lock renewed successfully"
1048 );
1049 Ok(())
1050 }
1051 Err(e) => {
1052 if let SqlxError::Database(db_err) = &e {
1053 if db_err.message().contains("Lock token invalid")
1054 || db_err.message().contains("expired")
1055 || db_err.message().contains("already released")
1056 {
1057 return Err(ProviderError::permanent(
1058 "renew_orchestration_item_lock",
1059 "Lock token invalid, expired, or already released",
1060 ));
1061 }
1062 } else if e.to_string().contains("Lock token invalid")
1063 || e.to_string().contains("expired")
1064 || e.to_string().contains("already released")
1065 {
1066 return Err(ProviderError::permanent(
1067 "renew_orchestration_item_lock",
1068 "Lock token invalid, expired, or already released",
1069 ));
1070 }
1071
1072 Err(Self::sqlx_to_provider_error(
1073 "renew_orchestration_item_lock",
1074 e,
1075 ))
1076 }
1077 }
1078 }
1079
1080 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1081 async fn enqueue_for_orchestrator(
1082 &self,
1083 item: WorkItem,
1084 delay: Option<Duration>,
1085 ) -> Result<(), ProviderError> {
1086 let work_item = serde_json::to_string(&item).map_err(|e| {
1087 ProviderError::permanent(
1088 "enqueue_orchestrator_work",
1089 format!("Failed to serialize work item: {e}"),
1090 )
1091 })?;
1092
1093 let instance_id = match &item {
1095 WorkItem::StartOrchestration { instance, .. }
1096 | WorkItem::ActivityCompleted { instance, .. }
1097 | WorkItem::ActivityFailed { instance, .. }
1098 | WorkItem::TimerFired { instance, .. }
1099 | WorkItem::ExternalRaised { instance, .. }
1100 | WorkItem::CancelInstance { instance, .. }
1101 | WorkItem::ContinueAsNew { instance, .. }
1102 | WorkItem::QueueMessage { instance, .. } => instance,
1103 WorkItem::SubOrchCompleted {
1104 parent_instance, ..
1105 }
1106 | WorkItem::SubOrchFailed {
1107 parent_instance, ..
1108 } => parent_instance,
1109 WorkItem::ActivityExecute { .. } => {
1110 return Err(ProviderError::permanent(
1111 "enqueue_orchestrator_work",
1112 "ActivityExecute should go to worker queue, not orchestrator queue",
1113 ));
1114 }
1115 };
1116
1117 let now_ms = Self::now_millis();
1119
1120 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1121 if *fire_at_ms > 0 {
1122 if let Some(delay) = delay {
1124 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1125 } else {
1126 *fire_at_ms
1127 }
1128 } else {
1129 delay
1131 .map(|d| now_ms as u64 + d.as_millis() as u64)
1132 .unwrap_or(now_ms as u64)
1133 }
1134 } else {
1135 delay
1137 .map(|d| now_ms as u64 + d.as_millis() as u64)
1138 .unwrap_or(now_ms as u64)
1139 };
1140
1141 let visible_at = Utc
1142 .timestamp_millis_opt(visible_at_ms as i64)
1143 .single()
1144 .ok_or_else(|| {
1145 ProviderError::permanent(
1146 "enqueue_orchestrator_work",
1147 "Invalid visible_at timestamp",
1148 )
1149 })?;
1150
1151 sqlx::query(&format!(
1156 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1157 self.schema_name
1158 ))
1159 .bind(instance_id)
1160 .bind(&work_item)
1161 .bind(visible_at)
1162 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1166 .await
1167 .map_err(|e| {
1168 error!(
1169 target = "duroxide::providers::postgres",
1170 operation = "enqueue_orchestrator_work",
1171 error_type = "database_error",
1172 error = %e,
1173 instance_id = %instance_id,
1174 "Failed to enqueue orchestrator work"
1175 );
1176 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1177 })?;
1178
1179 debug!(
1180 target = "duroxide::providers::postgres",
1181 operation = "enqueue_orchestrator_work",
1182 instance_id = %instance_id,
1183 delay_ms = delay.map(|d| d.as_millis() as u64),
1184 "Enqueued orchestrator work"
1185 );
1186
1187 Ok(())
1188 }
1189
1190 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1191 async fn read_with_execution(
1192 &self,
1193 instance: &str,
1194 execution_id: u64,
1195 ) -> Result<Vec<Event>, ProviderError> {
1196 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1197 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1198 self.table_name("history")
1199 ))
1200 .bind(instance)
1201 .bind(execution_id as i64)
1202 .fetch_all(&*self.pool)
1203 .await
1204 .ok()
1205 .unwrap_or_default();
1206
1207 Ok(event_data_rows
1208 .into_iter()
1209 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1210 .collect())
1211 }
1212
1213 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1214 async fn renew_session_lock(
1215 &self,
1216 owner_ids: &[&str],
1217 extend_for: Duration,
1218 idle_timeout: Duration,
1219 ) -> Result<usize, ProviderError> {
1220 if owner_ids.is_empty() {
1221 return Ok(0);
1222 }
1223
1224 let now_ms = Self::now_millis();
1225 let extend_ms = extend_for.as_millis() as i64;
1226 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1227 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1228
1229 let result = sqlx::query_scalar::<_, i64>(&format!(
1230 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1231 self.schema_name
1232 ))
1233 .bind(&owner_ids_vec)
1234 .bind(now_ms)
1235 .bind(extend_ms)
1236 .bind(idle_timeout_ms)
1237 .fetch_one(&*self.pool)
1238 .await
1239 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1240
1241 debug!(
1242 target = "duroxide::providers::postgres",
1243 operation = "renew_session_lock",
1244 owner_count = owner_ids.len(),
1245 sessions_renewed = result,
1246 "Session locks renewed"
1247 );
1248
1249 Ok(result as usize)
1250 }
1251
1252 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1253 async fn cleanup_orphaned_sessions(
1254 &self,
1255 _idle_timeout: Duration,
1256 ) -> Result<usize, ProviderError> {
1257 let now_ms = Self::now_millis();
1258
1259 let result = sqlx::query_scalar::<_, i64>(&format!(
1260 "SELECT {}.cleanup_orphaned_sessions($1)",
1261 self.schema_name
1262 ))
1263 .bind(now_ms)
1264 .fetch_one(&*self.pool)
1265 .await
1266 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1267
1268 debug!(
1269 target = "duroxide::providers::postgres",
1270 operation = "cleanup_orphaned_sessions",
1271 sessions_cleaned = result,
1272 "Orphaned sessions cleaned up"
1273 );
1274
1275 Ok(result as usize)
1276 }
1277
    // Expose the admin/management surface: this provider implements
    // `ProviderAdmin` directly, so it simply returns itself.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1281
1282 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1283 async fn get_custom_status(
1284 &self,
1285 instance: &str,
1286 last_seen_version: u64,
1287 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1288 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1289 "SELECT * FROM {}.get_custom_status($1, $2)",
1290 self.schema_name
1291 ))
1292 .bind(instance)
1293 .bind(last_seen_version as i64)
1294 .fetch_optional(&*self.pool)
1295 .await
1296 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1297
1298 match row {
1299 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1300 None => Ok(None),
1301 }
1302 }
1303}
1304
1305#[async_trait::async_trait]
1306impl ProviderAdmin for PostgresProvider {
1307 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1308 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1309 sqlx::query_scalar(&format!(
1310 "SELECT instance_id FROM {}.list_instances()",
1311 self.schema_name
1312 ))
1313 .fetch_all(&*self.pool)
1314 .await
1315 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1316 }
1317
1318 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1319 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1320 sqlx::query_scalar(&format!(
1321 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1322 self.schema_name
1323 ))
1324 .bind(status)
1325 .fetch_all(&*self.pool)
1326 .await
1327 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1328 }
1329
1330 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1331 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1332 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1333 "SELECT execution_id FROM {}.list_executions($1)",
1334 self.schema_name
1335 ))
1336 .bind(instance)
1337 .fetch_all(&*self.pool)
1338 .await
1339 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1340
1341 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1342 }
1343
1344 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1345 async fn read_history_with_execution_id(
1346 &self,
1347 instance: &str,
1348 execution_id: u64,
1349 ) -> Result<Vec<Event>, ProviderError> {
1350 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1351 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1352 self.schema_name
1353 ))
1354 .bind(instance)
1355 .bind(execution_id as i64)
1356 .fetch_all(&*self.pool)
1357 .await
1358 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1359
1360 event_data_rows
1361 .into_iter()
1362 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1363 .collect::<Vec<Event>>()
1364 .into_iter()
1365 .map(Ok)
1366 .collect()
1367 }
1368
1369 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1370 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1371 let execution_id = self.latest_execution_id(instance).await?;
1372 self.read_history_with_execution_id(instance, execution_id)
1373 .await
1374 }
1375
1376 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1377 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1378 sqlx::query_scalar(&format!(
1379 "SELECT {}.latest_execution_id($1)",
1380 self.schema_name
1381 ))
1382 .bind(instance)
1383 .fetch_optional(&*self.pool)
1384 .await
1385 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1386 .map(|id: i64| id as u64)
1387 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1388 }
1389
1390 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1391 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1392 let row: Option<(
1393 String,
1394 String,
1395 String,
1396 i64,
1397 chrono::DateTime<Utc>,
1398 Option<chrono::DateTime<Utc>>,
1399 Option<String>,
1400 Option<String>,
1401 Option<String>,
1402 )> = sqlx::query_as(&format!(
1403 "SELECT * FROM {}.get_instance_info($1)",
1404 self.schema_name
1405 ))
1406 .bind(instance)
1407 .fetch_optional(&*self.pool)
1408 .await
1409 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1410
1411 let (
1412 instance_id,
1413 orchestration_name,
1414 orchestration_version,
1415 current_execution_id,
1416 created_at,
1417 updated_at,
1418 status,
1419 output,
1420 parent_instance_id,
1421 ) =
1422 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1423
1424 Ok(InstanceInfo {
1425 instance_id,
1426 orchestration_name,
1427 orchestration_version,
1428 current_execution_id: current_execution_id as u64,
1429 status: status.unwrap_or_else(|| "Running".to_string()),
1430 output,
1431 created_at: created_at.timestamp_millis() as u64,
1432 updated_at: updated_at
1433 .map(|dt| dt.timestamp_millis() as u64)
1434 .unwrap_or(created_at.timestamp_millis() as u64),
1435 parent_instance_id,
1436 })
1437 }
1438
1439 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1440 async fn get_execution_info(
1441 &self,
1442 instance: &str,
1443 execution_id: u64,
1444 ) -> Result<ExecutionInfo, ProviderError> {
1445 let row: Option<(
1446 i64,
1447 String,
1448 Option<String>,
1449 chrono::DateTime<Utc>,
1450 Option<chrono::DateTime<Utc>>,
1451 i64,
1452 )> = sqlx::query_as(&format!(
1453 "SELECT * FROM {}.get_execution_info($1, $2)",
1454 self.schema_name
1455 ))
1456 .bind(instance)
1457 .bind(execution_id as i64)
1458 .fetch_optional(&*self.pool)
1459 .await
1460 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1461
1462 let (exec_id, status, output, started_at, completed_at, event_count) = row
1463 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1464
1465 Ok(ExecutionInfo {
1466 execution_id: exec_id as u64,
1467 status,
1468 output,
1469 started_at: started_at.timestamp_millis() as u64,
1470 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1471 event_count: event_count as usize,
1472 })
1473 }
1474
1475 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1476 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1477 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1478 "SELECT * FROM {}.get_system_metrics()",
1479 self.schema_name
1480 ))
1481 .fetch_optional(&*self.pool)
1482 .await
1483 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1484
1485 let (
1486 total_instances,
1487 total_executions,
1488 running_instances,
1489 completed_instances,
1490 failed_instances,
1491 total_events,
1492 ) = row.ok_or_else(|| {
1493 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1494 })?;
1495
1496 Ok(SystemMetrics {
1497 total_instances: total_instances as u64,
1498 total_executions: total_executions as u64,
1499 running_instances: running_instances as u64,
1500 completed_instances: completed_instances as u64,
1501 failed_instances: failed_instances as u64,
1502 total_events: total_events as u64,
1503 })
1504 }
1505
1506 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1507 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1508 let now_ms = Self::now_millis();
1509
1510 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1511 "SELECT * FROM {}.get_queue_depths($1)",
1512 self.schema_name
1513 ))
1514 .bind(now_ms)
1515 .fetch_optional(&*self.pool)
1516 .await
1517 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1518
1519 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1520 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1521 })?;
1522
1523 Ok(QueueDepths {
1524 orchestrator_queue: orchestrator_queue as usize,
1525 worker_queue: worker_queue as usize,
1526 timer_queue: 0, })
1528 }
1529
1530 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1533 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1534 sqlx::query_scalar(&format!(
1535 "SELECT child_instance_id FROM {}.list_children($1)",
1536 self.schema_name
1537 ))
1538 .bind(instance_id)
1539 .fetch_all(&*self.pool)
1540 .await
1541 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1542 }
1543
1544 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1545 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1546 let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1549 "SELECT {}.get_parent_id($1)",
1550 self.schema_name
1551 ))
1552 .bind(instance_id)
1553 .fetch_one(&*self.pool)
1554 .await;
1555
1556 match result {
1557 Ok(parent_id) => Ok(parent_id),
1558 Err(e) => {
1559 let err_str = e.to_string();
1560 if err_str.contains("Instance not found") {
1561 Err(ProviderError::permanent(
1562 "get_parent_id",
1563 format!("Instance not found: {}", instance_id),
1564 ))
1565 } else {
1566 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1567 }
1568 }
1569 }
1570 }
1571
1572 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1575 async fn delete_instances_atomic(
1576 &self,
1577 ids: &[String],
1578 force: bool,
1579 ) -> Result<DeleteInstanceResult, ProviderError> {
1580 if ids.is_empty() {
1581 return Ok(DeleteInstanceResult::default());
1582 }
1583
1584 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1585 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1586 self.schema_name
1587 ))
1588 .bind(ids)
1589 .bind(force)
1590 .fetch_optional(&*self.pool)
1591 .await
1592 .map_err(|e| {
1593 let err_str = e.to_string();
1594 if err_str.contains("is Running") {
1595 ProviderError::permanent(
1596 "delete_instances_atomic",
1597 err_str,
1598 )
1599 } else if err_str.contains("Orphan detected") {
1600 ProviderError::permanent(
1601 "delete_instances_atomic",
1602 err_str,
1603 )
1604 } else {
1605 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1606 }
1607 })?;
1608
1609 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1610 row.unwrap_or((0, 0, 0, 0));
1611
1612 debug!(
1613 target = "duroxide::providers::postgres",
1614 operation = "delete_instances_atomic",
1615 instances_deleted = instances_deleted,
1616 executions_deleted = executions_deleted,
1617 events_deleted = events_deleted,
1618 queue_messages_deleted = queue_messages_deleted,
1619 "Deleted instances atomically"
1620 );
1621
1622 Ok(DeleteInstanceResult {
1623 instances_deleted: instances_deleted as u64,
1624 executions_deleted: executions_deleted as u64,
1625 events_deleted: events_deleted as u64,
1626 queue_messages_deleted: queue_messages_deleted as u64,
1627 })
1628 }
1629
1630 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1631 async fn delete_instance_bulk(
1632 &self,
1633 filter: InstanceFilter,
1634 ) -> Result<DeleteInstanceResult, ProviderError> {
1635 let mut sql = format!(
1637 r#"
1638 SELECT i.instance_id
1639 FROM {}.instances i
1640 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1641 AND i.current_execution_id = e.execution_id
1642 WHERE i.parent_instance_id IS NULL
1643 AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1644 "#,
1645 self.schema_name, self.schema_name
1646 );
1647
1648 if let Some(ref ids) = filter.instance_ids {
1650 if ids.is_empty() {
1651 return Ok(DeleteInstanceResult::default());
1652 }
1653 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1654 sql.push_str(&format!(
1655 " AND i.instance_id IN ({})",
1656 placeholders.join(", ")
1657 ));
1658 }
1659
1660 if filter.completed_before.is_some() {
1662 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1663 sql.push_str(&format!(
1664 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1665 param_num
1666 ));
1667 }
1668
1669 let limit = filter.limit.unwrap_or(1000);
1671 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1672 + if filter.completed_before.is_some() { 1 } else { 0 }
1673 + 1;
1674 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1675
1676 let mut query = sqlx::query_scalar::<_, String>(&sql);
1678 if let Some(ref ids) = filter.instance_ids {
1679 for id in ids {
1680 query = query.bind(id);
1681 }
1682 }
1683 if let Some(completed_before) = filter.completed_before {
1684 query = query.bind(completed_before as i64);
1685 }
1686 query = query.bind(limit as i64);
1687
1688 let instance_ids: Vec<String> = query
1689 .fetch_all(&*self.pool)
1690 .await
1691 .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1692
1693 if instance_ids.is_empty() {
1694 return Ok(DeleteInstanceResult::default());
1695 }
1696
1697 let mut result = DeleteInstanceResult::default();
1699
1700 for instance_id in &instance_ids {
1701 let tree = self.get_instance_tree(instance_id).await?;
1703
1704 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1706 result.instances_deleted += delete_result.instances_deleted;
1707 result.executions_deleted += delete_result.executions_deleted;
1708 result.events_deleted += delete_result.events_deleted;
1709 result.queue_messages_deleted += delete_result.queue_messages_deleted;
1710 }
1711
1712 debug!(
1713 target = "duroxide::providers::postgres",
1714 operation = "delete_instance_bulk",
1715 instances_deleted = result.instances_deleted,
1716 executions_deleted = result.executions_deleted,
1717 events_deleted = result.events_deleted,
1718 queue_messages_deleted = result.queue_messages_deleted,
1719 "Bulk deleted instances"
1720 );
1721
1722 Ok(result)
1723 }
1724
1725 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1728 async fn prune_executions(
1729 &self,
1730 instance_id: &str,
1731 options: PruneOptions,
1732 ) -> Result<PruneResult, ProviderError> {
1733 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1734 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1735
1736 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1737 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1738 self.schema_name
1739 ))
1740 .bind(instance_id)
1741 .bind(keep_last)
1742 .bind(completed_before_ms)
1743 .fetch_optional(&*self.pool)
1744 .await
1745 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1746
1747 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1748
1749 debug!(
1750 target = "duroxide::providers::postgres",
1751 operation = "prune_executions",
1752 instance_id = %instance_id,
1753 instances_processed = instances_processed,
1754 executions_deleted = executions_deleted,
1755 events_deleted = events_deleted,
1756 "Pruned executions"
1757 );
1758
1759 Ok(PruneResult {
1760 instances_processed: instances_processed as u64,
1761 executions_deleted: executions_deleted as u64,
1762 events_deleted: events_deleted as u64,
1763 })
1764 }
1765
1766 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1767 async fn prune_executions_bulk(
1768 &self,
1769 filter: InstanceFilter,
1770 options: PruneOptions,
1771 ) -> Result<PruneResult, ProviderError> {
1772 let mut sql = format!(
1777 r#"
1778 SELECT i.instance_id
1779 FROM {}.instances i
1780 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1781 AND i.current_execution_id = e.execution_id
1782 WHERE 1=1
1783 "#,
1784 self.schema_name, self.schema_name
1785 );
1786
1787 if let Some(ref ids) = filter.instance_ids {
1789 if ids.is_empty() {
1790 return Ok(PruneResult::default());
1791 }
1792 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1793 sql.push_str(&format!(
1794 " AND i.instance_id IN ({})",
1795 placeholders.join(", ")
1796 ));
1797 }
1798
1799 if filter.completed_before.is_some() {
1801 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1802 sql.push_str(&format!(
1803 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1804 param_num
1805 ));
1806 }
1807
1808 let limit = filter.limit.unwrap_or(1000);
1810 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1811 + if filter.completed_before.is_some() { 1 } else { 0 }
1812 + 1;
1813 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1814
1815 let mut query = sqlx::query_scalar::<_, String>(&sql);
1817 if let Some(ref ids) = filter.instance_ids {
1818 for id in ids {
1819 query = query.bind(id);
1820 }
1821 }
1822 if let Some(completed_before) = filter.completed_before {
1823 query = query.bind(completed_before as i64);
1824 }
1825 query = query.bind(limit as i64);
1826
1827 let instance_ids: Vec<String> = query
1828 .fetch_all(&*self.pool)
1829 .await
1830 .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1831
1832 let mut result = PruneResult::default();
1834
1835 for instance_id in &instance_ids {
1836 let single_result = self.prune_executions(instance_id, options.clone()).await?;
1837 result.instances_processed += single_result.instances_processed;
1838 result.executions_deleted += single_result.executions_deleted;
1839 result.events_deleted += single_result.events_deleted;
1840 }
1841
1842 debug!(
1843 target = "duroxide::providers::postgres",
1844 operation = "prune_executions_bulk",
1845 instances_processed = result.instances_processed,
1846 executions_deleted = result.executions_deleted,
1847 events_deleted = result.events_deleted,
1848 "Bulk pruned executions"
1849 );
1850
1851 Ok(result)
1852 }
1853}