solti-runner 0.0.1

Solti SDK runner plugin trait.
Documentation
//! # Runner router.
//!
//! [`RunnerRouter`] selects the first registered [`Runner`](crate::Runner) that:
//! 1. returns `true` from [`supports`](crate::Runner::supports) for the given spec, and
//! 2. satisfies the [`RunnerSelector`](solti_model::RunnerSelector) label constraints (if any).
//!
//! Runners are checked in registration order.
//!
//! See the [crate root](crate) for an architecture overview.
use std::sync::Arc;

use solti_model::{Labels, TaskKind, TaskSpec};
use taskvisor::TaskRef;
use tracing::{debug, instrument, trace};

use crate::context::BuildContext;
use crate::error::RunnerError;
use crate::runner::Runner;

/// Single runner entry with optional static labels used for routing.
///
/// Entries are created by [`RunnerRouter::register`] (with `Labels::default()`) and
/// [`RunnerRouter::register_with_labels`] (with caller-supplied labels), and are stored
/// in the order those calls were made.
struct RunnerEntry {
    /// Concrete runner implementation, shared behind an [`Arc`].
    runner: Arc<dyn Runner>,
    /// Static labels attached to this runner (e.g. capacity class, backend tag).
    ///
    /// [`RunnerRouter::pick`] passes these to the spec's runner selector (when one is set)
    /// to decide whether this entry is eligible.
    labels: Labels,
}

/// Router that selects an appropriate [`Runner`] for a given [`TaskSpec`].
///
/// Runners are checked in the order they were registered.
/// The first runner whose [`Runner::supports`] method returns `true` and satisfies
/// the [`TaskSpec::runner_selector`] (if any) is used to build the task.
///
/// ## Notes
///
/// - `TaskKind::Embedded` is not routable — use `SupervisorApi::submit_with_task` instead.
/// - Default [`BuildContext`] uses empty env and [`NoOpMetrics`](crate::NoOpMetrics).
///
/// ## See also
///
/// - [`Runner`] — trait that concrete executors implement.
/// - [`BuildContext`] — shared dependencies for all runners.
/// - [`RunnerError::NoRunner`](crate::RunnerError::NoRunner) — returned when no runner matches.
#[derive(Default)]
pub struct RunnerRouter {
    /// Registered runners; `register` / `register_with_labels` append to the end,
    /// so iteration order is registration order.
    runners: Vec<RunnerEntry>,
    /// Build context handed to [`Runner::build_task`] for every task built through this router.
    ctx: BuildContext,
}

impl RunnerRouter {
    /// Construct a router with no registered runners and a default [`BuildContext`].
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Replace the router's build context, builder-style.
    ///
    /// The given context is what [`build`](Self::build) passes to
    /// [`Runner::build_task`], so this is the place to inject shared dependencies
    /// (config, observability, global handles, etc.).
    #[inline]
    pub fn with_context(self, ctx: BuildContext) -> Self {
        Self { ctx, ..self }
    }

    /// Append a runner that carries no labels (it gets `Labels::default()`).
    ///
    /// Registration order matters: [`pick`](Self::pick) returns the first entry that
    /// reports `supports(spec) == true` and matches the spec's selector, if any.
    #[inline]
    pub fn register(&mut self, runner: Arc<dyn Runner>) {
        self.register_with_labels(runner, Labels::default());
    }

    /// Append a runner together with its static labels.
    ///
    /// The labels are consulted only when [`TaskSpec::runner_selector`] is set on the
    /// spec being routed; they narrow the set of candidate runners.
    #[inline]
    pub fn register_with_labels(&mut self, runner: Arc<dyn Runner>, labels: Labels) {
        let entry = RunnerEntry { runner, labels };
        self.runners.push(entry);
    }

    /// Find the first registered runner that is eligible for `spec`.
    ///
    /// An entry is eligible when both hold:
    /// - `Runner::supports(spec)` returns `true`;
    /// - `spec.runner_selector()` is unset, or the selector matches the entry's labels.
    ///
    /// Entries are examined in registration order. Returns `None` when nothing is eligible.
    pub fn pick(&self, spec: &TaskSpec) -> Option<&Arc<dyn Runner>> {
        let selector = spec.runner_selector();

        self.runners
            .iter()
            // Capability check runs first; the selector is only evaluated for supporting runners.
            .filter(|entry| entry.runner.supports(spec))
            .find(|entry| selector.is_none_or(|sel| sel.matches(&entry.labels)))
            .map(|entry| &entry.runner)
    }

    /// Route `spec` to a runner and let that runner build the [`TaskRef`].
    ///
    /// # Errors
    ///
    /// - [`RunnerError::NoRunner`] if the spec is `TaskKind::Embedded` (not routable; such
    ///   specs must go through `SupervisorApi::submit_with_task`).
    /// - [`RunnerError::NoRunner`] carrying the kind name if [`pick`](Self::pick) finds no runner.
    /// - Whatever error the selected runner's `build_task` returns.
    #[instrument(level = "debug", skip(self, spec), fields(kind = ?spec.kind()))]
    pub fn build(&self, spec: &TaskSpec) -> Result<TaskRef, RunnerError> {
        trace!(spec = ?spec, "router received spec");

        // Embedded specs are rejected before any runner is consulted.
        if let TaskKind::Embedded = spec.kind() {
            return Err(RunnerError::NoRunner(String::from(
                "TaskKind::Embedded requires submit_with_task()",
            )));
        }

        let Some(selected) = self.pick(spec) else {
            return Err(RunnerError::NoRunner(spec.kind().kind().to_string()));
        };

        let built = selected.build_task(spec, &self.ctx)?;
        debug!(runner = selected.name(), "runner built task successfully");
        Ok(built)
    }

    /// Report whether any registered runner has `label_key` set to exactly `label_value`.
    pub fn contains_label(&self, label_key: &str, label_value: &str) -> bool {
        let wanted = Some(label_value);
        self.runners
            .iter()
            .any(|entry| entry.labels.get(label_key) == wanted)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::RunnerError;

    use solti_model::{
        AdmissionPolicy, BackoffPolicy, Flag, JitterPolicy, Labels, RunnerSelector, SubprocessMode,
        SubprocessSpec, TaskEnv, WasmSpec,
    };
    use std::path::PathBuf;
    use taskvisor::{TaskError, TaskFn};
    use tokio_util::sync::CancellationToken;

    /// Task that completes immediately with `Ok(())`; only its name varies between tests.
    fn noop_task(task_name: &'static str) -> TaskRef {
        TaskFn::arc(task_name, |_cancel: CancellationToken| async move {
            Ok::<(), TaskError>(())
        })
    }

    /// Test runner that accepts `TaskKind::Subprocess` specs and nothing else.
    struct SubprocessRunnerDummy;

    impl Runner for SubprocessRunnerDummy {
        fn name(&self) -> &'static str {
            "subprocess-only"
        }

        fn supports(&self, spec: &TaskSpec) -> bool {
            matches!(spec.kind(), TaskKind::Subprocess(_))
        }

        fn build_task(
            &self,
            _spec: &TaskSpec,
            _ctx: &BuildContext,
        ) -> Result<TaskRef, RunnerError> {
            Ok(noop_task("test-subprocess-runner"))
        }
    }

    /// Test runner that accepts every spec, so only labels can tell instances apart.
    struct CatchAllRunner {
        name: &'static str,
        task_name: &'static str,
    }

    impl Runner for CatchAllRunner {
        fn name(&self) -> &'static str {
            self.name
        }

        fn supports(&self, _spec: &TaskSpec) -> bool {
            true
        }

        fn build_task(
            &self,
            _spec: &TaskSpec,
            _ctx: &BuildContext,
        ) -> Result<TaskRef, RunnerError> {
            Ok(noop_task(self.task_name))
        }
    }

    /// Backoff policy shared by every spec in this module.
    fn mk_backoff() -> BackoffPolicy {
        BackoffPolicy {
            first_ms: 1_000,
            max_ms: 5_000,
            factor: 2.0,
            jitter: JitterPolicy::Equal,
        }
    }

    /// Wraps `kind` in an otherwise fixed, valid `TaskSpec`.
    fn mk_spec(kind: TaskKind) -> TaskSpec {
        let built = TaskSpec::builder("test-slot", kind, 10_000_u64)
            .backoff(mk_backoff())
            .admission(AdmissionPolicy::DropIfRunning)
            .build();
        built.expect("valid spec")
    }

    /// Subprocess spec running `echo <arg>` with the given non-zero-exit flag.
    fn echo_spec(arg: &'static str, fail_on_non_zero: Flag) -> TaskSpec {
        let subprocess = SubprocessSpec {
            mode: SubprocessMode::Command {
                command: String::from("echo"),
                args: vec![arg.into()],
            },
            env: TaskEnv::default(),
            cwd: None,
            fail_on_non_zero,
        };
        mk_spec(TaskKind::Subprocess(subprocess))
    }

    /// Label set holding the single pair `runner-name = value`.
    fn runner_name_labels(value: &'static str) -> Labels {
        let mut labels = Labels::new();
        labels.insert("runner-name", value);
        labels
    }

    #[test]
    fn build_fails_for_taskkind_embedded() {
        let router = RunnerRouter::new();
        let spec = mk_spec(TaskKind::Embedded);

        match router.build(&spec) {
            Ok(_) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got Ok(..)"),
            Err(RunnerError::NoRunner(msg)) => {
                assert!(
                    msg.contains("TaskKind::Embedded"),
                    "unexpected NoRunner message: {msg}"
                );
            }
            Err(e) => panic!("expected RunnerError::NoRunner for TaskKind::Embedded, got {e:?}"),
        }
    }

    #[test]
    fn build_uses_registered_runner_for_subprocess() {
        let mut router = RunnerRouter::new();
        router.register(Arc::new(SubprocessRunnerDummy));

        let spec = echo_spec("hello", Flag::default());

        if let Err(e) = router.build(&spec) {
            panic!("expected Ok(TaskRef) for subprocess, got error: {e:?}");
        }
    }

    #[test]
    fn build_fails_when_no_runner_supports_kind() {
        let mut router = RunnerRouter::new();
        router.register(Arc::new(SubprocessRunnerDummy));

        let wasm = WasmSpec {
            module: PathBuf::from("mod.wasm"),
            args: Vec::new(),
            env: TaskEnv::default(),
        };
        let spec = mk_spec(TaskKind::Wasm(wasm));

        match router.build(&spec) {
            Ok(_) => panic!("expected RunnerError::NoRunner for wasm, got Ok(..)"),
            Err(RunnerError::NoRunner(kind)) => {
                assert_eq!(kind, "wasm", "expected NoRunner(\"wasm\")");
            }
            Err(e) => panic!("expected RunnerError::NoRunner for wasm, got {e:?}"),
        }
    }

    #[test]
    fn pick_respects_runner_selector() {
        // Both runners support every spec, so only the label selector can pick between them.
        let first = CatchAllRunner {
            name: "r1",
            task_name: "r1-task",
        };
        let second = CatchAllRunner {
            name: "r2",
            task_name: "r2-task",
        };

        let mut router = RunnerRouter::new();
        router.register_with_labels(Arc::new(first), runner_name_labels("runner-a"));
        router.register_with_labels(Arc::new(second), runner_name_labels("runner-b"));

        let selector = RunnerSelector::from_labels(runner_name_labels("runner-b"));
        let spec = echo_spec("hi", Flag::enabled()).with_runner_selector(selector);

        let picked = router.pick(&spec).expect("runner should be picked");
        assert_eq!(picked.name(), "r2");
    }
}