Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo,
5    ExecutionMetadata, InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
6    ProviderError, PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier,
7    SessionFetchConfig, SystemMetrics, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// Constructing a provider connects to the database and runs the crate's
/// migrations against the selected schema (`"public"` when none is given).
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect with an explicit connection string (uses the "public" schema)
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    /// Shared connection pool used for every query issued by this provider.
    pool: Arc<PgPool>,
    /// Schema name interpolated into every stored-procedure call and table reference.
    schema_name: String,
}
45
46impl PostgresProvider {
    /// Connects to `database_url` and sets the provider up in the default schema.
    ///
    /// Equivalent to [`Self::new_with_schema`] with `schema_name = None`,
    /// which resolves to the `"public"` schema.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Self::new_with_schema`].
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_schema(database_url, None).await
    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95
96
97    /// Get schema-qualified table name
98    fn table_name(&self, table: &str) -> String {
99        format!("{}.{}", self.schema_name, table)
100    }
101
102    /// Get the database pool (for testing)
103    pub fn pool(&self) -> &PgPool {
104        &self.pool
105    }
106
107    /// Get the schema name (for testing)
108    pub fn schema_name(&self) -> &str {
109        &self.schema_name
110    }
111
112    /// Convert sqlx::Error to ProviderError with proper classification
113    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114        match e {
115            SqlxError::Database(ref db_err) => {
116                // PostgreSQL error codes
117                let code_opt = db_err.code();
118                let code = code_opt.as_deref();
119                if code == Some("40P01") {
120                    // Deadlock detected
121                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122                } else if code == Some("40001") {
123                    // Serialization failure - permanent error (transaction conflict, not transient)
124                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125                } else if code == Some("23505") {
126                    // Unique constraint violation (duplicate event)
127                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128                } else if code == Some("23503") {
129                    // Foreign key constraint violation
130                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131                } else {
132                    ProviderError::permanent(operation, format!("Database error: {e}"))
133                }
134            }
135            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137            }
138            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140        }
141    }
142
143    /// Clean up schema after tests (drops all tables and optionally the schema)
144    ///
145    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
146    /// Only drops the schema if it's a custom schema (not "public").
147    pub async fn cleanup_schema(&self) -> Result<()> {
148        // Call the stored procedure to drop all tables
149        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150            .execute(&*self.pool)
151            .await?;
152
153        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
154        // Only drop custom schemas created for testing
155        if self.schema_name != "public" {
156            sqlx::query(&format!(
157                "DROP SCHEMA IF EXISTS {} CASCADE",
158                self.schema_name
159            ))
160            .execute(&*self.pool)
161            .await?;
162        } else {
163            // Explicit safeguard: we only drop tables from public schema, never the schema itself
164            // This ensures we don't accidentally drop the default PostgreSQL schema
165        }
166
167        Ok(())
168    }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
    /// Returns this provider's fixed identifier, `"duroxide-pg"`.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
176
    /// Returns the crate version, taken from `CARGO_PKG_VERSION` at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
180
181    #[instrument(skip(self), target = "duroxide::providers::postgres")]
182    async fn fetch_orchestration_item(
183        &self,
184        lock_timeout: Duration,
185        _poll_timeout: Duration,
186        filter: Option<&DispatcherCapabilityFilter>,
187    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
188        let start = std::time::Instant::now();
189
190        const MAX_RETRIES: u32 = 3;
191        const RETRY_DELAY_MS: u64 = 50;
192
193        // Convert Duration to milliseconds
194        let lock_timeout_ms = lock_timeout.as_millis() as i64;
195        let mut _last_error: Option<ProviderError> = None;
196
197        // Extract version filter from capability filter
198        let (min_packed, max_packed) = if let Some(f) = filter {
199            if let Some(range) = f.supported_duroxide_versions.first() {
200                let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
201                let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
202                (Some(min), Some(max))
203            } else {
204                // Empty supported_duroxide_versions = supports nothing
205                return Ok(None);
206            }
207        } else {
208            (None, None)
209        };
210
211        for attempt in 0..=MAX_RETRIES {
212            let now_ms = Self::now_millis();
213
214            let result: Result<
215                Option<(
216                    String,
217                    String,
218                    String,
219                    i64,
220                    serde_json::Value,
221                    serde_json::Value,
222                    String,
223                    i32,
224                )>,
225                SqlxError,
226            > = sqlx::query_as(&format!(
227                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
228                self.schema_name
229            ))
230            .bind(now_ms)
231            .bind(lock_timeout_ms)
232            .bind(min_packed)
233            .bind(max_packed)
234            .fetch_optional(&*self.pool)
235            .await;
236
237            let row = match result {
238                Ok(r) => r,
239                Err(e) => {
240                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
241                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
242                        warn!(
243                            target = "duroxide::providers::postgres",
244                            operation = "fetch_orchestration_item",
245                            attempt = attempt + 1,
246                            error = %provider_err,
247                            "Retryable error, will retry"
248                        );
249                        _last_error = Some(provider_err);
250                        sleep(std::time::Duration::from_millis(
251                            RETRY_DELAY_MS * (attempt as u64 + 1),
252                        ))
253                        .await;
254                        continue;
255                    }
256                    return Err(provider_err);
257                }
258            };
259
260            if let Some((
261                instance_id,
262                orchestration_name,
263                orchestration_version,
264                execution_id,
265                history_json,
266                messages_json,
267                lock_token,
268                attempt_count,
269            )) = row
270            {
271                let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
272                    Ok(h) => (h, None),
273                    Err(e) => {
274                        let error_msg = format!("Failed to deserialize history: {e}");
275                        warn!(
276                            target = "duroxide::providers::postgres",
277                            instance = %instance_id,
278                            error = %error_msg,
279                            "History deserialization failed, returning item with history_error"
280                        );
281                        (vec![], Some(error_msg))
282                    }
283                };
284
285                let messages: Vec<WorkItem> =
286                    serde_json::from_value(messages_json).map_err(|e| {
287                        ProviderError::permanent(
288                            "fetch_orchestration_item",
289                            format!("Failed to deserialize messages: {e}"),
290                        )
291                    })?;
292
293                let duration_ms = start.elapsed().as_millis() as u64;
294                debug!(
295                    target = "duroxide::providers::postgres",
296                    operation = "fetch_orchestration_item",
297                    instance_id = %instance_id,
298                    execution_id = execution_id,
299                    message_count = messages.len(),
300                    history_count = history.len(),
301                    attempt_count = attempt_count,
302                    duration_ms = duration_ms,
303                    attempts = attempt + 1,
304                    "Fetched orchestration item via stored procedure"
305                );
306
307                return Ok(Some((
308                    OrchestrationItem {
309                        instance: instance_id,
310                        orchestration_name,
311                        execution_id: execution_id as u64,
312                        version: orchestration_version,
313                        history,
314                        messages,
315                        history_error,
316                    },
317                    lock_token,
318                    attempt_count as u32,
319                )));
320            }
321
322            // No result found - return immediately (short polling behavior)
323            // Only retry with delay on retryable errors (handled above)
324            return Ok(None);
325        }
326
327        Ok(None)
328    }
329    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
330    async fn ack_orchestration_item(
331        &self,
332        lock_token: &str,
333        execution_id: u64,
334        history_delta: Vec<Event>,
335        worker_items: Vec<WorkItem>,
336        orchestrator_items: Vec<WorkItem>,
337        metadata: ExecutionMetadata,
338        cancelled_activities: Vec<ScheduledActivityIdentifier>,
339    ) -> Result<(), ProviderError> {
340        let start = std::time::Instant::now();
341
342        const MAX_RETRIES: u32 = 3;
343        const RETRY_DELAY_MS: u64 = 50;
344
345        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
346        for event in &history_delta {
347            if event.event_id() == 0 {
348                return Err(ProviderError::permanent(
349                    "ack_orchestration_item",
350                    "event_id must be set by runtime",
351                ));
352            }
353
354            let event_json = serde_json::to_string(event).map_err(|e| {
355                ProviderError::permanent(
356                    "ack_orchestration_item",
357                    format!("Failed to serialize event: {e}"),
358                )
359            })?;
360
361            let event_type = format!("{event:?}")
362                .split('{')
363                .next()
364                .unwrap_or("Unknown")
365                .trim()
366                .to_string();
367
368            history_delta_payload.push(serde_json::json!({
369                "event_id": event.event_id(),
370                "event_type": event_type,
371                "event_data": event_json,
372            }));
373        }
374
375        let history_delta_json = serde_json::Value::Array(history_delta_payload);
376
377        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
378            ProviderError::permanent(
379                "ack_orchestration_item",
380                format!("Failed to serialize worker items: {e}"),
381            )
382        })?;
383
384        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
385            ProviderError::permanent(
386                "ack_orchestration_item",
387                format!("Failed to serialize orchestrator items: {e}"),
388            )
389        })?;
390
391        // Scan history_delta for the last CustomStatusUpdated event
392        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
393            let mut last_status: Option<&Option<String>> = None;
394            for event in &history_delta {
395                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
396                    last_status = Some(status);
397                }
398            }
399            match last_status {
400                Some(Some(s)) => (Some("set"), Some(s.as_str())),
401                Some(None) => (Some("clear"), None),
402                None => (None, None),
403            }
404        };
405
406        let metadata_json = serde_json::json!({
407            "orchestration_name": metadata.orchestration_name,
408            "orchestration_version": metadata.orchestration_version,
409            "status": metadata.status,
410            "output": metadata.output,
411            "parent_instance_id": metadata.parent_instance_id,
412            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
413                serde_json::json!({
414                    "major": v.major,
415                    "minor": v.minor,
416                    "patch": v.patch,
417                })
418            }),
419            "custom_status_action": custom_status_action,
420            "custom_status_value": custom_status_value,
421        });
422
423        // Serialize cancelled activities for lock stealing
424        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
425            .iter()
426            .map(|a| {
427                serde_json::json!({
428                    "instance": a.instance,
429                    "execution_id": a.execution_id,
430                    "activity_id": a.activity_id,
431                })
432            })
433            .collect();
434        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
435
436        for attempt in 0..=MAX_RETRIES {
437            let now_ms = Self::now_millis();
438            let result = sqlx::query(&format!(
439                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
440                self.schema_name
441            ))
442            .bind(lock_token)
443            .bind(now_ms)
444            .bind(execution_id as i64)
445            .bind(&history_delta_json)
446            .bind(&worker_items_json)
447            .bind(&orchestrator_items_json)
448            .bind(&metadata_json)
449            .bind(&cancelled_activities_json)
450            .execute(&*self.pool)
451            .await;
452
453            match result {
454                Ok(_) => {
455                    let duration_ms = start.elapsed().as_millis() as u64;
456                    debug!(
457                        target = "duroxide::providers::postgres",
458                        operation = "ack_orchestration_item",
459                        execution_id = execution_id,
460                        history_count = history_delta.len(),
461                        worker_items_count = worker_items.len(),
462                        orchestrator_items_count = orchestrator_items.len(),
463                        cancelled_activities_count = cancelled_activities.len(),
464                        duration_ms = duration_ms,
465                        attempts = attempt + 1,
466                        "Acknowledged orchestration item via stored procedure"
467                    );
468                    return Ok(());
469                }
470                Err(e) => {
471                    // Check for permanent errors first
472                    if let SqlxError::Database(db_err) = &e {
473                        if db_err.message().contains("Invalid lock token") {
474                            return Err(ProviderError::permanent(
475                                "ack_orchestration_item",
476                                "Invalid lock token",
477                            ));
478                        }
479                    } else if e.to_string().contains("Invalid lock token") {
480                        return Err(ProviderError::permanent(
481                            "ack_orchestration_item",
482                            "Invalid lock token",
483                        ));
484                    }
485
486                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
487                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
488                        warn!(
489                            target = "duroxide::providers::postgres",
490                            operation = "ack_orchestration_item",
491                            attempt = attempt + 1,
492                            error = %provider_err,
493                            "Retryable error, will retry"
494                        );
495                        sleep(std::time::Duration::from_millis(
496                            RETRY_DELAY_MS * (attempt as u64 + 1),
497                        ))
498                        .await;
499                        continue;
500                    }
501                    return Err(provider_err);
502                }
503            }
504        }
505
506        // Should never reach here, but just in case
507        Ok(())
508    }
509    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
510    async fn abandon_orchestration_item(
511        &self,
512        lock_token: &str,
513        delay: Option<Duration>,
514        ignore_attempt: bool,
515    ) -> Result<(), ProviderError> {
516        let start = std::time::Instant::now();
517        let now_ms = Self::now_millis();
518        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
519
520        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
521            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
522            self.schema_name
523        ))
524        .bind(lock_token)
525        .bind(now_ms)
526        .bind(delay_param)
527        .bind(ignore_attempt)
528        .fetch_one(&*self.pool)
529        .await
530        {
531            Ok(instance_id) => instance_id,
532            Err(e) => {
533                if let SqlxError::Database(db_err) = &e {
534                    if db_err.message().contains("Invalid lock token") {
535                        return Err(ProviderError::permanent(
536                            "abandon_orchestration_item",
537                            "Invalid lock token",
538                        ));
539                    }
540                } else if e.to_string().contains("Invalid lock token") {
541                    return Err(ProviderError::permanent(
542                        "abandon_orchestration_item",
543                        "Invalid lock token",
544                    ));
545                }
546
547                return Err(Self::sqlx_to_provider_error(
548                    "abandon_orchestration_item",
549                    e,
550                ));
551            }
552        };
553
554        let duration_ms = start.elapsed().as_millis() as u64;
555        debug!(
556            target = "duroxide::providers::postgres",
557            operation = "abandon_orchestration_item",
558            instance_id = %instance_id,
559            delay_ms = delay.map(|d| d.as_millis() as u64),
560            ignore_attempt = ignore_attempt,
561            duration_ms = duration_ms,
562            "Abandoned orchestration item via stored procedure"
563        );
564
565        Ok(())
566    }
567
568    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
569    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
570        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
571            "SELECT out_event_data FROM {}.fetch_history($1)",
572            self.schema_name
573        ))
574        .bind(instance)
575        .fetch_all(&*self.pool)
576        .await
577        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
578
579        Ok(event_data_rows
580            .into_iter()
581            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
582            .collect())
583    }
584
585    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
586    async fn append_with_execution(
587        &self,
588        instance: &str,
589        execution_id: u64,
590        new_events: Vec<Event>,
591    ) -> Result<(), ProviderError> {
592        if new_events.is_empty() {
593            return Ok(());
594        }
595
596        let mut events_payload = Vec::with_capacity(new_events.len());
597        for event in &new_events {
598            if event.event_id() == 0 {
599                error!(
600                    target = "duroxide::providers::postgres",
601                    operation = "append_with_execution",
602                    error_type = "validation_error",
603                    instance_id = %instance,
604                    execution_id = execution_id,
605                    "event_id must be set by runtime"
606                );
607                return Err(ProviderError::permanent(
608                    "append_with_execution",
609                    "event_id must be set by runtime",
610                ));
611            }
612
613            let event_json = serde_json::to_string(event).map_err(|e| {
614                ProviderError::permanent(
615                    "append_with_execution",
616                    format!("Failed to serialize event: {e}"),
617                )
618            })?;
619
620            let event_type = format!("{event:?}")
621                .split('{')
622                .next()
623                .unwrap_or("Unknown")
624                .trim()
625                .to_string();
626
627            events_payload.push(serde_json::json!({
628                "event_id": event.event_id(),
629                "event_type": event_type,
630                "event_data": event_json,
631            }));
632        }
633
634        let events_json = serde_json::Value::Array(events_payload);
635
636        sqlx::query(&format!(
637            "SELECT {}.append_history($1, $2, $3)",
638            self.schema_name
639        ))
640        .bind(instance)
641        .bind(execution_id as i64)
642        .bind(events_json)
643        .execute(&*self.pool)
644        .await
645        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
646
647        debug!(
648            target = "duroxide::providers::postgres",
649            operation = "append_with_execution",
650            instance_id = %instance,
651            execution_id = execution_id,
652            event_count = new_events.len(),
653            "Appended history events via stored procedure"
654        );
655
656        Ok(())
657    }
658
659    #[instrument(skip(self), target = "duroxide::providers::postgres")]
660    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
661        let work_item = serde_json::to_string(&item).map_err(|e| {
662            ProviderError::permanent(
663                "enqueue_worker_work",
664                format!("Failed to serialize work item: {e}"),
665            )
666        })?;
667
668        let now_ms = Self::now_millis();
669
670        // Extract activity identification and session_id for ActivityExecute items
671        let (instance_id, execution_id, activity_id, session_id) = match &item {
672            WorkItem::ActivityExecute {
673                instance,
674                execution_id,
675                id,
676                session_id,
677                ..
678            } => (
679                Some(instance.clone()),
680                Some(*execution_id as i64),
681                Some(*id as i64),
682                session_id.clone(),
683            ),
684            _ => (None, None, None, None),
685        };
686
687        sqlx::query(&format!(
688            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
689            self.schema_name
690        ))
691        .bind(work_item)
692        .bind(now_ms)
693        .bind(&instance_id)
694        .bind(execution_id)
695        .bind(activity_id)
696        .bind(&session_id)
697        .execute(&*self.pool)
698        .await
699        .map_err(|e| {
700            error!(
701                target = "duroxide::providers::postgres",
702                operation = "enqueue_worker_work",
703                error_type = "database_error",
704                error = %e,
705                "Failed to enqueue worker work"
706            );
707            Self::sqlx_to_provider_error("enqueue_worker_work", e)
708        })?;
709
710        Ok(())
711    }
712
713    #[instrument(skip(self), target = "duroxide::providers::postgres")]
714    async fn fetch_work_item(
715        &self,
716        lock_timeout: Duration,
717        _poll_timeout: Duration,
718        session: Option<&SessionFetchConfig>,
719    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
720        let start = std::time::Instant::now();
721
722        // Convert Duration to milliseconds
723        let lock_timeout_ms = lock_timeout.as_millis() as i64;
724
725        // Extract session parameters
726        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
727            Some(config) => (
728                Some(&config.owner_id),
729                Some(config.lock_timeout.as_millis() as i64),
730            ),
731            None => (None, None),
732        };
733
734        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
735            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
736            self.schema_name
737        ))
738        .bind(Self::now_millis())
739        .bind(lock_timeout_ms)
740        .bind(owner_id)
741        .bind(session_lock_timeout_ms)
742        .fetch_optional(&*self.pool)
743        .await
744        {
745            Ok(row) => row,
746            Err(e) => {
747                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
748            }
749        };
750
751        let (work_item_json, lock_token, attempt_count) = match row {
752            Some(row) => row,
753            None => return Ok(None),
754        };
755
756        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
757            ProviderError::permanent(
758                "fetch_work_item",
759                format!("Failed to deserialize worker item: {e}"),
760            )
761        })?;
762
763        let duration_ms = start.elapsed().as_millis() as u64;
764
765        // Extract instance for logging - different work item types have different structures
766        let instance_id = match &work_item {
767            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
768            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
769            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
770            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
771            WorkItem::TimerFired { instance, .. } => instance.as_str(),
772            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
773            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
774            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
775            WorkItem::SubOrchCompleted {
776                parent_instance, ..
777            } => parent_instance.as_str(),
778            WorkItem::SubOrchFailed {
779                parent_instance, ..
780            } => parent_instance.as_str(),
781            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
782        };
783
784        debug!(
785            target = "duroxide::providers::postgres",
786            operation = "fetch_work_item",
787            instance_id = %instance_id,
788            attempt_count = attempt_count,
789            duration_ms = duration_ms,
790            "Fetched activity work item via stored procedure"
791        );
792
793        Ok(Some((
794            work_item,
795            lock_token,
796            attempt_count as u32,
797        )))
798    }
799
800    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
801    async fn ack_work_item(
802        &self,
803        token: &str,
804        completion: Option<WorkItem>,
805    ) -> Result<(), ProviderError> {
806        let start = std::time::Instant::now();
807
808        // If no completion provided (e.g., cancelled activity), just delete the item
809        let Some(completion) = completion else {
810            let now_ms = Self::now_millis();
811            // Call ack_worker with NULL completion to delete without enqueueing
812            sqlx::query(&format!(
813                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
814                self.schema_name
815            ))
816            .bind(token)
817            .bind(now_ms)
818            .execute(&*self.pool)
819            .await
820            .map_err(|e| {
821                if e.to_string().contains("Worker queue item not found") {
822                    ProviderError::permanent(
823                        "ack_worker",
824                        "Worker queue item not found or already processed",
825                    )
826                } else {
827                    Self::sqlx_to_provider_error("ack_worker", e)
828                }
829            })?;
830
831            let duration_ms = start.elapsed().as_millis() as u64;
832            debug!(
833                target = "duroxide::providers::postgres",
834                operation = "ack_worker",
835                token = %token,
836                duration_ms = duration_ms,
837                "Acknowledged worker without completion (cancelled)"
838            );
839            return Ok(());
840        };
841
842        // Extract instance ID from completion WorkItem
843        let instance_id = match &completion {
844            WorkItem::ActivityCompleted { instance, .. }
845            | WorkItem::ActivityFailed { instance, .. } => instance,
846            _ => {
847                error!(
848                    target = "duroxide::providers::postgres",
849                    operation = "ack_worker",
850                    error_type = "invalid_completion_type",
851                    "Invalid completion work item type"
852                );
853                return Err(ProviderError::permanent(
854                    "ack_worker",
855                    "Invalid completion work item type",
856                ));
857            }
858        };
859
860        let completion_json = serde_json::to_string(&completion).map_err(|e| {
861            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
862        })?;
863
864        let now_ms = Self::now_millis();
865
866        // Call stored procedure to atomically delete worker item and enqueue completion
867        sqlx::query(&format!(
868            "SELECT {}.ack_worker($1, $2, $3, $4)",
869            self.schema_name
870        ))
871        .bind(token)
872        .bind(instance_id)
873        .bind(completion_json)
874        .bind(now_ms)
875        .execute(&*self.pool)
876        .await
877        .map_err(|e| {
878            if e.to_string().contains("Worker queue item not found") {
879                error!(
880                    target = "duroxide::providers::postgres",
881                    operation = "ack_worker",
882                    error_type = "worker_item_not_found",
883                    token = %token,
884                    "Worker queue item not found or already processed"
885                );
886                ProviderError::permanent(
887                    "ack_worker",
888                    "Worker queue item not found or already processed",
889                )
890            } else {
891                Self::sqlx_to_provider_error("ack_worker", e)
892            }
893        })?;
894
895        let duration_ms = start.elapsed().as_millis() as u64;
896        debug!(
897            target = "duroxide::providers::postgres",
898            operation = "ack_worker",
899            instance_id = %instance_id,
900            duration_ms = duration_ms,
901            "Acknowledged worker and enqueued completion"
902        );
903
904        Ok(())
905    }
906
907    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
908    async fn renew_work_item_lock(
909        &self,
910        token: &str,
911        extend_for: Duration,
912    ) -> Result<(), ProviderError> {
913        let start = std::time::Instant::now();
914
915        // Get current time from application for consistent time reference
916        let now_ms = Self::now_millis();
917
918        // Convert Duration to seconds for the stored procedure
919        let extend_secs = extend_for.as_secs() as i64;
920
921        match sqlx::query(&format!(
922            "SELECT {}.renew_work_item_lock($1, $2, $3)",
923            self.schema_name
924        ))
925        .bind(token)
926        .bind(now_ms)
927        .bind(extend_secs)
928        .execute(&*self.pool)
929        .await
930        {
931            Ok(_) => {
932                let duration_ms = start.elapsed().as_millis() as u64;
933                debug!(
934                    target = "duroxide::providers::postgres",
935                    operation = "renew_work_item_lock",
936                    token = %token,
937                    extend_for_secs = extend_secs,
938                    duration_ms = duration_ms,
939                    "Work item lock renewed successfully"
940                );
941                Ok(())
942            }
943            Err(e) => {
944                if let SqlxError::Database(db_err) = &e {
945                    if db_err.message().contains("Lock token invalid") {
946                        return Err(ProviderError::permanent(
947                            "renew_work_item_lock",
948                            "Lock token invalid, expired, or already acked",
949                        ));
950                    }
951                } else if e.to_string().contains("Lock token invalid") {
952                    return Err(ProviderError::permanent(
953                        "renew_work_item_lock",
954                        "Lock token invalid, expired, or already acked",
955                    ));
956                }
957
958                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
959            }
960        }
961    }
962
963    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
964    async fn abandon_work_item(
965        &self,
966        token: &str,
967        delay: Option<Duration>,
968        ignore_attempt: bool,
969    ) -> Result<(), ProviderError> {
970        let start = std::time::Instant::now();
971        let now_ms = Self::now_millis();
972        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
973
974        match sqlx::query(&format!(
975            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
976            self.schema_name
977        ))
978        .bind(token)
979        .bind(now_ms)
980        .bind(delay_param)
981        .bind(ignore_attempt)
982        .execute(&*self.pool)
983        .await
984        {
985            Ok(_) => {
986                let duration_ms = start.elapsed().as_millis() as u64;
987                debug!(
988                    target = "duroxide::providers::postgres",
989                    operation = "abandon_work_item",
990                    token = %token,
991                    delay_ms = delay.map(|d| d.as_millis() as u64),
992                    ignore_attempt = ignore_attempt,
993                    duration_ms = duration_ms,
994                    "Abandoned work item via stored procedure"
995                );
996                Ok(())
997            }
998            Err(e) => {
999                if let SqlxError::Database(db_err) = &e {
1000                    if db_err.message().contains("Invalid lock token")
1001                        || db_err.message().contains("already acked")
1002                    {
1003                        return Err(ProviderError::permanent(
1004                            "abandon_work_item",
1005                            "Invalid lock token or already acked",
1006                        ));
1007                    }
1008                } else if e.to_string().contains("Invalid lock token")
1009                    || e.to_string().contains("already acked")
1010                {
1011                    return Err(ProviderError::permanent(
1012                        "abandon_work_item",
1013                        "Invalid lock token or already acked",
1014                    ));
1015                }
1016
1017                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1018            }
1019        }
1020    }
1021
1022    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1023    async fn renew_orchestration_item_lock(
1024        &self,
1025        token: &str,
1026        extend_for: Duration,
1027    ) -> Result<(), ProviderError> {
1028        let start = std::time::Instant::now();
1029
1030        // Get current time from application for consistent time reference
1031        let now_ms = Self::now_millis();
1032
1033        // Convert Duration to seconds for the stored procedure
1034        let extend_secs = extend_for.as_secs() as i64;
1035
1036        match sqlx::query(&format!(
1037            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1038            self.schema_name
1039        ))
1040        .bind(token)
1041        .bind(now_ms)
1042        .bind(extend_secs)
1043        .execute(&*self.pool)
1044        .await
1045        {
1046            Ok(_) => {
1047                let duration_ms = start.elapsed().as_millis() as u64;
1048                debug!(
1049                    target = "duroxide::providers::postgres",
1050                    operation = "renew_orchestration_item_lock",
1051                    token = %token,
1052                    extend_for_secs = extend_secs,
1053                    duration_ms = duration_ms,
1054                    "Orchestration item lock renewed successfully"
1055                );
1056                Ok(())
1057            }
1058            Err(e) => {
1059                if let SqlxError::Database(db_err) = &e {
1060                    if db_err.message().contains("Lock token invalid")
1061                        || db_err.message().contains("expired")
1062                        || db_err.message().contains("already released")
1063                    {
1064                        return Err(ProviderError::permanent(
1065                            "renew_orchestration_item_lock",
1066                            "Lock token invalid, expired, or already released",
1067                        ));
1068                    }
1069                } else if e.to_string().contains("Lock token invalid")
1070                    || e.to_string().contains("expired")
1071                    || e.to_string().contains("already released")
1072                {
1073                    return Err(ProviderError::permanent(
1074                        "renew_orchestration_item_lock",
1075                        "Lock token invalid, expired, or already released",
1076                    ));
1077                }
1078
1079                Err(Self::sqlx_to_provider_error(
1080                    "renew_orchestration_item_lock",
1081                    e,
1082                ))
1083            }
1084        }
1085    }
1086
1087    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1088    async fn enqueue_for_orchestrator(
1089        &self,
1090        item: WorkItem,
1091        delay: Option<Duration>,
1092    ) -> Result<(), ProviderError> {
1093        let work_item = serde_json::to_string(&item).map_err(|e| {
1094            ProviderError::permanent(
1095                "enqueue_orchestrator_work",
1096                format!("Failed to serialize work item: {e}"),
1097            )
1098        })?;
1099
1100        // Extract instance ID from WorkItem enum
1101        let instance_id = match &item {
1102            WorkItem::StartOrchestration { instance, .. }
1103            | WorkItem::ActivityCompleted { instance, .. }
1104            | WorkItem::ActivityFailed { instance, .. }
1105            | WorkItem::TimerFired { instance, .. }
1106            | WorkItem::ExternalRaised { instance, .. }
1107            | WorkItem::CancelInstance { instance, .. }
1108            | WorkItem::ContinueAsNew { instance, .. }
1109            | WorkItem::QueueMessage { instance, .. } => instance,
1110            WorkItem::SubOrchCompleted {
1111                parent_instance, ..
1112            }
1113            | WorkItem::SubOrchFailed {
1114                parent_instance, ..
1115            } => parent_instance,
1116            WorkItem::ActivityExecute { .. } => {
1117                return Err(ProviderError::permanent(
1118                    "enqueue_orchestrator_work",
1119                    "ActivityExecute should go to worker queue, not orchestrator queue",
1120                ));
1121            }
1122        };
1123
1124        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1125        let now_ms = Self::now_millis();
1126
1127        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1128            if *fire_at_ms > 0 {
1129                // Take max of fire_at_ms and delay (if provided)
1130                if let Some(delay) = delay {
1131                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1132                } else {
1133                    *fire_at_ms
1134                }
1135            } else {
1136                // fire_at_ms is 0, use delay or NOW()
1137                delay
1138                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1139                    .unwrap_or(now_ms as u64)
1140            }
1141        } else {
1142            // Non-timer item: use delay or NOW()
1143            delay
1144                .map(|d| now_ms as u64 + d.as_millis() as u64)
1145                .unwrap_or(now_ms as u64)
1146        };
1147
1148        let visible_at = Utc
1149            .timestamp_millis_opt(visible_at_ms as i64)
1150            .single()
1151            .ok_or_else(|| {
1152                ProviderError::permanent(
1153                    "enqueue_orchestrator_work",
1154                    "Invalid visible_at timestamp",
1155                )
1156            })?;
1157
1158        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1159        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1160
1161        // Call stored procedure to enqueue work
1162        sqlx::query(&format!(
1163            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1164            self.schema_name
1165        ))
1166        .bind(instance_id)
1167        .bind(&work_item)
1168        .bind(visible_at)
1169        .bind::<Option<String>>(None) // orchestration_name - NULL
1170        .bind::<Option<String>>(None) // orchestration_version - NULL
1171        .bind::<Option<i64>>(None) // execution_id - NULL
1172        .execute(&*self.pool)
1173        .await
1174        .map_err(|e| {
1175            error!(
1176                target = "duroxide::providers::postgres",
1177                operation = "enqueue_orchestrator_work",
1178                error_type = "database_error",
1179                error = %e,
1180                instance_id = %instance_id,
1181                "Failed to enqueue orchestrator work"
1182            );
1183            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1184        })?;
1185
1186        debug!(
1187            target = "duroxide::providers::postgres",
1188            operation = "enqueue_orchestrator_work",
1189            instance_id = %instance_id,
1190            delay_ms = delay.map(|d| d.as_millis() as u64),
1191            "Enqueued orchestrator work"
1192        );
1193
1194        Ok(())
1195    }
1196
1197    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1198    async fn read_with_execution(
1199        &self,
1200        instance: &str,
1201        execution_id: u64,
1202    ) -> Result<Vec<Event>, ProviderError> {
1203        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1204            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1205            self.table_name("history")
1206        ))
1207        .bind(instance)
1208        .bind(execution_id as i64)
1209        .fetch_all(&*self.pool)
1210        .await
1211        .ok()
1212        .unwrap_or_default();
1213
1214        Ok(event_data_rows
1215            .into_iter()
1216            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1217            .collect())
1218    }
1219
1220    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1221    async fn renew_session_lock(
1222        &self,
1223        owner_ids: &[&str],
1224        extend_for: Duration,
1225        idle_timeout: Duration,
1226    ) -> Result<usize, ProviderError> {
1227        if owner_ids.is_empty() {
1228            return Ok(0);
1229        }
1230
1231        let now_ms = Self::now_millis();
1232        let extend_ms = extend_for.as_millis() as i64;
1233        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1234        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1235
1236        let result = sqlx::query_scalar::<_, i64>(&format!(
1237            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1238            self.schema_name
1239        ))
1240        .bind(&owner_ids_vec)
1241        .bind(now_ms)
1242        .bind(extend_ms)
1243        .bind(idle_timeout_ms)
1244        .fetch_one(&*self.pool)
1245        .await
1246        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1247
1248        debug!(
1249            target = "duroxide::providers::postgres",
1250            operation = "renew_session_lock",
1251            owner_count = owner_ids.len(),
1252            sessions_renewed = result,
1253            "Session locks renewed"
1254        );
1255
1256        Ok(result as usize)
1257    }
1258
1259    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1260    async fn cleanup_orphaned_sessions(
1261        &self,
1262        _idle_timeout: Duration,
1263    ) -> Result<usize, ProviderError> {
1264        let now_ms = Self::now_millis();
1265
1266        let result = sqlx::query_scalar::<_, i64>(&format!(
1267            "SELECT {}.cleanup_orphaned_sessions($1)",
1268            self.schema_name
1269        ))
1270        .bind(now_ms)
1271        .fetch_one(&*self.pool)
1272        .await
1273        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1274
1275        debug!(
1276            target = "duroxide::providers::postgres",
1277            operation = "cleanup_orphaned_sessions",
1278            sessions_cleaned = result,
1279            "Orphaned sessions cleaned up"
1280        );
1281
1282        Ok(result as usize)
1283    }
1284
1285    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1286        Some(self)
1287    }
1288
1289    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1290    async fn get_custom_status(
1291        &self,
1292        instance: &str,
1293        last_seen_version: u64,
1294    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1295        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1296            "SELECT * FROM {}.get_custom_status($1, $2)",
1297            self.schema_name
1298        ))
1299        .bind(instance)
1300        .bind(last_seen_version as i64)
1301        .fetch_optional(&*self.pool)
1302        .await
1303        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1304
1305        match row {
1306            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1307            None => Ok(None),
1308        }
1309    }
1310}
1311
1312#[async_trait::async_trait]
1313impl ProviderAdmin for PostgresProvider {
1314    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1315    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1316        sqlx::query_scalar(&format!(
1317            "SELECT instance_id FROM {}.list_instances()",
1318            self.schema_name
1319        ))
1320        .fetch_all(&*self.pool)
1321        .await
1322        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1323    }
1324
1325    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1326    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1327        sqlx::query_scalar(&format!(
1328            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1329            self.schema_name
1330        ))
1331        .bind(status)
1332        .fetch_all(&*self.pool)
1333        .await
1334        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1335    }
1336
1337    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1338    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1339        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1340            "SELECT execution_id FROM {}.list_executions($1)",
1341            self.schema_name
1342        ))
1343        .bind(instance)
1344        .fetch_all(&*self.pool)
1345        .await
1346        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1347
1348        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1349    }
1350
1351    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1352    async fn read_history_with_execution_id(
1353        &self,
1354        instance: &str,
1355        execution_id: u64,
1356    ) -> Result<Vec<Event>, ProviderError> {
1357        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1358            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1359            self.schema_name
1360        ))
1361        .bind(instance)
1362        .bind(execution_id as i64)
1363        .fetch_all(&*self.pool)
1364        .await
1365        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1366
1367        event_data_rows
1368            .into_iter()
1369            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1370            .collect::<Vec<Event>>()
1371            .into_iter()
1372            .map(Ok)
1373            .collect()
1374    }
1375
1376    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1377    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1378        let execution_id = self.latest_execution_id(instance).await?;
1379        self.read_history_with_execution_id(instance, execution_id)
1380            .await
1381    }
1382
1383    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1384    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1385        sqlx::query_scalar(&format!(
1386            "SELECT {}.latest_execution_id($1)",
1387            self.schema_name
1388        ))
1389        .bind(instance)
1390        .fetch_optional(&*self.pool)
1391        .await
1392        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1393        .map(|id: i64| id as u64)
1394        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1395    }
1396
1397    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1398    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1399        let row: Option<(
1400            String,
1401            String,
1402            String,
1403            i64,
1404            chrono::DateTime<Utc>,
1405            Option<chrono::DateTime<Utc>>,
1406            Option<String>,
1407            Option<String>,
1408            Option<String>,
1409        )> = sqlx::query_as(&format!(
1410            "SELECT * FROM {}.get_instance_info($1)",
1411            self.schema_name
1412        ))
1413        .bind(instance)
1414        .fetch_optional(&*self.pool)
1415        .await
1416        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1417
1418        let (
1419            instance_id,
1420            orchestration_name,
1421            orchestration_version,
1422            current_execution_id,
1423            created_at,
1424            updated_at,
1425            status,
1426            output,
1427            parent_instance_id,
1428        ) =
1429            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1430
1431        Ok(InstanceInfo {
1432            instance_id,
1433            orchestration_name,
1434            orchestration_version,
1435            current_execution_id: current_execution_id as u64,
1436            status: status.unwrap_or_else(|| "Running".to_string()),
1437            output,
1438            created_at: created_at.timestamp_millis() as u64,
1439            updated_at: updated_at
1440                .map(|dt| dt.timestamp_millis() as u64)
1441                .unwrap_or(created_at.timestamp_millis() as u64),
1442            parent_instance_id,
1443        })
1444    }
1445
1446    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1447    async fn get_execution_info(
1448        &self,
1449        instance: &str,
1450        execution_id: u64,
1451    ) -> Result<ExecutionInfo, ProviderError> {
1452        let row: Option<(
1453            i64,
1454            String,
1455            Option<String>,
1456            chrono::DateTime<Utc>,
1457            Option<chrono::DateTime<Utc>>,
1458            i64,
1459        )> = sqlx::query_as(&format!(
1460            "SELECT * FROM {}.get_execution_info($1, $2)",
1461            self.schema_name
1462        ))
1463        .bind(instance)
1464        .bind(execution_id as i64)
1465        .fetch_optional(&*self.pool)
1466        .await
1467        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1468
1469        let (exec_id, status, output, started_at, completed_at, event_count) = row
1470            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1471
1472        Ok(ExecutionInfo {
1473            execution_id: exec_id as u64,
1474            status,
1475            output,
1476            started_at: started_at.timestamp_millis() as u64,
1477            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1478            event_count: event_count as usize,
1479        })
1480    }
1481
1482    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1483    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1484        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1485            "SELECT * FROM {}.get_system_metrics()",
1486            self.schema_name
1487        ))
1488        .fetch_optional(&*self.pool)
1489        .await
1490        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1491
1492        let (
1493            total_instances,
1494            total_executions,
1495            running_instances,
1496            completed_instances,
1497            failed_instances,
1498            total_events,
1499        ) = row.ok_or_else(|| {
1500            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1501        })?;
1502
1503        Ok(SystemMetrics {
1504            total_instances: total_instances as u64,
1505            total_executions: total_executions as u64,
1506            running_instances: running_instances as u64,
1507            completed_instances: completed_instances as u64,
1508            failed_instances: failed_instances as u64,
1509            total_events: total_events as u64,
1510        })
1511    }
1512
1513    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1514    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1515        let now_ms = Self::now_millis();
1516
1517        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1518            "SELECT * FROM {}.get_queue_depths($1)",
1519            self.schema_name
1520        ))
1521        .bind(now_ms)
1522        .fetch_optional(&*self.pool)
1523        .await
1524        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1525
1526        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1527            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1528        })?;
1529
1530        Ok(QueueDepths {
1531            orchestrator_queue: orchestrator_queue as usize,
1532            worker_queue: worker_queue as usize,
1533            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1534        })
1535    }
1536
1537    // ===== Hierarchy Primitive Operations =====
1538
1539    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1540    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1541        sqlx::query_scalar(&format!(
1542            "SELECT child_instance_id FROM {}.list_children($1)",
1543            self.schema_name
1544        ))
1545        .bind(instance_id)
1546        .fetch_all(&*self.pool)
1547        .await
1548        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1549    }
1550
1551    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1552    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1553        // The stored procedure raises an exception if instance doesn't exist
1554        // Otherwise returns the parent_instance_id (which may be NULL)
1555        let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1556            "SELECT {}.get_parent_id($1)",
1557            self.schema_name
1558        ))
1559        .bind(instance_id)
1560        .fetch_one(&*self.pool)
1561        .await;
1562
1563        match result {
1564            Ok(parent_id) => Ok(parent_id),
1565            Err(e) => {
1566                let err_str = e.to_string();
1567                if err_str.contains("Instance not found") {
1568                    Err(ProviderError::permanent(
1569                        "get_parent_id",
1570                        format!("Instance not found: {}", instance_id),
1571                    ))
1572                } else {
1573                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1574                }
1575            }
1576        }
1577    }
1578
1579    // ===== Deletion Operations =====
1580
1581    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1582    async fn delete_instances_atomic(
1583        &self,
1584        ids: &[String],
1585        force: bool,
1586    ) -> Result<DeleteInstanceResult, ProviderError> {
1587        if ids.is_empty() {
1588            return Ok(DeleteInstanceResult::default());
1589        }
1590
1591        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1592            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1593            self.schema_name
1594        ))
1595        .bind(ids)
1596        .bind(force)
1597        .fetch_optional(&*self.pool)
1598        .await
1599        .map_err(|e| {
1600            let err_str = e.to_string();
1601            if err_str.contains("is Running") {
1602                ProviderError::permanent(
1603                    "delete_instances_atomic",
1604                    err_str,
1605                )
1606            } else if err_str.contains("Orphan detected") {
1607                ProviderError::permanent(
1608                    "delete_instances_atomic",
1609                    err_str,
1610                )
1611            } else {
1612                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1613            }
1614        })?;
1615
1616        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1617            row.unwrap_or((0, 0, 0, 0));
1618
1619        debug!(
1620            target = "duroxide::providers::postgres",
1621            operation = "delete_instances_atomic",
1622            instances_deleted = instances_deleted,
1623            executions_deleted = executions_deleted,
1624            events_deleted = events_deleted,
1625            queue_messages_deleted = queue_messages_deleted,
1626            "Deleted instances atomically"
1627        );
1628
1629        Ok(DeleteInstanceResult {
1630            instances_deleted: instances_deleted as u64,
1631            executions_deleted: executions_deleted as u64,
1632            events_deleted: events_deleted as u64,
1633            queue_messages_deleted: queue_messages_deleted as u64,
1634        })
1635    }
1636
1637    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1638    async fn delete_instance_bulk(
1639        &self,
1640        filter: InstanceFilter,
1641    ) -> Result<DeleteInstanceResult, ProviderError> {
1642        // Build query to find matching root instances in terminal states
1643        let mut sql = format!(
1644            r#"
1645            SELECT i.instance_id
1646            FROM {}.instances i
1647            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1648              AND i.current_execution_id = e.execution_id
1649            WHERE i.parent_instance_id IS NULL
1650              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1651            "#,
1652            self.schema_name, self.schema_name
1653        );
1654
1655        // Add instance_ids filter if provided
1656        if let Some(ref ids) = filter.instance_ids {
1657            if ids.is_empty() {
1658                return Ok(DeleteInstanceResult::default());
1659            }
1660            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1661            sql.push_str(&format!(
1662                " AND i.instance_id IN ({})",
1663                placeholders.join(", ")
1664            ));
1665        }
1666
1667        // Add completed_before filter if provided
1668        if filter.completed_before.is_some() {
1669            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1670            sql.push_str(&format!(
1671                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1672                param_num
1673            ));
1674        }
1675
1676        // Add limit
1677        let limit = filter.limit.unwrap_or(1000);
1678        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1679            + if filter.completed_before.is_some() { 1 } else { 0 }
1680            + 1;
1681        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1682
1683        // Build and execute query
1684        let mut query = sqlx::query_scalar::<_, String>(&sql);
1685        if let Some(ref ids) = filter.instance_ids {
1686            for id in ids {
1687                query = query.bind(id);
1688            }
1689        }
1690        if let Some(completed_before) = filter.completed_before {
1691            query = query.bind(completed_before as i64);
1692        }
1693        query = query.bind(limit as i64);
1694
1695        let instance_ids: Vec<String> = query
1696            .fetch_all(&*self.pool)
1697            .await
1698            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1699
1700        if instance_ids.is_empty() {
1701            return Ok(DeleteInstanceResult::default());
1702        }
1703
1704        // Delete each instance with cascade
1705        let mut result = DeleteInstanceResult::default();
1706
1707        for instance_id in &instance_ids {
1708            // Get full tree for this root
1709            let tree = self.get_instance_tree(instance_id).await?;
1710
1711            // Atomic delete (tree.all_ids is already in deletion order: children first)
1712            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1713            result.instances_deleted += delete_result.instances_deleted;
1714            result.executions_deleted += delete_result.executions_deleted;
1715            result.events_deleted += delete_result.events_deleted;
1716            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1717        }
1718
1719        debug!(
1720            target = "duroxide::providers::postgres",
1721            operation = "delete_instance_bulk",
1722            instances_deleted = result.instances_deleted,
1723            executions_deleted = result.executions_deleted,
1724            events_deleted = result.events_deleted,
1725            queue_messages_deleted = result.queue_messages_deleted,
1726            "Bulk deleted instances"
1727        );
1728
1729        Ok(result)
1730    }
1731
1732    // ===== Pruning Operations =====
1733
1734    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1735    async fn prune_executions(
1736        &self,
1737        instance_id: &str,
1738        options: PruneOptions,
1739    ) -> Result<PruneResult, ProviderError> {
1740        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1741        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1742
1743        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1744            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1745            self.schema_name
1746        ))
1747        .bind(instance_id)
1748        .bind(keep_last)
1749        .bind(completed_before_ms)
1750        .fetch_optional(&*self.pool)
1751        .await
1752        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1753
1754        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1755
1756        debug!(
1757            target = "duroxide::providers::postgres",
1758            operation = "prune_executions",
1759            instance_id = %instance_id,
1760            instances_processed = instances_processed,
1761            executions_deleted = executions_deleted,
1762            events_deleted = events_deleted,
1763            "Pruned executions"
1764        );
1765
1766        Ok(PruneResult {
1767            instances_processed: instances_processed as u64,
1768            executions_deleted: executions_deleted as u64,
1769            events_deleted: events_deleted as u64,
1770        })
1771    }
1772
1773    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1774    async fn prune_executions_bulk(
1775        &self,
1776        filter: InstanceFilter,
1777        options: PruneOptions,
1778    ) -> Result<PruneResult, ProviderError> {
1779        // Find matching instances (all statuses - prune_executions protects current execution)
1780        // Note: We include Running instances because long-running orchestrations (e.g., with
1781        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1782        // call safely skips the current execution regardless of its status.
1783        let mut sql = format!(
1784            r#"
1785            SELECT i.instance_id
1786            FROM {}.instances i
1787            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1788              AND i.current_execution_id = e.execution_id
1789            WHERE 1=1
1790            "#,
1791            self.schema_name, self.schema_name
1792        );
1793
1794        // Add instance_ids filter if provided
1795        if let Some(ref ids) = filter.instance_ids {
1796            if ids.is_empty() {
1797                return Ok(PruneResult::default());
1798            }
1799            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1800            sql.push_str(&format!(
1801                " AND i.instance_id IN ({})",
1802                placeholders.join(", ")
1803            ));
1804        }
1805
1806        // Add completed_before filter if provided
1807        if filter.completed_before.is_some() {
1808            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1809            sql.push_str(&format!(
1810                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1811                param_num
1812            ));
1813        }
1814
1815        // Add limit
1816        let limit = filter.limit.unwrap_or(1000);
1817        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1818            + if filter.completed_before.is_some() { 1 } else { 0 }
1819            + 1;
1820        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1821
1822        // Build and execute query
1823        let mut query = sqlx::query_scalar::<_, String>(&sql);
1824        if let Some(ref ids) = filter.instance_ids {
1825            for id in ids {
1826                query = query.bind(id);
1827            }
1828        }
1829        if let Some(completed_before) = filter.completed_before {
1830            query = query.bind(completed_before as i64);
1831        }
1832        query = query.bind(limit as i64);
1833
1834        let instance_ids: Vec<String> = query
1835            .fetch_all(&*self.pool)
1836            .await
1837            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1838
1839        // Prune each instance
1840        let mut result = PruneResult::default();
1841
1842        for instance_id in &instance_ids {
1843            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1844            result.instances_processed += single_result.instances_processed;
1845            result.executions_deleted += single_result.executions_deleted;
1846            result.events_deleted += single_result.events_deleted;
1847        }
1848
1849        debug!(
1850            target = "duroxide::providers::postgres",
1851            operation = "prune_executions_bulk",
1852            instances_processed = result.instances_processed,
1853            executions_deleted = result.executions_deleted,
1854            events_deleted = result.events_deleted,
1855            "Bulk pruned executions"
1856        );
1857
1858        Ok(result)
1859    }
1860}