Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
41pub struct PostgresProvider {
42    pool: Arc<PgPool>,
43    schema_name: String,
44}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else {
130                    ProviderError::permanent(operation, format!("Database error: {e}"))
131                }
132            }
133            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135            }
136            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138        }
139    }
140
141    /// Convert TagFilter to SQL parameters (mode string + tag array)
142    fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143        match filter {
144            TagFilter::DefaultOnly => ("default_only", vec![]),
145            TagFilter::Tags(set) => {
146                let mut tags: Vec<String> = set.iter().cloned().collect();
147                tags.sort();
148                ("tags", tags)
149            }
150            TagFilter::DefaultAnd(set) => {
151                let mut tags: Vec<String> = set.iter().cloned().collect();
152                tags.sort();
153                ("default_and", tags)
154            }
155            TagFilter::Any => ("any", vec![]),
156            TagFilter::None => ("none", vec![]),
157        }
158    }
159
160    /// Clean up schema after tests (drops all tables and optionally the schema)
161    ///
162    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
163    /// Only drops the schema if it's a custom schema (not "public").
164    pub async fn cleanup_schema(&self) -> Result<()> {
165        const MAX_RETRIES: u32 = 5;
166        const BASE_RETRY_DELAY_MS: u64 = 50;
167
168        for attempt in 0..=MAX_RETRIES {
169            let cleanup_result = async {
170                // Call the stored procedure to drop all tables
171                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172                    .execute(&*self.pool)
173                    .await?;
174
175                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
176                // Only drop custom schemas created for testing
177                if self.schema_name != "public" {
178                    sqlx::query(&format!(
179                        "DROP SCHEMA IF EXISTS {} CASCADE",
180                        self.schema_name
181                    ))
182                    .execute(&*self.pool)
183                    .await?;
184                } else {
185                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
186                    // This ensures we don't accidentally drop the default PostgreSQL schema
187                }
188
189                Ok::<(), SqlxError>(())
190            }
191            .await;
192
193            match cleanup_result {
194                Ok(()) => return Ok(()),
195                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196                    if attempt < MAX_RETRIES {
197                        warn!(
198                            target = "duroxide::providers::postgres",
199                            schema = %self.schema_name,
200                            attempt = attempt + 1,
201                            "Deadlock during cleanup_schema, retrying"
202                        );
203                        sleep(Duration::from_millis(BASE_RETRY_DELAY_MS * (attempt as u64 + 1)))
204                            .await;
205                        continue;
206                    }
207                    return Err(anyhow::anyhow!(db_err.to_string()));
208                }
209                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
210            }
211        }
212
213        Ok(())
214    }
215}
216
217#[async_trait::async_trait]
218impl Provider for PostgresProvider {
219    fn name(&self) -> &str {
220        "duroxide-pg"
221    }
222
223    fn version(&self) -> &str {
224        env!("CARGO_PKG_VERSION")
225    }
226
    /// Lock and fetch the next ready orchestration item via the
    /// `fetch_orchestration_item` stored procedure.
    ///
    /// Returns the item with its lock token and attempt count, or `Ok(None)`
    /// when no work is ready (short polling — `_poll_timeout` is unused).
    /// Retryable database errors are retried up to `MAX_RETRIES` times with
    /// linearly increasing backoff; permanent errors fail immediately.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Extract version filter from capability filter. Versions are packed as
        // major*1_000_000 + minor*1_000 + patch so the stored procedure can
        // range-compare them as plain integers.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                // Empty supported_duroxide_versions = supports nothing
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape returned by the procedure: (instance_id, orchestration_name,
            // orchestration_version, execution_id, history_json, messages_json,
            // lock_token, attempt_count).
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
            )) = row
            {
                // History that fails to deserialize is surfaced via `history_error`
                // rather than failing the fetch, so the runtime can decide how to
                // handle the poisoned instance.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Messages, by contrast, must deserialize — a corrupt queue payload
                // is a permanent error.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan queue messages: if orchestration_name is "Unknown", there's
                // no history, and ALL messages are QueueMessage items, these are orphan
                // events enqueued before the orchestration started. Drop them by acking
                // with empty deltas. Other work items (CancelInstance, etc.) may
                // legitimately race with StartOrchestration and must not be dropped.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    // Ack with empty deltas to consume (discard) the messages and
                    // release the lock.
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No result found - return immediately (short polling behavior)
            // Only retry with delay on retryable errors (handled above)
            return Ok(None);
        }

        // Unreachable: every loop iteration returns; kept to satisfy the type.
        Ok(None)
    }
    /// Acknowledge a locked orchestration item: atomically append the history
    /// delta, enqueue follow-up work, update instance metadata, and release the
    /// lock via the `ack_orchestration_item` stored procedure.
    ///
    /// Fails permanently on an invalid/stale lock token or serialization
    /// failure of the payloads; retries transient database errors up to
    /// `MAX_RETRIES` times with linear backoff.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the history payload: one {event_id, event_type, event_data}
        // object per event. event_id 0 means the runtime forgot to assign one —
        // reject before touching the database.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse event-type label from the Debug representation
            // (everything before the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Scan history_delta for the last CustomStatusUpdated event; the last
        // one wins: Some(status) => "set", None status => "clear", absent => no-op.
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Instance metadata forwarded to the stored procedure, including the
        // pinned Duroxide version (if any) and the custom-status directive above.
        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
        });

        // Serialize cancelled activities for lock stealing
        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Check for permanent errors first: a stale/unknown lock token
                    // must never be retried (the procedure raises with this text).
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Should never reach here, but just in case
        Ok(())
    }
591    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
592    async fn abandon_orchestration_item(
593        &self,
594        lock_token: &str,
595        delay: Option<Duration>,
596        ignore_attempt: bool,
597    ) -> Result<(), ProviderError> {
598        let start = std::time::Instant::now();
599        let now_ms = Self::now_millis();
600        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
601
602        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
603            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
604            self.schema_name
605        ))
606        .bind(lock_token)
607        .bind(now_ms)
608        .bind(delay_param)
609        .bind(ignore_attempt)
610        .fetch_one(&*self.pool)
611        .await
612        {
613            Ok(instance_id) => instance_id,
614            Err(e) => {
615                if let SqlxError::Database(db_err) = &e {
616                    if db_err.message().contains("Invalid lock token") {
617                        return Err(ProviderError::permanent(
618                            "abandon_orchestration_item",
619                            "Invalid lock token",
620                        ));
621                    }
622                } else if e.to_string().contains("Invalid lock token") {
623                    return Err(ProviderError::permanent(
624                        "abandon_orchestration_item",
625                        "Invalid lock token",
626                    ));
627                }
628
629                return Err(Self::sqlx_to_provider_error(
630                    "abandon_orchestration_item",
631                    e,
632                ));
633            }
634        };
635
636        let duration_ms = start.elapsed().as_millis() as u64;
637        debug!(
638            target = "duroxide::providers::postgres",
639            operation = "abandon_orchestration_item",
640            instance_id = %instance_id,
641            delay_ms = delay.map(|d| d.as_millis() as u64),
642            ignore_attempt = ignore_attempt,
643            duration_ms = duration_ms,
644            "Abandoned orchestration item via stored procedure"
645        );
646
647        Ok(())
648    }
649
650    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
651    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
652        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
653            "SELECT out_event_data FROM {}.fetch_history($1)",
654            self.schema_name
655        ))
656        .bind(instance)
657        .fetch_all(&*self.pool)
658        .await
659        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
660
661        Ok(event_data_rows
662            .into_iter()
663            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
664            .collect())
665    }
666
    /// Append `new_events` to the history of `instance`/`execution_id` via the
    /// `append_history` stored procedure.
    ///
    /// No-op for an empty event list. Fails permanently if any event has
    /// event_id 0 (the runtime must assign ids) or cannot be serialized.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Build one {event_id, event_type, event_data} object per event — the
        // same payload shape ack_orchestration_item uses.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Coarse event-type label from the Debug representation (text before
            // the first '{').
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
740
741    #[instrument(skip(self), target = "duroxide::providers::postgres")]
742    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
743        let work_item = serde_json::to_string(&item).map_err(|e| {
744            ProviderError::permanent(
745                "enqueue_worker_work",
746                format!("Failed to serialize work item: {e}"),
747            )
748        })?;
749
750        let now_ms = Self::now_millis();
751
752        // Extract activity identification, session_id, and tag for ActivityExecute items
753        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
754            WorkItem::ActivityExecute {
755                instance,
756                execution_id,
757                id,
758                session_id,
759                tag,
760                ..
761            } => (
762                Some(instance.clone()),
763                Some(*execution_id as i64),
764                Some(*id as i64),
765                session_id.clone(),
766                tag.clone(),
767            ),
768            _ => (None, None, None, None, None),
769        };
770
771        sqlx::query(&format!(
772            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
773            self.schema_name
774        ))
775        .bind(work_item)
776        .bind(now_ms)
777        .bind(&instance_id)
778        .bind(execution_id)
779        .bind(activity_id)
780        .bind(&session_id)
781        .bind(&tag)
782        .execute(&*self.pool)
783        .await
784        .map_err(|e| {
785            error!(
786                target = "duroxide::providers::postgres",
787                operation = "enqueue_worker_work",
788                error_type = "database_error",
789                error = %e,
790                "Failed to enqueue worker work"
791            );
792            Self::sqlx_to_provider_error("enqueue_worker_work", e)
793        })?;
794
795        Ok(())
796    }
797
    /// Fetches and locks the next available worker-queue item.
    ///
    /// Returns `Ok(None)` when nothing matching is ready. This implementation
    /// does not long-poll: `_poll_timeout` is unused and a single call to the
    /// `fetch_work_item` stored procedure is made. On success returns the
    /// deserialized work item, its lock token, and the delivery attempt count.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // None filter means don't process any activities
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Extract session parameters; both are NULL when no session fetch is requested
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Convert TagFilter to SQL parameters (mode string + tag name array)
        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Bind order must match the stored procedure's parameter order:
        // (now_ms, lock_timeout_ms, owner_id, session_lock_timeout_ms, tag_names, tag_mode)
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        // No row => nothing visible/matching in the queue right now.
        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        // A payload that fails to deserialize is unrecoverable for this item.
        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract instance for logging - different work item types have different structures
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
891
    /// Acknowledges (completes) a locked worker-queue item.
    ///
    /// With `completion = None` (e.g. a cancelled activity) the item is
    /// deleted by calling `ack_worker` with NULL instance/completion
    /// arguments. Otherwise the completion must be `ActivityCompleted` or
    /// `ActivityFailed`; the stored procedure atomically deletes the worker
    /// item and enqueues the completion for the orchestrator. A
    /// "Worker queue item not found" failure is permanent (the lock expired
    /// or the item was already acked).
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // If no completion provided (e.g. cancelled activity), just delete the item
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            // Call ack_worker with NULL completion to delete without enqueueing
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // The stored procedure raises with this message for unknown tokens.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Extract instance ID from completion WorkItem; only activity
        // completion/failure items are valid completions here.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Call stored procedure to atomically delete worker item and enqueue completion
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
998
999    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1000    async fn renew_work_item_lock(
1001        &self,
1002        token: &str,
1003        extend_for: Duration,
1004    ) -> Result<(), ProviderError> {
1005        let start = std::time::Instant::now();
1006
1007        // Get current time from application for consistent time reference
1008        let now_ms = Self::now_millis();
1009
1010        // Convert Duration to seconds for the stored procedure
1011        let extend_secs = extend_for.as_secs() as i64;
1012
1013        match sqlx::query(&format!(
1014            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1015            self.schema_name
1016        ))
1017        .bind(token)
1018        .bind(now_ms)
1019        .bind(extend_secs)
1020        .execute(&*self.pool)
1021        .await
1022        {
1023            Ok(_) => {
1024                let duration_ms = start.elapsed().as_millis() as u64;
1025                debug!(
1026                    target = "duroxide::providers::postgres",
1027                    operation = "renew_work_item_lock",
1028                    token = %token,
1029                    extend_for_secs = extend_secs,
1030                    duration_ms = duration_ms,
1031                    "Work item lock renewed successfully"
1032                );
1033                Ok(())
1034            }
1035            Err(e) => {
1036                if let SqlxError::Database(db_err) = &e {
1037                    if db_err.message().contains("Lock token invalid") {
1038                        return Err(ProviderError::permanent(
1039                            "renew_work_item_lock",
1040                            "Lock token invalid, expired, or already acked",
1041                        ));
1042                    }
1043                } else if e.to_string().contains("Lock token invalid") {
1044                    return Err(ProviderError::permanent(
1045                        "renew_work_item_lock",
1046                        "Lock token invalid, expired, or already acked",
1047                    ));
1048                }
1049
1050                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1051            }
1052        }
1053    }
1054
1055    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1056    async fn abandon_work_item(
1057        &self,
1058        token: &str,
1059        delay: Option<Duration>,
1060        ignore_attempt: bool,
1061    ) -> Result<(), ProviderError> {
1062        let start = std::time::Instant::now();
1063        let now_ms = Self::now_millis();
1064        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1065
1066        match sqlx::query(&format!(
1067            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1068            self.schema_name
1069        ))
1070        .bind(token)
1071        .bind(now_ms)
1072        .bind(delay_param)
1073        .bind(ignore_attempt)
1074        .execute(&*self.pool)
1075        .await
1076        {
1077            Ok(_) => {
1078                let duration_ms = start.elapsed().as_millis() as u64;
1079                debug!(
1080                    target = "duroxide::providers::postgres",
1081                    operation = "abandon_work_item",
1082                    token = %token,
1083                    delay_ms = delay.map(|d| d.as_millis() as u64),
1084                    ignore_attempt = ignore_attempt,
1085                    duration_ms = duration_ms,
1086                    "Abandoned work item via stored procedure"
1087                );
1088                Ok(())
1089            }
1090            Err(e) => {
1091                if let SqlxError::Database(db_err) = &e {
1092                    if db_err.message().contains("Invalid lock token")
1093                        || db_err.message().contains("already acked")
1094                    {
1095                        return Err(ProviderError::permanent(
1096                            "abandon_work_item",
1097                            "Invalid lock token or already acked",
1098                        ));
1099                    }
1100                } else if e.to_string().contains("Invalid lock token")
1101                    || e.to_string().contains("already acked")
1102                {
1103                    return Err(ProviderError::permanent(
1104                        "abandon_work_item",
1105                        "Invalid lock token or already acked",
1106                    ));
1107                }
1108
1109                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1110            }
1111        }
1112    }
1113
1114    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1115    async fn renew_orchestration_item_lock(
1116        &self,
1117        token: &str,
1118        extend_for: Duration,
1119    ) -> Result<(), ProviderError> {
1120        let start = std::time::Instant::now();
1121
1122        // Get current time from application for consistent time reference
1123        let now_ms = Self::now_millis();
1124
1125        // Convert Duration to seconds for the stored procedure
1126        let extend_secs = extend_for.as_secs() as i64;
1127
1128        match sqlx::query(&format!(
1129            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1130            self.schema_name
1131        ))
1132        .bind(token)
1133        .bind(now_ms)
1134        .bind(extend_secs)
1135        .execute(&*self.pool)
1136        .await
1137        {
1138            Ok(_) => {
1139                let duration_ms = start.elapsed().as_millis() as u64;
1140                debug!(
1141                    target = "duroxide::providers::postgres",
1142                    operation = "renew_orchestration_item_lock",
1143                    token = %token,
1144                    extend_for_secs = extend_secs,
1145                    duration_ms = duration_ms,
1146                    "Orchestration item lock renewed successfully"
1147                );
1148                Ok(())
1149            }
1150            Err(e) => {
1151                if let SqlxError::Database(db_err) = &e {
1152                    if db_err.message().contains("Lock token invalid")
1153                        || db_err.message().contains("expired")
1154                        || db_err.message().contains("already released")
1155                    {
1156                        return Err(ProviderError::permanent(
1157                            "renew_orchestration_item_lock",
1158                            "Lock token invalid, expired, or already released",
1159                        ));
1160                    }
1161                } else if e.to_string().contains("Lock token invalid")
1162                    || e.to_string().contains("expired")
1163                    || e.to_string().contains("already released")
1164                {
1165                    return Err(ProviderError::permanent(
1166                        "renew_orchestration_item_lock",
1167                        "Lock token invalid, expired, or already released",
1168                    ));
1169                }
1170
1171                Err(Self::sqlx_to_provider_error(
1172                    "renew_orchestration_item_lock",
1173                    e,
1174                ))
1175            }
1176        }
1177    }
1178
    /// Pushes a work item onto the orchestrator queue, optionally delayed.
    ///
    /// `ActivityExecute` items are rejected — they belong on the worker
    /// queue. For `TimerFired` items with a positive `fire_at_ms`, the
    /// visibility time is the later of `fire_at_ms` and `now + delay`; all
    /// other items (and zero `fire_at_ms`) become visible after `delay`, or
    /// immediately when no delay is given.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Extract instance ID from WorkItem enum; sub-orchestration results
        // are routed to the parent's instance.
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. }
            | WorkItem::QueueMessage { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
        let now_ms = Self::now_millis();

        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                // Take max of fire_at_ms and delay (if provided)
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                // fire_at_ms is 0, use delay or NOW()
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            // Non-timer item: use delay or NOW()
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        // Convert the millisecond timestamp to a chrono DateTime for binding.
        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters

        // Call stored procedure to enqueue work
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None) // orchestration_name - NULL
        .bind::<Option<String>>(None) // orchestration_version - NULL
        .bind::<Option<i64>>(None) // execution_id - NULL
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1288
1289    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1290    async fn read_with_execution(
1291        &self,
1292        instance: &str,
1293        execution_id: u64,
1294    ) -> Result<Vec<Event>, ProviderError> {
1295        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1296            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1297            self.table_name("history")
1298        ))
1299        .bind(instance)
1300        .bind(execution_id as i64)
1301        .fetch_all(&*self.pool)
1302        .await
1303        .ok()
1304        .unwrap_or_default();
1305
1306        Ok(event_data_rows
1307            .into_iter()
1308            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1309            .collect())
1310    }
1311
1312    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1313    async fn renew_session_lock(
1314        &self,
1315        owner_ids: &[&str],
1316        extend_for: Duration,
1317        idle_timeout: Duration,
1318    ) -> Result<usize, ProviderError> {
1319        if owner_ids.is_empty() {
1320            return Ok(0);
1321        }
1322
1323        let now_ms = Self::now_millis();
1324        let extend_ms = extend_for.as_millis() as i64;
1325        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1326        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1327
1328        let result = sqlx::query_scalar::<_, i64>(&format!(
1329            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1330            self.schema_name
1331        ))
1332        .bind(&owner_ids_vec)
1333        .bind(now_ms)
1334        .bind(extend_ms)
1335        .bind(idle_timeout_ms)
1336        .fetch_one(&*self.pool)
1337        .await
1338        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1339
1340        debug!(
1341            target = "duroxide::providers::postgres",
1342            operation = "renew_session_lock",
1343            owner_count = owner_ids.len(),
1344            sessions_renewed = result,
1345            "Session locks renewed"
1346        );
1347
1348        Ok(result as usize)
1349    }
1350
1351    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1352    async fn cleanup_orphaned_sessions(
1353        &self,
1354        _idle_timeout: Duration,
1355    ) -> Result<usize, ProviderError> {
1356        let now_ms = Self::now_millis();
1357
1358        let result = sqlx::query_scalar::<_, i64>(&format!(
1359            "SELECT {}.cleanup_orphaned_sessions($1)",
1360            self.schema_name
1361        ))
1362        .bind(now_ms)
1363        .fetch_one(&*self.pool)
1364        .await
1365        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1366
1367        debug!(
1368            target = "duroxide::providers::postgres",
1369            operation = "cleanup_orphaned_sessions",
1370            sessions_cleaned = result,
1371            "Orphaned sessions cleaned up"
1372        );
1373
1374        Ok(result as usize)
1375    }
1376
    /// Exposes the admin/management API; this provider always implements
    /// `ProviderAdmin`, so it returns itself.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1380
1381    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1382    async fn get_custom_status(
1383        &self,
1384        instance: &str,
1385        last_seen_version: u64,
1386    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1387        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1388            "SELECT * FROM {}.get_custom_status($1, $2)",
1389            self.schema_name
1390        ))
1391        .bind(instance)
1392        .bind(last_seen_version as i64)
1393        .fetch_optional(&*self.pool)
1394        .await
1395        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1396
1397        match row {
1398            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1399            None => Ok(None),
1400        }
1401    }
1402}
1403
1404#[async_trait::async_trait]
1405impl ProviderAdmin for PostgresProvider {
1406    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1407    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1408        sqlx::query_scalar(&format!(
1409            "SELECT instance_id FROM {}.list_instances()",
1410            self.schema_name
1411        ))
1412        .fetch_all(&*self.pool)
1413        .await
1414        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1415    }
1416
1417    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1418    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1419        sqlx::query_scalar(&format!(
1420            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1421            self.schema_name
1422        ))
1423        .bind(status)
1424        .fetch_all(&*self.pool)
1425        .await
1426        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1427    }
1428
1429    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1430    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1431        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1432            "SELECT execution_id FROM {}.list_executions($1)",
1433            self.schema_name
1434        ))
1435        .bind(instance)
1436        .fetch_all(&*self.pool)
1437        .await
1438        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1439
1440        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1441    }
1442
1443    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1444    async fn read_history_with_execution_id(
1445        &self,
1446        instance: &str,
1447        execution_id: u64,
1448    ) -> Result<Vec<Event>, ProviderError> {
1449        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1450            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1451            self.schema_name
1452        ))
1453        .bind(instance)
1454        .bind(execution_id as i64)
1455        .fetch_all(&*self.pool)
1456        .await
1457        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1458
1459        event_data_rows
1460            .into_iter()
1461            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1462            .collect::<Vec<Event>>()
1463            .into_iter()
1464            .map(Ok)
1465            .collect()
1466    }
1467
1468    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1469    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1470        let execution_id = self.latest_execution_id(instance).await?;
1471        self.read_history_with_execution_id(instance, execution_id)
1472            .await
1473    }
1474
1475    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1476    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1477        sqlx::query_scalar(&format!(
1478            "SELECT {}.latest_execution_id($1)",
1479            self.schema_name
1480        ))
1481        .bind(instance)
1482        .fetch_optional(&*self.pool)
1483        .await
1484        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1485        .map(|id: i64| id as u64)
1486        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1487    }
1488
    /// Looks up instance-level metadata via the `get_instance_info` stored
    /// procedure.
    ///
    /// Returns a permanent "Instance not found" error when no row comes
    /// back. A NULL status column is reported as "Running", and a missing
    /// `updated_at` falls back to the creation timestamp.
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        // Tuple order must match the stored procedure's column order.
        let row: Option<(
            String,                        // instance_id
            String,                        // orchestration_name
            String,                        // orchestration_version
            i64,                           // current_execution_id
            chrono::DateTime<Utc>,         // created_at
            Option<chrono::DateTime<Utc>>, // updated_at
            Option<String>,                // status
            Option<String>,                // output
            Option<String>,                // parent_instance_id
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
            parent_instance_id,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            // NULL status is treated as a still-running instance.
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            // Fall back to created_at when the instance was never updated.
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
            parent_instance_id,
        })
    }
1537
    /// Looks up metadata for one execution of an instance via the
    /// `get_execution_info` stored procedure.
    ///
    /// Returns a permanent "Execution not found" error when no row comes
    /// back.
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn get_execution_info(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<ExecutionInfo, ProviderError> {
        // Tuple order must match the stored procedure's column order.
        let row: Option<(
            i64,                           // execution_id
            String,                        // status
            Option<String>,                // output
            chrono::DateTime<Utc>,         // started_at
            Option<chrono::DateTime<Utc>>, // completed_at
            i64,                           // event_count
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_execution_info($1, $2)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;

        let (exec_id, status, output, started_at, completed_at, event_count) = row
            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;

        Ok(ExecutionInfo {
            execution_id: exec_id as u64,
            status,
            output,
            started_at: started_at.timestamp_millis() as u64,
            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
            event_count: event_count as usize,
        })
    }
1573
1574    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1575    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1576        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1577            "SELECT * FROM {}.get_system_metrics()",
1578            self.schema_name
1579        ))
1580        .fetch_optional(&*self.pool)
1581        .await
1582        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1583
1584        let (
1585            total_instances,
1586            total_executions,
1587            running_instances,
1588            completed_instances,
1589            failed_instances,
1590            total_events,
1591        ) = row.ok_or_else(|| {
1592            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1593        })?;
1594
1595        Ok(SystemMetrics {
1596            total_instances: total_instances as u64,
1597            total_executions: total_executions as u64,
1598            running_instances: running_instances as u64,
1599            completed_instances: completed_instances as u64,
1600            failed_instances: failed_instances as u64,
1601            total_events: total_events as u64,
1602        })
1603    }
1604
1605    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1606    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1607        let now_ms = Self::now_millis();
1608
1609        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1610            "SELECT * FROM {}.get_queue_depths($1)",
1611            self.schema_name
1612        ))
1613        .bind(now_ms)
1614        .fetch_optional(&*self.pool)
1615        .await
1616        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1617
1618        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1619            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1620        })?;
1621
1622        Ok(QueueDepths {
1623            orchestrator_queue: orchestrator_queue as usize,
1624            worker_queue: worker_queue as usize,
1625            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1626        })
1627    }
1628
1629    // ===== Hierarchy Primitive Operations =====
1630
1631    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1632    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1633        sqlx::query_scalar(&format!(
1634            "SELECT child_instance_id FROM {}.list_children($1)",
1635            self.schema_name
1636        ))
1637        .bind(instance_id)
1638        .fetch_all(&*self.pool)
1639        .await
1640        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1641    }
1642
1643    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1644    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1645        // The stored procedure raises an exception if instance doesn't exist
1646        // Otherwise returns the parent_instance_id (which may be NULL)
1647        let result: Result<Option<String>, _> =
1648            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1649                .bind(instance_id)
1650                .fetch_one(&*self.pool)
1651                .await;
1652
1653        match result {
1654            Ok(parent_id) => Ok(parent_id),
1655            Err(e) => {
1656                let err_str = e.to_string();
1657                if err_str.contains("Instance not found") {
1658                    Err(ProviderError::permanent(
1659                        "get_parent_id",
1660                        format!("Instance not found: {}", instance_id),
1661                    ))
1662                } else {
1663                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1664                }
1665            }
1666        }
1667    }
1668
1669    // ===== Deletion Operations =====
1670
1671    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1672    async fn delete_instances_atomic(
1673        &self,
1674        ids: &[String],
1675        force: bool,
1676    ) -> Result<DeleteInstanceResult, ProviderError> {
1677        if ids.is_empty() {
1678            return Ok(DeleteInstanceResult::default());
1679        }
1680
1681        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1682            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1683            self.schema_name
1684        ))
1685        .bind(ids)
1686        .bind(force)
1687        .fetch_optional(&*self.pool)
1688        .await
1689        .map_err(|e| {
1690            let err_str = e.to_string();
1691            if err_str.contains("is Running") {
1692                ProviderError::permanent("delete_instances_atomic", err_str)
1693            } else if err_str.contains("Orphan detected") {
1694                ProviderError::permanent("delete_instances_atomic", err_str)
1695            } else {
1696                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1697            }
1698        })?;
1699
1700        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1701            row.unwrap_or((0, 0, 0, 0));
1702
1703        debug!(
1704            target = "duroxide::providers::postgres",
1705            operation = "delete_instances_atomic",
1706            instances_deleted = instances_deleted,
1707            executions_deleted = executions_deleted,
1708            events_deleted = events_deleted,
1709            queue_messages_deleted = queue_messages_deleted,
1710            "Deleted instances atomically"
1711        );
1712
1713        Ok(DeleteInstanceResult {
1714            instances_deleted: instances_deleted as u64,
1715            executions_deleted: executions_deleted as u64,
1716            events_deleted: events_deleted as u64,
1717            queue_messages_deleted: queue_messages_deleted as u64,
1718        })
1719    }
1720
1721    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1722    async fn delete_instance_bulk(
1723        &self,
1724        filter: InstanceFilter,
1725    ) -> Result<DeleteInstanceResult, ProviderError> {
1726        // Build query to find matching root instances in terminal states
1727        let mut sql = format!(
1728            r#"
1729            SELECT i.instance_id
1730            FROM {}.instances i
1731            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1732              AND i.current_execution_id = e.execution_id
1733            WHERE i.parent_instance_id IS NULL
1734              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1735            "#,
1736            self.schema_name, self.schema_name
1737        );
1738
1739        // Add instance_ids filter if provided
1740        if let Some(ref ids) = filter.instance_ids {
1741            if ids.is_empty() {
1742                return Ok(DeleteInstanceResult::default());
1743            }
1744            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1745            sql.push_str(&format!(
1746                " AND i.instance_id IN ({})",
1747                placeholders.join(", ")
1748            ));
1749        }
1750
1751        // Add completed_before filter if provided
1752        if filter.completed_before.is_some() {
1753            let param_num = filter
1754                .instance_ids
1755                .as_ref()
1756                .map(|ids| ids.len())
1757                .unwrap_or(0)
1758                + 1;
1759            sql.push_str(&format!(
1760                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1761                param_num
1762            ));
1763        }
1764
1765        // Add limit
1766        let limit = filter.limit.unwrap_or(1000);
1767        let limit_param_num = filter
1768            .instance_ids
1769            .as_ref()
1770            .map(|ids| ids.len())
1771            .unwrap_or(0)
1772            + if filter.completed_before.is_some() {
1773                1
1774            } else {
1775                0
1776            }
1777            + 1;
1778        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1779
1780        // Build and execute query
1781        let mut query = sqlx::query_scalar::<_, String>(&sql);
1782        if let Some(ref ids) = filter.instance_ids {
1783            for id in ids {
1784                query = query.bind(id);
1785            }
1786        }
1787        if let Some(completed_before) = filter.completed_before {
1788            query = query.bind(completed_before as i64);
1789        }
1790        query = query.bind(limit as i64);
1791
1792        let instance_ids: Vec<String> = query
1793            .fetch_all(&*self.pool)
1794            .await
1795            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1796
1797        if instance_ids.is_empty() {
1798            return Ok(DeleteInstanceResult::default());
1799        }
1800
1801        // Delete each instance with cascade
1802        let mut result = DeleteInstanceResult::default();
1803
1804        for instance_id in &instance_ids {
1805            // Get full tree for this root
1806            let tree = self.get_instance_tree(instance_id).await?;
1807
1808            // Atomic delete (tree.all_ids is already in deletion order: children first)
1809            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1810            result.instances_deleted += delete_result.instances_deleted;
1811            result.executions_deleted += delete_result.executions_deleted;
1812            result.events_deleted += delete_result.events_deleted;
1813            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1814        }
1815
1816        debug!(
1817            target = "duroxide::providers::postgres",
1818            operation = "delete_instance_bulk",
1819            instances_deleted = result.instances_deleted,
1820            executions_deleted = result.executions_deleted,
1821            events_deleted = result.events_deleted,
1822            queue_messages_deleted = result.queue_messages_deleted,
1823            "Bulk deleted instances"
1824        );
1825
1826        Ok(result)
1827    }
1828
1829    // ===== Pruning Operations =====
1830
1831    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1832    async fn prune_executions(
1833        &self,
1834        instance_id: &str,
1835        options: PruneOptions,
1836    ) -> Result<PruneResult, ProviderError> {
1837        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1838        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1839
1840        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1841            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1842            self.schema_name
1843        ))
1844        .bind(instance_id)
1845        .bind(keep_last)
1846        .bind(completed_before_ms)
1847        .fetch_optional(&*self.pool)
1848        .await
1849        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1850
1851        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1852
1853        debug!(
1854            target = "duroxide::providers::postgres",
1855            operation = "prune_executions",
1856            instance_id = %instance_id,
1857            instances_processed = instances_processed,
1858            executions_deleted = executions_deleted,
1859            events_deleted = events_deleted,
1860            "Pruned executions"
1861        );
1862
1863        Ok(PruneResult {
1864            instances_processed: instances_processed as u64,
1865            executions_deleted: executions_deleted as u64,
1866            events_deleted: events_deleted as u64,
1867        })
1868    }
1869
1870    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1871    async fn prune_executions_bulk(
1872        &self,
1873        filter: InstanceFilter,
1874        options: PruneOptions,
1875    ) -> Result<PruneResult, ProviderError> {
1876        // Find matching instances (all statuses - prune_executions protects current execution)
1877        // Note: We include Running instances because long-running orchestrations (e.g., with
1878        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1879        // call safely skips the current execution regardless of its status.
1880        let mut sql = format!(
1881            r#"
1882            SELECT i.instance_id
1883            FROM {}.instances i
1884            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1885              AND i.current_execution_id = e.execution_id
1886            WHERE 1=1
1887            "#,
1888            self.schema_name, self.schema_name
1889        );
1890
1891        // Add instance_ids filter if provided
1892        if let Some(ref ids) = filter.instance_ids {
1893            if ids.is_empty() {
1894                return Ok(PruneResult::default());
1895            }
1896            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1897            sql.push_str(&format!(
1898                " AND i.instance_id IN ({})",
1899                placeholders.join(", ")
1900            ));
1901        }
1902
1903        // Add completed_before filter if provided
1904        if filter.completed_before.is_some() {
1905            let param_num = filter
1906                .instance_ids
1907                .as_ref()
1908                .map(|ids| ids.len())
1909                .unwrap_or(0)
1910                + 1;
1911            sql.push_str(&format!(
1912                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1913                param_num
1914            ));
1915        }
1916
1917        // Add limit
1918        let limit = filter.limit.unwrap_or(1000);
1919        let limit_param_num = filter
1920            .instance_ids
1921            .as_ref()
1922            .map(|ids| ids.len())
1923            .unwrap_or(0)
1924            + if filter.completed_before.is_some() {
1925                1
1926            } else {
1927                0
1928            }
1929            + 1;
1930        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1931
1932        // Build and execute query
1933        let mut query = sqlx::query_scalar::<_, String>(&sql);
1934        if let Some(ref ids) = filter.instance_ids {
1935            for id in ids {
1936                query = query.bind(id);
1937            }
1938        }
1939        if let Some(completed_before) = filter.completed_before {
1940            query = query.bind(completed_before as i64);
1941        }
1942        query = query.bind(limit as i64);
1943
1944        let instance_ids: Vec<String> = query
1945            .fetch_all(&*self.pool)
1946            .await
1947            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1948
1949        // Prune each instance
1950        let mut result = PruneResult::default();
1951
1952        for instance_id in &instance_ids {
1953            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1954            result.instances_processed += single_result.instances_processed;
1955            result.executions_deleted += single_result.executions_deleted;
1956            result.events_deleted += single_result.events_deleted;
1957        }
1958
1959        debug!(
1960            target = "duroxide::providers::postgres",
1961            operation = "prune_executions_bulk",
1962            instances_processed = result.instances_processed,
1963            executions_deleted = result.executions_deleted,
1964            events_deleted = result.events_deleted,
1965            "Bulk pruned executions"
1966        );
1967
1968        Ok(result)
1969    }
1970}