Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    CustomStatusUpdate, DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo,
5    ExecutionMetadata, InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
6    ProviderError, PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier,
7    SessionFetchConfig, SystemMetrics, WorkItem,
8};
9use duroxide::Event;
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
41pub struct PostgresProvider {
42    pool: Arc<PgPool>,
43    schema_name: String,
44}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95
96
97    /// Get schema-qualified table name
98    fn table_name(&self, table: &str) -> String {
99        format!("{}.{}", self.schema_name, table)
100    }
101
102    /// Get the database pool (for testing)
103    pub fn pool(&self) -> &PgPool {
104        &self.pool
105    }
106
107    /// Get the schema name (for testing)
108    pub fn schema_name(&self) -> &str {
109        &self.schema_name
110    }
111
112    /// Convert sqlx::Error to ProviderError with proper classification
113    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114        match e {
115            SqlxError::Database(ref db_err) => {
116                // PostgreSQL error codes
117                let code_opt = db_err.code();
118                let code = code_opt.as_deref();
119                if code == Some("40P01") {
120                    // Deadlock detected
121                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122                } else if code == Some("40001") {
123                    // Serialization failure - permanent error (transaction conflict, not transient)
124                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125                } else if code == Some("23505") {
126                    // Unique constraint violation (duplicate event)
127                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128                } else if code == Some("23503") {
129                    // Foreign key constraint violation
130                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131                } else {
132                    ProviderError::permanent(operation, format!("Database error: {e}"))
133                }
134            }
135            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137            }
138            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140        }
141    }
142
143    /// Clean up schema after tests (drops all tables and optionally the schema)
144    ///
145    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
146    /// Only drops the schema if it's a custom schema (not "public").
147    pub async fn cleanup_schema(&self) -> Result<()> {
148        // Call the stored procedure to drop all tables
149        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150            .execute(&*self.pool)
151            .await?;
152
153        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
154        // Only drop custom schemas created for testing
155        if self.schema_name != "public" {
156            sqlx::query(&format!(
157                "DROP SCHEMA IF EXISTS {} CASCADE",
158                self.schema_name
159            ))
160            .execute(&*self.pool)
161            .await?;
162        } else {
163            // Explicit safeguard: we only drop tables from public schema, never the schema itself
164            // This ensures we don't accidentally drop the default PostgreSQL schema
165        }
166
167        Ok(())
168    }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
173    fn name(&self) -> &str {
174        "duroxide-pg"
175    }
176
177    fn version(&self) -> &str {
178        env!("CARGO_PKG_VERSION")
179    }
180
181    #[instrument(skip(self), target = "duroxide::providers::postgres")]
182    async fn fetch_orchestration_item(
183        &self,
184        lock_timeout: Duration,
185        _poll_timeout: Duration,
186        filter: Option<&DispatcherCapabilityFilter>,
187    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
188        let start = std::time::Instant::now();
189
190        const MAX_RETRIES: u32 = 3;
191        const RETRY_DELAY_MS: u64 = 50;
192
193        // Convert Duration to milliseconds
194        let lock_timeout_ms = lock_timeout.as_millis() as i64;
195        let mut _last_error: Option<ProviderError> = None;
196
197        // Extract version filter from capability filter
198        let (min_packed, max_packed) = if let Some(f) = filter {
199            if let Some(range) = f.supported_duroxide_versions.first() {
200                let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
201                let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
202                (Some(min), Some(max))
203            } else {
204                // Empty supported_duroxide_versions = supports nothing
205                return Ok(None);
206            }
207        } else {
208            (None, None)
209        };
210
211        for attempt in 0..=MAX_RETRIES {
212            let now_ms = Self::now_millis();
213
214            let result: Result<
215                Option<(
216                    String,
217                    String,
218                    String,
219                    i64,
220                    serde_json::Value,
221                    serde_json::Value,
222                    String,
223                    i32,
224                )>,
225                SqlxError,
226            > = sqlx::query_as(&format!(
227                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
228                self.schema_name
229            ))
230            .bind(now_ms)
231            .bind(lock_timeout_ms)
232            .bind(min_packed)
233            .bind(max_packed)
234            .fetch_optional(&*self.pool)
235            .await;
236
237            let row = match result {
238                Ok(r) => r,
239                Err(e) => {
240                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
241                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
242                        warn!(
243                            target = "duroxide::providers::postgres",
244                            operation = "fetch_orchestration_item",
245                            attempt = attempt + 1,
246                            error = %provider_err,
247                            "Retryable error, will retry"
248                        );
249                        _last_error = Some(provider_err);
250                        sleep(std::time::Duration::from_millis(
251                            RETRY_DELAY_MS * (attempt as u64 + 1),
252                        ))
253                        .await;
254                        continue;
255                    }
256                    return Err(provider_err);
257                }
258            };
259
260            if let Some((
261                instance_id,
262                orchestration_name,
263                orchestration_version,
264                execution_id,
265                history_json,
266                messages_json,
267                lock_token,
268                attempt_count,
269            )) = row
270            {
271                let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
272                    Ok(h) => (h, None),
273                    Err(e) => {
274                        let error_msg = format!("Failed to deserialize history: {e}");
275                        warn!(
276                            target = "duroxide::providers::postgres",
277                            instance = %instance_id,
278                            error = %error_msg,
279                            "History deserialization failed, returning item with history_error"
280                        );
281                        (vec![], Some(error_msg))
282                    }
283                };
284
285                let messages: Vec<WorkItem> =
286                    serde_json::from_value(messages_json).map_err(|e| {
287                        ProviderError::permanent(
288                            "fetch_orchestration_item",
289                            format!("Failed to deserialize messages: {e}"),
290                        )
291                    })?;
292
293                let duration_ms = start.elapsed().as_millis() as u64;
294                debug!(
295                    target = "duroxide::providers::postgres",
296                    operation = "fetch_orchestration_item",
297                    instance_id = %instance_id,
298                    execution_id = execution_id,
299                    message_count = messages.len(),
300                    history_count = history.len(),
301                    attempt_count = attempt_count,
302                    duration_ms = duration_ms,
303                    attempts = attempt + 1,
304                    "Fetched orchestration item via stored procedure"
305                );
306
307                return Ok(Some((
308                    OrchestrationItem {
309                        instance: instance_id,
310                        orchestration_name,
311                        execution_id: execution_id as u64,
312                        version: orchestration_version,
313                        history,
314                        messages,
315                        history_error,
316                    },
317                    lock_token,
318                    attempt_count as u32,
319                )));
320            }
321
322            // No result found - return immediately (short polling behavior)
323            // Only retry with delay on retryable errors (handled above)
324            return Ok(None);
325        }
326
327        Ok(None)
328    }
329    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
330    async fn ack_orchestration_item(
331        &self,
332        lock_token: &str,
333        execution_id: u64,
334        history_delta: Vec<Event>,
335        worker_items: Vec<WorkItem>,
336        orchestrator_items: Vec<WorkItem>,
337        metadata: ExecutionMetadata,
338        cancelled_activities: Vec<ScheduledActivityIdentifier>,
339    ) -> Result<(), ProviderError> {
340        let start = std::time::Instant::now();
341
342        const MAX_RETRIES: u32 = 3;
343        const RETRY_DELAY_MS: u64 = 50;
344
345        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
346        for event in &history_delta {
347            if event.event_id() == 0 {
348                return Err(ProviderError::permanent(
349                    "ack_orchestration_item",
350                    "event_id must be set by runtime",
351                ));
352            }
353
354            let event_json = serde_json::to_string(event).map_err(|e| {
355                ProviderError::permanent(
356                    "ack_orchestration_item",
357                    format!("Failed to serialize event: {e}"),
358                )
359            })?;
360
361            let event_type = format!("{event:?}")
362                .split('{')
363                .next()
364                .unwrap_or("Unknown")
365                .trim()
366                .to_string();
367
368            history_delta_payload.push(serde_json::json!({
369                "event_id": event.event_id(),
370                "event_type": event_type,
371                "event_data": event_json,
372            }));
373        }
374
375        let history_delta_json = serde_json::Value::Array(history_delta_payload);
376
377        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
378            ProviderError::permanent(
379                "ack_orchestration_item",
380                format!("Failed to serialize worker items: {e}"),
381            )
382        })?;
383
384        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
385            ProviderError::permanent(
386                "ack_orchestration_item",
387                format!("Failed to serialize orchestrator items: {e}"),
388            )
389        })?;
390
391        // Map custom_status to action/value for the stored procedure
392        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) =
393            match &metadata.custom_status {
394                Some(CustomStatusUpdate::Set(s)) => (Some("set"), Some(s.as_str())),
395                Some(CustomStatusUpdate::Clear) => (Some("clear"), None),
396                None => (None, None),
397            };
398
399        let metadata_json = serde_json::json!({
400            "orchestration_name": metadata.orchestration_name,
401            "orchestration_version": metadata.orchestration_version,
402            "status": metadata.status,
403            "output": metadata.output,
404            "parent_instance_id": metadata.parent_instance_id,
405            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
406                serde_json::json!({
407                    "major": v.major,
408                    "minor": v.minor,
409                    "patch": v.patch,
410                })
411            }),
412            "custom_status_action": custom_status_action,
413            "custom_status_value": custom_status_value,
414        });
415
416        // Serialize cancelled activities for lock stealing
417        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
418            .iter()
419            .map(|a| {
420                serde_json::json!({
421                    "instance": a.instance,
422                    "execution_id": a.execution_id,
423                    "activity_id": a.activity_id,
424                })
425            })
426            .collect();
427        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
428
429        for attempt in 0..=MAX_RETRIES {
430            let now_ms = Self::now_millis();
431            let result = sqlx::query(&format!(
432                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
433                self.schema_name
434            ))
435            .bind(lock_token)
436            .bind(now_ms)
437            .bind(execution_id as i64)
438            .bind(&history_delta_json)
439            .bind(&worker_items_json)
440            .bind(&orchestrator_items_json)
441            .bind(&metadata_json)
442            .bind(&cancelled_activities_json)
443            .execute(&*self.pool)
444            .await;
445
446            match result {
447                Ok(_) => {
448                    let duration_ms = start.elapsed().as_millis() as u64;
449                    debug!(
450                        target = "duroxide::providers::postgres",
451                        operation = "ack_orchestration_item",
452                        execution_id = execution_id,
453                        history_count = history_delta.len(),
454                        worker_items_count = worker_items.len(),
455                        orchestrator_items_count = orchestrator_items.len(),
456                        cancelled_activities_count = cancelled_activities.len(),
457                        duration_ms = duration_ms,
458                        attempts = attempt + 1,
459                        "Acknowledged orchestration item via stored procedure"
460                    );
461                    return Ok(());
462                }
463                Err(e) => {
464                    // Check for permanent errors first
465                    if let SqlxError::Database(db_err) = &e {
466                        if db_err.message().contains("Invalid lock token") {
467                            return Err(ProviderError::permanent(
468                                "ack_orchestration_item",
469                                "Invalid lock token",
470                            ));
471                        }
472                    } else if e.to_string().contains("Invalid lock token") {
473                        return Err(ProviderError::permanent(
474                            "ack_orchestration_item",
475                            "Invalid lock token",
476                        ));
477                    }
478
479                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
480                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
481                        warn!(
482                            target = "duroxide::providers::postgres",
483                            operation = "ack_orchestration_item",
484                            attempt = attempt + 1,
485                            error = %provider_err,
486                            "Retryable error, will retry"
487                        );
488                        sleep(std::time::Duration::from_millis(
489                            RETRY_DELAY_MS * (attempt as u64 + 1),
490                        ))
491                        .await;
492                        continue;
493                    }
494                    return Err(provider_err);
495                }
496            }
497        }
498
499        // Should never reach here, but just in case
500        Ok(())
501    }
502    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
503    async fn abandon_orchestration_item(
504        &self,
505        lock_token: &str,
506        delay: Option<Duration>,
507        ignore_attempt: bool,
508    ) -> Result<(), ProviderError> {
509        let start = std::time::Instant::now();
510        let now_ms = Self::now_millis();
511        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
512
513        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
514            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
515            self.schema_name
516        ))
517        .bind(lock_token)
518        .bind(now_ms)
519        .bind(delay_param)
520        .bind(ignore_attempt)
521        .fetch_one(&*self.pool)
522        .await
523        {
524            Ok(instance_id) => instance_id,
525            Err(e) => {
526                if let SqlxError::Database(db_err) = &e {
527                    if db_err.message().contains("Invalid lock token") {
528                        return Err(ProviderError::permanent(
529                            "abandon_orchestration_item",
530                            "Invalid lock token",
531                        ));
532                    }
533                } else if e.to_string().contains("Invalid lock token") {
534                    return Err(ProviderError::permanent(
535                        "abandon_orchestration_item",
536                        "Invalid lock token",
537                    ));
538                }
539
540                return Err(Self::sqlx_to_provider_error(
541                    "abandon_orchestration_item",
542                    e,
543                ));
544            }
545        };
546
547        let duration_ms = start.elapsed().as_millis() as u64;
548        debug!(
549            target = "duroxide::providers::postgres",
550            operation = "abandon_orchestration_item",
551            instance_id = %instance_id,
552            delay_ms = delay.map(|d| d.as_millis() as u64),
553            ignore_attempt = ignore_attempt,
554            duration_ms = duration_ms,
555            "Abandoned orchestration item via stored procedure"
556        );
557
558        Ok(())
559    }
560
561    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
562    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
563        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
564            "SELECT out_event_data FROM {}.fetch_history($1)",
565            self.schema_name
566        ))
567        .bind(instance)
568        .fetch_all(&*self.pool)
569        .await
570        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
571
572        Ok(event_data_rows
573            .into_iter()
574            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
575            .collect())
576    }
577
578    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
579    async fn append_with_execution(
580        &self,
581        instance: &str,
582        execution_id: u64,
583        new_events: Vec<Event>,
584    ) -> Result<(), ProviderError> {
585        if new_events.is_empty() {
586            return Ok(());
587        }
588
589        let mut events_payload = Vec::with_capacity(new_events.len());
590        for event in &new_events {
591            if event.event_id() == 0 {
592                error!(
593                    target = "duroxide::providers::postgres",
594                    operation = "append_with_execution",
595                    error_type = "validation_error",
596                    instance_id = %instance,
597                    execution_id = execution_id,
598                    "event_id must be set by runtime"
599                );
600                return Err(ProviderError::permanent(
601                    "append_with_execution",
602                    "event_id must be set by runtime",
603                ));
604            }
605
606            let event_json = serde_json::to_string(event).map_err(|e| {
607                ProviderError::permanent(
608                    "append_with_execution",
609                    format!("Failed to serialize event: {e}"),
610                )
611            })?;
612
613            let event_type = format!("{event:?}")
614                .split('{')
615                .next()
616                .unwrap_or("Unknown")
617                .trim()
618                .to_string();
619
620            events_payload.push(serde_json::json!({
621                "event_id": event.event_id(),
622                "event_type": event_type,
623                "event_data": event_json,
624            }));
625        }
626
627        let events_json = serde_json::Value::Array(events_payload);
628
629        sqlx::query(&format!(
630            "SELECT {}.append_history($1, $2, $3)",
631            self.schema_name
632        ))
633        .bind(instance)
634        .bind(execution_id as i64)
635        .bind(events_json)
636        .execute(&*self.pool)
637        .await
638        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
639
640        debug!(
641            target = "duroxide::providers::postgres",
642            operation = "append_with_execution",
643            instance_id = %instance,
644            execution_id = execution_id,
645            event_count = new_events.len(),
646            "Appended history events via stored procedure"
647        );
648
649        Ok(())
650    }
651
652    #[instrument(skip(self), target = "duroxide::providers::postgres")]
653    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
654        let work_item = serde_json::to_string(&item).map_err(|e| {
655            ProviderError::permanent(
656                "enqueue_worker_work",
657                format!("Failed to serialize work item: {e}"),
658            )
659        })?;
660
661        let now_ms = Self::now_millis();
662
663        // Extract activity identification and session_id for ActivityExecute items
664        let (instance_id, execution_id, activity_id, session_id) = match &item {
665            WorkItem::ActivityExecute {
666                instance,
667                execution_id,
668                id,
669                session_id,
670                ..
671            } => (
672                Some(instance.clone()),
673                Some(*execution_id as i64),
674                Some(*id as i64),
675                session_id.clone(),
676            ),
677            _ => (None, None, None, None),
678        };
679
680        sqlx::query(&format!(
681            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
682            self.schema_name
683        ))
684        .bind(work_item)
685        .bind(now_ms)
686        .bind(&instance_id)
687        .bind(execution_id)
688        .bind(activity_id)
689        .bind(&session_id)
690        .execute(&*self.pool)
691        .await
692        .map_err(|e| {
693            error!(
694                target = "duroxide::providers::postgres",
695                operation = "enqueue_worker_work",
696                error_type = "database_error",
697                error = %e,
698                "Failed to enqueue worker work"
699            );
700            Self::sqlx_to_provider_error("enqueue_worker_work", e)
701        })?;
702
703        Ok(())
704    }
705
706    #[instrument(skip(self), target = "duroxide::providers::postgres")]
707    async fn fetch_work_item(
708        &self,
709        lock_timeout: Duration,
710        _poll_timeout: Duration,
711        session: Option<&SessionFetchConfig>,
712    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
713        let start = std::time::Instant::now();
714
715        // Convert Duration to milliseconds
716        let lock_timeout_ms = lock_timeout.as_millis() as i64;
717
718        // Extract session parameters
719        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
720            Some(config) => (
721                Some(&config.owner_id),
722                Some(config.lock_timeout.as_millis() as i64),
723            ),
724            None => (None, None),
725        };
726
727        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
728            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
729            self.schema_name
730        ))
731        .bind(Self::now_millis())
732        .bind(lock_timeout_ms)
733        .bind(owner_id)
734        .bind(session_lock_timeout_ms)
735        .fetch_optional(&*self.pool)
736        .await
737        {
738            Ok(row) => row,
739            Err(e) => {
740                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
741            }
742        };
743
744        let (work_item_json, lock_token, attempt_count) = match row {
745            Some(row) => row,
746            None => return Ok(None),
747        };
748
749        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
750            ProviderError::permanent(
751                "fetch_work_item",
752                format!("Failed to deserialize worker item: {e}"),
753            )
754        })?;
755
756        let duration_ms = start.elapsed().as_millis() as u64;
757
758        // Extract instance for logging - different work item types have different structures
759        let instance_id = match &work_item {
760            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
761            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
762            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
763            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
764            WorkItem::TimerFired { instance, .. } => instance.as_str(),
765            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
766            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
767            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
768            WorkItem::SubOrchCompleted {
769                parent_instance, ..
770            } => parent_instance.as_str(),
771            WorkItem::SubOrchFailed {
772                parent_instance, ..
773            } => parent_instance.as_str(),
774            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
775        };
776
777        debug!(
778            target = "duroxide::providers::postgres",
779            operation = "fetch_work_item",
780            instance_id = %instance_id,
781            attempt_count = attempt_count,
782            duration_ms = duration_ms,
783            "Fetched activity work item via stored procedure"
784        );
785
786        Ok(Some((
787            work_item,
788            lock_token,
789            attempt_count as u32,
790        )))
791    }
792
793    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
794    async fn ack_work_item(
795        &self,
796        token: &str,
797        completion: Option<WorkItem>,
798    ) -> Result<(), ProviderError> {
799        let start = std::time::Instant::now();
800
801        // If no completion provided (e.g., cancelled activity), just delete the item
802        let Some(completion) = completion else {
803            let now_ms = Self::now_millis();
804            // Call ack_worker with NULL completion to delete without enqueueing
805            sqlx::query(&format!(
806                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
807                self.schema_name
808            ))
809            .bind(token)
810            .bind(now_ms)
811            .execute(&*self.pool)
812            .await
813            .map_err(|e| {
814                if e.to_string().contains("Worker queue item not found") {
815                    ProviderError::permanent(
816                        "ack_worker",
817                        "Worker queue item not found or already processed",
818                    )
819                } else {
820                    Self::sqlx_to_provider_error("ack_worker", e)
821                }
822            })?;
823
824            let duration_ms = start.elapsed().as_millis() as u64;
825            debug!(
826                target = "duroxide::providers::postgres",
827                operation = "ack_worker",
828                token = %token,
829                duration_ms = duration_ms,
830                "Acknowledged worker without completion (cancelled)"
831            );
832            return Ok(());
833        };
834
835        // Extract instance ID from completion WorkItem
836        let instance_id = match &completion {
837            WorkItem::ActivityCompleted { instance, .. }
838            | WorkItem::ActivityFailed { instance, .. } => instance,
839            _ => {
840                error!(
841                    target = "duroxide::providers::postgres",
842                    operation = "ack_worker",
843                    error_type = "invalid_completion_type",
844                    "Invalid completion work item type"
845                );
846                return Err(ProviderError::permanent(
847                    "ack_worker",
848                    "Invalid completion work item type",
849                ));
850            }
851        };
852
853        let completion_json = serde_json::to_string(&completion).map_err(|e| {
854            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
855        })?;
856
857        let now_ms = Self::now_millis();
858
859        // Call stored procedure to atomically delete worker item and enqueue completion
860        sqlx::query(&format!(
861            "SELECT {}.ack_worker($1, $2, $3, $4)",
862            self.schema_name
863        ))
864        .bind(token)
865        .bind(instance_id)
866        .bind(completion_json)
867        .bind(now_ms)
868        .execute(&*self.pool)
869        .await
870        .map_err(|e| {
871            if e.to_string().contains("Worker queue item not found") {
872                error!(
873                    target = "duroxide::providers::postgres",
874                    operation = "ack_worker",
875                    error_type = "worker_item_not_found",
876                    token = %token,
877                    "Worker queue item not found or already processed"
878                );
879                ProviderError::permanent(
880                    "ack_worker",
881                    "Worker queue item not found or already processed",
882                )
883            } else {
884                Self::sqlx_to_provider_error("ack_worker", e)
885            }
886        })?;
887
888        let duration_ms = start.elapsed().as_millis() as u64;
889        debug!(
890            target = "duroxide::providers::postgres",
891            operation = "ack_worker",
892            instance_id = %instance_id,
893            duration_ms = duration_ms,
894            "Acknowledged worker and enqueued completion"
895        );
896
897        Ok(())
898    }
899
900    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
901    async fn renew_work_item_lock(
902        &self,
903        token: &str,
904        extend_for: Duration,
905    ) -> Result<(), ProviderError> {
906        let start = std::time::Instant::now();
907
908        // Get current time from application for consistent time reference
909        let now_ms = Self::now_millis();
910
911        // Convert Duration to seconds for the stored procedure
912        let extend_secs = extend_for.as_secs() as i64;
913
914        match sqlx::query(&format!(
915            "SELECT {}.renew_work_item_lock($1, $2, $3)",
916            self.schema_name
917        ))
918        .bind(token)
919        .bind(now_ms)
920        .bind(extend_secs)
921        .execute(&*self.pool)
922        .await
923        {
924            Ok(_) => {
925                let duration_ms = start.elapsed().as_millis() as u64;
926                debug!(
927                    target = "duroxide::providers::postgres",
928                    operation = "renew_work_item_lock",
929                    token = %token,
930                    extend_for_secs = extend_secs,
931                    duration_ms = duration_ms,
932                    "Work item lock renewed successfully"
933                );
934                Ok(())
935            }
936            Err(e) => {
937                if let SqlxError::Database(db_err) = &e {
938                    if db_err.message().contains("Lock token invalid") {
939                        return Err(ProviderError::permanent(
940                            "renew_work_item_lock",
941                            "Lock token invalid, expired, or already acked",
942                        ));
943                    }
944                } else if e.to_string().contains("Lock token invalid") {
945                    return Err(ProviderError::permanent(
946                        "renew_work_item_lock",
947                        "Lock token invalid, expired, or already acked",
948                    ));
949                }
950
951                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
952            }
953        }
954    }
955
956    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
957    async fn abandon_work_item(
958        &self,
959        token: &str,
960        delay: Option<Duration>,
961        ignore_attempt: bool,
962    ) -> Result<(), ProviderError> {
963        let start = std::time::Instant::now();
964        let now_ms = Self::now_millis();
965        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
966
967        match sqlx::query(&format!(
968            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
969            self.schema_name
970        ))
971        .bind(token)
972        .bind(now_ms)
973        .bind(delay_param)
974        .bind(ignore_attempt)
975        .execute(&*self.pool)
976        .await
977        {
978            Ok(_) => {
979                let duration_ms = start.elapsed().as_millis() as u64;
980                debug!(
981                    target = "duroxide::providers::postgres",
982                    operation = "abandon_work_item",
983                    token = %token,
984                    delay_ms = delay.map(|d| d.as_millis() as u64),
985                    ignore_attempt = ignore_attempt,
986                    duration_ms = duration_ms,
987                    "Abandoned work item via stored procedure"
988                );
989                Ok(())
990            }
991            Err(e) => {
992                if let SqlxError::Database(db_err) = &e {
993                    if db_err.message().contains("Invalid lock token")
994                        || db_err.message().contains("already acked")
995                    {
996                        return Err(ProviderError::permanent(
997                            "abandon_work_item",
998                            "Invalid lock token or already acked",
999                        ));
1000                    }
1001                } else if e.to_string().contains("Invalid lock token")
1002                    || e.to_string().contains("already acked")
1003                {
1004                    return Err(ProviderError::permanent(
1005                        "abandon_work_item",
1006                        "Invalid lock token or already acked",
1007                    ));
1008                }
1009
1010                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1011            }
1012        }
1013    }
1014
1015    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1016    async fn renew_orchestration_item_lock(
1017        &self,
1018        token: &str,
1019        extend_for: Duration,
1020    ) -> Result<(), ProviderError> {
1021        let start = std::time::Instant::now();
1022
1023        // Get current time from application for consistent time reference
1024        let now_ms = Self::now_millis();
1025
1026        // Convert Duration to seconds for the stored procedure
1027        let extend_secs = extend_for.as_secs() as i64;
1028
1029        match sqlx::query(&format!(
1030            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1031            self.schema_name
1032        ))
1033        .bind(token)
1034        .bind(now_ms)
1035        .bind(extend_secs)
1036        .execute(&*self.pool)
1037        .await
1038        {
1039            Ok(_) => {
1040                let duration_ms = start.elapsed().as_millis() as u64;
1041                debug!(
1042                    target = "duroxide::providers::postgres",
1043                    operation = "renew_orchestration_item_lock",
1044                    token = %token,
1045                    extend_for_secs = extend_secs,
1046                    duration_ms = duration_ms,
1047                    "Orchestration item lock renewed successfully"
1048                );
1049                Ok(())
1050            }
1051            Err(e) => {
1052                if let SqlxError::Database(db_err) = &e {
1053                    if db_err.message().contains("Lock token invalid")
1054                        || db_err.message().contains("expired")
1055                        || db_err.message().contains("already released")
1056                    {
1057                        return Err(ProviderError::permanent(
1058                            "renew_orchestration_item_lock",
1059                            "Lock token invalid, expired, or already released",
1060                        ));
1061                    }
1062                } else if e.to_string().contains("Lock token invalid")
1063                    || e.to_string().contains("expired")
1064                    || e.to_string().contains("already released")
1065                {
1066                    return Err(ProviderError::permanent(
1067                        "renew_orchestration_item_lock",
1068                        "Lock token invalid, expired, or already released",
1069                    ));
1070                }
1071
1072                Err(Self::sqlx_to_provider_error(
1073                    "renew_orchestration_item_lock",
1074                    e,
1075                ))
1076            }
1077        }
1078    }
1079
1080    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1081    async fn enqueue_for_orchestrator(
1082        &self,
1083        item: WorkItem,
1084        delay: Option<Duration>,
1085    ) -> Result<(), ProviderError> {
1086        let work_item = serde_json::to_string(&item).map_err(|e| {
1087            ProviderError::permanent(
1088                "enqueue_orchestrator_work",
1089                format!("Failed to serialize work item: {e}"),
1090            )
1091        })?;
1092
1093        // Extract instance ID from WorkItem enum
1094        let instance_id = match &item {
1095            WorkItem::StartOrchestration { instance, .. }
1096            | WorkItem::ActivityCompleted { instance, .. }
1097            | WorkItem::ActivityFailed { instance, .. }
1098            | WorkItem::TimerFired { instance, .. }
1099            | WorkItem::ExternalRaised { instance, .. }
1100            | WorkItem::CancelInstance { instance, .. }
1101            | WorkItem::ContinueAsNew { instance, .. }
1102            | WorkItem::QueueMessage { instance, .. } => instance,
1103            WorkItem::SubOrchCompleted {
1104                parent_instance, ..
1105            }
1106            | WorkItem::SubOrchFailed {
1107                parent_instance, ..
1108            } => parent_instance,
1109            WorkItem::ActivityExecute { .. } => {
1110                return Err(ProviderError::permanent(
1111                    "enqueue_orchestrator_work",
1112                    "ActivityExecute should go to worker queue, not orchestrator queue",
1113                ));
1114            }
1115        };
1116
1117        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1118        let now_ms = Self::now_millis();
1119
1120        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1121            if *fire_at_ms > 0 {
1122                // Take max of fire_at_ms and delay (if provided)
1123                if let Some(delay) = delay {
1124                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1125                } else {
1126                    *fire_at_ms
1127                }
1128            } else {
1129                // fire_at_ms is 0, use delay or NOW()
1130                delay
1131                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1132                    .unwrap_or(now_ms as u64)
1133            }
1134        } else {
1135            // Non-timer item: use delay or NOW()
1136            delay
1137                .map(|d| now_ms as u64 + d.as_millis() as u64)
1138                .unwrap_or(now_ms as u64)
1139        };
1140
1141        let visible_at = Utc
1142            .timestamp_millis_opt(visible_at_ms as i64)
1143            .single()
1144            .ok_or_else(|| {
1145                ProviderError::permanent(
1146                    "enqueue_orchestrator_work",
1147                    "Invalid visible_at timestamp",
1148                )
1149            })?;
1150
1151        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1152        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1153
1154        // Call stored procedure to enqueue work
1155        sqlx::query(&format!(
1156            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1157            self.schema_name
1158        ))
1159        .bind(instance_id)
1160        .bind(&work_item)
1161        .bind(visible_at)
1162        .bind::<Option<String>>(None) // orchestration_name - NULL
1163        .bind::<Option<String>>(None) // orchestration_version - NULL
1164        .bind::<Option<i64>>(None) // execution_id - NULL
1165        .execute(&*self.pool)
1166        .await
1167        .map_err(|e| {
1168            error!(
1169                target = "duroxide::providers::postgres",
1170                operation = "enqueue_orchestrator_work",
1171                error_type = "database_error",
1172                error = %e,
1173                instance_id = %instance_id,
1174                "Failed to enqueue orchestrator work"
1175            );
1176            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1177        })?;
1178
1179        debug!(
1180            target = "duroxide::providers::postgres",
1181            operation = "enqueue_orchestrator_work",
1182            instance_id = %instance_id,
1183            delay_ms = delay.map(|d| d.as_millis() as u64),
1184            "Enqueued orchestrator work"
1185        );
1186
1187        Ok(())
1188    }
1189
1190    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1191    async fn read_with_execution(
1192        &self,
1193        instance: &str,
1194        execution_id: u64,
1195    ) -> Result<Vec<Event>, ProviderError> {
1196        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1197            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1198            self.table_name("history")
1199        ))
1200        .bind(instance)
1201        .bind(execution_id as i64)
1202        .fetch_all(&*self.pool)
1203        .await
1204        .ok()
1205        .unwrap_or_default();
1206
1207        Ok(event_data_rows
1208            .into_iter()
1209            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1210            .collect())
1211    }
1212
1213    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1214    async fn renew_session_lock(
1215        &self,
1216        owner_ids: &[&str],
1217        extend_for: Duration,
1218        idle_timeout: Duration,
1219    ) -> Result<usize, ProviderError> {
1220        if owner_ids.is_empty() {
1221            return Ok(0);
1222        }
1223
1224        let now_ms = Self::now_millis();
1225        let extend_ms = extend_for.as_millis() as i64;
1226        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1227        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1228
1229        let result = sqlx::query_scalar::<_, i64>(&format!(
1230            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1231            self.schema_name
1232        ))
1233        .bind(&owner_ids_vec)
1234        .bind(now_ms)
1235        .bind(extend_ms)
1236        .bind(idle_timeout_ms)
1237        .fetch_one(&*self.pool)
1238        .await
1239        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1240
1241        debug!(
1242            target = "duroxide::providers::postgres",
1243            operation = "renew_session_lock",
1244            owner_count = owner_ids.len(),
1245            sessions_renewed = result,
1246            "Session locks renewed"
1247        );
1248
1249        Ok(result as usize)
1250    }
1251
1252    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1253    async fn cleanup_orphaned_sessions(
1254        &self,
1255        _idle_timeout: Duration,
1256    ) -> Result<usize, ProviderError> {
1257        let now_ms = Self::now_millis();
1258
1259        let result = sqlx::query_scalar::<_, i64>(&format!(
1260            "SELECT {}.cleanup_orphaned_sessions($1)",
1261            self.schema_name
1262        ))
1263        .bind(now_ms)
1264        .fetch_one(&*self.pool)
1265        .await
1266        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1267
1268        debug!(
1269            target = "duroxide::providers::postgres",
1270            operation = "cleanup_orphaned_sessions",
1271            sessions_cleaned = result,
1272            "Orphaned sessions cleaned up"
1273        );
1274
1275        Ok(result as usize)
1276    }
1277
1278    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1279        Some(self)
1280    }
1281
1282    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1283    async fn get_custom_status(
1284        &self,
1285        instance: &str,
1286        last_seen_version: u64,
1287    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1288        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1289            "SELECT * FROM {}.get_custom_status($1, $2)",
1290            self.schema_name
1291        ))
1292        .bind(instance)
1293        .bind(last_seen_version as i64)
1294        .fetch_optional(&*self.pool)
1295        .await
1296        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1297
1298        match row {
1299            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1300            None => Ok(None),
1301        }
1302    }
1303}
1304
1305#[async_trait::async_trait]
1306impl ProviderAdmin for PostgresProvider {
1307    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1308    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1309        sqlx::query_scalar(&format!(
1310            "SELECT instance_id FROM {}.list_instances()",
1311            self.schema_name
1312        ))
1313        .fetch_all(&*self.pool)
1314        .await
1315        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1316    }
1317
1318    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1319    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1320        sqlx::query_scalar(&format!(
1321            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1322            self.schema_name
1323        ))
1324        .bind(status)
1325        .fetch_all(&*self.pool)
1326        .await
1327        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1328    }
1329
1330    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1331    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1332        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1333            "SELECT execution_id FROM {}.list_executions($1)",
1334            self.schema_name
1335        ))
1336        .bind(instance)
1337        .fetch_all(&*self.pool)
1338        .await
1339        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1340
1341        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1342    }
1343
1344    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1345    async fn read_history_with_execution_id(
1346        &self,
1347        instance: &str,
1348        execution_id: u64,
1349    ) -> Result<Vec<Event>, ProviderError> {
1350        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1351            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1352            self.schema_name
1353        ))
1354        .bind(instance)
1355        .bind(execution_id as i64)
1356        .fetch_all(&*self.pool)
1357        .await
1358        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1359
1360        event_data_rows
1361            .into_iter()
1362            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1363            .collect::<Vec<Event>>()
1364            .into_iter()
1365            .map(Ok)
1366            .collect()
1367    }
1368
1369    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1370    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1371        let execution_id = self.latest_execution_id(instance).await?;
1372        self.read_history_with_execution_id(instance, execution_id)
1373            .await
1374    }
1375
1376    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1377    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1378        sqlx::query_scalar(&format!(
1379            "SELECT {}.latest_execution_id($1)",
1380            self.schema_name
1381        ))
1382        .bind(instance)
1383        .fetch_optional(&*self.pool)
1384        .await
1385        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1386        .map(|id: i64| id as u64)
1387        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1388    }
1389
1390    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1391    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1392        let row: Option<(
1393            String,
1394            String,
1395            String,
1396            i64,
1397            chrono::DateTime<Utc>,
1398            Option<chrono::DateTime<Utc>>,
1399            Option<String>,
1400            Option<String>,
1401            Option<String>,
1402        )> = sqlx::query_as(&format!(
1403            "SELECT * FROM {}.get_instance_info($1)",
1404            self.schema_name
1405        ))
1406        .bind(instance)
1407        .fetch_optional(&*self.pool)
1408        .await
1409        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1410
1411        let (
1412            instance_id,
1413            orchestration_name,
1414            orchestration_version,
1415            current_execution_id,
1416            created_at,
1417            updated_at,
1418            status,
1419            output,
1420            parent_instance_id,
1421        ) =
1422            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1423
1424        Ok(InstanceInfo {
1425            instance_id,
1426            orchestration_name,
1427            orchestration_version,
1428            current_execution_id: current_execution_id as u64,
1429            status: status.unwrap_or_else(|| "Running".to_string()),
1430            output,
1431            created_at: created_at.timestamp_millis() as u64,
1432            updated_at: updated_at
1433                .map(|dt| dt.timestamp_millis() as u64)
1434                .unwrap_or(created_at.timestamp_millis() as u64),
1435            parent_instance_id,
1436        })
1437    }
1438
1439    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1440    async fn get_execution_info(
1441        &self,
1442        instance: &str,
1443        execution_id: u64,
1444    ) -> Result<ExecutionInfo, ProviderError> {
1445        let row: Option<(
1446            i64,
1447            String,
1448            Option<String>,
1449            chrono::DateTime<Utc>,
1450            Option<chrono::DateTime<Utc>>,
1451            i64,
1452        )> = sqlx::query_as(&format!(
1453            "SELECT * FROM {}.get_execution_info($1, $2)",
1454            self.schema_name
1455        ))
1456        .bind(instance)
1457        .bind(execution_id as i64)
1458        .fetch_optional(&*self.pool)
1459        .await
1460        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1461
1462        let (exec_id, status, output, started_at, completed_at, event_count) = row
1463            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1464
1465        Ok(ExecutionInfo {
1466            execution_id: exec_id as u64,
1467            status,
1468            output,
1469            started_at: started_at.timestamp_millis() as u64,
1470            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1471            event_count: event_count as usize,
1472        })
1473    }
1474
1475    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1476    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1477        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1478            "SELECT * FROM {}.get_system_metrics()",
1479            self.schema_name
1480        ))
1481        .fetch_optional(&*self.pool)
1482        .await
1483        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1484
1485        let (
1486            total_instances,
1487            total_executions,
1488            running_instances,
1489            completed_instances,
1490            failed_instances,
1491            total_events,
1492        ) = row.ok_or_else(|| {
1493            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1494        })?;
1495
1496        Ok(SystemMetrics {
1497            total_instances: total_instances as u64,
1498            total_executions: total_executions as u64,
1499            running_instances: running_instances as u64,
1500            completed_instances: completed_instances as u64,
1501            failed_instances: failed_instances as u64,
1502            total_events: total_events as u64,
1503        })
1504    }
1505
1506    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1507    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1508        let now_ms = Self::now_millis();
1509
1510        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1511            "SELECT * FROM {}.get_queue_depths($1)",
1512            self.schema_name
1513        ))
1514        .bind(now_ms)
1515        .fetch_optional(&*self.pool)
1516        .await
1517        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1518
1519        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1520            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1521        })?;
1522
1523        Ok(QueueDepths {
1524            orchestrator_queue: orchestrator_queue as usize,
1525            worker_queue: worker_queue as usize,
1526            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1527        })
1528    }
1529
1530    // ===== Hierarchy Primitive Operations =====
1531
1532    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1533    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1534        sqlx::query_scalar(&format!(
1535            "SELECT child_instance_id FROM {}.list_children($1)",
1536            self.schema_name
1537        ))
1538        .bind(instance_id)
1539        .fetch_all(&*self.pool)
1540        .await
1541        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1542    }
1543
1544    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1545    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1546        // The stored procedure raises an exception if instance doesn't exist
1547        // Otherwise returns the parent_instance_id (which may be NULL)
1548        let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1549            "SELECT {}.get_parent_id($1)",
1550            self.schema_name
1551        ))
1552        .bind(instance_id)
1553        .fetch_one(&*self.pool)
1554        .await;
1555
1556        match result {
1557            Ok(parent_id) => Ok(parent_id),
1558            Err(e) => {
1559                let err_str = e.to_string();
1560                if err_str.contains("Instance not found") {
1561                    Err(ProviderError::permanent(
1562                        "get_parent_id",
1563                        format!("Instance not found: {}", instance_id),
1564                    ))
1565                } else {
1566                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1567                }
1568            }
1569        }
1570    }
1571
1572    // ===== Deletion Operations =====
1573
1574    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1575    async fn delete_instances_atomic(
1576        &self,
1577        ids: &[String],
1578        force: bool,
1579    ) -> Result<DeleteInstanceResult, ProviderError> {
1580        if ids.is_empty() {
1581            return Ok(DeleteInstanceResult::default());
1582        }
1583
1584        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1585            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1586            self.schema_name
1587        ))
1588        .bind(ids)
1589        .bind(force)
1590        .fetch_optional(&*self.pool)
1591        .await
1592        .map_err(|e| {
1593            let err_str = e.to_string();
1594            if err_str.contains("is Running") {
1595                ProviderError::permanent(
1596                    "delete_instances_atomic",
1597                    err_str,
1598                )
1599            } else if err_str.contains("Orphan detected") {
1600                ProviderError::permanent(
1601                    "delete_instances_atomic",
1602                    err_str,
1603                )
1604            } else {
1605                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1606            }
1607        })?;
1608
1609        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1610            row.unwrap_or((0, 0, 0, 0));
1611
1612        debug!(
1613            target = "duroxide::providers::postgres",
1614            operation = "delete_instances_atomic",
1615            instances_deleted = instances_deleted,
1616            executions_deleted = executions_deleted,
1617            events_deleted = events_deleted,
1618            queue_messages_deleted = queue_messages_deleted,
1619            "Deleted instances atomically"
1620        );
1621
1622        Ok(DeleteInstanceResult {
1623            instances_deleted: instances_deleted as u64,
1624            executions_deleted: executions_deleted as u64,
1625            events_deleted: events_deleted as u64,
1626            queue_messages_deleted: queue_messages_deleted as u64,
1627        })
1628    }
1629
1630    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1631    async fn delete_instance_bulk(
1632        &self,
1633        filter: InstanceFilter,
1634    ) -> Result<DeleteInstanceResult, ProviderError> {
1635        // Build query to find matching root instances in terminal states
1636        let mut sql = format!(
1637            r#"
1638            SELECT i.instance_id
1639            FROM {}.instances i
1640            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1641              AND i.current_execution_id = e.execution_id
1642            WHERE i.parent_instance_id IS NULL
1643              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1644            "#,
1645            self.schema_name, self.schema_name
1646        );
1647
1648        // Add instance_ids filter if provided
1649        if let Some(ref ids) = filter.instance_ids {
1650            if ids.is_empty() {
1651                return Ok(DeleteInstanceResult::default());
1652            }
1653            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1654            sql.push_str(&format!(
1655                " AND i.instance_id IN ({})",
1656                placeholders.join(", ")
1657            ));
1658        }
1659
1660        // Add completed_before filter if provided
1661        if filter.completed_before.is_some() {
1662            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1663            sql.push_str(&format!(
1664                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1665                param_num
1666            ));
1667        }
1668
1669        // Add limit
1670        let limit = filter.limit.unwrap_or(1000);
1671        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1672            + if filter.completed_before.is_some() { 1 } else { 0 }
1673            + 1;
1674        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1675
1676        // Build and execute query
1677        let mut query = sqlx::query_scalar::<_, String>(&sql);
1678        if let Some(ref ids) = filter.instance_ids {
1679            for id in ids {
1680                query = query.bind(id);
1681            }
1682        }
1683        if let Some(completed_before) = filter.completed_before {
1684            query = query.bind(completed_before as i64);
1685        }
1686        query = query.bind(limit as i64);
1687
1688        let instance_ids: Vec<String> = query
1689            .fetch_all(&*self.pool)
1690            .await
1691            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1692
1693        if instance_ids.is_empty() {
1694            return Ok(DeleteInstanceResult::default());
1695        }
1696
1697        // Delete each instance with cascade
1698        let mut result = DeleteInstanceResult::default();
1699
1700        for instance_id in &instance_ids {
1701            // Get full tree for this root
1702            let tree = self.get_instance_tree(instance_id).await?;
1703
1704            // Atomic delete (tree.all_ids is already in deletion order: children first)
1705            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1706            result.instances_deleted += delete_result.instances_deleted;
1707            result.executions_deleted += delete_result.executions_deleted;
1708            result.events_deleted += delete_result.events_deleted;
1709            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1710        }
1711
1712        debug!(
1713            target = "duroxide::providers::postgres",
1714            operation = "delete_instance_bulk",
1715            instances_deleted = result.instances_deleted,
1716            executions_deleted = result.executions_deleted,
1717            events_deleted = result.events_deleted,
1718            queue_messages_deleted = result.queue_messages_deleted,
1719            "Bulk deleted instances"
1720        );
1721
1722        Ok(result)
1723    }
1724
1725    // ===== Pruning Operations =====
1726
1727    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1728    async fn prune_executions(
1729        &self,
1730        instance_id: &str,
1731        options: PruneOptions,
1732    ) -> Result<PruneResult, ProviderError> {
1733        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1734        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1735
1736        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1737            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1738            self.schema_name
1739        ))
1740        .bind(instance_id)
1741        .bind(keep_last)
1742        .bind(completed_before_ms)
1743        .fetch_optional(&*self.pool)
1744        .await
1745        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1746
1747        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1748
1749        debug!(
1750            target = "duroxide::providers::postgres",
1751            operation = "prune_executions",
1752            instance_id = %instance_id,
1753            instances_processed = instances_processed,
1754            executions_deleted = executions_deleted,
1755            events_deleted = events_deleted,
1756            "Pruned executions"
1757        );
1758
1759        Ok(PruneResult {
1760            instances_processed: instances_processed as u64,
1761            executions_deleted: executions_deleted as u64,
1762            events_deleted: events_deleted as u64,
1763        })
1764    }
1765
1766    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1767    async fn prune_executions_bulk(
1768        &self,
1769        filter: InstanceFilter,
1770        options: PruneOptions,
1771    ) -> Result<PruneResult, ProviderError> {
1772        // Find matching instances (all statuses - prune_executions protects current execution)
1773        // Note: We include Running instances because long-running orchestrations (e.g., with
1774        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1775        // call safely skips the current execution regardless of its status.
1776        let mut sql = format!(
1777            r#"
1778            SELECT i.instance_id
1779            FROM {}.instances i
1780            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1781              AND i.current_execution_id = e.execution_id
1782            WHERE 1=1
1783            "#,
1784            self.schema_name, self.schema_name
1785        );
1786
1787        // Add instance_ids filter if provided
1788        if let Some(ref ids) = filter.instance_ids {
1789            if ids.is_empty() {
1790                return Ok(PruneResult::default());
1791            }
1792            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1793            sql.push_str(&format!(
1794                " AND i.instance_id IN ({})",
1795                placeholders.join(", ")
1796            ));
1797        }
1798
1799        // Add completed_before filter if provided
1800        if filter.completed_before.is_some() {
1801            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1802            sql.push_str(&format!(
1803                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1804                param_num
1805            ));
1806        }
1807
1808        // Add limit
1809        let limit = filter.limit.unwrap_or(1000);
1810        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1811            + if filter.completed_before.is_some() { 1 } else { 0 }
1812            + 1;
1813        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1814
1815        // Build and execute query
1816        let mut query = sqlx::query_scalar::<_, String>(&sql);
1817        if let Some(ref ids) = filter.instance_ids {
1818            for id in ids {
1819                query = query.bind(id);
1820            }
1821        }
1822        if let Some(completed_before) = filter.completed_before {
1823            query = query.bind(completed_before as i64);
1824        }
1825        query = query.bind(limit as i64);
1826
1827        let instance_ids: Vec<String> = query
1828            .fetch_all(&*self.pool)
1829            .await
1830            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1831
1832        // Prune each instance
1833        let mut result = PruneResult::default();
1834
1835        for instance_id in &instance_ids {
1836            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1837            result.instances_processed += single_result.instances_processed;
1838            result.executions_deleted += single_result.executions_deleted;
1839            result.events_deleted += single_result.events_deleted;
1840        }
1841
1842        debug!(
1843            target = "duroxide::providers::postgres",
1844            operation = "prune_executions_bulk",
1845            instances_processed = result.instances_processed,
1846            executions_deleted = result.executions_deleted,
1847            events_deleted = result.events_deleted,
1848            "Bulk pruned executions"
1849        );
1850
1851        Ok(result)
1852    }
1853}