§Duroxide: Durable execution framework in Rust
Duroxide is a framework for building reliable, long-running, code-based workflows that can survive failures and restarts. For a deep dive into how durable execution works, see the Durable Futures Internals documentation.
§Quick Start
use duroxide::providers::sqlite::SqliteProvider;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
use std::sync::Arc;
// 1. Create a storage provider
let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());
// 2. Register activities (your business logic)
let activities = ActivityRegistry::builder()
    .register("Greet", |_ctx: ActivityContext, name: String| async move {
        Ok(format!("Hello, {}!", name))
    })
    .build();
// 3. Define your orchestration
let orchestration = |ctx: OrchestrationContext, name: String| async move {
    let greeting = ctx.schedule_activity("Greet", name).await?;
    Ok(greeting)
};
// 4. Register and start the runtime
let orchestrations = OrchestrationRegistry::builder()
    .register("HelloWorld", orchestration)
    .build();
let rt = runtime::Runtime::start_with_store(
    store.clone(), activities, orchestrations
).await;
// 5. Create a client and start an orchestration instance
let client = Client::new(store.clone());
client.start_orchestration("inst-1", "HelloWorld", "World").await?;
let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
    .map_err(|e| format!("Wait error: {:?}", e))?;
§Key Concepts
- Orchestrations: Long-running workflows written as async functions (coordination logic)
- Activities: Single-purpose work units (can do anything: DB, API, polling, etc.)
  - Supports long-running activities via automatic lock renewal (minutes to hours)
- Timers: Use ctx.schedule_timer(duration) for orchestration-level delays and timeouts
- Deterministic Replay: Orchestrations are replayed from history to ensure consistency
- Durable Futures: Composable futures for activities, timers, and external events
- ContinueAsNew (Multi-Execution): An orchestration can end the current execution and immediately start a new one with fresh input. Each execution has its own isolated history that starts with OrchestrationStarted { event_id: 1 }.
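To make the deterministic replay idea concrete, here is a minimal std-only sketch. It does not use duroxide's API: `ToyEvent`, `ReplayContext`, `run_activity`, and `run_once` are hypothetical names, and activities run synchronously for simplicity. It only illustrates the principle that a replayed orchestration reads results back from history instead of re-executing work.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified history record (not duroxide's real `Event` type).
#[derive(Debug, Clone, PartialEq)]
enum ToyEvent {
    ActivityScheduled { name: String, input: String },
    ActivityCompleted { result: String },
}

/// Toy context: serves results from recorded history and runs work
/// only once the recorded history is exhausted.
struct ReplayContext {
    recorded: VecDeque<ToyEvent>,
    appended: Vec<ToyEvent>,
    executed: usize, // number of activities that actually ran in this pass
}

impl ReplayContext {
    fn new(history: Vec<ToyEvent>) -> Self {
        Self { recorded: history.into(), appended: Vec::new(), executed: 0 }
    }

    fn call_activity(&mut self, name: &str, input: &str) -> String {
        match self.recorded.pop_front() {
            // Replay path: the step must match what history says happened.
            Some(ToyEvent::ActivityScheduled { name: n, input: i }) => {
                assert!(n == name && i == input, "orchestration diverged from history");
                match self.recorded.pop_front() {
                    Some(ToyEvent::ActivityCompleted { result }) => result,
                    other => panic!("expected a completion, found {other:?}"),
                }
            }
            Some(other) => panic!("unexpected event in history: {other:?}"),
            // Live path: run the work and record both events.
            None => {
                let result = run_activity(name, input);
                self.executed += 1;
                self.appended.push(ToyEvent::ActivityScheduled {
                    name: name.to_string(),
                    input: input.to_string(),
                });
                self.appended.push(ToyEvent::ActivityCompleted { result: result.clone() });
                result
            }
        }
    }
}

/// Stand-in for an activity registry (hypothetical activities).
fn run_activity(name: &str, input: &str) -> String {
    match name {
        "Greet" => format!("Hello, {input}!"),
        "Shout" => input.to_uppercase(),
        other => panic!("unknown activity {other}"),
    }
}

/// The "orchestration": pure coordination of two activity calls.
fn orchestration(ctx: &mut ReplayContext, name: &str) -> String {
    let greeting = ctx.call_activity("Greet", name);
    ctx.call_activity("Shout", &greeting)
}

/// Runs the orchestration against a history; returns output, work executed, new events.
fn run_once(history: Vec<ToyEvent>) -> (String, usize, Vec<ToyEvent>) {
    let mut ctx = ReplayContext::new(history);
    let output = orchestration(&mut ctx, "World");
    (output, ctx.executed, ctx.appended)
}

fn main() {
    let (first, ran, history) = run_once(Vec::new());
    let (second, reran, _) = run_once(history);
    println!("first: {first} ({ran} executed), replay: {second} ({reran} executed)");
}
```

The second pass returns the same output while executing no activities, which is why orchestration code has to make the same calls in the same order on every run.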
§⚠️ Important: Orchestrations vs Activities
Orchestrations = Coordination (control flow, business logic)
Activities = Execution (single-purpose work units)
// ✅ CORRECT: Orchestration-level delay using timer
ctx.schedule_timer(Duration::from_secs(5)).await; // Wait 5 seconds
// ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
// Example: Activity that provisions a VM and polls for readiness
// activities.register("ProvisionVM", |config| async move {
//     let vm = create_vm(config).await?;
//     while !vm_ready(&vm).await {
//         tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK - part of provisioning
//     }
//     Ok(vm.id)
// });
// ❌ WRONG: Activity that ONLY sleeps (use timer instead)
// ctx.schedule_activity("Sleep5Seconds", "").await;
Put in Activities (single-purpose execution units):
- Database operations
- API calls (can include retries/polling)
- Data transformations
- File I/O
- VM provisioning (with internal polling)
Put in Orchestrations (coordination and business logic):
- Control flow (if/else, match, loops)
- Business decisions
- Multi-step workflows
- Error handling and compensation
- Timeouts and deadlines (use timers)
- Waiting for external events
§ContinueAsNew (Multi-Execution) Semantics
ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new one with fresh input (useful for loops, pagination, long-running workflows).
- Orchestration calls ctx.continue_as_new(new_input)
- Runtime stamps OrchestrationContinuedAsNew in the CURRENT execution’s history
- Runtime enqueues a WorkItem::ContinueAsNew
- When processing that work item, the runtime starts a NEW execution with:
  - execution_id = previous_execution_id + 1
  - existing_history = [] (fresh history)
  - OrchestrationStarted { event_id: 1, input = new_input } is stamped automatically
- Each execution’s history is independent; duroxide::Client::read_execution_history(instance, id) returns events for that execution only
Provider responsibilities are strictly storage-level (see below). The runtime owns all orchestration semantics, including execution boundaries and starting the new execution.
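The ContinueAsNew steps above can be modeled with a small std-only sketch. `ToyInstance` and `ToyEvent` are hypothetical types, not duroxide's; the sketch also collapses the enqueue and processing of the `WorkItem::ContinueAsNew` into one call. It only shows how execution IDs advance and how each execution keeps its own history.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified events (not duroxide's real `Event` type).
#[derive(Debug, Clone, PartialEq)]
enum ToyEvent {
    OrchestrationStarted { event_id: u64, input: String },
    OrchestrationContinuedAsNew { event_id: u64, input: String },
}

/// Toy instance: one history per execution_id.
#[derive(Default)]
struct ToyInstance {
    current_execution_id: u64,
    histories: BTreeMap<u64, Vec<ToyEvent>>,
}

impl ToyInstance {
    fn start(input: &str) -> Self {
        let mut inst = Self::default();
        inst.begin_execution(1, input);
        inst
    }

    /// Every execution starts with a fresh history whose first event has event_id 1.
    fn begin_execution(&mut self, execution_id: u64, input: &str) {
        let started = ToyEvent::OrchestrationStarted { event_id: 1, input: input.to_string() };
        self.histories.insert(execution_id, vec![started]);
        self.current_execution_id = execution_id;
    }

    /// Stamp the terminal event in the CURRENT history, then start execution N + 1.
    fn continue_as_new(&mut self, new_input: &str) {
        let current = self.current_execution_id;
        let history = self.histories.get_mut(&current).expect("current execution exists");
        let event_id = history.len() as u64 + 1;
        history.push(ToyEvent::OrchestrationContinuedAsNew {
            event_id,
            input: new_input.to_string(),
        });
        self.begin_execution(current + 1, new_input);
    }

    /// Events for one execution only; empty if the execution does not exist.
    fn read_execution_history(&self, execution_id: u64) -> &[ToyEvent] {
        self.histories
            .get(&execution_id)
            .map(Vec::as_slice)
            .unwrap_or_default()
    }
}

fn main() {
    let mut inst = ToyInstance::start("page-1");
    inst.continue_as_new("page-2");
    println!("current execution: {}", inst.current_execution_id);
    println!("execution 1: {:?}", inst.read_execution_history(1));
    println!("execution 2: {:?}", inst.read_execution_history(2));
}
```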
§Provider Responsibilities (At a Glance)
Providers are pure storage abstractions. The runtime computes orchestration semantics and passes explicit instructions to the provider.
- fetch_orchestration_item()
  - Return a locked batch of work for ONE instance
  - Include full history for the CURRENT execution_id
  - Do NOT create/synthesize new executions here (even for ContinueAsNew)
- ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)
  - Atomic commit of one orchestration turn
  - Idempotently INSERT OR IGNORE execution row for the explicit execution_id
  - UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)
  - Append history_delta to the specified execution
  - Update executions.status and executions.output from metadata (no event inspection)
- Worker/Timer queues
  - Peek-lock semantics (dequeue with lock token; ack by deleting)
  - Automatic lock renewal for long-running activities (no configuration needed)
  - Orchestrator, Worker, Timer queues are independent but committed atomically with history
See docs/provider-implementation-guide.md and src/providers/sqlite.rs for a complete,
production-grade provider implementation.
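As a rough illustration of the ack rules listed above, the following std-only sketch models them in memory. `ToyStore` and its method signature are assumptions made for this example and are much simpler than the real provider trait; lock tokens, queues, and transactions are left out.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for the instances/executions tables.
#[derive(Default)]
struct ToyStore {
    current_execution_id: u64,
    histories: BTreeMap<u64, Vec<String>>, // execution_id -> appended events
    statuses: BTreeMap<u64, String>,       // execution_id -> status from metadata
}

impl ToyStore {
    /// Simplified ack: the runtime supplies the explicit execution_id and status.
    /// The store never inspects events to decide anything.
    fn ack_orchestration_item(
        &mut self,
        execution_id: u64,
        history_delta: &[&str],
        status: Option<&str>,
    ) {
        // "INSERT OR IGNORE": create the execution row only if it is missing.
        let history = self.histories.entry(execution_id).or_default();
        // Append the delta to the specified execution.
        history.extend(history_delta.iter().map(|e| e.to_string()));
        // MAX(...) keeps the pointer from moving backwards on a late ack.
        self.current_execution_id = self.current_execution_id.max(execution_id);
        // Status comes from metadata, not from reading the events.
        if let Some(status) = status {
            self.statuses.insert(execution_id, status.to_string());
        }
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.ack_orchestration_item(
        1,
        &["OrchestrationStarted", "OrchestrationContinuedAsNew"],
        Some("ContinuedAsNew"),
    );
    store.ack_orchestration_item(2, &["OrchestrationStarted"], Some("Running"));
    // A late, empty ack for execution 1 leaves the current pointer at 2.
    store.ack_orchestration_item(1, &[], None);
    println!("current_execution_id = {}", store.current_execution_id);
}
```

Because the runtime hands over the execution ID and status explicitly, the same ack logic covers ordinary turns and ContinueAsNew without special cases in the store.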
§Simplified API
All schedule methods return typed futures that can be awaited directly:
// Activities return Result<String, String>
let result = ctx.schedule_activity("Task", "input").await?;
// Timers return ()
ctx.schedule_timer(Duration::from_secs(5)).await;
// External events return String
let event = ctx.schedule_wait("Event").await;
// Sub-orchestrations return Result<String, String>
let sub_result = ctx.schedule_sub_orchestration("Sub", "input").await?;
§Common Patterns
§Function Chaining
async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
    let step1 = ctx.schedule_activity("Step1", "input").await?;
    let step2 = ctx.schedule_activity("Step2", &step1).await?;
    Ok(step2)
}
§Fan-Out/Fan-In
async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
    let futures = vec![
        ctx.schedule_activity("Process", "item1"),
        ctx.schedule_activity("Process", "item2"),
        ctx.schedule_activity("Process", "item3"),
    ];
    let results = ctx.join(futures).await;
    results.into_iter().filter_map(|r| r.ok()).collect()
}
§Human-in-the-Loop (Timeout Pattern)
async fn approval_example(ctx: OrchestrationContext) -> String {
    let timeout = ctx.schedule_timer(Duration::from_secs(30));
    let approval = ctx.schedule_wait("ApprovalEvent");
    match ctx.select2(approval, timeout).await {
        Either2::First(data) => data,
        Either2::Second(()) => "timeout".to_string(),
    }
}
§Delays and Timeouts
async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Use timer for orchestration-level delays
    ctx.schedule_timer(Duration::from_secs(5)).await;
    // Process after delay
    let result = ctx.schedule_activity("ProcessData", "input").await?;
    Ok(result)
}
async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Race work against timeout
    let work = ctx.schedule_activity("SlowOperation", "input");
    let timeout = ctx.schedule_timer(Duration::from_secs(30));
    match ctx.select2(work, timeout).await {
        Either2::First(result) => result,
        Either2::Second(()) => Err("Operation timed out".to_string()),
    }
}
§Fan-Out/Fan-In with Error Handling
async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
    // Schedule all work in parallel
    let futures: Vec<_> = items.iter()
        .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
        .collect();
    // Wait for all to complete (deterministic order preserved)
    let results = ctx.join(futures).await;
    // Process results with error handling
    let mut successes = Vec::new();
    for result in results {
        match result {
            Ok(value) => successes.push(value),
            Err(e) => {
                // Log error but continue processing other items
                ctx.trace_error(format!("Item processing failed: {e}"));
            }
        }
    }
    Ok(successes)
}
§Retry Pattern
async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
    let result = ctx.schedule_activity_with_retry(
        "UnreliableOperation",
        "input",
        RetryPolicy::new(5)
            .with_backoff(BackoffStrategy::Linear {
                base: Duration::from_secs(1),
                max: Duration::from_secs(10),
            }),
    ).await?;
    Ok(result)
}
§Examples
See the examples/ directory for complete, runnable examples:
- hello_world.rs - Basic orchestration setup
- fan_out_fan_in.rs - Parallel processing pattern with error handling
- timers_and_events.rs - Human-in-the-loop workflows with timeouts
- delays_and_timeouts.rs - Correct usage of timers for delays and timeouts
- with_observability.rs - Using observability features (tracing, metrics)
- metrics_cli.rs - Querying system metrics via CLI
Run examples with: cargo run --example <name>
§Architecture
This crate provides:
- Public data model: Event, Action for history and decisions
- Orchestration driver: run_turn, run_turn_with, and Executor
- OrchestrationContext: Schedule activities, timers, and external events
- Deterministic futures: schedule_*() return standard Futures that can be composed with join/select
- Runtime: In-process execution engine with dispatchers and workers
- Providers: Pluggable storage backends (SQLite, etc.)
§End-to-End System Architecture
+-------------------------------------------------------------------------+
| Application Layer |
+-------------------------------------------------------------------------+
| |
| +--------------+ +------------------------------------+ |
| | Client |-------->| start_orchestration() | |
| | | | raise_event() | |
| | | | wait_for_orchestration() | |
| +--------------+ +------------------------------------+ |
| |
+-------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------+
| Runtime Layer |
+-------------------------------------------------------------------------+
| |
| +-------------------------------------------------------------------+ |
| | Runtime | |
| | +----------------------+ +----------------------+ | |
| | | Orchestration | | Work | | |
| | | Dispatcher | | Dispatcher | | |
| | | (N concurrent) | | (N concurrent) | | |
| | +----------+-----------+ +----------+-----------+ | |
| | | | | |
| | | Processes turns | Executes activities| |
| | | | | |
| +-------------+--------------------------------+--------------------+ |
| | | |
| +-------------v--------------------------------v--------------------+ |
| | OrchestrationRegistry: maps names -> orchestration handlers | |
| +-------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | ActivityRegistry: maps names -> activity handlers | |
| +-------------------------------------------------------------------+ |
| |
+-------------------------------------------------------------------------+
| |
| Fetches work items | Fetches work items
| (peek-lock) | (peek-lock)
v v
+-------------------------------------------------------------------------+
| Provider Layer |
+-------------------------------------------------------------------------+
| |
| +----------------------------+ +----------------------------+ |
| | Orchestrator Queue | | Worker Queue | |
| | - StartOrchestration | | - ActivityExecute | |
| | - ActivityCompleted | | | |
| | - ActivityFailed | | | |
| | - TimerFired (delayed) | | | |
| | - ExternalRaised | | | |
| | - ContinueAsNew | | | |
| +----------------------------+ +----------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | Provider (Storage) | |
| | - History (Events per instance/execution) | |
| | - Instance metadata | |
| | - Execution metadata | |
| | - Instance locks (peek-lock semantics) | |
| | - Queue management (enqueue/dequeue with visibility) | |
| +-------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------+ |
| | Storage Backend (SQLite, etc.) | |
| +-------------------------------------------------------------------+ |
| |
+-------------------------------------------------------------------------+
§Execution Flow
1. Client starts orchestration → enqueues StartOrchestration to orchestrator queue
2. OrchestrationDispatcher fetches work item (peek-lock), loads history from Provider
3. Runtime calls user's orchestration function with OrchestrationContext
4. Orchestration schedules activities/timers → Runtime appends Events to history
5. Runtime enqueues ActivityExecute to worker queue, TimerFired (delayed) to orchestrator queue
6. WorkDispatcher fetches activity work item, executes via ActivityRegistry
7. Activity completes → enqueues ActivityCompleted/ActivityFailed to orchestrator queue
8. OrchestrationDispatcher processes completion → next orchestration turn
9. Runtime atomically commits history + queue changes via ack_orchestration_item()
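The steps above can be traced with a small std-only simulation. Everything in it is a hypothetical stand-in: `PeekLockQueue`, the trimmed-down `WorkItem` enum, and the string history entries are invented for this sketch, and the real runtime runs these steps concurrently with atomic commits. It is meant only to show how items move between the two queues under peek-lock.

```rust
use std::collections::VecDeque;

/// Trimmed-down, hypothetical work items (the real enum has more variants).
#[derive(Debug, Clone, PartialEq)]
enum WorkItem {
    StartOrchestration { input: String },
    ActivityExecute { name: String, input: String },
    ActivityCompleted { result: String },
}

/// Toy peek-lock queue: `peek_lock` hides an item behind a token,
/// `ack` deletes it, `abandon` makes it visible again.
#[derive(Default)]
struct PeekLockQueue {
    visible: VecDeque<WorkItem>,
    locked: Vec<(u64, WorkItem)>,
    next_token: u64,
}

impl PeekLockQueue {
    fn enqueue(&mut self, item: WorkItem) {
        self.visible.push_back(item);
    }

    fn peek_lock(&mut self) -> Option<(u64, WorkItem)> {
        let item = self.visible.pop_front()?;
        self.next_token += 1;
        self.locked.push((self.next_token, item.clone()));
        Some((self.next_token, item))
    }

    fn ack(&mut self, token: u64) {
        self.locked.retain(|(t, _)| *t != token);
    }

    fn abandon(&mut self, token: u64) {
        if let Some(pos) = self.locked.iter().position(|(t, _)| *t == token) {
            let (_, item) = self.locked.remove(pos);
            self.visible.push_front(item);
        }
    }
}

/// Walks one "Greet" orchestration through both queues; returns the toy history.
fn run_flow(input: &str) -> Vec<String> {
    let mut orchestrator = PeekLockQueue::default();
    let mut worker = PeekLockQueue::default();
    let mut history = Vec::new();

    // Step 1: the client enqueues the start item.
    orchestrator.enqueue(WorkItem::StartOrchestration { input: input.to_string() });

    // Steps 2-5: first turn schedules an activity on the worker queue.
    let (token, item) = orchestrator.peek_lock().expect("start item");
    if let WorkItem::StartOrchestration { input } = item {
        history.push("OrchestrationStarted".to_string());
        history.push("ActivityScheduled".to_string());
        worker.enqueue(WorkItem::ActivityExecute { name: "Greet".to_string(), input });
    }
    orchestrator.ack(token); // Step 9 for this turn.

    // Steps 6-7: the worker executes the activity and reports completion.
    let (token, item) = worker.peek_lock().expect("activity item");
    if let WorkItem::ActivityExecute { name, input } = item {
        assert_eq!(name, "Greet");
        orchestrator.enqueue(WorkItem::ActivityCompleted { result: format!("Hello, {input}!") });
    }
    worker.ack(token);

    // Step 8: the completion drives the next (final) turn.
    let (token, item) = orchestrator.peek_lock().expect("completion item");
    if let WorkItem::ActivityCompleted { result } = item {
        history.push("ActivityCompleted".to_string());
        history.push(format!("OrchestrationCompleted({result})"));
    }
    orchestrator.ack(token);
    history
}

fn main() {
    println!("{:?}", run_flow("World"));
    // An abandoned item becomes visible again, so a crashed worker loses no work.
    let mut q = PeekLockQueue::default();
    q.enqueue(WorkItem::ActivityCompleted { result: "x".to_string() });
    let (token, _) = q.peek_lock().expect("item");
    q.abandon(token);
    println!("redelivered: {}", q.peek_lock().is_some());
}
```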
All operations are deterministic and replayable from history.
Re-exports§
pub use runtime::OrchestrationDescriptor;
pub use client::Client;
pub use client::ClientError;
pub use runtime::OrchestrationHandler;
pub use runtime::OrchestrationRegistry;
pub use runtime::OrchestrationRegistryBuilder;
pub use runtime::OrchestrationStatus;
pub use runtime::RuntimeOptions;
pub use providers::ExecutionInfo;
pub use providers::InstanceInfo;
pub use providers::ProviderAdmin;
pub use providers::QueueDepths;
pub use providers::ScheduledActivityIdentifier;
pub use providers::SystemMetrics;
pub use providers::DeleteInstanceResult;
pub use providers::InstanceFilter;
pub use providers::InstanceTree;
pub use providers::PruneOptions;
pub use providers::PruneResult;
Modules§
Structs§
- ActivityContext: Context provided to activities for logging and metadata access.
- DurableFuture: A wrapper around scheduled futures that supports cancellation on drop.
- Event: Unified event with common metadata and type-specific payload.
- OrchestrationContext: User-facing orchestration context for scheduling and replay-safe helpers.
- RetryPolicy: Retry policy for activities.
Enums§
- Action: Declarative decisions produced by an orchestration turn. The host/provider is responsible for materializing these into corresponding Events.
- AppErrorKind: Application error kinds.
- BackoffStrategy: Backoff strategy for computing delay between retry attempts.
- ConfigErrorKind: Configuration error kinds.
- Either2: Result type for select2 - represents which of two futures completed first.
- Either3: Result type for select3 - represents which of three futures completed first.
- ErrorDetails: Structured error details for orchestration failures.
- EventKind: Event-specific payloads.
- LogLevel: Log levels for orchestration context logging.
- PoisonMessageType: Poison message type identification.
- ScheduleKind: Identifies the kind of scheduled work for cancellation purposes.
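For the BackoffStrategy entry in the list above, the Retry Pattern comment (5 attempts with delays of 1s, 2s, 3s, 4s for base 1s and max 10s) suggests a simple linear rule. The sketch below reproduces that schedule under an assumed formula, delay = min(base × retry number, max). The function names are hypothetical, and the crate's actual computation may differ.

```rust
use std::time::Duration;

/// Assumed rule: the delay before retry `n` (1-based) is `base * n`, capped at `max`.
fn linear_delay(base: Duration, max: Duration, retry: u32) -> Duration {
    base.saturating_mul(retry).min(max)
}

/// Delays between attempts: `max_attempts` attempts leave `max_attempts - 1` gaps.
fn retry_delays(max_attempts: u32, base: Duration, max: Duration) -> Vec<Duration> {
    (1..max_attempts)
        .map(|retry| linear_delay(base, max, retry))
        .collect()
}

fn main() {
    let delays = retry_delays(5, Duration::from_secs(1), Duration::from_secs(10));
    println!("{delays:?}");
}
```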
Constants§
- INITIAL_EVENT_ID: Initial event ID for new executions. The first event (OrchestrationStarted) always has event_id = 1.
- INITIAL_EXECUTION_ID: Initial execution ID for new orchestration instances. All orchestrations start with execution_id = 1.
- SUB_ORCH_AUTO_PREFIX: Prefix for auto-generated sub-orchestration instance IDs. IDs starting with this prefix will have parent prefix added: {parent}::{sub::N}
Functions§
- build_child_instance_id: Build the full child instance ID, adding parent prefix only for auto-generated IDs.
- is_auto_generated_sub_orch_id: Determine if a sub-orchestration instance ID is auto-generated (needs parent prefix).
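The naming rule described by these two functions and by SUB_ORCH_AUTO_PREFIX can be sketched as follows. This is an independent illustration, not the crate's code: the prefix value `"sub::"` is an assumption inferred from the `{parent}::{sub::N}` pattern, and `build_child_id` and `is_auto_generated` are hypothetical names.

```rust
/// Assumed value; the real SUB_ORCH_AUTO_PREFIX constant is not shown on this page.
const ASSUMED_AUTO_PREFIX: &str = "sub::";

/// Auto-generated IDs are recognized by their prefix.
fn is_auto_generated(child_id: &str) -> bool {
    child_id.starts_with(ASSUMED_AUTO_PREFIX)
}

/// Auto-generated IDs get the parent prefix; explicit IDs are used as given.
fn build_child_id(parent: &str, child_id: &str) -> String {
    if is_auto_generated(child_id) {
        format!("{parent}::{child_id}")
    } else {
        child_id.to_string()
    }
}

fn main() {
    println!("{}", build_child_id("order-42", "sub::3"));
    println!("{}", build_child_id("order-42", "payment-7"));
}
```

Prefixing only the auto-generated IDs keeps them unique per parent while leaving caller-chosen IDs addressable exactly as written.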
Type Aliases§
- OrchestrationHandlerRef: Shared reference to an OrchestrationHandler
- ProviderRef: Shared reference to a Provider implementation