§Taskmill
Adaptive priority work scheduler with IO-aware concurrency and SQLite persistence.
Taskmill provides a generic task scheduling system that:
- Persists tasks to SQLite so the queue survives restarts
- Schedules by priority (0 = highest, 255 = lowest) with named tiers
- Deduplicates tasks by key — submitting an already-queued key is a no-op
- Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling
- Monitors system CPU, disk, and network throughput to adjust concurrency
- Supports composable backpressure from arbitrary external sources
- Preempts lower-priority work when high-priority tasks arrive
- Retries failed tasks at the same priority level
- Records completed/failed task history for queries and IO learning
- Emits lifecycle events including progress for UI integration
- Supports graceful shutdown with configurable drain timeout
§Concepts
§Task lifecycle
A task flows through a linear pipeline:
submit → pending → running → completed
↘ paused ↗ ↘ failed (retryable → pending)
  ↘ failed (permanent → history)
- Submit — Scheduler::submit (or submit_typed) enqueues a TaskSubmission into the SQLite store.
- Pending — the task waits in a priority queue. The scheduler’s run loop pops the highest-priority pending task on each tick.
- Running — the scheduler calls TaskExecutor::execute with a TaskContext containing the task record, a cancellation token, and a progress reporter.
- Terminal — on success the task moves to the history table. On failure, a retryable error requeues it (up to SchedulerBuilder::max_retries); a non-retryable error moves it to history as failed.
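The retry decision at the terminal step can be sketched as follows. This is a minimal illustration of the rule described above; `Disposition`, `on_failure`, and the `attempts` counter are illustrative names, not the crate's internals.

```rust
// Illustrative sketch of the failure disposition rule: a retryable
// error requeues the task until max_retries is exhausted; anything
// else moves it to history as failed.
#[derive(Debug, PartialEq)]
enum Disposition {
    /// Requeued at the same priority for another attempt.
    Requeue,
    /// Moved to the history table as failed.
    History,
}

fn on_failure(retryable: bool, attempts: u32, max_retries: u32) -> Disposition {
    if retryable && attempts < max_retries {
        Disposition::Requeue
    } else {
        Disposition::History
    }
}
```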
§Deduplication
Every task has a dedup key derived from its type name and either an explicit
key string or the serialized payload (via SHA-256). Submitting a task whose
key already exists returns SubmitOutcome::Duplicate (or
Upgraded if the new submission has higher
priority). This makes it safe to call submit idempotently.
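The key-derivation scheme can be sketched like this. Note the stand-in: the crate hashes the serialized payload with SHA-256, but to keep the sketch dependency-free it uses `std`'s `DefaultHasher` instead, and the `type:key` format is an assumption about the shape, not the exact encoding.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: the real crate hashes the serialized payload with
// SHA-256; DefaultHasher stands in so this sketch needs no extra crates.
fn dedup_key(task_type: &str, explicit_key: Option<&str>, payload_json: &str) -> String {
    match explicit_key {
        // An explicit key is used verbatim, namespaced by the task type.
        Some(k) => format!("{task_type}:{k}"),
        // Otherwise the payload is hashed, so equal payloads collide
        // (which is exactly what makes resubmission a no-op).
        None => {
            let mut h = DefaultHasher::new();
            payload_json.hash(&mut h);
            format!("{task_type}:{:016x}", h.finish())
        }
    }
}
```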
§Priority & preemption
Priority is a u8 newtype where lower values = higher priority.
Named constants (REALTIME,
HIGH, NORMAL,
BACKGROUND, IDLE) cover
common tiers. When a task at or above the
preempt_priority threshold is
submitted, lower-priority running tasks are cancelled and paused so the
urgent work runs immediately.
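The ordering convention and the preemption check can be sketched as below. Only the endpoints (0 = highest, 255 = lowest) come from the description above; the numeric values assigned to the intermediate tiers here are assumptions for illustration.

```rust
// Sketch of the convention: lower number = more urgent, so the derived
// Ord on the inner u8 gives "smaller sorts first".
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Priority(u8);

// Tier values other than 0 and 255 are assumed for illustration.
const REALTIME: Priority = Priority(0);
const HIGH: Priority = Priority(64);
const NORMAL: Priority = Priority(128);
const BACKGROUND: Priority = Priority(192);
const IDLE: Priority = Priority(255);

// A submission preempts running work when it is at least as urgent as
// the configured threshold, i.e. numerically less than or equal.
fn preempts(submitted: Priority, preempt_priority: Priority) -> bool {
    submitted <= preempt_priority
}
```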
§IO budgeting
Each task declares expected read/write bytes (via TypedTask or
TaskSubmission fields). The scheduler tracks running IO totals and,
when resource monitoring is
enabled, compares them against observed system disk throughput to avoid
over-saturating the disk. Executors report actual IO via
TaskContext::record_read_bytes / record_write_bytes,
which feeds back into historical throughput averages for future scheduling
decisions.
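The admission idea can be condensed into a small check: do not start a task whose expected IO, added to what is already in flight, exceeds a multiple of recently observed disk throughput. This is a sketch of the concept only; the field and function names are assumptions, not the crate's API.

```rust
// Illustrative IO-budget gate; names are assumptions for this sketch.
struct IoBudget {
    /// Sum of expected bytes for tasks currently running.
    in_flight_bytes: i64,
    /// Bytes the disk actually moved over the sampling window.
    observed_window_bytes: i64,
    /// Allow this multiple of observed throughput before refusing.
    headroom: i64,
}

// Admit only if the new task's expected bytes fit inside the budget.
fn admit(budget: &IoBudget, expected_bytes: i64) -> bool {
    budget.in_flight_bytes + expected_bytes
        <= budget.observed_window_bytes * budget.headroom
}
```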
§Network IO
Tasks can also declare expected network IO via
TaskSubmission::expected_net_io (or TypedTask::expected_net_rx_bytes /
expected_net_tx_bytes). Executors report
actual network bytes via TaskContext::record_net_rx_bytes /
record_net_tx_bytes. To throttle tasks
when network bandwidth is saturated, set a bandwidth cap with
SchedulerBuilder::bandwidth_limit — this registers a built-in
NetworkPressure source that maps observed throughput to backpressure.
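One plausible mapping from observed throughput to a pressure value is a simple linear ramp that saturates at the cap. This is a sketch of the idea only; the built-in NetworkPressure source may use a different curve.

```rust
// Illustrative throughput-to-pressure mapping: 0.0 when idle, 1.0 at or
// above the configured bandwidth cap, linear in between.
fn network_pressure(observed_bytes_per_sec: f64, cap_bytes_per_sec: f64) -> f32 {
    if cap_bytes_per_sec <= 0.0 {
        // No cap configured: report no pressure.
        return 0.0;
    }
    (observed_bytes_per_sec / cap_bytes_per_sec).clamp(0.0, 1.0) as f32
}
```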
§Task groups
Tasks can be assigned to a named group via TaskSubmission::group (or
TypedTask::group_key). The scheduler enforces per-group concurrency
limits — for example, limiting uploads to any single S3 bucket to 4
concurrent tasks. Configure limits at build time with
SchedulerBuilder::group_concurrency and
SchedulerBuilder::default_group_concurrency, or adjust at runtime via
Scheduler::set_group_limit and Scheduler::set_default_group_concurrency.
§Child tasks & two-phase execution
An executor can spawn child tasks via TaskContext::spawn_child. When
children exist, the parent enters a waiting state after its executor
returns. Once all children complete, the parent’s
TaskExecutor::finalize method is called — useful for assembly work
like CompleteMultipartUpload. If any child fails and
fail_fast is true (the default), siblings
are cancelled and the parent fails immediately.
§Quick start
use std::sync::Arc;
use taskmill::{
Scheduler, TaskExecutor, TaskContext, TaskError,
TypedTask, Priority,
};
use serde::{Serialize, Deserialize};
use tokio_util::sync::CancellationToken;
// 1. Define a task payload.
#[derive(Serialize, Deserialize)]
struct Thumbnail { path: String, size: u32 }
impl TypedTask for Thumbnail {
const TASK_TYPE: &'static str = "thumbnail";
fn expected_read_bytes(&self) -> i64 { 4_096 }
fn expected_write_bytes(&self) -> i64 { 1_024 }
}
// 2. Implement the executor.
struct ThumbnailExecutor;
impl TaskExecutor for ThumbnailExecutor {
async fn execute<'a>(
&'a self, ctx: &'a TaskContext,
) -> Result<(), TaskError> {
let thumb: Thumbnail = ctx.payload()?;
ctx.progress().report(0.5, Some("resizing".into()));
// ... do work, check ctx.token().is_cancelled() ...
ctx.record_read_bytes(4_096);
ctx.record_write_bytes(1_024);
Ok(())
}
}
// 3. Build and run the scheduler.
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.typed_executor::<Thumbnail, _>(Arc::new(ThumbnailExecutor))
.max_concurrency(4)
.with_resource_monitoring()
.build()
.await?;
// 4. Submit work.
let task = Thumbnail { path: "/photos/a.jpg".into(), size: 256 };
scheduler.submit_typed(&task).await?;
// 5. Run until cancelled.
let token = CancellationToken::new();
scheduler.run(token).await;
§Common patterns
§Shared application state
Register shared services (database pools, HTTP clients, etc.) at build time
and retrieve them from any executor via TaskContext::state:
struct AppServices { db: DatabasePool, http: reqwest::Client }
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.app_state(AppServices { /* ... */ })
.executor("ingest", Arc::new(IngestExecutor))
.build()
.await?;
// Inside the executor:
async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
let svc = ctx.state::<AppServices>().expect("AppServices not registered");
svc.db.query("...").await?;
Ok(())
}
State can also be injected after construction via
Scheduler::register_state — useful when a library (e.g. shoebox)
receives a pre-built scheduler from a parent application.
§Backpressure
Implement PressureSource to feed external signals into the scheduler’s
throttle decisions. The default ThrottlePolicy pauses BACKGROUND
tasks above 50% pressure and NORMAL tasks above 75%:
use std::sync::atomic::{AtomicU32, Ordering};
use taskmill::{PressureSource, Scheduler};
struct ApiLoad { active: AtomicU32, max: u32 }
impl PressureSource for ApiLoad {
fn pressure(&self) -> f32 {
self.active.load(Ordering::Relaxed) as f32 / self.max as f32
}
fn name(&self) -> &str { "api-load" }
}
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.pressure_source(Box::new(ApiLoad { active: AtomicU32::new(0), max: 100 }))
// .throttle_policy(custom_policy) // optional override
.build()
.await?;
§Events & progress
Subscribe to SchedulerEvents to drive a UI or collect metrics:
let mut rx = scheduler.subscribe();
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
match event {
SchedulerEvent::Progress { task_id, percent, message, .. } => {
update_progress_bar(task_id, percent, message);
}
SchedulerEvent::Completed { task_id, .. } => {
mark_done(task_id);
}
_ => {}
}
}
});
For a single-call dashboard snapshot, use Scheduler::snapshot which
returns a serializable SchedulerSnapshot with queue depths, running
tasks, progress estimates, and backpressure.
§Group concurrency
Limit concurrent tasks within a named group — for example, cap uploads per S3 bucket:
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.executor("upload-part", Arc::new(UploadPartExecutor))
.default_group_concurrency(4) // default for all groups
.group_concurrency("s3://hot-bucket", 8) // override for one group
.build()
.await?;
// Tasks declare their group via the submission:
let sub = TaskSubmission::new("upload-part")
.group("s3://my-bucket")
.payload_json(&part)?;
scheduler.submit(&sub).await?;
// Adjust at runtime:
scheduler.set_group_limit("s3://my-bucket", 2);
scheduler.remove_group_limit("s3://my-bucket"); // fall back to default
§Child tasks
Spawn child tasks from an executor to model fan-out work. The parent
automatically waits for all children before its finalize
method is called:
impl TaskExecutor for MultipartUploadExecutor {
async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
let upload: MultipartUpload = ctx.payload()?;
for part in &upload.parts {
ctx.spawn_child(
TaskSubmission::new("upload-part")
.key(&part.etag)
.priority(ctx.record().priority)
.payload_json(part)?
.expected_io(part.size as i64, 0),
).await?;
}
Ok(())
}
async fn finalize<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> {
// All parts uploaded — complete the multipart upload.
let upload: MultipartUpload = ctx.payload()?;
complete_multipart(&upload).await?;
Ok(())
}
}
§How the dispatch loop works
Understanding the run loop helps when tuning SchedulerConfig:
1. The loop wakes on three conditions: a new task was submitted (via Notify), the poll_interval elapsed (default 500ms), or the cancellation token fired.
2. Paused tasks are resumed if no active preemptors exist at their priority level.
3. Pending finalizers (parents whose children all completed) are dispatched first.
4. The highest-priority pending task is peeked (without claiming it).
5. The dispatch gate checks concurrency limits, group concurrency, IO budget, and backpressure. If the gate rejects, no slot is consumed.
6. If admitted, the task is atomically claimed (peek → pop_by_id) and spawned as a Tokio task.
7. Steps 4–6 repeat until the queue is empty or the gate rejects.
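The gate's checks can be condensed into one predicate. The struct below is a simplified stand-in for the scheduler's internal state (the real gate also consults the IO budget); all names are illustrative.

```rust
// Simplified stand-in for the dispatch-gate inputs; illustrative only.
struct Gate {
    running: usize,        // tasks currently executing
    max_concurrency: usize,
    group_running: usize,  // running tasks in the candidate's group
    group_limit: usize,
    pressure: f32,         // composite backpressure in [0, 1]
    pause_threshold: f32,  // the candidate's tier pauses above this
}

// Admit only when every check passes; a rejection consumes no slot and
// the same task is re-examined on the next wake-up.
fn gate_admits(g: &Gate) -> bool {
    g.running < g.max_concurrency
        && g.group_running < g.group_limit
        && g.pressure <= g.pause_threshold
}
```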
§Feature flags
- sysinfo-monitor (default): Enables the built-in SysinfoSampler for cross-platform CPU and disk IO monitoring. Disable for mobile targets or when providing a custom ResourceSampler. Without this feature, calling SchedulerBuilder::with_resource_monitoring requires a custom sampler via resource_sampler().
Re-exports§
pub use backpressure::CompositePressure;
pub use backpressure::PressureSource;
pub use backpressure::ThrottlePolicy;
pub use priority::Priority;
pub use registry::TaskContext;
pub use registry::TaskExecutor;
pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::ResourceReader;
pub use resource::ResourceSampler;
pub use resource::ResourceSnapshot;
pub use scheduler::EstimatedProgress;
pub use scheduler::GroupLimits;
pub use scheduler::ProgressReporter;
pub use scheduler::Scheduler;
pub use scheduler::SchedulerBuilder;
pub use scheduler::SchedulerConfig;
pub use scheduler::SchedulerEvent;
pub use scheduler::SchedulerSnapshot;
pub use scheduler::ShutdownMode;
pub use store::RetentionPolicy;
pub use store::StoreConfig;
pub use store::StoreError;
pub use store::TaskStore;
pub use task::generate_dedup_key;
pub use task::HistoryStatus;
pub use task::ParentResolution;
pub use task::SubmitOutcome;
pub use task::TaskError;
pub use task::TaskHistoryRecord;
pub use task::TaskLookup;
pub use task::TaskMetrics;
pub use task::TaskRecord;
pub use task::TaskStatus;
pub use task::TaskSubmission;
pub use task::TypeStats;
pub use task::TypedTask;
pub use resource::platform_sampler;
Modules§
- backpressure
- Composable backpressure for throttling task dispatch.
- priority
- Priority levels for task scheduling.
- registry
- Executor registration, shared state, and the TaskContext passed to each task.
- resource
- System resource monitoring for IO-aware scheduling.
- scheduler
- The scheduler: configuration, event stream, and the main run loop.
- store
- SQLite-backed persistence layer for the task queue and history.
- task
- Task types, submission parameters, and the TypedTask trait.