duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, ExecutionState, InstanceInfo, OrchestrationItem, Provider,
5    ProviderAdmin, ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect with an explicit connection string (e.g. the value of DATABASE_URL)
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    // Shared connection pool; a clone of this handle is given to the migration runner.
    pool: Arc<PgPool>,
    // Schema used to qualify stored-procedure calls; "public" when none is supplied.
    // It is interpolated into SQL text as-is, so it must be a trusted identifier.
    schema_name: String,
}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93    /// Parse execution state string from stored procedure result
94    /// Format: "Running", "Terminal:status", or "Missing"
95    fn parse_execution_state(state_str: &str) -> ExecutionState {
96        if state_str == "Running" {
97            ExecutionState::Running
98        } else if state_str == "Missing" {
99            ExecutionState::Missing
100        } else if let Some(status) = state_str.strip_prefix("Terminal:") {
101            ExecutionState::Terminal {
102                status: status.to_string(),
103            }
104        } else {
105            // Default to Running if unrecognized (shouldn't happen)
106            warn!(
107                target = "duroxide::providers::postgres",
108                state = %state_str,
109                "Unrecognized execution state, defaulting to Running"
110            );
111            ExecutionState::Running
112        }
113    }
114
115    /// Get schema-qualified table name
116    fn table_name(&self, table: &str) -> String {
117        format!("{}.{}", self.schema_name, table)
118    }
119
120    /// Get the database pool (for testing)
121    pub fn pool(&self) -> &PgPool {
122        &self.pool
123    }
124
125    /// Get the schema name (for testing)
126    pub fn schema_name(&self) -> &str {
127        &self.schema_name
128    }
129
130    /// Convert sqlx::Error to ProviderError with proper classification
131    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
132        match e {
133            SqlxError::Database(ref db_err) => {
134                // PostgreSQL error codes
135                let code_opt = db_err.code();
136                let code = code_opt.as_deref();
137                if code == Some("40P01") {
138                    // Deadlock detected
139                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
140                } else if code == Some("40001") {
141                    // Serialization failure - permanent error (transaction conflict, not transient)
142                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
143                } else if code == Some("23505") {
144                    // Unique constraint violation (duplicate event)
145                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
146                } else if code == Some("23503") {
147                    // Foreign key constraint violation
148                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
149                } else {
150                    ProviderError::permanent(operation, format!("Database error: {e}"))
151                }
152            }
153            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
154                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
155            }
156            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
157            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
158        }
159    }
160
161    /// Clean up schema after tests (drops all tables and optionally the schema)
162    ///
163    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
164    /// Only drops the schema if it's a custom schema (not "public").
165    pub async fn cleanup_schema(&self) -> Result<()> {
166        // Call the stored procedure to drop all tables
167        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
168            .execute(&*self.pool)
169            .await?;
170
171        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
172        // Only drop custom schemas created for testing
173        if self.schema_name != "public" {
174            sqlx::query(&format!(
175                "DROP SCHEMA IF EXISTS {} CASCADE",
176                self.schema_name
177            ))
178            .execute(&*self.pool)
179            .await?;
180        } else {
181            // Explicit safeguard: we only drop tables from public schema, never the schema itself
182            // This ensures we don't accidentally drop the default PostgreSQL schema
183        }
184
185        Ok(())
186    }
187}
188
189#[async_trait::async_trait]
190impl Provider for PostgresProvider {
    /// Returns the fixed identifier of this provider implementation.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
194
    /// Returns this crate's version, captured from `CARGO_PKG_VERSION` at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
198
199    #[instrument(skip(self), target = "duroxide::providers::postgres")]
200    async fn fetch_orchestration_item(
201        &self,
202        lock_timeout: Duration,
203        _poll_timeout: Duration,
204    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
205        let start = std::time::Instant::now();
206
207        const MAX_RETRIES: u32 = 3;
208        const RETRY_DELAY_MS: u64 = 50;
209
210        // Convert Duration to milliseconds
211        let lock_timeout_ms = lock_timeout.as_millis() as i64;
212        let mut _last_error: Option<ProviderError> = None;
213
214        for attempt in 0..=MAX_RETRIES {
215            let now_ms = Self::now_millis();
216
217            let result: Result<
218                Option<(
219                    String,
220                    String,
221                    String,
222                    i64,
223                    serde_json::Value,
224                    serde_json::Value,
225                    String,
226                    i32,
227                )>,
228                SqlxError,
229            > = sqlx::query_as(&format!(
230                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
231                self.schema_name
232            ))
233            .bind(now_ms)
234            .bind(lock_timeout_ms)
235            .fetch_optional(&*self.pool)
236            .await;
237
238            let row = match result {
239                Ok(r) => r,
240                Err(e) => {
241                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
242                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
243                        warn!(
244                            target = "duroxide::providers::postgres",
245                            operation = "fetch_orchestration_item",
246                            attempt = attempt + 1,
247                            error = %provider_err,
248                            "Retryable error, will retry"
249                        );
250                        _last_error = Some(provider_err);
251                        sleep(std::time::Duration::from_millis(
252                            RETRY_DELAY_MS * (attempt as u64 + 1),
253                        ))
254                        .await;
255                        continue;
256                    }
257                    return Err(provider_err);
258                }
259            };
260
261            if let Some((
262                instance_id,
263                orchestration_name,
264                orchestration_version,
265                execution_id,
266                history_json,
267                messages_json,
268                lock_token,
269                attempt_count,
270            )) = row
271            {
272                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
273                    ProviderError::permanent(
274                        "fetch_orchestration_item",
275                        format!("Failed to deserialize history: {e}"),
276                    )
277                })?;
278
279                let messages: Vec<WorkItem> =
280                    serde_json::from_value(messages_json).map_err(|e| {
281                        ProviderError::permanent(
282                            "fetch_orchestration_item",
283                            format!("Failed to deserialize messages: {e}"),
284                        )
285                    })?;
286
287                let duration_ms = start.elapsed().as_millis() as u64;
288                debug!(
289                    target = "duroxide::providers::postgres",
290                    operation = "fetch_orchestration_item",
291                    instance_id = %instance_id,
292                    execution_id = execution_id,
293                    message_count = messages.len(),
294                    history_count = history.len(),
295                    attempt_count = attempt_count,
296                    duration_ms = duration_ms,
297                    attempts = attempt + 1,
298                    "Fetched orchestration item via stored procedure"
299                );
300
301                return Ok(Some((
302                    OrchestrationItem {
303                        instance: instance_id,
304                        orchestration_name,
305                        execution_id: execution_id as u64,
306                        version: orchestration_version,
307                        history,
308                        messages,
309                    },
310                    lock_token,
311                    attempt_count as u32,
312                )));
313            }
314
315            if attempt < MAX_RETRIES {
316                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
317            }
318        }
319
320        Ok(None)
321    }
322    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
323    async fn ack_orchestration_item(
324        &self,
325        lock_token: &str,
326        execution_id: u64,
327        history_delta: Vec<Event>,
328        worker_items: Vec<WorkItem>,
329        orchestrator_items: Vec<WorkItem>,
330        metadata: ExecutionMetadata,
331    ) -> Result<(), ProviderError> {
332        let start = std::time::Instant::now();
333
334        const MAX_RETRIES: u32 = 3;
335        const RETRY_DELAY_MS: u64 = 50;
336
337        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
338        for event in &history_delta {
339            if event.event_id() == 0 {
340                return Err(ProviderError::permanent(
341                    "ack_orchestration_item",
342                    "event_id must be set by runtime",
343                ));
344            }
345
346            let event_json = serde_json::to_string(event).map_err(|e| {
347                ProviderError::permanent(
348                    "ack_orchestration_item",
349                    format!("Failed to serialize event: {e}"),
350                )
351            })?;
352
353            let event_type = format!("{event:?}")
354                .split('{')
355                .next()
356                .unwrap_or("Unknown")
357                .trim()
358                .to_string();
359
360            history_delta_payload.push(serde_json::json!({
361                "event_id": event.event_id(),
362                "event_type": event_type,
363                "event_data": event_json,
364            }));
365        }
366
367        let history_delta_json = serde_json::Value::Array(history_delta_payload);
368
369        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
370            ProviderError::permanent(
371                "ack_orchestration_item",
372                format!("Failed to serialize worker items: {e}"),
373            )
374        })?;
375
376        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
377            ProviderError::permanent(
378                "ack_orchestration_item",
379                format!("Failed to serialize orchestrator items: {e}"),
380            )
381        })?;
382
383        let metadata_json = serde_json::json!({
384            "orchestration_name": metadata.orchestration_name,
385            "orchestration_version": metadata.orchestration_version,
386            "status": metadata.status,
387            "output": metadata.output,
388        });
389
390        for attempt in 0..=MAX_RETRIES {
391            let result = sqlx::query(&format!(
392                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
393                self.schema_name
394            ))
395            .bind(lock_token)
396            .bind(execution_id as i64)
397            .bind(&history_delta_json)
398            .bind(&worker_items_json)
399            .bind(&orchestrator_items_json)
400            .bind(&metadata_json)
401            .execute(&*self.pool)
402            .await;
403
404            match result {
405                Ok(_) => {
406                    let duration_ms = start.elapsed().as_millis() as u64;
407                    debug!(
408                        target = "duroxide::providers::postgres",
409                        operation = "ack_orchestration_item",
410                        execution_id = execution_id,
411                        history_count = history_delta.len(),
412                        worker_items_count = worker_items.len(),
413                        orchestrator_items_count = orchestrator_items.len(),
414                        duration_ms = duration_ms,
415                        attempts = attempt + 1,
416                        "Acknowledged orchestration item via stored procedure"
417                    );
418                    return Ok(());
419                }
420                Err(e) => {
421                    // Check for permanent errors first
422                    if let SqlxError::Database(db_err) = &e {
423                        if db_err.message().contains("Invalid lock token") {
424                            return Err(ProviderError::permanent(
425                                "ack_orchestration_item",
426                                "Invalid lock token",
427                            ));
428                        }
429                    } else if e.to_string().contains("Invalid lock token") {
430                        return Err(ProviderError::permanent(
431                            "ack_orchestration_item",
432                            "Invalid lock token",
433                        ));
434                    }
435
436                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
437                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
438                        warn!(
439                            target = "duroxide::providers::postgres",
440                            operation = "ack_orchestration_item",
441                            attempt = attempt + 1,
442                            error = %provider_err,
443                            "Retryable error, will retry"
444                        );
445                        sleep(std::time::Duration::from_millis(
446                            RETRY_DELAY_MS * (attempt as u64 + 1),
447                        ))
448                        .await;
449                        continue;
450                    }
451                    return Err(provider_err);
452                }
453            }
454        }
455
456        // Should never reach here, but just in case
457        Ok(())
458    }
459    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
460    async fn abandon_orchestration_item(
461        &self,
462        lock_token: &str,
463        delay: Option<Duration>,
464        ignore_attempt: bool,
465    ) -> Result<(), ProviderError> {
466        let start = std::time::Instant::now();
467        let now_ms = Self::now_millis();
468        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
469
470        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
471            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
472            self.schema_name
473        ))
474        .bind(lock_token)
475        .bind(now_ms)
476        .bind(delay_param)
477        .bind(ignore_attempt)
478        .fetch_one(&*self.pool)
479        .await
480        {
481            Ok(instance_id) => instance_id,
482            Err(e) => {
483                if let SqlxError::Database(db_err) = &e {
484                    if db_err.message().contains("Invalid lock token") {
485                        return Err(ProviderError::permanent(
486                            "abandon_orchestration_item",
487                            "Invalid lock token",
488                        ));
489                    }
490                } else if e.to_string().contains("Invalid lock token") {
491                    return Err(ProviderError::permanent(
492                        "abandon_orchestration_item",
493                        "Invalid lock token",
494                    ));
495                }
496
497                return Err(Self::sqlx_to_provider_error(
498                    "abandon_orchestration_item",
499                    e,
500                ));
501            }
502        };
503
504        let duration_ms = start.elapsed().as_millis() as u64;
505        debug!(
506            target = "duroxide::providers::postgres",
507            operation = "abandon_orchestration_item",
508            instance_id = %instance_id,
509            delay_ms = delay.map(|d| d.as_millis() as u64),
510            ignore_attempt = ignore_attempt,
511            duration_ms = duration_ms,
512            "Abandoned orchestration item via stored procedure"
513        );
514
515        Ok(())
516    }
517
518    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
519    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
520        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
521            "SELECT out_event_data FROM {}.fetch_history($1)",
522            self.schema_name
523        ))
524        .bind(instance)
525        .fetch_all(&*self.pool)
526        .await
527        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
528
529        Ok(event_data_rows
530            .into_iter()
531            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
532            .collect())
533    }
534
535    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
536    async fn append_with_execution(
537        &self,
538        instance: &str,
539        execution_id: u64,
540        new_events: Vec<Event>,
541    ) -> Result<(), ProviderError> {
542        if new_events.is_empty() {
543            return Ok(());
544        }
545
546        let mut events_payload = Vec::with_capacity(new_events.len());
547        for event in &new_events {
548            if event.event_id() == 0 {
549                error!(
550                    target = "duroxide::providers::postgres",
551                    operation = "append_with_execution",
552                    error_type = "validation_error",
553                    instance_id = %instance,
554                    execution_id = execution_id,
555                    "event_id must be set by runtime"
556                );
557                return Err(ProviderError::permanent(
558                    "append_with_execution",
559                    "event_id must be set by runtime",
560                ));
561            }
562
563            let event_json = serde_json::to_string(event).map_err(|e| {
564                ProviderError::permanent(
565                    "append_with_execution",
566                    format!("Failed to serialize event: {e}"),
567                )
568            })?;
569
570            let event_type = format!("{event:?}")
571                .split('{')
572                .next()
573                .unwrap_or("Unknown")
574                .trim()
575                .to_string();
576
577            events_payload.push(serde_json::json!({
578                "event_id": event.event_id(),
579                "event_type": event_type,
580                "event_data": event_json,
581            }));
582        }
583
584        let events_json = serde_json::Value::Array(events_payload);
585
586        sqlx::query(&format!(
587            "SELECT {}.append_history($1, $2, $3)",
588            self.schema_name
589        ))
590        .bind(instance)
591        .bind(execution_id as i64)
592        .bind(events_json)
593        .execute(&*self.pool)
594        .await
595        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
596
597        debug!(
598            target = "duroxide::providers::postgres",
599            operation = "append_with_execution",
600            instance_id = %instance,
601            execution_id = execution_id,
602            event_count = new_events.len(),
603            "Appended history events via stored procedure"
604        );
605
606        Ok(())
607    }
608
609    #[instrument(skip(self), target = "duroxide::providers::postgres")]
610    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
611        let work_item = serde_json::to_string(&item).map_err(|e| {
612            ProviderError::permanent(
613                "enqueue_worker_work",
614                format!("Failed to serialize work item: {e}"),
615            )
616        })?;
617
618        let now_ms = Self::now_millis();
619
620        sqlx::query(&format!(
621            "SELECT {}.enqueue_worker_work($1, $2)",
622            self.schema_name
623        ))
624        .bind(work_item)
625        .bind(now_ms)
626        .execute(&*self.pool)
627        .await
628        .map_err(|e| {
629            error!(
630                target = "duroxide::providers::postgres",
631                operation = "enqueue_worker_work",
632                error_type = "database_error",
633                error = %e,
634                "Failed to enqueue worker work"
635            );
636            Self::sqlx_to_provider_error("enqueue_worker_work", e)
637        })?;
638
639        Ok(())
640    }
641
642    #[instrument(skip(self), target = "duroxide::providers::postgres")]
643    async fn fetch_work_item(
644        &self,
645        lock_timeout: Duration,
646        _poll_timeout: Duration,
647    ) -> Result<Option<(WorkItem, String, u32, ExecutionState)>, ProviderError> {
648        let start = std::time::Instant::now();
649
650        // Convert Duration to milliseconds
651        let lock_timeout_ms = lock_timeout.as_millis() as i64;
652
653        let row = match sqlx::query_as::<_, (String, String, i32, String)>(&format!(
654            "SELECT * FROM {}.fetch_work_item($1, $2)",
655            self.schema_name
656        ))
657        .bind(Self::now_millis())
658        .bind(lock_timeout_ms)
659        .fetch_optional(&*self.pool)
660        .await
661        {
662            Ok(row) => row,
663            Err(e) => {
664                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
665            }
666        };
667
668        let (work_item_json, lock_token, attempt_count, execution_state_str) = match row {
669            Some(row) => row,
670            None => return Ok(None),
671        };
672
673        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
674            ProviderError::permanent(
675                "fetch_work_item",
676                format!("Failed to deserialize worker item: {e}"),
677            )
678        })?;
679
680        // Parse execution state from stored procedure result
681        let execution_state = Self::parse_execution_state(&execution_state_str);
682
683        let duration_ms = start.elapsed().as_millis() as u64;
684
685        // Extract instance for logging - different work item types have different structures
686        let instance_id = match &work_item {
687            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
688            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
689            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
690            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
691            WorkItem::TimerFired { instance, .. } => instance.as_str(),
692            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
693            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
694            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
695            WorkItem::SubOrchCompleted {
696                parent_instance, ..
697            } => parent_instance.as_str(),
698            WorkItem::SubOrchFailed {
699                parent_instance, ..
700            } => parent_instance.as_str(),
701        };
702
703        debug!(
704            target = "duroxide::providers::postgres",
705            operation = "fetch_work_item",
706            instance_id = %instance_id,
707            attempt_count = attempt_count,
708            execution_state = ?execution_state,
709            duration_ms = duration_ms,
710            "Fetched activity work item via stored procedure"
711        );
712
713        Ok(Some((
714            work_item,
715            lock_token,
716            attempt_count as u32,
717            execution_state,
718        )))
719    }
720
721    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
722    async fn ack_work_item(
723        &self,
724        token: &str,
725        completion: Option<WorkItem>,
726    ) -> Result<(), ProviderError> {
727        let start = std::time::Instant::now();
728
729        // If no completion provided (e.g., cancelled activity), just delete the item
730        let Some(completion) = completion else {
731            // Call ack_worker with NULL completion to delete without enqueueing
732            sqlx::query(&format!(
733                "SELECT {}.ack_worker($1)",
734                self.schema_name
735            ))
736            .bind(token)
737            .execute(&*self.pool)
738            .await
739            .map_err(|e| {
740                if e.to_string().contains("Worker queue item not found") {
741                    ProviderError::permanent(
742                        "ack_worker",
743                        "Worker queue item not found or already processed",
744                    )
745                } else {
746                    Self::sqlx_to_provider_error("ack_worker", e)
747                }
748            })?;
749
750            let duration_ms = start.elapsed().as_millis() as u64;
751            debug!(
752                target = "duroxide::providers::postgres",
753                operation = "ack_worker",
754                token = %token,
755                duration_ms = duration_ms,
756                "Acknowledged worker without completion (cancelled)"
757            );
758            return Ok(());
759        };
760
761        // Extract instance ID from completion WorkItem
762        let instance_id = match &completion {
763            WorkItem::ActivityCompleted { instance, .. }
764            | WorkItem::ActivityFailed { instance, .. } => instance,
765            _ => {
766                error!(
767                    target = "duroxide::providers::postgres",
768                    operation = "ack_worker",
769                    error_type = "invalid_completion_type",
770                    "Invalid completion work item type"
771                );
772                return Err(ProviderError::permanent(
773                    "ack_worker",
774                    "Invalid completion work item type",
775                ));
776            }
777        };
778
779        let completion_json = serde_json::to_string(&completion).map_err(|e| {
780            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
781        })?;
782
783        let now_ms = Self::now_millis();
784
785        // Call stored procedure to atomically delete worker item and enqueue completion
786        sqlx::query(&format!(
787            "SELECT {}.ack_worker($1, $2, $3, $4)",
788            self.schema_name
789        ))
790        .bind(token)
791        .bind(instance_id)
792        .bind(completion_json)
793        .bind(now_ms)
794        .execute(&*self.pool)
795        .await
796        .map_err(|e| {
797            if e.to_string().contains("Worker queue item not found") {
798                error!(
799                    target = "duroxide::providers::postgres",
800                    operation = "ack_worker",
801                    error_type = "worker_item_not_found",
802                    token = %token,
803                    "Worker queue item not found or already processed"
804                );
805                ProviderError::permanent(
806                    "ack_worker",
807                    "Worker queue item not found or already processed",
808                )
809            } else {
810                Self::sqlx_to_provider_error("ack_worker", e)
811            }
812        })?;
813
814        let duration_ms = start.elapsed().as_millis() as u64;
815        debug!(
816            target = "duroxide::providers::postgres",
817            operation = "ack_worker",
818            instance_id = %instance_id,
819            duration_ms = duration_ms,
820            "Acknowledged worker and enqueued completion"
821        );
822
823        Ok(())
824    }
825
826    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
827    async fn renew_work_item_lock(
828        &self,
829        token: &str,
830        extend_for: Duration,
831    ) -> Result<ExecutionState, ProviderError> {
832        let start = std::time::Instant::now();
833
834        // Get current time from application for consistent time reference
835        let now_ms = Self::now_millis();
836
837        // Convert Duration to seconds for the stored procedure
838        let extend_secs = extend_for.as_secs() as i64;
839
840        match sqlx::query_as::<_, (String,)>(&format!(
841            "SELECT {}.renew_work_item_lock($1, $2, $3)",
842            self.schema_name
843        ))
844        .bind(token)
845        .bind(now_ms)
846        .bind(extend_secs)
847        .fetch_one(&*self.pool)
848        .await
849        {
850            Ok((execution_state_str,)) => {
851                let execution_state = Self::parse_execution_state(&execution_state_str);
852                let duration_ms = start.elapsed().as_millis() as u64;
853                debug!(
854                    target = "duroxide::providers::postgres",
855                    operation = "renew_work_item_lock",
856                    token = %token,
857                    extend_for_secs = extend_secs,
858                    execution_state = ?execution_state,
859                    duration_ms = duration_ms,
860                    "Work item lock renewed successfully"
861                );
862                Ok(execution_state)
863            }
864            Err(e) => {
865                if let SqlxError::Database(db_err) = &e {
866                    if db_err.message().contains("Lock token invalid") {
867                        return Err(ProviderError::permanent(
868                            "renew_work_item_lock",
869                            "Lock token invalid, expired, or already acked",
870                        ));
871                    }
872                } else if e.to_string().contains("Lock token invalid") {
873                    return Err(ProviderError::permanent(
874                        "renew_work_item_lock",
875                        "Lock token invalid, expired, or already acked",
876                    ));
877                }
878
879                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
880            }
881        }
882    }
883
884    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
885    async fn abandon_work_item(
886        &self,
887        token: &str,
888        delay: Option<Duration>,
889        ignore_attempt: bool,
890    ) -> Result<(), ProviderError> {
891        let start = std::time::Instant::now();
892        let now_ms = Self::now_millis();
893        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
894
895        match sqlx::query(&format!(
896            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
897            self.schema_name
898        ))
899        .bind(token)
900        .bind(now_ms)
901        .bind(delay_param)
902        .bind(ignore_attempt)
903        .execute(&*self.pool)
904        .await
905        {
906            Ok(_) => {
907                let duration_ms = start.elapsed().as_millis() as u64;
908                debug!(
909                    target = "duroxide::providers::postgres",
910                    operation = "abandon_work_item",
911                    token = %token,
912                    delay_ms = delay.map(|d| d.as_millis() as u64),
913                    ignore_attempt = ignore_attempt,
914                    duration_ms = duration_ms,
915                    "Abandoned work item via stored procedure"
916                );
917                Ok(())
918            }
919            Err(e) => {
920                if let SqlxError::Database(db_err) = &e {
921                    if db_err.message().contains("Invalid lock token")
922                        || db_err.message().contains("already acked")
923                    {
924                        return Err(ProviderError::permanent(
925                            "abandon_work_item",
926                            "Invalid lock token or already acked",
927                        ));
928                    }
929                } else if e.to_string().contains("Invalid lock token")
930                    || e.to_string().contains("already acked")
931                {
932                    return Err(ProviderError::permanent(
933                        "abandon_work_item",
934                        "Invalid lock token or already acked",
935                    ));
936                }
937
938                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
939            }
940        }
941    }
942
943    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
944    async fn renew_orchestration_item_lock(
945        &self,
946        token: &str,
947        extend_for: Duration,
948    ) -> Result<(), ProviderError> {
949        let start = std::time::Instant::now();
950
951        // Get current time from application for consistent time reference
952        let now_ms = Self::now_millis();
953
954        // Convert Duration to seconds for the stored procedure
955        let extend_secs = extend_for.as_secs() as i64;
956
957        match sqlx::query(&format!(
958            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
959            self.schema_name
960        ))
961        .bind(token)
962        .bind(now_ms)
963        .bind(extend_secs)
964        .execute(&*self.pool)
965        .await
966        {
967            Ok(_) => {
968                let duration_ms = start.elapsed().as_millis() as u64;
969                debug!(
970                    target = "duroxide::providers::postgres",
971                    operation = "renew_orchestration_item_lock",
972                    token = %token,
973                    extend_for_secs = extend_secs,
974                    duration_ms = duration_ms,
975                    "Orchestration item lock renewed successfully"
976                );
977                Ok(())
978            }
979            Err(e) => {
980                if let SqlxError::Database(db_err) = &e {
981                    if db_err.message().contains("Lock token invalid")
982                        || db_err.message().contains("expired")
983                        || db_err.message().contains("already released")
984                    {
985                        return Err(ProviderError::permanent(
986                            "renew_orchestration_item_lock",
987                            "Lock token invalid, expired, or already released",
988                        ));
989                    }
990                } else if e.to_string().contains("Lock token invalid")
991                    || e.to_string().contains("expired")
992                    || e.to_string().contains("already released")
993                {
994                    return Err(ProviderError::permanent(
995                        "renew_orchestration_item_lock",
996                        "Lock token invalid, expired, or already released",
997                    ));
998                }
999
1000                Err(Self::sqlx_to_provider_error(
1001                    "renew_orchestration_item_lock",
1002                    e,
1003                ))
1004            }
1005        }
1006    }
1007
1008    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1009    async fn enqueue_for_orchestrator(
1010        &self,
1011        item: WorkItem,
1012        delay: Option<Duration>,
1013    ) -> Result<(), ProviderError> {
1014        let work_item = serde_json::to_string(&item).map_err(|e| {
1015            ProviderError::permanent(
1016                "enqueue_orchestrator_work",
1017                format!("Failed to serialize work item: {e}"),
1018            )
1019        })?;
1020
1021        // Extract instance ID from WorkItem enum
1022        let instance_id = match &item {
1023            WorkItem::StartOrchestration { instance, .. }
1024            | WorkItem::ActivityCompleted { instance, .. }
1025            | WorkItem::ActivityFailed { instance, .. }
1026            | WorkItem::TimerFired { instance, .. }
1027            | WorkItem::ExternalRaised { instance, .. }
1028            | WorkItem::CancelInstance { instance, .. }
1029            | WorkItem::ContinueAsNew { instance, .. } => instance,
1030            WorkItem::SubOrchCompleted {
1031                parent_instance, ..
1032            }
1033            | WorkItem::SubOrchFailed {
1034                parent_instance, ..
1035            } => parent_instance,
1036            WorkItem::ActivityExecute { .. } => {
1037                return Err(ProviderError::permanent(
1038                    "enqueue_orchestrator_work",
1039                    "ActivityExecute should go to worker queue, not orchestrator queue",
1040                ));
1041            }
1042        };
1043
1044        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1045        let now_ms = Self::now_millis();
1046
1047        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1048            if *fire_at_ms > 0 {
1049                // Take max of fire_at_ms and delay (if provided)
1050                if let Some(delay) = delay {
1051                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1052                } else {
1053                    *fire_at_ms
1054                }
1055            } else {
1056                // fire_at_ms is 0, use delay or NOW()
1057                delay
1058                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1059                    .unwrap_or(now_ms as u64)
1060            }
1061        } else {
1062            // Non-timer item: use delay or NOW()
1063            delay
1064                .map(|d| now_ms as u64 + d.as_millis() as u64)
1065                .unwrap_or(now_ms as u64)
1066        };
1067
1068        let visible_at = Utc
1069            .timestamp_millis_opt(visible_at_ms as i64)
1070            .single()
1071            .ok_or_else(|| {
1072                ProviderError::permanent(
1073                    "enqueue_orchestrator_work",
1074                    "Invalid visible_at timestamp",
1075                )
1076            })?;
1077
1078        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1079        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1080
1081        // Call stored procedure to enqueue work
1082        sqlx::query(&format!(
1083            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1084            self.schema_name
1085        ))
1086        .bind(instance_id)
1087        .bind(&work_item)
1088        .bind(visible_at)
1089        .bind::<Option<String>>(None) // orchestration_name - NULL
1090        .bind::<Option<String>>(None) // orchestration_version - NULL
1091        .bind::<Option<i64>>(None) // execution_id - NULL
1092        .execute(&*self.pool)
1093        .await
1094        .map_err(|e| {
1095            error!(
1096                target = "duroxide::providers::postgres",
1097                operation = "enqueue_orchestrator_work",
1098                error_type = "database_error",
1099                error = %e,
1100                instance_id = %instance_id,
1101                "Failed to enqueue orchestrator work"
1102            );
1103            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1104        })?;
1105
1106        debug!(
1107            target = "duroxide::providers::postgres",
1108            operation = "enqueue_orchestrator_work",
1109            instance_id = %instance_id,
1110            delay_ms = delay.map(|d| d.as_millis() as u64),
1111            "Enqueued orchestrator work"
1112        );
1113
1114        Ok(())
1115    }
1116
1117    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1118    async fn read_with_execution(
1119        &self,
1120        instance: &str,
1121        execution_id: u64,
1122    ) -> Result<Vec<Event>, ProviderError> {
1123        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1124            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1125            self.table_name("history")
1126        ))
1127        .bind(instance)
1128        .bind(execution_id as i64)
1129        .fetch_all(&*self.pool)
1130        .await
1131        .ok()
1132        .unwrap_or_default();
1133
1134        Ok(event_data_rows
1135            .into_iter()
1136            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1137            .collect())
1138    }
1139
1140    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1141        Some(self)
1142    }
1143}
1144
1145#[async_trait::async_trait]
1146impl ProviderAdmin for PostgresProvider {
1147    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1148    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1149        sqlx::query_scalar(&format!(
1150            "SELECT instance_id FROM {}.list_instances()",
1151            self.schema_name
1152        ))
1153        .fetch_all(&*self.pool)
1154        .await
1155        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1156    }
1157
1158    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1159    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1160        sqlx::query_scalar(&format!(
1161            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1162            self.schema_name
1163        ))
1164        .bind(status)
1165        .fetch_all(&*self.pool)
1166        .await
1167        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1168    }
1169
1170    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1171    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1172        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1173            "SELECT execution_id FROM {}.list_executions($1)",
1174            self.schema_name
1175        ))
1176        .bind(instance)
1177        .fetch_all(&*self.pool)
1178        .await
1179        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1180
1181        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1182    }
1183
1184    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1185    async fn read_history_with_execution_id(
1186        &self,
1187        instance: &str,
1188        execution_id: u64,
1189    ) -> Result<Vec<Event>, ProviderError> {
1190        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1191            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1192            self.schema_name
1193        ))
1194        .bind(instance)
1195        .bind(execution_id as i64)
1196        .fetch_all(&*self.pool)
1197        .await
1198        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1199
1200        event_data_rows
1201            .into_iter()
1202            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1203            .collect::<Vec<Event>>()
1204            .into_iter()
1205            .map(Ok)
1206            .collect()
1207    }
1208
1209    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1210    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1211        let execution_id = self.latest_execution_id(instance).await?;
1212        self.read_history_with_execution_id(instance, execution_id)
1213            .await
1214    }
1215
1216    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1217    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1218        sqlx::query_scalar(&format!(
1219            "SELECT {}.latest_execution_id($1)",
1220            self.schema_name
1221        ))
1222        .bind(instance)
1223        .fetch_optional(&*self.pool)
1224        .await
1225        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1226        .map(|id: i64| id as u64)
1227        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1228    }
1229
1230    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1231    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1232        let row: Option<(
1233            String,
1234            String,
1235            String,
1236            i64,
1237            chrono::DateTime<Utc>,
1238            Option<chrono::DateTime<Utc>>,
1239            Option<String>,
1240            Option<String>,
1241        )> = sqlx::query_as(&format!(
1242            "SELECT * FROM {}.get_instance_info($1)",
1243            self.schema_name
1244        ))
1245        .bind(instance)
1246        .fetch_optional(&*self.pool)
1247        .await
1248        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1249
1250        let (
1251            instance_id,
1252            orchestration_name,
1253            orchestration_version,
1254            current_execution_id,
1255            created_at,
1256            updated_at,
1257            status,
1258            output,
1259        ) =
1260            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1261
1262        Ok(InstanceInfo {
1263            instance_id,
1264            orchestration_name,
1265            orchestration_version,
1266            current_execution_id: current_execution_id as u64,
1267            status: status.unwrap_or_else(|| "Running".to_string()),
1268            output,
1269            created_at: created_at.timestamp_millis() as u64,
1270            updated_at: updated_at
1271                .map(|dt| dt.timestamp_millis() as u64)
1272                .unwrap_or(created_at.timestamp_millis() as u64),
1273        })
1274    }
1275
1276    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1277    async fn get_execution_info(
1278        &self,
1279        instance: &str,
1280        execution_id: u64,
1281    ) -> Result<ExecutionInfo, ProviderError> {
1282        let row: Option<(
1283            i64,
1284            String,
1285            Option<String>,
1286            chrono::DateTime<Utc>,
1287            Option<chrono::DateTime<Utc>>,
1288            i64,
1289        )> = sqlx::query_as(&format!(
1290            "SELECT * FROM {}.get_execution_info($1, $2)",
1291            self.schema_name
1292        ))
1293        .bind(instance)
1294        .bind(execution_id as i64)
1295        .fetch_optional(&*self.pool)
1296        .await
1297        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1298
1299        let (exec_id, status, output, started_at, completed_at, event_count) = row
1300            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1301
1302        Ok(ExecutionInfo {
1303            execution_id: exec_id as u64,
1304            status,
1305            output,
1306            started_at: started_at.timestamp_millis() as u64,
1307            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1308            event_count: event_count as usize,
1309        })
1310    }
1311
1312    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1313    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1314        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1315            "SELECT * FROM {}.get_system_metrics()",
1316            self.schema_name
1317        ))
1318        .fetch_optional(&*self.pool)
1319        .await
1320        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1321
1322        let (
1323            total_instances,
1324            total_executions,
1325            running_instances,
1326            completed_instances,
1327            failed_instances,
1328            total_events,
1329        ) = row.ok_or_else(|| {
1330            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1331        })?;
1332
1333        Ok(SystemMetrics {
1334            total_instances: total_instances as u64,
1335            total_executions: total_executions as u64,
1336            running_instances: running_instances as u64,
1337            completed_instances: completed_instances as u64,
1338            failed_instances: failed_instances as u64,
1339            total_events: total_events as u64,
1340        })
1341    }
1342
1343    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1344    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1345        let now_ms = Self::now_millis();
1346
1347        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1348            "SELECT * FROM {}.get_queue_depths($1)",
1349            self.schema_name
1350        ))
1351        .bind(now_ms)
1352        .fetch_optional(&*self.pool)
1353        .await
1354        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1355
1356        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1357            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1358        })?;
1359
1360        Ok(QueueDepths {
1361            orchestrator_queue: orchestrator_queue as usize,
1362            worker_queue: worker_queue as usize,
1363            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1364        })
1365    }
1366}