1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, WorkItem,
8};
9use duroxide::Event;
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// Durable-task storage provider backed by PostgreSQL.
///
/// All state mutations are delegated to stored procedures living in
/// `schema_name`, so one database can host several isolated provider
/// instances (one per schema).
pub struct PostgresProvider {
    // Shared connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Schema owning the duroxide tables and stored procedures
    // (defaults to "public").
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95
96
97 fn table_name(&self, table: &str) -> String {
99 format!("{}.{}", self.schema_name, table)
100 }
101
102 pub fn pool(&self) -> &PgPool {
104 &self.pool
105 }
106
107 pub fn schema_name(&self) -> &str {
109 &self.schema_name
110 }
111
112 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114 match e {
115 SqlxError::Database(ref db_err) => {
116 let code_opt = db_err.code();
118 let code = code_opt.as_deref();
119 if code == Some("40P01") {
120 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122 } else if code == Some("40001") {
123 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125 } else if code == Some("23505") {
126 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128 } else if code == Some("23503") {
129 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131 } else {
132 ProviderError::permanent(operation, format!("Database error: {e}"))
133 }
134 }
135 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137 }
138 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140 }
141 }
142
143 pub async fn cleanup_schema(&self) -> Result<()> {
148 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150 .execute(&*self.pool)
151 .await?;
152
153 if self.schema_name != "public" {
156 sqlx::query(&format!(
157 "DROP SCHEMA IF EXISTS {} CASCADE",
158 self.schema_name
159 ))
160 .execute(&*self.pool)
161 .await?;
162 } else {
163 }
166
167 Ok(())
168 }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
    /// Provider identifier reported to the runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }

    /// Crate version, baked in at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
180
    /// Claims the next ready orchestration item (history + pending messages)
    /// via the `fetch_orchestration_item` stored procedure.
    ///
    /// Returns `(item, lock_token, attempt_count)` or `None` when nothing is
    /// ready. Retryable database errors are retried up to `MAX_RETRIES` times
    /// with linear backoff. `_poll_timeout` is unused: this implementation
    /// polls once rather than long-polling.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // Assigned on retry for symmetry with other providers but never read.
        let mut _last_error: Option<ProviderError> = None;

        // Pack the dispatcher's supported version range into single integers
        // (major * 1_000_000 + minor * 1_000 + patch) for SQL-side comparison.
        // NOTE(review): only the FIRST range is honored, and a filter with no
        // ranges matches nothing — confirm both are the intended contract.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        // Loop iterates only on retryable errors; any successful round-trip
        // (row or empty) returns immediately.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape produced by the stored procedure:
            // (instance_id, orchestration_name, orchestration_version,
            //  execution_id, history_json, messages_json, lock_token,
            //  attempt_count)
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // Corrupted history is not fatal: the item is returned with an
                // empty history plus `history_error` so the runtime can decide.
                let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
                    Ok(h) => (h, None),
                    Err(e) => {
                        let error_msg = format!("Failed to deserialize history: {e}");
                        warn!(
                            target = "duroxide::providers::postgres",
                            instance = %instance_id,
                            error = %error_msg,
                            "History deserialization failed, returning item with history_error"
                        );
                        (vec![], Some(error_msg))
                    }
                };

                // Undecodable messages, by contrast, are a permanent failure.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Queue empty — do not spin on the remaining attempts.
            return Ok(None);
        }

        // Unreachable: every loop path returns; kept to satisfy the compiler.
        Ok(None)
    }
    /// Atomically acknowledges a locked orchestration item: appends the
    /// history delta, enqueues follow-up worker/orchestrator items, records
    /// execution metadata, and cancels the given scheduled activities — all
    /// via one `ack_orchestration_item` stored-procedure call.
    ///
    /// Fails permanently on an invalid lock token or on serialization
    /// problems; retries transient database errors with linear backoff.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Pre-serialize everything up front so that retries re-send
        // identical payloads.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id == 0 means the runtime forgot to assign an id;
            // storing it would corrupt history ordering.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a type label from the Debug representation's prefix
            // (the variant name before the first '{'). Hacky but avoids a
            // hand-maintained Event -> name mapping.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Flattened metadata object consumed by the stored procedure.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The stored procedure raises "Invalid lock token" when
                    // the lock expired or was stolen — never retryable.
                    // NOTE: for database errors whose message does NOT match,
                    // the fallback string check below is intentionally skipped
                    // (else-if), and classification falls through.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the final attempt either returns Ok or Err above.
        Ok(())
    }
    /// Releases a locked orchestration item without acknowledging it, making
    /// it visible again (optionally after `delay`). `ignore_attempt` keeps the
    /// attempt counter from being incremented. The stored procedure returns
    /// the instance id, used here only for logging.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // "Invalid lock token" raised by the procedure is permanent;
                // everything else goes through the generic classifier.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
550
551 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
552 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
553 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
554 "SELECT out_event_data FROM {}.fetch_history($1)",
555 self.schema_name
556 ))
557 .bind(instance)
558 .fetch_all(&*self.pool)
559 .await
560 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
561
562 Ok(event_data_rows
563 .into_iter()
564 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
565 .collect())
566 }
567
    /// Appends `new_events` to the history of one execution of `instance`
    /// via the `append_history` stored procedure. A no-op for an empty batch.
    ///
    /// Every event must already carry a non-zero `event_id` assigned by the
    /// runtime; a zero id is rejected as a permanent error.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Type label = variant name from the Debug repr, i.e. the text
            // before the first '{' (same convention as ack_orchestration_item).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
641
642 #[instrument(skip(self), target = "duroxide::providers::postgres")]
643 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
644 let work_item = serde_json::to_string(&item).map_err(|e| {
645 ProviderError::permanent(
646 "enqueue_worker_work",
647 format!("Failed to serialize work item: {e}"),
648 )
649 })?;
650
651 let now_ms = Self::now_millis();
652
653 let (instance_id, execution_id, activity_id, session_id) = match &item {
655 WorkItem::ActivityExecute {
656 instance,
657 execution_id,
658 id,
659 session_id,
660 ..
661 } => (
662 Some(instance.clone()),
663 Some(*execution_id as i64),
664 Some(*id as i64),
665 session_id.clone(),
666 ),
667 _ => (None, None, None, None),
668 };
669
670 sqlx::query(&format!(
671 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
672 self.schema_name
673 ))
674 .bind(work_item)
675 .bind(now_ms)
676 .bind(&instance_id)
677 .bind(execution_id)
678 .bind(activity_id)
679 .bind(&session_id)
680 .execute(&*self.pool)
681 .await
682 .map_err(|e| {
683 error!(
684 target = "duroxide::providers::postgres",
685 operation = "enqueue_worker_work",
686 error_type = "database_error",
687 error = %e,
688 "Failed to enqueue worker work"
689 );
690 Self::sqlx_to_provider_error("enqueue_worker_work", e)
691 })?;
692
693 Ok(())
694 }
695
    /// Claims the next ready worker item via the `fetch_work_item` stored
    /// procedure, returning `(item, lock_token, attempt_count)` or `None`.
    ///
    /// When `session` is given, its owner id and lock timeout are forwarded
    /// so the procedure can honor session affinity. `_poll_timeout` is
    /// unused: single poll, no long-polling.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Row shape: (work_item_json, lock_token, attempt_count)
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Resolve the instance id for logging; sub-orchestration completions
        // report against their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((
            work_item,
            lock_token,
            attempt_count as u32,
        )))
    }
781
    /// Acknowledges a locked worker item identified by `token`.
    ///
    /// Two paths: with `completion == None` the item is simply removed
    /// (cancelled activity); with a completion it must be an
    /// `ActivityCompleted`/`ActivityFailed` item, which is atomically
    /// enqueued for the orchestrator in the same `ack_worker` call.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Path 1: no completion — single-argument ack removes the item.
        let Some(completion) = completion else {
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1)",
                self.schema_name
            ))
            .bind(token)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // Missing item means the lock expired or was already acked.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Path 2: completion must be an activity outcome; anything else is a
        // caller bug.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
886
887 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
888 async fn renew_work_item_lock(
889 &self,
890 token: &str,
891 extend_for: Duration,
892 ) -> Result<(), ProviderError> {
893 let start = std::time::Instant::now();
894
895 let now_ms = Self::now_millis();
897
898 let extend_secs = extend_for.as_secs() as i64;
900
901 match sqlx::query(&format!(
902 "SELECT {}.renew_work_item_lock($1, $2, $3)",
903 self.schema_name
904 ))
905 .bind(token)
906 .bind(now_ms)
907 .bind(extend_secs)
908 .execute(&*self.pool)
909 .await
910 {
911 Ok(_) => {
912 let duration_ms = start.elapsed().as_millis() as u64;
913 debug!(
914 target = "duroxide::providers::postgres",
915 operation = "renew_work_item_lock",
916 token = %token,
917 extend_for_secs = extend_secs,
918 duration_ms = duration_ms,
919 "Work item lock renewed successfully"
920 );
921 Ok(())
922 }
923 Err(e) => {
924 if let SqlxError::Database(db_err) = &e {
925 if db_err.message().contains("Lock token invalid") {
926 return Err(ProviderError::permanent(
927 "renew_work_item_lock",
928 "Lock token invalid, expired, or already acked",
929 ));
930 }
931 } else if e.to_string().contains("Lock token invalid") {
932 return Err(ProviderError::permanent(
933 "renew_work_item_lock",
934 "Lock token invalid, expired, or already acked",
935 ));
936 }
937
938 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
939 }
940 }
941 }
942
    /// Releases a locked worker item without acknowledging it, making it
    /// visible again (optionally after `delay`). `ignore_attempt` keeps the
    /// attempt counter from being incremented.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Procedure-raised token errors are permanent; anything else
                // goes through the generic classifier.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
1001
    /// Extends the lock on a claimed orchestration item by `extend_for`.
    /// Mirrors `renew_work_item_lock` but with a broader set of
    /// procedure-raised token messages treated as permanent.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE: sub-second extensions truncate to 0 here.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Invalid/expired/released token messages are permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1066
    /// Enqueues an orchestrator-queue item, computing when it becomes
    /// visible.
    ///
    /// Visibility rules: a `TimerFired` item with a positive `fire_at_ms`
    /// becomes visible at its fire time (or later, if `delay` pushes past
    /// it); every other case is now + `delay` (or immediately). Rejects
    /// `ActivityExecute`, which belongs on the worker queue.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Resolve the queue key: sub-orchestration results are routed to the
        // parent instance.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        let now_ms = Self::now_millis();

        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                if let Some(delay) = delay {
                    // Never make a timer visible before its fire time.
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                // fire_at_ms == 0: treat like an ordinary delayed item.
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        // Convert to a chrono timestamp for the TIMESTAMPTZ parameter.
        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // Trailing NULLs: presumably reserved/optional procedure parameters —
        // confirm against the enqueue_orchestrator_work definition.
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .bind::<Option<i64>>(None)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1175
1176 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1177 async fn read_with_execution(
1178 &self,
1179 instance: &str,
1180 execution_id: u64,
1181 ) -> Result<Vec<Event>, ProviderError> {
1182 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1183 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1184 self.table_name("history")
1185 ))
1186 .bind(instance)
1187 .bind(execution_id as i64)
1188 .fetch_all(&*self.pool)
1189 .await
1190 .ok()
1191 .unwrap_or_default();
1192
1193 Ok(event_data_rows
1194 .into_iter()
1195 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1196 .collect())
1197 }
1198
1199 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1200 async fn renew_session_lock(
1201 &self,
1202 owner_ids: &[&str],
1203 extend_for: Duration,
1204 idle_timeout: Duration,
1205 ) -> Result<usize, ProviderError> {
1206 if owner_ids.is_empty() {
1207 return Ok(0);
1208 }
1209
1210 let now_ms = Self::now_millis();
1211 let extend_ms = extend_for.as_millis() as i64;
1212 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1213 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1214
1215 let result = sqlx::query_scalar::<_, i64>(&format!(
1216 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1217 self.schema_name
1218 ))
1219 .bind(&owner_ids_vec)
1220 .bind(now_ms)
1221 .bind(extend_ms)
1222 .bind(idle_timeout_ms)
1223 .fetch_one(&*self.pool)
1224 .await
1225 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1226
1227 debug!(
1228 target = "duroxide::providers::postgres",
1229 operation = "renew_session_lock",
1230 owner_count = owner_ids.len(),
1231 sessions_renewed = result,
1232 "Session locks renewed"
1233 );
1234
1235 Ok(result as usize)
1236 }
1237
1238 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1239 async fn cleanup_orphaned_sessions(
1240 &self,
1241 _idle_timeout: Duration,
1242 ) -> Result<usize, ProviderError> {
1243 let now_ms = Self::now_millis();
1244
1245 let result = sqlx::query_scalar::<_, i64>(&format!(
1246 "SELECT {}.cleanup_orphaned_sessions($1)",
1247 self.schema_name
1248 ))
1249 .bind(now_ms)
1250 .fetch_one(&*self.pool)
1251 .await
1252 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1253
1254 debug!(
1255 target = "duroxide::providers::postgres",
1256 operation = "cleanup_orphaned_sessions",
1257 sessions_cleaned = result,
1258 "Orphaned sessions cleaned up"
1259 );
1260
1261 Ok(result as usize)
1262 }
1263
    /// Advertise this provider's admin/management capability.
    ///
    /// Always returns `Some(self)`, since `PostgresProvider` implements
    /// `ProviderAdmin` directly (see the impl below).
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1267}
1268
1269#[async_trait::async_trait]
1270impl ProviderAdmin for PostgresProvider {
1271 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1272 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1273 sqlx::query_scalar(&format!(
1274 "SELECT instance_id FROM {}.list_instances()",
1275 self.schema_name
1276 ))
1277 .fetch_all(&*self.pool)
1278 .await
1279 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1280 }
1281
1282 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1283 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1284 sqlx::query_scalar(&format!(
1285 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1286 self.schema_name
1287 ))
1288 .bind(status)
1289 .fetch_all(&*self.pool)
1290 .await
1291 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1292 }
1293
1294 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1295 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1296 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1297 "SELECT execution_id FROM {}.list_executions($1)",
1298 self.schema_name
1299 ))
1300 .bind(instance)
1301 .fetch_all(&*self.pool)
1302 .await
1303 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1304
1305 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1306 }
1307
1308 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1309 async fn read_history_with_execution_id(
1310 &self,
1311 instance: &str,
1312 execution_id: u64,
1313 ) -> Result<Vec<Event>, ProviderError> {
1314 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1315 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1316 self.schema_name
1317 ))
1318 .bind(instance)
1319 .bind(execution_id as i64)
1320 .fetch_all(&*self.pool)
1321 .await
1322 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1323
1324 event_data_rows
1325 .into_iter()
1326 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1327 .collect::<Vec<Event>>()
1328 .into_iter()
1329 .map(Ok)
1330 .collect()
1331 }
1332
1333 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1334 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1335 let execution_id = self.latest_execution_id(instance).await?;
1336 self.read_history_with_execution_id(instance, execution_id)
1337 .await
1338 }
1339
1340 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1341 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1342 sqlx::query_scalar(&format!(
1343 "SELECT {}.latest_execution_id($1)",
1344 self.schema_name
1345 ))
1346 .bind(instance)
1347 .fetch_optional(&*self.pool)
1348 .await
1349 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1350 .map(|id: i64| id as u64)
1351 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1352 }
1353
1354 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1355 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1356 let row: Option<(
1357 String,
1358 String,
1359 String,
1360 i64,
1361 chrono::DateTime<Utc>,
1362 Option<chrono::DateTime<Utc>>,
1363 Option<String>,
1364 Option<String>,
1365 Option<String>,
1366 )> = sqlx::query_as(&format!(
1367 "SELECT * FROM {}.get_instance_info($1)",
1368 self.schema_name
1369 ))
1370 .bind(instance)
1371 .fetch_optional(&*self.pool)
1372 .await
1373 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1374
1375 let (
1376 instance_id,
1377 orchestration_name,
1378 orchestration_version,
1379 current_execution_id,
1380 created_at,
1381 updated_at,
1382 status,
1383 output,
1384 parent_instance_id,
1385 ) =
1386 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1387
1388 Ok(InstanceInfo {
1389 instance_id,
1390 orchestration_name,
1391 orchestration_version,
1392 current_execution_id: current_execution_id as u64,
1393 status: status.unwrap_or_else(|| "Running".to_string()),
1394 output,
1395 created_at: created_at.timestamp_millis() as u64,
1396 updated_at: updated_at
1397 .map(|dt| dt.timestamp_millis() as u64)
1398 .unwrap_or(created_at.timestamp_millis() as u64),
1399 parent_instance_id,
1400 })
1401 }
1402
1403 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1404 async fn get_execution_info(
1405 &self,
1406 instance: &str,
1407 execution_id: u64,
1408 ) -> Result<ExecutionInfo, ProviderError> {
1409 let row: Option<(
1410 i64,
1411 String,
1412 Option<String>,
1413 chrono::DateTime<Utc>,
1414 Option<chrono::DateTime<Utc>>,
1415 i64,
1416 )> = sqlx::query_as(&format!(
1417 "SELECT * FROM {}.get_execution_info($1, $2)",
1418 self.schema_name
1419 ))
1420 .bind(instance)
1421 .bind(execution_id as i64)
1422 .fetch_optional(&*self.pool)
1423 .await
1424 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1425
1426 let (exec_id, status, output, started_at, completed_at, event_count) = row
1427 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1428
1429 Ok(ExecutionInfo {
1430 execution_id: exec_id as u64,
1431 status,
1432 output,
1433 started_at: started_at.timestamp_millis() as u64,
1434 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1435 event_count: event_count as usize,
1436 })
1437 }
1438
1439 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1440 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1441 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1442 "SELECT * FROM {}.get_system_metrics()",
1443 self.schema_name
1444 ))
1445 .fetch_optional(&*self.pool)
1446 .await
1447 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1448
1449 let (
1450 total_instances,
1451 total_executions,
1452 running_instances,
1453 completed_instances,
1454 failed_instances,
1455 total_events,
1456 ) = row.ok_or_else(|| {
1457 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1458 })?;
1459
1460 Ok(SystemMetrics {
1461 total_instances: total_instances as u64,
1462 total_executions: total_executions as u64,
1463 running_instances: running_instances as u64,
1464 completed_instances: completed_instances as u64,
1465 failed_instances: failed_instances as u64,
1466 total_events: total_events as u64,
1467 })
1468 }
1469
1470 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1471 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1472 let now_ms = Self::now_millis();
1473
1474 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1475 "SELECT * FROM {}.get_queue_depths($1)",
1476 self.schema_name
1477 ))
1478 .bind(now_ms)
1479 .fetch_optional(&*self.pool)
1480 .await
1481 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1482
1483 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1484 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1485 })?;
1486
1487 Ok(QueueDepths {
1488 orchestrator_queue: orchestrator_queue as usize,
1489 worker_queue: worker_queue as usize,
1490 timer_queue: 0, })
1492 }
1493
1494 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1497 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1498 sqlx::query_scalar(&format!(
1499 "SELECT child_instance_id FROM {}.list_children($1)",
1500 self.schema_name
1501 ))
1502 .bind(instance_id)
1503 .fetch_all(&*self.pool)
1504 .await
1505 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1506 }
1507
1508 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1509 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1510 let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1513 "SELECT {}.get_parent_id($1)",
1514 self.schema_name
1515 ))
1516 .bind(instance_id)
1517 .fetch_one(&*self.pool)
1518 .await;
1519
1520 match result {
1521 Ok(parent_id) => Ok(parent_id),
1522 Err(e) => {
1523 let err_str = e.to_string();
1524 if err_str.contains("Instance not found") {
1525 Err(ProviderError::permanent(
1526 "get_parent_id",
1527 format!("Instance not found: {}", instance_id),
1528 ))
1529 } else {
1530 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1531 }
1532 }
1533 }
1534 }
1535
1536 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1539 async fn delete_instances_atomic(
1540 &self,
1541 ids: &[String],
1542 force: bool,
1543 ) -> Result<DeleteInstanceResult, ProviderError> {
1544 if ids.is_empty() {
1545 return Ok(DeleteInstanceResult::default());
1546 }
1547
1548 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1549 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1550 self.schema_name
1551 ))
1552 .bind(ids)
1553 .bind(force)
1554 .fetch_optional(&*self.pool)
1555 .await
1556 .map_err(|e| {
1557 let err_str = e.to_string();
1558 if err_str.contains("is Running") {
1559 ProviderError::permanent(
1560 "delete_instances_atomic",
1561 err_str,
1562 )
1563 } else if err_str.contains("Orphan detected") {
1564 ProviderError::permanent(
1565 "delete_instances_atomic",
1566 err_str,
1567 )
1568 } else {
1569 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1570 }
1571 })?;
1572
1573 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1574 row.unwrap_or((0, 0, 0, 0));
1575
1576 debug!(
1577 target = "duroxide::providers::postgres",
1578 operation = "delete_instances_atomic",
1579 instances_deleted = instances_deleted,
1580 executions_deleted = executions_deleted,
1581 events_deleted = events_deleted,
1582 queue_messages_deleted = queue_messages_deleted,
1583 "Deleted instances atomically"
1584 );
1585
1586 Ok(DeleteInstanceResult {
1587 instances_deleted: instances_deleted as u64,
1588 executions_deleted: executions_deleted as u64,
1589 events_deleted: events_deleted as u64,
1590 queue_messages_deleted: queue_messages_deleted as u64,
1591 })
1592 }
1593
1594 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1595 async fn delete_instance_bulk(
1596 &self,
1597 filter: InstanceFilter,
1598 ) -> Result<DeleteInstanceResult, ProviderError> {
1599 let mut sql = format!(
1601 r#"
1602 SELECT i.instance_id
1603 FROM {}.instances i
1604 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1605 AND i.current_execution_id = e.execution_id
1606 WHERE i.parent_instance_id IS NULL
1607 AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1608 "#,
1609 self.schema_name, self.schema_name
1610 );
1611
1612 if let Some(ref ids) = filter.instance_ids {
1614 if ids.is_empty() {
1615 return Ok(DeleteInstanceResult::default());
1616 }
1617 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1618 sql.push_str(&format!(
1619 " AND i.instance_id IN ({})",
1620 placeholders.join(", ")
1621 ));
1622 }
1623
1624 if filter.completed_before.is_some() {
1626 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1627 sql.push_str(&format!(
1628 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1629 param_num
1630 ));
1631 }
1632
1633 let limit = filter.limit.unwrap_or(1000);
1635 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1636 + if filter.completed_before.is_some() { 1 } else { 0 }
1637 + 1;
1638 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1639
1640 let mut query = sqlx::query_scalar::<_, String>(&sql);
1642 if let Some(ref ids) = filter.instance_ids {
1643 for id in ids {
1644 query = query.bind(id);
1645 }
1646 }
1647 if let Some(completed_before) = filter.completed_before {
1648 query = query.bind(completed_before as i64);
1649 }
1650 query = query.bind(limit as i64);
1651
1652 let instance_ids: Vec<String> = query
1653 .fetch_all(&*self.pool)
1654 .await
1655 .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1656
1657 if instance_ids.is_empty() {
1658 return Ok(DeleteInstanceResult::default());
1659 }
1660
1661 let mut result = DeleteInstanceResult::default();
1663
1664 for instance_id in &instance_ids {
1665 let tree = self.get_instance_tree(instance_id).await?;
1667
1668 let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1670 result.instances_deleted += delete_result.instances_deleted;
1671 result.executions_deleted += delete_result.executions_deleted;
1672 result.events_deleted += delete_result.events_deleted;
1673 result.queue_messages_deleted += delete_result.queue_messages_deleted;
1674 }
1675
1676 debug!(
1677 target = "duroxide::providers::postgres",
1678 operation = "delete_instance_bulk",
1679 instances_deleted = result.instances_deleted,
1680 executions_deleted = result.executions_deleted,
1681 events_deleted = result.events_deleted,
1682 queue_messages_deleted = result.queue_messages_deleted,
1683 "Bulk deleted instances"
1684 );
1685
1686 Ok(result)
1687 }
1688
1689 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1692 async fn prune_executions(
1693 &self,
1694 instance_id: &str,
1695 options: PruneOptions,
1696 ) -> Result<PruneResult, ProviderError> {
1697 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1698 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1699
1700 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1701 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1702 self.schema_name
1703 ))
1704 .bind(instance_id)
1705 .bind(keep_last)
1706 .bind(completed_before_ms)
1707 .fetch_optional(&*self.pool)
1708 .await
1709 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1710
1711 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1712
1713 debug!(
1714 target = "duroxide::providers::postgres",
1715 operation = "prune_executions",
1716 instance_id = %instance_id,
1717 instances_processed = instances_processed,
1718 executions_deleted = executions_deleted,
1719 events_deleted = events_deleted,
1720 "Pruned executions"
1721 );
1722
1723 Ok(PruneResult {
1724 instances_processed: instances_processed as u64,
1725 executions_deleted: executions_deleted as u64,
1726 events_deleted: events_deleted as u64,
1727 })
1728 }
1729
1730 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1731 async fn prune_executions_bulk(
1732 &self,
1733 filter: InstanceFilter,
1734 options: PruneOptions,
1735 ) -> Result<PruneResult, ProviderError> {
1736 let mut sql = format!(
1741 r#"
1742 SELECT i.instance_id
1743 FROM {}.instances i
1744 LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
1745 AND i.current_execution_id = e.execution_id
1746 WHERE 1=1
1747 "#,
1748 self.schema_name, self.schema_name
1749 );
1750
1751 if let Some(ref ids) = filter.instance_ids {
1753 if ids.is_empty() {
1754 return Ok(PruneResult::default());
1755 }
1756 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1757 sql.push_str(&format!(
1758 " AND i.instance_id IN ({})",
1759 placeholders.join(", ")
1760 ));
1761 }
1762
1763 if filter.completed_before.is_some() {
1765 let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1766 sql.push_str(&format!(
1767 " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1768 param_num
1769 ));
1770 }
1771
1772 let limit = filter.limit.unwrap_or(1000);
1774 let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1775 + if filter.completed_before.is_some() { 1 } else { 0 }
1776 + 1;
1777 sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1778
1779 let mut query = sqlx::query_scalar::<_, String>(&sql);
1781 if let Some(ref ids) = filter.instance_ids {
1782 for id in ids {
1783 query = query.bind(id);
1784 }
1785 }
1786 if let Some(completed_before) = filter.completed_before {
1787 query = query.bind(completed_before as i64);
1788 }
1789 query = query.bind(limit as i64);
1790
1791 let instance_ids: Vec<String> = query
1792 .fetch_all(&*self.pool)
1793 .await
1794 .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1795
1796 let mut result = PruneResult::default();
1798
1799 for instance_id in &instance_ids {
1800 let single_result = self.prune_executions(instance_id, options.clone()).await?;
1801 result.instances_processed += single_result.instances_processed;
1802 result.executions_deleted += single_result.executions_deleted;
1803 result.events_deleted += single_result.events_deleted;
1804 }
1805
1806 debug!(
1807 target = "duroxide::providers::postgres",
1808 operation = "prune_executions_bulk",
1809 instances_processed = result.instances_processed,
1810 executions_deleted = result.executions_deleted,
1811 events_deleted = result.events_deleted,
1812 "Bulk pruned executions"
1813 );
1814
1815 Ok(result)
1816 }
1817}