// duroxide_pg/provider.rs
1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
/// PostgreSQL-based provider for Duroxide durable orchestrations.
///
/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
/// storing orchestration state, history, and work queues in PostgreSQL.
///
/// # Example
///
/// ```rust,no_run
/// use duroxide_pg::PostgresProvider;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Connect using DATABASE_URL or explicit connection string
/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
///
/// // Or use a custom schema for isolation
/// let provider = PostgresProvider::new_with_schema(
///     "postgres://localhost/mydb",
///     Some("my_app"),
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub struct PostgresProvider {
    /// Shared connection pool; wrapped in `Arc` so the migration runner
    /// (and anything else holding a clone) can share the same pool.
    pool: Arc<PgPool>,
    /// Schema that owns all provider tables and stored procedures
    /// (`"public"` unless a custom schema was requested).
    schema_name: String,
}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else {
130                    ProviderError::permanent(operation, format!("Database error: {e}"))
131                }
132            }
133            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135            }
136            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138        }
139    }
140
141    /// Convert TagFilter to SQL parameters (mode string + tag array)
142    fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143        match filter {
144            TagFilter::DefaultOnly => ("default_only", vec![]),
145            TagFilter::Tags(set) => {
146                let mut tags: Vec<String> = set.iter().cloned().collect();
147                tags.sort();
148                ("tags", tags)
149            }
150            TagFilter::DefaultAnd(set) => {
151                let mut tags: Vec<String> = set.iter().cloned().collect();
152                tags.sort();
153                ("default_and", tags)
154            }
155            TagFilter::Any => ("any", vec![]),
156            TagFilter::None => ("none", vec![]),
157        }
158    }
159
160    /// Clean up schema after tests (drops all tables and optionally the schema)
161    ///
162    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
163    /// Only drops the schema if it's a custom schema (not "public").
164    pub async fn cleanup_schema(&self) -> Result<()> {
165        const MAX_RETRIES: u32 = 5;
166        const BASE_RETRY_DELAY_MS: u64 = 50;
167
168        for attempt in 0..=MAX_RETRIES {
169            let cleanup_result = async {
170                // Call the stored procedure to drop all tables
171                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172                    .execute(&*self.pool)
173                    .await?;
174
175                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
176                // Only drop custom schemas created for testing
177                if self.schema_name != "public" {
178                    sqlx::query(&format!(
179                        "DROP SCHEMA IF EXISTS {} CASCADE",
180                        self.schema_name
181                    ))
182                    .execute(&*self.pool)
183                    .await?;
184                } else {
185                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
186                    // This ensures we don't accidentally drop the default PostgreSQL schema
187                }
188
189                Ok::<(), SqlxError>(())
190            }
191            .await;
192
193            match cleanup_result {
194                Ok(()) => return Ok(()),
195                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196                    if attempt < MAX_RETRIES {
197                        warn!(
198                            target = "duroxide::providers::postgres",
199                            schema = %self.schema_name,
200                            attempt = attempt + 1,
201                            "Deadlock during cleanup_schema, retrying"
202                        );
203                        sleep(Duration::from_millis(
204                            BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
205                        ))
206                        .await;
207                        continue;
208                    }
209                    return Err(anyhow::anyhow!(db_err.to_string()));
210                }
211                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
212            }
213        }
214
215        Ok(())
216    }
217}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
    /// Provider identifier reported to the Duroxide runtime.
    fn name(&self) -> &str {
        "duroxide-pg"
    }
224
    /// Crate version, baked in from Cargo metadata at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }
228
    /// Claim the next ready orchestration item via the `fetch_orchestration_item`
    /// stored procedure, locking it for `lock_timeout`.
    ///
    /// Returns `Ok(None)` when nothing is ready (short polling — `_poll_timeout`
    /// is ignored), or `Some((item, lock_token, attempt_count))` on success.
    /// Retryable database errors are retried up to 3 times with linear backoff.
    /// Orphan queue messages for never-started instances are dropped (acked away).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // NOTE(review): _last_error is written in the retry branch but never
        // read afterwards — presumably kept for future diagnostics; confirm.
        let mut _last_error: Option<ProviderError> = None;

        // Extract version filter from capability filter.
        // Versions are packed as major*1_000_000 + minor*1_000 + patch so the
        // stored procedure can compare them as plain integers.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                // Empty supported_duroxide_versions = supports nothing
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row shape: (instance_id, orchestration_name, orchestration_version,
            // execution_id, history_json, messages_json, lock_token,
            // attempt_count, kv_snapshot_json) — must match the stored procedure's
            // OUT columns in order.
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    // Retry transient failures with linear backoff; surface
                    // permanent ones (or exhausted retries) immediately.
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // A corrupt history does not fail the fetch: the item is
                // returned with empty history and `history_error` set so the
                // runtime can decide how to react.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Unlike history, undecodable messages are a hard (permanent) error.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // Best-effort decode of the key/value snapshot: entries missing
                // "value" are skipped, missing timestamps default to 0.
                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
                    let raw: std::collections::HashMap<String, serde_json::Value> =
                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
                    raw.into_iter()
                        .filter_map(|(k, v)| {
                            let value = v.get("value")?.as_str()?.to_string();
                            let last_updated_at_ms =
                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
                            Some((
                                k,
                                duroxide::providers::KvEntry {
                                    value,
                                    last_updated_at_ms,
                                },
                            ))
                        })
                        .collect()
                };

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan queue messages: if orchestration_name is "Unknown", there's
                // no history, and ALL messages are QueueMessage items, these are orphan
                // events enqueued before the orchestration started. Drop them by acking
                // with empty deltas. Other work items (CancelInstance, etc.) may
                // legitimately race with StartOrchestration and must not be dropped.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No result found - return immediately (short polling behavior)
            // Only retry with delay on retryable errors (handled above)
            return Ok(None);
        }

        // Unreachable: every loop iteration either returns or continues.
        Ok(None)
    }
    /// Acknowledge a locked orchestration item via the `ack_orchestration_item`
    /// stored procedure: persist the history delta, enqueue worker/orchestrator
    /// items, apply execution metadata (status, custom status, K/V mutations),
    /// and cancel any listed activities — atomically on the database side.
    ///
    /// An unknown/expired `lock_token` is a permanent error; transient database
    /// errors are retried up to 3 times with linear backoff.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload for the history delta up-front (outside the
        // retry loop) — serialization failures are permanent anyway.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // The runtime must assign event ids before handing events to the provider.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Derive a coarse event type label from the Debug representation:
            // everything before the first '{', trimmed (e.g. "ActivityCompleted").
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Scan history_delta for the last CustomStatusUpdated event; only the
        // final one in the delta wins ("set" with a value, or "clear").
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // Translate K/V events in the delta into mutation commands for the
        // stored procedure (set / clear_key / clear_all), preserving order.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet {
                    key,
                    value,
                    last_updated_at_ms,
                } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                    "last_updated_at_ms": last_updated_at_ms,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        // Serialize cancelled activities for lock stealing
        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Check for permanent errors first: the stored procedure
                    // raises with this message when the lock token is invalid.
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Should never reach here, but just in case
        Ok(())
    }
639    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
640    async fn abandon_orchestration_item(
641        &self,
642        lock_token: &str,
643        delay: Option<Duration>,
644        ignore_attempt: bool,
645    ) -> Result<(), ProviderError> {
646        let start = std::time::Instant::now();
647        let now_ms = Self::now_millis();
648        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
649
650        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
651            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
652            self.schema_name
653        ))
654        .bind(lock_token)
655        .bind(now_ms)
656        .bind(delay_param)
657        .bind(ignore_attempt)
658        .fetch_one(&*self.pool)
659        .await
660        {
661            Ok(instance_id) => instance_id,
662            Err(e) => {
663                if let SqlxError::Database(db_err) = &e {
664                    if db_err.message().contains("Invalid lock token") {
665                        return Err(ProviderError::permanent(
666                            "abandon_orchestration_item",
667                            "Invalid lock token",
668                        ));
669                    }
670                } else if e.to_string().contains("Invalid lock token") {
671                    return Err(ProviderError::permanent(
672                        "abandon_orchestration_item",
673                        "Invalid lock token",
674                    ));
675                }
676
677                return Err(Self::sqlx_to_provider_error(
678                    "abandon_orchestration_item",
679                    e,
680                ));
681            }
682        };
683
684        let duration_ms = start.elapsed().as_millis() as u64;
685        debug!(
686            target = "duroxide::providers::postgres",
687            operation = "abandon_orchestration_item",
688            instance_id = %instance_id,
689            delay_ms = delay.map(|d| d.as_millis() as u64),
690            ignore_attempt = ignore_attempt,
691            duration_ms = duration_ms,
692            "Abandoned orchestration item via stored procedure"
693        );
694
695        Ok(())
696    }
697
698    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
699    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
700        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
701            "SELECT out_event_data FROM {}.fetch_history($1)",
702            self.schema_name
703        ))
704        .bind(instance)
705        .fetch_all(&*self.pool)
706        .await
707        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
708
709        Ok(event_data_rows
710            .into_iter()
711            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
712            .collect())
713    }
714
715    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
716    async fn append_with_execution(
717        &self,
718        instance: &str,
719        execution_id: u64,
720        new_events: Vec<Event>,
721    ) -> Result<(), ProviderError> {
722        if new_events.is_empty() {
723            return Ok(());
724        }
725
726        let mut events_payload = Vec::with_capacity(new_events.len());
727        for event in &new_events {
728            if event.event_id() == 0 {
729                error!(
730                    target = "duroxide::providers::postgres",
731                    operation = "append_with_execution",
732                    error_type = "validation_error",
733                    instance_id = %instance,
734                    execution_id = execution_id,
735                    "event_id must be set by runtime"
736                );
737                return Err(ProviderError::permanent(
738                    "append_with_execution",
739                    "event_id must be set by runtime",
740                ));
741            }
742
743            let event_json = serde_json::to_string(event).map_err(|e| {
744                ProviderError::permanent(
745                    "append_with_execution",
746                    format!("Failed to serialize event: {e}"),
747                )
748            })?;
749
750            let event_type = format!("{event:?}")
751                .split('{')
752                .next()
753                .unwrap_or("Unknown")
754                .trim()
755                .to_string();
756
757            events_payload.push(serde_json::json!({
758                "event_id": event.event_id(),
759                "event_type": event_type,
760                "event_data": event_json,
761            }));
762        }
763
764        let events_json = serde_json::Value::Array(events_payload);
765
766        sqlx::query(&format!(
767            "SELECT {}.append_history($1, $2, $3)",
768            self.schema_name
769        ))
770        .bind(instance)
771        .bind(execution_id as i64)
772        .bind(events_json)
773        .execute(&*self.pool)
774        .await
775        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
776
777        debug!(
778            target = "duroxide::providers::postgres",
779            operation = "append_with_execution",
780            instance_id = %instance,
781            execution_id = execution_id,
782            event_count = new_events.len(),
783            "Appended history events via stored procedure"
784        );
785
786        Ok(())
787    }
788
789    #[instrument(skip(self), target = "duroxide::providers::postgres")]
790    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
791        let work_item = serde_json::to_string(&item).map_err(|e| {
792            ProviderError::permanent(
793                "enqueue_worker_work",
794                format!("Failed to serialize work item: {e}"),
795            )
796        })?;
797
798        let now_ms = Self::now_millis();
799
800        // Extract activity identification, session_id, and tag for ActivityExecute items
801        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
802            WorkItem::ActivityExecute {
803                instance,
804                execution_id,
805                id,
806                session_id,
807                tag,
808                ..
809            } => (
810                Some(instance.clone()),
811                Some(*execution_id as i64),
812                Some(*id as i64),
813                session_id.clone(),
814                tag.clone(),
815            ),
816            _ => (None, None, None, None, None),
817        };
818
819        sqlx::query(&format!(
820            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
821            self.schema_name
822        ))
823        .bind(work_item)
824        .bind(now_ms)
825        .bind(&instance_id)
826        .bind(execution_id)
827        .bind(activity_id)
828        .bind(&session_id)
829        .bind(&tag)
830        .execute(&*self.pool)
831        .await
832        .map_err(|e| {
833            error!(
834                target = "duroxide::providers::postgres",
835                operation = "enqueue_worker_work",
836                error_type = "database_error",
837                error = %e,
838                "Failed to enqueue worker work"
839            );
840            Self::sqlx_to_provider_error("enqueue_worker_work", e)
841        })?;
842
843        Ok(())
844    }
845
    /// Dequeues the next available worker work item, locking it for
    /// `lock_timeout`. Returns `(item, lock_token, attempt_count)` or `None`
    /// when nothing is ready.
    ///
    /// `session` optionally locks the item's session to an owner with its own
    /// lock timeout. `tag_filter` restricts which tagged activities may be
    /// fetched; `TagFilter::None` fetches nothing. `_poll_timeout` is unused:
    /// this implementation performs a single non-blocking poll — presumably the
    /// caller runs the polling loop (TODO confirm).
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        // None filter means don't process any activities
        if matches!(tag_filter, TagFilter::None) {
            return Ok(None);
        }

        let start = std::time::Instant::now();

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;

        // Extract session parameters (owner id + session lock timeout in ms);
        // both are NULL when no session pinning is requested.
        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
            Some(config) => (
                Some(&config.owner_id),
                Some(config.lock_timeout.as_millis() as i64),
            ),
            None => (None, None),
        };

        // Convert TagFilter to SQL parameters
        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);

        // Stored procedure returns (work_item_json, lock_token, attempt_count);
        // bind order must match fetch_work_item($1..$6) exactly.
        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(Self::now_millis())
        .bind(lock_timeout_ms)
        .bind(owner_id)
        .bind(session_lock_timeout_ms)
        .bind(&tag_names)
        .bind(tag_mode)
        .fetch_optional(&*self.pool)
        .await
        {
            Ok(row) => row,
            Err(e) => {
                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
            }
        };

        // No row means the queue had nothing visible/unlocked for us.
        let (work_item_json, lock_token, attempt_count) = match row {
            Some(row) => row,
            None => return Ok(None),
        };

        // A row that doesn't deserialize is corrupt data, not a transient fault.
        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
            ProviderError::permanent(
                "fetch_work_item",
                format!("Failed to deserialize worker item: {e}"),
            )
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;

        // Extract instance for logging - different work item types have different structures
        let instance_id = match &work_item {
            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
            WorkItem::TimerFired { instance, .. } => instance.as_str(),
            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance.as_str(),
            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
        };

        debug!(
            target = "duroxide::providers::postgres",
            operation = "fetch_work_item",
            instance_id = %instance_id,
            attempt_count = attempt_count,
            duration_ms = duration_ms,
            "Fetched activity work item via stored procedure"
        );

        Ok(Some((work_item, lock_token, attempt_count as u32)))
    }
939
    /// Acknowledges a locked worker item identified by `token`.
    ///
    /// With `completion == None` (e.g. a cancelled activity) the queue row is
    /// simply deleted. Otherwise `completion` must be `ActivityCompleted` or
    /// `ActivityFailed`, and the `ack_worker` stored procedure atomically
    /// deletes the worker row and enqueues the completion for the orchestrator.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // If no completion provided (e.g., cancelled activity), just delete the item
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            // Call ack_worker with NULL completion to delete without enqueueing
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // A missing row means the lock expired or the item was already
                // acked — permanent, so callers don't retry pointlessly.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Extract instance ID from completion WorkItem; only activity outcomes
        // are valid completions for a worker ack.
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Call stored procedure to atomically delete worker item and enqueue completion
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            // Same "not found" classification as the NULL-completion path above.
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1046
1047    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1048    async fn renew_work_item_lock(
1049        &self,
1050        token: &str,
1051        extend_for: Duration,
1052    ) -> Result<(), ProviderError> {
1053        let start = std::time::Instant::now();
1054
1055        // Get current time from application for consistent time reference
1056        let now_ms = Self::now_millis();
1057
1058        // Convert Duration to seconds for the stored procedure
1059        let extend_secs = extend_for.as_secs() as i64;
1060
1061        match sqlx::query(&format!(
1062            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1063            self.schema_name
1064        ))
1065        .bind(token)
1066        .bind(now_ms)
1067        .bind(extend_secs)
1068        .execute(&*self.pool)
1069        .await
1070        {
1071            Ok(_) => {
1072                let duration_ms = start.elapsed().as_millis() as u64;
1073                debug!(
1074                    target = "duroxide::providers::postgres",
1075                    operation = "renew_work_item_lock",
1076                    token = %token,
1077                    extend_for_secs = extend_secs,
1078                    duration_ms = duration_ms,
1079                    "Work item lock renewed successfully"
1080                );
1081                Ok(())
1082            }
1083            Err(e) => {
1084                if let SqlxError::Database(db_err) = &e {
1085                    if db_err.message().contains("Lock token invalid") {
1086                        return Err(ProviderError::permanent(
1087                            "renew_work_item_lock",
1088                            "Lock token invalid, expired, or already acked",
1089                        ));
1090                    }
1091                } else if e.to_string().contains("Lock token invalid") {
1092                    return Err(ProviderError::permanent(
1093                        "renew_work_item_lock",
1094                        "Lock token invalid, expired, or already acked",
1095                    ));
1096                }
1097
1098                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1099            }
1100        }
1101    }
1102
1103    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1104    async fn abandon_work_item(
1105        &self,
1106        token: &str,
1107        delay: Option<Duration>,
1108        ignore_attempt: bool,
1109    ) -> Result<(), ProviderError> {
1110        let start = std::time::Instant::now();
1111        let now_ms = Self::now_millis();
1112        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1113
1114        match sqlx::query(&format!(
1115            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1116            self.schema_name
1117        ))
1118        .bind(token)
1119        .bind(now_ms)
1120        .bind(delay_param)
1121        .bind(ignore_attempt)
1122        .execute(&*self.pool)
1123        .await
1124        {
1125            Ok(_) => {
1126                let duration_ms = start.elapsed().as_millis() as u64;
1127                debug!(
1128                    target = "duroxide::providers::postgres",
1129                    operation = "abandon_work_item",
1130                    token = %token,
1131                    delay_ms = delay.map(|d| d.as_millis() as u64),
1132                    ignore_attempt = ignore_attempt,
1133                    duration_ms = duration_ms,
1134                    "Abandoned work item via stored procedure"
1135                );
1136                Ok(())
1137            }
1138            Err(e) => {
1139                if let SqlxError::Database(db_err) = &e {
1140                    if db_err.message().contains("Invalid lock token")
1141                        || db_err.message().contains("already acked")
1142                    {
1143                        return Err(ProviderError::permanent(
1144                            "abandon_work_item",
1145                            "Invalid lock token or already acked",
1146                        ));
1147                    }
1148                } else if e.to_string().contains("Invalid lock token")
1149                    || e.to_string().contains("already acked")
1150                {
1151                    return Err(ProviderError::permanent(
1152                        "abandon_work_item",
1153                        "Invalid lock token or already acked",
1154                    ));
1155                }
1156
1157                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1158            }
1159        }
1160    }
1161
1162    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1163    async fn renew_orchestration_item_lock(
1164        &self,
1165        token: &str,
1166        extend_for: Duration,
1167    ) -> Result<(), ProviderError> {
1168        let start = std::time::Instant::now();
1169
1170        // Get current time from application for consistent time reference
1171        let now_ms = Self::now_millis();
1172
1173        // Convert Duration to seconds for the stored procedure
1174        let extend_secs = extend_for.as_secs() as i64;
1175
1176        match sqlx::query(&format!(
1177            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1178            self.schema_name
1179        ))
1180        .bind(token)
1181        .bind(now_ms)
1182        .bind(extend_secs)
1183        .execute(&*self.pool)
1184        .await
1185        {
1186            Ok(_) => {
1187                let duration_ms = start.elapsed().as_millis() as u64;
1188                debug!(
1189                    target = "duroxide::providers::postgres",
1190                    operation = "renew_orchestration_item_lock",
1191                    token = %token,
1192                    extend_for_secs = extend_secs,
1193                    duration_ms = duration_ms,
1194                    "Orchestration item lock renewed successfully"
1195                );
1196                Ok(())
1197            }
1198            Err(e) => {
1199                if let SqlxError::Database(db_err) = &e {
1200                    if db_err.message().contains("Lock token invalid")
1201                        || db_err.message().contains("expired")
1202                        || db_err.message().contains("already released")
1203                    {
1204                        return Err(ProviderError::permanent(
1205                            "renew_orchestration_item_lock",
1206                            "Lock token invalid, expired, or already released",
1207                        ));
1208                    }
1209                } else if e.to_string().contains("Lock token invalid")
1210                    || e.to_string().contains("expired")
1211                    || e.to_string().contains("already released")
1212                {
1213                    return Err(ProviderError::permanent(
1214                        "renew_orchestration_item_lock",
1215                        "Lock token invalid, expired, or already released",
1216                    ));
1217                }
1218
1219                Err(Self::sqlx_to_provider_error(
1220                    "renew_orchestration_item_lock",
1221                    e,
1222                ))
1223            }
1224        }
1225    }
1226
    /// Serializes `item` and enqueues it on the orchestrator queue via the
    /// `enqueue_orchestrator_work` stored procedure.
    ///
    /// Visibility: a `TimerFired` item with a positive `fire_at_ms` becomes
    /// visible at `fire_at_ms` (or later if `delay` pushes past it); all other
    /// items become visible after `delay`, or immediately when no delay is
    /// given. `ActivityExecute` items are rejected — they belong on the worker
    /// queue, not here.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Extract instance ID from WorkItem enum (sub-orchestration results are
        // routed to the parent instance).
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. }
            | WorkItem::QueueMessage { instance, .. } => instance,
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
        let now_ms = Self::now_millis();

        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                // Take max of fire_at_ms and delay (if provided)
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    // Note: fire_at_ms may be in the past; it is passed through
                    // unchanged (the item is then immediately visible).
                    *fire_at_ms
                }
            } else {
                // fire_at_ms is 0, use delay or NOW()
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            // Non-timer item: use delay or NOW()
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        // Reject millisecond values chrono cannot represent as a timestamp.
        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters

        // Call stored procedure to enqueue work
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None) // orchestration_name - NULL
        .bind::<Option<String>>(None) // orchestration_version - NULL
        .bind::<Option<i64>>(None) // execution_id - NULL
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1336
1337    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1338    async fn read_with_execution(
1339        &self,
1340        instance: &str,
1341        execution_id: u64,
1342    ) -> Result<Vec<Event>, ProviderError> {
1343        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1344            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1345            self.table_name("history")
1346        ))
1347        .bind(instance)
1348        .bind(execution_id as i64)
1349        .fetch_all(&*self.pool)
1350        .await
1351        .ok()
1352        .unwrap_or_default();
1353
1354        Ok(event_data_rows
1355            .into_iter()
1356            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1357            .collect())
1358    }
1359
1360    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1361    async fn renew_session_lock(
1362        &self,
1363        owner_ids: &[&str],
1364        extend_for: Duration,
1365        idle_timeout: Duration,
1366    ) -> Result<usize, ProviderError> {
1367        if owner_ids.is_empty() {
1368            return Ok(0);
1369        }
1370
1371        let now_ms = Self::now_millis();
1372        let extend_ms = extend_for.as_millis() as i64;
1373        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1374        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1375
1376        let result = sqlx::query_scalar::<_, i64>(&format!(
1377            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1378            self.schema_name
1379        ))
1380        .bind(&owner_ids_vec)
1381        .bind(now_ms)
1382        .bind(extend_ms)
1383        .bind(idle_timeout_ms)
1384        .fetch_one(&*self.pool)
1385        .await
1386        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1387
1388        debug!(
1389            target = "duroxide::providers::postgres",
1390            operation = "renew_session_lock",
1391            owner_count = owner_ids.len(),
1392            sessions_renewed = result,
1393            "Session locks renewed"
1394        );
1395
1396        Ok(result as usize)
1397    }
1398
1399    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1400    async fn cleanup_orphaned_sessions(
1401        &self,
1402        _idle_timeout: Duration,
1403    ) -> Result<usize, ProviderError> {
1404        let now_ms = Self::now_millis();
1405
1406        let result = sqlx::query_scalar::<_, i64>(&format!(
1407            "SELECT {}.cleanup_orphaned_sessions($1)",
1408            self.schema_name
1409        ))
1410        .bind(now_ms)
1411        .fetch_one(&*self.pool)
1412        .await
1413        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1414
1415        debug!(
1416            target = "duroxide::providers::postgres",
1417            operation = "cleanup_orphaned_sessions",
1418            sessions_cleaned = result,
1419            "Orphaned sessions cleaned up"
1420        );
1421
1422        Ok(result as usize)
1423    }
1424
    /// Exposes the admin surface; this provider always implements
    /// [`ProviderAdmin`], so the capability is unconditionally available.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1428
1429    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1430    async fn get_custom_status(
1431        &self,
1432        instance: &str,
1433        last_seen_version: u64,
1434    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1435        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1436            "SELECT * FROM {}.get_custom_status($1, $2)",
1437            self.schema_name
1438        ))
1439        .bind(instance)
1440        .bind(last_seen_version as i64)
1441        .fetch_optional(&*self.pool)
1442        .await
1443        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1444
1445        match row {
1446            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1447            None => Ok(None),
1448        }
1449    }
1450
1451    async fn get_kv_value(
1452        &self,
1453        instance_id: &str,
1454        key: &str,
1455    ) -> Result<Option<String>, ProviderError> {
1456        let query = format!(
1457            "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1458            self.schema_name
1459        );
1460        let result: Option<(String,)> = sqlx::query_as(&query)
1461            .bind(instance_id)
1462            .bind(key)
1463            .fetch_optional(&*self.pool)
1464            .await
1465            .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1466        Ok(result.map(|(v,)| v))
1467    }
1468
1469    async fn get_kv_all_values(
1470        &self,
1471        instance_id: &str,
1472    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1473        let query = format!(
1474            "SELECT key, value FROM {}.kv_store WHERE instance_id = $1",
1475            self.schema_name
1476        );
1477        let rows: Vec<(String, String)> = sqlx::query_as(&query)
1478            .bind(instance_id)
1479            .fetch_all(&*self.pool)
1480            .await
1481            .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1482        Ok(rows.into_iter().collect())
1483    }
1484}
1485
1486#[async_trait::async_trait]
1487impl ProviderAdmin for PostgresProvider {
1488    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1489    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1490        sqlx::query_scalar(&format!(
1491            "SELECT instance_id FROM {}.list_instances()",
1492            self.schema_name
1493        ))
1494        .fetch_all(&*self.pool)
1495        .await
1496        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1497    }
1498
1499    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1500    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1501        sqlx::query_scalar(&format!(
1502            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1503            self.schema_name
1504        ))
1505        .bind(status)
1506        .fetch_all(&*self.pool)
1507        .await
1508        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1509    }
1510
1511    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1512    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1513        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1514            "SELECT execution_id FROM {}.list_executions($1)",
1515            self.schema_name
1516        ))
1517        .bind(instance)
1518        .fetch_all(&*self.pool)
1519        .await
1520        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1521
1522        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1523    }
1524
1525    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1526    async fn read_history_with_execution_id(
1527        &self,
1528        instance: &str,
1529        execution_id: u64,
1530    ) -> Result<Vec<Event>, ProviderError> {
1531        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1532            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1533            self.schema_name
1534        ))
1535        .bind(instance)
1536        .bind(execution_id as i64)
1537        .fetch_all(&*self.pool)
1538        .await
1539        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1540
1541        event_data_rows
1542            .into_iter()
1543            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1544            .collect::<Vec<Event>>()
1545            .into_iter()
1546            .map(Ok)
1547            .collect()
1548    }
1549
1550    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1551    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1552        let execution_id = self.latest_execution_id(instance).await?;
1553        self.read_history_with_execution_id(instance, execution_id)
1554            .await
1555    }
1556
1557    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1558    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1559        sqlx::query_scalar(&format!(
1560            "SELECT {}.latest_execution_id($1)",
1561            self.schema_name
1562        ))
1563        .bind(instance)
1564        .fetch_optional(&*self.pool)
1565        .await
1566        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1567        .map(|id: i64| id as u64)
1568        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1569    }
1570
1571    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1572    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1573        let row: Option<(
1574            String,
1575            String,
1576            String,
1577            i64,
1578            chrono::DateTime<Utc>,
1579            Option<chrono::DateTime<Utc>>,
1580            Option<String>,
1581            Option<String>,
1582            Option<String>,
1583        )> = sqlx::query_as(&format!(
1584            "SELECT * FROM {}.get_instance_info($1)",
1585            self.schema_name
1586        ))
1587        .bind(instance)
1588        .fetch_optional(&*self.pool)
1589        .await
1590        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1591
1592        let (
1593            instance_id,
1594            orchestration_name,
1595            orchestration_version,
1596            current_execution_id,
1597            created_at,
1598            updated_at,
1599            status,
1600            output,
1601            parent_instance_id,
1602        ) =
1603            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1604
1605        Ok(InstanceInfo {
1606            instance_id,
1607            orchestration_name,
1608            orchestration_version,
1609            current_execution_id: current_execution_id as u64,
1610            status: status.unwrap_or_else(|| "Running".to_string()),
1611            output,
1612            created_at: created_at.timestamp_millis() as u64,
1613            updated_at: updated_at
1614                .map(|dt| dt.timestamp_millis() as u64)
1615                .unwrap_or(created_at.timestamp_millis() as u64),
1616            parent_instance_id,
1617        })
1618    }
1619
1620    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1621    async fn get_execution_info(
1622        &self,
1623        instance: &str,
1624        execution_id: u64,
1625    ) -> Result<ExecutionInfo, ProviderError> {
1626        let row: Option<(
1627            i64,
1628            String,
1629            Option<String>,
1630            chrono::DateTime<Utc>,
1631            Option<chrono::DateTime<Utc>>,
1632            i64,
1633        )> = sqlx::query_as(&format!(
1634            "SELECT * FROM {}.get_execution_info($1, $2)",
1635            self.schema_name
1636        ))
1637        .bind(instance)
1638        .bind(execution_id as i64)
1639        .fetch_optional(&*self.pool)
1640        .await
1641        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1642
1643        let (exec_id, status, output, started_at, completed_at, event_count) = row
1644            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1645
1646        Ok(ExecutionInfo {
1647            execution_id: exec_id as u64,
1648            status,
1649            output,
1650            started_at: started_at.timestamp_millis() as u64,
1651            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1652            event_count: event_count as usize,
1653        })
1654    }
1655
1656    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1657    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1658        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1659            "SELECT * FROM {}.get_system_metrics()",
1660            self.schema_name
1661        ))
1662        .fetch_optional(&*self.pool)
1663        .await
1664        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1665
1666        let (
1667            total_instances,
1668            total_executions,
1669            running_instances,
1670            completed_instances,
1671            failed_instances,
1672            total_events,
1673        ) = row.ok_or_else(|| {
1674            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1675        })?;
1676
1677        Ok(SystemMetrics {
1678            total_instances: total_instances as u64,
1679            total_executions: total_executions as u64,
1680            running_instances: running_instances as u64,
1681            completed_instances: completed_instances as u64,
1682            failed_instances: failed_instances as u64,
1683            total_events: total_events as u64,
1684        })
1685    }
1686
1687    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1688    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1689        let now_ms = Self::now_millis();
1690
1691        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1692            "SELECT * FROM {}.get_queue_depths($1)",
1693            self.schema_name
1694        ))
1695        .bind(now_ms)
1696        .fetch_optional(&*self.pool)
1697        .await
1698        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1699
1700        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1701            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1702        })?;
1703
1704        Ok(QueueDepths {
1705            orchestrator_queue: orchestrator_queue as usize,
1706            worker_queue: worker_queue as usize,
1707            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1708        })
1709    }
1710
1711    // ===== Hierarchy Primitive Operations =====
1712
1713    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1714    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1715        sqlx::query_scalar(&format!(
1716            "SELECT child_instance_id FROM {}.list_children($1)",
1717            self.schema_name
1718        ))
1719        .bind(instance_id)
1720        .fetch_all(&*self.pool)
1721        .await
1722        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1723    }
1724
1725    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1726    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1727        // The stored procedure raises an exception if instance doesn't exist
1728        // Otherwise returns the parent_instance_id (which may be NULL)
1729        let result: Result<Option<String>, _> =
1730            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1731                .bind(instance_id)
1732                .fetch_one(&*self.pool)
1733                .await;
1734
1735        match result {
1736            Ok(parent_id) => Ok(parent_id),
1737            Err(e) => {
1738                let err_str = e.to_string();
1739                if err_str.contains("Instance not found") {
1740                    Err(ProviderError::permanent(
1741                        "get_parent_id",
1742                        format!("Instance not found: {}", instance_id),
1743                    ))
1744                } else {
1745                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1746                }
1747            }
1748        }
1749    }
1750
1751    // ===== Deletion Operations =====
1752
1753    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1754    async fn delete_instances_atomic(
1755        &self,
1756        ids: &[String],
1757        force: bool,
1758    ) -> Result<DeleteInstanceResult, ProviderError> {
1759        if ids.is_empty() {
1760            return Ok(DeleteInstanceResult::default());
1761        }
1762
1763        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1764            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1765            self.schema_name
1766        ))
1767        .bind(ids)
1768        .bind(force)
1769        .fetch_optional(&*self.pool)
1770        .await
1771        .map_err(|e| {
1772            let err_str = e.to_string();
1773            if err_str.contains("is Running") {
1774                ProviderError::permanent("delete_instances_atomic", err_str)
1775            } else if err_str.contains("Orphan detected") {
1776                ProviderError::permanent("delete_instances_atomic", err_str)
1777            } else {
1778                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1779            }
1780        })?;
1781
1782        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1783            row.unwrap_or((0, 0, 0, 0));
1784
1785        debug!(
1786            target = "duroxide::providers::postgres",
1787            operation = "delete_instances_atomic",
1788            instances_deleted = instances_deleted,
1789            executions_deleted = executions_deleted,
1790            events_deleted = events_deleted,
1791            queue_messages_deleted = queue_messages_deleted,
1792            "Deleted instances atomically"
1793        );
1794
1795        Ok(DeleteInstanceResult {
1796            instances_deleted: instances_deleted as u64,
1797            executions_deleted: executions_deleted as u64,
1798            events_deleted: events_deleted as u64,
1799            queue_messages_deleted: queue_messages_deleted as u64,
1800        })
1801    }
1802
1803    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1804    async fn delete_instance_bulk(
1805        &self,
1806        filter: InstanceFilter,
1807    ) -> Result<DeleteInstanceResult, ProviderError> {
1808        // Build query to find matching root instances in terminal states
1809        let mut sql = format!(
1810            r#"
1811            SELECT i.instance_id
1812            FROM {}.instances i
1813            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1814              AND i.current_execution_id = e.execution_id
1815            WHERE i.parent_instance_id IS NULL
1816              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1817            "#,
1818            self.schema_name, self.schema_name
1819        );
1820
1821        // Add instance_ids filter if provided
1822        if let Some(ref ids) = filter.instance_ids {
1823            if ids.is_empty() {
1824                return Ok(DeleteInstanceResult::default());
1825            }
1826            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1827            sql.push_str(&format!(
1828                " AND i.instance_id IN ({})",
1829                placeholders.join(", ")
1830            ));
1831        }
1832
1833        // Add completed_before filter if provided
1834        if filter.completed_before.is_some() {
1835            let param_num = filter
1836                .instance_ids
1837                .as_ref()
1838                .map(|ids| ids.len())
1839                .unwrap_or(0)
1840                + 1;
1841            sql.push_str(&format!(
1842                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1843                param_num
1844            ));
1845        }
1846
1847        // Add limit
1848        let limit = filter.limit.unwrap_or(1000);
1849        let limit_param_num = filter
1850            .instance_ids
1851            .as_ref()
1852            .map(|ids| ids.len())
1853            .unwrap_or(0)
1854            + if filter.completed_before.is_some() {
1855                1
1856            } else {
1857                0
1858            }
1859            + 1;
1860        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1861
1862        // Build and execute query
1863        let mut query = sqlx::query_scalar::<_, String>(&sql);
1864        if let Some(ref ids) = filter.instance_ids {
1865            for id in ids {
1866                query = query.bind(id);
1867            }
1868        }
1869        if let Some(completed_before) = filter.completed_before {
1870            query = query.bind(completed_before as i64);
1871        }
1872        query = query.bind(limit as i64);
1873
1874        let instance_ids: Vec<String> = query
1875            .fetch_all(&*self.pool)
1876            .await
1877            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1878
1879        if instance_ids.is_empty() {
1880            return Ok(DeleteInstanceResult::default());
1881        }
1882
1883        // Delete each instance with cascade
1884        let mut result = DeleteInstanceResult::default();
1885
1886        for instance_id in &instance_ids {
1887            // Get full tree for this root
1888            let tree = self.get_instance_tree(instance_id).await?;
1889
1890            // Atomic delete (tree.all_ids is already in deletion order: children first)
1891            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1892            result.instances_deleted += delete_result.instances_deleted;
1893            result.executions_deleted += delete_result.executions_deleted;
1894            result.events_deleted += delete_result.events_deleted;
1895            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1896        }
1897
1898        debug!(
1899            target = "duroxide::providers::postgres",
1900            operation = "delete_instance_bulk",
1901            instances_deleted = result.instances_deleted,
1902            executions_deleted = result.executions_deleted,
1903            events_deleted = result.events_deleted,
1904            queue_messages_deleted = result.queue_messages_deleted,
1905            "Bulk deleted instances"
1906        );
1907
1908        Ok(result)
1909    }
1910
1911    // ===== Pruning Operations =====
1912
1913    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1914    async fn prune_executions(
1915        &self,
1916        instance_id: &str,
1917        options: PruneOptions,
1918    ) -> Result<PruneResult, ProviderError> {
1919        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1920        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1921
1922        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1923            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1924            self.schema_name
1925        ))
1926        .bind(instance_id)
1927        .bind(keep_last)
1928        .bind(completed_before_ms)
1929        .fetch_optional(&*self.pool)
1930        .await
1931        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1932
1933        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1934
1935        debug!(
1936            target = "duroxide::providers::postgres",
1937            operation = "prune_executions",
1938            instance_id = %instance_id,
1939            instances_processed = instances_processed,
1940            executions_deleted = executions_deleted,
1941            events_deleted = events_deleted,
1942            "Pruned executions"
1943        );
1944
1945        Ok(PruneResult {
1946            instances_processed: instances_processed as u64,
1947            executions_deleted: executions_deleted as u64,
1948            events_deleted: events_deleted as u64,
1949        })
1950    }
1951
1952    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1953    async fn prune_executions_bulk(
1954        &self,
1955        filter: InstanceFilter,
1956        options: PruneOptions,
1957    ) -> Result<PruneResult, ProviderError> {
1958        // Find matching instances (all statuses - prune_executions protects current execution)
1959        // Note: We include Running instances because long-running orchestrations (e.g., with
1960        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1961        // call safely skips the current execution regardless of its status.
1962        let mut sql = format!(
1963            r#"
1964            SELECT i.instance_id
1965            FROM {}.instances i
1966            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1967              AND i.current_execution_id = e.execution_id
1968            WHERE 1=1
1969            "#,
1970            self.schema_name, self.schema_name
1971        );
1972
1973        // Add instance_ids filter if provided
1974        if let Some(ref ids) = filter.instance_ids {
1975            if ids.is_empty() {
1976                return Ok(PruneResult::default());
1977            }
1978            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1979            sql.push_str(&format!(
1980                " AND i.instance_id IN ({})",
1981                placeholders.join(", ")
1982            ));
1983        }
1984
1985        // Add completed_before filter if provided
1986        if filter.completed_before.is_some() {
1987            let param_num = filter
1988                .instance_ids
1989                .as_ref()
1990                .map(|ids| ids.len())
1991                .unwrap_or(0)
1992                + 1;
1993            sql.push_str(&format!(
1994                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1995                param_num
1996            ));
1997        }
1998
1999        // Add limit
2000        let limit = filter.limit.unwrap_or(1000);
2001        let limit_param_num = filter
2002            .instance_ids
2003            .as_ref()
2004            .map(|ids| ids.len())
2005            .unwrap_or(0)
2006            + if filter.completed_before.is_some() {
2007                1
2008            } else {
2009                0
2010            }
2011            + 1;
2012        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
2013
2014        // Build and execute query
2015        let mut query = sqlx::query_scalar::<_, String>(&sql);
2016        if let Some(ref ids) = filter.instance_ids {
2017            for id in ids {
2018                query = query.bind(id);
2019            }
2020        }
2021        if let Some(completed_before) = filter.completed_before {
2022            query = query.bind(completed_before as i64);
2023        }
2024        query = query.bind(limit as i64);
2025
2026        let instance_ids: Vec<String> = query
2027            .fetch_all(&*self.pool)
2028            .await
2029            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
2030
2031        // Prune each instance
2032        let mut result = PruneResult::default();
2033
2034        for instance_id in &instance_ids {
2035            let single_result = self.prune_executions(instance_id, options.clone()).await?;
2036            result.instances_processed += single_result.instances_processed;
2037            result.executions_deleted += single_result.executions_deleted;
2038            result.events_deleted += single_result.events_deleted;
2039        }
2040
2041        debug!(
2042            target = "duroxide::providers::postgres",
2043            operation = "prune_executions_bulk",
2044            instances_processed = result.instances_processed,
2045            executions_deleted = result.executions_deleted,
2046            events_deleted = result.events_deleted,
2047            "Bulk pruned executions"
2048        );
2049
2050        Ok(result)
2051    }
2052}