// duroxide_pg/src/provider.rs
1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
pub struct PostgresProvider {
    // Shared connection pool; `Arc` so it can also be handed to the
    // MigrationRunner (see `new_with_schema`) without reopening connections.
    pool: Arc<PgPool>,
    // Schema that qualifies every table/function reference ("public" by default).
    schema_name: String,
}
45
impl PostgresProvider {
    /// Connect using the default `public` schema; see [`Self::new_with_schema`].
    pub async fn new(database_url: &str) -> Result<Self> {
        Self::new_with_schema(database_url, None).await
    }

    /// Open a connection pool, select the target schema, and run migrations.
    ///
    /// Pool size is read from the `DUROXIDE_PG_POOL_MAX` environment variable
    /// (default 10); connection acquisition times out after 30 seconds.
    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(10);

        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(1)
            .acquire_timeout(std::time::Duration::from_secs(30))
            .connect(database_url)
            .await?;

        // No explicit schema means the PostgreSQL default schema.
        let schema_name = schema_name.unwrap_or("public").to_string();

        let provider = Self {
            pool: Arc::new(pool),
            schema_name: schema_name.clone(),
        };

        // Run migrations to initialize schema
        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
        migration_runner.migrate().await?;

        Ok(provider)
    }

    /// Re-run migrations. Kept for backward compatibility; `new_with_schema`
    /// already migrates on construction, so calling this is normally redundant.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    pub async fn initialize_schema(&self) -> Result<()> {
        // Schema initialization is now handled by migrations
        // This method is kept for backward compatibility but delegates to migrations
        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
        migration_runner.migrate().await?;
        Ok(())
    }

    /// Get current timestamp in milliseconds (Unix epoch)
    // NOTE(review): panics if the system clock is before the Unix epoch
    // (`unwrap` on `duration_since`) — acceptable in practice, but worth knowing.
    fn now_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64
    }

    /// Get schema-qualified table name
    fn table_name(&self, table: &str) -> String {
        format!("{}.{}", self.schema_name, table)
    }

    /// Get the database pool (for testing)
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Get the schema name (for testing)
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }

    /// Convert sqlx::Error to ProviderError with proper classification
    ///
    /// Retryable: deadlocks (40P01), pool closed/timed out, I/O errors.
    /// Permanent: everything else, including constraint violations.
    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
        match e {
            SqlxError::Database(ref db_err) => {
                // PostgreSQL error codes
                let code_opt = db_err.code();
                let code = code_opt.as_deref();
                if code == Some("40P01") {
                    // Deadlock detected
                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
                } else if code == Some("40001") {
                    // Serialization failure - permanent error (transaction conflict, not transient)
                    // NOTE(review): PostgreSQL documentation recommends retrying
                    // SQLSTATE 40001 (serialization_failure); it is classified
                    // permanent here — confirm this is intentional for the
                    // runtime's retry model.
                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
                } else if code == Some("23505") {
                    // Unique constraint violation (duplicate event)
                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
                } else if code == Some("23503") {
                    // Foreign key constraint violation
                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
                } else {
                    ProviderError::permanent(operation, format!("Database error: {e}"))
                }
            }
            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
            }
            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
        }
    }

    /// Convert TagFilter to SQL parameters (mode string + tag array)
    ///
    /// Tags are sorted so the SQL side receives a deterministic array
    /// regardless of the set's iteration order.
    fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
        match filter {
            TagFilter::DefaultOnly => ("default_only", vec![]),
            TagFilter::Tags(set) => {
                let mut tags: Vec<String> = set.iter().cloned().collect();
                tags.sort();
                ("tags", tags)
            }
            TagFilter::DefaultAnd(set) => {
                let mut tags: Vec<String> = set.iter().cloned().collect();
                tags.sort();
                ("default_and", tags)
            }
            TagFilter::Any => ("any", vec![]),
            TagFilter::None => ("none", vec![]),
        }
    }

    /// Clean up schema after tests (drops all tables and optionally the schema)
    ///
    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
    /// Only drops the schema if it's a custom schema (not "public").
    ///
    /// Retries up to 5 times with a linearly increasing delay, but only on
    /// deadlock (SQLSTATE 40P01); any other error aborts immediately.
    // NOTE(review): schema_name is interpolated into SQL as an identifier
    // (identifiers cannot be bound as parameters); assumed to come from
    // trusted configuration, not user input.
    pub async fn cleanup_schema(&self) -> Result<()> {
        const MAX_RETRIES: u32 = 5;
        const BASE_RETRY_DELAY_MS: u64 = 50;

        for attempt in 0..=MAX_RETRIES {
            let cleanup_result = async {
                // Call the stored procedure to drop all tables
                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
                    .execute(&*self.pool)
                    .await?;

                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
                // Only drop custom schemas created for testing
                if self.schema_name != "public" {
                    sqlx::query(&format!(
                        "DROP SCHEMA IF EXISTS {} CASCADE",
                        self.schema_name
                    ))
                    .execute(&*self.pool)
                    .await?;
                } else {
                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
                    // This ensures we don't accidentally drop the default PostgreSQL schema
                }

                Ok::<(), SqlxError>(())
            }
            .await;

            match cleanup_result {
                Ok(()) => return Ok(()),
                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
                    if attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            schema = %self.schema_name,
                            attempt = attempt + 1,
                            "Deadlock during cleanup_schema, retrying"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(Duration::from_millis(
                            BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(anyhow::anyhow!(db_err.to_string()));
                }
                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
            }
        }

        // Unreachable in practice: every loop iteration returns.
        Ok(())
    }
}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
221    fn name(&self) -> &str {
222        "duroxide-pg"
223    }
224
225    fn version(&self) -> &str {
226        env!("CARGO_PKG_VERSION")
227    }
228
    /// Fetch and lock the next ready orchestration item via the
    /// `fetch_orchestration_item` stored procedure.
    ///
    /// Short-polling: returns `Ok(None)` immediately when no work is ready
    /// (`_poll_timeout` is unused). Retryable database errors are retried up
    /// to `MAX_RETRIES` times with a linearly increasing delay.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        _poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Convert Duration to milliseconds
        let lock_timeout_ms = lock_timeout.as_millis() as i64;
        // NOTE(review): written on retryable errors but never read afterwards
        // (hence the leading underscore) — kept only as a debugging hook.
        let mut _last_error: Option<ProviderError> = None;

        // Extract version filter from capability filter.
        // Versions are packed as major*1_000_000 + minor*1_000 + patch so the
        // stored procedure can range-compare them as plain integers (the SQL
        // side is assumed to use the same encoding — confirm against the
        // migration that defines the procedure).
        // NOTE(review): only the FIRST range in supported_duroxide_versions is
        // consulted; any additional ranges are ignored — confirm the filter is
        // single-range by construction.
        let (min_packed, max_packed) = if let Some(f) = filter {
            if let Some(range) = f.supported_duroxide_versions.first() {
                let min = range.min.major as i64 * 1_000_000
                    + range.min.minor as i64 * 1_000
                    + range.min.patch as i64;
                let max = range.max.major as i64 * 1_000_000
                    + range.max.minor as i64 * 1_000
                    + range.max.patch as i64;
                (Some(min), Some(max))
            } else {
                // Empty supported_duroxide_versions = supports nothing
                return Ok(None);
            }
        } else {
            (None, None)
        };

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();

            // Row layout produced by the stored procedure:
            // (instance_id, orchestration_name, orchestration_version,
            //  execution_id, history_json, messages_json, lock_token,
            //  attempt_count, kv_snapshot_json)
            let result: Result<
                Option<(
                    String,
                    String,
                    String,
                    i64,
                    serde_json::Value,
                    serde_json::Value,
                    String,
                    i32,
                    serde_json::Value,
                )>,
                SqlxError,
            > = sqlx::query_as(&format!(
                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
                self.schema_name
            ))
            .bind(now_ms)
            .bind(lock_timeout_ms)
            .bind(min_packed)
            .bind(max_packed)
            .fetch_optional(&*self.pool)
            .await;

            let row = match result {
                Ok(r) => r,
                Err(e) => {
                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "fetch_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        _last_error = Some(provider_err);
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            };

            if let Some((
                instance_id,
                orchestration_name,
                orchestration_version,
                execution_id,
                history_json,
                messages_json,
                lock_token,
                attempt_count,
                kv_snapshot_json,
            )) = row
            {
                // History deserialization failure is non-fatal: the item is
                // still returned, with an empty history and `history_error`
                // set so the runtime can surface the problem.
                let (history, history_error) =
                    match serde_json::from_value::<Vec<Event>>(history_json) {
                        Ok(h) => (h, None),
                        Err(e) => {
                            let error_msg = format!("Failed to deserialize history: {e}");
                            warn!(
                                target = "duroxide::providers::postgres",
                                instance = %instance_id,
                                error = %error_msg,
                                "History deserialization failed, returning item with history_error"
                            );
                            (vec![], Some(error_msg))
                        }
                    };

                // Message deserialization failure, by contrast, is fatal.
                let messages: Vec<WorkItem> =
                    serde_json::from_value(messages_json).map_err(|e| {
                        ProviderError::permanent(
                            "fetch_orchestration_item",
                            format!("Failed to deserialize messages: {e}"),
                        )
                    })?;
                // A malformed snapshot degrades to an empty map rather than failing.
                let kv_snapshot: std::collections::HashMap<String, String> =
                    serde_json::from_value(kv_snapshot_json).unwrap_or_default();

                let duration_ms = start.elapsed().as_millis() as u64;
                debug!(
                    target = "duroxide::providers::postgres",
                    operation = "fetch_orchestration_item",
                    instance_id = %instance_id,
                    execution_id = execution_id,
                    message_count = messages.len(),
                    history_count = history.len(),
                    attempt_count = attempt_count,
                    duration_ms = duration_ms,
                    attempts = attempt + 1,
                    "Fetched orchestration item via stored procedure"
                );

                // Orphan queue messages: if orchestration_name is "Unknown", there's
                // no history, and ALL messages are QueueMessage items, these are orphan
                // events enqueued before the orchestration started. Drop them by acking
                // with empty deltas. Other work items (CancelInstance, etc.) may
                // legitimately race with StartOrchestration and must not be dropped.
                if orchestration_name == "Unknown"
                    && history.is_empty()
                    && messages
                        .iter()
                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
                {
                    let message_count = messages.len();
                    tracing::warn!(
                        target = "duroxide::providers::postgres",
                        instance = %instance_id,
                        message_count,
                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
                    );
                    // Ack with all-empty deltas to consume the messages and
                    // release the lock.
                    self.ack_orchestration_item(
                        &lock_token,
                        execution_id as u64,
                        vec![],
                        vec![],
                        vec![],
                        ExecutionMetadata::default(),
                        vec![],
                    )
                    .await?;
                    return Ok(None);
                }

                return Ok(Some((
                    OrchestrationItem {
                        instance: instance_id,
                        orchestration_name,
                        execution_id: execution_id as u64,
                        version: orchestration_version,
                        history,
                        messages,
                        history_error,
                        kv_snapshot,
                    },
                    lock_token,
                    attempt_count as u32,
                )));
            }

            // No result found - return immediately (short polling behavior)
            // Only retry with delay on retryable errors (handled above)
            return Ok(None);
        }

        // Unreachable: every loop iteration returns.
        Ok(None)
    }
    /// Atomically acknowledge a locked orchestration item: persist the history
    /// delta, enqueue worker/orchestrator items, apply metadata and KV/status
    /// mutations, cancel requested activities, and release the lock — all via
    /// the `ack_orchestration_item` stored procedure.
    ///
    /// Retries retryable database errors up to `MAX_RETRIES` times; an
    /// "Invalid lock token" error from the procedure is always permanent.
    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        const MAX_RETRIES: u32 = 3;
        const RETRY_DELAY_MS: u64 = 50;

        // Build the JSON payload for the history delta, validating each event.
        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
        for event in &history_delta {
            // event_id 0 means the runtime failed to assign an id — a caller bug.
            if event.event_id() == 0 {
                return Err(ProviderError::permanent(
                    "ack_orchestration_item",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "ack_orchestration_item",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Event type label derived from the Debug representation: the text
            // before the first '{' (e.g. the variant name).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            history_delta_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let history_delta_json = serde_json::Value::Array(history_delta_payload);

        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize worker items: {e}"),
            )
        })?;

        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
            ProviderError::permanent(
                "ack_orchestration_item",
                format!("Failed to serialize orchestrator items: {e}"),
            )
        })?;

        // Scan history_delta for the last CustomStatusUpdated event
        // (last-writer-wins): Some(value) -> "set", None payload -> "clear".
        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
            let mut last_status: Option<&Option<String>> = None;
            for event in &history_delta {
                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
                    last_status = Some(status);
                }
            }
            match last_status {
                Some(Some(s)) => (Some("set"), Some(s.as_str())),
                Some(None) => (Some("clear"), None),
                None => (None, None),
            }
        };

        // KV mutations extracted in history order; the stored procedure is
        // assumed to apply them sequentially — confirm against the SQL side.
        let kv_mutations: Vec<serde_json::Value> = history_delta
            .iter()
            .filter_map(|event| match &event.kind {
                EventKind::KeyValueSet { key, value } => Some(serde_json::json!({
                    "action": "set",
                    "key": key,
                    "value": value,
                })),
                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
                    "action": "clear_key",
                    "key": key,
                })),
                EventKind::KeyValuesCleared => Some(serde_json::json!({
                    "action": "clear_all",
                })),
                _ => None,
            })
            .collect();

        let metadata_json = serde_json::json!({
            "orchestration_name": metadata.orchestration_name,
            "orchestration_version": metadata.orchestration_version,
            "status": metadata.status,
            "output": metadata.output,
            "parent_instance_id": metadata.parent_instance_id,
            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
                serde_json::json!({
                    "major": v.major,
                    "minor": v.minor,
                    "patch": v.patch,
                })
            }),
            "custom_status_action": custom_status_action,
            "custom_status_value": custom_status_value,
            "kv_mutations": kv_mutations,
        });

        // Serialize cancelled activities for lock stealing
        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
            .iter()
            .map(|a| {
                serde_json::json!({
                    "instance": a.instance,
                    "execution_id": a.execution_id,
                    "activity_id": a.activity_id,
                })
            })
            .collect();
        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);

        for attempt in 0..=MAX_RETRIES {
            let now_ms = Self::now_millis();
            let result = sqlx::query(&format!(
                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
                self.schema_name
            ))
            .bind(lock_token)
            .bind(now_ms)
            .bind(execution_id as i64)
            .bind(&history_delta_json)
            .bind(&worker_items_json)
            .bind(&orchestrator_items_json)
            .bind(&metadata_json)
            .bind(&cancelled_activities_json)
            .execute(&*self.pool)
            .await;

            match result {
                Ok(_) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    debug!(
                        target = "duroxide::providers::postgres",
                        operation = "ack_orchestration_item",
                        execution_id = execution_id,
                        history_count = history_delta.len(),
                        worker_items_count = worker_items.len(),
                        orchestrator_items_count = orchestrator_items.len(),
                        cancelled_activities_count = cancelled_activities.len(),
                        duration_ms = duration_ms,
                        attempts = attempt + 1,
                        "Acknowledged orchestration item via stored procedure"
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Check for permanent errors first
                    if let SqlxError::Database(db_err) = &e {
                        if db_err.message().contains("Invalid lock token") {
                            return Err(ProviderError::permanent(
                                "ack_orchestration_item",
                                "Invalid lock token",
                            ));
                        }
                    } else if e.to_string().contains("Invalid lock token") {
                        return Err(ProviderError::permanent(
                            "ack_orchestration_item",
                            "Invalid lock token",
                        ));
                    }

                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
                        warn!(
                            target = "duroxide::providers::postgres",
                            operation = "ack_orchestration_item",
                            attempt = attempt + 1,
                            error = %provider_err,
                            "Retryable error, will retry"
                        );
                        // Linear backoff: 50ms, 100ms, 150ms, ...
                        sleep(std::time::Duration::from_millis(
                            RETRY_DELAY_MS * (attempt as u64 + 1),
                        ))
                        .await;
                        continue;
                    }
                    return Err(provider_err);
                }
            }
        }

        // Should never reach here, but just in case
        Ok(())
    }
618    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
619    async fn abandon_orchestration_item(
620        &self,
621        lock_token: &str,
622        delay: Option<Duration>,
623        ignore_attempt: bool,
624    ) -> Result<(), ProviderError> {
625        let start = std::time::Instant::now();
626        let now_ms = Self::now_millis();
627        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
628
629        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
630            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
631            self.schema_name
632        ))
633        .bind(lock_token)
634        .bind(now_ms)
635        .bind(delay_param)
636        .bind(ignore_attempt)
637        .fetch_one(&*self.pool)
638        .await
639        {
640            Ok(instance_id) => instance_id,
641            Err(e) => {
642                if let SqlxError::Database(db_err) = &e {
643                    if db_err.message().contains("Invalid lock token") {
644                        return Err(ProviderError::permanent(
645                            "abandon_orchestration_item",
646                            "Invalid lock token",
647                        ));
648                    }
649                } else if e.to_string().contains("Invalid lock token") {
650                    return Err(ProviderError::permanent(
651                        "abandon_orchestration_item",
652                        "Invalid lock token",
653                    ));
654                }
655
656                return Err(Self::sqlx_to_provider_error(
657                    "abandon_orchestration_item",
658                    e,
659                ));
660            }
661        };
662
663        let duration_ms = start.elapsed().as_millis() as u64;
664        debug!(
665            target = "duroxide::providers::postgres",
666            operation = "abandon_orchestration_item",
667            instance_id = %instance_id,
668            delay_ms = delay.map(|d| d.as_millis() as u64),
669            ignore_attempt = ignore_attempt,
670            duration_ms = duration_ms,
671            "Abandoned orchestration item via stored procedure"
672        );
673
674        Ok(())
675    }
676
677    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
678    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
679        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
680            "SELECT out_event_data FROM {}.fetch_history($1)",
681            self.schema_name
682        ))
683        .bind(instance)
684        .fetch_all(&*self.pool)
685        .await
686        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
687
688        Ok(event_data_rows
689            .into_iter()
690            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
691            .collect())
692    }
693
    /// Append events to an execution's history via the `append_history`
    /// stored procedure. No-op for an empty batch; rejects any event whose
    /// `event_id` is 0 (the runtime must assign ids before persisting).
    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<Event>,
    ) -> Result<(), ProviderError> {
        if new_events.is_empty() {
            return Ok(());
        }

        // Validate and serialize every event before touching the database so
        // a bad batch is rejected atomically.
        let mut events_payload = Vec::with_capacity(new_events.len());
        for event in &new_events {
            if event.event_id() == 0 {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "append_with_execution",
                    error_type = "validation_error",
                    instance_id = %instance,
                    execution_id = execution_id,
                    "event_id must be set by runtime"
                );
                return Err(ProviderError::permanent(
                    "append_with_execution",
                    "event_id must be set by runtime",
                ));
            }

            let event_json = serde_json::to_string(event).map_err(|e| {
                ProviderError::permanent(
                    "append_with_execution",
                    format!("Failed to serialize event: {e}"),
                )
            })?;

            // Event type label derived from the Debug representation: the text
            // before the first '{' (e.g. the variant name).
            let event_type = format!("{event:?}")
                .split('{')
                .next()
                .unwrap_or("Unknown")
                .trim()
                .to_string();

            events_payload.push(serde_json::json!({
                "event_id": event.event_id(),
                "event_type": event_type,
                "event_data": event_json,
            }));
        }

        let events_json = serde_json::Value::Array(events_payload);

        sqlx::query(&format!(
            "SELECT {}.append_history($1, $2, $3)",
            self.schema_name
        ))
        .bind(instance)
        .bind(execution_id as i64)
        .bind(events_json)
        .execute(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "append_with_execution",
            instance_id = %instance,
            execution_id = execution_id,
            event_count = new_events.len(),
            "Appended history events via stored procedure"
        );

        Ok(())
    }
767
768    #[instrument(skip(self), target = "duroxide::providers::postgres")]
769    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
770        let work_item = serde_json::to_string(&item).map_err(|e| {
771            ProviderError::permanent(
772                "enqueue_worker_work",
773                format!("Failed to serialize work item: {e}"),
774            )
775        })?;
776
777        let now_ms = Self::now_millis();
778
779        // Extract activity identification, session_id, and tag for ActivityExecute items
780        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
781            WorkItem::ActivityExecute {
782                instance,
783                execution_id,
784                id,
785                session_id,
786                tag,
787                ..
788            } => (
789                Some(instance.clone()),
790                Some(*execution_id as i64),
791                Some(*id as i64),
792                session_id.clone(),
793                tag.clone(),
794            ),
795            _ => (None, None, None, None, None),
796        };
797
798        sqlx::query(&format!(
799            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
800            self.schema_name
801        ))
802        .bind(work_item)
803        .bind(now_ms)
804        .bind(&instance_id)
805        .bind(execution_id)
806        .bind(activity_id)
807        .bind(&session_id)
808        .bind(&tag)
809        .execute(&*self.pool)
810        .await
811        .map_err(|e| {
812            error!(
813                target = "duroxide::providers::postgres",
814                operation = "enqueue_worker_work",
815                error_type = "database_error",
816                error = %e,
817                "Failed to enqueue worker work"
818            );
819            Self::sqlx_to_provider_error("enqueue_worker_work", e)
820        })?;
821
822        Ok(())
823    }
824
825    #[instrument(skip(self), target = "duroxide::providers::postgres")]
826    async fn fetch_work_item(
827        &self,
828        lock_timeout: Duration,
829        _poll_timeout: Duration,
830        session: Option<&SessionFetchConfig>,
831        tag_filter: &TagFilter,
832    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
833        // None filter means don't process any activities
834        if matches!(tag_filter, TagFilter::None) {
835            return Ok(None);
836        }
837
838        let start = std::time::Instant::now();
839
840        // Convert Duration to milliseconds
841        let lock_timeout_ms = lock_timeout.as_millis() as i64;
842
843        // Extract session parameters
844        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
845            Some(config) => (
846                Some(&config.owner_id),
847                Some(config.lock_timeout.as_millis() as i64),
848            ),
849            None => (None, None),
850        };
851
852        // Convert TagFilter to SQL parameters
853        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);
854
855        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
856            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
857            self.schema_name
858        ))
859        .bind(Self::now_millis())
860        .bind(lock_timeout_ms)
861        .bind(owner_id)
862        .bind(session_lock_timeout_ms)
863        .bind(&tag_names)
864        .bind(tag_mode)
865        .fetch_optional(&*self.pool)
866        .await
867        {
868            Ok(row) => row,
869            Err(e) => {
870                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
871            }
872        };
873
874        let (work_item_json, lock_token, attempt_count) = match row {
875            Some(row) => row,
876            None => return Ok(None),
877        };
878
879        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
880            ProviderError::permanent(
881                "fetch_work_item",
882                format!("Failed to deserialize worker item: {e}"),
883            )
884        })?;
885
886        let duration_ms = start.elapsed().as_millis() as u64;
887
888        // Extract instance for logging - different work item types have different structures
889        let instance_id = match &work_item {
890            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
891            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
892            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
893            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
894            WorkItem::TimerFired { instance, .. } => instance.as_str(),
895            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
896            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
897            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
898            WorkItem::SubOrchCompleted {
899                parent_instance, ..
900            } => parent_instance.as_str(),
901            WorkItem::SubOrchFailed {
902                parent_instance, ..
903            } => parent_instance.as_str(),
904            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
905        };
906
907        debug!(
908            target = "duroxide::providers::postgres",
909            operation = "fetch_work_item",
910            instance_id = %instance_id,
911            attempt_count = attempt_count,
912            duration_ms = duration_ms,
913            "Fetched activity work item via stored procedure"
914        );
915
916        Ok(Some((work_item, lock_token, attempt_count as u32)))
917    }
918
    /// Acknowledge a locked worker-queue item identified by `token`.
    ///
    /// When `completion` is `None` (e.g. a cancelled activity) the queue item is
    /// deleted without producing anything. Otherwise the `ack_worker` stored
    /// procedure atomically deletes the worker item and enqueues the completion
    /// onto the orchestrator queue, so the two can never diverge.
    ///
    /// Only `ActivityCompleted` / `ActivityFailed` are valid completions; any
    /// other variant is rejected as a permanent error.
    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
    async fn ack_work_item(
        &self,
        token: &str,
        completion: Option<WorkItem>,
    ) -> Result<(), ProviderError> {
        let start = std::time::Instant::now();

        // If no completion provided (e.g., cancelled activity), just delete the item
        let Some(completion) = completion else {
            let now_ms = Self::now_millis();
            // Call ack_worker with NULL completion to delete without enqueueing
            sqlx::query(&format!(
                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
                self.schema_name
            ))
            .bind(token)
            .bind(now_ms)
            .execute(&*self.pool)
            .await
            .map_err(|e| {
                // "not found" means the token expired or the item was already
                // acked — retrying cannot succeed, so classify as permanent.
                if e.to_string().contains("Worker queue item not found") {
                    ProviderError::permanent(
                        "ack_worker",
                        "Worker queue item not found or already processed",
                    )
                } else {
                    Self::sqlx_to_provider_error("ack_worker", e)
                }
            })?;

            let duration_ms = start.elapsed().as_millis() as u64;
            debug!(
                target = "duroxide::providers::postgres",
                operation = "ack_worker",
                token = %token,
                duration_ms = duration_ms,
                "Acknowledged worker without completion (cancelled)"
            );
            return Ok(());
        };

        // Extract instance ID from completion WorkItem
        let instance_id = match &completion {
            WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. } => instance,
            _ => {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "invalid_completion_type",
                    "Invalid completion work item type"
                );
                return Err(ProviderError::permanent(
                    "ack_worker",
                    "Invalid completion work item type",
                ));
            }
        };

        let completion_json = serde_json::to_string(&completion).map_err(|e| {
            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
        })?;

        let now_ms = Self::now_millis();

        // Call stored procedure to atomically delete worker item and enqueue completion
        sqlx::query(&format!(
            "SELECT {}.ack_worker($1, $2, $3, $4)",
            self.schema_name
        ))
        .bind(token)
        .bind(instance_id)
        .bind(completion_json)
        .bind(now_ms)
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            // Same stale-token classification as the no-completion path above.
            if e.to_string().contains("Worker queue item not found") {
                error!(
                    target = "duroxide::providers::postgres",
                    operation = "ack_worker",
                    error_type = "worker_item_not_found",
                    token = %token,
                    "Worker queue item not found or already processed"
                );
                ProviderError::permanent(
                    "ack_worker",
                    "Worker queue item not found or already processed",
                )
            } else {
                Self::sqlx_to_provider_error("ack_worker", e)
            }
        })?;

        let duration_ms = start.elapsed().as_millis() as u64;
        debug!(
            target = "duroxide::providers::postgres",
            operation = "ack_worker",
            instance_id = %instance_id,
            duration_ms = duration_ms,
            "Acknowledged worker and enqueued completion"
        );

        Ok(())
    }
1025
1026    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1027    async fn renew_work_item_lock(
1028        &self,
1029        token: &str,
1030        extend_for: Duration,
1031    ) -> Result<(), ProviderError> {
1032        let start = std::time::Instant::now();
1033
1034        // Get current time from application for consistent time reference
1035        let now_ms = Self::now_millis();
1036
1037        // Convert Duration to seconds for the stored procedure
1038        let extend_secs = extend_for.as_secs() as i64;
1039
1040        match sqlx::query(&format!(
1041            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1042            self.schema_name
1043        ))
1044        .bind(token)
1045        .bind(now_ms)
1046        .bind(extend_secs)
1047        .execute(&*self.pool)
1048        .await
1049        {
1050            Ok(_) => {
1051                let duration_ms = start.elapsed().as_millis() as u64;
1052                debug!(
1053                    target = "duroxide::providers::postgres",
1054                    operation = "renew_work_item_lock",
1055                    token = %token,
1056                    extend_for_secs = extend_secs,
1057                    duration_ms = duration_ms,
1058                    "Work item lock renewed successfully"
1059                );
1060                Ok(())
1061            }
1062            Err(e) => {
1063                if let SqlxError::Database(db_err) = &e {
1064                    if db_err.message().contains("Lock token invalid") {
1065                        return Err(ProviderError::permanent(
1066                            "renew_work_item_lock",
1067                            "Lock token invalid, expired, or already acked",
1068                        ));
1069                    }
1070                } else if e.to_string().contains("Lock token invalid") {
1071                    return Err(ProviderError::permanent(
1072                        "renew_work_item_lock",
1073                        "Lock token invalid, expired, or already acked",
1074                    ));
1075                }
1076
1077                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1078            }
1079        }
1080    }
1081
1082    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1083    async fn abandon_work_item(
1084        &self,
1085        token: &str,
1086        delay: Option<Duration>,
1087        ignore_attempt: bool,
1088    ) -> Result<(), ProviderError> {
1089        let start = std::time::Instant::now();
1090        let now_ms = Self::now_millis();
1091        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1092
1093        match sqlx::query(&format!(
1094            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1095            self.schema_name
1096        ))
1097        .bind(token)
1098        .bind(now_ms)
1099        .bind(delay_param)
1100        .bind(ignore_attempt)
1101        .execute(&*self.pool)
1102        .await
1103        {
1104            Ok(_) => {
1105                let duration_ms = start.elapsed().as_millis() as u64;
1106                debug!(
1107                    target = "duroxide::providers::postgres",
1108                    operation = "abandon_work_item",
1109                    token = %token,
1110                    delay_ms = delay.map(|d| d.as_millis() as u64),
1111                    ignore_attempt = ignore_attempt,
1112                    duration_ms = duration_ms,
1113                    "Abandoned work item via stored procedure"
1114                );
1115                Ok(())
1116            }
1117            Err(e) => {
1118                if let SqlxError::Database(db_err) = &e {
1119                    if db_err.message().contains("Invalid lock token")
1120                        || db_err.message().contains("already acked")
1121                    {
1122                        return Err(ProviderError::permanent(
1123                            "abandon_work_item",
1124                            "Invalid lock token or already acked",
1125                        ));
1126                    }
1127                } else if e.to_string().contains("Invalid lock token")
1128                    || e.to_string().contains("already acked")
1129                {
1130                    return Err(ProviderError::permanent(
1131                        "abandon_work_item",
1132                        "Invalid lock token or already acked",
1133                    ));
1134                }
1135
1136                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1137            }
1138        }
1139    }
1140
1141    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1142    async fn renew_orchestration_item_lock(
1143        &self,
1144        token: &str,
1145        extend_for: Duration,
1146    ) -> Result<(), ProviderError> {
1147        let start = std::time::Instant::now();
1148
1149        // Get current time from application for consistent time reference
1150        let now_ms = Self::now_millis();
1151
1152        // Convert Duration to seconds for the stored procedure
1153        let extend_secs = extend_for.as_secs() as i64;
1154
1155        match sqlx::query(&format!(
1156            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1157            self.schema_name
1158        ))
1159        .bind(token)
1160        .bind(now_ms)
1161        .bind(extend_secs)
1162        .execute(&*self.pool)
1163        .await
1164        {
1165            Ok(_) => {
1166                let duration_ms = start.elapsed().as_millis() as u64;
1167                debug!(
1168                    target = "duroxide::providers::postgres",
1169                    operation = "renew_orchestration_item_lock",
1170                    token = %token,
1171                    extend_for_secs = extend_secs,
1172                    duration_ms = duration_ms,
1173                    "Orchestration item lock renewed successfully"
1174                );
1175                Ok(())
1176            }
1177            Err(e) => {
1178                if let SqlxError::Database(db_err) = &e {
1179                    if db_err.message().contains("Lock token invalid")
1180                        || db_err.message().contains("expired")
1181                        || db_err.message().contains("already released")
1182                    {
1183                        return Err(ProviderError::permanent(
1184                            "renew_orchestration_item_lock",
1185                            "Lock token invalid, expired, or already released",
1186                        ));
1187                    }
1188                } else if e.to_string().contains("Lock token invalid")
1189                    || e.to_string().contains("expired")
1190                    || e.to_string().contains("already released")
1191                {
1192                    return Err(ProviderError::permanent(
1193                        "renew_orchestration_item_lock",
1194                        "Lock token invalid, expired, or already released",
1195                    ));
1196                }
1197
1198                Err(Self::sqlx_to_provider_error(
1199                    "renew_orchestration_item_lock",
1200                    e,
1201                ))
1202            }
1203        }
1204    }
1205
    /// Enqueue a work item onto the orchestrator queue, optionally delayed.
    ///
    /// Visibility time is computed as follows: for `TimerFired` items with a
    /// positive `fire_at_ms`, the later of `fire_at_ms` and `now + delay` wins;
    /// everything else becomes visible at `now + delay` (or immediately when no
    /// delay is given). `ActivityExecute` is rejected — it belongs on the
    /// worker queue, not the orchestrator queue.
    #[instrument(skip(self), target = "duroxide::providers::postgres")]
    async fn enqueue_for_orchestrator(
        &self,
        item: WorkItem,
        delay: Option<Duration>,
    ) -> Result<(), ProviderError> {
        // Serialization failure is permanent: retrying the same item cannot help.
        let work_item = serde_json::to_string(&item).map_err(|e| {
            ProviderError::permanent(
                "enqueue_orchestrator_work",
                format!("Failed to serialize work item: {e}"),
            )
        })?;

        // Extract instance ID from WorkItem enum
        let instance_id = match &item {
            WorkItem::StartOrchestration { instance, .. }
            | WorkItem::ActivityCompleted { instance, .. }
            | WorkItem::ActivityFailed { instance, .. }
            | WorkItem::TimerFired { instance, .. }
            | WorkItem::ExternalRaised { instance, .. }
            | WorkItem::CancelInstance { instance, .. }
            | WorkItem::ContinueAsNew { instance, .. }
            | WorkItem::QueueMessage { instance, .. } => instance,
            // Sub-orchestration results route to the parent's instance.
            WorkItem::SubOrchCompleted {
                parent_instance, ..
            }
            | WorkItem::SubOrchFailed {
                parent_instance, ..
            } => parent_instance,
            WorkItem::ActivityExecute { .. } => {
                return Err(ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "ActivityExecute should go to worker queue, not orchestrator queue",
                ));
            }
        };

        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
        let now_ms = Self::now_millis();

        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
            if *fire_at_ms > 0 {
                // Take max of fire_at_ms and delay (if provided)
                if let Some(delay) = delay {
                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
                } else {
                    *fire_at_ms
                }
            } else {
                // fire_at_ms is 0, use delay or NOW()
                delay
                    .map(|d| now_ms as u64 + d.as_millis() as u64)
                    .unwrap_or(now_ms as u64)
            }
        } else {
            // Non-timer item: use delay or NOW()
            delay
                .map(|d| now_ms as u64 + d.as_millis() as u64)
                .unwrap_or(now_ms as u64)
        };

        // Convert to a DateTime; `.single()` rejects out-of-range millis values.
        let visible_at = Utc
            .timestamp_millis_opt(visible_at_ms as i64)
            .single()
            .ok_or_else(|| {
                ProviderError::permanent(
                    "enqueue_orchestrator_work",
                    "Invalid visible_at timestamp",
                )
            })?;

        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters

        // Call stored procedure to enqueue work
        sqlx::query(&format!(
            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
            self.schema_name
        ))
        .bind(instance_id)
        .bind(&work_item)
        .bind(visible_at)
        .bind::<Option<String>>(None) // orchestration_name - NULL
        .bind::<Option<String>>(None) // orchestration_version - NULL
        .bind::<Option<i64>>(None) // execution_id - NULL
        .execute(&*self.pool)
        .await
        .map_err(|e| {
            error!(
                target = "duroxide::providers::postgres",
                operation = "enqueue_orchestrator_work",
                error_type = "database_error",
                error = %e,
                instance_id = %instance_id,
                "Failed to enqueue orchestrator work"
            );
            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
        })?;

        debug!(
            target = "duroxide::providers::postgres",
            operation = "enqueue_orchestrator_work",
            instance_id = %instance_id,
            delay_ms = delay.map(|d| d.as_millis() as u64),
            "Enqueued orchestrator work"
        );

        Ok(())
    }
1315
1316    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1317    async fn read_with_execution(
1318        &self,
1319        instance: &str,
1320        execution_id: u64,
1321    ) -> Result<Vec<Event>, ProviderError> {
1322        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1323            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1324            self.table_name("history")
1325        ))
1326        .bind(instance)
1327        .bind(execution_id as i64)
1328        .fetch_all(&*self.pool)
1329        .await
1330        .ok()
1331        .unwrap_or_default();
1332
1333        Ok(event_data_rows
1334            .into_iter()
1335            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1336            .collect())
1337    }
1338
1339    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1340    async fn renew_session_lock(
1341        &self,
1342        owner_ids: &[&str],
1343        extend_for: Duration,
1344        idle_timeout: Duration,
1345    ) -> Result<usize, ProviderError> {
1346        if owner_ids.is_empty() {
1347            return Ok(0);
1348        }
1349
1350        let now_ms = Self::now_millis();
1351        let extend_ms = extend_for.as_millis() as i64;
1352        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1353        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1354
1355        let result = sqlx::query_scalar::<_, i64>(&format!(
1356            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1357            self.schema_name
1358        ))
1359        .bind(&owner_ids_vec)
1360        .bind(now_ms)
1361        .bind(extend_ms)
1362        .bind(idle_timeout_ms)
1363        .fetch_one(&*self.pool)
1364        .await
1365        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1366
1367        debug!(
1368            target = "duroxide::providers::postgres",
1369            operation = "renew_session_lock",
1370            owner_count = owner_ids.len(),
1371            sessions_renewed = result,
1372            "Session locks renewed"
1373        );
1374
1375        Ok(result as usize)
1376    }
1377
1378    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1379    async fn cleanup_orphaned_sessions(
1380        &self,
1381        _idle_timeout: Duration,
1382    ) -> Result<usize, ProviderError> {
1383        let now_ms = Self::now_millis();
1384
1385        let result = sqlx::query_scalar::<_, i64>(&format!(
1386            "SELECT {}.cleanup_orphaned_sessions($1)",
1387            self.schema_name
1388        ))
1389        .bind(now_ms)
1390        .fetch_one(&*self.pool)
1391        .await
1392        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1393
1394        debug!(
1395            target = "duroxide::providers::postgres",
1396            operation = "cleanup_orphaned_sessions",
1397            sessions_cleaned = result,
1398            "Orphaned sessions cleaned up"
1399        );
1400
1401        Ok(result as usize)
1402    }
1403
    /// Expose this provider's management surface. `PostgresProvider` always
    /// implements [`ProviderAdmin`], so this never returns `None`.
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        Some(self)
    }
1407
1408    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1409    async fn get_custom_status(
1410        &self,
1411        instance: &str,
1412        last_seen_version: u64,
1413    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1414        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1415            "SELECT * FROM {}.get_custom_status($1, $2)",
1416            self.schema_name
1417        ))
1418        .bind(instance)
1419        .bind(last_seen_version as i64)
1420        .fetch_optional(&*self.pool)
1421        .await
1422        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1423
1424        match row {
1425            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1426            None => Ok(None),
1427        }
1428    }
1429
1430    async fn get_kv_value(
1431        &self,
1432        instance_id: &str,
1433        key: &str,
1434    ) -> Result<Option<String>, ProviderError> {
1435        let query = format!(
1436            "SELECT value FROM {}.kv_store WHERE instance_id = $1 AND key = $2",
1437            self.schema_name
1438        );
1439        let result: Option<(String,)> = sqlx::query_as(&query)
1440            .bind(instance_id)
1441            .bind(key)
1442            .fetch_optional(&*self.pool)
1443            .await
1444            .map_err(|e| ProviderError::retryable("get_kv_value", format!("get_kv_value: {e}")))?;
1445        Ok(result.map(|(v,)| v))
1446    }
1447}
1448
1449#[async_trait::async_trait]
1450impl ProviderAdmin for PostgresProvider {
1451    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1452    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1453        sqlx::query_scalar(&format!(
1454            "SELECT instance_id FROM {}.list_instances()",
1455            self.schema_name
1456        ))
1457        .fetch_all(&*self.pool)
1458        .await
1459        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1460    }
1461
1462    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1463    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1464        sqlx::query_scalar(&format!(
1465            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1466            self.schema_name
1467        ))
1468        .bind(status)
1469        .fetch_all(&*self.pool)
1470        .await
1471        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1472    }
1473
1474    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1475    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1476        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1477            "SELECT execution_id FROM {}.list_executions($1)",
1478            self.schema_name
1479        ))
1480        .bind(instance)
1481        .fetch_all(&*self.pool)
1482        .await
1483        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1484
1485        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1486    }
1487
1488    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1489    async fn read_history_with_execution_id(
1490        &self,
1491        instance: &str,
1492        execution_id: u64,
1493    ) -> Result<Vec<Event>, ProviderError> {
1494        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1495            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1496            self.schema_name
1497        ))
1498        .bind(instance)
1499        .bind(execution_id as i64)
1500        .fetch_all(&*self.pool)
1501        .await
1502        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1503
1504        event_data_rows
1505            .into_iter()
1506            .filter_map(|event_data| serde_json::from_str::<Event>(&event_data).ok())
1507            .collect::<Vec<Event>>()
1508            .into_iter()
1509            .map(Ok)
1510            .collect()
1511    }
1512
1513    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1514    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1515        let execution_id = self.latest_execution_id(instance).await?;
1516        self.read_history_with_execution_id(instance, execution_id)
1517            .await
1518    }
1519
1520    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1521    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1522        sqlx::query_scalar(&format!(
1523            "SELECT {}.latest_execution_id($1)",
1524            self.schema_name
1525        ))
1526        .bind(instance)
1527        .fetch_optional(&*self.pool)
1528        .await
1529        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1530        .map(|id: i64| id as u64)
1531        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1532    }
1533
    /// Fetch summary information for an instance via the `get_instance_info`
    /// stored procedure.
    ///
    /// The row shape must match the procedure's OUT columns exactly:
    /// (instance_id, name, version, current_execution_id, created_at,
    /// updated_at, status, output, parent_instance_id).
    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
        let row: Option<(
            String,
            String,
            String,
            i64,
            chrono::DateTime<Utc>,
            Option<chrono::DateTime<Utc>>,
            Option<String>,
            Option<String>,
            Option<String>,
        )> = sqlx::query_as(&format!(
            "SELECT * FROM {}.get_instance_info($1)",
            self.schema_name
        ))
        .bind(instance)
        .fetch_optional(&*self.pool)
        .await
        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;

        // A missing row is a permanent "not found", not a transient failure.
        let (
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id,
            created_at,
            updated_at,
            status,
            output,
            parent_instance_id,
        ) =
            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;

        Ok(InstanceInfo {
            instance_id,
            orchestration_name,
            orchestration_version,
            current_execution_id: current_execution_id as u64,
            // NULL status is reported as "Running" to callers.
            status: status.unwrap_or_else(|| "Running".to_string()),
            output,
            created_at: created_at.timestamp_millis() as u64,
            // Never-updated instances fall back to their creation time.
            updated_at: updated_at
                .map(|dt| dt.timestamp_millis() as u64)
                .unwrap_or(created_at.timestamp_millis() as u64),
            parent_instance_id,
        })
    }
1582
1583    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1584    async fn get_execution_info(
1585        &self,
1586        instance: &str,
1587        execution_id: u64,
1588    ) -> Result<ExecutionInfo, ProviderError> {
1589        let row: Option<(
1590            i64,
1591            String,
1592            Option<String>,
1593            chrono::DateTime<Utc>,
1594            Option<chrono::DateTime<Utc>>,
1595            i64,
1596        )> = sqlx::query_as(&format!(
1597            "SELECT * FROM {}.get_execution_info($1, $2)",
1598            self.schema_name
1599        ))
1600        .bind(instance)
1601        .bind(execution_id as i64)
1602        .fetch_optional(&*self.pool)
1603        .await
1604        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1605
1606        let (exec_id, status, output, started_at, completed_at, event_count) = row
1607            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1608
1609        Ok(ExecutionInfo {
1610            execution_id: exec_id as u64,
1611            status,
1612            output,
1613            started_at: started_at.timestamp_millis() as u64,
1614            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1615            event_count: event_count as usize,
1616        })
1617    }
1618
1619    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1620    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1621        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1622            "SELECT * FROM {}.get_system_metrics()",
1623            self.schema_name
1624        ))
1625        .fetch_optional(&*self.pool)
1626        .await
1627        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1628
1629        let (
1630            total_instances,
1631            total_executions,
1632            running_instances,
1633            completed_instances,
1634            failed_instances,
1635            total_events,
1636        ) = row.ok_or_else(|| {
1637            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1638        })?;
1639
1640        Ok(SystemMetrics {
1641            total_instances: total_instances as u64,
1642            total_executions: total_executions as u64,
1643            running_instances: running_instances as u64,
1644            completed_instances: completed_instances as u64,
1645            failed_instances: failed_instances as u64,
1646            total_events: total_events as u64,
1647        })
1648    }
1649
1650    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1651    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1652        let now_ms = Self::now_millis();
1653
1654        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1655            "SELECT * FROM {}.get_queue_depths($1)",
1656            self.schema_name
1657        ))
1658        .bind(now_ms)
1659        .fetch_optional(&*self.pool)
1660        .await
1661        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1662
1663        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1664            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1665        })?;
1666
1667        Ok(QueueDepths {
1668            orchestrator_queue: orchestrator_queue as usize,
1669            worker_queue: worker_queue as usize,
1670            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1671        })
1672    }
1673
1674    // ===== Hierarchy Primitive Operations =====
1675
1676    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1677    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1678        sqlx::query_scalar(&format!(
1679            "SELECT child_instance_id FROM {}.list_children($1)",
1680            self.schema_name
1681        ))
1682        .bind(instance_id)
1683        .fetch_all(&*self.pool)
1684        .await
1685        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1686    }
1687
1688    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1689    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1690        // The stored procedure raises an exception if instance doesn't exist
1691        // Otherwise returns the parent_instance_id (which may be NULL)
1692        let result: Result<Option<String>, _> =
1693            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1694                .bind(instance_id)
1695                .fetch_one(&*self.pool)
1696                .await;
1697
1698        match result {
1699            Ok(parent_id) => Ok(parent_id),
1700            Err(e) => {
1701                let err_str = e.to_string();
1702                if err_str.contains("Instance not found") {
1703                    Err(ProviderError::permanent(
1704                        "get_parent_id",
1705                        format!("Instance not found: {}", instance_id),
1706                    ))
1707                } else {
1708                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1709                }
1710            }
1711        }
1712    }
1713
1714    // ===== Deletion Operations =====
1715
1716    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1717    async fn delete_instances_atomic(
1718        &self,
1719        ids: &[String],
1720        force: bool,
1721    ) -> Result<DeleteInstanceResult, ProviderError> {
1722        if ids.is_empty() {
1723            return Ok(DeleteInstanceResult::default());
1724        }
1725
1726        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1727            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1728            self.schema_name
1729        ))
1730        .bind(ids)
1731        .bind(force)
1732        .fetch_optional(&*self.pool)
1733        .await
1734        .map_err(|e| {
1735            let err_str = e.to_string();
1736            if err_str.contains("is Running") {
1737                ProviderError::permanent("delete_instances_atomic", err_str)
1738            } else if err_str.contains("Orphan detected") {
1739                ProviderError::permanent("delete_instances_atomic", err_str)
1740            } else {
1741                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1742            }
1743        })?;
1744
1745        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1746            row.unwrap_or((0, 0, 0, 0));
1747
1748        debug!(
1749            target = "duroxide::providers::postgres",
1750            operation = "delete_instances_atomic",
1751            instances_deleted = instances_deleted,
1752            executions_deleted = executions_deleted,
1753            events_deleted = events_deleted,
1754            queue_messages_deleted = queue_messages_deleted,
1755            "Deleted instances atomically"
1756        );
1757
1758        Ok(DeleteInstanceResult {
1759            instances_deleted: instances_deleted as u64,
1760            executions_deleted: executions_deleted as u64,
1761            events_deleted: events_deleted as u64,
1762            queue_messages_deleted: queue_messages_deleted as u64,
1763        })
1764    }
1765
1766    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1767    async fn delete_instance_bulk(
1768        &self,
1769        filter: InstanceFilter,
1770    ) -> Result<DeleteInstanceResult, ProviderError> {
1771        // Build query to find matching root instances in terminal states
1772        let mut sql = format!(
1773            r#"
1774            SELECT i.instance_id
1775            FROM {}.instances i
1776            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1777              AND i.current_execution_id = e.execution_id
1778            WHERE i.parent_instance_id IS NULL
1779              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1780            "#,
1781            self.schema_name, self.schema_name
1782        );
1783
1784        // Add instance_ids filter if provided
1785        if let Some(ref ids) = filter.instance_ids {
1786            if ids.is_empty() {
1787                return Ok(DeleteInstanceResult::default());
1788            }
1789            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1790            sql.push_str(&format!(
1791                " AND i.instance_id IN ({})",
1792                placeholders.join(", ")
1793            ));
1794        }
1795
1796        // Add completed_before filter if provided
1797        if filter.completed_before.is_some() {
1798            let param_num = filter
1799                .instance_ids
1800                .as_ref()
1801                .map(|ids| ids.len())
1802                .unwrap_or(0)
1803                + 1;
1804            sql.push_str(&format!(
1805                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1806                param_num
1807            ));
1808        }
1809
1810        // Add limit
1811        let limit = filter.limit.unwrap_or(1000);
1812        let limit_param_num = filter
1813            .instance_ids
1814            .as_ref()
1815            .map(|ids| ids.len())
1816            .unwrap_or(0)
1817            + if filter.completed_before.is_some() {
1818                1
1819            } else {
1820                0
1821            }
1822            + 1;
1823        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1824
1825        // Build and execute query
1826        let mut query = sqlx::query_scalar::<_, String>(&sql);
1827        if let Some(ref ids) = filter.instance_ids {
1828            for id in ids {
1829                query = query.bind(id);
1830            }
1831        }
1832        if let Some(completed_before) = filter.completed_before {
1833            query = query.bind(completed_before as i64);
1834        }
1835        query = query.bind(limit as i64);
1836
1837        let instance_ids: Vec<String> = query
1838            .fetch_all(&*self.pool)
1839            .await
1840            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1841
1842        if instance_ids.is_empty() {
1843            return Ok(DeleteInstanceResult::default());
1844        }
1845
1846        // Delete each instance with cascade
1847        let mut result = DeleteInstanceResult::default();
1848
1849        for instance_id in &instance_ids {
1850            // Get full tree for this root
1851            let tree = self.get_instance_tree(instance_id).await?;
1852
1853            // Atomic delete (tree.all_ids is already in deletion order: children first)
1854            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1855            result.instances_deleted += delete_result.instances_deleted;
1856            result.executions_deleted += delete_result.executions_deleted;
1857            result.events_deleted += delete_result.events_deleted;
1858            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1859        }
1860
1861        debug!(
1862            target = "duroxide::providers::postgres",
1863            operation = "delete_instance_bulk",
1864            instances_deleted = result.instances_deleted,
1865            executions_deleted = result.executions_deleted,
1866            events_deleted = result.events_deleted,
1867            queue_messages_deleted = result.queue_messages_deleted,
1868            "Bulk deleted instances"
1869        );
1870
1871        Ok(result)
1872    }
1873
1874    // ===== Pruning Operations =====
1875
1876    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1877    async fn prune_executions(
1878        &self,
1879        instance_id: &str,
1880        options: PruneOptions,
1881    ) -> Result<PruneResult, ProviderError> {
1882        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1883        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1884
1885        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1886            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1887            self.schema_name
1888        ))
1889        .bind(instance_id)
1890        .bind(keep_last)
1891        .bind(completed_before_ms)
1892        .fetch_optional(&*self.pool)
1893        .await
1894        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1895
1896        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1897
1898        debug!(
1899            target = "duroxide::providers::postgres",
1900            operation = "prune_executions",
1901            instance_id = %instance_id,
1902            instances_processed = instances_processed,
1903            executions_deleted = executions_deleted,
1904            events_deleted = events_deleted,
1905            "Pruned executions"
1906        );
1907
1908        Ok(PruneResult {
1909            instances_processed: instances_processed as u64,
1910            executions_deleted: executions_deleted as u64,
1911            events_deleted: events_deleted as u64,
1912        })
1913    }
1914
1915    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1916    async fn prune_executions_bulk(
1917        &self,
1918        filter: InstanceFilter,
1919        options: PruneOptions,
1920    ) -> Result<PruneResult, ProviderError> {
1921        // Find matching instances (all statuses - prune_executions protects current execution)
1922        // Note: We include Running instances because long-running orchestrations (e.g., with
1923        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
1924        // call safely skips the current execution regardless of its status.
1925        let mut sql = format!(
1926            r#"
1927            SELECT i.instance_id
1928            FROM {}.instances i
1929            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1930              AND i.current_execution_id = e.execution_id
1931            WHERE 1=1
1932            "#,
1933            self.schema_name, self.schema_name
1934        );
1935
1936        // Add instance_ids filter if provided
1937        if let Some(ref ids) = filter.instance_ids {
1938            if ids.is_empty() {
1939                return Ok(PruneResult::default());
1940            }
1941            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1942            sql.push_str(&format!(
1943                " AND i.instance_id IN ({})",
1944                placeholders.join(", ")
1945            ));
1946        }
1947
1948        // Add completed_before filter if provided
1949        if filter.completed_before.is_some() {
1950            let param_num = filter
1951                .instance_ids
1952                .as_ref()
1953                .map(|ids| ids.len())
1954                .unwrap_or(0)
1955                + 1;
1956            sql.push_str(&format!(
1957                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1958                param_num
1959            ));
1960        }
1961
1962        // Add limit
1963        let limit = filter.limit.unwrap_or(1000);
1964        let limit_param_num = filter
1965            .instance_ids
1966            .as_ref()
1967            .map(|ids| ids.len())
1968            .unwrap_or(0)
1969            + if filter.completed_before.is_some() {
1970                1
1971            } else {
1972                0
1973            }
1974            + 1;
1975        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1976
1977        // Build and execute query
1978        let mut query = sqlx::query_scalar::<_, String>(&sql);
1979        if let Some(ref ids) = filter.instance_ids {
1980            for id in ids {
1981                query = query.bind(id);
1982            }
1983        }
1984        if let Some(completed_before) = filter.completed_before {
1985            query = query.bind(completed_before as i64);
1986        }
1987        query = query.bind(limit as i64);
1988
1989        let instance_ids: Vec<String> = query
1990            .fetch_all(&*self.pool)
1991            .await
1992            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
1993
1994        // Prune each instance
1995        let mut result = PruneResult::default();
1996
1997        for instance_id in &instance_ids {
1998            let single_result = self.prune_executions(instance_id, options.clone()).await?;
1999            result.instances_processed += single_result.instances_processed;
2000            result.executions_deleted += single_result.executions_deleted;
2001            result.events_deleted += single_result.events_deleted;
2002        }
2003
2004        debug!(
2005            target = "duroxide::providers::postgres",
2006            operation = "prune_executions_bulk",
2007            instances_processed = result.instances_processed,
2008            executions_deleted = result.executions_deleted,
2009            events_deleted = result.events_deleted,
2010            "Bulk pruned executions"
2011        );
2012
2013        Ok(result)
2014    }
2015}