§Taskmill
Adaptive priority work scheduler with IO-aware concurrency and SQLite persistence.
Taskmill provides a generic task scheduling system that:
- Persists tasks to SQLite so the queue survives restarts
- Schedules by priority (0 = highest, 255 = lowest) with named tiers
- Deduplicates tasks by key with a configurable DuplicateStrategy (skip, supersede, or reject)
- Tracks expected and actual IO bytes (disk and network) per task for budget-based scheduling
- Monitors system CPU, disk, and network throughput to adjust concurrency
- Supports composable backpressure from arbitrary external sources
- Preempts lower-priority work when high-priority tasks arrive
- Retries failed tasks with configurable backoff (Constant, Linear, Exponential, ExponentialJitter) and per-type retry policies
- Records completed/failed task history for queries and IO learning
- Supports batch submission with intra-batch dedup and chunking
- Emits lifecycle events including progress for UI integration
- Reports byte-level transfer progress with EWMA-smoothed throughput and ETA
- Supports task cancellation with history recording and cleanup hooks
- Supports task superseding for atomic cancel-and-replace
- Supports task TTL with automatic expiry, per-type defaults, and child inheritance
- Supports graceful shutdown with configurable drain timeout
§Concepts
§Task lifecycle
A task flows through a linear pipeline:
submit → blocked ─(deps met)─→ pending ──────────────→ running → completed
                                ↑ ↓ ↘ paused ↗           ↘ failed (retryable → pending, with backoff delay)
                     (gated until run_after elapses)     ↘ failed (permanent → history)
                                                         ↘ dead_letter (retries exhausted → history)
pending/running ─→ cancelled (via cancel() or supersede) / superseded
pending/paused ──→ expired (TTL, cascade to children)
blocked ─(dep failed)─→ dep_failed (history)
- Submit — DomainHandle::submit (or submit_with, submit_batch) enqueues a typed task into the SQLite store. The domain handle auto-prefixes the task type with the domain name and applies defaults.
- Pending — the task waits in a priority queue. The scheduler’s run loop pops the highest-priority pending task on each tick.
- Running — the scheduler calls the TypedExecutor::execute method with the deserialized payload and a [TaskContext] containing the task record, a cancellation token, and a progress reporter.
- Terminal — on success the task moves to the history table. On failure, a retryable error requeues it (up to SchedulerBuilder::max_retries or per-type RetryPolicy::max_retries) with a configurable BackoffStrategy delay; a non-retryable (permanent) error moves it to history as failed. Tasks that exhaust all retries enter the dead_letter state — queryable via DomainHandle::dead_letter_tasks and manually re-submittable via DomainHandle::retry_dead_letter.
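The backoff families named above can be sketched as pure functions of the retry attempt. This is a hypothetical illustration — the enum and formulas here are assumptions, not the crate’s actual BackoffStrategy implementation (ExponentialJitter would add a random offset on top of Exponential):

```rust
use std::time::Duration;

// Sketch of backoff strategies: delay as a function of the 0-based attempt.
enum Backoff {
    Constant(Duration),
    Linear(Duration),
    Exponential(Duration),
}

fn delay(strategy: &Backoff, attempt: u32) -> Duration {
    match strategy {
        // Same delay every time.
        Backoff::Constant(base) => *base,
        // Delay grows proportionally to the attempt count.
        Backoff::Linear(base) => *base * (attempt + 1),
        // Delay doubles per attempt (saturating to avoid overflow).
        Backoff::Exponential(base) => *base * 2u32.saturating_pow(attempt),
    }
}

fn main() {
    let exp = Backoff::Exponential(Duration::from_secs(1));
    assert_eq!(delay(&exp, 0), Duration::from_secs(1));
    assert_eq!(delay(&exp, 3), Duration::from_secs(8));
    let lin = Backoff::Linear(Duration::from_secs(2));
    assert_eq!(delay(&lin, 2), Duration::from_secs(6));
}
```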
§Deduplication & duplicate strategies
Every task has a dedup key derived from its type name and either an explicit
key string or the serialized payload (via SHA-256). What happens when a
submission’s key matches an existing task depends on the
DuplicateStrategy:
- Skip (default) — attempt a priority upgrade or requeue; otherwise return SubmitOutcome::Duplicate. Safe for idempotent submit calls.
- Supersede — cancel the existing task (recording it in history as HistoryStatus::Superseded) and replace it with the new submission. For running tasks the cancellation token is fired, the on_cancel hook runs, and children are cascade-cancelled. Returns SubmitOutcome::Superseded.
- Reject — return SubmitOutcome::Rejected without modifying the existing task.
Within a single submit_batch call, intra-batch
dedup applies a last-wins policy: if two tasks share a dedup key, only
the last occurrence is submitted and earlier ones receive Duplicate.
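The last-wins rule can be sketched in plain Rust. This is a simplified in-memory illustration (the function and enum names are hypothetical), not the crate’s internal batch code:

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Outcome { Submitted, Duplicate }

// For each dedup key, only the final occurrence in the batch is submitted;
// earlier occurrences report Duplicate.
fn dedup_batch(keys: &[&str]) -> Vec<Outcome> {
    // Record the index of the last occurrence of each key.
    let mut last: HashMap<&str, usize> = HashMap::new();
    for (i, k) in keys.iter().enumerate() {
        last.insert(*k, i);
    }
    keys.iter()
        .enumerate()
        .map(|(i, k)| if last[k] == i { Outcome::Submitted } else { Outcome::Duplicate })
        .collect()
}

fn main() {
    let outcomes = dedup_batch(&["a", "b", "a"]);
    assert_eq!(outcomes, [Outcome::Duplicate, Outcome::Submitted, Outcome::Submitted]);
}
```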
§Priority & preemption
Priority is a u8 newtype where lower values = higher priority.
Named constants (REALTIME,
HIGH, NORMAL,
BACKGROUND, IDLE) cover
common tiers. When a task at or above the
preempt_priority threshold is
submitted, lower-priority running tasks are cancelled and paused so the
urgent work runs immediately.
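The “lower value wins” ordering can be illustrated with a std min-heap. This is a sketch of the semantics only — the crate’s actual queue is backed by SQLite, and the numeric tier values here are assumptions:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Pop tasks in priority order. Reverse turns Rust's max-heap into a min-heap,
// so the lowest priority value (most urgent) pops first.
fn pop_order<'a>(tasks: &[(u8, &'a str)]) -> Vec<&'a str> {
    let mut queue: BinaryHeap<Reverse<(u8, &'a str)>> =
        tasks.iter().map(|t| Reverse(*t)).collect();
    let mut order = Vec::new();
    while let Some(Reverse((_, name))) = queue.pop() {
        order.push(name);
    }
    order
}

fn main() {
    // Hypothetical tier values: 0 ≈ REALTIME, 100 ≈ NORMAL, 200 ≈ BACKGROUND.
    let order = pop_order(&[(100, "normal"), (0, "realtime"), (200, "background")]);
    assert_eq!(order, ["realtime", "normal", "background"]);
}
```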
§IO budgeting
Each task declares an IoBudget covering expected disk and network
bytes (via TypedTask::config() or TaskSubmission::expected_io).
The scheduler tracks running IO totals and, when
resource monitoring is enabled,
compares them against observed system throughput to avoid over-saturating
the disk or network. Executors report actual IO via
[TaskContext::record_read_bytes] / record_write_bytes /
record_net_rx_bytes /
record_net_tx_bytes,
which feeds back into historical throughput averages for future scheduling
decisions.
To throttle tasks when network bandwidth is saturated, set a bandwidth
cap with SchedulerBuilder::bandwidth_limit — this registers a built-in
NetworkPressure source that maps observed throughput to backpressure.
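The throughput-to-pressure mapping can be sketched as a normalized ratio. This is an assumption about the shape of such a source (function name hypothetical), not the built-in NetworkPressure implementation:

```rust
// Map observed network throughput against a configured cap to a 0.0–1.0
// pressure signal; at or beyond the cap, pressure saturates at 1.0.
fn network_pressure(observed_bps: f64, limit_bps: f64) -> f32 {
    if limit_bps <= 0.0 {
        return 0.0; // no cap configured → no pressure
    }
    (observed_bps / limit_bps).clamp(0.0, 1.0) as f32
}

fn main() {
    assert_eq!(network_pressure(5_000_000.0, 10_000_000.0), 0.5);
    assert_eq!(network_pressure(20_000_000.0, 10_000_000.0), 1.0);
    assert_eq!(network_pressure(1_000_000.0, 0.0), 0.0);
}
```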
§Task groups
Tasks can be assigned to a named group via TaskSubmission::group (or
TypedTask::config()). The scheduler enforces per-group concurrency
limits — for example, limiting uploads to any single S3 bucket to 4
concurrent tasks. Configure limits at build time with
SchedulerBuilder::group_concurrency and
SchedulerBuilder::default_group_concurrency, or adjust at runtime via
Scheduler::set_group_limit and Scheduler::set_default_group_concurrency.
§Child tasks & two-phase execution
An executor can spawn child tasks via [TaskContext::spawn_child]. When
children exist, the parent enters a waiting state after its executor
returns. Once all children complete, the parent’s
TypedExecutor::finalize method is called — useful for assembly work
like CompleteMultipartUpload. If any child fails and
fail_fast is true (the default), siblings
are cancelled and the parent fails immediately.
§Task TTL & automatic expiry
Tasks can be given a time-to-live (TTL) so they expire automatically if they haven’t started running within the allowed window. TTL is resolved with a priority chain: per-task > per-type > global default > none.
The TTL clock can start at submission time (TtlFrom::Submission, the
default) or when the task is first dispatched (TtlFrom::FirstAttempt).
Expired tasks are moved to history with HistoryStatus::Expired and a
SchedulerEvent::TaskExpired event is emitted.
The scheduler catches expired tasks in two places:
- Dispatch time — when a task is about to be dispatched, its expires_at is checked first.
- Periodic sweep — a background sweep (default every 30s, configurable via SchedulerBuilder::expiry_sweep_interval) batch-expires pending and paused tasks whose deadline has passed.
When a parent task expires, its pending and paused children are cascade-expired.
Child tasks without an explicit TTL inherit the remaining parent TTL
(with TtlFrom::Submission), so a child can never outlive its parent’s
deadline.
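The resolution chain reduces to Option chaining — a minimal sketch of the precedence rule (function name hypothetical):

```rust
use std::time::Duration;

// per-task > per-type > global default > none.
fn resolve_ttl(
    per_task: Option<Duration>,
    per_type: Option<Duration>,
    global_default: Option<Duration>,
) -> Option<Duration> {
    per_task.or(per_type).or(global_default)
}

fn main() {
    let ten_min = Duration::from_secs(600);
    let one_hour = Duration::from_secs(3600);
    // Per-type wins when no per-task TTL is set.
    assert_eq!(resolve_ttl(None, Some(ten_min), Some(one_hour)), Some(ten_min));
    // No TTL anywhere → the task never expires.
    assert_eq!(resolve_ttl(None, None, None), None);
}
```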
§Task metadata tags
Tasks can carry schema-free key-value metadata tags for filtering, grouping, and display — without deserializing the task payload. Tags are immutable after submission and are persisted, indexed, and queryable.
Set tags per-task via TaskSubmission::tag, per-type via
TypedTask::tags, or as batch defaults via
BatchSubmission::default_tag. Tag keys and values are validated at submit
time against MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN, and
MAX_TAGS_PER_TASK.
Child tasks inherit parent tags by default (child tags take precedence).
Tags are copied to history on all terminal transitions and are included in
TaskEventHeader for event subscribers.
Query by tags via the domain handle with DomainHandle::tasks_by_tags
(AND semantics).
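AND semantics mean every queried key/value pair must be present on the task. A sketch (helper name hypothetical — the real query runs in SQLite against the tag index):

```rust
use std::collections::HashMap;

// A task matches only if every (key, value) pair in the query is present.
fn matches_tags(task_tags: &HashMap<String, String>, query: &[(&str, &str)]) -> bool {
    query
        .iter()
        .all(|(k, v)| task_tags.get(*k).map(|tv| tv == v).unwrap_or(false))
}

fn main() {
    let mut tags = HashMap::new();
    tags.insert("album".to_string(), "vacation".to_string());
    tags.insert("kind".to_string(), "photo".to_string());
    assert!(matches_tags(&tags, &[("album", "vacation"), ("kind", "photo")]));
    // One mismatching pair fails the whole query.
    assert!(!matches_tags(&tags, &[("album", "vacation"), ("kind", "video")]));
}
```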
§Delayed & scheduled tasks
A task can declare when it becomes eligible for dispatch:
- Immediate (default) — dispatched as soon as a slot is free.
- Delayed (one-shot) — TaskSubmission::run_after or TaskSubmission::run_at sets a run_after timestamp. The task enters pending immediately but is invisible to the dispatch loop until its timestamp passes.
- Recurring — TaskSubmission::recurring or TaskSubmission::recurring_schedule configures automatic re-enqueueing. After each execution, the scheduler creates a new pending instance with run_after set to now + interval.
Delayed tasks interact naturally with other features:
- Priority: run_after tasks respect normal priority ordering among eligible tasks.
- TTL: a delayed task’s TTL clock ticks from submission (or first attempt). If the TTL expires before run_after, the task expires.
- Dedup: recurring tasks reuse the same dedup key. Pile-up prevention skips creating a new instance if the previous one is still pending.
- Parent/Child: Recurring tasks cannot be children (enforced at submit).
Recurring schedules can be paused, resumed, or cancelled via
DomainHandle::pause_recurring, DomainHandle::resume_recurring, and
DomainHandle::cancel_recurring. Pausing stops new instances from being
created without affecting any currently running instance.
The scheduler optimizes idle wakeups: when the next scheduled task is far
in the future, the run loop sleeps until min(poll_interval, next_run_after)
instead of waking every poll interval.
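The wakeup computation reduces to a min over two durations. A sketch, assuming the next scheduled task is expressed as a time-until-due:

```rust
use std::time::Duration;

// Sleep until either the regular poll interval or the next scheduled task,
// whichever comes first; with nothing scheduled, sleep a full poll interval.
fn next_sleep(poll_interval: Duration, until_next_run: Option<Duration>) -> Duration {
    match until_next_run {
        Some(d) => poll_interval.min(d),
        None => poll_interval,
    }
}

fn main() {
    let poll = Duration::from_millis(500);
    // A task due in 200ms wakes the loop early.
    assert_eq!(next_sleep(poll, Some(Duration::from_millis(200))), Duration::from_millis(200));
    // Nothing scheduled → a full poll interval.
    assert_eq!(next_sleep(poll, None), poll);
}
```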
§Task dependencies
Tasks can declare dependencies on other tasks so they only become eligible for dispatch after their prerequisites complete. Dependencies are peer-to-peer relationships — distinct from the parent-child hierarchy used for fan-out/finalize patterns. Parent-child means “I spawned you and I finalize after you.” A dependency means “I cannot start until you finish.” The two compose orthogonally: a child task can depend on an unrelated peer, and a parent can depend on another peer.
§Blocked status
A task with unresolved dependencies enters the TaskStatus::Blocked
state. Blocked tasks are invisible to the dispatch loop — the existing
WHERE status = 'pending' filter excludes them automatically. Resolution
is event-driven: when a dependency completes, the scheduler checks whether
the dependent’s remaining edges have all been satisfied. If so, the task
transitions to pending and becomes eligible for dispatch. If a dependency
was already completed at submission time, its edge is skipped entirely; if
all dependencies are already complete, the task starts as pending
immediately.
§Failure policy
DependencyFailurePolicy controls what happens when a dependency fails
permanently (after exhausting retries):
- Cancel (default) — the dependent is moved to history with HistoryStatus::DependencyFailed and its own dependents are cascade-failed.
- Fail — same terminal status, but does not cascade to other dependents in the same chain (useful for manual intervention).
- Ignore — the failed edge is removed and, if no other edges remain, the dependent is unblocked. Use with caution — the dependent must tolerate missing upstream results.
§Circular dependency detection
At submission time the scheduler walks the dependency graph upward from
each declared dependency using iterative BFS (bounded stack depth). If
the new task’s ID is encountered during the walk, submission fails with
a StoreError::CyclicDependency error. This catches both direct
cycles (A depends on B, B depends on A) and transitive cycles
(A → B → C → A).
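The upward walk can be sketched with a queue and a visited set. This is a simplified in-memory illustration of the BFS described above (the real check runs against the dependency table in SQLite):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Starting from the new task's declared dependencies, follow dependency edges
// upward; if the new task's own ID is reached, the graph would contain a cycle.
fn creates_cycle(deps: &HashMap<u64, Vec<u64>>, new_task: u64, new_deps: &[u64]) -> bool {
    let mut queue: VecDeque<u64> = new_deps.iter().copied().collect();
    let mut seen: HashSet<u64> = HashSet::new();
    while let Some(id) = queue.pop_front() {
        if id == new_task {
            return true; // walking upward reached the submitter → cycle
        }
        if seen.insert(id) {
            if let Some(upstream) = deps.get(&id) {
                queue.extend(upstream.iter().copied());
            }
        }
    }
    false
}

fn main() {
    // Existing edges: B(2) depends on C(3); C depends on A(1).
    let mut deps = HashMap::new();
    deps.insert(2, vec![3]);
    deps.insert(3, vec![1]);
    // A depending on B would close the transitive loop A → B → C → A.
    assert!(creates_cycle(&deps, 1, &[2]));
    // An unrelated task D(4) depending on B is fine.
    assert!(!creates_cycle(&deps, 4, &[2]));
}
```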
§Interaction with other features
- Dedup: a blocked task still occupies its dedup key. Duplicate submissions follow normal DuplicateStrategy rules.
- TTL: a blocked task’s TTL clock ticks normally. If the TTL expires while blocked, the task moves to history as HistoryStatus::Expired and its edges are cleaned up.
- Recurring: recurring tasks can declare dependencies. Each generated instance starts as blocked independently — useful for “run B every hour, but only after A’s latest run completes.”
- Delayed: run_after and dependencies compose. A task with both starts as blocked, transitions to pending when deps are met, but is still gated by run_after in the dispatch query.
- Groups: blocked tasks are not dispatched, so they do not count against group concurrency limits.
§Cancellation
Tasks can be cancelled via the DomainHandle — individually with
DomainHandle::cancel, all at once with DomainHandle::cancel_all,
or by predicate with DomainHandle::cancel_where. Cancelled tasks are
recorded in the history table as HistoryStatus::Cancelled rather than
silently deleted.
For running tasks, cancellation fires the
on_cancel hook (with a configurable
cancel_hook_timeout) so
executors can clean up external resources — for example, aborting an S3
multipart upload. Executors can check for cancellation cooperatively via
[TaskContext::check_cancelled].
Cancelling a parent task cascade-cancels all its children.
§Byte-level progress
For long-running transfers (file copies, uploads, downloads), executors can
report byte-level progress via [TaskContext::set_bytes_total] and
[TaskContext::add_bytes]. The scheduler maintains per-task atomic counters
on the IoTracker — updates are lock-free and
impose no overhead on the executor hot path.
When SchedulerBuilder::progress_interval is set, a background ticker
task polls these counters and emits TaskProgress events on a dedicated
broadcast channel (via Scheduler::subscribe_progress). Each event
includes EWMA-smoothed throughput and an estimated time remaining (ETA).
The ticker is opt-in — when not configured, there is zero runtime cost.
For a one-shot query without the ticker, Scheduler::byte_progress
returns instantaneous snapshots (throughput = 0, no ETA).
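EWMA smoothing and the ETA computation can be sketched as follows — the smoothing factor and the exact formulas here are assumptions, not the crate’s constants:

```rust
// Exponentially weighted moving average of throughput samples.
struct Ewma {
    alpha: f64,          // smoothing factor in (0, 1]; higher = more reactive
    value: Option<f64>,  // None until the first sample seeds the average
}

impl Ewma {
    fn update(&mut self, sample_bps: f64) -> f64 {
        let next = match self.value {
            Some(prev) => self.alpha * sample_bps + (1.0 - self.alpha) * prev,
            None => sample_bps,
        };
        self.value = Some(next);
        next
    }
}

// ETA in seconds, defined only when throughput is positive and work remains.
fn eta_secs(bytes_done: u64, bytes_total: u64, throughput_bps: f64) -> Option<f64> {
    if throughput_bps <= 0.0 || bytes_done >= bytes_total {
        return None;
    }
    Some((bytes_total - bytes_done) as f64 / throughput_bps)
}

fn main() {
    let mut ewma = Ewma { alpha: 0.3, value: None };
    ewma.update(1_000_000.0); // first sample seeds the average
    let smoothed = ewma.update(2_000_000.0); // 0.3 * 2e6 + 0.7 * 1e6 = 1.3e6
    assert!((smoothed - 1_300_000.0).abs() < 1e-6);
    assert_eq!(eta_secs(500, 1_500, 100.0), Some(10.0));
}
```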
§Quick start
use taskmill::{
Domain, DomainKey, DomainHandle, DomainTaskContext, Scheduler, TypedExecutor,
TaskError, TypedTask, TaskTypeConfig, IoBudget, Priority,
};
use serde::{Serialize, Deserialize};
use tokio_util::sync::CancellationToken;
// 1. Define a domain (typed module identity).
struct Media;
impl DomainKey for Media { const NAME: &'static str = "media"; }
// 2. Define a typed task payload.
#[derive(Serialize, Deserialize)]
struct Thumbnail { path: String, size: u32 }
impl TypedTask for Thumbnail {
type Domain = Media;
const TASK_TYPE: &'static str = "thumbnail";
fn config() -> TaskTypeConfig {
TaskTypeConfig::new()
.expected_io(IoBudget::disk(4_096, 1_024))
}
}
// 3. Implement the typed executor.
struct ThumbnailExecutor;
impl TypedExecutor<Thumbnail> for ThumbnailExecutor {
async fn execute(&self, thumb: Thumbnail, ctx: DomainTaskContext<'_, Media>) -> Result<(), TaskError> {
ctx.progress().report(0.5, Some("resizing".into()));
// ... do work, check ctx.token().is_cancelled() ...
ctx.record_read_bytes(4_096);
ctx.record_write_bytes(1_024);
Ok(())
}
}
// 4. Build and run the scheduler.
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.domain(Domain::<Media>::new().task::<Thumbnail>(ThumbnailExecutor))
.max_concurrency(4)
.with_resource_monitoring()
.build()
.await?;
// 5. Submit work via a typed domain handle.
let media: DomainHandle<Media> = scheduler.domain::<Media>();
media.submit(Thumbnail { path: "/photos/a.jpg".into(), size: 256 }).await?;
// 6. Run until cancelled.
let token = CancellationToken::new();
scheduler.run(token).await;
§Common patterns
§Shared application state
Register shared services (database pools, HTTP clients, etc.) at build time
and retrieve them from any executor via [TaskContext::state]. State can be
domain-scoped (checked first) or global (fallback):
struct AppServices { db: DatabasePool, http: reqwest::Client }
struct IngestConfig { bucket: String }
struct Ingest;
impl DomainKey for Ingest { const NAME: &'static str = "ingest"; }
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.app_state(AppServices { /* ... */ }) // global — all domains
.domain(Domain::<Ingest>::new()
.task::<FetchTask>(FetchExecutor)
.state(IngestConfig { bucket: "...".into() })) // domain-scoped
.build()
.await?;
// Inside an ingest executor — domain state checked first, then global:
async fn execute(&self, task: FetchTask, ctx: DomainTaskContext<'_, Ingest>) -> Result<(), TaskError> {
let cfg = ctx.state::<IngestConfig>().expect("IngestConfig not registered");
let svc = ctx.state::<AppServices>().expect("AppServices not registered");
svc.db.query("...").await?;
Ok(())
}
State can also be injected globally after construction via
Scheduler::register_state — useful when a library
receives a pre-built scheduler from a parent application.
§Backpressure
Implement PressureSource to feed external signals into the scheduler’s
throttle decisions. The default ThrottlePolicy pauses BACKGROUND
tasks above 50% pressure and NORMAL tasks above 75%:
use std::sync::atomic::{AtomicU32, Ordering};
use taskmill::{PressureSource, Scheduler};
struct ApiLoad { active: AtomicU32, max: u32 }
impl PressureSource for ApiLoad {
fn pressure(&self) -> f32 {
self.active.load(Ordering::Relaxed) as f32 / self.max as f32
}
fn name(&self) -> &str { "api-load" }
}
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.pressure_source(Box::new(ApiLoad { active: AtomicU32::new(0), max: 100 }))
// .throttle_policy(custom_policy) // optional override
.build()
.await?;
§Events & progress
Subscribe to SchedulerEvents to drive a UI or collect metrics:
let mut rx = scheduler.subscribe();
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
match event {
SchedulerEvent::Progress { header, percent, message } => {
update_progress_bar(header.task_id, percent, message);
}
SchedulerEvent::Completed(header) => {
mark_done(header.task_id);
}
_ => {}
}
}
});
For byte-level transfer progress with smoothed throughput and ETA, subscribe to the dedicated progress channel:
let mut progress_rx = scheduler.subscribe_progress();
tokio::spawn(async move {
while let Ok(tp) = progress_rx.recv().await {
println!(
"{}: {}/{} bytes ({:.0} B/s, ETA {:?})",
tp.key, tp.bytes_completed, tp.bytes_total.unwrap_or(0),
tp.throughput_bps, tp.eta,
);
}
});
For a single-call dashboard snapshot, use Scheduler::snapshot which
returns a serializable SchedulerSnapshot with queue depths, running
tasks, progress estimates, byte-level progress, and backpressure.
§Group concurrency
Limit concurrent tasks within a named group — for example, cap uploads per S3 bucket:
struct Uploads;
impl DomainKey for Uploads { const NAME: &'static str = "uploads"; }
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.domain(Domain::<Uploads>::new()
.task::<UploadPart>(UploadPartExecutor))
.default_group_concurrency(4) // default for all groups
.group_concurrency("s3://hot-bucket", 8) // override for one group
.build()
.await?;
// Tasks declare their group via TypedTask::config() or per-call override:
let uploads: DomainHandle<Uploads> = scheduler.domain::<Uploads>();
uploads.submit_with(UploadPart { etag: "abc".into() })
.group("s3://my-bucket")
.await?;
// Adjust at runtime:
scheduler.set_group_limit("s3://my-bucket", 2);
scheduler.remove_group_limit("s3://my-bucket"); // fall back to default
§Batch submission
Submit many tasks at once via DomainHandle::submit_batch:
let uploads: DomainHandle<Uploads> = scheduler.domain::<Uploads>();
let tasks = vec![
UploadPart { etag: "file-1".into() },
UploadPart { etag: "file-2".into() },
];
let outcomes = uploads.submit_batch(tasks).await?;
For untyped batch submission with batch-wide defaults, use BatchSubmission
and Scheduler::submit_built.
A SchedulerEvent::BatchSubmitted event is emitted for observability
whenever at least one task in the batch was inserted.
§Child tasks
Spawn child tasks from an executor to model fan-out work. The parent
automatically waits for all children before its finalize
method is called. spawn_child is domain-aware: the task type is
auto-prefixed with the owning domain’s namespace.
impl TypedExecutor<MultipartUpload> for MultipartUploadExecutor {
async fn execute(&self, upload: MultipartUpload, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> {
for part in &upload.parts {
ctx.spawn_child_with(UploadPart { etag: part.etag.clone(), size: part.size })
.key(&part.etag)
.priority(ctx.record().priority)
.await?;
}
Ok(())
}
async fn finalize(&self, upload: MultipartUpload, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> {
// All parts uploaded — complete the multipart upload.
complete_multipart(&upload).await?;
Ok(())
}
}
For cross-domain children, use DomainSubmitBuilder::parent via ctx.domain():
let storage = ctx.domain::<Storage>();
storage.submit_with(Upload { ... })
.parent(ctx.record().id)
.await?;
§Cancellation & cleanup hooks
Cancel tasks individually or in bulk. Implement
on_cancel to clean up external resources:
impl TypedExecutor<Upload> for UploadExecutor {
async fn execute(&self, upload: Upload, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> {
// Cooperatively check for cancellation in long loops.
for chunk in upload.chunks() {
ctx.check_cancelled()?;
upload_chunk(chunk).await?;
}
Ok(())
}
async fn on_cancel(&self, upload: Upload, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> {
// Abort the in-progress multipart upload.
abort_multipart(&upload.upload_id).await?;
Ok(())
}
}
// Cancel by ID through the domain handle:
let uploads: DomainHandle<Uploads> = scheduler.domain::<Uploads>();
uploads.cancel(task_id).await?;
// Bulk cancel all tasks in a domain:
uploads.cancel_all().await?;
// Cancel by predicate:
uploads.cancel_where(|t| t.priority == Priority::BACKGROUND).await?;
§Task TTL
Set a TTL on a task type via TypedTask::config(), per-call via
DomainSubmitBuilder::ttl, or as a global default:
use std::time::Duration;
use taskmill::{TypedTask, TaskTypeConfig, TtlFrom};
// Per-type TTL — every Thumbnail task gets a 10-minute TTL.
impl TypedTask for Thumbnail {
type Domain = Media;
const TASK_TYPE: &'static str = "thumbnail";
fn config() -> TaskTypeConfig {
TaskTypeConfig::new()
.ttl(Duration::from_secs(600))
.ttl_from(TtlFrom::Submission)
}
}
// Per-call override:
media.submit_with(Thumbnail { path: "/img.jpg".into(), size: 256 })
.ttl(Duration::from_secs(300))
.await?;
// Global default — catch-all for any task without a per-type TTL.
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.default_ttl(Duration::from_secs(3600)) // 1 hour
.build()
.await?;
§Scheduled tasks
Delay a task or create recurring schedules:
use std::time::Duration;
use taskmill::{TaskSubmission, RecurringSchedule};
let media: DomainHandle<Media> = scheduler.domain::<Media>();
// One-shot delay — dispatch after 30 seconds.
media.submit_with(Cleanup { target: "stale-uploads".into() })
.run_after(Duration::from_secs(30))
.await?;
// For recurring scheduling, configure it in `TypedTask::config()`:
// impl TypedTask for Cleanup {
// fn config() -> TaskTypeConfig {
// TaskTypeConfig::new().recurring(RecurringSchedule {
// interval: Duration::from_secs(6 * 3600),
// initial_delay: None,
// max_executions: None,
// })
// }
// }
media.submit_with(Cleanup { target: "stale-uploads".into() })
.key("stale-uploads")
.await?;
// Pause/resume/cancel recurring schedules via the domain handle.
media.pause_recurring(task_id).await?;
media.resume_recurring(task_id).await?;
media.cancel_recurring(task_id).await?;
§Task chains
Use DomainSubmitBuilder::depends_on to build dependency chains between
independent tasks. Unlike parent-child relationships (which model
fan-out from a single executor), chains connect separately submitted
tasks into ordered workflows.
§Sequential chain
Upload a file, verify its checksum, then delete the local copy:
let pipeline: DomainHandle<Pipeline> = scheduler.domain::<Pipeline>();
let upload = pipeline.submit(UploadFile { key: "file-a".into() }).await?;
let verify = pipeline.submit_with(VerifyChecksum { key: "file-a".into() })
.depends_on(upload.id().unwrap())
.await?;
pipeline.submit_with(DeleteLocal { key: "file-a".into() })
.depends_on(verify.id().unwrap())
.await?;
§Fan-in
Multiple uploads converging on a single finalize step:
let pipeline: DomainHandle<Pipeline> = scheduler.domain::<Pipeline>();
let mut upload_ids = Vec::new();
for part in &parts {
let outcome = pipeline.submit(part.clone()).await?;
upload_ids.push(outcome.id().unwrap());
}
pipeline.submit_with(FinalizeUpload { key: "finalize".into() })
.depends_on_all(upload_ids)
.await?;
§Diamond dependency
Task A fans out to B and C, which both converge on D:
let pipeline: DomainHandle<Pipeline> = scheduler.domain::<Pipeline>();
let a = pipeline.submit(Extract { key: "a".into() }).await?;
let a_id = a.id().unwrap();
let b = pipeline.submit_with(TransformX { key: "b".into() })
.depends_on(a_id)
.await?;
let c = pipeline.submit_with(TransformY { key: "c".into() })
.depends_on(a_id)
.await?;
pipeline.submit_with(Load { key: "d".into() })
.depends_on_all([b.id().unwrap(), c.id().unwrap()])
.await?;
§Task superseding
Use DuplicateStrategy::Supersede for “latest-value-wins” scenarios
like continuous file sync, where re-submitting an already-queued task
should atomically cancel the old one and replace it. Set it via
TypedTask::config():
impl TypedTask for SyncFile {
type Domain = Sync;
const TASK_TYPE: &'static str = "sync-file";
fn config() -> TaskTypeConfig {
TaskTypeConfig::new().on_duplicate(DuplicateStrategy::Supersede)
}
fn key(&self) -> Option<String> { Some(self.path.clone()) }
}
let sync: DomainHandle<Sync> = scheduler.domain::<Sync>();
let outcome = sync.submit(SyncFile { path: "path/to/file.txt".into() }).await?;
// outcome is Superseded { new_task_id, replaced_task_id } if a duplicate existed,
// or Inserted(id) if this was the first submission.
§How the dispatch loop works
Understanding the run loop helps when tuning SchedulerConfig:
1. The loop wakes on three conditions: a new task was submitted (via Notify), the poll_interval elapsed (default 500ms), or the cancellation token fired.
2. Expired tasks are swept (if the sweep interval has elapsed).
3. Paused tasks are resumed if no active preemptors exist at their priority level.
4. Pending finalizers (parents whose children all completed) are dispatched first.
5. The highest-priority pending task is peeked (without claiming it). If it has expired, it is moved to history and the next candidate is tried.
6. The dispatch gate checks concurrency limits, group concurrency, IO budget, and backpressure. If the gate rejects, no slot is consumed.
7. If admitted, the task is atomically claimed (peek → pop_by_id) and spawned as a Tokio task.
8. Steps 5–7 repeat until the queue is empty or the gate rejects.
§Feature flags
- sysinfo-monitor (default): enables the built-in SysinfoSampler for cross-platform CPU and disk IO monitoring. Disable for mobile targets or when providing a custom ResourceSampler. Without this feature, calling SchedulerBuilder::with_resource_monitoring requires a custom sampler via resource_sampler().
§Re-exports
pub use domain::{Domain, DomainHandle, DomainKey, DomainSubmitBuilder, TaskEvent, TaskTypeConfig, TaskTypeOptions, TypedEventStream, TypedExecutor};
pub use registry::{ChildSpawnBuilder, DomainTaskContext};
pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy};
pub use priority::Priority;
pub use resource::network_pressure::NetworkPressure;
pub use resource::sampler::SamplerConfig;
pub use resource::{platform_sampler, ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{EstimatedProgress, GroupLimits, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, TaskProgress};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{generate_dedup_key, BackoffStrategy, BatchOutcome, BatchSubmission, DependencyFailurePolicy, DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, RecurringSchedule, RecurringScheduleInfo, RetryPolicy, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK, MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN};
§Modules
- backpressure
- Composable backpressure for throttling task dispatch.
- domain
- Domain-centric API — typed module identity, typed executors, and typed handles.
- priority
- Priority levels for task scheduling.
- registry
- Executor registration, shared state, and the [TaskContext] passed to each task.
- resource
- System resource monitoring for IO-aware scheduling.
- scheduler
- The scheduler: core types, configuration, and the main run loop.
- store
- SQLite-backed persistence layer for the task queue and history.
- task
- Task types, submission parameters, and the TypedTask trait.