Crate cloacina

§Cloacina: Embedded Pipeline Framework for Rust

Cloacina is a library for building resilient task pipelines directly within your Rust applications. Unlike standalone orchestration services (Airflow, Prefect), Cloacina embeds into your existing applications to manage complex multi-step workflows with automatic retry, state persistence, and dependency resolution.

§What Cloacina Is

  • Embedded Framework: Integrates directly into your Rust applications
  • Resilient Execution: Automatic retries, failure recovery, and state persistence
  • Type-Safe Workflows: Compile-time validation of task dependencies and data flow
  • Database-Backed: Uses PostgreSQL or SQLite for reliable state management
  • Multi-Tenant Ready: PostgreSQL schema-based isolation for complete tenant separation
  • Async-First: Built on tokio for high-performance concurrent execution
  • Content-Versioned: Automatic workflow versioning based on task code and structure

§What Cloacina Is Not

  • Standalone Service: Not a separate process you deploy and manage
  • Distributed Scheduler: Doesn’t coordinate tasks across multiple machines
  • Web Platform: No built-in UI or REST API (though you can build one)
  • Cron Replacement: Use proper schedulers for time-based triggering
  • Message Queue: Not designed for high-throughput message processing
  • State Machine: Focuses on task execution rather than state transitions

§Perfect For

  • Data Processing Applications: ETL pipelines within your data services
  • Background Job Processing: Complex multi-step jobs in web applications
  • Batch Processing: Resilient batch operations with dependency management
  • Integration Workflows: Multi-step API integrations with error recovery
  • Report Generation: Multi-step report creation with data dependencies
  • Data Migration: Complex data transformation and loading operations

§Quick Start Tutorial

§Your First Task

Define a task using the #[task] macro:

use cloacina::*;

#[task(
    id = "process_data",
    dependencies = []
)]
async fn process_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Your business logic here
    context.insert("processed", serde_json::json!(true))?;
    println!("Data processed successfully!");
    Ok(())
}

§Building a Workflow

Create workflows with the workflow! macro:

use cloacina::*;

#[task(id = "extract", dependencies = [])]
async fn extract_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    ctx.insert("raw_data", serde_json::json!({"users": [1, 2, 3]}))?;
    Ok(())
}

#[task(id = "transform", dependencies = ["extract"])]
async fn transform_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    if let Some(data) = ctx.get("raw_data") {
        ctx.insert("transformed_data", serde_json::json!({"processed": data}))?;
    }
    Ok(())
}

#[task(id = "load", dependencies = ["transform"])]
async fn load_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    println!("Loading data to warehouse...");
    Ok(())
}

// Create the workflow
let workflow = workflow! {
    name: "etl_pipeline",
    description: "Extract, Transform, Load pipeline",
    tasks: [extract_data, transform_data, load_data]
};

§Execution with Database Persistence

use cloacina::runner::{DefaultRunner, PipelineExecutor};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize runner with database connection
    let runner = DefaultRunner::new("postgresql://user:pass@localhost/mydb").await?;

    // Execute workflow with automatic state persistence
    let context = Context::new();
    let result = runner.execute("etl_pipeline", context).await?;

    println!("Pipeline completed: {:?}", result.status);
    Ok(())
}

§Multi-Tenant Support

Cloacina provides complete tenant isolation with zero collision risk:

§PostgreSQL Schema-Based Multi-Tenancy

use cloacina::runner::DefaultRunner;

// Each tenant gets their own PostgreSQL schema
let tenant_a = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_a"
).await?;

let tenant_b = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_b"
).await?;

// Or using the builder pattern
let runner = DefaultRunner::builder()
    .database_url("postgresql://user:pass@localhost/cloacina")
    .schema("my_tenant")
    .build()
    .await?;

§SQLite File-Based Multi-Tenancy

// Each tenant gets their own database file
let tenant_a = DefaultRunner::new("sqlite://./tenant_a.db").await?;
let tenant_b = DefaultRunner::new("sqlite://./tenant_b.db").await?;

Benefits:

  • Zero collision risk - Impossible for tenants to access each other’s data
  • No query changes - All existing DAL code works unchanged
  • Performance - No overhead from filtering every query
  • Clean separation - Each tenant can even have different schema versions
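
With per-tenant runners in hand, the same workflow can be executed for each tenant with fully isolated state. A minimal sketch reusing the runners built above and the etl_pipeline workflow from the Quick Start:

use cloacina::runner::PipelineExecutor;
use cloacina::Context;

// Each execution is recorded in its own schema (or database file), so the
// two tenants share no state even though the workflow name is identical.
let result_a = tenant_a.execute("etl_pipeline", Context::new()).await?;
let result_b = tenant_b.execute("etl_pipeline", Context::new()).await?;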

§Core Concepts

§Tasks

Tasks are the fundamental units of work. Use the #[task] macro for the most convenient definition:

#[task(
    id = "my_task",
    dependencies = ["other_task_id"],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .build()
)]
async fn my_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Task implementation
    Ok(())
}

Task attributes:

  • id: Unique identifier for the task
  • dependencies: List of task IDs that must complete before this task runs
  • retry_policy: Optional configuration for automatic retries
  • trigger_rules: Optional conditions for task execution
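
As a sketch of how these attributes compose (assuming retry_policy and trigger_rules can be combined in a single invocation; the syntax for each is shown individually in the sections below):

#[task(
    id = "resilient_conditional_task",
    dependencies = ["other_task_id"],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .build(),
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "ready",
            "operator": "Equals",
            "value": true
        }
    })
)]
async fn resilient_conditional_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Retried up to 3 times on failure; only triggered when `ready` is true
    Ok(())
}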

§Context

The Context is a type-safe container for sharing data between tasks:

// Insert data
context.insert("user_id", serde_json::json!(12345))?;
context.insert("config", serde_json::json!({"env": "prod"}))?;

// Read data in later tasks
if let Some(user_id) = context.get("user_id") {
    println!("Processing for user: {}", user_id);
}

Context features:

  • Type-safe serialization with serde_json
  • Atomic updates for data consistency
  • Automatic persistence to database
  • Thread-safe access patterns
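
Because stored values are serde_json::Values, they can also be deserialized into concrete types. A minimal sketch, assuming get returns a reference to the stored value as in the examples above (the Config struct is purely illustrative):

use serde::Deserialize;

#[derive(Deserialize)]
struct Config {
    env: String,
}

if let Some(raw) = context.get("config") {
    // `from_value` consumes a Value, so clone the borrowed entry first
    if let Ok(config) = serde_json::from_value::<Config>(raw.clone()) {
        println!("Running in {}", config.env);
    }
}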

§Dependency Management

Cloacina automatically resolves task dependencies and executes them in the correct order:

#[task(id = "a", dependencies = [])]
async fn task_a(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "b", dependencies = ["a"])]  // Runs after A
async fn task_b(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "c", dependencies = ["a", "b"])]  // Runs after A and B
async fn task_c(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

// Execution order: A → B → C

Dependency features:

  • Automatic cycle detection
  • Parallel execution of independent tasks
  • Runtime validation of dependency chains
  • Visual dependency graph generation
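
For example, tasks that share no dependency edge can be scheduled concurrently. A minimal sketch of a diamond-shaped graph (task names are illustrative):

#[task(id = "fetch_users", dependencies = [])]
async fn fetch_users(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "fetch_orders", dependencies = [])]
async fn fetch_orders(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

// `fetch_users` and `fetch_orders` are independent, so they may run in
// parallel; `merge` waits for both to complete.
#[task(id = "merge", dependencies = ["fetch_users", "fetch_orders"])]
async fn merge(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }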

§Architecture Overview

graph TB
    subgraph "Your Application"
        A[Task Definitions]
        B[Workflow Builder]
        C[Pipeline Execution]
    end

    subgraph "Cloacina Core"
        D[Task Registry]
        E[Context Management]
        F[Dependency Resolution]
        G[Scheduler Engine]
    end

    subgraph "Persistence Layer"
        H[Data Access Layer]
        I[Database Models]
        J[PostgreSQL/SQLite Database]
    end

    A --> D
    B --> F
    C --> G
    D --> H
    E --> H
    F --> H
    G --> H
    H --> I
    I --> J

§Task Execution Lifecycle

sequenceDiagram
    participant App as Your App
    participant W as Workflow
    participant S as Scheduler
    participant T as Task
    participant DB as Database

    App->>W: Build workflow
    W->>W: Validate dependencies
    App->>S: Execute workflow
    S->>DB: Load/create execution state
    S->>S: Plan execution order
    loop For each task level
        S->>T: Execute task(s)
        T->>T: Process data
        T->>DB: Persist context updates
        T->>S: Signal completion
    end
    S->>App: Return results

§Error Handling and Retries

Configure retry policies for resilient execution:

use cloacina::*;
use std::time::Duration;

#[task(
    id = "network_task",
    dependencies = [],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .initial_delay(Duration::from_secs(1))
        .backoff_strategy(BackoffStrategy::Exponential { base: 2.0, multiplier: 1.0 })
        .retry_condition(RetryCondition::TransientOnly)
        .build()
)]
async fn network_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Network operation that might fail and will be retried
    Ok(())
}

Retry features:

  • Configurable backoff strategies
  • Conditional retry based on error type
  • Maximum attempt limits
  • Custom retry conditions
  • Exponential and linear backoff

§Conditional Execution

Use trigger rules for conditional task execution:

#[task(
    id = "conditional_task",
    dependencies = ["validation_task"],
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "validation_passed",
            "operator": "Equals",
            "value": true
        }
    })
)]
async fn conditional_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Only runs if validation_passed == true in context
    Ok(())
}

Trigger rule features:

  • Field-based conditions
  • Multiple operators (Equals, NotEquals, GreaterThan, etc.)
  • Complex boolean expressions
  • Context-aware evaluation
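
As a further sketch using the same JSON shape as above (the record_count field and threshold are illustrative):

#[task(
    id = "bulk_load",
    dependencies = ["count_records"],
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "record_count",
            "operator": "GreaterThan",
            "value": 1000
        }
    })
)]
async fn bulk_load(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Only runs if record_count > 1000 in context
    Ok(())
}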

§Database Setup

Cloacina persists state to PostgreSQL or SQLite. For PostgreSQL:

# Create database
createdb myapp_pipelines

# Set connection string
export DATABASE_URL="postgresql://user:password@localhost/myapp_pipelines"
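
For SQLite, no server setup is needed; point the connection string at a database file instead, using the same sqlite:// scheme as in the multi-tenancy section:

# SQLite alternative: a file-backed database
export DATABASE_URL="sqlite://./myapp_pipelines.db"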

Database features:

  • Automatic schema migrations
  • Transaction support
  • Connection pooling
  • Execution history tracking
  • State persistence

§Feature Overview

  • Type Safety: Leverages Rust’s type system for compile-time guarantees
  • Content-Based Versioning: Automatic workflow versioning based on task code and structure
  • Async/Await: Built on tokio for high-performance async execution
  • Database Integration: PostgreSQL and SQLite support with Diesel ORM
  • Dependency Resolution: Automatic topological sorting and cycle detection
  • Error Recovery: Comprehensive error types and checkpoint support
  • Macro Support: Convenient procedural macros for task definition with code fingerprinting
  • Logging: Structured logging with configurable levels
  • Metrics: Built-in performance monitoring
  • Testing: Comprehensive test utilities and mocks

§Documentation Navigation

§Learn Cloacina (Tutorials)

§Solve Problems (How-To Guides)

§API Reference

  • Task - Core task trait and macro
  • Workflow - Pipeline construction and management
  • Context - Data container for inter-task communication
  • TaskScheduler - Execution engine and scheduling
  • Error Types - Complete error hierarchy

§Understand the Design (Explanations)

§Modules

  • context: Context management for sharing data between tasks
  • task: Core task trait and registry functionality
  • workflow: Workflow construction and dependency management
  • registry: Dynamic workflow package loading and storage
  • database: Database connection and persistence
  • error: Comprehensive error types
  • models: Database models and schemas
  • dal: Data access layer
  • task_scheduler: Task scheduler for persistent workflow execution
  • executor: Unified execution engine
  • logging: Structured logging setup
  • retry: Retry policies and backoff strategies

Re-exports§

pub extern crate cloacina_workflow;

pub use logging::init_logging;
pub use database::connection::Database;
pub use cron_evaluator::CronError;
pub use cron_evaluator::CronEvaluator;
pub use cron_recovery::CronRecoveryConfig;
pub use cron_recovery::CronRecoveryService;
pub use cron_scheduler::CronScheduler;
pub use cron_scheduler::CronSchedulerConfig;
pub use database::AdminError;
pub use database::DatabaseAdmin;
pub use database::TenantConfig;
pub use database::TenantCredentials;
pub use database::UniversalBool;
pub use database::UniversalTimestamp;
pub use database::UniversalUuid;
pub use dispatcher::DefaultDispatcher;
pub use dispatcher::DispatchError;
pub use dispatcher::Dispatcher;
pub use dispatcher::ExecutionResult;
pub use dispatcher::ExecutionStatus;
pub use dispatcher::ExecutorMetrics;
pub use dispatcher::RoutingConfig;
pub use dispatcher::RoutingRule;
pub use dispatcher::TaskExecutor;
pub use dispatcher::TaskReadyEvent;
pub use error::ContextError;
pub use error::ExecutorError;
pub use error::RegistrationError;
pub use error::SubgraphError;
pub use error::ValidationError;
pub use error::WorkflowError;
pub use executor::ExecutorConfig;
pub use executor::PipelineError;
pub use executor::PipelineExecution;
pub use executor::PipelineExecutor;
pub use executor::PipelineResult;
pub use executor::PipelineStatus;
pub use executor::TaskResult;
pub use executor::ThreadTaskExecutor;
pub use graph::DependencyEdge;
pub use graph::GraphEdge;
pub use graph::GraphMetadata;
pub use graph::GraphNode;
pub use graph::TaskNode;
pub use graph::WorkflowGraph;
pub use graph::WorkflowGraphData;
pub use runner::DefaultRunnerBuilder;
pub use runner::DefaultRunner;
pub use runner::DefaultRunnerConfig;
pub use task::global_task_registry;
pub use task::register_task_constructor;
pub use task::TaskRegistry;
pub use task_scheduler::TaskScheduler;
pub use task_scheduler::TriggerCondition;
pub use task_scheduler::TriggerRule;
pub use task_scheduler::ValueOperator;
pub use trigger::get_trigger;
pub use trigger::global_trigger_registry;
pub use trigger::register_trigger;
pub use trigger::register_trigger_constructor;
pub use trigger::Trigger;
pub use trigger::TriggerConfig;
pub use trigger::TriggerError;
pub use trigger::TriggerResult;
pub use trigger_scheduler::TriggerScheduler;
pub use trigger_scheduler::TriggerSchedulerConfig;
pub use workflow::get_all_workflows;
pub use workflow::global_workflow_registry;
pub use workflow::register_workflow_constructor;
pub use workflow::DependencyGraph;
pub use workflow::Workflow;
pub use workflow::WorkflowBuilder;
pub use workflow::WorkflowMetadata;

Modules§

context
Context Management
cron_evaluator
Timezone-aware cron expression evaluator
cron_recovery
Cron execution recovery service for handling lost executions.
cron_scheduler
Cron scheduler for time-based workflow execution.
dal
Data Access Layer with runtime backend selection
database
Database Layer
dispatcher
Dispatcher Layer for Executor Decoupling
error
Error Types
executor
Task Executor
graph
Workflow Graph Data Structures
logging
Logging Configuration
models
Database Models
packaging
Workflow packaging functionality for creating distributable workflow packages.
prelude
Prelude module for convenient imports.
registry
Workflow Registry
retry
Retry Policy System
runner
Workflow runners for executing complete pipelines and workflows.
task
Task Management
task_scheduler
Task Scheduler
trigger
Trigger System
trigger_scheduler
Trigger scheduler for event-based workflow execution.
workflow
Workflow Management

Macros§

backend_dispatch
Helper macro for dispatching operations based on backend type.
connection_match
Helper macro for matching on AnyConnection variants.
dispatch_backend
Dispatches to backend-specific code based on compile-time features.
workflow
Declarative macro for building a Workflow from a set of tasks with a name and description.

Structs§

Context
A context that holds data for pipeline execution.
RetryPolicy
Comprehensive retry policy configuration for tasks.
RetryPolicyBuilder
Builder for creating RetryPolicy instances with a fluent API.
TaskNamespace
Hierarchical namespace for task identification and isolation.

Enums§

BackoffStrategy
Different backoff strategies for calculating retry delays.
CheckpointError
Errors that can occur during task checkpointing.
RetryCondition
Conditions that determine whether a failed task should be retried.
TaskError
Errors that can occur during task execution.
TaskState
Represents the execution state of a task throughout its lifecycle.

Traits§

Task
Core trait that defines an executable task in a pipeline.

Functions§

parse_namespace
Parse a namespace string back into a TaskNamespace.

Attribute Macros§

packaged_workflow
Packages tasks into a distributable workflow (see the packaging module).
task
Defines an executable task with an id, dependencies, and optional retry and trigger configuration.