taskflow-rs
A modern, async task processing framework written in Rust for building scalable workflow systems with support for multiple task types, dependency management, and distributed execution.
Features
- Multiple Task Types: HTTP requests, shell commands, and custom task handlers
- Dependency Management: Tasks can depend on the completion of other tasks
- Async Execution: Built on Tokio for high-performance async execution
- Retry Mechanisms: Configurable retry policies with exponential backoff
- Real-time Monitoring: Live task metrics and status tracking
- Extensible Storage: Pluggable storage backends (in-memory, Redis, etc.)
- Worker System: Distributed execution with multiple worker nodes
- Comprehensive Logging: Structured logging with execution traces
Architecture
Core Components
- TaskFlow: Main framework orchestrator
- Scheduler: Manages task scheduling and dependency resolution
- Executor: Handles task execution with worker coordination
- Storage: Abstract storage layer for task persistence
- Task Handlers: Extensible system for different task types
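The Scheduler's dependency resolution is not documented in detail here. As a rough illustration of what resolving task dependencies can involve, the sketch below orders tasks with Kahn's algorithm and reports the tasks that can never run when a cycle exists. The `resolve_order` function and the shape of the `deps` map are hypothetical and not part of the taskflow-rs API.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical helper: `deps` maps a task id to the ids it depends on.
/// Returns the ids in an order where every task follows its dependencies,
/// or Err with the ids that are stuck in (or blocked behind) a cycle.
pub fn resolve_order(deps: &HashMap<String, Vec<String>>) -> Result<Vec<String>, Vec<String>> {
    // Count unmet dependencies per task and record who is waiting on whom.
    let mut pending: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (task, needs) in deps {
        pending.entry(task.as_str()).or_insert(0);
        for need in needs {
            pending.entry(need.as_str()).or_insert(0);
            *pending.get_mut(task.as_str()).unwrap() += 1;
            dependents.entry(need.as_str()).or_default().push(task.as_str());
        }
    }

    // Tasks with no unmet dependencies can run immediately.
    let mut ready: Vec<&str> = pending
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(t, _)| *t)
        .collect();
    ready.sort();
    let mut queue: VecDeque<&str> = ready.into();

    let mut order = Vec::new();
    while let Some(task) = queue.pop_front() {
        order.push(task.to_string());
        if let Some(children) = dependents.get(task) {
            for child in children {
                // Completing `task` satisfies one dependency of `child`.
                let n = pending.get_mut(child).unwrap();
                *n -= 1;
                if *n == 0 {
                    queue.push_back(*child);
                }
            }
        }
    }

    if order.len() == pending.len() {
        Ok(order)
    } else {
        // Whatever still has unmet dependencies is part of, or behind, a cycle.
        let mut stuck: Vec<String> = pending
            .iter()
            .filter(|(_, n)| **n > 0)
            .map(|(t, _)| t.to_string())
            .collect();
        stuck.sort();
        Err(stuck)
    }
}
```

The relative order of tasks that do not depend on each other is unspecified in this sketch. The Err case corresponds in spirit to the DependencyCycle error variant listed under Error Handling.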
Storage Backends
- In-Memory: For development and testing
- Redis: For distributed deployments (planned)
- PostgreSQL: For persistent storage (planned)
Installation
Add the crate to your project with Cargo:
cargo add taskflow-rs
Examples
Check the examples/ directory for complete working examples:
- basic_usage.rs: Basic task submission and execution
- yaml_config_usage.rs: Using YAML configuration for tasks
- simple_execution.rs: Basic task submission and execution
- custom_handler.rs: Custom task handler example
API Reference
TaskFlow
impl TaskFlow {
    pub async fn new(config: TaskFlowConfig) -> Result<Self, TaskFlowError>;
    pub async fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<Self, TaskFlowError>;
    pub async fn from_yaml_str(config_content: &str) -> Result<Self, TaskFlowError>;
    pub async fn register_handler(&self, handler: Arc<dyn TaskHandler>);
    pub async fn submit_task(&self, definition: TaskDefinition) -> Result<String, TaskFlowError>;
    pub async fn submit_http_task(
        &self,
        name: &str,
        url: &str,
        method: Option<&str>
    ) -> Result<String, TaskFlowError>;
    pub async fn submit_shell_task(
        &self,
        name: &str,
        command: &str,
        args: Vec<&str>
    ) -> Result<String, TaskFlowError>;
    pub async fn get_task_status(&self, task_id: &str) -> Result<Option<TaskStatus>, TaskFlowError>;
    pub async fn cancel_task(&self, task_id: &str) -> Result<(), TaskFlowError>;
    pub async fn list_tasks(&self, status: Option<TaskStatus>) -> Result<Vec<Task>, TaskFlowError>;
    pub async fn wait_for_completion(
        &self,
        task_id: &str,
        timeout_seconds: Option<u64>
    ) -> Result<Task, TaskFlowError>;
    pub async fn get_task_metrics(&self) -> Result<TaskMetrics, TaskFlowError>;
    pub async fn start(&self) -> Result<(), TaskFlowError>;
    pub async fn shutdown(&self) -> Result<(), TaskFlowError>;
}
TaskDefinition
impl TaskDefinition {
    pub fn new(name: &str, task_type: &str) -> Self;
    pub fn with_payload(mut self, key: &str, value: serde_json::Value) -> Self;
    pub fn with_priority(mut self, priority: i32) -> Self;
    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self;
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self;
    pub fn with_tags(mut self, tags: Vec<String>) -> Self;
    pub fn schedule_at(mut self, scheduled_at: DateTime<Utc>) -> Self;
}
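Every `with_*` method above takes `self` by value and returns `Self`, so calls can be chained. The sketch below shows that chaining style on a simplified stand-in type, so it compiles without the crate. `TaskDefinitionSketch`, its fields, its default values, and the "http" task type string are all assumptions made for illustration. The real TaskDefinition also carries a payload and a schedule time, which are omitted here.

```rust
/// Simplified stand-in for TaskDefinition (illustration only, not the crate's type).
#[derive(Debug, Clone, PartialEq)]
pub struct TaskDefinitionSketch {
    pub name: String,
    pub task_type: String,
    pub priority: i32,                // assumed default: 0
    pub timeout_seconds: Option<u64>, // assumed default: no timeout
    pub dependencies: Vec<String>,
    pub tags: Vec<String>,
}

impl TaskDefinitionSketch {
    pub fn new(name: &str, task_type: &str) -> Self {
        Self {
            name: name.to_string(),
            task_type: task_type.to_string(),
            priority: 0,
            timeout_seconds: None,
            dependencies: Vec::new(),
            tags: Vec::new(),
        }
    }

    // Each setter consumes the value and hands it back, which enables chaining.
    pub fn with_priority(mut self, priority: i32) -> Self {
        self.priority = priority;
        self
    }

    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
        self.timeout_seconds = Some(timeout_seconds);
        self
    }

    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self {
        self.dependencies = dependencies;
        self
    }

    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
        self.tags = tags;
        self
    }
}

/// Builds one definition in the chained style.
pub fn example_definition() -> TaskDefinitionSketch {
    TaskDefinitionSketch::new("fetch-report", "http")
        .with_priority(5)
        .with_timeout(30)
        .with_dependencies(vec!["login".to_string()])
        .with_tags(vec!["reports".to_string()])
}
```

With the real crate, the same chain would presumably start from `TaskDefinition::new` and end with a call to `submit_task`.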
Configuration
use taskflow_rs::framework::TaskFlowConfig;
let config = TaskFlowConfig {
    max_workers: 4,
    task_timeout_ms: 30000,
    retry_delay_ms: 1000,
    max_retries: 3,
    storage_type: StorageType::Memory,
};
let taskflow = TaskFlow::new(config).await?;
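The retry_delay_ms and max_retries settings feed the retry policy, which the feature list describes as using exponential backoff. The exact formula is not stated in this README. The sketch below shows one common scheme, doubling the base delay on each attempt up to a cap. The `backoff_delay` and `retry_schedule` functions and the `cap_ms` parameter are hypothetical, not part of the crate.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base_ms * 2^attempt, capped at cap_ms.
/// Hypothetical helper; the framework's actual formula may differ.
pub fn backoff_delay(base_ms: u64, attempt: u32, cap_ms: u64) -> Duration {
    // checked_pow and saturating_mul keep very large attempt numbers from overflowing.
    let factor = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    Duration::from_millis(base_ms.saturating_mul(factor).min(cap_ms))
}

/// All delays a task would wait through if every one of `max_retries` retries were needed.
pub fn retry_schedule(base_ms: u64, max_retries: u32, cap_ms: u64) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| backoff_delay(base_ms, attempt, cap_ms))
        .collect()
}
```

Under this scheme, the values from the configuration above (a 1000 ms base delay and 3 retries) with a 30000 ms cap give delays of 1 s, 2 s, and 4 s.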
Error Handling
The framework uses TaskFlowError for all error cases, with detailed error variants:
pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}
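Because every variant carries a String, callers can match on the variant to decide how to react and use the message for reporting. The sketch below redeclares the enum locally so that it compiles on its own; with the crate you would import TaskFlowError instead. The `describe` and `is_retryable` helpers, and the choice of which variants count as retryable, are assumptions made for illustration and do not reflect documented framework behavior.

```rust
/// Local mirror of the enum above, redeclared only so this sketch is self-contained.
#[derive(Debug)]
pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}

/// Hypothetical helper: turn an error into a short message for logs.
pub fn describe(err: &TaskFlowError) -> String {
    match err {
        TaskFlowError::StorageError(msg) => format!("storage failure: {msg}"),
        TaskFlowError::TaskNotFound(id) => format!("no such task: {id}"),
        TaskFlowError::TaskValidationError(msg) => format!("invalid task: {msg}"),
        TaskFlowError::DependencyCycle(msg) => format!("dependency cycle: {msg}"),
        TaskFlowError::ExecutionError(msg) => format!("execution failed: {msg}"),
        TaskFlowError::TimeoutError(msg) => format!("timed out: {msg}"),
        TaskFlowError::ConfigurationError(msg) => format!("bad configuration: {msg}"),
    }
}

/// Hypothetical policy: treat failures that may be transient as worth retrying.
pub fn is_retryable(err: &TaskFlowError) -> bool {
    matches!(
        err,
        TaskFlowError::StorageError(_)
            | TaskFlowError::ExecutionError(_)
            | TaskFlowError::TimeoutError(_)
    )
}
```

Matching exhaustively, as `describe` does, means the compiler flags every call site that needs updating if a variant is added later.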
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
License
This project is licensed under the Apache License 2.0.