Skip to main content

Supervisor

Struct Supervisor 

Source
pub struct Supervisor { /* private fields */ }
Expand description

Orchestrates task actors, event delivery, and graceful shutdown.

  • Spawns and supervises task actors via event-driven registry
  • Provides runtime task management (add/remove tasks dynamically)
  • Fans out events to subscribers (non-blocking)
  • Handles graceful shutdown on OS signals
  • Tracks alive tasks for stuck detection
  • Enforces global concurrency limits

Implementations§

Source§

impl Supervisor

Source

pub fn new( cfg: SupervisorConfig, subscribers: Vec<Arc<dyn Subscribe>>, ) -> Arc<Self>

Creates a new supervisor with the given config and subscribers (the subscriber list may be empty).

Examples found in repository?
examples/subscriber.rs (line 106)
101async fn main() -> anyhow::Result<()> {
102    let metrics = Arc::new(MetricsSubscriber::new());
103
104    let subs: Vec<Arc<dyn taskvisor::Subscribe>> =
105        vec![Arc::clone(&metrics) as Arc<dyn taskvisor::Subscribe>];
106    let sup = taskvisor::Supervisor::new(taskvisor::SupervisorConfig::default(), subs);
107
108    sup.run(vec![make_spec()]).await?;
109    metrics.print_stats();
110    Ok(())
111}
More examples
Hide additional examples
examples/control.rs (lines 19-22)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub fn builder(cfg: SupervisorConfig) -> SupervisorBuilder

Creates a builder for constructing a Supervisor.

§Example
use taskvisor::{SupervisorConfig, Supervisor};

let sup = Supervisor::builder(SupervisorConfig::default())
    .with_subscribers(vec![])
    .build();
Examples found in repository?
examples/controller.rs (line 47)
46async fn main() -> anyhow::Result<()> {
47    let sup = taskvisor::Supervisor::builder(taskvisor::SupervisorConfig::default())
48        .with_controller(taskvisor::ControllerConfig::default())
49        .build();
50
51    let runner = Arc::clone(&sup);
52    tokio::spawn(async move {
53        let _ = runner.run(vec![]).await;
54    });
55    sup.wait_ready().await;
56
57    // ============================================================
58    // Demo -> Queue: Tasks execute one after another
59    // ============================================================
60    println!("Demo 1: Queue Policy");
61    println!(" └► Submit 3 tasks with same name: they run sequentially");
62
63    for _ in 1..=3 {
64        let spec = make_spec("job-in-queue", 800);
65        sup.submit(taskvisor::ControllerSpec::queue(spec)).await?;
66    }
67
68    tokio::time::sleep(Duration::from_secs(4)).await;
69    println!();
70
71    // ============================================================
72    // Demo -> Replace: New task cancels running one
73    // ============================================================
74    println!("Demo 2: Replace Policy");
75    println!(" └► Submit task, wait 500ms, submit another: first gets cancelled");
76
77    let task_1 = make_spec("job-replace", 6000);
78    let task_2 = make_spec("job-replace", 500);
79
80    sup.submit(taskvisor::ControllerSpec::replace(task_1))
81        .await?;
82    tokio::time::sleep(Duration::from_secs(1)).await;
83    sup.submit(taskvisor::ControllerSpec::replace(task_2))
84        .await?;
85
86    tokio::time::sleep(Duration::from_secs(2)).await;
87    println!();
88
89    // ============================================================
90    // Demo -> DropIfRunning: Ignores(skip) new tasks while busy
91    // ============================================================
92    println!("Demo 3: DropIfRunning Policy");
93    println!(" └► Submit task & submit another while first is running: second is ignored");
94
95    let task_1 = make_spec("job-drop-if-running", 1000);
96    let task_2 = make_spec("job-drop-if-running", 10000);
97
98    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_1))
99        .await?;
100    tokio::time::sleep(Duration::from_millis(250)).await;
101    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_2))
102        .await?;
103
104    tokio::time::sleep(Duration::from_secs(2)).await;
105    println!();
106
107    println!("Done");
108    Ok(())
109}
Source

pub async fn wait_ready(&self)

Waits until the supervisor is fully initialized and ready to accept submissions.

Examples found in repository?
examples/control.rs (line 27)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
More examples
Hide additional examples
examples/controller.rs (line 55)
46async fn main() -> anyhow::Result<()> {
47    let sup = taskvisor::Supervisor::builder(taskvisor::SupervisorConfig::default())
48        .with_controller(taskvisor::ControllerConfig::default())
49        .build();
50
51    let runner = Arc::clone(&sup);
52    tokio::spawn(async move {
53        let _ = runner.run(vec![]).await;
54    });
55    sup.wait_ready().await;
56
57    // ============================================================
58    // Demo -> Queue: Tasks execute one after another
59    // ============================================================
60    println!("Demo 1: Queue Policy");
61    println!(" └► Submit 3 tasks with same name: they run sequentially");
62
63    for _ in 1..=3 {
64        let spec = make_spec("job-in-queue", 800);
65        sup.submit(taskvisor::ControllerSpec::queue(spec)).await?;
66    }
67
68    tokio::time::sleep(Duration::from_secs(4)).await;
69    println!();
70
71    // ============================================================
72    // Demo -> Replace: New task cancels running one
73    // ============================================================
74    println!("Demo 2: Replace Policy");
75    println!(" └► Submit task, wait 500ms, submit another: first gets cancelled");
76
77    let task_1 = make_spec("job-replace", 6000);
78    let task_2 = make_spec("job-replace", 500);
79
80    sup.submit(taskvisor::ControllerSpec::replace(task_1))
81        .await?;
82    tokio::time::sleep(Duration::from_secs(1)).await;
83    sup.submit(taskvisor::ControllerSpec::replace(task_2))
84        .await?;
85
86    tokio::time::sleep(Duration::from_secs(2)).await;
87    println!();
88
89    // ============================================================
90    // Demo -> DropIfRunning: Ignores(skip) new tasks while busy
91    // ============================================================
92    println!("Demo 3: DropIfRunning Policy");
93    println!(" └► Submit task & submit another while first is running: second is ignored");
94
95    let task_1 = make_spec("job-drop-if-running", 1000);
96    let task_2 = make_spec("job-drop-if-running", 10000);
97
98    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_1))
99        .await?;
100    tokio::time::sleep(Duration::from_millis(250)).await;
101    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_2))
102        .await?;
103
104    tokio::time::sleep(Duration::from_secs(2)).await;
105    println!();
106
107    println!("Done");
108    Ok(())
109}
Source

pub fn add_task(&self, spec: TaskSpec) -> Result<(), RuntimeError>

Adds a new task to the supervisor at runtime.

Publishes TaskAddRequested with the spec to the bus. The registry listener then spawns the actor.

Examples found in repository?
examples/control.rs (line 34)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub fn remove_task(&self, name: &str) -> Result<(), RuntimeError>

Removes a task from the supervisor at runtime.

Publishes TaskRemoveRequested to the bus. The registry listener then cancels and removes the task.

Examples found in repository?
examples/control.rs (line 54)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub async fn list_tasks(&self) -> Vec<String>

Returns a sorted list of currently active task names from the registry.

Examples found in repository?
examples/control.rs (line 36)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub async fn run(&self, tasks: Vec<TaskSpec>) -> Result<(), RuntimeError>

Runs task specifications until completion or shutdown signal.

Steps:

  • Spawn subscriber listener (event fan-out)
  • Spawn registry listener (task lifecycle management)
  • Notify waiters that the supervisor is ready to accept submissions
  • Publish TaskAddRequested for initial tasks
  • Optionally wait until the registry becomes non-empty (only if initial tasks were added)
  • Wait for shutdown signal or all tasks to exit
Examples found in repository?
examples/subscriber.rs (line 108)
101async fn main() -> anyhow::Result<()> {
102    let metrics = Arc::new(MetricsSubscriber::new());
103
104    let subs: Vec<Arc<dyn taskvisor::Subscribe>> =
105        vec![Arc::clone(&metrics) as Arc<dyn taskvisor::Subscribe>];
106    let sup = taskvisor::Supervisor::new(taskvisor::SupervisorConfig::default(), subs);
107
108    sup.run(vec![make_spec()]).await?;
109    metrics.print_stats();
110    Ok(())
111}
More examples
Hide additional examples
examples/control.rs (line 25)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
examples/controller.rs (line 53)
46async fn main() -> anyhow::Result<()> {
47    let sup = taskvisor::Supervisor::builder(taskvisor::SupervisorConfig::default())
48        .with_controller(taskvisor::ControllerConfig::default())
49        .build();
50
51    let runner = Arc::clone(&sup);
52    tokio::spawn(async move {
53        let _ = runner.run(vec![]).await;
54    });
55    sup.wait_ready().await;
56
57    // ============================================================
58    // Demo -> Queue: Tasks execute one after another
59    // ============================================================
60    println!("Demo 1: Queue Policy");
61    println!(" └► Submit 3 tasks with same name: they run sequentially");
62
63    for _ in 1..=3 {
64        let spec = make_spec("job-in-queue", 800);
65        sup.submit(taskvisor::ControllerSpec::queue(spec)).await?;
66    }
67
68    tokio::time::sleep(Duration::from_secs(4)).await;
69    println!();
70
71    // ============================================================
72    // Demo -> Replace: New task cancels running one
73    // ============================================================
74    println!("Demo 2: Replace Policy");
75    println!(" └► Submit task, wait 500ms, submit another: first gets cancelled");
76
77    let task_1 = make_spec("job-replace", 6000);
78    let task_2 = make_spec("job-replace", 500);
79
80    sup.submit(taskvisor::ControllerSpec::replace(task_1))
81        .await?;
82    tokio::time::sleep(Duration::from_secs(1)).await;
83    sup.submit(taskvisor::ControllerSpec::replace(task_2))
84        .await?;
85
86    tokio::time::sleep(Duration::from_secs(2)).await;
87    println!();
88
89    // ============================================================
90    // Demo -> DropIfRunning: Ignores(skip) new tasks while busy
91    // ============================================================
92    println!("Demo 3: DropIfRunning Policy");
93    println!(" └► Submit task & submit another while first is running: second is ignored");
94
95    let task_1 = make_spec("job-drop-if-running", 1000);
96    let task_2 = make_spec("job-drop-if-running", 10000);
97
98    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_1))
99        .await?;
100    tokio::time::sleep(Duration::from_millis(250)).await;
101    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_2))
102        .await?;
103
104    tokio::time::sleep(Duration::from_secs(2)).await;
105    println!();
106
107    println!("Done");
108    Ok(())
109}
Source

pub async fn snapshot(&self) -> Vec<String>

Returns a sorted list of currently alive task names.

Source

pub async fn is_alive(&self, name: &str) -> bool

Checks whether a given task is currently alive.

Examples found in repository?
examples/control.rs (line 67)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub async fn cancel(&self, name: &str) -> Result<bool, RuntimeError>

Cancels a task by name and waits for confirmation (TaskRemoved).

Examples found in repository?
examples/control.rs (line 64)
18async fn main() -> anyhow::Result<()> {
19    let sup = Arc::new(taskvisor::Supervisor::new(
20        taskvisor::SupervisorConfig::default(),
21        vec![],
22    ));
23    let runner = Arc::clone(&sup);
24    tokio::spawn(async move {
25        let _ = runner.run(vec![]).await;
26    });
27    sup.wait_ready().await;
28
29    // ============================================================
30    // Demo 1: Add task dynamically
31    // ============================================================
32    println!(" ─► Adding 'worker-A'...");
33
34    sup.add_task(make_worker("worker-A"))?;
35    tokio::time::sleep(Duration::from_secs(1)).await;
36    let tasks = sup.list_tasks().await;
37    println!(" ─► Active tasks: {tasks:?}");
38
39    // ============================================================
40    // Demo 2: Add second task
41    // ============================================================
42    println!(" ─► Adding 'worker-B'...");
43
44    sup.add_task(make_worker("worker-B"))?;
45    tokio::time::sleep(Duration::from_secs(1)).await;
46    let tasks = sup.list_tasks().await;
47    println!(" ─► Active tasks: {tasks:?}");
48
49    // ============================================================
50    // Demo 3: Remove specific task
51    // ============================================================
52    println!(" ─► Removing 'worker-A'...");
53
54    sup.remove_task("worker-A")?;
55    tokio::time::sleep(Duration::from_millis(500)).await;
56    let tasks = sup.list_tasks().await;
57    println!(" ─► Active tasks: {tasks:?}");
58
59    // ============================================================
60    // Demo 4: Cancel task (with confirmation)
61    // ============================================================
62    println!(" ─► Cancelling 'worker-B'...");
63
64    let cancelled = sup.cancel("worker-B").await?;
65    println!(" ─► Task cancelled: {cancelled}");
66
67    let alive = sup.is_alive("worker-B").await;
68    println!(" ─► Is alive: {alive}");
69
70    println!("Done");
71    Ok(())
72}
Source

pub async fn cancel_with_timeout( &self, name: &str, wait_for: Duration, ) -> Result<bool, RuntimeError>

Cancels a task by name with an explicit timeout (wait_for).

Source

pub async fn submit(&self, spec: ControllerSpec) -> Result<(), ControllerError>

Submits a task to the controller (if enabled).

Returns an error if:

  • Controller feature is disabled
  • Controller is not configured
  • Submission queue is full (use try_submit for non-blocking)

Requires the controller feature flag.

Examples found in repository?
examples/controller.rs (line 65)
46async fn main() -> anyhow::Result<()> {
47    let sup = taskvisor::Supervisor::builder(taskvisor::SupervisorConfig::default())
48        .with_controller(taskvisor::ControllerConfig::default())
49        .build();
50
51    let runner = Arc::clone(&sup);
52    tokio::spawn(async move {
53        let _ = runner.run(vec![]).await;
54    });
55    sup.wait_ready().await;
56
57    // ============================================================
58    // Demo -> Queue: Tasks execute one after another
59    // ============================================================
60    println!("Demo 1: Queue Policy");
61    println!(" └► Submit 3 tasks with same name: they run sequentially");
62
63    for _ in 1..=3 {
64        let spec = make_spec("job-in-queue", 800);
65        sup.submit(taskvisor::ControllerSpec::queue(spec)).await?;
66    }
67
68    tokio::time::sleep(Duration::from_secs(4)).await;
69    println!();
70
71    // ============================================================
72    // Demo -> Replace: New task cancels running one
73    // ============================================================
74    println!("Demo 2: Replace Policy");
75    println!(" └► Submit task, wait 500ms, submit another: first gets cancelled");
76
77    let task_1 = make_spec("job-replace", 6000);
78    let task_2 = make_spec("job-replace", 500);
79
80    sup.submit(taskvisor::ControllerSpec::replace(task_1))
81        .await?;
82    tokio::time::sleep(Duration::from_secs(1)).await;
83    sup.submit(taskvisor::ControllerSpec::replace(task_2))
84        .await?;
85
86    tokio::time::sleep(Duration::from_secs(2)).await;
87    println!();
88
89    // ============================================================
90    // Demo -> DropIfRunning: Ignores(skip) new tasks while busy
91    // ============================================================
92    println!("Demo 3: DropIfRunning Policy");
93    println!(" └► Submit task & submit another while first is running: second is ignored");
94
95    let task_1 = make_spec("job-drop-if-running", 1000);
96    let task_2 = make_spec("job-drop-if-running", 10000);
97
98    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_1))
99        .await?;
100    tokio::time::sleep(Duration::from_millis(250)).await;
101    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_2))
102        .await?;
103
104    tokio::time::sleep(Duration::from_secs(2)).await;
105    println!();
106
107    println!("Done");
108    Ok(())
109}
Source

pub fn try_submit(&self, spec: ControllerSpec) -> Result<(), ControllerError>

Tries to submit a task without blocking.

Returns TrySubmitError::Full if the queue is full.

Requires the controller feature flag.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V