1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, ExecutionInfo, ExecutionMetadata, InstanceFilter, InstanceInfo,
5 OrchestrationItem, Provider, ProviderAdmin, ProviderError, PruneOptions, PruneResult,
6 QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
7};
8use duroxide::Event;
9use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use tracing::{debug, error, instrument, warn};
15
16use crate::migrations::MigrationRunner;
17
/// Durable-orchestration storage provider backed by PostgreSQL.
///
/// All state (history, queues, instance metadata) lives in tables and SQL
/// functions under `schema_name`; most operations delegate to stored
/// procedures named after the method that calls them.
pub struct PostgresProvider {
    // Shared connection pool; Arc so it can also be handed to the migration runner.
    pool: Arc<PgPool>,
    // PostgreSQL schema that owns all duroxide tables/functions (default "public").
    schema_name: String,
}
44
45impl PostgresProvider {
    /// Connects using `database_url` and runs migrations in the default
    /// ("public") schema. Convenience wrapper over [`Self::new_with_schema`].
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_schema(database_url, None).await
    }
49
    /// Connects to PostgreSQL and prepares the duroxide schema.
    ///
    /// * `schema_name` — schema owning all tables/functions; `None` means
    ///   "public".
    /// * Pool size comes from `DUROXIDE_PG_POOL_MAX` (default 10); values
    ///   that fail to parse silently fall back to the default.
    ///
    /// Migrations are applied before returning, so a successfully
    /// constructed provider is ready for use.
    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(10);

        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(1)
            .acquire_timeout(std::time::Duration::from_secs(30))
            .connect(database_url)
            .await?;

        let schema_name = schema_name.unwrap_or("public").to_string();

        let provider = Self {
            pool: Arc::new(pool),
            schema_name: schema_name.clone(),
        };

        // Run migrations eagerly so construction fails loudly on an
        // incompatible database rather than at first use.
        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
        migration_runner.migrate().await?;

        Ok(provider)
    }
76
    /// Re-runs schema migrations on demand (construction already does this
    /// once). NOTE(review): presumably idempotent — the migration runner is
    /// expected to skip already-applied steps; confirm in `MigrationRunner`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    pub async fn initialize_schema(&self) -> Result<()> {
        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
        migration_runner.migrate().await?;
        Ok(())
    }
85
86 fn now_millis() -> i64 {
88 SystemTime::now()
89 .duration_since(UNIX_EPOCH)
90 .unwrap()
91 .as_millis() as i64
92 }
93
94
95
96 fn table_name(&self, table: &str) -> String {
98 format!("{}.{}", self.schema_name, table)
99 }
100
    /// Borrow the underlying connection pool (useful for ad-hoc queries in
    /// tests or tooling).
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
105
    /// The PostgreSQL schema this provider operates in.
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }
110
111 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
113 match e {
114 SqlxError::Database(ref db_err) => {
115 let code_opt = db_err.code();
117 let code = code_opt.as_deref();
118 if code == Some("40P01") {
119 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
121 } else if code == Some("40001") {
122 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
124 } else if code == Some("23505") {
125 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
127 } else if code == Some("23503") {
128 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
130 } else {
131 ProviderError::permanent(operation, format!("Database error: {e}"))
132 }
133 }
134 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
135 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
136 }
137 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
138 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
139 }
140 }
141
142 pub async fn cleanup_schema(&self) -> Result<()> {
147 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
149 .execute(&*self.pool)
150 .await?;
151
152 if self.schema_name != "public" {
155 sqlx::query(&format!(
156 "DROP SCHEMA IF EXISTS {} CASCADE",
157 self.schema_name
158 ))
159 .execute(&*self.pool)
160 .await?;
161 } else {
162 }
165
166 Ok(())
167 }
168}
169
170#[async_trait::async_trait]
171impl Provider for PostgresProvider {
    fn name(&self) -> &str {
        // Stable provider identifier reported to the runtime.
        "duroxide-pg"
    }
175
    fn version(&self) -> &str {
        // Crate version baked in at compile time.
        env!("CARGO_PKG_VERSION")
    }
179
180 #[instrument(skip(self), target = "duroxide::providers::postgres")]
181 async fn fetch_orchestration_item(
182 &self,
183 lock_timeout: Duration,
184 _poll_timeout: Duration,
185 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
186 let start = std::time::Instant::now();
187
188 const MAX_RETRIES: u32 = 3;
189 const RETRY_DELAY_MS: u64 = 50;
190
191 let lock_timeout_ms = lock_timeout.as_millis() as i64;
193 let mut _last_error: Option<ProviderError> = None;
194
195 for attempt in 0..=MAX_RETRIES {
196 let now_ms = Self::now_millis();
197
198 let result: Result<
199 Option<(
200 String,
201 String,
202 String,
203 i64,
204 serde_json::Value,
205 serde_json::Value,
206 String,
207 i32,
208 )>,
209 SqlxError,
210 > = sqlx::query_as(&format!(
211 "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
212 self.schema_name
213 ))
214 .bind(now_ms)
215 .bind(lock_timeout_ms)
216 .fetch_optional(&*self.pool)
217 .await;
218
219 let row = match result {
220 Ok(r) => r,
221 Err(e) => {
222 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
223 if provider_err.is_retryable() && attempt < MAX_RETRIES {
224 warn!(
225 target = "duroxide::providers::postgres",
226 operation = "fetch_orchestration_item",
227 attempt = attempt + 1,
228 error = %provider_err,
229 "Retryable error, will retry"
230 );
231 _last_error = Some(provider_err);
232 sleep(std::time::Duration::from_millis(
233 RETRY_DELAY_MS * (attempt as u64 + 1),
234 ))
235 .await;
236 continue;
237 }
238 return Err(provider_err);
239 }
240 };
241
242 if let Some((
243 instance_id,
244 orchestration_name,
245 orchestration_version,
246 execution_id,
247 history_json,
248 messages_json,
249 lock_token,
250 attempt_count,
251 )) = row
252 {
253 let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
254 ProviderError::permanent(
255 "fetch_orchestration_item",
256 format!("Failed to deserialize history: {e}"),
257 )
258 })?;
259
260 let messages: Vec<WorkItem> =
261 serde_json::from_value(messages_json).map_err(|e| {
262 ProviderError::permanent(
263 "fetch_orchestration_item",
264 format!("Failed to deserialize messages: {e}"),
265 )
266 })?;
267
268 let duration_ms = start.elapsed().as_millis() as u64;
269 debug!(
270 target = "duroxide::providers::postgres",
271 operation = "fetch_orchestration_item",
272 instance_id = %instance_id,
273 execution_id = execution_id,
274 message_count = messages.len(),
275 history_count = history.len(),
276 attempt_count = attempt_count,
277 duration_ms = duration_ms,
278 attempts = attempt + 1,
279 "Fetched orchestration item via stored procedure"
280 );
281
282 return Ok(Some((
283 OrchestrationItem {
284 instance: instance_id,
285 orchestration_name,
286 execution_id: execution_id as u64,
287 version: orchestration_version,
288 history,
289 messages,
290 },
291 lock_token,
292 attempt_count as u32,
293 )));
294 }
295
296 return Ok(None);
299 }
300
301 Ok(None)
302 }
    /// Commits one orchestration turn atomically via the
    /// `ack_orchestration_item` stored procedure: appends the history delta,
    /// enqueues outgoing worker/orchestrator items, updates instance
    /// metadata, records activity cancellations, and releases the lock.
    /// All payloads are serialized up front so serialization failures
    /// surface before the retry loop runs.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id 0 means the runtime never assigned an id; persisting
            // it would corrupt history ordering, so reject outright.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag: text of the Debug representation before the
            // first '{' (i.e. the enum variant name).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        // Retry transient database failures with linear backoff; an invalid
        // lock token is permanent (the lock expired or was already released).
        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The stored procedure raises with this message when the
                    // token no longer matches a locked row.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the final loop iteration always returns. Kept to
        // satisfy the compiler.
        Ok(())
    }
    /// Releases a locked orchestration item without committing, making it
    /// visible again after `delay` (immediately when `None`).
    /// `ignore_attempt` is forwarded to the stored procedure (presumably to
    /// skip bumping the attempt counter — confirm in the SQL definition).
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The stored procedure returns the instance id of the abandoned item.
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // A dead token cannot be revived by retrying — permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
515
516 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
517 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
518 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
519 "SELECT out_event_data FROM {}.fetch_history($1)",
520 self.schema_name
521 ))
522 .bind(instance)
523 .fetch_all(&*self.pool)
524 .await
525 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
526
527 Ok(event_data_rows
528 .into_iter()
529 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
530 .collect())
531 }
532
    /// Appends runtime-assigned history events to one execution via the
    /// `append_history` stored procedure. A no-op for an empty batch.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            // Reject events without a runtime-assigned id (same invariant as
            // the ack path).
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name extracted from the Debug representation (text
            // before the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
606
    /// Enqueues an item on the worker queue via `enqueue_worker_work`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_worker_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        let now_ms = Self::now_millis();

        // Only ActivityExecute carries identity columns; presumably they let
        // the database address this row later (e.g. for cancellation) —
        // confirm against the SQL function. Other variants bind NULLs.
        let (instance_id, execution_id, activity_id) = match &item {
            WorkItem::ActivityExecute {
                instance,
                execution_id,
                id,
                ..
            } => (
                Some(instance.clone()),
                Some(*execution_id as i64),
                Some(*id as i64),
            ),
            _ => (None, None, None),
        };

        sqlx::query(&format!(
            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
            self.schema_name
        ))
        .bind(work_item)
        .bind(now_ms)
        .bind(&instance_id)
        .bind(execution_id)
        .bind(activity_id)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_worker_work",
                error_type = "database_error",
                error = %e,
                "Failed to enqueue worker work"
            );
            Self::sqlx_to_provider_error("enqueue_worker_work", e)
        })?;

        Ok(())
    }
657
    /// Claims the next worker-queue item via the `fetch_work_item` stored
    /// procedure, locking it for `lock_timeout`. Returns `Ok(None)` when the
    /// queue is empty. Unlike the orchestration fetch, there is no retry
    /// loop here; `_poll_timeout` is unused.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Row shape: (work item JSON, lock token, attempt count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract the owning instance id purely for structured logging;
        // sub-orchestration completions log the parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((
            work_item,
            lock_token,
            attempt_count as u32,
        )))
    }
731
    /// Acknowledges a locked worker item. With a completion, the 4-argument
    /// `ack_worker` overload deletes the item and enqueues the completion
    /// for the orchestrator; without one (cancelled work), the 1-argument
    /// overload just deletes the item.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: plain ack (the activity was cancelled).
        let Some(completion) = completion else {
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1)",
                self.schema_name
            ))
            .bind(token)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // Already-processed tokens are a permanent failure.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions on this path.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
836
    /// Extends the lock on a claimed worker item so a long-running activity
    /// is not redelivered to another worker.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): the extension is passed in whole seconds (any
        // sub-second remainder is truncated) while other timeouts in this
        // file use milliseconds — confirm the stored procedure's unit.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A dead token cannot be revived by retrying — permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
892
    /// Releases a claimed worker item back to the queue, optionally delaying
    /// redelivery by `delay`; `ignore_attempt` is forwarded to the stored
    /// procedure (presumably to leave the attempt counter untouched —
    /// confirm in the SQL definition).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Token already invalid or acked: nothing left to abandon.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
951
    /// Extends the lock on a claimed orchestration item (mirror of
    /// [`Self::renew_work_item_lock`] for the orchestrator queue).
    /// The extension is passed in whole seconds — sub-second remainder is
    /// truncated.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A dead/expired/released token is a permanent failure.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1016
    /// Enqueues an item on the orchestrator queue, computing when it becomes
    /// visible: immediately, after `delay`, or at a timer's absolute fire
    /// time — whichever the item calls for.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Every orchestrator-bound item targets one instance; completions of
        // sub-orchestrations route to the parent. ActivityExecute belongs on
        // the worker queue and is rejected here.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        let now_ms = Self::now_millis();

        // Timers with an absolute fire time become visible at that time (or
        // later, if an explicit delay pushes past it); fire_at_ms == 0 is
        // treated as "unset" and falls back to now + optional delay, as does
        // every non-timer item.
        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // The three trailing NULL binds are presumably optional parameters of
        // the 6-argument SQL function unused on this path — confirm against
        // the function definition.
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .bind::<Option<i64>>(None)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1125
1126 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1127 async fn read_with_execution(
1128 &self,
1129 instance: &str,
1130 execution_id: u64,
1131 ) -> Result<Vec<Event>, ProviderError> {
1132 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1133 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1134 self.table_name("history")
1135 ))
1136 .bind(instance)
1137 .bind(execution_id as i64)
1138 .fetch_all(&*self.pool)
1139 .await
1140 .ok()
1141 .unwrap_or_default();
1142
1143 Ok(event_data_rows
1144 .into_iter()
1145 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1146 .collect())
1147 }
1148
    /// Exposes the admin/management API; this provider always supports it.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1152}
1153
1154#[async_trait::async_trait]
1155impl ProviderAdmin for PostgresProvider {
1156 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1157 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1158 sqlx::query_scalar(&format!(
1159 "SELECT instance_id FROM {}.list_instances()",
1160 self.schema_name
1161 ))
1162 .fetch_all(&*self.pool)
1163 .await
1164 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1165 }
1166
    /// Lists instance ids whose current status equals `status`, via the
    /// `list_instances_by_status` SQL function.
    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT instance_id FROM {}.list_instances_by_status($1)",
            self.schema_name
        ))
        .bind(status)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
    }
1178
    /// Lists all execution ids recorded for `instance`. Stored as i64 in the
    /// database; widened to u64 for the provider API.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
            "SELECT execution_id FROM {}.list_executions($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;

        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
    }
1192
1193 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1194 async fn read_history_with_execution_id(
1195 &self,
1196 instance: &str,
1197 execution_id: u64,
1198 ) -> Result<Vec<Event>, ProviderError> {
1199 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1200 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1201 self.schema_name
1202 ))
1203 .bind(instance)
1204 .bind(execution_id as i64)
1205 .fetch_all(&*self.pool)
1206 .await
1207 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1208
1209 event_data_rows
1210 .into_iter()
1211 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1212 .collect::<Vec<Event>>()
1213 .into_iter()
1214 .map(Ok)
1215 .collect()
1216 }
1217
1218 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1219 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1220 let execution_id = self.latest_execution_id(instance).await?;
1221 self.read_history_with_execution_id(instance, execution_id)
1222 .await
1223 }
1224
1225 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1226 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1227 sqlx::query_scalar(&format!(
1228 "SELECT {}.latest_execution_id($1)",
1229 self.schema_name
1230 ))
1231 .bind(instance)
1232 .fetch_optional(&*self.pool)
1233 .await
1234 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1235 .map(|id: i64| id as u64)
1236 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1237 }
1238
1239 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1240 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1241 let row: Option<(
1242 String,
1243 String,
1244 String,
1245 i64,
1246 chrono::DateTime<Utc>,
1247 Option<chrono::DateTime<Utc>>,
1248 Option<String>,
1249 Option<String>,
1250 Option<String>,
1251 )> = sqlx::query_as(&format!(
1252 "SELECT * FROM {}.get_instance_info($1)",
1253 self.schema_name
1254 ))
1255 .bind(instance)
1256 .fetch_optional(&*self.pool)
1257 .await
1258 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1259
1260 let (
1261 instance_id,
1262 orchestration_name,
1263 orchestration_version,
1264 current_execution_id,
1265 created_at,
1266 updated_at,
1267 status,
1268 output,
1269 parent_instance_id,
1270 ) =
1271 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1272
1273 Ok(InstanceInfo {
1274 instance_id,
1275 orchestration_name,
1276 orchestration_version,
1277 current_execution_id: current_execution_id as u64,
1278 status: status.unwrap_or_else(|| "Running".to_string()),
1279 output,
1280 created_at: created_at.timestamp_millis() as u64,
1281 updated_at: updated_at
1282 .map(|dt| dt.timestamp_millis() as u64)
1283 .unwrap_or(created_at.timestamp_millis() as u64),
1284 parent_instance_id,
1285 })
1286 }
1287
1288 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1289 async fn get_execution_info(
1290 &self,
1291 instance: &str,
1292 execution_id: u64,
1293 ) -> Result<ExecutionInfo, ProviderError> {
1294 let row: Option<(
1295 i64,
1296 String,
1297 Option<String>,
1298 chrono::DateTime<Utc>,
1299 Option<chrono::DateTime<Utc>>,
1300 i64,
1301 )> = sqlx::query_as(&format!(
1302 "SELECT * FROM {}.get_execution_info($1, $2)",
1303 self.schema_name
1304 ))
1305 .bind(instance)
1306 .bind(execution_id as i64)
1307 .fetch_optional(&*self.pool)
1308 .await
1309 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1310
1311 let (exec_id, status, output, started_at, completed_at, event_count) = row
1312 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1313
1314 Ok(ExecutionInfo {
1315 execution_id: exec_id as u64,
1316 status,
1317 output,
1318 started_at: started_at.timestamp_millis() as u64,
1319 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1320 event_count: event_count as usize,
1321 })
1322 }
1323
1324 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1325 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1326 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1327 "SELECT * FROM {}.get_system_metrics()",
1328 self.schema_name
1329 ))
1330 .fetch_optional(&*self.pool)
1331 .await
1332 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1333
1334 let (
1335 total_instances,
1336 total_executions,
1337 running_instances,
1338 completed_instances,
1339 failed_instances,
1340 total_events,
1341 ) = row.ok_or_else(|| {
1342 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1343 })?;
1344
1345 Ok(SystemMetrics {
1346 total_instances: total_instances as u64,
1347 total_executions: total_executions as u64,
1348 running_instances: running_instances as u64,
1349 completed_instances: completed_instances as u64,
1350 failed_instances: failed_instances as u64,
1351 total_events: total_events as u64,
1352 })
1353 }
1354
1355 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1356 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1357 let now_ms = Self::now_millis();
1358
1359 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1360 "SELECT * FROM {}.get_queue_depths($1)",
1361 self.schema_name
1362 ))
1363 .bind(now_ms)
1364 .fetch_optional(&*self.pool)
1365 .await
1366 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1367
1368 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1369 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1370 })?;
1371
1372 Ok(QueueDepths {
1373 orchestrator_queue: orchestrator_queue as usize,
1374 worker_queue: worker_queue as usize,
1375 timer_queue: 0, })
1377 }
1378
1379 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1382 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1383 sqlx::query_scalar(&format!(
1384 "SELECT child_instance_id FROM {}.list_children($1)",
1385 self.schema_name
1386 ))
1387 .bind(instance_id)
1388 .fetch_all(&*self.pool)
1389 .await
1390 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1391 }
1392
1393 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1394 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1395 let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1398 "SELECT {}.get_parent_id($1)",
1399 self.schema_name
1400 ))
1401 .bind(instance_id)
1402 .fetch_one(&*self.pool)
1403 .await;
1404
1405 match result {
1406 Ok(parent_id) => Ok(parent_id),
1407 Err(e) => {
1408 let err_str = e.to_string();
1409 if err_str.contains("Instance not found") {
1410 Err(ProviderError::permanent(
1411 "get_parent_id",
1412 format!("Instance not found: {}", instance_id),
1413 ))
1414 } else {
1415 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1416 }
1417 }
1418 }
1419 }
1420
1421 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1424 async fn delete_instances_atomic(
1425 &self,
1426 ids: &[String],
1427 force: bool,
1428 ) -> Result<DeleteInstanceResult, ProviderError> {
1429 if ids.is_empty() {
1430 return Ok(DeleteInstanceResult::default());
1431 }
1432
1433 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1434 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1435 self.schema_name
1436 ))
1437 .bind(ids)
1438 .bind(force)
1439 .fetch_optional(&*self.pool)
1440 .await
1441 .map_err(|e| {
1442 let err_str = e.to_string();
1443 if err_str.contains("is Running") {
1444 ProviderError::permanent(
1445 "delete_instances_atomic",
1446 err_str,
1447 )
1448 } else if err_str.contains("Orphan detected") {
1449 ProviderError::permanent(
1450 "delete_instances_atomic",
1451 err_str,
1452 )
1453 } else {
1454 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1455 }
1456 })?;
1457
1458 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1459 row.unwrap_or((0, 0, 0, 0));
1460
1461 debug!(
1462 target = "duroxide::providers::postgres",
1463 operation = "delete_instances_atomic",
1464 instances_deleted = instances_deleted,
1465 executions_deleted = executions_deleted,
1466 events_deleted = events_deleted,
1467 queue_messages_deleted = queue_messages_deleted,
1468 "Deleted instances atomically"
1469 );
1470
1471 Ok(DeleteInstanceResult {
1472 instances_deleted: instances_deleted as u64,
1473 executions_deleted: executions_deleted as u64,
1474 events_deleted: events_deleted as u64,
1475 queue_messages_deleted: queue_messages_deleted as u64,
1476 })
1477 }
1478
    /// Bulk-delete terminal root instances (and their child trees) matching `filter`.
    ///
    /// Selection, done with dynamically built SQL:
    /// - only roots (`parent_instance_id IS NULL`) whose *current* execution has a
    ///   terminal status ('Completed', 'Failed', 'ContinuedAsNew');
    /// - optionally restricted to `filter.instance_ids`;
    /// - optionally restricted to executions completed before
    ///   `filter.completed_before` (epoch milliseconds);
    /// - capped at `filter.limit` rows (default 1000).
    ///
    /// Each matched root is expanded to its full instance tree and deleted via
    /// `delete_instances_atomic(force = true)`; per-tree counts are summed into
    /// the returned `DeleteInstanceResult`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Dynamic SQL: the bind-parameter numbers ($1..$N) depend on which filter
        // fields are set and must stay in sync with the bind() calls below.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty ID list can never match anything.
                return Ok(DeleteInstanceResult::default());
            }
            // The IDs occupy parameters $1..$len.
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            // Next free parameter slot after the IDs (if any). The bound value is
            // epoch millis; TO_TIMESTAMP takes seconds, hence the / 1000.0.
            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always takes the last parameter slot.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
            + if filter.completed_before.is_some() { 1 } else { 0 }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind in exactly the order the placeholders were appended above.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        let mut result = DeleteInstanceResult::default();

        for instance_id in &instance_ids {
            // Expand each root to its whole tree so children are deleted with it.
            let tree = self.get_instance_tree(instance_id).await?;

            // force = true: statuses were already filtered to terminal ones above.
            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1573
1574 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1577 async fn prune_executions(
1578 &self,
1579 instance_id: &str,
1580 options: PruneOptions,
1581 ) -> Result<PruneResult, ProviderError> {
1582 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1583 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1584
1585 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1586 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1587 self.schema_name
1588 ))
1589 .bind(instance_id)
1590 .bind(keep_last)
1591 .bind(completed_before_ms)
1592 .fetch_optional(&*self.pool)
1593 .await
1594 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1595
1596 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1597
1598 debug!(
1599 target = "duroxide::providers::postgres",
1600 operation = "prune_executions",
1601 instance_id = %instance_id,
1602 instances_processed = instances_processed,
1603 executions_deleted = executions_deleted,
1604 events_deleted = events_deleted,
1605 "Pruned executions"
1606 );
1607
1608 Ok(PruneResult {
1609 instances_processed: instances_processed as u64,
1610 executions_deleted: executions_deleted as u64,
1611 events_deleted: events_deleted as u64,
1612 })
1613 }
1614
    /// Bulk-prune executions across many instances.
    ///
    /// Builds a dynamic SELECT over `instances`/`executions` to pick target
    /// instance IDs — optionally restricted by `filter.instance_ids` and
    /// `filter.completed_before` (epoch milliseconds), capped at `filter.limit`
    /// (default 1000) — then calls `prune_executions` once per instance with the
    /// same `options` and sums the per-instance results.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // Dynamic SQL: the bind-parameter numbers ($1..$N) depend on which
        // filter fields are set and must stay in sync with the bind() calls
        // below. "WHERE 1=1" lets every condition append as "AND ...".
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty ID list can never match anything.
                return Ok(PruneResult::default());
            }
            // The IDs occupy parameters $1..$len.
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            // Next free parameter slot after the IDs (if any). The bound value
            // is epoch millis; TO_TIMESTAMP takes seconds, hence the / 1000.0.
            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always takes the last parameter slot.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
            + if filter.completed_before.is_some() { 1 } else { 0 }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind in exactly the order the placeholders were appended above.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        let mut result = PruneResult::default();

        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
1702}