duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5    ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
17/// PostgreSQL-based provider for Duroxide durable orchestrations.
18///
19/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
20/// storing orchestration state, history, and work queues in PostgreSQL.
21///
22/// # Example
23///
24/// ```rust,no_run
25/// use duroxide_pg::PostgresProvider;
26///
27/// # async fn example() -> anyhow::Result<()> {
28/// // Connect using DATABASE_URL or explicit connection string
29/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
30///
31/// // Or use a custom schema for isolation
32/// let provider = PostgresProvider::new_with_schema(
33///     "postgres://localhost/mydb",
34///     Some("my_app"),
35/// ).await?;
36/// # Ok(())
37/// # }
38/// ```
39pub struct PostgresProvider {
40    pool: Arc<PgPool>,
41    schema_name: String,
42}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93    /// Get schema-qualified table name
94    fn table_name(&self, table: &str) -> String {
95        format!("{}.{}", self.schema_name, table)
96    }
97
98    /// Get the database pool (for testing)
99    pub fn pool(&self) -> &PgPool {
100        &self.pool
101    }
102
103    /// Get the schema name (for testing)
104    pub fn schema_name(&self) -> &str {
105        &self.schema_name
106    }
107
108    /// Convert sqlx::Error to ProviderError with proper classification
109    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110        match e {
111            SqlxError::Database(ref db_err) => {
112                // PostgreSQL error codes
113                let code_opt = db_err.code();
114                let code = code_opt.as_deref();
115                if code == Some("40P01") {
116                    // Deadlock detected
117                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118                } else if code == Some("40001") {
119                    // Serialization failure - permanent error (transaction conflict, not transient)
120                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121                } else if code == Some("23505") {
122                    // Unique constraint violation (duplicate event)
123                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124                } else if code == Some("23503") {
125                    // Foreign key constraint violation
126                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127                } else {
128                    ProviderError::permanent(operation, format!("Database error: {e}"))
129                }
130            }
131            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133            }
134            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136        }
137    }
138
139    /// Clean up schema after tests (drops all tables and optionally the schema)
140    ///
141    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
142    /// Only drops the schema if it's a custom schema (not "public").
143    pub async fn cleanup_schema(&self) -> Result<()> {
144        // Call the stored procedure to drop all tables
145        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146            .execute(&*self.pool)
147            .await?;
148
149        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
150        // Only drop custom schemas created for testing
151        if self.schema_name != "public" {
152            sqlx::query(&format!(
153                "DROP SCHEMA IF EXISTS {} CASCADE",
154                self.schema_name
155            ))
156            .execute(&*self.pool)
157            .await?;
158        } else {
159            // Explicit safeguard: we only drop tables from public schema, never the schema itself
160            // This ensures we don't accidentally drop the default PostgreSQL schema
161        }
162
163        Ok(())
164    }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169    fn name(&self) -> &str {
170        "duroxide-pg"
171    }
172
173    fn version(&self) -> &str {
174        env!("CARGO_PKG_VERSION")
175    }
176
177    #[instrument(skip(self), target = "duroxide::providers::postgres")]
178    async fn fetch_orchestration_item(
179        &self,
180        lock_timeout: Duration,
181        _poll_timeout: Duration,
182    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
183        let start = std::time::Instant::now();
184
185        const MAX_RETRIES: u32 = 3;
186        const RETRY_DELAY_MS: u64 = 50;
187
188        // Convert Duration to milliseconds
189        let lock_timeout_ms = lock_timeout.as_millis() as i64;
190        let mut _last_error: Option<ProviderError> = None;
191
192        for attempt in 0..=MAX_RETRIES {
193            let now_ms = Self::now_millis();
194
195            let result: Result<
196                Option<(
197                    String,
198                    String,
199                    String,
200                    i64,
201                    serde_json::Value,
202                    serde_json::Value,
203                    String,
204                    i32,
205                )>,
206                SqlxError,
207            > = sqlx::query_as(&format!(
208                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
209                self.schema_name
210            ))
211            .bind(now_ms)
212            .bind(lock_timeout_ms)
213            .fetch_optional(&*self.pool)
214            .await;
215
216            let row = match result {
217                Ok(r) => r,
218                Err(e) => {
219                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
220                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
221                        warn!(
222                            target = "duroxide::providers::postgres",
223                            operation = "fetch_orchestration_item",
224                            attempt = attempt + 1,
225                            error = %provider_err,
226                            "Retryable error, will retry"
227                        );
228                        _last_error = Some(provider_err);
229                        sleep(std::time::Duration::from_millis(
230                            RETRY_DELAY_MS * (attempt as u64 + 1),
231                        ))
232                        .await;
233                        continue;
234                    }
235                    return Err(provider_err);
236                }
237            };
238
239            if let Some((
240                instance_id,
241                orchestration_name,
242                orchestration_version,
243                execution_id,
244                history_json,
245                messages_json,
246                lock_token,
247                attempt_count,
248            )) = row
249            {
250                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
251                    ProviderError::permanent(
252                        "fetch_orchestration_item",
253                        format!("Failed to deserialize history: {e}"),
254                    )
255                })?;
256
257                let messages: Vec<WorkItem> =
258                    serde_json::from_value(messages_json).map_err(|e| {
259                        ProviderError::permanent(
260                            "fetch_orchestration_item",
261                            format!("Failed to deserialize messages: {e}"),
262                        )
263                    })?;
264
265                let duration_ms = start.elapsed().as_millis() as u64;
266                debug!(
267                    target = "duroxide::providers::postgres",
268                    operation = "fetch_orchestration_item",
269                    instance_id = %instance_id,
270                    execution_id = execution_id,
271                    message_count = messages.len(),
272                    history_count = history.len(),
273                    attempt_count = attempt_count,
274                    duration_ms = duration_ms,
275                    attempts = attempt + 1,
276                    "Fetched orchestration item via stored procedure"
277                );
278
279                return Ok(Some((
280                    OrchestrationItem {
281                        instance: instance_id,
282                        orchestration_name,
283                        execution_id: execution_id as u64,
284                        version: orchestration_version,
285                        history,
286                        messages,
287                    },
288                    lock_token,
289                    attempt_count as u32,
290                )));
291            }
292
293            if attempt < MAX_RETRIES {
294                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
295            }
296        }
297
298        Ok(None)
299    }
300    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
301    async fn ack_orchestration_item(
302        &self,
303        lock_token: &str,
304        execution_id: u64,
305        history_delta: Vec<Event>,
306        worker_items: Vec<WorkItem>,
307        orchestrator_items: Vec<WorkItem>,
308        metadata: ExecutionMetadata,
309    ) -> Result<(), ProviderError> {
310        let start = std::time::Instant::now();
311
312        const MAX_RETRIES: u32 = 3;
313        const RETRY_DELAY_MS: u64 = 50;
314
315        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
316        for event in &history_delta {
317            if event.event_id() == 0 {
318                return Err(ProviderError::permanent(
319                    "ack_orchestration_item",
320                    "event_id must be set by runtime",
321                ));
322            }
323
324            let event_json = serde_json::to_string(event).map_err(|e| {
325                ProviderError::permanent(
326                    "ack_orchestration_item",
327                    format!("Failed to serialize event: {e}"),
328                )
329            })?;
330
331            let event_type = format!("{event:?}")
332                .split('{')
333                .next()
334                .unwrap_or("Unknown")
335                .trim()
336                .to_string();
337
338            history_delta_payload.push(serde_json::json!({
339                "event_id": event.event_id(),
340                "event_type": event_type,
341                "event_data": event_json,
342            }));
343        }
344
345        let history_delta_json = serde_json::Value::Array(history_delta_payload);
346
347        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
348            ProviderError::permanent(
349                "ack_orchestration_item",
350                format!("Failed to serialize worker items: {e}"),
351            )
352        })?;
353
354        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
355            ProviderError::permanent(
356                "ack_orchestration_item",
357                format!("Failed to serialize orchestrator items: {e}"),
358            )
359        })?;
360
361        let metadata_json = serde_json::json!({
362            "orchestration_name": metadata.orchestration_name,
363            "orchestration_version": metadata.orchestration_version,
364            "status": metadata.status,
365            "output": metadata.output,
366        });
367
368        for attempt in 0..=MAX_RETRIES {
369            let result = sqlx::query(&format!(
370                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
371                self.schema_name
372            ))
373            .bind(lock_token)
374            .bind(execution_id as i64)
375            .bind(&history_delta_json)
376            .bind(&worker_items_json)
377            .bind(&orchestrator_items_json)
378            .bind(&metadata_json)
379            .execute(&*self.pool)
380            .await;
381
382            match result {
383                Ok(_) => {
384                    let duration_ms = start.elapsed().as_millis() as u64;
385                    debug!(
386                        target = "duroxide::providers::postgres",
387                        operation = "ack_orchestration_item",
388                        execution_id = execution_id,
389                        history_count = history_delta.len(),
390                        worker_items_count = worker_items.len(),
391                        orchestrator_items_count = orchestrator_items.len(),
392                        duration_ms = duration_ms,
393                        attempts = attempt + 1,
394                        "Acknowledged orchestration item via stored procedure"
395                    );
396                    return Ok(());
397                }
398                Err(e) => {
399                    // Check for permanent errors first
400                    if let SqlxError::Database(db_err) = &e {
401                        if db_err.message().contains("Invalid lock token") {
402                            return Err(ProviderError::permanent(
403                                "ack_orchestration_item",
404                                "Invalid lock token",
405                            ));
406                        }
407                    } else if e.to_string().contains("Invalid lock token") {
408                        return Err(ProviderError::permanent(
409                            "ack_orchestration_item",
410                            "Invalid lock token",
411                        ));
412                    }
413
414                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
415                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
416                        warn!(
417                            target = "duroxide::providers::postgres",
418                            operation = "ack_orchestration_item",
419                            attempt = attempt + 1,
420                            error = %provider_err,
421                            "Retryable error, will retry"
422                        );
423                        sleep(std::time::Duration::from_millis(
424                            RETRY_DELAY_MS * (attempt as u64 + 1),
425                        ))
426                        .await;
427                        continue;
428                    }
429                    return Err(provider_err);
430                }
431            }
432        }
433
434        // Should never reach here, but just in case
435        Ok(())
436    }
437    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
438    async fn abandon_orchestration_item(
439        &self,
440        lock_token: &str,
441        delay: Option<Duration>,
442        ignore_attempt: bool,
443    ) -> Result<(), ProviderError> {
444        let start = std::time::Instant::now();
445        let now_ms = Self::now_millis();
446        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
447
448        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
449            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
450            self.schema_name
451        ))
452        .bind(lock_token)
453        .bind(now_ms)
454        .bind(delay_param)
455        .bind(ignore_attempt)
456        .fetch_one(&*self.pool)
457        .await
458        {
459            Ok(instance_id) => instance_id,
460            Err(e) => {
461                if let SqlxError::Database(db_err) = &e {
462                    if db_err.message().contains("Invalid lock token") {
463                        return Err(ProviderError::permanent(
464                            "abandon_orchestration_item",
465                            "Invalid lock token",
466                        ));
467                    }
468                } else if e.to_string().contains("Invalid lock token") {
469                    return Err(ProviderError::permanent(
470                        "abandon_orchestration_item",
471                        "Invalid lock token",
472                    ));
473                }
474
475                return Err(Self::sqlx_to_provider_error(
476                    "abandon_orchestration_item",
477                    e,
478                ));
479            }
480        };
481
482        let duration_ms = start.elapsed().as_millis() as u64;
483        debug!(
484            target = "duroxide::providers::postgres",
485            operation = "abandon_orchestration_item",
486            instance_id = %instance_id,
487            delay_ms = delay.map(|d| d.as_millis() as u64),
488            ignore_attempt = ignore_attempt,
489            duration_ms = duration_ms,
490            "Abandoned orchestration item via stored procedure"
491        );
492
493        Ok(())
494    }
495
496    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
497    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
498        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
499            "SELECT out_event_data FROM {}.fetch_history($1)",
500            self.schema_name
501        ))
502        .bind(instance)
503        .fetch_all(&*self.pool)
504        .await
505        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
506
507        Ok(event_data_rows
508            .into_iter()
509            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
510            .collect())
511    }
512
513    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
514    async fn append_with_execution(
515        &self,
516        instance: &str,
517        execution_id: u64,
518        new_events: Vec<Event>,
519    ) -> Result<(), ProviderError> {
520        if new_events.is_empty() {
521            return Ok(());
522        }
523
524        let mut events_payload = Vec::with_capacity(new_events.len());
525        for event in &new_events {
526            if event.event_id() == 0 {
527                error!(
528                    target = "duroxide::providers::postgres",
529                    operation = "append_with_execution",
530                    error_type = "validation_error",
531                    instance_id = %instance,
532                    execution_id = execution_id,
533                    "event_id must be set by runtime"
534                );
535                return Err(ProviderError::permanent(
536                    "append_with_execution",
537                    "event_id must be set by runtime",
538                ));
539            }
540
541            let event_json = serde_json::to_string(event).map_err(|e| {
542                ProviderError::permanent(
543                    "append_with_execution",
544                    format!("Failed to serialize event: {e}"),
545                )
546            })?;
547
548            let event_type = format!("{event:?}")
549                .split('{')
550                .next()
551                .unwrap_or("Unknown")
552                .trim()
553                .to_string();
554
555            events_payload.push(serde_json::json!({
556                "event_id": event.event_id(),
557                "event_type": event_type,
558                "event_data": event_json,
559            }));
560        }
561
562        let events_json = serde_json::Value::Array(events_payload);
563
564        sqlx::query(&format!(
565            "SELECT {}.append_history($1, $2, $3)",
566            self.schema_name
567        ))
568        .bind(instance)
569        .bind(execution_id as i64)
570        .bind(events_json)
571        .execute(&*self.pool)
572        .await
573        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
574
575        debug!(
576            target = "duroxide::providers::postgres",
577            operation = "append_with_execution",
578            instance_id = %instance,
579            execution_id = execution_id,
580            event_count = new_events.len(),
581            "Appended history events via stored procedure"
582        );
583
584        Ok(())
585    }
586
587    #[instrument(skip(self), target = "duroxide::providers::postgres")]
588    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
589        let work_item = serde_json::to_string(&item).map_err(|e| {
590            ProviderError::permanent(
591                "enqueue_worker_work",
592                format!("Failed to serialize work item: {e}"),
593            )
594        })?;
595
596        let now_ms = Self::now_millis();
597
598        sqlx::query(&format!(
599            "SELECT {}.enqueue_worker_work($1, $2)",
600            self.schema_name
601        ))
602        .bind(work_item)
603        .bind(now_ms)
604        .execute(&*self.pool)
605        .await
606        .map_err(|e| {
607            error!(
608                target = "duroxide::providers::postgres",
609                operation = "enqueue_worker_work",
610                error_type = "database_error",
611                error = %e,
612                "Failed to enqueue worker work"
613            );
614            Self::sqlx_to_provider_error("enqueue_worker_work", e)
615        })?;
616
617        Ok(())
618    }
619
620    #[instrument(skip(self), target = "duroxide::providers::postgres")]
621    async fn fetch_work_item(
622        &self,
623        lock_timeout: Duration,
624        _poll_timeout: Duration,
625    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
626        let start = std::time::Instant::now();
627
628        // Convert Duration to milliseconds
629        let lock_timeout_ms = lock_timeout.as_millis() as i64;
630
631        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
632            "SELECT * FROM {}.fetch_work_item($1, $2)",
633            self.schema_name
634        ))
635        .bind(Self::now_millis())
636        .bind(lock_timeout_ms)
637        .fetch_optional(&*self.pool)
638        .await
639        {
640            Ok(row) => row,
641            Err(e) => {
642                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
643            }
644        };
645
646        let (work_item_json, lock_token, attempt_count) = match row {
647            Some(row) => row,
648            None => return Ok(None),
649        };
650
651        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
652            ProviderError::permanent(
653                "fetch_work_item",
654                format!("Failed to deserialize worker item: {e}"),
655            )
656        })?;
657
658        let duration_ms = start.elapsed().as_millis() as u64;
659
660        // Extract instance for logging - different work item types have different structures
661        let instance_id = match &work_item {
662            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
663            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
664            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
665            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
666            WorkItem::TimerFired { instance, .. } => instance.as_str(),
667            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
668            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
669            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
670            WorkItem::SubOrchCompleted {
671                parent_instance, ..
672            } => parent_instance.as_str(),
673            WorkItem::SubOrchFailed {
674                parent_instance, ..
675            } => parent_instance.as_str(),
676        };
677
678        debug!(
679            target = "duroxide::providers::postgres",
680            operation = "fetch_work_item",
681            instance_id = %instance_id,
682            attempt_count = attempt_count,
683            duration_ms = duration_ms,
684            "Fetched activity work item via stored procedure"
685        );
686
687        Ok(Some((work_item, lock_token, attempt_count as u32)))
688    }
689
690    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
691    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
692        let start = std::time::Instant::now();
693
694        // Extract instance ID from completion WorkItem
695        let instance_id = match &completion {
696            WorkItem::ActivityCompleted { instance, .. }
697            | WorkItem::ActivityFailed { instance, .. } => instance,
698            _ => {
699                error!(
700                    target = "duroxide::providers::postgres",
701                    operation = "ack_worker",
702                    error_type = "invalid_completion_type",
703                    "Invalid completion work item type"
704                );
705                return Err(ProviderError::permanent(
706                    "ack_worker",
707                    "Invalid completion work item type",
708                ));
709            }
710        };
711
712        let completion_json = serde_json::to_string(&completion).map_err(|e| {
713            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
714        })?;
715
716        let now_ms = Self::now_millis();
717
718        // Call stored procedure to atomically delete worker item and enqueue completion
719        sqlx::query(&format!(
720            "SELECT {}.ack_worker($1, $2, $3, $4)",
721            self.schema_name
722        ))
723        .bind(token)
724        .bind(instance_id)
725        .bind(completion_json)
726        .bind(now_ms)
727        .execute(&*self.pool)
728        .await
729        .map_err(|e| {
730            if e.to_string().contains("Worker queue item not found") {
731                error!(
732                    target = "duroxide::providers::postgres",
733                    operation = "ack_worker",
734                    error_type = "worker_item_not_found",
735                    token = %token,
736                    "Worker queue item not found or already processed"
737                );
738                ProviderError::permanent(
739                    "ack_worker",
740                    "Worker queue item not found or already processed",
741                )
742            } else {
743                Self::sqlx_to_provider_error("ack_worker", e)
744            }
745        })?;
746
747        let duration_ms = start.elapsed().as_millis() as u64;
748        debug!(
749            target = "duroxide::providers::postgres",
750            operation = "ack_worker",
751            instance_id = %instance_id,
752            duration_ms = duration_ms,
753            "Acknowledged worker and enqueued completion"
754        );
755
756        Ok(())
757    }
758
759    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
760    async fn renew_work_item_lock(
761        &self,
762        token: &str,
763        extend_for: Duration,
764    ) -> Result<(), ProviderError> {
765        let start = std::time::Instant::now();
766
767        // Get current time from application for consistent time reference
768        let now_ms = Self::now_millis();
769
770        // Convert Duration to seconds for the stored procedure
771        let extend_secs = extend_for.as_secs() as i64;
772
773        match sqlx::query(&format!(
774            "SELECT {}.renew_work_item_lock($1, $2, $3)",
775            self.schema_name
776        ))
777        .bind(token)
778        .bind(now_ms)
779        .bind(extend_secs)
780        .execute(&*self.pool)
781        .await
782        {
783            Ok(_) => {
784                let duration_ms = start.elapsed().as_millis() as u64;
785                debug!(
786                    target = "duroxide::providers::postgres",
787                    operation = "renew_work_item_lock",
788                    token = %token,
789                    extend_for_secs = extend_secs,
790                    duration_ms = duration_ms,
791                    "Work item lock renewed successfully"
792                );
793                Ok(())
794            }
795            Err(e) => {
796                if let SqlxError::Database(db_err) = &e {
797                    if db_err.message().contains("Lock token invalid") {
798                        return Err(ProviderError::permanent(
799                            "renew_work_item_lock",
800                            "Lock token invalid, expired, or already acked",
801                        ));
802                    }
803                } else if e.to_string().contains("Lock token invalid") {
804                    return Err(ProviderError::permanent(
805                        "renew_work_item_lock",
806                        "Lock token invalid, expired, or already acked",
807                    ));
808                }
809
810                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
811            }
812        }
813    }
814
815    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
816    async fn abandon_work_item(
817        &self,
818        token: &str,
819        delay: Option<Duration>,
820        ignore_attempt: bool,
821    ) -> Result<(), ProviderError> {
822        let start = std::time::Instant::now();
823        let now_ms = Self::now_millis();
824        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
825
826        match sqlx::query(&format!(
827            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
828            self.schema_name
829        ))
830        .bind(token)
831        .bind(now_ms)
832        .bind(delay_param)
833        .bind(ignore_attempt)
834        .execute(&*self.pool)
835        .await
836        {
837            Ok(_) => {
838                let duration_ms = start.elapsed().as_millis() as u64;
839                debug!(
840                    target = "duroxide::providers::postgres",
841                    operation = "abandon_work_item",
842                    token = %token,
843                    delay_ms = delay.map(|d| d.as_millis() as u64),
844                    ignore_attempt = ignore_attempt,
845                    duration_ms = duration_ms,
846                    "Abandoned work item via stored procedure"
847                );
848                Ok(())
849            }
850            Err(e) => {
851                if let SqlxError::Database(db_err) = &e {
852                    if db_err.message().contains("Invalid lock token")
853                        || db_err.message().contains("already acked")
854                    {
855                        return Err(ProviderError::permanent(
856                            "abandon_work_item",
857                            "Invalid lock token or already acked",
858                        ));
859                    }
860                } else if e.to_string().contains("Invalid lock token")
861                    || e.to_string().contains("already acked")
862                {
863                    return Err(ProviderError::permanent(
864                        "abandon_work_item",
865                        "Invalid lock token or already acked",
866                    ));
867                }
868
869                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
870            }
871        }
872    }
873
874    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
875    async fn renew_orchestration_item_lock(
876        &self,
877        token: &str,
878        extend_for: Duration,
879    ) -> Result<(), ProviderError> {
880        let start = std::time::Instant::now();
881
882        // Get current time from application for consistent time reference
883        let now_ms = Self::now_millis();
884
885        // Convert Duration to seconds for the stored procedure
886        let extend_secs = extend_for.as_secs() as i64;
887
888        match sqlx::query(&format!(
889            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
890            self.schema_name
891        ))
892        .bind(token)
893        .bind(now_ms)
894        .bind(extend_secs)
895        .execute(&*self.pool)
896        .await
897        {
898            Ok(_) => {
899                let duration_ms = start.elapsed().as_millis() as u64;
900                debug!(
901                    target = "duroxide::providers::postgres",
902                    operation = "renew_orchestration_item_lock",
903                    token = %token,
904                    extend_for_secs = extend_secs,
905                    duration_ms = duration_ms,
906                    "Orchestration item lock renewed successfully"
907                );
908                Ok(())
909            }
910            Err(e) => {
911                if let SqlxError::Database(db_err) = &e {
912                    if db_err.message().contains("Lock token invalid")
913                        || db_err.message().contains("expired")
914                        || db_err.message().contains("already released")
915                    {
916                        return Err(ProviderError::permanent(
917                            "renew_orchestration_item_lock",
918                            "Lock token invalid, expired, or already released",
919                        ));
920                    }
921                } else if e.to_string().contains("Lock token invalid")
922                    || e.to_string().contains("expired")
923                    || e.to_string().contains("already released")
924                {
925                    return Err(ProviderError::permanent(
926                        "renew_orchestration_item_lock",
927                        "Lock token invalid, expired, or already released",
928                    ));
929                }
930
931                Err(Self::sqlx_to_provider_error(
932                    "renew_orchestration_item_lock",
933                    e,
934                ))
935            }
936        }
937    }
938
939    #[instrument(skip(self), target = "duroxide::providers::postgres")]
940    async fn enqueue_for_orchestrator(
941        &self,
942        item: WorkItem,
943        delay: Option<Duration>,
944    ) -> Result<(), ProviderError> {
945        let work_item = serde_json::to_string(&item).map_err(|e| {
946            ProviderError::permanent(
947                "enqueue_orchestrator_work",
948                format!("Failed to serialize work item: {e}"),
949            )
950        })?;
951
952        // Extract instance ID from WorkItem enum
953        let instance_id = match &item {
954            WorkItem::StartOrchestration { instance, .. }
955            | WorkItem::ActivityCompleted { instance, .. }
956            | WorkItem::ActivityFailed { instance, .. }
957            | WorkItem::TimerFired { instance, .. }
958            | WorkItem::ExternalRaised { instance, .. }
959            | WorkItem::CancelInstance { instance, .. }
960            | WorkItem::ContinueAsNew { instance, .. } => instance,
961            WorkItem::SubOrchCompleted {
962                parent_instance, ..
963            }
964            | WorkItem::SubOrchFailed {
965                parent_instance, ..
966            } => parent_instance,
967            WorkItem::ActivityExecute { .. } => {
968                return Err(ProviderError::permanent(
969                    "enqueue_orchestrator_work",
970                    "ActivityExecute should go to worker queue, not orchestrator queue",
971                ));
972            }
973        };
974
975        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
976        let now_ms = Self::now_millis();
977
978        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
979            if *fire_at_ms > 0 {
980                // Take max of fire_at_ms and delay (if provided)
981                if let Some(delay) = delay {
982                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
983                } else {
984                    *fire_at_ms
985                }
986            } else {
987                // fire_at_ms is 0, use delay or NOW()
988                delay
989                    .map(|d| now_ms as u64 + d.as_millis() as u64)
990                    .unwrap_or(now_ms as u64)
991            }
992        } else {
993            // Non-timer item: use delay or NOW()
994            delay
995                .map(|d| now_ms as u64 + d.as_millis() as u64)
996                .unwrap_or(now_ms as u64)
997        };
998
999        let visible_at = Utc
1000            .timestamp_millis_opt(visible_at_ms as i64)
1001            .single()
1002            .ok_or_else(|| {
1003                ProviderError::permanent(
1004                    "enqueue_orchestrator_work",
1005                    "Invalid visible_at timestamp",
1006                )
1007            })?;
1008
1009        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1010        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1011
1012        // Call stored procedure to enqueue work
1013        sqlx::query(&format!(
1014            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1015            self.schema_name
1016        ))
1017        .bind(instance_id)
1018        .bind(&work_item)
1019        .bind(visible_at)
1020        .bind::<Option<String>>(None) // orchestration_name - NULL
1021        .bind::<Option<String>>(None) // orchestration_version - NULL
1022        .bind::<Option<i64>>(None) // execution_id - NULL
1023        .execute(&*self.pool)
1024        .await
1025        .map_err(|e| {
1026            error!(
1027                target = "duroxide::providers::postgres",
1028                operation = "enqueue_orchestrator_work",
1029                error_type = "database_error",
1030                error = %e,
1031                instance_id = %instance_id,
1032                "Failed to enqueue orchestrator work"
1033            );
1034            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1035        })?;
1036
1037        debug!(
1038            target = "duroxide::providers::postgres",
1039            operation = "enqueue_orchestrator_work",
1040            instance_id = %instance_id,
1041            delay_ms = delay.map(|d| d.as_millis() as u64),
1042            "Enqueued orchestrator work"
1043        );
1044
1045        Ok(())
1046    }
1047
1048    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1049    async fn read_with_execution(
1050        &self,
1051        instance: &str,
1052        execution_id: u64,
1053    ) -> Result<Vec<Event>, ProviderError> {
1054        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1055            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1056            self.table_name("history")
1057        ))
1058        .bind(instance)
1059        .bind(execution_id as i64)
1060        .fetch_all(&*self.pool)
1061        .await
1062        .ok()
1063        .unwrap_or_default();
1064
1065        Ok(event_data_rows
1066            .into_iter()
1067            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1068            .collect())
1069    }
1070
1071    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1072        Some(self)
1073    }
1074}
1075
1076#[async_trait::async_trait]
1077impl ProviderAdmin for PostgresProvider {
1078    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1079    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1080        sqlx::query_scalar(&format!(
1081            "SELECT instance_id FROM {}.list_instances()",
1082            self.schema_name
1083        ))
1084        .fetch_all(&*self.pool)
1085        .await
1086        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1087    }
1088
1089    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1090    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1091        sqlx::query_scalar(&format!(
1092            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1093            self.schema_name
1094        ))
1095        .bind(status)
1096        .fetch_all(&*self.pool)
1097        .await
1098        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1099    }
1100
1101    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1102    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1103        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1104            "SELECT execution_id FROM {}.list_executions($1)",
1105            self.schema_name
1106        ))
1107        .bind(instance)
1108        .fetch_all(&*self.pool)
1109        .await
1110        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1111
1112        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1113    }
1114
1115    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1116    async fn read_history_with_execution_id(
1117        &self,
1118        instance: &str,
1119        execution_id: u64,
1120    ) -> Result<Vec<Event>, ProviderError> {
1121        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1122            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1123            self.schema_name
1124        ))
1125        .bind(instance)
1126        .bind(execution_id as i64)
1127        .fetch_all(&*self.pool)
1128        .await
1129        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1130
1131        event_data_rows
1132            .into_iter()
1133            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1134            .collect::<Vec<Event>>()
1135            .into_iter()
1136            .map(Ok)
1137            .collect()
1138    }
1139
1140    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1141    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1142        let execution_id = self.latest_execution_id(instance).await?;
1143        self.read_history_with_execution_id(instance, execution_id)
1144            .await
1145    }
1146
1147    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1148    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1149        sqlx::query_scalar(&format!(
1150            "SELECT {}.latest_execution_id($1)",
1151            self.schema_name
1152        ))
1153        .bind(instance)
1154        .fetch_optional(&*self.pool)
1155        .await
1156        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1157        .map(|id: i64| id as u64)
1158        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1159    }
1160
1161    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1162    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1163        let row: Option<(
1164            String,
1165            String,
1166            String,
1167            i64,
1168            chrono::DateTime<Utc>,
1169            Option<chrono::DateTime<Utc>>,
1170            Option<String>,
1171            Option<String>,
1172        )> = sqlx::query_as(&format!(
1173            "SELECT * FROM {}.get_instance_info($1)",
1174            self.schema_name
1175        ))
1176        .bind(instance)
1177        .fetch_optional(&*self.pool)
1178        .await
1179        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1180
1181        let (
1182            instance_id,
1183            orchestration_name,
1184            orchestration_version,
1185            current_execution_id,
1186            created_at,
1187            updated_at,
1188            status,
1189            output,
1190        ) =
1191            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1192
1193        Ok(InstanceInfo {
1194            instance_id,
1195            orchestration_name,
1196            orchestration_version,
1197            current_execution_id: current_execution_id as u64,
1198            status: status.unwrap_or_else(|| "Running".to_string()),
1199            output,
1200            created_at: created_at.timestamp_millis() as u64,
1201            updated_at: updated_at
1202                .map(|dt| dt.timestamp_millis() as u64)
1203                .unwrap_or(created_at.timestamp_millis() as u64),
1204        })
1205    }
1206
1207    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1208    async fn get_execution_info(
1209        &self,
1210        instance: &str,
1211        execution_id: u64,
1212    ) -> Result<ExecutionInfo, ProviderError> {
1213        let row: Option<(
1214            i64,
1215            String,
1216            Option<String>,
1217            chrono::DateTime<Utc>,
1218            Option<chrono::DateTime<Utc>>,
1219            i64,
1220        )> = sqlx::query_as(&format!(
1221            "SELECT * FROM {}.get_execution_info($1, $2)",
1222            self.schema_name
1223        ))
1224        .bind(instance)
1225        .bind(execution_id as i64)
1226        .fetch_optional(&*self.pool)
1227        .await
1228        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1229
1230        let (exec_id, status, output, started_at, completed_at, event_count) = row
1231            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1232
1233        Ok(ExecutionInfo {
1234            execution_id: exec_id as u64,
1235            status,
1236            output,
1237            started_at: started_at.timestamp_millis() as u64,
1238            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1239            event_count: event_count as usize,
1240        })
1241    }
1242
1243    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1244    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1245        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1246            "SELECT * FROM {}.get_system_metrics()",
1247            self.schema_name
1248        ))
1249        .fetch_optional(&*self.pool)
1250        .await
1251        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1252
1253        let (
1254            total_instances,
1255            total_executions,
1256            running_instances,
1257            completed_instances,
1258            failed_instances,
1259            total_events,
1260        ) = row.ok_or_else(|| {
1261            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1262        })?;
1263
1264        Ok(SystemMetrics {
1265            total_instances: total_instances as u64,
1266            total_executions: total_executions as u64,
1267            running_instances: running_instances as u64,
1268            completed_instances: completed_instances as u64,
1269            failed_instances: failed_instances as u64,
1270            total_events: total_events as u64,
1271        })
1272    }
1273
1274    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1275    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1276        let now_ms = Self::now_millis();
1277
1278        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1279            "SELECT * FROM {}.get_queue_depths($1)",
1280            self.schema_name
1281        ))
1282        .bind(now_ms)
1283        .fetch_optional(&*self.pool)
1284        .await
1285        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1286
1287        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1288            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1289        })?;
1290
1291        Ok(QueueDepths {
1292            orchestrator_queue: orchestrator_queue as usize,
1293            worker_queue: worker_queue as usize,
1294            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1295        })
1296    }
1297}