duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5    ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{debug, error, instrument};
14
15use crate::migrations::MigrationRunner;
16
17/// PostgreSQL-based provider for Duroxide durable orchestrations.
18///
19/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
20/// storing orchestration state, history, and work queues in PostgreSQL.
21///
22/// # Example
23///
24/// ```rust,no_run
25/// use duroxide_pg::PostgresProvider;
26///
27/// # async fn example() -> anyhow::Result<()> {
28/// // Connect using DATABASE_URL or explicit connection string
29/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
30///
31/// // Or use a custom schema for isolation
32/// let provider = PostgresProvider::new_with_schema(
33///     "postgres://localhost/mydb",
34///     Some("my_app"),
35/// ).await?;
36/// # Ok(())
37/// # }
38/// ```
39pub struct PostgresProvider {
40    pool: Arc<PgPool>,
41    schema_name: String,
42}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93    /// Get schema-qualified table name
94    fn table_name(&self, table: &str) -> String {
95        format!("{}.{}", self.schema_name, table)
96    }
97
98    /// Get the database pool (for testing)
99    pub fn pool(&self) -> &PgPool {
100        &self.pool
101    }
102
103    /// Get the schema name (for testing)
104    pub fn schema_name(&self) -> &str {
105        &self.schema_name
106    }
107
108    /// Convert sqlx::Error to ProviderError with proper classification
109    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110        match e {
111            SqlxError::Database(ref db_err) => {
112                // PostgreSQL error codes
113                let code_opt = db_err.code();
114                let code = code_opt.as_deref();
115                if code == Some("40P01") {
116                    // Deadlock detected
117                    ProviderError::retryable(operation, format!("Deadlock detected: {}", e))
118                } else if code == Some("40001") {
119                    // Serialization failure - permanent error (transaction conflict, not transient)
120                    ProviderError::permanent(operation, format!("Serialization failure: {}", e))
121                } else if code == Some("23505") {
122                    // Unique constraint violation (duplicate event)
123                    ProviderError::permanent(operation, format!("Duplicate detected: {}", e))
124                } else if code == Some("23503") {
125                    // Foreign key constraint violation
126                    ProviderError::permanent(operation, format!("Foreign key violation: {}", e))
127                } else {
128                    ProviderError::permanent(operation, format!("Database error: {}", e))
129                }
130            }
131            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132                ProviderError::retryable(operation, format!("Connection pool error: {}", e))
133            }
134            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {}", e)),
135            _ => ProviderError::permanent(operation, format!("Unexpected error: {}", e)),
136        }
137    }
138
139    /// Clean up schema after tests (drops all tables and optionally the schema)
140    ///
141    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
142    /// Only drops the schema if it's a custom schema (not "public").
143    pub async fn cleanup_schema(&self) -> Result<()> {
144        // Call the stored procedure to drop all tables
145        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146            .execute(&*self.pool)
147            .await?;
148
149        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
150        // Only drop custom schemas created for testing
151        if self.schema_name != "public" {
152            sqlx::query(&format!(
153                "DROP SCHEMA IF EXISTS {} CASCADE",
154                self.schema_name
155            ))
156            .execute(&*self.pool)
157            .await?;
158        } else {
159            // Explicit safeguard: we only drop tables from public schema, never the schema itself
160            // This ensures we don't accidentally drop the default PostgreSQL schema
161        }
162
163        Ok(())
164    }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169    #[instrument(skip(self), target = "duroxide::providers::postgres")]
170    async fn fetch_orchestration_item(
171        &self,
172        lock_timeout: Duration,
173    ) -> Result<Option<OrchestrationItem>, ProviderError> {
174        let start = std::time::Instant::now();
175
176        const MAX_RETRIES: u32 = 3;
177        const RETRY_DELAY_MS: u64 = 10;
178
179        // Convert Duration to milliseconds
180        let lock_timeout_ms = lock_timeout.as_millis() as i64;
181
182        for attempt in 0..=MAX_RETRIES {
183            let now_ms = Self::now_millis();
184
185            let row: Option<(
186                String,
187                String,
188                String,
189                i64,
190                serde_json::Value,
191                serde_json::Value,
192                String,
193            )> = sqlx::query_as(&format!(
194                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
195                self.schema_name
196            ))
197            .bind(now_ms)
198            .bind(lock_timeout_ms)
199            .fetch_optional(&*self.pool)
200            .await
201            .map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
202
203            if let Some((
204                instance_id,
205                orchestration_name,
206                orchestration_version,
207                execution_id,
208                history_json,
209                messages_json,
210                lock_token,
211            )) = row
212            {
213                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
214                    ProviderError::permanent(
215                        "fetch_orchestration_item",
216                        &format!("Failed to deserialize history: {}", e),
217                    )
218                })?;
219
220                let messages: Vec<WorkItem> =
221                    serde_json::from_value(messages_json).map_err(|e| {
222                        ProviderError::permanent(
223                            "fetch_orchestration_item",
224                            &format!("Failed to deserialize messages: {}", e),
225                        )
226                    })?;
227
228                let duration_ms = start.elapsed().as_millis() as u64;
229                debug!(
230                    target = "duroxide::providers::postgres",
231                    operation = "fetch_orchestration_item",
232                    instance_id = %instance_id,
233                    execution_id = execution_id,
234                    message_count = messages.len(),
235                    history_count = history.len(),
236                    duration_ms = duration_ms,
237                    attempts = attempt + 1,
238                    "Fetched orchestration item via stored procedure"
239                );
240
241                return Ok(Some(OrchestrationItem {
242                    instance: instance_id,
243                    orchestration_name,
244                    execution_id: execution_id as u64,
245                    version: orchestration_version,
246                    history,
247                    messages,
248                    lock_token,
249                }));
250            }
251
252            if attempt < MAX_RETRIES {
253                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
254            }
255        }
256
257        Ok(None)
258    }
259    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
260    async fn ack_orchestration_item(
261        &self,
262        lock_token: &str,
263        execution_id: u64,
264        history_delta: Vec<Event>,
265        worker_items: Vec<WorkItem>,
266        orchestrator_items: Vec<WorkItem>,
267        metadata: ExecutionMetadata,
268    ) -> Result<(), ProviderError> {
269        let start = std::time::Instant::now();
270
271        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
272        for event in &history_delta {
273            if event.event_id() == 0 {
274                return Err(ProviderError::permanent(
275                    "ack_orchestration_item",
276                    "event_id must be set by runtime",
277                ));
278            }
279
280            let event_json = serde_json::to_string(event).map_err(|e| {
281                ProviderError::permanent(
282                    "ack_orchestration_item",
283                    &format!("Failed to serialize event: {}", e),
284                )
285            })?;
286
287            let event_type = format!("{:?}", event)
288                .split('{')
289                .next()
290                .unwrap_or("Unknown")
291                .trim()
292                .to_string();
293
294            history_delta_payload.push(serde_json::json!({
295                "event_id": event.event_id(),
296                "event_type": event_type,
297                "event_data": event_json,
298            }));
299        }
300
301        let history_delta_json = serde_json::Value::Array(history_delta_payload);
302
303        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
304            ProviderError::permanent(
305                "ack_orchestration_item",
306                &format!("Failed to serialize worker items: {}", e),
307            )
308        })?;
309
310        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
311            ProviderError::permanent(
312                "ack_orchestration_item",
313                &format!("Failed to serialize orchestrator items: {}", e),
314            )
315        })?;
316
317        let metadata_json = serde_json::json!({
318            "orchestration_name": metadata.orchestration_name,
319            "orchestration_version": metadata.orchestration_version,
320            "status": metadata.status,
321            "output": metadata.output,
322        });
323
324        match sqlx::query(&format!(
325            "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
326            self.schema_name
327        ))
328        .bind(lock_token)
329        .bind(execution_id as i64)
330        .bind(history_delta_json)
331        .bind(worker_items_json)
332        .bind(orchestrator_items_json)
333        .bind(metadata_json)
334        .execute(&*self.pool)
335        .await
336        {
337            Ok(_) => {}
338            Err(e) => {
339                if let SqlxError::Database(db_err) = &e {
340                    if db_err.message().contains("Invalid lock token") {
341                        return Err(ProviderError::permanent(
342                            "ack_orchestration_item",
343                            "Invalid lock token",
344                        ));
345                    }
346                } else if e.to_string().contains("Invalid lock token") {
347                    return Err(ProviderError::permanent(
348                        "ack_orchestration_item",
349                        "Invalid lock token",
350                    ));
351                }
352
353                return Err(Self::sqlx_to_provider_error("ack_orchestration_item", e));
354            }
355        }
356
357        let duration_ms = start.elapsed().as_millis() as u64;
358        debug!(
359            target = "duroxide::providers::postgres",
360            operation = "ack_orchestration_item",
361            execution_id = execution_id,
362            history_count = history_delta.len(),
363            worker_items_count = worker_items.len(),
364            orchestrator_items_count = orchestrator_items.len(),
365            duration_ms = duration_ms,
366            "Acknowledged orchestration item via stored procedure"
367        );
368
369        Ok(())
370    }
371    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
372    async fn abandon_orchestration_item(
373        &self,
374        lock_token: &str,
375        delay: Option<Duration>,
376    ) -> Result<(), ProviderError> {
377        let start = std::time::Instant::now();
378        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
379
380        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
381            "SELECT {}.abandon_orchestration_item($1, $2)",
382            self.schema_name
383        ))
384        .bind(lock_token)
385        .bind(delay_param)
386        .fetch_one(&*self.pool)
387        .await
388        {
389            Ok(instance_id) => instance_id,
390            Err(e) => {
391                if let SqlxError::Database(db_err) = &e {
392                    if db_err.message().contains("Invalid lock token") {
393                        return Err(ProviderError::permanent(
394                            "abandon_orchestration_item",
395                            "Invalid lock token",
396                        ));
397                    }
398                } else if e.to_string().contains("Invalid lock token") {
399                    return Err(ProviderError::permanent(
400                        "abandon_orchestration_item",
401                        "Invalid lock token",
402                    ));
403                }
404
405                return Err(Self::sqlx_to_provider_error(
406                    "abandon_orchestration_item",
407                    e,
408                ));
409            }
410        };
411
412        let duration_ms = start.elapsed().as_millis() as u64;
413        debug!(
414            target = "duroxide::providers::postgres",
415            operation = "abandon_orchestration_item",
416            instance_id = %instance_id,
417            delay_ms = delay.map(|d| d.as_millis() as u64),
418            duration_ms = duration_ms,
419            "Abandoned orchestration item via stored procedure"
420        );
421
422        Ok(())
423    }
424
425    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
426    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
427        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
428            "SELECT out_event_data FROM {}.fetch_history($1)",
429            self.schema_name
430        ))
431        .bind(instance)
432        .fetch_all(&*self.pool)
433        .await
434        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
435
436        Ok(event_data_rows
437            .into_iter()
438            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
439            .collect())
440    }
441
442    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
443    async fn append_with_execution(
444        &self,
445        instance: &str,
446        execution_id: u64,
447        new_events: Vec<Event>,
448    ) -> Result<(), ProviderError> {
449        if new_events.is_empty() {
450            return Ok(());
451        }
452
453        let mut events_payload = Vec::with_capacity(new_events.len());
454        for event in &new_events {
455            if event.event_id() == 0 {
456                error!(
457                    target = "duroxide::providers::postgres",
458                    operation = "append_with_execution",
459                    error_type = "validation_error",
460                    instance_id = %instance,
461                    execution_id = execution_id,
462                    "event_id must be set by runtime"
463                );
464                return Err(ProviderError::permanent(
465                    "append_with_execution",
466                    "event_id must be set by runtime",
467                ));
468            }
469
470            let event_json = serde_json::to_string(event).map_err(|e| {
471                ProviderError::permanent(
472                    "append_with_execution",
473                    &format!("Failed to serialize event: {}", e),
474                )
475            })?;
476
477            let event_type = format!("{:?}", event)
478                .split('{')
479                .next()
480                .unwrap_or("Unknown")
481                .trim()
482                .to_string();
483
484            events_payload.push(serde_json::json!({
485                "event_id": event.event_id(),
486                "event_type": event_type,
487                "event_data": event_json,
488            }));
489        }
490
491        let events_json = serde_json::Value::Array(events_payload);
492
493        sqlx::query(&format!(
494            "SELECT {}.append_history($1, $2, $3)",
495            self.schema_name
496        ))
497        .bind(instance)
498        .bind(execution_id as i64)
499        .bind(events_json)
500        .execute(&*self.pool)
501        .await
502        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
503
504        debug!(
505            target = "duroxide::providers::postgres",
506            operation = "append_with_execution",
507            instance_id = %instance,
508            execution_id = execution_id,
509            event_count = new_events.len(),
510            "Appended history events via stored procedure"
511        );
512
513        Ok(())
514    }
515
516    #[instrument(skip(self), target = "duroxide::providers::postgres")]
517    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
518        let work_item = serde_json::to_string(&item).map_err(|e| {
519            ProviderError::permanent(
520                "enqueue_worker_work",
521                &format!("Failed to serialize work item: {}", e),
522            )
523        })?;
524
525        sqlx::query(&format!(
526            "SELECT {}.enqueue_worker_work($1)",
527            self.schema_name
528        ))
529        .bind(work_item)
530        .execute(&*self.pool)
531        .await
532        .map_err(|e| {
533            error!(
534                target = "duroxide::providers::postgres",
535                operation = "enqueue_worker_work",
536                error_type = "database_error",
537                error = %e,
538                "Failed to enqueue worker work"
539            );
540            Self::sqlx_to_provider_error("enqueue_worker_work", e)
541        })?;
542
543        Ok(())
544    }
545
546    #[instrument(skip(self), target = "duroxide::providers::postgres")]
547    async fn fetch_work_item(&self, lock_timeout: Duration) -> Result<Option<(WorkItem, String)>, ProviderError> {
548        let start = std::time::Instant::now();
549        
550        // Convert Duration to milliseconds
551        let lock_timeout_ms = lock_timeout.as_millis() as i64;
552        
553        let row = match sqlx::query_as::<_, (String, String)>(&format!(
554            "SELECT * FROM {}.fetch_work_item($1, $2)",
555            self.schema_name
556        ))
557        .bind(Self::now_millis())
558        .bind(lock_timeout_ms)
559        .fetch_optional(&*self.pool)
560        .await
561        {
562            Ok(row) => row,
563            Err(e) => {
564                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
565            }
566        };
567
568        let (work_item_json, lock_token) = match row {
569            Some(row) => row,
570            None => return Ok(None),
571        };
572
573        let work_item: WorkItem = serde_json::from_str(&work_item_json)
574            .map_err(|e| ProviderError::permanent("fetch_work_item", &format!("Failed to deserialize worker item: {}", e)))?;
575
576        let duration_ms = start.elapsed().as_millis() as u64;
577        
578        // Extract instance for logging - different work item types have different structures
579        let instance_id = match &work_item {
580            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
581            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
582            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
583            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
584            WorkItem::TimerFired { instance, .. } => instance.as_str(),
585            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
586            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
587            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
588            WorkItem::SubOrchCompleted { parent_instance, .. } => parent_instance.as_str(),
589            WorkItem::SubOrchFailed { parent_instance, .. } => parent_instance.as_str(),
590        };
591        
592        debug!(
593            target = "duroxide::providers::postgres",
594            operation = "fetch_work_item",
595            instance_id = %instance_id,
596            duration_ms = duration_ms,
597            "Fetched activity work item via stored procedure"
598        );
599
600        Ok(Some((work_item, lock_token)))
601    }
602
603    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
604    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
605        let start = std::time::Instant::now();
606
607        // Extract instance ID from completion WorkItem
608        let instance_id = match &completion {
609            WorkItem::ActivityCompleted { instance, .. }
610            | WorkItem::ActivityFailed { instance, .. } => instance,
611            _ => {
612                error!(
613                    target = "duroxide::providers::postgres",
614                    operation = "ack_worker",
615                    error_type = "invalid_completion_type",
616                    "Invalid completion work item type"
617                );
618                return Err(ProviderError::permanent(
619                    "ack_worker",
620                    "Invalid completion work item type",
621                ));
622            }
623        };
624
625        let completion_json = serde_json::to_string(&completion).map_err(|e| {
626            ProviderError::permanent(
627                "ack_worker",
628                &format!("Failed to serialize completion: {}", e),
629            )
630        })?;
631
632        // Call stored procedure to atomically delete worker item and enqueue completion
633        sqlx::query(&format!(
634            "SELECT {}.ack_worker($1, $2, $3)",
635            self.schema_name
636        ))
637        .bind(token)
638        .bind(instance_id)
639        .bind(completion_json)
640        .execute(&*self.pool)
641        .await
642        .map_err(|e| {
643            if e.to_string().contains("Worker queue item not found") {
644                error!(
645                    target = "duroxide::providers::postgres",
646                    operation = "ack_worker",
647                    error_type = "worker_item_not_found",
648                    token = %token,
649                    "Worker queue item not found or already processed"
650                );
651                ProviderError::permanent(
652                    "ack_worker",
653                    "Worker queue item not found or already processed",
654                )
655            } else {
656                Self::sqlx_to_provider_error("ack_worker", e)
657            }
658        })?;
659
660        let duration_ms = start.elapsed().as_millis() as u64;
661        debug!(
662            target = "duroxide::providers::postgres",
663            operation = "ack_worker",
664            instance_id = %instance_id,
665            duration_ms = duration_ms,
666            "Acknowledged worker and enqueued completion"
667        );
668
669        Ok(())
670    }
671
672    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
673    async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
674        let start = std::time::Instant::now();
675
676        // Get current time from application for consistent time reference
677        let now_ms = Self::now_millis();
678        
679        // Convert Duration to seconds for the stored procedure
680        let extend_secs = extend_for.as_secs() as i64;
681
682        match sqlx::query(&format!(
683            "SELECT {}.renew_work_item_lock($1, $2, $3)",
684            self.schema_name
685        ))
686        .bind(token)
687        .bind(now_ms)
688        .bind(extend_secs)
689        .execute(&*self.pool)
690        .await
691        {
692            Ok(_) => {
693                let duration_ms = start.elapsed().as_millis() as u64;
694                debug!(
695                    target = "duroxide::providers::postgres",
696                    operation = "renew_work_item_lock",
697                    token = %token,
698                    extend_for_secs = extend_secs,
699                    duration_ms = duration_ms,
700                    "Work item lock renewed successfully"
701                );
702                Ok(())
703            }
704            Err(e) => {
705                if let SqlxError::Database(db_err) = &e {
706                    if db_err.message().contains("Lock token invalid") {
707                        return Err(ProviderError::permanent(
708                            "renew_work_item_lock",
709                            "Lock token invalid, expired, or already acked",
710                        ));
711                    }
712                } else if e.to_string().contains("Lock token invalid") {
713                    return Err(ProviderError::permanent(
714                        "renew_work_item_lock",
715                        "Lock token invalid, expired, or already acked",
716                    ));
717                }
718
719                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
720            }
721        }
722    }
723
724    #[instrument(skip(self), target = "duroxide::providers::postgres")]
725    async fn enqueue_for_orchestrator(
726        &self,
727        item: WorkItem,
728        delay: Option<Duration>,
729    ) -> Result<(), ProviderError> {
730        let work_item = serde_json::to_string(&item).map_err(|e| {
731            ProviderError::permanent(
732                "enqueue_orchestrator_work",
733                &format!("Failed to serialize work item: {}", e),
734            )
735        })?;
736
737        // Extract instance ID from WorkItem enum
738        let instance_id = match &item {
739            WorkItem::StartOrchestration { instance, .. }
740            | WorkItem::ActivityCompleted { instance, .. }
741            | WorkItem::ActivityFailed { instance, .. }
742            | WorkItem::TimerFired { instance, .. }
743            | WorkItem::ExternalRaised { instance, .. }
744            | WorkItem::CancelInstance { instance, .. }
745            | WorkItem::ContinueAsNew { instance, .. } => instance,
746            WorkItem::SubOrchCompleted {
747                parent_instance, ..
748            }
749            | WorkItem::SubOrchFailed {
750                parent_instance, ..
751            } => parent_instance,
752            WorkItem::ActivityExecute { .. } => {
753                return Err(ProviderError::permanent(
754                    "enqueue_orchestrator_work",
755                    "ActivityExecute should go to worker queue, not orchestrator queue",
756                ));
757            }
758        };
759
760        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
761        let now_ms = Self::now_millis();
762
763        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
764            if *fire_at_ms > 0 {
765                // Take max of fire_at_ms and delay (if provided)
766                if let Some(delay) = delay {
767                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
768                } else {
769                    *fire_at_ms
770                }
771            } else {
772                // fire_at_ms is 0, use delay or NOW()
773                delay.map(|d| now_ms as u64 + d.as_millis() as u64).unwrap_or(now_ms as u64)
774            }
775        } else {
776            // Non-timer item: use delay or NOW()
777            delay.map(|d| now_ms as u64 + d.as_millis() as u64).unwrap_or(now_ms as u64)
778        };
779
780        let visible_at = Utc
781            .timestamp_millis_opt(visible_at_ms as i64)
782            .single()
783            .ok_or_else(|| {
784                ProviderError::permanent(
785                    "enqueue_orchestrator_work",
786                    "Invalid visible_at timestamp",
787                )
788            })?;
789
790        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
791        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
792
793        // Call stored procedure to enqueue work
794        sqlx::query(&format!(
795            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
796            self.schema_name
797        ))
798        .bind(instance_id)
799        .bind(&work_item)
800        .bind(visible_at)
801        .bind::<Option<String>>(None) // orchestration_name - NULL
802        .bind::<Option<String>>(None) // orchestration_version - NULL
803        .bind::<Option<i64>>(None) // execution_id - NULL
804        .execute(&*self.pool)
805        .await
806        .map_err(|e| {
807            error!(
808                target = "duroxide::providers::postgres",
809                operation = "enqueue_orchestrator_work",
810                error_type = "database_error",
811                error = %e,
812                instance_id = %instance_id,
813                "Failed to enqueue orchestrator work"
814            );
815            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
816        })?;
817
818        debug!(
819            target = "duroxide::providers::postgres",
820            operation = "enqueue_orchestrator_work",
821            instance_id = %instance_id,
822            delay_ms = delay.map(|d| d.as_millis() as u64),
823            "Enqueued orchestrator work"
824        );
825
826        Ok(())
827    }
828
829    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
830    async fn read_with_execution(
831        &self,
832        instance: &str,
833        execution_id: u64,
834    ) -> Result<Vec<Event>, ProviderError> {
835        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
836            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
837            self.table_name("history")
838        ))
839        .bind(instance)
840        .bind(execution_id as i64)
841        .fetch_all(&*self.pool)
842        .await
843        .ok()
844        .unwrap_or_default();
845
846        Ok(event_data_rows
847            .into_iter()
848            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
849            .collect())
850    }
851
852    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
853        Some(self)
854    }
855}
856
857#[async_trait::async_trait]
858impl ProviderAdmin for PostgresProvider {
859    #[instrument(skip(self), target = "duroxide::providers::postgres")]
860    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
861        sqlx::query_scalar(&format!(
862            "SELECT instance_id FROM {}.list_instances()",
863            self.schema_name
864        ))
865        .fetch_all(&*self.pool)
866        .await
867        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
868    }
869
870    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
871    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
872        sqlx::query_scalar(&format!(
873            "SELECT instance_id FROM {}.list_instances_by_status($1)",
874            self.schema_name
875        ))
876        .bind(status)
877        .fetch_all(&*self.pool)
878        .await
879        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
880    }
881
882    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
883    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
884        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
885            "SELECT execution_id FROM {}.list_executions($1)",
886            self.schema_name
887        ))
888        .bind(instance)
889        .fetch_all(&*self.pool)
890        .await
891        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
892
893        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
894    }
895
896    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
897    async fn read_history_with_execution_id(
898        &self,
899        instance: &str,
900        execution_id: u64,
901    ) -> Result<Vec<Event>, ProviderError> {
902        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
903            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
904            self.schema_name
905        ))
906        .bind(instance)
907        .bind(execution_id as i64)
908        .fetch_all(&*self.pool)
909        .await
910        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
911
912        event_data_rows
913            .into_iter()
914            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
915            .collect::<Vec<Event>>()
916            .into_iter()
917            .map(Ok)
918            .collect()
919    }
920
921    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
922    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
923        let execution_id = self.latest_execution_id(instance).await?;
924        self.read_history_with_execution_id(instance, execution_id)
925            .await
926    }
927
928    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
929    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
930        sqlx::query_scalar(&format!(
931            "SELECT {}.latest_execution_id($1)",
932            self.schema_name
933        ))
934        .bind(instance)
935        .fetch_optional(&*self.pool)
936        .await
937        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
938        .map(|id: i64| id as u64)
939        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
940    }
941
942    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
943    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
944        let row: Option<(
945            String,
946            String,
947            String,
948            i64,
949            chrono::DateTime<Utc>,
950            Option<chrono::DateTime<Utc>>,
951            Option<String>,
952            Option<String>,
953        )> = sqlx::query_as(&format!(
954            "SELECT * FROM {}.get_instance_info($1)",
955            self.schema_name
956        ))
957        .bind(instance)
958        .fetch_optional(&*self.pool)
959        .await
960        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
961
962        let (
963            instance_id,
964            orchestration_name,
965            orchestration_version,
966            current_execution_id,
967            created_at,
968            updated_at,
969            status,
970            output,
971        ) =
972            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
973
974        Ok(InstanceInfo {
975            instance_id,
976            orchestration_name,
977            orchestration_version,
978            current_execution_id: current_execution_id as u64,
979            status: status.unwrap_or_else(|| "Running".to_string()),
980            output,
981            created_at: created_at.timestamp_millis() as u64,
982            updated_at: updated_at
983                .map(|dt| dt.timestamp_millis() as u64)
984                .unwrap_or(created_at.timestamp_millis() as u64),
985        })
986    }
987
988    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
989    async fn get_execution_info(
990        &self,
991        instance: &str,
992        execution_id: u64,
993    ) -> Result<ExecutionInfo, ProviderError> {
994        let row: Option<(
995            i64,
996            String,
997            Option<String>,
998            chrono::DateTime<Utc>,
999            Option<chrono::DateTime<Utc>>,
1000            i64,
1001        )> = sqlx::query_as(&format!(
1002            "SELECT * FROM {}.get_execution_info($1, $2)",
1003            self.schema_name
1004        ))
1005        .bind(instance)
1006        .bind(execution_id as i64)
1007        .fetch_optional(&*self.pool)
1008        .await
1009        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1010
1011        let (exec_id, status, output, started_at, completed_at, event_count) = row
1012            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1013
1014        Ok(ExecutionInfo {
1015            execution_id: exec_id as u64,
1016            status,
1017            output,
1018            started_at: started_at.timestamp_millis() as u64,
1019            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1020            event_count: event_count as usize,
1021        })
1022    }
1023
1024    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1025    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1026        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1027            "SELECT * FROM {}.get_system_metrics()",
1028            self.schema_name
1029        ))
1030        .fetch_optional(&*self.pool)
1031        .await
1032        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1033
1034        let (
1035            total_instances,
1036            total_executions,
1037            running_instances,
1038            completed_instances,
1039            failed_instances,
1040            total_events,
1041        ) = row.ok_or_else(|| {
1042            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1043        })?;
1044
1045        Ok(SystemMetrics {
1046            total_instances: total_instances as u64,
1047            total_executions: total_executions as u64,
1048            running_instances: running_instances as u64,
1049            completed_instances: completed_instances as u64,
1050            failed_instances: failed_instances as u64,
1051            total_events: total_events as u64,
1052        })
1053    }
1054
1055    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1056    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1057        let now_ms = Self::now_millis();
1058
1059        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1060            "SELECT * FROM {}.get_queue_depths($1)",
1061            self.schema_name
1062        ))
1063        .bind(now_ms)
1064        .fetch_optional(&*self.pool)
1065        .await
1066        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1067
1068        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1069            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1070        })?;
1071
1072        Ok(QueueDepths {
1073            orchestrator_queue: orchestrator_queue as usize,
1074            worker_queue: worker_queue as usize,
1075            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1076        })
1077    }
1078}