1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// Durable-task storage provider backed by PostgreSQL.
///
/// All state lives in a single Postgres schema and is manipulated through
/// stored procedures in that schema (see the `fetch_*`/`ack_*` calls below),
/// issued over a shared `sqlx` connection pool.
pub struct PostgresProvider {
    // Shared connection pool; also cloned into the migration runner.
    pool: Arc<PgPool>,
    // Schema that owns all provider tables and functions ("public" by default).
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143 match filter {
144 TagFilter::DefaultOnly => ("default_only", vec![]),
145 TagFilter::Tags(set) => {
146 let mut tags: Vec<String> = set.iter().cloned().collect();
147 tags.sort();
148 ("tags", tags)
149 }
150 TagFilter::DefaultAnd(set) => {
151 let mut tags: Vec<String> = set.iter().cloned().collect();
152 tags.sort();
153 ("default_and", tags)
154 }
155 TagFilter::Any => ("any", vec![]),
156 TagFilter::None => ("none", vec![]),
157 }
158 }
159
160 pub async fn cleanup_schema(&self) -> Result<()> {
165 const MAX_RETRIES: u32 = 5;
166 const BASE_RETRY_DELAY_MS: u64 = 50;
167
168 for attempt in 0..=MAX_RETRIES {
169 let cleanup_result = async {
170 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172 .execute(&*self.pool)
173 .await?;
174
175 if self.schema_name != "public" {
178 sqlx::query(&format!(
179 "DROP SCHEMA IF EXISTS {} CASCADE",
180 self.schema_name
181 ))
182 .execute(&*self.pool)
183 .await?;
184 } else {
185 }
188
189 Ok::<(), SqlxError>(())
190 }
191 .await;
192
193 match cleanup_result {
194 Ok(()) => return Ok(()),
195 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196 if attempt < MAX_RETRIES {
197 warn!(
198 target = "duroxide::providers::postgres",
199 schema = %self.schema_name,
200 attempt = attempt + 1,
201 "Deadlock during cleanup_schema, retrying"
202 );
203 sleep(Duration::from_millis(
204 BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
205 ))
206 .await;
207 continue;
208 }
209 return Err(anyhow::anyhow!(db_err.to_string()));
210 }
211 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
212 }
213 }
214
215 Ok(())
216 }
217}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
221 fn name(&self) -> &str {
222 "duroxide-pg"
223 }
224
225 fn version(&self) -> &str {
226 env!("CARGO_PKG_VERSION")
227 }
228
    /// Atomically claims the next ready orchestration work batch.
    ///
    /// Delegates to the `fetch_orchestration_item` stored procedure, which
    /// locks the chosen instance for `lock_timeout` and returns its history
    /// plus pending messages. Returns `Ok(None)` when nothing is ready.
    /// Retries up to `MAX_RETRIES` times on retryable database errors with
    /// linear backoff. `_poll_timeout` is intentionally unused — polling
    /// cadence is the caller's concern.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack the dispatcher's supported duroxide version range into two
        // integers (major * 1_000_000 + minor * 1_000 + patch) so the stored
        // procedure can compare them numerically.
        // NOTE(review): only the FIRST range in the filter is honored; any
        // additional ranges are ignored, and an empty range list short-circuits
        // to "no work" — confirm this matches the dispatcher contract.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape produced by the stored procedure:
            // (instance_id, orchestration_name, orchestration_version,
            //  execution_id, history_json, messages_json, lock_token,
            //  attempt_count, kv_snapshot_json)
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    // Retryable errors (deadlocks, pool issues, I/O) back off
                    // linearly and loop; everything else propagates.
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // History deserialization is tolerant: a failure is reported
                // through `history_error` on the item rather than failing the
                // fetch, so the runtime can decide how to handle it.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Messages, by contrast, are strict: an undecodable message is
                // a permanent error because it cannot be dispatched.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // A malformed key/value snapshot degrades to empty rather than
                // failing the fetch.
                let kv_snapshot: std::collections::HashMap<String, String> =
                    serde_json::from_value(kv_snapshot_json).unwrap_or_default();

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan handling: "Unknown" name with no history and only
                // QueueMessage items means events were enqueued for an instance
                // that never started. These are acked away (empty deltas) and
                // dropped — presumably the SQL side uses "Unknown" as the
                // sentinel for a missing instance row; confirm against the
                // stored procedure.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No row → nothing ready; do not spin on the retry loop.
            return Ok(None);
        }

        // Unreachable in practice: the loop always returns or continues.
        Ok(None)
    }
    /// Commits the results of one orchestration turn under `lock_token`.
    ///
    /// Serializes the history delta, outgoing worker/orchestrator items,
    /// execution metadata (including custom-status and key/value mutations
    /// derived from the delta), and activity cancellations, then hands
    /// everything to the `ack_orchestration_item` stored procedure in a single
    /// call. Retries up to `MAX_RETRIES` times on retryable errors; an invalid
    /// lock token is always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload for the history delta up front so a
        // serialization failure aborts before any database work.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id 0 means the runtime forgot to assign an id — a caller
            // bug, not a transient condition.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse type label from the Debug representation
            // (text before the first '{'); used only for indexing/diagnostics.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // The LAST CustomStatusUpdated event in the delta wins:
        // Some(Some) → "set", Some(None) → "clear", None → no change.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Key/value store mutations are replayed server-side in delta order.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet { key, value } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // "Invalid lock token" is raised by the stored procedure;
                    // check both the DB error message and the display string,
                    // and never retry it — the lock is gone.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable in practice: every loop iteration returns or continues.
        Ok(())
    }
    /// Releases an orchestration lock without committing, optionally delaying
    /// redelivery by `delay` and optionally not counting this attempt
    /// (`ignore_attempt`).
    ///
    /// The stored procedure returns the affected instance id, used here only
    /// for logging. An invalid lock token is a permanent error; everything
    /// else is classified by `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        // None → redeliver immediately (NULL on the SQL side).
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Procedure-raised "Invalid lock token" is permanent; check
                // both the DB error message and the display string.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
676
677 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
678 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
679 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
680 "SELECT out_event_data FROM {}.fetch_history($1)",
681 self.schema_name
682 ))
683 .bind(instance)
684 .fetch_all(&*self.pool)
685 .await
686 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
687
688 Ok(event_data_rows
689 .into_iter()
690 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
691 .collect())
692 }
693
    /// Appends `new_events` to the history of `instance`/`execution_id` via
    /// the `append_history` stored procedure.
    ///
    /// No-op for an empty event list. Every event must already carry a
    /// non-zero `event_id` assigned by the runtime; a zero id is a caller bug
    /// and fails permanently before any database work.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Serialize all events up front so a failure aborts before touching
        // the database.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type label from the Debug representation (text before the
            // first '{'); same convention as ack_orchestration_item.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
767
    /// Enqueues a work item on the worker (activity) queue via the
    /// `enqueue_worker_work` stored procedure.
    ///
    /// For `ActivityExecute` items the routing columns (instance, execution,
    /// activity id, session, tag) are extracted and bound separately so the
    /// SQL side can filter/route without parsing the JSON payload; all other
    /// item kinds pass NULLs for those columns.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_worker_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        let now_ms = Self::now_millis();

        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
            WorkItem::ActivityExecute {
                instance,
                execution_id,
                id,
                session_id,
                tag,
                ..
            } => (
                Some(instance.clone()),
                Some(*execution_id as i64),
                Some(*id as i64),
                session_id.clone(),
                tag.clone(),
            ),
            _ => (None, None, None, None, None),
        };

        sqlx::query(&format!(
            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
            self.schema_name
        ))
        .bind(work_item)
        .bind(now_ms)
        .bind(&instance_id)
        .bind(execution_id)
        .bind(activity_id)
        .bind(&session_id)
        .bind(&tag)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_worker_work",
                error_type = "database_error",
                error = %e,
                "Failed to enqueue worker work"
            );
            Self::sqlx_to_provider_error("enqueue_worker_work", e)
        })?;

        Ok(())
    }
824
    /// Claims the next available worker (activity) item matching `tag_filter`,
    /// locking it for `lock_timeout`.
    ///
    /// `TagFilter::None` short-circuits to "no work" without touching the
    /// database. When `session` is provided, its owner id and lock timeout are
    /// passed through so the stored procedure can enforce session affinity.
    /// Returns the item, its lock token, and the delivery attempt count.
    /// `_poll_timeout` is unused — polling is the caller's concern.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Translate the filter into the (mode, tag list) form the procedure expects.
        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Row shape: (work_item_json, lock_token, attempt_count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Resolve the owning instance id for logging; sub-orchestration
        // completions report their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
918
    /// Acknowledges a claimed worker item under `token`.
    ///
    /// With `completion == None` (cancelled activity) the item is simply
    /// removed. Otherwise `completion` must be `ActivityCompleted` or
    /// `ActivityFailed`; it is forwarded to the orchestrator queue by the
    /// `ack_worker` stored procedure in the same call. "Item not found /
    /// already processed" from the procedure maps to a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Path 1: no completion — ack-and-drop (cancellation).
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Path 2: completion present — validate it is an activity outcome.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1025
    /// Extends the lock on a claimed worker item by `extend_for`.
    ///
    /// An invalid/expired/already-acked token is a permanent error (the work
    /// will be redelivered elsewhere); other failures are classified by
    /// `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): `as_secs()` truncates — an extension under 1s becomes
        // 0. Presumably the stored procedure expects whole seconds; confirm
        // callers never pass sub-second durations.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Procedure-raised "Lock token invalid" is permanent; check
                // both the DB error message and the display string.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
1081
1082 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1083 async fn abandon_work_item(
1084 &self,
1085 token: &str,
1086 delay: Option<Duration>,
1087 ignore_attempt: bool,
1088 ) -> Result<(), ProviderError> {
1089 let start = std::time::Instant::now();
1090 let now_ms = Self::now_millis();
1091 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1092
1093 match sqlx::query(&format!(
1094 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1095 self.schema_name
1096 ))
1097 .bind(token)
1098 .bind(now_ms)
1099 .bind(delay_param)
1100 .bind(ignore_attempt)
1101 .execute(&*self.pool)
1102 .await
1103 {
1104 Ok(_) => {
1105 let duration_ms = start.elapsed().as_millis() as u64;
1106 debug!(
1107 target = "duroxide::providers::postgres",
1108 operation = "abandon_work_item",
1109 token = %token,
1110 delay_ms = delay.map(|d| d.as_millis() as u64),
1111 ignore_attempt = ignore_attempt,
1112 duration_ms = duration_ms,
1113 "Abandoned work item via stored procedure"
1114 );
1115 Ok(())
1116 }
1117 Err(e) => {
1118 if let SqlxError::Database(db_err) = &e {
1119 if db_err.message().contains("Invalid lock token")
1120 || db_err.message().contains("already acked")
1121 {
1122 return Err(ProviderError::permanent(
1123 "abandon_work_item",
1124 "Invalid lock token or already acked",
1125 ));
1126 }
1127 } else if e.to_string().contains("Invalid lock token")
1128 || e.to_string().contains("already acked")
1129 {
1130 return Err(ProviderError::permanent(
1131 "abandon_work_item",
1132 "Invalid lock token or already acked",
1133 ));
1134 }
1135
1136 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1137 }
1138 }
1139 }
1140
    /// Extends the lock on a claimed orchestration item by `extend_for`.
    ///
    /// Mirrors `renew_work_item_lock` but recognizes a wider set of
    /// procedure-raised messages ("Lock token invalid", "expired",
    /// "already released"), all of which map to a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): `as_secs()` truncates sub-second extensions to 0 —
        // presumably the stored procedure takes whole seconds; confirm callers
        // never pass durations under 1s.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Match both the DB error message and the display string, as
                // sqlx surfaces procedure errors differently by error kind.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1205
1206 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1207 async fn enqueue_for_orchestrator(
1208 &self,
1209 item: WorkItem,
1210 delay: Option<Duration>,
1211 ) -> Result<(), ProviderError> {
1212 let work_item = serde_json::to_string(&item).map_err(|e| {
1213 ProviderError::permanent(
1214 "enqueue_orchestrator_work",
1215 format!("Failed to serialize work item: {e}"),
1216 )
1217 })?;
1218
1219 let instance_id = match &item {
1221 WorkItem::StartOrchestration { instance, .. }
1222 | WorkItem::ActivityCompleted { instance, .. }
1223 | WorkItem::ActivityFailed { instance, .. }
1224 | WorkItem::TimerFired { instance, .. }
1225 | WorkItem::ExternalRaised { instance, .. }
1226 | WorkItem::CancelInstance { instance, .. }
1227 | WorkItem::ContinueAsNew { instance, .. }
1228 | WorkItem::QueueMessage { instance, .. } => instance,
1229 WorkItem::SubOrchCompleted {
1230 parent_instance, ..
1231 }
1232 | WorkItem::SubOrchFailed {
1233 parent_instance, ..
1234 } => parent_instance,
1235 WorkItem::ActivityExecute { .. } => {
1236 return Err(ProviderError::permanent(
1237 "enqueue_orchestrator_work",
1238 "ActivityExecute should go to worker queue, not orchestrator queue",
1239 ));
1240 }
1241 };
1242
1243 let now_ms = Self::now_millis();
1245
1246 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1247 if *fire_at_ms > 0 {
1248 if let Some(delay) = delay {
1250 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1251 } else {
1252 *fire_at_ms
1253 }
1254 } else {
1255 delay
1257 .map(|d| now_ms as u64 + d.as_millis() as u64)
1258 .unwrap_or(now_ms as u64)
1259 }
1260 } else {
1261 delay
1263 .map(|d| now_ms as u64 + d.as_millis() as u64)
1264 .unwrap_or(now_ms as u64)
1265 };
1266
1267 let visible_at = Utc
1268 .timestamp_millis_opt(visible_at_ms as i64)
1269 .single()
1270 .ok_or_else(|| {
1271 ProviderError::permanent(
1272 "enqueue_orchestrator_work",
1273 "Invalid visible_at timestamp",
1274 )
1275 })?;
1276
1277 sqlx::query(&format!(
1282 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1283 self.schema_name
1284 ))
1285 .bind(instance_id)
1286 .bind(&work_item)
1287 .bind(visible_at)
1288 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1292 .await
1293 .map_err(|e| {
1294 error!(
1295 target = "duroxide::providers::postgres",
1296 operation = "enqueue_orchestrator_work",
1297 error_type = "database_error",
1298 error = %e,
1299 instance_id = %instance_id,
1300 "Failed to enqueue orchestrator work"
1301 );
1302 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1303 })?;
1304
1305 debug!(
1306 target = "duroxide::providers::postgres",
1307 operation = "enqueue_orchestrator_work",
1308 instance_id = %instance_id,
1309 delay_ms = delay.map(|d| d.as_millis() as u64),
1310 "Enqueued orchestrator work"
1311 );
1312
1313 Ok(())
1314 }
1315
1316 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1317 async fn read_with_execution(
1318 &self,
1319 instance: &str,
1320 execution_id: u64,
1321 ) -> Result<Vec<Event>, ProviderError> {
1322 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1323 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1324 self.table_name("history")
1325 ))
1326 .bind(instance)
1327 .bind(execution_id as i64)
1328 .fetch_all(&*self.pool)
1329 .await
1330 .ok()
1331 .unwrap_or_default();
1332
1333 Ok(event_data_rows
1334 .into_iter()
1335 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1336 .collect())
1337 }
1338
1339 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1340 async fn renew_session_lock(
1341 &self,
1342 owner_ids: &[&str],
1343 extend_for: Duration,
1344 idle_timeout: Duration,
1345 ) -> Result<usize, ProviderError> {
1346 if owner_ids.is_empty() {
1347 return Ok(0);
1348 }
1349
1350 let now_ms = Self::now_millis();
1351 let extend_ms = extend_for.as_millis() as i64;
1352 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1353 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1354
1355 let result = sqlx::query_scalar::<_, i64>(&format!(
1356 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1357 self.schema_name
1358 ))
1359 .bind(&owner_ids_vec)
1360 .bind(now_ms)
1361 .bind(extend_ms)
1362 .bind(idle_timeout_ms)
1363 .fetch_one(&*self.pool)
1364 .await
1365 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1366
1367 debug!(
1368 target = "duroxide::providers::postgres",
1369 operation = "renew_session_lock",
1370 owner_count = owner_ids.len(),
1371 sessions_renewed = result,
1372 "Session locks renewed"
1373 );
1374
1375 Ok(result as usize)
1376 }
1377
1378 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1379 async fn cleanup_orphaned_sessions(
1380 &self,
1381 _idle_timeout: Duration,
1382 ) -> Result<usize, ProviderError> {
1383 let now_ms = Self::now_millis();
1384
1385 let result = sqlx::query_scalar::<_, i64>(&format!(
1386 "SELECT {}.cleanup_orphaned_sessions($1)",
1387 self.schema_name
1388 ))
1389 .bind(now_ms)
1390 .fetch_one(&*self.pool)
1391 .await
1392 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1393
1394 debug!(
1395 target = "duroxide::providers::postgres",
1396 operation = "cleanup_orphaned_sessions",
1397 sessions_cleaned = result,
1398 "Orphaned sessions cleaned up"
1399 );
1400
1401 Ok(result as usize)
1402 }
1403
    /// Exposes the management/admin surface; this provider implements
    /// `ProviderAdmin` (see the impl below), so the capability is always
    /// available.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1407
1408 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1409 async fn get_custom_status(
1410 &self,
1411 instance: &str,
1412 last_seen_version: u64,
1413 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1414 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1415 "SELECT * FROM {}.get_custom_status($1, $2)",
1416 self.schema_name
1417 ))
1418 .bind(instance)
1419 .bind(last_seen_version as i64)
1420 .fetch_optional(&*self.pool)
1421 .await
1422 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1423
1424 match row {
1425 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1426 None => Ok(None),
1427 }
1428 }
1429
1430 async fn get_kv_value(
1431 &self,
1432 instance_id: &str,
1433 key: &str,
1434 ) -> Result<Option<String>, ProviderError> {
1435 let query = format!(
1436 "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1437 self.schema_name
1438 );
1439 let result: Option<(String,)> = sqlx::query_as(&query)
1440 .bind(instance_id)
1441 .bind(key)
1442 .fetch_optional(&*self.pool)
1443 .await
1444 .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1445 Ok(result.map(|(v,)| v))
1446 }
1447}
1448
1449#[async_trait::async_trait]
1450impl ProviderAdmin for PostgresProvider {
1451 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1452 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1453 sqlx::query_scalar(&format!(
1454 "SELECT instance_id FROM {}.list_instances()",
1455 self.schema_name
1456 ))
1457 .fetch_all(&*self.pool)
1458 .await
1459 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1460 }
1461
1462 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1463 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1464 sqlx::query_scalar(&format!(
1465 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1466 self.schema_name
1467 ))
1468 .bind(status)
1469 .fetch_all(&*self.pool)
1470 .await
1471 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1472 }
1473
1474 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1475 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1476 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1477 "SELECT execution_id FROM {}.list_executions($1)",
1478 self.schema_name
1479 ))
1480 .bind(instance)
1481 .fetch_all(&*self.pool)
1482 .await
1483 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1484
1485 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1486 }
1487
1488 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1489 async fn read_history_with_execution_id(
1490 &self,
1491 instance: &str,
1492 execution_id: u64,
1493 ) -> Result<Vec<Event>, ProviderError> {
1494 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1495 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1496 self.schema_name
1497 ))
1498 .bind(instance)
1499 .bind(execution_id as i64)
1500 .fetch_all(&*self.pool)
1501 .await
1502 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1503
1504 event_data_rows
1505 .into_iter()
1506 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1507 .collect::<Vec<Event>>()
1508 .into_iter()
1509 .map(Ok)
1510 .collect()
1511 }
1512
1513 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1514 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1515 let execution_id = self.latest_execution_id(instance).await?;
1516 self.read_history_with_execution_id(instance, execution_id)
1517 .await
1518 }
1519
1520 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1521 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1522 sqlx::query_scalar(&format!(
1523 "SELECT {}.latest_execution_id($1)",
1524 self.schema_name
1525 ))
1526 .bind(instance)
1527 .fetch_optional(&*self.pool)
1528 .await
1529 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1530 .map(|id: i64| id as u64)
1531 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1532 }
1533
1534 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1535 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1536 let row: Option<(
1537 String,
1538 String,
1539 String,
1540 i64,
1541 chrono::DateTime<Utc>,
1542 Option<chrono::DateTime<Utc>>,
1543 Option<String>,
1544 Option<String>,
1545 Option<String>,
1546 )> = sqlx::query_as(&format!(
1547 "SELECT * FROM {}.get_instance_info($1)",
1548 self.schema_name
1549 ))
1550 .bind(instance)
1551 .fetch_optional(&*self.pool)
1552 .await
1553 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1554
1555 let (
1556 instance_id,
1557 orchestration_name,
1558 orchestration_version,
1559 current_execution_id,
1560 created_at,
1561 updated_at,
1562 status,
1563 output,
1564 parent_instance_id,
1565 ) =
1566 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1567
1568 Ok(InstanceInfo {
1569 instance_id,
1570 orchestration_name,
1571 orchestration_version,
1572 current_execution_id: current_execution_id as u64,
1573 status: status.unwrap_or_else(|| "Running".to_string()),
1574 output,
1575 created_at: created_at.timestamp_millis() as u64,
1576 updated_at: updated_at
1577 .map(|dt| dt.timestamp_millis() as u64)
1578 .unwrap_or(created_at.timestamp_millis() as u64),
1579 parent_instance_id,
1580 })
1581 }
1582
1583 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1584 async fn get_execution_info(
1585 &self,
1586 instance: &str,
1587 execution_id: u64,
1588 ) -> Result<ExecutionInfo, ProviderError> {
1589 let row: Option<(
1590 i64,
1591 String,
1592 Option<String>,
1593 chrono::DateTime<Utc>,
1594 Option<chrono::DateTime<Utc>>,
1595 i64,
1596 )> = sqlx::query_as(&format!(
1597 "SELECT * FROM {}.get_execution_info($1, $2)",
1598 self.schema_name
1599 ))
1600 .bind(instance)
1601 .bind(execution_id as i64)
1602 .fetch_optional(&*self.pool)
1603 .await
1604 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1605
1606 let (exec_id, status, output, started_at, completed_at, event_count) = row
1607 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1608
1609 Ok(ExecutionInfo {
1610 execution_id: exec_id as u64,
1611 status,
1612 output,
1613 started_at: started_at.timestamp_millis() as u64,
1614 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1615 event_count: event_count as usize,
1616 })
1617 }
1618
1619 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1620 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1621 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1622 "SELECT * FROM {}.get_system_metrics()",
1623 self.schema_name
1624 ))
1625 .fetch_optional(&*self.pool)
1626 .await
1627 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1628
1629 let (
1630 total_instances,
1631 total_executions,
1632 running_instances,
1633 completed_instances,
1634 failed_instances,
1635 total_events,
1636 ) = row.ok_or_else(|| {
1637 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1638 })?;
1639
1640 Ok(SystemMetrics {
1641 total_instances: total_instances as u64,
1642 total_executions: total_executions as u64,
1643 running_instances: running_instances as u64,
1644 completed_instances: completed_instances as u64,
1645 failed_instances: failed_instances as u64,
1646 total_events: total_events as u64,
1647 })
1648 }
1649
1650 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1651 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1652 let now_ms = Self::now_millis();
1653
1654 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1655 "SELECT * FROM {}.get_queue_depths($1)",
1656 self.schema_name
1657 ))
1658 .bind(now_ms)
1659 .fetch_optional(&*self.pool)
1660 .await
1661 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1662
1663 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1664 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1665 })?;
1666
1667 Ok(QueueDepths {
1668 orchestrator_queue: orchestrator_queue as usize,
1669 worker_queue: worker_queue as usize,
1670 timer_queue: 0, })
1672 }
1673
1674 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1677 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1678 sqlx::query_scalar(&format!(
1679 "SELECT child_instance_id FROM {}.list_children($1)",
1680 self.schema_name
1681 ))
1682 .bind(instance_id)
1683 .fetch_all(&*self.pool)
1684 .await
1685 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1686 }
1687
1688 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1689 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1690 let result: Result<Option<String>, _> =
1693 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1694 .bind(instance_id)
1695 .fetch_one(&*self.pool)
1696 .await;
1697
1698 match result {
1699 Ok(parent_id) => Ok(parent_id),
1700 Err(e) => {
1701 let err_str = e.to_string();
1702 if err_str.contains("Instance not found") {
1703 Err(ProviderError::permanent(
1704 "get_parent_id",
1705 format!("Instance not found: {}", instance_id),
1706 ))
1707 } else {
1708 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1709 }
1710 }
1711 }
1712 }
1713
1714 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1717 async fn delete_instances_atomic(
1718 &self,
1719 ids: &[String],
1720 force: bool,
1721 ) -> Result<DeleteInstanceResult, ProviderError> {
1722 if ids.is_empty() {
1723 return Ok(DeleteInstanceResult::default());
1724 }
1725
1726 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1727 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1728 self.schema_name
1729 ))
1730 .bind(ids)
1731 .bind(force)
1732 .fetch_optional(&*self.pool)
1733 .await
1734 .map_err(|e| {
1735 let err_str = e.to_string();
1736 if err_str.contains("is Running") {
1737 ProviderError::permanent("delete_instances_atomic", err_str)
1738 } else if err_str.contains("Orphan detected") {
1739 ProviderError::permanent("delete_instances_atomic", err_str)
1740 } else {
1741 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1742 }
1743 })?;
1744
1745 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1746 row.unwrap_or((0, 0, 0, 0));
1747
1748 debug!(
1749 target = "duroxide::providers::postgres",
1750 operation = "delete_instances_atomic",
1751 instances_deleted = instances_deleted,
1752 executions_deleted = executions_deleted,
1753 events_deleted = events_deleted,
1754 queue_messages_deleted = queue_messages_deleted,
1755 "Deleted instances atomically"
1756 );
1757
1758 Ok(DeleteInstanceResult {
1759 instances_deleted: instances_deleted as u64,
1760 executions_deleted: executions_deleted as u64,
1761 events_deleted: events_deleted as u64,
1762 queue_messages_deleted: queue_messages_deleted as u64,
1763 })
1764 }
1765
    /// Bulk-deletes terminal root instances matching `filter`, including each
    /// matched root's entire sub-orchestration tree. The selection query only
    /// considers instances with no parent whose *current* execution has a
    /// terminal status, so running roots are never picked up.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Base selection: root instances joined to their current execution.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Positional parameters are numbered dynamically as clauses are
        // appended:
        //   $1..$k       -> optional instance-id filter (k = ids.len())
        //   $(k+1)       -> optional completed_before cutoff (epoch millis)
        //   last slot    -> LIMIT
        // Bind order further down must match this append order exactly.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            // Next free slot after the instance-id placeholders.
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // Cutoff is in milliseconds; TO_TIMESTAMP expects seconds.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT occupies the slot after any id and cutoff parameters.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in the same order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        // Delete each matched root with its full child tree. force=true skips
        // the running/orphan checks — NOTE(review): this assumes a tree is
        // safe to drop once its root is terminal; confirm child lifecycle.
        let mut result = DeleteInstanceResult::default();

        for instance_id in &instance_ids {
            let tree = self.get_instance_tree(instance_id).await?;

            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1873
1874 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1877 async fn prune_executions(
1878 &self,
1879 instance_id: &str,
1880 options: PruneOptions,
1881 ) -> Result<PruneResult, ProviderError> {
1882 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1883 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1884
1885 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1886 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1887 self.schema_name
1888 ))
1889 .bind(instance_id)
1890 .bind(keep_last)
1891 .bind(completed_before_ms)
1892 .fetch_optional(&*self.pool)
1893 .await
1894 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1895
1896 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1897
1898 debug!(
1899 target = "duroxide::providers::postgres",
1900 operation = "prune_executions",
1901 instance_id = %instance_id,
1902 instances_processed = instances_processed,
1903 executions_deleted = executions_deleted,
1904 events_deleted = events_deleted,
1905 "Pruned executions"
1906 );
1907
1908 Ok(PruneResult {
1909 instances_processed: instances_processed as u64,
1910 executions_deleted: executions_deleted as u64,
1911 events_deleted: events_deleted as u64,
1912 })
1913 }
1914
    /// Prunes executions across many instances: selects instances matching
    /// `filter` (unlike `delete_instance_bulk`, no root/terminal restriction
    /// — WHERE 1=1), then applies `prune_executions` to each.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // Base selection: all instances joined to their current execution.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Positional parameters are numbered dynamically as clauses are
        // appended:
        //   $1..$k       -> optional instance-id filter (k = ids.len())
        //   $(k+1)       -> optional completed_before cutoff (epoch millis)
        //   last slot    -> LIMIT
        // Bind order further down must match this append order exactly.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            // Next free slot after the instance-id placeholders.
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // Cutoff is in milliseconds; TO_TIMESTAMP expects seconds.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT occupies the slot after any id and cutoff parameters.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in the same order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        // Prune each matched instance individually, accumulating totals.
        let mut result = PruneResult::default();

        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
2015}