Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
7};
8use duroxide::Event;
9use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use tracing::{debug, error, instrument, warn};
15
16use crate::migrations::MigrationRunner;
17
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// Construction connects a pool and runs the crate's migrations against the
/// selected schema, so a successfully built provider is ready to use.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect with an explicit connection string (uses the `public` schema)
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    // Shared connection pool; a clone of this handle is also given to the
    // migration runner.
    pool: Arc<PgPool>,
    // Schema used to qualify every stored-procedure call. It is interpolated
    // into SQL text verbatim, so it must be a trusted identifier.
    schema_name: String,
}
44
45impl PostgresProvider {
46    pub async fn new(database_url: &str) -> Result<Self> {
47        Self::new_with_schema(database_url, None).await
48    }
49
50    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
51        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
52            .ok()
53            .and_then(|s| s.parse::<u32>().ok())
54            .unwrap_or(10);
55
56        let pool = PgPoolOptions::new()
57            .max_connections(max_connections)
58            .min_connections(1)
59            .acquire_timeout(std::time::Duration::from_secs(30))
60            .connect(database_url)
61            .await?;
62
63        let schema_name = schema_name.unwrap_or("public").to_string();
64
65        let provider = Self {
66            pool: Arc::new(pool),
67            schema_name: schema_name.clone(),
68        };
69
70        // Run migrations to initialize schema
71        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
72        migration_runner.migrate().await?;
73
74        Ok(provider)
75    }
76
77    #[instrument(skip(self), target = "duroxide::providers::postgres")]
78    pub async fn initialize_schema(&self) -> Result<()> {
79        // Schema initialization is now handled by migrations
80        // This method is kept for backward compatibility but delegates to migrations
81        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
82        migration_runner.migrate().await?;
83        Ok(())
84    }
85
86    /// Get current timestamp in milliseconds (Unix epoch)
87    fn now_millis() -> i64 {
88        SystemTime::now()
89            .duration_since(UNIX_EPOCH)
90            .unwrap()
91            .as_millis() as i64
92    }
93
94
95
96    /// Get schema-qualified table name
97    fn table_name(&self, table: &str) -> String {
98        format!("{}.{}", self.schema_name, table)
99    }
100
101    /// Get the database pool (for testing)
102    pub fn pool(&self) -> &PgPool {
103        &self.pool
104    }
105
106    /// Get the schema name (for testing)
107    pub fn schema_name(&self) -> &str {
108        &self.schema_name
109    }
110
111    /// Convert sqlx::Error to ProviderError with proper classification
112    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
113        match e {
114            SqlxError::Database(ref db_err) => {
115                // PostgreSQL error codes
116                let code_opt = db_err.code();
117                let code = code_opt.as_deref();
118                if code == Some("40P01") {
119                    // Deadlock detected
120                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
121                } else if code == Some("40001") {
122                    // Serialization failure - permanent error (transaction conflict, not transient)
123                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
124                } else if code == Some("23505") {
125                    // Unique constraint violation (duplicate event)
126                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
127                } else if code == Some("23503") {
128                    // Foreign key constraint violation
129                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
130                } else {
131                    ProviderError::permanent(operation, format!("Database error: {e}"))
132                }
133            }
134            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
135                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
136            }
137            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
138            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
139        }
140    }
141
142    /// Clean up schema after tests (drops all tables and optionally the schema)
143    ///
144    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
145    /// Only drops the schema if it's a custom schema (not "public").
146    pub async fn cleanup_schema(&self) -> Result<()> {
147        // Call the stored procedure to drop all tables
148        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
149            .execute(&*self.pool)
150            .await?;
151
152        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
153        // Only drop custom schemas created for testing
154        if self.schema_name != "public" {
155            sqlx::query(&format!(
156                "DROP SCHEMA IF EXISTS {} CASCADE",
157                self.schema_name
158            ))
159            .execute(&*self.pool)
160            .await?;
161        } else {
162            // Explicit safeguard: we only drop tables from public schema, never the schema itself
163            // This ensures we don't accidentally drop the default PostgreSQL schema
164        }
165
166        Ok(())
167    }
168}
169
170#[async_trait::async_trait]
171impl Provider for PostgresProvider {
172    fn name(&self) -> &str {
173        "duroxide-pg"
174    }
175
176    fn version(&self) -> &str {
177        env!("CARGO_PKG_VERSION")
178    }
179
180    #[instrument(skip(self), target = "duroxide::providers::postgres")]
181    async fn fetch_orchestration_item(
182        &self,
183        lock_timeout: Duration,
184        _poll_timeout: Duration,
185        filter: Option<&DispatcherCapabilityFilter>,
186    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
187        let start = std::time::Instant::now();
188
189        const MAX_RETRIES: u32 = 3;
190        const RETRY_DELAY_MS: u64 = 50;
191
192        // Convert Duration to milliseconds
193        let lock_timeout_ms = lock_timeout.as_millis() as i64;
194        let mut _last_error: Option<ProviderError> = None;
195
196        // Extract version filter from capability filter
197        let (min_packed, max_packed) = if let Some(f) = filter {
198            if let Some(range) = f.supported_duroxide_versions.first() {
199                let min = range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
200                let max = range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
201                (Some(min), Some(max))
202            } else {
203                // Empty supported_duroxide_versions = supports nothing
204                return Ok(None);
205            }
206        } else {
207            (None, None)
208        };
209
210        for attempt in 0..=MAX_RETRIES {
211            let now_ms = Self::now_millis();
212
213            let result: Result<
214                Option<(
215                    String,
216                    String,
217                    String,
218                    i64,
219                    serde_json::Value,
220                    serde_json::Value,
221                    String,
222                    i32,
223                )>,
224                SqlxError,
225            > = sqlx::query_as(&format!(
226                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
227                self.schema_name
228            ))
229            .bind(now_ms)
230            .bind(lock_timeout_ms)
231            .bind(min_packed)
232            .bind(max_packed)
233            .fetch_optional(&*self.pool)
234            .await;
235
236            let row = match result {
237                Ok(r) => r,
238                Err(e) => {
239                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
240                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
241                        warn!(
242                            target = "duroxide::providers::postgres",
243                            operation = "fetch_orchestration_item",
244                            attempt = attempt + 1,
245                            error = %provider_err,
246                            "Retryable error, will retry"
247                        );
248                        _last_error = Some(provider_err);
249                        sleep(std::time::Duration::from_millis(
250                            RETRY_DELAY_MS * (attempt as u64 + 1),
251                        ))
252                        .await;
253                        continue;
254                    }
255                    return Err(provider_err);
256                }
257            };
258
259            if let Some((
260                instance_id,
261                orchestration_name,
262                orchestration_version,
263                execution_id,
264                history_json,
265                messages_json,
266                lock_token,
267                attempt_count,
268            )) = row
269            {
270                let (history, history_error) = match serde_json::from_value::<Vec<Event>>(history_json) {
271                    Ok(h) => (h, None),
272                    Err(e) => {
273                        let error_msg = format!("Failed to deserialize history: {e}");
274                        warn!(
275                            target = "duroxide::providers::postgres",
276                            instance = %instance_id,
277                            error = %error_msg,
278                            "History deserialization failed, returning item with history_error"
279                        );
280                        (vec![], Some(error_msg))
281                    }
282                };
283
284                let messages: Vec<WorkItem> =
285                    serde_json::from_value(messages_json).map_err(|e| {
286                        ProviderError::permanent(
287                            "fetch_orchestration_item",
288                            format!("Failed to deserialize messages: {e}"),
289                        )
290                    })?;
291
292                let duration_ms = start.elapsed().as_millis() as u64;
293                debug!(
294                    target = "duroxide::providers::postgres",
295                    operation = "fetch_orchestration_item",
296                    instance_id = %instance_id,
297                    execution_id = execution_id,
298                    message_count = messages.len(),
299                    history_count = history.len(),
300                    attempt_count = attempt_count,
301                    duration_ms = duration_ms,
302                    attempts = attempt + 1,
303                    "Fetched orchestration item via stored procedure"
304                );
305
306                return Ok(Some((
307                    OrchestrationItem {
308                        instance: instance_id,
309                        orchestration_name,
310                        execution_id: execution_id as u64,
311                        version: orchestration_version,
312                        history,
313                        messages,
314                        history_error,
315                    },
316                    lock_token,
317                    attempt_count as u32,
318                )));
319            }
320
321            // No result found - return immediately (short polling behavior)
322            // Only retry with delay on retryable errors (handled above)
323            return Ok(None);
324        }
325
326        Ok(None)
327    }
328    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
329    async fn ack_orchestration_item(
330        &self,
331        lock_token: &str,
332        execution_id: u64,
333        history_delta: Vec<Event>,
334        worker_items: Vec<WorkItem>,
335        orchestrator_items: Vec<WorkItem>,
336        metadata: ExecutionMetadata,
337        cancelled_activities: Vec<ScheduledActivityIdentifier>,
338    ) -> Result<(), ProviderError> {
339        let start = std::time::Instant::now();
340
341        const MAX_RETRIES: u32 = 3;
342        const RETRY_DELAY_MS: u64 = 50;
343
344        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
345        for event in &history_delta {
346            if event.event_id() == 0 {
347                return Err(ProviderError::permanent(
348                    "ack_orchestration_item",
349                    "event_id must be set by runtime",
350                ));
351            }
352
353            let event_json = serde_json::to_string(event).map_err(|e| {
354                ProviderError::permanent(
355                    "ack_orchestration_item",
356                    format!("Failed to serialize event: {e}"),
357                )
358            })?;
359
360            let event_type = format!("{event:?}")
361                .split('{')
362                .next()
363                .unwrap_or("Unknown")
364                .trim()
365                .to_string();
366
367            history_delta_payload.push(serde_json::json!({
368                "event_id": event.event_id(),
369                "event_type": event_type,
370                "event_data": event_json,
371            }));
372        }
373
374        let history_delta_json = serde_json::Value::Array(history_delta_payload);
375
376        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
377            ProviderError::permanent(
378                "ack_orchestration_item",
379                format!("Failed to serialize worker items: {e}"),
380            )
381        })?;
382
383        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
384            ProviderError::permanent(
385                "ack_orchestration_item",
386                format!("Failed to serialize orchestrator items: {e}"),
387            )
388        })?;
389
390        let metadata_json = serde_json::json!({
391            "orchestration_name": metadata.orchestration_name,
392            "orchestration_version": metadata.orchestration_version,
393            "status": metadata.status,
394            "output": metadata.output,
395            "parent_instance_id": metadata.parent_instance_id,
396            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
397                serde_json::json!({
398                    "major": v.major,
399                    "minor": v.minor,
400                    "patch": v.patch,
401                })
402            }),
403        });
404
405        // Serialize cancelled activities for lock stealing
406        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
407            .iter()
408            .map(|a| {
409                serde_json::json!({
410                    "instance": a.instance,
411                    "execution_id": a.execution_id,
412                    "activity_id": a.activity_id,
413                })
414            })
415            .collect();
416        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
417
418        for attempt in 0..=MAX_RETRIES {
419            let now_ms = Self::now_millis();
420            let result = sqlx::query(&format!(
421                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
422                self.schema_name
423            ))
424            .bind(lock_token)
425            .bind(now_ms)
426            .bind(execution_id as i64)
427            .bind(&history_delta_json)
428            .bind(&worker_items_json)
429            .bind(&orchestrator_items_json)
430            .bind(&metadata_json)
431            .bind(&cancelled_activities_json)
432            .execute(&*self.pool)
433            .await;
434
435            match result {
436                Ok(_) => {
437                    let duration_ms = start.elapsed().as_millis() as u64;
438                    debug!(
439                        target = "duroxide::providers::postgres",
440                        operation = "ack_orchestration_item",
441                        execution_id = execution_id,
442                        history_count = history_delta.len(),
443                        worker_items_count = worker_items.len(),
444                        orchestrator_items_count = orchestrator_items.len(),
445                        cancelled_activities_count = cancelled_activities.len(),
446                        duration_ms = duration_ms,
447                        attempts = attempt + 1,
448                        "Acknowledged orchestration item via stored procedure"
449                    );
450                    return Ok(());
451                }
452                Err(e) => {
453                    // Check for permanent errors first
454                    if let SqlxError::Database(db_err) = &e {
455                        if db_err.message().contains("Invalid lock token") {
456                            return Err(ProviderError::permanent(
457                                "ack_orchestration_item",
458                                "Invalid lock token",
459                            ));
460                        }
461                    } else if e.to_string().contains("Invalid lock token") {
462                        return Err(ProviderError::permanent(
463                            "ack_orchestration_item",
464                            "Invalid lock token",
465                        ));
466                    }
467
468                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
469                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
470                        warn!(
471                            target = "duroxide::providers::postgres",
472                            operation = "ack_orchestration_item",
473                            attempt = attempt + 1,
474                            error = %provider_err,
475                            "Retryable error, will retry"
476                        );
477                        sleep(std::time::Duration::from_millis(
478                            RETRY_DELAY_MS * (attempt as u64 + 1),
479                        ))
480                        .await;
481                        continue;
482                    }
483                    return Err(provider_err);
484                }
485            }
486        }
487
488        // Should never reach here, but just in case
489        Ok(())
490    }
491    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
492    async fn abandon_orchestration_item(
493        &self,
494        lock_token: &str,
495        delay: Option<Duration>,
496        ignore_attempt: bool,
497    ) -> Result<(), ProviderError> {
498        let start = std::time::Instant::now();
499        let now_ms = Self::now_millis();
500        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
501
502        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
503            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
504            self.schema_name
505        ))
506        .bind(lock_token)
507        .bind(now_ms)
508        .bind(delay_param)
509        .bind(ignore_attempt)
510        .fetch_one(&*self.pool)
511        .await
512        {
513            Ok(instance_id) => instance_id,
514            Err(e) => {
515                if let SqlxError::Database(db_err) = &e {
516                    if db_err.message().contains("Invalid lock token") {
517                        return Err(ProviderError::permanent(
518                            "abandon_orchestration_item",
519                            "Invalid lock token",
520                        ));
521                    }
522                } else if e.to_string().contains("Invalid lock token") {
523                    return Err(ProviderError::permanent(
524                        "abandon_orchestration_item",
525                        "Invalid lock token",
526                    ));
527                }
528
529                return Err(Self::sqlx_to_provider_error(
530                    "abandon_orchestration_item",
531                    e,
532                ));
533            }
534        };
535
536        let duration_ms = start.elapsed().as_millis() as u64;
537        debug!(
538            target = "duroxide::providers::postgres",
539            operation = "abandon_orchestration_item",
540            instance_id = %instance_id,
541            delay_ms = delay.map(|d| d.as_millis() as u64),
542            ignore_attempt = ignore_attempt,
543            duration_ms = duration_ms,
544            "Abandoned orchestration item via stored procedure"
545        );
546
547        Ok(())
548    }
549
550    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
551    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
552        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
553            "SELECT out_event_data FROM {}.fetch_history($1)",
554            self.schema_name
555        ))
556        .bind(instance)
557        .fetch_all(&*self.pool)
558        .await
559        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
560
561        Ok(event_data_rows
562            .into_iter()
563            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
564            .collect())
565    }
566
567    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
568    async fn append_with_execution(
569        &self,
570        instance: &str,
571        execution_id: u64,
572        new_events: Vec<Event>,
573    ) -> Result<(), ProviderError> {
574        if new_events.is_empty() {
575            return Ok(());
576        }
577
578        let mut events_payload = Vec::with_capacity(new_events.len());
579        for event in &new_events {
580            if event.event_id() == 0 {
581                error!(
582                    target = "duroxide::providers::postgres",
583                    operation = "append_with_execution",
584                    error_type = "validation_error",
585                    instance_id = %instance,
586                    execution_id = execution_id,
587                    "event_id must be set by runtime"
588                );
589                return Err(ProviderError::permanent(
590                    "append_with_execution",
591                    "event_id must be set by runtime",
592                ));
593            }
594
595            let event_json = serde_json::to_string(event).map_err(|e| {
596                ProviderError::permanent(
597                    "append_with_execution",
598                    format!("Failed to serialize event: {e}"),
599                )
600            })?;
601
602            let event_type = format!("{event:?}")
603                .split('{')
604                .next()
605                .unwrap_or("Unknown")
606                .trim()
607                .to_string();
608
609            events_payload.push(serde_json::json!({
610                "event_id": event.event_id(),
611                "event_type": event_type,
612                "event_data": event_json,
613            }));
614        }
615
616        let events_json = serde_json::Value::Array(events_payload);
617
618        sqlx::query(&format!(
619            "SELECT {}.append_history($1, $2, $3)",
620            self.schema_name
621        ))
622        .bind(instance)
623        .bind(execution_id as i64)
624        .bind(events_json)
625        .execute(&*self.pool)
626        .await
627        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
628
629        debug!(
630            target = "duroxide::providers::postgres",
631            operation = "append_with_execution",
632            instance_id = %instance,
633            execution_id = execution_id,
634            event_count = new_events.len(),
635            "Appended history events via stored procedure"
636        );
637
638        Ok(())
639    }
640
641    #[instrument(skip(self), target = "duroxide::providers::postgres")]
642    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
643        let work_item = serde_json::to_string(&item).map_err(|e| {
644            ProviderError::permanent(
645                "enqueue_worker_work",
646                format!("Failed to serialize work item: {e}"),
647            )
648        })?;
649
650        let now_ms = Self::now_millis();
651
652        // Extract activity identification for ActivityExecute items (for cancellation support)
653        let (instance_id, execution_id, activity_id) = match &item {
654            WorkItem::ActivityExecute {
655                instance,
656                execution_id,
657                id,
658                ..
659            } => (
660                Some(instance.clone()),
661                Some(*execution_id as i64),
662                Some(*id as i64),
663            ),
664            _ => (None, None, None),
665        };
666
667        sqlx::query(&format!(
668            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
669            self.schema_name
670        ))
671        .bind(work_item)
672        .bind(now_ms)
673        .bind(&instance_id)
674        .bind(execution_id)
675        .bind(activity_id)
676        .execute(&*self.pool)
677        .await
678        .map_err(|e| {
679            error!(
680                target = "duroxide::providers::postgres",
681                operation = "enqueue_worker_work",
682                error_type = "database_error",
683                error = %e,
684                "Failed to enqueue worker work"
685            );
686            Self::sqlx_to_provider_error("enqueue_worker_work", e)
687        })?;
688
689        Ok(())
690    }
691
692    #[instrument(skip(self), target = "duroxide::providers::postgres")]
693    async fn fetch_work_item(
694        &self,
695        lock_timeout: Duration,
696        _poll_timeout: Duration,
697    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
698        let start = std::time::Instant::now();
699
700        // Convert Duration to milliseconds
701        let lock_timeout_ms = lock_timeout.as_millis() as i64;
702
703        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
704            "SELECT * FROM {}.fetch_work_item($1, $2)",
705            self.schema_name
706        ))
707        .bind(Self::now_millis())
708        .bind(lock_timeout_ms)
709        .fetch_optional(&*self.pool)
710        .await
711        {
712            Ok(row) => row,
713            Err(e) => {
714                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
715            }
716        };
717
718        let (work_item_json, lock_token, attempt_count) = match row {
719            Some(row) => row,
720            None => return Ok(None),
721        };
722
723        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
724            ProviderError::permanent(
725                "fetch_work_item",
726                format!("Failed to deserialize worker item: {e}"),
727            )
728        })?;
729
730        let duration_ms = start.elapsed().as_millis() as u64;
731
732        // Extract instance for logging - different work item types have different structures
733        let instance_id = match &work_item {
734            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
735            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
736            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
737            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
738            WorkItem::TimerFired { instance, .. } => instance.as_str(),
739            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
740            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
741            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
742            WorkItem::SubOrchCompleted {
743                parent_instance, ..
744            } => parent_instance.as_str(),
745            WorkItem::SubOrchFailed {
746                parent_instance, ..
747            } => parent_instance.as_str(),
748        };
749
750        debug!(
751            target = "duroxide::providers::postgres",
752            operation = "fetch_work_item",
753            instance_id = %instance_id,
754            attempt_count = attempt_count,
755            duration_ms = duration_ms,
756            "Fetched activity work item via stored procedure"
757        );
758
759        Ok(Some((
760            work_item,
761            lock_token,
762            attempt_count as u32,
763        )))
764    }
765
766    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
767    async fn ack_work_item(
768        &self,
769        token: &str,
770        completion: Option<WorkItem>,
771    ) -> Result<(), ProviderError> {
772        let start = std::time::Instant::now();
773
774        // If no completion provided (e.g., cancelled activity), just delete the item
775        let Some(completion) = completion else {
776            // Call ack_worker with NULL completion to delete without enqueueing
777            sqlx::query(&format!(
778                "SELECT {}.ack_worker($1)",
779                self.schema_name
780            ))
781            .bind(token)
782            .execute(&*self.pool)
783            .await
784            .map_err(|e| {
785                if e.to_string().contains("Worker queue item not found") {
786                    ProviderError::permanent(
787                        "ack_worker",
788                        "Worker queue item not found or already processed",
789                    )
790                } else {
791                    Self::sqlx_to_provider_error("ack_worker", e)
792                }
793            })?;
794
795            let duration_ms = start.elapsed().as_millis() as u64;
796            debug!(
797                target = "duroxide::providers::postgres",
798                operation = "ack_worker",
799                token = %token,
800                duration_ms = duration_ms,
801                "Acknowledged worker without completion (cancelled)"
802            );
803            return Ok(());
804        };
805
806        // Extract instance ID from completion WorkItem
807        let instance_id = match &completion {
808            WorkItem::ActivityCompleted { instance, .. }
809            | WorkItem::ActivityFailed { instance, .. } => instance,
810            _ => {
811                error!(
812                    target = "duroxide::providers::postgres",
813                    operation = "ack_worker",
814                    error_type = "invalid_completion_type",
815                    "Invalid completion work item type"
816                );
817                return Err(ProviderError::permanent(
818                    "ack_worker",
819                    "Invalid completion work item type",
820                ));
821            }
822        };
823
824        let completion_json = serde_json::to_string(&completion).map_err(|e| {
825            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
826        })?;
827
828        let now_ms = Self::now_millis();
829
830        // Call stored procedure to atomically delete worker item and enqueue completion
831        sqlx::query(&format!(
832            "SELECT {}.ack_worker($1, $2, $3, $4)",
833            self.schema_name
834        ))
835        .bind(token)
836        .bind(instance_id)
837        .bind(completion_json)
838        .bind(now_ms)
839        .execute(&*self.pool)
840        .await
841        .map_err(|e| {
842            if e.to_string().contains("Worker queue item not found") {
843                error!(
844                    target = "duroxide::providers::postgres",
845                    operation = "ack_worker",
846                    error_type = "worker_item_not_found",
847                    token = %token,
848                    "Worker queue item not found or already processed"
849                );
850                ProviderError::permanent(
851                    "ack_worker",
852                    "Worker queue item not found or already processed",
853                )
854            } else {
855                Self::sqlx_to_provider_error("ack_worker", e)
856            }
857        })?;
858
859        let duration_ms = start.elapsed().as_millis() as u64;
860        debug!(
861            target = "duroxide::providers::postgres",
862            operation = "ack_worker",
863            instance_id = %instance_id,
864            duration_ms = duration_ms,
865            "Acknowledged worker and enqueued completion"
866        );
867
868        Ok(())
869    }
870
871    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
872    async fn renew_work_item_lock(
873        &self,
874        token: &str,
875        extend_for: Duration,
876    ) -> Result<(), ProviderError> {
877        let start = std::time::Instant::now();
878
879        // Get current time from application for consistent time reference
880        let now_ms = Self::now_millis();
881
882        // Convert Duration to seconds for the stored procedure
883        let extend_secs = extend_for.as_secs() as i64;
884
885        match sqlx::query(&format!(
886            "SELECT {}.renew_work_item_lock($1, $2, $3)",
887            self.schema_name
888        ))
889        .bind(token)
890        .bind(now_ms)
891        .bind(extend_secs)
892        .execute(&*self.pool)
893        .await
894        {
895            Ok(_) => {
896                let duration_ms = start.elapsed().as_millis() as u64;
897                debug!(
898                    target = "duroxide::providers::postgres",
899                    operation = "renew_work_item_lock",
900                    token = %token,
901                    extend_for_secs = extend_secs,
902                    duration_ms = duration_ms,
903                    "Work item lock renewed successfully"
904                );
905                Ok(())
906            }
907            Err(e) => {
908                if let SqlxError::Database(db_err) = &e {
909                    if db_err.message().contains("Lock token invalid") {
910                        return Err(ProviderError::permanent(
911                            "renew_work_item_lock",
912                            "Lock token invalid, expired, or already acked",
913                        ));
914                    }
915                } else if e.to_string().contains("Lock token invalid") {
916                    return Err(ProviderError::permanent(
917                        "renew_work_item_lock",
918                        "Lock token invalid, expired, or already acked",
919                    ));
920                }
921
922                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
923            }
924        }
925    }
926
927    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
928    async fn abandon_work_item(
929        &self,
930        token: &str,
931        delay: Option<Duration>,
932        ignore_attempt: bool,
933    ) -> Result<(), ProviderError> {
934        let start = std::time::Instant::now();
935        let now_ms = Self::now_millis();
936        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
937
938        match sqlx::query(&format!(
939            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
940            self.schema_name
941        ))
942        .bind(token)
943        .bind(now_ms)
944        .bind(delay_param)
945        .bind(ignore_attempt)
946        .execute(&*self.pool)
947        .await
948        {
949            Ok(_) => {
950                let duration_ms = start.elapsed().as_millis() as u64;
951                debug!(
952                    target = "duroxide::providers::postgres",
953                    operation = "abandon_work_item",
954                    token = %token,
955                    delay_ms = delay.map(|d| d.as_millis() as u64),
956                    ignore_attempt = ignore_attempt,
957                    duration_ms = duration_ms,
958                    "Abandoned work item via stored procedure"
959                );
960                Ok(())
961            }
962            Err(e) => {
963                if let SqlxError::Database(db_err) = &e {
964                    if db_err.message().contains("Invalid lock token")
965                        || db_err.message().contains("already acked")
966                    {
967                        return Err(ProviderError::permanent(
968                            "abandon_work_item",
969                            "Invalid lock token or already acked",
970                        ));
971                    }
972                } else if e.to_string().contains("Invalid lock token")
973                    || e.to_string().contains("already acked")
974                {
975                    return Err(ProviderError::permanent(
976                        "abandon_work_item",
977                        "Invalid lock token or already acked",
978                    ));
979                }
980
981                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
982            }
983        }
984    }
985
986    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
987    async fn renew_orchestration_item_lock(
988        &self,
989        token: &str,
990        extend_for: Duration,
991    ) -> Result<(), ProviderError> {
992        let start = std::time::Instant::now();
993
994        // Get current time from application for consistent time reference
995        let now_ms = Self::now_millis();
996
997        // Convert Duration to seconds for the stored procedure
998        let extend_secs = extend_for.as_secs() as i64;
999
1000        match sqlx::query(&format!(
1001            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1002            self.schema_name
1003        ))
1004        .bind(token)
1005        .bind(now_ms)
1006        .bind(extend_secs)
1007        .execute(&*self.pool)
1008        .await
1009        {
1010            Ok(_) => {
1011                let duration_ms = start.elapsed().as_millis() as u64;
1012                debug!(
1013                    target = "duroxide::providers::postgres",
1014                    operation = "renew_orchestration_item_lock",
1015                    token = %token,
1016                    extend_for_secs = extend_secs,
1017                    duration_ms = duration_ms,
1018                    "Orchestration item lock renewed successfully"
1019                );
1020                Ok(())
1021            }
1022            Err(e) => {
1023                if let SqlxError::Database(db_err) = &e {
1024                    if db_err.message().contains("Lock token invalid")
1025                        || db_err.message().contains("expired")
1026                        || db_err.message().contains("already released")
1027                    {
1028                        return Err(ProviderError::permanent(
1029                            "renew_orchestration_item_lock",
1030                            "Lock token invalid, expired, or already released",
1031                        ));
1032                    }
1033                } else if e.to_string().contains("Lock token invalid")
1034                    || e.to_string().contains("expired")
1035                    || e.to_string().contains("already released")
1036                {
1037                    return Err(ProviderError::permanent(
1038                        "renew_orchestration_item_lock",
1039                        "Lock token invalid, expired, or already released",
1040                    ));
1041                }
1042
1043                Err(Self::sqlx_to_provider_error(
1044                    "renew_orchestration_item_lock",
1045                    e,
1046                ))
1047            }
1048        }
1049    }
1050
1051    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1052    async fn enqueue_for_orchestrator(
1053        &self,
1054        item: WorkItem,
1055        delay: Option<Duration>,
1056    ) -> Result<(), ProviderError> {
1057        let work_item = serde_json::to_string(&item).map_err(|e| {
1058            ProviderError::permanent(
1059                "enqueue_orchestrator_work",
1060                format!("Failed to serialize work item: {e}"),
1061            )
1062        })?;
1063
1064        // Extract instance ID from WorkItem enum
1065        let instance_id = match &item {
1066            WorkItem::StartOrchestration { instance, .. }
1067            | WorkItem::ActivityCompleted { instance, .. }
1068            | WorkItem::ActivityFailed { instance, .. }
1069            | WorkItem::TimerFired { instance, .. }
1070            | WorkItem::ExternalRaised { instance, .. }
1071            | WorkItem::CancelInstance { instance, .. }
1072            | WorkItem::ContinueAsNew { instance, .. } => instance,
1073            WorkItem::SubOrchCompleted {
1074                parent_instance, ..
1075            }
1076            | WorkItem::SubOrchFailed {
1077                parent_instance, ..
1078            } => parent_instance,
1079            WorkItem::ActivityExecute { .. } => {
1080                return Err(ProviderError::permanent(
1081                    "enqueue_orchestrator_work",
1082                    "ActivityExecute should go to worker queue, not orchestrator queue",
1083                ));
1084            }
1085        };
1086
1087        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1088        let now_ms = Self::now_millis();
1089
1090        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1091            if *fire_at_ms > 0 {
1092                // Take max of fire_at_ms and delay (if provided)
1093                if let Some(delay) = delay {
1094                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1095                } else {
1096                    *fire_at_ms
1097                }
1098            } else {
1099                // fire_at_ms is 0, use delay or NOW()
1100                delay
1101                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1102                    .unwrap_or(now_ms as u64)
1103            }
1104        } else {
1105            // Non-timer item: use delay or NOW()
1106            delay
1107                .map(|d| now_ms as u64 + d.as_millis() as u64)
1108                .unwrap_or(now_ms as u64)
1109        };
1110
1111        let visible_at = Utc
1112            .timestamp_millis_opt(visible_at_ms as i64)
1113            .single()
1114            .ok_or_else(|| {
1115                ProviderError::permanent(
1116                    "enqueue_orchestrator_work",
1117                    "Invalid visible_at timestamp",
1118                )
1119            })?;
1120
1121        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1122        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1123
1124        // Call stored procedure to enqueue work
1125        sqlx::query(&format!(
1126            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1127            self.schema_name
1128        ))
1129        .bind(instance_id)
1130        .bind(&work_item)
1131        .bind(visible_at)
1132        .bind::<Option<String>>(None) // orchestration_name - NULL
1133        .bind::<Option<String>>(None) // orchestration_version - NULL
1134        .bind::<Option<i64>>(None) // execution_id - NULL
1135        .execute(&*self.pool)
1136        .await
1137        .map_err(|e| {
1138            error!(
1139                target = "duroxide::providers::postgres",
1140                operation = "enqueue_orchestrator_work",
1141                error_type = "database_error",
1142                error = %e,
1143                instance_id = %instance_id,
1144                "Failed to enqueue orchestrator work"
1145            );
1146            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1147        })?;
1148
1149        debug!(
1150            target = "duroxide::providers::postgres",
1151            operation = "enqueue_orchestrator_work",
1152            instance_id = %instance_id,
1153            delay_ms = delay.map(|d| d.as_millis() as u64),
1154            "Enqueued orchestrator work"
1155        );
1156
1157        Ok(())
1158    }
1159
1160    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1161    async fn read_with_execution(
1162        &self,
1163        instance: &str,
1164        execution_id: u64,
1165    ) -> Result<Vec<Event>, ProviderError> {
1166        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1167            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1168            self.table_name("history")
1169        ))
1170        .bind(instance)
1171        .bind(execution_id as i64)
1172        .fetch_all(&*self.pool)
1173        .await
1174        .ok()
1175        .unwrap_or_default();
1176
1177        Ok(event_data_rows
1178            .into_iter()
1179            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1180            .collect())
1181    }
1182
1183    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1184        Some(self)
1185    }
1186}
1187
1188#[async_trait::async_trait]
1189impl ProviderAdmin for PostgresProvider {
1190    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1191    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1192        sqlx::query_scalar(&format!(
1193            "SELECT instance_id FROM {}.list_instances()",
1194            self.schema_name
1195        ))
1196        .fetch_all(&*self.pool)
1197        .await
1198        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1199    }
1200
1201    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1202    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1203        sqlx::query_scalar(&format!(
1204            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1205            self.schema_name
1206        ))
1207        .bind(status)
1208        .fetch_all(&*self.pool)
1209        .await
1210        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1211    }
1212
1213    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1214    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1215        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1216            "SELECT execution_id FROM {}.list_executions($1)",
1217            self.schema_name
1218        ))
1219        .bind(instance)
1220        .fetch_all(&*self.pool)
1221        .await
1222        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1223
1224        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1225    }
1226
1227    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1228    async fn read_history_with_execution_id(
1229        &self,
1230        instance: &str,
1231        execution_id: u64,
1232    ) -> Result<Vec<Event>, ProviderError> {
1233        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1234            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1235            self.schema_name
1236        ))
1237        .bind(instance)
1238        .bind(execution_id as i64)
1239        .fetch_all(&*self.pool)
1240        .await
1241        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1242
1243        event_data_rows
1244            .into_iter()
1245            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1246            .collect::<Vec<Event>>()
1247            .into_iter()
1248            .map(Ok)
1249            .collect()
1250    }
1251
1252    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1253    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1254        let execution_id = self.latest_execution_id(instance).await?;
1255        self.read_history_with_execution_id(instance, execution_id)
1256            .await
1257    }
1258
1259    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1260    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1261        sqlx::query_scalar(&format!(
1262            "SELECT {}.latest_execution_id($1)",
1263            self.schema_name
1264        ))
1265        .bind(instance)
1266        .fetch_optional(&*self.pool)
1267        .await
1268        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1269        .map(|id: i64| id as u64)
1270        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1271    }
1272
1273    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1274    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1275        let row: Option<(
1276            String,
1277            String,
1278            String,
1279            i64,
1280            chrono::DateTime<Utc>,
1281            Option<chrono::DateTime<Utc>>,
1282            Option<String>,
1283            Option<String>,
1284            Option<String>,
1285        )> = sqlx::query_as(&format!(
1286            "SELECT * FROM {}.get_instance_info($1)",
1287            self.schema_name
1288        ))
1289        .bind(instance)
1290        .fetch_optional(&*self.pool)
1291        .await
1292        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1293
1294        let (
1295            instance_id,
1296            orchestration_name,
1297            orchestration_version,
1298            current_execution_id,
1299            created_at,
1300            updated_at,
1301            status,
1302            output,
1303            parent_instance_id,
1304        ) =
1305            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1306
1307        Ok(InstanceInfo {
1308            instance_id,
1309            orchestration_name,
1310            orchestration_version,
1311            current_execution_id: current_execution_id as u64,
1312            status: status.unwrap_or_else(|| "Running".to_string()),
1313            output,
1314            created_at: created_at.timestamp_millis() as u64,
1315            updated_at: updated_at
1316                .map(|dt| dt.timestamp_millis() as u64)
1317                .unwrap_or(created_at.timestamp_millis() as u64),
1318            parent_instance_id,
1319        })
1320    }
1321
1322    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1323    async fn get_execution_info(
1324        &self,
1325        instance: &str,
1326        execution_id: u64,
1327    ) -> Result<ExecutionInfo, ProviderError> {
1328        let row: Option<(
1329            i64,
1330            String,
1331            Option<String>,
1332            chrono::DateTime<Utc>,
1333            Option<chrono::DateTime<Utc>>,
1334            i64,
1335        )> = sqlx::query_as(&format!(
1336            "SELECT * FROM {}.get_execution_info($1, $2)",
1337            self.schema_name
1338        ))
1339        .bind(instance)
1340        .bind(execution_id as i64)
1341        .fetch_optional(&*self.pool)
1342        .await
1343        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1344
1345        let (exec_id, status, output, started_at, completed_at, event_count) = row
1346            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1347
1348        Ok(ExecutionInfo {
1349            execution_id: exec_id as u64,
1350            status,
1351            output,
1352            started_at: started_at.timestamp_millis() as u64,
1353            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1354            event_count: event_count as usize,
1355        })
1356    }
1357
1358    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1359    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1360        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1361            "SELECT * FROM {}.get_system_metrics()",
1362            self.schema_name
1363        ))
1364        .fetch_optional(&*self.pool)
1365        .await
1366        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1367
1368        let (
1369            total_instances,
1370            total_executions,
1371            running_instances,
1372            completed_instances,
1373            failed_instances,
1374            total_events,
1375        ) = row.ok_or_else(|| {
1376            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1377        })?;
1378
1379        Ok(SystemMetrics {
1380            total_instances: total_instances as u64,
1381            total_executions: total_executions as u64,
1382            running_instances: running_instances as u64,
1383            completed_instances: completed_instances as u64,
1384            failed_instances: failed_instances as u64,
1385            total_events: total_events as u64,
1386        })
1387    }
1388
1389    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1390    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1391        let now_ms = Self::now_millis();
1392
1393        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1394            "SELECT * FROM {}.get_queue_depths($1)",
1395            self.schema_name
1396        ))
1397        .bind(now_ms)
1398        .fetch_optional(&*self.pool)
1399        .await
1400        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1401
1402        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1403            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1404        })?;
1405
1406        Ok(QueueDepths {
1407            orchestrator_queue: orchestrator_queue as usize,
1408            worker_queue: worker_queue as usize,
1409            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1410        })
1411    }
1412
1413    // ===== Hierarchy Primitive Operations =====
1414
1415    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1416    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1417        sqlx::query_scalar(&format!(
1418            "SELECT child_instance_id FROM {}.list_children($1)",
1419            self.schema_name
1420        ))
1421        .bind(instance_id)
1422        .fetch_all(&*self.pool)
1423        .await
1424        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1425    }
1426
1427    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1428    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1429        // The stored procedure raises an exception if instance doesn't exist
1430        // Otherwise returns the parent_instance_id (which may be NULL)
1431        let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1432            "SELECT {}.get_parent_id($1)",
1433            self.schema_name
1434        ))
1435        .bind(instance_id)
1436        .fetch_one(&*self.pool)
1437        .await;
1438
1439        match result {
1440            Ok(parent_id) => Ok(parent_id),
1441            Err(e) => {
1442                let err_str = e.to_string();
1443                if err_str.contains("Instance not found") {
1444                    Err(ProviderError::permanent(
1445                        "get_parent_id",
1446                        format!("Instance not found: {}", instance_id),
1447                    ))
1448                } else {
1449                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1450                }
1451            }
1452        }
1453    }
1454
1455    // ===== Deletion Operations =====
1456
1457    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1458    async fn delete_instances_atomic(
1459        &self,
1460        ids: &[String],
1461        force: bool,
1462    ) -> Result<DeleteInstanceResult, ProviderError> {
1463        if ids.is_empty() {
1464            return Ok(DeleteInstanceResult::default());
1465        }
1466
1467        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1468            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1469            self.schema_name
1470        ))
1471        .bind(ids)
1472        .bind(force)
1473        .fetch_optional(&*self.pool)
1474        .await
1475        .map_err(|e| {
1476            let err_str = e.to_string();
1477            if err_str.contains("is Running") {
1478                ProviderError::permanent(
1479                    "delete_instances_atomic",
1480                    err_str,
1481                )
1482            } else if err_str.contains("Orphan detected") {
1483                ProviderError::permanent(
1484                    "delete_instances_atomic",
1485                    err_str,
1486                )
1487            } else {
1488                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1489            }
1490        })?;
1491
1492        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1493            row.unwrap_or((0, 0, 0, 0));
1494
1495        debug!(
1496            target = "duroxide::providers::postgres",
1497            operation = "delete_instances_atomic",
1498            instances_deleted = instances_deleted,
1499            executions_deleted = executions_deleted,
1500            events_deleted = events_deleted,
1501            queue_messages_deleted = queue_messages_deleted,
1502            "Deleted instances atomically"
1503        );
1504
1505        Ok(DeleteInstanceResult {
1506            instances_deleted: instances_deleted as u64,
1507            executions_deleted: executions_deleted as u64,
1508            events_deleted: events_deleted as u64,
1509            queue_messages_deleted: queue_messages_deleted as u64,
1510        })
1511    }
1512
1513    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1514    async fn delete_instance_bulk(
1515        &self,
1516        filter: InstanceFilter,
1517    ) -> Result<DeleteInstanceResult, ProviderError> {
1518        // Build query to find matching root instances in terminal states
1519        let mut sql = format!(
1520            r#"
1521            SELECT i.instance_id
1522            FROM {}.instances i
1523            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1524              AND i.current_execution_id = e.execution_id
1525            WHERE i.parent_instance_id IS NULL
1526              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1527            "#,
1528            self.schema_name, self.schema_name
1529        );
1530
1531        // Add instance_ids filter if provided
1532        if let Some(ref ids) = filter.instance_ids {
1533            if ids.is_empty() {
1534                return Ok(DeleteInstanceResult::default());
1535            }
1536            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1537            sql.push_str(&format!(
1538                " AND i.instance_id IN ({})",
1539                placeholders.join(", ")
1540            ));
1541        }
1542
1543        // Add completed_before filter if provided
1544        if filter.completed_before.is_some() {
1545            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1546            sql.push_str(&format!(
1547                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1548                param_num
1549            ));
1550        }
1551
1552        // Add limit
1553        let limit = filter.limit.unwrap_or(1000);
1554        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1555            + if filter.completed_before.is_some() { 1 } else { 0 }
1556            + 1;
1557        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1558
1559        // Build and execute query
1560        let mut query = sqlx::query_scalar::<_, String>(&sql);
1561        if let Some(ref ids) = filter.instance_ids {
1562            for id in ids {
1563                query = query.bind(id);
1564            }
1565        }
1566        if let Some(completed_before) = filter.completed_before {
1567            query = query.bind(completed_before as i64);
1568        }
1569        query = query.bind(limit as i64);
1570
1571        let instance_ids: Vec<String> = query
1572            .fetch_all(&*self.pool)
1573            .await
1574            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1575
1576        if instance_ids.is_empty() {
1577            return Ok(DeleteInstanceResult::default());
1578        }
1579
1580        // Delete each instance with cascade
1581        let mut result = DeleteInstanceResult::default();
1582
1583        for instance_id in &instance_ids {
1584            // Get full tree for this root
1585            let tree = self.get_instance_tree(instance_id).await?;
1586
1587            // Atomic delete (tree.all_ids is already in deletion order: children first)
1588            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1589            result.instances_deleted += delete_result.instances_deleted;
1590            result.executions_deleted += delete_result.executions_deleted;
1591            result.events_deleted += delete_result.events_deleted;
1592            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1593        }
1594
1595        debug!(
1596            target = "duroxide::providers::postgres",
1597            operation = "delete_instance_bulk",
1598            instances_deleted = result.instances_deleted,
1599            executions_deleted = result.executions_deleted,
1600            events_deleted = result.events_deleted,
1601            queue_messages_deleted = result.queue_messages_deleted,
1602            "Bulk deleted instances"
1603        );
1604
1605        Ok(result)
1606    }
1607
1608    // ===== Pruning Operations =====
1609
1610    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1611    async fn prune_executions(
1612        &self,
1613        instance_id: &str,
1614        options: PruneOptions,
1615    ) -> Result<PruneResult, ProviderError> {
1616        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1617        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1618
1619        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1620            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1621            self.schema_name
1622        ))
1623        .bind(instance_id)
1624        .bind(keep_last)
1625        .bind(completed_before_ms)
1626        .fetch_optional(&*self.pool)
1627        .await
1628        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1629
1630        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1631
1632        debug!(
1633            target = "duroxide::providers::postgres",
1634            operation = "prune_executions",
1635            instance_id = %instance_id,
1636            instances_processed = instances_processed,
1637            executions_deleted = executions_deleted,
1638            events_deleted = events_deleted,
1639            "Pruned executions"
1640        );
1641
1642        Ok(PruneResult {
1643            instances_processed: instances_processed as u64,
1644            executions_deleted: executions_deleted as u64,
1645            events_deleted: events_deleted as u64,
1646        })
1647    }
1648
1649    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1650    async fn prune_executions_bulk(
1651        &self,
1652        filter: InstanceFilter,
1653        options: PruneOptions,
1654    ) -> Result<PruneResult, ProviderError> {
1655        // Find matching instances (all statuses - prune_executions protects current execution)
1656        // Note: We include Running instances because long-running orchestrations (e.g., with
1657        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1658        // call safely skips the current execution regardless of its status.
1659        let mut sql = format!(
1660            r#"
1661            SELECT i.instance_id
1662            FROM {}.instances i
1663            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1664              AND i.current_execution_id = e.execution_id
1665            WHERE 1=1
1666            "#,
1667            self.schema_name, self.schema_name
1668        );
1669
1670        // Add instance_ids filter if provided
1671        if let Some(ref ids) = filter.instance_ids {
1672            if ids.is_empty() {
1673                return Ok(PruneResult::default());
1674            }
1675            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1676            sql.push_str(&format!(
1677                " AND i.instance_id IN ({})",
1678                placeholders.join(", ")
1679            ));
1680        }
1681
1682        // Add completed_before filter if provided
1683        if filter.completed_before.is_some() {
1684            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1685            sql.push_str(&format!(
1686                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1687                param_num
1688            ));
1689        }
1690
1691        // Add limit
1692        let limit = filter.limit.unwrap_or(1000);
1693        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1694            + if filter.completed_before.is_some() { 1 } else { 0 }
1695            + 1;
1696        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1697
1698        // Build and execute query
1699        let mut query = sqlx::query_scalar::<_, String>(&sql);
1700        if let Some(ref ids) = filter.instance_ids {
1701            for id in ids {
1702                query = query.bind(id);
1703            }
1704        }
1705        if let Some(completed_before) = filter.completed_before {
1706            query = query.bind(completed_before as i64);
1707        }
1708        query = query.bind(limit as i64);
1709
1710        let instance_ids: Vec<String> = query
1711            .fetch_all(&*self.pool)
1712            .await
1713            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1714
1715        // Prune each instance
1716        let mut result = PruneResult::default();
1717
1718        for instance_id in &instance_ids {
1719            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1720            result.instances_processed += single_result.instances_processed;
1721            result.executions_deleted += single_result.executions_deleted;
1722            result.events_deleted += single_result.events_deleted;
1723        }
1724
1725        debug!(
1726            target = "duroxide::providers::postgres",
1727            operation = "prune_executions_bulk",
1728            instances_processed = result.instances_processed,
1729            executions_deleted = result.executions_deleted,
1730            events_deleted = result.events_deleted,
1731            "Bulk pruned executions"
1732        );
1733
1734        Ok(result)
1735    }
1736}