Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
41pub struct PostgresProvider {
42    pool: Arc<PgPool>,
43    schema_name: String,
44}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else {
130                    ProviderError::permanent(operation, format!("Database error: {e}"))
131                }
132            }
133            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135            }
136            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138        }
139    }
140
141    /// Clean up schema after tests (drops all tables and optionally the schema)
142    ///
143    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
144    /// Only drops the schema if it's a custom schema (not "public").
145    pub async fn cleanup_schema(&self) -> Result<()> {
146        // Call the stored procedure to drop all tables
147        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
148            .execute(&*self.pool)
149            .await?;
150
151        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
152        // Only drop custom schemas created for testing
153        if self.schema_name != "public" {
154            sqlx::query(&format!(
155                "DROP SCHEMA IF EXISTS {} CASCADE",
156                self.schema_name
157            ))
158            .execute(&*self.pool)
159            .await?;
160        } else {
161            // Explicit safeguard: we only drop tables from public schema, never the schema itself
162            // This ensures we don't accidentally drop the default PostgreSQL schema
163        }
164
165        Ok(())
166    }
167}
168
169#[async_trait::async_trait]
170impl Provider for PostgresProvider {
171    fn name(&self) -> &str {
172        "duroxide-pg"
173    }
174
175    fn version(&self) -> &str {
176        env!("CARGO_PKG_VERSION")
177    }
178
179    #[instrument(skip(self), target = "duroxide::providers::postgres")]
180    async fn fetch_orchestration_item(
181        &self,
182        lock_timeout: Duration,
183        _poll_timeout: Duration,
184        filter: Option<&DispatcherCapabilityFilter>,
185    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
186        let start = std::time::Instant::now();
187
188        const MAX_RETRIES: u32 = 3;
189        const RETRY_DELAY_MS: u64 = 50;
190
191        // Convert Duration to milliseconds
192        let lock_timeout_ms = lock_timeout.as_millis() as i64;
193        let mut _last_error: Option<ProviderError> = None;
194
195        // Extract version filter from capability filter
196        let (min_packed, max_packed) = if let Some(f) = filter {
197            if let Some(range) = f.supported_duroxide_versions.first() {
198                let min = range.min.major as i64 * 1_000_000
199                    + range.min.minor as i64 * 1_000
200                    + range.min.patch as i64;
201                let max = range.max.major as i64 * 1_000_000
202                    + range.max.minor as i64 * 1_000
203                    + range.max.patch as i64;
204                (Some(min), Some(max))
205            } else {
206                // Empty supported_duroxide_versions = supports nothing
207                return Ok(None);
208            }
209        } else {
210            (None, None)
211        };
212
213        for attempt in 0..=MAX_RETRIES {
214            let now_ms = Self::now_millis();
215
216            let result: Result<
217                Option<(
218                    String,
219                    String,
220                    String,
221                    i64,
222                    serde_json::Value,
223                    serde_json::Value,
224                    String,
225                    i32,
226                )>,
227                SqlxError,
228            > = sqlx::query_as(&format!(
229                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
230                self.schema_name
231            ))
232            .bind(now_ms)
233            .bind(lock_timeout_ms)
234            .bind(min_packed)
235            .bind(max_packed)
236            .fetch_optional(&*self.pool)
237            .await;
238
239            let row = match result {
240                Ok(r) => r,
241                Err(e) => {
242                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
243                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
244                        warn!(
245                            target = "duroxide::providers::postgres",
246                            operation = "fetch_orchestration_item",
247                            attempt = attempt + 1,
248                            error = %provider_err,
249                            "Retryable error, will retry"
250                        );
251                        _last_error = Some(provider_err);
252                        sleep(std::time::Duration::from_millis(
253                            RETRY_DELAY_MS * (attempt as u64 + 1),
254                        ))
255                        .await;
256                        continue;
257                    }
258                    return Err(provider_err);
259                }
260            };
261
262            if let Some((
263                instance_id,
264                orchestration_name,
265                orchestration_version,
266                execution_id,
267                history_json,
268                messages_json,
269                lock_token,
270                attempt_count,
271            )) = row
272            {
273                let (history, history_error) =
274                    match serde_json::from_value::<Vec<Event>>(history_json) {
275                        Ok(h) => (h, None),
276                        Err(e) => {
277                            let error_msg = format!("Failed to deserialize history: {e}");
278                            warn!(
279                                target = "duroxide::providers::postgres",
280                                instance = %instance_id,
281                                error = %error_msg,
282                                "History deserialization failed, returning item with history_error"
283                            );
284                            (vec![], Some(error_msg))
285                        }
286                    };
287
288                let messages: Vec<WorkItem> =
289                    serde_json::from_value(messages_json).map_err(|e| {
290                        ProviderError::permanent(
291                            "fetch_orchestration_item",
292                            format!("Failed to deserialize messages: {e}"),
293                        )
294                    })?;
295
296                let duration_ms = start.elapsed().as_millis() as u64;
297                debug!(
298                    target = "duroxide::providers::postgres",
299                    operation = "fetch_orchestration_item",
300                    instance_id = %instance_id,
301                    execution_id = execution_id,
302                    message_count = messages.len(),
303                    history_count = history.len(),
304                    attempt_count = attempt_count,
305                    duration_ms = duration_ms,
306                    attempts = attempt + 1,
307                    "Fetched orchestration item via stored procedure"
308                );
309
310                // Orphan queue messages: if orchestration_name is "Unknown", there's
311                // no history, and ALL messages are QueueMessage items, these are orphan
312                // events enqueued before the orchestration started. Drop them by acking
313                // with empty deltas. Other work items (CancelInstance, etc.) may
314                // legitimately race with StartOrchestration and must not be dropped.
315                if orchestration_name == "Unknown"
316                    && history.is_empty()
317                    && messages
318                        .iter()
319                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
320                {
321                    let message_count = messages.len();
322                    tracing::warn!(
323                        target = "duroxide::providers::postgres",
324                        instance = %instance_id,
325                        message_count,
326                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
327                    );
328                    self.ack_orchestration_item(
329                        &lock_token,
330                        execution_id as u64,
331                        vec![],
332                        vec![],
333                        vec![],
334                        ExecutionMetadata::default(),
335                        vec![],
336                    )
337                    .await?;
338                    return Ok(None);
339                }
340
341                return Ok(Some((
342                    OrchestrationItem {
343                        instance: instance_id,
344                        orchestration_name,
345                        execution_id: execution_id as u64,
346                        version: orchestration_version,
347                        history,
348                        messages,
349                        history_error,
350                    },
351                    lock_token,
352                    attempt_count as u32,
353                )));
354            }
355
356            // No result found - return immediately (short polling behavior)
357            // Only retry with delay on retryable errors (handled above)
358            return Ok(None);
359        }
360
361        Ok(None)
362    }
363    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
364    async fn ack_orchestration_item(
365        &self,
366        lock_token: &str,
367        execution_id: u64,
368        history_delta: Vec<Event>,
369        worker_items: Vec<WorkItem>,
370        orchestrator_items: Vec<WorkItem>,
371        metadata: ExecutionMetadata,
372        cancelled_activities: Vec<ScheduledActivityIdentifier>,
373    ) -> Result<(), ProviderError> {
374        let start = std::time::Instant::now();
375
376        const MAX_RETRIES: u32 = 3;
377        const RETRY_DELAY_MS: u64 = 50;
378
379        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
380        for event in &history_delta {
381            if event.event_id() == 0 {
382                return Err(ProviderError::permanent(
383                    "ack_orchestration_item",
384                    "event_id must be set by runtime",
385                ));
386            }
387
388            let event_json = serde_json::to_string(event).map_err(|e| {
389                ProviderError::permanent(
390                    "ack_orchestration_item",
391                    format!("Failed to serialize event: {e}"),
392                )
393            })?;
394
395            let event_type = format!("{event:?}")
396                .split('{')
397                .next()
398                .unwrap_or("Unknown")
399                .trim()
400                .to_string();
401
402            history_delta_payload.push(serde_json::json!({
403                "event_id": event.event_id(),
404                "event_type": event_type,
405                "event_data": event_json,
406            }));
407        }
408
409        let history_delta_json = serde_json::Value::Array(history_delta_payload);
410
411        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
412            ProviderError::permanent(
413                "ack_orchestration_item",
414                format!("Failed to serialize worker items: {e}"),
415            )
416        })?;
417
418        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
419            ProviderError::permanent(
420                "ack_orchestration_item",
421                format!("Failed to serialize orchestrator items: {e}"),
422            )
423        })?;
424
425        // Scan history_delta for the last CustomStatusUpdated event
426        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
427            let mut last_status: Option<&Option<String>> = None;
428            for event in &history_delta {
429                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
430                    last_status = Some(status);
431                }
432            }
433            match last_status {
434                Some(Some(s)) => (Some("set"), Some(s.as_str())),
435                Some(None) => (Some("clear"), None),
436                None => (None, None),
437            }
438        };
439
440        let metadata_json = serde_json::json!({
441            "orchestration_name": metadata.orchestration_name,
442            "orchestration_version": metadata.orchestration_version,
443            "status": metadata.status,
444            "output": metadata.output,
445            "parent_instance_id": metadata.parent_instance_id,
446            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
447                serde_json::json!({
448                    "major": v.major,
449                    "minor": v.minor,
450                    "patch": v.patch,
451                })
452            }),
453            "custom_status_action": custom_status_action,
454            "custom_status_value": custom_status_value,
455        });
456
457        // Serialize cancelled activities for lock stealing
458        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
459            .iter()
460            .map(|a| {
461                serde_json::json!({
462                    "instance": a.instance,
463                    "execution_id": a.execution_id,
464                    "activity_id": a.activity_id,
465                })
466            })
467            .collect();
468        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
469
470        for attempt in 0..=MAX_RETRIES {
471            let now_ms = Self::now_millis();
472            let result = sqlx::query(&format!(
473                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
474                self.schema_name
475            ))
476            .bind(lock_token)
477            .bind(now_ms)
478            .bind(execution_id as i64)
479            .bind(&history_delta_json)
480            .bind(&worker_items_json)
481            .bind(&orchestrator_items_json)
482            .bind(&metadata_json)
483            .bind(&cancelled_activities_json)
484            .execute(&*self.pool)
485            .await;
486
487            match result {
488                Ok(_) => {
489                    let duration_ms = start.elapsed().as_millis() as u64;
490                    debug!(
491                        target = "duroxide::providers::postgres",
492                        operation = "ack_orchestration_item",
493                        execution_id = execution_id,
494                        history_count = history_delta.len(),
495                        worker_items_count = worker_items.len(),
496                        orchestrator_items_count = orchestrator_items.len(),
497                        cancelled_activities_count = cancelled_activities.len(),
498                        duration_ms = duration_ms,
499                        attempts = attempt + 1,
500                        "Acknowledged orchestration item via stored procedure"
501                    );
502                    return Ok(());
503                }
504                Err(e) => {
505                    // Check for permanent errors first
506                    if let SqlxError::Database(db_err) = &e {
507                        if db_err.message().contains("Invalid lock token") {
508                            return Err(ProviderError::permanent(
509                                "ack_orchestration_item",
510                                "Invalid lock token",
511                            ));
512                        }
513                    } else if e.to_string().contains("Invalid lock token") {
514                        return Err(ProviderError::permanent(
515                            "ack_orchestration_item",
516                            "Invalid lock token",
517                        ));
518                    }
519
520                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
521                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
522                        warn!(
523                            target = "duroxide::providers::postgres",
524                            operation = "ack_orchestration_item",
525                            attempt = attempt + 1,
526                            error = %provider_err,
527                            "Retryable error, will retry"
528                        );
529                        sleep(std::time::Duration::from_millis(
530                            RETRY_DELAY_MS * (attempt as u64 + 1),
531                        ))
532                        .await;
533                        continue;
534                    }
535                    return Err(provider_err);
536                }
537            }
538        }
539
540        // Should never reach here, but just in case
541        Ok(())
542    }
543    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
544    async fn abandon_orchestration_item(
545        &self,
546        lock_token: &str,
547        delay: Option<Duration>,
548        ignore_attempt: bool,
549    ) -> Result<(), ProviderError> {
550        let start = std::time::Instant::now();
551        let now_ms = Self::now_millis();
552        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
553
554        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
555            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
556            self.schema_name
557        ))
558        .bind(lock_token)
559        .bind(now_ms)
560        .bind(delay_param)
561        .bind(ignore_attempt)
562        .fetch_one(&*self.pool)
563        .await
564        {
565            Ok(instance_id) => instance_id,
566            Err(e) => {
567                if let SqlxError::Database(db_err) = &e {
568                    if db_err.message().contains("Invalid lock token") {
569                        return Err(ProviderError::permanent(
570                            "abandon_orchestration_item",
571                            "Invalid lock token",
572                        ));
573                    }
574                } else if e.to_string().contains("Invalid lock token") {
575                    return Err(ProviderError::permanent(
576                        "abandon_orchestration_item",
577                        "Invalid lock token",
578                    ));
579                }
580
581                return Err(Self::sqlx_to_provider_error(
582                    "abandon_orchestration_item",
583                    e,
584                ));
585            }
586        };
587
588        let duration_ms = start.elapsed().as_millis() as u64;
589        debug!(
590            target = "duroxide::providers::postgres",
591            operation = "abandon_orchestration_item",
592            instance_id = %instance_id,
593            delay_ms = delay.map(|d| d.as_millis() as u64),
594            ignore_attempt = ignore_attempt,
595            duration_ms = duration_ms,
596            "Abandoned orchestration item via stored procedure"
597        );
598
599        Ok(())
600    }
601
602    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
603    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
604        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
605            "SELECT out_event_data FROM {}.fetch_history($1)",
606            self.schema_name
607        ))
608        .bind(instance)
609        .fetch_all(&*self.pool)
610        .await
611        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
612
613        Ok(event_data_rows
614            .into_iter()
615            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
616            .collect())
617    }
618
619    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
620    async fn append_with_execution(
621        &self,
622        instance: &str,
623        execution_id: u64,
624        new_events: Vec<Event>,
625    ) -> Result<(), ProviderError> {
626        if new_events.is_empty() {
627            return Ok(());
628        }
629
630        let mut events_payload = Vec::with_capacity(new_events.len());
631        for event in &new_events {
632            if event.event_id() == 0 {
633                error!(
634                    target = "duroxide::providers::postgres",
635                    operation = "append_with_execution",
636                    error_type = "validation_error",
637                    instance_id = %instance,
638                    execution_id = execution_id,
639                    "event_id must be set by runtime"
640                );
641                return Err(ProviderError::permanent(
642                    "append_with_execution",
643                    "event_id must be set by runtime",
644                ));
645            }
646
647            let event_json = serde_json::to_string(event).map_err(|e| {
648                ProviderError::permanent(
649                    "append_with_execution",
650                    format!("Failed to serialize event: {e}"),
651                )
652            })?;
653
654            let event_type = format!("{event:?}")
655                .split('{')
656                .next()
657                .unwrap_or("Unknown")
658                .trim()
659                .to_string();
660
661            events_payload.push(serde_json::json!({
662                "event_id": event.event_id(),
663                "event_type": event_type,
664                "event_data": event_json,
665            }));
666        }
667
668        let events_json = serde_json::Value::Array(events_payload);
669
670        sqlx::query(&format!(
671            "SELECT {}.append_history($1, $2, $3)",
672            self.schema_name
673        ))
674        .bind(instance)
675        .bind(execution_id as i64)
676        .bind(events_json)
677        .execute(&*self.pool)
678        .await
679        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
680
681        debug!(
682            target = "duroxide::providers::postgres",
683            operation = "append_with_execution",
684            instance_id = %instance,
685            execution_id = execution_id,
686            event_count = new_events.len(),
687            "Appended history events via stored procedure"
688        );
689
690        Ok(())
691    }
692
693    #[instrument(skip(self), target = "duroxide::providers::postgres")]
694    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
695        let work_item = serde_json::to_string(&item).map_err(|e| {
696            ProviderError::permanent(
697                "enqueue_worker_work",
698                format!("Failed to serialize work item: {e}"),
699            )
700        })?;
701
702        let now_ms = Self::now_millis();
703
704        // Extract activity identification and session_id for ActivityExecute items
705        let (instance_id, execution_id, activity_id, session_id) = match &item {
706            WorkItem::ActivityExecute {
707                instance,
708                execution_id,
709                id,
710                session_id,
711                ..
712            } => (
713                Some(instance.clone()),
714                Some(*execution_id as i64),
715                Some(*id as i64),
716                session_id.clone(),
717            ),
718            _ => (None, None, None, None),
719        };
720
721        sqlx::query(&format!(
722            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
723            self.schema_name
724        ))
725        .bind(work_item)
726        .bind(now_ms)
727        .bind(&instance_id)
728        .bind(execution_id)
729        .bind(activity_id)
730        .bind(&session_id)
731        .execute(&*self.pool)
732        .await
733        .map_err(|e| {
734            error!(
735                target = "duroxide::providers::postgres",
736                operation = "enqueue_worker_work",
737                error_type = "database_error",
738                error = %e,
739                "Failed to enqueue worker work"
740            );
741            Self::sqlx_to_provider_error("enqueue_worker_work", e)
742        })?;
743
744        Ok(())
745    }
746
747    #[instrument(skip(self), target = "duroxide::providers::postgres")]
748    async fn fetch_work_item(
749        &self,
750        lock_timeout: Duration,
751        _poll_timeout: Duration,
752        session: Option<&SessionFetchConfig>,
753    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
754        let start = std::time::Instant::now();
755
756        // Convert Duration to milliseconds
757        let lock_timeout_ms = lock_timeout.as_millis() as i64;
758
759        // Extract session parameters
760        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
761            Some(config) => (
762                Some(&config.owner_id),
763                Some(config.lock_timeout.as_millis() as i64),
764            ),
765            None => (None, None),
766        };
767
768        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
769            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
770            self.schema_name
771        ))
772        .bind(Self::now_millis())
773        .bind(lock_timeout_ms)
774        .bind(owner_id)
775        .bind(session_lock_timeout_ms)
776        .fetch_optional(&*self.pool)
777        .await
778        {
779            Ok(row) => row,
780            Err(e) => {
781                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
782            }
783        };
784
785        let (work_item_json, lock_token, attempt_count) = match row {
786            Some(row) => row,
787            None => return Ok(None),
788        };
789
790        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
791            ProviderError::permanent(
792                "fetch_work_item",
793                format!("Failed to deserialize worker item: {e}"),
794            )
795        })?;
796
797        let duration_ms = start.elapsed().as_millis() as u64;
798
799        // Extract instance for logging - different work item types have different structures
800        let instance_id = match &work_item {
801            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
802            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
803            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
804            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
805            WorkItem::TimerFired { instance, .. } => instance.as_str(),
806            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
807            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
808            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
809            WorkItem::SubOrchCompleted {
810                parent_instance, ..
811            } => parent_instance.as_str(),
812            WorkItem::SubOrchFailed {
813                parent_instance, ..
814            } => parent_instance.as_str(),
815            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
816        };
817
818        debug!(
819            target = "duroxide::providers::postgres",
820            operation = "fetch_work_item",
821            instance_id = %instance_id,
822            attempt_count = attempt_count,
823            duration_ms = duration_ms,
824            "Fetched activity work item via stored procedure"
825        );
826
827        Ok(Some((work_item, lock_token, attempt_count as u32)))
828    }
829
830    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
831    async fn ack_work_item(
832        &self,
833        token: &str,
834        completion: Option<WorkItem>,
835    ) -> Result<(), ProviderError> {
836        let start = std::time::Instant::now();
837
838        // If no completion provided (e.g., cancelled activity), just delete the item
839        let Some(completion) = completion else {
840            let now_ms = Self::now_millis();
841            // Call ack_worker with NULL completion to delete without enqueueing
842            sqlx::query(&format!(
843                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
844                self.schema_name
845            ))
846            .bind(token)
847            .bind(now_ms)
848            .execute(&*self.pool)
849            .await
850            .map_err(|e| {
851                if e.to_string().contains("Worker queue item not found") {
852                    ProviderError::permanent(
853                        "ack_worker",
854                        "Worker queue item not found or already processed",
855                    )
856                } else {
857                    Self::sqlx_to_provider_error("ack_worker", e)
858                }
859            })?;
860
861            let duration_ms = start.elapsed().as_millis() as u64;
862            debug!(
863                target = "duroxide::providers::postgres",
864                operation = "ack_worker",
865                token = %token,
866                duration_ms = duration_ms,
867                "Acknowledged worker without completion (cancelled)"
868            );
869            return Ok(());
870        };
871
872        // Extract instance ID from completion WorkItem
873        let instance_id = match &completion {
874            WorkItem::ActivityCompleted { instance, .. }
875            | WorkItem::ActivityFailed { instance, .. } => instance,
876            _ => {
877                error!(
878                    target = "duroxide::providers::postgres",
879                    operation = "ack_worker",
880                    error_type = "invalid_completion_type",
881                    "Invalid completion work item type"
882                );
883                return Err(ProviderError::permanent(
884                    "ack_worker",
885                    "Invalid completion work item type",
886                ));
887            }
888        };
889
890        let completion_json = serde_json::to_string(&completion).map_err(|e| {
891            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
892        })?;
893
894        let now_ms = Self::now_millis();
895
896        // Call stored procedure to atomically delete worker item and enqueue completion
897        sqlx::query(&format!(
898            "SELECT {}.ack_worker($1, $2, $3, $4)",
899            self.schema_name
900        ))
901        .bind(token)
902        .bind(instance_id)
903        .bind(completion_json)
904        .bind(now_ms)
905        .execute(&*self.pool)
906        .await
907        .map_err(|e| {
908            if e.to_string().contains("Worker queue item not found") {
909                error!(
910                    target = "duroxide::providers::postgres",
911                    operation = "ack_worker",
912                    error_type = "worker_item_not_found",
913                    token = %token,
914                    "Worker queue item not found or already processed"
915                );
916                ProviderError::permanent(
917                    "ack_worker",
918                    "Worker queue item not found or already processed",
919                )
920            } else {
921                Self::sqlx_to_provider_error("ack_worker", e)
922            }
923        })?;
924
925        let duration_ms = start.elapsed().as_millis() as u64;
926        debug!(
927            target = "duroxide::providers::postgres",
928            operation = "ack_worker",
929            instance_id = %instance_id,
930            duration_ms = duration_ms,
931            "Acknowledged worker and enqueued completion"
932        );
933
934        Ok(())
935    }
936
937    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
938    async fn renew_work_item_lock(
939        &self,
940        token: &str,
941        extend_for: Duration,
942    ) -> Result<(), ProviderError> {
943        let start = std::time::Instant::now();
944
945        // Get current time from application for consistent time reference
946        let now_ms = Self::now_millis();
947
948        // Convert Duration to seconds for the stored procedure
949        let extend_secs = extend_for.as_secs() as i64;
950
951        match sqlx::query(&format!(
952            "SELECT {}.renew_work_item_lock($1, $2, $3)",
953            self.schema_name
954        ))
955        .bind(token)
956        .bind(now_ms)
957        .bind(extend_secs)
958        .execute(&*self.pool)
959        .await
960        {
961            Ok(_) => {
962                let duration_ms = start.elapsed().as_millis() as u64;
963                debug!(
964                    target = "duroxide::providers::postgres",
965                    operation = "renew_work_item_lock",
966                    token = %token,
967                    extend_for_secs = extend_secs,
968                    duration_ms = duration_ms,
969                    "Work item lock renewed successfully"
970                );
971                Ok(())
972            }
973            Err(e) => {
974                if let SqlxError::Database(db_err) = &e {
975                    if db_err.message().contains("Lock token invalid") {
976                        return Err(ProviderError::permanent(
977                            "renew_work_item_lock",
978                            "Lock token invalid, expired, or already acked",
979                        ));
980                    }
981                } else if e.to_string().contains("Lock token invalid") {
982                    return Err(ProviderError::permanent(
983                        "renew_work_item_lock",
984                        "Lock token invalid, expired, or already acked",
985                    ));
986                }
987
988                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
989            }
990        }
991    }
992
993    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
994    async fn abandon_work_item(
995        &self,
996        token: &str,
997        delay: Option<Duration>,
998        ignore_attempt: bool,
999    ) -> Result<(), ProviderError> {
1000        let start = std::time::Instant::now();
1001        let now_ms = Self::now_millis();
1002        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1003
1004        match sqlx::query(&format!(
1005            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1006            self.schema_name
1007        ))
1008        .bind(token)
1009        .bind(now_ms)
1010        .bind(delay_param)
1011        .bind(ignore_attempt)
1012        .execute(&*self.pool)
1013        .await
1014        {
1015            Ok(_) => {
1016                let duration_ms = start.elapsed().as_millis() as u64;
1017                debug!(
1018                    target = "duroxide::providers::postgres",
1019                    operation = "abandon_work_item",
1020                    token = %token,
1021                    delay_ms = delay.map(|d| d.as_millis() as u64),
1022                    ignore_attempt = ignore_attempt,
1023                    duration_ms = duration_ms,
1024                    "Abandoned work item via stored procedure"
1025                );
1026                Ok(())
1027            }
1028            Err(e) => {
1029                if let SqlxError::Database(db_err) = &e {
1030                    if db_err.message().contains("Invalid lock token")
1031                        || db_err.message().contains("already acked")
1032                    {
1033                        return Err(ProviderError::permanent(
1034                            "abandon_work_item",
1035                            "Invalid lock token or already acked",
1036                        ));
1037                    }
1038                } else if e.to_string().contains("Invalid lock token")
1039                    || e.to_string().contains("already acked")
1040                {
1041                    return Err(ProviderError::permanent(
1042                        "abandon_work_item",
1043                        "Invalid lock token or already acked",
1044                    ));
1045                }
1046
1047                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1048            }
1049        }
1050    }
1051
1052    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1053    async fn renew_orchestration_item_lock(
1054        &self,
1055        token: &str,
1056        extend_for: Duration,
1057    ) -> Result<(), ProviderError> {
1058        let start = std::time::Instant::now();
1059
1060        // Get current time from application for consistent time reference
1061        let now_ms = Self::now_millis();
1062
1063        // Convert Duration to seconds for the stored procedure
1064        let extend_secs = extend_for.as_secs() as i64;
1065
1066        match sqlx::query(&format!(
1067            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1068            self.schema_name
1069        ))
1070        .bind(token)
1071        .bind(now_ms)
1072        .bind(extend_secs)
1073        .execute(&*self.pool)
1074        .await
1075        {
1076            Ok(_) => {
1077                let duration_ms = start.elapsed().as_millis() as u64;
1078                debug!(
1079                    target = "duroxide::providers::postgres",
1080                    operation = "renew_orchestration_item_lock",
1081                    token = %token,
1082                    extend_for_secs = extend_secs,
1083                    duration_ms = duration_ms,
1084                    "Orchestration item lock renewed successfully"
1085                );
1086                Ok(())
1087            }
1088            Err(e) => {
1089                if let SqlxError::Database(db_err) = &e {
1090                    if db_err.message().contains("Lock token invalid")
1091                        || db_err.message().contains("expired")
1092                        || db_err.message().contains("already released")
1093                    {
1094                        return Err(ProviderError::permanent(
1095                            "renew_orchestration_item_lock",
1096                            "Lock token invalid, expired, or already released",
1097                        ));
1098                    }
1099                } else if e.to_string().contains("Lock token invalid")
1100                    || e.to_string().contains("expired")
1101                    || e.to_string().contains("already released")
1102                {
1103                    return Err(ProviderError::permanent(
1104                        "renew_orchestration_item_lock",
1105                        "Lock token invalid, expired, or already released",
1106                    ));
1107                }
1108
1109                Err(Self::sqlx_to_provider_error(
1110                    "renew_orchestration_item_lock",
1111                    e,
1112                ))
1113            }
1114        }
1115    }
1116
1117    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1118    async fn enqueue_for_orchestrator(
1119        &self,
1120        item: WorkItem,
1121        delay: Option<Duration>,
1122    ) -> Result<(), ProviderError> {
1123        let work_item = serde_json::to_string(&item).map_err(|e| {
1124            ProviderError::permanent(
1125                "enqueue_orchestrator_work",
1126                format!("Failed to serialize work item: {e}"),
1127            )
1128        })?;
1129
1130        // Extract instance ID from WorkItem enum
1131        let instance_id = match &item {
1132            WorkItem::StartOrchestration { instance, .. }
1133            | WorkItem::ActivityCompleted { instance, .. }
1134            | WorkItem::ActivityFailed { instance, .. }
1135            | WorkItem::TimerFired { instance, .. }
1136            | WorkItem::ExternalRaised { instance, .. }
1137            | WorkItem::CancelInstance { instance, .. }
1138            | WorkItem::ContinueAsNew { instance, .. }
1139            | WorkItem::QueueMessage { instance, .. } => instance,
1140            WorkItem::SubOrchCompleted {
1141                parent_instance, ..
1142            }
1143            | WorkItem::SubOrchFailed {
1144                parent_instance, ..
1145            } => parent_instance,
1146            WorkItem::ActivityExecute { .. } => {
1147                return Err(ProviderError::permanent(
1148                    "enqueue_orchestrator_work",
1149                    "ActivityExecute should go to worker queue, not orchestrator queue",
1150                ));
1151            }
1152        };
1153
1154        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1155        let now_ms = Self::now_millis();
1156
1157        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1158            if *fire_at_ms > 0 {
1159                // Take max of fire_at_ms and delay (if provided)
1160                if let Some(delay) = delay {
1161                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1162                } else {
1163                    *fire_at_ms
1164                }
1165            } else {
1166                // fire_at_ms is 0, use delay or NOW()
1167                delay
1168                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1169                    .unwrap_or(now_ms as u64)
1170            }
1171        } else {
1172            // Non-timer item: use delay or NOW()
1173            delay
1174                .map(|d| now_ms as u64 + d.as_millis() as u64)
1175                .unwrap_or(now_ms as u64)
1176        };
1177
1178        let visible_at = Utc
1179            .timestamp_millis_opt(visible_at_ms as i64)
1180            .single()
1181            .ok_or_else(|| {
1182                ProviderError::permanent(
1183                    "enqueue_orchestrator_work",
1184                    "Invalid visible_at timestamp",
1185                )
1186            })?;
1187
1188        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1189        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1190
1191        // Call stored procedure to enqueue work
1192        sqlx::query(&format!(
1193            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1194            self.schema_name
1195        ))
1196        .bind(instance_id)
1197        .bind(&work_item)
1198        .bind(visible_at)
1199        .bind::<Option<String>>(None) // orchestration_name - NULL
1200        .bind::<Option<String>>(None) // orchestration_version - NULL
1201        .bind::<Option<i64>>(None) // execution_id - NULL
1202        .execute(&*self.pool)
1203        .await
1204        .map_err(|e| {
1205            error!(
1206                target = "duroxide::providers::postgres",
1207                operation = "enqueue_orchestrator_work",
1208                error_type = "database_error",
1209                error = %e,
1210                instance_id = %instance_id,
1211                "Failed to enqueue orchestrator work"
1212            );
1213            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1214        })?;
1215
1216        debug!(
1217            target = "duroxide::providers::postgres",
1218            operation = "enqueue_orchestrator_work",
1219            instance_id = %instance_id,
1220            delay_ms = delay.map(|d| d.as_millis() as u64),
1221            "Enqueued orchestrator work"
1222        );
1223
1224        Ok(())
1225    }
1226
1227    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1228    async fn read_with_execution(
1229        &self,
1230        instance: &str,
1231        execution_id: u64,
1232    ) -> Result<Vec<Event>, ProviderError> {
1233        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1234            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1235            self.table_name("history")
1236        ))
1237        .bind(instance)
1238        .bind(execution_id as i64)
1239        .fetch_all(&*self.pool)
1240        .await
1241        .ok()
1242        .unwrap_or_default();
1243
1244        Ok(event_data_rows
1245            .into_iter()
1246            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1247            .collect())
1248    }
1249
1250    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1251    async fn renew_session_lock(
1252        &self,
1253        owner_ids: &[&str],
1254        extend_for: Duration,
1255        idle_timeout: Duration,
1256    ) -> Result<usize, ProviderError> {
1257        if owner_ids.is_empty() {
1258            return Ok(0);
1259        }
1260
1261        let now_ms = Self::now_millis();
1262        let extend_ms = extend_for.as_millis() as i64;
1263        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1264        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1265
1266        let result = sqlx::query_scalar::<_, i64>(&format!(
1267            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1268            self.schema_name
1269        ))
1270        .bind(&owner_ids_vec)
1271        .bind(now_ms)
1272        .bind(extend_ms)
1273        .bind(idle_timeout_ms)
1274        .fetch_one(&*self.pool)
1275        .await
1276        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1277
1278        debug!(
1279            target = "duroxide::providers::postgres",
1280            operation = "renew_session_lock",
1281            owner_count = owner_ids.len(),
1282            sessions_renewed = result,
1283            "Session locks renewed"
1284        );
1285
1286        Ok(result as usize)
1287    }
1288
1289    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1290    async fn cleanup_orphaned_sessions(
1291        &self,
1292        _idle_timeout: Duration,
1293    ) -> Result<usize, ProviderError> {
1294        let now_ms = Self::now_millis();
1295
1296        let result = sqlx::query_scalar::<_, i64>(&format!(
1297            "SELECT {}.cleanup_orphaned_sessions($1)",
1298            self.schema_name
1299        ))
1300        .bind(now_ms)
1301        .fetch_one(&*self.pool)
1302        .await
1303        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1304
1305        debug!(
1306            target = "duroxide::providers::postgres",
1307            operation = "cleanup_orphaned_sessions",
1308            sessions_cleaned = result,
1309            "Orphaned sessions cleaned up"
1310        );
1311
1312        Ok(result as usize)
1313    }
1314
1315    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1316        Some(self)
1317    }
1318
1319    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1320    async fn get_custom_status(
1321        &self,
1322        instance: &str,
1323        last_seen_version: u64,
1324    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1325        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1326            "SELECT * FROM {}.get_custom_status($1, $2)",
1327            self.schema_name
1328        ))
1329        .bind(instance)
1330        .bind(last_seen_version as i64)
1331        .fetch_optional(&*self.pool)
1332        .await
1333        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1334
1335        match row {
1336            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1337            None => Ok(None),
1338        }
1339    }
1340}
1341
1342#[async_trait::async_trait]
1343impl ProviderAdmin for PostgresProvider {
1344    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1345    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1346        sqlx::query_scalar(&format!(
1347            "SELECT instance_id FROM {}.list_instances()",
1348            self.schema_name
1349        ))
1350        .fetch_all(&*self.pool)
1351        .await
1352        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1353    }
1354
1355    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1356    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1357        sqlx::query_scalar(&format!(
1358            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1359            self.schema_name
1360        ))
1361        .bind(status)
1362        .fetch_all(&*self.pool)
1363        .await
1364        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1365    }
1366
1367    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1368    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1369        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1370            "SELECT execution_id FROM {}.list_executions($1)",
1371            self.schema_name
1372        ))
1373        .bind(instance)
1374        .fetch_all(&*self.pool)
1375        .await
1376        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1377
1378        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1379    }
1380
1381    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1382    async fn read_history_with_execution_id(
1383        &self,
1384        instance: &str,
1385        execution_id: u64,
1386    ) -> Result<Vec<Event>, ProviderError> {
1387        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1388            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1389            self.schema_name
1390        ))
1391        .bind(instance)
1392        .bind(execution_id as i64)
1393        .fetch_all(&*self.pool)
1394        .await
1395        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1396
1397        event_data_rows
1398            .into_iter()
1399            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1400            .collect::<Vec<Event>>()
1401            .into_iter()
1402            .map(Ok)
1403            .collect()
1404    }
1405
1406    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1407    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1408        let execution_id = self.latest_execution_id(instance).await?;
1409        self.read_history_with_execution_id(instance, execution_id)
1410            .await
1411    }
1412
1413    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1414    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1415        sqlx::query_scalar(&format!(
1416            "SELECT {}.latest_execution_id($1)",
1417            self.schema_name
1418        ))
1419        .bind(instance)
1420        .fetch_optional(&*self.pool)
1421        .await
1422        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1423        .map(|id: i64| id as u64)
1424        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1425    }
1426
1427    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1428    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1429        let row: Option<(
1430            String,
1431            String,
1432            String,
1433            i64,
1434            chrono::DateTime<Utc>,
1435            Option<chrono::DateTime<Utc>>,
1436            Option<String>,
1437            Option<String>,
1438            Option<String>,
1439        )> = sqlx::query_as(&format!(
1440            "SELECT * FROM {}.get_instance_info($1)",
1441            self.schema_name
1442        ))
1443        .bind(instance)
1444        .fetch_optional(&*self.pool)
1445        .await
1446        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1447
1448        let (
1449            instance_id,
1450            orchestration_name,
1451            orchestration_version,
1452            current_execution_id,
1453            created_at,
1454            updated_at,
1455            status,
1456            output,
1457            parent_instance_id,
1458        ) =
1459            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1460
1461        Ok(InstanceInfo {
1462            instance_id,
1463            orchestration_name,
1464            orchestration_version,
1465            current_execution_id: current_execution_id as u64,
1466            status: status.unwrap_or_else(|| "Running".to_string()),
1467            output,
1468            created_at: created_at.timestamp_millis() as u64,
1469            updated_at: updated_at
1470                .map(|dt| dt.timestamp_millis() as u64)
1471                .unwrap_or(created_at.timestamp_millis() as u64),
1472            parent_instance_id,
1473        })
1474    }
1475
1476    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1477    async fn get_execution_info(
1478        &self,
1479        instance: &str,
1480        execution_id: u64,
1481    ) -> Result<ExecutionInfo, ProviderError> {
1482        let row: Option<(
1483            i64,
1484            String,
1485            Option<String>,
1486            chrono::DateTime<Utc>,
1487            Option<chrono::DateTime<Utc>>,
1488            i64,
1489        )> = sqlx::query_as(&format!(
1490            "SELECT * FROM {}.get_execution_info($1, $2)",
1491            self.schema_name
1492        ))
1493        .bind(instance)
1494        .bind(execution_id as i64)
1495        .fetch_optional(&*self.pool)
1496        .await
1497        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1498
1499        let (exec_id, status, output, started_at, completed_at, event_count) = row
1500            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1501
1502        Ok(ExecutionInfo {
1503            execution_id: exec_id as u64,
1504            status,
1505            output,
1506            started_at: started_at.timestamp_millis() as u64,
1507            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1508            event_count: event_count as usize,
1509        })
1510    }
1511
1512    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1513    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1514        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1515            "SELECT * FROM {}.get_system_metrics()",
1516            self.schema_name
1517        ))
1518        .fetch_optional(&*self.pool)
1519        .await
1520        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1521
1522        let (
1523            total_instances,
1524            total_executions,
1525            running_instances,
1526            completed_instances,
1527            failed_instances,
1528            total_events,
1529        ) = row.ok_or_else(|| {
1530            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1531        })?;
1532
1533        Ok(SystemMetrics {
1534            total_instances: total_instances as u64,
1535            total_executions: total_executions as u64,
1536            running_instances: running_instances as u64,
1537            completed_instances: completed_instances as u64,
1538            failed_instances: failed_instances as u64,
1539            total_events: total_events as u64,
1540        })
1541    }
1542
1543    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1544    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1545        let now_ms = Self::now_millis();
1546
1547        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1548            "SELECT * FROM {}.get_queue_depths($1)",
1549            self.schema_name
1550        ))
1551        .bind(now_ms)
1552        .fetch_optional(&*self.pool)
1553        .await
1554        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1555
1556        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1557            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1558        })?;
1559
1560        Ok(QueueDepths {
1561            orchestrator_queue: orchestrator_queue as usize,
1562            worker_queue: worker_queue as usize,
1563            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1564        })
1565    }
1566
1567    // ===== Hierarchy Primitive Operations =====
1568
1569    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1570    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1571        sqlx::query_scalar(&format!(
1572            "SELECT child_instance_id FROM {}.list_children($1)",
1573            self.schema_name
1574        ))
1575        .bind(instance_id)
1576        .fetch_all(&*self.pool)
1577        .await
1578        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1579    }
1580
1581    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1582    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1583        // The stored procedure raises an exception if instance doesn't exist
1584        // Otherwise returns the parent_instance_id (which may be NULL)
1585        let result: Result<Option<String>, _> =
1586            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1587                .bind(instance_id)
1588                .fetch_one(&*self.pool)
1589                .await;
1590
1591        match result {
1592            Ok(parent_id) => Ok(parent_id),
1593            Err(e) => {
1594                let err_str = e.to_string();
1595                if err_str.contains("Instance not found") {
1596                    Err(ProviderError::permanent(
1597                        "get_parent_id",
1598                        format!("Instance not found: {}", instance_id),
1599                    ))
1600                } else {
1601                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1602                }
1603            }
1604        }
1605    }
1606
1607    // ===== Deletion Operations =====
1608
1609    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1610    async fn delete_instances_atomic(
1611        &self,
1612        ids: &[String],
1613        force: bool,
1614    ) -> Result<DeleteInstanceResult, ProviderError> {
1615        if ids.is_empty() {
1616            return Ok(DeleteInstanceResult::default());
1617        }
1618
1619        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1620            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1621            self.schema_name
1622        ))
1623        .bind(ids)
1624        .bind(force)
1625        .fetch_optional(&*self.pool)
1626        .await
1627        .map_err(|e| {
1628            let err_str = e.to_string();
1629            if err_str.contains("is Running") {
1630                ProviderError::permanent("delete_instances_atomic", err_str)
1631            } else if err_str.contains("Orphan detected") {
1632                ProviderError::permanent("delete_instances_atomic", err_str)
1633            } else {
1634                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1635            }
1636        })?;
1637
1638        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1639            row.unwrap_or((0, 0, 0, 0));
1640
1641        debug!(
1642            target = "duroxide::providers::postgres",
1643            operation = "delete_instances_atomic",
1644            instances_deleted = instances_deleted,
1645            executions_deleted = executions_deleted,
1646            events_deleted = events_deleted,
1647            queue_messages_deleted = queue_messages_deleted,
1648            "Deleted instances atomically"
1649        );
1650
1651        Ok(DeleteInstanceResult {
1652            instances_deleted: instances_deleted as u64,
1653            executions_deleted: executions_deleted as u64,
1654            events_deleted: events_deleted as u64,
1655            queue_messages_deleted: queue_messages_deleted as u64,
1656        })
1657    }
1658
1659    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1660    async fn delete_instance_bulk(
1661        &self,
1662        filter: InstanceFilter,
1663    ) -> Result<DeleteInstanceResult, ProviderError> {
1664        // Build query to find matching root instances in terminal states
1665        let mut sql = format!(
1666            r#"
1667            SELECT i.instance_id
1668            FROM {}.instances i
1669            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1670              AND i.current_execution_id = e.execution_id
1671            WHERE i.parent_instance_id IS NULL
1672              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1673            "#,
1674            self.schema_name, self.schema_name
1675        );
1676
1677        // Add instance_ids filter if provided
1678        if let Some(ref ids) = filter.instance_ids {
1679            if ids.is_empty() {
1680                return Ok(DeleteInstanceResult::default());
1681            }
1682            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1683            sql.push_str(&format!(
1684                " AND i.instance_id IN ({})",
1685                placeholders.join(", ")
1686            ));
1687        }
1688
1689        // Add completed_before filter if provided
1690        if filter.completed_before.is_some() {
1691            let param_num = filter
1692                .instance_ids
1693                .as_ref()
1694                .map(|ids| ids.len())
1695                .unwrap_or(0)
1696                + 1;
1697            sql.push_str(&format!(
1698                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1699                param_num
1700            ));
1701        }
1702
1703        // Add limit
1704        let limit = filter.limit.unwrap_or(1000);
1705        let limit_param_num = filter
1706            .instance_ids
1707            .as_ref()
1708            .map(|ids| ids.len())
1709            .unwrap_or(0)
1710            + if filter.completed_before.is_some() {
1711                1
1712            } else {
1713                0
1714            }
1715            + 1;
1716        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1717
1718        // Build and execute query
1719        let mut query = sqlx::query_scalar::<_, String>(&sql);
1720        if let Some(ref ids) = filter.instance_ids {
1721            for id in ids {
1722                query = query.bind(id);
1723            }
1724        }
1725        if let Some(completed_before) = filter.completed_before {
1726            query = query.bind(completed_before as i64);
1727        }
1728        query = query.bind(limit as i64);
1729
1730        let instance_ids: Vec<String> = query
1731            .fetch_all(&*self.pool)
1732            .await
1733            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1734
1735        if instance_ids.is_empty() {
1736            return Ok(DeleteInstanceResult::default());
1737        }
1738
1739        // Delete each instance with cascade
1740        let mut result = DeleteInstanceResult::default();
1741
1742        for instance_id in &instance_ids {
1743            // Get full tree for this root
1744            let tree = self.get_instance_tree(instance_id).await?;
1745
1746            // Atomic delete (tree.all_ids is already in deletion order: children first)
1747            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1748            result.instances_deleted += delete_result.instances_deleted;
1749            result.executions_deleted += delete_result.executions_deleted;
1750            result.events_deleted += delete_result.events_deleted;
1751            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1752        }
1753
1754        debug!(
1755            target = "duroxide::providers::postgres",
1756            operation = "delete_instance_bulk",
1757            instances_deleted = result.instances_deleted,
1758            executions_deleted = result.executions_deleted,
1759            events_deleted = result.events_deleted,
1760            queue_messages_deleted = result.queue_messages_deleted,
1761            "Bulk deleted instances"
1762        );
1763
1764        Ok(result)
1765    }
1766
1767    // ===== Pruning Operations =====
1768
1769    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1770    async fn prune_executions(
1771        &self,
1772        instance_id: &str,
1773        options: PruneOptions,
1774    ) -> Result<PruneResult, ProviderError> {
1775        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1776        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1777
1778        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1779            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1780            self.schema_name
1781        ))
1782        .bind(instance_id)
1783        .bind(keep_last)
1784        .bind(completed_before_ms)
1785        .fetch_optional(&*self.pool)
1786        .await
1787        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1788
1789        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1790
1791        debug!(
1792            target = "duroxide::providers::postgres",
1793            operation = "prune_executions",
1794            instance_id = %instance_id,
1795            instances_processed = instances_processed,
1796            executions_deleted = executions_deleted,
1797            events_deleted = events_deleted,
1798            "Pruned executions"
1799        );
1800
1801        Ok(PruneResult {
1802            instances_processed: instances_processed as u64,
1803            executions_deleted: executions_deleted as u64,
1804            events_deleted: events_deleted as u64,
1805        })
1806    }
1807
1808    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1809    async fn prune_executions_bulk(
1810        &self,
1811        filter: InstanceFilter,
1812        options: PruneOptions,
1813    ) -> Result<PruneResult, ProviderError> {
1814        // Find matching instances (all statuses - prune_executions protects current execution)
1815        // Note: We include Running instances because long-running orchestrations (e.g., with
1816        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1817        // call safely skips the current execution regardless of its status.
1818        let mut sql = format!(
1819            r#"
1820            SELECT i.instance_id
1821            FROM {}.instances i
1822            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1823              AND i.current_execution_id = e.execution_id
1824            WHERE 1=1
1825            "#,
1826            self.schema_name, self.schema_name
1827        );
1828
1829        // Add instance_ids filter if provided
1830        if let Some(ref ids) = filter.instance_ids {
1831            if ids.is_empty() {
1832                return Ok(PruneResult::default());
1833            }
1834            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1835            sql.push_str(&format!(
1836                " AND i.instance_id IN ({})",
1837                placeholders.join(", ")
1838            ));
1839        }
1840
1841        // Add completed_before filter if provided
1842        if filter.completed_before.is_some() {
1843            let param_num = filter
1844                .instance_ids
1845                .as_ref()
1846                .map(|ids| ids.len())
1847                .unwrap_or(0)
1848                + 1;
1849            sql.push_str(&format!(
1850                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1851                param_num
1852            ));
1853        }
1854
1855        // Add limit
1856        let limit = filter.limit.unwrap_or(1000);
1857        let limit_param_num = filter
1858            .instance_ids
1859            .as_ref()
1860            .map(|ids| ids.len())
1861            .unwrap_or(0)
1862            + if filter.completed_before.is_some() {
1863                1
1864            } else {
1865                0
1866            }
1867            + 1;
1868        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1869
1870        // Build and execute query
1871        let mut query = sqlx::query_scalar::<_, String>(&sql);
1872        if let Some(ref ids) = filter.instance_ids {
1873            for id in ids {
1874                query = query.bind(id);
1875            }
1876        }
1877        if let Some(completed_before) = filter.completed_before {
1878            query = query.bind(completed_before as i64);
1879        }
1880        query = query.bind(limit as i64);
1881
1882        let instance_ids: Vec<String> = query
1883            .fetch_all(&*self.pool)
1884            .await
1885            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1886
1887        // Prune each instance
1888        let mut result = PruneResult::default();
1889
1890        for instance_id in &instance_ids {
1891            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1892            result.instances_processed += single_result.instances_processed;
1893            result.executions_deleted += single_result.executions_deleted;
1894            result.events_deleted += single_result.events_deleted;
1895        }
1896
1897        debug!(
1898            target = "duroxide::providers::postgres",
1899            operation = "prune_executions_bulk",
1900            instances_processed = result.instances_processed,
1901            executions_deleted = result.executions_deleted,
1902            events_deleted = result.events_deleted,
1903            "Bulk pruned executions"
1904        );
1905
1906        Ok(result)
1907    }
1908}