1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
7};
8use duroxide::Event;
9use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use tracing::{debug, error, instrument, warn};
15
16use crate::migrations::MigrationRunner;
17
/// Durable-orchestration provider backed by PostgreSQL.
///
/// All state lives in tables and stored procedures inside `schema_name`;
/// every queue/history operation goes through the shared `pool`.
pub struct PostgresProvider {
    // Shared connection pool; `Arc` so helpers (e.g. the migration runner) can hold a clone.
    pool: Arc<PgPool>,
    // Postgres schema holding the duroxide tables/procedures ("public" by default).
    schema_name: String,
}
44
45impl PostgresProvider {
46 pub async fn new(database_url: &str) -> Result<Self> {
47 Self::new_with_schema(database_url, None).await
48 }
49
50 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
51 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
52 .ok()
53 .and_then(|s| s.parse::<u32>().ok())
54 .unwrap_or(10);
55
56 let pool = PgPoolOptions::new()
57 .max_connections(max_connections)
58 .min_connections(1)
59 .acquire_timeout(std::time::Duration::from_secs(30))
60 .connect(database_url)
61 .await?;
62
63 let schema_name = schema_name.unwrap_or("public").to_string();
64
65 let provider = Self {
66 pool: Arc::new(pool),
67 schema_name: schema_name.clone(),
68 };
69
70 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
72 migration_runner.migrate().await?;
73
74 Ok(provider)
75 }
76
77 #[instrument(skip(self), target = "duroxide::providers::postgres")]
78 pub async fn initialize_schema(&self) -> Result<()> {
79 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
82 migration_runner.migrate().await?;
83 Ok(())
84 }
85
86 fn now_millis() -> i64 {
88 SystemTime::now()
89 .duration_since(UNIX_EPOCH)
90 .unwrap()
91 .as_millis() as i64
92 }
93
94
95
96 fn table_name(&self, table: &str) -> String {
98 format!("{}.{}", self.schema_name, table)
99 }
100
101 pub fn pool(&self) -> &PgPool {
103 &self.pool
104 }
105
106 pub fn schema_name(&self) -> &str {
108 &self.schema_name
109 }
110
111 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
113 match e {
114 SqlxError::Database(ref db_err) => {
115 let code_opt = db_err.code();
117 let code = code_opt.as_deref();
118 if code == Some("40P01") {
119 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
121 } else if code == Some("40001") {
122 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
124 } else if code == Some("23505") {
125 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
127 } else if code == Some("23503") {
128 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
130 } else {
131 ProviderError::permanent(operation, format!("Database error: {e}"))
132 }
133 }
134 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
135 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
136 }
137 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
138 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
139 }
140 }
141
142 pub async fn cleanup_schema(&self) -> Result<()> {
147 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
149 .execute(&*self.pool)
150 .await?;
151
152 if self.schema_name != "public" {
155 sqlx::query(&format!(
156 "DROP SCHEMA IF EXISTS {} CASCADE",
157 self.schema_name
158 ))
159 .execute(&*self.pool)
160 .await?;
161 } else {
162 }
165
166 Ok(())
167 }
168}
169
170#[async_trait::async_trait]
171impl Provider for PostgresProvider {
    /// Provider identifier reported to the duroxide runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
175
    /// Crate version, baked in at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
179
180 #[instrument(skip(self), target = "duroxide::providers::postgres")]
181 async fn fetch_orchestration_item(
182 &self,
183 lock_timeout: Duration,
184 _poll_timeout: Duration,
185 filter: Option<&DispatcherCapabilityFilter>,
186 ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
187 let start = std::time::Instant::now();
188
189 const MAX_RETRIES: u32 = 3;
190 const RETRY_DELAY_MS: u64 = 50;
191
192 let lock_timeout_ms = lock_timeout.as_millis() as i64;
194 let mut _last_error: Option<ProviderError> = None;
195
196 let (min_packed, max_packed) = if let Some(f) = filter {
198 if let Some(range) = f.supported_duroxide_versions.first() {
199 let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
200 let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
201 (Some(min), Some(max))
202 } else {
203 return Ok(None);
205 }
206 } else {
207 (None, None)
208 };
209
210 for attempt in 0..=MAX_RETRIES {
211 let now_ms = Self::now_millis();
212
213 let result: Result<
214 Option<(
215 String,
216 String,
217 String,
218 i64,
219 serde_json::Value,
220 serde_json::Value,
221 String,
222 i32,
223 )>,
224 SqlxError,
225 > = sqlx::query_as(&format!(
226 "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
227 self.schema_name
228 ))
229 .bind(now_ms)
230 .bind(lock_timeout_ms)
231 .bind(min_packed)
232 .bind(max_packed)
233 .fetch_optional(&*self.pool)
234 .await;
235
236 let row = match result {
237 Ok(r) => r,
238 Err(e) => {
239 let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
240 if provider_err.is_retryable() && attempt < MAX_RETRIES {
241 warn!(
242 target = "duroxide::providers::postgres",
243 operation = "fetch_orchestration_item",
244 attempt = attempt + 1,
245 error = %provider_err,
246 "Retryable error, will retry"
247 );
248 _last_error = Some(provider_err);
249 sleep(std::time::Duration::from_millis(
250 RETRY_DELAY_MS * (attempt as u64 + 1),
251 ))
252 .await;
253 continue;
254 }
255 return Err(provider_err);
256 }
257 };
258
259 if let Some((
260 instance_id,
261 orchestration_name,
262 orchestration_version,
263 execution_id,
264 history_json,
265 messages_json,
266 lock_token,
267 attempt_count,
268 )) = row
269 {
270 let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
271 Ok(h) => (h, None),
272 Err(e) => {
273 let error_msg = format!("Failed to deserialize history: {e}");
274 warn!(
275 target = "duroxide::providers::postgres",
276 instance = %instance_id,
277 error = %error_msg,
278 "History deserialization failed, returning item with history_error"
279 );
280 (vec![], Some(error_msg))
281 }
282 };
283
284 let messages: Vec<WorkItem> =
285 serde_json::from_value(messages_json).map_err(|e| {
286 ProviderError::permanent(
287 "fetch_orchestration_item",
288 format!("Failed to deserialize messages: {e}"),
289 )
290 })?;
291
292 let duration_ms = start.elapsed().as_millis() as u64;
293 debug!(
294 target = "duroxide::providers::postgres",
295 operation = "fetch_orchestration_item",
296 instance_id = %instance_id,
297 execution_id = execution_id,
298 message_count = messages.len(),
299 history_count = history.len(),
300 attempt_count = attempt_count,
301 duration_ms = duration_ms,
302 attempts = attempt + 1,
303 "Fetched orchestration item via stored procedure"
304 );
305
306 return Ok(Some((
307 OrchestrationItem {
308 instance: instance_id,
309 orchestration_name,
310 execution_id: execution_id as u64,
311 version: orchestration_version,
312 history,
313 messages,
314 history_error,
315 },
316 lock_token,
317 attempt_count as u32,
318 )));
319 }
320
321 return Ok(None);
324 }
325
326 Ok(None)
327 }
    /// Acknowledges a locked orchestration item: in one stored-procedure call
    /// the history delta is appended, follow-up worker/orchestrator items are
    /// enqueued, execution metadata is updated, and cancelled activities are
    /// removed. Retryable database errors are retried up to `MAX_RETRIES`
    /// times with linear backoff; an invalid `lock_token` is permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload expected by the stored procedure:
        // one {event_id, event_type, event_data} object per history event.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id 0 means the runtime forgot to assign ids — reject.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse type tag from the Debug representation
            // (everything before the first '{', trimmed).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Flatten metadata into the exact JSON shape the procedure consumes;
        // the pinned duroxide version is expanded into its components.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The procedure signals a stale/unknown token with a known
                    // message; that's permanent, never worth retrying.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms.
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: every loop iteration returns or continues.
        Ok(())
    }
    /// Releases the lock on an orchestration item so it becomes visible again,
    /// optionally after `delay`. When `ignore_attempt` is set the attempt
    /// counter is not incremented (the stored procedure handles both).
    /// A stale/unknown `lock_token` is reported as a permanent error.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The procedure returns the instance id owning the token (used for logging).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Known "Invalid lock token" message => permanent; anything
                // else goes through the generic sqlx classification.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
549
550 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
551 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
552 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
553 "SELECT out_event_data FROM {}.fetch_history($1)",
554 self.schema_name
555 ))
556 .bind(instance)
557 .fetch_all(&*self.pool)
558 .await
559 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
560
561 Ok(event_data_rows
562 .into_iter()
563 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
564 .collect())
565 }
566
    /// Appends `new_events` to the history of (`instance`, `execution_id`)
    /// via the `append_history` stored procedure. No-op on an empty batch;
    /// rejects events whose `event_id` was never assigned by the runtime.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Same payload shape as ack_orchestration_item:
        // [{event_id, event_type, event_data}, ...].
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            // event_id 0 means the runtime forgot to assign ids — reject.
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag derived from the Debug representation
            // (everything before the first '{', trimmed).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
640
641 #[instrument(skip(self), target = "duroxide::providers::postgres")]
642 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
643 let work_item = serde_json::to_string(&item).map_err(|e| {
644 ProviderError::permanent(
645 "enqueue_worker_work",
646 format!("Failed to serialize work item: {e}"),
647 )
648 })?;
649
650 let now_ms = Self::now_millis();
651
652 let (instance_id, execution_id, activity_id) = match &item {
654 WorkItem::ActivityExecute {
655 instance,
656 execution_id,
657 id,
658 ..
659 } => (
660 Some(instance.clone()),
661 Some(*execution_id as i64),
662 Some(*id as i64),
663 ),
664 _ => (None, None, None),
665 };
666
667 sqlx::query(&format!(
668 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
669 self.schema_name
670 ))
671 .bind(work_item)
672 .bind(now_ms)
673 .bind(&instance_id)
674 .bind(execution_id)
675 .bind(activity_id)
676 .execute(&*self.pool)
677 .await
678 .map_err(|e| {
679 error!(
680 target = "duroxide::providers::postgres",
681 operation = "enqueue_worker_work",
682 error_type = "database_error",
683 error = %e,
684 "Failed to enqueue worker work"
685 );
686 Self::sqlx_to_provider_error("enqueue_worker_work", e)
687 })?;
688
689 Ok(())
690 }
691
692 #[instrument(skip(self), target = "duroxide::providers::postgres")]
693 async fn fetch_work_item(
694 &self,
695 lock_timeout: Duration,
696 _poll_timeout: Duration,
697 ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
698 let start = std::time::Instant::now();
699
700 let lock_timeout_ms = lock_timeout.as_millis() as i64;
702
703 let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
704 "SELECT * FROM {}.fetch_work_item($1, $2)",
705 self.schema_name
706 ))
707 .bind(Self::now_millis())
708 .bind(lock_timeout_ms)
709 .fetch_optional(&*self.pool)
710 .await
711 {
712 Ok(row) => row,
713 Err(e) => {
714 return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
715 }
716 };
717
718 let (work_item_json, lock_token, attempt_count) = match row {
719 Some(row) => row,
720 None => return Ok(None),
721 };
722
723 let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
724 ProviderError::permanent(
725 "fetch_work_item",
726 format!("Failed to deserialize worker item: {e}"),
727 )
728 })?;
729
730 let duration_ms = start.elapsed().as_millis() as u64;
731
732 let instance_id = match &work_item {
734 WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
735 WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
736 WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
737 WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
738 WorkItem::TimerFired { instance, .. } => instance.as_str(),
739 WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
740 WorkItem::CancelInstance { instance, .. } => instance.as_str(),
741 WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
742 WorkItem::SubOrchCompleted {
743 parent_instance, ..
744 } => parent_instance.as_str(),
745 WorkItem::SubOrchFailed {
746 parent_instance, ..
747 } => parent_instance.as_str(),
748 };
749
750 debug!(
751 target = "duroxide::providers::postgres",
752 operation = "fetch_work_item",
753 instance_id = %instance_id,
754 attempt_count = attempt_count,
755 duration_ms = duration_ms,
756 "Fetched activity work item via stored procedure"
757 );
758
759 Ok(Some((
760 work_item,
761 lock_token,
762 attempt_count as u32,
763 )))
764 }
765
    /// Acknowledges a locked worker item identified by `token`.
    ///
    /// With `completion == None` the item is simply removed (activity was
    /// cancelled); otherwise the `ActivityCompleted`/`ActivityFailed` outcome
    /// is atomically enqueued to the orchestrator queue by the same
    /// `ack_worker` stored procedure call. Any other completion type is
    /// rejected as permanent.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let Some(completion) = completion else {
            // No completion: the single-argument overload just deletes the row.
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1)",
                self.schema_name
            ))
            .bind(token)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // Known "not found" message => permanent (already acked or expired).
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for a worker ack.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Four-argument overload: delete the row and enqueue the completion atomically.
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
870
871 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
872 async fn renew_work_item_lock(
873 &self,
874 token: &str,
875 extend_for: Duration,
876 ) -> Result<(), ProviderError> {
877 let start = std::time::Instant::now();
878
879 let now_ms = Self::now_millis();
881
882 let extend_secs = extend_for.as_secs() as i64;
884
885 match sqlx::query(&format!(
886 "SELECT {}.renew_work_item_lock($1, $2, $3)",
887 self.schema_name
888 ))
889 .bind(token)
890 .bind(now_ms)
891 .bind(extend_secs)
892 .execute(&*self.pool)
893 .await
894 {
895 Ok(_) => {
896 let duration_ms = start.elapsed().as_millis() as u64;
897 debug!(
898 target = "duroxide::providers::postgres",
899 operation = "renew_work_item_lock",
900 token = %token,
901 extend_for_secs = extend_secs,
902 duration_ms = duration_ms,
903 "Work item lock renewed successfully"
904 );
905 Ok(())
906 }
907 Err(e) => {
908 if let SqlxError::Database(db_err) = &e {
909 if db_err.message().contains("Lock token invalid") {
910 return Err(ProviderError::permanent(
911 "renew_work_item_lock",
912 "Lock token invalid, expired, or already acked",
913 ));
914 }
915 } else if e.to_string().contains("Lock token invalid") {
916 return Err(ProviderError::permanent(
917 "renew_work_item_lock",
918 "Lock token invalid, expired, or already acked",
919 ));
920 }
921
922 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
923 }
924 }
925 }
926
    /// Releases the lock on a worker item so it becomes visible again,
    /// optionally after `delay`; `ignore_attempt` leaves the attempt counter
    /// untouched. A stale/already-acked token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Known stale-token messages from the procedure => permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
985
    /// Extends the lock on an orchestration item identified by `token` by
    /// `extend_for` (whole seconds), via the `renew_orchestration_item_lock`
    /// stored procedure. A stale/expired/released token is permanent.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // Sub-second remainders are truncated.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Known stale-token messages from the procedure => permanent.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1050
1051 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1052 async fn enqueue_for_orchestrator(
1053 &self,
1054 item: WorkItem,
1055 delay: Option<Duration>,
1056 ) -> Result<(), ProviderError> {
1057 let work_item = serde_json::to_string(&item).map_err(|e| {
1058 ProviderError::permanent(
1059 "enqueue_orchestrator_work",
1060 format!("Failed to serialize work item: {e}"),
1061 )
1062 })?;
1063
1064 let instance_id = match &item {
1066 WorkItem::StartOrchestration { instance, .. }
1067 | WorkItem::ActivityCompleted { instance, .. }
1068 | WorkItem::ActivityFailed { instance, .. }
1069 | WorkItem::TimerFired { instance, .. }
1070 | WorkItem::ExternalRaised { instance, .. }
1071 | WorkItem::CancelInstance { instance, .. }
1072 | WorkItem::ContinueAsNew { instance, .. } => instance,
1073 WorkItem::SubOrchCompleted {
1074 parent_instance, ..
1075 }
1076 | WorkItem::SubOrchFailed {
1077 parent_instance, ..
1078 } => parent_instance,
1079 WorkItem::ActivityExecute { .. } => {
1080 return Err(ProviderError::permanent(
1081 "enqueue_orchestrator_work",
1082 "ActivityExecute should go to worker queue, not orchestrator queue",
1083 ));
1084 }
1085 };
1086
1087 let now_ms = Self::now_millis();
1089
1090 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1091 if *fire_at_ms > 0 {
1092 if let Some(delay) = delay {
1094 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1095 } else {
1096 *fire_at_ms
1097 }
1098 } else {
1099 delay
1101 .map(|d| now_ms as u64 + d.as_millis() as u64)
1102 .unwrap_or(now_ms as u64)
1103 }
1104 } else {
1105 delay
1107 .map(|d| now_ms as u64 + d.as_millis() as u64)
1108 .unwrap_or(now_ms as u64)
1109 };
1110
1111 let visible_at = Utc
1112 .timestamp_millis_opt(visible_at_ms as i64)
1113 .single()
1114 .ok_or_else(|| {
1115 ProviderError::permanent(
1116 "enqueue_orchestrator_work",
1117 "Invalid visible_at timestamp",
1118 )
1119 })?;
1120
1121 sqlx::query(&format!(
1126 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1127 self.schema_name
1128 ))
1129 .bind(instance_id)
1130 .bind(&work_item)
1131 .bind(visible_at)
1132 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1136 .await
1137 .map_err(|e| {
1138 error!(
1139 target = "duroxide::providers::postgres",
1140 operation = "enqueue_orchestrator_work",
1141 error_type = "database_error",
1142 error = %e,
1143 instance_id = %instance_id,
1144 "Failed to enqueue orchestrator work"
1145 );
1146 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1147 })?;
1148
1149 debug!(
1150 target = "duroxide::providers::postgres",
1151 operation = "enqueue_orchestrator_work",
1152 instance_id = %instance_id,
1153 delay_ms = delay.map(|d| d.as_millis() as u64),
1154 "Enqueued orchestrator work"
1155 );
1156
1157 Ok(())
1158 }
1159
1160 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1161 async fn read_with_execution(
1162 &self,
1163 instance: &str,
1164 execution_id: u64,
1165 ) -> Result<Vec<Event>, ProviderError> {
1166 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1167 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1168 self.table_name("history")
1169 ))
1170 .bind(instance)
1171 .bind(execution_id as i64)
1172 .fetch_all(&*self.pool)
1173 .await
1174 .ok()
1175 .unwrap_or_default();
1176
1177 Ok(event_data_rows
1178 .into_iter()
1179 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1180 .collect())
1181 }
1182
    /// Exposes the admin/management API; this provider always supports it.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1186}
1187
1188#[async_trait::async_trait]
1189impl ProviderAdmin for PostgresProvider {
    /// Lists every known orchestration instance id via the `list_instances`
    /// stored procedure.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT instance_id FROM {}.list_instances()",
            self.schema_name
        ))
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
    }
1200
    /// Lists instance ids whose current status equals `status`, via the
    /// `list_instances_by_status` stored procedure.
    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT instance_id FROM {}.list_instances_by_status($1)",
            self.schema_name
        ))
        .bind(status)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
    }
1212
1213 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1214 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1215 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1216 "SELECT execution_id FROM {}.list_executions($1)",
1217 self.schema_name
1218 ))
1219 .bind(instance)
1220 .fetch_all(&*self.pool)
1221 .await
1222 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1223
1224 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1225 }
1226
1227 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1228 async fn read_history_with_execution_id(
1229 &self,
1230 instance: &str,
1231 execution_id: u64,
1232 ) -> Result<Vec<Event>, ProviderError> {
1233 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1234 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1235 self.schema_name
1236 ))
1237 .bind(instance)
1238 .bind(execution_id as i64)
1239 .fetch_all(&*self.pool)
1240 .await
1241 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1242
1243 event_data_rows
1244 .into_iter()
1245 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1246 .collect::<Vec<Event>>()
1247 .into_iter()
1248 .map(Ok)
1249 .collect()
1250 }
1251
1252 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1253 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1254 let execution_id = self.latest_execution_id(instance).await?;
1255 self.read_history_with_execution_id(instance, execution_id)
1256 .await
1257 }
1258
1259 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1260 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1261 sqlx::query_scalar(&format!(
1262 "SELECT {}.latest_execution_id($1)",
1263 self.schema_name
1264 ))
1265 .bind(instance)
1266 .fetch_optional(&*self.pool)
1267 .await
1268 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1269 .map(|id: i64| id as u64)
1270 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1271 }
1272
1273 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1274 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1275 let row: Option<(
1276 String,
1277 String,
1278 String,
1279 i64,
1280 chrono::DateTime<Utc>,
1281 Option<chrono::DateTime<Utc>>,
1282 Option<String>,
1283 Option<String>,
1284 Option<String>,
1285 )> = sqlx::query_as(&format!(
1286 "SELECT * FROM {}.get_instance_info($1)",
1287 self.schema_name
1288 ))
1289 .bind(instance)
1290 .fetch_optional(&*self.pool)
1291 .await
1292 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1293
1294 let (
1295 instance_id,
1296 orchestration_name,
1297 orchestration_version,
1298 current_execution_id,
1299 created_at,
1300 updated_at,
1301 status,
1302 output,
1303 parent_instance_id,
1304 ) =
1305 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1306
1307 Ok(InstanceInfo {
1308 instance_id,
1309 orchestration_name,
1310 orchestration_version,
1311 current_execution_id: current_execution_id as u64,
1312 status: status.unwrap_or_else(|| "Running".to_string()),
1313 output,
1314 created_at: created_at.timestamp_millis() as u64,
1315 updated_at: updated_at
1316 .map(|dt| dt.timestamp_millis() as u64)
1317 .unwrap_or(created_at.timestamp_millis() as u64),
1318 parent_instance_id,
1319 })
1320 }
1321
1322 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1323 async fn get_execution_info(
1324 &self,
1325 instance: &str,
1326 execution_id: u64,
1327 ) -> Result<ExecutionInfo, ProviderError> {
1328 let row: Option<(
1329 i64,
1330 String,
1331 Option<String>,
1332 chrono::DateTime<Utc>,
1333 Option<chrono::DateTime<Utc>>,
1334 i64,
1335 )> = sqlx::query_as(&format!(
1336 "SELECT * FROM {}.get_execution_info($1, $2)",
1337 self.schema_name
1338 ))
1339 .bind(instance)
1340 .bind(execution_id as i64)
1341 .fetch_optional(&*self.pool)
1342 .await
1343 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1344
1345 let (exec_id, status, output, started_at, completed_at, event_count) = row
1346 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1347
1348 Ok(ExecutionInfo {
1349 execution_id: exec_id as u64,
1350 status,
1351 output,
1352 started_at: started_at.timestamp_millis() as u64,
1353 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1354 event_count: event_count as usize,
1355 })
1356 }
1357
1358 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1359 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1360 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1361 "SELECT * FROM {}.get_system_metrics()",
1362 self.schema_name
1363 ))
1364 .fetch_optional(&*self.pool)
1365 .await
1366 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1367
1368 let (
1369 total_instances,
1370 total_executions,
1371 running_instances,
1372 completed_instances,
1373 failed_instances,
1374 total_events,
1375 ) = row.ok_or_else(|| {
1376 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1377 })?;
1378
1379 Ok(SystemMetrics {
1380 total_instances: total_instances as u64,
1381 total_executions: total_executions as u64,
1382 running_instances: running_instances as u64,
1383 completed_instances: completed_instances as u64,
1384 failed_instances: failed_instances as u64,
1385 total_events: total_events as u64,
1386 })
1387 }
1388
1389 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1390 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1391 let now_ms = Self::now_millis();
1392
1393 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1394 "SELECT * FROM {}.get_queue_depths($1)",
1395 self.schema_name
1396 ))
1397 .bind(now_ms)
1398 .fetch_optional(&*self.pool)
1399 .await
1400 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1401
1402 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1403 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1404 })?;
1405
1406 Ok(QueueDepths {
1407 orchestrator_queue: orchestrator_queue as usize,
1408 worker_queue: worker_queue as usize,
1409 timer_queue: 0, })
1411 }
1412
1413 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1416 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1417 sqlx::query_scalar(&format!(
1418 "SELECT child_instance_id FROM {}.list_children($1)",
1419 self.schema_name
1420 ))
1421 .bind(instance_id)
1422 .fetch_all(&*self.pool)
1423 .await
1424 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1425 }
1426
1427 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1428 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1429 let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1432 "SELECT {}.get_parent_id($1)",
1433 self.schema_name
1434 ))
1435 .bind(instance_id)
1436 .fetch_one(&*self.pool)
1437 .await;
1438
1439 match result {
1440 Ok(parent_id) => Ok(parent_id),
1441 Err(e) => {
1442 let err_str = e.to_string();
1443 if err_str.contains("Instance not found") {
1444 Err(ProviderError::permanent(
1445 "get_parent_id",
1446 format!("Instance not found: {}", instance_id),
1447 ))
1448 } else {
1449 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1450 }
1451 }
1452 }
1453 }
1454
1455 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1458 async fn delete_instances_atomic(
1459 &self,
1460 ids: &[String],
1461 force: bool,
1462 ) -> Result<DeleteInstanceResult, ProviderError> {
1463 if ids.is_empty() {
1464 return Ok(DeleteInstanceResult::default());
1465 }
1466
1467 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1468 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1469 self.schema_name
1470 ))
1471 .bind(ids)
1472 .bind(force)
1473 .fetch_optional(&*self.pool)
1474 .await
1475 .map_err(|e| {
1476 let err_str = e.to_string();
1477 if err_str.contains("is Running") {
1478 ProviderError::permanent(
1479 "delete_instances_atomic",
1480 err_str,
1481 )
1482 } else if err_str.contains("Orphan detected") {
1483 ProviderError::permanent(
1484 "delete_instances_atomic",
1485 err_str,
1486 )
1487 } else {
1488 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1489 }
1490 })?;
1491
1492 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1493 row.unwrap_or((0, 0, 0, 0));
1494
1495 debug!(
1496 target = "duroxide::providers::postgres",
1497 operation = "delete_instances_atomic",
1498 instances_deleted = instances_deleted,
1499 executions_deleted = executions_deleted,
1500 events_deleted = events_deleted,
1501 queue_messages_deleted = queue_messages_deleted,
1502 "Deleted instances atomically"
1503 );
1504
1505 Ok(DeleteInstanceResult {
1506 instances_deleted: instances_deleted as u64,
1507 executions_deleted: executions_deleted as u64,
1508 events_deleted: events_deleted as u64,
1509 queue_messages_deleted: queue_messages_deleted as u64,
1510 })
1511 }
1512
    /// Bulk-delete terminal root instances (and their full child trees)
    /// matching `filter`.
    ///
    /// Candidates are restricted to roots (`parent_instance_id IS NULL`) whose
    /// current execution finished ('Completed', 'Failed', 'ContinuedAsNew');
    /// children are removed through the per-root tree walk below, never
    /// selected directly.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Base query: candidate roots joined to their *current* execution row.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Optional explicit id list: appended as placeholders $1..$N.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completion cutoff: its parameter number comes right after
        // the id placeholders (if any). The bound value is epoch milliseconds,
        // hence the / 1000.0 in TO_TIMESTAMP.
        if filter.completed_before.is_some() {
            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT is always the last parameter; its number accounts for both
        // optional groups above. Defaults to 1000 to bound one call's work.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
            + if filter.completed_before.is_some() { 1 } else { 0 }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        let mut result = DeleteInstanceResult::default();

        // Delete each root together with its descendant tree; force = true is
        // safe because the root was already filtered to a terminal status.
        for instance_id in &instance_ids {
            let tree = self.get_instance_tree(instance_id).await?;

            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1607
1608 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1611 async fn prune_executions(
1612 &self,
1613 instance_id: &str,
1614 options: PruneOptions,
1615 ) -> Result<PruneResult, ProviderError> {
1616 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1617 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1618
1619 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1620 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1621 self.schema_name
1622 ))
1623 .bind(instance_id)
1624 .bind(keep_last)
1625 .bind(completed_before_ms)
1626 .fetch_optional(&*self.pool)
1627 .await
1628 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1629
1630 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1631
1632 debug!(
1633 target = "duroxide::providers::postgres",
1634 operation = "prune_executions",
1635 instance_id = %instance_id,
1636 instances_processed = instances_processed,
1637 executions_deleted = executions_deleted,
1638 events_deleted = events_deleted,
1639 "Pruned executions"
1640 );
1641
1642 Ok(PruneResult {
1643 instances_processed: instances_processed as u64,
1644 executions_deleted: executions_deleted as u64,
1645 events_deleted: events_deleted as u64,
1646 })
1647 }
1648
    /// Bulk-prune executions across all instances matching `filter`, applying
    /// the same `options` to each selected instance via `prune_executions`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // Base query: instances joined to their *current* execution row.
        // Unlike delete_instance_bulk there is no root/terminal restriction;
        // WHERE 1=1 merely anchors the dynamically appended AND clauses.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Optional explicit id list: appended as placeholders $1..$N.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completion cutoff; the bound value is epoch milliseconds,
        // hence the / 1000.0 in TO_TIMESTAMP. Its parameter number follows the
        // id placeholders (if any).
        if filter.completed_before.is_some() {
            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT is always the final parameter; its number accounts for both
        // optional groups above. Defaults to 1000 to bound one call's work.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
            + if filter.completed_before.is_some() { 1 } else { 0 }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        let mut result = PruneResult::default();

        // Prune instance-by-instance, accumulating totals.
        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
1736}