//! PostgreSQL-backed provider for Duroxide (`duroxide_pg_opt/provider.rs`).

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::sync::Notify;
15use tokio::task::JoinHandle;
16use tokio::time::sleep;
17use tracing::{debug, error, info, instrument, warn};
18
19use crate::db_metrics::{record_fetch_result, DbCallTimer, DbOperation, FetchType};
20use crate::migrations::MigrationRunner;
21use crate::notifier::{LongPollConfig, Notifier};
22
23#[cfg(feature = "test-fault-injection")]
24use crate::fault_injection::FaultInjector;
25
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg_opt::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect using DATABASE_URL or explicit connection string
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    // Shared connection pool; in `Arc` so clones can also be handed to the
    // migration runner and the background notifier task.
    pool: Arc<PgPool>,
    // Schema that qualifies every table/function reference (default "public").
    schema_name: String,

    // Long-poll infrastructure (None if disabled)
    // Signalled when new orchestration work may be available.
    orch_notify: Option<Arc<Notify>>,
    // Signalled when new worker (activity) work may be available.
    worker_notify: Option<Arc<Notify>>,
    // Handle to the spawned notifier task; aborted in `Drop`.
    notifier_handle: Option<JoinHandle<()>>,

    // Fault injection (only present when feature is enabled)
    #[cfg(feature = "test-fault-injection")]
    fault_injector: Option<Arc<FaultInjector>>,
}
61
62impl PostgresProvider {
    /// Create a new provider with default settings (long-poll enabled).
    ///
    /// Equivalent to [`Self::new_with_options`] with the default ("public")
    /// schema and [`LongPollConfig::default`].
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_options(database_url, None, LongPollConfig::default()).await
    }
67
    /// Create a new provider with a custom schema.
    ///
    /// `None` falls back to the "public" schema; `Some(..)` isolates all
    /// tables and stored procedures under that schema name.
    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
        Self::new_with_options(database_url, schema_name, LongPollConfig::default()).await
    }
72
    /// Create a new provider with full configuration options.
    ///
    /// Connects a pool (size taken from `DUROXIDE_PG_POOL_MAX`, default 10),
    /// runs schema migrations, and — when `config.enabled` — spawns a
    /// background notifier task that wakes long-polling fetches.
    ///
    /// # Errors
    /// Fails if the pool cannot connect, migrations fail, or the notifier
    /// cannot be created.
    pub async fn new_with_options(
        database_url: &str,
        schema_name: Option<&str>,
        config: LongPollConfig,
    ) -> Result<Self> {
        // Pool size is env-tunable; an unset or unparsable value silently
        // falls back to 10 connections.
        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(10);

        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(1)
            .acquire_timeout(std::time::Duration::from_secs(30))
            .connect(database_url)
            .await?;

        let schema_name = schema_name.unwrap_or("public").to_string();

        // Run migrations to initialize schema (must happen before the
        // notifier starts touching the tables).
        let migration_runner = MigrationRunner::new(Arc::new(pool.clone()), schema_name.clone());
        migration_runner.migrate().await?;

        // Start notifier thread if long-polling is enabled
        let (orch_notify, worker_notify, notifier_handle) = if config.enabled {
            let orch_notify = Arc::new(Notify::new());
            let worker_notify = Arc::new(Notify::new());

            let mut notifier = Notifier::new(
                pool.clone(),
                schema_name.clone(),
                orch_notify.clone(),
                worker_notify.clone(),
                config.clone(),
            )
            .await?;

            // Runs until the provider is dropped (see `Drop`, which aborts
            // this handle).
            let handle = tokio::spawn(async move {
                notifier.run().await;
            });

            info!(
                target = "duroxide::providers::postgres",
                schema = %schema_name,
                "Long-polling enabled"
            );

            (Some(orch_notify), Some(worker_notify), Some(handle))
        } else {
            debug!(
                target = "duroxide::providers::postgres",
                schema = %schema_name,
                "Long-polling disabled"
            );
            (None, None, None)
        };

        Ok(Self {
            pool: Arc::new(pool),
            schema_name,
            orch_notify,
            worker_notify,
            notifier_handle,
            #[cfg(feature = "test-fault-injection")]
            fault_injector: None,
        })
    }
141
    /// Create a new provider with fault injection for testing.
    ///
    /// This constructor allows injecting faults to test resilience scenarios.
    /// The FaultInjector can be used to disable the notifier thread.
    ///
    /// Mirrors [`Self::new_with_options`], except that the notifier task is
    /// additionally skipped when the injector reports it disabled, and the
    /// injector is retained so `now_millis` can apply configured clock skew.
    #[cfg(feature = "test-fault-injection")]
    pub async fn new_with_fault_injection(
        database_url: &str,
        schema_name: Option<&str>,
        config: LongPollConfig,
        fault_injector: Arc<FaultInjector>,
    ) -> Result<Self> {
        // Pool size is env-tunable; unset/unparsable values fall back to 10.
        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(10);

        // Check fault injection: if notifier is disabled, skip starting it
        let notifier_disabled = fault_injector.is_notifier_disabled();

        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(1)
            .acquire_timeout(std::time::Duration::from_secs(30))
            .connect(database_url)
            .await?;

        let schema_name = schema_name.unwrap_or("public").to_string();

        // Run migrations to initialize schema
        let migration_runner = MigrationRunner::new(Arc::new(pool.clone()), schema_name.clone());
        migration_runner.migrate().await?;

        // Start notifier thread if long-polling is enabled AND not disabled by fault injection
        let (orch_notify, worker_notify, notifier_handle, fi) =
            if config.enabled && !notifier_disabled {
                let orch_notify = Arc::new(Notify::new());
                let worker_notify = Arc::new(Notify::new());

                let mut notifier = Notifier::new_with_fault_injection(
                    pool.clone(),
                    schema_name.clone(),
                    orch_notify.clone(),
                    worker_notify.clone(),
                    config.clone(),
                    fault_injector.clone(),
                )
                .await?;

                let handle = tokio::spawn(async move {
                    notifier.run().await;
                });

                info!(
                    target = "duroxide::providers::postgres",
                    schema = %schema_name,
                    "Long-polling enabled"
                );

                (
                    Some(orch_notify),
                    Some(worker_notify),
                    Some(handle),
                    Some(fault_injector),
                )
            } else {
                // Distinguish "disabled by test fault" (warn) from plain
                // configuration-off (debug) in the logs.
                if notifier_disabled {
                    warn!(
                        target = "duroxide::providers::postgres",
                        schema = %schema_name,
                        "Long-polling disabled by fault injection"
                    );
                } else {
                    debug!(
                        target = "duroxide::providers::postgres",
                        schema = %schema_name,
                        "Long-polling disabled"
                    );
                }
                (None, None, None, Some(fault_injector))
            };

        Ok(Self {
            pool: Arc::new(pool),
            schema_name,
            orch_notify,
            worker_notify,
            notifier_handle,
            fault_injector: fi,
        })
    }
232
    /// Re-run schema migrations (idempotent).
    ///
    /// Schema initialization is now handled by migrations; this method is
    /// kept for backward compatibility and simply delegates to the
    /// migration runner.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    pub async fn initialize_schema(&self) -> Result<()> {
        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
        migration_runner.migrate().await?;
        Ok(())
    }
241
242    /// Get current timestamp in milliseconds (Unix epoch) - static version.
243    ///
244    /// This is the base time calculation without any fault injection adjustments.
245    fn now_millis_base() -> i64 {
246        SystemTime::now()
247            .duration_since(UNIX_EPOCH)
248            .unwrap_or_default()
249            .as_millis() as i64
250    }
251
252    /// Get current timestamp in milliseconds, with optional clock skew adjustment.
253    ///
254    /// When fault injection is enabled and a clock skew is configured, the skew
255    /// is added to the current time. This allows simulating nodes with clocks
256    /// that are ahead (positive skew) or behind (negative skew).
257    ///
258    /// When fault injection is disabled, this is zero-cost and equivalent to
259    /// `now_millis_base()`.
260    #[cfg(feature = "test-fault-injection")]
261    fn now_millis(&self) -> i64 {
262        let base = Self::now_millis_base();
263        if let Some(ref fi) = self.fault_injector {
264            base + fi.get_clock_skew_ms()
265        } else {
266            base
267        }
268    }
269
    /// Get current timestamp in milliseconds (no fault injection).
    ///
    /// Identical to [`Self::now_millis_base`]; exists so callers can write
    /// `self.now_millis()` uniformly regardless of the feature flag.
    #[cfg(not(feature = "test-fault-injection"))]
    fn now_millis(&self) -> i64 {
        Self::now_millis_base()
    }
275
276    /// Get schema-qualified table name
277    fn table_name(&self, table: &str) -> String {
278        format!("{}.{}", self.schema_name, table)
279    }
280
    /// Get the database pool (for testing).
    ///
    /// Borrows through the internal `Arc`, so the provider retains ownership.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
285
    /// Get the schema name (for testing).
    ///
    /// Returns the schema used to qualify every table/procedure reference.
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }
290
291    /// Convert sqlx::Error to ProviderError with proper classification
292    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
293        match e {
294            SqlxError::Database(ref db_err) => {
295                // PostgreSQL error codes
296                let code_opt = db_err.code();
297                let code = code_opt.as_deref();
298                if code == Some("40P01") {
299                    // Deadlock detected
300                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
301                } else if code == Some("40001") {
302                    // Serialization failure - permanent error (transaction conflict, not transient)
303                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
304                } else if code == Some("23505") {
305                    // Unique constraint violation (duplicate event)
306                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
307                } else if code == Some("23503") {
308                    // Foreign key constraint violation
309                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
310                } else {
311                    ProviderError::permanent(operation, format!("Database error: {e}"))
312                }
313            }
314            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
315                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
316            }
317            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
318            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
319        }
320    }
321
    /// Clean up schema after tests (drops all tables and optionally the schema)
    ///
    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
    /// Only drops the schema if it's a custom schema (not "public").
    ///
    /// Deadlocks (SQLSTATE 40P01) are retried up to 5 times with a linearly
    /// increasing delay; any other error is returned immediately.
    pub async fn cleanup_schema(&self) -> Result<()> {
        const MAX_RETRIES: u32 = 5;
        const BASE_RETRY_DELAY_MS: u64 = 50;

        for attempt in 0..=MAX_RETRIES {
            let cleanup_result = async {
                // Call the stored procedure to drop all tables
                let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("cleanup_schema"));
                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
                    .execute(&*self.pool)
                    .await?;

                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
                // Only drop custom schemas created for testing
                if self.schema_name != "public" {
                    let _timer = DbCallTimer::new(DbOperation::Ddl, None);
                    sqlx::query(&format!(
                        "DROP SCHEMA IF EXISTS {} CASCADE",
                        self.schema_name
                    ))
                    .execute(&*self.pool)
                    .await?;
                } else {
                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
                    // This ensures we don't accidentally drop the default PostgreSQL schema
                }

                Ok::<(), SqlxError>(())
            }
            .await;

            match cleanup_result {
                Ok(()) => return Ok(()),
                // Only deadlocks are retried; the backoff grows linearly
                // (50ms, 100ms, 150ms, ...).
                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
                    if attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            schema = %self.schema_name,
                            attempt = attempt + 1,
                            "Deadlock during cleanup_schema, retrying"
                        );
                        sleep(Duration::from_millis(
                            BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(anyhow::anyhow!(db_err.to_string()));
                }
                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
            }
        }

        // Unreachable in practice: every iteration either returns or
        // continues, and the final attempt always returns.
        Ok(())
    }
381
    /// Internal fetch logic for orchestration items with retries
    ///
    /// Invokes the `fetch_orchestration_item` stored procedure and decodes
    /// the row into an [`OrchestrationItem`] plus its lock token and attempt
    /// count. Retryable database errors are retried up to 3 times with a
    /// linearly increasing delay; an empty result returns `Ok(None)` without
    /// retrying (retries are for error recovery, not for polling).
    async fn do_fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        let mut _last_error: Option<ProviderError> = None;

        // Extract version filter parameters. Each semver bound is packed into
        // a single i64 as major * 1_000_000 + minor * 1_000 + patch so it can
        // be compared inside the stored procedure. Only the first range is
        // used — TODO confirm whether multiple ranges should be supported.
        let (min_packed, max_packed): (Option<i64>, Option<i64>) = if let Some(cap_filter) = filter
        {
            match cap_filter.supported_duroxide_versions.first() {
                Some(range) => {
                    let min = range.min.major as i64 * 1_000_000
                        + range.min.minor as i64 * 1_000
                        + range.min.patch as i64;
                    let max = range.max.major as i64 * 1_000_000
                        + range.max.minor as i64 * 1_000
                        + range.max.patch as i64;
                    (Some(min), Some(max))
                }
                None => {
                    // Empty supported_duroxide_versions = "supports nothing" → no candidate.
                    return Ok(None);
                }
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = self.now_millis();

            let _timer = DbCallTimer::new(
                DbOperation::StoredProcedure,
                Some("fetch_orchestration_item"),
            );
            // Row layout: instance_id, orchestration_name, version,
            // execution_id, history JSON, messages JSON, lock_token,
            // attempt_count, kv snapshot JSON.
            #[allow(clippy::type_complexity)]
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // A corrupt history is surfaced via `history_error` rather
                // than failing the fetch, so the runtime can decide what to do.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            tracing::warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // Decode the KV snapshot leniently: entries missing "value"
                // are skipped, and a missing/invalid timestamp maps to 0.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Record loaded fetch with timing
                record_fetch_result(FetchType::Orchestration, 1, duration_ms);

                // Orphan queue messages: if orchestration_name is "Unknown", there's
                // no history, and ALL messages are QueueMessage items, these are orphan
                // events enqueued before the orchestration started. Drop them by acking
                // with empty deltas. Other work items (CancelInstance, etc.) may
                // legitimately race with StartOrchestration and must not be dropped.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // Query succeeded but no work found - return immediately
            // (retries are only for error recovery, not for polling)
            break;
        }

        // Record empty fetch with timing
        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
        record_fetch_result(FetchType::Orchestration, 0, duration_ms);

        Ok(None)
    }
602
    /// Internal fetch logic for work items
    ///
    /// Invokes the `fetch_work_item` stored procedure with session and
    /// tag-filter parameters flattened into SQL bind values, and decodes the
    /// resulting row into a [`WorkItem`] plus lock token and attempt count.
    /// Unlike the orchestration fetch, this performs a single attempt with
    /// no retry loop.
    async fn do_fetch_work_item(
        &self,
        lock_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Extract session parameters
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Flatten the tag filter into (values, mode); the mode string tells
        // the stored procedure which matching semantics to apply.
        let (tag_filter_values, tag_mode): (Option<Vec<String>>, &str) = match tag_filter {
            TagFilter::DefaultOnly => (None, "default_only"),
            TagFilter::Tags(tags) => (Some(tags.iter().cloned().collect()), "tags"),
            TagFilter::DefaultAnd(tags) => (Some(tags.iter().cloned().collect()), "default_and"),
            TagFilter::Any => (None, "any"),
            TagFilter::None => (None, "none"),
        };

        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("fetch_work_item"));
        // Returns: work_item, lock_token, attempt_count
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(self.now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(tag_filter_values)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => {
                // Record empty fetch with timing
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                record_fetch_result(FetchType::WorkItem, 0, duration_ms);
                return Ok(None);
            }
        };

        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;

        // Extract instance for logging - different work item types have different structures
        // (sub-orchestration completions carry the parent's instance id).
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        // Record loaded fetch with timing
        record_fetch_result(FetchType::WorkItem, 1, duration_ms);

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
705}
706
impl Drop for PostgresProvider {
    fn drop(&mut self) {
        // Abort the notifier thread when the provider is dropped.
        // `take()` ensures the handle is aborted at most once; aborting a
        // task that has already finished is harmless.
        if let Some(handle) = self.notifier_handle.take() {
            handle.abort();
        }
    }
}
715
716#[async_trait::async_trait]
717impl Provider for PostgresProvider {
    /// Provider name, taken from this crate's `Cargo.toml` package name.
    fn name(&self) -> &str {
        env!("CARGO_PKG_NAME")
    }
721
    /// Provider version, taken from this crate's `Cargo.toml` package version.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
725
    /// Fetch the next orchestration item, long-polling for up to
    /// `poll_timeout` when the background notifier is running.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        // Fast path: Duration::ZERO means "do not wait".
        // Avoid long-poll notifier bookkeeping to keep behavior deterministic
        // and reduce contention/overhead on hot paths.
        if poll_timeout.is_zero() {
            return self.do_fetch_orchestration_item(lock_timeout, filter).await;
        }

        // Long-poll pattern: register interest BEFORE checking to avoid race
        if let Some(notify) = &self.orch_notify {
            // Step 1: Create the notification future and enable it
            // enable() registers interest immediately, so any notify_one()
            // after this point will wake us up (or store a permit if we're not waiting yet).
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            // Step 2: Try to fetch
            let result = self
                .do_fetch_orchestration_item(lock_timeout, filter)
                .await?;
            if result.is_some() {
                return Ok(result);
            }

            // Step 3: No work - wait for wake signal or timeout
            // Because we called enable() BEFORE checking, any notify_one()
            // that happened after step 1 will still wake us up.
            tokio::select! {
                _ = &mut notified => {
                    // Woken by notifier (NOTIFY or timer) - fetch now
                    return self.do_fetch_orchestration_item(lock_timeout, filter).await;
                }
                _ = tokio::time::sleep(poll_timeout) => {
                    // Timeout - return None, let runtime handle idle sleep
                    return Ok(None);
                }
            }
        }

        // Long-poll disabled - try once and return immediately (old behavior)
        self.do_fetch_orchestration_item(lock_timeout, filter).await
    }
775
776    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
777    async fn ack_orchestration_item(
778        &self,
779        lock_token: &str,
780        execution_id: u64,
781        history_delta: Vec<Event>,
782        worker_items: Vec<WorkItem>,
783        orchestrator_items: Vec<WorkItem>,
784        metadata: ExecutionMetadata,
785        cancelled_activities: Vec<ScheduledActivityIdentifier>,
786    ) -> Result<(), ProviderError> {
787        let start = std::time::Instant::now();
788
789        const MAX_RETRIES: u32 = 3;
790        const RETRY_DELAY_MS: u64 = 50;
791
792        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
793        for event in &history_delta {
794            if event.event_id() == 0 {
795                return Err(ProviderError::permanent(
796                    "ack_orchestration_item",
797                    "event_id must be set by runtime",
798                ));
799            }
800
801            let event_json = serde_json::to_string(event).map_err(|e| {
802                ProviderError::permanent(
803                    "ack_orchestration_item",
804                    format!("Failed to serialize event: {e}"),
805                )
806            })?;
807
808            let event_type = format!("{event:?}")
809                .split('{')
810                .next()
811                .unwrap_or("Unknown")
812                .trim()
813                .to_string();
814
815            history_delta_payload.push(serde_json::json!({
816                "event_id": event.event_id(),
817                "event_type": event_type,
818                "event_data": event_json,
819            }));
820        }
821
822        let history_delta_json = serde_json::Value::Array(history_delta_payload);
823
824        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
825            ProviderError::permanent(
826                "ack_orchestration_item",
827                format!("Failed to serialize worker items: {e}"),
828            )
829        })?;
830
831        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
832            ProviderError::permanent(
833                "ack_orchestration_item",
834                format!("Failed to serialize orchestrator items: {e}"),
835            )
836        })?;
837
838        // Scan history_delta for the last CustomStatusUpdated event
839        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
840            let mut last_status: Option<&Option<String>> = None;
841            for event in &history_delta {
842                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
843                    last_status = Some(status);
844                }
845            }
846            match last_status {
847                Some(Some(s)) => (Some("set"), Some(s.as_str())),
848                Some(None) => (Some("clear"), None),
849                None => (None, None),
850            }
851        };
852
853        let kv_mutations: Vec<serde_json::Value> = history_delta
854            .iter()
855            .filter_map(|event| match &event.kind {
856                duroxide::EventKind::KeyValueSet {
857                    key,
858                    value,
859                    last_updated_at_ms,
860                } => Some(serde_json::json!({
861                    "action": "set",
862                    "key": key,
863                    "value": value,
864                    "last_updated_at_ms": last_updated_at_ms,
865                })),
866                duroxide::EventKind::KeyValueCleared { key } => Some(serde_json::json!({
867                    "action": "clear_key",
868                    "key": key,
869                })),
870                duroxide::EventKind::KeyValuesCleared => Some(serde_json::json!({
871                    "action": "clear_all",
872                })),
873                _ => None,
874            })
875            .collect();
876
877        let metadata_json = serde_json::json!({
878            "orchestration_name": metadata.orchestration_name,
879            "orchestration_version": metadata.orchestration_version,
880            "status": metadata.status,
881            "output": metadata.output,
882            "parent_instance_id": metadata.parent_instance_id,
883            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
884                serde_json::json!({
885                    "major": v.major,
886                    "minor": v.minor,
887                    "patch": v.patch,
888                })
889            }),
890            "custom_status_action": custom_status_action,
891            "custom_status_value": custom_status_value,
892            "kv_mutations": kv_mutations,
893        });
894
895        // Serialize cancelled_activities for lock-stealing cancellation
896        // Each entry needs execution_id and activity_id. The instance_id is constrained
897        // by v_instance_id (derived from lock_token in the stored procedure).
898        //
899        // Note: We intentionally allow cancelled_activities with different execution_ids
900        // than the current p_execution_id. The DELETE will simply be a no-op for
901        // non-matching entries, making the operation idempotent.
902        let cancelled_activities_json = serde_json::Value::Array(
903            cancelled_activities
904                .iter()
905                .map(|sa| {
906                    serde_json::json!({
907                        "execution_id": sa.execution_id,
908                        "activity_id": sa.activity_id
909                    })
910                })
911                .collect(),
912        );
913
914        let now_ms = self.now_millis();
915
916        for attempt in 0..=MAX_RETRIES {
917            let _timer =
918                DbCallTimer::new(DbOperation::StoredProcedure, Some("ack_orchestration_item"));
919            let result = sqlx::query(&format!(
920                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
921                self.schema_name
922            ))
923            .bind(lock_token)
924            .bind(now_ms)
925            .bind(execution_id as i64)
926            .bind(&history_delta_json)
927            .bind(&worker_items_json)
928            .bind(&orchestrator_items_json)
929            .bind(&metadata_json)
930            .bind(&cancelled_activities_json)
931            .execute(&*self.pool)
932            .await;
933
934            match result {
935                Ok(_) => {
936                    let duration_ms = start.elapsed().as_millis() as u64;
937                    debug!(
938                        target = "duroxide::providers::postgres",
939                        operation = "ack_orchestration_item",
940                        execution_id = execution_id,
941                        history_count = history_delta.len(),
942                        worker_items_count = worker_items.len(),
943                        orchestrator_items_count = orchestrator_items.len(),
944                        duration_ms = duration_ms,
945                        attempts = attempt + 1,
946                        "Acknowledged orchestration item via stored procedure"
947                    );
948                    return Ok(());
949                }
950                Err(e) => {
951                    // Check for permanent errors first
952                    if let SqlxError::Database(db_err) = &e {
953                        if db_err.message().contains("Invalid lock token") {
954                            return Err(ProviderError::permanent(
955                                "ack_orchestration_item",
956                                "Invalid lock token",
957                            ));
958                        }
959                    } else if e.to_string().contains("Invalid lock token") {
960                        return Err(ProviderError::permanent(
961                            "ack_orchestration_item",
962                            "Invalid lock token",
963                        ));
964                    }
965
966                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
967                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
968                        warn!(
969                            target = "duroxide::providers::postgres",
970                            operation = "ack_orchestration_item",
971                            attempt = attempt + 1,
972                            error = %provider_err,
973                            "Retryable error, will retry"
974                        );
975                        sleep(std::time::Duration::from_millis(
976                            RETRY_DELAY_MS * (attempt as u64 + 1),
977                        ))
978                        .await;
979                        continue;
980                    }
981                    return Err(provider_err);
982                }
983            }
984        }
985
986        // Should never reach here, but just in case
987        Ok(())
988    }
989    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
990    async fn abandon_orchestration_item(
991        &self,
992        lock_token: &str,
993        delay: Option<Duration>,
994        ignore_attempt: bool,
995    ) -> Result<(), ProviderError> {
996        let start = std::time::Instant::now();
997        let now_ms = self.now_millis();
998        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
999
1000        let _timer = DbCallTimer::new(
1001            DbOperation::StoredProcedure,
1002            Some("abandon_orchestration_item"),
1003        );
1004        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
1005            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
1006            self.schema_name
1007        ))
1008        .bind(lock_token)
1009        .bind(now_ms)
1010        .bind(delay_param)
1011        .bind(ignore_attempt)
1012        .fetch_one(&*self.pool)
1013        .await
1014        {
1015            Ok(instance_id) => instance_id,
1016            Err(e) => {
1017                if let SqlxError::Database(db_err) = &e {
1018                    if db_err.message().contains("Invalid lock token") {
1019                        return Err(ProviderError::permanent(
1020                            "abandon_orchestration_item",
1021                            "Invalid lock token",
1022                        ));
1023                    }
1024                } else if e.to_string().contains("Invalid lock token") {
1025                    return Err(ProviderError::permanent(
1026                        "abandon_orchestration_item",
1027                        "Invalid lock token",
1028                    ));
1029                }
1030
1031                return Err(Self::sqlx_to_provider_error(
1032                    "abandon_orchestration_item",
1033                    e,
1034                ));
1035            }
1036        };
1037
1038        let duration_ms = start.elapsed().as_millis() as u64;
1039        debug!(
1040            target = "duroxide::providers::postgres",
1041            operation = "abandon_orchestration_item",
1042            instance_id = %instance_id,
1043            delay_ms = delay.map(|d| d.as_millis() as u64),
1044            ignore_attempt = ignore_attempt,
1045            duration_ms = duration_ms,
1046            "Abandoned orchestration item via stored procedure"
1047        );
1048
1049        Ok(())
1050    }
1051
1052    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1053    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1054        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("fetch_history"));
1055        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1056            "SELECT out_event_data FROM {}.fetch_history($1)",
1057            self.schema_name
1058        ))
1059        .bind(instance)
1060        .fetch_all(&*self.pool)
1061        .await
1062        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
1063
1064        Ok(event_data_rows
1065            .into_iter()
1066            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1067            .collect())
1068    }
1069
1070    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1071    async fn append_with_execution(
1072        &self,
1073        instance: &str,
1074        execution_id: u64,
1075        new_events: Vec<Event>,
1076    ) -> Result<(), ProviderError> {
1077        if new_events.is_empty() {
1078            return Ok(());
1079        }
1080
1081        let mut events_payload = Vec::with_capacity(new_events.len());
1082        for event in &new_events {
1083            if event.event_id() == 0 {
1084                error!(
1085                    target = "duroxide::providers::postgres",
1086                    operation = "append_with_execution",
1087                    error_type = "validation_error",
1088                    instance_id = %instance,
1089                    execution_id = execution_id,
1090                    "event_id must be set by runtime"
1091                );
1092                return Err(ProviderError::permanent(
1093                    "append_with_execution",
1094                    "event_id must be set by runtime",
1095                ));
1096            }
1097
1098            let event_json = serde_json::to_string(event).map_err(|e| {
1099                ProviderError::permanent(
1100                    "append_with_execution",
1101                    format!("Failed to serialize event: {e}"),
1102                )
1103            })?;
1104
1105            let event_type = format!("{event:?}")
1106                .split('{')
1107                .next()
1108                .unwrap_or("Unknown")
1109                .trim()
1110                .to_string();
1111
1112            events_payload.push(serde_json::json!({
1113                "event_id": event.event_id(),
1114                "event_type": event_type,
1115                "event_data": event_json,
1116            }));
1117        }
1118
1119        let events_json = serde_json::Value::Array(events_payload);
1120        let now_ms = self.now_millis();
1121
1122        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("append_history"));
1123        sqlx::query(&format!(
1124            "SELECT {}.append_history($1, $2, $3, $4)",
1125            self.schema_name
1126        ))
1127        .bind(instance)
1128        .bind(execution_id as i64)
1129        .bind(events_json)
1130        .bind(now_ms)
1131        .execute(&*self.pool)
1132        .await
1133        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
1134
1135        debug!(
1136            target = "duroxide::providers::postgres",
1137            operation = "append_with_execution",
1138            instance_id = %instance,
1139            execution_id = execution_id,
1140            event_count = new_events.len(),
1141            "Appended history events via stored procedure"
1142        );
1143
1144        Ok(())
1145    }
1146
1147    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1148    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
1149        let work_item = serde_json::to_string(&item).map_err(|e| {
1150            ProviderError::permanent(
1151                "enqueue_worker_work",
1152                format!("Failed to serialize work item: {e}"),
1153            )
1154        })?;
1155
1156        let now_ms = self.now_millis();
1157
1158        // Extract session_id and tag for ActivityExecute items
1159        let (session_id, tag) = match &item {
1160            WorkItem::ActivityExecute {
1161                session_id, tag, ..
1162            } => (session_id.clone(), tag.clone()),
1163            _ => (None, None),
1164        };
1165
1166        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("enqueue_worker_work"));
1167        sqlx::query(&format!(
1168            "SELECT {}.enqueue_worker_work($1, $2, $3, $4)",
1169            self.schema_name
1170        ))
1171        .bind(work_item)
1172        .bind(now_ms)
1173        .bind(&session_id)
1174        .bind(&tag)
1175        .execute(&*self.pool)
1176        .await
1177        .map_err(|e| {
1178            error!(
1179                target = "duroxide::providers::postgres",
1180                operation = "enqueue_worker_work",
1181                error_type = "database_error",
1182                error = %e,
1183                "Failed to enqueue worker work"
1184            );
1185            Self::sqlx_to_provider_error("enqueue_worker_work", e)
1186        })?;
1187
1188        Ok(())
1189    }
1190
    /// Fetch the next ready worker item, long-polling up to `poll_timeout`.
    ///
    /// Mirrors `fetch_orchestration_item`:
    /// - `poll_timeout == 0`: single non-blocking fetch attempt.
    /// - Long-poll enabled (`worker_notify` is `Some`): interest is registered
    ///   on the `Notify` BEFORE the first fetch, then a wake-triggered refetch
    ///   races `sleep(poll_timeout)`. The enable-before-check order avoids a
    ///   lost-wakeup race; do not reorder.
    /// - Long-poll disabled: single fetch attempt.
    ///
    /// Returns `Ok(None)` on timeout or when no matching item is available.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // Fast path: Duration::ZERO means "do not wait".
        // Avoid long-poll notifier bookkeeping to keep behavior deterministic
        // and reduce contention/overhead on hot paths.
        if poll_timeout.is_zero() {
            return self
                .do_fetch_work_item(lock_timeout, session, tag_filter)
                .await;
        }

        // Long-poll pattern: register interest BEFORE checking to avoid race
        if let Some(notify) = &self.worker_notify {
            // Step 1: Create the notification future and enable it
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            // Step 2: Try to fetch
            let result = self
                .do_fetch_work_item(lock_timeout, session, tag_filter)
                .await?;
            if result.is_some() {
                return Ok(result);
            }

            // Step 3: No work - wait for wake signal or timeout
            tokio::select! {
                _ = &mut notified => {
                    // Woken by notifier (NOTIFY or timer) - fetch now
                    return self.do_fetch_work_item(lock_timeout, session, tag_filter).await;
                }
                _ = tokio::time::sleep(poll_timeout) => {
                    // Timeout - return None, let runtime handle idle sleep
                    return Ok(None);
                }
            }
        }

        // Long-poll disabled - try once and return immediately (old behavior)
        self.do_fetch_work_item(lock_timeout, session, tag_filter)
            .await
    }
1240
1241    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1242    async fn ack_work_item(
1243        &self,
1244        token: &str,
1245        completion: Option<WorkItem>,
1246    ) -> Result<(), ProviderError> {
1247        let start = std::time::Instant::now();
1248
1249        // Extract instance ID and serialize completion if provided
1250        let (instance_id, completion_json): (Option<String>, Option<String>) = match &completion {
1251            Some(WorkItem::ActivityCompleted { instance, .. })
1252            | Some(WorkItem::ActivityFailed { instance, .. }) => {
1253                let json = serde_json::to_string(&completion).map_err(|e| {
1254                    ProviderError::permanent(
1255                        "ack_worker",
1256                        format!("Failed to serialize completion: {e}"),
1257                    )
1258                })?;
1259                (Some(instance.clone()), Some(json))
1260            }
1261            Some(_) => {
1262                error!(
1263                    target = "duroxide::providers::postgres",
1264                    operation = "ack_worker",
1265                    error_type = "invalid_completion_type",
1266                    "Invalid completion work item type"
1267                );
1268                return Err(ProviderError::permanent(
1269                    "ack_worker",
1270                    "Invalid completion work item type",
1271                ));
1272            }
1273            None => (None, None), // Orchestration terminal/missing - just delete, don't enqueue
1274        };
1275
1276        let now_ms = self.now_millis();
1277
1278        // Call stored procedure to atomically delete worker item and optionally enqueue completion
1279        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("ack_worker"));
1280        sqlx::query(&format!(
1281            "SELECT {}.ack_worker($1, $2, $3, $4)",
1282            self.schema_name
1283        ))
1284        .bind(token)
1285        .bind(&instance_id)
1286        .bind(&completion_json)
1287        .bind(now_ms)
1288        .execute(&*self.pool)
1289        .await
1290        .map_err(|e| {
1291            if e.to_string().contains("Worker queue item not found") {
1292                error!(
1293                    target = "duroxide::providers::postgres",
1294                    operation = "ack_worker",
1295                    error_type = "worker_item_not_found",
1296                    token = %token,
1297                    "Worker queue item not found or already processed"
1298                );
1299                ProviderError::permanent(
1300                    "ack_worker",
1301                    "Worker queue item not found or already processed",
1302                )
1303            } else {
1304                Self::sqlx_to_provider_error("ack_worker", e)
1305            }
1306        })?;
1307
1308        let duration_ms = start.elapsed().as_millis() as u64;
1309        debug!(
1310            target = "duroxide::providers::postgres",
1311            operation = "ack_worker",
1312            instance_id = ?instance_id,
1313            completion_provided = completion.is_some(),
1314            duration_ms = duration_ms,
1315            "Acknowledged worker item"
1316        );
1317
1318        Ok(())
1319    }
1320
1321    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1322    async fn renew_work_item_lock(
1323        &self,
1324        token: &str,
1325        extend_for: Duration,
1326    ) -> Result<(), ProviderError> {
1327        let start = std::time::Instant::now();
1328
1329        // Get current time from application for consistent time reference
1330        let now_ms = self.now_millis();
1331
1332        // Convert Duration to milliseconds for the stored procedure to avoid truncation
1333        let extend_ms = extend_for.as_millis() as i64;
1334
1335        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("renew_work_item_lock"));
1336        // In duroxide 0.1.8, renew returns () and fails if entry missing (lock stolen)
1337        // The stored procedure will raise an exception if the lock token is invalid or entry deleted
1338        match sqlx::query(&format!(
1339            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1340            self.schema_name
1341        ))
1342        .bind(token)
1343        .bind(now_ms)
1344        .bind(extend_ms)
1345        .execute(&*self.pool)
1346        .await
1347        {
1348            Ok(_) => {
1349                let duration_ms = start.elapsed().as_millis() as u64;
1350                debug!(
1351                    target = "duroxide::providers::postgres",
1352                    operation = "renew_work_item_lock",
1353                    token = %token,
1354                    extend_for_ms = extend_ms,
1355                    duration_ms = duration_ms,
1356                    "Renew work item lock completed successfully"
1357                );
1358                Ok(())
1359            }
1360            Err(e) => {
1361                if let SqlxError::Database(db_err) = &e {
1362                    if db_err.message().contains("Lock token invalid") {
1363                        return Err(ProviderError::permanent(
1364                            "renew_work_item_lock",
1365                            "Lock token invalid, expired, or already acked",
1366                        ));
1367                    }
1368                } else if e.to_string().contains("Lock token invalid") {
1369                    return Err(ProviderError::permanent(
1370                        "renew_work_item_lock",
1371                        "Lock token invalid, expired, or already acked",
1372                    ));
1373                }
1374
1375                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1376            }
1377        }
1378    }
1379
1380    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1381    async fn abandon_work_item(
1382        &self,
1383        token: &str,
1384        delay: Option<Duration>,
1385        ignore_attempt: bool,
1386    ) -> Result<(), ProviderError> {
1387        let start = std::time::Instant::now();
1388        let now_ms = self.now_millis();
1389        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1390
1391        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("abandon_work_item"));
1392        match sqlx::query(&format!(
1393            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1394            self.schema_name
1395        ))
1396        .bind(token)
1397        .bind(now_ms)
1398        .bind(delay_param)
1399        .bind(ignore_attempt)
1400        .execute(&*self.pool)
1401        .await
1402        {
1403            Ok(_) => {
1404                let duration_ms = start.elapsed().as_millis() as u64;
1405                debug!(
1406                    target = "duroxide::providers::postgres",
1407                    operation = "abandon_work_item",
1408                    token = %token,
1409                    delay_ms = delay.map(|d| d.as_millis() as u64),
1410                    ignore_attempt = ignore_attempt,
1411                    duration_ms = duration_ms,
1412                    "Abandoned work item via stored procedure"
1413                );
1414                Ok(())
1415            }
1416            Err(e) => {
1417                if let SqlxError::Database(db_err) = &e {
1418                    if db_err.message().contains("Invalid lock token")
1419                        || db_err.message().contains("already acked")
1420                    {
1421                        return Err(ProviderError::permanent(
1422                            "abandon_work_item",
1423                            "Invalid lock token or already acked",
1424                        ));
1425                    }
1426                } else if e.to_string().contains("Invalid lock token")
1427                    || e.to_string().contains("already acked")
1428                {
1429                    return Err(ProviderError::permanent(
1430                        "abandon_work_item",
1431                        "Invalid lock token or already acked",
1432                    ));
1433                }
1434
1435                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1436            }
1437        }
1438    }
1439
1440    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1441    async fn renew_orchestration_item_lock(
1442        &self,
1443        token: &str,
1444        extend_for: Duration,
1445    ) -> Result<(), ProviderError> {
1446        let start = std::time::Instant::now();
1447
1448        // Get current time from application for consistent time reference
1449        let now_ms = self.now_millis();
1450
1451        // Convert Duration to milliseconds for the stored procedure to avoid truncation
1452        let extend_ms = extend_for.as_millis() as i64;
1453
1454        let _timer = DbCallTimer::new(
1455            DbOperation::StoredProcedure,
1456            Some("renew_orchestration_item_lock"),
1457        );
1458        match sqlx::query(&format!(
1459            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1460            self.schema_name
1461        ))
1462        .bind(token)
1463        .bind(now_ms)
1464        .bind(extend_ms)
1465        .execute(&*self.pool)
1466        .await
1467        {
1468            Ok(_) => {
1469                let duration_ms = start.elapsed().as_millis() as u64;
1470                debug!(
1471                    target = "duroxide::providers::postgres",
1472                    operation = "renew_orchestration_item_lock",
1473                    token = %token,
1474                extend_for_ms = extend_ms,
1475                    duration_ms = duration_ms,
1476                    "Orchestration item lock renewed successfully"
1477                );
1478                Ok(())
1479            }
1480            Err(e) => {
1481                if let SqlxError::Database(db_err) = &e {
1482                    if db_err.message().contains("Lock token invalid")
1483                        || db_err.message().contains("expired")
1484                        || db_err.message().contains("already released")
1485                    {
1486                        return Err(ProviderError::permanent(
1487                            "renew_orchestration_item_lock",
1488                            "Lock token invalid, expired, or already released",
1489                        ));
1490                    }
1491                } else if e.to_string().contains("Lock token invalid")
1492                    || e.to_string().contains("expired")
1493                    || e.to_string().contains("already released")
1494                {
1495                    return Err(ProviderError::permanent(
1496                        "renew_orchestration_item_lock",
1497                        "Lock token invalid, expired, or already released",
1498                    ));
1499                }
1500
1501                Err(Self::sqlx_to_provider_error(
1502                    "renew_orchestration_item_lock",
1503                    e,
1504                ))
1505            }
1506        }
1507    }
1508
1509    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1510    async fn enqueue_for_orchestrator(
1511        &self,
1512        item: WorkItem,
1513        delay: Option<Duration>,
1514    ) -> Result<(), ProviderError> {
1515        let work_item = serde_json::to_string(&item).map_err(|e| {
1516            ProviderError::permanent(
1517                "enqueue_orchestrator_work",
1518                format!("Failed to serialize work item: {e}"),
1519            )
1520        })?;
1521
1522        // Extract instance ID from WorkItem enum
1523        let instance_id = match &item {
1524            WorkItem::StartOrchestration { instance, .. }
1525            | WorkItem::ActivityCompleted { instance, .. }
1526            | WorkItem::ActivityFailed { instance, .. }
1527            | WorkItem::TimerFired { instance, .. }
1528            | WorkItem::ExternalRaised { instance, .. }
1529            | WorkItem::CancelInstance { instance, .. }
1530            | WorkItem::ContinueAsNew { instance, .. }
1531            | WorkItem::QueueMessage { instance, .. } => instance,
1532            WorkItem::SubOrchCompleted {
1533                parent_instance, ..
1534            }
1535            | WorkItem::SubOrchFailed {
1536                parent_instance, ..
1537            } => parent_instance,
1538            WorkItem::ActivityExecute { .. } => {
1539                return Err(ProviderError::permanent(
1540                    "enqueue_orchestrator_work",
1541                    "ActivityExecute should go to worker queue, not orchestrator queue",
1542                ));
1543            }
1544        };
1545
1546        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1547        let now_ms = self.now_millis();
1548
1549        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1550            if *fire_at_ms > 0 {
1551                // Take max of fire_at_ms and delay (if provided)
1552                if let Some(delay) = delay {
1553                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1554                } else {
1555                    *fire_at_ms
1556                }
1557            } else {
1558                // fire_at_ms is 0, use delay or NOW()
1559                delay
1560                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1561                    .unwrap_or(now_ms as u64)
1562            }
1563        } else {
1564            // Non-timer item: use delay or NOW()
1565            delay
1566                .map(|d| now_ms as u64 + d.as_millis() as u64)
1567                .unwrap_or(now_ms as u64)
1568        };
1569
1570        let visible_at = Utc
1571            .timestamp_millis_opt(visible_at_ms as i64)
1572            .single()
1573            .ok_or_else(|| {
1574                ProviderError::permanent(
1575                    "enqueue_orchestrator_work",
1576                    "Invalid visible_at timestamp",
1577                )
1578            })?;
1579
1580        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1581        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1582
1583        // Call stored procedure to enqueue work
1584        let _timer = DbCallTimer::new(
1585            DbOperation::StoredProcedure,
1586            Some("enqueue_orchestrator_work"),
1587        );
1588        sqlx::query(&format!(
1589            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6, $7)",
1590            self.schema_name
1591        ))
1592        .bind(instance_id)
1593        .bind(&work_item)
1594        .bind(visible_at)
1595        .bind(now_ms) // p_now_ms - for created_at
1596        .bind::<Option<String>>(None) // orchestration_name - NULL
1597        .bind::<Option<String>>(None) // orchestration_version - NULL
1598        .bind::<Option<i64>>(None) // execution_id - NULL
1599        .execute(&*self.pool)
1600        .await
1601        .map_err(|e| {
1602            error!(
1603                target = "duroxide::providers::postgres",
1604                operation = "enqueue_orchestrator_work",
1605                error_type = "database_error",
1606                error = %e,
1607                instance_id = %instance_id,
1608                "Failed to enqueue orchestrator work"
1609            );
1610            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1611        })?;
1612
1613        debug!(
1614            target = "duroxide::providers::postgres",
1615            operation = "enqueue_orchestrator_work",
1616            instance_id = %instance_id,
1617            delay_ms = delay.map(|d| d.as_millis() as u64),
1618            "Enqueued orchestrator work"
1619        );
1620
1621        Ok(())
1622    }
1623
1624    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1625    async fn read_with_execution(
1626        &self,
1627        instance: &str,
1628        execution_id: u64,
1629    ) -> Result<Vec<Event>, ProviderError> {
1630        let _timer = DbCallTimer::new(DbOperation::Select, None);
1631        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1632            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1633            self.table_name("history")
1634        ))
1635        .bind(instance)
1636        .bind(execution_id as i64)
1637        .fetch_all(&*self.pool)
1638        .await
1639        .ok()
1640        .unwrap_or_default();
1641
1642        Ok(event_data_rows
1643            .into_iter()
1644            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1645            .collect())
1646    }
1647
1648    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1649    async fn renew_session_lock(
1650        &self,
1651        owner_ids: &[&str],
1652        extend_for: Duration,
1653        idle_timeout: Duration,
1654    ) -> Result<usize, ProviderError> {
1655        if owner_ids.is_empty() {
1656            return Ok(0);
1657        }
1658
1659        let now_ms = self.now_millis();
1660        let extend_ms = extend_for.as_millis() as i64;
1661        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1662        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1663
1664        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("renew_session_lock"));
1665        let result = sqlx::query_scalar::<_, i64>(&format!(
1666            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1667            self.schema_name
1668        ))
1669        .bind(&owner_ids_vec)
1670        .bind(now_ms)
1671        .bind(extend_ms)
1672        .bind(idle_timeout_ms)
1673        .fetch_one(&*self.pool)
1674        .await
1675        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1676
1677        debug!(
1678            target = "duroxide::providers::postgres",
1679            operation = "renew_session_lock",
1680            owner_count = owner_ids.len(),
1681            sessions_renewed = result,
1682            "Session locks renewed"
1683        );
1684
1685        Ok(result as usize)
1686    }
1687
1688    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1689    async fn cleanup_orphaned_sessions(
1690        &self,
1691        _idle_timeout: Duration,
1692    ) -> Result<usize, ProviderError> {
1693        let now_ms = self.now_millis();
1694
1695        let _timer = DbCallTimer::new(
1696            DbOperation::StoredProcedure,
1697            Some("cleanup_orphaned_sessions"),
1698        );
1699        let result = sqlx::query_scalar::<_, i64>(&format!(
1700            "SELECT {}.cleanup_orphaned_sessions($1)",
1701            self.schema_name
1702        ))
1703        .bind(now_ms)
1704        .fetch_one(&*self.pool)
1705        .await
1706        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1707
1708        debug!(
1709            target = "duroxide::providers::postgres",
1710            operation = "cleanup_orphaned_sessions",
1711            sessions_cleaned = result,
1712            "Orphaned sessions cleaned up"
1713        );
1714
1715        Ok(result as usize)
1716    }
1717
    /// Expose the admin/management surface; this provider always supports it,
    /// since `PostgresProvider` also implements `ProviderAdmin`.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1721
1722    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1723    async fn get_custom_status(
1724        &self,
1725        instance: &str,
1726        last_seen_version: u64,
1727    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1728        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1729            "SELECT * FROM {}.get_custom_status($1, $2)",
1730            self.schema_name
1731        ))
1732        .bind(instance)
1733        .bind(last_seen_version as i64)
1734        .fetch_optional(&*self.pool)
1735        .await
1736        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1737
1738        match row {
1739            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1740            None => Ok(None),
1741        }
1742    }
1743
1744    async fn get_kv_value(
1745        &self,
1746        instance_id: &str,
1747        key: &str,
1748    ) -> Result<Option<String>, ProviderError> {
1749        let query = format!(
1750            "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1751            self.schema_name
1752        );
1753        let result: Option<(String,)> = sqlx::query_as(&query)
1754            .bind(instance_id)
1755            .bind(key)
1756            .fetch_optional(&*self.pool)
1757            .await
1758            .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1759        Ok(result.map(|(value,)| value))
1760    }
1761
1762    async fn get_kv_all_values(
1763        &self,
1764        instance_id: &str,
1765    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1766        let query = format!(
1767            "SELECT key, value FROM {}.kv_store WHERE instance_id = $1",
1768            self.schema_name
1769        );
1770        let rows: Vec<(String, String)> = sqlx::query_as(&query)
1771            .bind(instance_id)
1772            .fetch_all(&*self.pool)
1773            .await
1774            .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1775        Ok(rows.into_iter().collect())
1776    }
1777}
1778
1779#[async_trait::async_trait]
1780impl ProviderAdmin for PostgresProvider {
1781    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1782    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1783        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_instances"));
1784        sqlx::query_scalar(&format!(
1785            "SELECT instance_id FROM {}.list_instances()",
1786            self.schema_name
1787        ))
1788        .fetch_all(&*self.pool)
1789        .await
1790        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1791    }
1792
1793    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1794    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1795        let _timer = DbCallTimer::new(
1796            DbOperation::StoredProcedure,
1797            Some("list_instances_by_status"),
1798        );
1799        sqlx::query_scalar(&format!(
1800            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1801            self.schema_name
1802        ))
1803        .bind(status)
1804        .fetch_all(&*self.pool)
1805        .await
1806        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1807    }
1808
1809    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1810    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1811        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_executions"));
1812        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1813            "SELECT execution_id FROM {}.list_executions($1)",
1814            self.schema_name
1815        ))
1816        .bind(instance)
1817        .fetch_all(&*self.pool)
1818        .await
1819        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1820
1821        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1822    }
1823
1824    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1825    async fn read_history_with_execution_id(
1826        &self,
1827        instance: &str,
1828        execution_id: u64,
1829    ) -> Result<Vec<Event>, ProviderError> {
1830        let _timer = DbCallTimer::new(
1831            DbOperation::StoredProcedure,
1832            Some("fetch_history_with_execution"),
1833        );
1834        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1835            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1836            self.schema_name
1837        ))
1838        .bind(instance)
1839        .bind(execution_id as i64)
1840        .fetch_all(&*self.pool)
1841        .await
1842        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1843
1844        event_data_rows
1845            .into_iter()
1846            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1847            .collect::<Vec<Event>>()
1848            .into_iter()
1849            .map(Ok)
1850            .collect()
1851    }
1852
1853    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1854    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1855        let execution_id = self.latest_execution_id(instance).await?;
1856        self.read_history_with_execution_id(instance, execution_id)
1857            .await
1858    }
1859
1860    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1861    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1862        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("latest_execution_id"));
1863        sqlx::query_scalar(&format!(
1864            "SELECT {}.latest_execution_id($1)",
1865            self.schema_name
1866        ))
1867        .bind(instance)
1868        .fetch_optional(&*self.pool)
1869        .await
1870        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1871        .map(|id: i64| id as u64)
1872        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1873    }
1874
1875    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1876    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1877        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_instance_info"));
1878        let row: Option<(
1879            String,
1880            String,
1881            String,
1882            i64,
1883            chrono::DateTime<Utc>,
1884            Option<chrono::DateTime<Utc>>,
1885            Option<String>,
1886            Option<String>,
1887            Option<String>,
1888        )> = sqlx::query_as(&format!(
1889            "SELECT * FROM {}.get_instance_info($1)",
1890            self.schema_name
1891        ))
1892        .bind(instance)
1893        .fetch_optional(&*self.pool)
1894        .await
1895        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1896
1897        let (
1898            instance_id,
1899            orchestration_name,
1900            orchestration_version,
1901            current_execution_id,
1902            created_at,
1903            updated_at,
1904            status,
1905            output,
1906            parent_instance_id,
1907        ) =
1908            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1909
1910        Ok(InstanceInfo {
1911            instance_id,
1912            orchestration_name,
1913            orchestration_version,
1914            current_execution_id: current_execution_id as u64,
1915            status: status.unwrap_or_else(|| "Running".to_string()),
1916            output,
1917            created_at: created_at.timestamp_millis() as u64,
1918            updated_at: updated_at
1919                .map(|dt| dt.timestamp_millis() as u64)
1920                .unwrap_or(created_at.timestamp_millis() as u64),
1921            parent_instance_id,
1922        })
1923    }
1924
1925    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1926    async fn get_execution_info(
1927        &self,
1928        instance: &str,
1929        execution_id: u64,
1930    ) -> Result<ExecutionInfo, ProviderError> {
1931        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_execution_info"));
1932        let row: Option<(
1933            i64,
1934            String,
1935            Option<String>,
1936            chrono::DateTime<Utc>,
1937            Option<chrono::DateTime<Utc>>,
1938            i64,
1939        )> = sqlx::query_as(&format!(
1940            "SELECT * FROM {}.get_execution_info($1, $2)",
1941            self.schema_name
1942        ))
1943        .bind(instance)
1944        .bind(execution_id as i64)
1945        .fetch_optional(&*self.pool)
1946        .await
1947        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1948
1949        let (exec_id, status, output, started_at, completed_at, event_count) = row
1950            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1951
1952        Ok(ExecutionInfo {
1953            execution_id: exec_id as u64,
1954            status,
1955            output,
1956            started_at: started_at.timestamp_millis() as u64,
1957            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1958            event_count: event_count as usize,
1959        })
1960    }
1961
1962    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1963    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1964        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_system_metrics"));
1965        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1966            "SELECT * FROM {}.get_system_metrics()",
1967            self.schema_name
1968        ))
1969        .fetch_optional(&*self.pool)
1970        .await
1971        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1972
1973        let (
1974            total_instances,
1975            total_executions,
1976            running_instances,
1977            completed_instances,
1978            failed_instances,
1979            total_events,
1980        ) = row.ok_or_else(|| {
1981            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1982        })?;
1983
1984        Ok(SystemMetrics {
1985            total_instances: total_instances as u64,
1986            total_executions: total_executions as u64,
1987            running_instances: running_instances as u64,
1988            completed_instances: completed_instances as u64,
1989            failed_instances: failed_instances as u64,
1990            total_events: total_events as u64,
1991        })
1992    }
1993
1994    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1995    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1996        let now_ms = self.now_millis();
1997
1998        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_queue_depths"));
1999        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
2000            "SELECT * FROM {}.get_queue_depths($1)",
2001            self.schema_name
2002        ))
2003        .bind(now_ms)
2004        .fetch_optional(&*self.pool)
2005        .await
2006        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
2007
2008        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
2009            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
2010        })?;
2011
2012        Ok(QueueDepths {
2013            orchestrator_queue: orchestrator_queue as usize,
2014            worker_queue: worker_queue as usize,
2015            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
2016        })
2017    }
2018
2019    // ===== Hierarchy Primitives =====
2020
2021    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2022    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
2023        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("list_children"));
2024        sqlx::query_scalar(&format!(
2025            "SELECT child_instance_id FROM {}.list_children($1)",
2026            self.schema_name
2027        ))
2028        .bind(instance_id)
2029        .fetch_all(&*self.pool)
2030        .await
2031        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
2032    }
2033
2034    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2035    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
2036        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("get_parent_id"));
2037        let result: Result<Option<String>, _> =
2038            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
2039                .bind(instance_id)
2040                .fetch_one(&*self.pool)
2041                .await;
2042
2043        match result {
2044            Ok(parent_id) => Ok(parent_id),
2045            Err(e) => {
2046                let err_str = e.to_string();
2047                if err_str.contains("Instance not found") {
2048                    Err(ProviderError::permanent(
2049                        "get_parent_id",
2050                        format!("Instance not found: {instance_id}"),
2051                    ))
2052                } else {
2053                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
2054                }
2055            }
2056        }
2057    }
2058
2059    // ===== Deletion Operations =====
2060
2061    #[instrument(skip(self), target = "duroxide::providers::postgres")]
2062    async fn delete_instances_atomic(
2063        &self,
2064        ids: &[String],
2065        force: bool,
2066    ) -> Result<DeleteInstanceResult, ProviderError> {
2067        if ids.is_empty() {
2068            return Ok(DeleteInstanceResult::default());
2069        }
2070
2071        let _timer = DbCallTimer::new(
2072            DbOperation::StoredProcedure,
2073            Some("delete_instances_atomic"),
2074        );
2075        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
2076            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
2077            self.schema_name
2078        ))
2079        .bind(ids)
2080        .bind(force)
2081        .fetch_optional(&*self.pool)
2082        .await
2083        .map_err(|e| {
2084            let err_str = e.to_string();
2085            if err_str.contains("is Running") || err_str.contains("Orphan detected") {
2086                ProviderError::permanent("delete_instances_atomic", err_str)
2087            } else {
2088                Self::sqlx_to_provider_error("delete_instances_atomic", e)
2089            }
2090        })?;
2091
2092        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
2093            row.unwrap_or((0, 0, 0, 0));
2094
2095        debug!(
2096            target = "duroxide::providers::postgres",
2097            operation = "delete_instances_atomic",
2098            instances_deleted = instances_deleted,
2099            executions_deleted = executions_deleted,
2100            events_deleted = events_deleted,
2101            queue_messages_deleted = queue_messages_deleted,
2102            "Deleted instances atomically"
2103        );
2104
2105        Ok(DeleteInstanceResult {
2106            instances_deleted: instances_deleted as u64,
2107            executions_deleted: executions_deleted as u64,
2108            events_deleted: events_deleted as u64,
2109            queue_messages_deleted: queue_messages_deleted as u64,
2110        })
2111    }
2112
    /// Bulk-delete terminal root instances matching `filter`, cascading each
    /// delete to the instance's full child tree.
    ///
    /// Eligibility: root instances only (`parent_instance_id IS NULL`) whose
    /// *current* execution is in a terminal state ('Completed', 'Failed',
    /// 'ContinuedAsNew'). Positional SQL parameters are numbered dynamically:
    /// $1..$k for `instance_ids` (when present), then one for
    /// `completed_before` (when set), then one for the LIMIT — the bind calls
    /// below must stay in exactly that order.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn delete_instance_bulk(
        &self,
        filter: InstanceFilter,
    ) -> Result<DeleteInstanceResult, ProviderError> {
        // Build query to find matching root instances in terminal states
        let mut sql = format!(
            r#"
            SELECT i.instance_id
            FROM {}.instances i
            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
              AND i.current_execution_id = e.execution_id
            WHERE i.parent_instance_id IS NULL
              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
            "#,
            self.schema_name, self.schema_name
        );

        // Add instance_ids filter if provided
        if let Some(ref ids) = filter.instance_ids {
            // An explicit empty id list matches nothing — return early.
            if ids.is_empty() {
                return Ok(DeleteInstanceResult::default());
            }
            // Placeholders $1..$k are reserved for the ids bound further down.
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${i}")).collect();
            sql.push_str(&format!(
                " AND i.instance_id IN ({})",
                placeholders.join(", ")
            ));
        }

        // Add completed_before filter if provided
        if filter.completed_before.is_some() {
            // Next free placeholder: one past the count of id placeholders.
            let param_num = filter
                .instance_ids
                .as_ref()
                .map(|ids| ids.len())
                .unwrap_or(0)
                + 1;
            // completed_before is in epoch milliseconds; Postgres TO_TIMESTAMP
            // takes seconds, hence the / 1000.0.
            sql.push_str(&format!(
                " AND e.completed_at < TO_TIMESTAMP(${param_num} / 1000.0)"
            ));
        }

        // Add limit
        let limit = filter.limit.unwrap_or(1000);
        // LIMIT placeholder comes after the id placeholders (if any) and the
        // completed_before placeholder (if present).
        let limit_param_num = filter
            .instance_ids
            .as_ref()
            .map(|ids| ids.len())
            .unwrap_or(0)
            + if filter.completed_before.is_some() {
                1
            } else {
                0
            }
            + 1;
        sql.push_str(&format!(" LIMIT ${limit_param_num}"));

        // Build and execute query
        // NOTE: bind order below must mirror the placeholder numbering above.
        let _timer = DbCallTimer::new(DbOperation::Select, None);
        let mut query = sqlx::query_scalar::<_, String>(&sql);
        if let Some(ref ids) = filter.instance_ids {
            for id in ids {
                query = query.bind(id);
            }
        }
        if let Some(completed_before) = filter.completed_before {
            query = query.bind(completed_before as i64);
        }
        query = query.bind(limit as i64);

        let instance_ids: Vec<String> = query
            .fetch_all(&*self.pool)
            .await
            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;

        if instance_ids.is_empty() {
            return Ok(DeleteInstanceResult::default());
        }

        // Delete each instance with cascade
        let mut result = DeleteInstanceResult::default();

        for instance_id in &instance_ids {
            // Get full tree for this root (uses default impl from ProviderAdmin trait)
            let tree = self.get_instance_tree(instance_id).await?;

            // Atomic delete (tree.all_ids is already in deletion order: children first)
            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
            // Accumulate per-tree counts into the overall result.
            result.instances_deleted += delete_result.instances_deleted;
            result.executions_deleted += delete_result.executions_deleted;
            result.events_deleted += delete_result.events_deleted;
            result.queue_messages_deleted += delete_result.queue_messages_deleted;
        }

        debug!(
            target = "duroxide::providers::postgres",
            operation = "delete_instance_bulk",
            instances_deleted = result.instances_deleted,
            executions_deleted = result.executions_deleted,
            events_deleted = result.events_deleted,
            queue_messages_deleted = result.queue_messages_deleted,
            "Bulk deleted instances"
        );

        Ok(result)
    }
2220
2221    // ===== Pruning Operations =====
2222
2223    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
2224    async fn prune_executions(
2225        &self,
2226        instance_id: &str,
2227        options: PruneOptions,
2228    ) -> Result<PruneResult, ProviderError> {
2229        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
2230        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
2231
2232        let _timer = DbCallTimer::new(DbOperation::StoredProcedure, Some("prune_executions"));
2233        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
2234            "SELECT * FROM {}.prune_executions($1, $2, $3)",
2235            self.schema_name
2236        ))
2237        .bind(instance_id)
2238        .bind(keep_last)
2239        .bind(completed_before_ms)
2240        .fetch_optional(&*self.pool)
2241        .await
2242        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
2243
2244        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
2245
2246        debug!(
2247            target = "duroxide::providers::postgres",
2248            operation = "prune_executions",
2249            instance_id = %instance_id,
2250            instances_processed = instances_processed,
2251            executions_deleted = executions_deleted,
2252            events_deleted = events_deleted,
2253            "Pruned executions"
2254        );
2255
2256        Ok(PruneResult {
2257            instances_processed: instances_processed as u64,
2258            executions_deleted: executions_deleted as u64,
2259            events_deleted: events_deleted as u64,
2260        })
2261    }
2262
2263    #[instrument(skip(self), target = "duroxide::providers::postgres")]
2264    async fn prune_executions_bulk(
2265        &self,
2266        filter: InstanceFilter,
2267        options: PruneOptions,
2268    ) -> Result<PruneResult, ProviderError> {
2269        // Build query to find matching instances (all statuses)
2270        // Note: We include Running instances because long-running orchestrations (e.g., with
2271        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
2272        // stored procedure safely skips the current execution regardless of its status.
2273        let mut sql = format!(
2274            r#"
2275            SELECT i.instance_id
2276            FROM {}.instances i
2277            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
2278              AND i.current_execution_id = e.execution_id
2279            WHERE 1=1
2280            "#,
2281            self.schema_name, self.schema_name
2282        );
2283
2284        // Add instance_ids filter if provided
2285        if let Some(ref ids) = filter.instance_ids {
2286            if ids.is_empty() {
2287                return Ok(PruneResult::default());
2288            }
2289            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${i}")).collect();
2290            sql.push_str(&format!(
2291                " AND i.instance_id IN ({})",
2292                placeholders.join(", ")
2293            ));
2294        }
2295
2296        // Add completed_before filter if provided
2297        if filter.completed_before.is_some() {
2298            let param_num = filter
2299                .instance_ids
2300                .as_ref()
2301                .map(|ids| ids.len())
2302                .unwrap_or(0)
2303                + 1;
2304            sql.push_str(&format!(
2305                " AND e.completed_at < TO_TIMESTAMP(${param_num} / 1000.0)"
2306            ));
2307        }
2308
2309        // Add limit
2310        let limit = filter.limit.unwrap_or(1000);
2311        let limit_param_num = filter
2312            .instance_ids
2313            .as_ref()
2314            .map(|ids| ids.len())
2315            .unwrap_or(0)
2316            + if filter.completed_before.is_some() {
2317                1
2318            } else {
2319                0
2320            }
2321            + 1;
2322        sql.push_str(&format!(" LIMIT ${limit_param_num}"));
2323
2324        // Build and execute query
2325        let _timer = DbCallTimer::new(DbOperation::Select, None);
2326        let mut query = sqlx::query_scalar::<_, String>(&sql);
2327        if let Some(ref ids) = filter.instance_ids {
2328            for id in ids {
2329                query = query.bind(id);
2330            }
2331        }
2332        if let Some(completed_before) = filter.completed_before {
2333            query = query.bind(completed_before as i64);
2334        }
2335        query = query.bind(limit as i64);
2336
2337        let instance_ids: Vec<String> = query
2338            .fetch_all(&*self.pool)
2339            .await
2340            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
2341
2342        // Prune each instance
2343        let mut result = PruneResult::default();
2344
2345        for instance_id in &instance_ids {
2346            let single_result = self.prune_executions(instance_id, options.clone()).await?;
2347            result.instances_processed += single_result.instances_processed;
2348            result.executions_deleted += single_result.executions_deleted;
2349            result.events_deleted += single_result.events_deleted;
2350        }
2351
2352        debug!(
2353            target = "duroxide::providers::postgres",
2354            operation = "prune_executions_bulk",
2355            instances_processed = result.instances_processed,
2356            executions_deleted = result.executions_deleted,
2357            events_deleted = result.events_deleted,
2358            "Bulk pruned executions"
2359        );
2360
2361        Ok(result)
2362    }
2363}