# Taskvisor
[crates.io](https://crates.io/crates/taskvisor) · [docs.rs](https://docs.rs/taskvisor) · [Rust](https://rust-lang.org) · [License](./LICENSE)
<div>
<a href="https://docs.rs/taskvisor/latest/taskvisor/"><img alt="API Docs" src="https://img.shields.io/badge/API%20Docs-4d76ae?style=for-the-badge&logo=rust&logoColor=white"></a>
<a href="./examples/"><img alt="Examples" src="https://img.shields.io/badge/Examples-2ea44f?style=for-the-badge&logo=github&logoColor=white"></a>
</div>
> Lightweight, event-driven task supervision for async Rust.
Runs your background tasks, restarts them on failure with configurable backoff, and emits structured events for every lifecycle change.
```text
TaskFn (your async code)
    ▼
TaskSpec (task + RestartPolicy + BackoffPolicy + timeout + max_retries)
    ▼
Supervisor
├──► Registry
│    └──► TaskActor (per task)
│         ├──► attempt loop
│         │    ├──► run task with timeout + cancellation token
│         │    ├──► apply RestartPolicy on Ok/Err
│         │    └──► apply BackoffPolicy on failure
│         └──► publish events to Bus
└──► Bus (broadcast channel)
     ├──► AliveTracker (sequence-based liveness)
     └──► Subscribers (own queue each; your metrics, logs, alerts)
```
## Why taskvisor?
Tokio gives you `spawn` and `JoinHandle`, but no supervision: no restart policies, no backoff, and no events to tell you what happened.
If a spawned task panics or fails, you only find out when you poll its `JoinHandle` - or never.
Taskvisor fills that gap:
| | Feature | What you get |
|---|---|---|
| 🔁 | **Restart policies** | `Never`, `OnFailure`, or `Always { interval }`: chosen per task |
| ⏳ | **Backoff with jitter** | Exponential, constant, or decorrelated; spreads retries out in time |
| 📡 | **Structured events** | Every start, stop, failure, timeout, and backoff on a broadcast bus |
| 🔌 | **Pluggable subscribers** | Implement one method (`on_event`) for metrics, alerts, or logging |
| 🎛️ | **Dynamic management** | Add, remove, cancel tasks at runtime via `SupervisorHandle` |
| 🚦 | **Admission control** | Optional slot controller: Queue / Replace / DropIfRunning |
| 🚧 | **Concurrency limits** | Global semaphore, per-task timeouts, max retries |
Taskvisor is not a replacement for tokio or tower.
It works at a higher level: you write the task, and taskvisor runs it, restarts it on failure with backoff, and tells you what happened through events.
It is also not an actor framework: there are no addressable actors, mailboxes, or message passing.
Taskvisor supervises **tasks** (plain async functions), not actors.
> **Roadmap:**
> taskvisor is the supervision core of *Solti*, a larger task-orchestration toolkit in
> development on top of it (subprocess execution, HTTP/gRPC API, metrics, dashboards).
>
> Taskvisor stands on its own today: the rest is future work.
## Quick start
```toml
[dependencies]
taskvisor = "0.2"
tokio = { version = "1", features = ["full"] }
```
A task that prints "pong" about every 10 seconds (a 100 ms attempt, then a 10 s restart interval), restarts forever, and shuts down on Ctrl+C:
```rust,no_run
use std::time::Duration;
use taskvisor::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let sup = Supervisor::builder(SupervisorConfig::default()).build();

    let ping: TaskRef = TaskFn::arc("ping", |ctx: CancellationToken| async move {
        tokio::select! {
            _ = ctx.cancelled() => return Err(TaskError::Canceled),
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
        }
        println!("[ping] pong");
        Ok(())
    });

    let spec = TaskSpec::restartable(ping)
        .with_restart(RestartPolicy::Always { interval: Some(Duration::from_secs(10)) });

    sup.run(vec![spec]).await?;
    Ok(())
}
```
## See it recover from failure
A task that fails twice, then succeeds - taskvisor retries it with backoff and publishes an event for every step.
A subscriber prints them, so you *see* the supervision happen:
```rust,no_run
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use taskvisor::prelude::*;

/// A subscriber that prints every lifecycle event the supervisor emits for a task.
struct Printer;

impl Subscribe for Printer {
    fn on_event(&self, ev: &Event) {
        if let Some(task) = ev.task.as_deref() {
            println!(" {:?} (task={task})", ev.kind);
        }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let attempts = Arc::new(AtomicU32::new(0));

    let flaky: TaskRef = TaskFn::arc("flaky", move |_ctx: CancellationToken| {
        let attempts = Arc::clone(&attempts);
        async move {
            let n = attempts.fetch_add(1, Ordering::Relaxed) + 1;
            if n < 3 {
                Err(TaskError::Fail { reason: format!("boom #{n}"), exit_code: None })
            } else {
                Ok(()) // 3rd attempt succeeds
            }
        }
    });

    // Restart on failure, with a short backoff between attempts.
    let spec = TaskSpec::restartable(flaky).with_backoff(BackoffPolicy {
        first: Duration::from_millis(50),
        ..BackoffPolicy::default()
    });

    let subs: Vec<Arc<dyn Subscribe>> = vec![Arc::new(Printer)];
    Supervisor::new(SupervisorConfig::default(), subs).run(vec![spec]).await?;
    Ok(())
}
```
```text
# output:
TaskAddRequested (task=flaky)
TaskAdded (task=flaky)
TaskStarting (task=flaky) # attempt 1
TaskFailed (task=flaky)
BackoffScheduled (task=flaky) # wait, then retry
TaskStarting (task=flaky) # attempt 2
TaskFailed (task=flaky)
BackoffScheduled (task=flaky)
TaskStarting (task=flaky) # attempt 3
TaskStopped (task=flaky) # success
ActorExhausted (task=flaky) # OnFailure + success = done
TaskRemoved (task=flaky)
```
Restart, backoff, and an event for every step, without writing a retry loop.
See [`examples/metrics.rs`](examples/metrics.rs) for a fuller version (`cargo run --example metrics`).
## Two modes
| Entry point | Use when | Behavior |
|---|---|---|
| `sup.run(specs)` | Tasks known upfront | Blocks until all done or Ctrl+C |
| `sup.serve()` | Tasks added at runtime | Returns `SupervisorHandle`, you control shutdown |
```rust,ignore
// Dynamic mode
let handle = sup.serve();
let id = handle.add(spec)?; // returns a TaskId
handle.cancel(id).await?; // cancel by identity
handle.remove(id)?; // or remove by identity
handle.cancel_by_label("task-name").await?; // ...or by label
let alive = handle.is_alive("task-name").await;
let tasks = handle.list().await; // Vec<(TaskId, name)>
handle.shutdown().await?;
```
## Core concepts
**Task & TaskFn** - A `Task` is any `Send + Sync + 'static` type that implements
```rust,ignore
fn spawn(&self, ctx: CancellationToken) -> BoxTaskFuture
```
`TaskFn` wraps a closure into a `Task` so you don't need a struct. `TaskRef` is just `Arc<dyn Task>`.
**TaskSpec** - Bundles a task with its policies: restart, backoff, timeout, and max retries.
This is what you pass to the supervisor.
```rust,ignore
// One-shot (run once, never restart)
let spec = TaskSpec::once(task);
// Restartable (restart on failure, stop on success)
let spec = TaskSpec::restartable(task);
// Full control
let spec = TaskSpec::new(task, RestartPolicy::Always { interval: None }, backoff, Some(timeout))
.with_max_retries(5);
```
**RestartPolicy** - Controls when a task restarts after it exits:
| Policy | Behavior |
|---|---|
| `Never` | Runs once, then stops. |
| `OnFailure` *(default)* | Restarts only after an error; stops on success. |
| `Always { interval }` | Always restarts. `interval: Some(10s)` waits between runs. |
**BackoffPolicy** - Controls retry delay after failure.
Delay for attempt `n` = `first * factor^n`, capped at `max`, then jitter is applied:
| Field | Default | Meaning |
|---|---|---|
| `first` | `100ms` | Initial delay |
| `max` | `30s` | Delay cap |
| `factor` | `1.0` | Multiplier per attempt (`2.0` = exponential) |
| `jitter` | `None` | Randomization strategy (see below) |
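To see what the formula produces before jitter, here is a minimal std-only sketch of the documented rule (`first * factor^n`, capped at `max`). It assumes `n` counts from 0, so the first retry waits exactly `first`, which matches the `200ms -> 400ms -> 800ms` progression in the backoff recipe; `backoff_delay` is a hypothetical helper, not a taskvisor function.

```rust
use std::time::Duration;

/// Hypothetical helper: un-jittered delay before retry `n` (0-based),
/// following the documented rule `first * factor^n`, capped at `max`.
fn backoff_delay(first: Duration, factor: f64, max: Duration, n: u32) -> Duration {
    // Saturate huge exponents instead of overflowing the i32 that `powi` takes.
    let exp = n.min(i32::MAX as u32) as i32;
    // Work in f64 seconds so very large products become infinity, not a panic.
    let secs = first.as_secs_f64() * factor.powi(exp);
    // `!(secs < cap)` is also true for infinity and NaN, so those fall back to `max`.
    if !(secs < max.as_secs_f64()) {
        return max;
    }
    // Clamp at zero: `from_secs_f64` panics on negative input.
    Duration::from_secs_f64(secs.max(0.0))
}
```

With `first = 200ms`, `factor = 2.0`, and `max = 30s`, `n = 7` gives 25.6s and every `n >= 8` is capped at 30s. With the default `factor = 1.0`, every retry waits exactly `first`.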
**JitterPolicy** - adds jitter to retry delays to prevent many tasks from retrying at the same time:
| Variant | Delay range | Use when |
|---|---|---|
| `None` | exact delay | Single task, predictable timing |
| `Full` | `[0, delay]` | Maximum spread needed |
| `Equal` *(recommended)* | `[delay/2, delay]` | Balanced; keeps 75% of the delay on average |
| `Decorrelated` | `[base, base*3]` capped at `max` | Self-adjusting; widens with each retry |
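The three stateless strategies can be written as a mapping from a uniform random sample `u` in `[0, 1)` onto the ranges in the table. This sketch is an illustration, not taskvisor's implementation: the `Jitter` enum and `apply_jitter` are hypothetical, the random number generator is left to the caller, and `Decorrelated` is omitted because it also depends on state carried between retries.

```rust
use std::time::Duration;

/// Hypothetical mirror of the stateless jitter variants.
#[derive(Clone, Copy, Debug)]
enum Jitter {
    None,
    Full,
    Equal,
}

/// Apply jitter to `delay`, given a uniform sample `u` in `[0, 1)` from any RNG.
fn apply_jitter(delay: Duration, jitter: Jitter, u: f64) -> Duration {
    let u = u.clamp(0.0, 1.0);
    match jitter {
        // Exact delay: the sample is ignored.
        Jitter::None => delay,
        // Uniform in [0, delay].
        Jitter::Full => delay.mul_f64(u),
        // Fixed half plus a random half: uniform in [delay/2, delay].
        Jitter::Equal => delay / 2 + (delay / 2).mul_f64(u),
    }
}
```

The `Equal` arm shows where the 75% figure comes from: half of the delay is fixed and the other half is scaled by `u`, whose average is 0.5, so the expected delay is `0.5 + 0.5 * 0.5 = 0.75` of the original.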
**Events & Subscribe** - Every lifecycle change is published to a broadcast bus.
Implement `Subscribe` to observe them.
Each subscriber gets its own bounded queue - a slow subscriber never blocks others or the supervisor.
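The isolation comes from giving each subscriber its own bounded queue and never waiting on it. The sketch below models that idea with the standard library only: `std::sync::mpsc::sync_channel` stands in for a per-subscriber queue, and `try_send` drops the event when the queue is full instead of blocking, which is the situation the `SubscriberOverflow` event reports. The `Fanout` type and its methods are hypothetical and are not taskvisor's internals.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical fan-out: one bounded queue per subscriber.
struct Fanout {
    queues: Vec<SyncSender<String>>,
    dropped: usize, // events lost to full queues (cf. `SubscriberOverflow`)
}

impl Fanout {
    fn new() -> Self {
        Fanout { queues: Vec::new(), dropped: 0 }
    }

    /// Register a subscriber and hand back its private receiving end.
    fn subscribe(&mut self, capacity: usize) -> Receiver<String> {
        let (tx, rx) = sync_channel(capacity);
        self.queues.push(tx);
        rx
    }

    /// Publish without ever blocking: a full queue only loses this one event.
    fn publish(&mut self, event: &str) {
        for tx in &self.queues {
            match tx.try_send(event.to_string()) {
                Ok(()) => {}
                Err(TrySendError::Full(_)) => self.dropped += 1,
                // Receiver gone: nothing left to deliver to.
                Err(TrySendError::Disconnected(_)) => {}
            }
        }
    }
}
```

The trade-off is explicit: an overwhelmed subscriber misses events rather than slowing down the publisher or its neighbors.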
## Error handling
Return these from your task to control what happens next:
| Return value | Retried? | What happens |
|---|---|---|
| `Ok(())` | - | Task completed. `RestartPolicy` decides next step. |
| `Err(TaskError::Fail { reason, exit_code })` | Yes | Retryable failure. Backoff, then retry. |
| `Err(TaskError::Timeout { .. })` | Yes | Set automatically when per-task timeout is exceeded. |
| `Err(TaskError::Fatal { reason, exit_code })` | No | Permanent failure. Actor stops, publishes `ActorDead`. |
| `Err(TaskError::Canceled)` | No | Graceful cancellation; not counted as a failure. |
| `panic!` in the task body | Yes | Caught and converted to `TaskError::Fail`; backoff, then retry per policy. |
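The table and the `RestartPolicy` rules combine into a small decision function. This is a hypothetical model written only from the behavior documented in this README (retryable vs. terminal errors, the three policies, and `max_retries` with `0` meaning unlimited); the names `Outcome`, `Policy`, `Next`, and `next_step` are illustrative, not taskvisor's API. It also assumes that failures under `Always` go through backoff just like under `OnFailure`.

```rust
use std::time::Duration;

/// How one attempt ended. A caught panic is modeled as `Fail`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Outcome {
    Success,  // the task returned Ok(())
    Fail,     // TaskError::Fail (or a caught panic)
    Timeout,  // TaskError::Timeout
    Fatal,    // TaskError::Fatal
    Canceled, // TaskError::Canceled
}

/// Mirror of the three restart policies.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    Never,
    OnFailure,
    Always { interval: Option<Duration> },
}

/// What happens after the attempt, in this model.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Next {
    Stop,                       // no further attempts
    Dead,                       // permanent failure (the table says `ActorDead` is published)
    RetryAfterBackoff,          // retryable failure: wait per BackoffPolicy, then retry
    RunAgain(Option<Duration>), // `Always` after success, after the optional interval
}

/// `retries_used` counts failure-driven retries so far; `max_retries == 0` means unlimited.
fn next_step(policy: Policy, outcome: Outcome, retries_used: u32, max_retries: u32) -> Next {
    match outcome {
        Outcome::Canceled => Next::Stop,
        Outcome::Fatal => Next::Dead,
        Outcome::Fail | Outcome::Timeout => {
            let budget_left = max_retries == 0 || retries_used < max_retries;
            match policy {
                // `Never` runs once, even after a failure.
                Policy::Never => Next::Stop,
                _ if budget_left => Next::RetryAfterBackoff,
                _ => Next::Stop,
            }
        }
        Outcome::Success => match policy {
            Policy::Always { interval } => Next::RunAgain(interval),
            Policy::Never | Policy::OnFailure => Next::Stop,
        },
    }
}
```

Tracing the `flaky` example through this model: two `Fail` outcomes under `OnFailure` with unlimited retries give `RetryAfterBackoff` twice, and the final `Success` gives `Stop`, which lines up with the `ActorExhausted` entry in that example's output.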
`exit_code` is an `Option<i32>`: set it when the error comes from a process-like runtime (for example, a child process's exit status) and pass `None` for logical errors.
Subscribers see it in the `exit_code` field of the `Event` for `TaskFailed`, `ActorDead`, and `ActorExhausted`.
### Cancellation
Tasks **must** observe cancellation via the `CancellationToken` passed to `spawn`:
```rust,ignore
// Pattern 1: select! (recommended for long-running tasks)
tokio::select! {
    _ = ctx.cancelled() => Err(TaskError::Canceled),
    result = do_work() => result,
}

// Pattern 2: check before work (ok for short tasks)
if ctx.is_cancelled() { return Err(TaskError::Canceled); }
do_work().await
```
## Recipes
### Exponential backoff with jitter
```rust,no_run
use std::time::Duration;
use taskvisor::{BackoffPolicy, JitterPolicy};

let backoff = BackoffPolicy {
    first: Duration::from_millis(200),
    max: Duration::from_secs(30),
    factor: 2.0,                 // 200ms -> 400ms -> 800ms -> ...
    jitter: JitterPolicy::Equal, // recommended: [delay/2, delay]
};
```
### Per-task timeout with max retries
```rust,ignore
// Task gets 5s per attempt, max 3 retries.
// If exceeded: TimeoutHit event + TaskError::Timeout + backoff + retry.
let spec = TaskSpec::new(task, RestartPolicy::OnFailure, backoff, Some(Duration::from_secs(5)))
.with_max_retries(3);
```
### Custom subscriber (metrics)
```rust,no_run
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use taskvisor::{Event, EventKind, Subscribe, Supervisor, SupervisorConfig};

/// Counts failed attempts across all tasks.
struct Metrics {
    failures: AtomicU64,
}

impl Subscribe for Metrics {
    fn on_event(&self, event: &Event) {
        if matches!(event.kind, EventKind::TaskFailed) {
            self.failures.fetch_add(1, Ordering::Relaxed);
        }
    }
}

let metrics = Arc::new(Metrics { failures: AtomicU64::new(0) });
// Pass a clone so `metrics` stays usable for reading the counter later.
let sup = Supervisor::builder(SupervisorConfig::default())
    .with_subscribers(vec![Arc::clone(&metrics) as Arc<dyn Subscribe>])
    .build();
let _failures_so_far = metrics.failures.load(Ordering::Relaxed);
```
### Periodic task (run every N seconds)
```rust,ignore
// Runs, completes, waits 30s, runs again. Forever.
let spec = TaskSpec::restartable(task)
.with_restart(RestartPolicy::Always { interval: Some(Duration::from_secs(30)) });
```
## Events
Every lifecycle change publishes an `Event` to the bus. Subscribe to observe them.
| Event | Meaning |
|---|---|
| **Lifecycle** | |
| `TaskStarting` | Attempt is beginning |
| `TaskStopped` | Completed or cancelled gracefully |
| `TaskFailed` | Attempt failed (retryable or fatal) |
| `TimeoutHit` | Attempt exceeded its timeout |
| `BackoffScheduled` | Retry delay scheduled (includes delay duration) |
| **Terminal** | |
| `ActorExhausted` | Restart policy says stop; the normal way a task ends |
| `ActorDead` | Fatal error, actor will not restart |
| **Management** | |
| `TaskAdded` / `TaskRemoved` | Task registered / unregistered |
| `TaskAddFailed` | Add rejected: a task with that name already exists |
| `TaskAddRequested` / `TaskRemoveRequested` | Add/remove commands received |
| **Shutdown** | |
| `ShutdownRequested` | OS signal caught (SIGTERM/SIGINT) |
| `AllStoppedWithinGrace` | Clean shutdown |
| `GraceExceeded` | Some tasks didn't stop in time |
| **Subscriber** | |
| `SubscriberPanicked` | A subscriber panicked (isolated, others unaffected) |
| `SubscriberOverflow` | Subscriber queue full, event dropped |
| **Controller** *(feature = `controller`)* | |
| `ControllerSubmitted` | Task accepted into a slot |
| `ControllerRejected` | Submission rejected (slot busy, queue full, ...) |
| `ControllerSlotTransition` | Slot changed state (e.g. running → terminating) |
Each event carries: `kind`, `id` (the task's `TaskId`), `task` (name), `attempt`, `reason`, `exit_code`, `delay_ms`, `timeout_ms`, `backoff_source`, `seq` (monotonic ordering), `at` (timestamp).
## Configuration
```rust,no_run
use std::time::Duration;
use taskvisor::{SupervisorConfig, RestartPolicy, BackoffPolicy};
let mut cfg = SupervisorConfig::default();
cfg.grace = Duration::from_secs(30); // shutdown grace period
cfg.timeout = Duration::from_secs(5); // default per-task timeout (0 = none)
cfg.max_retries = 10; // default max retries (0 = unlimited)
cfg.max_concurrent = 4; // task concurrency limit (0 = unlimited)
cfg.bus_capacity = 2048; // event bus ring buffer size
cfg.restart = RestartPolicy::OnFailure;
cfg.backoff = BackoffPolicy::default();
```
| Field | Default | Description |
|---|---|---|
| `grace` | `60s` | How long to wait for tasks to stop on shutdown |
| `timeout` | `0s` (none) | Default per-task attempt timeout |
| `max_retries` | `0` (unlimited) | Default max failure-driven retries |
| `max_concurrent` | `0` (unlimited) | Global semaphore for running tasks |
| `bus_capacity` | `1024` | Broadcast channel size. Slow subscribers see `Lagged` |
| `restart` | `OnFailure` | Default restart policy for tasks |
| `backoff` | `100ms / 1.0x / 30s max` | Default backoff for tasks |
## Controller *(feature = `controller`)*
Slot-based admission control.
Tasks are submitted to named slots, and the admission policy decides what happens when a slot is already busy:
| Policy | When the slot is busy |
|---|---|
| `Queue` | Joins a FIFO queue; the new task waits until the current one finishes. |
| `Replace` | Cancels the running task and starts the new one immediately. |
| `DropIfRunning` | Rejects the submission. |
```rust,ignore
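use taskvisor::{ControllerConfig, ControllerSpec, Supervisor};

let sup = Supervisor::builder(cfg)
    .with_controller(ControllerConfig::default())
    .build();
let handle = sup.serve();

// The constructors take the TaskSpec by value, so each submission uses its own spec.
handle.submit(ControllerSpec::queue(spec_a)).await?;           // wait for the slot
handle.submit(ControllerSpec::replace(spec_b)).await?;         // cancel current, run this one
handle.submit(ControllerSpec::drop_if_running(spec_c)).await?; // rejected if the slot is busy
handle.shutdown().await?;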
```
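The policies only differ when a submission meets a busy slot. The following std-only model spells out that difference for a single slot; `Slot`, `Admission`, `Decision`, and the queue-capacity parameter are hypothetical names for illustration, not the controller's API. It assumes the queue may be bounded, as the "queue full" reason on `ControllerRejected` suggests, and it does not model cancellation or events.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug)]
enum Admission {
    Queue,
    Replace,
    DropIfRunning,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Started,          // slot was idle, task runs now
    Queued,           // waits behind the running task (FIFO)
    Replaced(String), // previous task is displaced; the new one owns the slot
    Rejected,         // slot busy (DropIfRunning) or queue full
}

/// Hypothetical single slot: at most one running task plus a FIFO of waiters.
struct Slot {
    running: Option<String>,
    waiting: VecDeque<String>,
    queue_capacity: usize,
}

impl Slot {
    fn new(queue_capacity: usize) -> Self {
        Slot { running: None, waiting: VecDeque::new(), queue_capacity }
    }

    fn submit(&mut self, task: &str, policy: Admission) -> Decision {
        // An idle slot accepts any submission, whatever the policy.
        if self.running.is_none() {
            self.running = Some(task.to_string());
            return Decision::Started;
        }
        match policy {
            Admission::Queue if self.waiting.len() < self.queue_capacity => {
                self.waiting.push_back(task.to_string());
                Decision::Queued
            }
            Admission::Queue | Admission::DropIfRunning => Decision::Rejected,
            Admission::Replace => {
                let old = self.running.replace(task.to_string()).unwrap_or_default();
                Decision::Replaced(old)
            }
        }
    }

    /// The running task finished: promote the next queued task, if any.
    fn finish(&mut self) -> Option<&str> {
        self.running = self.waiting.pop_front();
        self.running.as_deref()
    }
}
```

In taskvisor, `Replace` cancels the running task, per the table above; in this model the swap only shows which task ends up owning the slot.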
## Performance
Per-task overhead is a few microseconds: a handful of events on the bus, plus one registry insert and remove.
That is small next to real task work, where I/O takes milliseconds or more.
The cost grows linearly with the number of tasks, subscribers, and batch size.
Run the benchmarks on your own hardware:
```bash
cargo bench # all benchmarks
cargo bench --bench lifecycle # specific suite
cargo bench --bench controller --features controller # controller benchmarks
```
## Examples
```bash
cargo run --example basic
cargo run --example worker
cargo run --example periodic
cargo run --example multiple
cargo run --example metrics
cargo run --example dynamic
cargo run --example pipeline --features controller
```
| Example | What it shows |
|---|---|
| [basic.rs](examples/basic.rs) | Minimal hello-world, one task runs once |
| [worker.rs](examples/worker.rs) | Long-running worker with graceful Ctrl+C shutdown |
| [periodic.rs](examples/periodic.rs) | Cron-like periodic task via `RestartPolicy::Always` |
| [multiple.rs](examples/multiple.rs) | Three tasks with different policies and backoff |
| [metrics.rs](examples/metrics.rs) | Custom `Subscribe` implementation for metrics |
| [dynamic.rs](examples/dynamic.rs) | `serve()` + `SupervisorHandle`: add/remove/cancel at runtime |
| [pipeline.rs](examples/pipeline.rs) | Controller admission policies: Queue, Replace, DropIfRunning |
## Optional features
| Feature | Enables |
|---|---|
| `controller` | Slot-based admission control: `ControllerSpec`, `ControllerConfig`, `AdmissionPolicy` |
| `logging` | Built-in `LogWriter` subscriber that prints events to stdout (demo/reference) |
```toml
taskvisor = { version = "0.2", features = ["controller", "logging"] }
```
## Contributing
Found a bug? Have an idea? [Open an issue](https://github.com/soltiHQ/taskvisor/issues) or send a pull request.