1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-backed implementation of the duroxide `Provider` trait.
///
/// All durable state lives in one Postgres database; most operations delegate
/// to stored procedures that the `MigrationRunner` installs into `schema_name`.
pub struct PostgresProvider {
    // Shared connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Schema holding all tables and stored procedures (defaults to "public").
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143 match filter {
144 TagFilter::DefaultOnly => ("default_only", vec![]),
145 TagFilter::Tags(set) => {
146 let mut tags: Vec<String> = set.iter().cloned().collect();
147 tags.sort();
148 ("tags", tags)
149 }
150 TagFilter::DefaultAnd(set) => {
151 let mut tags: Vec<String> = set.iter().cloned().collect();
152 tags.sort();
153 ("default_and", tags)
154 }
155 TagFilter::Any => ("any", vec![]),
156 TagFilter::None => ("none", vec![]),
157 }
158 }
159
160 pub async fn cleanup_schema(&self) -> Result<()> {
165 const MAX_RETRIES: u32 = 5;
166 const BASE_RETRY_DELAY_MS: u64 = 50;
167
168 for attempt in 0..=MAX_RETRIES {
169 let cleanup_result = async {
170 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172 .execute(&*self.pool)
173 .await?;
174
175 if self.schema_name != "public" {
178 sqlx::query(&format!(
179 "DROP SCHEMA IF EXISTS {} CASCADE",
180 self.schema_name
181 ))
182 .execute(&*self.pool)
183 .await?;
184 } else {
185 }
188
189 Ok::<(), SqlxError>(())
190 }
191 .await;
192
193 match cleanup_result {
194 Ok(()) => return Ok(()),
195 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196 if attempt < MAX_RETRIES {
197 warn!(
198 target = "duroxide::providers::postgres",
199 schema = %self.schema_name,
200 attempt = attempt + 1,
201 "Deadlock during cleanup_schema, retrying"
202 );
203 sleep(Duration::from_millis(
204 BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
205 ))
206 .await;
207 continue;
208 }
209 return Err(anyhow::anyhow!(db_err.to_string()));
210 }
211 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
212 }
213 }
214
215 Ok(())
216 }
217}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
    /// Provider identifier reported to the duroxide runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
224
    /// Crate version, captured at compile time from Cargo metadata.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
228
    /// Atomically leases the next ready orchestration work batch via the
    /// `fetch_orchestration_item` stored procedure, returning the item, its
    /// lock token, and the delivery attempt count.
    ///
    /// Retries up to `MAX_RETRIES` times (linear backoff) on retryable
    /// database errors. `_poll_timeout` is unused: the stored procedure
    /// returns immediately rather than long-polling.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack semver bounds as major*1e6 + minor*1e3 + patch so the stored
        // procedure can range-compare a single integer column.
        // NOTE(review): only the FIRST supported version range is honored;
        // additional ranges in the filter are ignored, and an empty range list
        // fetches nothing — confirm this matches dispatcher expectations.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape mirrors the stored procedure's OUT parameters:
            // (instance_id, name, version, execution_id, history JSON,
            //  messages JSON, lock_token, attempt_count, kv snapshot JSON).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // Corrupt history is not fatal: return an empty history plus a
                // history_error so the runtime can decide how to react.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Messages, by contrast, must deserialize: a failure here is a
                // permanent error because we already hold the lock.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // Rebuild the key/value snapshot from loosely-typed JSON,
                // silently skipping entries whose shape doesn't match.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan guard: queue messages addressed to an instance that
                // never started (no history, placeholder name) are dropped by
                // acking the batch with empty deltas.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Queue empty on a successful call: report no work.
            return Ok(None);
        }

        // Unreachable: the loop always returns, but the compiler cannot prove it.
        Ok(None)
    }
    /// Atomically commits the outcome of one orchestration turn under
    /// `lock_token`: appends `history_delta`, enqueues worker/orchestrator
    /// items, applies execution metadata (status, output, custom status,
    /// key/value mutations), and cancels the given activities — all in the
    /// `ack_orchestration_item` stored procedure.
    ///
    /// Retries up to `MAX_RETRIES` times with linear backoff on retryable
    /// database errors; an "Invalid lock token" raised by the procedure is
    /// always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the history payload up front so serialization errors surface
        // before anything is written.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime is responsible for assigning event ids; 0 means it
            // didn't, which would corrupt history ordering.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse event-type label taken from the Debug representation
            // (everything before the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // The LAST CustomStatusUpdated event in the delta wins: Some(value)
        // sets the custom status, Some(None) clears it, absence leaves it.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Key/value store mutations are derived from the history delta and
        // applied by the stored procedure in event order.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet {
                    key,
                    value,
                    last_updated_at_ms,
                } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                    "last_updated_at_ms": last_updated_at_ms,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // A stale/unknown token cannot succeed on retry.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: the loop always returns, but the compiler cannot prove it.
        Ok(())
    }
639 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
640 async fn abandon_orchestration_item(
641 &self,
642 lock_token: &str,
643 delay: Option<Duration>,
644 ignore_attempt: bool,
645 ) -> Result<(), ProviderError> {
646 let start = std::time::Instant::now();
647 let now_ms = Self::now_millis();
648 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
649
650 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
651 "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
652 self.schema_name
653 ))
654 .bind(lock_token)
655 .bind(now_ms)
656 .bind(delay_param)
657 .bind(ignore_attempt)
658 .fetch_one(&*self.pool)
659 .await
660 {
661 Ok(instance_id) => instance_id,
662 Err(e) => {
663 if let SqlxError::Database(db_err) = &e {
664 if db_err.message().contains("Invalid lock token") {
665 return Err(ProviderError::permanent(
666 "abandon_orchestration_item",
667 "Invalid lock token",
668 ));
669 }
670 } else if e.to_string().contains("Invalid lock token") {
671 return Err(ProviderError::permanent(
672 "abandon_orchestration_item",
673 "Invalid lock token",
674 ));
675 }
676
677 return Err(Self::sqlx_to_provider_error(
678 "abandon_orchestration_item",
679 e,
680 ));
681 }
682 };
683
684 let duration_ms = start.elapsed().as_millis() as u64;
685 debug!(
686 target = "duroxide::providers::postgres",
687 operation = "abandon_orchestration_item",
688 instance_id = %instance_id,
689 delay_ms = delay.map(|d| d.as_millis() as u64),
690 ignore_attempt = ignore_attempt,
691 duration_ms = duration_ms,
692 "Abandoned orchestration item via stored procedure"
693 );
694
695 Ok(())
696 }
697
698 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
699 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
700 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
701 "SELECT out_event_data FROM {}.fetch_history($1)",
702 self.schema_name
703 ))
704 .bind(instance)
705 .fetch_all(&*self.pool)
706 .await
707 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
708
709 Ok(event_data_rows
710 .into_iter()
711 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
712 .collect())
713 }
714
715 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
716 async fn append_with_execution(
717 &self,
718 instance: &str,
719 execution_id: u64,
720 new_events: Vec<Event>,
721 ) -> Result<(), ProviderError> {
722 if new_events.is_empty() {
723 return Ok(());
724 }
725
726 let mut events_payload = Vec::with_capacity(new_events.len());
727 for event in &new_events {
728 if event.event_id() == 0 {
729 error!(
730 target = "duroxide::providers::postgres",
731 operation = "append_with_execution",
732 error_type = "validation_error",
733 instance_id = %instance,
734 execution_id = execution_id,
735 "event_id must be set by runtime"
736 );
737 return Err(ProviderError::permanent(
738 "append_with_execution",
739 "event_id must be set by runtime",
740 ));
741 }
742
743 let event_json = serde_json::to_string(event).map_err(|e| {
744 ProviderError::permanent(
745 "append_with_execution",
746 format!("Failed to serialize event: {e}"),
747 )
748 })?;
749
750 let event_type = format!("{event:?}")
751 .split('{')
752 .next()
753 .unwrap_or("Unknown")
754 .trim()
755 .to_string();
756
757 events_payload.push(serde_json::json!({
758 "event_id": event.event_id(),
759 "event_type": event_type,
760 "event_data": event_json,
761 }));
762 }
763
764 let events_json = serde_json::Value::Array(events_payload);
765
766 sqlx::query(&format!(
767 "SELECT {}.append_history($1, $2, $3)",
768 self.schema_name
769 ))
770 .bind(instance)
771 .bind(execution_id as i64)
772 .bind(events_json)
773 .execute(&*self.pool)
774 .await
775 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
776
777 debug!(
778 target = "duroxide::providers::postgres",
779 operation = "append_with_execution",
780 instance_id = %instance,
781 execution_id = execution_id,
782 event_count = new_events.len(),
783 "Appended history events via stored procedure"
784 );
785
786 Ok(())
787 }
788
789 #[instrument(skip(self), target = "duroxide::providers::postgres")]
790 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
791 let work_item = serde_json::to_string(&item).map_err(|e| {
792 ProviderError::permanent(
793 "enqueue_worker_work",
794 format!("Failed to serialize work item: {e}"),
795 )
796 })?;
797
798 let now_ms = Self::now_millis();
799
800 let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
802 WorkItem::ActivityExecute {
803 instance,
804 execution_id,
805 id,
806 session_id,
807 tag,
808 ..
809 } => (
810 Some(instance.clone()),
811 Some(*execution_id as i64),
812 Some(*id as i64),
813 session_id.clone(),
814 tag.clone(),
815 ),
816 _ => (None, None, None, None, None),
817 };
818
819 sqlx::query(&format!(
820 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
821 self.schema_name
822 ))
823 .bind(work_item)
824 .bind(now_ms)
825 .bind(&instance_id)
826 .bind(execution_id)
827 .bind(activity_id)
828 .bind(&session_id)
829 .bind(&tag)
830 .execute(&*self.pool)
831 .await
832 .map_err(|e| {
833 error!(
834 target = "duroxide::providers::postgres",
835 operation = "enqueue_worker_work",
836 error_type = "database_error",
837 error = %e,
838 "Failed to enqueue worker work"
839 );
840 Self::sqlx_to_provider_error("enqueue_worker_work", e)
841 })?;
842
843 Ok(())
844 }
845
    /// Leases the next ready worker-queue item matching `tag_filter` (and,
    /// optionally, a session owner) via the `fetch_work_item` stored
    /// procedure. Returns the item, its lock token, and the attempt count.
    ///
    /// `_poll_timeout` is unused: the stored procedure returns immediately.
    /// A `TagFilter::None` short-circuits without touching the database.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // "None" matches nothing by definition — skip the round trip.
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Session affinity: pass the owner id and a separate session lock
        // timeout when configured.
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Row shape: (work item JSON, lock token, attempt count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Resolve the instance id for logging; sub-orchestration completions
        // are attributed to their parent instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
939
    /// Acknowledges a locked worker-queue item via the `ack_worker` stored
    /// procedure.
    ///
    /// With `completion == None` (e.g. a cancelled activity) the item is
    /// simply removed. Otherwise `completion` must be `ActivityCompleted` or
    /// `ActivityFailed`; it is enqueued for the orchestrator in the same
    /// database call. An already-processed token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Path 1: ack without a completion — remove the item, enqueue nothing.
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Path 2: ack with a completion — only activity outcomes are valid.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1046
1047 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1048 async fn renew_work_item_lock(
1049 &self,
1050 token: &str,
1051 extend_for: Duration,
1052 ) -> Result<(), ProviderError> {
1053 let start = std::time::Instant::now();
1054
1055 let now_ms = Self::now_millis();
1057
1058 let extend_secs = extend_for.as_secs() as i64;
1060
1061 match sqlx::query(&format!(
1062 "SELECT {}.renew_work_item_lock($1, $2, $3)",
1063 self.schema_name
1064 ))
1065 .bind(token)
1066 .bind(now_ms)
1067 .bind(extend_secs)
1068 .execute(&*self.pool)
1069 .await
1070 {
1071 Ok(_) => {
1072 let duration_ms = start.elapsed().as_millis() as u64;
1073 debug!(
1074 target = "duroxide::providers::postgres",
1075 operation = "renew_work_item_lock",
1076 token = %token,
1077 extend_for_secs = extend_secs,
1078 duration_ms = duration_ms,
1079 "Work item lock renewed successfully"
1080 );
1081 Ok(())
1082 }
1083 Err(e) => {
1084 if let SqlxError::Database(db_err) = &e {
1085 if db_err.message().contains("Lock token invalid") {
1086 return Err(ProviderError::permanent(
1087 "renew_work_item_lock",
1088 "Lock token invalid, expired, or already acked",
1089 ));
1090 }
1091 } else if e.to_string().contains("Lock token invalid") {
1092 return Err(ProviderError::permanent(
1093 "renew_work_item_lock",
1094 "Lock token invalid, expired, or already acked",
1095 ));
1096 }
1097
1098 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1099 }
1100 }
1101 }
1102
1103 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1104 async fn abandon_work_item(
1105 &self,
1106 token: &str,
1107 delay: Option<Duration>,
1108 ignore_attempt: bool,
1109 ) -> Result<(), ProviderError> {
1110 let start = std::time::Instant::now();
1111 let now_ms = Self::now_millis();
1112 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1113
1114 match sqlx::query(&format!(
1115 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1116 self.schema_name
1117 ))
1118 .bind(token)
1119 .bind(now_ms)
1120 .bind(delay_param)
1121 .bind(ignore_attempt)
1122 .execute(&*self.pool)
1123 .await
1124 {
1125 Ok(_) => {
1126 let duration_ms = start.elapsed().as_millis() as u64;
1127 debug!(
1128 target = "duroxide::providers::postgres",
1129 operation = "abandon_work_item",
1130 token = %token,
1131 delay_ms = delay.map(|d| d.as_millis() as u64),
1132 ignore_attempt = ignore_attempt,
1133 duration_ms = duration_ms,
1134 "Abandoned work item via stored procedure"
1135 );
1136 Ok(())
1137 }
1138 Err(e) => {
1139 if let SqlxError::Database(db_err) = &e {
1140 if db_err.message().contains("Invalid lock token")
1141 || db_err.message().contains("already acked")
1142 {
1143 return Err(ProviderError::permanent(
1144 "abandon_work_item",
1145 "Invalid lock token or already acked",
1146 ));
1147 }
1148 } else if e.to_string().contains("Invalid lock token")
1149 || e.to_string().contains("already acked")
1150 {
1151 return Err(ProviderError::permanent(
1152 "abandon_work_item",
1153 "Invalid lock token or already acked",
1154 ));
1155 }
1156
1157 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1158 }
1159 }
1160 }
1161
    /// Extends the lease on a locked orchestration item via the
    /// `renew_orchestration_item_lock` stored procedure. The extension is
    /// passed in whole seconds (sub-second remainders are truncated).
    ///
    /// Errors whose message indicates the lock is gone ("Lock token invalid",
    /// "expired", "already released") are permanent; everything else goes
    /// through the shared sqlx error classifier.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Message matching mirrors the stored procedure's raised errors.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1226
1227 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1228 async fn enqueue_for_orchestrator(
1229 &self,
1230 item: WorkItem,
1231 delay: Option<Duration>,
1232 ) -> Result<(), ProviderError> {
1233 let work_item = serde_json::to_string(&item).map_err(|e| {
1234 ProviderError::permanent(
1235 "enqueue_orchestrator_work",
1236 format!("Failed to serialize work item: {e}"),
1237 )
1238 })?;
1239
1240 let instance_id = match &item {
1242 WorkItem::StartOrchestration { instance, .. }
1243 | WorkItem::ActivityCompleted { instance, .. }
1244 | WorkItem::ActivityFailed { instance, .. }
1245 | WorkItem::TimerFired { instance, .. }
1246 | WorkItem::ExternalRaised { instance, .. }
1247 | WorkItem::CancelInstance { instance, .. }
1248 | WorkItem::ContinueAsNew { instance, .. }
1249 | WorkItem::QueueMessage { instance, .. } => instance,
1250 WorkItem::SubOrchCompleted {
1251 parent_instance, ..
1252 }
1253 | WorkItem::SubOrchFailed {
1254 parent_instance, ..
1255 } => parent_instance,
1256 WorkItem::ActivityExecute { .. } => {
1257 return Err(ProviderError::permanent(
1258 "enqueue_orchestrator_work",
1259 "ActivityExecute should go to worker queue, not orchestrator queue",
1260 ));
1261 }
1262 };
1263
1264 let now_ms = Self::now_millis();
1266
1267 let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1268 if *fire_at_ms > 0 {
1269 if let Some(delay) = delay {
1271 std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1272 } else {
1273 *fire_at_ms
1274 }
1275 } else {
1276 delay
1278 .map(|d| now_ms as u64 + d.as_millis() as u64)
1279 .unwrap_or(now_ms as u64)
1280 }
1281 } else {
1282 delay
1284 .map(|d| now_ms as u64 + d.as_millis() as u64)
1285 .unwrap_or(now_ms as u64)
1286 };
1287
1288 let visible_at = Utc
1289 .timestamp_millis_opt(visible_at_ms as i64)
1290 .single()
1291 .ok_or_else(|| {
1292 ProviderError::permanent(
1293 "enqueue_orchestrator_work",
1294 "Invalid visible_at timestamp",
1295 )
1296 })?;
1297
1298 sqlx::query(&format!(
1303 "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1304 self.schema_name
1305 ))
1306 .bind(instance_id)
1307 .bind(&work_item)
1308 .bind(visible_at)
1309 .bind::<Option<String>>(None) .bind::<Option<String>>(None) .bind::<Option<i64>>(None) .execute(&*self.pool)
1313 .await
1314 .map_err(|e| {
1315 error!(
1316 target = "duroxide::providers::postgres",
1317 operation = "enqueue_orchestrator_work",
1318 error_type = "database_error",
1319 error = %e,
1320 instance_id = %instance_id,
1321 "Failed to enqueue orchestrator work"
1322 );
1323 Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1324 })?;
1325
1326 debug!(
1327 target = "duroxide::providers::postgres",
1328 operation = "enqueue_orchestrator_work",
1329 instance_id = %instance_id,
1330 delay_ms = delay.map(|d| d.as_millis() as u64),
1331 "Enqueued orchestrator work"
1332 );
1333
1334 Ok(())
1335 }
1336
1337 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1338 async fn read_with_execution(
1339 &self,
1340 instance: &str,
1341 execution_id: u64,
1342 ) -> Result<Vec<Event>, ProviderError> {
1343 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1344 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1345 self.table_name("history")
1346 ))
1347 .bind(instance)
1348 .bind(execution_id as i64)
1349 .fetch_all(&*self.pool)
1350 .await
1351 .ok()
1352 .unwrap_or_default();
1353
1354 Ok(event_data_rows
1355 .into_iter()
1356 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1357 .collect())
1358 }
1359
1360 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1361 async fn renew_session_lock(
1362 &self,
1363 owner_ids: &[&str],
1364 extend_for: Duration,
1365 idle_timeout: Duration,
1366 ) -> Result<usize, ProviderError> {
1367 if owner_ids.is_empty() {
1368 return Ok(0);
1369 }
1370
1371 let now_ms = Self::now_millis();
1372 let extend_ms = extend_for.as_millis() as i64;
1373 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1374 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1375
1376 let result = sqlx::query_scalar::<_, i64>(&format!(
1377 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1378 self.schema_name
1379 ))
1380 .bind(&owner_ids_vec)
1381 .bind(now_ms)
1382 .bind(extend_ms)
1383 .bind(idle_timeout_ms)
1384 .fetch_one(&*self.pool)
1385 .await
1386 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1387
1388 debug!(
1389 target = "duroxide::providers::postgres",
1390 operation = "renew_session_lock",
1391 owner_count = owner_ids.len(),
1392 sessions_renewed = result,
1393 "Session locks renewed"
1394 );
1395
1396 Ok(result as usize)
1397 }
1398
1399 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1400 async fn cleanup_orphaned_sessions(
1401 &self,
1402 _idle_timeout: Duration,
1403 ) -> Result<usize, ProviderError> {
1404 let now_ms = Self::now_millis();
1405
1406 let result = sqlx::query_scalar::<_, i64>(&format!(
1407 "SELECT {}.cleanup_orphaned_sessions($1)",
1408 self.schema_name
1409 ))
1410 .bind(now_ms)
1411 .fetch_one(&*self.pool)
1412 .await
1413 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1414
1415 debug!(
1416 target = "duroxide::providers::postgres",
1417 operation = "cleanup_orphaned_sessions",
1418 sessions_cleaned = result,
1419 "Orphaned sessions cleaned up"
1420 );
1421
1422 Ok(result as usize)
1423 }
1424
    // This provider also implements `ProviderAdmin` (see the impl below), so
    // it exposes itself as the management capability.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1428
1429 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1430 async fn get_custom_status(
1431 &self,
1432 instance: &str,
1433 last_seen_version: u64,
1434 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1435 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1436 "SELECT * FROM {}.get_custom_status($1, $2)",
1437 self.schema_name
1438 ))
1439 .bind(instance)
1440 .bind(last_seen_version as i64)
1441 .fetch_optional(&*self.pool)
1442 .await
1443 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1444
1445 match row {
1446 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1447 None => Ok(None),
1448 }
1449 }
1450
1451 async fn get_kv_value(
1452 &self,
1453 instance_id: &str,
1454 key: &str,
1455 ) -> Result<Option<String>, ProviderError> {
1456 let query = format!(
1457 "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1458 self.schema_name
1459 );
1460 let result: Option<(String,)> = sqlx::query_as(&query)
1461 .bind(instance_id)
1462 .bind(key)
1463 .fetch_optional(&*self.pool)
1464 .await
1465 .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1466 Ok(result.map(|(v,)| v))
1467 }
1468
1469 async fn get_kv_all_values(
1470 &self,
1471 instance_id: &str,
1472 ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1473 let query = format!(
1474 "SELECT key, value FROM {}.kv_store WHERE instance_id = $1",
1475 self.schema_name
1476 );
1477 let rows: Vec<(String, String)> = sqlx::query_as(&query)
1478 .bind(instance_id)
1479 .fetch_all(&*self.pool)
1480 .await
1481 .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1482 Ok(rows.into_iter().collect())
1483 }
1484}
1485
1486#[async_trait::async_trait]
1487impl ProviderAdmin for PostgresProvider {
1488 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1489 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1490 sqlx::query_scalar(&format!(
1491 "SELECT instance_id FROM {}.list_instances()",
1492 self.schema_name
1493 ))
1494 .fetch_all(&*self.pool)
1495 .await
1496 .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1497 }
1498
1499 #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1500 async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1501 sqlx::query_scalar(&format!(
1502 "SELECT instance_id FROM {}.list_instances_by_status($1)",
1503 self.schema_name
1504 ))
1505 .bind(status)
1506 .fetch_all(&*self.pool)
1507 .await
1508 .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1509 }
1510
1511 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1512 async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1513 let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1514 "SELECT execution_id FROM {}.list_executions($1)",
1515 self.schema_name
1516 ))
1517 .bind(instance)
1518 .fetch_all(&*self.pool)
1519 .await
1520 .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1521
1522 Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1523 }
1524
1525 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1526 async fn read_history_with_execution_id(
1527 &self,
1528 instance: &str,
1529 execution_id: u64,
1530 ) -> Result<Vec<Event>, ProviderError> {
1531 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1532 "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1533 self.schema_name
1534 ))
1535 .bind(instance)
1536 .bind(execution_id as i64)
1537 .fetch_all(&*self.pool)
1538 .await
1539 .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1540
1541 event_data_rows
1542 .into_iter()
1543 .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1544 .collect::<Vec<Event>>()
1545 .into_iter()
1546 .map(Ok)
1547 .collect()
1548 }
1549
1550 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1551 async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1552 let execution_id = self.latest_execution_id(instance).await?;
1553 self.read_history_with_execution_id(instance, execution_id)
1554 .await
1555 }
1556
1557 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1558 async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1559 sqlx::query_scalar(&format!(
1560 "SELECT {}.latest_execution_id($1)",
1561 self.schema_name
1562 ))
1563 .bind(instance)
1564 .fetch_optional(&*self.pool)
1565 .await
1566 .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1567 .map(|id: i64| id as u64)
1568 .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1569 }
1570
1571 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1572 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1573 let row: Option<(
1574 String,
1575 String,
1576 String,
1577 i64,
1578 chrono::DateTime<Utc>,
1579 Option<chrono::DateTime<Utc>>,
1580 Option<String>,
1581 Option<String>,
1582 Option<String>,
1583 )> = sqlx::query_as(&format!(
1584 "SELECT * FROM {}.get_instance_info($1)",
1585 self.schema_name
1586 ))
1587 .bind(instance)
1588 .fetch_optional(&*self.pool)
1589 .await
1590 .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1591
1592 let (
1593 instance_id,
1594 orchestration_name,
1595 orchestration_version,
1596 current_execution_id,
1597 created_at,
1598 updated_at,
1599 status,
1600 output,
1601 parent_instance_id,
1602 ) =
1603 row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1604
1605 Ok(InstanceInfo {
1606 instance_id,
1607 orchestration_name,
1608 orchestration_version,
1609 current_execution_id: current_execution_id as u64,
1610 status: status.unwrap_or_else(|| "Running".to_string()),
1611 output,
1612 created_at: created_at.timestamp_millis() as u64,
1613 updated_at: updated_at
1614 .map(|dt| dt.timestamp_millis() as u64)
1615 .unwrap_or(created_at.timestamp_millis() as u64),
1616 parent_instance_id,
1617 })
1618 }
1619
1620 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1621 async fn get_execution_info(
1622 &self,
1623 instance: &str,
1624 execution_id: u64,
1625 ) -> Result<ExecutionInfo, ProviderError> {
1626 let row: Option<(
1627 i64,
1628 String,
1629 Option<String>,
1630 chrono::DateTime<Utc>,
1631 Option<chrono::DateTime<Utc>>,
1632 i64,
1633 )> = sqlx::query_as(&format!(
1634 "SELECT * FROM {}.get_execution_info($1, $2)",
1635 self.schema_name
1636 ))
1637 .bind(instance)
1638 .bind(execution_id as i64)
1639 .fetch_optional(&*self.pool)
1640 .await
1641 .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1642
1643 let (exec_id, status, output, started_at, completed_at, event_count) = row
1644 .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1645
1646 Ok(ExecutionInfo {
1647 execution_id: exec_id as u64,
1648 status,
1649 output,
1650 started_at: started_at.timestamp_millis() as u64,
1651 completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1652 event_count: event_count as usize,
1653 })
1654 }
1655
1656 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1657 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1658 let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1659 "SELECT * FROM {}.get_system_metrics()",
1660 self.schema_name
1661 ))
1662 .fetch_optional(&*self.pool)
1663 .await
1664 .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1665
1666 let (
1667 total_instances,
1668 total_executions,
1669 running_instances,
1670 completed_instances,
1671 failed_instances,
1672 total_events,
1673 ) = row.ok_or_else(|| {
1674 ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1675 })?;
1676
1677 Ok(SystemMetrics {
1678 total_instances: total_instances as u64,
1679 total_executions: total_executions as u64,
1680 running_instances: running_instances as u64,
1681 completed_instances: completed_instances as u64,
1682 failed_instances: failed_instances as u64,
1683 total_events: total_events as u64,
1684 })
1685 }
1686
1687 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1688 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1689 let now_ms = Self::now_millis();
1690
1691 let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1692 "SELECT * FROM {}.get_queue_depths($1)",
1693 self.schema_name
1694 ))
1695 .bind(now_ms)
1696 .fetch_optional(&*self.pool)
1697 .await
1698 .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1699
1700 let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1701 ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1702 })?;
1703
1704 Ok(QueueDepths {
1705 orchestrator_queue: orchestrator_queue as usize,
1706 worker_queue: worker_queue as usize,
1707 timer_queue: 0, })
1709 }
1710
1711 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1714 async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1715 sqlx::query_scalar(&format!(
1716 "SELECT child_instance_id FROM {}.list_children($1)",
1717 self.schema_name
1718 ))
1719 .bind(instance_id)
1720 .fetch_all(&*self.pool)
1721 .await
1722 .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1723 }
1724
1725 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1726 async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1727 let result: Result<Option<String>, _> =
1730 sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1731 .bind(instance_id)
1732 .fetch_one(&*self.pool)
1733 .await;
1734
1735 match result {
1736 Ok(parent_id) => Ok(parent_id),
1737 Err(e) => {
1738 let err_str = e.to_string();
1739 if err_str.contains("Instance not found") {
1740 Err(ProviderError::permanent(
1741 "get_parent_id",
1742 format!("Instance not found: {}", instance_id),
1743 ))
1744 } else {
1745 Err(Self::sqlx_to_provider_error("get_parent_id", e))
1746 }
1747 }
1748 }
1749 }
1750
1751 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1754 async fn delete_instances_atomic(
1755 &self,
1756 ids: &[String],
1757 force: bool,
1758 ) -> Result<DeleteInstanceResult, ProviderError> {
1759 if ids.is_empty() {
1760 return Ok(DeleteInstanceResult::default());
1761 }
1762
1763 let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1764 "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1765 self.schema_name
1766 ))
1767 .bind(ids)
1768 .bind(force)
1769 .fetch_optional(&*self.pool)
1770 .await
1771 .map_err(|e| {
1772 let err_str = e.to_string();
1773 if err_str.contains("is Running") {
1774 ProviderError::permanent("delete_instances_atomic", err_str)
1775 } else if err_str.contains("Orphan detected") {
1776 ProviderError::permanent("delete_instances_atomic", err_str)
1777 } else {
1778 Self::sqlx_to_provider_error("delete_instances_atomic", e)
1779 }
1780 })?;
1781
1782 let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1783 row.unwrap_or((0, 0, 0, 0));
1784
1785 debug!(
1786 target = "duroxide::providers::postgres",
1787 operation = "delete_instances_atomic",
1788 instances_deleted = instances_deleted,
1789 executions_deleted = executions_deleted,
1790 events_deleted = events_deleted,
1791 queue_messages_deleted = queue_messages_deleted,
1792 "Deleted instances atomically"
1793 );
1794
1795 Ok(DeleteInstanceResult {
1796 instances_deleted: instances_deleted as u64,
1797 executions_deleted: executions_deleted as u64,
1798 events_deleted: events_deleted as u64,
1799 queue_messages_deleted: queue_messages_deleted as u64,
1800 })
1801 }
1802
    /// Bulk-delete terminal root instances (and their child trees) matching
    /// `filter`.
    ///
    /// Candidates are root instances (no parent) whose current execution is in
    /// a terminal status, optionally restricted to `filter.instance_ids`
    /// and/or executions completed before `filter.completed_before`
    /// (milliseconds), capped at `filter.limit` (default 1000). Each
    /// candidate's full instance tree is then force-deleted atomically.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // NOTE: the SQL below is built dynamically. Placeholder numbers ($n)
        // are computed from which filter fields are present, and the binds
        // further down MUST follow the same order: instance_ids, then
        // completed_before, then limit.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Optional restriction to an explicit id list; placeholders $1..$k.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicitly empty id list means "delete nothing".
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional cutoff; its placeholder number follows the id list (if any).
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // completed_before is in milliseconds; TO_TIMESTAMP takes seconds.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always binds last; its placeholder number accounts for both
        // optional groups above.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were numbered.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        let mut result = DeleteInstanceResult::default();

        // Delete each candidate's whole tree (force = true) and accumulate
        // the per-tree counts into the aggregate result.
        for instance_id in &instance_ids {
            let tree = self.get_instance_tree(instance_id).await?;

            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
1910
1911 #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1914 async fn prune_executions(
1915 &self,
1916 instance_id: &str,
1917 options: PruneOptions,
1918 ) -> Result<PruneResult, ProviderError> {
1919 let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1920 let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1921
1922 let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1923 "SELECT * FROM {}.prune_executions($1, $2, $3)",
1924 self.schema_name
1925 ))
1926 .bind(instance_id)
1927 .bind(keep_last)
1928 .bind(completed_before_ms)
1929 .fetch_optional(&*self.pool)
1930 .await
1931 .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1932
1933 let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1934
1935 debug!(
1936 target = "duroxide::providers::postgres",
1937 operation = "prune_executions",
1938 instance_id = %instance_id,
1939 instances_processed = instances_processed,
1940 executions_deleted = executions_deleted,
1941 events_deleted = events_deleted,
1942 "Pruned executions"
1943 );
1944
1945 Ok(PruneResult {
1946 instances_processed: instances_processed as u64,
1947 executions_deleted: executions_deleted as u64,
1948 events_deleted: events_deleted as u64,
1949 })
1950 }
1951
    /// Bulk variant of [`prune_executions`]: select instances matching
    /// `filter` (optionally by id list and/or completion cutoff, capped at
    /// `filter.limit`, default 1000), then prune each with `options`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // NOTE: the SQL below is built dynamically. Placeholder numbers ($n)
        // depend on which filter fields are present, and the binds further
        // down MUST follow the same order: instance_ids, then
        // completed_before, then limit. This mirrors delete_instance_bulk.
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Optional restriction to an explicit id list; placeholders $1..$k.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                // An explicitly empty id list means "prune nothing".
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Optional cutoff; its placeholder number follows the id list (if any).
        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // completed_before is in milliseconds; TO_TIMESTAMP takes seconds.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // LIMIT always binds last; its placeholder number accounts for both
        // optional groups above.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind values in exactly the order the placeholders were numbered.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        let mut result = PruneResult::default();

        // Prune each matched instance and accumulate the per-instance counts.
        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
2052}