duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, ExecutionInfo, ExecutionMetadata, InstanceFilter, InstanceInfo,
5    OrchestrationItem, Provider, ProviderAdmin, ProviderError, PruneOptions, PruneResult,
6    QueueDepths, ScheduledActivityIdentifier, SystemMetrics, WorkItem,
7};
8use duroxide::Event;
9use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::time::sleep;
14use tracing::{debug, error, instrument, warn};
15
16use crate::migrations::MigrationRunner;
17
18/// PostgreSQL-based provider for Duroxide durable orchestrations.
19///
20/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
21/// storing orchestration state, history, and work queues in PostgreSQL.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// use duroxide_pg::PostgresProvider;
27///
28/// # async fn example() -> anyhow::Result<()> {
29/// // Connect using DATABASE_URL or explicit connection string
30/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
31///
32/// // Or use a custom schema for isolation
33/// let provider = PostgresProvider::new_with_schema(
34///     "postgres://localhost/mydb",
35///     Some("my_app"),
36/// ).await?;
37/// # Ok(())
38/// # }
39/// ```
40pub struct PostgresProvider {
41    pool: Arc<PgPool>,
42    schema_name: String,
43}
44
45impl PostgresProvider {
46    pub async fn new(database_url: &str) -> Result<Self> {
47        Self::new_with_schema(database_url, None).await
48    }
49
50    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
51        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
52            .ok()
53            .and_then(|s| s.parse::<u32>().ok())
54            .unwrap_or(10);
55
56        let pool = PgPoolOptions::new()
57            .max_connections(max_connections)
58            .min_connections(1)
59            .acquire_timeout(std::time::Duration::from_secs(30))
60            .connect(database_url)
61            .await?;
62
63        let schema_name = schema_name.unwrap_or("public").to_string();
64
65        let provider = Self {
66            pool: Arc::new(pool),
67            schema_name: schema_name.clone(),
68        };
69
70        // Run migrations to initialize schema
71        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
72        migration_runner.migrate().await?;
73
74        Ok(provider)
75    }
76
77    #[instrument(skip(self), target = "duroxide::providers::postgres")]
78    pub async fn initialize_schema(&self) -> Result<()> {
79        // Schema initialization is now handled by migrations
80        // This method is kept for backward compatibility but delegates to migrations
81        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
82        migration_runner.migrate().await?;
83        Ok(())
84    }
85
86    /// Get current timestamp in milliseconds (Unix epoch)
87    fn now_millis() -> i64 {
88        SystemTime::now()
89            .duration_since(UNIX_EPOCH)
90            .unwrap()
91            .as_millis() as i64
92    }
93
94
95
96    /// Get schema-qualified table name
97    fn table_name(&self, table: &str) -> String {
98        format!("{}.{}", self.schema_name, table)
99    }
100
101    /// Get the database pool (for testing)
102    pub fn pool(&self) -> &PgPool {
103        &self.pool
104    }
105
106    /// Get the schema name (for testing)
107    pub fn schema_name(&self) -> &str {
108        &self.schema_name
109    }
110
111    /// Convert sqlx::Error to ProviderError with proper classification
112    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
113        match e {
114            SqlxError::Database(ref db_err) => {
115                // PostgreSQL error codes
116                let code_opt = db_err.code();
117                let code = code_opt.as_deref();
118                if code == Some("40P01") {
119                    // Deadlock detected
120                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
121                } else if code == Some("40001") {
122                    // Serialization failure - permanent error (transaction conflict, not transient)
123                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
124                } else if code == Some("23505") {
125                    // Unique constraint violation (duplicate event)
126                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
127                } else if code == Some("23503") {
128                    // Foreign key constraint violation
129                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
130                } else {
131                    ProviderError::permanent(operation, format!("Database error: {e}"))
132                }
133            }
134            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
135                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
136            }
137            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
138            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
139        }
140    }
141
142    /// Clean up schema after tests (drops all tables and optionally the schema)
143    ///
144    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
145    /// Only drops the schema if it's a custom schema (not "public").
146    pub async fn cleanup_schema(&self) -> Result<()> {
147        // Call the stored procedure to drop all tables
148        sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
149            .execute(&*self.pool)
150            .await?;
151
152        // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
153        // Only drop custom schemas created for testing
154        if self.schema_name != "public" {
155            sqlx::query(&format!(
156                "DROP SCHEMA IF EXISTS {} CASCADE",
157                self.schema_name
158            ))
159            .execute(&*self.pool)
160            .await?;
161        } else {
162            // Explicit safeguard: we only drop tables from public schema, never the schema itself
163            // This ensures we don't accidentally drop the default PostgreSQL schema
164        }
165
166        Ok(())
167    }
168}
169
170#[async_trait::async_trait]
171impl Provider for PostgresProvider {
172    fn name(&self) -> &str {
173        "duroxide-pg"
174    }
175
176    fn version(&self) -> &str {
177        env!("CARGO_PKG_VERSION")
178    }
179
180    #[instrument(skip(self), target = "duroxide::providers::postgres")]
181    async fn fetch_orchestration_item(
182        &self,
183        lock_timeout: Duration,
184        _poll_timeout: Duration,
185    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
186        let start = std::time::Instant::now();
187
188        const MAX_RETRIES: u32 = 3;
189        const RETRY_DELAY_MS: u64 = 50;
190
191        // Convert Duration to milliseconds
192        let lock_timeout_ms = lock_timeout.as_millis() as i64;
193        let mut _last_error: Option<ProviderError> = None;
194
195        for attempt in 0..=MAX_RETRIES {
196            let now_ms = Self::now_millis();
197
198            let result: Result<
199                Option<(
200                    String,
201                    String,
202                    String,
203                    i64,
204                    serde_json::Value,
205                    serde_json::Value,
206                    String,
207                    i32,
208                )>,
209                SqlxError,
210            > = sqlx::query_as(&format!(
211                "SELECT * FROM {}.fetch_orchestration_item($1, $2)",
212                self.schema_name
213            ))
214            .bind(now_ms)
215            .bind(lock_timeout_ms)
216            .fetch_optional(&*self.pool)
217            .await;
218
219            let row = match result {
220                Ok(r) => r,
221                Err(e) => {
222                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
223                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
224                        warn!(
225                            target = "duroxide::providers::postgres",
226                            operation = "fetch_orchestration_item",
227                            attempt = attempt + 1,
228                            error = %provider_err,
229                            "Retryable error, will retry"
230                        );
231                        _last_error = Some(provider_err);
232                        sleep(std::time::Duration::from_millis(
233                            RETRY_DELAY_MS * (attempt as u64 + 1),
234                        ))
235                        .await;
236                        continue;
237                    }
238                    return Err(provider_err);
239                }
240            };
241
242            if let Some((
243                instance_id,
244                orchestration_name,
245                orchestration_version,
246                execution_id,
247                history_json,
248                messages_json,
249                lock_token,
250                attempt_count,
251            )) = row
252            {
253                let history: Vec<Event> = serde_json::from_value(history_json).map_err(|e| {
254                    ProviderError::permanent(
255                        "fetch_orchestration_item",
256                        format!("Failed to deserialize history: {e}"),
257                    )
258                })?;
259
260                let messages: Vec<WorkItem> =
261                    serde_json::from_value(messages_json).map_err(|e| {
262                        ProviderError::permanent(
263                            "fetch_orchestration_item",
264                            format!("Failed to deserialize messages: {e}"),
265                        )
266                    })?;
267
268                let duration_ms = start.elapsed().as_millis() as u64;
269                debug!(
270                    target = "duroxide::providers::postgres",
271                    operation = "fetch_orchestration_item",
272                    instance_id = %instance_id,
273                    execution_id = execution_id,
274                    message_count = messages.len(),
275                    history_count = history.len(),
276                    attempt_count = attempt_count,
277                    duration_ms = duration_ms,
278                    attempts = attempt + 1,
279                    "Fetched orchestration item via stored procedure"
280                );
281
282                return Ok(Some((
283                    OrchestrationItem {
284                        instance: instance_id,
285                        orchestration_name,
286                        execution_id: execution_id as u64,
287                        version: orchestration_version,
288                        history,
289                        messages,
290                    },
291                    lock_token,
292                    attempt_count as u32,
293                )));
294            }
295
296            // No result found - return immediately (short polling behavior)
297            // Only retry with delay on retryable errors (handled above)
298            return Ok(None);
299        }
300
301        Ok(None)
302    }
303    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
304    async fn ack_orchestration_item(
305        &self,
306        lock_token: &str,
307        execution_id: u64,
308        history_delta: Vec<Event>,
309        worker_items: Vec<WorkItem>,
310        orchestrator_items: Vec<WorkItem>,
311        metadata: ExecutionMetadata,
312        cancelled_activities: Vec<ScheduledActivityIdentifier>,
313    ) -> Result<(), ProviderError> {
314        let start = std::time::Instant::now();
315
316        const MAX_RETRIES: u32 = 3;
317        const RETRY_DELAY_MS: u64 = 50;
318
319        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
320        for event in &history_delta {
321            if event.event_id() == 0 {
322                return Err(ProviderError::permanent(
323                    "ack_orchestration_item",
324                    "event_id must be set by runtime",
325                ));
326            }
327
328            let event_json = serde_json::to_string(event).map_err(|e| {
329                ProviderError::permanent(
330                    "ack_orchestration_item",
331                    format!("Failed to serialize event: {e}"),
332                )
333            })?;
334
335            let event_type = format!("{event:?}")
336                .split('{')
337                .next()
338                .unwrap_or("Unknown")
339                .trim()
340                .to_string();
341
342            history_delta_payload.push(serde_json::json!({
343                "event_id": event.event_id(),
344                "event_type": event_type,
345                "event_data": event_json,
346            }));
347        }
348
349        let history_delta_json = serde_json::Value::Array(history_delta_payload);
350
351        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
352            ProviderError::permanent(
353                "ack_orchestration_item",
354                format!("Failed to serialize worker items: {e}"),
355            )
356        })?;
357
358        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
359            ProviderError::permanent(
360                "ack_orchestration_item",
361                format!("Failed to serialize orchestrator items: {e}"),
362            )
363        })?;
364
365        let metadata_json = serde_json::json!({
366            "orchestration_name": metadata.orchestration_name,
367            "orchestration_version": metadata.orchestration_version,
368            "status": metadata.status,
369            "output": metadata.output,
370            "parent_instance_id": metadata.parent_instance_id,
371        });
372
373        // Serialize cancelled activities for lock stealing
374        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
375            .iter()
376            .map(|a| {
377                serde_json::json!({
378                    "instance": a.instance,
379                    "execution_id": a.execution_id,
380                    "activity_id": a.activity_id,
381                })
382            })
383            .collect();
384        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
385
386        for attempt in 0..=MAX_RETRIES {
387            let result = sqlx::query(&format!(
388                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7)",
389                self.schema_name
390            ))
391            .bind(lock_token)
392            .bind(execution_id as i64)
393            .bind(&history_delta_json)
394            .bind(&worker_items_json)
395            .bind(&orchestrator_items_json)
396            .bind(&metadata_json)
397            .bind(&cancelled_activities_json)
398            .execute(&*self.pool)
399            .await;
400
401            match result {
402                Ok(_) => {
403                    let duration_ms = start.elapsed().as_millis() as u64;
404                    debug!(
405                        target = "duroxide::providers::postgres",
406                        operation = "ack_orchestration_item",
407                        execution_id = execution_id,
408                        history_count = history_delta.len(),
409                        worker_items_count = worker_items.len(),
410                        orchestrator_items_count = orchestrator_items.len(),
411                        cancelled_activities_count = cancelled_activities.len(),
412                        duration_ms = duration_ms,
413                        attempts = attempt + 1,
414                        "Acknowledged orchestration item via stored procedure"
415                    );
416                    return Ok(());
417                }
418                Err(e) => {
419                    // Check for permanent errors first
420                    if let SqlxError::Database(db_err) = &e {
421                        if db_err.message().contains("Invalid lock token") {
422                            return Err(ProviderError::permanent(
423                                "ack_orchestration_item",
424                                "Invalid lock token",
425                            ));
426                        }
427                    } else if e.to_string().contains("Invalid lock token") {
428                        return Err(ProviderError::permanent(
429                            "ack_orchestration_item",
430                            "Invalid lock token",
431                        ));
432                    }
433
434                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
435                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
436                        warn!(
437                            target = "duroxide::providers::postgres",
438                            operation = "ack_orchestration_item",
439                            attempt = attempt + 1,
440                            error = %provider_err,
441                            "Retryable error, will retry"
442                        );
443                        sleep(std::time::Duration::from_millis(
444                            RETRY_DELAY_MS * (attempt as u64 + 1),
445                        ))
446                        .await;
447                        continue;
448                    }
449                    return Err(provider_err);
450                }
451            }
452        }
453
454        // Should never reach here, but just in case
455        Ok(())
456    }
457    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
458    async fn abandon_orchestration_item(
459        &self,
460        lock_token: &str,
461        delay: Option<Duration>,
462        ignore_attempt: bool,
463    ) -> Result<(), ProviderError> {
464        let start = std::time::Instant::now();
465        let now_ms = Self::now_millis();
466        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
467
468        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
469            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
470            self.schema_name
471        ))
472        .bind(lock_token)
473        .bind(now_ms)
474        .bind(delay_param)
475        .bind(ignore_attempt)
476        .fetch_one(&*self.pool)
477        .await
478        {
479            Ok(instance_id) => instance_id,
480            Err(e) => {
481                if let SqlxError::Database(db_err) = &e {
482                    if db_err.message().contains("Invalid lock token") {
483                        return Err(ProviderError::permanent(
484                            "abandon_orchestration_item",
485                            "Invalid lock token",
486                        ));
487                    }
488                } else if e.to_string().contains("Invalid lock token") {
489                    return Err(ProviderError::permanent(
490                        "abandon_orchestration_item",
491                        "Invalid lock token",
492                    ));
493                }
494
495                return Err(Self::sqlx_to_provider_error(
496                    "abandon_orchestration_item",
497                    e,
498                ));
499            }
500        };
501
502        let duration_ms = start.elapsed().as_millis() as u64;
503        debug!(
504            target = "duroxide::providers::postgres",
505            operation = "abandon_orchestration_item",
506            instance_id = %instance_id,
507            delay_ms = delay.map(|d| d.as_millis() as u64),
508            ignore_attempt = ignore_attempt,
509            duration_ms = duration_ms,
510            "Abandoned orchestration item via stored procedure"
511        );
512
513        Ok(())
514    }
515
516    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
517    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
518        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
519            "SELECT out_event_data FROM {}.fetch_history($1)",
520            self.schema_name
521        ))
522        .bind(instance)
523        .fetch_all(&*self.pool)
524        .await
525        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
526
527        Ok(event_data_rows
528            .into_iter()
529            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
530            .collect())
531    }
532
533    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
534    async fn append_with_execution(
535        &self,
536        instance: &str,
537        execution_id: u64,
538        new_events: Vec<Event>,
539    ) -> Result<(), ProviderError> {
540        if new_events.is_empty() {
541            return Ok(());
542        }
543
544        let mut events_payload = Vec::with_capacity(new_events.len());
545        for event in &new_events {
546            if event.event_id() == 0 {
547                error!(
548                    target = "duroxide::providers::postgres",
549                    operation = "append_with_execution",
550                    error_type = "validation_error",
551                    instance_id = %instance,
552                    execution_id = execution_id,
553                    "event_id must be set by runtime"
554                );
555                return Err(ProviderError::permanent(
556                    "append_with_execution",
557                    "event_id must be set by runtime",
558                ));
559            }
560
561            let event_json = serde_json::to_string(event).map_err(|e| {
562                ProviderError::permanent(
563                    "append_with_execution",
564                    format!("Failed to serialize event: {e}"),
565                )
566            })?;
567
568            let event_type = format!("{event:?}")
569                .split('{')
570                .next()
571                .unwrap_or("Unknown")
572                .trim()
573                .to_string();
574
575            events_payload.push(serde_json::json!({
576                "event_id": event.event_id(),
577                "event_type": event_type,
578                "event_data": event_json,
579            }));
580        }
581
582        let events_json = serde_json::Value::Array(events_payload);
583
584        sqlx::query(&format!(
585            "SELECT {}.append_history($1, $2, $3)",
586            self.schema_name
587        ))
588        .bind(instance)
589        .bind(execution_id as i64)
590        .bind(events_json)
591        .execute(&*self.pool)
592        .await
593        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
594
595        debug!(
596            target = "duroxide::providers::postgres",
597            operation = "append_with_execution",
598            instance_id = %instance,
599            execution_id = execution_id,
600            event_count = new_events.len(),
601            "Appended history events via stored procedure"
602        );
603
604        Ok(())
605    }
606
607    #[instrument(skip(self), target = "duroxide::providers::postgres")]
608    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
609        let work_item = serde_json::to_string(&item).map_err(|e| {
610            ProviderError::permanent(
611                "enqueue_worker_work",
612                format!("Failed to serialize work item: {e}"),
613            )
614        })?;
615
616        let now_ms = Self::now_millis();
617
618        // Extract activity identification for ActivityExecute items (for cancellation support)
619        let (instance_id, execution_id, activity_id) = match &item {
620            WorkItem::ActivityExecute {
621                instance,
622                execution_id,
623                id,
624                ..
625            } => (
626                Some(instance.clone()),
627                Some(*execution_id as i64),
628                Some(*id as i64),
629            ),
630            _ => (None, None, None),
631        };
632
633        sqlx::query(&format!(
634            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5)",
635            self.schema_name
636        ))
637        .bind(work_item)
638        .bind(now_ms)
639        .bind(&instance_id)
640        .bind(execution_id)
641        .bind(activity_id)
642        .execute(&*self.pool)
643        .await
644        .map_err(|e| {
645            error!(
646                target = "duroxide::providers::postgres",
647                operation = "enqueue_worker_work",
648                error_type = "database_error",
649                error = %e,
650                "Failed to enqueue worker work"
651            );
652            Self::sqlx_to_provider_error("enqueue_worker_work", e)
653        })?;
654
655        Ok(())
656    }
657
658    #[instrument(skip(self), target = "duroxide::providers::postgres")]
659    async fn fetch_work_item(
660        &self,
661        lock_timeout: Duration,
662        _poll_timeout: Duration,
663    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
664        let start = std::time::Instant::now();
665
666        // Convert Duration to milliseconds
667        let lock_timeout_ms = lock_timeout.as_millis() as i64;
668
669        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
670            "SELECT * FROM {}.fetch_work_item($1, $2)",
671            self.schema_name
672        ))
673        .bind(Self::now_millis())
674        .bind(lock_timeout_ms)
675        .fetch_optional(&*self.pool)
676        .await
677        {
678            Ok(row) => row,
679            Err(e) => {
680                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
681            }
682        };
683
684        let (work_item_json, lock_token, attempt_count) = match row {
685            Some(row) => row,
686            None => return Ok(None),
687        };
688
689        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
690            ProviderError::permanent(
691                "fetch_work_item",
692                format!("Failed to deserialize worker item: {e}"),
693            )
694        })?;
695
696        let duration_ms = start.elapsed().as_millis() as u64;
697
698        // Extract instance for logging - different work item types have different structures
699        let instance_id = match &work_item {
700            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
701            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
702            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
703            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
704            WorkItem::TimerFired { instance, .. } => instance.as_str(),
705            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
706            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
707            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
708            WorkItem::SubOrchCompleted {
709                parent_instance, ..
710            } => parent_instance.as_str(),
711            WorkItem::SubOrchFailed {
712                parent_instance, ..
713            } => parent_instance.as_str(),
714        };
715
716        debug!(
717            target = "duroxide::providers::postgres",
718            operation = "fetch_work_item",
719            instance_id = %instance_id,
720            attempt_count = attempt_count,
721            duration_ms = duration_ms,
722            "Fetched activity work item via stored procedure"
723        );
724
725        Ok(Some((
726            work_item,
727            lock_token,
728            attempt_count as u32,
729        )))
730    }
731
732    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
733    async fn ack_work_item(
734        &self,
735        token: &str,
736        completion: Option<WorkItem>,
737    ) -> Result<(), ProviderError> {
738        let start = std::time::Instant::now();
739
740        // If no completion provided (e.g., cancelled activity), just delete the item
741        let Some(completion) = completion else {
742            // Call ack_worker with NULL completion to delete without enqueueing
743            sqlx::query(&format!(
744                "SELECT {}.ack_worker($1)",
745                self.schema_name
746            ))
747            .bind(token)
748            .execute(&*self.pool)
749            .await
750            .map_err(|e| {
751                if e.to_string().contains("Worker queue item not found") {
752                    ProviderError::permanent(
753                        "ack_worker",
754                        "Worker queue item not found or already processed",
755                    )
756                } else {
757                    Self::sqlx_to_provider_error("ack_worker", e)
758                }
759            })?;
760
761            let duration_ms = start.elapsed().as_millis() as u64;
762            debug!(
763                target = "duroxide::providers::postgres",
764                operation = "ack_worker",
765                token = %token,
766                duration_ms = duration_ms,
767                "Acknowledged worker without completion (cancelled)"
768            );
769            return Ok(());
770        };
771
772        // Extract instance ID from completion WorkItem
773        let instance_id = match &completion {
774            WorkItem::ActivityCompleted { instance, .. }
775            | WorkItem::ActivityFailed { instance, .. } => instance,
776            _ => {
777                error!(
778                    target = "duroxide::providers::postgres",
779                    operation = "ack_worker",
780                    error_type = "invalid_completion_type",
781                    "Invalid completion work item type"
782                );
783                return Err(ProviderError::permanent(
784                    "ack_worker",
785                    "Invalid completion work item type",
786                ));
787            }
788        };
789
790        let completion_json = serde_json::to_string(&completion).map_err(|e| {
791            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
792        })?;
793
794        let now_ms = Self::now_millis();
795
796        // Call stored procedure to atomically delete worker item and enqueue completion
797        sqlx::query(&format!(
798            "SELECT {}.ack_worker($1, $2, $3, $4)",
799            self.schema_name
800        ))
801        .bind(token)
802        .bind(instance_id)
803        .bind(completion_json)
804        .bind(now_ms)
805        .execute(&*self.pool)
806        .await
807        .map_err(|e| {
808            if e.to_string().contains("Worker queue item not found") {
809                error!(
810                    target = "duroxide::providers::postgres",
811                    operation = "ack_worker",
812                    error_type = "worker_item_not_found",
813                    token = %token,
814                    "Worker queue item not found or already processed"
815                );
816                ProviderError::permanent(
817                    "ack_worker",
818                    "Worker queue item not found or already processed",
819                )
820            } else {
821                Self::sqlx_to_provider_error("ack_worker", e)
822            }
823        })?;
824
825        let duration_ms = start.elapsed().as_millis() as u64;
826        debug!(
827            target = "duroxide::providers::postgres",
828            operation = "ack_worker",
829            instance_id = %instance_id,
830            duration_ms = duration_ms,
831            "Acknowledged worker and enqueued completion"
832        );
833
834        Ok(())
835    }
836
837    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
838    async fn renew_work_item_lock(
839        &self,
840        token: &str,
841        extend_for: Duration,
842    ) -> Result<(), ProviderError> {
843        let start = std::time::Instant::now();
844
845        // Get current time from application for consistent time reference
846        let now_ms = Self::now_millis();
847
848        // Convert Duration to seconds for the stored procedure
849        let extend_secs = extend_for.as_secs() as i64;
850
851        match sqlx::query(&format!(
852            "SELECT {}.renew_work_item_lock($1, $2, $3)",
853            self.schema_name
854        ))
855        .bind(token)
856        .bind(now_ms)
857        .bind(extend_secs)
858        .execute(&*self.pool)
859        .await
860        {
861            Ok(_) => {
862                let duration_ms = start.elapsed().as_millis() as u64;
863                debug!(
864                    target = "duroxide::providers::postgres",
865                    operation = "renew_work_item_lock",
866                    token = %token,
867                    extend_for_secs = extend_secs,
868                    duration_ms = duration_ms,
869                    "Work item lock renewed successfully"
870                );
871                Ok(())
872            }
873            Err(e) => {
874                if let SqlxError::Database(db_err) = &e {
875                    if db_err.message().contains("Lock token invalid") {
876                        return Err(ProviderError::permanent(
877                            "renew_work_item_lock",
878                            "Lock token invalid, expired, or already acked",
879                        ));
880                    }
881                } else if e.to_string().contains("Lock token invalid") {
882                    return Err(ProviderError::permanent(
883                        "renew_work_item_lock",
884                        "Lock token invalid, expired, or already acked",
885                    ));
886                }
887
888                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
889            }
890        }
891    }
892
893    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
894    async fn abandon_work_item(
895        &self,
896        token: &str,
897        delay: Option<Duration>,
898        ignore_attempt: bool,
899    ) -> Result<(), ProviderError> {
900        let start = std::time::Instant::now();
901        let now_ms = Self::now_millis();
902        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
903
904        match sqlx::query(&format!(
905            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
906            self.schema_name
907        ))
908        .bind(token)
909        .bind(now_ms)
910        .bind(delay_param)
911        .bind(ignore_attempt)
912        .execute(&*self.pool)
913        .await
914        {
915            Ok(_) => {
916                let duration_ms = start.elapsed().as_millis() as u64;
917                debug!(
918                    target = "duroxide::providers::postgres",
919                    operation = "abandon_work_item",
920                    token = %token,
921                    delay_ms = delay.map(|d| d.as_millis() as u64),
922                    ignore_attempt = ignore_attempt,
923                    duration_ms = duration_ms,
924                    "Abandoned work item via stored procedure"
925                );
926                Ok(())
927            }
928            Err(e) => {
929                if let SqlxError::Database(db_err) = &e {
930                    if db_err.message().contains("Invalid lock token")
931                        || db_err.message().contains("already acked")
932                    {
933                        return Err(ProviderError::permanent(
934                            "abandon_work_item",
935                            "Invalid lock token or already acked",
936                        ));
937                    }
938                } else if e.to_string().contains("Invalid lock token")
939                    || e.to_string().contains("already acked")
940                {
941                    return Err(ProviderError::permanent(
942                        "abandon_work_item",
943                        "Invalid lock token or already acked",
944                    ));
945                }
946
947                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
948            }
949        }
950    }
951
952    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
953    async fn renew_orchestration_item_lock(
954        &self,
955        token: &str,
956        extend_for: Duration,
957    ) -> Result<(), ProviderError> {
958        let start = std::time::Instant::now();
959
960        // Get current time from application for consistent time reference
961        let now_ms = Self::now_millis();
962
963        // Convert Duration to seconds for the stored procedure
964        let extend_secs = extend_for.as_secs() as i64;
965
966        match sqlx::query(&format!(
967            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
968            self.schema_name
969        ))
970        .bind(token)
971        .bind(now_ms)
972        .bind(extend_secs)
973        .execute(&*self.pool)
974        .await
975        {
976            Ok(_) => {
977                let duration_ms = start.elapsed().as_millis() as u64;
978                debug!(
979                    target = "duroxide::providers::postgres",
980                    operation = "renew_orchestration_item_lock",
981                    token = %token,
982                    extend_for_secs = extend_secs,
983                    duration_ms = duration_ms,
984                    "Orchestration item lock renewed successfully"
985                );
986                Ok(())
987            }
988            Err(e) => {
989                if let SqlxError::Database(db_err) = &e {
990                    if db_err.message().contains("Lock token invalid")
991                        || db_err.message().contains("expired")
992                        || db_err.message().contains("already released")
993                    {
994                        return Err(ProviderError::permanent(
995                            "renew_orchestration_item_lock",
996                            "Lock token invalid, expired, or already released",
997                        ));
998                    }
999                } else if e.to_string().contains("Lock token invalid")
1000                    || e.to_string().contains("expired")
1001                    || e.to_string().contains("already released")
1002                {
1003                    return Err(ProviderError::permanent(
1004                        "renew_orchestration_item_lock",
1005                        "Lock token invalid, expired, or already released",
1006                    ));
1007                }
1008
1009                Err(Self::sqlx_to_provider_error(
1010                    "renew_orchestration_item_lock",
1011                    e,
1012                ))
1013            }
1014        }
1015    }
1016
1017    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1018    async fn enqueue_for_orchestrator(
1019        &self,
1020        item: WorkItem,
1021        delay: Option<Duration>,
1022    ) -> Result<(), ProviderError> {
1023        let work_item = serde_json::to_string(&item).map_err(|e| {
1024            ProviderError::permanent(
1025                "enqueue_orchestrator_work",
1026                format!("Failed to serialize work item: {e}"),
1027            )
1028        })?;
1029
1030        // Extract instance ID from WorkItem enum
1031        let instance_id = match &item {
1032            WorkItem::StartOrchestration { instance, .. }
1033            | WorkItem::ActivityCompleted { instance, .. }
1034            | WorkItem::ActivityFailed { instance, .. }
1035            | WorkItem::TimerFired { instance, .. }
1036            | WorkItem::ExternalRaised { instance, .. }
1037            | WorkItem::CancelInstance { instance, .. }
1038            | WorkItem::ContinueAsNew { instance, .. } => instance,
1039            WorkItem::SubOrchCompleted {
1040                parent_instance, ..
1041            }
1042            | WorkItem::SubOrchFailed {
1043                parent_instance, ..
1044            } => parent_instance,
1045            WorkItem::ActivityExecute { .. } => {
1046                return Err(ProviderError::permanent(
1047                    "enqueue_orchestrator_work",
1048                    "ActivityExecute should go to worker queue, not orchestrator queue",
1049                ));
1050            }
1051        };
1052
1053        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1054        let now_ms = Self::now_millis();
1055
1056        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1057            if *fire_at_ms > 0 {
1058                // Take max of fire_at_ms and delay (if provided)
1059                if let Some(delay) = delay {
1060                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1061                } else {
1062                    *fire_at_ms
1063                }
1064            } else {
1065                // fire_at_ms is 0, use delay or NOW()
1066                delay
1067                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1068                    .unwrap_or(now_ms as u64)
1069            }
1070        } else {
1071            // Non-timer item: use delay or NOW()
1072            delay
1073                .map(|d| now_ms as u64 + d.as_millis() as u64)
1074                .unwrap_or(now_ms as u64)
1075        };
1076
1077        let visible_at = Utc
1078            .timestamp_millis_opt(visible_at_ms as i64)
1079            .single()
1080            .ok_or_else(|| {
1081                ProviderError::permanent(
1082                    "enqueue_orchestrator_work",
1083                    "Invalid visible_at timestamp",
1084                )
1085            })?;
1086
1087        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1088        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1089
1090        // Call stored procedure to enqueue work
1091        sqlx::query(&format!(
1092            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1093            self.schema_name
1094        ))
1095        .bind(instance_id)
1096        .bind(&work_item)
1097        .bind(visible_at)
1098        .bind::<Option<String>>(None) // orchestration_name - NULL
1099        .bind::<Option<String>>(None) // orchestration_version - NULL
1100        .bind::<Option<i64>>(None) // execution_id - NULL
1101        .execute(&*self.pool)
1102        .await
1103        .map_err(|e| {
1104            error!(
1105                target = "duroxide::providers::postgres",
1106                operation = "enqueue_orchestrator_work",
1107                error_type = "database_error",
1108                error = %e,
1109                instance_id = %instance_id,
1110                "Failed to enqueue orchestrator work"
1111            );
1112            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1113        })?;
1114
1115        debug!(
1116            target = "duroxide::providers::postgres",
1117            operation = "enqueue_orchestrator_work",
1118            instance_id = %instance_id,
1119            delay_ms = delay.map(|d| d.as_millis() as u64),
1120            "Enqueued orchestrator work"
1121        );
1122
1123        Ok(())
1124    }
1125
1126    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1127    async fn read_with_execution(
1128        &self,
1129        instance: &str,
1130        execution_id: u64,
1131    ) -> Result<Vec<Event>, ProviderError> {
1132        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1133            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1134            self.table_name("history")
1135        ))
1136        .bind(instance)
1137        .bind(execution_id as i64)
1138        .fetch_all(&*self.pool)
1139        .await
1140        .ok()
1141        .unwrap_or_default();
1142
1143        Ok(event_data_rows
1144            .into_iter()
1145            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1146            .collect())
1147    }
1148
1149    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1150        Some(self)
1151    }
1152}
1153
1154#[async_trait::async_trait]
1155impl ProviderAdmin for PostgresProvider {
1156    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1157    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1158        sqlx::query_scalar(&format!(
1159            "SELECT instance_id FROM {}.list_instances()",
1160            self.schema_name
1161        ))
1162        .fetch_all(&*self.pool)
1163        .await
1164        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1165    }
1166
1167    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1168    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1169        sqlx::query_scalar(&format!(
1170            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1171            self.schema_name
1172        ))
1173        .bind(status)
1174        .fetch_all(&*self.pool)
1175        .await
1176        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1177    }
1178
1179    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1180    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1181        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1182            "SELECT execution_id FROM {}.list_executions($1)",
1183            self.schema_name
1184        ))
1185        .bind(instance)
1186        .fetch_all(&*self.pool)
1187        .await
1188        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1189
1190        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1191    }
1192
1193    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1194    async fn read_history_with_execution_id(
1195        &self,
1196        instance: &str,
1197        execution_id: u64,
1198    ) -> Result<Vec<Event>, ProviderError> {
1199        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1200            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1201            self.schema_name
1202        ))
1203        .bind(instance)
1204        .bind(execution_id as i64)
1205        .fetch_all(&*self.pool)
1206        .await
1207        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1208
1209        event_data_rows
1210            .into_iter()
1211            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1212            .collect::<Vec<Event>>()
1213            .into_iter()
1214            .map(Ok)
1215            .collect()
1216    }
1217
1218    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1219    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1220        let execution_id = self.latest_execution_id(instance).await?;
1221        self.read_history_with_execution_id(instance, execution_id)
1222            .await
1223    }
1224
1225    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1226    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1227        sqlx::query_scalar(&format!(
1228            "SELECT {}.latest_execution_id($1)",
1229            self.schema_name
1230        ))
1231        .bind(instance)
1232        .fetch_optional(&*self.pool)
1233        .await
1234        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1235        .map(|id: i64| id as u64)
1236        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1237    }
1238
1239    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1240    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1241        let row: Option<(
1242            String,
1243            String,
1244            String,
1245            i64,
1246            chrono::DateTime<Utc>,
1247            Option<chrono::DateTime<Utc>>,
1248            Option<String>,
1249            Option<String>,
1250            Option<String>,
1251        )> = sqlx::query_as(&format!(
1252            "SELECT * FROM {}.get_instance_info($1)",
1253            self.schema_name
1254        ))
1255        .bind(instance)
1256        .fetch_optional(&*self.pool)
1257        .await
1258        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1259
1260        let (
1261            instance_id,
1262            orchestration_name,
1263            orchestration_version,
1264            current_execution_id,
1265            created_at,
1266            updated_at,
1267            status,
1268            output,
1269            parent_instance_id,
1270        ) =
1271            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1272
1273        Ok(InstanceInfo {
1274            instance_id,
1275            orchestration_name,
1276            orchestration_version,
1277            current_execution_id: current_execution_id as u64,
1278            status: status.unwrap_or_else(|| "Running".to_string()),
1279            output,
1280            created_at: created_at.timestamp_millis() as u64,
1281            updated_at: updated_at
1282                .map(|dt| dt.timestamp_millis() as u64)
1283                .unwrap_or(created_at.timestamp_millis() as u64),
1284            parent_instance_id,
1285        })
1286    }
1287
1288    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1289    async fn get_execution_info(
1290        &self,
1291        instance: &str,
1292        execution_id: u64,
1293    ) -> Result<ExecutionInfo, ProviderError> {
1294        let row: Option<(
1295            i64,
1296            String,
1297            Option<String>,
1298            chrono::DateTime<Utc>,
1299            Option<chrono::DateTime<Utc>>,
1300            i64,
1301        )> = sqlx::query_as(&format!(
1302            "SELECT * FROM {}.get_execution_info($1, $2)",
1303            self.schema_name
1304        ))
1305        .bind(instance)
1306        .bind(execution_id as i64)
1307        .fetch_optional(&*self.pool)
1308        .await
1309        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1310
1311        let (exec_id, status, output, started_at, completed_at, event_count) = row
1312            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1313
1314        Ok(ExecutionInfo {
1315            execution_id: exec_id as u64,
1316            status,
1317            output,
1318            started_at: started_at.timestamp_millis() as u64,
1319            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1320            event_count: event_count as usize,
1321        })
1322    }
1323
1324    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1325    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1326        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1327            "SELECT * FROM {}.get_system_metrics()",
1328            self.schema_name
1329        ))
1330        .fetch_optional(&*self.pool)
1331        .await
1332        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1333
1334        let (
1335            total_instances,
1336            total_executions,
1337            running_instances,
1338            completed_instances,
1339            failed_instances,
1340            total_events,
1341        ) = row.ok_or_else(|| {
1342            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1343        })?;
1344
1345        Ok(SystemMetrics {
1346            total_instances: total_instances as u64,
1347            total_executions: total_executions as u64,
1348            running_instances: running_instances as u64,
1349            completed_instances: completed_instances as u64,
1350            failed_instances: failed_instances as u64,
1351            total_events: total_events as u64,
1352        })
1353    }
1354
1355    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1356    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1357        let now_ms = Self::now_millis();
1358
1359        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1360            "SELECT * FROM {}.get_queue_depths($1)",
1361            self.schema_name
1362        ))
1363        .bind(now_ms)
1364        .fetch_optional(&*self.pool)
1365        .await
1366        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1367
1368        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1369            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1370        })?;
1371
1372        Ok(QueueDepths {
1373            orchestrator_queue: orchestrator_queue as usize,
1374            worker_queue: worker_queue as usize,
1375            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1376        })
1377    }
1378
1379    // ===== Hierarchy Primitive Operations =====
1380
1381    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1382    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1383        sqlx::query_scalar(&format!(
1384            "SELECT child_instance_id FROM {}.list_children($1)",
1385            self.schema_name
1386        ))
1387        .bind(instance_id)
1388        .fetch_all(&*self.pool)
1389        .await
1390        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1391    }
1392
1393    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1394    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1395        // The stored procedure raises an exception if instance doesn't exist
1396        // Otherwise returns the parent_instance_id (which may be NULL)
1397        let result: Result<Option<String>, _> = sqlx::query_scalar(&format!(
1398            "SELECT {}.get_parent_id($1)",
1399            self.schema_name
1400        ))
1401        .bind(instance_id)
1402        .fetch_one(&*self.pool)
1403        .await;
1404
1405        match result {
1406            Ok(parent_id) => Ok(parent_id),
1407            Err(e) => {
1408                let err_str = e.to_string();
1409                if err_str.contains("Instance not found") {
1410                    Err(ProviderError::permanent(
1411                        "get_parent_id",
1412                        format!("Instance not found: {}", instance_id),
1413                    ))
1414                } else {
1415                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1416                }
1417            }
1418        }
1419    }
1420
1421    // ===== Deletion Operations =====
1422
1423    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1424    async fn delete_instances_atomic(
1425        &self,
1426        ids: &[String],
1427        force: bool,
1428    ) -> Result<DeleteInstanceResult, ProviderError> {
1429        if ids.is_empty() {
1430            return Ok(DeleteInstanceResult::default());
1431        }
1432
1433        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1434            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1435            self.schema_name
1436        ))
1437        .bind(ids)
1438        .bind(force)
1439        .fetch_optional(&*self.pool)
1440        .await
1441        .map_err(|e| {
1442            let err_str = e.to_string();
1443            if err_str.contains("is Running") {
1444                ProviderError::permanent(
1445                    "delete_instances_atomic",
1446                    err_str,
1447                )
1448            } else if err_str.contains("Orphan detected") {
1449                ProviderError::permanent(
1450                    "delete_instances_atomic",
1451                    err_str,
1452                )
1453            } else {
1454                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1455            }
1456        })?;
1457
1458        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1459            row.unwrap_or((0, 0, 0, 0));
1460
1461        debug!(
1462            target = "duroxide::providers::postgres",
1463            operation = "delete_instances_atomic",
1464            instances_deleted = instances_deleted,
1465            executions_deleted = executions_deleted,
1466            events_deleted = events_deleted,
1467            queue_messages_deleted = queue_messages_deleted,
1468            "Deleted instances atomically"
1469        );
1470
1471        Ok(DeleteInstanceResult {
1472            instances_deleted: instances_deleted as u64,
1473            executions_deleted: executions_deleted as u64,
1474            events_deleted: events_deleted as u64,
1475            queue_messages_deleted: queue_messages_deleted as u64,
1476        })
1477    }
1478
1479    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1480    async fn delete_instance_bulk(
1481        &self,
1482        filter: InstanceFilter,
1483    ) -> Result<DeleteInstanceResult, ProviderError> {
1484        // Build query to find matching root instances in terminal states
1485        let mut sql = format!(
1486            r#"
1487            SELECT i.instance_id
1488            FROM {}.instances i
1489            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1490              AND i.current_execution_id = e.execution_id
1491            WHERE i.parent_instance_id IS NULL
1492              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1493            "#,
1494            self.schema_name, self.schema_name
1495        );
1496
1497        // Add instance_ids filter if provided
1498        if let Some(ref ids) = filter.instance_ids {
1499            if ids.is_empty() {
1500                return Ok(DeleteInstanceResult::default());
1501            }
1502            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1503            sql.push_str(&format!(
1504                " AND i.instance_id IN ({})",
1505                placeholders.join(", ")
1506            ));
1507        }
1508
1509        // Add completed_before filter if provided
1510        if filter.completed_before.is_some() {
1511            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1512            sql.push_str(&format!(
1513                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1514                param_num
1515            ));
1516        }
1517
1518        // Add limit
1519        let limit = filter.limit.unwrap_or(1000);
1520        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1521            + if filter.completed_before.is_some() { 1 } else { 0 }
1522            + 1;
1523        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1524
1525        // Build and execute query
1526        let mut query = sqlx::query_scalar::<_, String>(&sql);
1527        if let Some(ref ids) = filter.instance_ids {
1528            for id in ids {
1529                query = query.bind(id);
1530            }
1531        }
1532        if let Some(completed_before) = filter.completed_before {
1533            query = query.bind(completed_before as i64);
1534        }
1535        query = query.bind(limit as i64);
1536
1537        let instance_ids: Vec<String> = query
1538            .fetch_all(&*self.pool)
1539            .await
1540            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1541
1542        if instance_ids.is_empty() {
1543            return Ok(DeleteInstanceResult::default());
1544        }
1545
1546        // Delete each instance with cascade
1547        let mut result = DeleteInstanceResult::default();
1548
1549        for instance_id in &instance_ids {
1550            // Get full tree for this root
1551            let tree = self.get_instance_tree(instance_id).await?;
1552
1553            // Atomic delete (tree.all_ids is already in deletion order: children first)
1554            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1555            result.instances_deleted += delete_result.instances_deleted;
1556            result.executions_deleted += delete_result.executions_deleted;
1557            result.events_deleted += delete_result.events_deleted;
1558            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1559        }
1560
1561        debug!(
1562            target = "duroxide::providers::postgres",
1563            operation = "delete_instance_bulk",
1564            instances_deleted = result.instances_deleted,
1565            executions_deleted = result.executions_deleted,
1566            events_deleted = result.events_deleted,
1567            queue_messages_deleted = result.queue_messages_deleted,
1568            "Bulk deleted instances"
1569        );
1570
1571        Ok(result)
1572    }
1573
1574    // ===== Pruning Operations =====
1575
1576    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1577    async fn prune_executions(
1578        &self,
1579        instance_id: &str,
1580        options: PruneOptions,
1581    ) -> Result<PruneResult, ProviderError> {
1582        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1583        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1584
1585        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1586            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1587            self.schema_name
1588        ))
1589        .bind(instance_id)
1590        .bind(keep_last)
1591        .bind(completed_before_ms)
1592        .fetch_optional(&*self.pool)
1593        .await
1594        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1595
1596        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1597
1598        debug!(
1599            target = "duroxide::providers::postgres",
1600            operation = "prune_executions",
1601            instance_id = %instance_id,
1602            instances_processed = instances_processed,
1603            executions_deleted = executions_deleted,
1604            events_deleted = events_deleted,
1605            "Pruned executions"
1606        );
1607
1608        Ok(PruneResult {
1609            instances_processed: instances_processed as u64,
1610            executions_deleted: executions_deleted as u64,
1611            events_deleted: events_deleted as u64,
1612        })
1613    }
1614
1615    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1616    async fn prune_executions_bulk(
1617        &self,
1618        filter: InstanceFilter,
1619        options: PruneOptions,
1620    ) -> Result<PruneResult, ProviderError> {
1621        // Build query to find matching instances in terminal states
1622        let mut sql = format!(
1623            r#"
1624            SELECT i.instance_id
1625            FROM {}.instances i
1626            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1627              AND i.current_execution_id = e.execution_id
1628            WHERE e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1629            "#,
1630            self.schema_name, self.schema_name
1631        );
1632
1633        // Add instance_ids filter if provided
1634        if let Some(ref ids) = filter.instance_ids {
1635            if ids.is_empty() {
1636                return Ok(PruneResult::default());
1637            }
1638            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1639            sql.push_str(&format!(
1640                " AND i.instance_id IN ({})",
1641                placeholders.join(", ")
1642            ));
1643        }
1644
1645        // Add completed_before filter if provided
1646        if filter.completed_before.is_some() {
1647            let param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0) + 1;
1648            sql.push_str(&format!(
1649                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1650                param_num
1651            ));
1652        }
1653
1654        // Add limit
1655        let limit = filter.limit.unwrap_or(1000);
1656        let limit_param_num = filter.instance_ids.as_ref().map(|ids| ids.len()).unwrap_or(0)
1657            + if filter.completed_before.is_some() { 1 } else { 0 }
1658            + 1;
1659        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1660
1661        // Build and execute query
1662        let mut query = sqlx::query_scalar::<_, String>(&sql);
1663        if let Some(ref ids) = filter.instance_ids {
1664            for id in ids {
1665                query = query.bind(id);
1666            }
1667        }
1668        if let Some(completed_before) = filter.completed_before {
1669            query = query.bind(completed_before as i64);
1670        }
1671        query = query.bind(limit as i64);
1672
1673        let instance_ids: Vec<String> = query
1674            .fetch_all(&*self.pool)
1675            .await
1676            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1677
1678        // Prune each instance
1679        let mut result = PruneResult::default();
1680
1681        for instance_id in &instance_ids {
1682            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1683            result.instances_processed += single_result.instances_processed;
1684            result.executions_deleted += single_result.executions_deleted;
1685            result.events_deleted += single_result.events_deleted;
1686        }
1687
1688        debug!(
1689            target = "duroxide::providers::postgres",
1690            operation = "prune_executions_bulk",
1691            instances_processed = result.instances_processed,
1692            executions_deleted = result.executions_deleted,
1693            events_deleted = result.events_deleted,
1694            "Bulk pruned executions"
1695        );
1696
1697        Ok(result)
1698    }
1699}