Taskvisor
Lightweight, event-driven task supervision for async Rust.
Runs your background tasks, restarts them on failure with configurable backoff, and emits structured events for every lifecycle change.
TaskFn (your async code)
▼
TaskSpec (task + RestartPolicy + BackoffPolicy + timeout + max_retries)
▼
Supervisor
├──► Registry
│ └──► TaskActor (per task)
│ ├──► attempt loop
│ │ ├──► run task with timeout + cancellation token
│ │ ├──► apply RestartPolicy on Ok/Err
│ │ └──► apply BackoffPolicy on failure
│ └──► publish events to Bus
└──► Bus (broadcast channel)
├──► AliveTracker (sequence-based liveness)
└──► Subscribers (own queue each; your metrics, logs, alerts)
Why taskvisor?
Tokio gives you spawn and JoinHandle, but no supervision: no restart policies, no backoff, and no events to tell you what happened.
If a spawned task panics or fails, you find out only when you await its JoinHandle, and never if the handle is dropped.
Taskvisor fills that gap:
| | Feature | What you get |
|---|---|---|
| 🔁 | Restart policies | Never, OnFailure, or Always { interval }: chosen per task |
| ⏳ | Backoff with jitter | Exponential, constant, or decorrelated; spreads retries out in time |
| 📡 | Structured events | Every start, stop, failure, timeout, and backoff on a broadcast bus |
| 🔌 | Pluggable subscribers | Implement one method (on_event) for metrics, alerts, or logging |
| 🎛️ | Dynamic management | Add, remove, cancel tasks at runtime via SupervisorHandle |
| 🚦 | Admission control | Optional slot controller: Queue / Replace / DropIfRunning |
| 🚧 | Concurrency limits | Global semaphore, per-task timeouts, max retries |
Taskvisor is not a replacement for tokio or tower. It works at a higher level: you write the task, and taskvisor runs it, restarts it on failure with backoff, and tells you what happened through events.
It is also not an actor framework: there are no addressable actors, mailboxes, or message passing. Taskvisor supervises tasks (plain async functions), not actors.
Roadmap: taskvisor is the supervision core of Solti, a larger task-orchestration toolkit in development on top of it (subprocess execution, HTTP/gRPC API, metrics, dashboards).
Taskvisor stands on its own today: the rest is future work.
Quick start
[dependencies]
taskvisor = "0.2"
tokio = { version = "1", features = ["full"] }
A task that prints "pong" every 10 seconds, restarts forever, and shuts down on Ctrl+C:
use Duration;
use *;
async
See it recover from failure
This task fails twice, then succeeds. Taskvisor retries it with backoff and publishes an event for every step; a subscriber prints each event, so you can watch the supervision happen:
use Arc;
use ;
use Duration;
use *;
/// A subscriber that prints every lifecycle event the supervisor emits for a task.
;
async
# output:
TaskAddRequested (task=flaky)
TaskAdded (task=flaky)
TaskStarting (task=flaky) # attempt 1
TaskFailed (task=flaky)
BackoffScheduled (task=flaky) # wait, then retry
TaskStarting (task=flaky) # attempt 2
TaskFailed (task=flaky)
BackoffScheduled (task=flaky)
TaskStarting (task=flaky) # attempt 3
TaskStopped (task=flaky) # success
ActorExhausted (task=flaky) # OnFailure + success = done
TaskRemoved (task=flaky)
Restart, backoff, and an event for every step, without writing a retry loop.
See examples/metrics.rs for a fuller version (cargo run --example metrics).
Two modes
| Mode | When to use | Lifecycle |
|---|---|---|
| sup.run(specs) | Tasks known upfront | Blocks until all done or Ctrl+C |
| sup.serve() | Tasks added at runtime | Returns SupervisorHandle, you control shutdown |
// Dynamic mode
let handle = sup.serve();
let id = handle.add(spec)?; // returns a TaskId
handle.cancel(id).await?; // cancel by identity
handle.remove(id)?; // or remove by identity
handle.cancel_by_label(label).await?; // ...or by label
let alive = handle.is_alive(id).await;
let tasks = handle.list().await; // Vec<(TaskId, name)>
handle.shutdown().await?;
Core concepts
Task & TaskFn - A Task is any Send + Sync + 'static type that implements spawn, which receives a CancellationToken (see Cancellation).
TaskFn wraps a closure into a Task so you don't need a struct. TaskRef is just Arc<dyn Task>.
TaskSpec - Bundles a task with its policies: restart, backoff, timeout, and max retries. This is what you pass to the supervisor.
// One-shot (run once, never restart)
let spec = TaskSpec::once(task);
// Restartable (restart on failure, stop on success)
let spec = TaskSpec::restartable(task);
// Full control
let spec = new
.with_max_retries;
RestartPolicy - Controls when a task restarts after it exits:
| Policy | Behavior |
|---|---|
| Never | Runs once, then stops. |
| OnFailure (default) | Restarts only after an error; stops on success. |
| Always { interval } | Always restarts. interval: Some(10s) waits between runs. |
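The table can be read as a small pure function. The sketch below is only an illustration: the Restart enum and next_run are hypothetical names that mirror the policy names, not taskvisor's own types, and failure backoff is deliberately left out.

```rust
use std::time::Duration;

/// Hypothetical mirror of the three restart policies described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Restart {
    Never,
    OnFailure,
    Always { interval: Option<Duration> },
}

/// Returns `None` when the task should stop, or `Some(pause)` when it should
/// run again after `pause`. Backoff after a failure is a separate concern and
/// is not modeled here.
fn next_run(policy: Restart, succeeded: bool) -> Option<Duration> {
    match policy {
        // Runs once, whatever the result.
        Restart::Never => None,
        // Success ends the task; only an error triggers another run.
        Restart::OnFailure if succeeded => None,
        Restart::OnFailure => Some(Duration::ZERO),
        // Always restarts, optionally pausing for `interval` between runs.
        Restart::Always { interval } => Some(interval.unwrap_or(Duration::ZERO)),
    }
}
```

Under these assumptions, next_run(Restart::OnFailure, true) is None, which corresponds to the ActorExhausted line in the sample output above.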
BackoffPolicy - Controls retry delay after failure.
Delay for attempt n = first * factor^n, capped at max, then jitter is applied:
| Field | Default | Meaning |
|---|---|---|
| first | 100ms | Initial delay |
| max | 30s | Delay cap |
| factor | 1.0 | Multiplier per attempt (2.0 = exponential) |
| jitter | None | Randomization strategy (see below) |
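To make the formula concrete, here is a minimal sketch of the pre-jitter delay. backoff_delay is a hypothetical helper, not part of taskvisor, and it assumes that n counts from 0 as the formula is written.

```rust
use std::time::Duration;

/// Pre-jitter delay for attempt `n`: `first * factor^n`, capped at `max`.
/// Hypothetical helper for illustration only.
fn backoff_delay(first: Duration, factor: f64, max: Duration, n: u32) -> Duration {
    // Work in f64 seconds so a huge exponent saturates at `max` instead of overflowing.
    let exponent = i32::try_from(n).unwrap_or(i32::MAX);
    let raw = first.as_secs_f64() * factor.powi(exponent);
    // `min` also maps NaN to `max`; `max(0.0)` guards against a negative factor.
    let capped = raw.min(max.as_secs_f64()).max(0.0);
    Duration::from_secs_f64(capped)
}
```

With first = 100ms, factor = 2.0 and max = 30s, attempts 0, 1, 2, 3 give 100ms, 200ms, 400ms, 800ms, and the delay stays at 30s from attempt 9 onward. With the default factor of 1.0 every attempt waits 100ms.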
JitterPolicy - adds jitter to retry delays to prevent many tasks from retrying at the same time:
| Policy | Range | Use when |
|---|---|---|
| None | exact delay | Single task, predictable timing |
| Full | [0, delay] | Maximum spread needed |
| Equal (recommended) | [delay/2, delay] | Balanced; keeps about 75% of the delay |
| Decorrelated | [base, base*3] capped at max | Self-adjusting; widens with each retry |
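The ranges in the table can be sketched as follows. The Jitter enum and both functions are hypothetical; in particular, treating base as the previous delay is an assumption, and the random sample u is supplied by the caller so the example needs no external crate.

```rust
use std::time::Duration;

/// Hypothetical mirror of the jitter strategies in the table above.
#[derive(Debug, Clone, Copy)]
enum Jitter {
    None,
    Full,
    Equal,
    /// Assumption: `base` is the previous delay, `max` the backoff cap.
    Decorrelated { base: Duration, max: Duration },
}

/// Inclusive (low, high) bounds that the final delay is drawn from.
fn jitter_bounds(policy: Jitter, delay: Duration) -> (Duration, Duration) {
    match policy {
        Jitter::None => (delay, delay),
        Jitter::Full => (Duration::ZERO, delay),
        Jitter::Equal => (delay / 2, delay),
        Jitter::Decorrelated { base, max } => {
            let high = base.saturating_mul(3).min(max);
            (base.min(high), high)
        }
    }
}

/// Picks a point inside the bounds from `u`, a uniform sample in [0, 1].
fn apply_jitter(policy: Jitter, delay: Duration, u: f64) -> Duration {
    let (low, high) = jitter_bounds(policy, delay);
    low + (high - low).mul_f64(u.clamp(0.0, 1.0))
}
```

For Equal, the midpoint sample u = 0.5 on a 1s delay yields 750ms, which is where the "about 75%" figure in the table comes from.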
Events & Subscribe - Every lifecycle change is published to a broadcast bus.
Implement Subscribe to observe them.
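Each subscriber gets its own bounded queue, so a slow subscriber never blocks the others or the supervisor. When a queue is full, the event is dropped for that subscriber and SubscriberOverflow is published.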
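The isolation property can be shown with the standard library alone. This is a sketch of the idea, not taskvisor's implementation: Bus and SubscriberQueue here are hypothetical types built on std::sync::mpsc::sync_channel, whereas the real bus is asynchronous.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// One bounded queue per subscriber, plus a count of events dropped on overflow.
struct SubscriberQueue {
    tx: SyncSender<String>,
    dropped: usize,
}

/// Minimal fan-out bus: `publish` never blocks, whatever the subscribers do.
struct Bus {
    queues: Vec<SubscriberQueue>,
}

impl Bus {
    fn new() -> Self {
        Bus { queues: Vec::new() }
    }

    /// Registers a subscriber with its own queue holding up to `capacity` events.
    fn subscribe(&mut self, capacity: usize) -> Receiver<String> {
        let (tx, rx) = sync_channel(capacity);
        self.queues.push(SubscriberQueue { tx, dropped: 0 });
        rx
    }

    /// Offers the event to every queue; a full or closed queue loses only its own copy.
    fn publish(&mut self, event: &str) {
        for q in &mut self.queues {
            match q.tx.try_send(event.to_string()) {
                Ok(()) => {}
                Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                    q.dropped += 1;
                }
            }
        }
    }
}
```

If a subscriber with capacity 2 never drains its queue while five events are published, it keeps the first two and records three drops; a second subscriber with capacity 8 still receives all five.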
Error handling
Return these from your task to control what happens next:
| Return | Retryable | What happens |
|---|---|---|
| Ok(()) | - | Task completed. RestartPolicy decides next step. |
| Err(TaskError::Fail { reason, exit_code }) | Yes | Retryable failure. Backoff, then retry. |
| Err(TaskError::Timeout { .. }) | Yes | Set automatically when per-task timeout is exceeded. |
| Err(TaskError::Fatal { reason, exit_code }) | No | Permanent failure. Actor stops, publishes ActorDead. |
| Err(TaskError::Canceled) | No | Graceful shutdown. Not an error. |
| panic! in the task body | Yes | Caught and converted to TaskError::Fail; backoff, then retry per policy. |
exit_code is an Option<i32>: pass Some(code) when the error comes from a process-like runtime, and None for logical errors.
Subscribers receive it as Event::exit_code on TaskFailed / ActorDead / ActorExhausted.
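The table boils down to a classification step. The sketch below uses hypothetical Outcome and Next enums rather than taskvisor's TaskError, and it assumes the restart policy allows a retry and that retries remain.

```rust
/// Hypothetical stand-in for the outcomes listed in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Ok,
    Fail,
    Timeout,
    Fatal,
    Canceled,
    Panic,
}

/// What the supervisor does next, in the table's terms.
#[derive(Debug, PartialEq)]
enum Next {
    /// Success: the restart policy decides whether to run again.
    ConsultRestartPolicy,
    /// Retryable failure: wait for the backoff delay, then run another attempt.
    BackoffThenRetry,
    /// Permanent failure: the actor stops (ActorDead in the table).
    Dead,
    /// Graceful shutdown: not treated as an error.
    Stopped,
}

fn classify(outcome: Outcome) -> Next {
    match outcome {
        Outcome::Ok => Next::ConsultRestartPolicy,
        // A panic is handled like a retryable failure, as the last table row says.
        Outcome::Fail | Outcome::Timeout | Outcome::Panic => Next::BackoffThenRetry,
        Outcome::Fatal => Next::Dead,
        Outcome::Canceled => Next::Stopped,
    }
}
```

The practical consequence for task authors: return Fatal only when a retry cannot help, because it is the one failure that bypasses backoff entirely.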
Cancellation
Tasks must observe cancellation via the CancellationToken passed to spawn:
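// Pattern 1: select! (recommended for long-running tasks)
tokio::select! {
    _ = ctx.cancelled() => Err(TaskError::Canceled),
    result = do_work() => result,
}
// Pattern 2: check before work (ok for short tasks)
if ctx.is_cancelled() {
    return Err(TaskError::Canceled);
}
do_work().await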
Recipes
Exponential backoff with jitter
use Duration;
use ;
let backoff = BackoffPolicy ;
Per-task timeout with max retries
// Task gets 5s per attempt, max 3 retries.
// If exceeded: TimeoutHit event + TaskError::Timeout + backoff + retry.
let spec = new
.with_max_retries;
Custom subscriber (metrics)
use ;
use Arc;
use ;
let metrics = new;
let sup = builder
.with_subscribers
.build;
Periodic task (run every N seconds)
// Runs, completes, waits 30s, runs again. Forever.
let spec = TaskSpec::restartable(task)
    .with_restart(RestartPolicy::Always { interval: Some(Duration::from_secs(30)) });
Events
Every lifecycle change publishes an Event to the bus. Subscribe to observe them.
| Event | Meaning |
|---|---|
| Lifecycle | |
| TaskStarting | Attempt is beginning |
| TaskStopped | Completed or cancelled gracefully |
| TaskFailed | Attempt failed (retryable or fatal) |
| TimeoutHit | Attempt exceeded its timeout |
| BackoffScheduled | Retry delay scheduled (includes delay duration) |
| Terminal | |
| ActorExhausted | Restart policy says stop; the normal way a task ends |
| ActorDead | Fatal error, actor will not restart |
| Management | |
| TaskAdded / TaskRemoved | Task registered / unregistered |
| TaskAddFailed | Add rejected: a task with that name already exists |
| TaskAddRequested / TaskRemoveRequested | Add/remove commands received |
| Shutdown | |
| ShutdownRequested | OS signal caught (SIGTERM/SIGINT) |
| AllStoppedWithinGrace | Clean shutdown |
| GraceExceeded | Some tasks didn't stop in time |
| Subscriber | |
| SubscriberPanicked | A subscriber panicked (isolated, others unaffected) |
| SubscriberOverflow | Subscriber queue full, event dropped |
| Controller (feature = controller) | |
| ControllerSubmitted | Task accepted into a slot |
| ControllerRejected | Submission rejected (slot busy, queue full, ...) |
| ControllerSlotTransition | Slot changed state (e.g. running → terminating) |
Each event carries: kind, id (the task's TaskId), task (name), attempt, reason, exit_code, delay_ms, timeout_ms, backoff_source, seq (monotonic ordering), at (timestamp).
Configuration
use Duration;
use ;
let mut cfg = default;
cfg.grace = from_secs; // shutdown grace period
cfg.timeout = from_secs; // default per-task timeout (0 = none)
cfg.max_retries = 10; // default max retries (0 = unlimited)
cfg.max_concurrent = 4; // task concurrency limit (0 = unlimited)
cfg.bus_capacity = 2048; // event bus ring buffer size
cfg.restart = OnFailure;
cfg.backoff = default;
| Field | Default | Meaning |
|---|---|---|
| grace | 60s | How long to wait for tasks to stop on shutdown |
| timeout | 0s (none) | Default per-task attempt timeout |
| max_retries | 0 (unlimited) | Default max failure-driven retries |
| max_concurrent | 0 (unlimited) | Global semaphore for running tasks |
| bus_capacity | 1024 | Broadcast channel size. Slow subscribers see Lagged |
| restart | OnFailure | Default restart policy for tasks |
| backoff | 100ms / 1.0x / 30s max | Default backoff for tasks |
Controller (feature = controller)
Slot-based admission control. Tasks submit to named slots, the policy decides what happens when a slot is busy.
| Policy | Behavior |
|---|---|
| Queue | FIFO queue. New task waits until current one finishes. |
| Replace | Cancels running task, starts the new one immediately. |
| DropIfRunning | Rejects submission if slot is already busy. |
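A toy model of a single slot shows how the three policies differ. Everything here (Admission, Decision, Slot) is hypothetical and synchronous; it passes the policy on each submission for brevity and does not reflect how the controller feature is actually configured.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the three admission policies.
#[derive(Debug, Clone, Copy)]
enum Admission {
    Queue,
    Replace,
    DropIfRunning,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Started,
    Queued,
    Replaced { cancelled: String },
    Rejected,
}

/// One named slot: at most one running task plus a FIFO of waiting ones.
#[derive(Default)]
struct Slot {
    running: Option<String>,
    waiting: VecDeque<String>,
}

impl Slot {
    fn submit(&mut self, task: &str, policy: Admission) -> Decision {
        match (self.running.take(), policy) {
            // Free slot: every policy starts the task right away.
            (None, _) => {
                self.running = Some(task.to_string());
                Decision::Started
            }
            (Some(current), Admission::Queue) => {
                self.running = Some(current);
                self.waiting.push_back(task.to_string());
                Decision::Queued
            }
            (Some(current), Admission::Replace) => {
                self.running = Some(task.to_string());
                Decision::Replaced { cancelled: current }
            }
            (Some(current), Admission::DropIfRunning) => {
                self.running = Some(current);
                Decision::Rejected
            }
        }
    }

    /// Marks the running task as finished and promotes the next queued one, if any.
    fn finish(&mut self) -> Option<String> {
        self.running = self.waiting.pop_front();
        self.running.clone()
    }
}
```

Submitting "a" then "b" with Queue starts "a" and queues "b"; a later Replace submission cancels "a" but leaves "b" waiting, so "b" runs once the replacement finishes.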
use ;
let sup = builder
.with_controller
.build;
let handle = sup.serve;
handle.submit.await?;
handle.submit.await?;
handle.submit.await?;
handle.shutdown.await?;
Performance
Per-task overhead is a few microseconds: a handful of events on the bus, plus one registry insert and remove. That is small next to real task work, where I/O takes milliseconds or more. The cost grows linearly with the number of tasks, subscribers, and batch size.
Run the benchmarks on your own hardware:
Examples
| Example | What it shows |
|---|---|
| basic.rs | Minimal hello-world, one task runs once |
| worker.rs | Long-running worker with graceful Ctrl+C shutdown |
| periodic.rs | Cron-like periodic task via RestartPolicy::Always |
| multiple.rs | Three tasks with different policies and backoff |
| metrics.rs | Custom Subscribe implementation for metrics |
| dynamic.rs | serve() + SupervisorHandle: add/remove/cancel at runtime |
| pipeline.rs | Controller admission policies: Queue, Replace, DropIfRunning |
Optional features
| Feature | What it enables |
|---|---|
| controller | Slot-based admission control: ControllerSpec, ControllerConfig, AdmissionPolicy |
| logging | Built-in LogWriter subscriber - event output to stdout (demo/reference) |
taskvisor = { version = "0.2", features = ["controller", "logging"] }
Contributing
Found a bug? Have an idea? Open an issue or send a pull request.