1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::sync::Notify;
15use tokio::task::JoinHandle;
16use tokio::time::sleep;
17use tracing::{debug, error, info, instrument, warn};
18
19use crate::db_metrics::{record_fetch_result, DbCallTimer, DbOperation, FetchType};
20use crate::migrations::MigrationRunner;
21use crate::notifier::{LongPollConfig, Notifier};
22
23#[cfg(feature = "test-fault-injection")]
24use crate::fault_injection::FaultInjector;
25
/// Durable-task provider backed by PostgreSQL.
///
/// All state lives in a single schema; queue and history operations are
/// implemented as stored procedures invoked over a shared connection pool.
pub struct PostgresProvider {
    // Shared connection pool; Arc so helpers (e.g. the migration runner)
    // can hold their own handle.
    pool: Arc<PgPool>,
    // PostgreSQL schema that owns every table and procedure ("public" by default).
    schema_name: String,

    // Wakeup signals used by the long-polling fetch paths; None when
    // long-polling is disabled in LongPollConfig.
    orch_notify: Option<Arc<Notify>>,
    worker_notify: Option<Arc<Notify>>,
    // Background notifier task driving the wakeups above; aborted on Drop.
    notifier_handle: Option<JoinHandle<()>>,

    // Test-only hooks: clock skew (see now_millis) and a notifier kill switch.
    #[cfg(feature = "test-fault-injection")]
    fault_injector: Option<Arc<FaultInjector>>,
}
61
62impl PostgresProvider {
    /// Connect with defaults: `public` schema and the default long-polling
    /// configuration.
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_options(database_url, None, LongPollConfig::default()).await
    }
67
    /// Connect with an explicit schema (falls back to `public` when `None`)
    /// and the default long-polling configuration.
    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
        Self::new_with_options(database_url, schema_name, LongPollConfig::default()).await
    }
72
73 pub async fn new_with_options(
75 database_url: &str,
76 schema_name: Option<&str>,
77 config: LongPollConfig,
78 ) -> Result<Self> {
79 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
80 .ok()
81 .and_then(|s| s.parse::<u32>().ok())
82 .unwrap_or(10);
83
84 let pool = PgPoolOptions::new()
85 .max_connections(max_connections)
86 .min_connections(1)
87 .acquire_timeout(std::time::Duration::from_secs(30))
88 .connect(database_url)
89 .await?;
90
91 let schema_name = schema_name.unwrap_or("public").to_string();
92
93 let migration_runner = MigrationRunner::new(Arc::new(pool.clone()), schema_name.clone());
95 migration_runner.migrate().await?;
96
97 let (orch_notify, worker_notify, notifier_handle) = if config.enabled {
99 let orch_notify = Arc::new(Notify::new());
100 let worker_notify = Arc::new(Notify::new());
101
102 let mut notifier = Notifier::new(
103 pool.clone(),
104 schema_name.clone(),
105 orch_notify.clone(),
106 worker_notify.clone(),
107 config.clone(),
108 )
109 .await?;
110
111 let handle = tokio::spawn(async move {
112 notifier.run().await;
113 });
114
115 info!(
116 target = "duroxide::providers::postgres",
117 schema = %schema_name,
118 "Long-polling enabled"
119 );
120
121 (Some(orch_notify), Some(worker_notify), Some(handle))
122 } else {
123 debug!(
124 target = "duroxide::providers::postgres",
125 schema = %schema_name,
126 "Long-polling disabled"
127 );
128 (None, None, None)
129 };
130
131 Ok(Self {
132 pool: Arc::new(pool),
133 schema_name,
134 orch_notify,
135 worker_notify,
136 notifier_handle,
137 #[cfg(feature = "test-fault-injection")]
138 fault_injector: None,
139 })
140 }
141
    /// Test-only variant of [`Self::new_with_options`] that wires a
    /// `FaultInjector` into the provider (clock skew via `now_millis`, and a
    /// notifier kill switch).
    ///
    /// The notifier task is spawned only when long-polling is enabled in
    /// `config` AND the injector has not disabled it; otherwise fetches fall
    /// back to plain polling.
    #[cfg(feature = "test-fault-injection")]
    pub async fn new_with_fault_injection(
        database_url: &str,
        schema_name: Option<&str>,
        config: LongPollConfig,
        fault_injector: Arc<FaultInjector>,
    ) -> Result<Self> {
        // Same pool-size override as new_with_options.
        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(10);

        let notifier_disabled = fault_injector.is_notifier_disabled();

        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(1)
            .acquire_timeout(std::time::Duration::from_secs(30))
            .connect(database_url)
            .await?;

        let schema_name = schema_name.unwrap_or("public").to_string();

        let migration_runner = MigrationRunner::new(Arc::new(pool.clone()), schema_name.clone());
        migration_runner.migrate().await?;

        let (orch_notify, worker_notify, notifier_handle, fi) =
            if config.enabled && !notifier_disabled {
                let orch_notify = Arc::new(Notify::new());
                let worker_notify = Arc::new(Notify::new());

                let mut notifier = Notifier::new_with_fault_injection(
                    pool.clone(),
                    schema_name.clone(),
                    orch_notify.clone(),
                    worker_notify.clone(),
                    config.clone(),
                    fault_injector.clone(),
                )
                .await?;

                let handle = tokio::spawn(async move {
                    notifier.run().await;
                });

                info!(
                    target = "duroxide::providers::postgres",
                    schema = %schema_name,
                    "Long-polling enabled"
                );

                (
                    Some(orch_notify),
                    Some(worker_notify),
                    Some(handle),
                    Some(fault_injector),
                )
            } else {
                // Distinguish "disabled by test fault" from "disabled by config"
                // in the logs.
                if notifier_disabled {
                    warn!(
                        target = "duroxide::providers::postgres",
                        schema = %schema_name,
                        "Long-polling disabled by fault injection"
                    );
                } else {
                    debug!(
                        target = "duroxide::providers::postgres",
                        schema = %schema_name,
                        "Long-polling disabled"
                    );
                }
                // The injector is retained even without a notifier so clock
                // skew still applies.
                (None, None, None, Some(fault_injector))
            };

        Ok(Self {
            pool: Arc::new(pool),
            schema_name,
            orch_notify,
            worker_notify,
            notifier_handle,
            fault_injector: fi,
        })
    }
232
233 #[instrument(skip(self), target = "duroxide::providers::postgres")]
234 pub async fn initialize_schema(&self) -> Result<()> {
235 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
238 migration_runner.migrate().await?;
239 Ok(())
240 }
241
242 fn now_millis_base() -> i64 {
246 SystemTime::now()
247 .duration_since(UNIX_EPOCH)
248 .unwrap_or_default()
249 .as_millis() as i64
250 }
251
252 #[cfg(feature = "test-fault-injection")]
261 fn now_millis(&self) -> i64 {
262 let base = Self::now_millis_base();
263 if let Some(ref fi) = self.fault_injector {
264 base + fi.get_clock_skew_ms()
265 } else {
266 base
267 }
268 }
269
    /// Current time in milliseconds since the Unix epoch (production builds:
    /// no fault injection, plain system clock).
    #[cfg(not(feature = "test-fault-injection"))]
    fn now_millis(&self) -> i64 {
        Self::now_millis_base()
    }
275
276 fn table_name(&self, table: &str) -> String {
278 format!("{}.{}", self.schema_name, table)
279 }
280
    /// Borrow the underlying connection pool (useful for ad-hoc queries in
    /// tests or tooling).
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
285
    /// The PostgreSQL schema this provider operates in.
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }
290
291 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
293 match e {
294 SqlxError::Database(ref db_err) => {
295 let code_opt = db_err.code();
297 let code = code_opt.as_deref();
298 if code == Some("40P01") {
299 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
301 } else if code == Some("40001") {
302 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
304 } else if code == Some("23505") {
305 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
307 } else if code == Some("23503") {
308 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
310 } else {
311 ProviderError::permanent(operation, format!("Database error: {e}"))
312 }
313 }
314 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
315 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
316 }
317 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
318 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
319 }
320 }
321
322 pub async fn cleanup_schema(&self) -> Result<()> {
327 const MAX_RETRIES: u32 = 5;
328 const BASE_RETRY_DELAY_MS: u64 = 50;
329
330 for attempt in 0..=MAX_RETRIES {
331 let cleanup_result = async {
332 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("cleanup_schema"));
334 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
335 .execute(&*self.pool)
336 .await?;
337
338 if self.schema_name != "public" {
341 let _timer = DbCallTimer::new(DbOperation::Ddl, None);
342 sqlx::query(&format!(
343 "DROP SCHEMA IF EXISTS {} CASCADE",
344 self.schema_name
345 ))
346 .execute(&*self.pool)
347 .await?;
348 } else {
349 }
352
353 Ok::<(), SqlxError>(())
354 }
355 .await;
356
357 match cleanup_result {
358 Ok(()) => return Ok(()),
359 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
360 if attempt < MAX_RETRIES {
361 warn!(
362 target = "duroxide::providers::postgres",
363 schema = %self.schema_name,
364 attempt = attempt + 1,
365 "Deadlock during cleanup_schema, retrying"
366 );
367 sleep(Duration::from_millis(
368 BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
369 ))
370 .await;
371 continue;
372 }
373 return Err(anyhow::anyhow!(db_err.to_string()));
374 }
375 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
376 }
377 }
378
379 Ok(())
380 }
381
    /// Single non-blocking attempt to claim the next orchestration item via
    /// the `fetch_orchestration_item` stored procedure.
    ///
    /// Returns the item plus its lock token and attempt count, or `None` when
    /// the queue is empty. Retryable database errors are retried up to
    /// `MAX_RETRIES` times with linear backoff. A history that fails to
    /// deserialize is not fatal: the item is returned with an empty history
    /// and `history_error` set so the runtime can decide what to do.
    async fn do_fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack the dispatcher's supported version range into single integers
        // (major * 1_000_000 + minor * 1_000 + patch) so the stored procedure
        // can range-filter version-pinned instances. Only the first range is
        // consulted; an empty range list means nothing can run here.
        let (min_packed, max_packed): (Option<i64>, Option<i64>) = if let Some(cap_filter) = filter
        {
            match cap_filter.supported_duroxide_versions.first() {
                Some(range) => {
                    let min = range.min.major as i64 * 1_000_000
                        + range.min.minor as i64 * 1_000
                        + range.min.patch as i64;
                    let max = range.max.major as i64 * 1_000_000
                        + range.max.minor as i64 * 1_000
                        + range.max.patch as i64;
                    (Some(min), Some(max))
                }
                None => {
                    // No supported versions: this dispatcher can take nothing.
                    return Ok(None);
                }
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = self.now_millis();

            let _timer = DbCallTimer::new(
                DbOperation::StoredProcedure,
                Some("fetch_orchestration_item"),
            );
            // Row layout: (instance_id, orchestration_name, version,
            // execution_id, history JSON, messages JSON, lock_token,
            // attempt_count, KV-snapshot JSON).
            #[allow(clippy::type_complexity)]
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    // Transient failures back off linearly and retry;
                    // everything else propagates immediately.
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // Corrupt history is surfaced via history_error rather than
                // failing the fetch.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            tracing::warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // Snapshot arrives as {key: {"value": .., "last_updated_at_ms": ..}};
                // entries that don't match that shape are silently skipped.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                record_fetch_result(FetchType::Orchestration, 1, duration_ms);

                // Orphan guard: queue messages addressed to an instance that
                // never started (no name, no history, only QueueMessage items)
                // are acked away empty and dropped — pre-start events are
                // unsupported.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Query succeeded but returned no row: queue is empty, stop retrying.
            break;
        }

        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
        record_fetch_result(FetchType::Orchestration, 0, duration_ms);

        Ok(None)
    }
602
    /// Single non-blocking attempt to claim the next worker item via the
    /// `fetch_work_item` stored procedure.
    ///
    /// `session` enables session-affine fetching (owner id plus a separate
    /// session lock timeout); `tag_filter` is flattened into an optional tag
    /// list plus a string mode understood by the procedure.
    // NOTE(review): unlike do_fetch_orchestration_item there is no retry loop
    // here for retryable database errors — confirm that is intentional.
    async fn do_fetch_work_item(
        &self,
        lock_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Flatten the tag filter into (values, mode) for the procedure.
        let (tag_filter_values, tag_mode): (Option<Vec<String>>, &str) = match tag_filter {
            TagFilter::DefaultOnly => (None, "default_only"),
            TagFilter::Tags(tags) => (Some(tags.iter().cloned().collect()), "tags"),
            TagFilter::DefaultAnd(tags) => (Some(tags.iter().cloned().collect()), "default_and"),
            TagFilter::Any => (None, "any"),
            TagFilter::None => (None, "none"),
        };

        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("fetch_work_item"));
        // Row layout: (work-item JSON, lock token, attempt count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(self.now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(tag_filter_values)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => {
                // Queue empty: record the miss and bail.
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                record_fetch_result(FetchType::WorkItem, 0, duration_ms);
                return Ok(None);
            }
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;

        // Extract an instance id purely for logging (parent instance for
        // sub-orchestration completions/failures).
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        record_fetch_result(FetchType::WorkItem, 1, duration_ms);

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
705}
706
707impl Drop for PostgresProvider {
708 fn drop(&mut self) {
709 if let Some(handle) = self.notifier_handle.take() {
711 handle.abort();
712 }
713 }
714}
715
716#[async_trait::async_trait]
717impl Provider for PostgresProvider {
    /// Provider name, taken from this crate's package name at compile time.
    fn name(&self) -> &str {
        env!("CARGO_PKG_NAME")
    }
721
    /// Provider version, taken from this crate's package version at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
725
    /// Fetch the next orchestration item, long-polling up to `poll_timeout`
    /// when the queue is empty.
    ///
    /// With long-polling enabled, the notifier wakeup is armed *before* the
    /// initial fetch so a notification arriving while that fetch is in flight
    /// is not lost (avoids the missed-wakeup race).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        // Zero poll timeout means "single non-blocking attempt".
        if poll_timeout.is_zero() {
            return self.do_fetch_orchestration_item(lock_timeout, filter).await;
        }

        if let Some(notify) = &self.orch_notify {
            // Arm the waiter first so notifications sent during the fetch
            // below are captured by it.
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            let result = self
                .do_fetch_orchestration_item(lock_timeout, filter)
                .await?;
            if result.is_some() {
                return Ok(result);
            }

            // Queue was empty: wait for a notification or give up after
            // poll_timeout.
            tokio::select! {
                _ = &mut notified => {
                    return self.do_fetch_orchestration_item(lock_timeout, filter).await;
                }
                _ = tokio::time::sleep(poll_timeout) => {
                    return Ok(None);
                }
            }
        }

        // Long-polling disabled: single immediate attempt.
        self.do_fetch_orchestration_item(lock_timeout, filter).await
    }
775
    /// Atomically commit one orchestration turn: append `history_delta`,
    /// enqueue `worker_items`/`orchestrator_items`, apply `metadata`
    /// (terminal status, custom status, KV mutations), and cancel
    /// `cancelled_activities` — all in a single stored-procedure call keyed
    /// by `lock_token`.
    ///
    /// Transient database errors are retried up to `MAX_RETRIES` times with
    /// linear backoff; an invalid lock token is always a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize every event up front so a bad one fails fast, before
        // anything reaches the database.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // Ids are assigned by the runtime; 0 signals a runtime bug.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name derived from the Debug representation: the text
            // before the first '{', trimmed.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Only the *last* CustomStatusUpdated event in the delta wins:
        // Some payload -> "set", None payload -> "clear", absent -> no change.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Translate KV events in the delta into mutation commands for the
        // stored procedure (set / clear_key / clear_all).
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                duroxide::EventKind::KeyValueSet {
                    key,
                    value,
                    last_updated_at_ms,
                } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                    "last_updated_at_ms": last_updated_at_ms,
                })),
                duroxide::EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                duroxide::EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        let cancelled_activities_json = serde_json::Value::Array(
            cancelled_activities
                .iter()
                .map(|sa| {
                    serde_json::json!({
                        "execution_id": sa.execution_id,
                        "activity_id": sa.activity_id
                    })
                })
                .collect(),
        );

        let now_ms = self.now_millis();

        for attempt in 0..=MAX_RETRIES {
            let _timer =
                DbCallTimer::new(DbOperation::StoredProcedure, Some("ack_orchestration_item"));
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // A stale/unknown lock token can never succeed on retry.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: every loop path returns or continues, and the final
        // attempt always returns. Kept to satisfy the compiler.
        Ok(())
    }
    /// Release a locked orchestration item back to the queue without
    /// committing any work, optionally delaying its next visibility by
    /// `delay`. `ignore_attempt` is forwarded to the stored procedure
    /// (presumably to skip incrementing the attempt counter — confirm
    /// against the SQL definition).
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = self.now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        let _timer = DbCallTimer::new(
            DbOperation::StoredProcedure,
            Some("abandon_orchestration_item"),
        );
        // The procedure returns the affected instance id (used only for logging).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // A stale/unknown lock token is a permanent failure.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
1051
1052 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1053 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1054 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("fetch_history"));
1055 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1056 "SELECT out_event_data FROM {}.fetch_history($1)",
1057 self.schema_name
1058 ))
1059 .bind(instance)
1060 .fetch_all(&*self.pool)
1061 .await
1062 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
1063
1064 Ok(event_data_rows
1065 .into_iter()
1066 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1067 .collect())
1068 }
1069
    /// Append pre-assigned history events for `instance`/`execution_id` via
    /// the `append_history` stored procedure.
    ///
    /// No-op for an empty batch. Rejects any event whose `event_id` is 0,
    /// since ids must be assigned by the runtime before persisting.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Serialize everything up front so a bad event fails the whole batch
        // before any database work happens.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name derived from the Debug representation: the text
            // before the first '{', trimmed.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);
        let now_ms = self.now_millis();

        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("append_history"));
        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
1146
1147 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1148 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
1149 let work_item = serde_json::to_string(&item).map_err(|e| {
1150 ProviderError::permanent(
1151 "enqueue_worker_work",
1152 format!("Failed to serialize work item: {e}"),
1153 )
1154 })?;
1155
1156 let now_ms = self.now_millis();
1157
1158 let (session_id, tag) = match &item {
1160 WorkItem::ActivityExecute {
1161 session_id, tag, ..
1162 } => (session_id.clone(), tag.clone()),
1163 _ => (None, None),
1164 };
1165
1166 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("enqueue_worker_work"));
1167 sqlx::query(&format!(
1168 "SELECT {}.enqueue_worker_work($1, $2, $3, $4)",
1169 self.schema_name
1170 ))
1171 .bind(work_item)
1172 .bind(now_ms)
1173 .bind(&session_id)
1174 .bind(&tag)
1175 .execute(&*self.pool)
1176 .await
1177 .map_err(|e| {
1178 error!(
1179 target = "duroxide::providers::postgres",
1180 operation = "enqueue_worker_work",
1181 error_type = "database_error",
1182 error = %e,
1183 "Failed to enqueue worker work"
1184 );
1185 Self::sqlx_to_provider_error("enqueue_worker_work", e)
1186 })?;
1187
1188 Ok(())
1189 }
1190
    /// Fetch the next worker item, long-polling up to `poll_timeout` when the
    /// queue is empty.
    ///
    /// With long-polling enabled, the notifier wakeup is armed *before* the
    /// initial fetch so a notification arriving while that fetch is in flight
    /// is not lost (avoids the missed-wakeup race).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // Zero poll timeout means "single non-blocking attempt".
        if poll_timeout.is_zero() {
            return self
                .do_fetch_work_item(lock_timeout, session, tag_filter)
                .await;
        }

        if let Some(notify) = &self.worker_notify {
            // Arm the waiter first so notifications sent during the fetch
            // below are captured by it.
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            let result = self
                .do_fetch_work_item(lock_timeout, session, tag_filter)
                .await?;
            if result.is_some() {
                return Ok(result);
            }

            // Queue was empty: wait for a notification or give up after
            // poll_timeout.
            tokio::select! {
                _ = &mut notified => {
                    return self.do_fetch_work_item(lock_timeout, session, tag_filter).await;
                }
                _ = tokio::time::sleep(poll_timeout) => {
                    return Ok(None);
                }
            }
        }

        // Long-polling disabled: single immediate attempt.
        self.do_fetch_work_item(lock_timeout, session, tag_filter)
            .await
    }
1240
1241 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1242 async fn ack_work_item(
1243 &self,
1244 token: &str,
1245 completion: Option<WorkItem>,
1246 ) -> Result<(), ProviderError> {
1247 let start = std::time::Instant::now();
1248
1249 let (instance_id, completion_json): (Option<String>, Option<String>) = match &completion {
1251 Some(WorkItem::ActivityCompleted { instance, .. })
1252 | Some(WorkItem::ActivityFailed { instance, .. }) => {
1253 let json = serde_json::to_string(&completion).map_err(|e| {
1254 ProviderError::permanent(
1255 "ack_worker",
1256 format!("Failed to serialize completion: {e}"),
1257 )
1258 })?;
1259 (Some(instance.clone()), Some(json))
1260 }
1261 Some(_) => {
1262 error!(
1263 target = "duroxide::providers::postgres",
1264 operation = "ack_worker",
1265 error_type = "invalid_completion_type",
1266 "Invalid completion work item type"
1267 );
1268 return Err(ProviderError::permanent(
1269 "ack_worker",
1270 "Invalid completion work item type",
1271 ));
1272 }
1273 None => (None, None), };
1275
1276 let now_ms = self.now_millis();
1277
1278 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("ack_worker"));
1280 sqlx::query(&format!(
1281 "SELECT {}.ack_worker($1, $2, $3, $4)",
1282 self.schema_name
1283 ))
1284 .bind(token)
1285 .bind(&instance_id)
1286 .bind(&completion_json)
1287 .bind(now_ms)
1288 .execute(&*self.pool)
1289 .await
1290 .map_err(|e| {
1291 if e.to_string().contains("Worker queue item not found") {
1292 error!(
1293 target = "duroxide::providers::postgres",
1294 operation = "ack_worker",
1295 error_type = "worker_item_not_found",
1296 token = %token,
1297 "Worker queue item not found or already processed"
1298 );
1299 ProviderError::permanent(
1300 "ack_worker",
1301 "Worker queue item not found or already processed",
1302 )
1303 } else {
1304 Self::sqlx_to_provider_error("ack_worker", e)
1305 }
1306 })?;
1307
1308 let duration_ms = start.elapsed().as_millis() as u64;
1309 debug!(
1310 target = "duroxide::providers::postgres",
1311 operation = "ack_worker",
1312 instance_id = ?instance_id,
1313 completion_provided = completion.is_some(),
1314 duration_ms = duration_ms,
1315 "Acknowledged worker item"
1316 );
1317
1318 Ok(())
1319 }
1320
1321 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1322 async fn renew_work_item_lock(
1323 &self,
1324 token: &str,
1325 extend_for: Duration,
1326 ) -> Result<(), ProviderError> {
1327 let start = std::time::Instant::now();
1328
1329 let now_ms = self.now_millis();
1331
1332 let extend_ms = extend_for.as_millis() as i64;
1334
1335 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("renew_work_item_lock"));
1336 match sqlx::query(&format!(
1339 "SELECT {}.renew_work_item_lock($1, $2, $3)",
1340 self.schema_name
1341 ))
1342 .bind(token)
1343 .bind(now_ms)
1344 .bind(extend_ms)
1345 .execute(&*self.pool)
1346 .await
1347 {
1348 Ok(_) => {
1349 let duration_ms = start.elapsed().as_millis() as u64;
1350 debug!(
1351 target = "duroxide::providers::postgres",
1352 operation = "renew_work_item_lock",
1353 token = %token,
1354 extend_for_ms = extend_ms,
1355 duration_ms = duration_ms,
1356 "Renew work item lock completed successfully"
1357 );
1358 Ok(())
1359 }
1360 Err(e) => {
1361 if let SqlxError::Database(db_err) = &e {
1362 if db_err.message().contains("Lock token invalid") {
1363 return Err(ProviderError::permanent(
1364 "renew_work_item_lock",
1365 "Lock token invalid, expired, or already acked",
1366 ));
1367 }
1368 } else if e.to_string().contains("Lock token invalid") {
1369 return Err(ProviderError::permanent(
1370 "renew_work_item_lock",
1371 "Lock token invalid, expired, or already acked",
1372 ));
1373 }
1374
1375 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1376 }
1377 }
1378 }
1379
1380 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1381 async fn abandon_work_item(
1382 &self,
1383 token: &str,
1384 delay: Option<Duration>,
1385 ignore_attempt: bool,
1386 ) -> Result<(), ProviderError> {
1387 let start = std::time::Instant::now();
1388 let now_ms = self.now_millis();
1389 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1390
1391 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("abandon_work_item"));
1392 match sqlx::query(&format!(
1393 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1394 self.schema_name
1395 ))
1396 .bind(token)
1397 .bind(now_ms)
1398 .bind(delay_param)
1399 .bind(ignore_attempt)
1400 .execute(&*self.pool)
1401 .await
1402 {
1403 Ok(_) => {
1404 let duration_ms = start.elapsed().as_millis() as u64;
1405 debug!(
1406 target = "duroxide::providers::postgres",
1407 operation = "abandon_work_item",
1408 token = %token,
1409 delay_ms = delay.map(|d| d.as_millis() as u64),
1410 ignore_attempt = ignore_attempt,
1411 duration_ms = duration_ms,
1412 "Abandoned work item via stored procedure"
1413 );
1414 Ok(())
1415 }
1416 Err(e) => {
1417 if let SqlxError::Database(db_err) = &e {
1418 if db_err.message().contains("Invalid lock token")
1419 || db_err.message().contains("already acked")
1420 {
1421 return Err(ProviderError::permanent(
1422 "abandon_work_item",
1423 "Invalid lock token or already acked",
1424 ));
1425 }
1426 } else if e.to_string().contains("Invalid lock token")
1427 || e.to_string().contains("already acked")
1428 {
1429 return Err(ProviderError::permanent(
1430 "abandon_work_item",
1431 "Invalid lock token or already acked",
1432 ));
1433 }
1434
1435 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1436 }
1437 }
1438 }
1439
1440 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1441 async fn renew_orchestration_item_lock(
1442 &self,
1443 token: &str,
1444 extend_for: Duration,
1445 ) -> Result<(), ProviderError> {
1446 let start = std::time::Instant::now();
1447
1448 let now_ms = self.now_millis();
1450
1451 let extend_ms = extend_for.as_millis() as i64;
1453
1454 let _timer = DbCallTimer::new(
1455 DbOperation::StoredProcedure,
1456 Some("renew_orchestration_item_lock"),
1457 );
1458 match sqlx::query(&format!(
1459 "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1460 self.schema_name
1461 ))
1462 .bind(token)
1463 .bind(now_ms)
1464 .bind(extend_ms)
1465 .execute(&*self.pool)
1466 .await
1467 {
1468 Ok(_) => {
1469 let duration_ms = start.elapsed().as_millis() as u64;
1470 debug!(
1471 target = "duroxide::providers::postgres",
1472 operation = "renew_orchestration_item_lock",
1473 token = %token,
1474 extend_for_ms = extend_ms,
1475 duration_ms = duration_ms,
1476 "Orchestration item lock renewed successfully"
1477 );
1478 Ok(())
1479 }
1480 Err(e) => {
1481 if let SqlxError::Database(db_err) = &e {
1482 if db_err.message().contains("Lock token invalid")
1483 || db_err.message().contains("expired")
1484 || db_err.message().contains("already released")
1485 {
1486 return Err(ProviderError::permanent(
1487 "renew_orchestration_item_lock",
1488 "Lock token invalid, expired, or already released",
1489 ));
1490 }
1491 } else if e.to_string().contains("Lock token invalid")
1492 || e.to_string().contains("expired")
1493 || e.to_string().contains("already released")
1494 {
1495 return Err(ProviderError::permanent(
1496 "renew_orchestration_item_lock",
1497 "Lock token invalid, expired, or already released",
1498 ));
1499 }
1500
1501 Err(Self::sqlx_to_provider_error(
1502 "renew_orchestration_item_lock",
1503 e,
1504 ))
1505 }
1506 }
1507 }
1508
1509 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1510 async fn enqueue_for_orchestrator(
1511 &self,
1512 item: WorkItem,
1513 delay: Option<Duration>,
1514 ) -> Result<(), ProviderError> {
1515 let work_item = serde_json::to_string(&item).map_err(|e| {
1516 ProviderError::permanent(
1517 "enqueue_orchestrator_work",
1518 format!("Failed to serialize work item: {e}"),
1519 )
1520 })?;
1521
1522 let instance_id = match &item {
1524 WorkItem::StartOrchestration { instance, .. }
1525 | WorkItem::ActivityCompleted { instance, .. }
1526 | WorkItem::ActivityFailed { instance, .. }
1527 | WorkItem::TimerFired { instance, .. }
1528 | WorkItem::ExternalRaised { instance, .. }
1529 | WorkItem::CancelInstance { instance, .. }
1530 | WorkItem::ContinueAsNew { instance, .. }
1531 | WorkItem::QueueMessage { instance, .. } => instance,
1532 WorkItem::SubOrchCompleted {
1533 parent_instance, ..
1534 }
1535 | WorkItem::SubOrchFailed {
1536 parent_instance, ..
1537 } => parent_instance,
1538 WorkItem::ActivityExecute { .. } => {
1539 return Err(ProviderError::permanent(
1540 "enqueue_orchestrator_work",
1541 "ActivityExecute should go to worker queue, not orchestrator queue",
1542 ));
1543 }
1544 };
1545
1546 let now_ms = self.now_millis();
1548
1549 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1550 if *fire_at_ms > 0 {
1551 if let Some(delay) = delay {
1553 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1554 } else {
1555 *fire_at_ms
1556 }
1557 } else {
1558 delay
1560 .map(|d| now_ms as u64 + d.as_millis() as u64)
1561 .unwrap_or(now_ms as u64)
1562 }
1563 } else {
1564 delay
1566 .map(|d| now_ms as u64 + d.as_millis() as u64)
1567 .unwrap_or(now_ms as u64)
1568 };
1569
1570 let visible_at = Utc
1571 .timestamp_millis_opt(visible_at_ms as i64)
1572 .single()
1573 .ok_or_else(|| {
1574 ProviderError::permanent(
1575 "enqueue_orchestrator_work",
1576 "Invalid visible_at timestamp",
1577 )
1578 })?;
1579
1580 let _timer = DbCallTimer::new(
1585 DbOperation::StoredProcedure,
1586 Some("enqueue_orchestrator_work"),
1587 );
1588 sqlx::query(&format!(
1589 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6, $7)",
1590 self.schema_name
1591 ))
1592 .bind(instance_id)
1593 .bind(&work_item)
1594 .bind(visible_at)
1595 .bind(now_ms) .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1600 .await
1601 .map_err(|e| {
1602 error!(
1603 target = "duroxide::providers::postgres",
1604 operation = "enqueue_orchestrator_work",
1605 error_type = "database_error",
1606 error = %e,
1607 instance_id = %instance_id,
1608 "Failed to enqueue orchestrator work"
1609 );
1610 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1611 })?;
1612
1613 debug!(
1614 target = "duroxide::providers::postgres",
1615 operation = "enqueue_orchestrator_work",
1616 instance_id = %instance_id,
1617 delay_ms = delay.map(|d| d.as_millis() as u64),
1618 "Enqueued orchestrator work"
1619 );
1620
1621 Ok(())
1622 }
1623
1624 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1625 async fn read_with_execution(
1626 &self,
1627 instance: &str,
1628 execution_id: u64,
1629 ) -> Result<Vec<Event>, ProviderError> {
1630 let _timer = DbCallTimer::new(DbOperation::Select, None);
1631 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1632 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1633 self.table_name("history")
1634 ))
1635 .bind(instance)
1636 .bind(execution_id as i64)
1637 .fetch_all(&*self.pool)
1638 .await
1639 .ok()
1640 .unwrap_or_default();
1641
1642 Ok(event_data_rows
1643 .into_iter()
1644 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1645 .collect())
1646 }
1647
1648 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1649 async fn renew_session_lock(
1650 &self,
1651 owner_ids: &[&str],
1652 extend_for: Duration,
1653 idle_timeout: Duration,
1654 ) -> Result<usize, ProviderError> {
1655 if owner_ids.is_empty() {
1656 return Ok(0);
1657 }
1658
1659 let now_ms = self.now_millis();
1660 let extend_ms = extend_for.as_millis() as i64;
1661 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1662 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1663
1664 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("renew_session_lock"));
1665 let result = sqlx::query_scalar::<_, i64>(&format!(
1666 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1667 self.schema_name
1668 ))
1669 .bind(&owner_ids_vec)
1670 .bind(now_ms)
1671 .bind(extend_ms)
1672 .bind(idle_timeout_ms)
1673 .fetch_one(&*self.pool)
1674 .await
1675 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1676
1677 debug!(
1678 target = "duroxide::providers::postgres",
1679 operation = "renew_session_lock",
1680 owner_count = owner_ids.len(),
1681 sessions_renewed = result,
1682 "Session locks renewed"
1683 );
1684
1685 Ok(result as usize)
1686 }
1687
1688 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1689 async fn cleanup_orphaned_sessions(
1690 &self,
1691 _idle_timeout: Duration,
1692 ) -> Result<usize, ProviderError> {
1693 let now_ms = self.now_millis();
1694
1695 let _timer = DbCallTimer::new(
1696 DbOperation::StoredProcedure,
1697 Some("cleanup_orphaned_sessions"),
1698 );
1699 let result = sqlx::query_scalar::<_, i64>(&format!(
1700 "SELECT {}.cleanup_orphaned_sessions($1)",
1701 self.schema_name
1702 ))
1703 .bind(now_ms)
1704 .fetch_one(&*self.pool)
1705 .await
1706 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1707
1708 debug!(
1709 target = "duroxide::providers::postgres",
1710 operation = "cleanup_orphaned_sessions",
1711 sessions_cleaned = result,
1712 "Orphaned sessions cleaned up"
1713 );
1714
1715 Ok(result as usize)
1716 }
1717
    /// Expose the administrative surface of this provider.
    /// Always `Some` — `PostgresProvider` implements `ProviderAdmin` below.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1721
1722 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1723 async fn get_custom_status(
1724 &self,
1725 instance: &str,
1726 last_seen_version: u64,
1727 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1728 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1729 "SELECT * FROM {}.get_custom_status($1, $2)",
1730 self.schema_name
1731 ))
1732 .bind(instance)
1733 .bind(last_seen_version as i64)
1734 .fetch_optional(&*self.pool)
1735 .await
1736 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1737
1738 match row {
1739 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1740 None => Ok(None),
1741 }
1742 }
1743
1744 async fn get_kv_value(
1745 &self,
1746 instance_id: &str,
1747 key: &str,
1748 ) -> Result<Option<String>, ProviderError> {
1749 let query = format!(
1750 "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1751 self.schema_name
1752 );
1753 let result: Option<(String,)> = sqlx::query_as(&query)
1754 .bind(instance_id)
1755 .bind(key)
1756 .fetch_optional(&*self.pool)
1757 .await
1758 .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1759 Ok(result.map(|(value,)| value))
1760 }
1761
1762 async fn get_kv_all_values(
1763 &self,
1764 instance_id: &str,
1765 ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1766 let query = format!(
1767 "SELECT key, value FROM {}.kv_store WHERE instance_id = $1",
1768 self.schema_name
1769 );
1770 let rows: Vec<(String, String)> = sqlx::query_as(&query)
1771 .bind(instance_id)
1772 .fetch_all(&*self.pool)
1773 .await
1774 .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1775 Ok(rows.into_iter().collect())
1776 }
1777}
1778
1779#[async_trait::async_trait]
1780impl ProviderAdmin for PostgresProvider {
1781 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1782 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1783 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_instances"));
1784 sqlx::query_scalar(&format!(
1785 "SELECT instance_id FROM {}.list_instances()",
1786 self.schema_name
1787 ))
1788 .fetch_all(&*self.pool)
1789 .await
1790 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1791 }
1792
1793 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1794 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1795 let _timer = DbCallTimer::new(
1796 DbOperation::StoredProcedure,
1797 Some("list_instances_by_status"),
1798 );
1799 sqlx::query_scalar(&format!(
1800 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1801 self.schema_name
1802 ))
1803 .bind(status)
1804 .fetch_all(&*self.pool)
1805 .await
1806 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1807 }
1808
1809 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1810 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1811 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_executions"));
1812 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1813 "SELECT execution_id FROM {}.list_executions($1)",
1814 self.schema_name
1815 ))
1816 .bind(instance)
1817 .fetch_all(&*self.pool)
1818 .await
1819 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1820
1821 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1822 }
1823
1824 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1825 async fn read_history_with_execution_id(
1826 &self,
1827 instance: &str,
1828 execution_id: u64,
1829 ) -> Result<Vec<Event>, ProviderError> {
1830 let _timer = DbCallTimer::new(
1831 DbOperation::StoredProcedure,
1832 Some("fetch_history_with_execution"),
1833 );
1834 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1835 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1836 self.schema_name
1837 ))
1838 .bind(instance)
1839 .bind(execution_id as i64)
1840 .fetch_all(&*self.pool)
1841 .await
1842 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1843
1844 event_data_rows
1845 .into_iter()
1846 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1847 .collect::<Vec<Event>>()
1848 .into_iter()
1849 .map(Ok)
1850 .collect()
1851 }
1852
1853 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1854 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1855 let execution_id = self.latest_execution_id(instance).await?;
1856 self.read_history_with_execution_id(instance, execution_id)
1857 .await
1858 }
1859
1860 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1861 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1862 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("latest_execution_id"));
1863 sqlx::query_scalar(&format!(
1864 "SELECT {}.latest_execution_id($1)",
1865 self.schema_name
1866 ))
1867 .bind(instance)
1868 .fetch_optional(&*self.pool)
1869 .await
1870 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1871 .map(|id: i64| id as u64)
1872 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1873 }
1874
1875 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1876 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1877 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_instance_info"));
1878 let row: Option<(
1879 String,
1880 String,
1881 String,
1882 i64,
1883 chrono::DateTime<Utc>,
1884 Option<chrono::DateTime<Utc>>,
1885 Option<String>,
1886 Option<String>,
1887 Option<String>,
1888 )> = sqlx::query_as(&format!(
1889 "SELECT * FROM {}.get_instance_info($1)",
1890 self.schema_name
1891 ))
1892 .bind(instance)
1893 .fetch_optional(&*self.pool)
1894 .await
1895 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1896
1897 let (
1898 instance_id,
1899 orchestration_name,
1900 orchestration_version,
1901 current_execution_id,
1902 created_at,
1903 updated_at,
1904 status,
1905 output,
1906 parent_instance_id,
1907 ) =
1908 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1909
1910 Ok(InstanceInfo {
1911 instance_id,
1912 orchestration_name,
1913 orchestration_version,
1914 current_execution_id: current_execution_id as u64,
1915 status: status.unwrap_or_else(|| "Running".to_string()),
1916 output,
1917 created_at: created_at.timestamp_millis() as u64,
1918 updated_at: updated_at
1919 .map(|dt| dt.timestamp_millis() as u64)
1920 .unwrap_or(created_at.timestamp_millis() as u64),
1921 parent_instance_id,
1922 })
1923 }
1924
1925 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1926 async fn get_execution_info(
1927 &self,
1928 instance: &str,
1929 execution_id: u64,
1930 ) -> Result<ExecutionInfo, ProviderError> {
1931 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_execution_info"));
1932 let row: Option<(
1933 i64,
1934 String,
1935 Option<String>,
1936 chrono::DateTime<Utc>,
1937 Option<chrono::DateTime<Utc>>,
1938 i64,
1939 )> = sqlx::query_as(&format!(
1940 "SELECT * FROM {}.get_execution_info($1, $2)",
1941 self.schema_name
1942 ))
1943 .bind(instance)
1944 .bind(execution_id as i64)
1945 .fetch_optional(&*self.pool)
1946 .await
1947 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1948
1949 let (exec_id, status, output, started_at, completed_at, event_count) = row
1950 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1951
1952 Ok(ExecutionInfo {
1953 execution_id: exec_id as u64,
1954 status,
1955 output,
1956 started_at: started_at.timestamp_millis() as u64,
1957 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1958 event_count: event_count as usize,
1959 })
1960 }
1961
1962 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1963 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1964 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_system_metrics"));
1965 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1966 "SELECT * FROM {}.get_system_metrics()",
1967 self.schema_name
1968 ))
1969 .fetch_optional(&*self.pool)
1970 .await
1971 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1972
1973 let (
1974 total_instances,
1975 total_executions,
1976 running_instances,
1977 completed_instances,
1978 failed_instances,
1979 total_events,
1980 ) = row.ok_or_else(|| {
1981 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1982 })?;
1983
1984 Ok(SystemMetrics {
1985 total_instances: total_instances as u64,
1986 total_executions: total_executions as u64,
1987 running_instances: running_instances as u64,
1988 completed_instances: completed_instances as u64,
1989 failed_instances: failed_instances as u64,
1990 total_events: total_events as u64,
1991 })
1992 }
1993
1994 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1995 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1996 let now_ms = self.now_millis();
1997
1998 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_queue_depths"));
1999 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
2000 "SELECT * FROM {}.get_queue_depths($1)",
2001 self.schema_name
2002 ))
2003 .bind(now_ms)
2004 .fetch_optional(&*self.pool)
2005 .await
2006 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
2007
2008 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
2009 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
2010 })?;
2011
2012 Ok(QueueDepths {
2013 orchestrator_queue: orchestrator_queue as usize,
2014 worker_queue: worker_queue as usize,
2015 timer_queue: 0, })
2017 }
2018
2019 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2022 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
2023 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_children"));
2024 sqlx::query_scalar(&format!(
2025 "SELECT child_instance_id FROM {}.list_children($1)",
2026 self.schema_name
2027 ))
2028 .bind(instance_id)
2029 .fetch_all(&*self.pool)
2030 .await
2031 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
2032 }
2033
2034 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2035 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
2036 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_parent_id"));
2037 let result: Result<Option<String>, _> =
2038 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
2039 .bind(instance_id)
2040 .fetch_one(&*self.pool)
2041 .await;
2042
2043 match result {
2044 Ok(parent_id) => Ok(parent_id),
2045 Err(e) => {
2046 let err_str = e.to_string();
2047 if err_str.contains("Instance not found") {
2048 Err(ProviderError::permanent(
2049 "get_parent_id",
2050 format!("Instance not found: {instance_id}"),
2051 ))
2052 } else {
2053 Err(Self::sqlx_to_provider_error("get_parent_id", e))
2054 }
2055 }
2056 }
2057 }
2058
2059 #[instrument(skip(self), target = "duroxide::providers::postgres")]
2062 async fn delete_instances_atomic(
2063 &self,
2064 ids: &[String],
2065 force: bool,
2066 ) -> Result<DeleteInstanceResult, ProviderError> {
2067 if ids.is_empty() {
2068 return Ok(DeleteInstanceResult::default());
2069 }
2070
2071 let _timer = DbCallTimer::new(
2072 DbOperation::StoredProcedure,
2073 Some("delete_instances_atomic"),
2074 );
2075 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
2076 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
2077 self.schema_name
2078 ))
2079 .bind(ids)
2080 .bind(force)
2081 .fetch_optional(&*self.pool)
2082 .await
2083 .map_err(|e| {
2084 let err_str = e.to_string();
2085 if err_str.contains("is Running") || err_str.contains("Orphan detected") {
2086 ProviderError::permanent("delete_instances_atomic", err_str)
2087 } else {
2088 Self::sqlx_to_provider_error("delete_instances_atomic", e)
2089 }
2090 })?;
2091
2092 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
2093 row.unwrap_or((0, 0, 0, 0));
2094
2095 debug!(
2096 target = "duroxide::providers::postgres",
2097 operation = "delete_instances_atomic",
2098 instances_deleted = instances_deleted,
2099 executions_deleted = executions_deleted,
2100 events_deleted = events_deleted,
2101 queue_messages_deleted = queue_messages_deleted,
2102 "Deleted instances atomically"
2103 );
2104
2105 Ok(DeleteInstanceResult {
2106 instances_deleted: instances_deleted as u64,
2107 executions_deleted: executions_deleted as u64,
2108 events_deleted: events_deleted as u64,
2109 queue_messages_deleted: queue_messages_deleted as u64,
2110 })
2111 }
2112
2113 #[instrument(skip(self), target = "duroxide::providers::postgres")]
2114 async fn delete_instance_bulk(
2115 &self,
2116 filter: InstanceFilter,
2117 ) -> Result<DeleteInstanceResult, ProviderError> {
2118 let mut sql = format!(
2120 r#"
2121 SELECT i.instance_id
2122 FROM {}.instances i
2123 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
2124 AND i.current_execution_id = e.execution_id
2125 WHERE i.parent_instance_id IS NULL
2126 AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
2127 "#,
2128 self.schema_name, self.schema_name
2129 );
2130
2131 if let Some(ref ids) = filter.instance_ids {
2133 if ids.is_empty() {
2134 return Ok(DeleteInstanceResult::default());
2135 }
2136 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${i}")).collect();
2137 sql.push_str(&format!(
2138 " AND i.instance_id IN ({})",
2139 placeholders.join(", ")
2140 ));
2141 }
2142
2143 if filter.completed_before.is_some() {
2145 let param_num = filter
2146 .instance_ids
2147 .as_ref()
2148 .map(|ids| ids.len())
2149 .unwrap_or(0)
2150 + 1;
2151 sql.push_str(&format!(
2152 " AND e.completed_at < TO_TIMESTAMP(${param_num} / 1000.0)"
2153 ));
2154 }
2155
2156 let limit = filter.limit.unwrap_or(1000);
2158 let limit_param_num = filter
2159 .instance_ids
2160 .as_ref()
2161 .map(|ids| ids.len())
2162 .unwrap_or(0)
2163 + if filter.completed_before.is_some() {
2164 1
2165 } else {
2166 0
2167 }
2168 + 1;
2169 sql.push_str(&format!(" LIMIT ${limit_param_num}"));
2170
2171 let _timer = DbCallTimer::new(DbOperation::Select, None);
2173 let mut query = sqlx::query_scalar::<_, String>(&sql);
2174 if let Some(ref ids) = filter.instance_ids {
2175 for id in ids {
2176 query = query.bind(id);
2177 }
2178 }
2179 if let Some(completed_before) = filter.completed_before {
2180 query = query.bind(completed_before as i64);
2181 }
2182 query = query.bind(limit as i64);
2183
2184 let instance_ids: Vec<String> = query
2185 .fetch_all(&*self.pool)
2186 .await
2187 .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
2188
2189 if instance_ids.is_empty() {
2190 return Ok(DeleteInstanceResult::default());
2191 }
2192
2193 let mut result = DeleteInstanceResult::default();
2195
2196 for instance_id in &instance_ids {
2197 let tree = self.get_instance_tree(instance_id).await?;
2199
2200 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
2202 result.instances_deleted += delete_result.instances_deleted;
2203 result.executions_deleted += delete_result.executions_deleted;
2204 result.events_deleted += delete_result.events_deleted;
2205 result.queue_messages_deleted += delete_result.queue_messages_deleted;
2206 }
2207
2208 debug!(
2209 target = "duroxide::providers::postgres",
2210 operation = "delete_instance_bulk",
2211 instances_deleted = result.instances_deleted,
2212 executions_deleted = result.executions_deleted,
2213 events_deleted = result.events_deleted,
2214 queue_messages_deleted = result.queue_messages_deleted,
2215 "Bulk deleted instances"
2216 );
2217
2218 Ok(result)
2219 }
2220
2221 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2224 async fn prune_executions(
2225 &self,
2226 instance_id: &str,
2227 options: PruneOptions,
2228 ) -> Result<PruneResult, ProviderError> {
2229 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
2230 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
2231
2232 let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("prune_executions"));
2233 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
2234 "SELECT * FROM {}.prune_executions($1, $2, $3)",
2235 self.schema_name
2236 ))
2237 .bind(instance_id)
2238 .bind(keep_last)
2239 .bind(completed_before_ms)
2240 .fetch_optional(&*self.pool)
2241 .await
2242 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
2243
2244 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
2245
2246 debug!(
2247 target = "duroxide::providers::postgres",
2248 operation = "prune_executions",
2249 instance_id = %instance_id,
2250 instances_processed = instances_processed,
2251 executions_deleted = executions_deleted,
2252 events_deleted = events_deleted,
2253 "Pruned executions"
2254 );
2255
2256 Ok(PruneResult {
2257 instances_processed: instances_processed as u64,
2258 executions_deleted: executions_deleted as u64,
2259 events_deleted: events_deleted as u64,
2260 })
2261 }
2262
2263 #[instrument(skip(self), target = "duroxide::providers::postgres")]
2264 async fn prune_executions_bulk(
2265 &self,
2266 filter: InstanceFilter,
2267 options: PruneOptions,
2268 ) -> Result<PruneResult, ProviderError> {
2269 let mut sql = format!(
2274 r#"
2275 SELECT i.instance_id
2276 FROM {}.instances i
2277 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
2278 AND i.current_execution_id = e.execution_id
2279 WHERE 1=1
2280 "#,
2281 self.schema_name, self.schema_name
2282 );
2283
2284 if let Some(ref ids) = filter.instance_ids {
2286 if ids.is_empty() {
2287 return Ok(PruneResult::default());
2288 }
2289 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${i}")).collect();
2290 sql.push_str(&format!(
2291 " AND i.instance_id IN ({})",
2292 placeholders.join(", ")
2293 ));
2294 }
2295
2296 if filter.completed_before.is_some() {
2298 let param_num = filter
2299 .instance_ids
2300 .as_ref()
2301 .map(|ids| ids.len())
2302 .unwrap_or(0)
2303 + 1;
2304 sql.push_str(&format!(
2305 " AND e.completed_at < TO_TIMESTAMP(${param_num} / 1000.0)"
2306 ));
2307 }
2308
2309 let limit = filter.limit.unwrap_or(1000);
2311 let limit_param_num = filter
2312 .instance_ids
2313 .as_ref()
2314 .map(|ids| ids.len())
2315 .unwrap_or(0)
2316 + if filter.completed_before.is_some() {
2317 1
2318 } else {
2319 0
2320 }
2321 + 1;
2322 sql.push_str(&format!(" LIMIT ${limit_param_num}"));
2323
2324 let _timer = DbCallTimer::new(DbOperation::Select, None);
2326 let mut query = sqlx::query_scalar::<_, String>(&sql);
2327 if let Some(ref ids) = filter.instance_ids {
2328 for id in ids {
2329 query = query.bind(id);
2330 }
2331 }
2332 if let Some(completed_before) = filter.completed_before {
2333 query = query.bind(completed_before as i64);
2334 }
2335 query = query.bind(limit as i64);
2336
2337 let instance_ids: Vec<String> = query
2338 .fetch_all(&*self.pool)
2339 .await
2340 .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
2341
2342 let mut result = PruneResult::default();
2344
2345 for instance_id in &instance_ids {
2346 let single_result = self.prune_executions(instance_id, options.clone()).await?;
2347 result.instances_processed += single_result.instances_processed;
2348 result.executions_deleted += single_result.executions_deleted;
2349 result.events_deleted += single_result.events_deleted;
2350 }
2351
2352 debug!(
2353 target = "duroxide::providers::postgres",
2354 operation = "prune_executions_bulk",
2355 instances_processed = result.instances_processed,
2356 executions_deleted = result.executions_deleted,
2357 events_deleted = result.events_deleted,
2358 "Bulk pruned executions"
2359 );
2360
2361 Ok(result)
2362 }
2363}