§Duroxide: Durable execution framework in Rust
Duroxide is a framework for building reliable, long-running, code-based workflows that can survive failures and restarts. For a deep dive into how durable execution works, see the Durable Futures Internals documentation.
§Quick Start
```rust
use duroxide::providers::sqlite::SqliteProvider;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime;
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
use std::sync::Arc;

// Note: the steps below use `.await` and `?`, so they must run inside an async
// function that returns a `Result` (for example, an async `main` on a Tokio runtime).

// 1. Create a storage provider
let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());

// 2. Register activities (your business logic)
let activities = ActivityRegistry::builder()
    .register("Greet", |_ctx: ActivityContext, name: String| async move {
        Ok(format!("Hello, {}!", name))
    })
    .build();

// 3. Define your orchestration
let orchestration = |ctx: OrchestrationContext, name: String| async move {
    let greeting = ctx.schedule_activity("Greet", name)
        .into_activity().await?;
    Ok(greeting)
};

// 4. Register and start the runtime
let orchestrations = OrchestrationRegistry::builder()
    .register("HelloWorld", orchestration)
    .build();

let rt = runtime::Runtime::start_with_store(
    store.clone(), Arc::new(activities), orchestrations
).await;

// 5. Create a client and start an orchestration instance
let client = Client::new(store.clone());
client.start_orchestration("inst-1", "HelloWorld", "World").await?;
let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
    .map_err(|e| format!("Wait error: {:?}", e))?;
```

§Key Concepts
- Orchestrations: Long-running workflows written as async functions (coordination logic)
- Activities: Single-purpose work units (can do anything: DB, API, polling, etc.). Long-running activities (minutes to hours) are supported via automatic lock renewal.
- Timers: Use `ctx.schedule_timer(duration)` for orchestration-level delays and timeouts
- Deterministic Replay: Orchestrations are replayed from history to ensure consistency, so orchestration code must make the same scheduling decisions on every replay
- Durable Futures: Composable futures for activities, timers, external events, and sub-orchestrations
- ContinueAsNew (Multi-Execution): An orchestration can end the current execution and immediately start a new one with fresh input. Each execution has its own isolated history that starts with `OrchestrationStarted { event_id: 1 }`.
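To make the replay idea concrete, here is a std-only toy model. It is not duroxide's implementation: `Event`, `Ctx`, `two_steps`, and `replay_turn` are hypothetical names. Each turn re-runs the orchestration from the top. Scheduling calls are numbered in call order and matched against recorded history: a completed call returns its recorded result instead of running again, an unmatched call becomes a new scheduling action, and a call that disagrees with history is reported as nondeterminism.

```rust
/// Hypothetical, simplified history events (not duroxide's `Event`/`EventKind`).
#[derive(Clone, Debug, PartialEq)]
enum Event {
    ActivityScheduled { id: u64, name: String },
    ActivityCompleted { id: u64, result: String },
}

/// Toy replay context: scheduling calls are numbered in call order and
/// matched against recorded history; unmatched calls become new actions.
struct Ctx {
    history: Vec<Event>,
    next_id: u64,
    new_actions: Vec<Event>,
}

impl Ctx {
    fn new(history: Vec<Event>) -> Self {
        Ctx { history, next_id: 1, new_actions: Vec::new() }
    }

    /// Ok(Some(result)): completed in an earlier turn (replayed, not re-run).
    /// Ok(None): not finished yet, so the orchestration must suspend.
    /// Err(_): the code scheduled something other than what history recorded.
    fn schedule_activity(&mut self, name: &str) -> Result<Option<String>, String> {
        let id = self.next_id;
        self.next_id += 1;
        let recorded = self.history.iter().find_map(|e| match e {
            Event::ActivityScheduled { id: i, name: n } if *i == id => Some(n.clone()),
            _ => None,
        });
        match recorded {
            Some(n) if n != name => {
                Err(format!("nondeterminism at call {id}: '{name}' vs recorded '{n}'"))
            }
            Some(_) => Ok(self.history.iter().find_map(|e| match e {
                Event::ActivityCompleted { id: i, result } if *i == id => Some(result.clone()),
                _ => None,
            })),
            None => {
                self.new_actions.push(Event::ActivityScheduled { id, name: name.to_string() });
                Ok(None)
            }
        }
    }
}

/// A two-step orchestration written against the toy context.
fn two_steps(ctx: &mut Ctx) -> Result<Option<String>, String> {
    let Some(a) = ctx.schedule_activity("Step1")? else { return Ok(None) };
    let Some(b) = ctx.schedule_activity("Step2")? else { return Ok(None) };
    Ok(Some(format!("{a}+{b}")))
}

/// One turn: rebuild the context from history, re-run the orchestration
/// from the top, and return its outcome plus the new actions it requested.
fn replay_turn(history: Vec<Event>) -> (Result<Option<String>, String>, Vec<Event>) {
    let mut ctx = Ctx::new(history);
    let outcome = two_steps(&mut ctx);
    (outcome, ctx.new_actions)
}
```

Because the result of `Step1` comes from history on the second and third turns, the orchestration body can run many times while the activity itself runs once. This is also why orchestration code must not branch on values that can change between replays, such as wall-clock time or random numbers.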
§⚠️ Important: Orchestrations vs Activities
- Orchestrations = Coordination (control flow, business logic)
- Activities = Execution (single-purpose work units)
```rust
// ✅ CORRECT: Orchestration-level delay using timer
ctx.schedule_timer(Duration::from_secs(5)).into_timer().await; // Wait 5 seconds

// ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
// Example: Activity that provisions a VM and polls for readiness
// (create_vm and vm_ready are placeholders for your own code)
// ActivityRegistry::builder().register("ProvisionVM", |_ctx: ActivityContext, config: String| async move {
//     let vm = create_vm(config).await?;
//     while !vm_ready(&vm).await {
//         tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK - part of provisioning
//     }
//     Ok(vm.id)
// });

// ❌ WRONG: Activity that ONLY sleeps (use timer instead)
// ctx.schedule_activity("Sleep5Seconds", "").into_activity().await;
```

Put in Activities (single-purpose execution units):
- Database operations
- API calls (can include retries/polling)
- Data transformations
- File I/O
- VM provisioning (with internal polling)
Put in Orchestrations (coordination and business logic):
- Control flow (if/else, match, loops)
- Business decisions
- Multi-step workflows
- Error handling and compensation
- Timeouts and deadlines (use timers)
- Waiting for external events
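The ProvisionVM example keeps its polling inside the activity. Reduced to its core, that is a bounded polling loop. The sketch below is synchronous, std-only Rust with a hypothetical `poll_until` helper; an async activity would use `tokio::time::sleep` (as in the commented example) rather than `std::thread::sleep`.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper for an activity body: calls `check` until it reports
/// readiness or `max_attempts` is exhausted, sleeping `interval` between tries.
/// Returns the attempt number that succeeded, or an error if never ready.
fn poll_until<F>(mut check: F, max_attempts: u32, interval: Duration) -> Result<u32, String>
where
    F: FnMut() -> bool,
{
    for attempt in 1..=max_attempts {
        if check() {
            return Ok(attempt);
        }
        // Sleep between checks, but not after the final failed attempt.
        if attempt < max_attempts {
            thread::sleep(interval);
        }
    }
    Err(format!("resource not ready after {max_attempts} attempts"))
}
```

Bounding the attempts means a resource that never becomes ready ends in an activity error that the orchestration can handle, instead of an activity that never finishes.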
§ContinueAsNew (Multi-Execution) Semantics
ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new one with fresh input (useful for loops, pagination, long-running workflows).
1. The orchestration calls `ctx.continue_as_new(new_input)`.
2. The runtime stamps `OrchestrationContinuedAsNew` in the CURRENT execution's history.
3. The runtime enqueues a `WorkItem::ContinueAsNew`.
4. When processing that work item, the runtime starts a NEW execution with:
   - `execution_id = previous_execution_id + 1`
   - `existing_history = []` (fresh history)
   - `OrchestrationStarted { event_id: 1, input: new_input }` stamped automatically
5. Each execution's history is independent; `duroxide::Client::read_execution_history(instance, id)` returns events for that execution only.
Provider responsibilities are strictly storage-level (see below). The runtime owns all orchestration semantics, including execution boundaries and starting the new execution.
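The execution bookkeeping for ContinueAsNew can be sketched with a few plain Rust types. This is a hypothetical in-memory model (`Instance`, `Execution`, and the string events are invented for illustration), not duroxide's storage schema. ContinueAsNew appends a closing event to the current execution, then opens execution `id + 1`, whose history restarts at `event_id` 1. The enqueue-then-process hop through `WorkItem::ContinueAsNew` is collapsed into one method call here.

```rust
/// Hypothetical, simplified event: (event_id, description).
type Ev = (u64, String);

struct Execution {
    execution_id: u64,
    history: Vec<Ev>,
}

struct Instance {
    executions: Vec<Execution>,
}

impl Instance {
    /// Starts an instance; the first execution id is 1 and its first event id is 1.
    fn start(input: &str) -> Self {
        Instance {
            executions: vec![Execution {
                execution_id: 1,
                history: vec![(1, format!("OrchestrationStarted({input})"))],
            }],
        }
    }

    /// Close the current execution, then open a new one with
    /// `execution_id + 1` and a fresh history starting at event_id 1.
    fn continue_as_new(&mut self, new_input: &str) {
        let current = self.executions.last_mut().expect("at least one execution");
        let next_event_id = current.history.len() as u64 + 1;
        current.history.push((next_event_id, "OrchestrationContinuedAsNew".to_string()));
        let next_execution_id = current.execution_id + 1;
        self.executions.push(Execution {
            execution_id: next_execution_id,
            history: vec![(1, format!("OrchestrationStarted({new_input})"))],
        });
    }

    /// Returns only the events of the requested execution.
    fn read_execution_history(&self, execution_id: u64) -> Option<&[Ev]> {
        self.executions
            .iter()
            .find(|e| e.execution_id == execution_id)
            .map(|e| e.history.as_slice())
    }
}
```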
§Provider Responsibilities (At a Glance)
Providers are pure storage abstractions. The runtime computes orchestration semantics and passes explicit instructions to the provider.
- `fetch_orchestration_item()`
  - Return a locked batch of work for ONE instance
  - Include the full history for the CURRENT `execution_id`
  - Do NOT create or synthesize new executions here (even for ContinueAsNew)
- `ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)`
  - Atomic commit of one orchestration turn
  - Idempotently `INSERT OR IGNORE` the execution row for the explicit `execution_id`
  - `UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)`
  - Append `history_delta` to the specified execution
  - Update `executions.status` and `executions.output` from `metadata` (no event inspection)
- Worker/Timer queues
  - Peek-lock semantics (dequeue with a lock token; ack by deleting)
  - Automatic lock renewal for long-running activities (no configuration needed)
  - Orchestrator, Worker, and Timer queues are independent but committed atomically with history

See `docs/provider-implementation-guide.md` and `src/providers/sqlite.rs` for a complete, production-grade provider implementation.
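The `ack_orchestration_item` rules can be illustrated with an in-memory stand-in. `Store`, `ExecRow`, `Metadata`, and `ack` are hypothetical names; `HashMap::entry(..).or_insert_with` plays the part of `INSERT OR IGNORE`, and `u64::max` plays the part of the `MAX(...)` update. A real provider performs all of these steps in one atomic transaction.

```rust
use std::collections::HashMap;

/// Hypothetical execution row.
#[derive(Debug, Default)]
struct ExecRow {
    status: String,
    output: Option<String>,
    history: Vec<String>,
}

/// Hypothetical storage for a single instance.
#[derive(Debug, Default)]
struct Store {
    current_execution_id: u64,
    executions: HashMap<u64, ExecRow>,
}

/// Hypothetical turn metadata, computed by the runtime, not the provider.
struct Metadata {
    status: String,
    output: Option<String>,
}

impl Store {
    /// In-memory analogue of one `ack_orchestration_item` commit.
    fn ack(&mut self, execution_id: u64, history_delta: Vec<String>, metadata: Metadata) {
        // INSERT OR IGNORE: create the row only if this execution is new.
        let row = self.executions.entry(execution_id).or_insert_with(ExecRow::default);
        // Append this turn's events to the execution named by the runtime.
        row.history.extend(history_delta);
        // Status and output come straight from the metadata;
        // the store never inspects the events themselves.
        row.status = metadata.status;
        row.output = metadata.output;
        // current_execution_id = MAX(current_execution_id, execution_id)
        self.current_execution_id = self.current_execution_id.max(execution_id);
    }
}
```

The `MAX` rule makes a late ack for an older execution harmless. It can still append history to that execution, but it cannot move `current_execution_id` backwards.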
§⚠️ Critical: DurableFuture Conversion Pattern
The core scheduling methods (`schedule_activity`, `schedule_timer`, `schedule_wait`, `schedule_sub_orchestration`) all return a `DurableFuture`. To await one directly, you MUST first convert it with the matching `.into_*()` method. `ctx.join` and `ctx.select2` accept `DurableFuture`s as-is, as the patterns below show.

```rust
// Inside an orchestration function with `ctx: OrchestrationContext`:

// ✅ CORRECT patterns:
let result = ctx.schedule_activity("Task", "input").into_activity().await?;
ctx.schedule_timer(Duration::from_secs(5)).into_timer().await;
let event = ctx.schedule_wait("Event").into_event().await;
let sub_result = ctx.schedule_sub_orchestration("Sub", "input").into_sub_orchestration().await?;

// ❌ WRONG - These won't compile:
// let result = ctx.schedule_activity("Task", "input").await; // Missing .into_activity()!
// ctx.schedule_timer(Duration::from_secs(5)).await; // Missing .into_timer()!
// let event = ctx.schedule_wait("Event").await; // Missing .into_event()!
```

Why this pattern? `DurableFuture` is a unified type that can represent any durable operation (activity, timer, external event, or sub-orchestration). The `.into_*()` methods convert it to the specific awaitable type you need.
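As a rough analogy for the unified-type design, consider a std-only enum with one variant per operation kind and one `into_*` method per kind that narrows it. All names here are hypothetical and synchronous. Duroxide's `DurableFuture` is an async future, and this sketch makes no claim about how its real `into_*()` methods handle a kind mismatch (this model simply panics).

```rust
/// Hypothetical unified handle for any scheduled operation.
#[derive(Debug)]
enum Scheduled {
    Activity { name: String, input: String },
    Timer { millis: u64 },
    Event { name: String },
}

#[derive(Debug, PartialEq)]
struct ActivityHandle { name: String, input: String }

#[derive(Debug, PartialEq)]
struct TimerHandle { millis: u64 }

#[derive(Debug, PartialEq)]
struct EventHandle { name: String }

impl Scheduled {
    /// Narrow to an activity handle; panics if this is another kind.
    fn into_activity(self) -> ActivityHandle {
        match self {
            Scheduled::Activity { name, input } => ActivityHandle { name, input },
            other => panic!("into_activity() called on {other:?}"),
        }
    }

    fn into_timer(self) -> TimerHandle {
        match self {
            Scheduled::Timer { millis } => TimerHandle { millis },
            other => panic!("into_timer() called on {other:?}"),
        }
    }

    fn into_event(self) -> EventHandle {
        match self {
            Scheduled::Event { name } => EventHandle { name },
            other => panic!("into_event() called on {other:?}"),
        }
    }
}

/// One type lets mixed operations share a collection, as join/select need.
fn describe(ops: &[Scheduled]) -> Vec<&'static str> {
    ops.iter()
        .map(|op| match op {
            Scheduled::Activity { .. } => "activity",
            Scheduled::Timer { .. } => "timer",
            Scheduled::Event { .. } => "event",
        })
        .collect()
}
```

Keeping one type lets `join`/`select` take a single collection of mixed operations, while the `into_*` step restores a precise output type when one operation is awaited on its own.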
§Common Patterns
§Function Chaining
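```rust
use duroxide::OrchestrationContext;

async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Each step starts only after the previous one completes; its output feeds the next step.
    let step1 = ctx.schedule_activity("Step1", "input").into_activity().await?;
    let step2 = ctx.schedule_activity("Step2", &step1).into_activity().await?;
    Ok(step2)
}
```

§Fan-Out/Fan-In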
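Fan-out/fan-in schedules every item before waiting on any of them, then gathers all results. The std-only sketch below uses a hypothetical `fan_out_fan_in` function with one OS thread per item to show the property the fan-in step relies on: results come back in scheduling order even when items finish in a different order. This is consistent with the "deterministic order preserved" comment on `ctx.join` in the error-handling example; equating that order with scheduling order is this sketch's assumption. In duroxide, activities run on the worker dispatcher, not on threads spawned by the orchestration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

type Outcome = Result<String, String>;

/// Hypothetical fan-out/fan-in: run `work` for every item concurrently,
/// then return results in input order, regardless of completion order.
fn fan_out_fan_in(items: Vec<String>, work: fn(&str) -> Outcome) -> Vec<Outcome> {
    let n = items.len();
    let (tx, rx) = mpsc::channel::<(usize, Outcome)>();
    // Fan out: one worker thread per item.
    for (index, item) in items.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // Later items sleep less, so completion order tends to be reversed.
            thread::sleep(Duration::from_millis(((n - index) * 5) as u64));
            tx.send((index, work(&item))).expect("receiver is alive");
        });
    }
    drop(tx); // so `rx` stops yielding once every worker has reported
    // Fan in: place each result in its scheduling slot, not its arrival slot.
    let mut slots: Vec<Option<Outcome>> = vec![None; n];
    for (index, outcome) in rx {
        slots[index] = Some(outcome);
    }
    slots.into_iter().map(|s| s.expect("every item reported")).collect()
}
```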
```rust
use duroxide::{DurableOutput, OrchestrationContext};

async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
    // Fan out: schedule all activities before awaiting any of them.
    let futures = vec![
        ctx.schedule_activity("Process", "item1"),
        ctx.schedule_activity("Process", "item2"),
        ctx.schedule_activity("Process", "item3"),
    ];
    // Fan in: wait for every result.
    let results = ctx.join(futures).await;
    results.into_iter().map(|r| match r {
        DurableOutput::Activity(Ok(s)) => s,
        _ => "error".to_string(),
    }).collect()
}
```

§Human-in-the-Loop
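`select2` races two durable operations and reports which one won. The std-only model below uses `mpsc::Receiver::recv_timeout` for the race; `wait_for_approval` and its outputs are hypothetical. Index 0 is the event and index 1 is the timer, matching the `select2(work, timeout)` order used in the timeout example. A wall-clock race like this is not replay-safe; in duroxide the outcome has to be reproducible from history, because orchestration turns are deterministic.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical race between an approval event (index 0) and a timer
/// (index 1), returning (winner_index, output) like the select2 examples.
fn wait_for_approval(approver_delay: Option<Duration>, timeout: Duration) -> (usize, String) {
    let (tx, rx) = mpsc::channel::<String>();
    if let Some(delay) = approver_delay {
        // Simulated human who approves after `delay`.
        thread::spawn(move || {
            thread::sleep(delay);
            let _ = tx.send("approved".to_string()); // ignored if the timer already won
        });
    }
    // With no approver, `tx` is still alive here, so the wait lasts until the timeout.
    match rx.recv_timeout(timeout) {
        Ok(data) => (0, data),
        Err(_) => (1, "timeout".to_string()),
    }
}
```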
```rust
use duroxide::{DurableOutput, OrchestrationContext};
use std::time::Duration;

async fn approval_example(ctx: OrchestrationContext) -> String {
    let timer = ctx.schedule_timer(Duration::from_secs(30)); // 30-second timeout
    let approval = ctx.schedule_wait("ApprovalEvent");
    // Whichever completes first wins; the winner's index is ignored here.
    let (_, result) = ctx.select2(timer, approval).await;
    match result {
        DurableOutput::External(data) => data, // approval payload
        DurableOutput::Timer => "timeout".to_string(),
        _ => "error".to_string(),
    }
}
```

§Delays and Timeouts
```rust
use duroxide::{DurableOutput, OrchestrationContext};
use std::time::Duration;

async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
    // ✅ CORRECT: Use timer for orchestration-level delays
    ctx.schedule_timer(Duration::from_secs(5)).into_timer().await;
    // Process after delay
    let result = ctx.schedule_activity("ProcessData", "input")
        .into_activity().await?;
    Ok(result)
}

async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Race work against timeout
    let work = ctx.schedule_activity("SlowOperation", "input");
    let timeout = ctx.schedule_timer(Duration::from_secs(5));
    let (winner_index, result) = ctx.select2(work, timeout).await;
    match winner_index {
        // Index 0: the activity finished first
        0 => match result {
            DurableOutput::Activity(Ok(value)) => Ok(value),
            DurableOutput::Activity(Err(e)) => Err(format!("Work failed: {e}")),
            _ => unreachable!(),
        },
        // Index 1: the timer fired first
        1 => Err("Operation timed out".to_string()),
        _ => unreachable!(),
    }
}
```

§Fan-Out/Fan-In with Error Handling
```rust
use duroxide::{DurableOutput, OrchestrationContext};

async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
    // Schedule all work in parallel
    let futures: Vec<_> = items.iter()
        .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
        .collect();
    // Wait for all to complete (deterministic order preserved)
    let results = ctx.join(futures).await;
    // Process results with error handling
    let mut successes = Vec::new();
    for result in results {
        match result {
            DurableOutput::Activity(Ok(value)) => successes.push(value),
            DurableOutput::Activity(Err(e)) => {
                // Log error but continue processing other items
                ctx.trace_error(format!("Item processing failed: {e}"));
            }
            _ => return Err("Unexpected result type".to_string()),
        }
    }
    Ok(successes)
}
```

§Retry Pattern
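The `(1s, 2s, 3s, 4s)` schedule in the retry comment can be reproduced with a small calculation. This sketch assumes that `BackoffStrategy::Linear { base, max }` waits `base × n` before the n-th retry, capped at `max`; that formula is inferred from the comment, not taken from duroxide's source, and `linear_delay`/`retry_schedule` are hypothetical helpers. Five attempts means four waits.

```rust
use std::time::Duration;

/// Assumed linear backoff: the wait before retry `n` (1-based) is `base * n`, capped at `max`.
fn linear_delay(base: Duration, max: Duration, retry: u32) -> Duration {
    base.saturating_mul(retry).min(max)
}

/// Waits between `max_attempts` attempts (one fewer wait than attempts).
fn retry_schedule(max_attempts: u32, base: Duration, max: Duration) -> Vec<Duration> {
    (1..max_attempts).map(|n| linear_delay(base, max, n)).collect()
}
```

With `base = 1s` and `max = 10s`, the cap only changes anything from the 11th retry onward.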
```rust
use duroxide::{BackoffStrategy, OrchestrationContext, RetryPolicy};
use std::time::Duration;

async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
    let result = ctx.schedule_activity_with_retry(
        "UnreliableOperation",
        "input",
        RetryPolicy::new(5)
            .with_backoff(BackoffStrategy::Linear {
                base: Duration::from_secs(1),
                max: Duration::from_secs(10),
            }),
    ).await?;
    Ok(result)
}
```

§Examples
See the `examples/` directory for complete, runnable examples:
- `hello_world.rs`: Basic orchestration setup
- `fan_out_fan_in.rs`: Parallel processing pattern with error handling
- `timers_and_events.rs`: Human-in-the-loop workflows with timeouts
- `delays_and_timeouts.rs`: Correct usage of timers for delays and timeouts
- `with_observability.rs`: Using observability features (tracing, metrics)
- `metrics_cli.rs`: Querying system metrics via CLI

Run examples with: `cargo run --example <name>`
§Architecture
This crate provides:
- Public data model: `Event`, `Action` for history and decisions
- Orchestration driver: `run_turn`, `run_turn_with`, and `Executor`
- `OrchestrationContext`: Schedule activities, timers, and external events
- `DurableFuture`: Unified futures that can be composed with `join`/`select`
- Runtime: In-process execution engine with dispatchers and workers
- Providers: Pluggable storage backends (SQLite, etc.)
§End-to-End System Architecture
+-------------------------------------------------------------------------+
| Application Layer |
+-------------------------------------------------------------------------+
| |
| +--------------+ +------------------------------------+ |
| | Client |-------->| start_orchestration() | |
| | | | raise_event() | |
| | | | wait_for_orchestration() | |
| +--------------+ +------------------------------------+ |
| |
+-------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------+
| Runtime Layer |
+-------------------------------------------------------------------------+
| |
| +-------------------------------------------------------------------+ |
| | Runtime | |
| | +----------------------+ +----------------------+ | |
| | | Orchestration | | Work | | |
| | | Dispatcher | | Dispatcher | | |
| | | (N concurrent) | | (N concurrent) | | |
| | +----------+-----------+ +----------+-----------+ | |
| | | | | |
| | | Processes turns | Executes activities| |
| | | | | |
| +-------------+--------------------------------+--------------------+ |
| | | |
| +-------------v--------------------------------v--------------------+ |
| | OrchestrationRegistry: maps names -> orchestration handlers | |
| +-------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | ActivityRegistry: maps names -> activity handlers | |
| +-------------------------------------------------------------------+ |
| |
+-------------------------------------------------------------------------+
| |
| Fetches work items | Fetches work items
| (peek-lock) | (peek-lock)
v v
+-------------------------------------------------------------------------+
| Provider Layer |
+-------------------------------------------------------------------------+
| |
| +----------------------------+ +----------------------------+ |
| | Orchestrator Queue | | Worker Queue | |
| | - StartOrchestration | | - ActivityExecute | |
| | - ActivityCompleted | | | |
| | - ActivityFailed | | | |
| | - TimerFired (delayed) | | | |
| | - ExternalRaised | | | |
| | - ContinueAsNew | | | |
| +----------------------------+ +----------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | Provider (Storage) | |
| | - History (Events per instance/execution) | |
| | - Instance metadata | |
| | - Execution metadata | |
| | - Instance locks (peek-lock semantics) | |
| | - Queue management (enqueue/dequeue with visibility) | |
| +-------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | Storage Backend (SQLite, etc.) | |
| +-------------------------------------------------------------------+ |
| |
+-------------------------------------------------------------------------+
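The "(peek-lock)" labels on the dispatcher arrows describe how work is taken from a queue without being lost if a dispatcher crashes. A minimal in-memory model follows; `PeekLockQueue`, `fetch`, `ack`, and the logical clock `now` are hypothetical. Fetching leases the oldest visible item under a fresh lock token, acking with the live token deletes it, and an expired lease makes the item visible again, after which the stale token can no longer delete it.

```rust
use std::collections::{BTreeMap, HashMap};

/// One queued message; `lock` is (lock_token, locked_until) while leased.
struct Item {
    payload: String,
    lock: Option<(u64, u64)>,
}

/// Hypothetical in-memory peek-lock queue driven by a logical clock (`now`).
#[derive(Default)]
struct PeekLockQueue {
    next_id: u64,
    next_token: u64,
    items: BTreeMap<u64, Item>, // id -> item; BTreeMap iterates oldest id first (FIFO)
    tokens: HashMap<u64, u64>,  // lock token -> item id
}

impl PeekLockQueue {
    fn enqueue(&mut self, payload: &str) {
        self.next_id += 1;
        self.items.insert(self.next_id, Item { payload: payload.to_string(), lock: None });
    }

    /// Peek-lock: lease the oldest visible item until `now + lock_for`.
    /// The item stays in the queue; it is only hidden from other fetchers.
    fn fetch(&mut self, now: u64, lock_for: u64) -> Option<(u64, String)> {
        let (&id, item) = self
            .items
            .iter_mut()
            .find(|(_, item)| item.lock.map_or(true, |(_, until)| until <= now))?;
        self.next_token += 1;
        item.lock = Some((self.next_token, now + lock_for));
        self.tokens.insert(self.next_token, id);
        Some((self.next_token, item.payload.clone()))
    }

    /// Ack: delete the item, but only if `token` is still its live lock.
    fn ack(&mut self, token: u64, now: u64) -> bool {
        let Some(id) = self.tokens.remove(&token) else { return false };
        let valid = self.items.get(&id).map_or(false, |item| {
            matches!(item.lock, Some((t, until)) if t == token && until > now)
        });
        if valid {
            self.items.remove(&id);
        }
        valid
    }
}
```

Automatic lock renewal, which the provider section describes for long-running activities, would amount to pushing `locked_until` forward for a live token; it is left out of this sketch.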
§Execution Flow
1. **Client** starts orchestration → enqueues `StartOrchestration` to orchestrator queue
2. **OrchestrationDispatcher** fetches work item (peek-lock), loads history from Provider
3. **Runtime** calls user's orchestration function with `OrchestrationContext`
4. **Orchestration** schedules activities/timers → Runtime appends `Event`s to history
5. **Runtime** enqueues `ActivityExecute` to worker queue, `TimerFired` (delayed) to orchestrator queue
6. **WorkDispatcher** fetches activity work item, executes via `ActivityRegistry`
7. **Activity** completes → enqueues `ActivityCompleted`/`ActivityFailed` to orchestrator queue
8. **OrchestrationDispatcher** processes completion → next orchestration turn
9. **Runtime** atomically commits history + queue changes via `ack_orchestration_item()`
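The steps above can be traced with a toy, single-threaded simulation. `WorkItem`, the two `VecDeque` queues, and `run_to_completion` are hypothetical stand-ins with no locking, timers, persistence, or replay, and the orchestration is hard-wired to schedule one `Greet` activity, as in the Quick Start.

```rust
use std::collections::VecDeque;

/// Hypothetical work items, named after the queue entries in the diagram.
#[derive(Debug)]
enum WorkItem {
    StartOrchestration { input: String },
    ActivityExecute { name: String, input: String },
    ActivityCompleted { result: String },
}

/// Runs one instance to completion; returns (final output, history log).
fn run_to_completion(input: &str) -> (Option<String>, Vec<String>) {
    let mut orchestrator_queue =
        VecDeque::from([WorkItem::StartOrchestration { input: input.to_string() }]);
    let mut worker_queue: VecDeque<WorkItem> = VecDeque::new();
    let mut history: Vec<String> = Vec::new();
    let mut output = None;

    loop {
        if let Some(item) = orchestrator_queue.pop_front() {
            // Orchestration turn: record what happened and decide what to do next.
            match item {
                WorkItem::StartOrchestration { input } => {
                    history.push(format!("OrchestrationStarted({input})"));
                    history.push("ActivityScheduled(Greet)".to_string());
                    worker_queue.push_back(WorkItem::ActivityExecute { name: "Greet".into(), input });
                }
                WorkItem::ActivityCompleted { result } => {
                    history.push(format!("ActivityCompleted({result})"));
                    history.push("OrchestrationCompleted".to_string());
                    output = Some(result);
                }
                other => unreachable!("not an orchestrator item: {other:?}"),
            }
        } else if let Some(WorkItem::ActivityExecute { name, input }) = worker_queue.pop_front() {
            // Work dispatcher: run the activity, report back to the orchestrator queue.
            let result = match name.as_str() {
                "Greet" => format!("Hello, {input}!"),
                _ => unreachable!("unknown activity"),
            };
            orchestrator_queue.push_back(WorkItem::ActivityCompleted { result });
        } else {
            break; // both queues are empty
        }
    }
    (output, history)
}
```

Each loop iteration corresponds to one dispatcher fetch. Orchestrator-queue items drive orchestration turns (steps 2 to 5 and 8), and worker-queue items run activities (steps 6 and 7). The atomic commit of step 9 has no counterpart here.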
All orchestration turns are deterministic and can be replayed from history.

Re-exports§
```rust
pub use runtime::OrchestrationDescriptor;
pub use client::Client;
pub use client::ClientError;
pub use runtime::OrchestrationHandler;
pub use runtime::OrchestrationRegistry;
pub use runtime::OrchestrationRegistryBuilder;
pub use runtime::OrchestrationStatus;
pub use runtime::RuntimeOptions;
pub use providers::ExecutionInfo;
pub use providers::InstanceInfo;
pub use providers::ProviderAdmin;
pub use providers::QueueDepths;
pub use providers::SystemMetrics;
pub use crate::futures::DurableFuture;
pub use crate::futures::DurableOutput;
pub use crate::futures::JoinFuture;
pub use crate::futures::SelectFuture;
```
Modules§
Structs§
- ActivityContext - Context provided to activities for logging and metadata access.
- Event - Unified event with common metadata and type-specific payload.
- OrchestrationContext - User-facing orchestration context for scheduling and replay-safe helpers.
- RetryPolicy - Retry policy for activities.
Enums§
- Action - Declarative decisions produced by an orchestration turn. The host/provider is responsible for materializing these into corresponding `Event`s.
- AppErrorKind - Application error kinds.
- BackoffStrategy - Backoff strategy for computing delay between retry attempts.
- ConfigErrorKind - Configuration error kinds.
- ErrorDetails - Structured error details for orchestration failures.
- EventKind - Event-specific payloads.
- LogLevel - Log levels for orchestration context logging.
- PoisonMessageType - Poison message type identification.
Constants§
- INITIAL_EVENT_ID - Initial event ID for new executions. The first event (`OrchestrationStarted`) always has `event_id = 1`.
- INITIAL_EXECUTION_ID - Initial execution ID for new orchestration instances. All orchestrations start with `execution_id = 1`.
Functions§
- run_turn - Simple `run_turn` for tests. Uses the default `execution_id = 1` and placeholder instance metadata.
- run_turn_with - Execute one orchestration turn with an explicit `execution_id`. This is the full-featured `run_turn` implementation used by the runtime.
- run_turn_with_status - Execute one orchestration turn and also return any nondeterminism flagged by futures. This does not change the deterministic behavior of the orchestrator; it only surfaces `CtxInner.nondeterminism_error`, which futures may set during scheduling-order checks.
Type Aliases§
- OrchestrationHandlerRef - Shared reference to an `OrchestrationHandler`
- ProviderRef - Shared reference to a `Provider` implementation
- TurnResult - Poll the orchestrator once with the provided history, producing updated history, requested `Action`s, and an optional output.