duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin,
5    ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect with an explicit connection string (uses the "public" schema)
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    /// Shared connection pool used for every database call.
    pool: Arc<PgPool>,
    /// Schema name that prefixes the stored procedures and tables this provider uses.
    schema_name: String,
}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93    /// Get schema-qualified table name
94    fn table_name(&self, table: &str) -> String {
95        format!("{}.{}", self.schema_name, table)
96    }
97
98    /// Get the database pool (for testing)
99    pub fn pool(&self) -> &PgPool {
100        &self.pool
101    }
102
103    /// Get the schema name (for testing)
104    pub fn schema_name(&self) -> &str {
105        &self.schema_name
106    }
107
108    /// Convert sqlx::Error to ProviderError with proper classification
109    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
110        match e {
111            SqlxError::Database(ref db_err) => {
112                // PostgreSQL error codes
113                let code_opt = db_err.code();
114                let code = code_opt.as_deref();
115                if code == Some("40P01") {
116                    // Deadlock detected
117                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
118                } else if code == Some("40001") {
119                    // Serialization failure - permanent error (transaction conflict, not transient)
120                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
121                } else if code == Some("23505") {
122                    // Unique constraint violation (duplicate event)
123                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
124                } else if code == Some("23503") {
125                    // Foreign key constraint violation
126                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
127                } else {
128                    ProviderError::permanent(operation, format!("Database error: {e}"))
129                }
130            }
131            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
132                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
133            }
134            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
135            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
136        }
137    }
138
139    /// Clean up schema after tests (drops all tables and optionally the schema)
140    ///
141    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
142    /// Only drops the schema if it's a custom schema (not "public").
143    pub async fn cleanup_schema(&self) -> Result<()> {
144        // Call the stored procedure to drop all tables
145        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
146            .execute(&*self.pool)
147            .await?;
148
149        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
150        // Only drop custom schemas created for testing
151        if self.schema_name != "public" {
152            sqlx::query(&format!(
153                "DROP SCHEMA IF EXISTS {} CASCADE",
154                self.schema_name
155            ))
156            .execute(&*self.pool)
157            .await?;
158        } else {
159            // Explicit safeguard: we only drop tables from public schema, never the schema itself
160            // This ensures we don't accidentally drop the default PostgreSQL schema
161        }
162
163        Ok(())
164    }
165}
166
167#[async_trait::async_trait]
168impl Provider for PostgresProvider {
169    #[instrument(skip(self), target = "duroxide::providers::postgres")]
170    async fn fetch_orchestration_item(
171        &self,
172        lock_timeout: Duration,
173        _poll_timeout: Duration,
174    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
175        let start = std::time::Instant::now();
176
177        const MAX_RETRIES: u32 = 3;
178        const RETRY_DELAY_MS: u64 = 50;
179
180        // Convert Duration to milliseconds
181        let lock_timeout_ms = lock_timeout.as_millis() as i64;
182        let mut _last_error: Option<ProviderError> = None;
183
184        for attempt in 0..=MAX_RETRIES {
185            let now_ms = Self::now_millis();
186
187            let result: Result<
188                Option<(
189                    String,
190                    String,
191                    String,
192                    i64,
193                    serde_json::Value,
194                    serde_json::Value,
195                    String,
196                    i32,
197                )>,
198                SqlxError,
199            > = sqlx::query_as(&format!(
200                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
201                self.schema_name
202            ))
203            .bind(now_ms)
204            .bind(lock_timeout_ms)
205            .fetch_optional(&*self.pool)
206            .await;
207
208            let row = match result {
209                Ok(r) => r,
210                Err(e) => {
211                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
212                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
213                        warn!(
214                            target = "duroxide::providers::postgres",
215                            operation = "fetch_orchestration_item",
216                            attempt = attempt + 1,
217                            error = %provider_err,
218                            "Retryable error, will retry"
219                        );
220                        _last_error = Some(provider_err);
221                        sleep(std::time::Duration::from_millis(
222                            RETRY_DELAY_MS * (attempt as u64 + 1),
223                        ))
224                        .await;
225                        continue;
226                    }
227                    return Err(provider_err);
228                }
229            };
230
231            if let Some((
232                instance_id,
233                orchestration_name,
234                orchestration_version,
235                execution_id,
236                history_json,
237                messages_json,
238                lock_token,
239                attempt_count,
240            )) = row
241            {
242                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
243                    ProviderError::permanent(
244                        "fetch_orchestration_item",
245                        format!("Failed to deserialize history: {e}"),
246                    )
247                })?;
248
249                let messages: Vec<WorkItem> =
250                    serde_json::from_value(messages_json).map_err(|e| {
251                        ProviderError::permanent(
252                            "fetch_orchestration_item",
253                            format!("Failed to deserialize messages: {e}"),
254                        )
255                    })?;
256
257                let duration_ms = start.elapsed().as_millis() as u64;
258                debug!(
259                    target = "duroxide::providers::postgres",
260                    operation = "fetch_orchestration_item",
261                    instance_id = %instance_id,
262                    execution_id = execution_id,
263                    message_count = messages.len(),
264                    history_count = history.len(),
265                    attempt_count = attempt_count,
266                    duration_ms = duration_ms,
267                    attempts = attempt + 1,
268                    "Fetched orchestration item via stored procedure"
269                );
270
271                return Ok(Some((
272                    OrchestrationItem {
273                        instance: instance_id,
274                        orchestration_name,
275                        execution_id: execution_id as u64,
276                        version: orchestration_version,
277                        history,
278                        messages,
279                    },
280                    lock_token,
281                    attempt_count as u32,
282                )));
283            }
284
285            if attempt < MAX_RETRIES {
286                sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
287            }
288        }
289
290        Ok(None)
291    }
292    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
293    async fn ack_orchestration_item(
294        &self,
295        lock_token: &str,
296        execution_id: u64,
297        history_delta: Vec<Event>,
298        worker_items: Vec<WorkItem>,
299        orchestrator_items: Vec<WorkItem>,
300        metadata: ExecutionMetadata,
301    ) -> Result<(), ProviderError> {
302        let start = std::time::Instant::now();
303
304        const MAX_RETRIES: u32 = 3;
305        const RETRY_DELAY_MS: u64 = 50;
306
307        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
308        for event in &history_delta {
309            if event.event_id() == 0 {
310                return Err(ProviderError::permanent(
311                    "ack_orchestration_item",
312                    "event_id must be set by runtime",
313                ));
314            }
315
316            let event_json = serde_json::to_string(event).map_err(|e| {
317                ProviderError::permanent(
318                    "ack_orchestration_item",
319                    format!("Failed to serialize event: {e}"),
320                )
321            })?;
322
323            let event_type = format!("{event:?}")
324                .split('{')
325                .next()
326                .unwrap_or("Unknown")
327                .trim()
328                .to_string();
329
330            history_delta_payload.push(serde_json::json!({
331                "event_id": event.event_id(),
332                "event_type": event_type,
333                "event_data": event_json,
334            }));
335        }
336
337        let history_delta_json = serde_json::Value::Array(history_delta_payload);
338
339        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
340            ProviderError::permanent(
341                "ack_orchestration_item",
342                format!("Failed to serialize worker items: {e}"),
343            )
344        })?;
345
346        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
347            ProviderError::permanent(
348                "ack_orchestration_item",
349                format!("Failed to serialize orchestrator items: {e}"),
350            )
351        })?;
352
353        let metadata_json = serde_json::json!({
354            "orchestration_name": metadata.orchestration_name,
355            "orchestration_version": metadata.orchestration_version,
356            "status": metadata.status,
357            "output": metadata.output,
358        });
359
360        for attempt in 0..=MAX_RETRIES {
361            let result = sqlx::query(&format!(
362                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6)",
363                self.schema_name
364            ))
365            .bind(lock_token)
366            .bind(execution_id as i64)
367            .bind(&history_delta_json)
368            .bind(&worker_items_json)
369            .bind(&orchestrator_items_json)
370            .bind(&metadata_json)
371            .execute(&*self.pool)
372            .await;
373
374            match result {
375                Ok(_) => {
376                    let duration_ms = start.elapsed().as_millis() as u64;
377                    debug!(
378                        target = "duroxide::providers::postgres",
379                        operation = "ack_orchestration_item",
380                        execution_id = execution_id,
381                        history_count = history_delta.len(),
382                        worker_items_count = worker_items.len(),
383                        orchestrator_items_count = orchestrator_items.len(),
384                        duration_ms = duration_ms,
385                        attempts = attempt + 1,
386                        "Acknowledged orchestration item via stored procedure"
387                    );
388                    return Ok(());
389                }
390                Err(e) => {
391                    // Check for permanent errors first
392                    if let SqlxError::Database(db_err) = &e {
393                        if db_err.message().contains("Invalid lock token") {
394                            return Err(ProviderError::permanent(
395                                "ack_orchestration_item",
396                                "Invalid lock token",
397                            ));
398                        }
399                    } else if e.to_string().contains("Invalid lock token") {
400                        return Err(ProviderError::permanent(
401                            "ack_orchestration_item",
402                            "Invalid lock token",
403                        ));
404                    }
405
406                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
407                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
408                        warn!(
409                            target = "duroxide::providers::postgres",
410                            operation = "ack_orchestration_item",
411                            attempt = attempt + 1,
412                            error = %provider_err,
413                            "Retryable error, will retry"
414                        );
415                        sleep(std::time::Duration::from_millis(
416                            RETRY_DELAY_MS * (attempt as u64 + 1),
417                        ))
418                        .await;
419                        continue;
420                    }
421                    return Err(provider_err);
422                }
423            }
424        }
425
426        // Should never reach here, but just in case
427        Ok(())
428    }
429    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
430    async fn abandon_orchestration_item(
431        &self,
432        lock_token: &str,
433        delay: Option<Duration>,
434        ignore_attempt: bool,
435    ) -> Result<(), ProviderError> {
436        let start = std::time::Instant::now();
437        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
438
439        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
440            "SELECT {}.abandon_orchestration_item($1, $2, $3)",
441            self.schema_name
442        ))
443        .bind(lock_token)
444        .bind(delay_param)
445        .bind(ignore_attempt)
446        .fetch_one(&*self.pool)
447        .await
448        {
449            Ok(instance_id) => instance_id,
450            Err(e) => {
451                if let SqlxError::Database(db_err) = &e {
452                    if db_err.message().contains("Invalid lock token") {
453                        return Err(ProviderError::permanent(
454                            "abandon_orchestration_item",
455                            "Invalid lock token",
456                        ));
457                    }
458                } else if e.to_string().contains("Invalid lock token") {
459                    return Err(ProviderError::permanent(
460                        "abandon_orchestration_item",
461                        "Invalid lock token",
462                    ));
463                }
464
465                return Err(Self::sqlx_to_provider_error(
466                    "abandon_orchestration_item",
467                    e,
468                ));
469            }
470        };
471
472        let duration_ms = start.elapsed().as_millis() as u64;
473        debug!(
474            target = "duroxide::providers::postgres",
475            operation = "abandon_orchestration_item",
476            instance_id = %instance_id,
477            delay_ms = delay.map(|d| d.as_millis() as u64),
478            ignore_attempt = ignore_attempt,
479            duration_ms = duration_ms,
480            "Abandoned orchestration item via stored procedure"
481        );
482
483        Ok(())
484    }
485
486    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
487    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
488        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
489            "SELECT out_event_data FROM {}.fetch_history($1)",
490            self.schema_name
491        ))
492        .bind(instance)
493        .fetch_all(&*self.pool)
494        .await
495        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
496
497        Ok(event_data_rows
498            .into_iter()
499            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
500            .collect())
501    }
502
503    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
504    async fn append_with_execution(
505        &self,
506        instance: &str,
507        execution_id: u64,
508        new_events: Vec<Event>,
509    ) -> Result<(), ProviderError> {
510        if new_events.is_empty() {
511            return Ok(());
512        }
513
514        let mut events_payload = Vec::with_capacity(new_events.len());
515        for event in &new_events {
516            if event.event_id() == 0 {
517                error!(
518                    target = "duroxide::providers::postgres",
519                    operation = "append_with_execution",
520                    error_type = "validation_error",
521                    instance_id = %instance,
522                    execution_id = execution_id,
523                    "event_id must be set by runtime"
524                );
525                return Err(ProviderError::permanent(
526                    "append_with_execution",
527                    "event_id must be set by runtime",
528                ));
529            }
530
531            let event_json = serde_json::to_string(event).map_err(|e| {
532                ProviderError::permanent(
533                    "append_with_execution",
534                    format!("Failed to serialize event: {e}"),
535                )
536            })?;
537
538            let event_type = format!("{event:?}")
539                .split('{')
540                .next()
541                .unwrap_or("Unknown")
542                .trim()
543                .to_string();
544
545            events_payload.push(serde_json::json!({
546                "event_id": event.event_id(),
547                "event_type": event_type,
548                "event_data": event_json,
549            }));
550        }
551
552        let events_json = serde_json::Value::Array(events_payload);
553
554        sqlx::query(&format!(
555            "SELECT {}.append_history($1, $2, $3)",
556            self.schema_name
557        ))
558        .bind(instance)
559        .bind(execution_id as i64)
560        .bind(events_json)
561        .execute(&*self.pool)
562        .await
563        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
564
565        debug!(
566            target = "duroxide::providers::postgres",
567            operation = "append_with_execution",
568            instance_id = %instance,
569            execution_id = execution_id,
570            event_count = new_events.len(),
571            "Appended history events via stored procedure"
572        );
573
574        Ok(())
575    }
576
577    #[instrument(skip(self), target = "duroxide::providers::postgres")]
578    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
579        let work_item = serde_json::to_string(&item).map_err(|e| {
580            ProviderError::permanent(
581                "enqueue_worker_work",
582                format!("Failed to serialize work item: {e}"),
583            )
584        })?;
585
586        sqlx::query(&format!(
587            "SELECT {}.enqueue_worker_work($1)",
588            self.schema_name
589        ))
590        .bind(work_item)
591        .execute(&*self.pool)
592        .await
593        .map_err(|e| {
594            error!(
595                target = "duroxide::providers::postgres",
596                operation = "enqueue_worker_work",
597                error_type = "database_error",
598                error = %e,
599                "Failed to enqueue worker work"
600            );
601            Self::sqlx_to_provider_error("enqueue_worker_work", e)
602        })?;
603
604        Ok(())
605    }
606
607    #[instrument(skip(self), target = "duroxide::providers::postgres")]
608    async fn fetch_work_item(
609        &self,
610        lock_timeout: Duration,
611        _poll_timeout: Duration,
612    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
613        let start = std::time::Instant::now();
614
615        // Convert Duration to milliseconds
616        let lock_timeout_ms = lock_timeout.as_millis() as i64;
617
618        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
619            "SELECT * FROM {}.fetch_work_item($1, $2)",
620            self.schema_name
621        ))
622        .bind(Self::now_millis())
623        .bind(lock_timeout_ms)
624        .fetch_optional(&*self.pool)
625        .await
626        {
627            Ok(row) => row,
628            Err(e) => {
629                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
630            }
631        };
632
633        let (work_item_json, lock_token, attempt_count) = match row {
634            Some(row) => row,
635            None => return Ok(None),
636        };
637
638        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
639            ProviderError::permanent(
640                "fetch_work_item",
641                format!("Failed to deserialize worker item: {e}"),
642            )
643        })?;
644
645        let duration_ms = start.elapsed().as_millis() as u64;
646
647        // Extract instance for logging - different work item types have different structures
648        let instance_id = match &work_item {
649            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
650            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
651            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
652            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
653            WorkItem::TimerFired { instance, .. } => instance.as_str(),
654            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
655            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
656            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
657            WorkItem::SubOrchCompleted {
658                parent_instance, ..
659            } => parent_instance.as_str(),
660            WorkItem::SubOrchFailed {
661                parent_instance, ..
662            } => parent_instance.as_str(),
663        };
664
665        debug!(
666            target = "duroxide::providers::postgres",
667            operation = "fetch_work_item",
668            instance_id = %instance_id,
669            attempt_count = attempt_count,
670            duration_ms = duration_ms,
671            "Fetched activity work item via stored procedure"
672        );
673
674        Ok(Some((work_item, lock_token, attempt_count as u32)))
675    }
676
677    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
678    async fn ack_work_item(&self, token: &str, completion: WorkItem) -> Result<(), ProviderError> {
679        let start = std::time::Instant::now();
680
681        // Extract instance ID from completion WorkItem
682        let instance_id = match &completion {
683            WorkItem::ActivityCompleted { instance, .. }
684            | WorkItem::ActivityFailed { instance, .. } => instance,
685            _ => {
686                error!(
687                    target = "duroxide::providers::postgres",
688                    operation = "ack_worker",
689                    error_type = "invalid_completion_type",
690                    "Invalid completion work item type"
691                );
692                return Err(ProviderError::permanent(
693                    "ack_worker",
694                    "Invalid completion work item type",
695                ));
696            }
697        };
698
699        let completion_json = serde_json::to_string(&completion).map_err(|e| {
700            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
701        })?;
702
703        // Call stored procedure to atomically delete worker item and enqueue completion
704        sqlx::query(&format!(
705            "SELECT {}.ack_worker($1, $2, $3)",
706            self.schema_name
707        ))
708        .bind(token)
709        .bind(instance_id)
710        .bind(completion_json)
711        .execute(&*self.pool)
712        .await
713        .map_err(|e| {
714            if e.to_string().contains("Worker queue item not found") {
715                error!(
716                    target = "duroxide::providers::postgres",
717                    operation = "ack_worker",
718                    error_type = "worker_item_not_found",
719                    token = %token,
720                    "Worker queue item not found or already processed"
721                );
722                ProviderError::permanent(
723                    "ack_worker",
724                    "Worker queue item not found or already processed",
725                )
726            } else {
727                Self::sqlx_to_provider_error("ack_worker", e)
728            }
729        })?;
730
731        let duration_ms = start.elapsed().as_millis() as u64;
732        debug!(
733            target = "duroxide::providers::postgres",
734            operation = "ack_worker",
735            instance_id = %instance_id,
736            duration_ms = duration_ms,
737            "Acknowledged worker and enqueued completion"
738        );
739
740        Ok(())
741    }
742
743    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
744    async fn renew_work_item_lock(
745        &self,
746        token: &str,
747        extend_for: Duration,
748    ) -> Result<(), ProviderError> {
749        let start = std::time::Instant::now();
750
751        // Get current time from application for consistent time reference
752        let now_ms = Self::now_millis();
753
754        // Convert Duration to seconds for the stored procedure
755        let extend_secs = extend_for.as_secs() as i64;
756
757        match sqlx::query(&format!(
758            "SELECT {}.renew_work_item_lock($1, $2, $3)",
759            self.schema_name
760        ))
761        .bind(token)
762        .bind(now_ms)
763        .bind(extend_secs)
764        .execute(&*self.pool)
765        .await
766        {
767            Ok(_) => {
768                let duration_ms = start.elapsed().as_millis() as u64;
769                debug!(
770                    target = "duroxide::providers::postgres",
771                    operation = "renew_work_item_lock",
772                    token = %token,
773                    extend_for_secs = extend_secs,
774                    duration_ms = duration_ms,
775                    "Work item lock renewed successfully"
776                );
777                Ok(())
778            }
779            Err(e) => {
780                if let SqlxError::Database(db_err) = &e {
781                    if db_err.message().contains("Lock token invalid") {
782                        return Err(ProviderError::permanent(
783                            "renew_work_item_lock",
784                            "Lock token invalid, expired, or already acked",
785                        ));
786                    }
787                } else if e.to_string().contains("Lock token invalid") {
788                    return Err(ProviderError::permanent(
789                        "renew_work_item_lock",
790                        "Lock token invalid, expired, or already acked",
791                    ));
792                }
793
794                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
795            }
796        }
797    }
798
799    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
800    async fn abandon_work_item(
801        &self,
802        token: &str,
803        delay: Option<Duration>,
804        ignore_attempt: bool,
805    ) -> Result<(), ProviderError> {
806        let start = std::time::Instant::now();
807        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
808
809        match sqlx::query(&format!(
810            "SELECT {}.abandon_work_item($1, $2, $3)",
811            self.schema_name
812        ))
813        .bind(token)
814        .bind(delay_param)
815        .bind(ignore_attempt)
816        .execute(&*self.pool)
817        .await
818        {
819            Ok(_) => {
820                let duration_ms = start.elapsed().as_millis() as u64;
821                debug!(
822                    target = "duroxide::providers::postgres",
823                    operation = "abandon_work_item",
824                    token = %token,
825                    delay_ms = delay.map(|d| d.as_millis() as u64),
826                    ignore_attempt = ignore_attempt,
827                    duration_ms = duration_ms,
828                    "Abandoned work item via stored procedure"
829                );
830                Ok(())
831            }
832            Err(e) => {
833                if let SqlxError::Database(db_err) = &e {
834                    if db_err.message().contains("Invalid lock token")
835                        || db_err.message().contains("already acked")
836                    {
837                        return Err(ProviderError::permanent(
838                            "abandon_work_item",
839                            "Invalid lock token or already acked",
840                        ));
841                    }
842                } else if e.to_string().contains("Invalid lock token")
843                    || e.to_string().contains("already acked")
844                {
845                    return Err(ProviderError::permanent(
846                        "abandon_work_item",
847                        "Invalid lock token or already acked",
848                    ));
849                }
850
851                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
852            }
853        }
854    }
855
856    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
857    async fn renew_orchestration_item_lock(
858        &self,
859        token: &str,
860        extend_for: Duration,
861    ) -> Result<(), ProviderError> {
862        let start = std::time::Instant::now();
863
864        // Get current time from application for consistent time reference
865        let now_ms = Self::now_millis();
866
867        // Convert Duration to seconds for the stored procedure
868        let extend_secs = extend_for.as_secs() as i64;
869
870        match sqlx::query(&format!(
871            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
872            self.schema_name
873        ))
874        .bind(token)
875        .bind(now_ms)
876        .bind(extend_secs)
877        .execute(&*self.pool)
878        .await
879        {
880            Ok(_) => {
881                let duration_ms = start.elapsed().as_millis() as u64;
882                debug!(
883                    target = "duroxide::providers::postgres",
884                    operation = "renew_orchestration_item_lock",
885                    token = %token,
886                    extend_for_secs = extend_secs,
887                    duration_ms = duration_ms,
888                    "Orchestration item lock renewed successfully"
889                );
890                Ok(())
891            }
892            Err(e) => {
893                if let SqlxError::Database(db_err) = &e {
894                    if db_err.message().contains("Lock token invalid")
895                        || db_err.message().contains("expired")
896                        || db_err.message().contains("already released")
897                    {
898                        return Err(ProviderError::permanent(
899                            "renew_orchestration_item_lock",
900                            "Lock token invalid, expired, or already released",
901                        ));
902                    }
903                } else if e.to_string().contains("Lock token invalid")
904                    || e.to_string().contains("expired")
905                    || e.to_string().contains("already released")
906                {
907                    return Err(ProviderError::permanent(
908                        "renew_orchestration_item_lock",
909                        "Lock token invalid, expired, or already released",
910                    ));
911                }
912
913                Err(Self::sqlx_to_provider_error(
914                    "renew_orchestration_item_lock",
915                    e,
916                ))
917            }
918        }
919    }
920
921    #[instrument(skip(self), target = "duroxide::providers::postgres")]
922    async fn enqueue_for_orchestrator(
923        &self,
924        item: WorkItem,
925        delay: Option<Duration>,
926    ) -> Result<(), ProviderError> {
927        let work_item = serde_json::to_string(&item).map_err(|e| {
928            ProviderError::permanent(
929                "enqueue_orchestrator_work",
930                format!("Failed to serialize work item: {e}"),
931            )
932        })?;
933
934        // Extract instance ID from WorkItem enum
935        let instance_id = match &item {
936            WorkItem::StartOrchestration { instance, .. }
937            | WorkItem::ActivityCompleted { instance, .. }
938            | WorkItem::ActivityFailed { instance, .. }
939            | WorkItem::TimerFired { instance, .. }
940            | WorkItem::ExternalRaised { instance, .. }
941            | WorkItem::CancelInstance { instance, .. }
942            | WorkItem::ContinueAsNew { instance, .. } => instance,
943            WorkItem::SubOrchCompleted {
944                parent_instance, ..
945            }
946            | WorkItem::SubOrchFailed {
947                parent_instance, ..
948            } => parent_instance,
949            WorkItem::ActivityExecute { .. } => {
950                return Err(ProviderError::permanent(
951                    "enqueue_orchestrator_work",
952                    "ActivityExecute should go to worker queue, not orchestrator queue",
953                ));
954            }
955        };
956
957        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
958        let now_ms = Self::now_millis();
959
960        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
961            if *fire_at_ms > 0 {
962                // Take max of fire_at_ms and delay (if provided)
963                if let Some(delay) = delay {
964                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
965                } else {
966                    *fire_at_ms
967                }
968            } else {
969                // fire_at_ms is 0, use delay or NOW()
970                delay
971                    .map(|d| now_ms as u64 + d.as_millis() as u64)
972                    .unwrap_or(now_ms as u64)
973            }
974        } else {
975            // Non-timer item: use delay or NOW()
976            delay
977                .map(|d| now_ms as u64 + d.as_millis() as u64)
978                .unwrap_or(now_ms as u64)
979        };
980
981        let visible_at = Utc
982            .timestamp_millis_opt(visible_at_ms as i64)
983            .single()
984            .ok_or_else(|| {
985                ProviderError::permanent(
986                    "enqueue_orchestrator_work",
987                    "Invalid visible_at timestamp",
988                )
989            })?;
990
991        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
992        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
993
994        // Call stored procedure to enqueue work
995        sqlx::query(&format!(
996            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
997            self.schema_name
998        ))
999        .bind(instance_id)
1000        .bind(&work_item)
1001        .bind(visible_at)
1002        .bind::<Option<String>>(None) // orchestration_name - NULL
1003        .bind::<Option<String>>(None) // orchestration_version - NULL
1004        .bind::<Option<i64>>(None) // execution_id - NULL
1005        .execute(&*self.pool)
1006        .await
1007        .map_err(|e| {
1008            error!(
1009                target = "duroxide::providers::postgres",
1010                operation = "enqueue_orchestrator_work",
1011                error_type = "database_error",
1012                error = %e,
1013                instance_id = %instance_id,
1014                "Failed to enqueue orchestrator work"
1015            );
1016            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1017        })?;
1018
1019        debug!(
1020            target = "duroxide::providers::postgres",
1021            operation = "enqueue_orchestrator_work",
1022            instance_id = %instance_id,
1023            delay_ms = delay.map(|d| d.as_millis() as u64),
1024            "Enqueued orchestrator work"
1025        );
1026
1027        Ok(())
1028    }
1029
1030    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1031    async fn read_with_execution(
1032        &self,
1033        instance: &str,
1034        execution_id: u64,
1035    ) -> Result<Vec<Event>, ProviderError> {
1036        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1037            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1038            self.table_name("history")
1039        ))
1040        .bind(instance)
1041        .bind(execution_id as i64)
1042        .fetch_all(&*self.pool)
1043        .await
1044        .ok()
1045        .unwrap_or_default();
1046
1047        Ok(event_data_rows
1048            .into_iter()
1049            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1050            .collect())
1051    }
1052
1053    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1054        Some(self)
1055    }
1056}
1057
1058#[async_trait::async_trait]
1059impl ProviderAdmin for PostgresProvider {
1060    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1061    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1062        sqlx::query_scalar(&format!(
1063            "SELECT instance_id FROM {}.list_instances()",
1064            self.schema_name
1065        ))
1066        .fetch_all(&*self.pool)
1067        .await
1068        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1069    }
1070
1071    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1072    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1073        sqlx::query_scalar(&format!(
1074            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1075            self.schema_name
1076        ))
1077        .bind(status)
1078        .fetch_all(&*self.pool)
1079        .await
1080        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1081    }
1082
1083    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1084    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1085        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1086            "SELECT execution_id FROM {}.list_executions($1)",
1087            self.schema_name
1088        ))
1089        .bind(instance)
1090        .fetch_all(&*self.pool)
1091        .await
1092        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1093
1094        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1095    }
1096
1097    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1098    async fn read_history_with_execution_id(
1099        &self,
1100        instance: &str,
1101        execution_id: u64,
1102    ) -> Result<Vec<Event>, ProviderError> {
1103        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1104            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1105            self.schema_name
1106        ))
1107        .bind(instance)
1108        .bind(execution_id as i64)
1109        .fetch_all(&*self.pool)
1110        .await
1111        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1112
1113        event_data_rows
1114            .into_iter()
1115            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1116            .collect::<Vec<Event>>()
1117            .into_iter()
1118            .map(Ok)
1119            .collect()
1120    }
1121
1122    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1123    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1124        let execution_id = self.latest_execution_id(instance).await?;
1125        self.read_history_with_execution_id(instance, execution_id)
1126            .await
1127    }
1128
1129    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1130    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1131        sqlx::query_scalar(&format!(
1132            "SELECT {}.latest_execution_id($1)",
1133            self.schema_name
1134        ))
1135        .bind(instance)
1136        .fetch_optional(&*self.pool)
1137        .await
1138        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1139        .map(|id: i64| id as u64)
1140        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1141    }
1142
1143    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1144    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1145        let row: Option<(
1146            String,
1147            String,
1148            String,
1149            i64,
1150            chrono::DateTime<Utc>,
1151            Option<chrono::DateTime<Utc>>,
1152            Option<String>,
1153            Option<String>,
1154        )> = sqlx::query_as(&format!(
1155            "SELECT * FROM {}.get_instance_info($1)",
1156            self.schema_name
1157        ))
1158        .bind(instance)
1159        .fetch_optional(&*self.pool)
1160        .await
1161        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1162
1163        let (
1164            instance_id,
1165            orchestration_name,
1166            orchestration_version,
1167            current_execution_id,
1168            created_at,
1169            updated_at,
1170            status,
1171            output,
1172        ) =
1173            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1174
1175        Ok(InstanceInfo {
1176            instance_id,
1177            orchestration_name,
1178            orchestration_version,
1179            current_execution_id: current_execution_id as u64,
1180            status: status.unwrap_or_else(|| "Running".to_string()),
1181            output,
1182            created_at: created_at.timestamp_millis() as u64,
1183            updated_at: updated_at
1184                .map(|dt| dt.timestamp_millis() as u64)
1185                .unwrap_or(created_at.timestamp_millis() as u64),
1186        })
1187    }
1188
1189    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1190    async fn get_execution_info(
1191        &self,
1192        instance: &str,
1193        execution_id: u64,
1194    ) -> Result<ExecutionInfo, ProviderError> {
1195        let row: Option<(
1196            i64,
1197            String,
1198            Option<String>,
1199            chrono::DateTime<Utc>,
1200            Option<chrono::DateTime<Utc>>,
1201            i64,
1202        )> = sqlx::query_as(&format!(
1203            "SELECT * FROM {}.get_execution_info($1, $2)",
1204            self.schema_name
1205        ))
1206        .bind(instance)
1207        .bind(execution_id as i64)
1208        .fetch_optional(&*self.pool)
1209        .await
1210        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1211
1212        let (exec_id, status, output, started_at, completed_at, event_count) = row
1213            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1214
1215        Ok(ExecutionInfo {
1216            execution_id: exec_id as u64,
1217            status,
1218            output,
1219            started_at: started_at.timestamp_millis() as u64,
1220            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1221            event_count: event_count as usize,
1222        })
1223    }
1224
1225    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1226    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1227        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1228            "SELECT * FROM {}.get_system_metrics()",
1229            self.schema_name
1230        ))
1231        .fetch_optional(&*self.pool)
1232        .await
1233        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1234
1235        let (
1236            total_instances,
1237            total_executions,
1238            running_instances,
1239            completed_instances,
1240            failed_instances,
1241            total_events,
1242        ) = row.ok_or_else(|| {
1243            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1244        })?;
1245
1246        Ok(SystemMetrics {
1247            total_instances: total_instances as u64,
1248            total_executions: total_executions as u64,
1249            running_instances: running_instances as u64,
1250            completed_instances: completed_instances as u64,
1251            failed_instances: failed_instances as u64,
1252            total_events: total_events as u64,
1253        })
1254    }
1255
1256    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1257    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1258        let now_ms = Self::now_millis();
1259
1260        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1261            "SELECT * FROM {}.get_queue_depths($1)",
1262            self.schema_name
1263        ))
1264        .bind(now_ms)
1265        .fetch_optional(&*self.pool)
1266        .await
1267        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1268
1269        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1270            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1271        })?;
1272
1273        Ok(QueueDepths {
1274            orchestrator_queue: orchestrator_queue as usize,
1275            worker_queue: worker_queue as usize,
1276            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1277        })
1278    }
1279}