sayiir-runtime 0.1.0-alpha.2

Execution runtime, checkpointing runner and distributed workers for Sayiir workflows

sayiir-runtime

Runtime execution strategies for durable workflows.

Execution Strategies

Strategy              Use Case
CheckpointingRunner   Single-process with crash recovery
PooledWorker          Multi-worker horizontal scaling
InProcessRunner       Simple in-memory execution (no persistence)

CheckpointingRunner

Executes an entire workflow within a single process, saving snapshots after each task. Fork branches run concurrently as tokio tasks.

flowchart LR
    subgraph Process["Single Process"]
        T1[Task 1] --> T2[Task 2] --> T3[Task 3] --> T4[Task 4]
    end

    T1 -.->|snapshot| DB[(Backend)]
    T2 -.->|snapshot| DB
    T3 -.->|snapshot| DB
    T4 -.->|snapshot| DB

When to use:

  • Single-node deployment
  • Crash recovery needed (resume from last checkpoint)
  • Simple deployment without coordination

Example:

use sayiir_runtime::CheckpointingRunner;
use sayiir_persistence::InMemoryBackend;

let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);

// Run workflow with automatic checkpointing
let status = runner.run(&workflow, "instance-123", input).await?;

// Resume after crash
let status = runner.resume(&workflow, "instance-123").await?;

PooledWorker

Multiple workers poll a shared backend, claim tasks, and execute them concurrently. Task claiming with TTL prevents duplicate execution.

flowchart TB
    subgraph Backend["Persistent Backend"]
        Q[/"Available Tasks"/]
    end

    Q --> W1["PooledWorker 1"]
    Q --> W2["PooledWorker 2"]
    Q --> W3["PooledWorker 3"]

    W1 -.->|claim + execute| Q
    W2 -.->|claim + execute| Q
    W3 -.->|claim + execute| Q

When to use:

  • Horizontal scaling across multiple machines
  • High throughput requirements
  • Fault tolerance (crashed workers' tasks auto-reclaim)

Example:

use sayiir_runtime::PooledWorker;
use sayiir_persistence::PostgresBackend;
use std::time::Duration;

let backend = PostgresBackend::new(pool);
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(5 * 60)))
    .with_heartbeat_interval(Some(Duration::from_secs(2 * 60)));

// Spawn the worker and get a handle for lifecycle control
let handle = worker.spawn(Duration::from_secs(1), workflows);
// ... later, shut down gracefully ...
handle.shutdown();
handle.join().await?;

Comparison

Aspect             CheckpointingRunner        PooledWorker
Execution          Single process             Multiple workers
Concurrency        Forks run as tokio tasks   Claim-based distribution
Scaling            Vertical                   Horizontal
Coordination       None needed                Via backend claims
Failure recovery   Resume from snapshot       Claim expires, task retried

Task Claiming (PooledWorker)

Workers use a heartbeat mechanism to hold task claims:

sequenceDiagram
    participant W as Worker
    participant B as Backend

    W->>B: claim_task (TTL: 5min)
    B-->>W: claim granted

    loop Every 2 minutes
        W->>B: extend_claim
    end

    W->>B: release_claim
    Note over W,B: Task complete

Configuration:

Setting              Default     Description
claim_ttl            5 minutes   How long a claim is valid
heartbeat_interval   2 minutes   How often to extend the claim
batch_size           1           Tasks to fetch per poll

If a worker crashes, its heartbeat stops and the claim expires, allowing another worker to pick up the task.
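
A minimal sketch of this claim-and-heartbeat loop, assuming a hypothetical Backend type with claim_task, extend_claim, and release_claim methods (the names mirror the diagram above; the real sayiir-persistence API may differ):

use std::time::Duration;
use tokio::time::{interval, sleep};

// Hypothetical stand-in for the persistence backend's claim API.
struct Backend;

impl Backend {
    async fn claim_task(&self, _task_id: &str, _ttl: Duration) -> bool { true }
    async fn extend_claim(&self, _task_id: &str, _ttl: Duration) {}
    async fn release_claim(&self, _task_id: &str) {}
}

async fn run_claimed_task(backend: &Backend, task_id: &str) {
    let ttl = Duration::from_secs(5 * 60);       // claim_ttl
    let heartbeat = Duration::from_secs(2 * 60); // heartbeat_interval

    // Claim the task; skip it if another worker already holds the claim.
    if !backend.claim_task(task_id, ttl).await {
        return;
    }

    // Execute the task while the heartbeat keeps the claim alive.
    let mut ticker = interval(heartbeat);
    ticker.tick().await; // the first tick completes immediately
    let mut task = Box::pin(sleep(Duration::from_secs(30))); // stand-in for real task work

    loop {
        tokio::select! {
            _ = &mut task => break,                                        // task finished
            _ = ticker.tick() => backend.extend_claim(task_id, ttl).await, // keep claim valid
        }
    }

    backend.release_claim(task_id).await; // task complete, free the claim
}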


Polling vs Claiming

Important distinction: Fetching tasks and claiming tasks are separate operations.

sequenceDiagram
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant B as Backend

    W1->>B: find_available_tasks (batch=3)
    B-->>W1: [task1, task2, task3]

    Note over W1: Starts executing task1
    W1->>B: claim_task(task1)
    B-->>W1: claimed ✓

    W2->>B: find_available_tasks (batch=3)
    B-->>W2: [task2, task3, task4]
    Note over W2: task2, task3 still available!

    W2->>B: claim_task(task2)
    B-->>W2: claimed ✓

  • Fetching returns task IDs that are currently unclaimed
  • Claiming happens when execution starts (one task at a time)
  • Other workers can "steal" fetched-but-not-yet-claimed tasks
  • batch_size controls fetch count, not claim count

With batch_size=1 (default), each worker fetches one task, executes it, then polls again. This minimizes stale task IDs while keeping polling overhead low.
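
To make the fetch/claim split concrete, here is a sketch of a worker's poll loop. The Backend type and the find_available_tasks / claim_task signatures are assumptions drawn from the diagram above, not the crate's real API:

use std::time::Duration;
use tokio::time::sleep;

// Hypothetical backend API for this sketch.
struct Backend;

impl Backend {
    // Returns IDs that were unclaimed at fetch time; the list may go stale.
    async fn find_available_tasks(&self, _batch: usize) -> Vec<String> { Vec::new() }
    // Atomically claims a task; returns false if another worker got it first.
    async fn claim_task(&self, _task_id: &str) -> bool { true }
}

async fn poll_loop(backend: &Backend, batch_size: usize, poll_interval: Duration) {
    loop {
        // Fetch: a non-binding snapshot of currently unclaimed task IDs.
        let candidates = backend.find_available_tasks(batch_size).await;

        for task_id in candidates {
            // Claim: only this step is binding. Another worker may have taken
            // the task since the fetch; if so, just move on to the next ID.
            if backend.claim_task(&task_id).await {
                execute(&task_id).await;
            }
        }

        sleep(poll_interval).await; // batch exhausted; poll again
    }
}

async fn execute(task_id: &str) {
    println!("executing {task_id}");
}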


Task Timeouts

Tasks can have a configured timeout duration. When a task exceeds its timeout, the workflow is marked as Failed and the task future is cancelled.

How it works

Timeouts are enforced through durable deadlines — an absolute wall-clock timestamp persisted in the workflow snapshot, not an ephemeral in-process timer.

sequenceDiagram
    participant W as Worker
    participant B as Backend

    W->>B: set_task_deadline(now + timeout)
    W->>B: save_snapshot

    Note over W: Start executing task

    loop Heartbeat (every TTL/2)
        W->>W: Utc::now() >= deadline?
        W->>B: extend_claim
    end

    alt Deadline expired
        W->>W: Drop task future (cancel)
        W->>B: mark_failed + save_snapshot
        W->>B: release_claim
        Note over W: Worker shuts down
    else Task completes in time
        W->>B: save result + clear deadline
    end

Enforcement by runner

Runner                Mechanism                              Cancellation
PooledWorker          Heartbeat checks persisted deadline    Active — future dropped mid-execution
CheckpointingRunner   Periodic check of persisted deadline   Active — future dropped mid-execution
InProcessRunner       tokio::select! with sleep              Active — future dropped mid-execution

Durable deadline lifecycle

  1. Before execution: Compute deadline = now + timeout, persist in snapshot, save to backend
  2. Deadline refresh: Right before actual execution starts, recompute deadline = now + timeout so snapshot-save I/O doesn't eat into the time budget
  3. During execution: Periodically check Utc::now() >= deadline (piggybacks on heartbeat in distributed mode)
  4. On completion: Clear deadline from snapshot
  5. On timeout: Drop future, mark workflow Failed, worker shuts down
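
A minimal sketch of this lifecycle, assuming chrono timestamps; the Snapshot struct and its field name are illustrative, not the crate's actual types:

use chrono::{DateTime, Duration as ChronoDuration, Utc};
use std::time::Duration;

// Illustrative stand-in for the persisted workflow snapshot.
struct Snapshot {
    task_deadline: Option<DateTime<Utc>>,
}

// Steps 1-2: compute an absolute deadline just before execution starts,
// then persist the snapshot so the deadline survives a crash.
fn set_deadline(snapshot: &mut Snapshot, timeout: Duration) {
    let timeout = ChronoDuration::from_std(timeout).expect("timeout within chrono range");
    snapshot.task_deadline = Some(Utc::now() + timeout);
    // ...save the snapshot to the backend here...
}

// Step 3: the periodic check is a pure in-memory timestamp comparison.
fn deadline_expired(snapshot: &Snapshot) -> bool {
    matches!(snapshot.task_deadline, Some(deadline) if Utc::now() >= deadline)
}

// Step 4: clear the deadline once the task completes in time.
fn clear_deadline(snapshot: &mut Snapshot) {
    snapshot.task_deadline = None;
}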

Crash recovery

If the process crashes while a task with a deadline is running:

  • The deadline is persisted in the snapshot (survives the crash)
  • On resume, the expired deadline is detected before re-executing the task
  • The workflow fails immediately with TaskTimedOut — no wasted re-execution
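
A sketch of that resume-time check (the snapshot shape and the error name are illustrative, not the crate's actual types):

use chrono::{DateTime, Utc};

// Illustrative snapshot shape; only the persisted deadline matters here.
struct Snapshot {
    task_deadline: Option<DateTime<Utc>>,
}

// On resume, inspect the persisted deadline before re-executing the task.
fn check_deadline_on_resume(snapshot: &Snapshot) -> Result<(), &'static str> {
    match snapshot.task_deadline {
        // The deadline passed while the process was down: fail immediately.
        Some(deadline) if Utc::now() >= deadline => Err("TaskTimedOut"),
        // Otherwise resume proceeds to re-execute the task.
        _ => Ok(()),
    }
}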

Cancellation semantics

Cancellation is cooperative (standard async Rust). Dropping the task future means:

  • The future stops being polled — it won't resume past its current .await point
  • Drop handlers run, resources are released
  • Spawned sub-tasks (tokio::spawn) are not cancelled — they run independently
  • CPU-bound work between .await points runs to completion

For typical async tasks (HTTP calls, DB queries, message sends), this provides effective mid-flight cancellation.
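
The pattern below sketches this behavior, in the spirit of the tokio::select!-with-sleep mechanism listed for InProcessRunner above; it is a generic illustration, not the crate's internal code:

use std::time::Duration;
use tokio::time::sleep;

async fn slow_task() -> u32 {
    // Cancellation can only take effect at .await points like this one;
    // synchronous work between awaits always runs to completion.
    sleep(Duration::from_secs(10)).await;
    42
}

async fn run_with_timeout(timeout: Duration) {
    tokio::select! {
        result = slow_task() => {
            println!("completed: {result}");
        }
        _ = sleep(timeout) => {
            // The slow_task() future is dropped here: it stops being polled and
            // its destructors run, but anything it started with tokio::spawn
            // keeps running independently.
            println!("timed out; task future dropped");
        }
    }
}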

Worker shutdown on timeout

In PooledWorker, a task timeout triggers worker shutdown:

  1. Heartbeat detects expired deadline
  2. Task future is dropped (cancelled)
  3. Workflow marked Failed, snapshot saved, claim released
  4. execute_task returns Err(TaskTimedOut)
  5. Actor loop detects is_timeout() and exits gracefully
  6. Other workers see Failed state and skip the workflow

Performance

The deadline check is a cheap in-memory timestamp comparison (Utc::now() >= deadline) that piggybacks on the existing heartbeat timer. No additional I/O is required: the heartbeat already calls extend_task_claim on each tick regardless of timeouts. With the default 5-minute claim TTL, the heartbeat fires every 2.5 minutes.

Configuration

// Set timeout on a task via the builder
let workflow = WorkflowBuilder::new(ctx)
    .with_registry()
    .then("my_task", |i: u32| async move { Ok(i + 1) })
    .with_metadata(TaskMetadata {
        timeout: Some(Duration::from_secs(300)), // 5 minutes
        ..Default::default()
    })
    .build()?;

See Also

  • ROADMAP.md - Alternative claiming strategies and future improvements