Module tracker

§Task Tracker - Hierarchical Task Management System

A composable task management system with configurable scheduling and error handling policies. The TaskTracker enables controlled concurrent execution with proper resource management, cancellation semantics, and retry support.

§Architecture Overview

The TaskTracker system is built around three core abstractions that compose together:

§1. TaskScheduler - Resource Management

Controls when and how tasks acquire execution resources (permits, slots, etc.). Schedulers implement resource acquisition with cancellation support:

TaskScheduler::acquire_execution_slot(cancel_token) -> SchedulingResult<ResourceGuard>
  • Resource Acquisition: Can be cancelled to avoid unnecessary allocation
  • RAII Guards: Resources are automatically released when guards are dropped
  • Pluggable: Different scheduling policies (unlimited, semaphore, rate-limited, etc.)
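To make the RAII and cancellable-acquisition points concrete, here is a minimal std-only sketch. It is not the crate's implementation: `SlotScheduler`, `SlotGuard`, and `Acquire` are hypothetical names, an `AtomicBool` stands in for the cancellation token, and a `Mutex`/`Condvar` counter stands in for async permits (roughly the idea behind `SemaphoreScheduler`).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical stand-in for SchedulingResult: either a guard or a cancellation.
pub enum Acquire<G> {
    Execute(G),
    Cancelled,
}

/// Hypothetical counting scheduler: at most `permits` guards may be alive at once.
pub struct SlotScheduler {
    free: Mutex<usize>, // number of currently free slots
    released: Condvar,  // signalled whenever a guard is dropped
}

/// RAII guard: dropping it hands the slot back to the scheduler.
pub struct SlotGuard {
    scheduler: Arc<SlotScheduler>,
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        *self.scheduler.free.lock().unwrap() += 1;
        self.scheduler.released.notify_one();
    }
}

impl SlotScheduler {
    pub fn with_permits(permits: usize) -> Arc<Self> {
        Arc::new(SlotScheduler { free: Mutex::new(permits), released: Condvar::new() })
    }

    pub fn free_slots(&self) -> usize {
        *self.free.lock().unwrap()
    }

    /// Waits for a free slot, giving up as soon as `cancel` is set.
    pub fn acquire_execution_slot(self: &Arc<Self>, cancel: &AtomicBool) -> Acquire<SlotGuard> {
        let mut free = self.free.lock().unwrap();
        loop {
            if cancel.load(Ordering::SeqCst) {
                return Acquire::Cancelled; // nothing was allocated
            }
            if *free > 0 {
                *free -= 1;
                return Acquire::Execute(SlotGuard { scheduler: Arc::clone(self) });
            }
            // Wake periodically so a cancellation is noticed even without a release.
            free = self.released.wait_timeout(free, Duration::from_millis(10)).unwrap().0;
        }
    }
}
```

Polling the flag every 10 ms keeps the sketch simple; an async scheduler would more naturally race the permit future against the token's cancellation future.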

§2. OnErrorPolicy - Error Handling

Defines how the system responds to task failures:

OnErrorPolicy::on_error(error, context) -> ErrorResponse
  • ErrorResponse::Fail: Log error, fail this task
  • ErrorResponse::Shutdown: Shutdown tracker and all children
  • ErrorResponse::Custom(action): Execute custom logic that can return:
    • ActionResult::Fail: Handle error and fail the task
    • ActionResult::Shutdown: Shutdown tracker
    • ActionResult::Continue { continuation }: Continue with provided task
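The decision tree above can be modelled with two plain enums. This sketch mirrors the variant names, but everything else is an assumption: errors are strings, the custom action is a boxed closure rather than an `OnErrorAction` implementation, and `on_error`/`resolve` are hypothetical helpers.

```rust
/// Hypothetical mirror of ActionResult: what a custom action decides.
pub enum ActionResult {
    Fail,
    Shutdown,
    Continue { continuation: Box<dyn FnOnce() -> Result<String, String>> },
}

/// Hypothetical mirror of ErrorResponse: the policy's first-level decision.
pub enum ErrorResponse {
    Fail,
    Shutdown,
    Custom(Box<dyn Fn(&str) -> ActionResult>),
}

/// Hypothetical policy: shut down on fatal errors, retry once through a
/// custom action, otherwise fail the task.
pub fn on_error(error: &str, attempt_count: u32) -> ErrorResponse {
    if error.contains("OutOfMemory") {
        ErrorResponse::Shutdown
    } else if attempt_count < 2 {
        ErrorResponse::Custom(Box::new(|_error: &str| ActionResult::Continue {
            continuation: Box::new(|| Ok("recovered".to_string())),
        }))
    } else {
        ErrorResponse::Fail
    }
}

/// How a pipeline could resolve each response into a final task outcome.
pub fn resolve(error: &str, attempt_count: u32) -> Result<String, &'static str> {
    match on_error(error, attempt_count) {
        ErrorResponse::Fail => Err("failed"),
        ErrorResponse::Shutdown => Err("shutdown"),
        ErrorResponse::Custom(action) => match action(error) {
            ActionResult::Fail => Err("failed"),
            ActionResult::Shutdown => Err("shutdown"),
            // Run the continuation in place of the failed task.
            ActionResult::Continue { continuation } => continuation().map_err(|_| "failed"),
        },
    }
}
```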

§3. Execution Pipeline - Task Orchestration

The execution pipeline coordinates scheduling, execution, and error handling:

1. Acquire resources (scheduler.acquire_execution_slot)
2. Create task future (only after resources acquired)
3. Execute task while holding guard (RAII pattern)
4. Handle errors through policy (with retry support for cancellable tasks)
5. Update metrics and release resources
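A synchronous sketch of the five steps, under stated assumptions: `Metrics`, `ActiveGuard`, and `run_task` are hypothetical, the "resource" is just an active-task counter released on drop, and the error policy is hard-coded as "retry up to `max_attempts`". Because the task is an `FnMut` factory, step 2 re-creates the task body on every attempt, and nothing runs until the guard exists.

```rust
use std::cell::Cell;

/// Hypothetical per-tracker counters (single-threaded for brevity).
#[derive(Default)]
pub struct Metrics {
    pub issued: Cell<u32>,
    pub active: Cell<u32>,
    pub success: Cell<u32>,
    pub failed: Cell<u32>,
}

fn bump(counter: &Cell<u32>) {
    counter.set(counter.get() + 1);
}

/// Hypothetical RAII guard standing in for a scheduler slot: it marks the
/// task active while held and un-marks it on drop.
struct ActiveGuard<'a>(&'a Metrics);

impl<'a> ActiveGuard<'a> {
    fn acquire(metrics: &'a Metrics) -> Self {
        bump(&metrics.active);
        ActiveGuard(metrics)
    }
}

impl Drop for ActiveGuard<'_> {
    fn drop(&mut self) {
        self.0.active.set(self.0.active.get() - 1);
    }
}

pub fn run_task<T>(
    metrics: &Metrics,
    max_attempts: u32,
    mut make_task: impl FnMut() -> Result<T, String>,
) -> Result<T, String> {
    bump(&metrics.issued);
    let mut attempt = 1;
    loop {
        let _guard = ActiveGuard::acquire(metrics); // 1. acquire resources first
        match make_task() {                         // 2-3. create and run under the guard
            Ok(value) => {
                bump(&metrics.success);              // 5. update metrics; guard drops on return
                return Ok(value);
            }
            // 4. policy says retry; this attempt's guard drops at the end of the iteration
            Err(_) if attempt < max_attempts => attempt += 1,
            Err(err) => {
                bump(&metrics.failed);               // 4-5. policy says fail
                return Err(err);
            }
        }
    }
}
```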

§Key Design Principles

§Separation of Concerns

  • Scheduling: When/how to allocate resources
  • Execution: Running tasks with proper resource management
  • Error Handling: Responding to failures with configurable policies

§Composability

  • Schedulers and error policies are independent and can be mixed/matched
  • Custom policies can be implemented via traits
  • Execution pipeline handles the coordination automatically

§Resource Safety

  • Resources are acquired before task creation (prevents early execution)
  • RAII pattern ensures resources are always released
  • Cancellation is supported during resource acquisition, not during execution

§Retry Support

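  • Regular tasks (spawn): Cannot be re-executed by the tracker (the future is consumed), though a failing task can still hand back a continuation via FailedWithContinuation
  • Cancellable tasks (spawn_cancellable): Support retry via FnMut closures
  • Error policies can supply a continuation task via ActionResult::Continue { continuation }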
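The retry rule follows from ownership: a value that is consumed by running it cannot be run again, while an `FnMut` factory can produce a fresh body each time. A small std-only illustration, in which the hypothetical `Task` enum uses `FnOnce` as a stand-in for a consumed future:

```rust
/// Hypothetical task body: a one-shot body runs once, a factory can run again.
pub enum Task<T> {
    OneShot(Option<Box<dyn FnOnce() -> Result<T, String>>>),
    Repeatable(Box<dyn FnMut() -> Result<T, String>>),
}

impl<T: 'static> Task<T> {
    pub fn one_shot(f: impl FnOnce() -> Result<T, String> + 'static) -> Self {
        let body: Box<dyn FnOnce() -> Result<T, String>> = Box::new(f);
        Task::OneShot(Some(body))
    }

    pub fn repeatable(f: impl FnMut() -> Result<T, String> + 'static) -> Self {
        Task::Repeatable(Box::new(f))
    }

    /// Runs the body once more; `None` means the body was already consumed.
    pub fn attempt(&mut self) -> Option<Result<T, String>> {
        match self {
            // take() moves the closure out, so a second attempt finds None.
            Task::OneShot(slot) => slot.take().map(|body| body()),
            // The factory is only borrowed mutably, so it survives each call.
            Task::Repeatable(factory) => Some((**factory)()),
        }
    }
}
```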

§Task Types

§Regular Tasks

let handle = tracker.spawn(async { Ok(42) });
  • Simple futures that run to completion
  • Cannot be retried (future is consumed on first execution)
  • Suitable for one-shot operations

§Cancellable Tasks

let handle = tracker.spawn_cancellable(|cancel_token| async move {
    // Task can check cancel_token.is_cancelled() or use tokio::select!
    CancellableTaskResult::Ok(42)
});
  • Receive a CancellationToken for cooperative cancellation
  • Support retry via FnMut closures (can be called multiple times)
  • Return CancellableTaskResult to indicate success/cancellation/error
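Cooperative cancellation means the task itself checks for cancellation and reports it explicitly. This blocking sketch uses an `AtomicBool` in place of `CancellationToken` and a hypothetical `CancellableResult` enum that mirrors the three outcomes of `CancellableTaskResult`; the real API is async.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Hypothetical mirror of CancellableTaskResult.
#[derive(Debug, PartialEq)]
pub enum CancellableResult<T> {
    Ok(T),
    Cancelled,
    Err(String),
}

/// Hypothetical long-running task: sums 0..steps, checking the cancellation
/// flag before every step (cooperative cancellation).
pub fn cooperative_sum(steps: u64, cancel: &AtomicBool) -> CancellableResult<u64> {
    let mut total: u64 = 0;
    for i in 0..steps {
        if cancel.load(Ordering::SeqCst) {
            return CancellableResult::Cancelled;
        }
        total = match total.checked_add(i) {
            Some(sum) => sum,
            None => return CancellableResult::Err("overflow".to_string()),
        };
        thread::sleep(Duration::from_millis(1)); // simulated work
    }
    CancellableResult::Ok(total)
}
```

A task that never checks the flag simply runs to completion, which is why cancellation here is called cooperative.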

§Hierarchical Structure

TaskTrackers form parent-child relationships:

  • Metrics: Child metrics aggregate to parents
  • Cancellation: Parent cancellation propagates to children
  • Independence: Child cancellation doesn’t affect parents
  • Cleanup: join() waits for all descendants bottom-up
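One way to get these semantics is for each node to hold a link to its parent: cancellation is checked upward (so it flows from parent to child only) and counters are incremented upward (so parents aggregate). The `Node` type below is a hypothetical std-only sketch, not how TaskTracker is built internally.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical tracker node with its own flag and counter plus a parent link.
pub struct Node {
    parent: Option<Arc<Node>>,
    cancelled: AtomicBool,
    success: AtomicU64,
}

impl Node {
    pub fn root() -> Arc<Node> {
        Arc::new(Node { parent: None, cancelled: AtomicBool::new(false), success: AtomicU64::new(0) })
    }

    pub fn child(self: &Arc<Self>) -> Arc<Node> {
        Arc::new(Node {
            parent: Some(Arc::clone(self)),
            cancelled: AtomicBool::new(false),
            success: AtomicU64::new(0),
        })
    }

    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Cancelled if this node or any ancestor was cancelled (never a descendant).
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
            || self.parent.as_ref().map_or(false, |p| p.is_cancelled())
    }

    /// Counts a success here and in every ancestor, so parents see the aggregate.
    pub fn record_success(&self) {
        self.success.fetch_add(1, Ordering::SeqCst);
        if let Some(parent) = &self.parent {
            parent.record_success();
        }
    }

    pub fn success(&self) -> u64 {
        self.success.load(Ordering::SeqCst)
    }
}
```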

§Metrics and Observability

Built-in metrics track task lifecycle:

  • issued: Tasks submitted via spawn methods
  • active: Currently executing tasks
  • success/failed/cancelled/rejected: Final outcomes
  • pending: Issued but not completed (issued - completed)
  • queued: Waiting for resources (pending - active)
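The derived metrics are plain arithmetic over the counters. A sketch, assuming "completed" means the sum of the four final outcomes (`Snapshot` is a hypothetical name):

```rust
/// Hypothetical snapshot of the counters listed above.
#[derive(Default)]
pub struct Snapshot {
    pub issued: u64,
    pub active: u64,
    pub success: u64,
    pub failed: u64,
    pub cancelled: u64,
    pub rejected: u64,
}

impl Snapshot {
    /// Assumption: a task is completed once it reaches any final outcome.
    pub fn completed(&self) -> u64 {
        self.success + self.failed + self.cancelled + self.rejected
    }

    /// pending = issued - completed (saturating in case counters are read mid-update)
    pub fn pending(&self) -> u64 {
        self.issued.saturating_sub(self.completed())
    }

    /// queued = pending - active
    pub fn queued(&self) -> u64 {
        self.pending().saturating_sub(self.active)
    }
}
```

With 5 issued and 2 active this gives pending 5 and queued 3, the same figures used in the semaphore example later in this module.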

Optional Prometheus integration available via PrometheusTaskMetrics.

§Usage Examples

§Basic Task Execution

use dynamo_runtime::utils::tasks::tracker::*;
use std::sync::Arc;

let scheduler = SemaphoreScheduler::with_permits(10);
let error_policy = LogOnlyPolicy::new();
let tracker = TaskTracker::new(scheduler, error_policy)?;

let handle = tracker.spawn(async { Ok(42) });
let result = handle.await??;
assert_eq!(result, 42);

§Cancellable Tasks with Retry

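let handle = tracker.spawn_cancellable(|cancel_token| async move {
    // The closure is FnMut, so the tracker can invoke it again for a retry.
    // do_work() is a placeholder for the task's actual async work.
    tokio::select! {
        result = do_work() => CancellableTaskResult::Ok(result),
        _ = cancel_token.cancelled() => CancellableTaskResult::Cancelled,
    }
});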

§Task-Driven Retry with Continuations

use anyhow::anyhow;

let handle = tracker.spawn(async {
    // Simulate initial failure with retry logic
    let error = FailedWithContinuation::from_fn(
        anyhow!("Network timeout"),
        || async {
            println!("Retrying with exponential backoff...");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            Ok("Success after retry".to_string())
        }
    );
    let result: Result<String, anyhow::Error> = Err(error);
    result
});

let result = handle.await?;
assert!(result.is_ok());
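Behind this example, the tracker has to recognize that an error carries a continuation (the role of `FailedWithContinuationExt`). A std-only analogue uses `Box<dyn Error>` downcasting; `FailedWithRetry`, `BoxError`, and `run_with_continuation` are hypothetical names, and the real types wrap `anyhow::Error` and async continuations instead.

```rust
use std::error::Error;
use std::fmt;

pub type BoxError = Box<dyn Error>;

/// Hypothetical error type that carries a follow-up closure with the failure.
pub struct FailedWithRetry {
    cause: String,
    continuation: Box<dyn FnOnce() -> Result<String, BoxError>>,
}

impl FailedWithRetry {
    pub fn boxed(cause: &str, next: impl FnOnce() -> Result<String, BoxError> + 'static) -> BoxError {
        Box::new(FailedWithRetry { cause: cause.to_string(), continuation: Box::new(next) })
    }
}

impl fmt::Debug for FailedWithRetry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FailedWithRetry").field("cause", &self.cause).finish_non_exhaustive()
    }
}

impl fmt::Display for FailedWithRetry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} (continuation attached)", self.cause)
    }
}

impl Error for FailedWithRetry {}

/// Runs `task`; if its error carries a continuation, runs that once instead of failing.
pub fn run_with_continuation(
    task: impl FnOnce() -> Result<String, BoxError>,
) -> Result<String, BoxError> {
    match task() {
        Ok(value) => Ok(value),
        Err(err) => match err.downcast::<FailedWithRetry>() {
            Ok(failed) => {
                let failed = *failed; // move out of the Box to own the closure
                (failed.continuation)()
            }
            Err(other) => Err(other), // ordinary error: nothing to continue with
        },
    }
}
```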

§Custom Error Policy with Continuation

use async_trait::async_trait;
use dynamo_runtime::utils::tasks::tracker::*;
use std::sync::Arc;

struct RetryPolicy {
    max_attempts: u32,
}

impl OnErrorPolicy for RetryPolicy {
    fn create_child(&self) -> Arc<dyn OnErrorPolicy> {
        Arc::new(RetryPolicy { max_attempts: self.max_attempts })
    }

    fn create_context(&self) -> Option<Box<dyn std::any::Any + Send + 'static>> {
        None // Stateless policy
    }

    fn on_error(&self, _error: &anyhow::Error, context: &mut OnErrorContext) -> ErrorResponse {
        if context.attempt_count < self.max_attempts {
            ErrorResponse::Custom(Box::new(RetryAction))
        } else {
            ErrorResponse::Fail
        }
    }
}

struct RetryAction;

#[async_trait]
impl OnErrorAction for RetryAction {
    async fn execute(
        &self,
        _error: &anyhow::Error,
        _task_id: TaskId,
        _attempt_count: u32,
        _context: &TaskExecutionContext,
    ) -> ActionResult {
        // In practice, you would create a continuation here
        ActionResult::Fail
    }
}

§Future Extensibility

The system is designed for extensibility. See the source code for detailed TODO comments describing additional policies that can be implemented:

  • Scheduling: Token bucket rate limiting, adaptive concurrency, memory-aware scheduling
  • Error Handling: Retry with backoff, circuit breakers, dead letter queues

Each TODO comment includes complete implementation guidance with data structures, algorithms, and dependencies needed for future contributors.

§Hierarchical Organization

use dynamo_runtime::utils::tasks::tracker::{
    TaskTracker, UnlimitedScheduler, ThresholdCancelPolicy, SemaphoreScheduler
};

// Create root tracker with failure threshold policy
let error_policy = ThresholdCancelPolicy::with_threshold(5);
let root = TaskTracker::builder()
    .scheduler(UnlimitedScheduler::new())
    .error_policy(error_policy)
    .build()?;

// Create child trackers for different components
let api_handler = root.child_tracker()?;  // Inherits policies
let background_jobs = root.child_tracker()?;

// Children can have custom policies
let rate_limited = root.child_tracker_builder()
    .scheduler(SemaphoreScheduler::with_permits(2))  // Custom concurrency limit
    .build()?;

// Tasks run independently but metrics roll up
api_handler.spawn(async { Ok(()) });
background_jobs.spawn(async { Ok(()) });
rate_limited.spawn(async { Ok(()) });

// Join all children hierarchically
root.join().await;
assert_eq!(root.metrics().success(), 3); // Sees all successes

§Policy Examples

use dynamo_runtime::utils::tasks::tracker::{
    TaskTracker, CancelOnError, SemaphoreScheduler, ThresholdCancelPolicy
};

// Pattern-based error cancellation
let (error_policy, token) = CancelOnError::with_patterns(
    vec!["OutOfMemory".to_string(), "DeviceError".to_string()]
);
let simple = TaskTracker::builder()
    .scheduler(SemaphoreScheduler::with_permits(5))
    .error_policy(error_policy)
    .build()?;

// Threshold-based cancellation with monitoring
let scheduler = SemaphoreScheduler::with_permits(10);  // Returns Arc<SemaphoreScheduler>
let error_policy = ThresholdCancelPolicy::with_threshold(3);  // Returns Arc<ThresholdCancelPolicy>

let advanced = TaskTracker::builder()
    .scheduler(scheduler)
    .error_policy(error_policy)
    .build()?;

// Monitor cancellation externally
if token.is_cancelled() {
    println!("Tracker cancelled due to failures");
}
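Both policies in this example come down to deciding when to trip a shared cancellation signal. A hypothetical std-only sketch: `PatternCancel` and `ThresholdCancel` are illustrative names, an `Arc<AtomicBool>` stands in for the returned token, and substring matching on the error message is an assumption about how patterns are compared.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical pattern policy: trips a shared flag when an error message
/// contains any configured pattern.
pub struct PatternCancel {
    patterns: Vec<String>,
    token: Arc<AtomicBool>,
}

impl PatternCancel {
    /// Returns the policy plus a handle for observing cancellation externally.
    pub fn with_patterns(patterns: Vec<String>) -> (Self, Arc<AtomicBool>) {
        let token = Arc::new(AtomicBool::new(false));
        (PatternCancel { patterns, token: Arc::clone(&token) }, token)
    }

    /// Returns true if this error triggered cancellation.
    pub fn on_error(&self, message: &str) -> bool {
        let matched = self.patterns.iter().any(|p| message.contains(p.as_str()));
        if matched {
            self.token.store(true, Ordering::SeqCst);
        }
        matched
    }
}

/// Hypothetical threshold policy: cancels once `threshold` failures are seen.
pub struct ThresholdCancel {
    threshold: u32,
    failures: AtomicU32,
    cancelled: AtomicBool,
}

impl ThresholdCancel {
    pub fn with_threshold(threshold: u32) -> Self {
        ThresholdCancel { threshold, failures: AtomicU32::new(0), cancelled: AtomicBool::new(false) }
    }

    /// Records a failure; returns true once the threshold has been reached.
    pub fn on_error(&self) -> bool {
        let seen = self.failures.fetch_add(1, Ordering::SeqCst) + 1;
        if seen >= self.threshold {
            self.cancelled.store(true, Ordering::SeqCst);
        }
        self.cancelled.load(Ordering::SeqCst)
    }
}
```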

§Metrics and Observability

use dynamo_runtime::utils::tasks::tracker::{TaskTracker, SemaphoreScheduler, LogOnlyPolicy};

let tracker = std::sync::Arc::new(TaskTracker::builder()
    .scheduler(SemaphoreScheduler::with_permits(2))  // Only 2 concurrent tasks
    .error_policy(LogOnlyPolicy::new())
    .build()?);

// Spawn multiple tasks
for i in 0..5 {
    tracker.spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok(i)
    });
}

// Check metrics (a snapshot; exact values depend on timing)
let metrics = tracker.metrics();
println!("Issued: {}", metrics.issued());        // 5 tasks issued
println!("Active: {}", metrics.active());        // at most 2 running (semaphore limit)
println!("Queued: {}", metrics.queued());        // remaining tasks waiting in the scheduler queue
println!("Pending: {}", metrics.pending());      // 5 tasks not yet completed

tracker.join().await;
assert_eq!(metrics.success(), 5);
assert_eq!(metrics.pending(), 0);

§Prometheus Integration

use dynamo_runtime::utils::tasks::tracker::{TaskTracker, SemaphoreScheduler, LogOnlyPolicy};
use dynamo_runtime::metrics::MetricsRegistry;

// Root tracker with Prometheus metrics; `registry` is a MetricsRegistry created elsewhere (not shown)
let tracker = TaskTracker::new_with_prometheus(
    SemaphoreScheduler::with_permits(10),
    LogOnlyPolicy::new(),
    registry,
    "my_component"
)?;

// Metrics automatically exported to Prometheus:
// - my_component_tasks_issued_total
// - my_component_tasks_success_total
// - my_component_tasks_failed_total
// - my_component_tasks_active
// - my_component_tasks_queued

Structs§

CancelOnError
Error policy that triggers cancellation based on error patterns
ChildTrackerBuilder
Builder for creating child trackers with custom policies
FailedWithContinuation
Error type that signals a task failed but provided a continuation
LogOnlyPolicy
Simple error policy that only logs errors
OnErrorContext
Per-task context passed to OnErrorPolicy::on_error (e.g. the attempt count)
PrometheusTaskMetrics
Root tracker metrics with Prometheus integration
RateCancelPolicy
Error policy that cancels tasks when failure rate exceeds threshold within time window
RateCancelPolicyBuilder
Builder for RateCancelPolicy
SemaphoreGuard
Resource guard for semaphore-based scheduling
SemaphoreScheduler
Semaphore-based task scheduler
TaskExecutionContext
Execution context provided to custom error actions
TaskHandle
A handle to a spawned task that provides both join functionality and cancellation control
TaskId
Unique identifier for a task
TaskMetrics
Task execution metrics for a tracker
TaskTracker
Hierarchical task tracker with pluggable scheduling and error policies
TaskTrackerBuilder
Builder for TaskTracker
ThresholdCancelPolicy
Error policy that cancels tasks after a threshold number of failures
TriggerCancellationTokenAction
Custom action that triggers a cancellation token when executed
TriggerCancellationTokenOnError
Test error policy that triggers a custom cancellation token on any error
UnlimitedGuard
Resource guard for unlimited scheduling
UnlimitedScheduler
Unlimited task scheduler that executes all tasks immediately

Enums§

ActionResult
Result of a custom error action execution
CancellableTaskResult
Result type for cancellable tasks that explicitly tracks cancellation
CompletionStatus
Result of task execution
ErrorPolicy
Common error handling policies for task failure management
ErrorResponse
Response type for error handling policies
SchedulingPolicy
Common scheduling policies for task execution
SchedulingResult
Result of scheduling a task
TaskError
Error type for task execution results
TaskExecutionResult
Result of task execution - unified for both regular and cancellable tasks

Traits§

ArcPolicy
Common functionality for policy Arc construction
Continuation
Trait for continuation tasks that execute after a failure
FailedWithContinuationExt
Extension trait for extracting FailedWithContinuation from anyhow::Error
HierarchicalTaskMetrics
Trait for hierarchical task metrics that supports aggregation up the tracker tree
OnErrorAction
Trait for implementing custom error handling actions
OnErrorPolicy
Error handling policy trait for task failures
ResourceGuard
Resource guard that manages task execution
TaskScheduler
Trait for implementing task scheduling policies