// duroxide_pg/provider.rs — PostgreSQL provider implementation for Duroxide.
1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, WorkItem,
8};
9use duroxide::Event;
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect using DATABASE_URL or explicit connection string
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    // Shared connection pool; `Arc` so the migration runner and the provider
    // can hold the same pool concurrently.
    pool: Arc<PgPool>,
    // PostgreSQL schema owning all provider tables and stored procedures.
    // "public" by default; a custom schema isolates app/test instances.
    schema_name: String,
}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95
96
97    /// Get schema-qualified table name
98    fn table_name(&self, table: &str) -> String {
99        format!("{}.{}", self.schema_name, table)
100    }
101
102    /// Get the database pool (for testing)
103    pub fn pool(&self) -> &PgPool {
104        &self.pool
105    }
106
107    /// Get the schema name (for testing)
108    pub fn schema_name(&self) -> &str {
109        &self.schema_name
110    }
111
112    /// Convert sqlx::Error to ProviderError with proper classification
113    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
114        match e {
115            SqlxError::Database(ref db_err) => {
116                // PostgreSQL error codes
117                let code_opt = db_err.code();
118                let code = code_opt.as_deref();
119                if code == Some("40P01") {
120                    // Deadlock detected
121                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
122                } else if code == Some("40001") {
123                    // Serialization failure - permanent error (transaction conflict, not transient)
124                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
125                } else if code == Some("23505") {
126                    // Unique constraint violation (duplicate event)
127                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
128                } else if code == Some("23503") {
129                    // Foreign key constraint violation
130                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
131                } else {
132                    ProviderError::permanent(operation, format!("Database error: {e}"))
133                }
134            }
135            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
136                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
137            }
138            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
139            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
140        }
141    }
142
143    /// Clean up schema after tests (drops all tables and optionally the schema)
144    ///
145    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
146    /// Only drops the schema if it's a custom schema (not "public").
147    pub async fn cleanup_schema(&self) -> Result<()> {
148        // Call the stored procedure to drop all tables
149        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
150            .execute(&*self.pool)
151            .await?;
152
153        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
154        // Only drop custom schemas created for testing
155        if self.schema_name != "public" {
156            sqlx::query(&format!(
157                "DROP SCHEMA IF EXISTS {} CASCADE",
158                self.schema_name
159            ))
160            .execute(&*self.pool)
161            .await?;
162        } else {
163            // Explicit safeguard: we only drop tables from public schema, never the schema itself
164            // This ensures we don't accidentally drop the default PostgreSQL schema
165        }
166
167        Ok(())
168    }
169}
170
171#[async_trait::async_trait]
172impl Provider for PostgresProvider {
    /// Provider implementation name reported to Duroxide.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
176
    /// Crate version, captured from Cargo metadata at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
180
181    #[instrument(skip(self), target = "duroxide::providers::postgres")]
182    async fn fetch_orchestration_item(
183        &self,
184        lock_timeout: Duration,
185        _poll_timeout: Duration,
186        filter: Option<&DispatcherCapabilityFilter>,
187    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
188        let start = std::time::Instant::now();
189
190        const MAX_RETRIES: u32 = 3;
191        const RETRY_DELAY_MS: u64 = 50;
192
193        // Convert Duration to milliseconds
194        let lock_timeout_ms = lock_timeout.as_millis() as i64;
195        let mut _last_error: Option<ProviderError> = None;
196
197        // Extract version filter from capability filter
198        let (min_packed, max_packed) = if let Some(f) = filter {
199            if let Some(range) = f.supported_duroxide_versions.first() {
200                let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
201                let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
202                (Some(min), Some(max))
203            } else {
204                // Empty supported_duroxide_versions = supports nothing
205                return Ok(None);
206            }
207        } else {
208            (None, None)
209        };
210
211        for attempt in 0..=MAX_RETRIES {
212            let now_ms = Self::now_millis();
213
214            let result: Result<
215                Option<(
216                    String,
217                    String,
218                    String,
219                    i64,
220                    serde_json::Value,
221                    serde_json::Value,
222                    String,
223                    i32,
224                )>,
225                SqlxError,
226            > = sqlx::query_as(&format!(
227                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
228                self.schema_name
229            ))
230            .bind(now_ms)
231            .bind(lock_timeout_ms)
232            .bind(min_packed)
233            .bind(max_packed)
234            .fetch_optional(&*self.pool)
235            .await;
236
237            let row = match result {
238                Ok(r) => r,
239                Err(e) => {
240                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
241                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
242                        warn!(
243                            target = "duroxide::providers::postgres",
244                            operation = "fetch_orchestration_item",
245                            attempt = attempt + 1,
246                            error = %provider_err,
247                            "Retryable error, will retry"
248                        );
249                        _last_error = Some(provider_err);
250                        sleep(std::time::Duration::from_millis(
251                            RETRY_DELAY_MS * (attempt as u64 + 1),
252                        ))
253                        .await;
254                        continue;
255                    }
256                    return Err(provider_err);
257                }
258            };
259
260            if let Some((
261                instance_id,
262                orchestration_name,
263                orchestration_version,
264                execution_id,
265                history_json,
266                messages_json,
267                lock_token,
268                attempt_count,
269            )) = row
270            {
271                let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
272                    Ok(h) => (h, None),
273                    Err(e) => {
274                        let error_msg = format!("Failed to deserialize history: {e}");
275                        warn!(
276                            target = "duroxide::providers::postgres",
277                            instance = %instance_id,
278                            error = %error_msg,
279                            "History deserialization failed, returning item with history_error"
280                        );
281                        (vec![], Some(error_msg))
282                    }
283                };
284
285                let messages: Vec<WorkItem> =
286                    serde_json::from_value(messages_json).map_err(|e| {
287                        ProviderError::permanent(
288                            "fetch_orchestration_item",
289                            format!("Failed to deserialize messages: {e}"),
290                        )
291                    })?;
292
293                let duration_ms = start.elapsed().as_millis() as u64;
294                debug!(
295                    target = "duroxide::providers::postgres",
296                    operation = "fetch_orchestration_item",
297                    instance_id = %instance_id,
298                    execution_id = execution_id,
299                    message_count = messages.len(),
300                    history_count = history.len(),
301                    attempt_count = attempt_count,
302                    duration_ms = duration_ms,
303                    attempts = attempt + 1,
304                    "Fetched orchestration item via stored procedure"
305                );
306
307                return Ok(Some((
308                    OrchestrationItem {
309                        instance: instance_id,
310                        orchestration_name,
311                        execution_id: execution_id as u64,
312                        version: orchestration_version,
313                        history,
314                        messages,
315                        history_error,
316                    },
317                    lock_token,
318                    attempt_count as u32,
319                )));
320            }
321
322            // No result found - return immediately (short polling behavior)
323            // Only retry with delay on retryable errors (handled above)
324            return Ok(None);
325        }
326
327        Ok(None)
328    }
    /// Acknowledge a completed orchestration turn.
    ///
    /// Atomically (inside the `ack_orchestration_item` stored procedure):
    /// appends the history delta, enqueues worker and orchestrator items,
    /// updates execution metadata, and records cancelled activities for lock
    /// stealing. Retryable database errors are retried up to 3 times with
    /// linear backoff; an invalid lock token fails permanently and is never
    /// retried.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload for the history delta up front, validating
        // that every event carries a non-zero, runtime-assigned event_id.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse event-type label from the Debug representation
            // (text before the first '{').
            // NOTE(review): assumes every Event variant Debug-prints in
            // struct-like form; tuple/unit variants would yield the full
            // Debug string — confirm against Event's definition.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Execution metadata is passed as one JSON object for the stored
        // procedure to unpack; the pinned version is nested as {major, minor,
        // patch} or null.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
        });

        // Serialize cancelled activities for lock stealing
        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        // Payloads are built once above; only the database call is retried.
        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Check for permanent errors first: an invalid/expired
                    // lock token (raised by the stored procedure) must never
                    // be retried.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms.
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Should never reach here, but just in case
        Ok(())
    }
492    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
493    async fn abandon_orchestration_item(
494        &self,
495        lock_token: &str,
496        delay: Option<Duration>,
497        ignore_attempt: bool,
498    ) -> Result<(), ProviderError> {
499        let start = std::time::Instant::now();
500        let now_ms = Self::now_millis();
501        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
502
503        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
504            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
505            self.schema_name
506        ))
507        .bind(lock_token)
508        .bind(now_ms)
509        .bind(delay_param)
510        .bind(ignore_attempt)
511        .fetch_one(&*self.pool)
512        .await
513        {
514            Ok(instance_id) => instance_id,
515            Err(e) => {
516                if let SqlxError::Database(db_err) = &e {
517                    if db_err.message().contains("Invalid lock token") {
518                        return Err(ProviderError::permanent(
519                            "abandon_orchestration_item",
520                            "Invalid lock token",
521                        ));
522                    }
523                } else if e.to_string().contains("Invalid lock token") {
524                    return Err(ProviderError::permanent(
525                        "abandon_orchestration_item",
526                        "Invalid lock token",
527                    ));
528                }
529
530                return Err(Self::sqlx_to_provider_error(
531                    "abandon_orchestration_item",
532                    e,
533                ));
534            }
535        };
536
537        let duration_ms = start.elapsed().as_millis() as u64;
538        debug!(
539            target = "duroxide::providers::postgres",
540            operation = "abandon_orchestration_item",
541            instance_id = %instance_id,
542            delay_ms = delay.map(|d| d.as_millis() as u64),
543            ignore_attempt = ignore_attempt,
544            duration_ms = duration_ms,
545            "Abandoned orchestration item via stored procedure"
546        );
547
548        Ok(())
549    }
550
551    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
552    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
553        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
554            "SELECT out_event_data FROM {}.fetch_history($1)",
555            self.schema_name
556        ))
557        .bind(instance)
558        .fetch_all(&*self.pool)
559        .await
560        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
561
562        Ok(event_data_rows
563            .into_iter()
564            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
565            .collect())
566    }
567
    /// Append `new_events` to the history of `instance`/`execution_id` via
    /// the `append_history` stored procedure.
    ///
    /// No-op for an empty event list. Every event must already carry a
    /// non-zero, runtime-assigned `event_id`; otherwise this fails with a
    /// permanent error before touching the database.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Build one JSON object per event for the stored procedure.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            // event_id == 0 means the runtime never assigned one — reject.
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse event-type label derived from the Debug representation
            // (text before the first '{').
            // NOTE(review): assumes struct-like Debug output for all Event
            // variants — confirm against Event's definition.
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
641
642    #[instrument(skip(self), target = "duroxide::providers::postgres")]
643    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
644        let work_item = serde_json::to_string(&item).map_err(|e| {
645            ProviderError::permanent(
646                "enqueue_worker_work",
647                format!("Failed to serialize work item: {e}"),
648            )
649        })?;
650
651        let now_ms = Self::now_millis();
652
653        // Extract activity identification and session_id for ActivityExecute items
654        let (instance_id, execution_id, activity_id, session_id) = match &item {
655            WorkItem::ActivityExecute {
656                instance,
657                execution_id,
658                id,
659                session_id,
660                ..
661            } => (
662                Some(instance.clone()),
663                Some(*execution_id as i64),
664                Some(*id as i64),
665                session_id.clone(),
666            ),
667            _ => (None, None, None, None),
668        };
669
670        sqlx::query(&format!(
671            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6)",
672            self.schema_name
673        ))
674        .bind(work_item)
675        .bind(now_ms)
676        .bind(&instance_id)
677        .bind(execution_id)
678        .bind(activity_id)
679        .bind(&session_id)
680        .execute(&*self.pool)
681        .await
682        .map_err(|e| {
683            error!(
684                target = "duroxide::providers::postgres",
685                operation = "enqueue_worker_work",
686                error_type = "database_error",
687                error = %e,
688                "Failed to enqueue worker work"
689            );
690            Self::sqlx_to_provider_error("enqueue_worker_work", e)
691        })?;
692
693        Ok(())
694    }
695
696    #[instrument(skip(self), target = "duroxide::providers::postgres")]
697    async fn fetch_work_item(
698        &self,
699        lock_timeout: Duration,
700        _poll_timeout: Duration,
701        session: Option<&SessionFetchConfig>,
702    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
703        let start = std::time::Instant::now();
704
705        // Convert Duration to milliseconds
706        let lock_timeout_ms = lock_timeout.as_millis() as i64;
707
708        // Extract session parameters
709        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
710            Some(config) => (
711                Some(&config.owner_id),
712                Some(config.lock_timeout.as_millis() as i64),
713            ),
714            None => (None, None),
715        };
716
717        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
718            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4)",
719            self.schema_name
720        ))
721        .bind(Self::now_millis())
722        .bind(lock_timeout_ms)
723        .bind(owner_id)
724        .bind(session_lock_timeout_ms)
725        .fetch_optional(&*self.pool)
726        .await
727        {
728            Ok(row) => row,
729            Err(e) => {
730                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
731            }
732        };
733
734        let (work_item_json, lock_token, attempt_count) = match row {
735            Some(row) => row,
736            None => return Ok(None),
737        };
738
739        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
740            ProviderError::permanent(
741                "fetch_work_item",
742                format!("Failed to deserialize worker item: {e}"),
743            )
744        })?;
745
746        let duration_ms = start.elapsed().as_millis() as u64;
747
748        // Extract instance for logging - different work item types have different structures
749        let instance_id = match &work_item {
750            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
751            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
752            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
753            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
754            WorkItem::TimerFired { instance, .. } => instance.as_str(),
755            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
756            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
757            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
758            WorkItem::SubOrchCompleted {
759                parent_instance, ..
760            } => parent_instance.as_str(),
761            WorkItem::SubOrchFailed {
762                parent_instance, ..
763            } => parent_instance.as_str(),
764        };
765
766        debug!(
767            target = "duroxide::providers::postgres",
768            operation = "fetch_work_item",
769            instance_id = %instance_id,
770            attempt_count = attempt_count,
771            duration_ms = duration_ms,
772            "Fetched activity work item via stored procedure"
773        );
774
775        Ok(Some((
776            work_item,
777            lock_token,
778            attempt_count as u32,
779        )))
780    }
781
782    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
783    async fn ack_work_item(
784        &self,
785        token: &str,
786        completion: Option<WorkItem>,
787    ) -> Result<(), ProviderError> {
788        let start = std::time::Instant::now();
789
790        // If no completion provided (e.g., cancelled activity), just delete the item
791        let Some(completion) = completion else {
792            // Call ack_worker with NULL completion to delete without enqueueing
793            sqlx::query(&format!(
794                "SELECT {}.ack_worker($1)",
795                self.schema_name
796            ))
797            .bind(token)
798            .execute(&*self.pool)
799            .await
800            .map_err(|e| {
801                if e.to_string().contains("Worker queue item not found") {
802                    ProviderError::permanent(
803                        "ack_worker",
804                        "Worker queue item not found or already processed",
805                    )
806                } else {
807                    Self::sqlx_to_provider_error("ack_worker", e)
808                }
809            })?;
810
811            let duration_ms = start.elapsed().as_millis() as u64;
812            debug!(
813                target = "duroxide::providers::postgres",
814                operation = "ack_worker",
815                token = %token,
816                duration_ms = duration_ms,
817                "Acknowledged worker without completion (cancelled)"
818            );
819            return Ok(());
820        };
821
822        // Extract instance ID from completion WorkItem
823        let instance_id = match &completion {
824            WorkItem::ActivityCompleted { instance, .. }
825            | WorkItem::ActivityFailed { instance, .. } => instance,
826            _ => {
827                error!(
828                    target = "duroxide::providers::postgres",
829                    operation = "ack_worker",
830                    error_type = "invalid_completion_type",
831                    "Invalid completion work item type"
832                );
833                return Err(ProviderError::permanent(
834                    "ack_worker",
835                    "Invalid completion work item type",
836                ));
837            }
838        };
839
840        let completion_json = serde_json::to_string(&completion).map_err(|e| {
841            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
842        })?;
843
844        let now_ms = Self::now_millis();
845
846        // Call stored procedure to atomically delete worker item and enqueue completion
847        sqlx::query(&format!(
848            "SELECT {}.ack_worker($1, $2, $3, $4)",
849            self.schema_name
850        ))
851        .bind(token)
852        .bind(instance_id)
853        .bind(completion_json)
854        .bind(now_ms)
855        .execute(&*self.pool)
856        .await
857        .map_err(|e| {
858            if e.to_string().contains("Worker queue item not found") {
859                error!(
860                    target = "duroxide::providers::postgres",
861                    operation = "ack_worker",
862                    error_type = "worker_item_not_found",
863                    token = %token,
864                    "Worker queue item not found or already processed"
865                );
866                ProviderError::permanent(
867                    "ack_worker",
868                    "Worker queue item not found or already processed",
869                )
870            } else {
871                Self::sqlx_to_provider_error("ack_worker", e)
872            }
873        })?;
874
875        let duration_ms = start.elapsed().as_millis() as u64;
876        debug!(
877            target = "duroxide::providers::postgres",
878            operation = "ack_worker",
879            instance_id = %instance_id,
880            duration_ms = duration_ms,
881            "Acknowledged worker and enqueued completion"
882        );
883
884        Ok(())
885    }
886
887    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
888    async fn renew_work_item_lock(
889        &self,
890        token: &str,
891        extend_for: Duration,
892    ) -> Result<(), ProviderError> {
893        let start = std::time::Instant::now();
894
895        // Get current time from application for consistent time reference
896        let now_ms = Self::now_millis();
897
898        // Convert Duration to seconds for the stored procedure
899        let extend_secs = extend_for.as_secs() as i64;
900
901        match sqlx::query(&format!(
902            "SELECT {}.renew_work_item_lock($1, $2, $3)",
903            self.schema_name
904        ))
905        .bind(token)
906        .bind(now_ms)
907        .bind(extend_secs)
908        .execute(&*self.pool)
909        .await
910        {
911            Ok(_) => {
912                let duration_ms = start.elapsed().as_millis() as u64;
913                debug!(
914                    target = "duroxide::providers::postgres",
915                    operation = "renew_work_item_lock",
916                    token = %token,
917                    extend_for_secs = extend_secs,
918                    duration_ms = duration_ms,
919                    "Work item lock renewed successfully"
920                );
921                Ok(())
922            }
923            Err(e) => {
924                if let SqlxError::Database(db_err) = &e {
925                    if db_err.message().contains("Lock token invalid") {
926                        return Err(ProviderError::permanent(
927                            "renew_work_item_lock",
928                            "Lock token invalid, expired, or already acked",
929                        ));
930                    }
931                } else if e.to_string().contains("Lock token invalid") {
932                    return Err(ProviderError::permanent(
933                        "renew_work_item_lock",
934                        "Lock token invalid, expired, or already acked",
935                    ));
936                }
937
938                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
939            }
940        }
941    }
942
943    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
944    async fn abandon_work_item(
945        &self,
946        token: &str,
947        delay: Option<Duration>,
948        ignore_attempt: bool,
949    ) -> Result<(), ProviderError> {
950        let start = std::time::Instant::now();
951        let now_ms = Self::now_millis();
952        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
953
954        match sqlx::query(&format!(
955            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
956            self.schema_name
957        ))
958        .bind(token)
959        .bind(now_ms)
960        .bind(delay_param)
961        .bind(ignore_attempt)
962        .execute(&*self.pool)
963        .await
964        {
965            Ok(_) => {
966                let duration_ms = start.elapsed().as_millis() as u64;
967                debug!(
968                    target = "duroxide::providers::postgres",
969                    operation = "abandon_work_item",
970                    token = %token,
971                    delay_ms = delay.map(|d| d.as_millis() as u64),
972                    ignore_attempt = ignore_attempt,
973                    duration_ms = duration_ms,
974                    "Abandoned work item via stored procedure"
975                );
976                Ok(())
977            }
978            Err(e) => {
979                if let SqlxError::Database(db_err) = &e {
980                    if db_err.message().contains("Invalid lock token")
981                        || db_err.message().contains("already acked")
982                    {
983                        return Err(ProviderError::permanent(
984                            "abandon_work_item",
985                            "Invalid lock token or already acked",
986                        ));
987                    }
988                } else if e.to_string().contains("Invalid lock token")
989                    || e.to_string().contains("already acked")
990                {
991                    return Err(ProviderError::permanent(
992                        "abandon_work_item",
993                        "Invalid lock token or already acked",
994                    ));
995                }
996
997                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
998            }
999        }
1000    }
1001
1002    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1003    async fn renew_orchestration_item_lock(
1004        &self,
1005        token: &str,
1006        extend_for: Duration,
1007    ) -> Result<(), ProviderError> {
1008        let start = std::time::Instant::now();
1009
1010        // Get current time from application for consistent time reference
1011        let now_ms = Self::now_millis();
1012
1013        // Convert Duration to seconds for the stored procedure
1014        let extend_secs = extend_for.as_secs() as i64;
1015
1016        match sqlx::query(&format!(
1017            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1018            self.schema_name
1019        ))
1020        .bind(token)
1021        .bind(now_ms)
1022        .bind(extend_secs)
1023        .execute(&*self.pool)
1024        .await
1025        {
1026            Ok(_) => {
1027                let duration_ms = start.elapsed().as_millis() as u64;
1028                debug!(
1029                    target = "duroxide::providers::postgres",
1030                    operation = "renew_orchestration_item_lock",
1031                    token = %token,
1032                    extend_for_secs = extend_secs,
1033                    duration_ms = duration_ms,
1034                    "Orchestration item lock renewed successfully"
1035                );
1036                Ok(())
1037            }
1038            Err(e) => {
1039                if let SqlxError::Database(db_err) = &e {
1040                    if db_err.message().contains("Lock token invalid")
1041                        || db_err.message().contains("expired")
1042                        || db_err.message().contains("already released")
1043                    {
1044                        return Err(ProviderError::permanent(
1045                            "renew_orchestration_item_lock",
1046                            "Lock token invalid, expired, or already released",
1047                        ));
1048                    }
1049                } else if e.to_string().contains("Lock token invalid")
1050                    || e.to_string().contains("expired")
1051                    || e.to_string().contains("already released")
1052                {
1053                    return Err(ProviderError::permanent(
1054                        "renew_orchestration_item_lock",
1055                        "Lock token invalid, expired, or already released",
1056                    ));
1057                }
1058
1059                Err(Self::sqlx_to_provider_error(
1060                    "renew_orchestration_item_lock",
1061                    e,
1062                ))
1063            }
1064        }
1065    }
1066
1067    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1068    async fn enqueue_for_orchestrator(
1069        &self,
1070        item: WorkItem,
1071        delay: Option<Duration>,
1072    ) -> Result<(), ProviderError> {
1073        let work_item = serde_json::to_string(&item).map_err(|e| {
1074            ProviderError::permanent(
1075                "enqueue_orchestrator_work",
1076                format!("Failed to serialize work item: {e}"),
1077            )
1078        })?;
1079
1080        // Extract instance ID from WorkItem enum
1081        let instance_id = match &item {
1082            WorkItem::StartOrchestration { instance, .. }
1083            | WorkItem::ActivityCompleted { instance, .. }
1084            | WorkItem::ActivityFailed { instance, .. }
1085            | WorkItem::TimerFired { instance, .. }
1086            | WorkItem::ExternalRaised { instance, .. }
1087            | WorkItem::CancelInstance { instance, .. }
1088            | WorkItem::ContinueAsNew { instance, .. } => instance,
1089            WorkItem::SubOrchCompleted {
1090                parent_instance, ..
1091            }
1092            | WorkItem::SubOrchFailed {
1093                parent_instance, ..
1094            } => parent_instance,
1095            WorkItem::ActivityExecute { .. } => {
1096                return Err(ProviderError::permanent(
1097                    "enqueue_orchestrator_work",
1098                    "ActivityExecute should go to worker queue, not orchestrator queue",
1099                ));
1100            }
1101        };
1102
1103        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1104        let now_ms = Self::now_millis();
1105
1106        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1107            if *fire_at_ms > 0 {
1108                // Take max of fire_at_ms and delay (if provided)
1109                if let Some(delay) = delay {
1110                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1111                } else {
1112                    *fire_at_ms
1113                }
1114            } else {
1115                // fire_at_ms is 0, use delay or NOW()
1116                delay
1117                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1118                    .unwrap_or(now_ms as u64)
1119            }
1120        } else {
1121            // Non-timer item: use delay or NOW()
1122            delay
1123                .map(|d| now_ms as u64 + d.as_millis() as u64)
1124                .unwrap_or(now_ms as u64)
1125        };
1126
1127        let visible_at = Utc
1128            .timestamp_millis_opt(visible_at_ms as i64)
1129            .single()
1130            .ok_or_else(|| {
1131                ProviderError::permanent(
1132                    "enqueue_orchestrator_work",
1133                    "Invalid visible_at timestamp",
1134                )
1135            })?;
1136
1137        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1138        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1139
1140        // Call stored procedure to enqueue work
1141        sqlx::query(&format!(
1142            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1143            self.schema_name
1144        ))
1145        .bind(instance_id)
1146        .bind(&work_item)
1147        .bind(visible_at)
1148        .bind::<Option<String>>(None) // orchestration_name - NULL
1149        .bind::<Option<String>>(None) // orchestration_version - NULL
1150        .bind::<Option<i64>>(None) // execution_id - NULL
1151        .execute(&*self.pool)
1152        .await
1153        .map_err(|e| {
1154            error!(
1155                target = "duroxide::providers::postgres",
1156                operation = "enqueue_orchestrator_work",
1157                error_type = "database_error",
1158                error = %e,
1159                instance_id = %instance_id,
1160                "Failed to enqueue orchestrator work"
1161            );
1162            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1163        })?;
1164
1165        debug!(
1166            target = "duroxide::providers::postgres",
1167            operation = "enqueue_orchestrator_work",
1168            instance_id = %instance_id,
1169            delay_ms = delay.map(|d| d.as_millis() as u64),
1170            "Enqueued orchestrator work"
1171        );
1172
1173        Ok(())
1174    }
1175
1176    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1177    async fn read_with_execution(
1178        &self,
1179        instance: &str,
1180        execution_id: u64,
1181    ) -> Result<Vec<Event>, ProviderError> {
1182        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1183            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1184            self.table_name("history")
1185        ))
1186        .bind(instance)
1187        .bind(execution_id as i64)
1188        .fetch_all(&*self.pool)
1189        .await
1190        .ok()
1191        .unwrap_or_default();
1192
1193        Ok(event_data_rows
1194            .into_iter()
1195            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1196            .collect())
1197    }
1198
1199    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1200    async fn renew_session_lock(
1201        &self,
1202        owner_ids: &[&str],
1203        extend_for: Duration,
1204        idle_timeout: Duration,
1205    ) -> Result<usize, ProviderError> {
1206        if owner_ids.is_empty() {
1207            return Ok(0);
1208        }
1209
1210        let now_ms = Self::now_millis();
1211        let extend_ms = extend_for.as_millis() as i64;
1212        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1213        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1214
1215        let result = sqlx::query_scalar::<_, i64>(&format!(
1216            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1217            self.schema_name
1218        ))
1219        .bind(&owner_ids_vec)
1220        .bind(now_ms)
1221        .bind(extend_ms)
1222        .bind(idle_timeout_ms)
1223        .fetch_one(&*self.pool)
1224        .await
1225        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1226
1227        debug!(
1228            target = "duroxide::providers::postgres",
1229            operation = "renew_session_lock",
1230            owner_count = owner_ids.len(),
1231            sessions_renewed = result,
1232            "Session locks renewed"
1233        );
1234
1235        Ok(result as usize)
1236    }
1237
1238    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1239    async fn cleanup_orphaned_sessions(
1240        &self,
1241        _idle_timeout: Duration,
1242    ) -> Result<usize, ProviderError> {
1243        let now_ms = Self::now_millis();
1244
1245        let result = sqlx::query_scalar::<_, i64>(&format!(
1246            "SELECT {}.cleanup_orphaned_sessions($1)",
1247            self.schema_name
1248        ))
1249        .bind(now_ms)
1250        .fetch_one(&*self.pool)
1251        .await
1252        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1253
1254        debug!(
1255            target = "duroxide::providers::postgres",
1256            operation = "cleanup_orphaned_sessions",
1257            sessions_cleaned = result,
1258            "Orphaned sessions cleaned up"
1259        );
1260
1261        Ok(result as usize)
1262    }
1263
    // Expose the admin/management surface; this provider always implements
    // ProviderAdmin, so the capability is unconditionally available.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1267}
1268
1269#[async_trait::async_trait]
1270impl ProviderAdmin for PostgresProvider {
1271    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1272    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1273        sqlx::query_scalar(&format!(
1274            "SELECT instance_id FROM {}.list_instances()",
1275            self.schema_name
1276        ))
1277        .fetch_all(&*self.pool)
1278        .await
1279        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1280    }
1281
1282    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1283    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1284        sqlx::query_scalar(&format!(
1285            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1286            self.schema_name
1287        ))
1288        .bind(status)
1289        .fetch_all(&*self.pool)
1290        .await
1291        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1292    }
1293
1294    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1295    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1296        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1297            "SELECT execution_id FROM {}.list_executions($1)",
1298            self.schema_name
1299        ))
1300        .bind(instance)
1301        .fetch_all(&*self.pool)
1302        .await
1303        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1304
1305        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1306    }
1307
1308    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1309    async fn read_history_with_execution_id(
1310        &self,
1311        instance: &str,
1312        execution_id: u64,
1313    ) -> Result<Vec<Event>, ProviderError> {
1314        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1315            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1316            self.schema_name
1317        ))
1318        .bind(instance)
1319        .bind(execution_id as i64)
1320        .fetch_all(&*self.pool)
1321        .await
1322        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1323
1324        event_data_rows
1325            .into_iter()
1326            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1327            .collect::<Vec<Event>>()
1328            .into_iter()
1329            .map(Ok)
1330            .collect()
1331    }
1332
1333    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1334    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1335        let execution_id = self.latest_execution_id(instance).await?;
1336        self.read_history_with_execution_id(instance, execution_id)
1337            .await
1338    }
1339
1340    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1341    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1342        sqlx::query_scalar(&format!(
1343            "SELECT {}.latest_execution_id($1)",
1344            self.schema_name
1345        ))
1346        .bind(instance)
1347        .fetch_optional(&*self.pool)
1348        .await
1349        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1350        .map(|id: i64| id as u64)
1351        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1352    }
1353
1354    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1355    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1356        let row: Option<(
1357            String,
1358            String,
1359            String,
1360            i64,
1361            chrono::DateTime<Utc>,
1362            Option<chrono::DateTime<Utc>>,
1363            Option<String>,
1364            Option<String>,
1365            Option<String>,
1366        )> = sqlx::query_as(&format!(
1367            "SELECT * FROM {}.get_instance_info($1)",
1368            self.schema_name
1369        ))
1370        .bind(instance)
1371        .fetch_optional(&*self.pool)
1372        .await
1373        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1374
1375        let (
1376            instance_id,
1377            orchestration_name,
1378            orchestration_version,
1379            current_execution_id,
1380            created_at,
1381            updated_at,
1382            status,
1383            output,
1384            parent_instance_id,
1385        ) =
1386            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1387
1388        Ok(InstanceInfo {
1389            instance_id,
1390            orchestration_name,
1391            orchestration_version,
1392            current_execution_id: current_execution_id as u64,
1393            status: status.unwrap_or_else(|| "Running".to_string()),
1394            output,
1395            created_at: created_at.timestamp_millis() as u64,
1396            updated_at: updated_at
1397                .map(|dt| dt.timestamp_millis() as u64)
1398                .unwrap_or(created_at.timestamp_millis() as u64),
1399            parent_instance_id,
1400        })
1401    }
1402
1403    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1404    async fn get_execution_info(
1405        &self,
1406        instance: &str,
1407        execution_id: u64,
1408    ) -> Result<ExecutionInfo, ProviderError> {
1409        let row: Option<(
1410            i64,
1411            String,
1412            Option<String>,
1413            chrono::DateTime<Utc>,
1414            Option<chrono::DateTime<Utc>>,
1415            i64,
1416        )> = sqlx::query_as(&format!(
1417            "SELECT * FROM {}.get_execution_info($1, $2)",
1418            self.schema_name
1419        ))
1420        .bind(instance)
1421        .bind(execution_id as i64)
1422        .fetch_optional(&*self.pool)
1423        .await
1424        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1425
1426        let (exec_id, status, output, started_at, completed_at, event_count) = row
1427            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1428
1429        Ok(ExecutionInfo {
1430            execution_id: exec_id as u64,
1431            status,
1432            output,
1433            started_at: started_at.timestamp_millis() as u64,
1434            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1435            event_count: event_count as usize,
1436        })
1437    }
1438
1439    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1440    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1441        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1442            "SELECT * FROM {}.get_system_metrics()",
1443            self.schema_name
1444        ))
1445        .fetch_optional(&*self.pool)
1446        .await
1447        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1448
1449        let (
1450            total_instances,
1451            total_executions,
1452            running_instances,
1453            completed_instances,
1454            failed_instances,
1455            total_events,
1456        ) = row.ok_or_else(|| {
1457            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1458        })?;
1459
1460        Ok(SystemMetrics {
1461            total_instances: total_instances as u64,
1462            total_executions: total_executions as u64,
1463            running_instances: running_instances as u64,
1464            completed_instances: completed_instances as u64,
1465            failed_instances: failed_instances as u64,
1466            total_events: total_events as u64,
1467        })
1468    }
1469
1470    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1471    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1472        let now_ms = Self::now_millis();
1473
1474        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1475            "SELECT * FROM {}.get_queue_depths($1)",
1476            self.schema_name
1477        ))
1478        .bind(now_ms)
1479        .fetch_optional(&*self.pool)
1480        .await
1481        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1482
1483        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1484            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1485        })?;
1486
1487        Ok(QueueDepths {
1488            orchestrator_queue: orchestrator_queue as usize,
1489            worker_queue: worker_queue as usize,
1490            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1491        })
1492    }
1493
1494    // ===== Hierarchy Primitive Operations =====
1495
1496    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1497    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1498        sqlx::query_scalar(&format!(
1499            "SELECT child_instance_id FROM {}.list_children($1)",
1500            self.schema_name
1501        ))
1502        .bind(instance_id)
1503        .fetch_all(&*self.pool)
1504        .await
1505        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1506    }
1507
1508    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1509    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1510        // The stored procedure raises an exception if instance doesn't exist
1511        // Otherwise returns the parent_instance_id (which may be NULL)
1512        let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1513            "SELECT {}.get_parent_id($1)",
1514            self.schema_name
1515        ))
1516        .bind(instance_id)
1517        .fetch_one(&*self.pool)
1518        .await;
1519
1520        match result {
1521            Ok(parent_id) => Ok(parent_id),
1522            Err(e) => {
1523                let err_str = e.to_string();
1524                if err_str.contains("Instance not found") {
1525                    Err(ProviderError::permanent(
1526                        "get_parent_id",
1527                        format!("Instance not found: {}", instance_id),
1528                    ))
1529                } else {
1530                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1531                }
1532            }
1533        }
1534    }
1535
1536    // ===== Deletion Operations =====
1537
1538    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1539    async fn delete_instances_atomic(
1540        &self,
1541        ids: &[String],
1542        force: bool,
1543    ) -> Result<DeleteInstanceResult, ProviderError> {
1544        if ids.is_empty() {
1545            return Ok(DeleteInstanceResult::default());
1546        }
1547
1548        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1549            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1550            self.schema_name
1551        ))
1552        .bind(ids)
1553        .bind(force)
1554        .fetch_optional(&*self.pool)
1555        .await
1556        .map_err(|e| {
1557            let err_str = e.to_string();
1558            if err_str.contains("is Running") {
1559                ProviderError::permanent(
1560                    "delete_instances_atomic",
1561                    err_str,
1562                )
1563            } else if err_str.contains("Orphan detected") {
1564                ProviderError::permanent(
1565                    "delete_instances_atomic",
1566                    err_str,
1567                )
1568            } else {
1569                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1570            }
1571        })?;
1572
1573        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1574            row.unwrap_or((0, 0, 0, 0));
1575
1576        debug!(
1577            target = "duroxide::providers::postgres",
1578            operation = "delete_instances_atomic",
1579            instances_deleted = instances_deleted,
1580            executions_deleted = executions_deleted,
1581            events_deleted = events_deleted,
1582            queue_messages_deleted = queue_messages_deleted,
1583            "Deleted instances atomically"
1584        );
1585
1586        Ok(DeleteInstanceResult {
1587            instances_deleted: instances_deleted as u64,
1588            executions_deleted: executions_deleted as u64,
1589            events_deleted: events_deleted as u64,
1590            queue_messages_deleted: queue_messages_deleted as u64,
1591        })
1592    }
1593
1594    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1595    async fn delete_instance_bulk(
1596        &self,
1597        filter: InstanceFilter,
1598    ) -> Result<DeleteInstanceResult, ProviderError> {
1599        // Build query to find matching root instances in terminal states
1600        let mut sql = format!(
1601            r#"
1602            SELECT i.instance_id
1603            FROM {}.instances i
1604            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1605              AND i.current_execution_id = e.execution_id
1606            WHERE i.parent_instance_id IS NULL
1607              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1608            "#,
1609            self.schema_name, self.schema_name
1610        );
1611
1612        // Add instance_ids filter if provided
1613        if let Some(ref ids) = filter.instance_ids {
1614            if ids.is_empty() {
1615                return Ok(DeleteInstanceResult::default());
1616            }
1617            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1618            sql.push_str(&format!(
1619                " AND i.instance_id IN ({})",
1620                placeholders.join(", ")
1621            ));
1622        }
1623
1624        // Add completed_before filter if provided
1625        if filter.completed_before.is_some() {
1626            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1627            sql.push_str(&format!(
1628                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1629                param_num
1630            ));
1631        }
1632
1633        // Add limit
1634        let limit = filter.limit.unwrap_or(1000);
1635        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1636            + if filter.completed_before.is_some() { 1 } else { 0 }
1637            + 1;
1638        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1639
1640        // Build and execute query
1641        let mut query = sqlx::query_scalar::<_, String>(&sql);
1642        if let Some(ref ids) = filter.instance_ids {
1643            for id in ids {
1644                query = query.bind(id);
1645            }
1646        }
1647        if let Some(completed_before) = filter.completed_before {
1648            query = query.bind(completed_before as i64);
1649        }
1650        query = query.bind(limit as i64);
1651
1652        let instance_ids: Vec<String> = query
1653            .fetch_all(&*self.pool)
1654            .await
1655            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1656
1657        if instance_ids.is_empty() {
1658            return Ok(DeleteInstanceResult::default());
1659        }
1660
1661        // Delete each instance with cascade
1662        let mut result = DeleteInstanceResult::default();
1663
1664        for instance_id in &instance_ids {
1665            // Get full tree for this root
1666            let tree = self.get_instance_tree(instance_id).await?;
1667
1668            // Atomic delete (tree.all_ids is already in deletion order: children first)
1669            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1670            result.instances_deleted += delete_result.instances_deleted;
1671            result.executions_deleted += delete_result.executions_deleted;
1672            result.events_deleted += delete_result.events_deleted;
1673            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1674        }
1675
1676        debug!(
1677            target = "duroxide::providers::postgres",
1678            operation = "delete_instance_bulk",
1679            instances_deleted = result.instances_deleted,
1680            executions_deleted = result.executions_deleted,
1681            events_deleted = result.events_deleted,
1682            queue_messages_deleted = result.queue_messages_deleted,
1683            "Bulk deleted instances"
1684        );
1685
1686        Ok(result)
1687    }
1688
1689    // ===== Pruning Operations =====
1690
1691    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1692    async fn prune_executions(
1693        &self,
1694        instance_id: &str,
1695        options: PruneOptions,
1696    ) -> Result<PruneResult, ProviderError> {
1697        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1698        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1699
1700        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1701            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1702            self.schema_name
1703        ))
1704        .bind(instance_id)
1705        .bind(keep_last)
1706        .bind(completed_before_ms)
1707        .fetch_optional(&*self.pool)
1708        .await
1709        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1710
1711        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1712
1713        debug!(
1714            target = "duroxide::providers::postgres",
1715            operation = "prune_executions",
1716            instance_id = %instance_id,
1717            instances_processed = instances_processed,
1718            executions_deleted = executions_deleted,
1719            events_deleted = events_deleted,
1720            "Pruned executions"
1721        );
1722
1723        Ok(PruneResult {
1724            instances_processed: instances_processed as u64,
1725            executions_deleted: executions_deleted as u64,
1726            events_deleted: events_deleted as u64,
1727        })
1728    }
1729
1730    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1731    async fn prune_executions_bulk(
1732        &self,
1733        filter: InstanceFilter,
1734        options: PruneOptions,
1735    ) -> Result<PruneResult, ProviderError> {
1736        // Find matching instances (all statuses - prune_executions protects current execution)
1737        // Note: We include Running instances because long-running orchestrations (e.g., with
1738        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1739        // call safely skips the current execution regardless of its status.
1740        let mut sql = format!(
1741            r#"
1742            SELECT i.instance_id
1743            FROM {}.instances i
1744            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1745              AND i.current_execution_id = e.execution_id
1746            WHERE 1=1
1747            "#,
1748            self.schema_name, self.schema_name
1749        );
1750
1751        // Add instance_ids filter if provided
1752        if let Some(ref ids) = filter.instance_ids {
1753            if ids.is_empty() {
1754                return Ok(PruneResult::default());
1755            }
1756            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1757            sql.push_str(&format!(
1758                " AND i.instance_id IN ({})",
1759                placeholders.join(", ")
1760            ));
1761        }
1762
1763        // Add completed_before filter if provided
1764        if filter.completed_before.is_some() {
1765            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1766            sql.push_str(&format!(
1767                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1768                param_num
1769            ));
1770        }
1771
1772        // Add limit
1773        let limit = filter.limit.unwrap_or(1000);
1774        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1775            + if filter.completed_before.is_some() { 1 } else { 0 }
1776            + 1;
1777        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1778
1779        // Build and execute query
1780        let mut query = sqlx::query_scalar::<_, String>(&sql);
1781        if let Some(ref ids) = filter.instance_ids {
1782            for id in ids {
1783                query = query.bind(id);
1784            }
1785        }
1786        if let Some(completed_before) = filter.completed_before {
1787            query = query.bind(completed_before as i64);
1788        }
1789        query = query.bind(limit as i64);
1790
1791        let instance_ids: Vec<String> = query
1792            .fetch_all(&*self.pool)
1793            .await
1794            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1795
1796        // Prune each instance
1797        let mut result = PruneResult::default();
1798
1799        for instance_id in &instance_ids {
1800            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1801            result.instances_processed += single_result.instances_processed;
1802            result.executions_deleted += single_result.executions_deleted;
1803            result.events_deleted += single_result.events_deleted;
1804        }
1805
1806        debug!(
1807            target = "duroxide::providers::postgres",
1808            operation = "prune_executions_bulk",
1809            instances_processed = result.instances_processed,
1810            executions_deleted = result.executions_deleted,
1811            events_deleted = result.events_deleted,
1812            "Bulk pruned executions"
1813        );
1814
1815        Ok(result)
1816    }
1817}