Skip to main content

duroxide/client/
mod.rs

1use std::sync::Arc;
2
3use tracing::info;
4
5use crate::_typed_codec::{Codec, Json};
6use crate::providers::{
7    DeleteInstanceResult, ExecutionInfo, InstanceFilter, InstanceInfo, InstanceTree, Provider, ProviderAdmin,
8    ProviderError, PruneOptions, PruneResult, QueueDepths, SystemMetrics, WorkItem,
9};
10use crate::{EventKind, OrchestrationStatus};
11use serde::Serialize;
12
/// Client-specific error type that wraps provider errors and adds client-specific errors.
///
/// This enum allows callers to distinguish between:
/// - Provider errors (storage failures, can be retryable or permanent)
/// - Client-specific errors (validation, capability not available, etc.)
///
/// Use [`ClientError::is_retryable`] to decide whether retrying the operation makes sense.
#[derive(Debug, Clone)]
pub enum ClientError {
    /// Provider operation failed (wraps the underlying `ProviderError`).
    Provider(ProviderError),

    /// Management capability not available (the provider does not implement `ProviderAdmin`).
    ManagementNotAvailable,

    /// Invalid input detected by client-side validation; `message` describes the problem.
    InvalidInput { message: String },

    /// Operation timed out.
    Timeout,

    /// Instance is still running (for delete without force).
    InstanceStillRunning { instance_id: String },

    /// Cannot delete a sub-orchestration directly (must delete the root orchestration).
    CannotDeleteSubOrchestration { instance_id: String },

    /// Instance not found.
    InstanceNotFound { instance_id: String },
}
41
42impl ClientError {
43    /// Check if this error is retryable (only applies to Provider errors)
44    pub fn is_retryable(&self) -> bool {
45        match self {
46            ClientError::Provider(e) => e.is_retryable(),
47            ClientError::ManagementNotAvailable => false,
48            ClientError::InvalidInput { .. } => false,
49            ClientError::Timeout => true,
50            ClientError::InstanceStillRunning { .. } => false,
51            ClientError::CannotDeleteSubOrchestration { .. } => false,
52            ClientError::InstanceNotFound { .. } => false,
53        }
54    }
55}
56
57impl std::fmt::Display for ClientError {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            ClientError::Provider(e) => write!(f, "{e}"),
61            ClientError::ManagementNotAvailable => write!(
62                f,
63                "Management features not available - provider doesn't implement ProviderAdmin"
64            ),
65            ClientError::InvalidInput { message } => write!(f, "Invalid input: {message}"),
66            ClientError::Timeout => write!(f, "Operation timed out"),
67            ClientError::InstanceStillRunning { instance_id } => write!(
68                f,
69                "Instance {instance_id} is still running. Use force=true or cancel first."
70            ),
71            ClientError::CannotDeleteSubOrchestration { instance_id } => write!(
72                f,
73                "Cannot delete sub-orchestration {instance_id} directly. Delete the root orchestration instead."
74            ),
75            ClientError::InstanceNotFound { instance_id } => {
76                write!(f, "Instance {instance_id} not found")
77            }
78        }
79    }
80}
81
// `ClientError` uses the default `std::error::Error` methods. It does not override `source()`,
// so a wrapped `ProviderError` is reachable only by matching on `ClientError::Provider`
// (or through the `Display`/`Debug` output).
impl std::error::Error for ClientError {}
83
84impl From<ProviderError> for ClientError {
85    fn from(e: ProviderError) -> Self {
86        ClientError::Provider(e)
87    }
88}
89
// Constants for polling behavior in wait_for_orchestration: the delay between polls starts at
// INITIAL_POLL_DELAY_MS, is multiplied by POLL_DELAY_MULTIPLIER after each poll, and is capped
// at MAX_POLL_DELAY_MS.

/// Initial delay between status polls, in milliseconds (5ms)
const INITIAL_POLL_DELAY_MS: u64 = 5;

/// Maximum delay between status polls, in milliseconds (100ms)
const MAX_POLL_DELAY_MS: u64 = 100;

/// Multiplier applied to the poll delay after each poll (exponential backoff)
const POLL_DELAY_MULTIPLIER: u64 = 2;
99
/// Client for orchestration control-plane operations with automatic capability discovery.
///
/// The Client provides APIs for managing orchestration instances:
/// - Starting orchestrations
/// - Raising external events
/// - Cancelling instances
/// - Checking status
/// - Waiting for completion
/// - Rich management features (when available)
///
/// # Automatic Capability Discovery
///
/// The Client automatically discovers provider capabilities through the `Provider::as_management_capability()` method.
/// When a provider implements `ProviderAdmin`, rich management features become available:
///
/// ```ignore
/// let client = Client::new(provider);
///
/// // Control plane (always available)
/// client.start_orchestration("order-1", "ProcessOrder", "{}").await?;
///
/// // Management (automatically discovered)
/// if client.has_management_capability() {
///     let instances = client.list_all_instances().await?;
///     let metrics = client.get_system_metrics().await?;
/// } else {
///     println!("Management features not available");
/// }
/// ```
///
/// # Design
///
/// The Client communicates with the Runtime **only through the shared Provider** (no direct coupling).
/// This allows the Client to be used from any process, even one without a running Runtime.
///
/// # Thread Safety
///
/// Client is `Clone` and can be safely shared across threads.
///
/// # Example Usage
///
/// ```ignore
/// use duroxide::{Client, ClientError, OrchestrationStatus};
/// use duroxide::providers::sqlite::SqliteProvider;
/// use std::sync::Arc;
///
/// let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
/// let client = Client::new(store);
///
/// // Start an orchestration
/// client.start_orchestration("order-123", "ProcessOrder", r#"{"customer_id": "c1"}"#).await?;
///
/// // Check status
/// let status = client.get_orchestration_status("order-123").await?;
/// println!("Status: {:?}", status);
///
/// // Wait for completion
/// let result = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await?;
/// match result {
///     OrchestrationStatus::Completed { output, .. } => println!("Done: {}", output),
///     OrchestrationStatus::Failed { details, .. } => {
///         eprintln!("Failed ({}): {}", details.category(), details.display_message());
///     }
///     _ => {}
/// }
/// ```
// NOTE(review): the docs above state that `Client` is `Clone`, but no `Clone` derive appears on
// this definition — confirm that a `Clone` implementation is provided elsewhere in the module.
pub struct Client {
    /// Shared provider handle; every client operation goes through it.
    store: Arc<dyn Provider>,
}
170
171impl Client {
172    /// Create a client bound to a Provider instance.
173    ///
174    /// # Parameters
175    ///
176    /// * `store` - Arc-wrapped Provider (same instance used by Runtime)
177    ///
178    /// # Example
179    ///
180    /// ```ignore
181    /// let store = Arc::new(SqliteProvider::new("sqlite::memory:").await.unwrap());
182    /// let client = Client::new(store.clone());
183    /// // Multiple clients can share the same store
184    /// let client2 = client.clone();
185    /// ```
186    pub fn new(store: Arc<dyn Provider>) -> Self {
187        Self { store }
188    }
189
190    /// Start an orchestration instance with string input.
191    ///
192    /// # Parameters
193    ///
194    /// * `instance` - Unique instance ID (e.g., "order-123", "user-payment-456")
195    /// * `orchestration` - Name of registered orchestration (e.g., "ProcessOrder")
196    /// * `input` - JSON string input (will be passed to orchestration)
197    ///
198    /// # Returns
199    ///
200    /// * `Ok(())` - Instance was enqueued for processing
201    /// * `Err(msg)` - Failed to enqueue (storage error)
202    ///
203    /// # Behavior
204    ///
205    /// - Enqueues a StartOrchestration work item
206    /// - Returns immediately (doesn't wait for orchestration to start/complete)
207    /// - Use `wait_for_orchestration()` to wait for completion
208    ///
209    /// # Instance ID Requirements
210    ///
211    /// - Must be unique across all orchestrations
212    /// - Can be any string (alphanumeric + hyphens recommended)
213    /// - Reusing an instance ID that already exists will fail
214    ///
215    /// # Example
216    ///
217    /// ```rust,no_run
218    /// # use duroxide::{Client, ClientError};
219    /// # use std::sync::Arc;
220    /// # async fn example(client: Client) -> Result<(), ClientError> {
221    /// // Start with JSON string input
222    /// client.start_orchestration(
223    ///     "order-123",
224    ///     "ProcessOrder",
225    ///     r#"{"customer_id": "c1", "items": ["item1", "item2"]}"#
226    /// ).await?;
227    /// # Ok(())
228    /// # }
229    /// ```
230    ///
231    /// # Errors
232    ///
233    /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
234    pub async fn start_orchestration(
235        &self,
236        instance: impl Into<String>,
237        orchestration: impl Into<String>,
238        input: impl Into<String>,
239    ) -> Result<(), ClientError> {
240        let item = WorkItem::StartOrchestration {
241            instance: instance.into(),
242            orchestration: orchestration.into(),
243            input: input.into(),
244            version: None,
245            parent_instance: None,
246            parent_id: None,
247            execution_id: crate::INITIAL_EXECUTION_ID,
248        };
249        self.store
250            .enqueue_for_orchestrator(item, None)
251            .await
252            .map_err(ClientError::from)
253    }
254
255    /// Start an orchestration instance pinned to a specific version.
256    ///
257    /// # Errors
258    ///
259    /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
260    pub async fn start_orchestration_versioned(
261        &self,
262        instance: impl Into<String>,
263        orchestration: impl Into<String>,
264        version: impl Into<String>,
265        input: impl Into<String>,
266    ) -> Result<(), ClientError> {
267        let item = WorkItem::StartOrchestration {
268            instance: instance.into(),
269            orchestration: orchestration.into(),
270            input: input.into(),
271            version: Some(version.into()),
272            parent_instance: None,
273            parent_id: None,
274            execution_id: crate::INITIAL_EXECUTION_ID,
275        };
276        self.store
277            .enqueue_for_orchestrator(item, None)
278            .await
279            .map_err(ClientError::from)
280    }
281
282    // Note: No delayed scheduling API. Clients should use normal start APIs.
283
284    /// Start an orchestration with typed input (serialized to JSON).
285    ///
286    /// # Errors
287    ///
288    /// Returns `ClientError::InvalidInput` if serialization fails.
289    /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
290    pub async fn start_orchestration_typed<In: Serialize>(
291        &self,
292        instance: impl Into<String>,
293        orchestration: impl Into<String>,
294        input: In,
295    ) -> Result<(), ClientError> {
296        let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
297            message: format!("encode: {e}"),
298        })?;
299        self.start_orchestration(instance, orchestration, payload).await
300    }
301
302    /// Start a versioned orchestration with typed input (serialized to JSON).
303    ///
304    /// # Errors
305    ///
306    /// Returns `ClientError::InvalidInput` if serialization fails.
307    /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
308    pub async fn start_orchestration_versioned_typed<In: Serialize>(
309        &self,
310        instance: impl Into<String>,
311        orchestration: impl Into<String>,
312        version: impl Into<String>,
313        input: In,
314    ) -> Result<(), ClientError> {
315        let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
316            message: format!("encode: {e}"),
317        })?;
318        self.start_orchestration_versioned(instance, orchestration, version, payload)
319            .await
320    }
321
322    /// Raise an external event into a running orchestration instance.
323    ///
324    /// # Purpose
325    ///
326    /// Send a signal/message to a running orchestration that is waiting for an external event.
327    /// The orchestration must have called `ctx.schedule_wait(event_name)` to receive the event.
328    ///
329    /// # Parameters
330    ///
331    /// * `instance` - Instance ID of the running orchestration
332    /// * `event_name` - Name of the event (must match `schedule_wait` name)
333    /// * `data` - Payload data (JSON string, passed to orchestration)
334    ///
335    /// # Behavior
336    ///
337    /// - Enqueues ExternalRaised work item to orchestrator queue
338    /// - If instance isn't waiting for this event (yet), it's buffered
339    /// - Event is matched by NAME (not correlation ID)
340    /// - Multiple events with same name can be raised
341    ///
342    /// # Example
343    ///
344    /// ```rust,no_run
345    /// # use duroxide::{Client, ClientError};
346    /// # async fn example(client: Client) -> Result<(), ClientError> {
347    /// // Orchestration waiting for approval
348    /// // ctx.schedule_wait("ApprovalEvent").await
349    ///
350    /// // External system/human approves
351    /// client.raise_event(
352    ///     "order-123",
353    ///     "ApprovalEvent",
354    ///     r#"{"approved": true, "by": "manager@company.com"}"#
355    /// ).await?;
356    /// # Ok(())
357    /// # }
358    /// ```
359    ///
360    /// # Use Cases
361    ///
362    /// - Human approval workflows
363    /// - Webhook callbacks
364    /// - Inter-orchestration communication
365    /// - External system integration
366    ///
367    /// # Error Cases
368    ///
369    /// - Instance doesn't exist: Event is buffered, orchestration processes when started
370    /// - Instance already completed: Event is ignored gracefully
371    ///
372    /// # Errors
373    ///
374    /// Returns `ClientError::Provider` if the provider fails to enqueue the event.
375    pub async fn raise_event(
376        &self,
377        instance: impl Into<String>,
378        event_name: impl Into<String>,
379        data: impl Into<String>,
380    ) -> Result<(), ClientError> {
381        let item = WorkItem::ExternalRaised {
382            instance: instance.into(),
383            name: event_name.into(),
384            data: data.into(),
385        };
386        self.store
387            .enqueue_for_orchestrator(item, None)
388            .await
389            .map_err(ClientError::from)
390    }
391
392    /// Raise a positional external event with typed data.
393    ///
394    /// Serializes `data` as JSON before sending. Same semantics as [`Self::raise_event`].
395    ///
396    /// # Errors
397    ///
398    /// Returns [`ClientError`] if the provider fails to enqueue the event.
399    pub async fn raise_event_typed<T: serde::Serialize>(
400        &self,
401        instance: impl Into<String>,
402        event_name: impl Into<String>,
403        data: &T,
404    ) -> Result<(), ClientError> {
405        let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
406        self.raise_event(instance, event_name, payload).await
407    }
408
409    /// Enqueue a message into a named queue for an orchestration instance.
410    ///
411    /// Queue messages use FIFO mailbox semantics:
412    /// - Matched to [`OrchestrationContext::dequeue_event`] subscriptions in order
413    /// - Stick around until consumed (even if no subscription exists yet)
414    /// - Survive `continue_as_new` boundaries
415    /// - Not affected by subscription cancellation
416    ///
417    /// # Errors
418    ///
419    /// Returns [`ClientError`] if the provider fails to enqueue the message.
420    pub async fn enqueue_event(
421        &self,
422        instance: impl Into<String>,
423        queue: impl Into<String>,
424        data: impl Into<String>,
425    ) -> Result<(), ClientError> {
426        let item = WorkItem::QueueMessage {
427            instance: instance.into(),
428            name: queue.into(),
429            data: data.into(),
430        };
431        self.store
432            .enqueue_for_orchestrator(item, None)
433            .await
434            .map_err(ClientError::from)
435    }
436
437    /// Enqueue a typed message into a named queue for an orchestration instance.
438    ///
439    /// Serializes `data` as JSON before enqueuing. Same semantics as [`Self::enqueue_event`].
440    ///
441    /// # Errors
442    ///
443    /// Returns [`ClientError`] if the provider fails to enqueue the message.
444    pub async fn enqueue_event_typed<T: serde::Serialize>(
445        &self,
446        instance: impl Into<String>,
447        queue: impl Into<String>,
448        data: &T,
449    ) -> Result<(), ClientError> {
450        let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
451        self.enqueue_event(instance, queue, payload).await
452    }
453
    /// Raise a persistent external event that uses mailbox semantics.
    ///
    /// Prefer [`Self::enqueue_event`] — this is a deprecated alias that forwards its arguments
    /// unchanged, with `event_name` used as the queue name.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError`] if the provider fails to enqueue the event.
    #[deprecated(note = "Use enqueue_event() instead")]
    pub async fn raise_event_persistent(
        &self,
        instance: impl Into<String>,
        event_name: impl Into<String>,
        data: impl Into<String>,
    ) -> Result<(), ClientError> {
        // Pure delegation: no behavior of its own.
        self.enqueue_event(instance, event_name, data).await
    }
470
471    /// V2: Raise an external event with topic-based pub/sub matching.
472    ///
473    /// Same as `raise_event`, but includes a `topic` for pub/sub matching.
474    /// The orchestration must have called `ctx.schedule_wait2(name, topic)` to receive the event.
475    /// Feature-gated for replay engine extensibility verification.
476    ///
477    /// # Errors
478    ///
479    /// Returns [`ClientError`] if the provider fails to enqueue the event.
480    #[cfg(feature = "replay-version-test")]
481    pub async fn raise_event2(
482        &self,
483        instance: impl Into<String>,
484        event_name: impl Into<String>,
485        topic: impl Into<String>,
486        data: impl Into<String>,
487    ) -> Result<(), ClientError> {
488        let item = WorkItem::ExternalRaised2 {
489            instance: instance.into(),
490            name: event_name.into(),
491            topic: topic.into(),
492            data: data.into(),
493        };
494        self.store
495            .enqueue_for_orchestrator(item, None)
496            .await
497            .map_err(ClientError::from)
498    }
499
500    /// Request cancellation of an orchestration instance.
501    ///
502    /// # Purpose
503    ///
504    /// Gracefully cancel a running orchestration. The orchestration will complete its current
505    /// turn and then fail deterministically with a "canceled: {reason}" error.
506    ///
507    /// # Parameters
508    ///
509    /// * `instance` - Instance ID to cancel
510    /// * `reason` - Reason for cancellation (included in error message)
511    ///
512    /// # Behavior
513    ///
514    /// 1. Enqueues CancelInstance work item
515    /// 2. Runtime appends OrchestrationCancelRequested event
516    /// 3. Next turn, orchestration sees cancellation and fails deterministically
517    /// 4. Final status: `OrchestrationStatus::Failed { details: Application::Cancelled }`
518    ///
519    /// # Deterministic Cancellation
520    ///
521    /// Cancellation is **deterministic** - the orchestration fails at a well-defined point:
522    /// - Not mid-activity (activities complete)
523    /// - Not mid-turn (current turn finishes)
524    /// - Failure is recorded in history (replays consistently)
525    ///
526    /// # Propagation
527    ///
528    /// If the orchestration has child sub-orchestrations, they are also cancelled.
529    ///
530    /// # Example
531    ///
532    /// ```ignore
533    /// // Cancel a long-running order
534    /// client.cancel_instance("order-123", "Customer requested cancellation").await?;
535    ///
536    /// // Wait for cancellation to complete
537    /// let status = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(5)).await?;
538    /// match status {
539    ///     OrchestrationStatus::Failed { details, .. } if matches!(
540    ///         details,
541    ///         duroxide::ErrorDetails::Application {
542    ///             kind: duroxide::AppErrorKind::Cancelled { .. },
543    ///             ..
544    ///         }
545    ///     ) => {
546    ///         println!("Successfully cancelled");
547    ///     }
548    ///     _ => {}
549    /// }
550    /// ```
551    ///
552    /// # Error Cases
553    ///
554    /// - Instance already completed: Cancellation is no-op
555    /// - Instance doesn't exist: Cancellation is no-op
556    ///
557    /// # Errors
558    ///
559    /// Returns `ClientError::Provider` if the provider fails to enqueue the cancellation.
560    pub async fn cancel_instance(
561        &self,
562        instance: impl Into<String>,
563        reason: impl Into<String>,
564    ) -> Result<(), ClientError> {
565        let item = WorkItem::CancelInstance {
566            instance: instance.into(),
567            reason: reason.into(),
568        };
569        self.store
570            .enqueue_for_orchestrator(item, None)
571            .await
572            .map_err(ClientError::from)
573    }
574
575    /// Get the current status of an orchestration by inspecting its history.
576    ///
577    /// # Purpose
578    ///
579    /// Query the current state of an orchestration instance without waiting.
580    ///
581    /// # Parameters
582    ///
583    /// * `instance` - Instance ID to query
584    ///
585    /// # Returns
586    ///
587    /// * `OrchestrationStatus::NotFound` - Instance doesn't exist
588    /// * `OrchestrationStatus::Running` - Instance is still executing
589    /// * `OrchestrationStatus::Completed { output, .. }` - Instance completed successfully
590    /// * `OrchestrationStatus::Failed { error }` - Instance failed (includes cancellations)
591    ///
592    /// # Behavior
593    ///
594    /// - Reads instance history from provider
595    /// - Scans for terminal events (Completed/Failed)
596    /// - For multi-execution instances (ContinueAsNew), returns status of LATEST execution
597    ///
598    /// # Performance
599    ///
600    /// This method reads from storage (not cached). For polling, use `wait_for_orchestration` instead.
601    ///
602    /// # Example
603    ///
604    /// ```rust,no_run
605    /// # use duroxide::{Client, ClientError, OrchestrationStatus};
606    /// # async fn example(client: Client) -> Result<(), ClientError> {
607    /// let status = client.get_orchestration_status("order-123").await?;
608    ///
609    /// match status {
610    ///     OrchestrationStatus::NotFound => println!("Instance not found"),
611    ///     OrchestrationStatus::Running { .. } => println!("Still processing"),
612    ///     OrchestrationStatus::Completed { output, .. } => println!("Done: {}", output),
613    ///     OrchestrationStatus::Failed { details, .. } => eprintln!("Error: {}", details.display_message()),
614    /// }
615    /// # Ok(())
616    /// # }
617    /// ```
618    ///
619    /// # Errors
620    ///
621    /// Returns `ClientError::Provider` if the provider fails to read the orchestration history.
622    pub async fn get_orchestration_status(&self, instance: &str) -> Result<OrchestrationStatus, ClientError> {
623        let hist = self.store.read(instance).await.map_err(ClientError::from)?;
624
625        // Query custom status (lightweight, always available)
626        let (custom_status, custom_status_version) = match self.store.get_custom_status(instance, 0).await {
627            Ok(Some((cs, v))) => (cs, v),
628            Ok(None) => (None, 0),
629            Err(_) => (None, 0), // Best-effort: don't fail status query for custom_status errors
630        };
631
632        // Find terminal events first
633        for e in hist.iter().rev() {
634            match &e.kind {
635                EventKind::OrchestrationCompleted { output } => {
636                    return Ok(OrchestrationStatus::Completed {
637                        output: output.clone(),
638                        custom_status,
639                        custom_status_version,
640                    });
641                }
642                EventKind::OrchestrationFailed { details } => {
643                    return Ok(OrchestrationStatus::Failed {
644                        details: details.clone(),
645                        custom_status,
646                        custom_status_version,
647                    });
648                }
649                _ => {}
650            }
651        }
652        // If we ever saw a start, it's running
653        if hist
654            .iter()
655            .any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
656        {
657            Ok(OrchestrationStatus::Running {
658                custom_status,
659                custom_status_version,
660            })
661        } else {
662            Ok(OrchestrationStatus::NotFound)
663        }
664    }
665
666    /// Wait until terminal state or timeout using provider reads.
667    ///
668    /// # Purpose
669    ///
670    /// Poll for orchestration completion with exponential backoff, returning when terminal or timeout.
671    ///
672    /// # Parameters
673    ///
674    /// * `instance` - Instance ID to wait for
675    /// * `timeout` - Maximum time to wait before returning timeout error
676    ///
677    /// # Returns
678    ///
679    /// * `Ok(OrchestrationStatus::Completed { output, .. })` - Orchestration completed successfully
680    /// * `Ok(OrchestrationStatus::Failed { details, .. })` - Orchestration failed (includes cancellations)
681    /// * `Err(ClientError::Timeout)` - Timeout elapsed while still Running
682    /// * `Err(ClientError::Provider(e))` - Provider/Storage error
683    ///
684    /// **Note:** Never returns `NotFound` or `Running` - only terminal states or timeout.
685    ///
686    /// # Polling Behavior
687    ///
688    /// - First check: Immediate (no delay)
689    /// - Subsequent checks: Exponential backoff starting at 5ms, doubling each iteration, max 100ms
690    /// - Continues until terminal state or timeout
691    ///
692    /// # Example
693    ///
694    /// ```ignore
695    /// // Start orchestration
696    /// client.start_orchestration("order-123", "ProcessOrder", "{}").await?;
697    ///
698    /// // Wait up to 30 seconds
699    /// match client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await {
700    ///     Ok(OrchestrationStatus::Completed { output, .. }) => {
701    ///         println!("Success: {}", output);
702    ///     }
703    ///     Ok(OrchestrationStatus::Failed { details, .. }) => {
704    ///         eprintln!("Failed ({}): {}", details.category(), details.display_message());
705    ///     }
706    ///     Err(ClientError::Timeout) => {
707    ///         println!("Still running after 30s, instance: order-123");
708    ///         // Instance is still running - can wait more or cancel
709    ///     }
710    ///     _ => unreachable!("wait_for_orchestration only returns terminal or timeout"),
711    /// }
712    /// ```
713    ///
714    /// # Use Cases
715    ///
716    /// - Synchronous request/response workflows
717    /// - Testing (wait for workflow to complete)
718    /// - CLI tools (block until done)
719    /// - Health checks
720    ///
721    /// # For Long-Running Workflows
722    ///
723    /// Don't wait for hours/days:
724    /// ```ignore
725    /// // Start workflow
726    /// client.start_orchestration("batch-job", "ProcessBatch", "{}").await.unwrap();
727    ///
728    /// // DON'T wait for hours
729    /// // let status = client.wait_for_orchestration("batch-job", Duration::from_hours(24)).await;
730    ///
731    /// // DO poll periodically
732    /// loop {
733    ///     match client.get_orchestration_status("batch-job").await {
734    ///         OrchestrationStatus::Completed { .. } => break,
735    ///         OrchestrationStatus::Failed { .. } => break,
736    ///         _ => tokio::time::sleep(std::time::Duration::from_secs(60)).await,
737    ///     }
738    /// }
739    /// ```
740    ///
741    /// # Errors
742    ///
743    /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
744    /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
745    pub async fn wait_for_orchestration(
746        &self,
747        instance: &str,
748        timeout: std::time::Duration,
749    ) -> Result<OrchestrationStatus, ClientError> {
750        let deadline = std::time::Instant::now() + timeout;
751        // quick path
752        match self.get_orchestration_status(instance).await {
753            Ok(s @ OrchestrationStatus::Completed { .. }) => return Ok(s),
754            Ok(s @ OrchestrationStatus::Failed { .. }) => return Ok(s),
755            Err(e) => return Err(e),
756            _ => {}
757        }
758        // poll with backoff
759        let mut delay_ms: u64 = INITIAL_POLL_DELAY_MS;
760        while std::time::Instant::now() < deadline {
761            match self.get_orchestration_status(instance).await {
762                Ok(s @ OrchestrationStatus::Completed { .. }) => return Ok(s),
763                Ok(s @ OrchestrationStatus::Failed { .. }) => return Ok(s),
764                Err(e) => return Err(e),
765                _ => {
766                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
767                    delay_ms = (delay_ms.saturating_mul(POLL_DELAY_MULTIPLIER)).min(MAX_POLL_DELAY_MS);
768                }
769            }
770        }
771        Err(ClientError::Timeout)
772    }
773
774    /// Typed wait helper: decodes output on Completed, returns Err(String) on Failed.
775    ///
776    /// # Errors
777    ///
778    /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
779    /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
780    /// Returns `ClientError::InvalidInput` if deserialization of the output fails.
781    pub async fn wait_for_orchestration_typed<Out: serde::de::DeserializeOwned>(
782        &self,
783        instance: &str,
784        timeout: std::time::Duration,
785    ) -> Result<Result<Out, String>, ClientError> {
786        match self.wait_for_orchestration(instance, timeout).await? {
787            OrchestrationStatus::Completed { output, .. } => match Json::decode::<Out>(&output) {
788                Ok(v) => Ok(Ok(v)),
789                Err(e) => Err(ClientError::InvalidInput {
790                    message: format!("decode failed: {e}"),
791                }),
792            },
793            OrchestrationStatus::Failed { details, .. } => Ok(Err(details.display_message())),
794            _ => unreachable!("wait_for_orchestration returns only terminal or timeout"),
795        }
796    }
797
798    /// Wait for custom_status to change, polling the provider at `poll_interval`.
799    ///
800    /// Returns the full `OrchestrationStatus` when:
801    /// - The `custom_status_version` exceeds `last_seen_version`
802    /// - The orchestration reaches a terminal state (Completed/Failed)
803    /// - The timeout elapses (returns `Err(ClientError::Timeout)`)
804    ///
805    /// # Parameters
806    ///
807    /// * `instance` - Instance ID to monitor
808    /// * `last_seen_version` - The version the caller last observed (0 to get any status)
809    /// * `poll_interval` - How often to check the provider
810    /// * `timeout` - Maximum time to wait
811    ///
812    /// # Example
813    ///
814    /// ```ignore
815    /// let mut version = 0u64;
816    /// loop {
817    ///     match client.wait_for_status_change("order-123", version, Duration::from_millis(200), Duration::from_secs(30)).await {
818    ///         Ok(OrchestrationStatus::Running { custom_status, custom_status_version }) => {
819    ///             println!("Progress: {:?}", custom_status);
820    ///             version = custom_status_version;
821    ///         }
822    ///         Ok(OrchestrationStatus::Completed { output, .. }) => {
823    ///             println!("Done: {output}");
824    ///             break;
825    ///         }
826    ///         _ => break,
827    ///     }
828    /// }
829    /// ```
830    ///
831    /// # Errors
832    ///
833    /// Returns [`ClientError`] if the provider fails or the instance doesn't exist.
834    pub async fn wait_for_status_change(
835        &self,
836        instance: &str,
837        last_seen_version: u64,
838        poll_interval: std::time::Duration,
839        timeout: std::time::Duration,
840    ) -> Result<OrchestrationStatus, ClientError> {
841        let deadline = std::time::Instant::now() + timeout;
842
843        while std::time::Instant::now() < deadline {
844            // Lightweight check: just custom_status + version
845            match self.store.get_custom_status(instance, last_seen_version).await {
846                Ok(Some(_)) => {
847                    // Version changed — return full status
848                    return self.get_orchestration_status(instance).await;
849                }
850                Ok(None) => {
851                    // No change — check if terminal before sleeping
852                    // (get_custom_status returns None if version hasn't changed,
853                    //  but also if the instance doesn't exist or is terminal)
854                }
855                Err(e) => return Err(ClientError::from(e)),
856            }
857
858            // Also check for terminal state (in case orchestration completed
859            // without ever updating custom_status)
860            match self.get_orchestration_status(instance).await? {
861                OrchestrationStatus::Running { .. } | OrchestrationStatus::NotFound => {
862                    // Still running — sleep and retry
863                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
864                    tokio::time::sleep(poll_interval.min(remaining)).await;
865                }
866                terminal => return Ok(terminal),
867            }
868        }
869
870        Err(ClientError::Timeout)
871    }
872
873    // ===== Capability Discovery =====
874
875    /// Check if management capabilities are available.
876    ///
877    /// # Returns
878    ///
879    /// `true` if the provider implements `ProviderAdmin`, `false` otherwise.
880    ///
881    /// # Usage
882    ///
883    /// ```ignore
884    /// let client = Client::new(provider);
885    /// if client.has_management_capability() {
886    ///     let instances = client.list_all_instances().await?;
887    /// } else {
888    ///     println!("Management features not available");
889    /// }
890    /// ```
891    pub fn has_management_capability(&self) -> bool {
892        self.discover_management().is_ok()
893    }
894
895    /// Automatically discover management capabilities from the provider.
896    ///
897    /// # Returns
898    ///
899    /// `Ok(&dyn ManagementCapability)` if available, `Err(String)` if not.
900    ///
901    /// # Internal Use
902    ///
903    /// This method is used internally by management methods to access capabilities.
904    fn discover_management(&self) -> Result<&dyn ProviderAdmin, ClientError> {
905        self.store
906            .as_management_capability()
907            .ok_or(ClientError::ManagementNotAvailable)
908    }
909
910    // ===== Rich Management Methods =====
911
912    /// List all orchestration instances.
913    ///
914    /// # Returns
915    ///
916    /// Vector of instance IDs, typically sorted by creation time (newest first).
917    ///
918    /// # Errors
919    ///
920    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
921    ///
922    /// # Usage
923    ///
924    /// ```ignore
925    /// let client = Client::new(provider);
926    /// if client.has_management_capability() {
927    ///     let instances = client.list_all_instances().await?;
928    ///     for instance in instances {
929    ///         println!("Instance: {}", instance);
930    ///     }
931    /// }
932    /// ```
933    pub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError> {
934        self.discover_management()?
935            .list_instances()
936            .await
937            .map_err(ClientError::from)
938    }
939
940    /// List instances matching a status filter.
941    ///
942    /// # Parameters
943    ///
944    /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
945    ///
946    /// # Returns
947    ///
948    /// Vector of instance IDs with the specified status.
949    ///
950    /// # Errors
951    ///
952    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
953    ///
954    /// # Usage
955    ///
956    /// ```ignore
957    /// let client = Client::new(provider);
958    /// if client.has_management_capability() {
959    ///     let running = client.list_instances_by_status("Running").await?;
960    ///     let completed = client.list_instances_by_status("Completed").await?;
961    ///     println!("Running: {}, Completed: {}", running.len(), completed.len());
962    /// }
963    /// ```
964    pub async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ClientError> {
965        self.discover_management()?
966            .list_instances_by_status(status)
967            .await
968            .map_err(ClientError::from)
969    }
970
971    /// Get comprehensive information about an instance.
972    ///
973    /// # Parameters
974    ///
975    /// * `instance` - The ID of the orchestration instance.
976    ///
977    /// # Returns
978    ///
979    /// Detailed instance information including status, output, and metadata.
980    ///
981    /// # Errors
982    ///
983    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
984    ///
985    /// # Usage
986    ///
987    /// ```ignore
988    /// let client = Client::new(provider);
989    /// if client.has_management_capability() {
990    ///     let info = client.get_instance_info("order-123").await?;
991    ///     println!("Instance {}: {} ({})", info.instance_id, info.orchestration_name, info.status);
992    /// }
993    /// ```
994    pub async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ClientError> {
995        self.discover_management()?
996            .get_instance_info(instance)
997            .await
998            .map_err(ClientError::from)
999    }
1000
1001    /// Get detailed information about a specific execution.
1002    ///
1003    /// # Parameters
1004    ///
1005    /// * `instance` - The ID of the orchestration instance.
1006    /// * `execution_id` - The specific execution ID.
1007    ///
1008    /// # Returns
1009    ///
1010    /// Detailed execution information including status, output, and event count.
1011    ///
1012    /// # Errors
1013    ///
1014    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1015    ///
1016    /// # Usage
1017    ///
1018    /// ```ignore
1019    /// let client = Client::new(provider);
1020    /// if client.has_management_capability() {
1021    ///     let info = client.get_execution_info("order-123", 1).await?;
1022    ///     println!("Execution {}: {} events, status: {}", info.execution_id, info.event_count, info.status);
1023    /// }
1024    /// ```
1025    pub async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ClientError> {
1026        self.discover_management()?
1027            .get_execution_info(instance, execution_id)
1028            .await
1029            .map_err(ClientError::from)
1030    }
1031
1032    /// List all execution IDs for an instance.
1033    ///
1034    /// Returns execution IDs in ascending order: \[1\], \[1, 2\], \[1, 2, 3\], etc.
1035    /// Each execution represents either the initial run or a continuation via ContinueAsNew.
1036    ///
1037    /// # Parameters
1038    ///
1039    /// * `instance` - The instance ID to query
1040    ///
1041    /// # Returns
1042    ///
1043    /// Vector of execution IDs in ascending order.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if:
1048    /// - The provider doesn't support management capabilities
1049    /// - The database query fails
1050    ///
1051    /// # Usage
1052    ///
1053    /// ```ignore
1054    /// let client = Client::new(provider);
1055    /// if client.has_management_capability() {
1056    ///     let executions = client.list_executions("order-123").await?;
1057    ///     println!("Instance has {} executions", executions.len()); // [1, 2, 3]
1058    /// }
1059    /// ```
1060    pub async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ClientError> {
1061        let mgmt = self.discover_management()?;
1062        mgmt.list_executions(instance).await.map_err(ClientError::from)
1063    }
1064
1065    /// Read the full event history for a specific execution within an instance.
1066    ///
1067    /// Returns all events for the specified execution in chronological order.
1068    /// Each execution has its own independent history starting from OrchestrationStarted.
1069    ///
1070    /// # Parameters
1071    ///
1072    /// * `instance` - The instance ID
1073    /// * `execution_id` - The specific execution ID (starts at 1)
1074    ///
1075    /// # Returns
1076    ///
1077    /// Vector of events in chronological order (oldest first).
1078    ///
1079    /// # Errors
1080    ///
1081    /// Returns an error if:
1082    /// - The provider doesn't support management capabilities
1083    /// - The instance or execution doesn't exist
1084    /// - The database query fails
1085    ///
1086    /// # Usage
1087    ///
1088    /// ```ignore
1089    /// let client = Client::new(provider);
1090    /// if client.has_management_capability() {
1091    ///     let history = client.read_execution_history("order-123", 1).await?;
1092    ///     for event in history {
1093    ///         println!("Event: {:?}", event);
1094    ///     }
1095    /// }
1096    /// ```
1097    pub async fn read_execution_history(
1098        &self,
1099        instance: &str,
1100        execution_id: u64,
1101    ) -> Result<Vec<crate::Event>, ClientError> {
1102        let mgmt = self.discover_management()?;
1103        mgmt.read_history_with_execution_id(instance, execution_id)
1104            .await
1105            .map_err(ClientError::from)
1106    }
1107
1108    /// Get system-wide metrics for the orchestration engine.
1109    ///
1110    /// # Returns
1111    ///
1112    /// System metrics including instance counts, execution counts, and status breakdown.
1113    ///
1114    /// # Errors
1115    ///
1116    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1117    ///
1118    /// # Usage
1119    ///
1120    /// ```ignore
1121    /// let client = Client::new(provider);
1122    /// if client.has_management_capability() {
1123    ///     let metrics = client.get_system_metrics().await?;
1124    ///     println!("System health: {} running, {} completed, {} failed",
1125    ///         metrics.running_instances, metrics.completed_instances, metrics.failed_instances);
1126    /// }
1127    /// ```
1128    pub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError> {
1129        self.discover_management()?
1130            .get_system_metrics()
1131            .await
1132            .map_err(ClientError::from)
1133    }
1134
1135    /// Get the current depths of the internal work queues.
1136    ///
1137    /// # Returns
1138    ///
1139    /// Queue depths for orchestrator, worker, and timer queues.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1144    ///
1145    /// # Usage
1146    ///
1147    /// ```ignore
1148    /// let client = Client::new(provider);
1149    /// if client.has_management_capability() {
1150    ///     let queues = client.get_queue_depths().await?;
1151    ///     println!("Queue depths - Orchestrator: {}, Worker: {}, Timer: {}",
1152    ///         queues.orchestrator_queue, queues.worker_queue, queues.timer_queue);
1153    /// }
1154    /// ```
1155    pub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError> {
1156        self.discover_management()?
1157            .get_queue_depths()
1158            .await
1159            .map_err(ClientError::from)
1160    }
1161
1162    // ===== Hierarchy Operations =====
1163
1164    /// Get the full instance tree rooted at the given instance.
1165    ///
1166    /// Returns all instances in the tree: the root, all children, grandchildren, etc.
1167    /// Useful for inspecting hierarchy before deletion, or for understanding
1168    /// sub-orchestration relationships.
1169    ///
1170    /// # Parameters
1171    ///
1172    /// * `instance_id` - The root instance ID to start from.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Returns [`ClientError::InstanceNotFound`] if the instance doesn't exist.
1177    /// Returns [`ClientError::ProviderError`] for database/connection issues.
1178    ///
1179    /// # Example
1180    ///
1181    /// ```rust,no_run
1182    /// # use duroxide::{Client, ClientError};
1183    /// # async fn example(client: Client) -> Result<(), ClientError> {
1184    /// let tree = client.get_instance_tree("order-123").await?;
1185    /// println!("Will delete {} instances", tree.size());
1186    /// for id in &tree.all_ids {
1187    ///     println!("  - {}", id);
1188    /// }
1189    /// client.delete_instance("order-123", false).await?;
1190    /// # Ok(())
1191    /// # }
1192    /// ```
1193    pub async fn get_instance_tree(&self, instance_id: &str) -> Result<InstanceTree, ClientError> {
1194        let mgmt = self.discover_management()?;
1195        mgmt.get_instance_tree(instance_id).await.map_err(ClientError::from)
1196    }
1197
1198    // ===== Deletion and Pruning Operations =====
1199
1200    /// Delete an orchestration instance and all its associated data.
1201    ///
1202    /// This removes the instance, all executions, all history events, and any
1203    /// pending queue messages (orchestrator, worker, timer).
1204    ///
1205    /// # Parameters
1206    ///
1207    /// * `instance_id` - The ID of the instance to delete.
1208    /// * `force` - If true, delete even if the instance is in Running state.
1209    ///   WARNING: Force delete only removes database state; it does NOT cancel
1210    ///   in-flight tokio tasks. Use `cancel_instance` first for graceful termination.
1211    ///
1212    /// # Errors
1213    ///
1214    /// - [`ClientError::InstanceStillRunning`] - Instance is running and force=false.
1215    /// - [`ClientError::CannotDeleteSubOrchestration`] - Instance is a sub-orchestration.
1216    /// - [`ClientError::InstanceNotFound`] - Instance doesn't exist.
1217    /// - [`ClientError::ProviderError`] - Database/connection issues.
1218    ///
1219    /// # Cascading Delete
1220    ///
1221    /// If the instance is a root orchestration with sub-orchestrations, all descendants
1222    /// are included in the deletion (performed atomically in a single transaction).
1223    ///
1224    /// # Example
1225    ///
1226    /// ```rust,no_run
1227    /// # use duroxide::{Client, ClientError};
1228    /// # async fn example(client: Client) -> Result<(), ClientError> {
1229    /// // Delete a completed instance
1230    /// let result = client.delete_instance("order-123", false).await?;
1231    /// println!("Deleted {} events", result.events_deleted);
1232    ///
1233    /// // Graceful pattern: cancel first, then delete
1234    /// client.cancel_instance("workflow-456", "cleanup").await?;
1235    /// // Wait for cancellation to complete...
1236    /// client.delete_instance("workflow-456", false).await?;
1237    ///
1238    /// // Force delete an instance stuck in Running state
1239    /// client.delete_instance("stuck-workflow", true).await?;
1240    /// # Ok(())
1241    /// # }
1242    /// ```
1243    pub async fn delete_instance(&self, instance_id: &str, force: bool) -> Result<DeleteInstanceResult, ClientError> {
1244        let mgmt = self.discover_management()?;
1245        let result = mgmt
1246            .delete_instance(instance_id, force)
1247            .await
1248            .map_err(|e| Self::translate_delete_error(e, instance_id))?;
1249
1250        info!(
1251            instance_id = %instance_id,
1252            force = %force,
1253            instances_deleted = %result.instances_deleted,
1254            executions_deleted = %result.executions_deleted,
1255            events_deleted = %result.events_deleted,
1256            queue_messages_deleted = %result.queue_messages_deleted,
1257            "Deleted instance"
1258        );
1259
1260        Ok(result)
1261    }
1262
1263    /// Delete multiple orchestration instances matching the filter criteria.
1264    ///
1265    /// Only instances in terminal states (Completed, Failed) are eligible.
1266    /// Running instances are silently skipped (not an error).
1267    ///
1268    /// # Parameters
1269    ///
1270    /// * `filter` - Criteria for selecting instances to delete. All criteria are ANDed.
1271    ///
1272    /// # Filter Behavior
1273    ///
1274    /// - `instance_ids`: Allowlist of specific IDs to consider
1275    /// - `completed_before`: Only delete instances completed before this timestamp (ms since epoch)
1276    /// - `limit`: Maximum number of instances to delete (default: 1000)
1277    ///
1278    /// # Errors
1279    ///
1280    /// Returns [`ClientError::ProviderError`] for database/connection issues.
1281    ///
1282    /// # Example
1283    ///
1284    /// ```rust,no_run
1285    /// # use duroxide::{Client, ClientError, InstanceFilter};
1286    /// # async fn example(client: Client, now_ms: u64) -> Result<(), ClientError> {
1287    /// // Delete specific instances
1288    /// let result = client.delete_instance_bulk(InstanceFilter {
1289    ///     instance_ids: Some(vec!["order-1".into(), "order-2".into()]),
1290    ///     ..Default::default()
1291    /// }).await?;
1292    ///
1293    /// // Delete by age (retention policy)
1294    /// let five_days_ago = now_ms - (5 * 24 * 60 * 60 * 1000);
1295    /// let result = client.delete_instance_bulk(InstanceFilter {
1296    ///     completed_before: Some(five_days_ago),
1297    ///     limit: Some(500),
1298    ///     ..Default::default()
1299    /// }).await?;
1300    /// # Ok(())
1301    /// # }
1302    /// ```
1303    pub async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ClientError> {
1304        let result = self
1305            .discover_management()?
1306            .delete_instance_bulk(filter.clone())
1307            .await
1308            .map_err(ClientError::from)?;
1309
1310        info!(
1311            filter = ?filter,
1312            instances_deleted = %result.instances_deleted,
1313            executions_deleted = %result.executions_deleted,
1314            events_deleted = %result.events_deleted,
1315            queue_messages_deleted = %result.queue_messages_deleted,
1316            "Bulk deleted instances"
1317        );
1318
1319        Ok(result)
1320    }
1321
1322    /// Prune old executions from a single long-running instance.
1323    ///
1324    /// Use this for `ContinueAsNew` workflows that accumulate many executions
1325    /// over time. The current (active) execution is never pruned.
1326    ///
1327    /// # Parameters
1328    ///
1329    /// * `instance_id` - The instance to prune executions from.
1330    /// * `options` - Criteria for selecting executions to delete. All criteria are ANDed.
1331    ///
1332    /// # Options Behavior
1333    ///
1334    /// - `keep_last`: Keep the last N executions by execution_id
1335    /// - `completed_before`: Only delete executions completed before this timestamp (ms)
1336    /// - Running executions are NEVER pruned
1337    /// - The current execution is NEVER pruned
1338    ///
1339    /// # Errors
1340    ///
1341    /// Returns [`ClientError::ProviderError`] for database/connection issues.
1342    ///
1343    /// # Example
1344    ///
1345    /// ```rust,no_run
1346    /// # use duroxide::{Client, ClientError, PruneOptions};
1347    /// # async fn example(client: Client, now_ms: u64) -> Result<(), ClientError> {
1348    /// // Keep only the last 10 executions
1349    /// let result = client.prune_executions("eternal-workflow", PruneOptions {
1350    ///     keep_last: Some(10),
1351    ///     ..Default::default()
1352    /// }).await?;
1353    ///
1354    /// // Delete executions older than 30 days
1355    /// let thirty_days_ago = now_ms - (30 * 24 * 60 * 60 * 1000);
1356    /// let result = client.prune_executions("eternal-workflow", PruneOptions {
1357    ///     completed_before: Some(thirty_days_ago),
1358    ///     ..Default::default()
1359    /// }).await?;
1360    /// # Ok(())
1361    /// # }
1362    /// ```
1363    pub async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ClientError> {
1364        let result = self
1365            .discover_management()?
1366            .prune_executions(instance_id, options.clone())
1367            .await
1368            .map_err(ClientError::from)?;
1369
1370        info!(
1371            instance_id = %instance_id,
1372            options = ?options,
1373            executions_deleted = %result.executions_deleted,
1374            events_deleted = %result.events_deleted,
1375            instances_processed = %result.instances_processed,
1376            "Pruned executions"
1377        );
1378
1379        Ok(result)
1380    }
1381
1382    /// Prune old executions from multiple instances matching the filter.
1383    ///
1384    /// Applies the same prune options to all matching instances.
1385    ///
1386    /// # Parameters
1387    ///
1388    /// * `filter` - Criteria for selecting instances to process.
1389    /// * `options` - Criteria for selecting executions to delete within each instance.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns [`ClientError::ProviderError`] for database/connection issues.
1394    ///
1395    /// # Example
1396    ///
1397    /// ```rust,no_run
1398    /// # use duroxide::{Client, ClientError, InstanceFilter, PruneOptions};
1399    /// # async fn example(client: Client) -> Result<(), ClientError> {
1400    /// // Prune all terminal instances: keep last 10 executions each
1401    /// let result = client.prune_executions_bulk(
1402    ///     InstanceFilter { limit: Some(100), ..Default::default() },
1403    ///     PruneOptions { keep_last: Some(10), ..Default::default() },
1404    /// ).await?;
1405    /// println!("Pruned {} executions across {} instances",
1406    ///     result.executions_deleted, result.instances_processed);
1407    /// # Ok(())
1408    /// # }
1409    /// ```
1410    pub async fn prune_executions_bulk(
1411        &self,
1412        filter: InstanceFilter,
1413        options: PruneOptions,
1414    ) -> Result<PruneResult, ClientError> {
1415        let result = self
1416            .discover_management()?
1417            .prune_executions_bulk(filter.clone(), options.clone())
1418            .await
1419            .map_err(ClientError::from)?;
1420
1421        info!(
1422            filter = ?filter,
1423            options = ?options,
1424            executions_deleted = %result.executions_deleted,
1425            events_deleted = %result.events_deleted,
1426            instances_processed = %result.instances_processed,
1427            "Bulk pruned executions"
1428        );
1429
1430        Ok(result)
1431    }
1432
1433    /// Translate provider errors into more specific client errors for delete operations.
1434    fn translate_delete_error(e: ProviderError, instance_id: &str) -> ClientError {
1435        let msg = e.to_string().to_lowercase();
1436        if msg.contains("not found") {
1437            ClientError::InstanceNotFound {
1438                instance_id: instance_id.to_string(),
1439            }
1440        } else if msg.contains("still running") {
1441            ClientError::InstanceStillRunning {
1442                instance_id: instance_id.to_string(),
1443            }
1444        } else if msg.contains("sub-orchestration") || msg.contains("delete the root") {
1445            ClientError::CannotDeleteSubOrchestration {
1446                instance_id: instance_id.to_string(),
1447            }
1448        } else {
1449            ClientError::Provider(e)
1450        }
1451    }
1452}