# oxigdal-workflow

A DAG-based workflow engine for complex geospatial processing pipelines. Orchestrate multi-stage geospatial workflows with automatic parallelization, flexible scheduling, state persistence, and comprehensive monitoring.
## Overview
oxigdal-workflow provides a production-grade workflow orchestration system designed for geospatial data processing. Build complex pipelines with directed acyclic graphs (DAGs), execute tasks in parallel, handle failures gracefully, and monitor execution with detailed metrics.
## Features
- DAG-Based Execution: Define workflows as directed acyclic graphs for automatic dependency resolution and parallelization
- Flexible Scheduling: Cron-based, interval-based, and event-driven task scheduling with timezone support
- Automatic Parallelization: Intelligent task scheduling maximizes parallelism based on task dependencies
- Retry Logic & Resilience: Configurable retry policies with exponential backoff and failure recovery
- State Persistence: Save and restore workflow execution state for fault tolerance and recovery
- Conditional Execution: Dynamic branching and conditional task execution based on previous results
- Workflow Templates: Reusable workflow templates with parameterization and versioning
- Comprehensive Monitoring: Detailed execution metrics, logging, and debugging capabilities
- Resource Management: Resource requirement specification and tracking with pool-based scheduling
- External Integrations: Adapters for Airflow, Prefect, Temporal, and HTTP webhooks (with the `integrations` feature)
- HTTP Server: RESTful API for workflow management and monitoring (with the `server` feature)
- Pure Rust: 100% Pure Rust implementation with no C/Fortran dependencies
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
oxigdal-workflow = "0.1.2"

# Or pick one of the optional feature sets instead:
oxigdal-workflow = { version = "0.1.2", features = ["integrations"] }
oxigdal-workflow = { version = "0.1.2", features = ["server"] }
oxigdal-workflow = { version = "0.1.2", features = ["full"] }
```
## Quick Start

### Basic Workflow Definition
```rust
use oxigdal_workflow::{
    WorkflowDefinition,
    dag::{TaskNode, WorkflowDag, RetryPolicy, ResourceRequirements},
};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut dag = WorkflowDag::new();

    let task1 = TaskNode {
        id: "fetch_data".to_string(),
        name: "Fetch Data".to_string(),
        description: Some("Fetch input data".to_string()),
        config: serde_json::json!({"source": "S3"}),
        retry: RetryPolicy::default(),
        timeout_secs: Some(300),
        resources: ResourceRequirements::default(),
        metadata: HashMap::new(),
    };

    let task2 = TaskNode {
        id: "process_data".to_string(),
        name: "Process Data".to_string(),
        description: Some("Process and transform data".to_string()),
        config: serde_json::json!({"algorithm": "NDVI"}),
        retry: RetryPolicy::default(),
        timeout_secs: Some(600),
        resources: ResourceRequirements::default(),
        metadata: HashMap::new(),
    };

    dag.add_task(task1)?;
    dag.add_task(task2)?;
    dag.add_dependency("fetch_data", "process_data", Default::default())?;

    let workflow = WorkflowDefinition {
        id: "geospatial-pipeline".to_string(),
        name: "Geospatial Processing Pipeline".to_string(),
        description: Some("Process satellite imagery".to_string()),
        version: "1.0.0".to_string(),
        dag,
    };

    println!("Workflow defined: {}", workflow.name);
    Ok(())
}
```
### Workflow Scheduling
```rust
use oxigdal_workflow::{
    WorkflowDefinition,
    scheduler::{ScheduleType, Scheduler, SchedulerConfig},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = SchedulerConfig {
        max_concurrent_executions: 5,
        handle_missed_executions: true,
        max_missed_executions: 3,
        execution_timeout_secs: 3600,
        enable_persistence: true,
        persistence_path: Some("./workflow_state".to_string()),
        tick_interval_ms: 100,
        timezone: "UTC".to_string(),
    };
    let scheduler = Scheduler::new(config);

    // `workflow` is a WorkflowDefinition built as in the previous example.
    // Run daily at midnight via a cron expression; the workflow is cloned
    // because add_schedule takes ownership.
    let schedule_id = scheduler
        .add_schedule(
            workflow.clone(),
            ScheduleType::Cron {
                expression: "0 0 * * *".to_string(),
            },
        )
        .await?;
    println!("Workflow scheduled with ID: {}", schedule_id);

    // Or run every hour on a fixed interval:
    let schedule_id = scheduler
        .add_schedule(
            workflow,
            ScheduleType::Interval { interval_secs: 3600 },
        )
        .await?;
    println!("Workflow scheduled with ID: {}", schedule_id);
    Ok(())
}
```
### Parallel Execution Planning
```rust
use oxigdal_workflow::dag::{WorkflowDag, create_execution_plan};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Populate the DAG as in the earlier examples; an empty DAG
    // yields an empty plan.
    let dag = WorkflowDag::new();

    let execution_plan = create_execution_plan(&dag)?;
    for (level_idx, level) in execution_plan.iter().enumerate() {
        println!("Level {}: {:?} (can run in parallel)", level_idx, level);
    }
    Ok(())
}
```
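Conceptually, each level of the plan holds tasks whose dependencies are all satisfied by earlier levels, so everything within a level can run concurrently. This is a breadth-first topological sort; a minimal self-contained sketch of the idea in plain Rust (not the crate's implementation):

```rust
// Compute parallel execution levels for a DAG given as (from, to) edges
// over tasks 0..n. Each returned level contains tasks whose dependencies
// were all emitted in earlier levels.
fn execution_levels(n: usize, edges: &[(usize, usize)]) -> Vec<Vec<usize>> {
    let mut indegree = vec![0usize; n];
    let mut adj = vec![Vec::new(); n];
    for &(from, to) in edges {
        adj[from].push(to);
        indegree[to] += 1;
    }
    let mut levels = Vec::new();
    // Start with every task that has no dependencies.
    let mut current: Vec<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    while !current.is_empty() {
        let mut next = Vec::new();
        for &task in &current {
            for &succ in &adj[task] {
                indegree[succ] -= 1;
                if indegree[succ] == 0 {
                    next.push(succ);
                }
            }
        }
        levels.push(current);
        current = next;
    }
    levels
}

fn main() {
    // Diamond: 0 -> {1, 2} -> 3; tasks 1 and 2 share a level.
    let levels = execution_levels(4, &[(0, 1), (0, 2), (1, 3), (2, 3)]);
    println!("{:?}", levels); // [[0], [1, 2], [3]]
}
```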
## Usage Examples

### Example 1: Batch Processing Workflow
```rust
use oxigdal_workflow::{
    WorkflowDefinition,
    dag::{TaskNode, WorkflowDag, TaskEdge},
    monitoring::MonitoringService,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut dag = WorkflowDag::new();

    dag.add_task(TaskNode {
        id: "list_files".to_string(),
        name: "List Input Files".to_string(),
        ..Default::default()
    })?;

    // Fan out: eight batches that can run in parallel.
    for i in 0..8 {
        dag.add_task(TaskNode {
            id: format!("process_batch_{}", i),
            name: format!("Process Batch {}", i),
            ..Default::default()
        })?;
    }

    dag.add_task(TaskNode {
        id: "merge_results".to_string(),
        name: "Merge Results".to_string(),
        ..Default::default()
    })?;

    // Fan in: every batch depends on the listing and feeds the merge.
    for i in 0..8 {
        dag.add_dependency("list_files", &format!("process_batch_{}", i), TaskEdge::default())?;
        dag.add_dependency(&format!("process_batch_{}", i), "merge_results", TaskEdge::default())?;
    }

    let workflow = WorkflowDefinition {
        id: "batch-processing".to_string(),
        name: "Batch Processing Workflow".to_string(),
        description: Some("Process multiple files in parallel".to_string()),
        version: "1.0.0".to_string(),
        dag,
    };
    println!("Workflow defined: {}", workflow.name);

    let monitoring = MonitoringService::new();
    println!("Monitoring initialized");
    Ok(())
}
```
### Example 2: Satellite Imagery Processing
```rust
use oxigdal_workflow::{
    WorkflowDefinition,
    dag::{TaskNode, WorkflowDag},
    scheduler::{ScheduleType, Scheduler},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut dag = WorkflowDag::new();

    dag.add_task(TaskNode {
        id: "download_imagery".to_string(),
        name: "Download Imagery".to_string(),
        ..Default::default()
    })?;
    dag.add_task(TaskNode {
        id: "atmospheric_correction".to_string(),
        name: "Atmospheric Correction".to_string(),
        ..Default::default()
    })?;
    dag.add_task(TaskNode {
        id: "cloud_masking".to_string(),
        name: "Cloud Masking".to_string(),
        ..Default::default()
    })?;
    dag.add_task(TaskNode {
        id: "calculate_ndvi".to_string(),
        name: "Calculate NDVI".to_string(),
        ..Default::default()
    })?;
    dag.add_task(TaskNode {
        id: "export_results".to_string(),
        name: "Export Results".to_string(),
        ..Default::default()
    })?;

    // A linear pipeline: each stage depends on the previous one.
    let steps = vec![
        ("download_imagery", "atmospheric_correction"),
        ("atmospheric_correction", "cloud_masking"),
        ("cloud_masking", "calculate_ndvi"),
        ("calculate_ndvi", "export_results"),
    ];
    for (from, to) in steps {
        dag.add_dependency(from, to, Default::default())?;
    }

    let workflow = WorkflowDefinition {
        id: "satellite-processing".to_string(),
        name: "Satellite Imagery Processing".to_string(),
        description: Some("Process Sentinel-2 satellite imagery".to_string()),
        version: "1.0.0".to_string(),
        dag,
    };

    let scheduler = Scheduler::with_defaults();
    // Run daily at midnight.
    let schedule_id = scheduler
        .add_schedule(
            workflow,
            ScheduleType::Cron {
                expression: "0 0 * * *".to_string(),
            },
        )
        .await?;
    println!("Scheduled workflow with ID: {}", schedule_id);
    Ok(())
}
```
## API Overview

### Core Modules

| Module | Description |
|--------|-------------|
| `engine` | Workflow runtime, execution, and state management |
| `dag` | DAG construction, validation, and execution planning |
| `scheduler` | Cron and interval-based scheduling system |
| `monitoring` | Execution metrics, history, and observability |
| `conditional` | Conditional branching and dynamic execution |
| `templates` | Workflow templates and reusable patterns |
| `versioning` | Version management and backward compatibility |
| `integrations` | External system adapters (Airflow, Prefect, Kafka) |
| `error` | Error types and result handling |
### Key Types

#### WorkflowDefinition

```rust
pub struct WorkflowDefinition {
    pub id: String,
    pub name: String,
    pub description: Option<String>,
    pub version: String,
    pub dag: WorkflowDag,
}
```

#### TaskNode

```rust
pub struct TaskNode {
    pub id: String,
    pub name: String,
    pub description: Option<String>,
    pub config: serde_json::Value,
    pub retry: RetryPolicy,
    pub timeout_secs: Option<u64>,
    pub resources: ResourceRequirements,
    pub metadata: HashMap<String, String>,
}
```
#### WorkflowDag

```rust
impl WorkflowDag {
    pub fn new() -> Self;
    pub fn add_task(&mut self, task: TaskNode) -> Result<()>;
    pub fn add_dependency(&mut self, from: &str, to: &str, edge: TaskEdge) -> Result<()>;
    pub fn validate(&self) -> Result<()>;
    pub fn tasks(&self) -> Vec<&TaskNode>;
}
```
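`validate()` rejects graphs that contain cycles. A standard way to check this in O(V + E) is Kahn's algorithm: a graph is acyclic iff a topological pass visits every node. A standalone sketch of that check (an illustration, not the crate's actual implementation):

```rust
// A DAG given as (from, to) edges over nodes 0..n is acyclic iff the
// topological pass below pops all n nodes; nodes left unvisited sit on
// a cycle and never reach in-degree zero.
fn is_acyclic(n: usize, edges: &[(usize, usize)]) -> bool {
    let mut indegree = vec![0usize; n];
    let mut adj = vec![Vec::new(); n];
    for &(from, to) in edges {
        adj[from].push(to);
        indegree[to] += 1;
    }
    let mut stack: Vec<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut visited = 0;
    while let Some(node) = stack.pop() {
        visited += 1;
        for &succ in &adj[node] {
            indegree[succ] -= 1;
            if indegree[succ] == 0 {
                stack.push(succ);
            }
        }
    }
    visited == n
}

fn main() {
    println!("{}", is_acyclic(3, &[(0, 1), (1, 2)]));         // true
    println!("{}", is_acyclic(3, &[(0, 1), (1, 2), (2, 0)])); // false: cycle
}
```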
#### Scheduler

```rust
impl Scheduler {
    pub fn new(config: SchedulerConfig) -> Self;
    pub async fn add_schedule(&self, workflow: WorkflowDefinition, schedule: ScheduleType) -> Result<String>;
    pub fn get_schedules(&self) -> Vec<ScheduleInfo>;
    pub async fn remove_schedule(&self, id: &str) -> Result<()>;
}
```
#### MonitoringService

```rust
impl MonitoringService {
    pub fn new() -> Self;
    pub fn record_task_execution(&self, task_id: &str, status: TaskStatus, duration: Duration);
    pub fn get_metrics(&self) -> WorkflowMetrics;
    pub fn get_execution_history(&self, workflow_id: &str) -> Vec<ExecutionHistory>;
}
```
## Advanced Features

### Conditional Execution

```rust
use oxigdal_workflow::conditional::{ConditionalBranch, Expression, ExecutionDecision};

let condition = ConditionalBranch {
    id: "check_data_quality".to_string(),
    expression: Expression::GreaterThan("quality_score".to_string(), 0.8),
    true_task: "process_high_quality".to_string(),
    false_task: Some("process_low_quality".to_string()),
};
```
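At runtime, such a branch is evaluated against values produced by earlier tasks: if the expression holds, the `true_task` runs, otherwise the `false_task`. A self-contained sketch of that decision logic (the `Expr` type and `choose` helper are illustrations, not the crate's API):

```rust
use std::collections::HashMap;

// Minimal stand-in for a GreaterThan condition over a named context value.
enum Expr {
    GreaterThan(String, f64),
}

// Pick the next task id based on whether the expression holds in `ctx`.
// A missing key is treated as the condition failing.
fn choose<'a>(expr: &Expr, ctx: &HashMap<String, f64>, if_true: &'a str, if_false: &'a str) -> &'a str {
    let holds = match expr {
        Expr::GreaterThan(key, threshold) => ctx.get(key).map_or(false, |v| v > threshold),
    };
    if holds { if_true } else { if_false }
}

fn main() {
    let mut ctx = HashMap::new();
    ctx.insert("quality_score".to_string(), 0.92);
    let expr = Expr::GreaterThan("quality_score".to_string(), 0.8);
    // 0.92 > 0.8, so the high-quality branch is chosen.
    println!("{}", choose(&expr, &ctx, "process_high_quality", "process_low_quality"));
}
```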
### Retry Policies

```rust
use oxigdal_workflow::dag::RetryPolicy;

let retry_policy = RetryPolicy {
    max_attempts: 3,
    initial_delay_ms: 1000,
    max_delay_ms: 30000,
    backoff_multiplier: 2.0,
    jitter_factor: 0.1,
};
```
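With exponential backoff, the delay before attempt k grows geometrically from the initial delay and is capped at the maximum. A sketch of that schedule (the `backoff_delay_ms` helper is illustrative, not a crate function; jitter is omitted for determinism):

```rust
// Delay before retry attempt k (1-based): initial * multiplier^(k-1),
// capped at max_ms. Real schedulers typically add random jitter on top.
fn backoff_delay_ms(attempt: u32, initial_ms: u64, max_ms: u64, multiplier: f64) -> u64 {
    let raw = initial_ms as f64 * multiplier.powi(attempt as i32 - 1);
    (raw as u64).min(max_ms)
}

fn main() {
    // With the example policy: 1000, 2000, 4000, ... ms, capped at 30000 ms.
    for attempt in 1..=7 {
        println!("attempt {}: {} ms", attempt, backoff_delay_ms(attempt, 1000, 30_000, 2.0));
    }
}
```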
### Resource Requirements

```rust
use oxigdal_workflow::dag::ResourceRequirements;

let resources = ResourceRequirements {
    cpu_cores: Some(4),
    memory_mb: Some(8192),
    gpu_count: Some(1),
    disk_gb: Some(50),
};
```
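In pool-based scheduling, a task is admitted only when every declared requirement fits the pool's remaining capacity; an unset (`None`) field places no constraint. A self-contained illustration of that admission check (the `Pool` type and `fits` helper are assumptions for this sketch, not crate APIs):

```rust
// Hypothetical capacity pool; field names mirror the requirement fields.
struct Pool {
    cpu_cores: u32,
    memory_mb: u64,
    gpu_count: u32,
}

// A task fits iff every stated requirement is within remaining capacity;
// a None requirement is unconstrained and always passes.
fn fits(pool: &Pool, cpu: Option<u32>, mem: Option<u64>, gpu: Option<u32>) -> bool {
    cpu.map_or(true, |c| c <= pool.cpu_cores)
        && mem.map_or(true, |m| m <= pool.memory_mb)
        && gpu.map_or(true, |g| g <= pool.gpu_count)
}

fn main() {
    let pool = Pool { cpu_cores: 8, memory_mb: 16_384, gpu_count: 1 };
    println!("{}", fits(&pool, Some(4), Some(8_192), Some(1))); // true
    println!("{}", fits(&pool, Some(16), None, None));          // false: needs 16 cores
}
```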
### Workflow Templates

```rust
use oxigdal_workflow::templates::{WorkflowTemplate, WorkflowTemplateLibrary};

let template = WorkflowTemplate {
    id: "geospatial_pipeline".to_string(),
    name: "Geospatial Pipeline Template".to_string(),
    description: Some("Reusable template for geospatial processing".to_string()),
    version: "1.0.0".to_string(),
    parameters: vec![
        ("source_url".to_string(), "string".to_string()),
        ("algorithm".to_string(), "enum(NDVI,EVI)".to_string()),
    ],
    // `workflow` is a WorkflowDefinition built as in the earlier examples.
    workflow_definition: workflow,
};

let mut library = WorkflowTemplateLibrary::new();
library.add_template(template)?;
```
## Performance Characteristics

- Task Execution Overhead: <1 ms per task (in-process scheduling)
- DAG Validation: O(V + E), where V = vertices and E = edges
- Execution Planning: O(V + E) topological sort
- Parallel Scheduling: optimal for DAGs with a high parallelism factor
- State Persistence: configurable, with optional disk-based durability
### Benchmark Results

| Operation | Time (avg) |
|-----------|------------|
| DAG creation (100 tasks) | ~5 ms |
| Execution plan generation | ~2 ms |
| Task scheduling | <0.5 ms |
| Metrics aggregation | ~1 ms |
## Error Handling

All fallible operations return `Result<T, WorkflowError>` with comprehensive error types:

```rust
pub enum WorkflowError {
    DagError(String),
    ScheduleError(String),
    ExecutionError(String),
    StateError(String),
    ConfigurationError(String),
    IntegrationError(String),
}
```
## Examples

Complete working examples are available in the `examples` directory. Run them with:

```bash
cargo run --example batch_processing_workflow
cargo run --example satellite_workflow
cargo run --example change_detection_workflow
```
## Configuration

### SchedulerConfig

```rust
pub struct SchedulerConfig {
    pub max_concurrent_executions: usize,
    pub handle_missed_executions: bool,
    pub max_missed_executions: usize,
    pub execution_timeout_secs: u64,
    pub enable_persistence: bool,
    pub persistence_path: Option<String>,
    pub tick_interval_ms: u64,
    pub timezone: String,
}
```
## Integration with OxiGDAL Ecosystem

oxigdal-workflow is part of the OxiGDAL ecosystem for geospatial data processing.
## Pure Rust
This library is 100% Pure Rust with no C/Fortran dependencies. All functionality works out of the box without external binaries or system libraries. Optional features may integrate with external systems via HTTP or message queues, but core functionality remains pure Rust.
## Contributing

Contributions are welcome! This project follows COOLJAPAN ecosystem standards:

- Pure Rust implementation required
- No `unwrap()` calls in production code
- Comprehensive error handling
- Full documentation with examples
- Tests for all public APIs
## License

Licensed under the Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0).
## Related Projects

- OxiGDAL Core - Core geospatial types and operations
- OxiBLAS - Pure Rust BLAS operations
- OxiCode - Pure Rust serialization
- SciRS2 - Scientific computing ecosystem

*Part of the COOLJAPAN Ecosystem - Pure Rust Geospatial Processing*