Crate taskmill

§Taskmill

Adaptive priority work scheduler with IO-aware concurrency and SQLite persistence.

Taskmill provides a generic task scheduling system that:

  • Persists tasks to SQLite so the queue survives restarts
  • Schedules by priority (0 = highest, 255 = lowest) with named tiers
  • Deduplicates tasks by key — submitting an already-queued key is a no-op
  • Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling
  • Monitors system CPU, disk, and network throughput to adjust concurrency
  • Supports composable backpressure from arbitrary external sources
  • Preempts lower-priority work when high-priority tasks arrive
  • Retries failed tasks at the same priority level
  • Records completed/failed task history for queries and IO learning
  • Emits lifecycle events including progress for UI integration
  • Supports graceful shutdown with configurable drain timeout

§Concepts

§Task lifecycle

A task flows through the following pipeline:

submit → pending → running → completed
                 ↘ paused ↗     ↘ failed (retryable → pending)
                                 ↘ failed (permanent → history)
  1. Submit — Scheduler::submit (or submit_typed) enqueues a TaskSubmission into the SQLite store.
  2. Pending — the task waits in a priority queue. The scheduler’s run loop pops the highest-priority pending task on each tick.
  3. Running — the scheduler calls TaskExecutor::execute with a TaskContext containing the task record, a cancellation token, and a progress reporter.
  4. Terminal — on success the task moves to the history table. On failure, a retryable error requeues it (up to SchedulerBuilder::max_retries); a non-retryable error moves it to history as failed.
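The failure branch of step 4 can be sketched as a small decision function. This is illustrative only: `Status` and `on_failure` are hypothetical names, not taskmill's actual types.

```rust
// A sketch of the failure transition described above; `Status` and
// `on_failure` are illustrative names, not taskmill's actual API.
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum Status { Pending, Running, Paused, Completed, Failed }

// A retryable error requeues the task (up to max_retries attempts);
// anything else is terminal and goes to the history table.
fn on_failure(retryable: bool, attempts: u32, max_retries: u32) -> Status {
    if retryable && attempts < max_retries {
        Status::Pending // requeued at the same priority
    } else {
        Status::Failed // recorded in history as failed
    }
}
```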

§Deduplication

Every task has a dedup key derived from its type name and either an explicit key string or the serialized payload (via SHA-256). Submitting a task whose key already exists returns SubmitOutcome::Duplicate (or Upgraded if the new submission has higher priority). This makes it safe to call submit idempotently.
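A minimal sketch of the derivation described above. `dedup_key` is a hypothetical helper, and std's `DefaultHasher` stands in for the SHA-256 hash taskmill actually applies to the serialized payload:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper mirroring the described derivation: the key combines
// the task type with either an explicit key or a hash of the payload.
// (taskmill uses SHA-256 of the serialized payload; DefaultHasher stands in.)
fn dedup_key(task_type: &str, explicit_key: Option<&str>, payload_json: &str) -> String {
    match explicit_key {
        Some(k) => format!("{task_type}:{k}"),
        None => {
            let mut h = DefaultHasher::new();
            payload_json.hash(&mut h);
            format!("{task_type}:{:016x}", h.finish())
        }
    }
}
```

Because the key is deterministic, resubmitting the same payload always maps to the same queue entry.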

§Priority & preemption

Priority is a u8 newtype where lower values = higher priority. Named constants (REALTIME, HIGH, NORMAL, BACKGROUND, IDLE) cover common tiers. When a task at or above the preempt_priority threshold is submitted, lower-priority running tasks are cancelled and marked paused so the urgent work runs immediately; paused tasks resume once no active preemptors remain at their priority level.
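Because lower numbers win, a standard max-heap needs `Reverse` to pop the most urgent task first. The sketch below is illustrative (the tier values are made up, not taskmill's actual `Priority` constants):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Illustrative: pop task names in priority order, where a lower u8 value
// means higher priority. `Reverse` flips std's max-heap into the
// "lowest value first" queue the text describes.
fn drain_by_priority(tasks: Vec<(u8, &str)>) -> Vec<&str> {
    let mut heap: BinaryHeap<(Reverse<u8>, &str)> =
        tasks.into_iter().map(|(p, name)| (Reverse(p), name)).collect();
    let mut order = Vec::new();
    while let Some((_, name)) = heap.pop() {
        order.push(name);
    }
    order
}
```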

§IO budgeting

Each task declares expected read/write bytes (via TypedTask or TaskSubmission fields). The scheduler tracks running IO totals and, when resource monitoring is enabled, compares them against observed system disk throughput to avoid over-saturating the disk. Executors report actual IO via TaskContext::record_read_bytes / record_write_bytes, which feeds back into historical throughput averages for future scheduling decisions.
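The admission side of this budgeting can be sketched as a pure check. All names and the budget formula here are assumptions for illustration, not taskmill's internal logic:

```rust
// Hypothetical sketch of the IO-budget check described above: admit a task
// only if its expected bytes, added to what running tasks already claim,
// stay within a budget derived from observed disk throughput.
fn io_gate_admits(
    running_expected_bytes: i64,
    task_expected_bytes: i64,
    observed_throughput_bps: i64, // from resource monitoring
    budget_window_secs: i64,      // how far ahead the scheduler commits IO
) -> bool {
    let budget = observed_throughput_bps * budget_window_secs;
    running_expected_bytes + task_expected_bytes <= budget
}
```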

§Network IO

Tasks can also declare expected network IO via TaskSubmission::expected_net_io (or TypedTask::expected_net_rx_bytes / expected_net_tx_bytes). Executors report actual network bytes via TaskContext::record_net_rx_bytes / record_net_tx_bytes. To throttle tasks when network bandwidth is saturated, set a bandwidth cap with SchedulerBuilder::bandwidth_limit — this registers a built-in NetworkPressure source that maps observed throughput to backpressure.
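In the spirit of the built-in NetworkPressure source, observed throughput can be mapped against the bandwidth cap to a pressure value in 0.0..=1.0. The exact formula taskmill uses is not documented here; this linear mapping is an assumption:

```rust
// Illustrative mapping from observed network throughput to a backpressure
// value in 0.0..=1.0, in the spirit of the built-in NetworkPressure source.
// The actual formula is an assumption, not taskmill's implementation.
fn net_pressure(observed_bps: f32, limit_bps: f32) -> f32 {
    (observed_bps / limit_bps).clamp(0.0, 1.0)
}
```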

§Task groups

Tasks can be assigned to a named group via TaskSubmission::group (or TypedTask::group_key). The scheduler enforces per-group concurrency limits — for example, limiting uploads to any single S3 bucket to 4 concurrent tasks. Configure limits at build time with SchedulerBuilder::group_concurrency and SchedulerBuilder::default_group_concurrency, or adjust at runtime via Scheduler::set_group_limit and Scheduler::set_default_group_concurrency.

§Child tasks & two-phase execution

An executor can spawn child tasks via TaskContext::spawn_child. When children exist, the parent enters a waiting state after its executor returns. Once all children complete, the parent’s TaskExecutor::finalize method is called — useful for assembly work like CompleteMultipartUpload. If any child fails and fail_fast is true (the default), siblings are cancelled and the parent fails immediately.

§Quick start

use std::sync::Arc;
use taskmill::{
    Scheduler, TaskExecutor, TaskContext, TaskError,
    TypedTask, Priority,
};
use serde::{Serialize, Deserialize};
use tokio_util::sync::CancellationToken;

// 1. Define a task payload.
#[derive(Serialize, Deserialize)]
struct Thumbnail { path: String, size: u32 }

impl TypedTask for Thumbnail {
    const TASK_TYPE: &'static str = "thumbnail";
    fn expected_read_bytes(&self) -> i64 { 4_096 }
    fn expected_write_bytes(&self) -> i64 { 1_024 }
}

// 2. Implement the executor.
struct ThumbnailExecutor;

impl TaskExecutor for ThumbnailExecutor {
    async fn execute<'a>(
        &'a self, ctx: &'a TaskContext,
    ) -> Result<(), TaskError> {
        let thumb: Thumbnail = ctx.payload()?;
        ctx.progress().report(0.5, Some("resizing".into()));
        // ... do work, check ctx.token().is_cancelled() ...
        ctx.record_read_bytes(4_096);
        ctx.record_write_bytes(1_024);
        Ok(())
    }
}

// 3. Build and run the scheduler.
let scheduler = Scheduler::builder()
    .store_path("tasks.db")
    .typed_executor::<Thumbnail, _>(Arc::new(ThumbnailExecutor))
    .max_concurrency(4)
    .with_resource_monitoring()
    .build()
    .await?;

// 4. Submit work.
let task = Thumbnail { path: "/photos/a.jpg".into(), size: 256 };
scheduler.submit_typed(&task).await?;

// 5. Run until cancelled.
let token = CancellationToken::new();
scheduler.run(token).await;

§Common patterns

§Shared application state

Register shared services (database pools, HTTP clients, etc.) at build time and retrieve them from any executor via TaskContext::state:

struct AppServices { db: DatabasePool, http: reqwest::Client }

let scheduler = Scheduler::builder()
    .store_path("tasks.db")
    .app_state(AppServices { /* ... */ })
    .executor("ingest", Arc::new(IngestExecutor))
    .build()
    .await?;

// Inside the executor:
async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
    let svc = ctx.state::<AppServices>().expect("AppServices not registered");
    svc.db.query("...").await?;
    Ok(())
}

State can also be injected after construction via Scheduler::register_state — useful when a library (e.g. shoebox) receives a pre-built scheduler from a parent application.

§Backpressure

Implement PressureSource to feed external signals into the scheduler’s throttle decisions. The default ThrottlePolicy pauses BACKGROUND tasks above 50% pressure and NORMAL tasks above 75%:

use std::sync::atomic::{AtomicU32, Ordering};
use taskmill::{PressureSource, Scheduler};

struct ApiLoad { active: AtomicU32, max: u32 }

impl PressureSource for ApiLoad {
    fn pressure(&self) -> f32 {
        self.active.load(Ordering::Relaxed) as f32 / self.max as f32
    }
    fn name(&self) -> &str { "api-load" }
}

let scheduler = Scheduler::builder()
    .store_path("tasks.db")
    .pressure_source(Box::new(ApiLoad { active: AtomicU32::new(0), max: 100 }))
    // .throttle_policy(custom_policy)  // optional override
    .build()
    .await?;
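The default 50%/75% thresholds mentioned above can be expressed as a pure mapping. This is a sketch only: the tier values are illustrative constants, and ThrottlePolicy's real interface may differ:

```rust
// Sketch of the default thresholds described above. Returns the largest
// numeric priority value still admitted at a given composite pressure
// (lower number = more urgent). Tier values are illustrative constants,
// not taskmill's actual Priority definitions.
fn max_admitted_priority(pressure: f32) -> u8 {
    const NORMAL: u8 = 128;
    const BACKGROUND: u8 = 192;
    if pressure > 0.75 {
        NORMAL - 1 // NORMAL and everything below it are paused
    } else if pressure > 0.50 {
        BACKGROUND - 1 // BACKGROUND and below are paused
    } else {
        u8::MAX // no throttling
    }
}
```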

§Events & progress

Subscribe to SchedulerEvents to drive a UI or collect metrics:

let mut rx = scheduler.subscribe();
tokio::spawn(async move {
    while let Ok(event) = rx.recv().await {
        match event {
            SchedulerEvent::Progress { task_id, percent, message, .. } => {
                update_progress_bar(task_id, percent, message);
            }
            SchedulerEvent::Completed { task_id, .. } => {
                mark_done(task_id);
            }
            _ => {}
        }
    }
});

For a single-call dashboard snapshot, use Scheduler::snapshot which returns a serializable SchedulerSnapshot with queue depths, running tasks, progress estimates, and backpressure.

§Group concurrency

Limit concurrent tasks within a named group — for example, cap uploads per S3 bucket:

let scheduler = Scheduler::builder()
    .store_path("tasks.db")
    .executor("upload-part", Arc::new(UploadPartExecutor))
    .default_group_concurrency(4)               // default for all groups
    .group_concurrency("s3://hot-bucket", 8)    // override for one group
    .build()
    .await?;

// Tasks declare their group via the submission:
let sub = TaskSubmission::new("upload-part")
    .group("s3://my-bucket")
    .payload_json(&part)?;
scheduler.submit(&sub).await?;

// Adjust at runtime:
scheduler.set_group_limit("s3://my-bucket", 2);
scheduler.remove_group_limit("s3://my-bucket"); // fall back to default

§Child tasks

Spawn child tasks from an executor to model fan-out work. The parent automatically waits for all children before its finalize method is called:

impl TaskExecutor for MultipartUploadExecutor {
    async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
        let upload: MultipartUpload = ctx.payload()?;
        for part in &upload.parts {
            ctx.spawn_child(
                TaskSubmission::new("upload-part")
                    .key(&part.etag)
                    .priority(ctx.record().priority)
                    .payload_json(part)?
                    .expected_io(part.size as i64, 0),
            ).await?;
        }
        Ok(())
    }

    async fn finalize<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
        // All parts uploaded — complete the multipart upload.
        let upload: MultipartUpload = ctx.payload()?;
        complete_multipart(&upload).await?;
        Ok(())
    }
}

§How the dispatch loop works

Understanding the run loop helps when tuning SchedulerConfig:

  1. The loop wakes on three conditions: a new task was submitted (via Notify), the poll_interval elapsed (default 500ms), or the cancellation token fired.
  2. Paused tasks are resumed if no active preemptors exist at their priority level.
  3. Pending finalizers (parents whose children all completed) are dispatched first.
  4. The highest-priority pending task is peeked (without claiming it).
  5. The dispatch gate checks concurrency limits, group concurrency, IO budget, and backpressure. If the gate rejects, no slot is consumed.
  6. If admitted, the task is atomically claimed (popped by id) and spawned as a Tokio task.
  7. Steps 4–6 repeat until the queue is empty or the gate rejects.
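The gate checks in step 5 could be sketched as a single predicate over the scheduler's current state. Every field name here is hypothetical; taskmill's internal gate is not shown in this documentation:

```rust
// Hypothetical sketch of the dispatch-gate checks in step 5. Every check
// must pass before a pending task is claimed; field names are illustrative.
struct Gate {
    running: usize,
    max_concurrency: usize,
    group_running: usize,
    group_limit: usize,
    io_budget_ok: bool,
    pressure: f32,        // 0.0..=1.0 from composite backpressure
    pause_threshold: f32, // tier-specific threshold from the throttle policy
}

impl Gate {
    fn admits(&self) -> bool {
        self.running < self.max_concurrency
            && self.group_running < self.group_limit
            && self.io_budget_ok
            && self.pressure < self.pause_threshold
    }
}
```

If any check fails, the task stays pending and no concurrency slot is consumed, matching step 5 above.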

§Re-exports

pub use backpressure::CompositePressure;
pub use backpressure::PressureSource;
pub use backpressure::ThrottlePolicy;
pub use priority::Priority;
pub use registry::TaskContext;
pub use registry::TaskExecutor;
pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::ResourceReader;
pub use resource::ResourceSampler;
pub use resource::ResourceSnapshot;
pub use scheduler::EstimatedProgress;
pub use scheduler::GroupLimits;
pub use scheduler::ProgressReporter;
pub use scheduler::Scheduler;
pub use scheduler::SchedulerBuilder;
pub use scheduler::SchedulerConfig;
pub use scheduler::SchedulerEvent;
pub use scheduler::SchedulerSnapshot;
pub use scheduler::ShutdownMode;
pub use store::RetentionPolicy;
pub use store::StoreConfig;
pub use store::StoreError;
pub use store::TaskStore;
pub use task::generate_dedup_key;
pub use task::HistoryStatus;
pub use task::ParentResolution;
pub use task::SubmitOutcome;
pub use task::TaskError;
pub use task::TaskHistoryRecord;
pub use task::TaskLookup;
pub use task::TaskMetrics;
pub use task::TaskRecord;
pub use task::TaskStatus;
pub use task::TaskSubmission;
pub use task::TypeStats;
pub use task::TypedTask;
pub use resource::platform_sampler;

§Modules

backpressure
Composable backpressure for throttling task dispatch.
priority
Priority levels for task scheduling.
registry
Executor registration, shared state, and the TaskContext passed to each task.
resource
System resource monitoring for IO-aware scheduling.
scheduler
The scheduler: configuration, event stream, and the main run loop.
store
SQLite-backed persistence layer for the task queue and history.
task
Task types, submission parameters, and the TypedTask trait.