Crate taskmill

§Taskmill

Adaptive priority work scheduler with IO-aware concurrency and SQLite persistence.

Taskmill provides a generic task scheduling system that:

  • Persists tasks to SQLite so the queue survives restarts
  • Schedules by priority (0 = highest, 255 = lowest) with named tiers
  • Deduplicates tasks by key — submitting an already-queued key is a no-op
  • Tracks expected and actual IO bytes per task for budget-based scheduling
  • Monitors system CPU and disk throughput to adjust concurrency
  • Supports composable backpressure from arbitrary external sources
  • Preempts lower-priority work when high-priority tasks arrive
  • Retries failed tasks at the same priority level
  • Records completed/failed task history for queries and IO learning
  • Emits lifecycle events, including progress updates, over a broadcast channel for UI integration
  • Supports graceful shutdown with configurable drain timeout
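
The priority and deduplication rules in the list above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `ToyQueue` and `QueuedTask` are hypothetical names invented for illustration, the queue lives in memory rather than in SQLite, and FIFO ordering within a priority level is an assumption rather than documented taskmill behavior.

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

/// Hypothetical stand-in for a queued task; not a taskmill type.
#[derive(Debug, Clone, PartialEq)]
struct QueuedTask {
    key: String,
    priority: u8, // 0 = highest, 255 = lowest
}

/// Toy queue: the numerically smallest priority pops first.
/// FIFO order within one priority level is an assumption of this sketch.
#[derive(Default)]
struct ToyQueue {
    by_priority: BTreeMap<u8, VecDeque<QueuedTask>>,
    queued_keys: HashSet<String>,
}

impl ToyQueue {
    /// Returns false and changes nothing if the key is already queued.
    fn submit(&mut self, key: &str, priority: u8) -> bool {
        if !self.queued_keys.insert(key.to_string()) {
            return false;
        }
        self.by_priority
            .entry(priority)
            .or_default()
            .push_back(QueuedTask { key: key.to_string(), priority });
        true
    }

    /// Removes and returns the highest-priority task, if any.
    fn pop(&mut self) -> Option<QueuedTask> {
        let mut entry = self.by_priority.first_entry()?;
        let task = entry.get_mut().pop_front();
        // Drop empty levels so the first entry always holds a task.
        if entry.get().is_empty() {
            entry.remove();
        }
        if let Some(t) = &task {
            self.queued_keys.remove(&t.key);
        }
        task
    }
}

fn main() {
    let mut q = ToyQueue::default();
    assert!(q.submit("thumb:/photos/a.jpg", 128));
    assert!(q.submit("scan:/photos", 0));
    // Same key while still queued: treated as a no-op.
    assert!(!q.submit("thumb:/photos/a.jpg", 10));

    let first = q.pop().unwrap();
    assert_eq!(first.key, "scan:/photos");
    assert_eq!(first.priority, 0);
    assert_eq!(q.pop().unwrap().key, "thumb:/photos/a.jpg");
    assert!(q.pop().is_none());
}
```

Note that the rejected duplicate keeps the priority of the first submission (128 here); whether taskmill itself upgrades the priority of an already-queued key is not stated on this page.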

§Quick start

use std::sync::Arc;
use taskmill::{
    Scheduler, TaskExecutor, TaskContext, TaskResult, TaskError,
    TypedTask, Priority,
};
use serde::{Serialize, Deserialize};
use tokio_util::sync::CancellationToken;

// 1. Define a task payload.
#[derive(Serialize, Deserialize)]
struct Thumbnail { path: String, size: u32 }

impl TypedTask for Thumbnail {
    const TASK_TYPE: &'static str = "thumbnail";
    fn expected_read_bytes(&self) -> i64 { 4_096 }
    fn expected_write_bytes(&self) -> i64 { 1_024 }
}

// 2. Implement the executor.
struct ThumbnailExecutor;

impl TaskExecutor for ThumbnailExecutor {
    async fn execute<'a>(
        &'a self, ctx: &'a TaskContext,
    ) -> Result<TaskResult, TaskError> {
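        // The double unwrap panics on a missing or malformed payload. That keeps
        // the quick start short; real code should return a TaskError instead.
        let thumb: Thumbnail = ctx.deserialize_typed().unwrap().unwrap();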
        ctx.progress.report(0.5, Some("resizing".into()));
        // ... do work, check ctx.token.is_cancelled() ...
        Ok(TaskResult { actual_read_bytes: 4_096, actual_write_bytes: 1_024 })
    }
}

// 3. Build and run the scheduler.
let scheduler = Scheduler::builder()
    .store_path("tasks.db")
    .typed_executor::<Thumbnail, _>(Arc::new(ThumbnailExecutor))
    .max_concurrency(4)
    .with_resource_monitoring()
    .build()
    .await?;

// 4. Submit work.
let task = Thumbnail { path: "/photos/a.jpg".into(), size: 256 };
scheduler.submit_typed(&task).await?;

// 5. Run until cancelled.
let token = CancellationToken::new();
scheduler.run(token).await;

§Feature flags

  • sysinfo-monitor (default): Enables the built-in SysinfoSampler for cross-platform CPU and disk IO monitoring. Disable for mobile targets or when providing a custom ResourceSampler.

Re-exports§

pub use backpressure::CompositePressure;
pub use backpressure::PressureSource;
pub use backpressure::ThrottlePolicy;
pub use priority::Priority;
pub use registry::TaskContext;
pub use registry::TaskExecutor;
pub use resource::sampler::SamplerConfig;
pub use resource::ResourceReader;
pub use resource::ResourceSampler;
pub use resource::ResourceSnapshot;
pub use scheduler::EstimatedProgress;
pub use scheduler::ProgressReporter;
pub use scheduler::Scheduler;
pub use scheduler::SchedulerBuilder;
pub use scheduler::SchedulerConfig;
pub use scheduler::SchedulerEvent;
pub use scheduler::SchedulerSnapshot;
pub use scheduler::ShutdownMode;
pub use store::RetentionPolicy;
pub use store::StoreConfig;
pub use store::StoreError;
pub use store::TaskStore;
pub use task::generate_dedup_key;
pub use task::HistoryStatus;
pub use task::ParentResolution;
pub use task::SubmitOutcome;
pub use task::TaskError;
pub use task::TaskHistoryRecord;
pub use task::TaskLookup;
pub use task::TaskRecord;
pub use task::TaskResult;
pub use task::TaskStatus;
pub use task::TaskSubmission;
pub use task::TypeStats;
pub use task::TypedTask;
pub use resource::platform_sampler;

Modules§

backpressure
Composable backpressure for throttling task dispatch.
priority
Priority levels for task scheduling.
registry
Executor registration, shared state, and the TaskContext passed to each task.
resource
System resource monitoring for IO-aware scheduling.
scheduler
The scheduler: configuration, event stream, and the main run loop.
store
SQLite-backed persistence layer for the task queue and history.
task
Task types, submission parameters, and the TypedTask trait.
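
As a rough picture of what "composable backpressure" could mean for the backpressure module listed above, the sketch below combines several pressure readings and scales a concurrency cap. Everything in it is an assumption made for illustration: `ToyPressure`, `ToyComposite`, and `allowed_slots` are hypothetical names, and both the "take the maximum" rule and the linear scaling are choices of this sketch, not the documented behavior of `PressureSource`, `CompositePressure`, or `ThrottlePolicy`.

```rust
/// Hypothetical trait for this sketch; taskmill's own `PressureSource` may differ.
trait ToyPressure {
    /// 0.0 means no pressure, 1.0 means fully saturated.
    fn pressure(&self) -> f32;
}

/// A source that always reports the same reading, clamped to [0, 1].
struct Fixed(f32);

impl ToyPressure for Fixed {
    fn pressure(&self) -> f32 {
        self.0.clamp(0.0, 1.0)
    }
}

/// Combines sources by taking the worst (largest) reading.
/// This aggregation rule is an assumption of the sketch.
struct ToyComposite {
    sources: Vec<Box<dyn ToyPressure>>,
}

impl ToyComposite {
    fn pressure(&self) -> f32 {
        self.sources.iter().map(|s| s.pressure()).fold(0.0, f32::max)
    }
}

/// Scales the concurrency cap down linearly with pressure,
/// always leaving at least one slot so the queue keeps moving.
fn allowed_slots(max_concurrency: usize, pressure: f32) -> usize {
    let free = 1.0 - pressure.clamp(0.0, 1.0);
    ((max_concurrency as f32 * free).floor() as usize).max(1)
}

fn main() {
    let composite = ToyComposite {
        sources: vec![Box::new(Fixed(0.25)), Box::new(Fixed(0.5))],
    };
    let p = composite.pressure();
    assert_eq!(p, 0.5);
    assert_eq!(allowed_slots(4, p), 2);
    assert_eq!(allowed_slots(4, 1.0), 1);
    assert_eq!(allowed_slots(4, 0.0), 4);
}
```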