§Cloacina: Embedded Pipeline Framework for Rust
Cloacina is a library for building resilient task pipelines directly within your Rust applications. Unlike standalone orchestration services (Airflow, Prefect), Cloacina embeds into your existing applications to manage complex multi-step workflows with automatic retry, state persistence, and dependency resolution.
§What Cloacina Is
- Embedded Framework: Integrates directly into your Rust applications
- Resilient Execution: Automatic retries, failure recovery, and state persistence
- Type-Safe Workflows: Compile-time validation of task dependencies and data flow
- Database-Backed: Uses PostgreSQL or SQLite for reliable state management
- Multi-Tenant Ready: PostgreSQL schema-based isolation for complete tenant separation
- Async-First: Built on tokio for high-performance concurrent execution
- Content-Versioned: Automatic workflow versioning based on task code and structure
§What Cloacina Is Not
- Standalone Service: Not a separate process you deploy and manage
- Distributed Scheduler: Doesn’t coordinate tasks across multiple machines
- Web Platform: No built-in UI or REST API (though you can build one)
- Cron Replacement: Use proper schedulers for time-based triggering
- Message Queue: Not designed for high-throughput message processing
- State Machine: Focuses on task execution rather than state transitions
§Perfect For
- Data Processing Applications: ETL pipelines within your data services
- Background Job Processing: Complex multi-step jobs in web applications
- Batch Processing: Resilient batch operations with dependency management
- Integration Workflows: Multi-step API integrations with error recovery
- Report Generation: Multi-step report creation with data dependencies
- Data Migration: Complex data transformation and loading operations
§Quick Start Tutorial
§Your First Task
Define a task using the #[task] macro:
use cloacina::*;

#[task(
    id = "process_data",
    dependencies = []
)]
async fn process_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Your business logic here
    context.insert("processed", serde_json::json!(true))?;
    println!("Data processed successfully!");
    Ok(())
}
§Building a Workflow
Create workflows with the workflow! macro:
use cloacina::*;

#[task(id = "extract", dependencies = [])]
async fn extract_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    ctx.insert("raw_data", serde_json::json!({"users": [1, 2, 3]}))?;
    Ok(())
}

#[task(id = "transform", dependencies = ["extract"])]
async fn transform_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Clone the value so the read borrow ends before the mutable insert
    if let Some(data) = ctx.get("raw_data").cloned() {
        ctx.insert("transformed_data", serde_json::json!({"processed": data}))?;
    }
    Ok(())
}

#[task(id = "load", dependencies = ["transform"])]
async fn load_data(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    println!("Loading data to warehouse...");
    Ok(())
}

// Create the workflow
let workflow = workflow! {
    name: "etl_pipeline",
    description: "Extract, Transform, Load pipeline",
    tasks: [extract_data, transform_data, load_data]
};
§Execution with Database Persistence
use cloacina::runner::{DefaultRunner, PipelineExecutor};
use cloacina::Context;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize the runner with a database connection
    let runner = DefaultRunner::new("postgresql://user:pass@localhost/mydb").await?;

    // Execute the workflow with automatic state persistence
    let context = Context::new();
    let result = runner.execute("etl_pipeline", context).await?;

    println!("Pipeline completed: {:?}", result.status);
    Ok(())
}
§Multi-Tenant Support
Cloacina provides complete tenant isolation with zero collision risk:
§PostgreSQL Schema-Based Multi-Tenancy
use cloacina::runner::DefaultRunner;

// Each tenant gets their own PostgreSQL schema
let tenant_a = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_a"
).await?;

let tenant_b = DefaultRunner::with_schema(
    "postgresql://user:pass@localhost/cloacina",
    "tenant_b"
).await?;

// Or using the builder pattern
let runner = DefaultRunner::builder()
    .database_url("postgresql://user:pass@localhost/cloacina")
    .schema("my_tenant")
    .build()
    .await?;
§SQLite File-Based Multi-Tenancy
// Each tenant gets their own database file
let tenant_a = DefaultRunner::new("sqlite://./tenant_a.db").await?;
let tenant_b = DefaultRunner::new("sqlite://./tenant_b.db").await?;

Benefits:
- Zero collision risk - Impossible for tenants to access each other’s data
- No query changes - All existing DAL code works unchanged
- Performance - No overhead from filtering every query
- Clean separation - Each tenant can even have different schema versions
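Once a tenant-scoped runner exists, execution looks the same as the single-tenant case; only the backing storage differs. A minimal sketch, reusing the etl_pipeline workflow from the quick start:

// Both calls run the same workflow definition, but all state
// (execution history, contexts) lands in the tenant's own schema.
let result_a = tenant_a.execute("etl_pipeline", Context::new()).await?;
let result_b = tenant_b.execute("etl_pipeline", Context::new()).await?;

println!("Tenant A: {:?}", result_a.status);
println!("Tenant B: {:?}", result_b.status);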
§Core Concepts
§Tasks
Tasks are the fundamental units of work. Use the #[task] macro for the most convenient definition:
#[task(
    id = "my_task",
    dependencies = ["other_task_id"],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .build()
)]
async fn my_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Task implementation
    Ok(())
}

Task attributes:

- id: Unique identifier for the task
- dependencies: List of task IDs that must complete before this task runs
- retry_policy: Optional configuration for automatic retries
- trigger_rules: Optional conditions for task execution
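A sketch of how the attributes combine on a single task (task names here are hypothetical; the trigger_rules JSON follows the shape shown under Conditional Execution below):

#[task(
    id = "send_report",
    dependencies = ["generate_report"],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .build(),
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "report_ready",
            "operator": "Equals",
            "value": true
        }
    })
)]
async fn send_report(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Runs after generate_report, only when report_ready == true,
    // retrying up to 3 times on failure.
    Ok(())
}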
§Context
The Context is a type-safe container for sharing data between tasks:
// Insert data
context.insert("user_id", serde_json::json!(12345))?;
context.insert("config", serde_json::json!({"env": "prod"}))?;

// Read data in later tasks
if let Some(user_id) = context.get("user_id") {
    println!("Processing for user: {}", user_id);
}

Context features:
- Type-safe serialization with serde_json
- Atomic updates for data consistency
- Automatic persistence to database
- Thread-safe access patterns
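Because values are serde_json::Value, ordinary serde types can round-trip through the context. A minimal sketch (JobConfig is a hypothetical struct; this assumes context.get returns a reference to the stored value, as the snippet above suggests):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct JobConfig {
    env: String,
    batch_size: u32,
}

// Serialize a struct into the context...
let config = JobConfig { env: "prod".into(), batch_size: 500 };
context.insert("config", serde_json::to_value(&config)?)?;

// ...and deserialize it back in a downstream task.
if let Some(value) = context.get("config") {
    let config: JobConfig = serde_json::from_value(value.clone())?;
    println!("Batch size: {}", config.batch_size);
}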
§Dependency Management
Cloacina automatically resolves task dependencies and executes them in the correct order:
#[task(id = "a", dependencies = [])]
async fn task_a(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }
#[task(id = "b", dependencies = ["a"])] // Runs after A
async fn task_b(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }
#[task(id = "c", dependencies = ["a", "b"])] // Runs after A and B
async fn task_c(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }
// Execution order: A → B → CDependency features:
- Automatic cycle detection
- Parallel execution of independent tasks
- Runtime validation of dependency chains
- Visual dependency graph generation
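To see the parallelism, a diamond-shaped sketch (task names are hypothetical): both middle tasks depend only on the first, so they are eligible to run concurrently:

#[task(id = "fetch", dependencies = [])]
async fn fetch(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "clean", dependencies = ["fetch"])]
async fn clean(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "enrich", dependencies = ["fetch"])]
async fn enrich(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

#[task(id = "merge", dependencies = ["clean", "enrich"])]
async fn merge(_: &mut Context<serde_json::Value>) -> Result<(), TaskError> { Ok(()) }

// Execution order: fetch, then clean and enrich in parallel, then merge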
§Architecture Overview
graph TB
    subgraph "Your Application"
        A[Task Definitions]
        B[Workflow Builder]
        C[Pipeline Execution]
    end
    subgraph "Cloacina Core"
        D[Task Registry]
        E[Context Management]
        F[Dependency Resolution]
        G[Scheduler Engine]
    end
    subgraph "Persistence Layer"
        H[Data Access Layer]
        I[Database Models]
        J[PostgreSQL Database]
    end
    A --> D
    B --> F
    C --> G
    D --> H
    E --> H
    F --> H
    G --> H
    H --> I
    I --> J

§Task Execution Lifecycle
sequenceDiagram
    participant App as Your App
    participant W as Workflow
    participant S as Scheduler
    participant T as Task
    participant DB as Database

    App->>W: Build workflow
    W->>W: Validate dependencies
    App->>S: Execute workflow
    S->>DB: Load/create execution state
    S->>S: Plan execution order
    loop For each task level
        S->>T: Execute task(s)
        T->>T: Process data
        T->>DB: Persist context updates
        T->>S: Signal completion
    end
    S->>App: Return results

§Error Handling and Retries
Configure retry policies for resilient execution:
use std::time::Duration;

#[task(
    id = "network_task",
    dependencies = [],
    retry_policy = RetryPolicy::builder()
        .max_attempts(3)
        .initial_delay(Duration::from_secs(1))
        .backoff_strategy(BackoffStrategy::Exponential { base: 2.0, multiplier: 1.0 })
        .retry_condition(RetryCondition::TransientOnly)
        .build()
)]
async fn network_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Network operation that might fail and will be retried
    Ok(())
}

Retry features:
- Configurable backoff strategies
- Conditional retry based on error type
- Maximum attempt limits
- Custom retry conditions
- Exponential and linear backoff
§Conditional Execution
Use trigger rules for conditional task execution:
#[task(
    id = "conditional_task",
    dependencies = ["validation_task"],
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "validation_passed",
            "operator": "Equals",
            "value": true
        }
    })
)]
async fn conditional_task(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Only runs if validation_passed == true in context
    Ok(())
}

Trigger rule features:
- Field-based conditions
- Multiple operators (Equals, NotEquals, GreaterThan, etc.)
- Complex boolean expressions
- Context-aware evaluation
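For instance, a numeric guard built from the operators above (field and task names are hypothetical; the JSON shape matches the example earlier in this section):

#[task(
    id = "publish_results",
    dependencies = ["count_records"],
    trigger_rules = serde_json::json!({
        "type": "Conditional",
        "condition": {
            "field": "record_count",
            "operator": "GreaterThan",
            "value": 0
        }
    })
)]
async fn publish_results(ctx: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    // Skipped when record_count is not greater than 0
    Ok(())
}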
§Database Setup
Cloacina requires a database for state persistence; the example below uses PostgreSQL (SQLite is also supported):
# Create database
createdb myapp_pipelines

# Set connection string
export DATABASE_URL="postgresql://user:password@localhost/myapp_pipelines"

Database features:
- Automatic schema migrations
- Transaction support
- Connection pooling
- Execution history tracking
- State persistence
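For local development, SQLite avoids server setup entirely; as shown in the multi-tenancy section, a file-based connection string is enough (the file name here is illustrative):

// No createdb step: SQLite stores state in a local file
let runner = DefaultRunner::new("sqlite://./myapp_pipelines.db").await?;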
§Feature Overview
- Type Safety: Leverages Rust’s type system for compile-time guarantees
- Content-Based Versioning: Automatic workflow versioning based on task code and structure
- Async/Await: Built on tokio for high-performance async execution
- Database Integration: PostgreSQL and SQLite support with Diesel ORM
- Dependency Resolution: Automatic topological sorting and cycle detection
- Error Recovery: Comprehensive error types and checkpoint support
- Macro Support: Convenient procedural macros for task definition with code fingerprinting
- Logging: Structured logging with configurable levels
- Metrics: Built-in performance monitoring
- Testing: Comprehensive test utilities and mocks
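Because the Context API is plain Rust, pieces of a pipeline can be unit-tested without a database. A minimal sketch, assuming context.get returns Option<&serde_json::Value> as the earlier snippets suggest:

#[cfg(test)]
mod tests {
    use cloacina::Context;

    #[test]
    fn context_round_trip() -> Result<(), Box<dyn std::error::Error>> {
        // Insert a value and read it back without any database involved
        let mut context: Context<serde_json::Value> = Context::new();
        context.insert("answer", serde_json::json!(42))?;
        assert_eq!(context.get("answer"), Some(&serde_json::json!(42)));
        Ok(())
    }
}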
§Documentation Navigation
§Learn Cloacina (Tutorials)
- Your First Pipeline - Start here for macro-based task creation
- Multi-Task Workflows - Building complex dependency chains
- Working with Data - Context management and serialization
- Error Handling - Retry policies and failure recovery
§Solve Problems (How-To Guides)
- Task Patterns - Common task implementation patterns
- Testing Strategies - Unit and integration testing approaches
- Database Operations - Working with execution history
- Error Recovery - Handling partial failures and recovery
§API Reference
- Task - Core task trait and macro
- Workflow - Pipeline construction and management
- Context - Data container for inter-task communication
- TaskScheduler - Execution engine and scheduling
- Error Types - Complete error hierarchy
§Understand the Design (Explanations)
- Architecture Decisions - Why Cloacina works the way it does
- Execution Model - How tasks are scheduled and run
- Versioning Strategy - Content-based workflow versioning
- Recovery Mechanisms - Checkpoint and restart concepts
§Modules
- context: Context management for sharing data between tasks
- task: Core task trait and registry functionality
- workflow: Workflow construction and dependency management
- registry: Dynamic workflow package loading and storage
- database: Database connection and persistence
- error: Comprehensive error types
- models: Database models and schemas
- dal: Data access layer
- task_scheduler: Task scheduler for persistent workflow execution
- executor: Unified execution engine
- logging: Structured logging setup
- retry: Retry policies and backoff strategies
Re-exports§

pub extern crate cloacina_workflow;

pub use logging::init_logging;
pub use database::connection::Database;
pub use cron_evaluator::CronError;
pub use cron_evaluator::CronEvaluator;
pub use cron_recovery::CronRecoveryConfig;
pub use cron_recovery::CronRecoveryService;
pub use cron_scheduler::CronScheduler;
pub use cron_scheduler::CronSchedulerConfig;
pub use database::AdminError;
pub use database::DatabaseAdmin;
pub use database::TenantConfig;
pub use database::TenantCredentials;
pub use database::UniversalBool;
pub use database::UniversalTimestamp;
pub use database::UniversalUuid;
pub use dispatcher::DefaultDispatcher;
pub use dispatcher::DispatchError;
pub use dispatcher::Dispatcher;
pub use dispatcher::ExecutionResult;
pub use dispatcher::ExecutionStatus;
pub use dispatcher::ExecutorMetrics;
pub use dispatcher::RoutingConfig;
pub use dispatcher::RoutingRule;
pub use dispatcher::TaskExecutor;
pub use dispatcher::TaskReadyEvent;
pub use error::ContextError;
pub use error::ExecutorError;
pub use error::RegistrationError;
pub use error::SubgraphError;
pub use error::ValidationError;
pub use error::WorkflowError;
pub use executor::ExecutorConfig;
pub use executor::PipelineError;
pub use executor::PipelineExecution;
pub use executor::PipelineExecutor;
pub use executor::PipelineResult;
pub use executor::PipelineStatus;
pub use executor::TaskResult;
pub use executor::ThreadTaskExecutor;
pub use graph::DependencyEdge;
pub use graph::GraphEdge;
pub use graph::GraphMetadata;
pub use graph::GraphNode;
pub use graph::TaskNode;
pub use graph::WorkflowGraph;
pub use graph::WorkflowGraphData;
pub use runner::DefaultRunnerBuilder;
pub use runner::DefaultRunner;
pub use runner::DefaultRunnerConfig;
pub use task::global_task_registry;
pub use task::register_task_constructor;
pub use task::TaskRegistry;
pub use task_scheduler::TaskScheduler;
pub use task_scheduler::TriggerCondition;
pub use task_scheduler::TriggerRule;
pub use task_scheduler::ValueOperator;
pub use trigger::get_trigger;
pub use trigger::global_trigger_registry;
pub use trigger::register_trigger;
pub use trigger::register_trigger_constructor;
pub use trigger::Trigger;
pub use trigger::TriggerConfig;
pub use trigger::TriggerError;
pub use trigger::TriggerResult;
pub use trigger_scheduler::TriggerScheduler;
pub use trigger_scheduler::TriggerSchedulerConfig;
pub use workflow::get_all_workflows;
pub use workflow::global_workflow_registry;
pub use workflow::register_workflow_constructor;
pub use workflow::DependencyGraph;
pub use workflow::Workflow;
pub use workflow::WorkflowBuilder;
pub use workflow::WorkflowMetadata;
Modules§
- context - Context Management
- cron_evaluator - Timezone-aware cron expression evaluator
- cron_recovery - Cron execution recovery service for handling lost executions.
- cron_scheduler - Cron scheduler for time-based workflow execution.
- dal - Data Access Layer with runtime backend selection
- database - Database Layer
- dispatcher - Dispatcher Layer for Executor Decoupling
- error - Error Types
- executor - Task Executor
- graph - Workflow Graph Data Structures
- logging - Logging Configuration
- models - Database Models
- packaging - Workflow packaging functionality for creating distributable workflow packages.
- prelude - Prelude module for convenient imports.
- registry - Workflow Registry
- retry - Retry Policy System
- runner - Workflow runners for executing complete pipelines and workflows.
- task - Task Management
- task_scheduler - Task Scheduler
- trigger - Trigger System
- trigger_scheduler - Trigger scheduler for event-based workflow execution.
- workflow - Workflow Management
Macros§
- backend_dispatch - Helper macro for dispatching operations based on backend type.
- connection_match - Helper macro for matching on AnyConnection variants.
- dispatch_backend - Dispatches to backend-specific code based on compile-time features.
- workflow
Structs§
- Context - A context that holds data for pipeline execution.
- RetryPolicy - Comprehensive retry policy configuration for tasks.
- RetryPolicyBuilder - Builder for creating RetryPolicy instances with a fluent API.
- TaskNamespace - Hierarchical namespace for task identification and isolation.
Enums§
- BackoffStrategy - Different backoff strategies for calculating retry delays.
- CheckpointError - Errors that can occur during task checkpointing.
- RetryCondition - Conditions that determine whether a failed task should be retried.
- TaskError - Errors that can occur during task execution.
- TaskState - Represents the execution state of a task throughout its lifecycle.
Traits§
- Task - Core trait that defines an executable task in a pipeline.
Functions§
- parse_namespace - Parse a namespace string back into a TaskNamespace.