pub struct Client { /* private fields */ }Expand description
Client for orchestration control-plane operations with automatic capability discovery.
The Client provides APIs for managing orchestration instances:
- Starting orchestrations
- Raising external events
- Cancelling instances
- Checking status
- Waiting for completion
- Rich management features (when available)
§Automatic Capability Discovery
The Client automatically discovers provider capabilities through the Provider::as_management_capability() method.
When a provider implements ProviderAdmin, rich management features become available:
let client = Client::new(provider);
// Control plane (always available)
client.start_orchestration("order-1", "ProcessOrder", "{}").await?;
// Management (automatically discovered)
if client.has_management_capability() {
let instances = client.list_all_instances().await?;
let metrics = client.get_system_metrics().await?;
} else {
println!("Management features not available");
}§Design
The Client communicates with the Runtime only through the shared Provider (no direct coupling). This allows the Client to be used from any process, even one without a running Runtime.
§Thread Safety
Client is Clone and can be safely shared across threads.
§Example Usage
use duroxide::{Client, OrchestrationStatus};
use duroxide::providers::sqlite::SqliteProvider;
use std::sync::Arc;
use duroxide::ClientError;
let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
let client = Client::new(store);
// Start an orchestration
client.start_orchestration("order-123", "ProcessOrder", r#"{"customer_id": "c1"}"#).await?;
// Check status
let status = client.get_orchestration_status("order-123").await?;
println!("Status: {:?}", status);
// Wait for completion
let result = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await.unwrap();
match result {
OrchestrationStatus::Completed { output } => println!("Done: {}", output),
OrchestrationStatus::Failed { details } => {
eprintln!("Failed ({}): {}", details.category(), details.display_message());
}
_ => {}
}Implementations§
Source§impl Client
impl Client
Sourcepub fn new(store: Arc<dyn Provider>) -> Self
pub fn new(store: Arc<dyn Provider>) -> Self
Create a client bound to a Provider instance.
§Parameters
store- Arc-wrapped Provider (same instance used by Runtime)
§Example
let store = Arc::new(SqliteProvider::new("sqlite::memory:").await.unwrap());
let client = Client::new(store.clone());
// Multiple clients can share the same store
let client2 = client.clone();Sourcepub async fn start_orchestration(
&self,
instance: impl Into<String>,
orchestration: impl Into<String>,
input: impl Into<String>,
) -> Result<(), ClientError>
pub async fn start_orchestration( &self, instance: impl Into<String>, orchestration: impl Into<String>, input: impl Into<String>, ) -> Result<(), ClientError>
Start an orchestration instance with string input.
§Parameters
instance- Unique instance ID (e.g., “order-123”, “user-payment-456”)orchestration- Name of registered orchestration (e.g., “ProcessOrder”)input- JSON string input (will be passed to orchestration)
§Returns
Ok(())- Instance was enqueued for processingErr(msg)- Failed to enqueue (storage error)
§Behavior
- Enqueues a StartOrchestration work item
- Returns immediately (doesn’t wait for orchestration to start/complete)
- Use
wait_for_orchestration()to wait for completion
§Instance ID Requirements
- Must be unique across all orchestrations
- Can be any string (alphanumeric + hyphens recommended)
- Reusing an instance ID that already exists will fail
§Example
// Start with JSON string input
client.start_orchestration(
"order-123",
"ProcessOrder",
r#"{"customer_id": "c1", "items": ["item1", "item2"]}"#
).await?;§Errors
Returns ClientError::Provider if the provider fails to enqueue the orchestration.
Sourcepub async fn start_orchestration_versioned(
&self,
instance: impl Into<String>,
orchestration: impl Into<String>,
version: impl Into<String>,
input: impl Into<String>,
) -> Result<(), ClientError>
pub async fn start_orchestration_versioned( &self, instance: impl Into<String>, orchestration: impl Into<String>, version: impl Into<String>, input: impl Into<String>, ) -> Result<(), ClientError>
Start an orchestration instance pinned to a specific version.
§Errors
Returns ClientError::Provider if the provider fails to enqueue the orchestration.
Sourcepub async fn start_orchestration_typed<In: Serialize>(
&self,
instance: impl Into<String>,
orchestration: impl Into<String>,
input: In,
) -> Result<(), ClientError>
pub async fn start_orchestration_typed<In: Serialize>( &self, instance: impl Into<String>, orchestration: impl Into<String>, input: In, ) -> Result<(), ClientError>
Start an orchestration with typed input (serialized to JSON).
§Errors
Returns ClientError::InvalidInput if serialization fails.
Returns ClientError::Provider if the provider fails to enqueue the orchestration.
Sourcepub async fn start_orchestration_versioned_typed<In: Serialize>(
&self,
instance: impl Into<String>,
orchestration: impl Into<String>,
version: impl Into<String>,
input: In,
) -> Result<(), ClientError>
pub async fn start_orchestration_versioned_typed<In: Serialize>( &self, instance: impl Into<String>, orchestration: impl Into<String>, version: impl Into<String>, input: In, ) -> Result<(), ClientError>
Start a versioned orchestration with typed input (serialized to JSON).
§Errors
Returns ClientError::InvalidInput if serialization fails.
Returns ClientError::Provider if the provider fails to enqueue the orchestration.
Sourcepub async fn raise_event(
&self,
instance: impl Into<String>,
event_name: impl Into<String>,
data: impl Into<String>,
) -> Result<(), ClientError>
pub async fn raise_event( &self, instance: impl Into<String>, event_name: impl Into<String>, data: impl Into<String>, ) -> Result<(), ClientError>
Raise an external event into a running orchestration instance.
§Purpose
Send a signal/message to a running orchestration that is waiting for an external event.
The orchestration must have called ctx.schedule_wait(event_name) to receive the event.
§Parameters
instance- Instance ID of the running orchestrationevent_name- Name of the event (must matchschedule_waitname)data- Payload data (JSON string, passed to orchestration)
§Behavior
- Enqueues ExternalRaised work item to orchestrator queue
- If instance isn’t waiting for this event (yet), it’s buffered
- Event is matched by NAME (not correlation ID)
- Multiple events with same name can be raised
§Example
// Orchestration waiting for approval
// ctx.schedule_wait("ApprovalEvent").into_event().await
// External system/human approves
client.raise_event(
"order-123",
"ApprovalEvent",
r#"{"approved": true, "by": "manager@company.com"}"#
).await?;§Use Cases
- Human approval workflows
- Webhook callbacks
- Inter-orchestration communication
- External system integration
§Error Cases
- Instance doesn’t exist: Event is buffered, orchestration processes when started
- Instance already completed: Event is ignored gracefully
§Errors
Returns ClientError::Provider if the provider fails to enqueue the event.
Sourcepub async fn cancel_instance(
&self,
instance: impl Into<String>,
reason: impl Into<String>,
) -> Result<(), ClientError>
pub async fn cancel_instance( &self, instance: impl Into<String>, reason: impl Into<String>, ) -> Result<(), ClientError>
Request cancellation of an orchestration instance.
§Purpose
Gracefully cancel a running orchestration. The orchestration will complete its current turn and then fail deterministically with a “canceled: {reason}” error.
§Parameters
instance- Instance ID to cancelreason- Reason for cancellation (included in error message)
§Behavior
- Enqueues CancelInstance work item
- Runtime appends OrchestrationCancelRequested event
- Next turn, orchestration sees cancellation and fails deterministically
- Final status:
OrchestrationStatus::Failed { details: Application::Cancelled }
§Deterministic Cancellation
Cancellation is deterministic - the orchestration fails at a well-defined point:
- Not mid-activity (activities complete)
- Not mid-turn (current turn finishes)
- Failure is recorded in history (replays consistently)
§Propagation
If the orchestration has child sub-orchestrations, they are also cancelled.
§Example
// Cancel a long-running order
client.cancel_instance("order-123", "Customer requested cancellation").await?;
// Wait for cancellation to complete
let status = client.wait_for_orchestration("order-123", std::time::Duration::from_secs(5)).await?;
match status {
OrchestrationStatus::Failed { details } if matches!(
details,
duroxide::ErrorDetails::Application {
kind: duroxide::AppErrorKind::Cancelled { .. },
..
}
) => {
println!("Successfully cancelled");
}
_ => {}
}§Error Cases
- Instance already completed: Cancellation is no-op
- Instance doesn’t exist: Cancellation is no-op
§Errors
Returns ClientError::Provider if the provider fails to enqueue the cancellation.
Sourcepub async fn get_orchestration_status(
&self,
instance: &str,
) -> Result<OrchestrationStatus, ClientError>
pub async fn get_orchestration_status( &self, instance: &str, ) -> Result<OrchestrationStatus, ClientError>
Get the current status of an orchestration by inspecting its history.
§Purpose
Query the current state of an orchestration instance without waiting.
§Parameters
instance- Instance ID to query
§Returns
OrchestrationStatus::NotFound- Instance doesn’t existOrchestrationStatus::Running- Instance is still executingOrchestrationStatus::Completed { output }- Instance completed successfullyOrchestrationStatus::Failed { error }- Instance failed (includes cancellations)
§Behavior
- Reads instance history from provider
- Scans for terminal events (Completed/Failed)
- For multi-execution instances (ContinueAsNew), returns status of LATEST execution
§Performance
This method reads from storage (not cached). For polling, use wait_for_orchestration instead.
§Example
let status = client.get_orchestration_status("order-123").await?;
match status {
OrchestrationStatus::NotFound => println!("Instance not found"),
OrchestrationStatus::Running => println!("Still processing"),
OrchestrationStatus::Completed { output } => println!("Done: {}", output),
OrchestrationStatus::Failed { details } => eprintln!("Error: {}", details.display_message()),
}§Errors
Returns ClientError::Provider if the provider fails to read the orchestration history.
Sourcepub async fn wait_for_orchestration(
&self,
instance: &str,
timeout: Duration,
) -> Result<OrchestrationStatus, ClientError>
pub async fn wait_for_orchestration( &self, instance: &str, timeout: Duration, ) -> Result<OrchestrationStatus, ClientError>
Wait until terminal state or timeout using provider reads.
§Purpose
Poll for orchestration completion with exponential backoff, returning when terminal or timeout.
§Parameters
instance- Instance ID to wait fortimeout- Maximum time to wait before returning timeout error
§Returns
Ok(OrchestrationStatus::Completed { output })- Orchestration completed successfullyOk(OrchestrationStatus::Failed { details })- Orchestration failed (includes cancellations)Err(ClientError::Timeout)- Timeout elapsed while still RunningErr(ClientError::Provider(e))- Provider/Storage error
Note: Never returns NotFound or Running - only terminal states or timeout.
§Polling Behavior
- First check: Immediate (no delay)
- Subsequent checks: Exponential backoff starting at 5ms, doubling each iteration, max 100ms
- Continues until terminal state or timeout
§Example
// Start orchestration
client.start_orchestration("order-123", "ProcessOrder", "{}").await?;
// Wait up to 30 seconds
match client.wait_for_orchestration("order-123", std::time::Duration::from_secs(30)).await {
Ok(OrchestrationStatus::Completed { output }) => {
println!("Success: {}", output);
}
Ok(OrchestrationStatus::Failed { details }) => {
eprintln!("Failed ({}): {}", details.category(), details.display_message());
}
Err(ClientError::Timeout) => {
println!("Still running after 30s, instance: order-123");
// Instance is still running - can wait more or cancel
}
_ => unreachable!("wait_for_orchestration only returns terminal or timeout"),
}§Use Cases
- Synchronous request/response workflows
- Testing (wait for workflow to complete)
- CLI tools (block until done)
- Health checks
§For Long-Running Workflows
Don’t wait for hours/days:
// Start workflow
client.start_orchestration("batch-job", "ProcessBatch", "{}").await.unwrap();
// DON'T wait for hours
// let status = client.wait_for_orchestration("batch-job", Duration::from_hours(24)).await;
// DO poll periodically
loop {
match client.get_orchestration_status("batch-job").await {
OrchestrationStatus::Completed { .. } => break,
OrchestrationStatus::Failed { .. } => break,
_ => tokio::time::sleep(std::time::Duration::from_secs(60)).await,
}
}§Errors
Returns ClientError::Provider if the provider fails to read the orchestration status.
Returns ClientError::Timeout if the orchestration doesn’t complete within the timeout.
Sourcepub async fn wait_for_orchestration_typed<Out: DeserializeOwned>(
&self,
instance: &str,
timeout: Duration,
) -> Result<Result<Out, String>, ClientError>
pub async fn wait_for_orchestration_typed<Out: DeserializeOwned>( &self, instance: &str, timeout: Duration, ) -> Result<Result<Out, String>, ClientError>
Typed wait helper: decodes output on Completed, returns Err(String) on Failed.
§Errors
Returns ClientError::Provider if the provider fails to read the orchestration status.
Returns ClientError::Timeout if the orchestration doesn’t complete within the timeout.
Returns ClientError::InvalidInput if deserialization of the output fails.
Sourcepub fn has_management_capability(&self) -> bool
pub fn has_management_capability(&self) -> bool
Check if management capabilities are available.
§Returns
true if the provider implements ProviderAdmin, false otherwise.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let instances = client.list_all_instances().await?;
} else {
println!("Management features not available");
}Sourcepub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError>
pub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError>
List all orchestration instances.
§Returns
Vector of instance IDs, typically sorted by creation time (newest first).
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let instances = client.list_all_instances().await?;
for instance in instances {
println!("Instance: {}", instance);
}
}Sourcepub async fn list_instances_by_status(
&self,
status: &str,
) -> Result<Vec<String>, ClientError>
pub async fn list_instances_by_status( &self, status: &str, ) -> Result<Vec<String>, ClientError>
List instances matching a status filter.
§Parameters
status- Filter by execution status: “Running”, “Completed”, “Failed”, “ContinuedAsNew”
§Returns
Vector of instance IDs with the specified status.
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let running = client.list_instances_by_status("Running").await?;
let completed = client.list_instances_by_status("Completed").await?;
println!("Running: {}, Completed: {}", running.len(), completed.len());
}Sourcepub async fn get_instance_info(
&self,
instance: &str,
) -> Result<InstanceInfo, ClientError>
pub async fn get_instance_info( &self, instance: &str, ) -> Result<InstanceInfo, ClientError>
Get comprehensive information about an instance.
§Parameters
instance- The ID of the orchestration instance.
§Returns
Detailed instance information including status, output, and metadata.
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let info = client.get_instance_info("order-123").await?;
println!("Instance {}: {} ({})", info.instance_id, info.orchestration_name, info.status);
}Sourcepub async fn get_execution_info(
&self,
instance: &str,
execution_id: u64,
) -> Result<ExecutionInfo, ClientError>
pub async fn get_execution_info( &self, instance: &str, execution_id: u64, ) -> Result<ExecutionInfo, ClientError>
Get detailed information about a specific execution.
§Parameters
instance- The ID of the orchestration instance.execution_id- The specific execution ID.
§Returns
Detailed execution information including status, output, and event count.
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let info = client.get_execution_info("order-123", 1).await?;
println!("Execution {}: {} events, status: {}", info.execution_id, info.event_count, info.status);
}Sourcepub async fn list_executions(
&self,
instance: &str,
) -> Result<Vec<u64>, ClientError>
pub async fn list_executions( &self, instance: &str, ) -> Result<Vec<u64>, ClientError>
List all execution IDs for an instance.
Returns execution IDs in ascending order: [1], [1, 2], [1, 2, 3], etc. Each execution represents either the initial run or a continuation via ContinueAsNew.
§Parameters
instance- The instance ID to query
§Returns
Vector of execution IDs in ascending order.
§Errors
Returns an error if:
- The provider doesn’t support management capabilities
- The database query fails
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let executions = client.list_executions("order-123").await?;
println!("Instance has {} executions", executions.len()); // [1, 2, 3]
}Sourcepub async fn read_execution_history(
&self,
instance: &str,
execution_id: u64,
) -> Result<Vec<Event>, ClientError>
pub async fn read_execution_history( &self, instance: &str, execution_id: u64, ) -> Result<Vec<Event>, ClientError>
Read the full event history for a specific execution within an instance.
Returns all events for the specified execution in chronological order. Each execution has its own independent history starting from OrchestrationStarted.
§Parameters
instance- The instance IDexecution_id- The specific execution ID (starts at 1)
§Returns
Vector of events in chronological order (oldest first).
§Errors
Returns an error if:
- The provider doesn’t support management capabilities
- The instance or execution doesn’t exist
- The database query fails
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let history = client.read_execution_history("order-123", 1).await?;
for event in history {
println!("Event: {:?}", event);
}
}Sourcepub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError>
pub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError>
Get system-wide metrics for the orchestration engine.
§Returns
System metrics including instance counts, execution counts, and status breakdown.
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let metrics = client.get_system_metrics().await?;
println!("System health: {} running, {} completed, {} failed",
metrics.running_instances, metrics.completed_instances, metrics.failed_instances);
}Sourcepub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError>
pub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError>
Get the current depths of the internal work queues.
§Returns
Queue depths for orchestrator, worker, and timer queues.
§Errors
Returns Err("Management features not available") if the provider doesn’t implement ProviderAdmin.
§Usage
let client = Client::new(provider);
if client.has_management_capability() {
let queues = client.get_queue_depths().await?;
println!("Queue depths - Orchestrator: {}, Worker: {}, Timer: {}",
queues.orchestrator_queue, queues.worker_queue, queues.timer_queue);
}