rs-task-queue
In-process thread-based task queue with priority and concurrency control
Installation
[dependencies]
= "0.4.0"
Usage
use ;
// Create a queue with 4 worker threads
let queue = TaskQueue::new(4);
// Submit a task (Normal priority by default); the closure bodies below are illustrative
let handle = queue.submit(|| 2 + 2);
// Block until the task completes
let result = handle.join().unwrap();
assert_eq!(result, 4);
// Submit with explicit priority
let handle = queue.submit_with_priority(Priority::High, || "urgent");
assert_eq!(handle.join().unwrap(), "urgent");
// Check completion without blocking
let handle = queue.submit(|| 1 + 1);
// ... do other work ...
if handle.is_done() {
    println!("result: {}", handle.join().unwrap());
}
// Graceful shutdown: finishes running tasks, drops pending ones
queue.shutdown();
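To make the priority levels more concrete, here is a minimal, std-only sketch of one way pending tasks could be ordered: highest priority first, and first-in-first-out within the same priority. It is not the crate's implementation. The `Pending` struct, its `seq` field, and `pop_order` are hypothetical names used only for illustration, and the local `Priority` enum is a stand-in that mirrors the variant names listed in the API table.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Stand-in for the crate's Priority; deriving Ord gives Low < Normal < High.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Low,
    Normal,
    High,
}

// Hypothetical queue entry: `seq` records submission order.
#[derive(Debug, PartialEq, Eq)]
struct Pending {
    priority: Priority,
    seq: u64,
    label: &'static str,
}

impl Ord for Pending {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority wins; on a tie, the lower (earlier) seq wins.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

impl PartialOrd for Pending {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Push every task into a max-heap, then pop them in execution order.
fn pop_order(tasks: &[(Priority, &'static str)]) -> Vec<&'static str> {
    let mut heap = BinaryHeap::new();
    for (seq, (priority, label)) in tasks.iter().enumerate() {
        heap.push(Pending {
            priority: *priority,
            seq: seq as u64,
            label: *label,
        });
    }
    let mut order = Vec::new();
    while let Some(task) = heap.pop() {
        order.push(task.label);
    }
    order
}
```

The sequence number matters because `BinaryHeap` alone makes no ordering promise for equal elements; without it, two Normal tasks could run in either order.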
Stats
Get a real-time snapshot of queue activity:
use TaskQueue;
let queue = TaskQueue::new(4);
queue.submit(|| 42).join().unwrap();
let stats = queue.stats();
println!("average latency: {:?}", stats.average_latency());
queue.shutdown();
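The API table describes `stats.average_latency()` as returning `Option<Duration>`, with `None` when no task has finished. The sketch below shows one plausible way such an average could be tracked. `LatencyStats`, `record`, and the field names are assumptions made for this example, not the crate's actual `TaskQueueStats` layout.

```rust
use std::time::Duration;

// Hypothetical accumulator: total latency plus a count of finished tasks.
#[derive(Debug, Default)]
struct LatencyStats {
    completed: u32,
    total_latency: Duration,
}

impl LatencyStats {
    // Record the enqueue-to-completion time of one finished task.
    fn record(&mut self, latency: Duration) {
        self.completed += 1;
        self.total_latency += latency;
    }

    // None until at least one task has finished, which avoids dividing by zero.
    fn average_latency(&self) -> Option<Duration> {
        if self.completed == 0 {
            None
        } else {
            Some(self.total_latency / self.completed)
        }
    }
}
```

Returning `Option` instead of a zero duration lets callers tell "no data yet" apart from "tasks finish instantly".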
Drain
Graceful shutdown that completes all pending tasks instead of dropping them:
use TaskQueue;
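use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let queue = TaskQueue::new(4);
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..10 {
    let counter = Arc::clone(&counter);
    queue.submit(move || {
        counter.fetch_add(1, Ordering::SeqCst);
    });
}
// Waits for all 10 tasks to finish, then shuts down
queue.drain();
assert_eq!(counter.load(Ordering::SeqCst), 10);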
Callbacks
Register a callback that fires after every task completes:
use TaskQueue;
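let queue = TaskQueue::new(4);
// The callback receives a bool (assumed here to mean success) and the task's duration
queue.on_complete(|success, duration| {
    println!("task finished: success={success}, took {duration:?}");
});
queue.submit(|| 42).join().unwrap();
queue.shutdown();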
Queue Capacity Limit
Limit the queue size to apply backpressure:
use ;
// Allow at most 100 pending tasks (the worker count of 4 is illustrative)
let queue = TaskQueue::with_capacity(4, 100);
let handle = queue.submit(|| 42);
assert_eq!(handle.join().unwrap(), 42);
queue.shutdown();
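The backpressure idea can be tried without the crate by using the standard library's bounded channel, which rejects a send once its buffer is full. This is only an analogy for a capacity-limited queue, not how `TaskQueue::with_capacity` is implemented. `fill_until_full` is a hypothetical helper name.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Try to enqueue `attempts` items into a buffer that holds `capacity` items
// and count how many were accepted versus rejected.
fn fill_until_full(capacity: usize, attempts: usize) -> (usize, usize) {
    // Keep the receiver alive so sends fail with Full, not Disconnected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is at capacity: the producer must wait, retry, or drop work.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

Nothing drains the buffer in this sketch, so every attempt past the capacity is rejected. That rejection plays the role `TaskError::QueueFull` plays in this crate.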
Pause and Resume
Temporarily stop processing without shutting down:
use TaskQueue;
let queue = TaskQueue::new(4);
queue.pause();
queue.submit(|| 42); // queued, but not started while paused
queue.resume(); // workers start processing again
queue.shutdown();
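One common way to build a pause switch for worker threads is a boolean guarded by a mutex, paired with a condition variable that wakes the workers on resume. The sketch below illustrates that general technique under the assumption that workers check the gate before picking up a task. `PauseGate` and `wait_if_paused` are hypothetical names, and the crate may do this differently.

```rust
use std::sync::{Condvar, Mutex};

// Hypothetical gate shared between the queue handle and its workers.
#[derive(Default)]
struct PauseGate {
    paused: Mutex<bool>,
    resumed: Condvar,
}

impl PauseGate {
    fn pause(&self) {
        *self.paused.lock().unwrap() = true;
    }

    fn resume(&self) {
        *self.paused.lock().unwrap() = false;
        // Wake every worker blocked in wait_if_paused.
        self.resumed.notify_all();
    }

    fn is_paused(&self) -> bool {
        *self.paused.lock().unwrap()
    }

    // Called by a worker before it takes the next task; blocks while paused.
    fn wait_if_paused(&self) {
        let mut paused = self.paused.lock().unwrap();
        // Loop to guard against spurious wakeups.
        while *paused {
            paused = self.resumed.wait(paused).unwrap();
        }
    }
}
```

With this design a task that is already running is never interrupted; pausing only stops workers from starting new work.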
Error Handling
submit and submit_with_priority always return a TaskHandle, so errors surface
only when you call join(). The two most common error cases are backpressure
(TaskError::QueueFull) and task panics (TaskError::Panicked).
Retry with backoff on QueueFull:
use ;
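use std::thread;
use std::time::Duration;
let queue = TaskQueue::with_capacity(4, 100); // illustrative sizes
let mut delay = Duration::from_millis(10); // illustrative starting delay
let result = loop {
    match queue.submit(|| 42).join() {
        Err(TaskError::QueueFull) => thread::sleep(delay), // queue full: back off
        other => break other,
    }
    delay *= 2; // exponential backoff before the next attempt
};
let _ = result;
queue.shutdown();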
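A retry loop with no upper bound can spin forever if the queue never drains. The following std-only sketch shows a bounded variant with a capped delay. It uses a stand-in `SubmitError` enum in place of the crate's `TaskError` so that it runs on its own. `retry_with_backoff` and its parameters are hypothetical names, not part of this crate.

```rust
use std::thread;
use std::time::Duration;

// Stand-in for TaskError, limited to what this sketch needs.
#[derive(Debug, PartialEq)]
enum SubmitError {
    QueueFull,
    Other,
}

// Call `attempt` until it returns something other than QueueFull or until
// `max_attempts` calls have been made. The delay doubles after each
// rejected attempt and never exceeds `max_delay`.
fn retry_with_backoff<T>(
    max_attempts: u32,
    initial_delay: Duration,
    max_delay: Duration,
    mut attempt: impl FnMut() -> Result<T, SubmitError>,
) -> Result<T, SubmitError> {
    let mut delay = initial_delay;
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            // Only backpressure is retried, and only while attempts remain.
            Err(SubmitError::QueueFull) if tries < max_attempts => {
                thread::sleep(delay);
                delay = (delay * 2).min(max_delay);
            }
            // Success, a non-retryable error, or the final QueueFull.
            other => return other,
        }
    }
}
```

With the real queue, the closure would submit a task and call `join()` on the handle, mapping the result into whatever error type the caller uses.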
Graceful degradation on Panicked:
use ;
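let queue = TaskQueue::new(4);
let handle = queue.submit(|| -> i32 { panic!("task failed") });
let value = match handle.join() {
    Ok(v) => v,
    Err(TaskError::Panicked) => 0, // fall back to a default value
    Err(_) => 0,
};
assert_eq!(value, 0);
queue.shutdown();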
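The standard library behaves similarly: when a spawned thread panics, the panic does not unwind into the caller, and `JoinHandle::join` returns an `Err` instead. The sketch below uses that behavior to illustrate the "error surfaces at join" pattern. `JoinError`, `run_isolated`, and `run_or_default` are hypothetical names, and the crate's workers may catch panics by other means.

```rust
use std::thread;

// Stand-in for the panic case of TaskError.
#[derive(Debug, PartialEq)]
enum JoinError {
    Panicked,
}

// Run a task on its own thread and convert a panic into an error value.
fn run_isolated<T: Send + 'static>(
    task: impl FnOnce() -> T + Send + 'static,
) -> Result<T, JoinError> {
    // join() yields Err carrying the panic payload if the thread panicked.
    thread::spawn(task).join().map_err(|_| JoinError::Panicked)
}

// Graceful degradation: use `fallback` when the task panics.
fn run_or_default<T: Send + 'static>(
    task: impl FnOnce() -> T + Send + 'static,
    fallback: T,
) -> T {
    run_isolated(task).unwrap_or(fallback)
}
```

The panic message is still printed to stderr by the default panic hook; only the caller's control flow is protected.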
Queue Depth
Check how many tasks are waiting:
use TaskQueue;
let queue = TaskQueue::new(4);
println!("pending tasks: {}", queue.pending_count());
queue.shutdown();
API
| Item | Description |
|---|---|
| `TaskQueue::new(concurrency)` | Create a queue with N worker threads |
| `TaskQueue::with_capacity(concurrency, max_queued)` | Create a queue with a maximum pending task limit |
| `queue.submit(task)` | Submit a task at Normal priority; returns `TaskHandle<T>` |
| `queue.submit_with_priority(priority, task)` | Submit a task at the given priority; returns `TaskHandle<T>` |
| `queue.stats()` | Return a `TaskQueueStats` snapshot (submitted, completed, failed, in-flight, latency) |
| `stats.average_latency()` | `Option<Duration>` average enqueue-to-completion latency; `None` if no tasks have finished |
| `queue.drain()` | Stop accepting tasks, wait for all queued tasks to finish, then shut down |
| `queue.on_complete(callback)` | Register a `Fn(bool, Duration)` callback fired after each task |
| `queue.pause()` | Temporarily stop processing tasks |
| `queue.resume()` | Resume processing after pause |
| `queue.is_paused()` | Check if processing is paused |
| `queue.pending_count()` | Get number of tasks waiting in the queue |
| `queue.shutdown()` | Signal workers to stop, wait for running tasks, drop pending |
| `handle.join()` | Block until the task completes; returns `Result<T, TaskError>` |
| `handle.is_done()` | Check if the task has completed without blocking |
| `Priority::High` | Highest execution priority |
| `Priority::Normal` | Default execution priority |
| `Priority::Low` | Lowest execution priority |
| `TaskError::Panicked` | Task panicked during execution |
| `TaskError::Cancelled` | Task was dropped during shutdown before it ran |
| `TaskError::QueueFull` | Task rejected because queue is at capacity |