Skip to main content

controller/
controller.rs

1//! # Controller Example
2//!
3//! Shows how to use Controller with three admission policies:
4//! - Queue: tasks run one by one
5//! - Replace: cancels running task, starts new one
6//! - DropIfRunning: ignores new tasks if slot is busy
7//!
8//! ## Run
9//! ```bash
10//! cargo run --example controller --features "controller"
11//! ```
12
13#[cfg(not(feature = "controller"))]
14compile_error!("error");
15
16use std::{sync::Arc, time::Duration};
17
18fn make_spec(name: &'static str, duration_ms: u64) -> taskvisor::TaskSpec {
19    let task: taskvisor::TaskRef = taskvisor::TaskFn::arc(
20        name,
21        move |ctx: tokio_util::sync::CancellationToken| async move {
22            println!("{:>6}[{name}] started", "");
23
24            let start = tokio::time::Instant::now();
25            let sleep = tokio::time::sleep(Duration::from_millis(duration_ms));
26
27            tokio::pin!(sleep);
28            tokio::select! {
29                _ = &mut sleep => {
30                    println!("{:>6}[{name}] completed in {:?}", "", start.elapsed());
31                    Ok(())
32                }
33                _ = ctx.cancelled() => {
34                    println!("{:>6}[{name}] cancelled after {:?}", "", start.elapsed());
35                    Err(taskvisor::TaskError::Canceled)
36                }
37            }
38        },
39    );
40    let policy = taskvisor::RestartPolicy::Never;
41    let backoff = taskvisor::BackoffPolicy::default();
42    taskvisor::TaskSpec::new(task, policy, backoff, None)
43}
44
45#[tokio::main(flavor = "current_thread")]
46async fn main() -> anyhow::Result<()> {
47    let sup = taskvisor::Supervisor::builder(taskvisor::SupervisorConfig::default())
48        .with_controller(taskvisor::ControllerConfig::default())
49        .build();
50
51    let runner = Arc::clone(&sup);
52    tokio::spawn(async move {
53        let _ = runner.run(vec![]).await;
54    });
55    sup.wait_ready().await;
56
57    // ============================================================
58    // Demo -> Queue: Tasks execute one after another
59    // ============================================================
60    println!("Demo 1: Queue Policy");
61    println!(" └► Submit 3 tasks with same name: they run sequentially");
62
63    for _ in 1..=3 {
64        let spec = make_spec("job-in-queue", 800);
65        sup.submit(taskvisor::ControllerSpec::queue(spec)).await?;
66    }
67
68    tokio::time::sleep(Duration::from_secs(4)).await;
69    println!();
70
71    // ============================================================
72    // Demo -> Replace: New task cancels running one
73    // ============================================================
74    println!("Demo 2: Replace Policy");
75    println!(" └► Submit task, wait 500ms, submit another: first gets cancelled");
76
77    let task_1 = make_spec("job-replace", 6000);
78    let task_2 = make_spec("job-replace", 500);
79
80    sup.submit(taskvisor::ControllerSpec::replace(task_1))
81        .await?;
82    tokio::time::sleep(Duration::from_secs(1)).await;
83    sup.submit(taskvisor::ControllerSpec::replace(task_2))
84        .await?;
85
86    tokio::time::sleep(Duration::from_secs(2)).await;
87    println!();
88
89    // ============================================================
90    // Demo -> DropIfRunning: Ignores(skip) new tasks while busy
91    // ============================================================
92    println!("Demo 3: DropIfRunning Policy");
93    println!(" └► Submit task & submit another while first is running: second is ignored");
94
95    let task_1 = make_spec("job-drop-if-running", 1000);
96    let task_2 = make_spec("job-drop-if-running", 10000);
97
98    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_1))
99        .await?;
100    tokio::time::sleep(Duration::from_millis(250)).await;
101    sup.submit(taskvisor::ControllerSpec::drop_if_running(task_2))
102        .await?;
103
104    tokio::time::sleep(Duration::from_secs(2)).await;
105    println!();
106
107    println!("Done");
108    Ok(())
109}