§Task Tracker - Hierarchical Task Management System
A composable task management system with configurable scheduling and error handling policies. The TaskTracker enables controlled concurrent execution with proper resource management, cancellation semantics, and retry support.
§Architecture Overview
The TaskTracker system is built around three core abstractions that compose together:
§1. TaskScheduler - Resource Management
Controls when and how tasks acquire execution resources (permits, slots, etc.). Schedulers implement resource acquisition with cancellation support:
TaskScheduler::acquire_execution_slot(cancel_token) -> SchedulingResult<ResourceGuard>
- Resource Acquisition: Can be cancelled to avoid unnecessary allocation
- RAII Guards: Resources are automatically released when guards are dropped
- Pluggable: Different scheduling policies (unlimited, semaphore, rate-limited, etc.)
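The acquire-then-release contract can be sketched with the standard library alone. This is a simplified, synchronous model, not the crate's implementation: `CancelFlag`, `PermitPool`, `PermitGuard`, and `Acquire` are hypothetical names standing in for the cancellation token, a scheduler, its `ResourceGuard`, and `SchedulingResult`.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Illustrative acquisition outcome (not the crate's `SchedulingResult`).
pub enum Acquire<G> {
    Granted(G),
    Cancelled,
    Busy,
}

/// A fixed pool of permits shared between the pool and its guards.
pub struct PermitPool {
    available: Arc<AtomicUsize>,
}

/// RAII guard: holding it means holding one permit.
pub struct PermitGuard {
    available: Arc<AtomicUsize>,
}

impl Drop for PermitGuard {
    fn drop(&mut self) {
        // Returning the permit happens automatically when the guard goes away.
        self.available.fetch_add(1, Ordering::SeqCst);
    }
}

impl PermitPool {
    pub fn new(permits: usize) -> Self {
        Self { available: Arc::new(AtomicUsize::new(permits)) }
    }

    pub fn available(&self) -> usize {
        self.available.load(Ordering::SeqCst)
    }

    /// Checks cancellation first so a cancelled caller never takes a permit.
    pub fn try_acquire(&self, cancel: &CancelFlag) -> Acquire<PermitGuard> {
        if cancel.is_cancelled() {
            return Acquire::Cancelled;
        }
        let taken = self
            .available
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1));
        match taken {
            Ok(_) => Acquire::Granted(PermitGuard { available: Arc::clone(&self.available) }),
            Err(_) => Acquire::Busy,
        }
    }
}
```

A real scheduler would wait asynchronously for a permit instead of returning `Busy`; the sketch only shows that cancellation is checked before allocation and that dropping the guard releases the resource.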
§2. OnErrorPolicy - Error Handling
Defines how the system responds to task failures:
OnErrorPolicy::on_error(error, context) -> ErrorResponse
- ErrorResponse::Fail: Log error, fail this task
- ErrorResponse::Shutdown: Shutdown tracker and all children
- ErrorResponse::Custom(action): Execute custom logic that can return:
  - ActionResult::Fail: Handle error and fail the task
  - ActionResult::Shutdown: Shutdown tracker
  - ActionResult::Continue { continuation }: Continue with provided task
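As a rough illustration of how a policy maps a failure to a response, the following standalone model uses hypothetical types (`Response`, `Policy`, `Threshold`, `Patterns`) rather than the crate's `ErrorResponse` and `OnErrorPolicy`. It assumes the policy is told how many failures have occurred so far and omits the `Custom` variant.

```rust
/// Simplified stand-in for the response type described above.
#[derive(Debug, PartialEq)]
pub enum Response {
    /// Fail only the task that produced the error.
    Fail,
    /// Shut down the tracker and its children.
    Shutdown,
}

/// Hypothetical policy trait: decide what to do with one failure.
pub trait Policy {
    fn on_error(&self, error: &str, failures_so_far: u32) -> Response;
}

/// Fails individual tasks until `threshold` failures have been seen.
pub struct Threshold {
    pub threshold: u32,
}

impl Policy for Threshold {
    fn on_error(&self, _error: &str, failures_so_far: u32) -> Response {
        if failures_so_far >= self.threshold {
            Response::Shutdown
        } else {
            Response::Fail
        }
    }
}

/// Shuts down only when the error message contains a known pattern.
pub struct Patterns {
    pub patterns: Vec<String>,
}

impl Policy for Patterns {
    fn on_error(&self, error: &str, _failures_so_far: u32) -> Response {
        if self.patterns.iter().any(|p| error.contains(p.as_str())) {
            Response::Shutdown
        } else {
            Response::Fail
        }
    }
}
```

Keeping the decision behind a trait is what lets the same execution pipeline run with either policy unchanged.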
§3. Execution Pipeline - Task Orchestration
The execution pipeline coordinates scheduling, execution, and error handling:
1. Acquire resources (scheduler.acquire_execution_slot)
2. Create task future (only after resources acquired)
3. Execute task while holding guard (RAII pattern)
4. Handle errors through policy (with retry support for cancellable tasks)
5. Update metrics and release resources
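The five steps can be traced in a small synchronous sketch. Everything here is hypothetical (`Slot`, `Counters`, `run`); the real pipeline is asynchronous and delegates step 4 to an error policy, whereas this model only logs the error.

```rust
use std::cell::Cell;

/// RAII slot: decrements the in-use count when dropped.
pub struct Slot<'a> {
    in_use: &'a Cell<usize>,
}

impl Drop for Slot<'_> {
    fn drop(&mut self) {
        self.in_use.set(self.in_use.get() - 1);
    }
}

/// Minimal outcome counters.
#[derive(Default)]
pub struct Counters {
    pub success: u32,
    pub failed: u32,
}

/// Runs one task through the pipeline; returns `None` if no slot was free.
pub fn run<T>(
    in_use: &Cell<usize>,
    limit: usize,
    counters: &mut Counters,
    make_task: impl FnOnce() -> Result<T, String>,
) -> Option<Result<T, String>> {
    // 1. Acquire resources before the task exists.
    if in_use.get() >= limit {
        return None;
    }
    in_use.set(in_use.get() + 1);
    let _slot = Slot { in_use };

    // 2 and 3. Create and execute the task while the slot is held.
    let outcome = make_task();

    // 4 and 5. Handle the error (log-only here) and update metrics.
    match &outcome {
        Ok(_) => counters.success += 1,
        Err(error) => {
            eprintln!("task failed: {}", error);
            counters.failed += 1;
        }
    }

    // `_slot` is dropped on return, releasing the resource.
    Some(outcome)
}
```

Because the task is only constructed after the slot check, a rejected task never starts running.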
§Key Design Principles
§Separation of Concerns
- Scheduling: When/how to allocate resources
- Execution: Running tasks with proper resource management
- Error Handling: Responding to failures with configurable policies
§Composability
- Schedulers and error policies are independent and can be mixed/matched
- Custom policies can be implemented via traits
- Execution pipeline handles the coordination automatically
§Resource Safety
- Resources are acquired before task creation (prevents early execution)
- RAII pattern ensures resources are always released
- Cancellation is supported during resource acquisition, not during execution
§Retry Support
- Regular tasks (spawn): Cannot be retried (future is consumed)
- Cancellable tasks (spawn_cancellable): Support retry via FnMut closures
- Error policies can provide next executors via ActionResult::Continue
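The difference between the two task kinds comes down to closure traits. The sketch below is a synchronous analogy with hypothetical helpers (`run_once`, `run_with_retry`): an `FnOnce` is consumed by its single call, while an `FnMut` can be invoked again to produce a fresh attempt.

```rust
/// A one-shot task: calling it consumes it, so there is nothing left to retry.
pub fn run_once<T, E>(task: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
    task()
}

/// Calls `make_attempt` until it succeeds or `max_attempts` is reached.
/// The closure receives the 1-based attempt number.
pub fn run_with_retry<T, E>(
    max_attempts: u32,
    mut make_attempt: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match make_attempt(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise call the same closure again.
            Err(_) => attempt += 1,
        }
    }
}
```

In the asynchronous case the same reasoning applies: a future handed to the tracker is consumed when polled to completion, whereas a closure that builds futures can build another one for each retry.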
§Task Types
§Regular Tasks
let handle = tracker.spawn(async { Ok(42) });
- Simple futures that run to completion
- Cannot be retried (future is consumed on first execution)
- Suitable for one-shot operations
§Cancellable Tasks
let handle = tracker.spawn_cancellable(|cancel_token| async move {
    // Task can check cancel_token.is_cancelled() or use tokio::select!
    CancellableTaskResult::Ok(42)
});
- Receive a CancellationToken for cooperative cancellation
- Support retry via FnMut closures (can be called multiple times)
- Return CancellableTaskResult to indicate success/cancellation/error
§Hierarchical Structure
TaskTrackers form parent-child relationships:
- Metrics: Child metrics aggregate to parents
- Cancellation: Parent cancellation propagates to children
- Independence: Child cancellation doesn’t affect parents
- Cleanup: join() waits for all descendants bottom-up
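The one-directional cancellation rule can be modelled with a small tree of flags. `CancelNode` is a hypothetical type for illustration only; it assumes a child decides whether it is cancelled by checking its own flag and then its ancestors, which is one possible way to obtain the propagation described above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// One node in a cancellation hierarchy.
pub struct CancelNode {
    cancelled: AtomicBool,
    parent: Option<Arc<CancelNode>>,
}

impl CancelNode {
    pub fn root() -> Arc<CancelNode> {
        Arc::new(CancelNode { cancelled: AtomicBool::new(false), parent: None })
    }

    pub fn child(parent: &Arc<CancelNode>) -> Arc<CancelNode> {
        Arc::new(CancelNode {
            cancelled: AtomicBool::new(false),
            parent: Some(Arc::clone(parent)),
        })
    }

    /// Cancels this node only; ancestors are never touched.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// A node is cancelled if it or any ancestor has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
            || self.parent.as_ref().map_or(false, |p| p.is_cancelled())
    }
}
```

Cancelling a child leaves its parent and siblings running, while cancelling the root is observed by every descendant.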
§Metrics and Observability
Built-in metrics track task lifecycle:
- issued: Tasks submitted via spawn methods
- active: Currently executing tasks
- success/failed/cancelled/rejected: Final outcomes
- pending: Issued but not completed (issued - completed)
- queued: Waiting for resources (pending - active)
Optional Prometheus integration available via PrometheusTaskMetrics.
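The derived counters follow from the stored ones by simple arithmetic. The sketch below uses a hypothetical `Snapshot` struct and assumes that "completed" is the sum of the four final-outcome counters; the crate may compute it differently.

```rust
/// Point-in-time copy of the stored counters (illustrative only).
#[derive(Debug, Default, Clone, Copy)]
pub struct Snapshot {
    pub issued: u64,
    pub active: u64,
    pub success: u64,
    pub failed: u64,
    pub cancelled: u64,
    pub rejected: u64,
}

impl Snapshot {
    /// Assumption: every final outcome counts as completed.
    pub fn completed(&self) -> u64 {
        self.success + self.failed + self.cancelled + self.rejected
    }

    /// pending = issued - completed
    pub fn pending(&self) -> u64 {
        self.issued.saturating_sub(self.completed())
    }

    /// queued = pending - active
    pub fn queued(&self) -> u64 {
        self.pending().saturating_sub(self.active)
    }
}
```

For example, with 5 tasks issued, 2 active, and none finished, `pending()` is 5 and `queued()` is 3.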
§Usage Examples
§Basic Task Execution
use dynamo_runtime::utils::tasks::tracker::*;
use std::sync::Arc;
let scheduler = SemaphoreScheduler::with_permits(10);
let error_policy = LogOnlyPolicy::new();
let tracker = TaskTracker::new(scheduler, error_policy)?;
let handle = tracker.spawn(async { Ok(42) });
let result = handle.await??;
assert_eq!(result, 42);
§Cancellable Tasks with Retry
let handle = tracker.spawn_cancellable(|cancel_token| async move {
    tokio::select! {
        result = do_work() => CancellableTaskResult::Ok(result),
        _ = cancel_token.cancelled() => CancellableTaskResult::Cancelled,
    }
});
§Task-Driven Retry with Continuations
use anyhow::anyhow;
let handle = tracker.spawn(async {
    // Simulate initial failure with retry logic
    let error = FailedWithContinuation::from_fn(
        anyhow!("Network timeout"),
        || async {
            println!("Retrying with exponential backoff...");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            Ok("Success after retry".to_string())
        }
    );
    let result: Result<String, anyhow::Error> = Err(error);
    result
});
let result = handle.await?;
assert!(result.is_ok());
§Custom Error Policy with Continuation
use async_trait::async_trait;
use std::sync::Arc;
struct RetryPolicy {
    max_attempts: u32,
}
impl OnErrorPolicy for RetryPolicy {
    fn create_child(&self) -> Arc<dyn OnErrorPolicy> {
        Arc::new(RetryPolicy { max_attempts: self.max_attempts })
    }
    fn create_context(&self) -> Option<Box<dyn std::any::Any + Send + 'static>> {
        None // Stateless policy
    }
    fn on_error(&self, _error: &anyhow::Error, context: &mut OnErrorContext) -> ErrorResponse {
        // Retry while attempts remain; otherwise fail the task
        if context.attempt_count < self.max_attempts {
            ErrorResponse::Custom(Box::new(RetryAction))
        } else {
            ErrorResponse::Fail
        }
    }
}
struct RetryAction;
#[async_trait]
impl OnErrorAction for RetryAction {
    async fn execute(
        &self,
        _error: &anyhow::Error,
        _task_id: TaskId,
        _attempt_count: u32,
        _context: &TaskExecutionContext,
    ) -> ActionResult {
        // In practice, you would create a continuation here
        ActionResult::Fail
    }
}
§Future Extensibility
The system is designed for extensibility. See the source code for detailed TODO comments describing additional policies that can be implemented:
- Scheduling: Token bucket rate limiting, adaptive concurrency, memory-aware scheduling
- Error Handling: Retry with backoff, circuit breakers, dead letter queues
Each TODO comment includes complete implementation guidance with data structures, algorithms, and dependencies needed for future contributors.
§Hierarchical Organization
use dynamo_runtime::utils::tasks::tracker::{
    TaskTracker, UnlimitedScheduler, ThresholdCancelPolicy, SemaphoreScheduler
};
// Create root tracker with failure threshold policy
let error_policy = ThresholdCancelPolicy::with_threshold(5);
let root = TaskTracker::builder()
    .scheduler(UnlimitedScheduler::new())
    .error_policy(error_policy)
    .build()?;
// Create child trackers for different components
let api_handler = root.child_tracker()?; // Inherits policies
let background_jobs = root.child_tracker()?;
// Children can have custom policies
let rate_limited = root.child_tracker_builder()
    .scheduler(SemaphoreScheduler::with_permits(2)) // Custom concurrency limit
    .build()?;
// Tasks run independently but metrics roll up
api_handler.spawn(async { Ok(()) });
background_jobs.spawn(async { Ok(()) });
rate_limited.spawn(async { Ok(()) });
// Join all children hierarchically
root.join().await;
assert_eq!(root.metrics().success(), 3); // Sees all successes
§Policy Examples
use dynamo_runtime::utils::tasks::tracker::{
    TaskTracker, CancelOnError, SemaphoreScheduler, ThresholdCancelPolicy
};
// Pattern-based error cancellation
let (error_policy, token) = CancelOnError::with_patterns(
    vec!["OutOfMemory".to_string(), "DeviceError".to_string()]
);
let simple = TaskTracker::builder()
    .scheduler(SemaphoreScheduler::with_permits(5))
    .error_policy(error_policy)
    .build()?;
// Threshold-based cancellation with monitoring
let scheduler = SemaphoreScheduler::with_permits(10); // Returns Arc<SemaphoreScheduler>
let error_policy = ThresholdCancelPolicy::with_threshold(3); // Returns Arc<Policy>
let advanced = TaskTracker::builder()
    .scheduler(scheduler)
    .error_policy(error_policy)
    .build()?;
// Monitor cancellation externally
if token.is_cancelled() {
    println!("Tracker cancelled due to failures");
}
§Metrics and Observability
use dynamo_runtime::utils::tasks::tracker::{TaskTracker, SemaphoreScheduler, LogOnlyPolicy};
let tracker = std::sync::Arc::new(TaskTracker::builder()
    .scheduler(SemaphoreScheduler::with_permits(2)) // Only 2 concurrent tasks
    .error_policy(LogOnlyPolicy::new())
    .build()?);
// Spawn multiple tasks
for i in 0..5 {
    tracker.spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok(i)
    });
}
// Check metrics
let metrics = tracker.metrics();
println!("Issued: {}", metrics.issued()); // 5 tasks issued
println!("Active: {}", metrics.active()); // 2 tasks running (semaphore limit)
println!("Queued: {}", metrics.queued()); // 3 tasks waiting in scheduler queue
println!("Pending: {}", metrics.pending()); // 5 tasks not yet completed
tracker.join().await;
assert_eq!(metrics.success(), 5);
assert_eq!(metrics.pending(), 0);
§Prometheus Integration
use dynamo_runtime::utils::tasks::tracker::{TaskTracker, SemaphoreScheduler, LogOnlyPolicy};
use dynamo_runtime::metrics::MetricsRegistry;
// Root tracker with Prometheus metrics
let tracker = TaskTracker::new_with_prometheus(
    SemaphoreScheduler::with_permits(10),
    LogOnlyPolicy::new(),
    registry,
    "my_component"
)?;
// Metrics automatically exported to Prometheus:
// - my_component_tasks_issued_total
// - my_component_tasks_success_total
// - my_component_tasks_failed_total
// - my_component_tasks_active
// - my_component_tasks_queued
Structs§
- CancelOnError: Error policy that triggers cancellation based on error patterns
- ChildTrackerBuilder: Builder for creating child trackers with custom policies
- FailedWithContinuation: Error type that signals a task failed but provided a continuation
- LogOnlyPolicy: Simple error policy that only logs errors
- OnErrorContext: Trait for implementing error handling policies
- PrometheusTaskMetrics: Root tracker metrics with Prometheus integration
- RateCancelPolicy: Error policy that cancels tasks when failure rate exceeds threshold within time window
- RateCancelPolicyBuilder: Builder for RateCancelPolicy
- SemaphoreGuard: Resource guard for semaphore-based scheduling
- SemaphoreScheduler: Semaphore-based task scheduler
- TaskExecutionContext: Execution context provided to custom error actions
- TaskHandle: A handle to a spawned task that provides both join functionality and cancellation control
- TaskId: Unique identifier for a task
- TaskMetrics: Task execution metrics for a tracker
- TaskTracker: Hierarchical task tracker with pluggable scheduling and error policies
- TaskTrackerBuilder: Builder for TaskTracker
- ThresholdCancelPolicy: Error policy that cancels tasks after a threshold number of failures
- TriggerCancellationTokenAction: Custom action that triggers a cancellation token when executed
- TriggerCancellationTokenOnError: Test error policy that triggers a custom cancellation token on any error
- UnlimitedGuard: Resource guard for unlimited scheduling
- UnlimitedScheduler: Unlimited task scheduler that executes all tasks immediately
Enums§
- ActionResult: Result of a custom error action execution
- CancellableTaskResult: Result type for cancellable tasks that explicitly tracks cancellation
- CompletionStatus: Result of task execution
- ErrorPolicy: Common error handling policies for task failure management
- ErrorResponse: Response type for error handling policies
- SchedulingPolicy: Common scheduling policies for task execution
- SchedulingResult: Result of scheduling a task
- TaskError: Error type for task execution results
- TaskExecutionResult: Result of task execution, unified for both regular and cancellable tasks
Traits§
- ArcPolicy: Common functionality for policy Arc construction
- Continuation: Trait for continuation tasks that execute after a failure
- FailedWithContinuationExt: Extension trait for extracting FailedWithContinuation from anyhow::Error
- HierarchicalTaskMetrics: Trait for hierarchical task metrics that supports aggregation up the tracker tree
- OnErrorAction: Trait for implementing custom error handling actions
- OnErrorPolicy: Error handling policy trait for task failures
- ResourceGuard: Resource guard that manages task execution
- TaskScheduler: Trait for implementing task scheduling policies