use std::sync::Arc;
use solti_model::{Labels, TaskKind, TaskSpec};
use taskvisor::TaskRef;
use tracing::{debug, instrument, trace};
use crate::context::BuildContext;
use crate::error::RunnerError;
use crate::runner::Runner;
struct RunnerEntry {
runner: Arc<dyn Runner>,
labels: Labels,
}
#[derive(Default)]
pub struct RunnerRouter {
runners: Vec<RunnerEntry>,
ctx: BuildContext,
}
impl RunnerRouter {
#[inline]
pub fn new() -> Self {
Self {
runners: Vec::new(),
ctx: BuildContext::default(),
}
}
#[inline]
pub fn with_context(mut self, ctx: BuildContext) -> Self {
self.ctx = ctx;
self
}
#[inline]
pub fn register(&mut self, runner: Arc<dyn Runner>) {
self.runners.push(RunnerEntry {
runner,
labels: Labels::default(),
});
}
#[inline]
pub fn register_with_labels(&mut self, runner: Arc<dyn Runner>, labels: Labels) {
self.runners.push(RunnerEntry { runner, labels });
}
pub fn pick(&self, spec: &TaskSpec) -> Option<&Arc<dyn Runner>> {
let selector = spec.runner_selector();
self.runners
.iter()
.find(|entry| {
entry.runner.supports(spec) && selector.is_none_or(|sel| sel.matches(&entry.labels))
})
.map(|entry| &entry.runner)
}
#[instrument(level = "debug", skip(self, spec), fields(kind = ?spec.kind()))]
pub fn build(&self, spec: &TaskSpec) -> Result<TaskRef, RunnerError> {
trace!(spec = ?spec, "router received spec");
if matches!(spec.kind(), TaskKind::Embedded) {
return Err(RunnerError::NoRunner(
"TaskKind::Embedded requires submit_with_task()".to_string(),
));
}
let r = self
.pick(spec)
.ok_or_else(|| RunnerError::NoRunner(spec.kind().kind().to_string()))?;
let task = r.build_task(spec, &self.ctx)?;
debug!(runner = r.name(), "runner built task successfully");
Ok(task)
}
pub fn contains_label(&self, label_key: &str, label_value: &str) -> bool {
self.runners
.iter()
.any(|e| e.labels.get(label_key) == Some(label_value))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::RunnerError;
use solti_model::{
AdmissionPolicy, BackoffPolicy, Flag, JitterPolicy, Labels, RunnerSelector, SubprocessMode,
SubprocessSpec, TaskEnv, WasmSpec,
};
use std::path::PathBuf;
use taskvisor::{TaskError, TaskFn};
use tokio_util::sync::CancellationToken;
struct SubprocessRunnerDummy;
impl Runner for SubprocessRunnerDummy {
fn name(&self) -> &'static str {
"subprocess-only"
}
fn supports(&self, spec: &TaskSpec) -> bool {
matches!(spec.kind(), TaskKind::Subprocess(_))
}
fn build_task(
&self,
_spec: &TaskSpec,
_ctx: &BuildContext,
) -> Result<TaskRef, RunnerError> {
let task = TaskFn::arc(
"test-subprocess-runner",
|_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
);
Ok(task)
}
}
fn mk_backoff() -> BackoffPolicy {
BackoffPolicy {
jitter: JitterPolicy::Equal,
first_ms: 1_000,
max_ms: 5_000,
factor: 2.0,
}
}
fn mk_spec(kind: TaskKind) -> TaskSpec {
TaskSpec::builder("test-slot", kind, 10_000_u64)
.backoff(mk_backoff())
.admission(AdmissionPolicy::DropIfRunning)
.build()
.expect("valid spec")
}
#[test]
fn build_fails_for_taskkind_embedded() {
let router = RunnerRouter::new();
let spec = mk_spec(TaskKind::Embedded);
let res = router.build(&spec);
match res {
Err(RunnerError::NoRunner(msg)) => {
assert!(
msg.contains("TaskKind::Embedded"),
"unexpected NoRunner message: {msg}"
);
}
Ok(_) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got Ok(..)"),
Err(e) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got {e:?}"),
}
}
#[test]
fn build_uses_registered_runner_for_subprocess() {
let mut router = RunnerRouter::new();
router.register(Arc::new(SubprocessRunnerDummy));
let spec = mk_spec(TaskKind::Subprocess(SubprocessSpec {
mode: SubprocessMode::Command {
command: "echo".to_string(),
args: vec!["hello".into()],
},
env: TaskEnv::default(),
cwd: None,
fail_on_non_zero: Flag::default(),
}));
let res = router.build(&spec);
match res {
Ok(_task) => {}
Err(e) => panic!("expected Ok(TaskRef) for subprocess, got error: {e:?}"),
}
}
#[test]
fn build_fails_when_no_runner_supports_kind() {
let mut router = RunnerRouter::new();
router.register(Arc::new(SubprocessRunnerDummy));
let spec = mk_spec(TaskKind::Wasm(WasmSpec {
module: PathBuf::from("mod.wasm"),
args: Vec::new(),
env: TaskEnv::default(),
}));
let res = router.build(&spec);
match res {
Err(RunnerError::NoRunner(kind)) => {
assert_eq!(kind, "wasm", "expected NoRunner(\"wasm\")");
}
Ok(_) => panic!("expected RunnerError::NoRunner for wasm, got Ok(..)"),
Err(e) => panic!("expected RunnerError::NoRunner for wasm, got {e:?}"),
}
}
#[test]
fn pick_respects_runner_selector() {
struct R1;
struct R2;
impl Runner for R1 {
fn name(&self) -> &'static str {
"r1"
}
fn supports(&self, _spec: &TaskSpec) -> bool {
true
}
fn build_task(
&self,
_spec: &TaskSpec,
_ctx: &BuildContext,
) -> Result<TaskRef, RunnerError> {
Ok(TaskFn::arc(
"r1-task",
|_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
))
}
}
impl Runner for R2 {
fn name(&self) -> &'static str {
"r2"
}
fn supports(&self, _spec: &TaskSpec) -> bool {
true
}
fn build_task(
&self,
_spec: &TaskSpec,
_ctx: &BuildContext,
) -> Result<TaskRef, RunnerError> {
Ok(TaskFn::arc(
"r2-task",
|_ctx: CancellationToken| async move { Ok::<(), TaskError>(()) },
))
}
}
let mut labels_r1 = Labels::new();
labels_r1.insert("runner-name", "runner-a");
let mut labels_r2 = Labels::new();
labels_r2.insert("runner-name", "runner-b");
let mut router = RunnerRouter::new();
router.register_with_labels(Arc::new(R1), labels_r1);
router.register_with_labels(Arc::new(R2), labels_r2);
let spec = {
let base = mk_spec(TaskKind::Subprocess(SubprocessSpec {
mode: SubprocessMode::Command {
command: "echo".into(),
args: vec!["hi".into()],
},
env: TaskEnv::default(),
cwd: None,
fail_on_non_zero: Flag::enabled(),
}));
let mut match_labels = Labels::new();
match_labels.insert("runner-name", "runner-b");
base.with_runner_selector(RunnerSelector::from_labels(match_labels))
};
let picked = router.pick(&spec).expect("runner should be picked");
assert_eq!(picked.name(), "r2");
}
}