// duroxide_pg/provider.rs
1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5    ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
17/// PostgreSQL-based provider for Duroxide durable orchestrations.
18///
19/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
20/// storing orchestration state, history, and work queues in PostgreSQL.
21///
22/// # Example
23///
24/// ```rust,no_run
25/// use duroxide_pg::PostgresProvider;
26///
27/// # async fn example() -> anyhow::Result<()> {
28/// // Connect using DATABASE_URL or explicit connection string
29/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
30///
31/// // Or use a custom schema for isolation
32/// let provider = PostgresProvider::new_with_schema(
33///     "postgres://localhost/mydb",
34///     Some("my_app"),
35/// ).await?;
36/// # Ok(())
37/// # }
38/// ```
pub struct PostgresProvider {
    /// Shared connection pool; `Arc` so the migration runner and this provider
    /// can hold the same pool.
    pool: Arc<PgPool>,
    /// Schema that owns all tables and stored procedures (defaults to "public").
    schema_name: String,
}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93    /// Get schema-qualified table name
94    fn table_name(&self, table: &str) -> String {
95        format!("{}.{}", self.schema_name, table)
96    }
97
98    /// Get the database pool (for testing)
99    pub fn pool(&self) -> &PgPool {
100        &self.pool
101    }
102
103    /// Get the schema name (for testing)
104    pub fn schema_name(&self) -> &str {
105        &self.schema_name
106    }
107
108    /// Convert sqlx::Error to ProviderError with proper classification
109    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110        match e {
111            SqlxError::Database(ref db_err) => {
112                // PostgreSQL error codes
113                let code_opt = db_err.code();
114                let code = code_opt.as_deref();
115                if code == Some("40P01") {
116                    // Deadlock detected
117                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118                } else if code == Some("40001") {
119                    // Serialization failure - permanent error (transaction conflict, not transient)
120                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121                } else if code == Some("23505") {
122                    // Unique constraint violation (duplicate event)
123                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124                } else if code == Some("23503") {
125                    // Foreign key constraint violation
126                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127                } else {
128                    ProviderError::permanent(operation, format!("Database error: {e}"))
129                }
130            }
131            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133            }
134            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136        }
137    }
138
139    /// Clean up schema after tests (drops all tables and optionally the schema)
140    ///
141    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
142    /// Only drops the schema if it's a custom schema (not "public").
143    pub async fn cleanup_schema(&self) -> Result<()> {
144        // Call the stored procedure to drop all tables
145        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146            .execute(&*self.pool)
147            .await?;
148
149        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
150        // Only drop custom schemas created for testing
151        if self.schema_name != "public" {
152            sqlx::query(&format!(
153                "DROP SCHEMA IF EXISTS {} CASCADE",
154                self.schema_name
155            ))
156            .execute(&*self.pool)
157            .await?;
158        } else {
159            // Explicit safeguard: we only drop tables from public schema, never the schema itself
160            // This ensures we don't accidentally drop the default PostgreSQL schema
161        }
162
163        Ok(())
164    }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169    #[instrument(skip(self), target = "duroxide::providers::postgres")]
170    async fn fetch_orchestration_item(
171        &self,
172        lock_timeout: Duration,
173        _poll_timeout: Duration,
174    ) -> Result<Option<OrchestrationItem>, ProviderError> {
175        let start = std::time::Instant::now();
176
177        const MAX_RETRIES: u32 = 3;
178        const RETRY_DELAY_MS: u64 = 50;
179
180        // Convert Duration to milliseconds
181        let lock_timeout_ms = lock_timeout.as_millis() as i64;
182        let mut _last_error: Option<ProviderError> = None;
183
184        for attempt in 0..=MAX_RETRIES {
185            let now_ms = Self::now_millis();
186
187            let result: Result<
188                Option<(
189                    String,
190                    String,
191                    String,
192                    i64,
193                    serde_json::Value,
194                    serde_json::Value,
195                    String,
196                )>,
197                SqlxError,
198            > = sqlx::query_as(&format!(
199                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
200                self.schema_name
201            ))
202            .bind(now_ms)
203            .bind(lock_timeout_ms)
204            .fetch_optional(&*self.pool)
205            .await;
206
207            let row = match result {
208                Ok(r) => r,
209                Err(e) => {
210                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
211                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
212                        warn!(
213                            target = "duroxide::providers::postgres",
214                            operation = "fetch_orchestration_item",
215                            attempt = attempt + 1,
216                            error = %provider_err,
217                            "Retryable error, will retry"
218                        );
219                        _last_error = Some(provider_err);
220                        sleep(std::time::Duration::from_millis(
221                            RETRY_DELAY_MS * (attempt as u64 + 1),
222                        ))
223                        .await;
224                        continue;
225                    }
226                    return Err(provider_err);
227                }
228            };
229
230            if let Some((
231                instance_id,
232                orchestration_name,
233                orchestration_version,
234                execution_id,
235                history_json,
236                messages_json,
237                lock_token,
238            )) = row
239            {
240                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
241                    ProviderError::permanent(
242                        "fetch_orchestration_item",
243                        format!("Failed to deserialize history: {e}"),
244                    )
245                })?;
246
247                let messages: Vec<WorkItem> =
248                    serde_json::from_value(messages_json).map_err(|e| {
249                        ProviderError::permanent(
250                            "fetch_orchestration_item",
251                            format!("Failed to deserialize messages: {e}"),
252                        )
253                    })?;
254
255                let duration_ms = start.elapsed().as_millis() as u64;
256                debug!(
257                    target = "duroxide::providers::postgres",
258                    operation = "fetch_orchestration_item",
259                    instance_id = %instance_id,
260                    execution_id = execution_id,
261                    message_count = messages.len(),
262                    history_count = history.len(),
263                    duration_ms = duration_ms,
264                    attempts = attempt + 1,
265                    "Fetched orchestration item via stored procedure"
266                );
267
268                return Ok(Some(OrchestrationItem {
269                    instance: instance_id,
270                    orchestration_name,
271                    execution_id: execution_id as u64,
272                    version: orchestration_version,
273                    history,
274                    messages,
275                    lock_token,
276                }));
277            }
278
279            if attempt < MAX_RETRIES {
280                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
281            }
282        }
283
284        Ok(None)
285    }
    /// Acknowledge a locked orchestration item in a single atomic
    /// stored-procedure call: appends `history_delta`, enqueues
    /// `worker_items` and `orchestrator_items`, writes `metadata`, and
    /// releases the lock identified by `lock_token`.
    ///
    /// Retries up to `MAX_RETRIES` times with linear backoff on retryable
    /// database errors; an invalid/expired lock token is always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Serialize everything once, outside the retry loop.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime must assign event ids before handing events to the provider.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse type tag derived from the Debug form, e.g.
            // "TimerFired { .. }" -> "TimerFired".
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Only the fields the stored procedure consumes.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
        });

        for attempt in 0..=MAX_RETRIES {
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Check for permanent errors first: a bad lock token can
                    // never succeed on retry.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Should never reach here (every final attempt returns), but just in case
        Ok(())
    }
423    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
424    async fn abandon_orchestration_item(
425        &self,
426        lock_token: &str,
427        delay: Option<Duration>,
428    ) -> Result<(), ProviderError> {
429        let start = std::time::Instant::now();
430        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
431
432        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
433            "SELECT {}.abandon_orchestration_item($1, $2)",
434            self.schema_name
435        ))
436        .bind(lock_token)
437        .bind(delay_param)
438        .fetch_one(&*self.pool)
439        .await
440        {
441            Ok(instance_id) => instance_id,
442            Err(e) => {
443                if let SqlxError::Database(db_err) = &e {
444                    if db_err.message().contains("Invalid lock token") {
445                        return Err(ProviderError::permanent(
446                            "abandon_orchestration_item",
447                            "Invalid lock token",
448                        ));
449                    }
450                } else if e.to_string().contains("Invalid lock token") {
451                    return Err(ProviderError::permanent(
452                        "abandon_orchestration_item",
453                        "Invalid lock token",
454                    ));
455                }
456
457                return Err(Self::sqlx_to_provider_error(
458                    "abandon_orchestration_item",
459                    e,
460                ));
461            }
462        };
463
464        let duration_ms = start.elapsed().as_millis() as u64;
465        debug!(
466            target = "duroxide::providers::postgres",
467            operation = "abandon_orchestration_item",
468            instance_id = %instance_id,
469            delay_ms = delay.map(|d| d.as_millis() as u64),
470            duration_ms = duration_ms,
471            "Abandoned orchestration item via stored procedure"
472        );
473
474        Ok(())
475    }
476
477    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
478    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
479        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
480            "SELECT out_event_data FROM {}.fetch_history($1)",
481            self.schema_name
482        ))
483        .bind(instance)
484        .fetch_all(&*self.pool)
485        .await
486        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
487
488        Ok(event_data_rows
489            .into_iter()
490            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
491            .collect())
492    }
493
494    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
495    async fn append_with_execution(
496        &self,
497        instance: &str,
498        execution_id: u64,
499        new_events: Vec<Event>,
500    ) -> Result<(), ProviderError> {
501        if new_events.is_empty() {
502            return Ok(());
503        }
504
505        let mut events_payload = Vec::with_capacity(new_events.len());
506        for event in &new_events {
507            if event.event_id() == 0 {
508                error!(
509                    target = "duroxide::providers::postgres",
510                    operation = "append_with_execution",
511                    error_type = "validation_error",
512                    instance_id = %instance,
513                    execution_id = execution_id,
514                    "event_id must be set by runtime"
515                );
516                return Err(ProviderError::permanent(
517                    "append_with_execution",
518                    "event_id must be set by runtime",
519                ));
520            }
521
522            let event_json = serde_json::to_string(event).map_err(|e| {
523                ProviderError::permanent(
524                    "append_with_execution",
525                    format!("Failed to serialize event: {e}"),
526                )
527            })?;
528
529            let event_type = format!("{event:?}")
530                .split('{')
531                .next()
532                .unwrap_or("Unknown")
533                .trim()
534                .to_string();
535
536            events_payload.push(serde_json::json!({
537                "event_id": event.event_id(),
538                "event_type": event_type,
539                "event_data": event_json,
540            }));
541        }
542
543        let events_json = serde_json::Value::Array(events_payload);
544
545        sqlx::query(&format!(
546            "SELECT {}.append_history($1, $2, $3)",
547            self.schema_name
548        ))
549        .bind(instance)
550        .bind(execution_id as i64)
551        .bind(events_json)
552        .execute(&*self.pool)
553        .await
554        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
555
556        debug!(
557            target = "duroxide::providers::postgres",
558            operation = "append_with_execution",
559            instance_id = %instance,
560            execution_id = execution_id,
561            event_count = new_events.len(),
562            "Appended history events via stored procedure"
563        );
564
565        Ok(())
566    }
567
568    #[instrument(skip(self), target = "duroxide::providers::postgres")]
569    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
570        let work_item = serde_json::to_string(&item).map_err(|e| {
571            ProviderError::permanent(
572                "enqueue_worker_work",
573                format!("Failed to serialize work item: {e}"),
574            )
575        })?;
576
577        sqlx::query(&format!(
578            "SELECT {}.enqueue_worker_work($1)",
579            self.schema_name
580        ))
581        .bind(work_item)
582        .execute(&*self.pool)
583        .await
584        .map_err(|e| {
585            error!(
586                target = "duroxide::providers::postgres",
587                operation = "enqueue_worker_work",
588                error_type = "database_error",
589                error = %e,
590                "Failed to enqueue worker work"
591            );
592            Self::sqlx_to_provider_error("enqueue_worker_work", e)
593        })?;
594
595        Ok(())
596    }
597
598    #[instrument(skip(self), target = "duroxide::providers::postgres")]
599    async fn fetch_work_item(
600        &self,
601        lock_timeout: Duration,
602        _poll_timeout: Duration,
603    ) -> Result<Option<(WorkItem, String)>, ProviderError> {
604        let start = std::time::Instant::now();
605
606        // Convert Duration to milliseconds
607        let lock_timeout_ms = lock_timeout.as_millis() as i64;
608
609        let row = match sqlx::query_as::<_, (String, String)>(&format!(
610            "SELECT * FROM {}.fetch_work_item($1, $2)",
611            self.schema_name
612        ))
613        .bind(Self::now_millis())
614        .bind(lock_timeout_ms)
615        .fetch_optional(&*self.pool)
616        .await
617        {
618            Ok(row) => row,
619            Err(e) => {
620                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
621            }
622        };
623
624        let (work_item_json, lock_token) = match row {
625            Some(row) => row,
626            None => return Ok(None),
627        };
628
629        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
630            ProviderError::permanent(
631                "fetch_work_item",
632                format!("Failed to deserialize worker item: {e}"),
633            )
634        })?;
635
636        let duration_ms = start.elapsed().as_millis() as u64;
637
638        // Extract instance for logging - different work item types have different structures
639        let instance_id = match &work_item {
640            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
641            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
642            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
643            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
644            WorkItem::TimerFired { instance, .. } => instance.as_str(),
645            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
646            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
647            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
648            WorkItem::SubOrchCompleted {
649                parent_instance, ..
650            } => parent_instance.as_str(),
651            WorkItem::SubOrchFailed {
652                parent_instance, ..
653            } => parent_instance.as_str(),
654        };
655
656        debug!(
657            target = "duroxide::providers::postgres",
658            operation = "fetch_work_item",
659            instance_id = %instance_id,
660            duration_ms = duration_ms,
661            "Fetched activity work item via stored procedure"
662        );
663
664        Ok(Some((work_item, lock_token)))
665    }
666
667    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
668    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
669        let start = std::time::Instant::now();
670
671        // Extract instance ID from completion WorkItem
672        let instance_id = match &completion {
673            WorkItem::ActivityCompleted { instance, .. }
674            | WorkItem::ActivityFailed { instance, .. } => instance,
675            _ => {
676                error!(
677                    target = "duroxide::providers::postgres",
678                    operation = "ack_worker",
679                    error_type = "invalid_completion_type",
680                    "Invalid completion work item type"
681                );
682                return Err(ProviderError::permanent(
683                    "ack_worker",
684                    "Invalid completion work item type",
685                ));
686            }
687        };
688
689        let completion_json = serde_json::to_string(&completion).map_err(|e| {
690            ProviderError::permanent(
691                "ack_worker",
692                format!("Failed to serialize completion: {e}"),
693            )
694        })?;
695
696        // Call stored procedure to atomically delete worker item and enqueue completion
697        sqlx::query(&format!(
698            "SELECT {}.ack_worker($1, $2, $3)",
699            self.schema_name
700        ))
701        .bind(token)
702        .bind(instance_id)
703        .bind(completion_json)
704        .execute(&*self.pool)
705        .await
706        .map_err(|e| {
707            if e.to_string().contains("Worker queue item not found") {
708                error!(
709                    target = "duroxide::providers::postgres",
710                    operation = "ack_worker",
711                    error_type = "worker_item_not_found",
712                    token = %token,
713                    "Worker queue item not found or already processed"
714                );
715                ProviderError::permanent(
716                    "ack_worker",
717                    "Worker queue item not found or already processed",
718                )
719            } else {
720                Self::sqlx_to_provider_error("ack_worker", e)
721            }
722        })?;
723
724        let duration_ms = start.elapsed().as_millis() as u64;
725        debug!(
726            target = "duroxide::providers::postgres",
727            operation = "ack_worker",
728            instance_id = %instance_id,
729            duration_ms = duration_ms,
730            "Acknowledged worker and enqueued completion"
731        );
732
733        Ok(())
734    }
735
    /// Extend the lock on a claimed worker item so a long-running activity is
    /// not redelivered to another worker.
    ///
    /// NOTE(review): `extend_for` is truncated to whole seconds before being
    /// passed to the stored procedure, so a sub-second duration becomes 0 and
    /// would not extend the lock — confirm callers always renew with >= 1s.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn renew_work_item_lock(
        &self,
        token: &str,
        extend_for: Duration,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // Get current time from application for consistent time reference
        let now_ms = Self::now_millis();

        // Convert Duration to seconds for the stored procedure
        let extend_secs = extend_for.as_secs() as i64;

        match sqlx::query(&format!(
            "SELECT {}.renew_work_item_lock($1, $2, $3)",
            self.schema_name
        ))
        .bind(token)
        .bind(now_ms)
        .bind(extend_secs)
        .execute(&*self.pool)
        .await
        {
            Ok(_) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "renew_work_item_lock",
                    token = %token,
                    extend_for_secs = extend_secs,
                    duration_ms = duration_ms,
                    "Work item lock renewed successfully"
                );
                Ok(())
            }
            Err(e) => {
                // A bad/expired/acked token is permanent: retrying cannot help.
                if let SqlxError::Database(db_err) = &e {
                    if db_err.message().contains("Lock token invalid") {
                        return Err(ProviderError::permanent(
                            "renew_work_item_lock",
                            "Lock token invalid, expired, or already acked",
                        ));
                    }
                } else if e.to_string().contains("Lock token invalid") {
                    return Err(ProviderError::permanent(
                        "renew_work_item_lock",
                        "Lock token invalid, expired, or already acked",
                    ));
                }

                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
            }
        }
    }
791
792    #[instrument(skip(self), target = "duroxide::providers::postgres")]
793    async fn enqueue_for_orchestrator(
794        &self,
795        item: WorkItem,
796        delay: Option<Duration>,
797    ) -> Result<(), ProviderError> {
798        let work_item = serde_json::to_string(&item).map_err(|e| {
799            ProviderError::permanent(
800                "enqueue_orchestrator_work",
801                format!("Failed to serialize work item: {e}"),
802            )
803        })?;
804
805        // Extract instance ID from WorkItem enum
806        let instance_id = match &item {
807            WorkItem::StartOrchestration { instance, .. }
808            | WorkItem::ActivityCompleted { instance, .. }
809            | WorkItem::ActivityFailed { instance, .. }
810            | WorkItem::TimerFired { instance, .. }
811            | WorkItem::ExternalRaised { instance, .. }
812            | WorkItem::CancelInstance { instance, .. }
813            | WorkItem::ContinueAsNew { instance, .. } => instance,
814            WorkItem::SubOrchCompleted {
815                parent_instance, ..
816            }
817            | WorkItem::SubOrchFailed {
818                parent_instance, ..
819            } => parent_instance,
820            WorkItem::ActivityExecute { .. } => {
821                return Err(ProviderError::permanent(
822                    "enqueue_orchestrator_work",
823                    "ActivityExecute should go to worker queue, not orchestrator queue",
824                ));
825            }
826        };
827
828        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
829        let now_ms = Self::now_millis();
830
831        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
832            if *fire_at_ms > 0 {
833                // Take max of fire_at_ms and delay (if provided)
834                if let Some(delay) = delay {
835                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
836                } else {
837                    *fire_at_ms
838                }
839            } else {
840                // fire_at_ms is 0, use delay or NOW()
841                delay
842                    .map(|d| now_ms as u64 + d.as_millis() as u64)
843                    .unwrap_or(now_ms as u64)
844            }
845        } else {
846            // Non-timer item: use delay or NOW()
847            delay
848                .map(|d| now_ms as u64 + d.as_millis() as u64)
849                .unwrap_or(now_ms as u64)
850        };
851
852        let visible_at = Utc
853            .timestamp_millis_opt(visible_at_ms as i64)
854            .single()
855            .ok_or_else(|| {
856                ProviderError::permanent(
857                    "enqueue_orchestrator_work",
858                    "Invalid visible_at timestamp",
859                )
860            })?;
861
862        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
863        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
864
865        // Call stored procedure to enqueue work
866        sqlx::query(&format!(
867            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
868            self.schema_name
869        ))
870        .bind(instance_id)
871        .bind(&work_item)
872        .bind(visible_at)
873        .bind::<Option<String>>(None) // orchestration_name - NULL
874        .bind::<Option<String>>(None) // orchestration_version - NULL
875        .bind::<Option<i64>>(None) // execution_id - NULL
876        .execute(&*self.pool)
877        .await
878        .map_err(|e| {
879            error!(
880                target = "duroxide::providers::postgres",
881                operation = "enqueue_orchestrator_work",
882                error_type = "database_error",
883                error = %e,
884                instance_id = %instance_id,
885                "Failed to enqueue orchestrator work"
886            );
887            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
888        })?;
889
890        debug!(
891            target = "duroxide::providers::postgres",
892            operation = "enqueue_orchestrator_work",
893            instance_id = %instance_id,
894            delay_ms = delay.map(|d| d.as_millis() as u64),
895            "Enqueued orchestrator work"
896        );
897
898        Ok(())
899    }
900
901    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
902    async fn read_with_execution(
903        &self,
904        instance: &str,
905        execution_id: u64,
906    ) -> Result<Vec<Event>, ProviderError> {
907        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
908            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
909            self.table_name("history")
910        ))
911        .bind(instance)
912        .bind(execution_id as i64)
913        .fetch_all(&*self.pool)
914        .await
915        .ok()
916        .unwrap_or_default();
917
918        Ok(event_data_rows
919            .into_iter()
920            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
921            .collect())
922    }
923
    /// Expose the admin/management API: this provider always implements
    /// [`ProviderAdmin`], so the capability is unconditionally available.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
927}
928
929#[async_trait::async_trait]
930impl ProviderAdmin for PostgresProvider {
931    #[instrument(skip(self), target = "duroxide::providers::postgres")]
932    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
933        sqlx::query_scalar(&format!(
934            "SELECT instance_id FROM {}.list_instances()",
935            self.schema_name
936        ))
937        .fetch_all(&*self.pool)
938        .await
939        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
940    }
941
942    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
943    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
944        sqlx::query_scalar(&format!(
945            "SELECT instance_id FROM {}.list_instances_by_status($1)",
946            self.schema_name
947        ))
948        .bind(status)
949        .fetch_all(&*self.pool)
950        .await
951        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
952    }
953
954    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
955    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
956        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
957            "SELECT execution_id FROM {}.list_executions($1)",
958            self.schema_name
959        ))
960        .bind(instance)
961        .fetch_all(&*self.pool)
962        .await
963        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
964
965        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
966    }
967
968    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
969    async fn read_history_with_execution_id(
970        &self,
971        instance: &str,
972        execution_id: u64,
973    ) -> Result<Vec<Event>, ProviderError> {
974        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
975            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
976            self.schema_name
977        ))
978        .bind(instance)
979        .bind(execution_id as i64)
980        .fetch_all(&*self.pool)
981        .await
982        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
983
984        event_data_rows
985            .into_iter()
986            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
987            .collect::<Vec<Event>>()
988            .into_iter()
989            .map(Ok)
990            .collect()
991    }
992
993    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
994    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
995        let execution_id = self.latest_execution_id(instance).await?;
996        self.read_history_with_execution_id(instance, execution_id)
997            .await
998    }
999
1000    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1001    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1002        sqlx::query_scalar(&format!(
1003            "SELECT {}.latest_execution_id($1)",
1004            self.schema_name
1005        ))
1006        .bind(instance)
1007        .fetch_optional(&*self.pool)
1008        .await
1009        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1010        .map(|id: i64| id as u64)
1011        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1012    }
1013
1014    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1015    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1016        let row: Option<(
1017            String,
1018            String,
1019            String,
1020            i64,
1021            chrono::DateTime<Utc>,
1022            Option<chrono::DateTime<Utc>>,
1023            Option<String>,
1024            Option<String>,
1025        )> = sqlx::query_as(&format!(
1026            "SELECT * FROM {}.get_instance_info($1)",
1027            self.schema_name
1028        ))
1029        .bind(instance)
1030        .fetch_optional(&*self.pool)
1031        .await
1032        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1033
1034        let (
1035            instance_id,
1036            orchestration_name,
1037            orchestration_version,
1038            current_execution_id,
1039            created_at,
1040            updated_at,
1041            status,
1042            output,
1043        ) =
1044            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1045
1046        Ok(InstanceInfo {
1047            instance_id,
1048            orchestration_name,
1049            orchestration_version,
1050            current_execution_id: current_execution_id as u64,
1051            status: status.unwrap_or_else(|| "Running".to_string()),
1052            output,
1053            created_at: created_at.timestamp_millis() as u64,
1054            updated_at: updated_at
1055                .map(|dt| dt.timestamp_millis() as u64)
1056                .unwrap_or(created_at.timestamp_millis() as u64),
1057        })
1058    }
1059
1060    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1061    async fn get_execution_info(
1062        &self,
1063        instance: &str,
1064        execution_id: u64,
1065    ) -> Result<ExecutionInfo, ProviderError> {
1066        let row: Option<(
1067            i64,
1068            String,
1069            Option<String>,
1070            chrono::DateTime<Utc>,
1071            Option<chrono::DateTime<Utc>>,
1072            i64,
1073        )> = sqlx::query_as(&format!(
1074            "SELECT * FROM {}.get_execution_info($1, $2)",
1075            self.schema_name
1076        ))
1077        .bind(instance)
1078        .bind(execution_id as i64)
1079        .fetch_optional(&*self.pool)
1080        .await
1081        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1082
1083        let (exec_id, status, output, started_at, completed_at, event_count) = row
1084            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1085
1086        Ok(ExecutionInfo {
1087            execution_id: exec_id as u64,
1088            status,
1089            output,
1090            started_at: started_at.timestamp_millis() as u64,
1091            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1092            event_count: event_count as usize,
1093        })
1094    }
1095
1096    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1097    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1098        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1099            "SELECT * FROM {}.get_system_metrics()",
1100            self.schema_name
1101        ))
1102        .fetch_optional(&*self.pool)
1103        .await
1104        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1105
1106        let (
1107            total_instances,
1108            total_executions,
1109            running_instances,
1110            completed_instances,
1111            failed_instances,
1112            total_events,
1113        ) = row.ok_or_else(|| {
1114            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1115        })?;
1116
1117        Ok(SystemMetrics {
1118            total_instances: total_instances as u64,
1119            total_executions: total_executions as u64,
1120            running_instances: running_instances as u64,
1121            completed_instances: completed_instances as u64,
1122            failed_instances: failed_instances as u64,
1123            total_events: total_events as u64,
1124        })
1125    }
1126
1127    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1128    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1129        let now_ms = Self::now_millis();
1130
1131        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1132            "SELECT * FROM {}.get_queue_depths($1)",
1133            self.schema_name
1134        ))
1135        .bind(now_ms)
1136        .fetch_optional(&*self.pool)
1137        .await
1138        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1139
1140        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1141            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1142        })?;
1143
1144        Ok(QueueDepths {
1145            orchestrator_queue: orchestrator_queue as usize,
1146            worker_queue: worker_queue as usize,
1147            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1148        })
1149    }
1150}