1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4 DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5 InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6 PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7 SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind, SystemStats};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// Durable-task provider backed by PostgreSQL.
///
/// All queue/history operations are delegated to stored procedures living in
/// `schema_name`, so one database can host multiple isolated provider
/// instances (one per schema).
pub struct PostgresProvider {
    // Shared connection pool; also handed to the migration runner.
    pool: Arc<PgPool>,
    // Postgres schema that owns the duroxide tables and stored procedures.
    schema_name: String,
}
45
46impl PostgresProvider {
47 pub async fn new(database_url: &str) -> Result<Self> {
48 Self::new_with_schema(database_url, None).await
49 }
50
51 pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52 let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53 .ok()
54 .and_then(|s| s.parse::<u32>().ok())
55 .unwrap_or(10);
56
57 let pool = PgPoolOptions::new()
58 .max_connections(max_connections)
59 .min_connections(1)
60 .acquire_timeout(std::time::Duration::from_secs(30))
61 .connect(database_url)
62 .await?;
63
64 let schema_name = schema_name.unwrap_or("public").to_string();
65
66 let provider = Self {
67 pool: Arc::new(pool),
68 schema_name: schema_name.clone(),
69 };
70
71 let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73 migration_runner.migrate().await?;
74
75 Ok(provider)
76 }
77
78 #[instrument(skip(self), target = "duroxide::providers::postgres")]
79 pub async fn initialize_schema(&self) -> Result<()> {
80 let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83 migration_runner.migrate().await?;
84 Ok(())
85 }
86
87 fn now_millis() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap()
92 .as_millis() as i64
93 }
94
95 fn table_name(&self, table: &str) -> String {
97 format!("{}.{}", self.schema_name, table)
98 }
99
100 pub fn pool(&self) -> &PgPool {
102 &self.pool
103 }
104
105 pub fn schema_name(&self) -> &str {
107 &self.schema_name
108 }
109
110 fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112 match e {
113 SqlxError::Database(ref db_err) => {
114 let code_opt = db_err.code();
116 let code = code_opt.as_deref();
117 if code == Some("40P01") {
118 ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120 } else if code == Some("40001") {
121 ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123 } else if code == Some("23505") {
124 ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126 } else if code == Some("23503") {
127 ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129 } else {
130 ProviderError::permanent(operation, format!("Database error: {e}"))
131 }
132 }
133 SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134 ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135 }
136 SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137 _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138 }
139 }
140
141 fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143 match filter {
144 TagFilter::DefaultOnly => ("default_only", vec![]),
145 TagFilter::Tags(set) => {
146 let mut tags: Vec<String> = set.iter().cloned().collect();
147 tags.sort();
148 ("tags", tags)
149 }
150 TagFilter::DefaultAnd(set) => {
151 let mut tags: Vec<String> = set.iter().cloned().collect();
152 tags.sort();
153 ("default_and", tags)
154 }
155 TagFilter::Any => ("any", vec![]),
156 TagFilter::None => ("none", vec![]),
157 }
158 }
159
160 pub async fn cleanup_schema(&self) -> Result<()> {
165 const MAX_RETRIES: u32 = 5;
166 const BASE_RETRY_DELAY_MS: u64 = 50;
167
168 for attempt in 0..=MAX_RETRIES {
169 let cleanup_result = async {
170 sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172 .execute(&*self.pool)
173 .await?;
174
175 if self.schema_name != "public" {
178 sqlx::query(&format!(
179 "DROP SCHEMA IF EXISTS {} CASCADE",
180 self.schema_name
181 ))
182 .execute(&*self.pool)
183 .await?;
184 } else {
185 }
188
189 Ok::<(), SqlxError>(())
190 }
191 .await;
192
193 match cleanup_result {
194 Ok(()) => return Ok(()),
195 Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196 if attempt < MAX_RETRIES {
197 warn!(
198 target = "duroxide::providers::postgres",
199 schema = %self.schema_name,
200 attempt = attempt + 1,
201 "Deadlock during cleanup_schema, retrying"
202 );
203 sleep(Duration::from_millis(
204 BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
205 ))
206 .await;
207 continue;
208 }
209 return Err(anyhow::anyhow!(db_err.to_string()));
210 }
211 Err(e) => return Err(anyhow::anyhow!(e.to_string())),
212 }
213 }
214
215 Ok(())
216 }
217}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
    // Stable provider identifier reported to the duroxide runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
224
    // Provider version; taken from this crate's Cargo.toml at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
228
    /// Claims the next ready orchestration item (history + pending messages)
    /// via the `fetch_orchestration_item` stored procedure.
    ///
    /// Returns `(item, lock_token, attempt_count)`; the caller must later ack
    /// or abandon the item using `lock_token`. Transient database errors are
    /// retried up to `MAX_RETRIES` times with linear backoff. `_poll_timeout`
    /// is unused here: each call performs a single non-blocking fetch.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Pack the supported duroxide version bounds into single integers
        // (major * 1_000_000 + minor * 1_000 + patch) for the stored
        // procedure. A filter with an empty range list matches nothing.
        // NOTE(review): only the FIRST range in supported_duroxide_versions
        // is honored; any additional ranges are ignored — confirm intended.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape mirrors the stored procedure's OUT columns:
            // (instance_id, name, version, execution_id, history JSON,
            //  messages JSON, lock_token, attempt_count, kv snapshot JSON).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            // Only retryable errors loop; everything else propagates.
            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // A corrupt history is NOT fatal: the item is still returned,
                // with an empty history and `history_error` set, so the
                // runtime can decide how to fail the orchestration.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Corrupt messages, by contrast, are a permanent error.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // KV snapshot parsing is lenient: malformed or incomplete
                // entries are silently dropped rather than failing the fetch.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan detection: an instance with no history whose pending
                // messages are ALL QueueMessages was never started. Such
                // messages cannot be delivered, so ack them away (empty
                // deltas) and report "nothing to do".
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No row: the queue is empty right now.
            return Ok(None);
        }

        // Unreachable: the loop always returns or continues.
        Ok(None)
    }
    /// Atomically commits the outcome of one orchestration turn via the
    /// `ack_orchestration_item` stored procedure: appends the history delta,
    /// enqueues worker/orchestrator items, applies execution metadata and
    /// KV/custom-status mutations, cancels activities, and releases the lock.
    ///
    /// Fails permanently if any delta event lacks an event_id or if the lock
    /// token is no longer valid; transient DB errors are retried.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize each history event to {event_id, event_type, event_data}.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime must assign ids before events reach the provider.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type label derived from the Debug representation:
            // everything before the first '{', trimmed.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Only the LAST CustomStatusUpdated event in the delta wins:
        // Some(s) => "set", an explicit None status => "clear",
        // no such event => leave the stored status untouched.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // KV mutations are replayed in delta order by the stored procedure.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet {
                    key,
                    value,
                    last_updated_at_ms,
                } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                    "last_updated_at_ms": last_updated_at_ms,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        // Single stored-procedure call; retried only on transient errors.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // The stored procedure raises "Invalid lock token" when
                    // the lock was lost; that is permanent, never retried.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Unreachable: every loop iteration returns or continues.
        Ok(())
    }
639 #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
640 async fn abandon_orchestration_item(
641 &self,
642 lock_token: &str,
643 delay: Option<Duration>,
644 ignore_attempt: bool,
645 ) -> Result<(), ProviderError> {
646 let start = std::time::Instant::now();
647 let now_ms = Self::now_millis();
648 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
649
650 let instance_id = match sqlx::query_scalar::<_, String>(&format!(
651 "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
652 self.schema_name
653 ))
654 .bind(lock_token)
655 .bind(now_ms)
656 .bind(delay_param)
657 .bind(ignore_attempt)
658 .fetch_one(&*self.pool)
659 .await
660 {
661 Ok(instance_id) => instance_id,
662 Err(e) => {
663 if let SqlxError::Database(db_err) = &e {
664 if db_err.message().contains("Invalid lock token") {
665 return Err(ProviderError::permanent(
666 "abandon_orchestration_item",
667 "Invalid lock token",
668 ));
669 }
670 } else if e.to_string().contains("Invalid lock token") {
671 return Err(ProviderError::permanent(
672 "abandon_orchestration_item",
673 "Invalid lock token",
674 ));
675 }
676
677 return Err(Self::sqlx_to_provider_error(
678 "abandon_orchestration_item",
679 e,
680 ));
681 }
682 };
683
684 let duration_ms = start.elapsed().as_millis() as u64;
685 debug!(
686 target = "duroxide::providers::postgres",
687 operation = "abandon_orchestration_item",
688 instance_id = %instance_id,
689 delay_ms = delay.map(|d| d.as_millis() as u64),
690 ignore_attempt = ignore_attempt,
691 duration_ms = duration_ms,
692 "Abandoned orchestration item via stored procedure"
693 );
694
695 Ok(())
696 }
697
698 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
699 async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
700 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
701 "SELECT out_event_data FROM {}.fetch_history($1)",
702 self.schema_name
703 ))
704 .bind(instance)
705 .fetch_all(&*self.pool)
706 .await
707 .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
708
709 event_data_rows
710 .into_iter()
711 .map(|event_data| {
712 serde_json::from_str::<Event>(&event_data).map_err(|e| {
713 ProviderError::permanent("read", format!("Failed to deserialize event: {e}"))
714 })
715 })
716 .collect()
717 }
718
719 #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
720 async fn append_with_execution(
721 &self,
722 instance: &str,
723 execution_id: u64,
724 new_events: Vec<Event>,
725 ) -> Result<(), ProviderError> {
726 if new_events.is_empty() {
727 return Ok(());
728 }
729
730 let mut events_payload = Vec::with_capacity(new_events.len());
731 for event in &new_events {
732 if event.event_id() == 0 {
733 error!(
734 target = "duroxide::providers::postgres",
735 operation = "append_with_execution",
736 error_type = "validation_error",
737 instance_id = %instance,
738 execution_id = execution_id,
739 "event_id must be set by runtime"
740 );
741 return Err(ProviderError::permanent(
742 "append_with_execution",
743 "event_id must be set by runtime",
744 ));
745 }
746
747 let event_json = serde_json::to_string(event).map_err(|e| {
748 ProviderError::permanent(
749 "append_with_execution",
750 format!("Failed to serialize event: {e}"),
751 )
752 })?;
753
754 let event_type = format!("{event:?}")
755 .split('{')
756 .next()
757 .unwrap_or("Unknown")
758 .trim()
759 .to_string();
760
761 events_payload.push(serde_json::json!({
762 "event_id": event.event_id(),
763 "event_type": event_type,
764 "event_data": event_json,
765 }));
766 }
767
768 let events_json = serde_json::Value::Array(events_payload);
769
770 sqlx::query(&format!(
771 "SELECT {}.append_history($1, $2, $3)",
772 self.schema_name
773 ))
774 .bind(instance)
775 .bind(execution_id as i64)
776 .bind(events_json)
777 .execute(&*self.pool)
778 .await
779 .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
780
781 debug!(
782 target = "duroxide::providers::postgres",
783 operation = "append_with_execution",
784 instance_id = %instance,
785 execution_id = execution_id,
786 event_count = new_events.len(),
787 "Appended history events via stored procedure"
788 );
789
790 Ok(())
791 }
792
793 #[instrument(skip(self), target = "duroxide::providers::postgres")]
794 async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
795 let work_item = serde_json::to_string(&item).map_err(|e| {
796 ProviderError::permanent(
797 "enqueue_worker_work",
798 format!("Failed to serialize work item: {e}"),
799 )
800 })?;
801
802 let now_ms = Self::now_millis();
803
804 let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
806 WorkItem::ActivityExecute {
807 instance,
808 execution_id,
809 id,
810 session_id,
811 tag,
812 ..
813 } => (
814 Some(instance.clone()),
815 Some(*execution_id as i64),
816 Some(*id as i64),
817 session_id.clone(),
818 tag.clone(),
819 ),
820 _ => (None, None, None, None, None),
821 };
822
823 sqlx::query(&format!(
824 "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
825 self.schema_name
826 ))
827 .bind(work_item)
828 .bind(now_ms)
829 .bind(&instance_id)
830 .bind(execution_id)
831 .bind(activity_id)
832 .bind(&session_id)
833 .bind(&tag)
834 .execute(&*self.pool)
835 .await
836 .map_err(|e| {
837 error!(
838 target = "duroxide::providers::postgres",
839 operation = "enqueue_worker_work",
840 error_type = "database_error",
841 error = %e,
842 "Failed to enqueue worker work"
843 );
844 Self::sqlx_to_provider_error("enqueue_worker_work", e)
845 })?;
846
847 Ok(())
848 }
849
    /// Claims the next worker queue item matching `tag_filter` (and the
    /// optional session affinity) via the `fetch_work_item` stored procedure.
    ///
    /// Returns `(item, lock_token, attempt_count)`; the caller must ack or
    /// abandon with `lock_token`. `_poll_timeout` is unused: one non-blocking
    /// fetch per call.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // TagFilter::None matches nothing by definition; skip the round-trip.
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Session affinity: the stored procedure pins fetched items to this
        // owner for the session lock duration when provided.
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Row shape: (work item JSON, lock token, attempt count).
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract the instance id for logging; sub-orchestration completions
        // log their PARENT instance.
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
943
    /// Acknowledges a worker queue item via the `ack_worker` stored procedure.
    ///
    /// With `completion = None` (cancelled work), the item is acked with no
    /// result. Otherwise `completion` must be `ActivityCompleted` or
    /// `ActivityFailed`; it is atomically enqueued for the orchestrator as
    /// part of the ack. A missing/already-acked token is a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // No completion: ack-only path (NULL instance and result).
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Only activity outcomes are valid completions for the worker queue.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1050
1051 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1052 async fn renew_work_item_lock(
1053 &self,
1054 token: &str,
1055 extend_for: Duration,
1056 ) -> Result<(), ProviderError> {
1057 let start = std::time::Instant::now();
1058
1059 let now_ms = Self::now_millis();
1061
1062 let extend_secs = extend_for.as_secs() as i64;
1064
1065 match sqlx::query(&format!(
1066 "SELECT {}.renew_work_item_lock($1, $2, $3)",
1067 self.schema_name
1068 ))
1069 .bind(token)
1070 .bind(now_ms)
1071 .bind(extend_secs)
1072 .execute(&*self.pool)
1073 .await
1074 {
1075 Ok(_) => {
1076 let duration_ms = start.elapsed().as_millis() as u64;
1077 debug!(
1078 target = "duroxide::providers::postgres",
1079 operation = "renew_work_item_lock",
1080 token = %token,
1081 extend_for_secs = extend_secs,
1082 duration_ms = duration_ms,
1083 "Work item lock renewed successfully"
1084 );
1085 Ok(())
1086 }
1087 Err(e) => {
1088 if let SqlxError::Database(db_err) = &e {
1089 if db_err.message().contains("Lock token invalid") {
1090 return Err(ProviderError::permanent(
1091 "renew_work_item_lock",
1092 "Lock token invalid, expired, or already acked",
1093 ));
1094 }
1095 } else if e.to_string().contains("Lock token invalid") {
1096 return Err(ProviderError::permanent(
1097 "renew_work_item_lock",
1098 "Lock token invalid, expired, or already acked",
1099 ));
1100 }
1101
1102 Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1103 }
1104 }
1105 }
1106
1107 #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1108 async fn abandon_work_item(
1109 &self,
1110 token: &str,
1111 delay: Option<Duration>,
1112 ignore_attempt: bool,
1113 ) -> Result<(), ProviderError> {
1114 let start = std::time::Instant::now();
1115 let now_ms = Self::now_millis();
1116 let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1117
1118 match sqlx::query(&format!(
1119 "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1120 self.schema_name
1121 ))
1122 .bind(token)
1123 .bind(now_ms)
1124 .bind(delay_param)
1125 .bind(ignore_attempt)
1126 .execute(&*self.pool)
1127 .await
1128 {
1129 Ok(_) => {
1130 let duration_ms = start.elapsed().as_millis() as u64;
1131 debug!(
1132 target = "duroxide::providers::postgres",
1133 operation = "abandon_work_item",
1134 token = %token,
1135 delay_ms = delay.map(|d| d.as_millis() as u64),
1136 ignore_attempt = ignore_attempt,
1137 duration_ms = duration_ms,
1138 "Abandoned work item via stored procedure"
1139 );
1140 Ok(())
1141 }
1142 Err(e) => {
1143 if let SqlxError::Database(db_err) = &e {
1144 if db_err.message().contains("Invalid lock token")
1145 || db_err.message().contains("already acked")
1146 {
1147 return Err(ProviderError::permanent(
1148 "abandon_work_item",
1149 "Invalid lock token or already acked",
1150 ));
1151 }
1152 } else if e.to_string().contains("Invalid lock token")
1153 || e.to_string().contains("already acked")
1154 {
1155 return Err(ProviderError::permanent(
1156 "abandon_work_item",
1157 "Invalid lock token or already acked",
1158 ));
1159 }
1160
1161 Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1162 }
1163 }
1164 }
1165
    /// Extends the lock on a claimed orchestration item via the
    /// `renew_orchestration_item_lock` stored procedure.
    ///
    /// An invalid/expired/released token is a permanent error; other
    /// failures are classified by `sqlx_to_provider_error`.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_orchestration_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        let now_ms = Self::now_millis();

        // NOTE(review): whole seconds only — a sub-second `extend_for`
        // truncates to 0; confirm the stored procedure expects seconds.
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_orchestration_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Orchestration item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // Token-related failures raised by the stored procedure are
                // permanent; retrying with a dead token cannot succeed.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid")
                        || db_err.message().contains("expired")
                        || db_err.message().contains("already released")
                    {
                        return Err(ProviderError::permanent(
                            "renew_orchestration_item_lock",
                            "Lock token invalid, expired, or already released",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid")
                    || e.to_string().contains("expired")
                    || e.to_string().contains("already released")
                {
                    return Err(ProviderError::permanent(
                        "renew_orchestration_item_lock",
                        "Lock token invalid, expired, or already released",
                    ));
                }

                Err(Self::sqlx_to_provider_error(
                    "renew_orchestration_item_lock",
                    e,
                ))
            }
        }
    }
1230
    /// Enqueue a work item onto the orchestrator queue, optionally delayed.
    ///
    /// The item is serialized to JSON and routed to its target instance; the
    /// visibility timestamp controls when a dispatcher may pick it up.
    /// Returns a permanent error for unserializable items and for
    /// `ActivityExecute` items, which belong on the worker queue.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        // Serialize up front so a malformed item fails fast, before any DB work.
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Determine which instance's queue this item lands on: most variants
        // carry the target instance directly, while sub-orchestration results
        // are delivered to the parent instance.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. }
            | WorkItem::QueueMessage { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        let now_ms = Self::now_millis();

        // Visibility rules:
        // - TimerFired with a positive fire_at_ms becomes visible at that time,
        //   but never earlier than now + delay when a delay was also requested
        //   (max of the two).
        // - Everything else (including TimerFired with fire_at_ms == 0) is
        //   visible at now + delay, or immediately when no delay is given.
        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        // Convert the millisecond epoch into a timestamptz for the stored
        // function; out-of-range values are rejected as permanent errors.
        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // Trailing NULL binds fill optional parameters of the stored function
        // that this code path does not use — TODO confirm their meaning against
        // the enqueue_orchestrator_work definition.
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .bind::<Option<i64>>(None)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1340
1341 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1342 async fn read_with_execution(
1343 &self,
1344 instance: &str,
1345 execution_id: u64,
1346 ) -> Result<Vec<Event>, ProviderError> {
1347 let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1348 "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1349 self.table_name("history")
1350 ))
1351 .bind(instance)
1352 .bind(execution_id as i64)
1353 .fetch_all(&*self.pool)
1354 .await
1355 .map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
1356
1357 event_data_rows
1358 .into_iter()
1359 .map(|event_data| {
1360 serde_json::from_str::<Event>(&event_data).map_err(|e| {
1361 ProviderError::permanent(
1362 "read_with_execution",
1363 format!("Failed to deserialize event: {e}"),
1364 )
1365 })
1366 })
1367 .collect()
1368 }
1369
1370 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1371 async fn renew_session_lock(
1372 &self,
1373 owner_ids: &[&str],
1374 extend_for: Duration,
1375 idle_timeout: Duration,
1376 ) -> Result<usize, ProviderError> {
1377 if owner_ids.is_empty() {
1378 return Ok(0);
1379 }
1380
1381 let now_ms = Self::now_millis();
1382 let extend_ms = extend_for.as_millis() as i64;
1383 let idle_timeout_ms = idle_timeout.as_millis() as i64;
1384 let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1385
1386 let result = sqlx::query_scalar::<_, i64>(&format!(
1387 "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1388 self.schema_name
1389 ))
1390 .bind(&owner_ids_vec)
1391 .bind(now_ms)
1392 .bind(extend_ms)
1393 .bind(idle_timeout_ms)
1394 .fetch_one(&*self.pool)
1395 .await
1396 .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1397
1398 debug!(
1399 target = "duroxide::providers::postgres",
1400 operation = "renew_session_lock",
1401 owner_count = owner_ids.len(),
1402 sessions_renewed = result,
1403 "Session locks renewed"
1404 );
1405
1406 Ok(result as usize)
1407 }
1408
1409 #[instrument(skip(self), target = "duroxide::providers::postgres")]
1410 async fn cleanup_orphaned_sessions(
1411 &self,
1412 _idle_timeout: Duration,
1413 ) -> Result<usize, ProviderError> {
1414 let now_ms = Self::now_millis();
1415
1416 let result = sqlx::query_scalar::<_, i64>(&format!(
1417 "SELECT {}.cleanup_orphaned_sessions($1)",
1418 self.schema_name
1419 ))
1420 .bind(now_ms)
1421 .fetch_one(&*self.pool)
1422 .await
1423 .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1424
1425 debug!(
1426 target = "duroxide::providers::postgres",
1427 operation = "cleanup_orphaned_sessions",
1428 sessions_cleaned = result,
1429 "Orphaned sessions cleaned up"
1430 );
1431
1432 Ok(result as usize)
1433 }
1434
    // Expose the admin/management surface of this provider. `PostgresProvider`
    // implements `ProviderAdmin` itself, so it simply returns itself.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1438
1439 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1440 async fn get_custom_status(
1441 &self,
1442 instance: &str,
1443 last_seen_version: u64,
1444 ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1445 let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1446 "SELECT * FROM {}.get_custom_status($1, $2)",
1447 self.schema_name
1448 ))
1449 .bind(instance)
1450 .bind(last_seen_version as i64)
1451 .fetch_optional(&*self.pool)
1452 .await
1453 .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1454
1455 match row {
1456 Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1457 None => Ok(None),
1458 }
1459 }
1460
1461 async fn get_kv_value(
1462 &self,
1463 instance_id: &str,
1464 key: &str,
1465 ) -> Result<Option<String>, ProviderError> {
1466 let row: Option<(Option<String>, bool)> = sqlx::query_as(&format!(
1467 "SELECT * FROM {}.get_kv_value($1, $2)",
1468 self.schema_name
1469 ))
1470 .bind(instance_id)
1471 .bind(key)
1472 .fetch_optional(&*self.pool)
1473 .await
1474 .map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
1475
1476 Ok(row.and_then(|(value, found)| if found { value } else { None }))
1477 }
1478
1479 async fn get_kv_all_values(
1480 &self,
1481 instance_id: &str,
1482 ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1483 let rows: Vec<(String, String)> = sqlx::query_as(&format!(
1484 "SELECT * FROM {}.get_kv_all_values($1)",
1485 self.schema_name
1486 ))
1487 .bind(instance_id)
1488 .fetch_all(&*self.pool)
1489 .await
1490 .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1491
1492 Ok(rows.into_iter().collect())
1493 }
1494
1495 #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1496 async fn get_instance_stats(&self, instance: &str) -> Result<Option<SystemStats>, ProviderError> {
1497 let row: Option<(bool, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1498 "SELECT * FROM {}.get_instance_stats($1)",
1499 self.schema_name
1500 ))
1501 .bind(instance)
1502 .fetch_optional(&*self.pool)
1503 .await
1504 .map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
1505
1506 match row {
1507 Some((true, history_event_count, history_size_bytes, queue_pending_count, kv_user_key_count, kv_total_value_bytes)) => {
1508 Ok(Some(SystemStats {
1509 history_event_count: history_event_count as u64,
1510 history_size_bytes: history_size_bytes as u64,
1511 queue_pending_count: queue_pending_count as u64,
1512 kv_user_key_count: kv_user_key_count as u64,
1513 kv_total_value_bytes: kv_total_value_bytes as u64,
1514 }))
1515 }
1516 _ => Ok(None),
1517 }
1518 }
1519}
1520
// Administrative/management surface for the Postgres-backed provider.
//
// Every method delegates to a stored function (or direct query) in
// `self.schema_name`. Note that the schema name is interpolated into the SQL
// text rather than bound — it must come from trusted configuration.
#[async_trait::async_trait]
impl ProviderAdmin for PostgresProvider {
    /// List every known orchestration instance id.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT instance_id FROM {}.list_instances()",
            self.schema_name
        ))
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
    }

    /// List instance ids whose status matches the given status string.
    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT instance_id FROM {}.list_instances_by_status($1)",
            self.schema_name
        ))
        .bind(status)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
    }

    /// List the execution ids recorded for an instance.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
            "SELECT execution_id FROM {}.list_executions($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;

        // Postgres has no unsigned integers; ids come back as BIGINT and are
        // widened to u64 here.
        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
    }

    /// Read and deserialize the event history for a specific execution.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn read_history_with_execution_id(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<Vec<Event>, ProviderError> {
        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_all(&*self.pool)
        .await
        // NOTE(review): operation label "read_execution" differs from the
        // method name used in the deserialization error below — confirm which
        // is intended.
        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;

        // Decode each JSON payload; the first corrupt row aborts the read
        // with a permanent error.
        event_data_rows
            .into_iter()
            .map(|event_data| {
                serde_json::from_str::<Event>(&event_data).map_err(|e| {
                    ProviderError::permanent(
                        "read_history_with_execution_id",
                        format!("Failed to deserialize event: {e}"),
                    )
                })
            })
            .collect()
    }

    /// Read the history of the instance's latest execution.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
        let execution_id = self.latest_execution_id(instance).await?;
        self.read_history_with_execution_id(instance, execution_id)
            .await
    }

    /// Return the id of the instance's most recent execution, or a permanent
    /// error when the instance does not exist.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT {}.latest_execution_id($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
        .map(|id: i64| id as u64)
        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
    }

    /// Fetch summary metadata for an instance (name, version, status, output,
    /// timestamps, parent link). Permanent error when the instance is unknown.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        // Column order mirrors the get_instance_info stored function.
        let row: Option<(
            String,
            String,
            String,
            i64,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            Option<String>,
            Option<String>,
            Option<String>,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
            parent_instance_id,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            // A NULL status is treated as a still-running instance.
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            // Fall back to the creation time when the row was never updated.
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
            parent_instance_id,
        })
    }

    /// Fetch metadata for one execution of an instance. Permanent error when
    /// the execution does not exist.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn get_execution_info(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<ExecutionInfo, ProviderError> {
        let row: Option<(
            i64,
            String,
            Option<String>,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            i64,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_execution_info($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;

        let (exec_id, status, output, started_at, completed_at, event_count) = row
            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;

        Ok(ExecutionInfo {
            execution_id: exec_id as u64,
            status,
            output,
            started_at: started_at.timestamp_millis() as u64,
            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
            event_count: event_count as usize,
        })
    }

    /// Aggregate counters across the whole store (instances, executions,
    /// events) as reported by the get_system_metrics stored function.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_system_metrics()",
            self.schema_name
        ))
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;

        let (
            total_instances,
            total_executions,
            running_instances,
            completed_instances,
            failed_instances,
            total_events,
        ) = row.ok_or_else(|| {
            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
        })?;

        Ok(SystemMetrics {
            total_instances: total_instances as u64,
            total_executions: total_executions as u64,
            running_instances: running_instances as u64,
            completed_instances: completed_instances as u64,
            failed_instances: failed_instances as u64,
            total_events: total_events as u64,
        })
    }

    /// Current depths of the orchestrator and worker queues, evaluated at the
    /// caller's notion of "now" (visibility-gated items are counted by the
    /// stored function).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
        let now_ms = Self::now_millis();

        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_queue_depths($1)",
            self.schema_name
        ))
        .bind(now_ms)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;

        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
        })?;

        Ok(QueueDepths {
            orchestrator_queue: orchestrator_queue as usize,
            worker_queue: worker_queue as usize,
            // NOTE(review): always 0 — timers appear to ride the orchestrator
            // queue in this provider (see enqueue_for_orchestrator's TimerFired
            // handling); confirm there is no separate timer queue.
            timer_queue: 0, })
    }

    /// List the direct child instance ids of an instance.
    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
        sqlx::query_scalar(&format!(
            "SELECT child_instance_id FROM {}.list_children($1)",
            self.schema_name
        ))
        .bind(instance_id)
        .fetch_all(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
    }

    /// Fetch the parent instance id, if any. An "Instance not found" raised by
    /// the stored function is recognized by message text and converted into a
    /// permanent error.
    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
        let result: Result<Option<String>, _> =
            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
                .bind(instance_id)
                .fetch_one(&*self.pool)
                .await;

        match result {
            Ok(parent_id) => Ok(parent_id),
            Err(e) => {
                let err_str = e.to_string();
                if err_str.contains("Instance not found") {
                    Err(ProviderError::permanent(
                        "get_parent_id",
                        format!("Instance not found: {}", instance_id),
                    ))
                } else {
                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
                }
            }
        }
    }

    /// Delete a set of instances (history, executions, queue messages) in one
    /// transaction via the delete_instances_atomic stored function. With
    /// `force == false` the function refuses to delete running instances or
    /// leave orphans; those refusals surface as permanent errors.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instances_atomic(
        &self,
        ids: &[String],
        force: bool,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        if ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
            self.schema_name
        ))
        .bind(ids)
        .bind(force)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| {
            // The stored function signals business-rule violations through
            // message text; both cases are non-retryable.
            let err_str = e.to_string();
            if err_str.contains("is Running") {
                ProviderError::permanent("delete_instances_atomic", err_str)
            } else if err_str.contains("Orphan detected") {
                ProviderError::permanent("delete_instances_atomic", err_str)
            } else {
                Self::sqlx_to_provider_error("delete_instances_atomic", e)
            }
        })?;

        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
            row.unwrap_or((0, 0, 0, 0));

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instances_atomic",
            instances_deleted = instances_deleted,
            executions_deleted = executions_deleted,
            events_deleted = events_deleted,
            queue_messages_deleted = queue_messages_deleted,
            "Deleted instances atomically"
        );

        Ok(DeleteInstanceResult {
            instances_deleted: instances_deleted as u64,
            executions_deleted: executions_deleted as u64,
            events_deleted: events_deleted as u64,
            queue_messages_deleted: queue_messages_deleted as u64,
        })
    }

    /// Bulk-delete terminal root instances matching `filter`, cascading to each
    /// instance's full child tree.
    ///
    /// Selection targets root instances (no parent) whose current execution is
    /// terminal; deletion then goes tree-by-tree through
    /// `delete_instances_atomic` with `force = true`.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // The SQL is assembled dynamically, so placeholder numbers must be
        // computed by hand: $1..$N for the id list, then completed_before,
        // then the limit. The bind order below must mirror this numbering.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(DeleteInstanceResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // completed_before is epoch milliseconds; convert to timestamptz.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        // Cap the batch; defaults to 1000 root instances per call.
        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        // Bind in the exact order the placeholders were numbered above.
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        let mut result = DeleteInstanceResult::default();

        // Delete each root together with its descendants; force=true because
        // the tree may contain non-terminal children.
        for instance_id in &instance_ids {
            let tree = self.get_instance_tree(instance_id).await?;

            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }

    /// Prune old executions of a single instance according to `options`
    /// (keep the last N and/or drop executions completed before a cutoff).
    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
    async fn prune_executions(
        &self,
        instance_id: &str,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        // NULL parameters disable the corresponding pruning criterion.
        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);

        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
            "SELECT * FROM {}.prune_executions($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(keep_last)
        .bind(completed_before_ms)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;

        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions",
            instance_id = %instance_id,
            instances_processed = instances_processed,
            executions_deleted = executions_deleted,
            events_deleted = events_deleted,
            "Pruned executions"
        );

        Ok(PruneResult {
            instances_processed: instances_processed as u64,
            executions_deleted: executions_deleted as u64,
            events_deleted: events_deleted as u64,
        })
    }

    /// Prune executions for every instance matching `filter`, applying
    /// `options` to each. Unlike `delete_instance_bulk`, selection here is not
    /// restricted to root/terminal instances (WHERE 1=1 base).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn prune_executions_bulk(
        &self,
        filter: InstanceFilter,
        options: PruneOptions,
    ) -> Result<PruneResult, ProviderError> {
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
                AND i.current_execution_id = e.execution_id
            WHERE 1=1
            "#,
            self.schema_name, self.schema_name
        );

        // Same hand-numbered placeholder scheme as delete_instance_bulk:
        // $1..$N for ids, then completed_before, then limit; binds must match.
        if let Some(ref ids) = filter.instance_ids {
            if ids.is_empty() {
                return Ok(PruneResult::default());
            }
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        if filter.completed_before.is_some() {
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
                param_num
            ));
        }

        let limit = filter.limit.unwrap_or(1000);
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${}", limit_param_num));

        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;

        let mut result = PruneResult::default();

        // Prune sequentially, accumulating per-instance results.
        for instance_id in &instance_ids {
            let single_result = self.prune_executions(instance_id, options.clone()).await?;
            result.instances_processed += single_result.instances_processed;
            result.executions_deleted += single_result.executions_deleted;
            result.events_deleted += single_result.events_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "prune_executions_bulk",
            instances_processed = result.instances_processed,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            "Bulk pruned executions"
        );

        Ok(result)
    }
}