1use std::{sync::Arc, time::Duration};
8
9use solti_model::{Task, TaskId, TaskPage, TaskPhase, TaskQuery, TaskRun, TaskSpec};
10use taskvisor::{
11 ControllerConfig, ControllerSpec, Subscribe, Supervisor, SupervisorConfig, SupervisorHandle,
12 TaskRef, TaskSpec as TvTaskSpec,
13};
14use tracing::{debug, info, instrument};
15
16use solti_runner::RunnerRouter;
17
18use crate::system::init_uptime;
19use crate::{
20 error::CoreError,
21 map::{to_admission_policy, to_backoff_policy, to_restart_policy},
22 state::{StateConfig, StateSubscriber, TaskState, state_sweep},
23};
24
/// Facade that bundles a running supervisor handle, the runner router and the
/// in-memory task state behind one API.
///
/// Read-only queries are answered from `state`; lifecycle operations
/// (submit / cancel / delete / shutdown) go through `handle`.
pub struct SupervisorApi {
    /// Handle used to submit, cancel and shut down tasks in the supervisor.
    handle: SupervisorHandle,
    /// Turns a `TaskSpec` into a runnable task (see `submit`).
    router: RunnerRouter,
    /// Task state store; a `StateSubscriber` over a clone of it is registered in `new`.
    state: TaskState,
}
42
43impl SupervisorApi {
44 pub async fn new(
58 sup_cfg: SupervisorConfig,
59 ctrl_cfg: ControllerConfig,
60 mut subscribers: Vec<Arc<dyn Subscribe>>,
61 router: RunnerRouter,
62 state_cfg: StateConfig,
63 ) -> Result<Self, CoreError> {
64 let state = TaskState::new();
65 subscribers.push(Arc::new(StateSubscriber::new(state.clone())));
66
67 let sup = Supervisor::builder(sup_cfg)
68 .with_subscribers(subscribers)
69 .with_controller(ctrl_cfg)
70 .build();
71
72 let handle = sup.serve();
73 init_uptime();
74
75 let api = Self {
76 handle,
77 router,
78 state,
79 };
80
81 let (task, spec) = state_sweep(api.state.clone(), state_cfg);
84 api.submit_with_task(task, &spec).await?;
85 info!("supervisor is ready (sweep active)");
86
87 Ok(api)
88 }
89
90 pub fn get_task(&self, id: &TaskId) -> Option<Task> {
92 self.state.get(id)
93 }
94
95 pub fn list_tasks_by_slot(&self, slot: &str) -> Vec<Task> {
97 self.state.list_by_slot(slot)
98 }
99
100 pub fn list_all_tasks(&self) -> Vec<Task> {
102 self.state.list_all()
103 }
104
105 pub fn list_tasks_by_status(&self, phase: TaskPhase) -> Vec<Task> {
107 self.state.list_by_status(phase)
108 }
109
110 pub fn query_tasks(&self, query: &TaskQuery) -> TaskPage<Task> {
112 self.state.query(query)
113 }
114
115 pub fn list_task_runs(&self, id: &TaskId) -> Vec<TaskRun> {
117 self.state.list_runs(id)
118 }
119
120 #[instrument(level = "debug", skip(self), fields(task_id = %id))]
122 pub async fn delete_task(&self, id: &TaskId) -> Result<(), CoreError> {
123 debug!("deleting task: {}", id);
124
125 let was_cancelled = self
126 .handle
127 .cancel(id.as_str())
128 .await
129 .map_err(|e| CoreError::Supervisor(format!("cancel failed: {}", e)))?;
130
131 let had_local = self.state.delete_task(id);
132
133 if !was_cancelled && !had_local {
134 debug!("delete_task: no such task in supervisor or state; idempotent no-op");
135 }
136 Ok(())
137 }
138
139 pub fn handle(&self) -> SupervisorHandle {
141 self.handle.clone()
142 }
143
144 #[instrument(level = "debug", skip(self, spec), fields(slot = %spec.slot(), kind = ?spec.kind()))]
152 pub async fn submit(&self, spec: &TaskSpec) -> Result<TaskId, CoreError> {
153 spec.validate()?;
154
155 let task = self.router.build(spec)?;
156 self.submit_with_task(task, spec).await
157 }
158
159 #[instrument(level = "debug", skip(self, task, spec), fields(slot = %spec.slot()))]
166 pub async fn submit_with_task(
167 &self,
168 task: TaskRef,
169 spec: &TaskSpec,
170 ) -> Result<TaskId, CoreError> {
171 let task_id = TaskId::from(task.name());
172
173 self.state.add_task(task_id.clone(), spec.clone());
174
175 let task_spec = TvTaskSpec::new(
176 task,
177 to_restart_policy(spec.restart())?,
178 to_backoff_policy(spec.backoff())?,
179 Some(Duration::from_millis(spec.timeout().as_millis())),
180 );
181 let controller_spec =
182 ControllerSpec::new(to_admission_policy(spec.admission())?, task_spec);
183
184 debug!("submitting pre-built task via controller");
185 if let Err(e) = self.handle.submit(controller_spec).await {
186 self.state.unregister_task(&task_id);
187 return Err(CoreError::Supervisor(e.to_string()));
188 }
189 Ok(task_id)
190 }
191
192 #[instrument(level = "info", skip(self))]
202 pub async fn shutdown(self) -> Result<(), CoreError> {
203 info!("initiating graceful shutdown");
204 self.handle
205 .shutdown()
206 .await
207 .map_err(|e| CoreError::Supervisor(e.to_string()))
208 }
209
210 #[instrument(level = "debug", skip(self), fields(task_id = %id))]
212 pub async fn cancel_task(&self, id: &TaskId) -> Result<(), CoreError> {
213 debug!("cancelling task: {}", id);
214
215 let was_cancelled = self
216 .handle
217 .cancel(id.as_str())
218 .await
219 .map_err(|e| CoreError::Supervisor(format!("cancel failed: {}", e)))?;
220
221 if !was_cancelled {
222 return Err(CoreError::Supervisor(format!("task not found: {}", id)));
223 }
224
225 debug!("task cancelled successfully: {}", id);
226 Ok(())
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 use std::sync::atomic::{AtomicBool, Ordering};
235
236 use solti_model::{AdmissionPolicy, BackoffPolicy, JitterPolicy, RestartPolicy, TaskKind};
237 use taskvisor::{TaskError, TaskFn};
238 use tokio_util::sync::CancellationToken;
239
240 fn mk_backoff() -> BackoffPolicy {
241 BackoffPolicy {
242 jitter: JitterPolicy::Equal,
243 first_ms: 1_000,
244 max_ms: 5_000,
245 factor: 2.0,
246 }
247 }
248
249 #[tokio::test]
250 async fn submit_with_task_succeeds_for_simple_task() {
251 let router = RunnerRouter::new();
252 let api = SupervisorApi::new(
253 SupervisorConfig::default(),
254 ControllerConfig::default(),
255 Vec::new(),
256 router,
257 StateConfig::default(),
258 )
259 .await
260 .expect("failed to create SupervisorApi");
261
262 let task: TaskRef = TaskFn::arc("test-task", |_ctx: CancellationToken| async move {
263 Ok::<(), TaskError>(())
264 });
265
266 let spec = TaskSpec::builder("test-slot", TaskKind::Embedded, 1_000_u64)
267 .restart(RestartPolicy::Never)
268 .backoff(mk_backoff())
269 .admission(AdmissionPolicy::DropIfRunning)
270 .build()
271 .expect("valid spec");
272
273 let res = api.submit_with_task(task, &spec).await;
274 match res {
275 Ok(task_id) => {
276 assert!(!task_id.as_str().is_empty());
277 assert!(task_id.as_str().contains("test-task"));
278 }
279 Err(e) => panic!("expected Ok(TaskId), got error: {e:?}"),
280 }
281 }
282
283 #[tokio::test]
284 async fn delete_task_stops_running_task_and_wipes_state() {
285 let router = RunnerRouter::new();
286 let api = SupervisorApi::new(
287 SupervisorConfig::default(),
288 ControllerConfig::default(),
289 Vec::new(),
290 router,
291 StateConfig::default(),
292 )
293 .await
294 .expect("SupervisorApi::new");
295
296 let cancelled_observed = Arc::new(AtomicBool::new(false));
297 let flag = Arc::clone(&cancelled_observed);
298 let task: TaskRef = TaskFn::arc("kill-me", move |ctx: CancellationToken| {
299 let flag = Arc::clone(&flag);
300 async move {
301 while !ctx.is_cancelled() {
302 tokio::time::sleep(Duration::from_millis(5)).await;
303 }
304 flag.store(true, Ordering::SeqCst);
305 Ok::<(), TaskError>(())
306 }
307 });
308
309 let spec = TaskSpec::builder("slot-delete", TaskKind::Embedded, 60_000_u64)
310 .restart(RestartPolicy::Never)
311 .backoff(mk_backoff())
312 .admission(AdmissionPolicy::Replace)
313 .build()
314 .expect("spec builds");
315
316 let task_id = api
317 .submit_with_task(task, &spec)
318 .await
319 .expect("submit_with_task");
320
321 let handle = api.handle();
322 let mut alive = false;
323 for _ in 0..100 {
324 if handle.is_alive(task_id.as_str()).await {
325 alive = true;
326 break;
327 }
328 tokio::time::sleep(Duration::from_millis(10)).await;
329 }
330 assert!(
331 alive,
332 "task body must reach Running state before we try to delete"
333 );
334
335 api.delete_task(&task_id)
336 .await
337 .expect("delete_task must Ok");
338
339 assert!(
340 api.get_task(&task_id).is_none(),
341 "state must be wiped after delete"
342 );
343 assert!(
344 api.list_task_runs(&task_id).is_empty(),
345 "run history must be purged by delete"
346 );
347
348 for _ in 0..100 {
349 if cancelled_observed.load(Ordering::SeqCst) {
350 break;
351 }
352 tokio::time::sleep(Duration::from_millis(10)).await;
353 }
354 assert!(
355 cancelled_observed.load(Ordering::SeqCst),
356 "task body must observe the cancel token — delete must cancel, not just wipe state"
357 );
358 }
359
360 #[tokio::test]
361 async fn delete_task_is_idempotent_on_missing() {
362 let router = RunnerRouter::new();
363 let api = SupervisorApi::new(
364 SupervisorConfig::default(),
365 ControllerConfig::default(),
366 Vec::new(),
367 router,
368 StateConfig::default(),
369 )
370 .await
371 .expect("SupervisorApi::new");
372
373 let missing = TaskId::from("never-submitted");
374 api.delete_task(&missing)
375 .await
376 .expect("delete on missing id must be Ok");
377 }
378
379 #[tokio::test]
380 async fn submit_rejects_taskkind_embedded() {
381 let router = RunnerRouter::new();
382 let api = SupervisorApi::new(
383 SupervisorConfig::default(),
384 ControllerConfig::default(),
385 Vec::new(),
386 router,
387 StateConfig::default(),
388 )
389 .await
390 .expect("failed to create SupervisorApi");
391
392 let spec = TaskSpec::builder("test-slot-none", TaskKind::Embedded, 1_000_u64)
393 .restart(RestartPolicy::Never)
394 .backoff(mk_backoff())
395 .admission(AdmissionPolicy::DropIfRunning)
396 .build()
397 .expect("valid spec");
398 let res = api.submit(&spec).await;
399
400 match res {
401 Err(CoreError::InvalidSpec(e)) => {
402 assert!(e.to_string().contains("TaskKind::Embedded"));
403 }
404 Ok(_) => panic!("expected error for TaskKind::Embedded, got Ok(TaskId)"),
405 Err(e) => panic!("expected CoreError::InvalidSpec, got {e:?}"),
406 }
407 }
408}