duroxide/client/mod.rs
1use std::sync::Arc;
2
3use tracing::info;
4
5use crate::_typed_codec::{Codec, Json};
6use crate::providers::{
7 DeleteInstanceResult, ExecutionInfo, InstanceFilter, InstanceInfo, InstanceTree, Provider, ProviderAdmin,
8 ProviderError, PruneOptions, PruneResult, QueueDepths, SystemMetrics, WorkItem,
9};
10use crate::{EventKind, OrchestrationStatus};
11use serde::Serialize;
12
13/// Client-specific error type that wraps provider errors and adds client-specific errors.
14///
15/// This enum allows callers to distinguish between:
16/// - Provider errors (storage failures, can be retryable or permanent)
17/// - Client-specific errors (validation, capability not available, etc.)
18#[derive(Debug, Clone)]
19pub enum ClientError {
20 /// Provider operation failed (wraps ProviderError)
21 Provider(ProviderError),
22
23 /// Management capability not available
24 ManagementNotAvailable,
25
26 /// Invalid input (client validation)
27 InvalidInput { message: String },
28
29 /// Operation timed out
30 Timeout,
31
32 /// Instance is still running (for delete without force)
33 InstanceStillRunning { instance_id: String },
34
35 /// Cannot delete a sub-orchestration directly (must delete root)
36 CannotDeleteSubOrchestration { instance_id: String },
37
38 /// Instance not found
39 InstanceNotFound { instance_id: String },
40}
41
42impl ClientError {
43 /// Check if this error is retryable (only applies to Provider errors)
44 pub fn is_retryable(&self) -> bool {
45 match self {
46 ClientError::Provider(e) => e.is_retryable(),
47 ClientError::ManagementNotAvailable => false,
48 ClientError::InvalidInput { .. } => false,
49 ClientError::Timeout => true,
50 ClientError::InstanceStillRunning { .. } => false,
51 ClientError::CannotDeleteSubOrchestration { .. } => false,
52 ClientError::InstanceNotFound { .. } => false,
53 }
54 }
55}
56
57impl std::fmt::Display for ClientError {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 ClientError::Provider(e) => write!(f, "{e}"),
61 ClientError::ManagementNotAvailable => write!(
62 f,
63 "Management features not available - provider doesn't implement ProviderAdmin"
64 ),
65 ClientError::InvalidInput { message } => write!(f, "Invalid input: {message}"),
66 ClientError::Timeout => write!(f, "Operation timed out"),
67 ClientError::InstanceStillRunning { instance_id } => write!(
68 f,
69 "Instance {instance_id} is still running. Use force=true or cancel first."
70 ),
71 ClientError::CannotDeleteSubOrchestration { instance_id } => write!(
72 f,
73 "Cannot delete sub-orchestration {instance_id} directly. Delete the root orchestration instead."
74 ),
75 ClientError::InstanceNotFound { instance_id } => {
76 write!(f, "Instance {instance_id} not found")
77 }
78 }
79 }
80}
81
82impl std::error::Error for ClientError {}
83
84impl From<ProviderError> for ClientError {
85 fn from(e: ProviderError) -> Self {
86 ClientError::Provider(e)
87 }
88}
89
// Back-off schedule for `Client::wait_for_orchestration`: the pause between status reads
// starts at INITIAL_POLL_DELAY_MS, is multiplied by POLL_DELAY_MULTIPLIER after every
// non-terminal read, and never exceeds MAX_POLL_DELAY_MS.

/// First pause between status polls, in milliseconds (5ms).
const INITIAL_POLL_DELAY_MS: u64 = 5;

/// Growth factor applied to the pause after each non-terminal poll.
const POLL_DELAY_MULTIPLIER: u64 = 2;

/// Ceiling for the pause between status polls, in milliseconds (100ms).
const MAX_POLL_DELAY_MS: u64 = 100;
99
100/// Client for orchestration control-plane operations with automatic capability discovery.
101///
102/// The Client provides APIs for managing orchestration instances:
103/// - Starting orchestrations
104/// - Raising external events
105/// - Cancelling instances
106/// - Checking status
107/// - Waiting for completion
108/// - Rich management features (when available)
109///
110/// # Automatic Capability Discovery
111///
112/// The Client automatically discovers provider capabilities through the `Provider::as_management_capability()` method.
113/// When a provider implements `ProviderAdmin`, rich management features become available:
114///
115/// ```ignore
116/// let client = Client::new(provider);
117///
118/// // Control plane (always available)
119/// client.start_orchestration("order-1", "ProcessOrder", "{}").await?;
120///
121/// // Management (automatically discovered)
122/// if client.has_management_capability() {
123/// let instances = client.list_all_instances().await?;
124/// let metrics = client.get_system_metrics().await?;
125/// } else {
126/// println!("Management features not available");
127/// }
128/// ```
129///
130/// # Design
131///
132/// The Client communicates with the Runtime **only through the shared Provider** (no direct coupling).
133/// This allows the Client to be used from any process, even one without a running Runtime.
134///
135/// # Thread Safety
136///
137/// Client is `Clone` and can be safely shared across threads.
138///
139/// # Example Usage
140///
141/// ```ignore
142/// use duroxide::{Client, OrchestrationStatus};
143/// use duroxide::providers::sqlite::SqliteProvider;
144/// use std::sync::Arc;
145///
146/// use duroxide::ClientError;
147/// let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
148/// let client = Client::new(store);
149///
150/// // Start an orchestration
151/// client.start_orchestration("order-123", "ProcessOrder", r#"{"customer_id": "c1"}"#).await?;
152///
153/// // Check status
154/// let status = client.get_orchestration_status("order-123").await?;
155/// println!("Status: {:?}", status);
156///
157/// // Wait for completion
158/// let result = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await.unwrap();
159/// match result {
160/// OrchestrationStatus::Completed { output, .. } => println!("Done: {}", output),
161/// OrchestrationStatus::Failed { details, .. } => {
162/// eprintln!("Failed ({}): {}", details.category(), details.display_message());
163/// }
164/// _ => {}
165/// }
166/// ```
167pub struct Client {
168 store: Arc<dyn Provider>,
169}
170
171impl Client {
172 /// Create a client bound to a Provider instance.
173 ///
174 /// # Parameters
175 ///
176 /// * `store` - Arc-wrapped Provider (same instance used by Runtime)
177 ///
178 /// # Example
179 ///
180 /// ```ignore
181 /// let store = Arc::new(SqliteProvider::new("sqlite::memory:").await.unwrap());
182 /// let client = Client::new(store.clone());
183 /// // Multiple clients can share the same store
184 /// let client2 = client.clone();
185 /// ```
186 pub fn new(store: Arc<dyn Provider>) -> Self {
187 Self { store }
188 }
189
190 /// Start an orchestration instance with string input.
191 ///
192 /// # Parameters
193 ///
194 /// * `instance` - Unique instance ID (e.g., "order-123", "user-payment-456")
195 /// * `orchestration` - Name of registered orchestration (e.g., "ProcessOrder")
196 /// * `input` - JSON string input (will be passed to orchestration)
197 ///
198 /// # Returns
199 ///
200 /// * `Ok(())` - Instance was enqueued for processing
201 /// * `Err(msg)` - Failed to enqueue (storage error)
202 ///
203 /// # Behavior
204 ///
205 /// - Enqueues a StartOrchestration work item
206 /// - Returns immediately (doesn't wait for orchestration to start/complete)
207 /// - Use `wait_for_orchestration()` to wait for completion
208 ///
209 /// # Instance ID Requirements
210 ///
211 /// - Must be unique across all orchestrations
212 /// - Can be any string (alphanumeric + hyphens recommended)
213 /// - Reusing an instance ID that already exists will fail
214 ///
215 /// # Example
216 ///
217 /// ```rust,no_run
218 /// # use duroxide::{Client, ClientError};
219 /// # use std::sync::Arc;
220 /// # async fn example(client: Client) -> Result<(), ClientError> {
221 /// // Start with JSON string input
222 /// client.start_orchestration(
223 /// "order-123",
224 /// "ProcessOrder",
225 /// r#"{"customer_id": "c1", "items": ["item1", "item2"]}"#
226 /// ).await?;
227 /// # Ok(())
228 /// # }
229 /// ```
230 ///
231 /// # Errors
232 ///
233 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
234 pub async fn start_orchestration(
235 &self,
236 instance: impl Into<String>,
237 orchestration: impl Into<String>,
238 input: impl Into<String>,
239 ) -> Result<(), ClientError> {
240 let item = WorkItem::StartOrchestration {
241 instance: instance.into(),
242 orchestration: orchestration.into(),
243 input: input.into(),
244 version: None,
245 parent_instance: None,
246 parent_id: None,
247 execution_id: crate::INITIAL_EXECUTION_ID,
248 };
249 self.store
250 .enqueue_for_orchestrator(item, None)
251 .await
252 .map_err(ClientError::from)
253 }
254
255 /// Start an orchestration instance pinned to a specific version.
256 ///
257 /// # Errors
258 ///
259 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
260 pub async fn start_orchestration_versioned(
261 &self,
262 instance: impl Into<String>,
263 orchestration: impl Into<String>,
264 version: impl Into<String>,
265 input: impl Into<String>,
266 ) -> Result<(), ClientError> {
267 let item = WorkItem::StartOrchestration {
268 instance: instance.into(),
269 orchestration: orchestration.into(),
270 input: input.into(),
271 version: Some(version.into()),
272 parent_instance: None,
273 parent_id: None,
274 execution_id: crate::INITIAL_EXECUTION_ID,
275 };
276 self.store
277 .enqueue_for_orchestrator(item, None)
278 .await
279 .map_err(ClientError::from)
280 }
281
282 // Note: No delayed scheduling API. Clients should use normal start APIs.
283
284 /// Start an orchestration with typed input (serialized to JSON).
285 ///
286 /// # Errors
287 ///
288 /// Returns `ClientError::InvalidInput` if serialization fails.
289 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
290 pub async fn start_orchestration_typed<In: Serialize>(
291 &self,
292 instance: impl Into<String>,
293 orchestration: impl Into<String>,
294 input: In,
295 ) -> Result<(), ClientError> {
296 let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
297 message: format!("encode: {e}"),
298 })?;
299 self.start_orchestration(instance, orchestration, payload).await
300 }
301
302 /// Start a versioned orchestration with typed input (serialized to JSON).
303 ///
304 /// # Errors
305 ///
306 /// Returns `ClientError::InvalidInput` if serialization fails.
307 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
308 pub async fn start_orchestration_versioned_typed<In: Serialize>(
309 &self,
310 instance: impl Into<String>,
311 orchestration: impl Into<String>,
312 version: impl Into<String>,
313 input: In,
314 ) -> Result<(), ClientError> {
315 let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
316 message: format!("encode: {e}"),
317 })?;
318 self.start_orchestration_versioned(instance, orchestration, version, payload)
319 .await
320 }
321
322 /// Raise an external event into a running orchestration instance.
323 ///
324 /// # Purpose
325 ///
326 /// Send a signal/message to a running orchestration that is waiting for an external event.
327 /// The orchestration must have called `ctx.schedule_wait(event_name)` to receive the event.
328 ///
329 /// # Parameters
330 ///
331 /// * `instance` - Instance ID of the running orchestration
332 /// * `event_name` - Name of the event (must match `schedule_wait` name)
333 /// * `data` - Payload data (JSON string, passed to orchestration)
334 ///
335 /// # Behavior
336 ///
337 /// - Enqueues ExternalRaised work item to orchestrator queue
338 /// - If instance isn't waiting for this event (yet), it's buffered
339 /// - Event is matched by NAME (not correlation ID)
340 /// - Multiple events with same name can be raised
341 ///
342 /// # Example
343 ///
344 /// ```rust,no_run
345 /// # use duroxide::{Client, ClientError};
346 /// # async fn example(client: Client) -> Result<(), ClientError> {
347 /// // Orchestration waiting for approval
348 /// // ctx.schedule_wait("ApprovalEvent").await
349 ///
350 /// // External system/human approves
351 /// client.raise_event(
352 /// "order-123",
353 /// "ApprovalEvent",
354 /// r#"{"approved": true, "by": "manager@company.com"}"#
355 /// ).await?;
356 /// # Ok(())
357 /// # }
358 /// ```
359 ///
360 /// # Use Cases
361 ///
362 /// - Human approval workflows
363 /// - Webhook callbacks
364 /// - Inter-orchestration communication
365 /// - External system integration
366 ///
367 /// # Error Cases
368 ///
369 /// - Instance doesn't exist: Event is buffered, orchestration processes when started
370 /// - Instance already completed: Event is ignored gracefully
371 ///
372 /// # Errors
373 ///
374 /// Returns `ClientError::Provider` if the provider fails to enqueue the event.
375 pub async fn raise_event(
376 &self,
377 instance: impl Into<String>,
378 event_name: impl Into<String>,
379 data: impl Into<String>,
380 ) -> Result<(), ClientError> {
381 let item = WorkItem::ExternalRaised {
382 instance: instance.into(),
383 name: event_name.into(),
384 data: data.into(),
385 };
386 self.store
387 .enqueue_for_orchestrator(item, None)
388 .await
389 .map_err(ClientError::from)
390 }
391
392 /// Raise a positional external event with typed data.
393 ///
394 /// Serializes `data` as JSON before sending. Same semantics as [`Self::raise_event`].
395 ///
396 /// # Errors
397 ///
398 /// Returns [`ClientError`] if the provider fails to enqueue the event.
399 pub async fn raise_event_typed<T: serde::Serialize>(
400 &self,
401 instance: impl Into<String>,
402 event_name: impl Into<String>,
403 data: &T,
404 ) -> Result<(), ClientError> {
405 let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
406 self.raise_event(instance, event_name, payload).await
407 }
408
409 /// Enqueue a message into a named queue for an orchestration instance.
410 ///
411 /// Queue messages use FIFO mailbox semantics:
412 /// - Matched to [`OrchestrationContext::dequeue_event`] subscriptions in order
413 /// - Stick around until consumed (even if no subscription exists yet)
414 /// - Survive `continue_as_new` boundaries
415 /// - Not affected by subscription cancellation
416 ///
417 /// # Errors
418 ///
419 /// Returns [`ClientError`] if the provider fails to enqueue the message.
420 pub async fn enqueue_event(
421 &self,
422 instance: impl Into<String>,
423 queue: impl Into<String>,
424 data: impl Into<String>,
425 ) -> Result<(), ClientError> {
426 let item = WorkItem::QueueMessage {
427 instance: instance.into(),
428 name: queue.into(),
429 data: data.into(),
430 };
431 self.store
432 .enqueue_for_orchestrator(item, None)
433 .await
434 .map_err(ClientError::from)
435 }
436
437 /// Enqueue a typed message into a named queue for an orchestration instance.
438 ///
439 /// Serializes `data` as JSON before enqueuing. Same semantics as [`Self::enqueue_event`].
440 ///
441 /// # Errors
442 ///
443 /// Returns [`ClientError`] if the provider fails to enqueue the message.
444 pub async fn enqueue_event_typed<T: serde::Serialize>(
445 &self,
446 instance: impl Into<String>,
447 queue: impl Into<String>,
448 data: &T,
449 ) -> Result<(), ClientError> {
450 let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
451 self.enqueue_event(instance, queue, payload).await
452 }
453
454 /// Raise a persistent external event that uses mailbox semantics.
455 ///
456 /// Prefer [`Self::enqueue_event`] — this is a deprecated alias.
457 ///
458 /// # Errors
459 ///
460 /// Returns [`ClientError`] if the provider fails to enqueue the event.
461 #[deprecated(note = "Use enqueue_event() instead")]
462 pub async fn raise_event_persistent(
463 &self,
464 instance: impl Into<String>,
465 event_name: impl Into<String>,
466 data: impl Into<String>,
467 ) -> Result<(), ClientError> {
468 self.enqueue_event(instance, event_name, data).await
469 }
470
/// V2: Raise an external event with topic-based pub/sub matching.
///
/// Like [`Self::raise_event`], but the enqueued `WorkItem::ExternalRaised2` also carries a
/// `topic`. The orchestration receives it via `ctx.schedule_wait2(name, topic)`.
/// Only compiled with the `replay-version-test` feature (replay engine extensibility checks).
///
/// # Errors
///
/// Returns [`ClientError`] if the provider fails to enqueue the event.
#[cfg(feature = "replay-version-test")]
pub async fn raise_event2(
    &self,
    instance: impl Into<String>,
    event_name: impl Into<String>,
    topic: impl Into<String>,
    data: impl Into<String>,
) -> Result<(), ClientError> {
    let topical_event = WorkItem::ExternalRaised2 {
        instance: instance.into(),
        name: event_name.into(),
        topic: topic.into(),
        data: data.into(),
    };
    self.store.enqueue_for_orchestrator(topical_event, None).await?;
    Ok(())
}
499
500 /// Request cancellation of an orchestration instance.
501 ///
502 /// # Purpose
503 ///
504 /// Gracefully cancel a running orchestration. The orchestration will complete its current
505 /// turn and then fail deterministically with a "canceled: {reason}" error.
506 ///
507 /// # Parameters
508 ///
509 /// * `instance` - Instance ID to cancel
510 /// * `reason` - Reason for cancellation (included in error message)
511 ///
512 /// # Behavior
513 ///
514 /// 1. Enqueues CancelInstance work item
515 /// 2. Runtime appends OrchestrationCancelRequested event
516 /// 3. Next turn, orchestration sees cancellation and fails deterministically
517 /// 4. Final status: `OrchestrationStatus::Failed { details: Application::Cancelled }`
518 ///
519 /// # Deterministic Cancellation
520 ///
521 /// Cancellation is **deterministic** - the orchestration fails at a well-defined point:
522 /// - Not mid-activity (activities complete)
523 /// - Not mid-turn (current turn finishes)
524 /// - Failure is recorded in history (replays consistently)
525 ///
526 /// # Propagation
527 ///
528 /// If the orchestration has child sub-orchestrations, they are also cancelled.
529 ///
530 /// # Example
531 ///
532 /// ```ignore
533 /// // Cancel a long-running order
534 /// client.cancel_instance("order-123", "Customer requested cancellation").await?;
535 ///
536 /// // Wait for cancellation to complete
537 /// let status = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(5)).await?;
538 /// match status {
539 /// OrchestrationStatus::Failed { details, .. } if matches!(
540 /// details,
541 /// duroxide::ErrorDetails::Application {
542 /// kind: duroxide::AppErrorKind::Cancelled { .. },
543 /// ..
544 /// }
545 /// ) => {
546 /// println!("Successfully cancelled");
547 /// }
548 /// _ => {}
549 /// }
550 /// ```
551 ///
552 /// # Error Cases
553 ///
554 /// - Instance already completed: Cancellation is no-op
555 /// - Instance doesn't exist: Cancellation is no-op
556 ///
557 /// # Errors
558 ///
559 /// Returns `ClientError::Provider` if the provider fails to enqueue the cancellation.
560 pub async fn cancel_instance(
561 &self,
562 instance: impl Into<String>,
563 reason: impl Into<String>,
564 ) -> Result<(), ClientError> {
565 let item = WorkItem::CancelInstance {
566 instance: instance.into(),
567 reason: reason.into(),
568 };
569 self.store
570 .enqueue_for_orchestrator(item, None)
571 .await
572 .map_err(ClientError::from)
573 }
574
575 /// Get the current status of an orchestration by inspecting its history.
576 ///
577 /// # Purpose
578 ///
579 /// Query the current state of an orchestration instance without waiting.
580 ///
581 /// # Parameters
582 ///
583 /// * `instance` - Instance ID to query
584 ///
585 /// # Returns
586 ///
587 /// * `OrchestrationStatus::NotFound` - Instance doesn't exist
588 /// * `OrchestrationStatus::Running` - Instance is still executing
589 /// * `OrchestrationStatus::Completed { output, .. }` - Instance completed successfully
590 /// * `OrchestrationStatus::Failed { error }` - Instance failed (includes cancellations)
591 ///
592 /// # Behavior
593 ///
594 /// - Reads instance history from provider
595 /// - Scans for terminal events (Completed/Failed)
596 /// - For multi-execution instances (ContinueAsNew), returns status of LATEST execution
597 ///
598 /// # Performance
599 ///
600 /// This method reads from storage (not cached). For polling, use `wait_for_orchestration` instead.
601 ///
602 /// # Example
603 ///
604 /// ```rust,no_run
605 /// # use duroxide::{Client, ClientError, OrchestrationStatus};
606 /// # async fn example(client: Client) -> Result<(), ClientError> {
607 /// let status = client.get_orchestration_status("order-123").await?;
608 ///
609 /// match status {
610 /// OrchestrationStatus::NotFound => println!("Instance not found"),
611 /// OrchestrationStatus::Running { .. } => println!("Still processing"),
612 /// OrchestrationStatus::Completed { output, .. } => println!("Done: {}", output),
613 /// OrchestrationStatus::Failed { details, .. } => eprintln!("Error: {}", details.display_message()),
614 /// }
615 /// # Ok(())
616 /// # }
617 /// ```
618 ///
619 /// # Errors
620 ///
621 /// Returns `ClientError::Provider` if the provider fails to read the orchestration history.
622 pub async fn get_orchestration_status(&self, instance: &str) -> Result<OrchestrationStatus, ClientError> {
623 let hist = self.store.read(instance).await.map_err(ClientError::from)?;
624
625 // Query custom status (lightweight, always available)
626 let (custom_status, custom_status_version) = match self.store.get_custom_status(instance, 0).await {
627 Ok(Some((cs, v))) => (cs, v),
628 Ok(None) => (None, 0),
629 Err(_) => (None, 0), // Best-effort: don't fail status query for custom_status errors
630 };
631
632 // Find terminal events first
633 for e in hist.iter().rev() {
634 match &e.kind {
635 EventKind::OrchestrationCompleted { output } => {
636 return Ok(OrchestrationStatus::Completed {
637 output: output.clone(),
638 custom_status,
639 custom_status_version,
640 });
641 }
642 EventKind::OrchestrationFailed { details } => {
643 return Ok(OrchestrationStatus::Failed {
644 details: details.clone(),
645 custom_status,
646 custom_status_version,
647 });
648 }
649 _ => {}
650 }
651 }
652 // If we ever saw a start, it's running
653 if hist
654 .iter()
655 .any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
656 {
657 Ok(OrchestrationStatus::Running {
658 custom_status,
659 custom_status_version,
660 })
661 } else {
662 Ok(OrchestrationStatus::NotFound)
663 }
664 }
665
666 /// Wait until terminal state or timeout using provider reads.
667 ///
668 /// # Purpose
669 ///
670 /// Poll for orchestration completion with exponential backoff, returning when terminal or timeout.
671 ///
672 /// # Parameters
673 ///
674 /// * `instance` - Instance ID to wait for
675 /// * `timeout` - Maximum time to wait before returning timeout error
676 ///
677 /// # Returns
678 ///
679 /// * `Ok(OrchestrationStatus::Completed { output, .. })` - Orchestration completed successfully
680 /// * `Ok(OrchestrationStatus::Failed { details, .. })` - Orchestration failed (includes cancellations)
681 /// * `Err(ClientError::Timeout)` - Timeout elapsed while still Running
682 /// * `Err(ClientError::Provider(e))` - Provider/Storage error
683 ///
684 /// **Note:** Never returns `NotFound` or `Running` - only terminal states or timeout.
685 ///
686 /// # Polling Behavior
687 ///
688 /// - First check: Immediate (no delay)
689 /// - Subsequent checks: Exponential backoff starting at 5ms, doubling each iteration, max 100ms
690 /// - Continues until terminal state or timeout
691 ///
692 /// # Example
693 ///
694 /// ```ignore
695 /// // Start orchestration
696 /// client.start_orchestration("order-123", "ProcessOrder", "{}").await?;
697 ///
698 /// // Wait up to 30 seconds
699 /// match client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await {
700 /// Ok(OrchestrationStatus::Completed { output, .. }) => {
701 /// println!("Success: {}", output);
702 /// }
703 /// Ok(OrchestrationStatus::Failed { details, .. }) => {
704 /// eprintln!("Failed ({}): {}", details.category(), details.display_message());
705 /// }
706 /// Err(ClientError::Timeout) => {
707 /// println!("Still running after 30s, instance: order-123");
708 /// // Instance is still running - can wait more or cancel
709 /// }
710 /// _ => unreachable!("wait_for_orchestration only returns terminal or timeout"),
711 /// }
712 /// ```
713 ///
714 /// # Use Cases
715 ///
716 /// - Synchronous request/response workflows
717 /// - Testing (wait for workflow to complete)
718 /// - CLI tools (block until done)
719 /// - Health checks
720 ///
721 /// # For Long-Running Workflows
722 ///
723 /// Don't wait for hours/days:
724 /// ```ignore
725 /// // Start workflow
726 /// client.start_orchestration("batch-job", "ProcessBatch", "{}").await.unwrap();
727 ///
728 /// // DON'T wait for hours
729 /// // let status = client.wait_for_orchestration("batch-job", Duration::from_hours(24)).await;
730 ///
731 /// // DO poll periodically
732 /// loop {
733 /// match client.get_orchestration_status("batch-job").await {
734 /// OrchestrationStatus::Completed { .. } => break,
735 /// OrchestrationStatus::Failed { .. } => break,
736 /// _ => tokio::time::sleep(std::time::Duration::from_secs(60)).await,
737 /// }
738 /// }
739 /// ```
740 ///
741 /// # Errors
742 ///
743 /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
744 /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
745 pub async fn wait_for_orchestration(
746 &self,
747 instance: &str,
748 timeout: std::time::Duration,
749 ) -> Result<OrchestrationStatus, ClientError> {
750 let deadline = std::time::Instant::now() + timeout;
751 // quick path
752 match self.get_orchestration_status(instance).await {
753 Ok(s @ OrchestrationStatus::Completed { .. }) => return Ok(s),
754 Ok(s @ OrchestrationStatus::Failed { .. }) => return Ok(s),
755 Err(e) => return Err(e),
756 _ => {}
757 }
758 // poll with backoff
759 let mut delay_ms: u64 = INITIAL_POLL_DELAY_MS;
760 while std::time::Instant::now() < deadline {
761 match self.get_orchestration_status(instance).await {
762 Ok(s @ OrchestrationStatus::Completed { .. }) => return Ok(s),
763 Ok(s @ OrchestrationStatus::Failed { .. }) => return Ok(s),
764 Err(e) => return Err(e),
765 _ => {
766 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
767 delay_ms = (delay_ms.saturating_mul(POLL_DELAY_MULTIPLIER)).min(MAX_POLL_DELAY_MS);
768 }
769 }
770 }
771 Err(ClientError::Timeout)
772 }
773
774 /// Typed wait helper: decodes output on Completed, returns Err(String) on Failed.
775 ///
776 /// # Errors
777 ///
778 /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
779 /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
780 /// Returns `ClientError::InvalidInput` if deserialization of the output fails.
781 pub async fn wait_for_orchestration_typed<Out: serde::de::DeserializeOwned>(
782 &self,
783 instance: &str,
784 timeout: std::time::Duration,
785 ) -> Result<Result<Out, String>, ClientError> {
786 match self.wait_for_orchestration(instance, timeout).await? {
787 OrchestrationStatus::Completed { output, .. } => match Json::decode::<Out>(&output) {
788 Ok(v) => Ok(Ok(v)),
789 Err(e) => Err(ClientError::InvalidInput {
790 message: format!("decode failed: {e}"),
791 }),
792 },
793 OrchestrationStatus::Failed { details, .. } => Ok(Err(details.display_message())),
794 _ => unreachable!("wait_for_orchestration returns only terminal or timeout"),
795 }
796 }
797
798 /// Wait for custom_status to change, polling the provider at `poll_interval`.
799 ///
800 /// Returns the full `OrchestrationStatus` when:
801 /// - The `custom_status_version` exceeds `last_seen_version`
802 /// - The orchestration reaches a terminal state (Completed/Failed)
803 /// - The timeout elapses (returns `Err(ClientError::Timeout)`)
804 ///
805 /// # Parameters
806 ///
807 /// * `instance` - Instance ID to monitor
808 /// * `last_seen_version` - The version the caller last observed (0 to get any status)
809 /// * `poll_interval` - How often to check the provider
810 /// * `timeout` - Maximum time to wait
811 ///
812 /// # Example
813 ///
814 /// ```ignore
815 /// let mut version = 0u64;
816 /// loop {
817 /// match client.wait_for_status_change("order-123", version, Duration::from_millis(200), Duration::from_secs(30)).await {
818 /// Ok(OrchestrationStatus::Running { custom_status, custom_status_version }) => {
819 /// println!("Progress: {:?}", custom_status);
820 /// version = custom_status_version;
821 /// }
822 /// Ok(OrchestrationStatus::Completed { output, .. }) => {
823 /// println!("Done: {output}");
824 /// break;
825 /// }
826 /// _ => break,
827 /// }
828 /// }
829 /// ```
830 ///
831 /// # Errors
832 ///
833 /// Returns [`ClientError`] if the provider fails or the instance doesn't exist.
834 pub async fn wait_for_status_change(
835 &self,
836 instance: &str,
837 last_seen_version: u64,
838 poll_interval: std::time::Duration,
839 timeout: std::time::Duration,
840 ) -> Result<OrchestrationStatus, ClientError> {
841 let deadline = std::time::Instant::now() + timeout;
842
843 while std::time::Instant::now() < deadline {
844 // Lightweight check: just custom_status + version
845 match self.store.get_custom_status(instance, last_seen_version).await {
846 Ok(Some(_)) => {
847 // Version changed — return full status
848 return self.get_orchestration_status(instance).await;
849 }
850 Ok(None) => {
851 // No change — check if terminal before sleeping
852 // (get_custom_status returns None if version hasn't changed,
853 // but also if the instance doesn't exist or is terminal)
854 }
855 Err(e) => return Err(ClientError::from(e)),
856 }
857
858 // Also check for terminal state (in case orchestration completed
859 // without ever updating custom_status)
860 match self.get_orchestration_status(instance).await? {
861 OrchestrationStatus::Running { .. } | OrchestrationStatus::NotFound => {
862 // Still running — sleep and retry
863 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
864 tokio::time::sleep(poll_interval.min(remaining)).await;
865 }
866 terminal => return Ok(terminal),
867 }
868 }
869
870 Err(ClientError::Timeout)
871 }
872
873 // ===== Capability Discovery =====
874
875 /// Check if management capabilities are available.
876 ///
877 /// # Returns
878 ///
879 /// `true` if the provider implements `ProviderAdmin`, `false` otherwise.
880 ///
881 /// # Usage
882 ///
883 /// ```ignore
884 /// let client = Client::new(provider);
885 /// if client.has_management_capability() {
886 /// let instances = client.list_all_instances().await?;
887 /// } else {
888 /// println!("Management features not available");
889 /// }
890 /// ```
891 pub fn has_management_capability(&self) -> bool {
892 self.discover_management().is_ok()
893 }
894
895 /// Automatically discover management capabilities from the provider.
896 ///
897 /// # Returns
898 ///
899 /// `Ok(&dyn ManagementCapability)` if available, `Err(String)` if not.
900 ///
901 /// # Internal Use
902 ///
903 /// This method is used internally by management methods to access capabilities.
904 fn discover_management(&self) -> Result<&dyn ProviderAdmin, ClientError> {
905 self.store
906 .as_management_capability()
907 .ok_or(ClientError::ManagementNotAvailable)
908 }
909
910 // ===== Rich Management Methods =====
911
912 /// List all orchestration instances.
913 ///
914 /// # Returns
915 ///
916 /// Vector of instance IDs, typically sorted by creation time (newest first).
917 ///
918 /// # Errors
919 ///
920 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
921 ///
922 /// # Usage
923 ///
924 /// ```ignore
925 /// let client = Client::new(provider);
926 /// if client.has_management_capability() {
927 /// let instances = client.list_all_instances().await?;
928 /// for instance in instances {
929 /// println!("Instance: {}", instance);
930 /// }
931 /// }
932 /// ```
933 pub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError> {
934 self.discover_management()?
935 .list_instances()
936 .await
937 .map_err(ClientError::from)
938 }
939
940 /// List instances matching a status filter.
941 ///
942 /// # Parameters
943 ///
944 /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
945 ///
946 /// # Returns
947 ///
948 /// Vector of instance IDs with the specified status.
949 ///
950 /// # Errors
951 ///
952 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
953 ///
954 /// # Usage
955 ///
956 /// ```ignore
957 /// let client = Client::new(provider);
958 /// if client.has_management_capability() {
959 /// let running = client.list_instances_by_status("Running").await?;
960 /// let completed = client.list_instances_by_status("Completed").await?;
961 /// println!("Running: {}, Completed: {}", running.len(), completed.len());
962 /// }
963 /// ```
964 pub async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ClientError> {
965 self.discover_management()?
966 .list_instances_by_status(status)
967 .await
968 .map_err(ClientError::from)
969 }
970
971 /// Get comprehensive information about an instance.
972 ///
973 /// # Parameters
974 ///
975 /// * `instance` - The ID of the orchestration instance.
976 ///
977 /// # Returns
978 ///
979 /// Detailed instance information including status, output, and metadata.
980 ///
981 /// # Errors
982 ///
983 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
984 ///
985 /// # Usage
986 ///
987 /// ```ignore
988 /// let client = Client::new(provider);
989 /// if client.has_management_capability() {
990 /// let info = client.get_instance_info("order-123").await?;
991 /// println!("Instance {}: {} ({})", info.instance_id, info.orchestration_name, info.status);
992 /// }
993 /// ```
994 pub async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ClientError> {
995 self.discover_management()?
996 .get_instance_info(instance)
997 .await
998 .map_err(ClientError::from)
999 }
1000
1001 /// Get detailed information about a specific execution.
1002 ///
1003 /// # Parameters
1004 ///
1005 /// * `instance` - The ID of the orchestration instance.
1006 /// * `execution_id` - The specific execution ID.
1007 ///
1008 /// # Returns
1009 ///
1010 /// Detailed execution information including status, output, and event count.
1011 ///
1012 /// # Errors
1013 ///
1014 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1015 ///
1016 /// # Usage
1017 ///
1018 /// ```ignore
1019 /// let client = Client::new(provider);
1020 /// if client.has_management_capability() {
1021 /// let info = client.get_execution_info("order-123", 1).await?;
1022 /// println!("Execution {}: {} events, status: {}", info.execution_id, info.event_count, info.status);
1023 /// }
1024 /// ```
1025 pub async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ClientError> {
1026 self.discover_management()?
1027 .get_execution_info(instance, execution_id)
1028 .await
1029 .map_err(ClientError::from)
1030 }
1031
1032 /// List all execution IDs for an instance.
1033 ///
1034 /// Returns execution IDs in ascending order: \[1\], \[1, 2\], \[1, 2, 3\], etc.
1035 /// Each execution represents either the initial run or a continuation via ContinueAsNew.
1036 ///
1037 /// # Parameters
1038 ///
1039 /// * `instance` - The instance ID to query
1040 ///
1041 /// # Returns
1042 ///
1043 /// Vector of execution IDs in ascending order.
1044 ///
1045 /// # Errors
1046 ///
1047 /// Returns an error if:
1048 /// - The provider doesn't support management capabilities
1049 /// - The database query fails
1050 ///
1051 /// # Usage
1052 ///
1053 /// ```ignore
1054 /// let client = Client::new(provider);
1055 /// if client.has_management_capability() {
1056 /// let executions = client.list_executions("order-123").await?;
1057 /// println!("Instance has {} executions", executions.len()); // [1, 2, 3]
1058 /// }
1059 /// ```
1060 pub async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ClientError> {
1061 let mgmt = self.discover_management()?;
1062 mgmt.list_executions(instance).await.map_err(ClientError::from)
1063 }
1064
1065 /// Read the full event history for a specific execution within an instance.
1066 ///
1067 /// Returns all events for the specified execution in chronological order.
1068 /// Each execution has its own independent history starting from OrchestrationStarted.
1069 ///
1070 /// # Parameters
1071 ///
1072 /// * `instance` - The instance ID
1073 /// * `execution_id` - The specific execution ID (starts at 1)
1074 ///
1075 /// # Returns
1076 ///
1077 /// Vector of events in chronological order (oldest first).
1078 ///
1079 /// # Errors
1080 ///
1081 /// Returns an error if:
1082 /// - The provider doesn't support management capabilities
1083 /// - The instance or execution doesn't exist
1084 /// - The database query fails
1085 ///
1086 /// # Usage
1087 ///
1088 /// ```ignore
1089 /// let client = Client::new(provider);
1090 /// if client.has_management_capability() {
1091 /// let history = client.read_execution_history("order-123", 1).await?;
1092 /// for event in history {
1093 /// println!("Event: {:?}", event);
1094 /// }
1095 /// }
1096 /// ```
1097 pub async fn read_execution_history(
1098 &self,
1099 instance: &str,
1100 execution_id: u64,
1101 ) -> Result<Vec<crate::Event>, ClientError> {
1102 let mgmt = self.discover_management()?;
1103 mgmt.read_history_with_execution_id(instance, execution_id)
1104 .await
1105 .map_err(ClientError::from)
1106 }
1107
1108 /// Get system-wide metrics for the orchestration engine.
1109 ///
1110 /// # Returns
1111 ///
1112 /// System metrics including instance counts, execution counts, and status breakdown.
1113 ///
1114 /// # Errors
1115 ///
1116 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1117 ///
1118 /// # Usage
1119 ///
1120 /// ```ignore
1121 /// let client = Client::new(provider);
1122 /// if client.has_management_capability() {
1123 /// let metrics = client.get_system_metrics().await?;
1124 /// println!("System health: {} running, {} completed, {} failed",
1125 /// metrics.running_instances, metrics.completed_instances, metrics.failed_instances);
1126 /// }
1127 /// ```
1128 pub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError> {
1129 self.discover_management()?
1130 .get_system_metrics()
1131 .await
1132 .map_err(ClientError::from)
1133 }
1134
1135 /// Get the current depths of the internal work queues.
1136 ///
1137 /// # Returns
1138 ///
1139 /// Queue depths for orchestrator, worker, and timer queues.
1140 ///
1141 /// # Errors
1142 ///
1143 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
1144 ///
1145 /// # Usage
1146 ///
1147 /// ```ignore
1148 /// let client = Client::new(provider);
1149 /// if client.has_management_capability() {
1150 /// let queues = client.get_queue_depths().await?;
1151 /// println!("Queue depths - Orchestrator: {}, Worker: {}, Timer: {}",
1152 /// queues.orchestrator_queue, queues.worker_queue, queues.timer_queue);
1153 /// }
1154 /// ```
1155 pub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError> {
1156 self.discover_management()?
1157 .get_queue_depths()
1158 .await
1159 .map_err(ClientError::from)
1160 }
1161
1162 // ===== Hierarchy Operations =====
1163
1164 /// Get the full instance tree rooted at the given instance.
1165 ///
1166 /// Returns all instances in the tree: the root, all children, grandchildren, etc.
1167 /// Useful for inspecting hierarchy before deletion, or for understanding
1168 /// sub-orchestration relationships.
1169 ///
1170 /// # Parameters
1171 ///
1172 /// * `instance_id` - The root instance ID to start from.
1173 ///
1174 /// # Errors
1175 ///
1176 /// Returns [`ClientError::InstanceNotFound`] if the instance doesn't exist.
1177 /// Returns [`ClientError::ProviderError`] for database/connection issues.
1178 ///
1179 /// # Example
1180 ///
1181 /// ```rust,no_run
1182 /// # use duroxide::{Client, ClientError};
1183 /// # async fn example(client: Client) -> Result<(), ClientError> {
1184 /// let tree = client.get_instance_tree("order-123").await?;
1185 /// println!("Will delete {} instances", tree.size());
1186 /// for id in &tree.all_ids {
1187 /// println!(" - {}", id);
1188 /// }
1189 /// client.delete_instance("order-123", false).await?;
1190 /// # Ok(())
1191 /// # }
1192 /// ```
1193 pub async fn get_instance_tree(&self, instance_id: &str) -> Result<InstanceTree, ClientError> {
1194 let mgmt = self.discover_management()?;
1195 mgmt.get_instance_tree(instance_id).await.map_err(ClientError::from)
1196 }
1197
1198 // ===== Deletion and Pruning Operations =====
1199
1200 /// Delete an orchestration instance and all its associated data.
1201 ///
1202 /// This removes the instance, all executions, all history events, and any
1203 /// pending queue messages (orchestrator, worker, timer).
1204 ///
1205 /// # Parameters
1206 ///
1207 /// * `instance_id` - The ID of the instance to delete.
1208 /// * `force` - If true, delete even if the instance is in Running state.
1209 /// WARNING: Force delete only removes database state; it does NOT cancel
1210 /// in-flight tokio tasks. Use `cancel_instance` first for graceful termination.
1211 ///
1212 /// # Errors
1213 ///
1214 /// - [`ClientError::InstanceStillRunning`] - Instance is running and force=false.
1215 /// - [`ClientError::CannotDeleteSubOrchestration`] - Instance is a sub-orchestration.
1216 /// - [`ClientError::InstanceNotFound`] - Instance doesn't exist.
1217 /// - [`ClientError::ProviderError`] - Database/connection issues.
1218 ///
1219 /// # Cascading Delete
1220 ///
1221 /// If the instance is a root orchestration with sub-orchestrations, all descendants
1222 /// are included in the deletion (performed atomically in a single transaction).
1223 ///
1224 /// # Example
1225 ///
1226 /// ```rust,no_run
1227 /// # use duroxide::{Client, ClientError};
1228 /// # async fn example(client: Client) -> Result<(), ClientError> {
1229 /// // Delete a completed instance
1230 /// let result = client.delete_instance("order-123", false).await?;
1231 /// println!("Deleted {} events", result.events_deleted);
1232 ///
1233 /// // Graceful pattern: cancel first, then delete
1234 /// client.cancel_instance("workflow-456", "cleanup").await?;
1235 /// // Wait for cancellation to complete...
1236 /// client.delete_instance("workflow-456", false).await?;
1237 ///
1238 /// // Force delete an instance stuck in Running state
1239 /// client.delete_instance("stuck-workflow", true).await?;
1240 /// # Ok(())
1241 /// # }
1242 /// ```
1243 pub async fn delete_instance(&self, instance_id: &str, force: bool) -> Result<DeleteInstanceResult, ClientError> {
1244 let mgmt = self.discover_management()?;
1245 let result = mgmt
1246 .delete_instance(instance_id, force)
1247 .await
1248 .map_err(|e| Self::translate_delete_error(e, instance_id))?;
1249
1250 info!(
1251 instance_id = %instance_id,
1252 force = %force,
1253 instances_deleted = %result.instances_deleted,
1254 executions_deleted = %result.executions_deleted,
1255 events_deleted = %result.events_deleted,
1256 queue_messages_deleted = %result.queue_messages_deleted,
1257 "Deleted instance"
1258 );
1259
1260 Ok(result)
1261 }
1262
1263 /// Delete multiple orchestration instances matching the filter criteria.
1264 ///
1265 /// Only instances in terminal states (Completed, Failed) are eligible.
1266 /// Running instances are silently skipped (not an error).
1267 ///
1268 /// # Parameters
1269 ///
1270 /// * `filter` - Criteria for selecting instances to delete. All criteria are ANDed.
1271 ///
1272 /// # Filter Behavior
1273 ///
1274 /// - `instance_ids`: Allowlist of specific IDs to consider
1275 /// - `completed_before`: Only delete instances completed before this timestamp (ms since epoch)
1276 /// - `limit`: Maximum number of instances to delete (default: 1000)
1277 ///
1278 /// # Errors
1279 ///
1280 /// Returns [`ClientError::ProviderError`] for database/connection issues.
1281 ///
1282 /// # Example
1283 ///
1284 /// ```rust,no_run
1285 /// # use duroxide::{Client, ClientError, InstanceFilter};
1286 /// # async fn example(client: Client, now_ms: u64) -> Result<(), ClientError> {
1287 /// // Delete specific instances
1288 /// let result = client.delete_instance_bulk(InstanceFilter {
1289 /// instance_ids: Some(vec!["order-1".into(), "order-2".into()]),
1290 /// ..Default::default()
1291 /// }).await?;
1292 ///
1293 /// // Delete by age (retention policy)
1294 /// let five_days_ago = now_ms - (5 * 24 * 60 * 60 * 1000);
1295 /// let result = client.delete_instance_bulk(InstanceFilter {
1296 /// completed_before: Some(five_days_ago),
1297 /// limit: Some(500),
1298 /// ..Default::default()
1299 /// }).await?;
1300 /// # Ok(())
1301 /// # }
1302 /// ```
1303 pub async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ClientError> {
1304 let result = self
1305 .discover_management()?
1306 .delete_instance_bulk(filter.clone())
1307 .await
1308 .map_err(ClientError::from)?;
1309
1310 info!(
1311 filter = ?filter,
1312 instances_deleted = %result.instances_deleted,
1313 executions_deleted = %result.executions_deleted,
1314 events_deleted = %result.events_deleted,
1315 queue_messages_deleted = %result.queue_messages_deleted,
1316 "Bulk deleted instances"
1317 );
1318
1319 Ok(result)
1320 }
1321
1322 /// Prune old executions from a single long-running instance.
1323 ///
1324 /// Use this for `ContinueAsNew` workflows that accumulate many executions
1325 /// over time. The current (active) execution is never pruned.
1326 ///
1327 /// # Parameters
1328 ///
1329 /// * `instance_id` - The instance to prune executions from.
1330 /// * `options` - Criteria for selecting executions to delete. All criteria are ANDed.
1331 ///
1332 /// # Options Behavior
1333 ///
1334 /// - `keep_last`: Keep the last N executions by execution_id
1335 /// - `completed_before`: Only delete executions completed before this timestamp (ms)
1336 /// - Running executions are NEVER pruned
1337 /// - The current execution is NEVER pruned
1338 ///
1339 /// # Errors
1340 ///
1341 /// Returns [`ClientError::ProviderError`] for database/connection issues.
1342 ///
1343 /// # Example
1344 ///
1345 /// ```rust,no_run
1346 /// # use duroxide::{Client, ClientError, PruneOptions};
1347 /// # async fn example(client: Client, now_ms: u64) -> Result<(), ClientError> {
1348 /// // Keep only the last 10 executions
1349 /// let result = client.prune_executions("eternal-workflow", PruneOptions {
1350 /// keep_last: Some(10),
1351 /// ..Default::default()
1352 /// }).await?;
1353 ///
1354 /// // Delete executions older than 30 days
1355 /// let thirty_days_ago = now_ms - (30 * 24 * 60 * 60 * 1000);
1356 /// let result = client.prune_executions("eternal-workflow", PruneOptions {
1357 /// completed_before: Some(thirty_days_ago),
1358 /// ..Default::default()
1359 /// }).await?;
1360 /// # Ok(())
1361 /// # }
1362 /// ```
1363 pub async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ClientError> {
1364 let result = self
1365 .discover_management()?
1366 .prune_executions(instance_id, options.clone())
1367 .await
1368 .map_err(ClientError::from)?;
1369
1370 info!(
1371 instance_id = %instance_id,
1372 options = ?options,
1373 executions_deleted = %result.executions_deleted,
1374 events_deleted = %result.events_deleted,
1375 instances_processed = %result.instances_processed,
1376 "Pruned executions"
1377 );
1378
1379 Ok(result)
1380 }
1381
1382 /// Prune old executions from multiple instances matching the filter.
1383 ///
1384 /// Applies the same prune options to all matching instances.
1385 ///
1386 /// # Parameters
1387 ///
1388 /// * `filter` - Criteria for selecting instances to process.
1389 /// * `options` - Criteria for selecting executions to delete within each instance.
1390 ///
1391 /// # Errors
1392 ///
1393 /// Returns [`ClientError::ProviderError`] for database/connection issues.
1394 ///
1395 /// # Example
1396 ///
1397 /// ```rust,no_run
1398 /// # use duroxide::{Client, ClientError, InstanceFilter, PruneOptions};
1399 /// # async fn example(client: Client) -> Result<(), ClientError> {
1400 /// // Prune all terminal instances: keep last 10 executions each
1401 /// let result = client.prune_executions_bulk(
1402 /// InstanceFilter { limit: Some(100), ..Default::default() },
1403 /// PruneOptions { keep_last: Some(10), ..Default::default() },
1404 /// ).await?;
1405 /// println!("Pruned {} executions across {} instances",
1406 /// result.executions_deleted, result.instances_processed);
1407 /// # Ok(())
1408 /// # }
1409 /// ```
1410 pub async fn prune_executions_bulk(
1411 &self,
1412 filter: InstanceFilter,
1413 options: PruneOptions,
1414 ) -> Result<PruneResult, ClientError> {
1415 let result = self
1416 .discover_management()?
1417 .prune_executions_bulk(filter.clone(), options.clone())
1418 .await
1419 .map_err(ClientError::from)?;
1420
1421 info!(
1422 filter = ?filter,
1423 options = ?options,
1424 executions_deleted = %result.executions_deleted,
1425 events_deleted = %result.events_deleted,
1426 instances_processed = %result.instances_processed,
1427 "Bulk pruned executions"
1428 );
1429
1430 Ok(result)
1431 }
1432
1433 /// Translate provider errors into more specific client errors for delete operations.
1434 fn translate_delete_error(e: ProviderError, instance_id: &str) -> ClientError {
1435 let msg = e.to_string().to_lowercase();
1436 if msg.contains("not found") {
1437 ClientError::InstanceNotFound {
1438 instance_id: instance_id.to_string(),
1439 }
1440 } else if msg.contains("still running") {
1441 ClientError::InstanceStillRunning {
1442 instance_id: instance_id.to_string(),
1443 }
1444 } else if msg.contains("sub-orchestration") || msg.contains("delete the root") {
1445 ClientError::CannotDeleteSubOrchestration {
1446 instance_id: instance_id.to_string(),
1447 }
1448 } else {
1449 ClientError::Provider(e)
1450 }
1451 }
1452}