// solti_core/supervisor/mod.rs

//! High-level API over the taskvisor `Supervisor`, used by solti-core.
//!
//! Responsibilities:
//! - owns a [`Supervisor`] instance and runs its event loop in the background;
//! - uses [`RunnerRouter`] to build concrete tasks from [`TaskSpec`];
//! - maps model-level specs / policies into controller specs and submits them.
7use std::{sync::Arc, time::Duration};
8
9use solti_model::{Task, TaskId, TaskPage, TaskPhase, TaskQuery, TaskRun, TaskSpec};
10use taskvisor::{
11    ControllerConfig, ControllerSpec, Subscribe, Supervisor, SupervisorConfig, SupervisorHandle,
12    TaskRef, TaskSpec as TvTaskSpec,
13};
14use tracing::{debug, info, instrument};
15
16use solti_runner::RunnerRouter;
17
18use crate::system::init_uptime;
19use crate::{
20    error::CoreError,
21    map::{to_admission_policy, to_backoff_policy, to_restart_policy},
22    state::{StateConfig, StateSubscriber, TaskState, state_sweep},
23};
24
25/// Thin wrapper around taskvisor [`Supervisor`] with a runner router.
26///
27/// This type is responsible for:
28/// - constructing and running the supervisor;
29/// - selecting a concrete runner for each [`TaskSpec`];
30/// - mapping model-level specs into controller specs and submitting them.
31///
32/// ## Also
33///
34/// - [`CoreError`] error type returned by all methods.
35/// - [`StateConfig`] configures sweep TTLs and interval (defaults are sane).
36/// - [`solti_runner::RunnerRouter`] picks a runner for each submitted spec.
37pub struct SupervisorApi {
38    handle: SupervisorHandle,
39    router: RunnerRouter,
40    state: TaskState,
41}
42
43impl SupervisorApi {
44    /// Create a supervisor with explicit configs and start its run loop in the background.
45    ///
46    /// - `sup_cfg`      - supervisor configuration;
47    /// - `ctrl_cfg`     - controller configuration;
48    /// - `subscribers`  - event subscribers to attach to the supervisor;
49    /// - `router`       - runner router [`solti_model::TaskKind`];
50    /// - `state_cfg`    - sweep TTLs and interval ([`StateConfig::default()`] is usually fine).
51    ///
52    /// The supervisor event loop is started via [`Supervisor::serve()`] which returns
53    /// a [`SupervisorHandle`] for dynamic task management.
54    ///
55    /// A periodic sweep task is automatically submitted to prevent unbounded memory growth.
56    /// It removes completed runs and terminal tasks that exceed their configured TTLs.
57    pub async fn new(
58        sup_cfg: SupervisorConfig,
59        ctrl_cfg: ControllerConfig,
60        mut subscribers: Vec<Arc<dyn Subscribe>>,
61        router: RunnerRouter,
62        state_cfg: StateConfig,
63    ) -> Result<Self, CoreError> {
64        let state = TaskState::new();
65        subscribers.push(Arc::new(StateSubscriber::new(state.clone())));
66
67        let sup = Supervisor::builder(sup_cfg)
68            .with_subscribers(subscribers)
69            .with_controller(ctrl_cfg)
70            .build();
71
72        let handle = sup.serve();
73        init_uptime();
74
75        let api = Self {
76            handle,
77            router,
78            state,
79        };
80
81        // Sweep is always-on: prevents unbounded memory growth by periodically
82        // removing completed runs and terminal tasks that exceed their TTL.
83        let (task, spec) = state_sweep(api.state.clone(), state_cfg);
84        api.submit_with_task(task, &spec).await?;
85        info!("supervisor is ready (sweep active)");
86
87        Ok(api)
88    }
89
90    /// Get task information by ID.
91    pub fn get_task(&self, id: &TaskId) -> Option<Task> {
92        self.state.get(id)
93    }
94
95    /// List all tasks in a specific slot.
96    pub fn list_tasks_by_slot(&self, slot: &str) -> Vec<Task> {
97        self.state.list_by_slot(slot)
98    }
99
100    /// List all tasks.
101    pub fn list_all_tasks(&self) -> Vec<Task> {
102        self.state.list_all()
103    }
104
105    /// List tasks by phase.
106    pub fn list_tasks_by_status(&self, phase: TaskPhase) -> Vec<Task> {
107        self.state.list_by_status(phase)
108    }
109
110    /// Query tasks with combined filters and pagination.
111    pub fn query_tasks(&self, query: &TaskQuery) -> TaskPage<Task> {
112        self.state.query(query)
113    }
114
115    /// List execution history for a specific task (oldest first).
116    pub fn list_task_runs(&self, id: &TaskId) -> Vec<TaskRun> {
117        self.state.list_runs(id)
118    }
119
120    /// Stop a task and purge its run history.
121    #[instrument(level = "debug", skip(self), fields(task_id = %id))]
122    pub async fn delete_task(&self, id: &TaskId) -> Result<(), CoreError> {
123        debug!("deleting task: {}", id);
124
125        let was_cancelled = self
126            .handle
127            .cancel(id.as_str())
128            .await
129            .map_err(|e| CoreError::Supervisor(format!("cancel failed: {}", e)))?;
130
131        let had_local = self.state.delete_task(id);
132
133        if !was_cancelled && !had_local {
134            debug!("delete_task: no such task in supervisor or state; idempotent no-op");
135        }
136        Ok(())
137    }
138
139    /// Get a clone of the underlying supervisor handle.
140    pub fn handle(&self) -> SupervisorHandle {
141        self.handle.clone()
142    }
143
144    /// Build and submit a task described by [`TaskSpec`].
145    ///
146    /// Steps:
147    /// 1. Ask the [`RunnerRouter`] to pick a runner and build a [`TaskRef`].
148    /// 2. Delegate to [`SupervisorApi::submit_with_task`].
149    ///
150    /// This is the primary entrypoint for tasks that are fully described by the public [`solti_model::TaskKind`] model.
151    #[instrument(level = "debug", skip(self, spec), fields(slot = %spec.slot(), kind = ?spec.kind()))]
152    pub async fn submit(&self, spec: &TaskSpec) -> Result<TaskId, CoreError> {
153        spec.validate()?;
154
155        let task = self.router.build(spec)?;
156        self.submit_with_task(task, spec).await
157    }
158
159    /// Submit a pre-built task together with its spec.
160    ///
161    /// This API is intended for in-process / code-defined tasks (with `TaskKind::Embedded`).
162    ///
163    /// The caller is responsible for constructing the [`TaskRef`];
164    /// the spec controls timeout, restart, backoff and admission behavior.
165    #[instrument(level = "debug", skip(self, task, spec), fields(slot = %spec.slot()))]
166    pub async fn submit_with_task(
167        &self,
168        task: TaskRef,
169        spec: &TaskSpec,
170    ) -> Result<TaskId, CoreError> {
171        let task_id = TaskId::from(task.name());
172
173        self.state.add_task(task_id.clone(), spec.clone());
174
175        let task_spec = TvTaskSpec::new(
176            task,
177            to_restart_policy(spec.restart())?,
178            to_backoff_policy(spec.backoff())?,
179            Some(Duration::from_millis(spec.timeout().as_millis())),
180        );
181        let controller_spec =
182            ControllerSpec::new(to_admission_policy(spec.admission())?, task_spec);
183
184        debug!("submitting pre-built task via controller");
185        if let Err(e) = self.handle.submit(controller_spec).await {
186            self.state.unregister_task(&task_id);
187            return Err(CoreError::Supervisor(e.to_string()));
188        }
189        Ok(task_id)
190    }
191
192    /// Gracefully shut down the supervisor: cancel all tasks and wait for completion.
193    ///
194    /// Consumes `self` - no further operations are possible after shutdown.
195    /// The grace period is determined by [`SupervisorConfig`] passed to [`new`](Self::new).
196    ///
197    /// # Example
198    /// ```text
199    /// api.shutdown().await?;
200    /// ```
201    #[instrument(level = "info", skip(self))]
202    pub async fn shutdown(self) -> Result<(), CoreError> {
203        info!("initiating graceful shutdown");
204        self.handle
205            .shutdown()
206            .await
207            .map_err(|e| CoreError::Supervisor(e.to_string()))
208    }
209
210    /// Cancel a running task by ID (in-process Rust API).
211    #[instrument(level = "debug", skip(self), fields(task_id = %id))]
212    pub async fn cancel_task(&self, id: &TaskId) -> Result<(), CoreError> {
213        debug!("cancelling task: {}", id);
214
215        let was_cancelled = self
216            .handle
217            .cancel(id.as_str())
218            .await
219            .map_err(|e| CoreError::Supervisor(format!("cancel failed: {}", e)))?;
220
221        if !was_cancelled {
222            return Err(CoreError::Supervisor(format!("task not found: {}", id)));
223        }
224
225        debug!("task cancelled successfully: {}", id);
226        Ok(())
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, Ordering};

    use solti_model::{AdmissionPolicy, BackoffPolicy, JitterPolicy, RestartPolicy, TaskKind};
    use taskvisor::{TaskError, TaskFn};
    use tokio_util::sync::CancellationToken;

    /// Backoff policy shared by every spec built in this module.
    fn mk_backoff() -> BackoffPolicy {
        BackoffPolicy {
            jitter: JitterPolicy::Equal,
            first_ms: 1_000,
            max_ms: 5_000,
            factor: 2.0,
        }
    }

    /// Start a [`SupervisorApi`] with default configs, no extra subscribers and a
    /// freshly constructed router. Every test needs exactly this setup.
    async fn mk_api() -> SupervisorApi {
        SupervisorApi::new(
            SupervisorConfig::default(),
            ControllerConfig::default(),
            Vec::new(),
            RunnerRouter::new(),
            StateConfig::default(),
        )
        .await
        .expect("failed to create SupervisorApi")
    }

    /// A trivial embedded task can be submitted and its ID is derived from the task name.
    #[tokio::test]
    async fn submit_with_task_succeeds_for_simple_task() {
        let api = mk_api().await;

        let task: TaskRef = TaskFn::arc("test-task", |_ctx: CancellationToken| async move {
            Ok::<(), TaskError>(())
        });

        let spec = TaskSpec::builder("test-slot", TaskKind::Embedded, 1_000_u64)
            .restart(RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(AdmissionPolicy::DropIfRunning)
            .build()
            .expect("valid spec");

        let res = api.submit_with_task(task, &spec).await;
        match res {
            Ok(task_id) => {
                assert!(!task_id.as_str().is_empty());
                assert!(task_id.as_str().contains("test-task"));
            }
            Err(e) => panic!("expected Ok(TaskId), got error: {e:?}"),
        }
    }

    /// `delete_task` must both cancel the running task body and wipe the local state.
    #[tokio::test]
    async fn delete_task_stops_running_task_and_wipes_state() {
        let api = mk_api().await;

        // The task body spins until cancelled and then raises this flag, which lets
        // the test distinguish a real cancel from a mere state wipe.
        let cancelled_observed = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancelled_observed);
        let task: TaskRef = TaskFn::arc("kill-me", move |ctx: CancellationToken| {
            let flag = Arc::clone(&flag);
            async move {
                while !ctx.is_cancelled() {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                }
                flag.store(true, Ordering::SeqCst);
                Ok::<(), TaskError>(())
            }
        });

        let spec = TaskSpec::builder("slot-delete", TaskKind::Embedded, 60_000_u64)
            .restart(RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(AdmissionPolicy::Replace)
            .build()
            .expect("spec builds");

        let task_id = api
            .submit_with_task(task, &spec)
            .await
            .expect("submit_with_task");

        // Poll (up to ~1s) until the supervisor reports the task as alive.
        let handle = api.handle();
        let mut alive = false;
        for _ in 0..100 {
            if handle.is_alive(task_id.as_str()).await {
                alive = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(
            alive,
            "task body must reach Running state before we try to delete"
        );

        api.delete_task(&task_id)
            .await
            .expect("delete_task must Ok");

        assert!(
            api.get_task(&task_id).is_none(),
            "state must be wiped after delete"
        );
        assert!(
            api.list_task_runs(&task_id).is_empty(),
            "run history must be purged by delete"
        );

        // Poll (up to ~1s) for the task body to observe the cancellation.
        for _ in 0..100 {
            if cancelled_observed.load(Ordering::SeqCst) {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(
            cancelled_observed.load(Ordering::SeqCst),
            "task body must observe the cancel token — delete must cancel, not just wipe state"
        );
    }

    /// Deleting an ID that was never submitted succeeds as a no-op.
    #[tokio::test]
    async fn delete_task_is_idempotent_on_missing() {
        let api = mk_api().await;

        let missing = TaskId::from("never-submitted");
        api.delete_task(&missing)
            .await
            .expect("delete on missing id must be Ok");
    }

    /// `submit` must refuse `TaskKind::Embedded` specs with `CoreError::InvalidSpec`.
    #[tokio::test]
    async fn submit_rejects_taskkind_embedded() {
        let api = mk_api().await;

        let spec = TaskSpec::builder("test-slot-none", TaskKind::Embedded, 1_000_u64)
            .restart(RestartPolicy::Never)
            .backoff(mk_backoff())
            .admission(AdmissionPolicy::DropIfRunning)
            .build()
            .expect("valid spec");
        let res = api.submit(&spec).await;

        match res {
            Err(CoreError::InvalidSpec(e)) => {
                assert!(e.to_string().contains("TaskKind::Embedded"));
            }
            Ok(_) => panic!("expected error for TaskKind::Embedded, got Ok(TaskId)"),
            Err(e) => panic!("expected CoreError::InvalidSpec, got {e:?}"),
        }
    }
}