duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    ExecutionInfo, ExecutionMetadata, InstanceInfo, OrchestrationItem, Provider,
5    ProviderAdmin, ProviderError, QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
6};
7use duroxide::Event;
8use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
9use std::sync::Arc;
10use std::time::Duration;
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::time::sleep;
13use tracing::{debug, error, instrument, warn};
14
15use crate::migrations::MigrationRunner;
16
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// Every table and stored-procedure reference is qualified with the configured
/// schema name (`public` unless another schema is passed to
/// [`PostgresProvider::new_with_schema`]), which lets separate providers be
/// isolated from each other inside one database.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect with an explicit connection string (uses the `public` schema)
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    /// Shared connection pool that every query issued by the provider runs against.
    pool: Arc<PgPool>,
    /// Schema used to qualify table and stored-procedure names (`{schema}.{name}`).
    schema_name: String,
}
43
44impl PostgresProvider {
45    pub async fn new(database_url: &str) -> Result<Self> {
46        Self::new_with_schema(database_url, None).await
47    }
48
49    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
50        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
51            .ok()
52            .and_then(|s| s.parse::<u32>().ok())
53            .unwrap_or(10);
54
55        let pool = PgPoolOptions::new()
56            .max_connections(max_connections)
57            .min_connections(1)
58            .acquire_timeout(std::time::Duration::from_secs(30))
59            .connect(database_url)
60            .await?;
61
62        let schema_name = schema_name.unwrap_or("public").to_string();
63
64        let provider = Self {
65            pool: Arc::new(pool),
66            schema_name: schema_name.clone(),
67        };
68
69        // Run migrations to initialize schema
70        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
71        migration_runner.migrate().await?;
72
73        Ok(provider)
74    }
75
76    #[instrument(skip(self), target = "duroxide::providers::postgres")]
77    pub async fn initialize_schema(&self) -> Result<()> {
78        // Schema initialization is now handled by migrations
79        // This method is kept for backward compatibility but delegates to migrations
80        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
81        migration_runner.migrate().await?;
82        Ok(())
83    }
84
85    /// Get current timestamp in milliseconds (Unix epoch)
86    fn now_millis() -> i64 {
87        SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_millis() as i64
91    }
92
93
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else {
130                    ProviderError::permanent(operation, format!("Database error: {e}"))
131                }
132            }
133            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135            }
136            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138        }
139    }
140
141    /// Clean up schema after tests (drops all tables and optionally the schema)
142    ///
143    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
144    /// Only drops the schema if it's a custom schema (not "public").
145    pub async fn cleanup_schema(&self) -> Result<()> {
146        // Call the stored procedure to drop all tables
147        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
148            .execute(&*self.pool)
149            .await?;
150
151        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
152        // Only drop custom schemas created for testing
153        if self.schema_name != "public" {
154            sqlx::query(&format!(
155                "DROP SCHEMA IF EXISTS {} CASCADE",
156                self.schema_name
157            ))
158            .execute(&*self.pool)
159            .await?;
160        } else {
161            // Explicit safeguard: we only drop tables from public schema, never the schema itself
162            // This ensures we don't accidentally drop the default PostgreSQL schema
163        }
164
165        Ok(())
166    }
167}
168
169#[async_trait::async_trait]
170impl Provider for PostgresProvider {
171    fn name(&self) -> &str {
172        "duroxide-pg"
173    }
174
175    fn version(&self) -> &str {
176        env!("CARGO_PKG_VERSION")
177    }
178
179    #[instrument(skip(self), target = "duroxide::providers::postgres")]
180    async fn fetch_orchestration_item(
181        &self,
182        lock_timeout: Duration,
183        _poll_timeout: Duration,
184    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
185        let start = std::time::Instant::now();
186
187        const MAX_RETRIES: u32 = 3;
188        const RETRY_DELAY_MS: u64 = 50;
189
190        // Convert Duration to milliseconds
191        let lock_timeout_ms = lock_timeout.as_millis() as i64;
192        let mut _last_error: Option<ProviderError> = None;
193
194        for attempt in 0..=MAX_RETRIES {
195            let now_ms = Self::now_millis();
196
197            let result: Result<
198                Option<(
199                    String,
200                    String,
201                    String,
202                    i64,
203                    serde_json::Value,
204                    serde_json::Value,
205                    String,
206                    i32,
207                )>,
208                SqlxError,
209            > = sqlx::query_as(&format!(
210                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
211                self.schema_name
212            ))
213            .bind(now_ms)
214            .bind(lock_timeout_ms)
215            .fetch_optional(&*self.pool)
216            .await;
217
218            let row = match result {
219                Ok(r) => r,
220                Err(e) => {
221                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
222                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
223                        warn!(
224                            target = "duroxide::providers::postgres",
225                            operation = "fetch_orchestration_item",
226                            attempt = attempt + 1,
227                            error = %provider_err,
228                            "Retryable error, will retry"
229                        );
230                        _last_error = Some(provider_err);
231                        sleep(std::time::Duration::from_millis(
232                            RETRY_DELAY_MS * (attempt as u64 + 1),
233                        ))
234                        .await;
235                        continue;
236                    }
237                    return Err(provider_err);
238                }
239            };
240
241            if let Some((
242                instance_id,
243                orchestration_name,
244                orchestration_version,
245                execution_id,
246                history_json,
247                messages_json,
248                lock_token,
249                attempt_count,
250            )) = row
251            {
252                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
253                    ProviderError::permanent(
254                        "fetch_orchestration_item",
255                        format!("Failed to deserialize history: {e}"),
256                    )
257                })?;
258
259                let messages: Vec<WorkItem> =
260                    serde_json::from_value(messages_json).map_err(|e| {
261                        ProviderError::permanent(
262                            "fetch_orchestration_item",
263                            format!("Failed to deserialize messages: {e}"),
264                        )
265                    })?;
266
267                let duration_ms = start.elapsed().as_millis() as u64;
268                debug!(
269                    target = "duroxide::providers::postgres",
270                    operation = "fetch_orchestration_item",
271                    instance_id = %instance_id,
272                    execution_id = execution_id,
273                    message_count = messages.len(),
274                    history_count = history.len(),
275                    attempt_count = attempt_count,
276                    duration_ms = duration_ms,
277                    attempts = attempt + 1,
278                    "Fetched orchestration item via stored procedure"
279                );
280
281                return Ok(Some((
282                    OrchestrationItem {
283                        instance: instance_id,
284                        orchestration_name,
285                        execution_id: execution_id as u64,
286                        version: orchestration_version,
287                        history,
288                        messages,
289                    },
290                    lock_token,
291                    attempt_count as u32,
292                )));
293            }
294
295            // No result found - return immediately (short polling behavior)
296            // Only retry with delay on retryable errors (handled above)
297            return Ok(None);
298        }
299
300        Ok(None)
301    }
302    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
303    async fn ack_orchestration_item(
304        &self,
305        lock_token: &str,
306        execution_id: u64,
307        history_delta: Vec<Event>,
308        worker_items: Vec<WorkItem>,
309        orchestrator_items: Vec<WorkItem>,
310        metadata: ExecutionMetadata,
311        cancelled_activities: Vec<ScheduledActivityIdentifier>,
312    ) -> Result<(), ProviderError> {
313        let start = std::time::Instant::now();
314
315        const MAX_RETRIES: u32 = 3;
316        const RETRY_DELAY_MS: u64 = 50;
317
318        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
319        for event in &history_delta {
320            if event.event_id() == 0 {
321                return Err(ProviderError::permanent(
322                    "ack_orchestration_item",
323                    "event_id must be set by runtime",
324                ));
325            }
326
327            let event_json = serde_json::to_string(event).map_err(|e| {
328                ProviderError::permanent(
329                    "ack_orchestration_item",
330                    format!("Failed to serialize event: {e}"),
331                )
332            })?;
333
334            let event_type = format!("{event:?}")
335                .split('{')
336                .next()
337                .unwrap_or("Unknown")
338                .trim()
339                .to_string();
340
341            history_delta_payload.push(serde_json::json!({
342                "event_id": event.event_id(),
343                "event_type": event_type,
344                "event_data": event_json,
345            }));
346        }
347
348        let history_delta_json = serde_json::Value::Array(history_delta_payload);
349
350        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
351            ProviderError::permanent(
352                "ack_orchestration_item",
353                format!("Failed to serialize worker items: {e}"),
354            )
355        })?;
356
357        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
358            ProviderError::permanent(
359                "ack_orchestration_item",
360                format!("Failed to serialize orchestrator items: {e}"),
361            )
362        })?;
363
364        let metadata_json = serde_json::json!({
365            "orchestration_name": metadata.orchestration_name,
366            "orchestration_version": metadata.orchestration_version,
367            "status": metadata.status,
368            "output": metadata.output,
369        });
370
371        // Serialize cancelled activities for lock stealing
372        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
373            .iter()
374            .map(|a| {
375                serde_json::json!({
376                    "instance": a.instance,
377                    "execution_id": a.execution_id,
378                    "activity_id": a.activity_id,
379                })
380            })
381            .collect();
382        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
383
384        for attempt in 0..=MAX_RETRIES {
385            let result = sqlx::query(&format!(
386                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7)",
387                self.schema_name
388            ))
389            .bind(lock_token)
390            .bind(execution_id as i64)
391            .bind(&history_delta_json)
392            .bind(&worker_items_json)
393            .bind(&orchestrator_items_json)
394            .bind(&metadata_json)
395            .bind(&cancelled_activities_json)
396            .execute(&*self.pool)
397            .await;
398
399            match result {
400                Ok(_) => {
401                    let duration_ms = start.elapsed().as_millis() as u64;
402                    debug!(
403                        target = "duroxide::providers::postgres",
404                        operation = "ack_orchestration_item",
405                        execution_id = execution_id,
406                        history_count = history_delta.len(),
407                        worker_items_count = worker_items.len(),
408                        orchestrator_items_count = orchestrator_items.len(),
409                        cancelled_activities_count = cancelled_activities.len(),
410                        duration_ms = duration_ms,
411                        attempts = attempt + 1,
412                        "Acknowledged orchestration item via stored procedure"
413                    );
414                    return Ok(());
415                }
416                Err(e) => {
417                    // Check for permanent errors first
418                    if let SqlxError::Database(db_err) = &e {
419                        if db_err.message().contains("Invalid lock token") {
420                            return Err(ProviderError::permanent(
421                                "ack_orchestration_item",
422                                "Invalid lock token",
423                            ));
424                        }
425                    } else if e.to_string().contains("Invalid lock token") {
426                        return Err(ProviderError::permanent(
427                            "ack_orchestration_item",
428                            "Invalid lock token",
429                        ));
430                    }
431
432                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
433                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
434                        warn!(
435                            target = "duroxide::providers::postgres",
436                            operation = "ack_orchestration_item",
437                            attempt = attempt + 1,
438                            error = %provider_err,
439                            "Retryable error, will retry"
440                        );
441                        sleep(std::time::Duration::from_millis(
442                            RETRY_DELAY_MS * (attempt as u64 + 1),
443                        ))
444                        .await;
445                        continue;
446                    }
447                    return Err(provider_err);
448                }
449            }
450        }
451
452        // Should never reach here, but just in case
453        Ok(())
454    }
455    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
456    async fn abandon_orchestration_item(
457        &self,
458        lock_token: &str,
459        delay: Option<Duration>,
460        ignore_attempt: bool,
461    ) -> Result<(), ProviderError> {
462        let start = std::time::Instant::now();
463        let now_ms = Self::now_millis();
464        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
465
466        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
467            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
468            self.schema_name
469        ))
470        .bind(lock_token)
471        .bind(now_ms)
472        .bind(delay_param)
473        .bind(ignore_attempt)
474        .fetch_one(&*self.pool)
475        .await
476        {
477            Ok(instance_id) => instance_id,
478            Err(e) => {
479                if let SqlxError::Database(db_err) = &e {
480                    if db_err.message().contains("Invalid lock token") {
481                        return Err(ProviderError::permanent(
482                            "abandon_orchestration_item",
483                            "Invalid lock token",
484                        ));
485                    }
486                } else if e.to_string().contains("Invalid lock token") {
487                    return Err(ProviderError::permanent(
488                        "abandon_orchestration_item",
489                        "Invalid lock token",
490                    ));
491                }
492
493                return Err(Self::sqlx_to_provider_error(
494                    "abandon_orchestration_item",
495                    e,
496                ));
497            }
498        };
499
500        let duration_ms = start.elapsed().as_millis() as u64;
501        debug!(
502            target = "duroxide::providers::postgres",
503            operation = "abandon_orchestration_item",
504            instance_id = %instance_id,
505            delay_ms = delay.map(|d| d.as_millis() as u64),
506            ignore_attempt = ignore_attempt,
507            duration_ms = duration_ms,
508            "Abandoned orchestration item via stored procedure"
509        );
510
511        Ok(())
512    }
513
514    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
515    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
516        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
517            "SELECT out_event_data FROM {}.fetch_history($1)",
518            self.schema_name
519        ))
520        .bind(instance)
521        .fetch_all(&*self.pool)
522        .await
523        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
524
525        Ok(event_data_rows
526            .into_iter()
527            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
528            .collect())
529    }
530
531    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
532    async fn append_with_execution(
533        &self,
534        instance: &str,
535        execution_id: u64,
536        new_events: Vec<Event>,
537    ) -> Result<(), ProviderError> {
538        if new_events.is_empty() {
539            return Ok(());
540        }
541
542        let mut events_payload = Vec::with_capacity(new_events.len());
543        for event in &new_events {
544            if event.event_id() == 0 {
545                error!(
546                    target = "duroxide::providers::postgres",
547                    operation = "append_with_execution",
548                    error_type = "validation_error",
549                    instance_id = %instance,
550                    execution_id = execution_id,
551                    "event_id must be set by runtime"
552                );
553                return Err(ProviderError::permanent(
554                    "append_with_execution",
555                    "event_id must be set by runtime",
556                ));
557            }
558
559            let event_json = serde_json::to_string(event).map_err(|e| {
560                ProviderError::permanent(
561                    "append_with_execution",
562                    format!("Failed to serialize event: {e}"),
563                )
564            })?;
565
566            let event_type = format!("{event:?}")
567                .split('{')
568                .next()
569                .unwrap_or("Unknown")
570                .trim()
571                .to_string();
572
573            events_payload.push(serde_json::json!({
574                "event_id": event.event_id(),
575                "event_type": event_type,
576                "event_data": event_json,
577            }));
578        }
579
580        let events_json = serde_json::Value::Array(events_payload);
581
582        sqlx::query(&format!(
583            "SELECT {}.append_history($1, $2, $3)",
584            self.schema_name
585        ))
586        .bind(instance)
587        .bind(execution_id as i64)
588        .bind(events_json)
589        .execute(&*self.pool)
590        .await
591        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
592
593        debug!(
594            target = "duroxide::providers::postgres",
595            operation = "append_with_execution",
596            instance_id = %instance,
597            execution_id = execution_id,
598            event_count = new_events.len(),
599            "Appended history events via stored procedure"
600        );
601
602        Ok(())
603    }
604
605    #[instrument(skip(self), target = "duroxide::providers::postgres")]
606    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
607        let work_item = serde_json::to_string(&item).map_err(|e| {
608            ProviderError::permanent(
609                "enqueue_worker_work",
610                format!("Failed to serialize work item: {e}"),
611            )
612        })?;
613
614        let now_ms = Self::now_millis();
615
616        // Extract activity identification for ActivityExecute items (for cancellation support)
617        let (instance_id, execution_id, activity_id) = match &item {
618            WorkItem::ActivityExecute {
619                instance,
620                execution_id,
621                id,
622                ..
623            } => (
624                Some(instance.clone()),
625                Some(*execution_id as i64),
626                Some(*id as i64),
627            ),
628            _ => (None, None, None),
629        };
630
631        sqlx::query(&format!(
632            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
633            self.schema_name
634        ))
635        .bind(work_item)
636        .bind(now_ms)
637        .bind(&instance_id)
638        .bind(execution_id)
639        .bind(activity_id)
640        .execute(&*self.pool)
641        .await
642        .map_err(|e| {
643            error!(
644                target = "duroxide::providers::postgres",
645                operation = "enqueue_worker_work",
646                error_type = "database_error",
647                error = %e,
648                "Failed to enqueue worker work"
649            );
650            Self::sqlx_to_provider_error("enqueue_worker_work", e)
651        })?;
652
653        Ok(())
654    }
655
656    #[instrument(skip(self), target = "duroxide::providers::postgres")]
657    async fn fetch_work_item(
658        &self,
659        lock_timeout: Duration,
660        _poll_timeout: Duration,
661    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
662        let start = std::time::Instant::now();
663
664        // Convert Duration to milliseconds
665        let lock_timeout_ms = lock_timeout.as_millis() as i64;
666
667        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
668            "SELECT * FROM {}.fetch_work_item($1, $2)",
669            self.schema_name
670        ))
671        .bind(Self::now_millis())
672        .bind(lock_timeout_ms)
673        .fetch_optional(&*self.pool)
674        .await
675        {
676            Ok(row) => row,
677            Err(e) => {
678                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
679            }
680        };
681
682        let (work_item_json, lock_token, attempt_count) = match row {
683            Some(row) => row,
684            None => return Ok(None),
685        };
686
687        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
688            ProviderError::permanent(
689                "fetch_work_item",
690                format!("Failed to deserialize worker item: {e}"),
691            )
692        })?;
693
694        let duration_ms = start.elapsed().as_millis() as u64;
695
696        // Extract instance for logging - different work item types have different structures
697        let instance_id = match &work_item {
698            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
699            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
700            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
701            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
702            WorkItem::TimerFired { instance, .. } => instance.as_str(),
703            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
704            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
705            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
706            WorkItem::SubOrchCompleted {
707                parent_instance, ..
708            } => parent_instance.as_str(),
709            WorkItem::SubOrchFailed {
710                parent_instance, ..
711            } => parent_instance.as_str(),
712        };
713
714        debug!(
715            target = "duroxide::providers::postgres",
716            operation = "fetch_work_item",
717            instance_id = %instance_id,
718            attempt_count = attempt_count,
719            duration_ms = duration_ms,
720            "Fetched activity work item via stored procedure"
721        );
722
723        Ok(Some((
724            work_item,
725            lock_token,
726            attempt_count as u32,
727        )))
728    }
729
730    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
731    async fn ack_work_item(
732        &self,
733        token: &str,
734        completion: Option<WorkItem>,
735    ) -> Result<(), ProviderError> {
736        let start = std::time::Instant::now();
737
738        // If no completion provided (e.g., cancelled activity), just delete the item
739        let Some(completion) = completion else {
740            // Call ack_worker with NULL completion to delete without enqueueing
741            sqlx::query(&format!(
742                "SELECT {}.ack_worker($1)",
743                self.schema_name
744            ))
745            .bind(token)
746            .execute(&*self.pool)
747            .await
748            .map_err(|e| {
749                if e.to_string().contains("Worker queue item not found") {
750                    ProviderError::permanent(
751                        "ack_worker",
752                        "Worker queue item not found or already processed",
753                    )
754                } else {
755                    Self::sqlx_to_provider_error("ack_worker", e)
756                }
757            })?;
758
759            let duration_ms = start.elapsed().as_millis() as u64;
760            debug!(
761                target = "duroxide::providers::postgres",
762                operation = "ack_worker",
763                token = %token,
764                duration_ms = duration_ms,
765                "Acknowledged worker without completion (cancelled)"
766            );
767            return Ok(());
768        };
769
770        // Extract instance ID from completion WorkItem
771        let instance_id = match &completion {
772            WorkItem::ActivityCompleted { instance, .. }
773            | WorkItem::ActivityFailed { instance, .. } => instance,
774            _ => {
775                error!(
776                    target = "duroxide::providers::postgres",
777                    operation = "ack_worker",
778                    error_type = "invalid_completion_type",
779                    "Invalid completion work item type"
780                );
781                return Err(ProviderError::permanent(
782                    "ack_worker",
783                    "Invalid completion work item type",
784                ));
785            }
786        };
787
788        let completion_json = serde_json::to_string(&completion).map_err(|e| {
789            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
790        })?;
791
792        let now_ms = Self::now_millis();
793
794        // Call stored procedure to atomically delete worker item and enqueue completion
795        sqlx::query(&format!(
796            "SELECT {}.ack_worker($1, $2, $3, $4)",
797            self.schema_name
798        ))
799        .bind(token)
800        .bind(instance_id)
801        .bind(completion_json)
802        .bind(now_ms)
803        .execute(&*self.pool)
804        .await
805        .map_err(|e| {
806            if e.to_string().contains("Worker queue item not found") {
807                error!(
808                    target = "duroxide::providers::postgres",
809                    operation = "ack_worker",
810                    error_type = "worker_item_not_found",
811                    token = %token,
812                    "Worker queue item not found or already processed"
813                );
814                ProviderError::permanent(
815                    "ack_worker",
816                    "Worker queue item not found or already processed",
817                )
818            } else {
819                Self::sqlx_to_provider_error("ack_worker", e)
820            }
821        })?;
822
823        let duration_ms = start.elapsed().as_millis() as u64;
824        debug!(
825            target = "duroxide::providers::postgres",
826            operation = "ack_worker",
827            instance_id = %instance_id,
828            duration_ms = duration_ms,
829            "Acknowledged worker and enqueued completion"
830        );
831
832        Ok(())
833    }
834
835    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
836    async fn renew_work_item_lock(
837        &self,
838        token: &str,
839        extend_for: Duration,
840    ) -> Result<(), ProviderError> {
841        let start = std::time::Instant::now();
842
843        // Get current time from application for consistent time reference
844        let now_ms = Self::now_millis();
845
846        // Convert Duration to seconds for the stored procedure
847        let extend_secs = extend_for.as_secs() as i64;
848
849        match sqlx::query(&format!(
850            "SELECT {}.renew_work_item_lock($1, $2, $3)",
851            self.schema_name
852        ))
853        .bind(token)
854        .bind(now_ms)
855        .bind(extend_secs)
856        .execute(&*self.pool)
857        .await
858        {
859            Ok(_) => {
860                let duration_ms = start.elapsed().as_millis() as u64;
861                debug!(
862                    target = "duroxide::providers::postgres",
863                    operation = "renew_work_item_lock",
864                    token = %token,
865                    extend_for_secs = extend_secs,
866                    duration_ms = duration_ms,
867                    "Work item lock renewed successfully"
868                );
869                Ok(())
870            }
871            Err(e) => {
872                if let SqlxError::Database(db_err) = &e {
873                    if db_err.message().contains("Lock token invalid") {
874                        return Err(ProviderError::permanent(
875                            "renew_work_item_lock",
876                            "Lock token invalid, expired, or already acked",
877                        ));
878                    }
879                } else if e.to_string().contains("Lock token invalid") {
880                    return Err(ProviderError::permanent(
881                        "renew_work_item_lock",
882                        "Lock token invalid, expired, or already acked",
883                    ));
884                }
885
886                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
887            }
888        }
889    }
890
891    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
892    async fn abandon_work_item(
893        &self,
894        token: &str,
895        delay: Option<Duration>,
896        ignore_attempt: bool,
897    ) -> Result<(), ProviderError> {
898        let start = std::time::Instant::now();
899        let now_ms = Self::now_millis();
900        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
901
902        match sqlx::query(&format!(
903            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
904            self.schema_name
905        ))
906        .bind(token)
907        .bind(now_ms)
908        .bind(delay_param)
909        .bind(ignore_attempt)
910        .execute(&*self.pool)
911        .await
912        {
913            Ok(_) => {
914                let duration_ms = start.elapsed().as_millis() as u64;
915                debug!(
916                    target = "duroxide::providers::postgres",
917                    operation = "abandon_work_item",
918                    token = %token,
919                    delay_ms = delay.map(|d| d.as_millis() as u64),
920                    ignore_attempt = ignore_attempt,
921                    duration_ms = duration_ms,
922                    "Abandoned work item via stored procedure"
923                );
924                Ok(())
925            }
926            Err(e) => {
927                if let SqlxError::Database(db_err) = &e {
928                    if db_err.message().contains("Invalid lock token")
929                        || db_err.message().contains("already acked")
930                    {
931                        return Err(ProviderError::permanent(
932                            "abandon_work_item",
933                            "Invalid lock token or already acked",
934                        ));
935                    }
936                } else if e.to_string().contains("Invalid lock token")
937                    || e.to_string().contains("already acked")
938                {
939                    return Err(ProviderError::permanent(
940                        "abandon_work_item",
941                        "Invalid lock token or already acked",
942                    ));
943                }
944
945                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
946            }
947        }
948    }
949
950    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
951    async fn renew_orchestration_item_lock(
952        &self,
953        token: &str,
954        extend_for: Duration,
955    ) -> Result<(), ProviderError> {
956        let start = std::time::Instant::now();
957
958        // Get current time from application for consistent time reference
959        let now_ms = Self::now_millis();
960
961        // Convert Duration to seconds for the stored procedure
962        let extend_secs = extend_for.as_secs() as i64;
963
964        match sqlx::query(&format!(
965            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
966            self.schema_name
967        ))
968        .bind(token)
969        .bind(now_ms)
970        .bind(extend_secs)
971        .execute(&*self.pool)
972        .await
973        {
974            Ok(_) => {
975                let duration_ms = start.elapsed().as_millis() as u64;
976                debug!(
977                    target = "duroxide::providers::postgres",
978                    operation = "renew_orchestration_item_lock",
979                    token = %token,
980                    extend_for_secs = extend_secs,
981                    duration_ms = duration_ms,
982                    "Orchestration item lock renewed successfully"
983                );
984                Ok(())
985            }
986            Err(e) => {
987                if let SqlxError::Database(db_err) = &e {
988                    if db_err.message().contains("Lock token invalid")
989                        || db_err.message().contains("expired")
990                        || db_err.message().contains("already released")
991                    {
992                        return Err(ProviderError::permanent(
993                            "renew_orchestration_item_lock",
994                            "Lock token invalid, expired, or already released",
995                        ));
996                    }
997                } else if e.to_string().contains("Lock token invalid")
998                    || e.to_string().contains("expired")
999                    || e.to_string().contains("already released")
1000                {
1001                    return Err(ProviderError::permanent(
1002                        "renew_orchestration_item_lock",
1003                        "Lock token invalid, expired, or already released",
1004                    ));
1005                }
1006
1007                Err(Self::sqlx_to_provider_error(
1008                    "renew_orchestration_item_lock",
1009                    e,
1010                ))
1011            }
1012        }
1013    }
1014
1015    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1016    async fn enqueue_for_orchestrator(
1017        &self,
1018        item: WorkItem,
1019        delay: Option<Duration>,
1020    ) -> Result<(), ProviderError> {
1021        let work_item = serde_json::to_string(&item).map_err(|e| {
1022            ProviderError::permanent(
1023                "enqueue_orchestrator_work",
1024                format!("Failed to serialize work item: {e}"),
1025            )
1026        })?;
1027
1028        // Extract instance ID from WorkItem enum
1029        let instance_id = match &item {
1030            WorkItem::StartOrchestration { instance, .. }
1031            | WorkItem::ActivityCompleted { instance, .. }
1032            | WorkItem::ActivityFailed { instance, .. }
1033            | WorkItem::TimerFired { instance, .. }
1034            | WorkItem::ExternalRaised { instance, .. }
1035            | WorkItem::CancelInstance { instance, .. }
1036            | WorkItem::ContinueAsNew { instance, .. } => instance,
1037            WorkItem::SubOrchCompleted {
1038                parent_instance, ..
1039            }
1040            | WorkItem::SubOrchFailed {
1041                parent_instance, ..
1042            } => parent_instance,
1043            WorkItem::ActivityExecute { .. } => {
1044                return Err(ProviderError::permanent(
1045                    "enqueue_orchestrator_work",
1046                    "ActivityExecute should go to worker queue, not orchestrator queue",
1047                ));
1048            }
1049        };
1050
1051        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1052        let now_ms = Self::now_millis();
1053
1054        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1055            if *fire_at_ms > 0 {
1056                // Take max of fire_at_ms and delay (if provided)
1057                if let Some(delay) = delay {
1058                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1059                } else {
1060                    *fire_at_ms
1061                }
1062            } else {
1063                // fire_at_ms is 0, use delay or NOW()
1064                delay
1065                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1066                    .unwrap_or(now_ms as u64)
1067            }
1068        } else {
1069            // Non-timer item: use delay or NOW()
1070            delay
1071                .map(|d| now_ms as u64 + d.as_millis() as u64)
1072                .unwrap_or(now_ms as u64)
1073        };
1074
1075        let visible_at = Utc
1076            .timestamp_millis_opt(visible_at_ms as i64)
1077            .single()
1078            .ok_or_else(|| {
1079                ProviderError::permanent(
1080                    "enqueue_orchestrator_work",
1081                    "Invalid visible_at timestamp",
1082                )
1083            })?;
1084
1085        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1086        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1087
1088        // Call stored procedure to enqueue work
1089        sqlx::query(&format!(
1090            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1091            self.schema_name
1092        ))
1093        .bind(instance_id)
1094        .bind(&work_item)
1095        .bind(visible_at)
1096        .bind::<Option<String>>(None) // orchestration_name - NULL
1097        .bind::<Option<String>>(None) // orchestration_version - NULL
1098        .bind::<Option<i64>>(None) // execution_id - NULL
1099        .execute(&*self.pool)
1100        .await
1101        .map_err(|e| {
1102            error!(
1103                target = "duroxide::providers::postgres",
1104                operation = "enqueue_orchestrator_work",
1105                error_type = "database_error",
1106                error = %e,
1107                instance_id = %instance_id,
1108                "Failed to enqueue orchestrator work"
1109            );
1110            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1111        })?;
1112
1113        debug!(
1114            target = "duroxide::providers::postgres",
1115            operation = "enqueue_orchestrator_work",
1116            instance_id = %instance_id,
1117            delay_ms = delay.map(|d| d.as_millis() as u64),
1118            "Enqueued orchestrator work"
1119        );
1120
1121        Ok(())
1122    }
1123
1124    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1125    async fn read_with_execution(
1126        &self,
1127        instance: &str,
1128        execution_id: u64,
1129    ) -> Result<Vec<Event>, ProviderError> {
1130        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1131            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1132            self.table_name("history")
1133        ))
1134        .bind(instance)
1135        .bind(execution_id as i64)
1136        .fetch_all(&*self.pool)
1137        .await
1138        .ok()
1139        .unwrap_or_default();
1140
1141        Ok(event_data_rows
1142            .into_iter()
1143            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1144            .collect())
1145    }
1146
1147    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1148        Some(self)
1149    }
1150}
1151
1152#[async_trait::async_trait]
1153impl ProviderAdmin for PostgresProvider {
1154    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1155    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1156        sqlx::query_scalar(&format!(
1157            "SELECT instance_id FROM {}.list_instances()",
1158            self.schema_name
1159        ))
1160        .fetch_all(&*self.pool)
1161        .await
1162        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1163    }
1164
1165    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1166    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1167        sqlx::query_scalar(&format!(
1168            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1169            self.schema_name
1170        ))
1171        .bind(status)
1172        .fetch_all(&*self.pool)
1173        .await
1174        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1175    }
1176
1177    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1178    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1179        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1180            "SELECT execution_id FROM {}.list_executions($1)",
1181            self.schema_name
1182        ))
1183        .bind(instance)
1184        .fetch_all(&*self.pool)
1185        .await
1186        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1187
1188        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1189    }
1190
1191    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1192    async fn read_history_with_execution_id(
1193        &self,
1194        instance: &str,
1195        execution_id: u64,
1196    ) -> Result<Vec<Event>, ProviderError> {
1197        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1198            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1199            self.schema_name
1200        ))
1201        .bind(instance)
1202        .bind(execution_id as i64)
1203        .fetch_all(&*self.pool)
1204        .await
1205        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1206
1207        event_data_rows
1208            .into_iter()
1209            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1210            .collect::<Vec<Event>>()
1211            .into_iter()
1212            .map(Ok)
1213            .collect()
1214    }
1215
1216    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1217    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1218        let execution_id = self.latest_execution_id(instance).await?;
1219        self.read_history_with_execution_id(instance, execution_id)
1220            .await
1221    }
1222
1223    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1224    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1225        sqlx::query_scalar(&format!(
1226            "SELECT {}.latest_execution_id($1)",
1227            self.schema_name
1228        ))
1229        .bind(instance)
1230        .fetch_optional(&*self.pool)
1231        .await
1232        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1233        .map(|id: i64| id as u64)
1234        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1235    }
1236
1237    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1238    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1239        let row: Option<(
1240            String,
1241            String,
1242            String,
1243            i64,
1244            chrono::DateTime<Utc>,
1245            Option<chrono::DateTime<Utc>>,
1246            Option<String>,
1247            Option<String>,
1248        )> = sqlx::query_as(&format!(
1249            "SELECT * FROM {}.get_instance_info($1)",
1250            self.schema_name
1251        ))
1252        .bind(instance)
1253        .fetch_optional(&*self.pool)
1254        .await
1255        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1256
1257        let (
1258            instance_id,
1259            orchestration_name,
1260            orchestration_version,
1261            current_execution_id,
1262            created_at,
1263            updated_at,
1264            status,
1265            output,
1266        ) =
1267            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1268
1269        Ok(InstanceInfo {
1270            instance_id,
1271            orchestration_name,
1272            orchestration_version,
1273            current_execution_id: current_execution_id as u64,
1274            status: status.unwrap_or_else(|| "Running".to_string()),
1275            output,
1276            created_at: created_at.timestamp_millis() as u64,
1277            updated_at: updated_at
1278                .map(|dt| dt.timestamp_millis() as u64)
1279                .unwrap_or(created_at.timestamp_millis() as u64),
1280        })
1281    }
1282
1283    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1284    async fn get_execution_info(
1285        &self,
1286        instance: &str,
1287        execution_id: u64,
1288    ) -> Result<ExecutionInfo, ProviderError> {
1289        let row: Option<(
1290            i64,
1291            String,
1292            Option<String>,
1293            chrono::DateTime<Utc>,
1294            Option<chrono::DateTime<Utc>>,
1295            i64,
1296        )> = sqlx::query_as(&format!(
1297            "SELECT * FROM {}.get_execution_info($1, $2)",
1298            self.schema_name
1299        ))
1300        .bind(instance)
1301        .bind(execution_id as i64)
1302        .fetch_optional(&*self.pool)
1303        .await
1304        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1305
1306        let (exec_id, status, output, started_at, completed_at, event_count) = row
1307            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1308
1309        Ok(ExecutionInfo {
1310            execution_id: exec_id as u64,
1311            status,
1312            output,
1313            started_at: started_at.timestamp_millis() as u64,
1314            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1315            event_count: event_count as usize,
1316        })
1317    }
1318
1319    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1320    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1321        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1322            "SELECT * FROM {}.get_system_metrics()",
1323            self.schema_name
1324        ))
1325        .fetch_optional(&*self.pool)
1326        .await
1327        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1328
1329        let (
1330            total_instances,
1331            total_executions,
1332            running_instances,
1333            completed_instances,
1334            failed_instances,
1335            total_events,
1336        ) = row.ok_or_else(|| {
1337            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1338        })?;
1339
1340        Ok(SystemMetrics {
1341            total_instances: total_instances as u64,
1342            total_executions: total_executions as u64,
1343            running_instances: running_instances as u64,
1344            completed_instances: completed_instances as u64,
1345            failed_instances: failed_instances as u64,
1346            total_events: total_events as u64,
1347        })
1348    }
1349
1350    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1351    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1352        let now_ms = Self::now_millis();
1353
1354        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1355            "SELECT * FROM {}.get_queue_depths($1)",
1356            self.schema_name
1357        ))
1358        .bind(now_ms)
1359        .fetch_optional(&*self.pool)
1360        .await
1361        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1362
1363        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1364            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1365        })?;
1366
1367        Ok(QueueDepths {
1368            orchestrator_queue: orchestrator_queue as usize,
1369            worker_queue: worker_queue as usize,
1370            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1371        })
1372    }
1373}