Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind, SystemStats};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
41pub struct PostgresProvider {
42    pool: Arc<PgPool>,
43    schema_name: String,
44}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else if code == Some("0A000") {
130                    // Cached plan invalidated by concurrent DDL (e.g., migration replaced a function)
131                    ProviderError::retryable(operation, format!("Cached plan invalidated: {e}"))
132                } else {
133                    ProviderError::permanent(operation, format!("Database error: {e}"))
134                }
135            }
136            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
137                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
138            }
139            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
140            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
141        }
142    }
143
144    /// Convert TagFilter to SQL parameters (mode string + tag array)
145    fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
146        match filter {
147            TagFilter::DefaultOnly => ("default_only", vec![]),
148            TagFilter::Tags(set) => {
149                let mut tags: Vec<String> = set.iter().cloned().collect();
150                tags.sort();
151                ("tags", tags)
152            }
153            TagFilter::DefaultAnd(set) => {
154                let mut tags: Vec<String> = set.iter().cloned().collect();
155                tags.sort();
156                ("default_and", tags)
157            }
158            TagFilter::Any => ("any", vec![]),
159            TagFilter::None => ("none", vec![]),
160        }
161    }
162
163    /// Clean up schema after tests (drops all tables and optionally the schema)
164    ///
165    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
166    /// Only drops the schema if it's a custom schema (not "public").
167    pub async fn cleanup_schema(&self) -> Result<()> {
168        const MAX_RETRIES: u32 = 5;
169        const BASE_RETRY_DELAY_MS: u64 = 50;
170
171        for attempt in 0..=MAX_RETRIES {
172            let cleanup_result = async {
173                // Call the stored procedure to drop all tables
174                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
175                    .execute(&*self.pool)
176                    .await?;
177
178                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
179                // Only drop custom schemas created for testing
180                if self.schema_name != "public" {
181                    sqlx::query(&format!(
182                        "DROP SCHEMA IF EXISTS {} CASCADE",
183                        self.schema_name
184                    ))
185                    .execute(&*self.pool)
186                    .await?;
187                } else {
188                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
189                    // This ensures we don't accidentally drop the default PostgreSQL schema
190                }
191
192                Ok::<(), SqlxError>(())
193            }
194            .await;
195
196            match cleanup_result {
197                Ok(()) => return Ok(()),
198                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
199                    if attempt < MAX_RETRIES {
200                        warn!(
201                            target = "duroxide::providers::postgres",
202                            schema = %self.schema_name,
203                            attempt = attempt + 1,
204                            "Deadlock during cleanup_schema, retrying"
205                        );
206                        sleep(Duration::from_millis(
207                            BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
208                        ))
209                        .await;
210                        continue;
211                    }
212                    return Err(anyhow::anyhow!(db_err.to_string()));
213                }
214                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
215            }
216        }
217
218        Ok(())
219    }
220}
221
222#[async_trait::async_trait]
223impl Provider for PostgresProvider {
224    fn name(&self) -> &str {
225        "duroxide-pg"
226    }
227
228    fn version(&self) -> &str {
229        env!("CARGO_PKG_VERSION")
230    }
231
232    #[instrument(skip(self), target = "duroxide::providers::postgres")]
233    async fn fetch_orchestration_item(
234        &self,
235        lock_timeout: Duration,
236        _poll_timeout: Duration,
237        filter: Option<&DispatcherCapabilityFilter>,
238    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
239        let start = std::time::Instant::now();
240
241        const MAX_RETRIES: u32 = 3;
242        const RETRY_DELAY_MS: u64 = 50;
243
244        // Convert Duration to milliseconds
245        let lock_timeout_ms = lock_timeout.as_millis() as i64;
246        let mut _last_error: Option<ProviderError> = None;
247
248        // Extract version filter from capability filter
249        let (min_packed, max_packed) = if let Some(f) = filter {
250            if let Some(range) = f.supported_duroxide_versions.first() {
251                let min = range.min.major as i64 * 1_000_000
252                    + range.min.minor as i64 * 1_000
253                    + range.min.patch as i64;
254                let max = range.max.major as i64 * 1_000_000
255                    + range.max.minor as i64 * 1_000
256                    + range.max.patch as i64;
257                (Some(min), Some(max))
258            } else {
259                // Empty supported_duroxide_versions = supports nothing
260                return Ok(None);
261            }
262        } else {
263            (None, None)
264        };
265
266        for attempt in 0..=MAX_RETRIES {
267            let now_ms = Self::now_millis();
268
269            let result: Result<
270                Option<(
271                    String,
272                    String,
273                    String,
274                    i64,
275                    serde_json::Value,
276                    serde_json::Value,
277                    String,
278                    i32,
279                    serde_json::Value,
280                )>,
281                SqlxError,
282            > = sqlx::query_as(&format!(
283                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
284                self.schema_name
285            ))
286            .bind(now_ms)
287            .bind(lock_timeout_ms)
288            .bind(min_packed)
289            .bind(max_packed)
290            .fetch_optional(&*self.pool)
291            .await;
292
293            let row = match result {
294                Ok(r) => r,
295                Err(e) => {
296                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
297                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
298                        warn!(
299                            target = "duroxide::providers::postgres",
300                            operation = "fetch_orchestration_item",
301                            attempt = attempt + 1,
302                            error = %provider_err,
303                            "Retryable error, will retry"
304                        );
305                        _last_error = Some(provider_err);
306                        sleep(std::time::Duration::from_millis(
307                            RETRY_DELAY_MS * (attempt as u64 + 1),
308                        ))
309                        .await;
310                        continue;
311                    }
312                    return Err(provider_err);
313                }
314            };
315
316            if let Some((
317                instance_id,
318                orchestration_name,
319                orchestration_version,
320                execution_id,
321                history_json,
322                messages_json,
323                lock_token,
324                attempt_count,
325                kv_snapshot_json,
326            )) = row
327            {
328                let (history, history_error) =
329                    match serde_json::from_value::<Vec<Event>>(history_json) {
330                        Ok(h) => (h, None),
331                        Err(e) => {
332                            let error_msg = format!("Failed to deserialize history: {e}");
333                            warn!(
334                                target = "duroxide::providers::postgres",
335                                instance = %instance_id,
336                                error = %error_msg,
337                                "History deserialization failed, returning item with history_error"
338                            );
339                            (vec![], Some(error_msg))
340                        }
341                    };
342
343                let messages: Vec<WorkItem> =
344                    serde_json::from_value(messages_json).map_err(|e| {
345                        ProviderError::permanent(
346                            "fetch_orchestration_item",
347                            format!("Failed to deserialize messages: {e}"),
348                        )
349                    })?;
350                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
351                    let raw: std::collections::HashMap<String, serde_json::Value> =
352                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
353                    raw.into_iter()
354                        .filter_map(|(k, v)| {
355                            let value = v.get("value")?.as_str()?.to_string();
356                            let last_updated_at_ms =
357                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
358                            Some((
359                                k,
360                                duroxide::providers::KvEntry {
361                                    value,
362                                    last_updated_at_ms,
363                                },
364                            ))
365                        })
366                        .collect()
367                };
368
369                let duration_ms = start.elapsed().as_millis() as u64;
370                debug!(
371                    target = "duroxide::providers::postgres",
372                    operation = "fetch_orchestration_item",
373                    instance_id = %instance_id,
374                    execution_id = execution_id,
375                    message_count = messages.len(),
376                    history_count = history.len(),
377                    attempt_count = attempt_count,
378                    duration_ms = duration_ms,
379                    attempts = attempt + 1,
380                    "Fetched orchestration item via stored procedure"
381                );
382
383                // Orphan queue messages: if orchestration_name is "Unknown", there's
384                // no history, and ALL messages are QueueMessage items, these are orphan
385                // events enqueued before the orchestration started. Drop them by acking
386                // with empty deltas. Other work items (CancelInstance, etc.) may
387                // legitimately race with StartOrchestration and must not be dropped.
388                if orchestration_name == "Unknown"
389                    && history.is_empty()
390                    && messages
391                        .iter()
392                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
393                {
394                    let message_count = messages.len();
395                    tracing::warn!(
396                        target = "duroxide::providers::postgres",
397                        instance = %instance_id,
398                        message_count,
399                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
400                    );
401                    self.ack_orchestration_item(
402                        &lock_token,
403                        execution_id as u64,
404                        vec![],
405                        vec![],
406                        vec![],
407                        ExecutionMetadata::default(),
408                        vec![],
409                    )
410                    .await?;
411                    return Ok(None);
412                }
413
414                return Ok(Some((
415                    OrchestrationItem {
416                        instance: instance_id,
417                        orchestration_name,
418                        execution_id: execution_id as u64,
419                        version: orchestration_version,
420                        history,
421                        messages,
422                        history_error,
423                        kv_snapshot,
424                    },
425                    lock_token,
426                    attempt_count as u32,
427                )));
428            }
429
430            // No result found - return immediately (short polling behavior)
431            // Only retry with delay on retryable errors (handled above)
432            return Ok(None);
433        }
434
435        Ok(None)
436    }
437    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
438    async fn ack_orchestration_item(
439        &self,
440        lock_token: &str,
441        execution_id: u64,
442        history_delta: Vec<Event>,
443        worker_items: Vec<WorkItem>,
444        orchestrator_items: Vec<WorkItem>,
445        metadata: ExecutionMetadata,
446        cancelled_activities: Vec<ScheduledActivityIdentifier>,
447    ) -> Result<(), ProviderError> {
448        let start = std::time::Instant::now();
449
450        const MAX_RETRIES: u32 = 3;
451        const RETRY_DELAY_MS: u64 = 50;
452
453        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
454        for event in &history_delta {
455            if event.event_id() == 0 {
456                return Err(ProviderError::permanent(
457                    "ack_orchestration_item",
458                    "event_id must be set by runtime",
459                ));
460            }
461
462            let event_json = serde_json::to_string(event).map_err(|e| {
463                ProviderError::permanent(
464                    "ack_orchestration_item",
465                    format!("Failed to serialize event: {e}"),
466                )
467            })?;
468
469            let event_type = format!("{event:?}")
470                .split('{')
471                .next()
472                .unwrap_or("Unknown")
473                .trim()
474                .to_string();
475
476            history_delta_payload.push(serde_json::json!({
477                "event_id": event.event_id(),
478                "event_type": event_type,
479                "event_data": event_json,
480            }));
481        }
482
483        let history_delta_json = serde_json::Value::Array(history_delta_payload);
484
485        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
486            ProviderError::permanent(
487                "ack_orchestration_item",
488                format!("Failed to serialize worker items: {e}"),
489            )
490        })?;
491
492        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
493            ProviderError::permanent(
494                "ack_orchestration_item",
495                format!("Failed to serialize orchestrator items: {e}"),
496            )
497        })?;
498
499        // Scan history_delta for the last CustomStatusUpdated event
500        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
501            let mut last_status: Option<&Option<String>> = None;
502            for event in &history_delta {
503                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
504                    last_status = Some(status);
505                }
506            }
507            match last_status {
508                Some(Some(s)) => (Some("set"), Some(s.as_str())),
509                Some(None) => (Some("clear"), None),
510                None => (None, None),
511            }
512        };
513
514        let kv_mutations: Vec<serde_json::Value> = history_delta
515            .iter()
516            .filter_map(|event| match &event.kind {
517                EventKind::KeyValueSet {
518                    key,
519                    value,
520                    last_updated_at_ms,
521                } => Some(serde_json::json!({
522                    "action": "set",
523                    "key": key,
524                    "value": value,
525                    "last_updated_at_ms": last_updated_at_ms,
526                })),
527                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
528                    "action": "clear_key",
529                    "key": key,
530                })),
531                EventKind::KeyValuesCleared => Some(serde_json::json!({
532                    "action": "clear_all",
533                })),
534                _ => None,
535            })
536            .collect();
537
538        let metadata_json = serde_json::json!({
539            "orchestration_name": metadata.orchestration_name,
540            "orchestration_version": metadata.orchestration_version,
541            "status": metadata.status,
542            "output": metadata.output,
543            "parent_instance_id": metadata.parent_instance_id,
544            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
545                serde_json::json!({
546                    "major": v.major,
547                    "minor": v.minor,
548                    "patch": v.patch,
549                })
550            }),
551            "custom_status_action": custom_status_action,
552            "custom_status_value": custom_status_value,
553            "kv_mutations": kv_mutations,
554        });
555
556        // Serialize cancelled activities for lock stealing
557        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
558            .iter()
559            .map(|a| {
560                serde_json::json!({
561                    "instance": a.instance,
562                    "execution_id": a.execution_id,
563                    "activity_id": a.activity_id,
564                })
565            })
566            .collect();
567        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
568
569        for attempt in 0..=MAX_RETRIES {
570            let now_ms = Self::now_millis();
571            let result = sqlx::query(&format!(
572                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
573                self.schema_name
574            ))
575            .bind(lock_token)
576            .bind(now_ms)
577            .bind(execution_id as i64)
578            .bind(&history_delta_json)
579            .bind(&worker_items_json)
580            .bind(&orchestrator_items_json)
581            .bind(&metadata_json)
582            .bind(&cancelled_activities_json)
583            .execute(&*self.pool)
584            .await;
585
586            match result {
587                Ok(_) => {
588                    let duration_ms = start.elapsed().as_millis() as u64;
589                    debug!(
590                        target = "duroxide::providers::postgres",
591                        operation = "ack_orchestration_item",
592                        execution_id = execution_id,
593                        history_count = history_delta.len(),
594                        worker_items_count = worker_items.len(),
595                        orchestrator_items_count = orchestrator_items.len(),
596                        cancelled_activities_count = cancelled_activities.len(),
597                        duration_ms = duration_ms,
598                        attempts = attempt + 1,
599                        "Acknowledged orchestration item via stored procedure"
600                    );
601                    return Ok(());
602                }
603                Err(e) => {
604                    // Check for permanent errors first
605                    if let SqlxError::Database(db_err) = &e {
606                        if db_err.message().contains("Invalid lock token") {
607                            return Err(ProviderError::permanent(
608                                "ack_orchestration_item",
609                                "Invalid lock token",
610                            ));
611                        }
612                    } else if e.to_string().contains("Invalid lock token") {
613                        return Err(ProviderError::permanent(
614                            "ack_orchestration_item",
615                            "Invalid lock token",
616                        ));
617                    }
618
619                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
620                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
621                        warn!(
622                            target = "duroxide::providers::postgres",
623                            operation = "ack_orchestration_item",
624                            attempt = attempt + 1,
625                            error = %provider_err,
626                            "Retryable error, will retry"
627                        );
628                        sleep(std::time::Duration::from_millis(
629                            RETRY_DELAY_MS * (attempt as u64 + 1),
630                        ))
631                        .await;
632                        continue;
633                    }
634                    return Err(provider_err);
635                }
636            }
637        }
638
639        // Should never reach here, but just in case
640        Ok(())
641    }
642    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
643    async fn abandon_orchestration_item(
644        &self,
645        lock_token: &str,
646        delay: Option<Duration>,
647        ignore_attempt: bool,
648    ) -> Result<(), ProviderError> {
649        let start = std::time::Instant::now();
650        let now_ms = Self::now_millis();
651        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
652
653        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
654            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
655            self.schema_name
656        ))
657        .bind(lock_token)
658        .bind(now_ms)
659        .bind(delay_param)
660        .bind(ignore_attempt)
661        .fetch_one(&*self.pool)
662        .await
663        {
664            Ok(instance_id) => instance_id,
665            Err(e) => {
666                if let SqlxError::Database(db_err) = &e {
667                    if db_err.message().contains("Invalid lock token") {
668                        return Err(ProviderError::permanent(
669                            "abandon_orchestration_item",
670                            "Invalid lock token",
671                        ));
672                    }
673                } else if e.to_string().contains("Invalid lock token") {
674                    return Err(ProviderError::permanent(
675                        "abandon_orchestration_item",
676                        "Invalid lock token",
677                    ));
678                }
679
680                return Err(Self::sqlx_to_provider_error(
681                    "abandon_orchestration_item",
682                    e,
683                ));
684            }
685        };
686
687        let duration_ms = start.elapsed().as_millis() as u64;
688        debug!(
689            target = "duroxide::providers::postgres",
690            operation = "abandon_orchestration_item",
691            instance_id = %instance_id,
692            delay_ms = delay.map(|d| d.as_millis() as u64),
693            ignore_attempt = ignore_attempt,
694            duration_ms = duration_ms,
695            "Abandoned orchestration item via stored procedure"
696        );
697
698        Ok(())
699    }
700
701    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
702    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
703        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
704            "SELECT out_event_data FROM {}.fetch_history($1)",
705            self.schema_name
706        ))
707        .bind(instance)
708        .fetch_all(&*self.pool)
709        .await
710        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
711
712        event_data_rows
713            .into_iter()
714            .map(|event_data| {
715                serde_json::from_str::<Event>(&event_data).map_err(|e| {
716                    ProviderError::permanent("read", format!("Failed to deserialize event: {e}"))
717                })
718            })
719            .collect()
720    }
721
722    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
723    async fn append_with_execution(
724        &self,
725        instance: &str,
726        execution_id: u64,
727        new_events: Vec<Event>,
728    ) -> Result<(), ProviderError> {
729        if new_events.is_empty() {
730            return Ok(());
731        }
732
733        let mut events_payload = Vec::with_capacity(new_events.len());
734        for event in &new_events {
735            if event.event_id() == 0 {
736                error!(
737                    target = "duroxide::providers::postgres",
738                    operation = "append_with_execution",
739                    error_type = "validation_error",
740                    instance_id = %instance,
741                    execution_id = execution_id,
742                    "event_id must be set by runtime"
743                );
744                return Err(ProviderError::permanent(
745                    "append_with_execution",
746                    "event_id must be set by runtime",
747                ));
748            }
749
750            let event_json = serde_json::to_string(event).map_err(|e| {
751                ProviderError::permanent(
752                    "append_with_execution",
753                    format!("Failed to serialize event: {e}"),
754                )
755            })?;
756
757            let event_type = format!("{event:?}")
758                .split('{')
759                .next()
760                .unwrap_or("Unknown")
761                .trim()
762                .to_string();
763
764            events_payload.push(serde_json::json!({
765                "event_id": event.event_id(),
766                "event_type": event_type,
767                "event_data": event_json,
768            }));
769        }
770
771        let events_json = serde_json::Value::Array(events_payload);
772
773        sqlx::query(&format!(
774            "SELECT {}.append_history($1, $2, $3)",
775            self.schema_name
776        ))
777        .bind(instance)
778        .bind(execution_id as i64)
779        .bind(events_json)
780        .execute(&*self.pool)
781        .await
782        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
783
784        debug!(
785            target = "duroxide::providers::postgres",
786            operation = "append_with_execution",
787            instance_id = %instance,
788            execution_id = execution_id,
789            event_count = new_events.len(),
790            "Appended history events via stored procedure"
791        );
792
793        Ok(())
794    }
795
796    #[instrument(skip(self), target = "duroxide::providers::postgres")]
797    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
798        let work_item = serde_json::to_string(&item).map_err(|e| {
799            ProviderError::permanent(
800                "enqueue_worker_work",
801                format!("Failed to serialize work item: {e}"),
802            )
803        })?;
804
805        let now_ms = Self::now_millis();
806
807        // Extract activity identification, session_id, and tag for ActivityExecute items
808        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
809            WorkItem::ActivityExecute {
810                instance,
811                execution_id,
812                id,
813                session_id,
814                tag,
815                ..
816            } => (
817                Some(instance.clone()),
818                Some(*execution_id as i64),
819                Some(*id as i64),
820                session_id.clone(),
821                tag.clone(),
822            ),
823            _ => (None, None, None, None, None),
824        };
825
826        sqlx::query(&format!(
827            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
828            self.schema_name
829        ))
830        .bind(work_item)
831        .bind(now_ms)
832        .bind(&instance_id)
833        .bind(execution_id)
834        .bind(activity_id)
835        .bind(&session_id)
836        .bind(&tag)
837        .execute(&*self.pool)
838        .await
839        .map_err(|e| {
840            error!(
841                target = "duroxide::providers::postgres",
842                operation = "enqueue_worker_work",
843                error_type = "database_error",
844                error = %e,
845                "Failed to enqueue worker work"
846            );
847            Self::sqlx_to_provider_error("enqueue_worker_work", e)
848        })?;
849
850        Ok(())
851    }
852
853    #[instrument(skip(self), target = "duroxide::providers::postgres")]
854    async fn fetch_work_item(
855        &self,
856        lock_timeout: Duration,
857        _poll_timeout: Duration,
858        session: Option<&SessionFetchConfig>,
859        tag_filter: &TagFilter,
860    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
861        // None filter means don't process any activities
862        if matches!(tag_filter, TagFilter::None) {
863            return Ok(None);
864        }
865
866        let start = std::time::Instant::now();
867
868        // Convert Duration to milliseconds
869        let lock_timeout_ms = lock_timeout.as_millis() as i64;
870
871        // Extract session parameters
872        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
873            Some(config) => (
874                Some(&config.owner_id),
875                Some(config.lock_timeout.as_millis() as i64),
876            ),
877            None => (None, None),
878        };
879
880        // Convert TagFilter to SQL parameters
881        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);
882
883        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
884            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
885            self.schema_name
886        ))
887        .bind(Self::now_millis())
888        .bind(lock_timeout_ms)
889        .bind(owner_id)
890        .bind(session_lock_timeout_ms)
891        .bind(&tag_names)
892        .bind(tag_mode)
893        .fetch_optional(&*self.pool)
894        .await
895        {
896            Ok(row) => row,
897            Err(e) => {
898                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
899            }
900        };
901
902        let (work_item_json, lock_token, attempt_count) = match row {
903            Some(row) => row,
904            None => return Ok(None),
905        };
906
907        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
908            ProviderError::permanent(
909                "fetch_work_item",
910                format!("Failed to deserialize worker item: {e}"),
911            )
912        })?;
913
914        let duration_ms = start.elapsed().as_millis() as u64;
915
916        // Extract instance for logging - different work item types have different structures
917        let instance_id = match &work_item {
918            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
919            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
920            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
921            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
922            WorkItem::TimerFired { instance, .. } => instance.as_str(),
923            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
924            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
925            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
926            WorkItem::SubOrchCompleted {
927                parent_instance, ..
928            } => parent_instance.as_str(),
929            WorkItem::SubOrchFailed {
930                parent_instance, ..
931            } => parent_instance.as_str(),
932            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
933        };
934
935        debug!(
936            target = "duroxide::providers::postgres",
937            operation = "fetch_work_item",
938            instance_id = %instance_id,
939            attempt_count = attempt_count,
940            duration_ms = duration_ms,
941            "Fetched activity work item via stored procedure"
942        );
943
944        Ok(Some((work_item, lock_token, attempt_count as u32)))
945    }
946
947    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
948    async fn ack_work_item(
949        &self,
950        token: &str,
951        completion: Option<WorkItem>,
952    ) -> Result<(), ProviderError> {
953        let start = std::time::Instant::now();
954
955        // If no completion provided (e.g., cancelled activity), just delete the item
956        let Some(completion) = completion else {
957            let now_ms = Self::now_millis();
958            // Call ack_worker with NULL completion to delete without enqueueing
959            sqlx::query(&format!(
960                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
961                self.schema_name
962            ))
963            .bind(token)
964            .bind(now_ms)
965            .execute(&*self.pool)
966            .await
967            .map_err(|e| {
968                if e.to_string().contains("Worker queue item not found") {
969                    ProviderError::permanent(
970                        "ack_worker",
971                        "Worker queue item not found or already processed",
972                    )
973                } else {
974                    Self::sqlx_to_provider_error("ack_worker", e)
975                }
976            })?;
977
978            let duration_ms = start.elapsed().as_millis() as u64;
979            debug!(
980                target = "duroxide::providers::postgres",
981                operation = "ack_worker",
982                token = %token,
983                duration_ms = duration_ms,
984                "Acknowledged worker without completion (cancelled)"
985            );
986            return Ok(());
987        };
988
989        // Extract instance ID from completion WorkItem
990        let instance_id = match &completion {
991            WorkItem::ActivityCompleted { instance, .. }
992            | WorkItem::ActivityFailed { instance, .. } => instance,
993            _ => {
994                error!(
995                    target = "duroxide::providers::postgres",
996                    operation = "ack_worker",
997                    error_type = "invalid_completion_type",
998                    "Invalid completion work item type"
999                );
1000                return Err(ProviderError::permanent(
1001                    "ack_worker",
1002                    "Invalid completion work item type",
1003                ));
1004            }
1005        };
1006
1007        let completion_json = serde_json::to_string(&completion).map_err(|e| {
1008            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
1009        })?;
1010
1011        let now_ms = Self::now_millis();
1012
1013        // Call stored procedure to atomically delete worker item and enqueue completion
1014        sqlx::query(&format!(
1015            "SELECT {}.ack_worker($1, $2, $3, $4)",
1016            self.schema_name
1017        ))
1018        .bind(token)
1019        .bind(instance_id)
1020        .bind(completion_json)
1021        .bind(now_ms)
1022        .execute(&*self.pool)
1023        .await
1024        .map_err(|e| {
1025            if e.to_string().contains("Worker queue item not found") {
1026                error!(
1027                    target = "duroxide::providers::postgres",
1028                    operation = "ack_worker",
1029                    error_type = "worker_item_not_found",
1030                    token = %token,
1031                    "Worker queue item not found or already processed"
1032                );
1033                ProviderError::permanent(
1034                    "ack_worker",
1035                    "Worker queue item not found or already processed",
1036                )
1037            } else {
1038                Self::sqlx_to_provider_error("ack_worker", e)
1039            }
1040        })?;
1041
1042        let duration_ms = start.elapsed().as_millis() as u64;
1043        debug!(
1044            target = "duroxide::providers::postgres",
1045            operation = "ack_worker",
1046            instance_id = %instance_id,
1047            duration_ms = duration_ms,
1048            "Acknowledged worker and enqueued completion"
1049        );
1050
1051        Ok(())
1052    }
1053
1054    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1055    async fn renew_work_item_lock(
1056        &self,
1057        token: &str,
1058        extend_for: Duration,
1059    ) -> Result<(), ProviderError> {
1060        let start = std::time::Instant::now();
1061
1062        // Get current time from application for consistent time reference
1063        let now_ms = Self::now_millis();
1064
1065        // Convert Duration to seconds for the stored procedure
1066        let extend_secs = extend_for.as_secs() as i64;
1067
1068        match sqlx::query(&format!(
1069            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1070            self.schema_name
1071        ))
1072        .bind(token)
1073        .bind(now_ms)
1074        .bind(extend_secs)
1075        .execute(&*self.pool)
1076        .await
1077        {
1078            Ok(_) => {
1079                let duration_ms = start.elapsed().as_millis() as u64;
1080                debug!(
1081                    target = "duroxide::providers::postgres",
1082                    operation = "renew_work_item_lock",
1083                    token = %token,
1084                    extend_for_secs = extend_secs,
1085                    duration_ms = duration_ms,
1086                    "Work item lock renewed successfully"
1087                );
1088                Ok(())
1089            }
1090            Err(e) => {
1091                if let SqlxError::Database(db_err) = &e {
1092                    if db_err.message().contains("Lock token invalid") {
1093                        return Err(ProviderError::permanent(
1094                            "renew_work_item_lock",
1095                            "Lock token invalid, expired, or already acked",
1096                        ));
1097                    }
1098                } else if e.to_string().contains("Lock token invalid") {
1099                    return Err(ProviderError::permanent(
1100                        "renew_work_item_lock",
1101                        "Lock token invalid, expired, or already acked",
1102                    ));
1103                }
1104
1105                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1106            }
1107        }
1108    }
1109
1110    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1111    async fn abandon_work_item(
1112        &self,
1113        token: &str,
1114        delay: Option<Duration>,
1115        ignore_attempt: bool,
1116    ) -> Result<(), ProviderError> {
1117        let start = std::time::Instant::now();
1118        let now_ms = Self::now_millis();
1119        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1120
1121        match sqlx::query(&format!(
1122            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1123            self.schema_name
1124        ))
1125        .bind(token)
1126        .bind(now_ms)
1127        .bind(delay_param)
1128        .bind(ignore_attempt)
1129        .execute(&*self.pool)
1130        .await
1131        {
1132            Ok(_) => {
1133                let duration_ms = start.elapsed().as_millis() as u64;
1134                debug!(
1135                    target = "duroxide::providers::postgres",
1136                    operation = "abandon_work_item",
1137                    token = %token,
1138                    delay_ms = delay.map(|d| d.as_millis() as u64),
1139                    ignore_attempt = ignore_attempt,
1140                    duration_ms = duration_ms,
1141                    "Abandoned work item via stored procedure"
1142                );
1143                Ok(())
1144            }
1145            Err(e) => {
1146                if let SqlxError::Database(db_err) = &e {
1147                    if db_err.message().contains("Invalid lock token")
1148                        || db_err.message().contains("already acked")
1149                    {
1150                        return Err(ProviderError::permanent(
1151                            "abandon_work_item",
1152                            "Invalid lock token or already acked",
1153                        ));
1154                    }
1155                } else if e.to_string().contains("Invalid lock token")
1156                    || e.to_string().contains("already acked")
1157                {
1158                    return Err(ProviderError::permanent(
1159                        "abandon_work_item",
1160                        "Invalid lock token or already acked",
1161                    ));
1162                }
1163
1164                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1165            }
1166        }
1167    }
1168
1169    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1170    async fn renew_orchestration_item_lock(
1171        &self,
1172        token: &str,
1173        extend_for: Duration,
1174    ) -> Result<(), ProviderError> {
1175        let start = std::time::Instant::now();
1176
1177        // Get current time from application for consistent time reference
1178        let now_ms = Self::now_millis();
1179
1180        // Convert Duration to seconds for the stored procedure
1181        let extend_secs = extend_for.as_secs() as i64;
1182
1183        match sqlx::query(&format!(
1184            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1185            self.schema_name
1186        ))
1187        .bind(token)
1188        .bind(now_ms)
1189        .bind(extend_secs)
1190        .execute(&*self.pool)
1191        .await
1192        {
1193            Ok(_) => {
1194                let duration_ms = start.elapsed().as_millis() as u64;
1195                debug!(
1196                    target = "duroxide::providers::postgres",
1197                    operation = "renew_orchestration_item_lock",
1198                    token = %token,
1199                    extend_for_secs = extend_secs,
1200                    duration_ms = duration_ms,
1201                    "Orchestration item lock renewed successfully"
1202                );
1203                Ok(())
1204            }
1205            Err(e) => {
1206                if let SqlxError::Database(db_err) = &e {
1207                    if db_err.message().contains("Lock token invalid")
1208                        || db_err.message().contains("expired")
1209                        || db_err.message().contains("already released")
1210                    {
1211                        return Err(ProviderError::permanent(
1212                            "renew_orchestration_item_lock",
1213                            "Lock token invalid, expired, or already released",
1214                        ));
1215                    }
1216                } else if e.to_string().contains("Lock token invalid")
1217                    || e.to_string().contains("expired")
1218                    || e.to_string().contains("already released")
1219                {
1220                    return Err(ProviderError::permanent(
1221                        "renew_orchestration_item_lock",
1222                        "Lock token invalid, expired, or already released",
1223                    ));
1224                }
1225
1226                Err(Self::sqlx_to_provider_error(
1227                    "renew_orchestration_item_lock",
1228                    e,
1229                ))
1230            }
1231        }
1232    }
1233
1234    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1235    async fn enqueue_for_orchestrator(
1236        &self,
1237        item: WorkItem,
1238        delay: Option<Duration>,
1239    ) -> Result<(), ProviderError> {
1240        let work_item = serde_json::to_string(&item).map_err(|e| {
1241            ProviderError::permanent(
1242                "enqueue_orchestrator_work",
1243                format!("Failed to serialize work item: {e}"),
1244            )
1245        })?;
1246
1247        // Extract instance ID from WorkItem enum
1248        let instance_id = match &item {
1249            WorkItem::StartOrchestration { instance, .. }
1250            | WorkItem::ActivityCompleted { instance, .. }
1251            | WorkItem::ActivityFailed { instance, .. }
1252            | WorkItem::TimerFired { instance, .. }
1253            | WorkItem::ExternalRaised { instance, .. }
1254            | WorkItem::CancelInstance { instance, .. }
1255            | WorkItem::ContinueAsNew { instance, .. }
1256            | WorkItem::QueueMessage { instance, .. } => instance,
1257            WorkItem::SubOrchCompleted {
1258                parent_instance, ..
1259            }
1260            | WorkItem::SubOrchFailed {
1261                parent_instance, ..
1262            } => parent_instance,
1263            WorkItem::ActivityExecute { .. } => {
1264                return Err(ProviderError::permanent(
1265                    "enqueue_orchestrator_work",
1266                    "ActivityExecute should go to worker queue, not orchestrator queue",
1267                ));
1268            }
1269        };
1270
1271        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1272        let now_ms = Self::now_millis();
1273
1274        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1275            if *fire_at_ms > 0 {
1276                // Take max of fire_at_ms and delay (if provided)
1277                if let Some(delay) = delay {
1278                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1279                } else {
1280                    *fire_at_ms
1281                }
1282            } else {
1283                // fire_at_ms is 0, use delay or NOW()
1284                delay
1285                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1286                    .unwrap_or(now_ms as u64)
1287            }
1288        } else {
1289            // Non-timer item: use delay or NOW()
1290            delay
1291                .map(|d| now_ms as u64 + d.as_millis() as u64)
1292                .unwrap_or(now_ms as u64)
1293        };
1294
1295        let visible_at = Utc
1296            .timestamp_millis_opt(visible_at_ms as i64)
1297            .single()
1298            .ok_or_else(|| {
1299                ProviderError::permanent(
1300                    "enqueue_orchestrator_work",
1301                    "Invalid visible_at timestamp",
1302                )
1303            })?;
1304
1305        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1306        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1307
1308        // Call stored procedure to enqueue work
1309        sqlx::query(&format!(
1310            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1311            self.schema_name
1312        ))
1313        .bind(instance_id)
1314        .bind(&work_item)
1315        .bind(visible_at)
1316        .bind::<Option<String>>(None) // orchestration_name - NULL
1317        .bind::<Option<String>>(None) // orchestration_version - NULL
1318        .bind::<Option<i64>>(None) // execution_id - NULL
1319        .execute(&*self.pool)
1320        .await
1321        .map_err(|e| {
1322            error!(
1323                target = "duroxide::providers::postgres",
1324                operation = "enqueue_orchestrator_work",
1325                error_type = "database_error",
1326                error = %e,
1327                instance_id = %instance_id,
1328                "Failed to enqueue orchestrator work"
1329            );
1330            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1331        })?;
1332
1333        debug!(
1334            target = "duroxide::providers::postgres",
1335            operation = "enqueue_orchestrator_work",
1336            instance_id = %instance_id,
1337            delay_ms = delay.map(|d| d.as_millis() as u64),
1338            "Enqueued orchestrator work"
1339        );
1340
1341        Ok(())
1342    }
1343
1344    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1345    async fn read_with_execution(
1346        &self,
1347        instance: &str,
1348        execution_id: u64,
1349    ) -> Result<Vec<Event>, ProviderError> {
1350        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1351            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1352            self.table_name("history")
1353        ))
1354        .bind(instance)
1355        .bind(execution_id as i64)
1356        .fetch_all(&*self.pool)
1357        .await
1358        .map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
1359
1360        event_data_rows
1361            .into_iter()
1362            .map(|event_data| {
1363                serde_json::from_str::<Event>(&event_data).map_err(|e| {
1364                    ProviderError::permanent(
1365                        "read_with_execution",
1366                        format!("Failed to deserialize event: {e}"),
1367                    )
1368                })
1369            })
1370            .collect()
1371    }
1372
1373    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1374    async fn renew_session_lock(
1375        &self,
1376        owner_ids: &[&str],
1377        extend_for: Duration,
1378        idle_timeout: Duration,
1379    ) -> Result<usize, ProviderError> {
1380        if owner_ids.is_empty() {
1381            return Ok(0);
1382        }
1383
1384        let now_ms = Self::now_millis();
1385        let extend_ms = extend_for.as_millis() as i64;
1386        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1387        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1388
1389        let result = sqlx::query_scalar::<_, i64>(&format!(
1390            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1391            self.schema_name
1392        ))
1393        .bind(&owner_ids_vec)
1394        .bind(now_ms)
1395        .bind(extend_ms)
1396        .bind(idle_timeout_ms)
1397        .fetch_one(&*self.pool)
1398        .await
1399        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1400
1401        debug!(
1402            target = "duroxide::providers::postgres",
1403            operation = "renew_session_lock",
1404            owner_count = owner_ids.len(),
1405            sessions_renewed = result,
1406            "Session locks renewed"
1407        );
1408
1409        Ok(result as usize)
1410    }
1411
1412    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1413    async fn cleanup_orphaned_sessions(
1414        &self,
1415        _idle_timeout: Duration,
1416    ) -> Result<usize, ProviderError> {
1417        let now_ms = Self::now_millis();
1418
1419        let result = sqlx::query_scalar::<_, i64>(&format!(
1420            "SELECT {}.cleanup_orphaned_sessions($1)",
1421            self.schema_name
1422        ))
1423        .bind(now_ms)
1424        .fetch_one(&*self.pool)
1425        .await
1426        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1427
1428        debug!(
1429            target = "duroxide::providers::postgres",
1430            operation = "cleanup_orphaned_sessions",
1431            sessions_cleaned = result,
1432            "Orphaned sessions cleaned up"
1433        );
1434
1435        Ok(result as usize)
1436    }
1437
1438    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1439        Some(self)
1440    }
1441
1442    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1443    async fn get_custom_status(
1444        &self,
1445        instance: &str,
1446        last_seen_version: u64,
1447    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1448        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1449            "SELECT * FROM {}.get_custom_status($1, $2)",
1450            self.schema_name
1451        ))
1452        .bind(instance)
1453        .bind(last_seen_version as i64)
1454        .fetch_optional(&*self.pool)
1455        .await
1456        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1457
1458        match row {
1459            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1460            None => Ok(None),
1461        }
1462    }
1463
1464    async fn get_kv_value(
1465        &self,
1466        instance_id: &str,
1467        key: &str,
1468    ) -> Result<Option<String>, ProviderError> {
1469        let row: Option<(Option<String>, bool)> = sqlx::query_as(&format!(
1470            "SELECT * FROM {}.get_kv_value($1, $2)",
1471            self.schema_name
1472        ))
1473        .bind(instance_id)
1474        .bind(key)
1475        .fetch_optional(&*self.pool)
1476        .await
1477        .map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
1478
1479        Ok(row.and_then(|(value, found)| if found { value } else { None }))
1480    }
1481
1482    async fn get_kv_all_values(
1483        &self,
1484        instance_id: &str,
1485    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1486        let rows: Vec<(String, String)> = sqlx::query_as(&format!(
1487            "SELECT * FROM {}.get_kv_all_values($1)",
1488            self.schema_name
1489        ))
1490        .bind(instance_id)
1491        .fetch_all(&*self.pool)
1492        .await
1493        .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1494
1495        Ok(rows.into_iter().collect())
1496    }
1497
1498    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1499    async fn get_instance_stats(&self, instance: &str) -> Result<Option<SystemStats>, ProviderError> {
1500        let row: Option<(bool, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1501            "SELECT * FROM {}.get_instance_stats($1)",
1502            self.schema_name
1503        ))
1504        .bind(instance)
1505        .fetch_optional(&*self.pool)
1506        .await
1507        .map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
1508
1509        match row {
1510            Some((true, history_event_count, history_size_bytes, queue_pending_count, kv_user_key_count, kv_total_value_bytes)) => {
1511                Ok(Some(SystemStats {
1512                    history_event_count: history_event_count as u64,
1513                    history_size_bytes: history_size_bytes as u64,
1514                    queue_pending_count: queue_pending_count as u64,
1515                    kv_user_key_count: kv_user_key_count as u64,
1516                    kv_total_value_bytes: kv_total_value_bytes as u64,
1517                }))
1518            }
1519            _ => Ok(None),
1520        }
1521    }
1522}
1523
1524#[async_trait::async_trait]
1525impl ProviderAdmin for PostgresProvider {
1526    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1527    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1528        sqlx::query_scalar(&format!(
1529            "SELECT instance_id FROM {}.list_instances()",
1530            self.schema_name
1531        ))
1532        .fetch_all(&*self.pool)
1533        .await
1534        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1535    }
1536
1537    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1538    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1539        sqlx::query_scalar(&format!(
1540            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1541            self.schema_name
1542        ))
1543        .bind(status)
1544        .fetch_all(&*self.pool)
1545        .await
1546        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1547    }
1548
1549    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1550    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1551        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1552            "SELECT execution_id FROM {}.list_executions($1)",
1553            self.schema_name
1554        ))
1555        .bind(instance)
1556        .fetch_all(&*self.pool)
1557        .await
1558        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1559
1560        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1561    }
1562
1563    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1564    async fn read_history_with_execution_id(
1565        &self,
1566        instance: &str,
1567        execution_id: u64,
1568    ) -> Result<Vec<Event>, ProviderError> {
1569        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1570            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1571            self.schema_name
1572        ))
1573        .bind(instance)
1574        .bind(execution_id as i64)
1575        .fetch_all(&*self.pool)
1576        .await
1577        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1578
1579        event_data_rows
1580            .into_iter()
1581            .map(|event_data| {
1582                serde_json::from_str::<Event>(&event_data).map_err(|e| {
1583                    ProviderError::permanent(
1584                        "read_history_with_execution_id",
1585                        format!("Failed to deserialize event: {e}"),
1586                    )
1587                })
1588            })
1589            .collect()
1590    }
1591
1592    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1593    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1594        let execution_id = self.latest_execution_id(instance).await?;
1595        self.read_history_with_execution_id(instance, execution_id)
1596            .await
1597    }
1598
1599    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1600    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1601        sqlx::query_scalar(&format!(
1602            "SELECT {}.latest_execution_id($1)",
1603            self.schema_name
1604        ))
1605        .bind(instance)
1606        .fetch_optional(&*self.pool)
1607        .await
1608        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1609        .map(|id: i64| id as u64)
1610        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1611    }
1612
1613    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1614    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1615        let row: Option<(
1616            String,
1617            String,
1618            String,
1619            i64,
1620            chrono::DateTime<Utc>,
1621            Option<chrono::DateTime<Utc>>,
1622            Option<String>,
1623            Option<String>,
1624            Option<String>,
1625        )> = sqlx::query_as(&format!(
1626            "SELECT * FROM {}.get_instance_info($1)",
1627            self.schema_name
1628        ))
1629        .bind(instance)
1630        .fetch_optional(&*self.pool)
1631        .await
1632        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1633
1634        let (
1635            instance_id,
1636            orchestration_name,
1637            orchestration_version,
1638            current_execution_id,
1639            created_at,
1640            updated_at,
1641            status,
1642            output,
1643            parent_instance_id,
1644        ) =
1645            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1646
1647        Ok(InstanceInfo {
1648            instance_id,
1649            orchestration_name,
1650            orchestration_version,
1651            current_execution_id: current_execution_id as u64,
1652            status: status.unwrap_or_else(|| "Running".to_string()),
1653            output,
1654            created_at: created_at.timestamp_millis() as u64,
1655            updated_at: updated_at
1656                .map(|dt| dt.timestamp_millis() as u64)
1657                .unwrap_or(created_at.timestamp_millis() as u64),
1658            parent_instance_id,
1659        })
1660    }
1661
1662    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1663    async fn get_execution_info(
1664        &self,
1665        instance: &str,
1666        execution_id: u64,
1667    ) -> Result<ExecutionInfo, ProviderError> {
1668        let row: Option<(
1669            i64,
1670            String,
1671            Option<String>,
1672            chrono::DateTime<Utc>,
1673            Option<chrono::DateTime<Utc>>,
1674            i64,
1675        )> = sqlx::query_as(&format!(
1676            "SELECT * FROM {}.get_execution_info($1, $2)",
1677            self.schema_name
1678        ))
1679        .bind(instance)
1680        .bind(execution_id as i64)
1681        .fetch_optional(&*self.pool)
1682        .await
1683        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1684
1685        let (exec_id, status, output, started_at, completed_at, event_count) = row
1686            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1687
1688        Ok(ExecutionInfo {
1689            execution_id: exec_id as u64,
1690            status,
1691            output,
1692            started_at: started_at.timestamp_millis() as u64,
1693            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1694            event_count: event_count as usize,
1695        })
1696    }
1697
1698    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1699    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1700        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1701            "SELECT * FROM {}.get_system_metrics()",
1702            self.schema_name
1703        ))
1704        .fetch_optional(&*self.pool)
1705        .await
1706        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1707
1708        let (
1709            total_instances,
1710            total_executions,
1711            running_instances,
1712            completed_instances,
1713            failed_instances,
1714            total_events,
1715        ) = row.ok_or_else(|| {
1716            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1717        })?;
1718
1719        Ok(SystemMetrics {
1720            total_instances: total_instances as u64,
1721            total_executions: total_executions as u64,
1722            running_instances: running_instances as u64,
1723            completed_instances: completed_instances as u64,
1724            failed_instances: failed_instances as u64,
1725            total_events: total_events as u64,
1726        })
1727    }
1728
1729    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1730    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1731        let now_ms = Self::now_millis();
1732
1733        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1734            "SELECT * FROM {}.get_queue_depths($1)",
1735            self.schema_name
1736        ))
1737        .bind(now_ms)
1738        .fetch_optional(&*self.pool)
1739        .await
1740        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1741
1742        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1743            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1744        })?;
1745
1746        Ok(QueueDepths {
1747            orchestrator_queue: orchestrator_queue as usize,
1748            worker_queue: worker_queue as usize,
1749            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1750        })
1751    }
1752
1753    // ===== Hierarchy Primitive Operations =====
1754
1755    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1756    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1757        sqlx::query_scalar(&format!(
1758            "SELECT child_instance_id FROM {}.list_children($1)",
1759            self.schema_name
1760        ))
1761        .bind(instance_id)
1762        .fetch_all(&*self.pool)
1763        .await
1764        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1765    }
1766
1767    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1768    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1769        // The stored procedure raises an exception if instance doesn't exist
1770        // Otherwise returns the parent_instance_id (which may be NULL)
1771        let result: Result<Option<String>, _> =
1772            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1773                .bind(instance_id)
1774                .fetch_one(&*self.pool)
1775                .await;
1776
1777        match result {
1778            Ok(parent_id) => Ok(parent_id),
1779            Err(e) => {
1780                let err_str = e.to_string();
1781                if err_str.contains("Instance not found") {
1782                    Err(ProviderError::permanent(
1783                        "get_parent_id",
1784                        format!("Instance not found: {}", instance_id),
1785                    ))
1786                } else {
1787                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1788                }
1789            }
1790        }
1791    }
1792
1793    // ===== Deletion Operations =====
1794
1795    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1796    async fn delete_instances_atomic(
1797        &self,
1798        ids: &[String],
1799        force: bool,
1800    ) -> Result<DeleteInstanceResult, ProviderError> {
1801        if ids.is_empty() {
1802            return Ok(DeleteInstanceResult::default());
1803        }
1804
1805        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1806            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1807            self.schema_name
1808        ))
1809        .bind(ids)
1810        .bind(force)
1811        .fetch_optional(&*self.pool)
1812        .await
1813        .map_err(|e| {
1814            let err_str = e.to_string();
1815            if err_str.contains("is Running") {
1816                ProviderError::permanent("delete_instances_atomic", err_str)
1817            } else if err_str.contains("Orphan detected") {
1818                ProviderError::permanent("delete_instances_atomic", err_str)
1819            } else {
1820                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1821            }
1822        })?;
1823
1824        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1825            row.unwrap_or((0, 0, 0, 0));
1826
1827        debug!(
1828            target = "duroxide::providers::postgres",
1829            operation = "delete_instances_atomic",
1830            instances_deleted = instances_deleted,
1831            executions_deleted = executions_deleted,
1832            events_deleted = events_deleted,
1833            queue_messages_deleted = queue_messages_deleted,
1834            "Deleted instances atomically"
1835        );
1836
1837        Ok(DeleteInstanceResult {
1838            instances_deleted: instances_deleted as u64,
1839            executions_deleted: executions_deleted as u64,
1840            events_deleted: events_deleted as u64,
1841            queue_messages_deleted: queue_messages_deleted as u64,
1842        })
1843    }
1844
1845    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1846    async fn delete_instance_bulk(
1847        &self,
1848        filter: InstanceFilter,
1849    ) -> Result<DeleteInstanceResult, ProviderError> {
1850        // Build query to find matching root instances in terminal states
1851        let mut sql = format!(
1852            r#"
1853            SELECT i.instance_id
1854            FROM {}.instances i
1855            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1856              AND i.current_execution_id = e.execution_id
1857            WHERE i.parent_instance_id IS NULL
1858              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1859            "#,
1860            self.schema_name, self.schema_name
1861        );
1862
1863        // Add instance_ids filter if provided
1864        if let Some(ref ids) = filter.instance_ids {
1865            if ids.is_empty() {
1866                return Ok(DeleteInstanceResult::default());
1867            }
1868            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1869            sql.push_str(&format!(
1870                " AND i.instance_id IN ({})",
1871                placeholders.join(", ")
1872            ));
1873        }
1874
1875        // Add completed_before filter if provided
1876        if filter.completed_before.is_some() {
1877            let param_num = filter
1878                .instance_ids
1879                .as_ref()
1880                .map(|ids| ids.len())
1881                .unwrap_or(0)
1882                + 1;
1883            sql.push_str(&format!(
1884                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1885                param_num
1886            ));
1887        }
1888
1889        // Add limit
1890        let limit = filter.limit.unwrap_or(1000);
1891        let limit_param_num = filter
1892            .instance_ids
1893            .as_ref()
1894            .map(|ids| ids.len())
1895            .unwrap_or(0)
1896            + if filter.completed_before.is_some() {
1897                1
1898            } else {
1899                0
1900            }
1901            + 1;
1902        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1903
1904        // Build and execute query
1905        let mut query = sqlx::query_scalar::<_, String>(&sql);
1906        if let Some(ref ids) = filter.instance_ids {
1907            for id in ids {
1908                query = query.bind(id);
1909            }
1910        }
1911        if let Some(completed_before) = filter.completed_before {
1912            query = query.bind(completed_before as i64);
1913        }
1914        query = query.bind(limit as i64);
1915
1916        let instance_ids: Vec<String> = query
1917            .fetch_all(&*self.pool)
1918            .await
1919            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1920
1921        if instance_ids.is_empty() {
1922            return Ok(DeleteInstanceResult::default());
1923        }
1924
1925        // Delete each instance with cascade
1926        let mut result = DeleteInstanceResult::default();
1927
1928        for instance_id in &instance_ids {
1929            // Get full tree for this root
1930            let tree = self.get_instance_tree(instance_id).await?;
1931
1932            // Atomic delete (tree.all_ids is already in deletion order: children first)
1933            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1934            result.instances_deleted += delete_result.instances_deleted;
1935            result.executions_deleted += delete_result.executions_deleted;
1936            result.events_deleted += delete_result.events_deleted;
1937            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1938        }
1939
1940        debug!(
1941            target = "duroxide::providers::postgres",
1942            operation = "delete_instance_bulk",
1943            instances_deleted = result.instances_deleted,
1944            executions_deleted = result.executions_deleted,
1945            events_deleted = result.events_deleted,
1946            queue_messages_deleted = result.queue_messages_deleted,
1947            "Bulk deleted instances"
1948        );
1949
1950        Ok(result)
1951    }
1952
1953    // ===== Pruning Operations =====
1954
1955    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1956    async fn prune_executions(
1957        &self,
1958        instance_id: &str,
1959        options: PruneOptions,
1960    ) -> Result<PruneResult, ProviderError> {
1961        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1962        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1963
1964        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1965            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1966            self.schema_name
1967        ))
1968        .bind(instance_id)
1969        .bind(keep_last)
1970        .bind(completed_before_ms)
1971        .fetch_optional(&*self.pool)
1972        .await
1973        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1974
1975        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1976
1977        debug!(
1978            target = "duroxide::providers::postgres",
1979            operation = "prune_executions",
1980            instance_id = %instance_id,
1981            instances_processed = instances_processed,
1982            executions_deleted = executions_deleted,
1983            events_deleted = events_deleted,
1984            "Pruned executions"
1985        );
1986
1987        Ok(PruneResult {
1988            instances_processed: instances_processed as u64,
1989            executions_deleted: executions_deleted as u64,
1990            events_deleted: events_deleted as u64,
1991        })
1992    }
1993
1994    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1995    async fn prune_executions_bulk(
1996        &self,
1997        filter: InstanceFilter,
1998        options: PruneOptions,
1999    ) -> Result<PruneResult, ProviderError> {
2000        // Find matching instances (all statuses - prune_executions protects current execution)
2001        // Note: We include Running instances because long-running orchestrations (e.g., with
2002        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
2003        // call safely skips the current execution regardless of its status.
2004        let mut sql = format!(
2005            r#"
2006            SELECT i.instance_id
2007            FROM {}.instances i
2008            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
2009              AND i.current_execution_id = e.execution_id
2010            WHERE 1=1
2011            "#,
2012            self.schema_name, self.schema_name
2013        );
2014
2015        // Add instance_ids filter if provided
2016        if let Some(ref ids) = filter.instance_ids {
2017            if ids.is_empty() {
2018                return Ok(PruneResult::default());
2019            }
2020            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
2021            sql.push_str(&format!(
2022                " AND i.instance_id IN ({})",
2023                placeholders.join(", ")
2024            ));
2025        }
2026
2027        // Add completed_before filter if provided
2028        if filter.completed_before.is_some() {
2029            let param_num = filter
2030                .instance_ids
2031                .as_ref()
2032                .map(|ids| ids.len())
2033                .unwrap_or(0)
2034                + 1;
2035            sql.push_str(&format!(
2036                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
2037                param_num
2038            ));
2039        }
2040
2041        // Add limit
2042        let limit = filter.limit.unwrap_or(1000);
2043        let limit_param_num = filter
2044            .instance_ids
2045            .as_ref()
2046            .map(|ids| ids.len())
2047            .unwrap_or(0)
2048            + if filter.completed_before.is_some() {
2049                1
2050            } else {
2051                0
2052            }
2053            + 1;
2054        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
2055
2056        // Build and execute query
2057        let mut query = sqlx::query_scalar::<_, String>(&sql);
2058        if let Some(ref ids) = filter.instance_ids {
2059            for id in ids {
2060                query = query.bind(id);
2061            }
2062        }
2063        if let Some(completed_before) = filter.completed_before {
2064            query = query.bind(completed_before as i64);
2065        }
2066        query = query.bind(limit as i64);
2067
2068        let instance_ids: Vec<String> = query
2069            .fetch_all(&*self.pool)
2070            .await
2071            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
2072
2073        // Prune each instance
2074        let mut result = PruneResult::default();
2075
2076        for instance_id in &instance_ids {
2077            let single_result = self.prune_executions(instance_id, options.clone()).await?;
2078            result.instances_processed += single_result.instances_processed;
2079            result.executions_deleted += single_result.executions_deleted;
2080            result.events_deleted += single_result.events_deleted;
2081        }
2082
2083        debug!(
2084            target = "duroxide::providers::postgres",
2085            operation = "prune_executions_bulk",
2086            instances_processed = result.instances_processed,
2087            executions_deleted = result.executions_deleted,
2088            events_deleted = result.events_deleted,
2089            "Bulk pruned executions"
2090        );
2091
2092        Ok(result)
2093    }
2094}