TaskFlow.rs
An async task processing framework written in Rust for building workflow systems. It supports multiple task types, dependency management between tasks, and distributed execution across workers.
Features
- Multiple Task Types: HTTP requests, shell commands, and custom task handlers
- Dependency Management: Tasks can depend on the completion of other tasks
- Async Execution: Built on Tokio for high-performance async execution
- Retry Mechanisms: Configurable retry policies with exponential backoff
- Real-time Monitoring: Live task metrics and status tracking
- Extensible Storage: Pluggable storage backends (in-memory today; Redis and PostgreSQL planned)
- Worker System: Distributed execution with multiple worker nodes
- Comprehensive Logging: Structured logging with execution traces
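To make the retry feature listed above concrete, here is a minimal sketch of an exponential backoff schedule. It assumes the delay doubles on each attempt from a base delay and is capped at a maximum. The function names and the cap parameter are hypothetical and are not part of the TaskFlow API.

```rust
/// Hypothetical helper (not part of taskflow-rs): delay before retry number
/// `attempt` (0-based), doubling from `base_ms` and capped at `max_ms`.
pub fn backoff_delay_ms(base_ms: u64, attempt: u32, max_ms: u64) -> u64 {
    // 2^attempt, falling back to u64::MAX so large attempt counts cannot overflow.
    let factor = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(max_ms)
}

/// Delays for each of `max_retries` retries, in order.
pub fn backoff_schedule(base_ms: u64, max_retries: u32, max_ms: u64) -> Vec<u64> {
    (0..max_retries)
        .map(|attempt| backoff_delay_ms(base_ms, attempt, max_ms))
        .collect()
}
```

With a base delay of 1000 ms and 3 retries, this sketch waits 1000, 2000, and 4000 ms before the first, second, and third retry.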
Installation
Add to your Cargo.toml:
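[dependencies]
taskflow-rs = "0.1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }  # needed for #[tokio::main] in Quick Start
serde_json = "1"  # needed for the payload values in Custom Tasks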
Quick Start
use taskflow_rs::TaskFlow;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a framework instance backed by in-memory storage.
    let taskflow = TaskFlow::with_in_memory().await?;

    // Submit an HTTP task; the call returns the new task's ID.
    let task_id = taskflow
        .submit_http_task("fetch_data", "https://api.example.com/data", Some("GET"))
        .await?;

    // Submit a shell command task.
    let shell_task_id = taskflow
        .submit_shell_task("process_files", "ls", vec!["-la"])
        .await?;
    println!("Submitted tasks: {task_id}, {shell_task_id}");

    // Start processing, then read the aggregate metrics.
    taskflow.start().await?;
    let metrics = taskflow.get_task_metrics().await?;
    println!("Metrics: {:?}", metrics);
    Ok(())
}
Task Types
HTTP Tasks
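// submit_http_task (see API Reference) takes only a name, URL, and method.
// Assumption: headers and a body go in the payload of an "http_request" handler.
let task = TaskDefinition::new("task_name", "http_request")
    .with_payload("url", serde_json::json!("https://example.com"))
    .with_payload("method", serde_json::json!("POST"))
    .with_payload("headers", serde_json::json!({"Authorization": "Bearer token"}))
    .with_payload("body", serde_json::json!("{\"data\": \"value\"}"));
taskflow.submit_task(task).await?;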
Shell Command Tasks
taskflow.submit_shell_task("process_data", "python", vec!["script.py", "--input", "data.txt"])
.await?;
Custom Tasks
let task = TaskDefinition::new("custom_task", "my_handler_type")
    .with_payload("param1", serde_json::Value::String("value1".to_string()))
    .with_payload("param2", serde_json::Value::Number(42.into()))
    .with_dependencies(vec![previous_task_id]);
taskflow.submit_task(task).await?;
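A custom task names its handler with a string such as "my_handler_type". As a rough illustration of how such a string could be routed to code, the sketch below keeps a map from handler type to a closure. Everything here is hypothetical: `HandlerRegistry`, `Handler`, `register`, and `dispatch` are not TaskFlow APIs, and the payload is simplified to string values where the framework uses serde_json::Value.

```rust
use std::collections::HashMap;

/// Simplified payload: the framework itself uses serde_json::Value for values.
pub type Payload = HashMap<String, String>;

/// Hypothetical handler signature: takes the payload, returns output or an error message.
pub type Handler = Box<dyn Fn(&Payload) -> Result<String, String> + Send + Sync>;

/// Hypothetical registry mapping a handler type string to its handler.
#[derive(Default)]
pub struct HandlerRegistry {
    handlers: HashMap<String, Handler>,
}

impl HandlerRegistry {
    /// Registers `handler` under `handler_type`, replacing any previous entry.
    pub fn register<F>(&mut self, handler_type: &str, handler: F)
    where
        F: Fn(&Payload) -> Result<String, String> + Send + Sync + 'static,
    {
        self.handlers.insert(handler_type.to_string(), Box::new(handler));
    }

    /// Looks up the handler for `handler_type` and runs it on `payload`.
    pub fn dispatch(&self, handler_type: &str, payload: &Payload) -> Result<String, String> {
        match self.handlers.get(handler_type) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for '{handler_type}'")),
        }
    }
}
```

Dispatching on a string keeps task definitions serializable: a stored task only needs to carry the handler type and payload, and each worker resolves the code locally.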
Architecture
Core Components
- TaskFlow: Main framework orchestrator
- Scheduler: Manages task scheduling and dependency resolution
- Executor: Handles task execution with worker coordination
- Storage: Abstract storage layer for task persistence
- Task Handlers: Extensible system for different task types
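The scheduler's dependency resolution can be pictured as a topological sort. The sketch below uses Kahn's algorithm over a map from task ID to the IDs it depends on. This is an illustration of the general technique, not the framework's actual scheduler, and `resolve_order` is a hypothetical name.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Returns task IDs in an order where every task follows its dependencies.
/// On failure, returns the IDs that could never run: tasks on a cycle,
/// blocked behind one, or depending on an ID that is not in the map.
pub fn resolve_order(deps: &BTreeMap<String, Vec<String>>) -> Result<Vec<String>, Vec<String>> {
    // Count unfinished dependencies per task and record the reverse edges.
    let mut pending: BTreeMap<&str, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (task, task_deps) in deps {
        let unique: BTreeSet<&str> = task_deps.iter().map(String::as_str).collect();
        pending.insert(task.as_str(), unique.len());
        for dep in unique {
            dependents.entry(dep).or_default().push(task.as_str());
        }
    }

    // Start with the tasks that have no dependencies.
    let mut ready: VecDeque<&str> = pending
        .iter()
        .filter(|(_, count)| **count == 0)
        .map(|(task, _)| *task)
        .collect();

    let mut order = Vec::with_capacity(deps.len());
    while let Some(task) = ready.pop_front() {
        order.push(task.to_string());
        // Completing `task` unblocks each task that was waiting on it.
        for &dependent in dependents.get(task).into_iter().flatten() {
            let count = pending.get_mut(dependent).expect("dependent is a known task");
            *count -= 1;
            if *count == 0 {
                ready.push_back(dependent);
            }
        }
    }

    if order.len() == deps.len() {
        Ok(order)
    } else {
        let stuck = pending
            .into_iter()
            .filter(|(_, count)| *count > 0)
            .map(|(task, _)| task.to_string())
            .collect();
        Err(stuck)
    }
}
```

A scheduler built this way can reject a cyclic submission up front, which is the situation the DependencyCycle error variant describes.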
Storage Backends
- In-Memory: For development and testing
- Redis: For distributed deployments (planned)
- PostgreSQL: For persistent storage (planned)
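To show what a pluggable backend might look like, here is a minimal sketch of a storage trait with an in-memory implementation. The trait, its methods, and the `StoredTask` record are all hypothetical and simplified. The framework's own storage interface is likely async and richer than this.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical, simplified task record.
#[derive(Clone, Debug, PartialEq)]
pub struct StoredTask {
    pub id: String,
    pub name: String,
    pub status: String,
}

/// Hypothetical storage abstraction that each backend would implement.
pub trait Storage: Send + Sync {
    fn save(&self, task: StoredTask) -> Result<(), String>;
    fn load(&self, id: &str) -> Result<Option<StoredTask>, String>;
    fn list_by_status(&self, status: &str) -> Result<Vec<StoredTask>, String>;
}

/// In-memory backend: a mutex-guarded map keyed by task ID.
#[derive(Default)]
pub struct InMemoryStorage {
    tasks: Mutex<HashMap<String, StoredTask>>,
}

impl Storage for InMemoryStorage {
    fn save(&self, task: StoredTask) -> Result<(), String> {
        let mut tasks = self.tasks.lock().map_err(|e| e.to_string())?;
        tasks.insert(task.id.clone(), task);
        Ok(())
    }

    fn load(&self, id: &str) -> Result<Option<StoredTask>, String> {
        let tasks = self.tasks.lock().map_err(|e| e.to_string())?;
        Ok(tasks.get(id).cloned())
    }

    fn list_by_status(&self, status: &str) -> Result<Vec<StoredTask>, String> {
        let tasks = self.tasks.lock().map_err(|e| e.to_string())?;
        let mut found: Vec<StoredTask> =
            tasks.values().filter(|t| t.status == status).cloned().collect();
        // Sort by ID so results are deterministic despite HashMap ordering.
        found.sort_by(|a, b| a.id.cmp(&b.id));
        Ok(found)
    }
}
```

Code that holds a `Box<dyn Storage>` does not need to change when a different backend is swapped in, which is the point of keeping the storage layer abstract.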
Examples
See the examples/ directory for complete usage examples:
- basic_usage.rs: Simple demonstration of core features
- custom_handler.rs: Creating custom task handlers
API Reference
TaskFlow
impl TaskFlow {
    pub async fn with_in_memory() -> Result<Self, TaskFlowError>;
    pub async fn submit_http_task(&self, name: &str, url: &str, method: Option<&str>) -> Result<String, TaskFlowError>;
    pub async fn submit_shell_task(&self, name: &str, command: &str, args: Vec<&str>) -> Result<String, TaskFlowError>;
    pub async fn submit_task(&self, task: TaskDefinition) -> Result<String, TaskFlowError>;
    pub async fn start(&self) -> Result<(), TaskFlowError>;
    pub async fn shutdown(&self) -> Result<(), TaskFlowError>;
    pub async fn get_task_metrics(&self) -> Result<TaskMetrics, TaskFlowError>;
    pub async fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Result<Vec<Task>, TaskFlowError>;
}
TaskDefinition
impl TaskDefinition {
    pub fn new(name: &str, handler_type: &str) -> Self;
    pub fn with_payload(mut self, key: &str, value: serde_json::Value) -> Self;
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self;
    pub fn with_max_retries(mut self, max_retries: u32) -> Self;
    pub fn with_retry_delay_ms(mut self, delay_ms: u64) -> Self;
    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self;
}
Configuration
let config = TaskFlowConfig {
    max_workers: 4,
    worker_timeout_ms: 30000,
    task_timeout_ms: 60000,
    retry_delay_ms: 1000,
    max_retries: 3,
    storage: StorageConfig::InMemory,
};
let taskflow = TaskFlow::new(config).await?;
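A configuration like the one above is easier to use with defaults and a sanity check before startup. The sketch below shows one way to do that on a local stand-in struct. `ConfigSketch` and `validate` are hypothetical, the storage field is omitted, and reusing the example values as defaults is an assumption rather than documented behavior.

```rust
/// Local stand-in mirroring the numeric fields shown above (storage omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct ConfigSketch {
    pub max_workers: usize,
    pub worker_timeout_ms: u64,
    pub task_timeout_ms: u64,
    pub retry_delay_ms: u64,
    pub max_retries: u32,
}

impl Default for ConfigSketch {
    /// Assumption: the example values double as defaults.
    fn default() -> Self {
        Self {
            max_workers: 4,
            worker_timeout_ms: 30_000,
            task_timeout_ms: 60_000,
            retry_delay_ms: 1_000,
            max_retries: 3,
        }
    }
}

/// Hypothetical check that rejects values under which no task could ever run.
pub fn validate(config: &ConfigSketch) -> Result<(), String> {
    if config.max_workers == 0 {
        return Err("max_workers must be at least 1".to_string());
    }
    if config.task_timeout_ms == 0 {
        return Err("task_timeout_ms must be greater than 0".to_string());
    }
    Ok(())
}
```

Struct update syntax then lets a caller override a single field, for example `ConfigSketch { max_workers: 8, ..ConfigSketch::default() }`.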
Error Handling
The framework uses TaskFlowError for all error cases, with detailed error variants:
pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}
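Callers usually want to treat these variants differently, for example retrying a timeout but not a dependency cycle. The sketch below redeclares the enum locally so it compiles on its own, then adds a Display implementation and a retry classification. The message wording and the choice of which variants count as retryable are assumptions, and `is_retryable` is a hypothetical helper.

```rust
use std::fmt;

/// Local copy of the variants listed above, so the sketch compiles on its own.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}

impl fmt::Display for TaskFlowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Message prefixes are illustrative, not the crate's actual wording.
        match self {
            TaskFlowError::StorageError(m) => write!(f, "storage error: {m}"),
            TaskFlowError::TaskNotFound(m) => write!(f, "task not found: {m}"),
            TaskFlowError::TaskValidationError(m) => write!(f, "invalid task: {m}"),
            TaskFlowError::DependencyCycle(m) => write!(f, "dependency cycle: {m}"),
            TaskFlowError::ExecutionError(m) => write!(f, "execution failed: {m}"),
            TaskFlowError::TimeoutError(m) => write!(f, "timed out: {m}"),
            TaskFlowError::ConfigurationError(m) => write!(f, "bad configuration: {m}"),
        }
    }
}

/// Assumption: only failures that may be transient are worth retrying.
pub fn is_retryable(err: &TaskFlowError) -> bool {
    matches!(
        err,
        TaskFlowError::StorageError(_)
            | TaskFlowError::ExecutionError(_)
            | TaskFlowError::TimeoutError(_)
    )
}
```

Validation, configuration, and cycle errors are classified as permanent here because retrying the same input would fail the same way.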
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
License
This project is licensed under the Apache License 2.0. See the LICENSE file for details.