taskflow-rs 0.1.0

A high-performance, async-first task orchestration framework for Rust

TaskFlow.rs

TaskFlow.rs is an async task-processing framework written in Rust for building workflow systems. It supports multiple task types, dependency management between tasks, and distributed execution across workers.

Features

  • Multiple Task Types: HTTP requests, shell commands, and custom task handlers
  • Dependency Management: Tasks can depend on the completion of other tasks
  • Async Execution: Built on Tokio for high-performance async execution
  • Retry Mechanisms: Configurable retry policies with exponential backoff
  • Real-time Monitoring: Live task metrics and status tracking
  • Extensible Storage: Pluggable storage backends (in-memory today; Redis and PostgreSQL planned)
  • Worker System: Distributed execution with multiple worker nodes
  • Comprehensive Logging: Structured logging with execution traces

Installation

Add to your Cargo.toml:

[dependencies]
taskflow-rs = "0.1.0"

Quick Start

use taskflow_rs::{TaskDefinition, TaskFlow};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let taskflow = TaskFlow::with_in_memory().await?;
    
    // Submit an HTTP task
    let task_id = taskflow
        .submit_http_task("fetch_data", "https://api.example.com/data", Some("GET"))
        .await?;
    
    // Submit a shell command task
    let shell_task_id = taskflow
        .submit_shell_task("process_files", "ls", vec!["-la"])
        .await?;
    
    // Start task execution
    taskflow.start().await?;
    
    // Monitor progress
    let metrics = taskflow.get_task_metrics().await?;
    println!("Metrics: {:?}", metrics);
    
    Ok(())
}

Task Types

HTTP Tasks

taskflow.submit_http_task("task_name", "https://example.com", Some("POST"))
    .with_headers(vec![("Authorization", "Bearer token")])
    .with_body("{\"data\": \"value\"}")
    .await?;

Shell Command Tasks

taskflow.submit_shell_task("process_data", "python", vec!["script.py", "--input", "data.txt"])
    .await?;

Custom Tasks

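// `previous_task_id` is the String ID returned by an earlier submit_* call.
let task = TaskDefinition::new("custom_task", "my_handler_type")
    // Payload entries are key/value pairs holding serde_json values.
    .with_payload("param1", serde_json::Value::String("value1".to_string()))
    .with_payload("param2", serde_json::Value::Number(42.into()))
    // This task depends on the completion of every task listed here.
    .with_dependencies(vec![previous_task_id]);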

taskflow.submit_task(task).await?;

Architecture

Core Components

  • TaskFlow: Main framework orchestrator
  • Scheduler: Manages task scheduling and dependency resolution
  • Executor: Handles task execution with worker coordination
  • Storage: Abstract storage layer for task persistence
  • Task Handlers: Extensible system for different task types
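
The Scheduler's dependency resolution can be pictured as a topological sort. The sketch below is a standalone illustration using Kahn's algorithm, not the crate's actual scheduler: the function name `execution_order` and the map-of-IDs input are assumptions made for this example. It returns an order in which every task comes after its dependencies, or the IDs that could not be scheduled (the situation the `DependencyCycle` error presumably reports).

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical helper: `deps` maps a task ID to the IDs it depends on.
/// Ok(order) lists every task after all of its dependencies.
/// Err(stuck) lists tasks that can never run: they are part of a cycle,
/// downstream of one, or depend on an ID that is not in the map.
pub fn execution_order(
    deps: &HashMap<String, Vec<String>>,
) -> Result<Vec<String>, Vec<String>> {
    // Count unmet dependencies per task and index each task's dependents.
    let mut unmet: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (task, task_deps) in deps {
        unmet.insert(task.as_str(), task_deps.len());
        for dep in task_deps {
            dependents.entry(dep.as_str()).or_default().push(task.as_str());
        }
    }

    // Seed the queue with tasks that have no dependencies.
    // Sorting makes the result deterministic despite HashMap iteration order.
    let mut initial: Vec<&str> = unmet
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(t, _)| *t)
        .collect();
    initial.sort_unstable();
    let mut ready: VecDeque<&str> = initial.into();

    let mut order = Vec::with_capacity(deps.len());
    while let Some(task) = ready.pop_front() {
        order.push(task.to_string());
        if let Some(children) = dependents.get(task) {
            let mut newly_ready = Vec::new();
            for child in children {
                if let Some(n) = unmet.get_mut(child) {
                    *n -= 1;
                    if *n == 0 {
                        newly_ready.push(*child);
                    }
                }
            }
            newly_ready.sort_unstable();
            ready.extend(newly_ready);
        }
    }

    if order.len() == deps.len() {
        Ok(order)
    } else {
        // Anything with unmet dependencies left over could not be scheduled.
        let mut stuck: Vec<String> = unmet
            .iter()
            .filter(|(_, n)| **n > 0)
            .map(|(t, _)| t.to_string())
            .collect();
        stuck.sort();
        Err(stuck)
    }
}
```

A real scheduler would also have to account for task status and retries; this sketch covers only the ordering step.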

Storage Backends

  • In-Memory: For development and testing
  • Redis: For distributed deployments (planned)
  • PostgreSQL: For persistent storage (planned)
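
One way to picture a pluggable storage layer is a trait with one implementation per backend. The README does not show the crate's real storage interface, so everything in this sketch (`TaskStore`, `StoredTask`, `InMemoryStore`, `count_pending`, and the string-typed status) is hypothetical and only illustrates the pattern.

```rust
use std::collections::HashMap;

/// Hypothetical task record; the crate's real task type is not shown here.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredTask {
    pub id: String,
    pub name: String,
    pub status: String,
}

/// Hypothetical storage interface that each backend would implement.
pub trait TaskStore {
    fn put(&mut self, task: StoredTask);
    fn get(&self, id: &str) -> Option<StoredTask>;
    fn list_by_status(&self, status: &str) -> Vec<StoredTask>;
}

/// In-memory backend: a HashMap keyed by task ID.
#[derive(Default)]
pub struct InMemoryStore {
    tasks: HashMap<String, StoredTask>,
}

impl TaskStore for InMemoryStore {
    fn put(&mut self, task: StoredTask) {
        // Inserting an existing ID overwrites the previous record.
        self.tasks.insert(task.id.clone(), task);
    }

    fn get(&self, id: &str) -> Option<StoredTask> {
        self.tasks.get(id).cloned()
    }

    fn list_by_status(&self, status: &str) -> Vec<StoredTask> {
        let mut found: Vec<StoredTask> = self
            .tasks
            .values()
            .filter(|t| t.status == status)
            .cloned()
            .collect();
        // Sort by ID so callers get a stable order.
        found.sort_by(|a, b| a.id.cmp(&b.id));
        found
    }
}

/// Code written against the trait does not care which backend is plugged in.
pub fn count_pending(store: &dyn TaskStore) -> usize {
    store.list_by_status("pending").len()
}
```

A Redis or PostgreSQL backend would implement the same trait, most likely with async methods; the synchronous signatures here only keep the sketch dependency-free.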

Examples

See the examples/ directory for complete usage examples:

  • basic_usage.rs: Simple demonstration of core features
  • custom_handler.rs: Creating custom task handlers

API Reference

TaskFlow

impl TaskFlow {
    pub async fn with_in_memory() -> Result<Self, TaskFlowError>;
    pub async fn submit_http_task(&self, name: &str, url: &str, method: Option<&str>) -> Result<String, TaskFlowError>;
    pub async fn submit_shell_task(&self, name: &str, command: &str, args: Vec<&str>) -> Result<String, TaskFlowError>;
    pub async fn submit_task(&self, task: TaskDefinition) -> Result<String, TaskFlowError>;
    pub async fn start(&self) -> Result<(), TaskFlowError>;
    pub async fn shutdown(&self) -> Result<(), TaskFlowError>;
    pub async fn get_task_metrics(&self) -> Result<TaskMetrics, TaskFlowError>;
    pub async fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Result<Vec<Task>, TaskFlowError>;
}

TaskDefinition

impl TaskDefinition {
    pub fn new(name: &str, handler_type: &str) -> Self;
    pub fn with_payload(mut self, key: &str, value: serde_json::Value) -> Self;
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self;
    pub fn with_max_retries(mut self, max_retries: u32) -> Self;
    pub fn with_retry_delay_ms(mut self, delay_ms: u64) -> Self;
    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self;
}

Configuration

let config = TaskFlowConfig {
    max_workers: 4,
    worker_timeout_ms: 30000,
    task_timeout_ms: 60000,
    retry_delay_ms: 1000,
    max_retries: 3,
    storage: StorageConfig::InMemory,
};

let taskflow = TaskFlow::new(config).await?;
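
The README does not spell out how `retry_delay_ms` and `max_retries` combine with the exponential backoff mentioned under Features. A minimal sketch of one common scheme, assuming the delay doubles on each retry and is capped, follows. The function names and the `max_delay_ms` cap are assumptions for illustration, not part of the crate's API.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt,
/// capped at `max_delay_ms`. The doubling factor and the cap are
/// assumptions of this sketch.
pub fn backoff_delay(base_delay_ms: u64, attempt: u32, max_delay_ms: u64) -> Duration {
    // checked_shl returns None once the shift reaches 64 bits;
    // treat that as "as large as possible" and let the cap apply.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let delay_ms = base_delay_ms.saturating_mul(factor).min(max_delay_ms);
    Duration::from_millis(delay_ms)
}

/// Delays for each of `max_retries` retries, in order.
pub fn retry_schedule(base_delay_ms: u64, max_retries: u32, max_delay_ms: u64) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| backoff_delay(base_delay_ms, attempt, max_delay_ms))
        .collect()
}
```

Under these assumptions, the configuration values above (`retry_delay_ms: 1000`, `max_retries: 3`) would give waits of 1 s, 2 s, and 4 s.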

Error Handling

Every fallible operation returns TaskFlowError. Each variant names a failure category and carries a descriptive message:

pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}
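
Callers can branch on the variant to decide how to react. In the sketch below the enum is reproduced (with `Debug` derived) so the snippet compiles on its own. The helpers `is_retryable` and `describe` are hypothetical, and which variants count as retryable is a policy assumption of this example rather than documented crate behavior.

```rust
/// Copied from the enum above so this snippet is self-contained.
#[derive(Debug)]
pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}

/// Hypothetical policy: treat storage, execution, and timeout failures as
/// possibly transient; everything else needs a change to the task or config.
pub fn is_retryable(err: &TaskFlowError) -> bool {
    matches!(
        err,
        TaskFlowError::StorageError(_)
            | TaskFlowError::ExecutionError(_)
            | TaskFlowError::TimeoutError(_)
    )
}

/// Hypothetical helper that turns an error into a log-friendly message.
pub fn describe(err: &TaskFlowError) -> String {
    match err {
        TaskFlowError::TaskNotFound(id) => format!("no such task: {id}"),
        TaskFlowError::DependencyCycle(detail) => format!("dependency cycle: {detail}"),
        // Fall back to the Debug representation for the remaining variants.
        other => format!("task flow error: {other:?}"),
    }
}
```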

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.