Crate cloacina

§Cloacina: Embedded Workflow Framework for Rust

Cloacina is a library for building resilient task workflows directly within your Rust applications. Unlike standalone orchestration services (Airflow, Prefect), Cloacina embeds into your existing applications to manage complex multi-step workflows with automatic retry, state persistence, and dependency resolution.

§What Cloacina Is

  • Embedded Framework: Integrates directly into your Rust applications
  • Resilient Execution: Automatic retries, failure recovery, and state persistence
  • Type-Safe Workflows: Compile-time validation of task dependencies and data flow
  • Database-Backed: Uses PostgreSQL or SQLite for reliable state management
  • Multi-Tenant Ready: PostgreSQL schema-based isolation for complete tenant separation
  • Async-First: Built on tokio for high-performance concurrent execution
  • Content-Versioned: Automatic workflow versioning based on task code and structure

§What Cloacina Is Not

  • Standalone Service: Not a separate process you deploy and manage
  • Distributed Scheduler: Doesn’t coordinate tasks across multiple machines
  • Web Platform: No built-in UI or REST API (though you can build one)
  • Cron Replacement: Use proper schedulers for time-based triggering
  • Message Queue: Not designed for high-throughput message processing
  • State Machine: Focuses on task execution rather than state transitions

§Perfect For

  • Data Processing Applications: ETL workflows within your data services
  • Background Job Processing: Complex multi-step jobs in web applications
  • Batch Processing: Resilient batch operations with dependency management
  • Integration Workflows: Multi-step API integrations with error recovery
  • Report Generation: Multi-step report creation with data dependencies
  • Data Migration: Complex data transformation and loading operations

§Quick Start Tutorial

§Your First Task

Define a task using the #[task] macro:

use cloacina::*;

#[task(
    id = "process_data",
    dependencies = []
)]
async fn process_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Your business logic here
    context.insert("processed", serde_json::json!(true))?;
    println!("Data processed successfully!");
    Ok(())
}

§Building a Workflow

Create workflows with the workflow! macro:

use cloacina::*;

#[task(id = "extract", dependencies = [])]
async fn extract_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    ctx.insert("raw_data", serde_json::json!({"users": [1, 2, 3]}))?;
    Ok(())
}

#[task(id = "transform", dependencies = ["extract"])]
async fn transform_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    if let Some(data) = ctx.get("raw_data") {
        ctx.insert("transformed_data", serde_json::json!({"processed": data}))?;
    }
    Ok(())
}

#[task(id = "load", dependencies = ["transform"])]
async fn load_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    println!("Loading data to warehouse...");
    Ok(())
}

// Create the workflow
let workflow = workflow! {
    name: "etl_pipeline",
    description: "Extract, Transform, Load workflow",
    tasks: [extract_data, transform_data, load_data]
};

§Execution with Database Persistence

use cloacina::runner::{DefaultRunner, WorkflowExecutor};
use cloacina::Context;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize the runner with a database connection
    let runner = DefaultRunner::new("postgresql://user:pass@localhost/mydb").await?;

    // Execute the workflow by name; state is persisted automatically
    let context = Context::new();
    let result = runner.execute("etl_pipeline", context).await?;

    println!("Workflow completed: {:?}", result.status);
    Ok(())
}

§Multi-Tenant Support

Cloacina isolates tenants at the storage level: each tenant gets its own PostgreSQL schema or its own SQLite file, so tenant data cannot collide.

§PostgreSQL Schema-Based Multi-Tenancy

use cloacina::runner::DefaultRunner;

// Each tenant gets their own PostgreSQL schema
let tenant_a = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_a"
).await?;

let tenant_b = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_b"
).await?;

// Or using the builder pattern
let runner = DefaultRunner::builder()
    .database_url("postgresql://user:pass@localhost/cloacina")
    .schema("my_tenant")
    .build()
    .await?;

§SQLite File-Based Multi-Tenancy

// Each tenant gets their own database file
let tenant_a = DefaultRunner::new("sqlite://./tenant_a.db").await?;
let tenant_b = DefaultRunner::new("sqlite://./tenant_b.db").await?;

Benefits:

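  • Zero collision risk - Each tenant’s data lives in its own schema or database file, so one tenant’s queries never touch another tenant’s rows
  • No query changes - All existing DAL code works unchanged
  • Performance - No per-query tenant filter is needed, so queries carry no filtering overhead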
  • Clean separation - Each tenant can even have different schema versions
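
With schema-based isolation the schema name usually derives from a tenant identifier, and PostgreSQL identifiers cannot be passed as bound parameters. The sketch below shows one way an application might validate a tenant ID before handing the result to DefaultRunner::with_schema. The `tenant_` prefix and both function names are hypothetical and not part of Cloacina; the 63-byte limit is PostgreSQL's default identifier length.

```rust
/// Returns true if `name` is a conservative, unquoted PostgreSQL identifier:
/// lowercase ASCII letters, digits and underscores, not starting with a digit,
/// and at most 63 bytes long.
pub fn is_valid_schema_name(name: &str) -> bool {
    let mut chars = name.chars();
    let first_ok = matches!(chars.next(), Some(c) if c.is_ascii_lowercase() || c == '_');
    first_ok
        && name.len() <= 63
        && chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}

/// Maps a tenant ID to a schema name, rejecting anything that would need
/// quoting. The `tenant_` prefix is an illustrative convention only.
pub fn tenant_schema(tenant_id: &str) -> Result<String, String> {
    if tenant_id.is_empty() {
        return Err("tenant id must not be empty".to_string());
    }
    let schema = format!("tenant_{}", tenant_id.to_ascii_lowercase());
    if is_valid_schema_name(&schema) {
        Ok(schema)
    } else {
        Err(format!("invalid tenant id: {tenant_id:?}"))
    }
}
```

Rejecting unusual names outright, rather than quoting them, keeps the mapping from tenant to schema easy to audit.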

§Core Concepts

§Tasks

Tasks are the fundamental units of work. Use the #[task] macro for the most convenient definition:

#[task(
    id = "my_task",
    dependencies = ["other_task_id"],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .build()
)]
async fn my_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Task implementation
    Ok(())
}

Task attributes:

  • id: Unique identifier for the task
  • dependencies: List of task IDs that must complete before this task runs
  • retry_policy: Optional configuration for automatic retries
  • trigger_rules: Optional conditions for task execution

§Context

The Context is a type-safe container for sharing data between tasks:

// Insert data
context.insert("user_id", serde_json::json!(12345))?;
context.insert("config", serde_json::json!({"env": "prod"}))?;

// Read data in later tasks
if let Some(user_id) = context.get("user_id") {
    println!("Processing for user: {}", user_id);
}

Context features:

  • Type-safe serialization with serde_json
  • Atomic updates for data consistency
  • Automatic persistence to database
  • Thread-safe access patterns
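
The `?` after each `insert` call above implies that inserting can fail. A minimal std-only sketch of one plausible contract follows, assuming (this is not confirmed by the text above) that `insert` refuses to overwrite an existing key and a separate `update` refuses to create one. `SketchContext` and `SketchContextError` are hypothetical names, not Cloacina types.

```rust
use std::collections::HashMap;

/// Errors for the sketch; Cloacina's own `ContextError` may differ.
#[derive(Debug, PartialEq)]
pub enum SketchContextError {
    KeyExists(String),
    KeyNotFound(String),
}

/// A minimal key-value container shared between tasks.
#[derive(Debug)]
pub struct SketchContext<T> {
    data: HashMap<String, T>,
}

impl<T> SketchContext<T> {
    pub fn new() -> Self {
        Self { data: HashMap::new() }
    }

    /// Adds a new key; fails if the key is already present.
    pub fn insert(&mut self, key: &str, value: T) -> Result<(), SketchContextError> {
        if self.data.contains_key(key) {
            return Err(SketchContextError::KeyExists(key.to_string()));
        }
        self.data.insert(key.to_string(), value);
        Ok(())
    }

    /// Replaces the value of an existing key; fails if the key is absent.
    pub fn update(&mut self, key: &str, value: T) -> Result<(), SketchContextError> {
        match self.data.get_mut(key) {
            Some(slot) => {
                *slot = value;
                Ok(())
            }
            None => Err(SketchContextError::KeyNotFound(key.to_string())),
        }
    }

    /// Reads a value written by an earlier task.
    pub fn get(&self, key: &str) -> Option<&T> {
        self.data.get(key)
    }
}
```

Separating `insert` from `update` makes accidental overwrites between tasks an explicit error instead of a silent data change.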

§Dependency Management

Cloacina automatically resolves task dependencies and executes them in the correct order:

#[task(id = "a", dependencies = [])]
async fn task_a(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "b", dependencies = ["a"])]  // Runs after A
async fn task_b(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "c", dependencies = ["a", "b"])]  // Runs after A and B
async fn task_c(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

// Execution order: A → B → C

Dependency features:

  • Automatic cycle detection
  • Parallel execution of independent tasks
  • Runtime validation of dependency chains
  • Visual dependency graph generation
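
To illustrate how dependency resolution, cycle detection, and parallel execution of independent tasks fit together, here is a std-only sketch that groups task IDs into levels using a Kahn-style topological sort. It is not Cloacina's implementation, and `execution_levels` is a hypothetical name. Tasks in the same level have no dependencies on each other, so a scheduler could run them concurrently.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Groups task IDs into levels: every task in a level depends only on tasks
/// in earlier levels. Returns `Err` with the stuck task IDs when a cycle, or
/// a dependency on an unknown task, prevents progress.
pub fn execution_levels<'a>(
    deps: &BTreeMap<&'a str, Vec<&'a str>>,
) -> Result<Vec<Vec<String>>, Vec<String>> {
    // Unfinished dependencies per task that has not been scheduled yet.
    let mut remaining: BTreeMap<&'a str, BTreeSet<&'a str>> = deps
        .iter()
        .map(|(task, d)| (*task, d.iter().copied().collect()))
        .collect();
    let mut levels = Vec::new();

    while !remaining.is_empty() {
        // Tasks with no unfinished dependencies are ready now.
        let ready: Vec<&'a str> = remaining
            .iter()
            .filter(|(_, d)| d.is_empty())
            .map(|(task, _)| *task)
            .collect();

        if ready.is_empty() {
            // Nothing can run: the rest form a cycle or wait on unknown tasks.
            return Err(remaining.keys().map(|t| t.to_string()).collect());
        }

        // Schedule the ready tasks and mark them as finished for the others.
        for task in &ready {
            remaining.remove(*task);
        }
        for unfinished in remaining.values_mut() {
            for task in &ready {
                unfinished.remove(*task);
            }
        }
        levels.push(ready.iter().map(|t| t.to_string()).collect());
    }
    Ok(levels)
}
```

For the three tasks above this yields one task per level (a, then b, then c). A diamond-shaped graph would place its two middle tasks in the same level.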

§Architecture Overview

graph TB
    subgraph "Your Application"
        A[Task Definitions]
        B[Workflow Builder]
        C[Workflow Execution]
    end

    subgraph "Cloacina Core"
        D[Task Registry]
        E[Context Management]
        F[Dependency Resolution]
        G[Scheduler Engine]
    end

    subgraph "Persistence Layer"
        H[Data Access Layer]
        I[Database Models]
        J[PostgreSQL or SQLite Database]
    end

    A --> D
    B --> F
    C --> G
    D --> H
    E --> H
    F --> H
    G --> H
    H --> I
    I --> J

§Task Execution Lifecycle

sequenceDiagram
    participant App as Your App
    participant W as Workflow
    participant S as Scheduler
    participant T as Task
    participant DB as Database

    App->>W: Build workflow
    W->>W: Validate dependencies
    App->>S: Execute workflow
    S->>DB: Load/create execution state
    S->>S: Plan execution order
    loop For each task level
        S->>T: Execute task(s)
        T->>T: Process data
        T->>DB: Persist context updates
        T->>S: Signal completion
    end
    S->>App: Return results

§Error Handling and Retries

Configure retry policies for resilient execution:

use std::time::Duration;

#[task(
    id = "network_task",
    dependencies = [],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .initial_delay(Duration::from_secs(1))
        .backoff_strategy(BackoffStrategy::Exponential { base: 2.0, multiplier: 1.0 })
        .retry_condition(RetryCondition::TransientOnly)
        .build()
)]
async fn network_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Network operation that might fail and will be retried
    Ok(())
}

Retry features:

  • Configurable backoff strategies
  • Conditional retry based on error type
  • Maximum attempt limits
  • Custom retry conditions
  • Exponential and linear backoff
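
The following std-only sketch shows how a delay could be derived from a backoff strategy. The formulas are an assumption for illustration (`initial * multiplier * base^(attempt - 1)` for exponential, `initial * multiplier * attempt` for linear) and may not match how Cloacina interprets `base` and `multiplier`. `SketchBackoff`, `retry_delay`, and the `max_delay` cap are hypothetical.

```rust
use std::time::Duration;

/// Backoff strategies, loosely mirroring the names used above.
#[derive(Debug, Clone, Copy)]
pub enum SketchBackoff {
    Fixed,
    Linear { multiplier: f64 },
    Exponential { base: f64, multiplier: f64 },
}

/// Delay before retry number `attempt` (1 = first retry), capped at `max_delay`.
pub fn retry_delay(
    strategy: SketchBackoff,
    initial: Duration,
    attempt: u32,
    max_delay: Duration,
) -> Duration {
    let initial_secs = initial.as_secs_f64();
    // Clamp the exponent so the cast to i32 cannot overflow.
    let exponent = attempt.saturating_sub(1).min(1_000) as i32;
    let secs = match strategy {
        SketchBackoff::Fixed => initial_secs,
        SketchBackoff::Linear { multiplier } => initial_secs * multiplier * f64::from(attempt),
        SketchBackoff::Exponential { base, multiplier } => {
            initial_secs * multiplier * base.powi(exponent)
        }
    };
    // Keep the value finite and non-negative before converting back.
    Duration::from_secs_f64(secs.max(0.0).min(max_delay.as_secs_f64()))
}
```

Under these assumptions, a 1-second initial delay with `base: 2.0` and `multiplier: 1.0` waits 1 s, 2 s, then 4 s. The cap keeps long retry chains from waiting unboundedly.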

§Conditional Execution

Use trigger rules for conditional task execution:

#[task(
    id = "conditional_task",
    dependencies = ["validation_task"],
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "validation_passed",
            "operator": "Equals",
            "value": true
        }
    })
)]
async fn conditional_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Only runs if validation_passed == true in context
    Ok(())
}

Trigger rule features:

  • Field-based conditions
  • Multiple operators (Equals, NotEquals, GreaterThan, etc.)
  • Complex boolean expressions
  • Context-aware evaluation
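
To make field-based conditions and boolean combinations concrete, here is a std-only sketch of a rule evaluator. It does not use Cloacina's TriggerRule or ValueOperator types. `SketchValue`, `SketchOperator`, `SketchRule`, and `should_run` are hypothetical, and treating a missing field as "condition not met" is an assumption.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a JSON value stored in the context.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchValue {
    Bool(bool),
    Number(f64),
    Text(String),
}

/// A few of the operators named above.
#[derive(Debug, Clone, Copy)]
pub enum SketchOperator {
    Equals,
    NotEquals,
    GreaterThan,
}

/// A rule is a single field comparison or a boolean combination of rules.
#[derive(Debug, Clone)]
pub enum SketchRule {
    Always,
    Condition { field: String, operator: SketchOperator, value: SketchValue },
    All(Vec<SketchRule>),
    Any(Vec<SketchRule>),
}

/// Decides whether a task guarded by `rule` should run for this context.
pub fn should_run(rule: &SketchRule, context: &HashMap<String, SketchValue>) -> bool {
    match rule {
        SketchRule::Always => true,
        SketchRule::All(rules) => rules.iter().all(|r| should_run(r, context)),
        SketchRule::Any(rules) => rules.iter().any(|r| should_run(r, context)),
        SketchRule::Condition { field, operator, value } => {
            match (context.get(field), operator) {
                // Assumption: a missing field never satisfies a condition.
                (None, _) => false,
                (Some(actual), SketchOperator::Equals) => actual == value,
                (Some(actual), SketchOperator::NotEquals) => actual != value,
                // Ordering comparisons only make sense between numbers.
                (Some(SketchValue::Number(a)), SketchOperator::GreaterThan) => match value {
                    SketchValue::Number(b) => a > b,
                    _ => false,
                },
                (Some(_), SketchOperator::GreaterThan) => false,
            }
        }
    }
}
```

In this sketch, the JSON rule shown above corresponds to a single `Condition` on `validation_passed` with `Equals` and `Bool(true)`.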

§Database Setup

Cloacina persists state in PostgreSQL or SQLite. To set up a PostgreSQL database:

# Create database
createdb myapp_pipelines

# Set connection string
export DATABASE_URL="postgresql://user:password@localhost/myapp_pipelines"

Database features:

  • Automatic schema migrations
  • Transaction support
  • Connection pooling
  • Execution history tracking
  • State persistence

§Feature Overview

  • Type Safety: Leverages Rust’s type system for compile-time guarantees
  • Content-Based Versioning: Automatic workflow versioning based on task code and structure
  • Async/Await: Built on tokio for high-performance async execution
  • Database Integration: PostgreSQL and SQLite support with Diesel ORM
  • Dependency Resolution: Automatic topological sorting and cycle detection
  • Error Recovery: Comprehensive error types and checkpoint support
  • Macro Support: Convenient procedural macros for task definition with code fingerprinting
  • Logging: Structured logging with configurable levels
  • Metrics: Built-in performance monitoring
  • Testing: Comprehensive test utilities and mocks
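
Content-based versioning may be the least self-explanatory feature in the list above. The idea is that a workflow's version is derived from what the workflow contains, so it changes when task code or structure changes and stays the same otherwise. The sketch below illustrates this with an FNV-1a hash over task IDs, code text, and dependencies. The hash function, the input shape (`TaskFingerprint`), and the function names are assumptions for illustration and say nothing about how Cloacina actually fingerprints code.

```rust
/// The parts of a task that feed the version (hypothetical shape).
pub struct TaskFingerprint<'a> {
    pub id: &'a str,
    pub dependencies: Vec<&'a str>,
    pub code: &'a str,
}

const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// Feeds `bytes` plus a zero separator into a running 64-bit FNV-1a hash.
fn mix(hash: u64, bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .chain(std::iter::once(&0u8))
        .fold(hash, |h, b| (h ^ u64::from(*b)).wrapping_mul(FNV_PRIME))
}

/// Derives a version string from the workflow name, task code, and structure.
pub fn workflow_version(name: &str, tasks: &[TaskFingerprint<'_>]) -> String {
    // Sort by ID so the version does not depend on declaration order.
    let mut ordered: Vec<&TaskFingerprint<'_>> = tasks.iter().collect();
    ordered.sort_by(|a, b| a.id.cmp(b.id));

    let mut hash = mix(FNV_OFFSET, name.as_bytes());
    for task in ordered {
        hash = mix(hash, task.id.as_bytes());
        hash = mix(hash, task.code.as_bytes());
        let mut deps = task.dependencies.clone();
        deps.sort();
        for dep in deps {
            hash = mix(hash, dep.as_bytes());
        }
    }
    format!("{:016x}", hash)
}
```

A production scheme would more likely use a cryptographic hash. FNV-1a is used here only because it fits in a few lines of std-only code.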

§Documentation Navigation

§API Reference

  • Task - Core task trait and macro
  • Workflow - Workflow construction and management
  • Context - Data container for inter-task communication
  • TaskScheduler - Execution engine and scheduling
  • Error Types - Complete error hierarchy

§Modules

  • context: Context management for sharing data between tasks
  • task: Core task trait and registry functionality
  • workflow: Workflow construction and dependency management
  • registry: Dynamic workflow package loading and storage
  • database: Database connection and persistence
  • error: Comprehensive error types
  • models: Database models and schemas
  • dal: Data access layer
  • execution_planner: Task scheduler for persistent workflow execution
  • executor: Unified execution engine
  • logging: Structured logging setup
  • retry: Retry policies and backoff strategies

Re-exports§

pub extern crate cloacina_workflow;
pub extern crate cloacina_workflow_plugin;
pub use logging::init_logging;
pub use var::var;
pub use var::var_or;
pub use database::connection::Database;
pub use cron_recovery::CronRecoveryConfig;
pub use cron_recovery::CronRecoveryService;
pub use cron_trigger_scheduler::Scheduler;
pub use cron_trigger_scheduler::SchedulerConfig;
pub use database::AdminError;
pub use database::DatabaseAdmin;
pub use database::TenantConfig;
pub use database::TenantCredentials;
pub use database::UniversalBool;
pub use database::UniversalTimestamp;
pub use database::UniversalUuid;
pub use dispatcher::DefaultDispatcher;
pub use dispatcher::DispatchError;
pub use dispatcher::Dispatcher;
pub use dispatcher::ExecutionResult;
pub use dispatcher::ExecutionStatus;
pub use dispatcher::ExecutorMetrics;
pub use dispatcher::RoutingConfig;
pub use dispatcher::RoutingRule;
pub use dispatcher::TaskExecutor;
pub use dispatcher::TaskReadyEvent;
pub use error::ContextError;
pub use error::ExecutorError;
pub use error::RegistrationError;
pub use error::SubgraphError;
pub use error::ValidationError;
pub use error::WorkflowError;
pub use execution_planner::TaskScheduler;
pub use execution_planner::TriggerCondition;
pub use execution_planner::TriggerRule;
pub use execution_planner::ValueOperator;
pub use executor::return_task_handle;
pub use executor::take_task_handle;
pub use executor::with_task_handle;
pub use executor::ExecutorConfig;
pub use executor::TaskHandle;
pub use executor::TaskResult;
pub use executor::ThreadTaskExecutor;
pub use executor::WorkflowExecution;
pub use executor::WorkflowExecutionError;
pub use executor::WorkflowExecutionResult;
pub use executor::WorkflowExecutor;
pub use executor::WorkflowStatus;
pub use graph::DependencyEdge;
pub use graph::GraphEdge;
pub use graph::GraphMetadata;
pub use graph::GraphNode;
pub use graph::TaskNode;
pub use graph::WorkflowGraph;
pub use graph::WorkflowGraphData;
pub use inventory_entries::StreamBackendEntry;
pub use inventory_entries::StreamBackendFactoryFn;
pub use inventory_entries::WorkflowEntry;
pub use runner::DefaultRunnerBuilder;
pub use runner::DefaultRunner;
pub use runner::DefaultRunnerConfig;
pub use runtime::Runtime;
pub use task::TaskRegistry;
pub use trigger::TriggerConfig;
pub use trigger::TriggerError;
pub use workflow::DependencyGraph;
pub use workflow::Workflow;
pub use workflow::WorkflowBuilder;
pub use workflow::WorkflowMetadata;
pub use cloacina_computation_graph;
pub use inventory;

Modules§

computation_graph
Computation Graph Runtime Types
context
Context Management
cron_evaluator
T-0553 / I-0102: timezone-aware cron expression evaluator relocated to cloacina-workflow so packaged cdylibs (which depend on cloacina-workflow but not on cloacina) can use it from the #[trigger(cron = "...")] macro emission. Engine paths re-export.
cron_recovery
Cron execution recovery service for handling lost executions.
cron_trigger_scheduler
Cron and event-trigger schedule management. For task readiness and workflow execution planning, see execution_planner. Unified scheduler for both cron and trigger-based workflow execution.
crypto
Cryptographic utilities for package signing.
dal
Data Access Layer with runtime backend selection
database
Database Layer
dispatcher
Dispatcher Layer for Executor Decoupling
error
Error Types
execution_planner
Task readiness evaluation, workflow processing, and stale claim sweeping. For cron and trigger scheduling, see cron_trigger_scheduler.
executor
Task Executor
graph
Workflow Graph Data Structures
inventory_entries
Inventory entry types for linker-collected registry seeding.
logging
Logging Configuration
models
Database Models
packaging
Workflow packaging functionality for creating distributable workflow packages.
prelude
Prelude module for convenient imports.
python_runtime
Indirection layer between the reconciler and the Python runtime.
registry
Workflow Registry
retry
Retry Policy System
runner
Workflow runners for executing complete workflows.
runtime
Scoped runtime unifying all cloacina registries.
security
Security module for package signing and key management.
task
Task Management
trigger
Trigger System
var
Variable registry for external connections and configuration.
workflow
Workflow Management

Macros§

dispatch_backend
Dispatches to backend-specific code based on compile-time features.

Structs§

ComputationGraphEntry
Computation graph entry emitted by #[computation_graph] for the reactor-triggered (split) form. The package!() shell walks inventory::iter::<ComputationGraphEntry> to build the metadata returned by get_graph_metadata and to dispatch execution in execute_graph. At most one entry is expected per cdylib; the shell’s body errors if it finds more than one.
ComputationGraphRegistration
Metadata about a registered computation graph.
Context
A context that holds data for pipeline execution.
CronEvaluator
Timezone-aware cron expression evaluator.
ReactorEntry
Reactor entry emitted by the #[reactor] attribute macro. The package!() shell walks inventory::iter::<ReactorEntry> at FFI call time to produce Vec<ReactorPackageMetadata> for get_reactor_metadata.
ReactorRegistration
Runtime-side description of a reactor.
RetryPolicy
Comprehensive retry policy configuration for tasks.
RetryPolicyBuilder
Builder for creating RetryPolicy instances with a fluent API.
TaskEntry
Task entry emitted by #[task]. The package!() shell walks inventory::iter::<TaskEntry> to build the per-task metadata in get_task_metadata and to dispatch task execution by name in execute_task.
TaskNamespace
Hierarchical namespace for task identification and isolation.
TriggerEntry
Trigger entry emitted by #[trigger]. The package!() shell walks inventory::iter::<TriggerEntry> to populate get_trigger_metadata, calling each entry’s constructor and querying the resulting trigger’s name / poll_interval / cron_expression / allow_concurrent. (T-0552 — relocated from cloacina so packaged cdylibs can reach it.)
TriggerlessGraphEntry
Trigger-less computation graph entry emitted by #[computation_graph] for graphs declared without a trigger = reactor(...) clause. These graphs operate on Context<Value> rather than InputCache and are invoked directly by workflow tasks. (T-0552 — relocated from cloacina.)
TriggerlessGraphRegistration
Runtime-side description of a trigger-less computation graph.

Enums§

BackoffStrategy
Different backoff strategies for calculating retry delays.
CheckpointError
Errors that can occur during task checkpointing.
ComputationReactionMode
How a reactor decides when to fire.
CronError
Errors that can occur during cron evaluation.
RetryCondition
Conditions that determine whether a failed task should be retried.
TaskError
Errors that can occur during task execution.
TaskState
Represents the execution state of a task throughout its lifecycle.
TriggerResult
Result of a trigger poll operation.

Traits§

Graph
Compile-time handle for a computation graph declaration.
Reactor
Compile-time handle for a reactor declaration.
Task
Core trait that defines an executable task in a pipeline.
Trigger
Core trait for user-defined triggers.
TriggerlessGraph
Compile-time link from a Graph handle to its trigger-less invocation surface.

Functions§

parse_namespace
Parse a namespace string back into a TaskNamespace.

Type Aliases§

ReactorConstructor
TriggerlessGraphFn
The compiled function emitted for a trigger-less computation graph.

Attribute Macros§

batch_accumulator
Define a batch accumulator (buffers events, flushes on timer or size threshold).
computation_graph
Define a computation graph as a module containing async node functions.
passthrough_accumulator
Define a passthrough accumulator (socket-only, no event loop).
polling_accumulator
Define a polling accumulator (timer-based, queries pull-based sources).
reactor
Declare a reactor as a unit struct.
stream_accumulator
Define a stream-backed accumulator.
task
Define a task with retry policies and trigger rules.
trigger
Define a trigger that fires a workflow on a schedule or condition.
workflow
Define a workflow as a module containing #[task] functions.