1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind, SystemStats};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// Durable-task provider backed by PostgreSQL.
///
/// All persistent state lives in tables and stored procedures inside
/// `schema_name`; every query issued by this type is schema-qualified, so
/// multiple providers can share a single database under different schemas.
pub struct PostgresProvider {
    // Shared connection pool; Arc so it can also be handed to the migration runner.
    pool: Arc<PgPool>,
    // Postgres schema owning all duroxide tables/functions (default: "public").
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else if code == Some("0A000") {
130 ProviderError::retryable(operation, format!("Cached plan invalidated: {e}"))
132 } else {
133 ProviderError::permanent(operation, format!("Database error: {e}"))
134 }
135 }
136 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
137 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
138 }
139 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
140 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
141 }
142 }
143
144 fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
146 match filter {
147 TagFilter::DefaultOnly => ("default_only", vec![]),
148 TagFilter::Tags(set) => {
149 let mut tags: Vec<String> = set.iter().cloned().collect();
150 tags.sort();
151 ("tags", tags)
152 }
153 TagFilter::DefaultAnd(set) => {
154 let mut tags: Vec<String> = set.iter().cloned().collect();
155 tags.sort();
156 ("default_and", tags)
157 }
158 TagFilter::Any => ("any", vec![]),
159 TagFilter::None => ("none", vec![]),
160 }
161 }
162
163 pub async fn cleanup_schema(&self) -> Result<()> {
168 const MAX_RETRIES: u32 = 5;
169 const BASE_RETRY_DELAY_MS: u64 = 50;
170
171 for attempt in 0..=MAX_RETRIES {
172 let cleanup_result = async {
173 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
175 .execute(&*self.pool)
176 .await?;
177
178 if self.schema_name != "public" {
181 sqlx::query(&format!(
182 "DROP SCHEMA IF EXISTS {} CASCADE",
183 self.schema_name
184 ))
185 .execute(&*self.pool)
186 .await?;
187 } else {
188 }
191
192 Ok::<(), SqlxError>(())
193 }
194 .await;
195
196 match cleanup_result {
197 Ok(()) => return Ok(()),
198 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
199 if attempt < MAX_RETRIES {
200 warn!(
201 target = "duroxide::providers::postgres",
202 schema = %self.schema_name,
203 attempt = attempt + 1,
204 "Deadlock during cleanup_schema, retrying"
205 );
206 sleep(Duration::from_millis(
207 BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
208 ))
209 .await;
210 continue;
211 }
212 return Err(anyhow::anyhow!(db_err.to_string()));
213 }
214 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
215 }
216 }
217
218 Ok(())
219 }
220}
221
222#[async_trait::async_trait]
223impl Provider for PostgresProvider {
    /// Stable provider identifier reported to the runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
227
    /// Provider version, taken from this crate's Cargo package version at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
231
    /// Claims the next ready orchestration instance for processing.
    ///
    /// Delegates to the schema's `fetch_orchestration_item` stored procedure,
    /// which locks the instance for `lock_timeout` and returns its history,
    /// pending messages, lock token, attempt count, and key/value snapshot.
    /// Retryable database errors are retried up to `MAX_RETRIES` times with
    /// linear backoff. Returns `Ok(None)` when no work is available.
    ///
    /// `_poll_timeout` is intentionally unused: this provider does not long-poll.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack the dispatcher's supported duroxide version range into integers
        // (major * 1_000_000 + minor * 1_000 + patch) for SQL-side comparison.
        // Only the first range is honored; an empty range list means this
        // dispatcher can run nothing, so there is no work for it.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Tuple mirrors the stored procedure's OUT columns:
            // (instance_id, orchestration_name, orchestration_version,
            //  execution_id, history JSON, messages JSON, lock_token,
            //  attempt_count, kv snapshot JSON).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // A corrupt history is not fatal: the item is still returned,
                // with the decode failure surfaced through `history_error`.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // Decode the kv snapshot leniently: malformed or unexpected
                // entries are skipped rather than failing the whole fetch.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan case: queue messages addressed to an instance that
                // never started (no name, no history). These cannot be
                // delivered, so ack them away with an empty delta and report
                // no work.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Successful query with no row: queue is empty, nothing to retry.
            return Ok(None);
        }

        // Unreachable: the loop always returns or continues, and the last
        // attempt always returns.
        Ok(None)
    }
    /// Completes a locked orchestration work item in a single stored-procedure
    /// call: appends `history_delta`, enqueues worker/orchestrator items,
    /// applies execution metadata, custom-status and key/value mutations, and
    /// cancels the given scheduled activities.
    ///
    /// Fails permanently on an invalid `lock_token` or on any event whose
    /// `event_id` was not assigned by the runtime; retryable database errors
    /// are retried up to `MAX_RETRIES` times with linear backoff.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the history payload: one {event_id, event_type, event_data}
        // object per event, rejecting events the runtime never numbered.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name for indexing/diagnostics, extracted from the Debug
            // representation (text before the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Custom status: the last CustomStatusUpdated event in the delta wins;
        // Some(None) means the status was explicitly cleared.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Key/value mutations are replayed server-side in delta order.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet {
                    key,
                    value,
                    last_updated_at_ms,
                } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                    "last_updated_at_ms": last_updated_at_ms,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // A stale/unknown lock token is signalled by the stored
                    // procedure's error text and is never worth retrying.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: every loop iteration returns or continues, and the
        // final attempt always returns.
        Ok(())
    }
    /// Releases a locked orchestration item without completing it, making it
    /// visible again after an optional `delay`. When `ignore_attempt` is set,
    /// the stored procedure is asked not to count this delivery attempt.
    ///
    /// A stale/unknown `lock_token` is a permanent error; other database
    /// failures are classified by `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        // The stored procedure returns the instance id it unlocked (used only
        // for the debug log below).
        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(lock_token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .fetch_one(&*self.pool)
        .await
        {
            Ok(instance_id) => instance_id,
            Err(e) => {
                // Invalid-token errors are detected by message text whether
                // they surface as a database error or any other sqlx variant.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "abandon_orchestration_item",
                            "Invalid lock token",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token") {
                    return Err(ProviderError::permanent(
                        "abandon_orchestration_item",
                        "Invalid lock token",
                    ));
                }

                return Err(Self::sqlx_to_provider_error(
                    "abandon_orchestration_item",
                    e,
                ));
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "abandon_orchestration_item",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            ignore_attempt = ignore_attempt,
            duration_ms = duration_ms,
            "Abandoned orchestration item via stored procedure"
        );

        Ok(())
    }
700
701 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
702 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
703 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
704 "SELECT out_event_data FROM {}.fetch_history($1)",
705 self.schema_name
706 ))
707 .bind(instance)
708 .fetch_all(&*self.pool)
709 .await
710 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
711
712 event_data_rows
713 .into_iter()
714 .map(|event_data| {
715 serde_json::from_str::<Event>(&event_data).map_err(|e| {
716 ProviderError::permanent("read", format!("Failed to deserialize event: {e}"))
717 })
718 })
719 .collect()
720 }
721
    /// Appends `new_events` to the history of `instance` / `execution_id`
    /// via the `append_history` stored procedure.
    ///
    /// No-op for an empty batch. Fails permanently if any event still has
    /// `event_id == 0` (ids must be assigned by the runtime before persisting).
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Build one {event_id, event_type, event_data} object per event —
        // the same payload shape used by ack_orchestration_item.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Variant name taken from the Debug representation (text before
            // the first '{'), used for indexing/diagnostics.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
795
796 #[instrument(skip(self), target = "duroxide::providers::postgres")]
797 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
798 let work_item = serde_json::to_string(&item).map_err(|e| {
799 ProviderError::permanent(
800 "enqueue_worker_work",
801 format!("Failed to serialize work item: {e}"),
802 )
803 })?;
804
805 let now_ms = Self::now_millis();
806
807 let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
809 WorkItem::ActivityExecute {
810 instance,
811 execution_id,
812 id,
813 session_id,
814 tag,
815 ..
816 } => (
817 Some(instance.clone()),
818 Some(*execution_id as i64),
819 Some(*id as i64),
820 session_id.clone(),
821 tag.clone(),
822 ),
823 _ => (None, None, None, None, None),
824 };
825
826 sqlx::query(&format!(
827 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
828 self.schema_name
829 ))
830 .bind(work_item)
831 .bind(now_ms)
832 .bind(&instance_id)
833 .bind(execution_id)
834 .bind(activity_id)
835 .bind(&session_id)
836 .bind(&tag)
837 .execute(&*self.pool)
838 .await
839 .map_err(|e| {
840 error!(
841 target = "duroxide::providers::postgres",
842 operation = "enqueue_worker_work",
843 error_type = "database_error",
844 error = %e,
845 "Failed to enqueue worker work"
846 );
847 Self::sqlx_to_provider_error("enqueue_worker_work", e)
848 })?;
849
850 Ok(())
851 }
852
    /// Claims the next ready worker-queue item (typically an activity).
    ///
    /// Delegates to the `fetch_work_item` stored procedure, which applies the
    /// tag filter, optional session affinity, and lock timeout server-side.
    /// Returns `Ok(None)` when no matching work is available, and immediately
    /// for `TagFilter::None` (which by definition matches nothing).
    ///
    /// `_poll_timeout` is intentionally unused: this provider does not long-poll.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // Short-circuit: a None filter can never match, skip the round-trip.
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Session affinity: pass the owner and its (separate) session lock
        // timeout through to the stored procedure when configured.
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Row: (work item JSON, lock token, attempt count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Resolve the instance id for logging; sub-orchestration completions
        // are attributed to their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
946
    /// Acknowledges a locked worker-queue item identified by `token`.
    ///
    /// With `completion == None` the item is simply removed (e.g. cancelled
    /// work). With a completion it must be `ActivityCompleted` or
    /// `ActivityFailed`; the stored procedure removes the item and enqueues
    /// the completion for the orchestrator in the same call.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: ack-only path with NULL instance/completion columns.
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for the worker queue.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1053
1054 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1055 async fn renew_work_item_lock(
1056 &self,
1057 token: &str,
1058 extend_for: Duration,
1059 ) -> Result<(), ProviderError> {
1060 let start = std::time::Instant::now();
1061
1062 let now_ms = Self::now_millis();
1064
1065 let extend_secs = extend_for.as_secs() as i64;
1067
1068 match sqlx::query(&format!(
1069 "SELECT {}.renew_work_item_lock($1, $2, $3)",
1070 self.schema_name
1071 ))
1072 .bind(token)
1073 .bind(now_ms)
1074 .bind(extend_secs)
1075 .execute(&*self.pool)
1076 .await
1077 {
1078 Ok(_) => {
1079 let duration_ms = start.elapsed().as_millis() as u64;
1080 debug!(
1081 target = "duroxide::providers::postgres",
1082 operation = "renew_work_item_lock",
1083 token = %token,
1084 extend_for_secs = extend_secs,
1085 duration_ms = duration_ms,
1086 "Work item lock renewed successfully"
1087 );
1088 Ok(())
1089 }
1090 Err(e) => {
1091 if let SqlxError::Database(db_err) = &e {
1092 if db_err.message().contains("Lock token invalid") {
1093 return Err(ProviderError::permanent(
1094 "renew_work_item_lock",
1095 "Lock token invalid, expired, or already acked",
1096 ));
1097 }
1098 } else if e.to_string().contains("Lock token invalid") {
1099 return Err(ProviderError::permanent(
1100 "renew_work_item_lock",
1101 "Lock token invalid, expired, or already acked",
1102 ));
1103 }
1104
1105 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1106 }
1107 }
1108 }
1109
    /// Releases a locked worker-queue item without completing it, making it
    /// visible again after an optional `delay`. When `ignore_attempt` is set,
    /// the stored procedure is asked not to count this delivery attempt.
    ///
    /// An invalid or already-acked token is a permanent error; other database
    /// failures are classified by `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();
        let now_ms = Self::now_millis();
        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);

        match sqlx::query(&format!(
            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(delay_param)
        .bind(ignore_attempt)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "abandon_work_item",
                    token = %token,
                    delay_ms = delay.map(|d| d.as_millis() as u64),
                    ignore_attempt = ignore_attempt,
                    duration_ms = duration_ms,
                    "Abandoned work item via stored procedure"
                );
                Ok(())
            }
            Err(e) => {
                // Token errors surface through message text from the stored
                // procedure, whether as a database error or any other variant.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Invalid lock token")
                        || db_err.message().contains("already acked")
                    {
                        return Err(ProviderError::permanent(
                            "abandon_work_item",
                            "Invalid lock token or already acked",
                        ));
                    }
                } else if e.to_string().contains("Invalid lock token")
                    || e.to_string().contains("already acked")
                {
                    return Err(ProviderError::permanent(
                        "abandon_work_item",
                        "Invalid lock token or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
            }
        }
    }
1168
    /// Extends the lease on a locked orchestration item identified by `token`.
    ///
    /// Delegates to the `renew_orchestration_item_lock` stored procedure.
    /// An invalid, expired, or already-released token is a permanent error;
    /// other database failures are classified by `sqlx_to_provider_error`.
    /// Note the extension is passed in whole seconds (`as_secs` truncates).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token-related failures are detected by message text from the
                // stored procedure and are never worth retrying.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1233
1234 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1235 async fn enqueue_for_orchestrator(
1236 &self,
1237 item: WorkItem,
1238 delay: Option<Duration>,
1239 ) -> Result<(), ProviderError> {
1240 let work_item = serde_json::to_string(&item).map_err(|e| {
1241 ProviderError::permanent(
1242 "enqueue_orchestrator_work",
1243 format!("Failed to serialize work item: {e}"),
1244 )
1245 })?;
1246
1247 let instance_id = match &item {
1249 WorkItem::StartOrchestration { instance, .. }
1250 | WorkItem::ActivityCompleted { instance, .. }
1251 | WorkItem::ActivityFailed { instance, .. }
1252 | WorkItem::TimerFired { instance, .. }
1253 | WorkItem::ExternalRaised { instance, .. }
1254 | WorkItem::CancelInstance { instance, .. }
1255 | WorkItem::ContinueAsNew { instance, .. }
1256 | WorkItem::QueueMessage { instance, .. } => instance,
1257 WorkItem::SubOrchCompleted {
1258 parent_instance, ..
1259 }
1260 | WorkItem::SubOrchFailed {
1261 parent_instance, ..
1262 } => parent_instance,
1263 WorkItem::ActivityExecute { .. } => {
1264 return Err(ProviderError::permanent(
1265 "enqueue_orchestrator_work",
1266 "ActivityExecute should go to worker queue, not orchestrator queue",
1267 ));
1268 }
1269 };
1270
1271 let now_ms = Self::now_millis();
1273
1274 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1275 if *fire_at_ms > 0 {
1276 if let Some(delay) = delay {
1278 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1279 } else {
1280 *fire_at_ms
1281 }
1282 } else {
1283 delay
1285 .map(|d| now_ms as u64 + d.as_millis() as u64)
1286 .unwrap_or(now_ms as u64)
1287 }
1288 } else {
1289 delay
1291 .map(|d| now_ms as u64 + d.as_millis() as u64)
1292 .unwrap_or(now_ms as u64)
1293 };
1294
1295 let visible_at = Utc
1296 .timestamp_millis_opt(visible_at_ms as i64)
1297 .single()
1298 .ok_or_else(|| {
1299 ProviderError::permanent(
1300 "enqueue_orchestrator_work",
1301 "Invalid visible_at timestamp",
1302 )
1303 })?;
1304
1305 sqlx::query(&format!(
1310 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1311 self.schema_name
1312 ))
1313 .bind(instance_id)
1314 .bind(&work_item)
1315 .bind(visible_at)
1316 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1320 .await
1321 .map_err(|e| {
1322 error!(
1323 target = "duroxide::providers::postgres",
1324 operation = "enqueue_orchestrator_work",
1325 error_type = "database_error",
1326 error = %e,
1327 instance_id = %instance_id,
1328 "Failed to enqueue orchestrator work"
1329 );
1330 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1331 })?;
1332
1333 debug!(
1334 target = "duroxide::providers::postgres",
1335 operation = "enqueue_orchestrator_work",
1336 instance_id = %instance_id,
1337 delay_ms = delay.map(|d| d.as_millis() as u64),
1338 "Enqueued orchestrator work"
1339 );
1340
1341 Ok(())
1342 }
1343
1344 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1345 async fn read_with_execution(
1346 &self,
1347 instance: &str,
1348 execution_id: u64,
1349 ) -> Result<Vec<Event>, ProviderError> {
1350 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1351 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1352 self.table_name("history")
1353 ))
1354 .bind(instance)
1355 .bind(execution_id as i64)
1356 .fetch_all(&*self.pool)
1357 .await
1358 .map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
1359
1360 event_data_rows
1361 .into_iter()
1362 .map(|event_data| {
1363 serde_json::from_str::<Event>(&event_data).map_err(|e| {
1364 ProviderError::permanent(
1365 "read_with_execution",
1366 format!("Failed to deserialize event: {e}"),
1367 )
1368 })
1369 })
1370 .collect()
1371 }
1372
1373 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1374 async fn renew_session_lock(
1375 &self,
1376 owner_ids: &[&str],
1377 extend_for: Duration,
1378 idle_timeout: Duration,
1379 ) -> Result<usize, ProviderError> {
1380 if owner_ids.is_empty() {
1381 return Ok(0);
1382 }
1383
1384 let now_ms = Self::now_millis();
1385 let extend_ms = extend_for.as_millis() as i64;
1386 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1387 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1388
1389 let result = sqlx::query_scalar::<_, i64>(&format!(
1390 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1391 self.schema_name
1392 ))
1393 .bind(&owner_ids_vec)
1394 .bind(now_ms)
1395 .bind(extend_ms)
1396 .bind(idle_timeout_ms)
1397 .fetch_one(&*self.pool)
1398 .await
1399 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1400
1401 debug!(
1402 target = "duroxide::providers::postgres",
1403 operation = "renew_session_lock",
1404 owner_count = owner_ids.len(),
1405 sessions_renewed = result,
1406 "Session locks renewed"
1407 );
1408
1409 Ok(result as usize)
1410 }
1411
1412 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1413 async fn cleanup_orphaned_sessions(
1414 &self,
1415 _idle_timeout: Duration,
1416 ) -> Result<usize, ProviderError> {
1417 let now_ms = Self::now_millis();
1418
1419 let result = sqlx::query_scalar::<_, i64>(&format!(
1420 "SELECT {}.cleanup_orphaned_sessions($1)",
1421 self.schema_name
1422 ))
1423 .bind(now_ms)
1424 .fetch_one(&*self.pool)
1425 .await
1426 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1427
1428 debug!(
1429 target = "duroxide::providers::postgres",
1430 operation = "cleanup_orphaned_sessions",
1431 sessions_cleaned = result,
1432 "Orphaned sessions cleaned up"
1433 );
1434
1435 Ok(result as usize)
1436 }
1437
    // This provider also implements `ProviderAdmin`, so expose it for
    // management operations (listing, pruning, metrics, deletion).
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1441
1442 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1443 async fn get_custom_status(
1444 &self,
1445 instance: &str,
1446 last_seen_version: u64,
1447 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1448 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1449 "SELECT * FROM {}.get_custom_status($1, $2)",
1450 self.schema_name
1451 ))
1452 .bind(instance)
1453 .bind(last_seen_version as i64)
1454 .fetch_optional(&*self.pool)
1455 .await
1456 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1457
1458 match row {
1459 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1460 None => Ok(None),
1461 }
1462 }
1463
1464 async fn get_kv_value(
1465 &self,
1466 instance_id: &str,
1467 key: &str,
1468 ) -> Result<Option<String>, ProviderError> {
1469 let row: Option<(Option<String>, bool)> = sqlx::query_as(&format!(
1470 "SELECT * FROM {}.get_kv_value($1, $2)",
1471 self.schema_name
1472 ))
1473 .bind(instance_id)
1474 .bind(key)
1475 .fetch_optional(&*self.pool)
1476 .await
1477 .map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
1478
1479 Ok(row.and_then(|(value, found)| if found { value } else { None }))
1480 }
1481
1482 async fn get_kv_all_values(
1483 &self,
1484 instance_id: &str,
1485 ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1486 let rows: Vec<(String, String)> = sqlx::query_as(&format!(
1487 "SELECT * FROM {}.get_kv_all_values($1)",
1488 self.schema_name
1489 ))
1490 .bind(instance_id)
1491 .fetch_all(&*self.pool)
1492 .await
1493 .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1494
1495 Ok(rows.into_iter().collect())
1496 }
1497
1498 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1499 async fn get_instance_stats(&self, instance: &str) -> Result<Option<SystemStats>, ProviderError> {
1500 let row: Option<(bool, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1501 "SELECT * FROM {}.get_instance_stats($1)",
1502 self.schema_name
1503 ))
1504 .bind(instance)
1505 .fetch_optional(&*self.pool)
1506 .await
1507 .map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
1508
1509 match row {
1510 Some((true, history_event_count, history_size_bytes, queue_pending_count, kv_user_key_count, kv_total_value_bytes)) => {
1511 Ok(Some(SystemStats {
1512 history_event_count: history_event_count as u64,
1513 history_size_bytes: history_size_bytes as u64,
1514 queue_pending_count: queue_pending_count as u64,
1515 kv_user_key_count: kv_user_key_count as u64,
1516 kv_total_value_bytes: kv_total_value_bytes as u64,
1517 }))
1518 }
1519 _ => Ok(None),
1520 }
1521 }
1522}
1523
1524#[async_trait::async_trait]
1525impl ProviderAdmin for PostgresProvider {
1526 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1527 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1528 sqlx::query_scalar(&format!(
1529 "SELECT instance_id FROM {}.list_instances()",
1530 self.schema_name
1531 ))
1532 .fetch_all(&*self.pool)
1533 .await
1534 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1535 }
1536
1537 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1538 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1539 sqlx::query_scalar(&format!(
1540 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1541 self.schema_name
1542 ))
1543 .bind(status)
1544 .fetch_all(&*self.pool)
1545 .await
1546 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1547 }
1548
1549 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1550 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1551 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1552 "SELECT execution_id FROM {}.list_executions($1)",
1553 self.schema_name
1554 ))
1555 .bind(instance)
1556 .fetch_all(&*self.pool)
1557 .await
1558 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1559
1560 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1561 }
1562
1563 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1564 async fn read_history_with_execution_id(
1565 &self,
1566 instance: &str,
1567 execution_id: u64,
1568 ) -> Result<Vec<Event>, ProviderError> {
1569 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1570 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1571 self.schema_name
1572 ))
1573 .bind(instance)
1574 .bind(execution_id as i64)
1575 .fetch_all(&*self.pool)
1576 .await
1577 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1578
1579 event_data_rows
1580 .into_iter()
1581 .map(|event_data| {
1582 serde_json::from_str::<Event>(&event_data).map_err(|e| {
1583 ProviderError::permanent(
1584 "read_history_with_execution_id",
1585 format!("Failed to deserialize event: {e}"),
1586 )
1587 })
1588 })
1589 .collect()
1590 }
1591
1592 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1593 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1594 let execution_id = self.latest_execution_id(instance).await?;
1595 self.read_history_with_execution_id(instance, execution_id)
1596 .await
1597 }
1598
1599 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1600 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1601 sqlx::query_scalar(&format!(
1602 "SELECT {}.latest_execution_id($1)",
1603 self.schema_name
1604 ))
1605 .bind(instance)
1606 .fetch_optional(&*self.pool)
1607 .await
1608 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1609 .map(|id: i64| id as u64)
1610 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1611 }
1612
1613 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1614 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1615 let row: Option<(
1616 String,
1617 String,
1618 String,
1619 i64,
1620 chrono::DateTime<Utc>,
1621 Option<chrono::DateTime<Utc>>,
1622 Option<String>,
1623 Option<String>,
1624 Option<String>,
1625 )> = sqlx::query_as(&format!(
1626 "SELECT * FROM {}.get_instance_info($1)",
1627 self.schema_name
1628 ))
1629 .bind(instance)
1630 .fetch_optional(&*self.pool)
1631 .await
1632 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1633
1634 let (
1635 instance_id,
1636 orchestration_name,
1637 orchestration_version,
1638 current_execution_id,
1639 created_at,
1640 updated_at,
1641 status,
1642 output,
1643 parent_instance_id,
1644 ) =
1645 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1646
1647 Ok(InstanceInfo {
1648 instance_id,
1649 orchestration_name,
1650 orchestration_version,
1651 current_execution_id: current_execution_id as u64,
1652 status: status.unwrap_or_else(|| "Running".to_string()),
1653 output,
1654 created_at: created_at.timestamp_millis() as u64,
1655 updated_at: updated_at
1656 .map(|dt| dt.timestamp_millis() as u64)
1657 .unwrap_or(created_at.timestamp_millis() as u64),
1658 parent_instance_id,
1659 })
1660 }
1661
1662 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1663 async fn get_execution_info(
1664 &self,
1665 instance: &str,
1666 execution_id: u64,
1667 ) -> Result<ExecutionInfo, ProviderError> {
1668 let row: Option<(
1669 i64,
1670 String,
1671 Option<String>,
1672 chrono::DateTime<Utc>,
1673 Option<chrono::DateTime<Utc>>,
1674 i64,
1675 )> = sqlx::query_as(&format!(
1676 "SELECT * FROM {}.get_execution_info($1, $2)",
1677 self.schema_name
1678 ))
1679 .bind(instance)
1680 .bind(execution_id as i64)
1681 .fetch_optional(&*self.pool)
1682 .await
1683 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1684
1685 let (exec_id, status, output, started_at, completed_at, event_count) = row
1686 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1687
1688 Ok(ExecutionInfo {
1689 execution_id: exec_id as u64,
1690 status,
1691 output,
1692 started_at: started_at.timestamp_millis() as u64,
1693 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1694 event_count: event_count as usize,
1695 })
1696 }
1697
1698 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1699 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1700 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1701 "SELECT * FROM {}.get_system_metrics()",
1702 self.schema_name
1703 ))
1704 .fetch_optional(&*self.pool)
1705 .await
1706 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1707
1708 let (
1709 total_instances,
1710 total_executions,
1711 running_instances,
1712 completed_instances,
1713 failed_instances,
1714 total_events,
1715 ) = row.ok_or_else(|| {
1716 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1717 })?;
1718
1719 Ok(SystemMetrics {
1720 total_instances: total_instances as u64,
1721 total_executions: total_executions as u64,
1722 running_instances: running_instances as u64,
1723 completed_instances: completed_instances as u64,
1724 failed_instances: failed_instances as u64,
1725 total_events: total_events as u64,
1726 })
1727 }
1728
1729 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1730 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1731 let now_ms = Self::now_millis();
1732
1733 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1734 "SELECT * FROM {}.get_queue_depths($1)",
1735 self.schema_name
1736 ))
1737 .bind(now_ms)
1738 .fetch_optional(&*self.pool)
1739 .await
1740 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1741
1742 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1743 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1744 })?;
1745
1746 Ok(QueueDepths {
1747 orchestrator_queue: orchestrator_queue as usize,
1748 worker_queue: worker_queue as usize,
1749 timer_queue: 0, })
1751 }
1752
1753 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1756 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1757 sqlx::query_scalar(&format!(
1758 "SELECT child_instance_id FROM {}.list_children($1)",
1759 self.schema_name
1760 ))
1761 .bind(instance_id)
1762 .fetch_all(&*self.pool)
1763 .await
1764 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1765 }
1766
1767 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1768 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1769 let result: Result<Option<String>, _> =
1772 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1773 .bind(instance_id)
1774 .fetch_one(&*self.pool)
1775 .await;
1776
1777 match result {
1778 Ok(parent_id) => Ok(parent_id),
1779 Err(e) => {
1780 let err_str = e.to_string();
1781 if err_str.contains("Instance not found") {
1782 Err(ProviderError::permanent(
1783 "get_parent_id",
1784 format!("Instance not found: {}", instance_id),
1785 ))
1786 } else {
1787 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1788 }
1789 }
1790 }
1791 }
1792
1793 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1796 async fn delete_instances_atomic(
1797 &self,
1798 ids: &[String],
1799 force: bool,
1800 ) -> Result<DeleteInstanceResult, ProviderError> {
1801 if ids.is_empty() {
1802 return Ok(DeleteInstanceResult::default());
1803 }
1804
1805 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1806 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1807 self.schema_name
1808 ))
1809 .bind(ids)
1810 .bind(force)
1811 .fetch_optional(&*self.pool)
1812 .await
1813 .map_err(|e| {
1814 let err_str = e.to_string();
1815 if err_str.contains("is Running") {
1816 ProviderError::permanent("delete_instances_atomic", err_str)
1817 } else if err_str.contains("Orphan detected") {
1818 ProviderError::permanent("delete_instances_atomic", err_str)
1819 } else {
1820 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1821 }
1822 })?;
1823
1824 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1825 row.unwrap_or((0, 0, 0, 0));
1826
1827 debug!(
1828 target = "duroxide::providers::postgres",
1829 operation = "delete_instances_atomic",
1830 instances_deleted = instances_deleted,
1831 executions_deleted = executions_deleted,
1832 events_deleted = events_deleted,
1833 queue_messages_deleted = queue_messages_deleted,
1834 "Deleted instances atomically"
1835 );
1836
1837 Ok(DeleteInstanceResult {
1838 instances_deleted: instances_deleted as u64,
1839 executions_deleted: executions_deleted as u64,
1840 events_deleted: events_deleted as u64,
1841 queue_messages_deleted: queue_messages_deleted as u64,
1842 })
1843 }
1844
    /// Deletes completed ROOT instances (and their whole child trees) matching
    /// `filter`. Only rows with no parent and a terminal current execution are
    /// candidates; each candidate's full tree is removed via
    /// `delete_instances_atomic` with `force = true`.
    ///
    /// NOTE(review): the SQL is built with positional placeholders, so the
    /// bind order below must mirror the order clauses are appended.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Base query: root instances whose current execution is terminal.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
                AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Optional id filter: occupies placeholders $1..$k.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty id list can never match anything.
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completed-before cutoff (epoch ms): takes the placeholder
        // right after the id list, if any.
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always takes the final placeholder; default batch is 1000 roots.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        // Delete each root's full tree and accumulate per-category totals.
        let mut result = DeleteInstanceResult::default();

        for instance_id in &instance_ids {
            let tree = self.get_instance_tree(instance_id).await?;

            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1952
1953 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1956 async fn prune_executions(
1957 &self,
1958 instance_id: &str,
1959 options: PruneOptions,
1960 ) -> Result<PruneResult, ProviderError> {
1961 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1962 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1963
1964 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1965 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1966 self.schema_name
1967 ))
1968 .bind(instance_id)
1969 .bind(keep_last)
1970 .bind(completed_before_ms)
1971 .fetch_optional(&*self.pool)
1972 .await
1973 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1974
1975 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1976
1977 debug!(
1978 target = "duroxide::providers::postgres",
1979 operation = "prune_executions",
1980 instance_id = %instance_id,
1981 instances_processed = instances_processed,
1982 executions_deleted = executions_deleted,
1983 events_deleted = events_deleted,
1984 "Pruned executions"
1985 );
1986
1987 Ok(PruneResult {
1988 instances_processed: instances_processed as u64,
1989 executions_deleted: executions_deleted as u64,
1990 events_deleted: events_deleted as u64,
1991 })
1992 }
1993
    /// Prunes old executions for every instance matching `filter`, one
    /// instance at a time via `prune_executions`, and sums the results.
    ///
    /// NOTE(review): unlike `delete_instance_bulk`, this selects with
    /// `WHERE 1=1` (no parent/status restriction), so child instances are
    /// eligible too. The SQL uses positional placeholders, so the bind order
    /// below must mirror the order clauses are appended.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // Base query: all instances joined to their current execution.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Optional id filter: occupies placeholders $1..$k.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicit empty id list can never match anything.
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional completed-before cutoff (epoch ms): takes the placeholder
        // right after the id list, if any.
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always takes the final placeholder; default batch is 1000.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were appended.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        // Prune each matched instance and accumulate the totals.
        let mut result = PruneResult::default();

        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
2094}