Skip to main content

duroxide_pg/
provider.rs

1use anyhow::Result;
2use chrono::{TimeZone, Utc};
3use duroxide::providers::{
4    DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, ExecutionMetadata,
5    InstanceFilter, InstanceInfo, OrchestrationItem, Provider, ProviderAdmin, ProviderError,
6    PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier, SessionFetchConfig,
7    SystemMetrics, TagFilter, WorkItem,
8};
9use duroxide::{Event, EventKind, SystemStats};
10use sqlx::{postgres::PgPoolOptions, Error as SqlxError, PgPool};
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::time::sleep;
15use tracing::{debug, error, instrument, warn};
16
17use crate::migrations::MigrationRunner;
18
19/// PostgreSQL-based provider for Duroxide durable orchestrations.
20///
21/// Implements the [`Provider`] and [`ProviderAdmin`] traits from Duroxide,
22/// storing orchestration state, history, and work queues in PostgreSQL.
23///
24/// # Example
25///
26/// ```rust,no_run
27/// use duroxide_pg::PostgresProvider;
28///
29/// # async fn example() -> anyhow::Result<()> {
30/// // Connect using DATABASE_URL or explicit connection string
31/// let provider = PostgresProvider::new("postgres://localhost/mydb").await?;
32///
33/// // Or use a custom schema for isolation
34/// let provider = PostgresProvider::new_with_schema(
35///     "postgres://localhost/mydb",
36///     Some("my_app"),
37/// ).await?;
38/// # Ok(())
39/// # }
40/// ```
41pub struct PostgresProvider {
42    pool: Arc<PgPool>,
43    schema_name: String,
44}
45
46impl PostgresProvider {
47    pub async fn new(database_url: &str) -> Result<Self> {
48        Self::new_with_schema(database_url, None).await
49    }
50
51    pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result<Self> {
52        let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX")
53            .ok()
54            .and_then(|s| s.parse::<u32>().ok())
55            .unwrap_or(10);
56
57        let pool = PgPoolOptions::new()
58            .max_connections(max_connections)
59            .min_connections(1)
60            .acquire_timeout(std::time::Duration::from_secs(30))
61            .connect(database_url)
62            .await?;
63
64        let schema_name = schema_name.unwrap_or("public").to_string();
65
66        let provider = Self {
67            pool: Arc::new(pool),
68            schema_name: schema_name.clone(),
69        };
70
71        // Run migrations to initialize schema
72        let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone());
73        migration_runner.migrate().await?;
74
75        Ok(provider)
76    }
77
78    #[instrument(skip(self), target = "duroxide::providers::postgres")]
79    pub async fn initialize_schema(&self) -> Result<()> {
80        // Schema initialization is now handled by migrations
81        // This method is kept for backward compatibility but delegates to migrations
82        let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone());
83        migration_runner.migrate().await?;
84        Ok(())
85    }
86
87    /// Get current timestamp in milliseconds (Unix epoch)
88    fn now_millis() -> i64 {
89        SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .unwrap()
92            .as_millis() as i64
93    }
94
95    /// Get schema-qualified table name
96    fn table_name(&self, table: &str) -> String {
97        format!("{}.{}", self.schema_name, table)
98    }
99
100    /// Get the database pool (for testing)
101    pub fn pool(&self) -> &PgPool {
102        &self.pool
103    }
104
105    /// Get the schema name (for testing)
106    pub fn schema_name(&self) -> &str {
107        &self.schema_name
108    }
109
110    /// Convert sqlx::Error to ProviderError with proper classification
111    fn sqlx_to_provider_error(operation: &str, e: SqlxError) -> ProviderError {
112        match e {
113            SqlxError::Database(ref db_err) => {
114                // PostgreSQL error codes
115                let code_opt = db_err.code();
116                let code = code_opt.as_deref();
117                if code == Some("40P01") {
118                    // Deadlock detected
119                    ProviderError::retryable(operation, format!("Deadlock detected: {e}"))
120                } else if code == Some("40001") {
121                    // Serialization failure - permanent error (transaction conflict, not transient)
122                    ProviderError::permanent(operation, format!("Serialization failure: {e}"))
123                } else if code == Some("23505") {
124                    // Unique constraint violation (duplicate event)
125                    ProviderError::permanent(operation, format!("Duplicate detected: {e}"))
126                } else if code == Some("23503") {
127                    // Foreign key constraint violation
128                    ProviderError::permanent(operation, format!("Foreign key violation: {e}"))
129                } else {
130                    ProviderError::permanent(operation, format!("Database error: {e}"))
131                }
132            }
133            SqlxError::PoolClosed | SqlxError::PoolTimedOut => {
134                ProviderError::retryable(operation, format!("Connection pool error: {e}"))
135            }
136            SqlxError::Io(_) => ProviderError::retryable(operation, format!("I/O error: {e}")),
137            _ => ProviderError::permanent(operation, format!("Unexpected error: {e}")),
138        }
139    }
140
141    /// Convert TagFilter to SQL parameters (mode string + tag array)
142    fn tag_filter_to_sql(filter: &TagFilter) -> (&'static str, Vec<String>) {
143        match filter {
144            TagFilter::DefaultOnly => ("default_only", vec![]),
145            TagFilter::Tags(set) => {
146                let mut tags: Vec<String> = set.iter().cloned().collect();
147                tags.sort();
148                ("tags", tags)
149            }
150            TagFilter::DefaultAnd(set) => {
151                let mut tags: Vec<String> = set.iter().cloned().collect();
152                tags.sort();
153                ("default_and", tags)
154            }
155            TagFilter::Any => ("any", vec![]),
156            TagFilter::None => ("none", vec![]),
157        }
158    }
159
160    /// Clean up schema after tests (drops all tables and optionally the schema)
161    ///
162    /// **SAFETY**: Never drops the "public" schema itself, only tables within it.
163    /// Only drops the schema if it's a custom schema (not "public").
164    pub async fn cleanup_schema(&self) -> Result<()> {
165        const MAX_RETRIES: u32 = 5;
166        const BASE_RETRY_DELAY_MS: u64 = 50;
167
168        for attempt in 0..=MAX_RETRIES {
169            let cleanup_result = async {
170                // Call the stored procedure to drop all tables
171                sqlx::query(&format!("SELECT {}.cleanup_schema()", self.schema_name))
172                    .execute(&*self.pool)
173                    .await?;
174
175                // SAFETY: Never drop the "public" schema - it's a PostgreSQL system schema
176                // Only drop custom schemas created for testing
177                if self.schema_name != "public" {
178                    sqlx::query(&format!(
179                        "DROP SCHEMA IF EXISTS {} CASCADE",
180                        self.schema_name
181                    ))
182                    .execute(&*self.pool)
183                    .await?;
184                } else {
185                    // Explicit safeguard: we only drop tables from public schema, never the schema itself
186                    // This ensures we don't accidentally drop the default PostgreSQL schema
187                }
188
189                Ok::<(), SqlxError>(())
190            }
191            .await;
192
193            match cleanup_result {
194                Ok(()) => return Ok(()),
195                Err(SqlxError::Database(db_err)) if db_err.code().as_deref() == Some("40P01") => {
196                    if attempt < MAX_RETRIES {
197                        warn!(
198                            target = "duroxide::providers::postgres",
199                            schema = %self.schema_name,
200                            attempt = attempt + 1,
201                            "Deadlock during cleanup_schema, retrying"
202                        );
203                        sleep(Duration::from_millis(
204                            BASE_RETRY_DELAY_MS * (attempt as u64 + 1),
205                        ))
206                        .await;
207                        continue;
208                    }
209                    return Err(anyhow::anyhow!(db_err.to_string()));
210                }
211                Err(e) => return Err(anyhow::anyhow!(e.to_string())),
212            }
213        }
214
215        Ok(())
216    }
217}
218
219#[async_trait::async_trait]
220impl Provider for PostgresProvider {
221    fn name(&self) -> &str {
222        "duroxide-pg"
223    }
224
225    fn version(&self) -> &str {
226        env!("CARGO_PKG_VERSION")
227    }
228
229    #[instrument(skip(self), target = "duroxide::providers::postgres")]
230    async fn fetch_orchestration_item(
231        &self,
232        lock_timeout: Duration,
233        _poll_timeout: Duration,
234        filter: Option<&DispatcherCapabilityFilter>,
235    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
236        let start = std::time::Instant::now();
237
238        const MAX_RETRIES: u32 = 3;
239        const RETRY_DELAY_MS: u64 = 50;
240
241        // Convert Duration to milliseconds
242        let lock_timeout_ms = lock_timeout.as_millis() as i64;
243        let mut _last_error: Option<ProviderError> = None;
244
245        // Extract version filter from capability filter
246        let (min_packed, max_packed) = if let Some(f) = filter {
247            if let Some(range) = f.supported_duroxide_versions.first() {
248                let min = range.min.major as i64 * 1_000_000
249                    + range.min.minor as i64 * 1_000
250                    + range.min.patch as i64;
251                let max = range.max.major as i64 * 1_000_000
252                    + range.max.minor as i64 * 1_000
253                    + range.max.patch as i64;
254                (Some(min), Some(max))
255            } else {
256                // Empty supported_duroxide_versions = supports nothing
257                return Ok(None);
258            }
259        } else {
260            (None, None)
261        };
262
263        for attempt in 0..=MAX_RETRIES {
264            let now_ms = Self::now_millis();
265
266            let result: Result<
267                Option<(
268                    String,
269                    String,
270                    String,
271                    i64,
272                    serde_json::Value,
273                    serde_json::Value,
274                    String,
275                    i32,
276                    serde_json::Value,
277                )>,
278                SqlxError,
279            > = sqlx::query_as(&format!(
280                "SELECT * FROM {}.fetch_orchestration_item($1, $2, $3, $4)",
281                self.schema_name
282            ))
283            .bind(now_ms)
284            .bind(lock_timeout_ms)
285            .bind(min_packed)
286            .bind(max_packed)
287            .fetch_optional(&*self.pool)
288            .await;
289
290            let row = match result {
291                Ok(r) => r,
292                Err(e) => {
293                    let provider_err = Self::sqlx_to_provider_error("fetch_orchestration_item", e);
294                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
295                        warn!(
296                            target = "duroxide::providers::postgres",
297                            operation = "fetch_orchestration_item",
298                            attempt = attempt + 1,
299                            error = %provider_err,
300                            "Retryable error, will retry"
301                        );
302                        _last_error = Some(provider_err);
303                        sleep(std::time::Duration::from_millis(
304                            RETRY_DELAY_MS * (attempt as u64 + 1),
305                        ))
306                        .await;
307                        continue;
308                    }
309                    return Err(provider_err);
310                }
311            };
312
313            if let Some((
314                instance_id,
315                orchestration_name,
316                orchestration_version,
317                execution_id,
318                history_json,
319                messages_json,
320                lock_token,
321                attempt_count,
322                kv_snapshot_json,
323            )) = row
324            {
325                let (history, history_error) =
326                    match serde_json::from_value::<Vec<Event>>(history_json) {
327                        Ok(h) => (h, None),
328                        Err(e) => {
329                            let error_msg = format!("Failed to deserialize history: {e}");
330                            warn!(
331                                target = "duroxide::providers::postgres",
332                                instance = %instance_id,
333                                error = %error_msg,
334                                "History deserialization failed, returning item with history_error"
335                            );
336                            (vec![], Some(error_msg))
337                        }
338                    };
339
340                let messages: Vec<WorkItem> =
341                    serde_json::from_value(messages_json).map_err(|e| {
342                        ProviderError::permanent(
343                            "fetch_orchestration_item",
344                            format!("Failed to deserialize messages: {e}"),
345                        )
346                    })?;
347                let kv_snapshot: std::collections::HashMap<String, duroxide::providers::KvEntry> = {
348                    let raw: std::collections::HashMap<String, serde_json::Value> =
349                        serde_json::from_value(kv_snapshot_json).unwrap_or_default();
350                    raw.into_iter()
351                        .filter_map(|(k, v)| {
352                            let value = v.get("value")?.as_str()?.to_string();
353                            let last_updated_at_ms =
354                                v.get("last_updated_at_ms")?.as_u64().unwrap_or(0);
355                            Some((
356                                k,
357                                duroxide::providers::KvEntry {
358                                    value,
359                                    last_updated_at_ms,
360                                },
361                            ))
362                        })
363                        .collect()
364                };
365
366                let duration_ms = start.elapsed().as_millis() as u64;
367                debug!(
368                    target = "duroxide::providers::postgres",
369                    operation = "fetch_orchestration_item",
370                    instance_id = %instance_id,
371                    execution_id = execution_id,
372                    message_count = messages.len(),
373                    history_count = history.len(),
374                    attempt_count = attempt_count,
375                    duration_ms = duration_ms,
376                    attempts = attempt + 1,
377                    "Fetched orchestration item via stored procedure"
378                );
379
380                // Orphan queue messages: if orchestration_name is "Unknown", there's
381                // no history, and ALL messages are QueueMessage items, these are orphan
382                // events enqueued before the orchestration started. Drop them by acking
383                // with empty deltas. Other work items (CancelInstance, etc.) may
384                // legitimately race with StartOrchestration and must not be dropped.
385                if orchestration_name == "Unknown"
386                    && history.is_empty()
387                    && messages
388                        .iter()
389                        .all(|m| matches!(m, WorkItem::QueueMessage { .. }))
390                {
391                    let message_count = messages.len();
392                    tracing::warn!(
393                        target = "duroxide::providers::postgres",
394                        instance = %instance_id,
395                        message_count,
396                        "Dropping orphan queue messages — events enqueued before orchestration started are not supported"
397                    );
398                    self.ack_orchestration_item(
399                        &lock_token,
400                        execution_id as u64,
401                        vec![],
402                        vec![],
403                        vec![],
404                        ExecutionMetadata::default(),
405                        vec![],
406                    )
407                    .await?;
408                    return Ok(None);
409                }
410
411                return Ok(Some((
412                    OrchestrationItem {
413                        instance: instance_id,
414                        orchestration_name,
415                        execution_id: execution_id as u64,
416                        version: orchestration_version,
417                        history,
418                        messages,
419                        history_error,
420                        kv_snapshot,
421                    },
422                    lock_token,
423                    attempt_count as u32,
424                )));
425            }
426
427            // No result found - return immediately (short polling behavior)
428            // Only retry with delay on retryable errors (handled above)
429            return Ok(None);
430        }
431
432        Ok(None)
433    }
434    #[instrument(skip(self), fields(lock_token = %lock_token, execution_id = execution_id), target = "duroxide::providers::postgres")]
435    async fn ack_orchestration_item(
436        &self,
437        lock_token: &str,
438        execution_id: u64,
439        history_delta: Vec<Event>,
440        worker_items: Vec<WorkItem>,
441        orchestrator_items: Vec<WorkItem>,
442        metadata: ExecutionMetadata,
443        cancelled_activities: Vec<ScheduledActivityIdentifier>,
444    ) -> Result<(), ProviderError> {
445        let start = std::time::Instant::now();
446
447        const MAX_RETRIES: u32 = 3;
448        const RETRY_DELAY_MS: u64 = 50;
449
450        let mut history_delta_payload = Vec::with_capacity(history_delta.len());
451        for event in &history_delta {
452            if event.event_id() == 0 {
453                return Err(ProviderError::permanent(
454                    "ack_orchestration_item",
455                    "event_id must be set by runtime",
456                ));
457            }
458
459            let event_json = serde_json::to_string(event).map_err(|e| {
460                ProviderError::permanent(
461                    "ack_orchestration_item",
462                    format!("Failed to serialize event: {e}"),
463                )
464            })?;
465
466            let event_type = format!("{event:?}")
467                .split('{')
468                .next()
469                .unwrap_or("Unknown")
470                .trim()
471                .to_string();
472
473            history_delta_payload.push(serde_json::json!({
474                "event_id": event.event_id(),
475                "event_type": event_type,
476                "event_data": event_json,
477            }));
478        }
479
480        let history_delta_json = serde_json::Value::Array(history_delta_payload);
481
482        let worker_items_json = serde_json::to_value(&worker_items).map_err(|e| {
483            ProviderError::permanent(
484                "ack_orchestration_item",
485                format!("Failed to serialize worker items: {e}"),
486            )
487        })?;
488
489        let orchestrator_items_json = serde_json::to_value(&orchestrator_items).map_err(|e| {
490            ProviderError::permanent(
491                "ack_orchestration_item",
492                format!("Failed to serialize orchestrator items: {e}"),
493            )
494        })?;
495
496        // Scan history_delta for the last CustomStatusUpdated event
497        let (custom_status_action, custom_status_value): (Option<&str>, Option<&str>) = {
498            let mut last_status: Option<&Option<String>> = None;
499            for event in &history_delta {
500                if let EventKind::CustomStatusUpdated { ref status } = event.kind {
501                    last_status = Some(status);
502                }
503            }
504            match last_status {
505                Some(Some(s)) => (Some("set"), Some(s.as_str())),
506                Some(None) => (Some("clear"), None),
507                None => (None, None),
508            }
509        };
510
511        let kv_mutations: Vec<serde_json::Value> = history_delta
512            .iter()
513            .filter_map(|event| match &event.kind {
514                EventKind::KeyValueSet {
515                    key,
516                    value,
517                    last_updated_at_ms,
518                } => Some(serde_json::json!({
519                    "action": "set",
520                    "key": key,
521                    "value": value,
522                    "last_updated_at_ms": last_updated_at_ms,
523                })),
524                EventKind::KeyValueCleared { key } => Some(serde_json::json!({
525                    "action": "clear_key",
526                    "key": key,
527                })),
528                EventKind::KeyValuesCleared => Some(serde_json::json!({
529                    "action": "clear_all",
530                })),
531                _ => None,
532            })
533            .collect();
534
535        let metadata_json = serde_json::json!({
536            "orchestration_name": metadata.orchestration_name,
537            "orchestration_version": metadata.orchestration_version,
538            "status": metadata.status,
539            "output": metadata.output,
540            "parent_instance_id": metadata.parent_instance_id,
541            "pinned_duroxide_version": metadata.pinned_duroxide_version.as_ref().map(|v| {
542                serde_json::json!({
543                    "major": v.major,
544                    "minor": v.minor,
545                    "patch": v.patch,
546                })
547            }),
548            "custom_status_action": custom_status_action,
549            "custom_status_value": custom_status_value,
550            "kv_mutations": kv_mutations,
551        });
552
553        // Serialize cancelled activities for lock stealing
554        let cancelled_activities_json: Vec<serde_json::Value> = cancelled_activities
555            .iter()
556            .map(|a| {
557                serde_json::json!({
558                    "instance": a.instance,
559                    "execution_id": a.execution_id,
560                    "activity_id": a.activity_id,
561                })
562            })
563            .collect();
564        let cancelled_activities_json = serde_json::Value::Array(cancelled_activities_json);
565
566        for attempt in 0..=MAX_RETRIES {
567            let now_ms = Self::now_millis();
568            let result = sqlx::query(&format!(
569                "SELECT {}.ack_orchestration_item($1, $2, $3, $4, $5, $6, $7, $8)",
570                self.schema_name
571            ))
572            .bind(lock_token)
573            .bind(now_ms)
574            .bind(execution_id as i64)
575            .bind(&history_delta_json)
576            .bind(&worker_items_json)
577            .bind(&orchestrator_items_json)
578            .bind(&metadata_json)
579            .bind(&cancelled_activities_json)
580            .execute(&*self.pool)
581            .await;
582
583            match result {
584                Ok(_) => {
585                    let duration_ms = start.elapsed().as_millis() as u64;
586                    debug!(
587                        target = "duroxide::providers::postgres",
588                        operation = "ack_orchestration_item",
589                        execution_id = execution_id,
590                        history_count = history_delta.len(),
591                        worker_items_count = worker_items.len(),
592                        orchestrator_items_count = orchestrator_items.len(),
593                        cancelled_activities_count = cancelled_activities.len(),
594                        duration_ms = duration_ms,
595                        attempts = attempt + 1,
596                        "Acknowledged orchestration item via stored procedure"
597                    );
598                    return Ok(());
599                }
600                Err(e) => {
601                    // Check for permanent errors first
602                    if let SqlxError::Database(db_err) = &e {
603                        if db_err.message().contains("Invalid lock token") {
604                            return Err(ProviderError::permanent(
605                                "ack_orchestration_item",
606                                "Invalid lock token",
607                            ));
608                        }
609                    } else if e.to_string().contains("Invalid lock token") {
610                        return Err(ProviderError::permanent(
611                            "ack_orchestration_item",
612                            "Invalid lock token",
613                        ));
614                    }
615
616                    let provider_err = Self::sqlx_to_provider_error("ack_orchestration_item", e);
617                    if provider_err.is_retryable() && attempt < MAX_RETRIES {
618                        warn!(
619                            target = "duroxide::providers::postgres",
620                            operation = "ack_orchestration_item",
621                            attempt = attempt + 1,
622                            error = %provider_err,
623                            "Retryable error, will retry"
624                        );
625                        sleep(std::time::Duration::from_millis(
626                            RETRY_DELAY_MS * (attempt as u64 + 1),
627                        ))
628                        .await;
629                        continue;
630                    }
631                    return Err(provider_err);
632                }
633            }
634        }
635
636        // Should never reach here, but just in case
637        Ok(())
638    }
639    #[instrument(skip(self), fields(lock_token = %lock_token), target = "duroxide::providers::postgres")]
640    async fn abandon_orchestration_item(
641        &self,
642        lock_token: &str,
643        delay: Option<Duration>,
644        ignore_attempt: bool,
645    ) -> Result<(), ProviderError> {
646        let start = std::time::Instant::now();
647        let now_ms = Self::now_millis();
648        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
649
650        let instance_id = match sqlx::query_scalar::<_, String>(&format!(
651            "SELECT {}.abandon_orchestration_item($1, $2, $3, $4)",
652            self.schema_name
653        ))
654        .bind(lock_token)
655        .bind(now_ms)
656        .bind(delay_param)
657        .bind(ignore_attempt)
658        .fetch_one(&*self.pool)
659        .await
660        {
661            Ok(instance_id) => instance_id,
662            Err(e) => {
663                if let SqlxError::Database(db_err) = &e {
664                    if db_err.message().contains("Invalid lock token") {
665                        return Err(ProviderError::permanent(
666                            "abandon_orchestration_item",
667                            "Invalid lock token",
668                        ));
669                    }
670                } else if e.to_string().contains("Invalid lock token") {
671                    return Err(ProviderError::permanent(
672                        "abandon_orchestration_item",
673                        "Invalid lock token",
674                    ));
675                }
676
677                return Err(Self::sqlx_to_provider_error(
678                    "abandon_orchestration_item",
679                    e,
680                ));
681            }
682        };
683
684        let duration_ms = start.elapsed().as_millis() as u64;
685        debug!(
686            target = "duroxide::providers::postgres",
687            operation = "abandon_orchestration_item",
688            instance_id = %instance_id,
689            delay_ms = delay.map(|d| d.as_millis() as u64),
690            ignore_attempt = ignore_attempt,
691            duration_ms = duration_ms,
692            "Abandoned orchestration item via stored procedure"
693        );
694
695        Ok(())
696    }
697
698    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
699    async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
700        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
701            "SELECT out_event_data FROM {}.fetch_history($1)",
702            self.schema_name
703        ))
704        .bind(instance)
705        .fetch_all(&*self.pool)
706        .await
707        .map_err(|e| Self::sqlx_to_provider_error("read", e))?;
708
709        event_data_rows
710            .into_iter()
711            .map(|event_data| {
712                serde_json::from_str::<Event>(&event_data).map_err(|e| {
713                    ProviderError::permanent("read", format!("Failed to deserialize event: {e}"))
714                })
715            })
716            .collect()
717    }
718
719    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
720    async fn append_with_execution(
721        &self,
722        instance: &str,
723        execution_id: u64,
724        new_events: Vec<Event>,
725    ) -> Result<(), ProviderError> {
726        if new_events.is_empty() {
727            return Ok(());
728        }
729
730        let mut events_payload = Vec::with_capacity(new_events.len());
731        for event in &new_events {
732            if event.event_id() == 0 {
733                error!(
734                    target = "duroxide::providers::postgres",
735                    operation = "append_with_execution",
736                    error_type = "validation_error",
737                    instance_id = %instance,
738                    execution_id = execution_id,
739                    "event_id must be set by runtime"
740                );
741                return Err(ProviderError::permanent(
742                    "append_with_execution",
743                    "event_id must be set by runtime",
744                ));
745            }
746
747            let event_json = serde_json::to_string(event).map_err(|e| {
748                ProviderError::permanent(
749                    "append_with_execution",
750                    format!("Failed to serialize event: {e}"),
751                )
752            })?;
753
754            let event_type = format!("{event:?}")
755                .split('{')
756                .next()
757                .unwrap_or("Unknown")
758                .trim()
759                .to_string();
760
761            events_payload.push(serde_json::json!({
762                "event_id": event.event_id(),
763                "event_type": event_type,
764                "event_data": event_json,
765            }));
766        }
767
768        let events_json = serde_json::Value::Array(events_payload);
769
770        sqlx::query(&format!(
771            "SELECT {}.append_history($1, $2, $3)",
772            self.schema_name
773        ))
774        .bind(instance)
775        .bind(execution_id as i64)
776        .bind(events_json)
777        .execute(&*self.pool)
778        .await
779        .map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
780
781        debug!(
782            target = "duroxide::providers::postgres",
783            operation = "append_with_execution",
784            instance_id = %instance,
785            execution_id = execution_id,
786            event_count = new_events.len(),
787            "Appended history events via stored procedure"
788        );
789
790        Ok(())
791    }
792
793    #[instrument(skip(self), target = "duroxide::providers::postgres")]
794    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
795        let work_item = serde_json::to_string(&item).map_err(|e| {
796            ProviderError::permanent(
797                "enqueue_worker_work",
798                format!("Failed to serialize work item: {e}"),
799            )
800        })?;
801
802        let now_ms = Self::now_millis();
803
804        // Extract activity identification, session_id, and tag for ActivityExecute items
805        let (instance_id, execution_id, activity_id, session_id, tag) = match &item {
806            WorkItem::ActivityExecute {
807                instance,
808                execution_id,
809                id,
810                session_id,
811                tag,
812                ..
813            } => (
814                Some(instance.clone()),
815                Some(*execution_id as i64),
816                Some(*id as i64),
817                session_id.clone(),
818                tag.clone(),
819            ),
820            _ => (None, None, None, None, None),
821        };
822
823        sqlx::query(&format!(
824            "SELECT {}.enqueue_worker_work($1, $2, $3, $4, $5, $6, $7)",
825            self.schema_name
826        ))
827        .bind(work_item)
828        .bind(now_ms)
829        .bind(&instance_id)
830        .bind(execution_id)
831        .bind(activity_id)
832        .bind(&session_id)
833        .bind(&tag)
834        .execute(&*self.pool)
835        .await
836        .map_err(|e| {
837            error!(
838                target = "duroxide::providers::postgres",
839                operation = "enqueue_worker_work",
840                error_type = "database_error",
841                error = %e,
842                "Failed to enqueue worker work"
843            );
844            Self::sqlx_to_provider_error("enqueue_worker_work", e)
845        })?;
846
847        Ok(())
848    }
849
850    #[instrument(skip(self), target = "duroxide::providers::postgres")]
851    async fn fetch_work_item(
852        &self,
853        lock_timeout: Duration,
854        _poll_timeout: Duration,
855        session: Option<&SessionFetchConfig>,
856        tag_filter: &TagFilter,
857    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
858        // None filter means don't process any activities
859        if matches!(tag_filter, TagFilter::None) {
860            return Ok(None);
861        }
862
863        let start = std::time::Instant::now();
864
865        // Convert Duration to milliseconds
866        let lock_timeout_ms = lock_timeout.as_millis() as i64;
867
868        // Extract session parameters
869        let (owner_id, session_lock_timeout_ms): (Option<&str>, Option<i64>) = match session {
870            Some(config) => (
871                Some(&config.owner_id),
872                Some(config.lock_timeout.as_millis() as i64),
873            ),
874            None => (None, None),
875        };
876
877        // Convert TagFilter to SQL parameters
878        let (tag_mode, tag_names) = Self::tag_filter_to_sql(tag_filter);
879
880        let row = match sqlx::query_as::<_, (String, String, i32)>(&format!(
881            "SELECT * FROM {}.fetch_work_item($1, $2, $3, $4, $5, $6)",
882            self.schema_name
883        ))
884        .bind(Self::now_millis())
885        .bind(lock_timeout_ms)
886        .bind(owner_id)
887        .bind(session_lock_timeout_ms)
888        .bind(&tag_names)
889        .bind(tag_mode)
890        .fetch_optional(&*self.pool)
891        .await
892        {
893            Ok(row) => row,
894            Err(e) => {
895                return Err(Self::sqlx_to_provider_error("fetch_work_item", e));
896            }
897        };
898
899        let (work_item_json, lock_token, attempt_count) = match row {
900            Some(row) => row,
901            None => return Ok(None),
902        };
903
904        let work_item: WorkItem = serde_json::from_str(&work_item_json).map_err(|e| {
905            ProviderError::permanent(
906                "fetch_work_item",
907                format!("Failed to deserialize worker item: {e}"),
908            )
909        })?;
910
911        let duration_ms = start.elapsed().as_millis() as u64;
912
913        // Extract instance for logging - different work item types have different structures
914        let instance_id = match &work_item {
915            WorkItem::ActivityExecute { instance, .. } => instance.as_str(),
916            WorkItem::ActivityCompleted { instance, .. } => instance.as_str(),
917            WorkItem::ActivityFailed { instance, .. } => instance.as_str(),
918            WorkItem::StartOrchestration { instance, .. } => instance.as_str(),
919            WorkItem::TimerFired { instance, .. } => instance.as_str(),
920            WorkItem::ExternalRaised { instance, .. } => instance.as_str(),
921            WorkItem::CancelInstance { instance, .. } => instance.as_str(),
922            WorkItem::ContinueAsNew { instance, .. } => instance.as_str(),
923            WorkItem::SubOrchCompleted {
924                parent_instance, ..
925            } => parent_instance.as_str(),
926            WorkItem::SubOrchFailed {
927                parent_instance, ..
928            } => parent_instance.as_str(),
929            WorkItem::QueueMessage { instance, .. } => instance.as_str(),
930        };
931
932        debug!(
933            target = "duroxide::providers::postgres",
934            operation = "fetch_work_item",
935            instance_id = %instance_id,
936            attempt_count = attempt_count,
937            duration_ms = duration_ms,
938            "Fetched activity work item via stored procedure"
939        );
940
941        Ok(Some((work_item, lock_token, attempt_count as u32)))
942    }
943
944    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
945    async fn ack_work_item(
946        &self,
947        token: &str,
948        completion: Option<WorkItem>,
949    ) -> Result<(), ProviderError> {
950        let start = std::time::Instant::now();
951
952        // If no completion provided (e.g., cancelled activity), just delete the item
953        let Some(completion) = completion else {
954            let now_ms = Self::now_millis();
955            // Call ack_worker with NULL completion to delete without enqueueing
956            sqlx::query(&format!(
957                "SELECT {}.ack_worker($1, NULL, NULL, $2)",
958                self.schema_name
959            ))
960            .bind(token)
961            .bind(now_ms)
962            .execute(&*self.pool)
963            .await
964            .map_err(|e| {
965                if e.to_string().contains("Worker queue item not found") {
966                    ProviderError::permanent(
967                        "ack_worker",
968                        "Worker queue item not found or already processed",
969                    )
970                } else {
971                    Self::sqlx_to_provider_error("ack_worker", e)
972                }
973            })?;
974
975            let duration_ms = start.elapsed().as_millis() as u64;
976            debug!(
977                target = "duroxide::providers::postgres",
978                operation = "ack_worker",
979                token = %token,
980                duration_ms = duration_ms,
981                "Acknowledged worker without completion (cancelled)"
982            );
983            return Ok(());
984        };
985
986        // Extract instance ID from completion WorkItem
987        let instance_id = match &completion {
988            WorkItem::ActivityCompleted { instance, .. }
989            | WorkItem::ActivityFailed { instance, .. } => instance,
990            _ => {
991                error!(
992                    target = "duroxide::providers::postgres",
993                    operation = "ack_worker",
994                    error_type = "invalid_completion_type",
995                    "Invalid completion work item type"
996                );
997                return Err(ProviderError::permanent(
998                    "ack_worker",
999                    "Invalid completion work item type",
1000                ));
1001            }
1002        };
1003
1004        let completion_json = serde_json::to_string(&completion).map_err(|e| {
1005            ProviderError::permanent("ack_worker", format!("Failed to serialize completion: {e}"))
1006        })?;
1007
1008        let now_ms = Self::now_millis();
1009
1010        // Call stored procedure to atomically delete worker item and enqueue completion
1011        sqlx::query(&format!(
1012            "SELECT {}.ack_worker($1, $2, $3, $4)",
1013            self.schema_name
1014        ))
1015        .bind(token)
1016        .bind(instance_id)
1017        .bind(completion_json)
1018        .bind(now_ms)
1019        .execute(&*self.pool)
1020        .await
1021        .map_err(|e| {
1022            if e.to_string().contains("Worker queue item not found") {
1023                error!(
1024                    target = "duroxide::providers::postgres",
1025                    operation = "ack_worker",
1026                    error_type = "worker_item_not_found",
1027                    token = %token,
1028                    "Worker queue item not found or already processed"
1029                );
1030                ProviderError::permanent(
1031                    "ack_worker",
1032                    "Worker queue item not found or already processed",
1033                )
1034            } else {
1035                Self::sqlx_to_provider_error("ack_worker", e)
1036            }
1037        })?;
1038
1039        let duration_ms = start.elapsed().as_millis() as u64;
1040        debug!(
1041            target = "duroxide::providers::postgres",
1042            operation = "ack_worker",
1043            instance_id = %instance_id,
1044            duration_ms = duration_ms,
1045            "Acknowledged worker and enqueued completion"
1046        );
1047
1048        Ok(())
1049    }
1050
1051    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1052    async fn renew_work_item_lock(
1053        &self,
1054        token: &str,
1055        extend_for: Duration,
1056    ) -> Result<(), ProviderError> {
1057        let start = std::time::Instant::now();
1058
1059        // Get current time from application for consistent time reference
1060        let now_ms = Self::now_millis();
1061
1062        // Convert Duration to seconds for the stored procedure
1063        let extend_secs = extend_for.as_secs() as i64;
1064
1065        match sqlx::query(&format!(
1066            "SELECT {}.renew_work_item_lock($1, $2, $3)",
1067            self.schema_name
1068        ))
1069        .bind(token)
1070        .bind(now_ms)
1071        .bind(extend_secs)
1072        .execute(&*self.pool)
1073        .await
1074        {
1075            Ok(_) => {
1076                let duration_ms = start.elapsed().as_millis() as u64;
1077                debug!(
1078                    target = "duroxide::providers::postgres",
1079                    operation = "renew_work_item_lock",
1080                    token = %token,
1081                    extend_for_secs = extend_secs,
1082                    duration_ms = duration_ms,
1083                    "Work item lock renewed successfully"
1084                );
1085                Ok(())
1086            }
1087            Err(e) => {
1088                if let SqlxError::Database(db_err) = &e {
1089                    if db_err.message().contains("Lock token invalid") {
1090                        return Err(ProviderError::permanent(
1091                            "renew_work_item_lock",
1092                            "Lock token invalid, expired, or already acked",
1093                        ));
1094                    }
1095                } else if e.to_string().contains("Lock token invalid") {
1096                    return Err(ProviderError::permanent(
1097                        "renew_work_item_lock",
1098                        "Lock token invalid, expired, or already acked",
1099                    ));
1100                }
1101
1102                Err(Self::sqlx_to_provider_error("renew_work_item_lock", e))
1103            }
1104        }
1105    }
1106
1107    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1108    async fn abandon_work_item(
1109        &self,
1110        token: &str,
1111        delay: Option<Duration>,
1112        ignore_attempt: bool,
1113    ) -> Result<(), ProviderError> {
1114        let start = std::time::Instant::now();
1115        let now_ms = Self::now_millis();
1116        let delay_param: Option<i64> = delay.map(|d| d.as_millis() as i64);
1117
1118        match sqlx::query(&format!(
1119            "SELECT {}.abandon_work_item($1, $2, $3, $4)",
1120            self.schema_name
1121        ))
1122        .bind(token)
1123        .bind(now_ms)
1124        .bind(delay_param)
1125        .bind(ignore_attempt)
1126        .execute(&*self.pool)
1127        .await
1128        {
1129            Ok(_) => {
1130                let duration_ms = start.elapsed().as_millis() as u64;
1131                debug!(
1132                    target = "duroxide::providers::postgres",
1133                    operation = "abandon_work_item",
1134                    token = %token,
1135                    delay_ms = delay.map(|d| d.as_millis() as u64),
1136                    ignore_attempt = ignore_attempt,
1137                    duration_ms = duration_ms,
1138                    "Abandoned work item via stored procedure"
1139                );
1140                Ok(())
1141            }
1142            Err(e) => {
1143                if let SqlxError::Database(db_err) = &e {
1144                    if db_err.message().contains("Invalid lock token")
1145                        || db_err.message().contains("already acked")
1146                    {
1147                        return Err(ProviderError::permanent(
1148                            "abandon_work_item",
1149                            "Invalid lock token or already acked",
1150                        ));
1151                    }
1152                } else if e.to_string().contains("Invalid lock token")
1153                    || e.to_string().contains("already acked")
1154                {
1155                    return Err(ProviderError::permanent(
1156                        "abandon_work_item",
1157                        "Invalid lock token or already acked",
1158                    ));
1159                }
1160
1161                Err(Self::sqlx_to_provider_error("abandon_work_item", e))
1162            }
1163        }
1164    }
1165
1166    #[instrument(skip(self), fields(token = %token), target = "duroxide::providers::postgres")]
1167    async fn renew_orchestration_item_lock(
1168        &self,
1169        token: &str,
1170        extend_for: Duration,
1171    ) -> Result<(), ProviderError> {
1172        let start = std::time::Instant::now();
1173
1174        // Get current time from application for consistent time reference
1175        let now_ms = Self::now_millis();
1176
1177        // Convert Duration to seconds for the stored procedure
1178        let extend_secs = extend_for.as_secs() as i64;
1179
1180        match sqlx::query(&format!(
1181            "SELECT {}.renew_orchestration_item_lock($1, $2, $3)",
1182            self.schema_name
1183        ))
1184        .bind(token)
1185        .bind(now_ms)
1186        .bind(extend_secs)
1187        .execute(&*self.pool)
1188        .await
1189        {
1190            Ok(_) => {
1191                let duration_ms = start.elapsed().as_millis() as u64;
1192                debug!(
1193                    target = "duroxide::providers::postgres",
1194                    operation = "renew_orchestration_item_lock",
1195                    token = %token,
1196                    extend_for_secs = extend_secs,
1197                    duration_ms = duration_ms,
1198                    "Orchestration item lock renewed successfully"
1199                );
1200                Ok(())
1201            }
1202            Err(e) => {
1203                if let SqlxError::Database(db_err) = &e {
1204                    if db_err.message().contains("Lock token invalid")
1205                        || db_err.message().contains("expired")
1206                        || db_err.message().contains("already released")
1207                    {
1208                        return Err(ProviderError::permanent(
1209                            "renew_orchestration_item_lock",
1210                            "Lock token invalid, expired, or already released",
1211                        ));
1212                    }
1213                } else if e.to_string().contains("Lock token invalid")
1214                    || e.to_string().contains("expired")
1215                    || e.to_string().contains("already released")
1216                {
1217                    return Err(ProviderError::permanent(
1218                        "renew_orchestration_item_lock",
1219                        "Lock token invalid, expired, or already released",
1220                    ));
1221                }
1222
1223                Err(Self::sqlx_to_provider_error(
1224                    "renew_orchestration_item_lock",
1225                    e,
1226                ))
1227            }
1228        }
1229    }
1230
1231    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1232    async fn enqueue_for_orchestrator(
1233        &self,
1234        item: WorkItem,
1235        delay: Option<Duration>,
1236    ) -> Result<(), ProviderError> {
1237        let work_item = serde_json::to_string(&item).map_err(|e| {
1238            ProviderError::permanent(
1239                "enqueue_orchestrator_work",
1240                format!("Failed to serialize work item: {e}"),
1241            )
1242        })?;
1243
1244        // Extract instance ID from WorkItem enum
1245        let instance_id = match &item {
1246            WorkItem::StartOrchestration { instance, .. }
1247            | WorkItem::ActivityCompleted { instance, .. }
1248            | WorkItem::ActivityFailed { instance, .. }
1249            | WorkItem::TimerFired { instance, .. }
1250            | WorkItem::ExternalRaised { instance, .. }
1251            | WorkItem::CancelInstance { instance, .. }
1252            | WorkItem::ContinueAsNew { instance, .. }
1253            | WorkItem::QueueMessage { instance, .. } => instance,
1254            WorkItem::SubOrchCompleted {
1255                parent_instance, ..
1256            }
1257            | WorkItem::SubOrchFailed {
1258                parent_instance, ..
1259            } => parent_instance,
1260            WorkItem::ActivityExecute { .. } => {
1261                return Err(ProviderError::permanent(
1262                    "enqueue_orchestrator_work",
1263                    "ActivityExecute should go to worker queue, not orchestrator queue",
1264                ));
1265            }
1266        };
1267
1268        // Determine visible_at: use max of fire_at_ms (for TimerFired) and delay
1269        let now_ms = Self::now_millis();
1270
1271        let visible_at_ms = if let WorkItem::TimerFired { fire_at_ms, .. } = &item {
1272            if *fire_at_ms > 0 {
1273                // Take max of fire_at_ms and delay (if provided)
1274                if let Some(delay) = delay {
1275                    std::cmp::max(*fire_at_ms, now_ms as u64 + delay.as_millis() as u64)
1276                } else {
1277                    *fire_at_ms
1278                }
1279            } else {
1280                // fire_at_ms is 0, use delay or NOW()
1281                delay
1282                    .map(|d| now_ms as u64 + d.as_millis() as u64)
1283                    .unwrap_or(now_ms as u64)
1284            }
1285        } else {
1286            // Non-timer item: use delay or NOW()
1287            delay
1288                .map(|d| now_ms as u64 + d.as_millis() as u64)
1289                .unwrap_or(now_ms as u64)
1290        };
1291
1292        let visible_at = Utc
1293            .timestamp_millis_opt(visible_at_ms as i64)
1294            .single()
1295            .ok_or_else(|| {
1296                ProviderError::permanent(
1297                    "enqueue_orchestrator_work",
1298                    "Invalid visible_at timestamp",
1299                )
1300            })?;
1301
1302        // ⚠️ CRITICAL: DO NOT extract orchestration metadata - instance creation happens via ack_orchestration_item metadata
1303        // Pass NULL for orchestration_name, orchestration_version, execution_id parameters
1304
1305        // Call stored procedure to enqueue work
1306        sqlx::query(&format!(
1307            "SELECT {}.enqueue_orchestrator_work($1, $2, $3, $4, $5, $6)",
1308            self.schema_name
1309        ))
1310        .bind(instance_id)
1311        .bind(&work_item)
1312        .bind(visible_at)
1313        .bind::<Option<String>>(None) // orchestration_name - NULL
1314        .bind::<Option<String>>(None) // orchestration_version - NULL
1315        .bind::<Option<i64>>(None) // execution_id - NULL
1316        .execute(&*self.pool)
1317        .await
1318        .map_err(|e| {
1319            error!(
1320                target = "duroxide::providers::postgres",
1321                operation = "enqueue_orchestrator_work",
1322                error_type = "database_error",
1323                error = %e,
1324                instance_id = %instance_id,
1325                "Failed to enqueue orchestrator work"
1326            );
1327            Self::sqlx_to_provider_error("enqueue_orchestrator_work", e)
1328        })?;
1329
1330        debug!(
1331            target = "duroxide::providers::postgres",
1332            operation = "enqueue_orchestrator_work",
1333            instance_id = %instance_id,
1334            delay_ms = delay.map(|d| d.as_millis() as u64),
1335            "Enqueued orchestrator work"
1336        );
1337
1338        Ok(())
1339    }
1340
1341    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1342    async fn read_with_execution(
1343        &self,
1344        instance: &str,
1345        execution_id: u64,
1346    ) -> Result<Vec<Event>, ProviderError> {
1347        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1348            "SELECT event_data FROM {} WHERE instance_id = $1 AND execution_id = $2 ORDER BY event_id",
1349            self.table_name("history")
1350        ))
1351        .bind(instance)
1352        .bind(execution_id as i64)
1353        .fetch_all(&*self.pool)
1354        .await
1355        .map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
1356
1357        event_data_rows
1358            .into_iter()
1359            .map(|event_data| {
1360                serde_json::from_str::<Event>(&event_data).map_err(|e| {
1361                    ProviderError::permanent(
1362                        "read_with_execution",
1363                        format!("Failed to deserialize event: {e}"),
1364                    )
1365                })
1366            })
1367            .collect()
1368    }
1369
1370    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1371    async fn renew_session_lock(
1372        &self,
1373        owner_ids: &[&str],
1374        extend_for: Duration,
1375        idle_timeout: Duration,
1376    ) -> Result<usize, ProviderError> {
1377        if owner_ids.is_empty() {
1378            return Ok(0);
1379        }
1380
1381        let now_ms = Self::now_millis();
1382        let extend_ms = extend_for.as_millis() as i64;
1383        let idle_timeout_ms = idle_timeout.as_millis() as i64;
1384        let owner_ids_vec: Vec<&str> = owner_ids.to_vec();
1385
1386        let result = sqlx::query_scalar::<_, i64>(&format!(
1387            "SELECT {}.renew_session_lock($1, $2, $3, $4)",
1388            self.schema_name
1389        ))
1390        .bind(&owner_ids_vec)
1391        .bind(now_ms)
1392        .bind(extend_ms)
1393        .bind(idle_timeout_ms)
1394        .fetch_one(&*self.pool)
1395        .await
1396        .map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
1397
1398        debug!(
1399            target = "duroxide::providers::postgres",
1400            operation = "renew_session_lock",
1401            owner_count = owner_ids.len(),
1402            sessions_renewed = result,
1403            "Session locks renewed"
1404        );
1405
1406        Ok(result as usize)
1407    }
1408
1409    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1410    async fn cleanup_orphaned_sessions(
1411        &self,
1412        _idle_timeout: Duration,
1413    ) -> Result<usize, ProviderError> {
1414        let now_ms = Self::now_millis();
1415
1416        let result = sqlx::query_scalar::<_, i64>(&format!(
1417            "SELECT {}.cleanup_orphaned_sessions($1)",
1418            self.schema_name
1419        ))
1420        .bind(now_ms)
1421        .fetch_one(&*self.pool)
1422        .await
1423        .map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
1424
1425        debug!(
1426            target = "duroxide::providers::postgres",
1427            operation = "cleanup_orphaned_sessions",
1428            sessions_cleaned = result,
1429            "Orphaned sessions cleaned up"
1430        );
1431
1432        Ok(result as usize)
1433    }
1434
1435    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
1436        Some(self)
1437    }
1438
1439    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1440    async fn get_custom_status(
1441        &self,
1442        instance: &str,
1443        last_seen_version: u64,
1444    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
1445        let row = sqlx::query_as::<_, (Option<String>, i64)>(&format!(
1446            "SELECT * FROM {}.get_custom_status($1, $2)",
1447            self.schema_name
1448        ))
1449        .bind(instance)
1450        .bind(last_seen_version as i64)
1451        .fetch_optional(&*self.pool)
1452        .await
1453        .map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
1454
1455        match row {
1456            Some((custom_status, version)) => Ok(Some((custom_status, version as u64))),
1457            None => Ok(None),
1458        }
1459    }
1460
1461    async fn get_kv_value(
1462        &self,
1463        instance_id: &str,
1464        key: &str,
1465    ) -> Result<Option<String>, ProviderError> {
1466        let row: Option<(Option<String>, bool)> = sqlx::query_as(&format!(
1467            "SELECT * FROM {}.get_kv_value($1, $2)",
1468            self.schema_name
1469        ))
1470        .bind(instance_id)
1471        .bind(key)
1472        .fetch_optional(&*self.pool)
1473        .await
1474        .map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
1475
1476        Ok(row.and_then(|(value, found)| if found { value } else { None }))
1477    }
1478
1479    async fn get_kv_all_values(
1480        &self,
1481        instance_id: &str,
1482    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
1483        let rows: Vec<(String, String)> = sqlx::query_as(&format!(
1484            "SELECT * FROM {}.get_kv_all_values($1)",
1485            self.schema_name
1486        ))
1487        .bind(instance_id)
1488        .fetch_all(&*self.pool)
1489        .await
1490        .map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
1491
1492        Ok(rows.into_iter().collect())
1493    }
1494
1495    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1496    async fn get_instance_stats(&self, instance: &str) -> Result<Option<SystemStats>, ProviderError> {
1497        let row: Option<(bool, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1498            "SELECT * FROM {}.get_instance_stats($1)",
1499            self.schema_name
1500        ))
1501        .bind(instance)
1502        .fetch_optional(&*self.pool)
1503        .await
1504        .map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
1505
1506        match row {
1507            Some((true, history_event_count, history_size_bytes, queue_pending_count, kv_user_key_count, kv_total_value_bytes)) => {
1508                Ok(Some(SystemStats {
1509                    history_event_count: history_event_count as u64,
1510                    history_size_bytes: history_size_bytes as u64,
1511                    queue_pending_count: queue_pending_count as u64,
1512                    kv_user_key_count: kv_user_key_count as u64,
1513                    kv_total_value_bytes: kv_total_value_bytes as u64,
1514                }))
1515            }
1516            _ => Ok(None),
1517        }
1518    }
1519}
1520
1521#[async_trait::async_trait]
1522impl ProviderAdmin for PostgresProvider {
1523    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1524    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
1525        sqlx::query_scalar(&format!(
1526            "SELECT instance_id FROM {}.list_instances()",
1527            self.schema_name
1528        ))
1529        .fetch_all(&*self.pool)
1530        .await
1531        .map_err(|e| Self::sqlx_to_provider_error("list_instances", e))
1532    }
1533
1534    #[instrument(skip(self), fields(status = %status), target = "duroxide::providers::postgres")]
1535    async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
1536        sqlx::query_scalar(&format!(
1537            "SELECT instance_id FROM {}.list_instances_by_status($1)",
1538            self.schema_name
1539        ))
1540        .bind(status)
1541        .fetch_all(&*self.pool)
1542        .await
1543        .map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))
1544    }
1545
1546    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1547    async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
1548        let execution_ids: Vec<i64> = sqlx::query_scalar(&format!(
1549            "SELECT execution_id FROM {}.list_executions($1)",
1550            self.schema_name
1551        ))
1552        .bind(instance)
1553        .fetch_all(&*self.pool)
1554        .await
1555        .map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
1556
1557        Ok(execution_ids.into_iter().map(|id| id as u64).collect())
1558    }
1559
1560    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1561    async fn read_history_with_execution_id(
1562        &self,
1563        instance: &str,
1564        execution_id: u64,
1565    ) -> Result<Vec<Event>, ProviderError> {
1566        let event_data_rows: Vec<String> = sqlx::query_scalar(&format!(
1567            "SELECT out_event_data FROM {}.fetch_history_with_execution($1, $2)",
1568            self.schema_name
1569        ))
1570        .bind(instance)
1571        .bind(execution_id as i64)
1572        .fetch_all(&*self.pool)
1573        .await
1574        .map_err(|e| Self::sqlx_to_provider_error("read_execution", e))?;
1575
1576        event_data_rows
1577            .into_iter()
1578            .map(|event_data| {
1579                serde_json::from_str::<Event>(&event_data).map_err(|e| {
1580                    ProviderError::permanent(
1581                        "read_history_with_execution_id",
1582                        format!("Failed to deserialize event: {e}"),
1583                    )
1584                })
1585            })
1586            .collect()
1587    }
1588
1589    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1590    async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
1591        let execution_id = self.latest_execution_id(instance).await?;
1592        self.read_history_with_execution_id(instance, execution_id)
1593            .await
1594    }
1595
1596    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1597    async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
1598        sqlx::query_scalar(&format!(
1599            "SELECT {}.latest_execution_id($1)",
1600            self.schema_name
1601        ))
1602        .bind(instance)
1603        .fetch_optional(&*self.pool)
1604        .await
1605        .map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?
1606        .map(|id: i64| id as u64)
1607        .ok_or_else(|| ProviderError::permanent("latest_execution_id", "Instance not found"))
1608    }
1609
1610    #[instrument(skip(self), fields(instance = %instance), target = "duroxide::providers::postgres")]
1611    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
1612        let row: Option<(
1613            String,
1614            String,
1615            String,
1616            i64,
1617            chrono::DateTime<Utc>,
1618            Option<chrono::DateTime<Utc>>,
1619            Option<String>,
1620            Option<String>,
1621            Option<String>,
1622        )> = sqlx::query_as(&format!(
1623            "SELECT * FROM {}.get_instance_info($1)",
1624            self.schema_name
1625        ))
1626        .bind(instance)
1627        .fetch_optional(&*self.pool)
1628        .await
1629        .map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
1630
1631        let (
1632            instance_id,
1633            orchestration_name,
1634            orchestration_version,
1635            current_execution_id,
1636            created_at,
1637            updated_at,
1638            status,
1639            output,
1640            parent_instance_id,
1641        ) =
1642            row.ok_or_else(|| ProviderError::permanent("get_instance_info", "Instance not found"))?;
1643
1644        Ok(InstanceInfo {
1645            instance_id,
1646            orchestration_name,
1647            orchestration_version,
1648            current_execution_id: current_execution_id as u64,
1649            status: status.unwrap_or_else(|| "Running".to_string()),
1650            output,
1651            created_at: created_at.timestamp_millis() as u64,
1652            updated_at: updated_at
1653                .map(|dt| dt.timestamp_millis() as u64)
1654                .unwrap_or(created_at.timestamp_millis() as u64),
1655            parent_instance_id,
1656        })
1657    }
1658
1659    #[instrument(skip(self), fields(instance = %instance, execution_id = execution_id), target = "duroxide::providers::postgres")]
1660    async fn get_execution_info(
1661        &self,
1662        instance: &str,
1663        execution_id: u64,
1664    ) -> Result<ExecutionInfo, ProviderError> {
1665        let row: Option<(
1666            i64,
1667            String,
1668            Option<String>,
1669            chrono::DateTime<Utc>,
1670            Option<chrono::DateTime<Utc>>,
1671            i64,
1672        )> = sqlx::query_as(&format!(
1673            "SELECT * FROM {}.get_execution_info($1, $2)",
1674            self.schema_name
1675        ))
1676        .bind(instance)
1677        .bind(execution_id as i64)
1678        .fetch_optional(&*self.pool)
1679        .await
1680        .map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
1681
1682        let (exec_id, status, output, started_at, completed_at, event_count) = row
1683            .ok_or_else(|| ProviderError::permanent("get_execution_info", "Execution not found"))?;
1684
1685        Ok(ExecutionInfo {
1686            execution_id: exec_id as u64,
1687            status,
1688            output,
1689            started_at: started_at.timestamp_millis() as u64,
1690            completed_at: completed_at.map(|dt| dt.timestamp_millis() as u64),
1691            event_count: event_count as usize,
1692        })
1693    }
1694
1695    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1696    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
1697        let row: Option<(i64, i64, i64, i64, i64, i64)> = sqlx::query_as(&format!(
1698            "SELECT * FROM {}.get_system_metrics()",
1699            self.schema_name
1700        ))
1701        .fetch_optional(&*self.pool)
1702        .await
1703        .map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
1704
1705        let (
1706            total_instances,
1707            total_executions,
1708            running_instances,
1709            completed_instances,
1710            failed_instances,
1711            total_events,
1712        ) = row.ok_or_else(|| {
1713            ProviderError::permanent("get_system_metrics", "Failed to get system metrics")
1714        })?;
1715
1716        Ok(SystemMetrics {
1717            total_instances: total_instances as u64,
1718            total_executions: total_executions as u64,
1719            running_instances: running_instances as u64,
1720            completed_instances: completed_instances as u64,
1721            failed_instances: failed_instances as u64,
1722            total_events: total_events as u64,
1723        })
1724    }
1725
1726    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1727    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
1728        let now_ms = Self::now_millis();
1729
1730        let row: Option<(i64, i64)> = sqlx::query_as(&format!(
1731            "SELECT * FROM {}.get_queue_depths($1)",
1732            self.schema_name
1733        ))
1734        .bind(now_ms)
1735        .fetch_optional(&*self.pool)
1736        .await
1737        .map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
1738
1739        let (orchestrator_queue, worker_queue) = row.ok_or_else(|| {
1740            ProviderError::permanent("get_queue_depths", "Failed to get queue depths")
1741        })?;
1742
1743        Ok(QueueDepths {
1744            orchestrator_queue: orchestrator_queue as usize,
1745            worker_queue: worker_queue as usize,
1746            timer_queue: 0, // Timers are in orchestrator queue with delayed visibility
1747        })
1748    }
1749
1750    // ===== Hierarchy Primitive Operations =====
1751
1752    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1753    async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
1754        sqlx::query_scalar(&format!(
1755            "SELECT child_instance_id FROM {}.list_children($1)",
1756            self.schema_name
1757        ))
1758        .bind(instance_id)
1759        .fetch_all(&*self.pool)
1760        .await
1761        .map_err(|e| Self::sqlx_to_provider_error("list_children", e))
1762    }
1763
1764    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1765    async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
1766        // The stored procedure raises an exception if instance doesn't exist
1767        // Otherwise returns the parent_instance_id (which may be NULL)
1768        let result: Result<Option<String>, _> =
1769            sqlx::query_scalar(&format!("SELECT {}.get_parent_id($1)", self.schema_name))
1770                .bind(instance_id)
1771                .fetch_one(&*self.pool)
1772                .await;
1773
1774        match result {
1775            Ok(parent_id) => Ok(parent_id),
1776            Err(e) => {
1777                let err_str = e.to_string();
1778                if err_str.contains("Instance not found") {
1779                    Err(ProviderError::permanent(
1780                        "get_parent_id",
1781                        format!("Instance not found: {}", instance_id),
1782                    ))
1783                } else {
1784                    Err(Self::sqlx_to_provider_error("get_parent_id", e))
1785                }
1786            }
1787        }
1788    }
1789
1790    // ===== Deletion Operations =====
1791
1792    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1793    async fn delete_instances_atomic(
1794        &self,
1795        ids: &[String],
1796        force: bool,
1797    ) -> Result<DeleteInstanceResult, ProviderError> {
1798        if ids.is_empty() {
1799            return Ok(DeleteInstanceResult::default());
1800        }
1801
1802        let row: Option<(i64, i64, i64, i64)> = sqlx::query_as(&format!(
1803            "SELECT * FROM {}.delete_instances_atomic($1, $2)",
1804            self.schema_name
1805        ))
1806        .bind(ids)
1807        .bind(force)
1808        .fetch_optional(&*self.pool)
1809        .await
1810        .map_err(|e| {
1811            let err_str = e.to_string();
1812            if err_str.contains("is Running") {
1813                ProviderError::permanent("delete_instances_atomic", err_str)
1814            } else if err_str.contains("Orphan detected") {
1815                ProviderError::permanent("delete_instances_atomic", err_str)
1816            } else {
1817                Self::sqlx_to_provider_error("delete_instances_atomic", e)
1818            }
1819        })?;
1820
1821        let (instances_deleted, executions_deleted, events_deleted, queue_messages_deleted) =
1822            row.unwrap_or((0, 0, 0, 0));
1823
1824        debug!(
1825            target = "duroxide::providers::postgres",
1826            operation = "delete_instances_atomic",
1827            instances_deleted = instances_deleted,
1828            executions_deleted = executions_deleted,
1829            events_deleted = events_deleted,
1830            queue_messages_deleted = queue_messages_deleted,
1831            "Deleted instances atomically"
1832        );
1833
1834        Ok(DeleteInstanceResult {
1835            instances_deleted: instances_deleted as u64,
1836            executions_deleted: executions_deleted as u64,
1837            events_deleted: events_deleted as u64,
1838            queue_messages_deleted: queue_messages_deleted as u64,
1839        })
1840    }
1841
1842    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1843    async fn delete_instance_bulk(
1844        &self,
1845        filter: InstanceFilter,
1846    ) -> Result<DeleteInstanceResult, ProviderError> {
1847        // Build query to find matching root instances in terminal states
1848        let mut sql = format!(
1849            r#"
1850            SELECT i.instance_id
1851            FROM {}.instances i
1852            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
1853              AND i.current_execution_id = e.execution_id
1854            WHERE i.parent_instance_id IS NULL
1855              AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
1856            "#,
1857            self.schema_name, self.schema_name
1858        );
1859
1860        // Add instance_ids filter if provided
1861        if let Some(ref ids) = filter.instance_ids {
1862            if ids.is_empty() {
1863                return Ok(DeleteInstanceResult::default());
1864            }
1865            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
1866            sql.push_str(&format!(
1867                " AND i.instance_id IN ({})",
1868                placeholders.join(", ")
1869            ));
1870        }
1871
1872        // Add completed_before filter if provided
1873        if filter.completed_before.is_some() {
1874            let param_num = filter
1875                .instance_ids
1876                .as_ref()
1877                .map(|ids| ids.len())
1878                .unwrap_or(0)
1879                + 1;
1880            sql.push_str(&format!(
1881                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
1882                param_num
1883            ));
1884        }
1885
1886        // Add limit
1887        let limit = filter.limit.unwrap_or(1000);
1888        let limit_param_num = filter
1889            .instance_ids
1890            .as_ref()
1891            .map(|ids| ids.len())
1892            .unwrap_or(0)
1893            + if filter.completed_before.is_some() {
1894                1
1895            } else {
1896                0
1897            }
1898            + 1;
1899        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
1900
1901        // Build and execute query
1902        let mut query = sqlx::query_scalar::<_, String>(&sql);
1903        if let Some(ref ids) = filter.instance_ids {
1904            for id in ids {
1905                query = query.bind(id);
1906            }
1907        }
1908        if let Some(completed_before) = filter.completed_before {
1909            query = query.bind(completed_before as i64);
1910        }
1911        query = query.bind(limit as i64);
1912
1913        let instance_ids: Vec<String> = query
1914            .fetch_all(&*self.pool)
1915            .await
1916            .map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
1917
1918        if instance_ids.is_empty() {
1919            return Ok(DeleteInstanceResult::default());
1920        }
1921
1922        // Delete each instance with cascade
1923        let mut result = DeleteInstanceResult::default();
1924
1925        for instance_id in &instance_ids {
1926            // Get full tree for this root
1927            let tree = self.get_instance_tree(instance_id).await?;
1928
1929            // Atomic delete (tree.all_ids is already in deletion order: children first)
1930            let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
1931            result.instances_deleted += delete_result.instances_deleted;
1932            result.executions_deleted += delete_result.executions_deleted;
1933            result.events_deleted += delete_result.events_deleted;
1934            result.queue_messages_deleted += delete_result.queue_messages_deleted;
1935        }
1936
1937        debug!(
1938            target = "duroxide::providers::postgres",
1939            operation = "delete_instance_bulk",
1940            instances_deleted = result.instances_deleted,
1941            executions_deleted = result.executions_deleted,
1942            events_deleted = result.events_deleted,
1943            queue_messages_deleted = result.queue_messages_deleted,
1944            "Bulk deleted instances"
1945        );
1946
1947        Ok(result)
1948    }
1949
1950    // ===== Pruning Operations =====
1951
1952    #[instrument(skip(self), fields(instance = %instance_id), target = "duroxide::providers::postgres")]
1953    async fn prune_executions(
1954        &self,
1955        instance_id: &str,
1956        options: PruneOptions,
1957    ) -> Result<PruneResult, ProviderError> {
1958        let keep_last: Option<i32> = options.keep_last.map(|v| v as i32);
1959        let completed_before_ms: Option<i64> = options.completed_before.map(|v| v as i64);
1960
1961        let row: Option<(i64, i64, i64)> = sqlx::query_as(&format!(
1962            "SELECT * FROM {}.prune_executions($1, $2, $3)",
1963            self.schema_name
1964        ))
1965        .bind(instance_id)
1966        .bind(keep_last)
1967        .bind(completed_before_ms)
1968        .fetch_optional(&*self.pool)
1969        .await
1970        .map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
1971
1972        let (instances_processed, executions_deleted, events_deleted) = row.unwrap_or((0, 0, 0));
1973
1974        debug!(
1975            target = "duroxide::providers::postgres",
1976            operation = "prune_executions",
1977            instance_id = %instance_id,
1978            instances_processed = instances_processed,
1979            executions_deleted = executions_deleted,
1980            events_deleted = events_deleted,
1981            "Pruned executions"
1982        );
1983
1984        Ok(PruneResult {
1985            instances_processed: instances_processed as u64,
1986            executions_deleted: executions_deleted as u64,
1987            events_deleted: events_deleted as u64,
1988        })
1989    }
1990
1991    #[instrument(skip(self), target = "duroxide::providers::postgres")]
1992    async fn prune_executions_bulk(
1993        &self,
1994        filter: InstanceFilter,
1995        options: PruneOptions,
1996    ) -> Result<PruneResult, ProviderError> {
1997        // Find matching instances (all statuses - prune_executions protects current execution)
1998        // Note: We include Running instances because long-running orchestrations (e.g., with
1999        // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
2000        // call safely skips the current execution regardless of its status.
2001        let mut sql = format!(
2002            r#"
2003            SELECT i.instance_id
2004            FROM {}.instances i
2005            LEFT JOIN {}.executions e ON i.instance_id = e.instance_id 
2006              AND i.current_execution_id = e.execution_id
2007            WHERE 1=1
2008            "#,
2009            self.schema_name, self.schema_name
2010        );
2011
2012        // Add instance_ids filter if provided
2013        if let Some(ref ids) = filter.instance_ids {
2014            if ids.is_empty() {
2015                return Ok(PruneResult::default());
2016            }
2017            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}", i)).collect();
2018            sql.push_str(&format!(
2019                " AND i.instance_id IN ({})",
2020                placeholders.join(", ")
2021            ));
2022        }
2023
2024        // Add completed_before filter if provided
2025        if filter.completed_before.is_some() {
2026            let param_num = filter
2027                .instance_ids
2028                .as_ref()
2029                .map(|ids| ids.len())
2030                .unwrap_or(0)
2031                + 1;
2032            sql.push_str(&format!(
2033                " AND e.completed_at < TO_TIMESTAMP(${} / 1000.0)",
2034                param_num
2035            ));
2036        }
2037
2038        // Add limit
2039        let limit = filter.limit.unwrap_or(1000);
2040        let limit_param_num = filter
2041            .instance_ids
2042            .as_ref()
2043            .map(|ids| ids.len())
2044            .unwrap_or(0)
2045            + if filter.completed_before.is_some() {
2046                1
2047            } else {
2048                0
2049            }
2050            + 1;
2051        sql.push_str(&format!(" LIMIT ${}", limit_param_num));
2052
2053        // Build and execute query
2054        let mut query = sqlx::query_scalar::<_, String>(&sql);
2055        if let Some(ref ids) = filter.instance_ids {
2056            for id in ids {
2057                query = query.bind(id);
2058            }
2059        }
2060        if let Some(completed_before) = filter.completed_before {
2061            query = query.bind(completed_before as i64);
2062        }
2063        query = query.bind(limit as i64);
2064
2065        let instance_ids: Vec<String> = query
2066            .fetch_all(&*self.pool)
2067            .await
2068            .map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
2069
2070        // Prune each instance
2071        let mut result = PruneResult::default();
2072
2073        for instance_id in &instance_ids {
2074            let single_result = self.prune_executions(instance_id, options.clone()).await?;
2075            result.instances_processed += single_result.instances_processed;
2076            result.executions_deleted += single_result.executions_deleted;
2077            result.events_deleted += single_result.events_deleted;
2078        }
2079
2080        debug!(
2081            target = "duroxide::providers::postgres",
2082            operation = "prune_executions_bulk",
2083            instances_processed = result.instances_processed,
2084            executions_deleted = result.executions_deleted,
2085            events_deleted = result.events_deleted,
2086            "Bulk pruned executions"
2087        );
2088
2089        Ok(result)
2090    }
2091}