duroxide/client/mod.rs
1use std::sync::Arc;
2
3use crate::_typed_codec::{Codec, Json};
4use crate::providers::{
5 ExecutionInfo, InstanceInfo, Provider, ProviderAdmin, ProviderError, QueueDepths, SystemMetrics, WorkItem,
6};
7use crate::{EventKind, OrchestrationStatus};
8use serde::Serialize;
9
10/// Client-specific error type that wraps provider errors and adds client-specific errors.
11///
12/// This enum allows callers to distinguish between:
13/// - Provider errors (storage failures, can be retryable or permanent)
14/// - Client-specific errors (validation, capability not available, etc.)
15#[derive(Debug, Clone)]
16pub enum ClientError {
17 /// Provider operation failed (wraps ProviderError)
18 Provider(ProviderError),
19
20 /// Management capability not available
21 ManagementNotAvailable,
22
23 /// Invalid input (client validation)
24 InvalidInput { message: String },
25
26 /// Operation timed out
27 Timeout,
28}
29
30impl ClientError {
31 /// Check if this error is retryable (only applies to Provider errors)
32 pub fn is_retryable(&self) -> bool {
33 match self {
34 ClientError::Provider(e) => e.is_retryable(),
35 ClientError::ManagementNotAvailable => false,
36 ClientError::InvalidInput { .. } => false,
37 ClientError::Timeout => true,
38 }
39 }
40}
41
42impl std::fmt::Display for ClientError {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 ClientError::Provider(e) => write!(f, "{e}"),
46 ClientError::ManagementNotAvailable => write!(
47 f,
48 "Management features not available - provider doesn't implement ProviderAdmin"
49 ),
50 ClientError::InvalidInput { message } => write!(f, "Invalid input: {message}"),
51 ClientError::Timeout => write!(f, "Operation timed out"),
52 }
53 }
54}
55
56impl std::error::Error for ClientError {}
57
58impl From<ProviderError> for ClientError {
59 fn from(e: ProviderError) -> Self {
60 ClientError::Provider(e)
61 }
62}
63
// Tuning knobs for the status-polling loop in `wait_for_orchestration`.

/// Delay before the second status poll, in milliseconds.
const INITIAL_POLL_DELAY_MS: u64 = 5;

/// Upper bound on the delay between two status polls, in milliseconds.
const MAX_POLL_DELAY_MS: u64 = 100;

/// Factor the poll delay is multiplied by after each unsuccessful poll.
const POLL_DELAY_MULTIPLIER: u64 = 2;
73
74/// Client for orchestration control-plane operations with automatic capability discovery.
75///
76/// The Client provides APIs for managing orchestration instances:
77/// - Starting orchestrations
78/// - Raising external events
79/// - Cancelling instances
80/// - Checking status
81/// - Waiting for completion
82/// - Rich management features (when available)
83///
84/// # Automatic Capability Discovery
85///
86/// The Client automatically discovers provider capabilities through the `Provider::as_management_capability()` method.
87/// When a provider implements `ProviderAdmin`, rich management features become available:
88///
89/// ```ignore
90/// let client = Client::new(provider);
91///
92/// // Control plane (always available)
93/// client.start_orchestration("order-1", "ProcessOrder", "{}").await?;
94///
95/// // Management (automatically discovered)
96/// if client.has_management_capability() {
97/// let instances = client.list_all_instances().await?;
98/// let metrics = client.get_system_metrics().await?;
99/// } else {
100/// println!("Management features not available");
101/// }
102/// ```
103///
104/// # Design
105///
106/// The Client communicates with the Runtime **only through the shared Provider** (no direct coupling).
107/// This allows the Client to be used from any process, even one without a running Runtime.
108///
109/// # Thread Safety
110///
111/// Client is `Clone` and can be safely shared across threads.
112///
113/// # Example Usage
114///
115/// ```ignore
116/// use duroxide::{Client, OrchestrationStatus};
117/// use duroxide::providers::sqlite::SqliteProvider;
118/// use std::sync::Arc;
119///
120/// use duroxide::ClientError;
121/// let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
122/// let client = Client::new(store);
123///
124/// // Start an orchestration
125/// client.start_orchestration("order-123", "ProcessOrder", r#"{"customer_id": "c1"}"#).await?;
126///
127/// // Check status
128/// let status = client.get_orchestration_status("order-123").await?;
129/// println!("Status: {:?}", status);
130///
131/// // Wait for completion
132/// let result = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await.unwrap();
133/// match result {
134/// OrchestrationStatus::Completed { output } => println!("Done: {}", output),
135/// OrchestrationStatus::Failed { details } => {
136/// eprintln!("Failed ({}): {}", details.category(), details.display_message());
137/// }
138/// _ => {}
139/// }
140/// ```
141pub struct Client {
142 store: Arc<dyn Provider>,
143}
144
145impl Client {
146 /// Create a client bound to a Provider instance.
147 ///
148 /// # Parameters
149 ///
150 /// * `store` - Arc-wrapped Provider (same instance used by Runtime)
151 ///
152 /// # Example
153 ///
154 /// ```ignore
155 /// let store = Arc::new(SqliteProvider::new("sqlite::memory:").await.unwrap());
156 /// let client = Client::new(store.clone());
157 /// // Multiple clients can share the same store
158 /// let client2 = client.clone();
159 /// ```
160 pub fn new(store: Arc<dyn Provider>) -> Self {
161 Self { store }
162 }
163
164 /// Start an orchestration instance with string input.
165 ///
166 /// # Parameters
167 ///
168 /// * `instance` - Unique instance ID (e.g., "order-123", "user-payment-456")
169 /// * `orchestration` - Name of registered orchestration (e.g., "ProcessOrder")
170 /// * `input` - JSON string input (will be passed to orchestration)
171 ///
172 /// # Returns
173 ///
174 /// * `Ok(())` - Instance was enqueued for processing
175 /// * `Err(msg)` - Failed to enqueue (storage error)
176 ///
177 /// # Behavior
178 ///
179 /// - Enqueues a StartOrchestration work item
180 /// - Returns immediately (doesn't wait for orchestration to start/complete)
181 /// - Use `wait_for_orchestration()` to wait for completion
182 ///
183 /// # Instance ID Requirements
184 ///
185 /// - Must be unique across all orchestrations
186 /// - Can be any string (alphanumeric + hyphens recommended)
187 /// - Reusing an instance ID that already exists will fail
188 ///
189 /// # Example
190 ///
191 /// ```rust,no_run
192 /// # use duroxide::{Client, ClientError};
193 /// # use std::sync::Arc;
194 /// # async fn example(client: Client) -> Result<(), ClientError> {
195 /// // Start with JSON string input
196 /// client.start_orchestration(
197 /// "order-123",
198 /// "ProcessOrder",
199 /// r#"{"customer_id": "c1", "items": ["item1", "item2"]}"#
200 /// ).await?;
201 /// # Ok(())
202 /// # }
203 /// ```
204 ///
205 /// # Errors
206 ///
207 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
208 pub async fn start_orchestration(
209 &self,
210 instance: impl Into<String>,
211 orchestration: impl Into<String>,
212 input: impl Into<String>,
213 ) -> Result<(), ClientError> {
214 let item = WorkItem::StartOrchestration {
215 instance: instance.into(),
216 orchestration: orchestration.into(),
217 input: input.into(),
218 version: None,
219 parent_instance: None,
220 parent_id: None,
221 execution_id: crate::INITIAL_EXECUTION_ID,
222 };
223 self.store
224 .enqueue_for_orchestrator(item, None)
225 .await
226 .map_err(ClientError::from)
227 }
228
229 /// Start an orchestration instance pinned to a specific version.
230 ///
231 /// # Errors
232 ///
233 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
234 pub async fn start_orchestration_versioned(
235 &self,
236 instance: impl Into<String>,
237 orchestration: impl Into<String>,
238 version: impl Into<String>,
239 input: impl Into<String>,
240 ) -> Result<(), ClientError> {
241 let item = WorkItem::StartOrchestration {
242 instance: instance.into(),
243 orchestration: orchestration.into(),
244 input: input.into(),
245 version: Some(version.into()),
246 parent_instance: None,
247 parent_id: None,
248 execution_id: crate::INITIAL_EXECUTION_ID,
249 };
250 self.store
251 .enqueue_for_orchestrator(item, None)
252 .await
253 .map_err(ClientError::from)
254 }
255
256 // Note: No delayed scheduling API. Clients should use normal start APIs.
257
258 /// Start an orchestration with typed input (serialized to JSON).
259 ///
260 /// # Errors
261 ///
262 /// Returns `ClientError::InvalidInput` if serialization fails.
263 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
264 pub async fn start_orchestration_typed<In: Serialize>(
265 &self,
266 instance: impl Into<String>,
267 orchestration: impl Into<String>,
268 input: In,
269 ) -> Result<(), ClientError> {
270 let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
271 message: format!("encode: {e}"),
272 })?;
273 self.start_orchestration(instance, orchestration, payload).await
274 }
275
276 /// Start a versioned orchestration with typed input (serialized to JSON).
277 ///
278 /// # Errors
279 ///
280 /// Returns `ClientError::InvalidInput` if serialization fails.
281 /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration.
282 pub async fn start_orchestration_versioned_typed<In: Serialize>(
283 &self,
284 instance: impl Into<String>,
285 orchestration: impl Into<String>,
286 version: impl Into<String>,
287 input: In,
288 ) -> Result<(), ClientError> {
289 let payload = Json::encode(&input).map_err(|e| ClientError::InvalidInput {
290 message: format!("encode: {e}"),
291 })?;
292 self.start_orchestration_versioned(instance, orchestration, version, payload)
293 .await
294 }
295
296 /// Raise an external event into a running orchestration instance.
297 ///
298 /// # Purpose
299 ///
300 /// Send a signal/message to a running orchestration that is waiting for an external event.
301 /// The orchestration must have called `ctx.schedule_wait(event_name)` to receive the event.
302 ///
303 /// # Parameters
304 ///
305 /// * `instance` - Instance ID of the running orchestration
306 /// * `event_name` - Name of the event (must match `schedule_wait` name)
307 /// * `data` - Payload data (JSON string, passed to orchestration)
308 ///
309 /// # Behavior
310 ///
311 /// - Enqueues ExternalRaised work item to orchestrator queue
312 /// - If instance isn't waiting for this event (yet), it's buffered
313 /// - Event is matched by NAME (not correlation ID)
314 /// - Multiple events with same name can be raised
315 ///
316 /// # Example
317 ///
318 /// ```rust,no_run
319 /// # use duroxide::{Client, ClientError};
320 /// # async fn example(client: Client) -> Result<(), ClientError> {
321 /// // Orchestration waiting for approval
322 /// // ctx.schedule_wait("ApprovalEvent").into_event().await
323 ///
324 /// // External system/human approves
325 /// client.raise_event(
326 /// "order-123",
327 /// "ApprovalEvent",
328 /// r#"{"approved": true, "by": "manager@company.com"}"#
329 /// ).await?;
330 /// # Ok(())
331 /// # }
332 /// ```
333 ///
334 /// # Use Cases
335 ///
336 /// - Human approval workflows
337 /// - Webhook callbacks
338 /// - Inter-orchestration communication
339 /// - External system integration
340 ///
341 /// # Error Cases
342 ///
343 /// - Instance doesn't exist: Event is buffered, orchestration processes when started
344 /// - Instance already completed: Event is ignored gracefully
345 ///
346 /// # Errors
347 ///
348 /// Returns `ClientError::Provider` if the provider fails to enqueue the event.
349 pub async fn raise_event(
350 &self,
351 instance: impl Into<String>,
352 event_name: impl Into<String>,
353 data: impl Into<String>,
354 ) -> Result<(), ClientError> {
355 let item = WorkItem::ExternalRaised {
356 instance: instance.into(),
357 name: event_name.into(),
358 data: data.into(),
359 };
360 self.store
361 .enqueue_for_orchestrator(item, None)
362 .await
363 .map_err(ClientError::from)
364 }
365
366 /// Request cancellation of an orchestration instance.
367 ///
368 /// # Purpose
369 ///
370 /// Gracefully cancel a running orchestration. The orchestration will complete its current
371 /// turn and then fail deterministically with a "canceled: {reason}" error.
372 ///
373 /// # Parameters
374 ///
375 /// * `instance` - Instance ID to cancel
376 /// * `reason` - Reason for cancellation (included in error message)
377 ///
378 /// # Behavior
379 ///
380 /// 1. Enqueues CancelInstance work item
381 /// 2. Runtime appends OrchestrationCancelRequested event
382 /// 3. Next turn, orchestration sees cancellation and fails deterministically
383 /// 4. Final status: `OrchestrationStatus::Failed { details: Application::Cancelled }`
384 ///
385 /// # Deterministic Cancellation
386 ///
387 /// Cancellation is **deterministic** - the orchestration fails at a well-defined point:
388 /// - Not mid-activity (activities complete)
389 /// - Not mid-turn (current turn finishes)
390 /// - Failure is recorded in history (replays consistently)
391 ///
392 /// # Propagation
393 ///
394 /// If the orchestration has child sub-orchestrations, they are also cancelled.
395 ///
396 /// # Example
397 ///
398 /// ```ignore
399 /// // Cancel a long-running order
400 /// client.cancel_instance("order-123", "Customer requested cancellation").await?;
401 ///
402 /// // Wait for cancellation to complete
403 /// let status = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(5)).await?;
404 /// match status {
405 /// OrchestrationStatus::Failed { details } if matches!(
406 /// details,
407 /// duroxide::ErrorDetails::Application {
408 /// kind: duroxide::AppErrorKind::Cancelled { .. },
409 /// ..
410 /// }
411 /// ) => {
412 /// println!("Successfully cancelled");
413 /// }
414 /// _ => {}
415 /// }
416 /// ```
417 ///
418 /// # Error Cases
419 ///
420 /// - Instance already completed: Cancellation is no-op
421 /// - Instance doesn't exist: Cancellation is no-op
422 ///
423 /// # Errors
424 ///
425 /// Returns `ClientError::Provider` if the provider fails to enqueue the cancellation.
426 pub async fn cancel_instance(
427 &self,
428 instance: impl Into<String>,
429 reason: impl Into<String>,
430 ) -> Result<(), ClientError> {
431 let item = WorkItem::CancelInstance {
432 instance: instance.into(),
433 reason: reason.into(),
434 };
435 self.store
436 .enqueue_for_orchestrator(item, None)
437 .await
438 .map_err(ClientError::from)
439 }
440
441 /// Get the current status of an orchestration by inspecting its history.
442 ///
443 /// # Purpose
444 ///
445 /// Query the current state of an orchestration instance without waiting.
446 ///
447 /// # Parameters
448 ///
449 /// * `instance` - Instance ID to query
450 ///
451 /// # Returns
452 ///
453 /// * `OrchestrationStatus::NotFound` - Instance doesn't exist
454 /// * `OrchestrationStatus::Running` - Instance is still executing
455 /// * `OrchestrationStatus::Completed { output }` - Instance completed successfully
456 /// * `OrchestrationStatus::Failed { error }` - Instance failed (includes cancellations)
457 ///
458 /// # Behavior
459 ///
460 /// - Reads instance history from provider
461 /// - Scans for terminal events (Completed/Failed)
462 /// - For multi-execution instances (ContinueAsNew), returns status of LATEST execution
463 ///
464 /// # Performance
465 ///
466 /// This method reads from storage (not cached). For polling, use `wait_for_orchestration` instead.
467 ///
468 /// # Example
469 ///
470 /// ```rust,no_run
471 /// # use duroxide::{Client, ClientError, OrchestrationStatus};
472 /// # async fn example(client: Client) -> Result<(), ClientError> {
473 /// let status = client.get_orchestration_status("order-123").await?;
474 ///
475 /// match status {
476 /// OrchestrationStatus::NotFound => println!("Instance not found"),
477 /// OrchestrationStatus::Running => println!("Still processing"),
478 /// OrchestrationStatus::Completed { output } => println!("Done: {}", output),
479 /// OrchestrationStatus::Failed { details } => eprintln!("Error: {}", details.display_message()),
480 /// }
481 /// # Ok(())
482 /// # }
483 /// ```
484 ///
485 /// # Errors
486 ///
487 /// Returns `ClientError::Provider` if the provider fails to read the orchestration history.
488 pub async fn get_orchestration_status(&self, instance: &str) -> Result<OrchestrationStatus, ClientError> {
489 let hist = self.store.read(instance).await.map_err(ClientError::from)?;
490 // Find terminal events first
491 for e in hist.iter().rev() {
492 match &e.kind {
493 EventKind::OrchestrationCompleted { output } => {
494 return Ok(OrchestrationStatus::Completed { output: output.clone() });
495 }
496 EventKind::OrchestrationFailed { details } => {
497 return Ok(OrchestrationStatus::Failed {
498 details: details.clone(),
499 });
500 }
501 _ => {}
502 }
503 }
504 // If we ever saw a start, it's running
505 if hist
506 .iter()
507 .any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
508 {
509 Ok(OrchestrationStatus::Running)
510 } else {
511 Ok(OrchestrationStatus::NotFound)
512 }
513 }
514
515 /// Wait until terminal state or timeout using provider reads.
516 ///
517 /// # Purpose
518 ///
519 /// Poll for orchestration completion with exponential backoff, returning when terminal or timeout.
520 ///
521 /// # Parameters
522 ///
523 /// * `instance` - Instance ID to wait for
524 /// * `timeout` - Maximum time to wait before returning timeout error
525 ///
526 /// # Returns
527 ///
528 /// * `Ok(OrchestrationStatus::Completed { output })` - Orchestration completed successfully
529 /// * `Ok(OrchestrationStatus::Failed { details })` - Orchestration failed (includes cancellations)
530 /// * `Err(ClientError::Timeout)` - Timeout elapsed while still Running
531 /// * `Err(ClientError::Provider(e))` - Provider/Storage error
532 ///
533 /// **Note:** Never returns `NotFound` or `Running` - only terminal states or timeout.
534 ///
535 /// # Polling Behavior
536 ///
537 /// - First check: Immediate (no delay)
538 /// - Subsequent checks: Exponential backoff starting at 5ms, doubling each iteration, max 100ms
539 /// - Continues until terminal state or timeout
540 ///
541 /// # Example
542 ///
543 /// ```ignore
544 /// // Start orchestration
545 /// client.start_orchestration("order-123", "ProcessOrder", "{}").await?;
546 ///
547 /// // Wait up to 30 seconds
548 /// match client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await {
549 /// Ok(OrchestrationStatus::Completed { output }) => {
550 /// println!("Success: {}", output);
551 /// }
552 /// Ok(OrchestrationStatus::Failed { details }) => {
553 /// eprintln!("Failed ({}): {}", details.category(), details.display_message());
554 /// }
555 /// Err(ClientError::Timeout) => {
556 /// println!("Still running after 30s, instance: order-123");
557 /// // Instance is still running - can wait more or cancel
558 /// }
559 /// _ => unreachable!("wait_for_orchestration only returns terminal or timeout"),
560 /// }
561 /// ```
562 ///
563 /// # Use Cases
564 ///
565 /// - Synchronous request/response workflows
566 /// - Testing (wait for workflow to complete)
567 /// - CLI tools (block until done)
568 /// - Health checks
569 ///
570 /// # For Long-Running Workflows
571 ///
572 /// Don't wait for hours/days:
573 /// ```ignore
574 /// // Start workflow
575 /// client.start_orchestration("batch-job", "ProcessBatch", "{}").await.unwrap();
576 ///
577 /// // DON'T wait for hours
578 /// // let status = client.wait_for_orchestration("batch-job", Duration::from_hours(24)).await;
579 ///
580 /// // DO poll periodically
581 /// loop {
582 /// match client.get_orchestration_status("batch-job").await {
583 /// OrchestrationStatus::Completed { .. } => break,
584 /// OrchestrationStatus::Failed { .. } => break,
585 /// _ => tokio::time::sleep(std::time::Duration::from_secs(60)).await,
586 /// }
587 /// }
588 /// ```
589 ///
590 /// # Errors
591 ///
592 /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
593 /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
594 pub async fn wait_for_orchestration(
595 &self,
596 instance: &str,
597 timeout: std::time::Duration,
598 ) -> Result<OrchestrationStatus, ClientError> {
599 let deadline = std::time::Instant::now() + timeout;
600 // quick path
601 match self.get_orchestration_status(instance).await {
602 Ok(OrchestrationStatus::Completed { output }) => return Ok(OrchestrationStatus::Completed { output }),
603 Ok(OrchestrationStatus::Failed { details }) => return Ok(OrchestrationStatus::Failed { details }),
604 Err(e) => return Err(e),
605 _ => {}
606 }
607 // poll with backoff
608 let mut delay_ms: u64 = INITIAL_POLL_DELAY_MS;
609 while std::time::Instant::now() < deadline {
610 match self.get_orchestration_status(instance).await {
611 Ok(OrchestrationStatus::Completed { output }) => return Ok(OrchestrationStatus::Completed { output }),
612 Ok(OrchestrationStatus::Failed { details }) => return Ok(OrchestrationStatus::Failed { details }),
613 Err(e) => return Err(e),
614 _ => {
615 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
616 delay_ms = (delay_ms.saturating_mul(POLL_DELAY_MULTIPLIER)).min(MAX_POLL_DELAY_MS);
617 }
618 }
619 }
620 Err(ClientError::Timeout)
621 }
622
623 /// Typed wait helper: decodes output on Completed, returns Err(String) on Failed.
624 ///
625 /// # Errors
626 ///
627 /// Returns `ClientError::Provider` if the provider fails to read the orchestration status.
628 /// Returns `ClientError::Timeout` if the orchestration doesn't complete within the timeout.
629 /// Returns `ClientError::InvalidInput` if deserialization of the output fails.
630 pub async fn wait_for_orchestration_typed<Out: serde::de::DeserializeOwned>(
631 &self,
632 instance: &str,
633 timeout: std::time::Duration,
634 ) -> Result<Result<Out, String>, ClientError> {
635 match self.wait_for_orchestration(instance, timeout).await? {
636 OrchestrationStatus::Completed { output } => match Json::decode::<Out>(&output) {
637 Ok(v) => Ok(Ok(v)),
638 Err(e) => Err(ClientError::InvalidInput {
639 message: format!("decode failed: {e}"),
640 }),
641 },
642 OrchestrationStatus::Failed { details } => Ok(Err(details.display_message())),
643 _ => unreachable!("wait_for_orchestration returns only terminal or timeout"),
644 }
645 }
646
647 // ===== Capability Discovery =====
648
649 /// Check if management capabilities are available.
650 ///
651 /// # Returns
652 ///
653 /// `true` if the provider implements `ProviderAdmin`, `false` otherwise.
654 ///
655 /// # Usage
656 ///
657 /// ```ignore
658 /// let client = Client::new(provider);
659 /// if client.has_management_capability() {
660 /// let instances = client.list_all_instances().await?;
661 /// } else {
662 /// println!("Management features not available");
663 /// }
664 /// ```
665 pub fn has_management_capability(&self) -> bool {
666 self.discover_management().is_ok()
667 }
668
669 /// Automatically discover management capabilities from the provider.
670 ///
671 /// # Returns
672 ///
673 /// `Ok(&dyn ManagementCapability)` if available, `Err(String)` if not.
674 ///
675 /// # Internal Use
676 ///
677 /// This method is used internally by management methods to access capabilities.
678 fn discover_management(&self) -> Result<&dyn ProviderAdmin, ClientError> {
679 self.store
680 .as_management_capability()
681 .ok_or(ClientError::ManagementNotAvailable)
682 }
683
684 // ===== Rich Management Methods =====
685
686 /// List all orchestration instances.
687 ///
688 /// # Returns
689 ///
690 /// Vector of instance IDs, typically sorted by creation time (newest first).
691 ///
692 /// # Errors
693 ///
694 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
695 ///
696 /// # Usage
697 ///
698 /// ```ignore
699 /// let client = Client::new(provider);
700 /// if client.has_management_capability() {
701 /// let instances = client.list_all_instances().await?;
702 /// for instance in instances {
703 /// println!("Instance: {}", instance);
704 /// }
705 /// }
706 /// ```
707 pub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError> {
708 self.discover_management()?
709 .list_instances()
710 .await
711 .map_err(ClientError::from)
712 }
713
714 /// List instances matching a status filter.
715 ///
716 /// # Parameters
717 ///
718 /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
719 ///
720 /// # Returns
721 ///
722 /// Vector of instance IDs with the specified status.
723 ///
724 /// # Errors
725 ///
726 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
727 ///
728 /// # Usage
729 ///
730 /// ```ignore
731 /// let client = Client::new(provider);
732 /// if client.has_management_capability() {
733 /// let running = client.list_instances_by_status("Running").await?;
734 /// let completed = client.list_instances_by_status("Completed").await?;
735 /// println!("Running: {}, Completed: {}", running.len(), completed.len());
736 /// }
737 /// ```
738 pub async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ClientError> {
739 self.discover_management()?
740 .list_instances_by_status(status)
741 .await
742 .map_err(ClientError::from)
743 }
744
745 /// Get comprehensive information about an instance.
746 ///
747 /// # Parameters
748 ///
749 /// * `instance` - The ID of the orchestration instance.
750 ///
751 /// # Returns
752 ///
753 /// Detailed instance information including status, output, and metadata.
754 ///
755 /// # Errors
756 ///
757 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
758 ///
759 /// # Usage
760 ///
761 /// ```ignore
762 /// let client = Client::new(provider);
763 /// if client.has_management_capability() {
764 /// let info = client.get_instance_info("order-123").await?;
765 /// println!("Instance {}: {} ({})", info.instance_id, info.orchestration_name, info.status);
766 /// }
767 /// ```
768 pub async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ClientError> {
769 self.discover_management()?
770 .get_instance_info(instance)
771 .await
772 .map_err(ClientError::from)
773 }
774
775 /// Get detailed information about a specific execution.
776 ///
777 /// # Parameters
778 ///
779 /// * `instance` - The ID of the orchestration instance.
780 /// * `execution_id` - The specific execution ID.
781 ///
782 /// # Returns
783 ///
784 /// Detailed execution information including status, output, and event count.
785 ///
786 /// # Errors
787 ///
788 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
789 ///
790 /// # Usage
791 ///
792 /// ```ignore
793 /// let client = Client::new(provider);
794 /// if client.has_management_capability() {
795 /// let info = client.get_execution_info("order-123", 1).await?;
796 /// println!("Execution {}: {} events, status: {}", info.execution_id, info.event_count, info.status);
797 /// }
798 /// ```
799 pub async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ClientError> {
800 self.discover_management()?
801 .get_execution_info(instance, execution_id)
802 .await
803 .map_err(ClientError::from)
804 }
805
806 /// List all execution IDs for an instance.
807 ///
808 /// Returns execution IDs in ascending order: \[1\], \[1, 2\], \[1, 2, 3\], etc.
809 /// Each execution represents either the initial run or a continuation via ContinueAsNew.
810 ///
811 /// # Parameters
812 ///
813 /// * `instance` - The instance ID to query
814 ///
815 /// # Returns
816 ///
817 /// Vector of execution IDs in ascending order.
818 ///
819 /// # Errors
820 ///
821 /// Returns an error if:
822 /// - The provider doesn't support management capabilities
823 /// - The database query fails
824 ///
825 /// # Usage
826 ///
827 /// ```ignore
828 /// let client = Client::new(provider);
829 /// if client.has_management_capability() {
830 /// let executions = client.list_executions("order-123").await?;
831 /// println!("Instance has {} executions", executions.len()); // [1, 2, 3]
832 /// }
833 /// ```
834 pub async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ClientError> {
835 let mgmt = self.discover_management()?;
836 mgmt.list_executions(instance).await.map_err(ClientError::from)
837 }
838
839 /// Read the full event history for a specific execution within an instance.
840 ///
841 /// Returns all events for the specified execution in chronological order.
842 /// Each execution has its own independent history starting from OrchestrationStarted.
843 ///
844 /// # Parameters
845 ///
846 /// * `instance` - The instance ID
847 /// * `execution_id` - The specific execution ID (starts at 1)
848 ///
849 /// # Returns
850 ///
851 /// Vector of events in chronological order (oldest first).
852 ///
853 /// # Errors
854 ///
855 /// Returns an error if:
856 /// - The provider doesn't support management capabilities
857 /// - The instance or execution doesn't exist
858 /// - The database query fails
859 ///
860 /// # Usage
861 ///
862 /// ```ignore
863 /// let client = Client::new(provider);
864 /// if client.has_management_capability() {
865 /// let history = client.read_execution_history("order-123", 1).await?;
866 /// for event in history {
867 /// println!("Event: {:?}", event);
868 /// }
869 /// }
870 /// ```
871 pub async fn read_execution_history(
872 &self,
873 instance: &str,
874 execution_id: u64,
875 ) -> Result<Vec<crate::Event>, ClientError> {
876 let mgmt = self.discover_management()?;
877 mgmt.read_history_with_execution_id(instance, execution_id)
878 .await
879 .map_err(ClientError::from)
880 }
881
882 /// Get system-wide metrics for the orchestration engine.
883 ///
884 /// # Returns
885 ///
886 /// System metrics including instance counts, execution counts, and status breakdown.
887 ///
888 /// # Errors
889 ///
890 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
891 ///
892 /// # Usage
893 ///
894 /// ```ignore
895 /// let client = Client::new(provider);
896 /// if client.has_management_capability() {
897 /// let metrics = client.get_system_metrics().await?;
898 /// println!("System health: {} running, {} completed, {} failed",
899 /// metrics.running_instances, metrics.completed_instances, metrics.failed_instances);
900 /// }
901 /// ```
902 pub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError> {
903 self.discover_management()?
904 .get_system_metrics()
905 .await
906 .map_err(ClientError::from)
907 }
908
909 /// Get the current depths of the internal work queues.
910 ///
911 /// # Returns
912 ///
913 /// Queue depths for orchestrator, worker, and timer queues.
914 ///
915 /// # Errors
916 ///
917 /// Returns `Err("Management features not available")` if the provider doesn't implement `ProviderAdmin`.
918 ///
919 /// # Usage
920 ///
921 /// ```ignore
922 /// let client = Client::new(provider);
923 /// if client.has_management_capability() {
924 /// let queues = client.get_queue_depths().await?;
925 /// println!("Queue depths - Orchestrator: {}, Worker: {}, Timer: {}",
926 /// queues.orchestrator_queue, queues.worker_queue, queues.timer_queue);
927 /// }
928 /// ```
929 pub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError> {
930 self.discover_management()?
931 .get_queue_depths()
932 .await
933 .map_err(ClientError::from)
934 }
935}