1use std::sync::Arc;
11
12use solti_model::{Labels, TaskKind, TaskSpec};
13use taskvisor::TaskRef;
14use tracing::{debug, instrument, trace};
15
16use crate::context::BuildContext;
17use crate::error::RunnerError;
18use crate::runner::Runner;
19
20struct RunnerEntry {
22 runner: Arc<dyn Runner>,
24 labels: Labels,
26}
27
28#[derive(Default)]
45pub struct RunnerRouter {
46 runners: Vec<RunnerEntry>,
47 ctx: BuildContext,
48}
49
50impl RunnerRouter {
51 #[inline]
53 pub fn new() -> Self {
54 Self {
55 runners: Vec::new(),
56 ctx: BuildContext::default(),
57 }
58 }
59
60 #[inline]
64 pub fn with_context(mut self, ctx: BuildContext) -> Self {
65 self.ctx = ctx;
66 self
67 }
68
69 #[inline]
73 pub fn register(&mut self, runner: Arc<dyn Runner>) {
74 self.runners.push(RunnerEntry {
75 runner,
76 labels: Labels::default(),
77 });
78 }
79
80 #[inline]
84 pub fn register_with_labels(&mut self, runner: Arc<dyn Runner>, labels: Labels) {
85 self.runners.push(RunnerEntry { runner, labels });
86 }
87
88 pub fn pick(&self, spec: &TaskSpec) -> Option<&Arc<dyn Runner>> {
96 let selector = spec.runner_selector();
97
98 self.runners
99 .iter()
100 .find(|entry| {
101 entry.runner.supports(spec) && selector.is_none_or(|sel| sel.matches(&entry.labels))
102 })
103 .map(|entry| &entry.runner)
104 }
105
106 #[instrument(level = "debug", skip(self, spec), fields(kind = ?spec.kind()))]
111 pub fn build(&self, spec: &TaskSpec) -> Result<TaskRef, RunnerError> {
112 trace!(spec = ?spec, "router received spec");
113
114 if matches!(spec.kind(), TaskKind::Embedded) {
115 return Err(RunnerError::NoRunner(
116 "TaskKind::Embedded requires submit_with_task()".to_string(),
117 ));
118 }
119 let r = self
120 .pick(spec)
121 .ok_or_else(|| RunnerError::NoRunner(spec.kind().kind().to_string()))?;
122
123 let task = r.build_task(spec, &self.ctx)?;
124 debug!(runner = r.name(), "runner built task successfully");
125 Ok(task)
126 }
127
128 pub fn contains_label(&self, label_key: &str, label_value: &str) -> bool {
130 self.runners
131 .iter()
132 .any(|e| e.labels.get(label_key) == Some(label_value))
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139 use crate::RunnerError;
140
141 use solti_model::{
142 AdmissionPolicy, BackoffPolicy, Flag, JitterPolicy, Labels, RunnerSelector, SubprocessMode,
143 SubprocessSpec, TaskEnv, WasmSpec,
144 };
145 use std::path::PathBuf;
146 use taskvisor::{TaskError, TaskFn};
147 use tokio_util::sync::CancellationToken;
148
149 struct SubprocessRunnerDummy;
150
151 impl Runner for SubprocessRunnerDummy {
152 fn name(&self) -> &'static str {
153 "subprocess-only"
154 }
155
156 fn supports(&self, spec: &TaskSpec) -> bool {
157 matches!(spec.kind(), TaskKind::Subprocess(_))
158 }
159
160 fn build_task(
161 &self,
162 _spec: &TaskSpec,
163 _ctx: &BuildContext,
164 ) -> Result<TaskRef, RunnerError> {
165 let task = TaskFn::arc(
166 "test-subprocess-runner",
167 |_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
168 );
169 Ok(task)
170 }
171 }
172
173 fn mk_backoff() -> BackoffPolicy {
174 BackoffPolicy {
175 jitter: JitterPolicy::Equal,
176 first_ms: 1_000,
177 max_ms: 5_000,
178 factor: 2.0,
179 }
180 }
181
182 fn mk_spec(kind: TaskKind) -> TaskSpec {
183 TaskSpec::builder("test-slot", kind, 10_000_u64)
184 .backoff(mk_backoff())
185 .admission(AdmissionPolicy::DropIfRunning)
186 .build()
187 .expect("valid spec")
188 }
189
190 #[test]
191 fn build_fails_for_taskkind_embedded() {
192 let router = RunnerRouter::new();
193 let spec = mk_spec(TaskKind::Embedded);
194
195 let res = router.build(&spec);
196
197 match res {
198 Err(RunnerError::NoRunner(msg)) => {
199 assert!(
200 msg.contains("TaskKind::Embedded"),
201 "unexpected NoRunner message: {msg}"
202 );
203 }
204 Ok(_) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got Ok(..)"),
205 Err(e) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got {e:?}"),
206 }
207 }
208
209 #[test]
210 fn build_uses_registered_runner_for_subprocess() {
211 let mut router = RunnerRouter::new();
212 router.register(Arc::new(SubprocessRunnerDummy));
213
214 let spec = mk_spec(TaskKind::Subprocess(SubprocessSpec {
215 mode: SubprocessMode::Command {
216 command: "echo".to_string(),
217 args: vec!["hello".into()],
218 },
219 env: TaskEnv::default(),
220 cwd: None,
221 fail_on_non_zero: Flag::default(),
222 }));
223
224 let res = router.build(&spec);
225
226 match res {
227 Ok(_task) => {}
228 Err(e) => panic!("expected Ok(TaskRef) for subprocess, got error: {e:?}"),
229 }
230 }
231
232 #[test]
233 fn build_fails_when_no_runner_supports_kind() {
234 let mut router = RunnerRouter::new();
235 router.register(Arc::new(SubprocessRunnerDummy));
236
237 let spec = mk_spec(TaskKind::Wasm(WasmSpec {
238 module: PathBuf::from("mod.wasm"),
239 args: Vec::new(),
240 env: TaskEnv::default(),
241 }));
242
243 let res = router.build(&spec);
244
245 match res {
246 Err(RunnerError::NoRunner(kind)) => {
247 assert_eq!(kind, "wasm", "expected NoRunner(\"wasm\")");
248 }
249 Ok(_) => panic!("expected RunnerError::NoRunner for wasm, got Ok(..)"),
250 Err(e) => panic!("expected RunnerError::NoRunner for wasm, got {e:?}"),
251 }
252 }
253
254 #[test]
255 fn pick_respects_runner_selector() {
256 struct R1;
257 struct R2;
258
259 impl Runner for R1 {
260 fn name(&self) -> &'static str {
261 "r1"
262 }
263
264 fn supports(&self, _spec: &TaskSpec) -> bool {
265 true
266 }
267
268 fn build_task(
269 &self,
270 _spec: &TaskSpec,
271 _ctx: &BuildContext,
272 ) -> Result<TaskRef, RunnerError> {
273 Ok(TaskFn::arc(
274 "r1-task",
275 |_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
276 ))
277 }
278 }
279
280 impl Runner for R2 {
281 fn name(&self) -> &'static str {
282 "r2"
283 }
284
285 fn supports(&self, _spec: &TaskSpec) -> bool {
286 true
287 }
288
289 fn build_task(
290 &self,
291 _spec: &TaskSpec,
292 _ctx: &BuildContext,
293 ) -> Result<TaskRef, RunnerError> {
294 Ok(TaskFn::arc(
295 "r2-task",
296 |_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
297 ))
298 }
299 }
300
301 let mut labels_r1 = Labels::new();
302 labels_r1.insert("runner-name", "runner-a");
303 let mut labels_r2 = Labels::new();
304 labels_r2.insert("runner-name", "runner-b");
305
306 let mut router = RunnerRouter::new();
307 router.register_with_labels(Arc::new(R1), labels_r1);
308 router.register_with_labels(Arc::new(R2), labels_r2);
309
310 let spec = {
311 let base = mk_spec(TaskKind::Subprocess(SubprocessSpec {
312 mode: SubprocessMode::Command {
313 command: "echo".into(),
314 args: vec!["hi".into()],
315 },
316 env: TaskEnv::default(),
317 cwd: None,
318 fail_on_non_zero: Flag::enabled(),
319 }));
320 let mut match_labels = Labels::new();
321 match_labels.insert("runner-name", "runner-b");
322 base.with_runner_selector(RunnerSelector::from_labels(match_labels))
323 };
324
325 let picked = router.pick(&spec).expect("runner should be picked");
326 assert_eq!(picked.name(), "r2");
327 }
328}