Skip to main content

solti_runner/
router.rs

1//! # Runner router.
2//!
3//! [`RunnerRouter`] selects the first registered [`Runner`](crate::Runner) that:
4//! 1. returns `true` from [`supports`](crate::Runner::supports) for the given spec, and
5//! 2. satisfies the [`RunnerSelector`](solti_model::RunnerSelector) label constraints (if any).
6//!
7//! Runners are checked in registration order.
8//!
//! See the [crate root](crate) for an architecture overview.
10use std::sync::Arc;
11
12use solti_model::{Labels, TaskKind, TaskSpec};
13use taskvisor::TaskRef;
14use tracing::{debug, instrument, trace};
15
16use crate::context::BuildContext;
17use crate::error::RunnerError;
18use crate::runner::Runner;
19
20/// Single runner entry with optional static labels used for routing.
21struct RunnerEntry {
22    /// Concrete runner implementation.
23    runner: Arc<dyn Runner>,
24    /// Static labels attached to this runner (e.g. capacity class, backend tag).
25    labels: Labels,
26}
27
28/// Router that selects an appropriate [`Runner`] for a given [`TaskSpec`].
29///
30/// Runners are checked in the order they were registered.
31/// The first runner whose [`Runner::supports`] method returns `true` and satisfies
32/// the [`TaskSpec::runner_selector`] (if any) is used to build the task.
33///
34/// ## Notes
35///
36/// - `TaskKind::Embedded` is not routable — use `SupervisorApi::submit_with_task` instead.
37/// - Default [`BuildContext`] uses empty env and [`NoOpMetrics`](crate::NoOpMetrics).
38///
39/// ## Also
40///
41/// - [`Runner`] — trait that concrete executors implement.
42/// - [`BuildContext`] — shared dependencies for all runners.
43/// - [`RunnerError::NoRunner`](crate::RunnerError::NoRunner) — returned when no runner matches.
44#[derive(Default)]
45pub struct RunnerRouter {
46    runners: Vec<RunnerEntry>,
47    ctx: BuildContext,
48}
49
50impl RunnerRouter {
51    /// Create an empty router with a default build context.
52    #[inline]
53    pub fn new() -> Self {
54        Self {
55            runners: Vec::new(),
56            ctx: BuildContext::default(),
57        }
58    }
59
60    /// Set a custom build context for all runners managed by this router.
61    ///
62    /// This is typically used to inject shared dependencies (config, observability, global handles, etc.) into runner instances.
63    #[inline]
64    pub fn with_context(mut self, ctx: BuildContext) -> Self {
65        self.ctx = ctx;
66        self
67    }
68
69    /// Register a new runner without labels.
70    ///
71    /// Runners are queried in the order they are registered; the first one that reports `supports(spec) == true` (and matches labels, if any) is used.
72    #[inline]
73    pub fn register(&mut self, runner: Arc<dyn Runner>) {
74        self.runners.push(RunnerEntry {
75            runner,
76            labels: Labels::default(),
77        });
78    }
79
80    /// Register a new runner with static labels.
81    ///
82    /// These labels are used by the router to further narrow down candidates when [`TaskSpec::runner_selector`] is set.
83    #[inline]
84    pub fn register_with_labels(&mut self, runner: Arc<dyn Runner>, labels: Labels) {
85        self.runners.push(RunnerEntry { runner, labels });
86    }
87
88    /// Pick the first runner that claims to support the given spec and matches the runner selector.
89    ///
90    /// Routing rules:
91    /// - filter runners by `Runner::supports(spec)`;
92    /// - if `spec.runner_selector()` is set, keep only runners whose `labels`
93    ///   satisfy all `match_labels` and `match_expressions` requirements;
94    /// - pick the first matching entry.
95    pub fn pick(&self, spec: &TaskSpec) -> Option<&Arc<dyn Runner>> {
96        let selector = spec.runner_selector();
97
98        self.runners
99            .iter()
100            .find(|entry| {
101                entry.runner.supports(spec) && selector.is_none_or(|sel| sel.matches(&entry.labels))
102            })
103            .map(|entry| &entry.runner)
104    }
105
106    /// Build a [`TaskRef`] for the given spec using the selected runner.
107    ///
108    /// `TaskKind::Embedded` is not routable and must be used with
109    /// `SupervisorApi::submit_with_task`.
110    #[instrument(level = "debug", skip(self, spec), fields(kind = ?spec.kind()))]
111    pub fn build(&self, spec: &TaskSpec) -> Result<TaskRef, RunnerError> {
112        trace!(spec = ?spec, "router received spec");
113
114        if matches!(spec.kind(), TaskKind::Embedded) {
115            return Err(RunnerError::NoRunner(
116                "TaskKind::Embedded requires submit_with_task()".to_string(),
117            ));
118        }
119        let r = self
120            .pick(spec)
121            .ok_or_else(|| RunnerError::NoRunner(spec.kind().kind().to_string()))?;
122
123        let task = r.build_task(spec, &self.ctx)?;
124        debug!(runner = r.name(), "runner built task successfully");
125        Ok(task)
126    }
127
128    /// Returns `true` if at least one registered runner has `label_key == label_value`.
129    pub fn contains_label(&self, label_key: &str, label_value: &str) -> bool {
130        self.runners
131            .iter()
132            .any(|e| e.labels.get(label_key) == Some(label_value))
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use crate::RunnerError;

    use solti_model::{
        AdmissionPolicy, BackoffPolicy, Flag, JitterPolicy, Labels, RunnerSelector, SubprocessMode,
        SubprocessSpec, TaskEnv, WasmSpec,
    };
    use std::path::PathBuf;
    use taskvisor::{TaskError, TaskFn};
    use tokio_util::sync::CancellationToken;

    /// Task that returns `Ok(())` without using its cancellation token.
    fn noop_task(name: &'static str) -> TaskRef {
        TaskFn::arc(name, |_ctx: CancellationToken| async move {
            Ok::<(), TaskError>(())
        })
    }

    /// Runner stub that only accepts `TaskKind::Subprocess` specs.
    struct SubprocessRunnerDummy;

    impl Runner for SubprocessRunnerDummy {
        fn name(&self) -> &'static str {
            "subprocess-only"
        }

        fn supports(&self, spec: &TaskSpec) -> bool {
            matches!(spec.kind(), TaskKind::Subprocess(_))
        }

        fn build_task(
            &self,
            _spec: &TaskSpec,
            _ctx: &BuildContext,
        ) -> Result<TaskRef, RunnerError> {
            Ok(noop_task("test-subprocess-runner"))
        }
    }

    /// Wrap `kind` in a spec with fixed slot, timeout, backoff and admission settings.
    fn mk_spec(kind: TaskKind) -> TaskSpec {
        let backoff = BackoffPolicy {
            jitter: JitterPolicy::Equal,
            first_ms: 1_000,
            max_ms: 5_000,
            factor: 2.0,
        };

        TaskSpec::builder("test-slot", kind, 10_000_u64)
            .backoff(backoff)
            .admission(AdmissionPolicy::DropIfRunning)
            .build()
            .expect("valid spec")
    }

    /// Subprocess kind running `echo <arg>` with default env and no working directory.
    fn echo_kind(arg: &'static str, fail_on_non_zero: Flag) -> TaskKind {
        TaskKind::Subprocess(SubprocessSpec {
            mode: SubprocessMode::Command {
                command: "echo".to_string(),
                args: vec![arg.into()],
            },
            env: TaskEnv::default(),
            cwd: None,
            fail_on_non_zero,
        })
    }

    /// Label set holding the single pair `runner-name = value`.
    fn runner_name_labels(value: &'static str) -> Labels {
        let mut labels = Labels::new();
        labels.insert("runner-name", value);
        labels
    }

    #[test]
    fn build_fails_for_taskkind_embedded() {
        let router = RunnerRouter::new();
        let spec = mk_spec(TaskKind::Embedded);

        // The Ok case is handled separately so the success value is never formatted.
        let err = match router.build(&spec) {
            Err(e) => e,
            Ok(_) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got Ok(..)"),
        };

        match err {
            RunnerError::NoRunner(msg) => {
                assert!(
                    msg.contains("TaskKind::Embedded"),
                    "unexpected NoRunner message: {msg}"
                );
            }
            e => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got {e:?}"),
        }
    }

    #[test]
    fn build_uses_registered_runner_for_subprocess() {
        let mut router = RunnerRouter::new();
        router.register(Arc::new(SubprocessRunnerDummy));

        let spec = mk_spec(echo_kind("hello", Flag::default()));

        if let Err(e) = router.build(&spec) {
            panic!("expected Ok(TaskRef) for subprocess, got error: {e:?}");
        }
    }

    #[test]
    fn build_fails_when_no_runner_supports_kind() {
        let mut router = RunnerRouter::new();
        router.register(Arc::new(SubprocessRunnerDummy));

        let wasm = WasmSpec {
            module: PathBuf::from("mod.wasm"),
            args: Vec::new(),
            env: TaskEnv::default(),
        };
        let spec = mk_spec(TaskKind::Wasm(wasm));

        let err = match router.build(&spec) {
            Err(e) => e,
            Ok(_) => panic!("expected RunnerError::NoRunner for wasm, got Ok(..)"),
        };

        match err {
            RunnerError::NoRunner(kind) => {
                assert_eq!(kind, "wasm", "expected NoRunner(\"wasm\")");
            }
            e => panic!("expected RunnerError::NoRunner for wasm, got {e:?}"),
        }
    }

    #[test]
    fn pick_respects_runner_selector() {
        // Both stubs support every spec, so only the labels can tell them apart.
        struct R1;
        struct R2;

        impl Runner for R1 {
            fn name(&self) -> &'static str {
                "r1"
            }

            fn supports(&self, _spec: &TaskSpec) -> bool {
                true
            }

            fn build_task(
                &self,
                _spec: &TaskSpec,
                _ctx: &BuildContext,
            ) -> Result<TaskRef, RunnerError> {
                Ok(noop_task("r1-task"))
            }
        }

        impl Runner for R2 {
            fn name(&self) -> &'static str {
                "r2"
            }

            fn supports(&self, _spec: &TaskSpec) -> bool {
                true
            }

            fn build_task(
                &self,
                _spec: &TaskSpec,
                _ctx: &BuildContext,
            ) -> Result<TaskRef, RunnerError> {
                Ok(noop_task("r2-task"))
            }
        }

        let mut router = RunnerRouter::new();
        router.register_with_labels(Arc::new(R1), runner_name_labels("runner-a"));
        router.register_with_labels(Arc::new(R2), runner_name_labels("runner-b"));

        // Select the second runner by label even though the first also supports the spec.
        let selector = RunnerSelector::from_labels(runner_name_labels("runner-b"));
        let spec = mk_spec(echo_kind("hi", Flag::enabled())).with_runner_selector(selector);

        let picked = router.pick(&spec).expect("runner should be picked");
        assert_eq!(picked.name(), "r2");
    }
}