Crate duroxide

§Duroxide: Durable execution framework in Rust

Duroxide is a framework for building reliable, long-running, code-based workflows that can survive failures and restarts. For a deep dive into how durable execution works, see the Durable Futures Internals documentation.

§Quick Start

use duroxide::providers::sqlite::SqliteProvider;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime;
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
use std::sync::Arc;

// 1. Create a storage provider
let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await.unwrap());

// 2. Register activities (your business logic)
let activities = ActivityRegistry::builder()
    .register("Greet", |_ctx: ActivityContext, name: String| async move {
        Ok(format!("Hello, {}!", name))
    })
    .build();

// 3. Define your orchestration
let orchestration = |ctx: OrchestrationContext, name: String| async move {
    let greeting = ctx.schedule_activity("Greet", name)
        .into_activity().await?;
    Ok(greeting)
};

// 4. Register and start the runtime
let orchestrations = OrchestrationRegistry::builder()
    .register("HelloWorld", orchestration)
    .build();

let rt = runtime::Runtime::start_with_store(
    store.clone(), Arc::new(activities), orchestrations
).await;

// 5. Create a client and start an orchestration instance
let client = Client::new(store.clone());
client.start_orchestration("inst-1", "HelloWorld", "World").await?;
let result = client.wait_for_orchestration("inst-1", std::time::Duration::from_secs(5)).await
    .map_err(|e| format!("Wait error: {:?}", e))?;

§Key Concepts

  • Orchestrations: Long-running workflows written as async functions (coordination logic)
  • Activities: Single-purpose work units (can do anything: DB, API, polling, etc.)
    • Supports long-running activities via automatic lock renewal (minutes to hours)
  • Timers: Use ctx.schedule_timer(duration), which takes a std::time::Duration, for orchestration-level delays and timeouts
  • Deterministic Replay: Orchestrations are replayed from history to ensure consistency
  • Durable Futures: Composable futures for activities, timers, and external events
  • ContinueAsNew (Multi-Execution): An orchestration can end the current execution and immediately start a new one with fresh input. Each execution has its own isolated history that starts with OrchestrationStarted { event_id: 1 }.
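Deterministic replay is the core idea behind these concepts. The standalone sketch below (plain Rust, no duroxide types; `ReplayCtx`, `call_activity`, and `greet_twice` are hypothetical names) illustrates it under a simplifying assumption: each scheduled call is matched to recorded history by position. A recorded result is returned without re-running the side effect, a call not yet recorded runs once and is appended, and a call that differs from the recorded one is treated as nondeterminism.

```rust
/// Hypothetical replay context for this sketch (not duroxide's OrchestrationContext).
struct ReplayCtx {
    history: Vec<(String, String)>, // (activity name, result) recorded earlier, in scheduling order
    cursor: usize,                  // position of the next scheduled call
    executed: Vec<String>,          // activities whose side effects actually ran in this pass
}

impl ReplayCtx {
    fn new(history: Vec<(String, String)>) -> Self {
        ReplayCtx { history, cursor: 0, executed: Vec::new() }
    }

    /// Returns the recorded result when replaying; otherwise runs `work` and records it.
    fn call_activity(&mut self, name: &str, work: impl FnOnce() -> String) -> String {
        let result = match self.history.get(self.cursor).cloned() {
            Some((recorded_name, recorded_result)) => {
                // Replay must schedule the same call at the same position.
                assert_eq!(recorded_name, name, "nondeterminism at position {}", self.cursor);
                recorded_result
            }
            None => {
                let fresh = work(); // first execution: run the side effect once
                self.executed.push(name.to_string());
                self.history.push((name.to_string(), fresh.clone()));
                fresh
            }
        };
        self.cursor += 1;
        result
    }
}

/// A deterministic "orchestration": same history in, same calls in the same order.
fn greet_twice(ctx: &mut ReplayCtx) -> String {
    let a = ctx.call_activity("Greet", || "Hello, A!".to_string());
    let b = ctx.call_activity("Greet", || "Hello, B!".to_string());
    format!("{a} {b}")
}

fn main() {
    // First run: empty history, so both activities execute.
    let mut first = ReplayCtx::new(Vec::new());
    let out = greet_twice(&mut first);

    // Replay (e.g., after a restart): same history, no side effect runs again.
    let mut replay = ReplayCtx::new(first.history.clone());
    assert_eq!(greet_twice(&mut replay), out);
    println!("{out} (re-executed on replay: {})", replay.executed.len());
}
```

The position-based matching is a simplification; it is only meant to show why orchestration code must schedule work in the same order on every run.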

§⚠️ Important: Orchestrations vs Activities

Orchestrations = Coordination (control flow, business logic)
Activities = Execution (single-purpose work units)

// ✅ CORRECT: Orchestration-level delay using timer
ctx.schedule_timer(Duration::from_secs(5)).into_timer().await;  // Wait 5 seconds

// ✅ ALSO CORRECT: Activity can poll/sleep as part of its work
// Example: Activity that provisions a VM and polls for readiness
// activities.register("ProvisionVM", |_ctx: ActivityContext, config: String| async move {
//     let vm = create_vm(config).await?;
//     while !vm_ready(&vm).await {
//         tokio::time::sleep(Duration::from_secs(5)).await;  // ✅ OK - part of provisioning
//     }
//     Ok(vm.id)
// });

// ❌ WRONG: Activity that ONLY sleeps (use timer instead)
// ctx.schedule_activity("Sleep5Seconds", "").into_activity().await;

Put in Activities (single-purpose execution units):

  • Database operations
  • API calls (can include retries/polling)
  • Data transformations
  • File I/O
  • VM provisioning (with internal polling)

Put in Orchestrations (coordination and business logic):

  • Control flow (if/else, match, loops)
  • Business decisions
  • Multi-step workflows
  • Error handling and compensation
  • Timeouts and deadlines (use timers)
  • Waiting for external events

§ContinueAsNew (Multi-Execution) Semantics

ContinueAsNew (CAN) allows an orchestration to end its current execution and start a new one with fresh input (useful for loops, pagination, long-running workflows).

  • Orchestration calls ctx.continue_as_new(new_input)
  • Runtime stamps OrchestrationContinuedAsNew in the CURRENT execution’s history
  • Runtime enqueues a WorkItem::ContinueAsNew
  • When processing that work item, the runtime starts a NEW execution with:
    • execution_id = previous_execution_id + 1
    • existing_history = [] (fresh history)
    • OrchestrationStarted { event_id: 1, input: new_input } is stamped automatically
  • Each execution’s history is independent; duroxide::Client::read_execution_history(instance, id) returns events for that execution only

Provider responsibilities are strictly storage-level (see below). The runtime owns all orchestration semantics, including execution boundaries and starting the new execution.
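A minimal in-memory model of these rules, assuming a simplified `Ev` event type and a hypothetical `Instance` struct (neither is duroxide's API). It collapses stamping `OrchestrationContinuedAsNew`, enqueuing the work item, and starting the next execution into one `continue_as_new` call, so each execution's history can be inspected in isolation.

```rust
/// Simplified events for this sketch; duroxide's real `Event` type carries more metadata.
#[derive(Debug, Clone, PartialEq)]
enum Ev {
    OrchestrationStarted { event_id: u64, input: String },
    OrchestrationContinuedAsNew { event_id: u64, input: String },
}

/// Hypothetical in-memory instance: one independent history per execution.
struct Instance {
    executions: Vec<Vec<Ev>>, // executions[0] is execution_id 1
}

impl Instance {
    fn start(input: &str) -> Self {
        let first = Ev::OrchestrationStarted { event_id: 1, input: input.to_string() };
        Instance { executions: vec![vec![first]] }
    }

    fn current_execution_id(&self) -> u64 {
        self.executions.len() as u64
    }

    /// Stamp OrchestrationContinuedAsNew in the current execution, then open a new one.
    fn continue_as_new(&mut self, new_input: &str) {
        let current = self.executions.last_mut().expect("instance has an execution");
        let event_id = current.len() as u64 + 1;
        current.push(Ev::OrchestrationContinuedAsNew { event_id, input: new_input.to_string() });
        // New execution: execution_id = previous + 1, fresh history starting at event_id 1.
        let started = Ev::OrchestrationStarted { event_id: 1, input: new_input.to_string() };
        self.executions.push(vec![started]);
    }

    /// Like read_execution_history(instance, id): events of one execution only.
    fn read_execution_history(&self, execution_id: u64) -> &[Ev] {
        &self.executions[(execution_id - 1) as usize]
    }
}

fn main() {
    let mut inst = Instance::start("page=0");
    inst.continue_as_new("page=1");
    inst.continue_as_new("page=2");
    for id in 1..=inst.current_execution_id() {
        println!("execution {id}: {:?}", inst.read_execution_history(id));
    }
}
```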

§Provider Responsibilities (At a Glance)

Providers are pure storage abstractions. The runtime computes orchestration semantics and passes explicit instructions to the provider.

  • fetch_orchestration_item()

    • Return a locked batch of work for ONE instance
    • Include full history for the CURRENT execution_id
    • Do NOT create/synthesize new executions here (even for ContinueAsNew)
  • ack_orchestration_item(lock_token, execution_id, history_delta, ..., metadata)

    • Atomic commit of one orchestration turn
    • Idempotently INSERT OR IGNORE execution row for the explicit execution_id
    • UPDATE instances.current_execution_id = MAX(current_execution_id, execution_id)
    • Append history_delta to the specified execution
    • Update executions.status and executions.output from metadata (no event inspection)
  • Worker/Timer queues

    • Peek-lock semantics (dequeue with lock token; ack by deleting)
    • Automatic lock renewal for long-running activities (no configuration needed)
    • Orchestrator, Worker, Timer queues are independent but committed atomically with history
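The `ack_orchestration_item` bullets read as a short list of table updates. Here is a sketch over hypothetical in-memory tables (`Store`, `Execution`, `Metadata`, and `meta` are illustrative names; events are plain strings; the lock token and the parameters elided by `...` above are left out). A real provider performs all of these steps in a single transaction.

```rust
use std::collections::BTreeMap;

/// Hypothetical execution row: history plus runtime-supplied status/output.
#[derive(Debug, Default)]
struct Execution {
    history: Vec<String>, // events simplified to their names
    status: String,
    output: Option<String>,
}

/// Hypothetical metadata computed by the runtime; the provider copies it verbatim.
struct Metadata {
    status: String,
    output: Option<String>,
}

/// Hypothetical in-memory tables for one instance.
#[derive(Default)]
struct Store {
    current_execution_id: u64,            // instances.current_execution_id
    executions: BTreeMap<u64, Execution>, // executions, keyed by execution_id
}

impl Store {
    /// One orchestration turn; a real provider runs these steps atomically.
    fn ack_orchestration_item(&mut self, execution_id: u64, history_delta: Vec<String>, metadata: Metadata) {
        // INSERT OR IGNORE: create the row only if it does not exist yet.
        let exec = self.executions.entry(execution_id).or_default();
        // Append the delta to the explicitly named execution.
        exec.history.extend(history_delta);
        // Status and output come from metadata; events are never inspected.
        exec.status = metadata.status;
        exec.output = metadata.output;
        // current_execution_id = MAX(current_execution_id, execution_id)
        self.current_execution_id = self.current_execution_id.max(execution_id);
    }
}

fn meta(status: &str, output: Option<&str>) -> Metadata {
    Metadata { status: status.to_string(), output: output.map(str::to_string) }
}

fn main() {
    let mut store = Store::default();
    store.ack_orchestration_item(1, vec!["OrchestrationStarted".into()], meta("Running", None));
    store.ack_orchestration_item(1, vec!["OrchestrationContinuedAsNew".into()], meta("ContinuedAsNew", None));
    store.ack_orchestration_item(2, vec!["OrchestrationStarted".into()], meta("Running", None));
    store.ack_orchestration_item(2, vec!["OrchestrationCompleted".into()], meta("Completed", Some("done")));
    println!("current = {}, executions = {:?}", store.current_execution_id, store.executions);
}
```

Taking the MAX keeps `current_execution_id` monotonic even if an ack for an older execution is processed late.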

See docs/provider-implementation-guide.md and src/providers/sqlite.rs for a complete, production-grade provider implementation.
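Peek-lock is what lets a crashed worker's message be redelivered instead of lost. The following `PeekLockQueue` is a hypothetical in-memory sketch, not duroxide's provider interface: dequeuing hides a message behind a lock token, acking deletes it, and abandoning (standing in here for a crash or an expired lock) makes it visible again. Lock renewal is not modeled.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory peek-lock queue (not duroxide's provider interface).
struct PeekLockQueue {
    visible: VecDeque<String>,  // messages consumers can dequeue
    locked: Vec<(u64, String)>, // (lock token, message) currently held by a consumer
    next_token: u64,
}

impl PeekLockQueue {
    fn new() -> Self {
        PeekLockQueue { visible: VecDeque::new(), locked: Vec::new(), next_token: 0 }
    }

    fn enqueue(&mut self, msg: &str) {
        self.visible.push_back(msg.to_string());
    }

    /// Dequeue = lock: the message is hidden behind a token but not deleted.
    fn dequeue(&mut self) -> Option<(u64, String)> {
        let msg = self.visible.pop_front()?;
        self.next_token += 1;
        self.locked.push((self.next_token, msg.clone()));
        Some((self.next_token, msg))
    }

    /// Ack = delete: the message is gone for good. Returns false for unknown tokens.
    fn ack(&mut self, token: u64) -> bool {
        let before = self.locked.len();
        self.locked.retain(|(t, _)| *t != token);
        self.locked.len() < before
    }

    /// Abandon (a crash or expired lock in this sketch): make the message visible again.
    fn abandon(&mut self, token: u64) -> bool {
        match self.locked.iter().position(|(t, _)| *t == token) {
            Some(i) => {
                let (_, msg) = self.locked.remove(i);
                self.visible.push_front(msg);
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut q = PeekLockQueue::new();
    q.enqueue("ActivityExecute");
    let (token, msg) = q.dequeue().expect("message available");
    q.abandon(token); // worker crashed before finishing
    let (retry_token, retry_msg) = q.dequeue().expect("redelivered");
    assert_eq!(msg, retry_msg);
    q.ack(retry_token); // work done: delete it
    println!("visible: {}, locked: {}", q.visible.len(), q.locked.len());
}
```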

§⚠️ Critical: DurableFuture Conversion Pattern

All schedule methods return a DurableFuture, which you MUST convert with the matching .into_*() method before awaiting:

// ✅ CORRECT patterns:
let result = ctx.schedule_activity("Task", "input").into_activity().await?;
ctx.schedule_timer(Duration::from_secs(5)).into_timer().await;
let event = ctx.schedule_wait("Event").into_event().await;
let sub_result = ctx.schedule_sub_orchestration("Sub", "input").into_sub_orchestration().await?;

// ❌ WRONG: missing the .into_*() conversion
// let result = ctx.schedule_activity("Task", "input").await;  // Missing .into_activity()!
// ctx.schedule_timer(Duration::from_secs(5)).await;           // Missing .into_timer()!
// let event = ctx.schedule_wait("Event").await;               // Missing .into_event()!

Why this pattern? DurableFuture is a unified type that can represent any durable operation (activity, timer, external event, or sub-orchestration). The .into_*() methods convert it to the specific awaitable type you need.
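To illustrate why a single unified type is convenient, here is a synchronous toy built on a hypothetical `Unified` enum (not duroxide's `DurableFuture`, which is awaited rather than unwrapped). Heterogeneous operations share one type, so they can sit in one collection for join/select, and each `into_*` method recovers the typed result. What duroxide does on a mismatched conversion is not stated here; this sketch simply panics.

```rust
/// Hypothetical stand-in for a unified durable operation (not duroxide's DurableFuture).
#[derive(Debug)]
enum Unified {
    Activity(Result<String, String>),
    Timer,
    External(String),
}

impl Unified {
    /// Typed view for activities: yields the activity's Result directly.
    fn into_activity(self) -> Result<String, String> {
        match self {
            Unified::Activity(r) => r,
            other => panic!("into_activity called on {other:?}"),
        }
    }

    /// Typed view for timers: nothing to return, only completion.
    fn into_timer(self) {
        match self {
            Unified::Timer => {}
            other => panic!("into_timer called on {other:?}"),
        }
    }

    /// Typed view for external events: yields the event payload.
    fn into_event(self) -> String {
        match self {
            Unified::External(data) => data,
            other => panic!("into_event called on {other:?}"),
        }
    }
}

/// Because every operation shares one type, mixed operations fit in a single Vec.
fn mixed() -> Vec<Unified> {
    vec![
        Unified::Activity(Ok("done".into())),
        Unified::Timer,
        Unified::External("approved".into()),
    ]
}

fn main() {
    let mut ops = mixed().into_iter();
    let value = ops.next().unwrap().into_activity();
    ops.next().unwrap().into_timer();
    let event = ops.next().unwrap().into_event();
    println!("{value:?} {event}");
}
```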

§Common Patterns

§Function Chaining

async fn chain_example(ctx: OrchestrationContext) -> Result<String, String> {
    let step1 = ctx.schedule_activity("Step1", "input").into_activity().await?;
    let step2 = ctx.schedule_activity("Step2", &step1).into_activity().await?;
    Ok(step2)
}

§Fan-Out/Fan-In

async fn fanout_example(ctx: OrchestrationContext) -> Vec<String> {
    let futures = vec![
        ctx.schedule_activity("Process", "item1"),
        ctx.schedule_activity("Process", "item2"),
        ctx.schedule_activity("Process", "item3"),
    ];
    let results = ctx.join(futures).await;
    results.into_iter().map(|r| match r {
        DurableOutput::Activity(Ok(s)) => s,
        _ => "error".to_string(),
    }).collect()
}
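The fan-out examples read `ctx.join` results positionally, and this document describes the order as deterministic: results come back in the order the futures were scheduled, not the order in which they finished. A minimal sketch of that ordering rule, assuming each completion carries the index of the call it answers (`Completion` and `join_in_schedule_order` are hypothetical names):

```rust
/// Hypothetical completion record: which scheduled call (by index) finished, and how.
struct Completion {
    scheduled_index: usize,
    result: Result<String, String>,
}

/// Join sketch: results come back in scheduling order, whatever order completions arrived in.
fn join_in_schedule_order(n: usize, arrived: Vec<Completion>) -> Vec<Result<String, String>> {
    let mut slots: Vec<Option<Result<String, String>>> = vec![None; n];
    for c in arrived {
        slots[c.scheduled_index] = Some(c.result);
    }
    slots
        .into_iter()
        .map(|s| s.expect("join only resolves once every call has completed"))
        .collect()
}

fn main() {
    // Completions arrive out of order: item3, item1, then a failure for item2.
    let arrived = vec![
        Completion { scheduled_index: 2, result: Ok("item3 done".into()) },
        Completion { scheduled_index: 0, result: Ok("item1 done".into()) },
        Completion { scheduled_index: 1, result: Err("item2 failed".into()) },
    ];
    // Same mapping as the fan-out example: failures become "error".
    let outputs: Vec<String> = join_in_schedule_order(3, arrived)
        .into_iter()
        .map(|r| r.unwrap_or_else(|_| "error".to_string()))
        .collect();
    println!("{outputs:?}");
}
```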

§Human-in-the-Loop

async fn approval_example(ctx: OrchestrationContext) -> String {
    let timer = ctx.schedule_timer(Duration::from_secs(30)); // 30 second timeout
    let approval = ctx.schedule_wait("ApprovalEvent");
     
    let (_, result) = ctx.select2(timer, approval).await;
    match result {
        DurableOutput::External(data) => data,
        DurableOutput::Timer => "timeout".to_string(),
        _ => "error".to_string(),
    }
}
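For select2 to replay deterministically, the winner has to be decided by which completion appears first in history, not by wall-clock timing; a loser that completes later cannot change the outcome. A hypothetical sketch of that rule over a simplified history of `(correlation id, payload)` pairs (the ids, `Record`, and the `select2` helper are illustrative, not duroxide internals):

```rust
/// Hypothetical history record: (correlation id of the scheduled operation, payload).
type Record = (u64, String);

/// select2 sketch: scan history in recorded order; the first completion belonging to
/// either raced operation wins. Returns (winner_index, payload): index 0 is `first_id`,
/// 1 is `second_id`. None means neither has completed yet, so the orchestration waits.
fn select2(first_id: u64, second_id: u64, history: &[Record]) -> Option<(usize, String)> {
    history.iter().find_map(|(id, payload)| {
        if *id == first_id {
            Some((0, payload.clone()))
        } else if *id == second_id {
            Some((1, payload.clone()))
        } else {
            None // unrelated event
        }
    })
}

fn main() {
    // Timer scheduled with correlation id 1, approval wait with id 2.
    let history: Vec<Record> = vec![(2, "approved".to_string()), (1, "timer fired".to_string())];
    match select2(1, 2, &history) {
        Some((1, data)) => println!("approval won: {data}"),
        Some(_) => println!("timeout"),
        None => println!("still waiting"),
    }
}
```

The same rule covers the timeout example: racing work (index 0) against a timer (index 1) is just select2 with the arguments swapped.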

§Delays and Timeouts

async fn delay_example(ctx: OrchestrationContext) -> Result<String, String> {
    // ✅ CORRECT: Use timer for orchestration-level delays
    ctx.schedule_timer(Duration::from_secs(5)).into_timer().await;
     
    // Process after delay
    let result = ctx.schedule_activity("ProcessData", "input")
        .into_activity().await?;
    Ok(result)
}

async fn timeout_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Race work against timeout
    let work = ctx.schedule_activity("SlowOperation", "input");
    let timeout = ctx.schedule_timer(Duration::from_secs(5));
     
    let (winner_index, result) = ctx.select2(work, timeout).await;
    match winner_index {
        0 => match result {
            DurableOutput::Activity(Ok(value)) => Ok(value),
            DurableOutput::Activity(Err(e)) => Err(format!("Work failed: {e}")),
            _ => unreachable!(),
        },
        1 => Err("Operation timed out".to_string()),
        _ => unreachable!(),
    }
}

§Fan-Out/Fan-In with Error Handling

async fn fanout_with_errors(ctx: OrchestrationContext, items: Vec<String>) -> Result<Vec<String>, String> {
    // Schedule all work in parallel
    let futures: Vec<_> = items.iter()
        .map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
        .collect();
     
    // Wait for all to complete (deterministic order preserved)
    let results = ctx.join(futures).await;
     
    // Process results with error handling
    let mut successes = Vec::new();
    for result in results {
        match result {
            DurableOutput::Activity(Ok(value)) => successes.push(value),
            DurableOutput::Activity(Err(e)) => {
                // Log error but continue processing other items
                ctx.trace_error(format!("Item processing failed: {e}"));
            }
            _ => return Err("Unexpected result type".to_string()),
        }
    }
     
    Ok(successes)
}

§Retry Pattern

async fn retry_example(ctx: OrchestrationContext) -> Result<String, String> {
    // Retry with linear backoff: 5 attempts, delay increases linearly (1s, 2s, 3s, 4s)
    let result = ctx.schedule_activity_with_retry(
        "UnreliableOperation",
        "input",
        RetryPolicy::new(5)
            .with_backoff(BackoffStrategy::Linear {
                base: Duration::from_secs(1),
                max: Duration::from_secs(10),
            }),
    ).await?;
     
    Ok(result)
}
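The comment in the retry example implies a simple schedule: 5 attempts leave 4 gaps, and each gap grows by `base` up to `max`. Here is a standalone sketch of that arithmetic, assuming the delay before retry n is `base * n` capped at `max` (duroxide's exact Linear formula is not given here). `retry_with_linear_backoff` is a hypothetical helper that records the delays instead of waiting.

```rust
use std::time::Duration;

/// Runs `op` up to `max_attempts` times (attempts numbered from 1) and returns the first
/// Ok or the last Err, plus the delays that would be waited between attempts.
fn retry_with_linear_backoff<F>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: F,
) -> (Result<String, String>, Vec<Duration>)
where
    F: FnMut(u32) -> Result<String, String>,
{
    let mut waited = Vec::new();
    let mut last_err = String::from("no attempts made");
    for attempt in 1..=max_attempts {
        match op(attempt) {
            Ok(v) => return (Ok(v), waited),
            Err(e) => {
                last_err = e;
                // No delay after the final attempt.
                if attempt < max_attempts {
                    waited.push((base * attempt).min(max)); // linear growth, capped
                }
            }
        }
    }
    (Err(last_err), waited)
}

fn main() {
    let (result, waited) =
        retry_with_linear_backoff(5, Duration::from_secs(1), Duration::from_secs(10), |attempt| {
            if attempt < 3 { Err(format!("attempt {attempt} failed")) } else { Ok("ok".to_string()) }
        });
    println!("{result:?} after waiting {waited:?}");
}
```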

§Examples

See the examples/ directory for complete, runnable examples:

  • hello_world.rs - Basic orchestration setup
  • fan_out_fan_in.rs - Parallel processing pattern with error handling
  • timers_and_events.rs - Human-in-the-loop workflows with timeouts
  • delays_and_timeouts.rs - Correct usage of timers for delays and timeouts
  • with_observability.rs - Using observability features (tracing, metrics)
  • metrics_cli.rs - Querying system metrics via CLI

Run examples with: cargo run --example <name>

§Architecture

This crate provides:

  • Public data model: Event, Action for history and decisions
  • Orchestration driver: run_turn, run_turn_with, and Executor
  • OrchestrationContext: Schedule activities, timers, and external events
  • DurableFuture: Unified futures that can be composed with join/select
  • Runtime: In-process execution engine with dispatchers and workers
  • Providers: Pluggable storage backends (e.g., SQLite via providers::sqlite)

§End-to-End System Architecture

+-------------------------------------------------------------------------+
|                           Application Layer                             |
+-------------------------------------------------------------------------+
|                                                                         |
|  +--------------+         +------------------------------------+        |
|  |    Client    |-------->|  start_orchestration()             |        |
|  |              |         |  raise_event()                     |        |
|  |              |         |  wait_for_orchestration()          |        |
|  +--------------+         +------------------------------------+        |
|                                                                         |
+-------------------------------------------------------------------------+
                                   |
                                   v
+-------------------------------------------------------------------------+
|                            Runtime Layer                                |
+-------------------------------------------------------------------------+
|                                                                         |
|  +-------------------------------------------------------------------+  |
|  |                         Runtime                                   |  |
|  |  +----------------------+         +----------------------+        |  |
|  |  | Orchestration        |         | Work                 |        |  |
|  |  | Dispatcher           |         | Dispatcher           |        |  |
|  |  | (N concurrent)       |         | (N concurrent)       |        |  |
|  |  +----------+-----------+         +----------+-----------+        |  |
|  |             |                                |                    |  |
|  |             | Processes turns                | Executes activities|  |
|  |             |                                |                    |  |
|  +-------------+--------------------------------+--------------------+  |
|                |                                |                       |
|  +-------------v--------------------------------v--------------------+  |
|  |  OrchestrationRegistry: maps names -> orchestration handlers      |  |
|  +-------------------------------------------------------------------+  |
|                                                                         |
|  +-------------------------------------------------------------------+  |
|  |  ActivityRegistry: maps names -> activity handlers                |  |
|  +-------------------------------------------------------------------+  |
|                                                                         |
+-------------------------------------------------------------------------+
               |                                |
               | Fetches work items             | Fetches work items
               | (peek-lock)                    | (peek-lock)
               v                                v
+-------------------------------------------------------------------------+
|                          Provider Layer                                 |
+-------------------------------------------------------------------------+
|                                                                         |
|  +----------------------------+    +----------------------------+       |
|  |  Orchestrator Queue        |    |  Worker Queue              |       |
|  |  - StartOrchestration      |    |  - ActivityExecute         |       |
|  |  - ActivityCompleted       |    |                            |       |
|  |  - ActivityFailed          |    |                            |       |
|  |  - TimerFired (delayed)    |    |                            |       |
|  |  - ExternalRaised          |    |                            |       |
|  |  - ContinueAsNew           |    |                            |       |
|  +----------------------------+    +----------------------------+       |
|                                                                         |
|  +-------------------------------------------------------------------+  |
|  |                     Provider (Storage)                            |  |
|  |  - History (Events per instance/execution)                        |  |
|  |  - Instance metadata                                              |  |
|  |  - Execution metadata                                             |  |
|  |  - Instance locks (peek-lock semantics)                           |  |
|  |  - Queue management (enqueue/dequeue with visibility)             |  |
|  +-------------------------------------------------------------------+  |
|                                                                         |
|  +-------------------------------------------------------------------+  |
|  |                Storage Backend (SQLite, etc.)                     |  |
|  +-------------------------------------------------------------------+  |
|                                                                         |
+-------------------------------------------------------------------------+

§Execution Flow

1. Client starts orchestration → enqueues StartOrchestration to orchestrator queue
2. OrchestrationDispatcher fetches work item (peek-lock), loads history from Provider
3. Runtime calls user's orchestration function with OrchestrationContext
4. Orchestration schedules activities/timers → Runtime appends Events to history
5. Runtime enqueues ActivityExecute to worker queue, TimerFired (delayed) to orchestrator queue
6. WorkDispatcher fetches activity work item, executes via ActivityRegistry
7. Activity completes → enqueues ActivityCompleted/ActivityFailed to orchestrator queue
8. OrchestrationDispatcher processes completion → next orchestration turn
9. At the end of every turn, Runtime atomically commits history + queue changes via ack_orchestration_item()

Orchestration turns are deterministic and replayable from history; activities perform the side effects, and their outcomes enter history as ActivityCompleted/ActivityFailed events.
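The nine steps can be traced with a single-threaded toy. All names here are hypothetical: `WorkItem` variants borrow the queue-entry names from the diagram, while event names such as "ActivityScheduled" are made up, and payloads and timers are simplified away. Each orchestration turn applies its history and queue changes together at the end, standing in for the atomic ack_orchestration_item() commit; a real runtime runs concurrent dispatchers against a durable provider.

```rust
use std::collections::VecDeque;

/// Hypothetical work items, named after the queue entries in the diagram.
#[derive(Debug, Clone, PartialEq)]
enum WorkItem {
    StartOrchestration { input: String },
    ActivityExecute { name: String, input: String },
    ActivityCompleted { result: String },
}

#[derive(Default)]
struct System {
    orchestrator_queue: VecDeque<WorkItem>,
    worker_queue: VecDeque<WorkItem>,
    history: Vec<String>, // event names only
    output: Option<String>,
}

impl System {
    /// Step 1: the client enqueues StartOrchestration.
    fn start(&mut self, input: &str) {
        self.orchestrator_queue.push_back(WorkItem::StartOrchestration { input: input.to_string() });
    }

    /// Steps 2-5 and 8-9: one orchestration turn, committed as a unit at the end.
    fn orchestration_turn(&mut self) -> bool {
        let Some(item) = self.orchestrator_queue.pop_front() else { return false };
        let (events, new_work, output) = match item {
            WorkItem::StartOrchestration { input } => (
                vec!["OrchestrationStarted", "ActivityScheduled"],
                vec![WorkItem::ActivityExecute { name: "Greet".to_string(), input }],
                None,
            ),
            WorkItem::ActivityCompleted { result } => (
                vec!["ActivityCompleted", "OrchestrationCompleted"],
                Vec::new(),
                Some(result),
            ),
            other => panic!("unexpected item on the orchestrator queue: {other:?}"),
        };
        // "Commit": history and queue changes from this turn land together.
        self.history.extend(events.into_iter().map(String::from));
        self.worker_queue.extend(new_work);
        if output.is_some() {
            self.output = output;
        }
        true
    }

    /// Steps 6-7: a worker executes the activity and reports completion.
    fn worker_step(&mut self) -> bool {
        let Some(item) = self.worker_queue.pop_front() else { return false };
        let result = match item {
            WorkItem::ActivityExecute { name, input } if name == "Greet" => format!("Hello, {input}!"),
            other => panic!("no handler for {other:?}"),
        };
        self.orchestrator_queue.push_back(WorkItem::ActivityCompleted { result });
        true
    }
}

/// Alternates the two dispatchers until both queues are empty.
fn run_to_completion(input: &str) -> System {
    let mut sys = System::default();
    sys.start(input);
    // `|` (not `||`) so both dispatchers get a turn on every pass.
    while sys.orchestration_turn() | sys.worker_step() {}
    sys
}

fn main() {
    let sys = run_to_completion("World");
    println!("output = {:?}", sys.output);
    println!("history = {:?}", sys.history);
}
```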

§Re-exports

pub use runtime::OrchestrationDescriptor;
pub use client::Client;
pub use client::ClientError;
pub use runtime::OrchestrationHandler;
pub use runtime::OrchestrationRegistry;
pub use runtime::OrchestrationRegistryBuilder;
pub use runtime::OrchestrationStatus;
pub use runtime::RuntimeOptions;
pub use providers::ExecutionInfo;
pub use providers::InstanceInfo;
pub use providers::ProviderAdmin;
pub use providers::QueueDepths;
pub use providers::SystemMetrics;
pub use crate::futures::DurableFuture;
pub use crate::futures::DurableOutput;
pub use crate::futures::JoinFuture;
pub use crate::futures::SelectFuture;

§Modules

client
futures
providers
runtime

§Structs

ActivityContext
Context provided to activities for logging and metadata access.
Event
Unified event with common metadata and type-specific payload.
OrchestrationContext
User-facing orchestration context for scheduling and replay-safe helpers.
RetryPolicy
Retry policy for activities.

§Enums

Action
Declarative decisions produced by an orchestration turn. The host runtime is responsible for materializing these into corresponding Events.
AppErrorKind
Application error kinds.
BackoffStrategy
Backoff strategy for computing delay between retry attempts.
ConfigErrorKind
Configuration error kinds.
ErrorDetails
Structured error details for orchestration failures.
EventKind
Event-specific payloads.
LogLevel
Log levels for orchestration context logging.
PoisonMessageType
Poison message type identification.

§Constants

INITIAL_EVENT_ID
Initial event ID for new executions. The first event (OrchestrationStarted) always has event_id = 1.
INITIAL_EXECUTION_ID
Initial execution ID for new orchestration instances. All orchestrations start with execution_id = 1.

§Functions

run_turn
Simple run_turn for tests. Uses default execution_id=1 and placeholder instance metadata.
run_turn_with
Execute one orchestration turn with explicit execution_id. This is the full-featured run_turn implementation used by the runtime.
run_turn_with_status
Execute one orchestration turn and also return any nondeterminism flagged by futures. This does not change the deterministic behavior of the orchestrator; it only surfaces CtxInner.nondeterminism_error that futures may set during scheduling order checks.

§Type Aliases

OrchestrationHandlerRef
Shared reference to an OrchestrationHandler
ProviderRef
Shared reference to a Provider implementation
TurnResult
Poll the orchestrator once with the provided history, producing updated history, requested Actions, and an optional output.