Skip to main content

studio_worker/engine/
multi.rs

//! Composite engine that delegates per (kind, model).
//!
//! The worker no longer has an `engine` knob in its config: instead,
//! `engine::build()` returns a `MultiEngine` populated with every
//! backend compiled into the binary (synthetic always; llama /
//! whisper / image-candle / video / tts when their cargo features
//! are on).
//!
//! For each incoming job [`MultiEngine`] picks the first engine in
//! the list that advertises support for the exact requested
//! (kind, model) pair.  There is no kind-only fallback: if no engine
//! claims the exact model, dispatch fails instead of routing to some
//! other engine that merely handles the task kind.  Real backends are
//! inserted ahead of synthetic so they win when both could serve the
//! same (kind, model).  When the studio supplies a `ModelSource`,
//! routing is strict on `ModelSource.engine` instead, and a backend
//! missing from this build is likewise an error rather than a silent
//! synthetic fallback.  Either way an unservable job fails with an
//! error naming the model (and the kind or the missing backend)
//! instead of producing placeholder output.
17use crate::engine::{Engine, EngineCapabilities};
18use crate::types::*;
19use anyhow::{bail, Result};
20use std::collections::BTreeMap;
21use tracing::{debug, warn};
22
23/// Tracing target for the multi engine.  Stable so operators can
24/// filter with `RUST_LOG=studio_worker::engine::multi=debug`.
25const TRACE_TARGET: &str = "studio_worker::engine::multi";
26
27pub struct MultiEngine {
28    engines: Vec<Box<dyn Engine>>,
29}
30
31impl MultiEngine {
32    pub fn new(engines: Vec<Box<dyn Engine>>) -> Self {
33        Self { engines }
34    }
35
36    /// Pick the engine that claims `(kind, model)` exactly.  No
37    /// kind-only fallback — the studio's `ModelSource` is
38    /// authoritative.  A model whose engine isn't on this worker is
39    /// rejected loudly so the operator sees what's missing instead of
40    /// silently routing through synthetic placeholder bytes.
41    fn pick_for(&self, kind: TaskKind, model: &str) -> Option<&dyn Engine> {
42        for e in &self.engines {
43            if e.capabilities().supports(kind, model) {
44                debug!(
45                    target: TRACE_TARGET,
46                    op = "pick",
47                    kind = kind.as_str(),
48                    model,
49                    sub_engine = e.name(),
50                    r#match = "exact",
51                    "engine selected"
52                );
53                return Some(e.as_ref());
54            }
55        }
56        warn!(
57            target: TRACE_TARGET,
58            op = "pick",
59            kind = kind.as_str(),
60            model,
61            "no engine claims this exact (kind, model) pair"
62        );
63        None
64    }
65}
66
67impl Engine for MultiEngine {
68    fn name(&self) -> &'static str {
69        "multi"
70    }
71
72    fn capabilities(&self) -> EngineCapabilities {
73        let mut map: BTreeMap<TaskKind, Vec<String>> = BTreeMap::new();
74        for e in &self.engines {
75            for (kind, models) in e.capabilities().supported_models_per_kind {
76                let entry = map.entry(kind).or_default();
77                for m in models {
78                    if !entry.contains(&m) {
79                        entry.push(m);
80                    }
81                }
82            }
83        }
84        EngineCapabilities {
85            supported_models_per_kind: map,
86        }
87    }
88
89    fn dispatch(&self, model: &str, task: Task) -> Result<TaskResult> {
90        let kind = task.kind();
91        let Some(engine) = self.pick_for(kind, model) else {
92            bail!(
93                "no engine on this worker can serve model {} (kind={}); \
94                 synthetic fallback is disabled",
95                model,
96                kind.as_str()
97            );
98        };
99        engine.dispatch(model, task)
100    }
101
102    fn dispatch_with_source(
103        &self,
104        model: &str,
105        task: Task,
106        source: &crate::types::ModelSource,
107    ) -> Result<TaskResult> {
108        let kind = task.kind();
109        // The studio knows exactly which engine should serve this
110        // job (source.engine); we route strictly to that backend.
111        // No silent fallback to synthetic for real-model offers —
112        // see DECISIONS.md "Synthetic fallback removed for real
113        // models".
114        let wanted = match source.engine {
115            crate::types::ModelEngine::SdCpp => "sdcpp",
116            crate::types::ModelEngine::LlamaCpp => "llama",
117            crate::types::ModelEngine::Onnx => "onnx",
118            crate::types::ModelEngine::Synthetic => "synthetic",
119        };
120        for e in &self.engines {
121            if e.name() == wanted {
122                debug!(
123                    target: TRACE_TARGET,
124                    op = "pick",
125                    kind = kind.as_str(),
126                    model,
127                    sub_engine = e.name(),
128                    r#match = "model-source",
129                    "engine selected by ModelSource.engine"
130                );
131                return e.dispatch_with_source(model, task, source);
132            }
133        }
134        warn!(
135            target: TRACE_TARGET,
136            op = "pick",
137            kind = kind.as_str(),
138            model,
139            sub_engine = wanted,
140            r#match = "model-source",
141            "requested engine not compiled into this worker"
142        );
143        bail!(
144            "no `{}` engine compiled into this worker (model `{}` requires it). \
145             Install the all-backends release build from \
146             https://github.com/webbertakken/studio-worker/releases/latest, \
147             or rebuild from source with `cargo install studio-worker --features all`.",
148            wanted,
149            model
150        );
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::SyntheticEngine;

    /// Minimal sub-engine used to observe routing decisions.  It
    /// advertises `models` for every kind in `kinds` and tags each
    /// result with its own `name`.
    struct StubEngine {
        name: &'static str,
        kinds: Vec<TaskKind>,
        models: Vec<String>,
    }

    impl Engine for StubEngine {
        fn name(&self) -> &'static str {
            self.name
        }
        fn capabilities(&self) -> EngineCapabilities {
            let mut map: BTreeMap<TaskKind, Vec<String>> = BTreeMap::new();
            for k in &self.kinds {
                map.insert(*k, self.models.clone());
            }
            EngineCapabilities {
                supported_models_per_kind: map,
            }
        }
        fn dispatch(&self, _model: &str, task: Task) -> Result<TaskResult> {
            // Return a sentinel result tagged with the engine name so we
            // can verify routing in tests.
            match task {
                Task::Image(_) => Ok(TaskResult::Image {
                    bytes: self.name.as_bytes().to_vec(),
                    ext: "test".into(),
                }),
                Task::Llm(_) => Ok(TaskResult::Llm {
                    json: serde_json::json!({ "from": self.name }),
                }),
                _ => bail!("stub doesn't serve this"),
            }
        }
    }

    /// Boxed stub named `name` that advertises `models` for one `kind`.
    fn stub(name: &'static str, kind: TaskKind, models: &[&str]) -> Box<dyn Engine> {
        Box::new(StubEngine {
            name,
            kinds: vec![kind],
            models: models.iter().map(|m| m.to_string()).collect(),
        })
    }

    /// Unwrap an image result into its bytes.  For stubs the bytes are
    /// the serving engine's name.
    fn image_bytes(result: TaskResult) -> Vec<u8> {
        match result {
            TaskResult::Image { bytes, .. } => bytes,
            _ => panic!("expected an image result"),
        }
    }

    fn image_task() -> Task {
        Task::Image(ImageParams {
            prompt: "x".into(),
            width: 64,
            height: 64,
            steps: 1,
            ext: "webp".into(),
            ..Default::default()
        })
    }

    fn llm_task() -> Task {
        Task::Llm(LlmParams {
            messages: vec![],
            max_tokens: 1,
            temperature: 0.0,
            ..Default::default()
        })
    }

    #[test]
    fn multi_picks_first_engine_supporting_the_kind_and_model() {
        let a: Box<dyn Engine> = Box::new(StubEngine {
            name: "a",
            kinds: vec![TaskKind::Image],
            models: vec!["alpha".into()],
        });
        let b: Box<dyn Engine> = Box::new(StubEngine {
            name: "b",
            kinds: vec![TaskKind::Image],
            models: vec!["beta".into()],
        });
        let multi = MultiEngine::new(vec![a, b]);

        let result = multi.dispatch("alpha", image_task()).unwrap();
        match result {
            TaskResult::Image { bytes, .. } => assert_eq!(bytes, b"a"),
            _ => panic!("expected image"),
        }
        let result = multi.dispatch("beta", image_task()).unwrap();
        match result {
            TaskResult::Image { bytes, .. } => assert_eq!(bytes, b"b"),
            _ => panic!("expected image"),
        }
    }

    /// Precedence: when two engines claim the very same model, the one
    /// listed earlier wins.  The disjoint-model test above never
    /// exercises this tie-break.
    #[test]
    fn multi_prefers_earlier_engine_when_several_claim_the_same_model() {
        let multi = MultiEngine::new(vec![
            stub("first", TaskKind::Image, &["shared"]),
            stub("second", TaskKind::Image, &["shared"]),
        ]);
        let bytes = image_bytes(multi.dispatch("shared", image_task()).unwrap());
        assert_eq!(bytes, b"first");
    }

    #[test]
    fn multi_refuses_unknown_model_without_kind_fallback() {
        // An LLM engine is present, but no engine claims the
        // specific model id.  Per the no-fallback policy the
        // dispatch errors loudly instead of routing to the first
        // engine that advertises the kind.
        let alpha_only: Box<dyn Engine> = Box::new(StubEngine {
            name: "alpha",
            kinds: vec![TaskKind::Image],
            models: vec!["alpha-image".into()],
        });
        let llm_only: Box<dyn Engine> = Box::new(StubEngine {
            name: "llm",
            kinds: vec![TaskKind::Llm],
            models: vec!["llama-some".into()],
        });
        let multi = MultiEngine::new(vec![alpha_only, llm_only]);

        let err = multi.dispatch("unknown-model", llm_task()).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("no engine on this worker can serve model"),
            "expected no-fallback error, got: {msg}"
        );
        assert!(msg.contains("unknown-model"));
    }

    #[test]
    fn multi_errors_when_no_engine_serves_kind() {
        let image_only: Box<dyn Engine> = Box::new(StubEngine {
            name: "image",
            kinds: vec![TaskKind::Image],
            models: vec!["x".into()],
        });
        let multi = MultiEngine::new(vec![image_only]);
        let err = multi.dispatch("x", llm_task()).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("no engine on this worker can serve model"),
            "expected no-fallback error, got: {msg}"
        );
    }

    /// An empty composite is allowed but can never serve anything.
    #[test]
    fn multi_with_no_engines_refuses_every_dispatch() {
        let multi = MultiEngine::new(vec![]);
        let msg = multi.dispatch("anything", image_task()).unwrap_err().to_string();
        assert!(
            msg.contains("no engine on this worker can serve model"),
            "expected no-fallback error, got: {msg}"
        );
    }

    #[test]
    fn capabilities_union_across_all_engines() {
        let img: Box<dyn Engine> = Box::new(SyntheticEngine::new());
        let stub: Box<dyn Engine> = Box::new(StubEngine {
            name: "extra",
            kinds: vec![TaskKind::Image],
            models: vec!["extra-image-model".into()],
        });
        let multi = MultiEngine::new(vec![img, stub]);
        let caps = multi.capabilities();
        let image = &caps.supported_models_per_kind[&TaskKind::Image];
        assert!(image.contains(&"synthetic".to_string()));
        assert!(image.contains(&"extra-image-model".to_string()));
    }

    /// A model advertised by several engines is listed once, and
    /// first-seen order is kept.
    #[test]
    fn capabilities_list_each_model_once_per_kind() {
        let multi = MultiEngine::new(vec![
            stub("a", TaskKind::Image, &["dup"]),
            stub("b", TaskKind::Image, &["dup", "solo"]),
        ]);
        let caps = multi.capabilities();
        let image = &caps.supported_models_per_kind[&TaskKind::Image];
        assert_eq!(image, &vec!["dup".to_string(), "solo".to_string()]);
    }

    #[test]
    fn name_is_multi() {
        let multi = MultiEngine::new(vec![]);
        assert_eq!(multi.name(), "multi");
    }

    /// Build a `ModelSource` for `engine` with throwaway CLI defaults.
    /// `dispatch_with_source` routes purely on `engine`, so the file
    /// roster + params are irrelevant to the routing tests.
    fn source_for(engine: crate::types::ModelEngine) -> crate::types::ModelSource {
        crate::types::ModelSource {
            engine,
            files: vec![],
            cli_defaults: crate::types::ModelCliDefaults {
                cfg_scale: 1.0,
                steps: 8,
                width: 1024,
                height: 1024,
                sampling_method: None,
                ..Default::default()
            },
        }
    }

    fn sd_cpp_source() -> crate::types::ModelSource {
        source_for(crate::types::ModelEngine::SdCpp)
    }

    /// The no-fallback policy: when the studio asks for an `sd-cpp`
    /// model but no sd-cpp engine is compiled in (e.g. CI / minimal
    /// build), dispatch errors loudly instead of silently routing the
    /// job to synthetic.
    #[test]
    fn dispatch_with_source_refuses_to_fall_back_to_synthetic_for_real_models() {
        let synth: Box<dyn Engine> = Box::new(SyntheticEngine::new());
        let multi = MultiEngine::new(vec![synth]);
        let source = sd_cpp_source();
        let err = multi
            .dispatch_with_source("some-real-flux-model", image_task(), &source)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("no `sdcpp` engine compiled"),
            "expected no-sdcpp-backend error, got: {err}"
        );
    }

    /// The no-match path of `dispatch_with_source` must emit a
    /// structured breadcrumb on the `studio_worker::engine::multi`
    /// target, symmetric with `pick_for`'s no-match `warn!`.  Without
    /// it, an operator filtering `RUST_LOG=studio_worker::engine::multi`
    /// to trace routing would see "engine selected" events but never
    /// the rejections, making a wrong-engine offer impossible to
    /// diagnose from the routing breadcrumbs alone.
    #[test]
    fn dispatch_with_source_warns_when_wanted_engine_missing() {
        let logs = crate::test_support::capture(|| {
            let synth: Box<dyn Engine> = Box::new(SyntheticEngine::new());
            let multi = MultiEngine::new(vec![synth]);
            let source = sd_cpp_source();
            let _ = multi.dispatch_with_source("some-real-flux-model", image_task(), &source);
        });
        assert!(logs.contains("WARN"), "expected WARN, got: {logs}");
        assert!(
            logs.contains("studio_worker::engine::multi"),
            "expected multi target, got: {logs}"
        );
        assert!(logs.contains("op=\"pick\""), "expected op field: {logs}");
        assert!(
            logs.contains("sdcpp"),
            "expected wanted engine name in breadcrumb: {logs}"
        );
        assert!(
            logs.contains("some-real-flux-model"),
            "expected model id in breadcrumb: {logs}"
        );
    }

    /// Synthetic offers (engine == Synthetic) still route to the
    /// synthetic engine.  This is *not* a fallback — the studio
    /// explicitly asked for it.
    #[test]
    fn dispatch_with_source_routes_synthetic_engine_for_synthetic_models() {
        let synth: Box<dyn Engine> = Box::new(SyntheticEngine::new());
        let multi = MultiEngine::new(vec![synth]);
        let source = source_for(crate::types::ModelEngine::Synthetic);
        let result = multi
            .dispatch_with_source("synthetic", image_task(), &source)
            .unwrap();
        assert!(matches!(result, TaskResult::Image { .. }));
    }

    /// `ModelSource.engine == SdCpp` must route to the engine named
    /// `sdcpp`.  Only the missing-backend path of this arm was covered
    /// before, so a typo'd engine string would have rejected every
    /// sd-cpp job.
    #[test]
    fn dispatch_with_source_routes_sdcpp_to_the_sdcpp_backend() {
        let multi = MultiEngine::new(vec![stub("sdcpp", TaskKind::Image, &[])]);
        let bytes = image_bytes(
            multi
                .dispatch_with_source("some-real-flux-model", image_task(), &sd_cpp_source())
                .unwrap(),
        );
        assert_eq!(bytes, b"sdcpp");
    }

    /// `dispatch_with_source` routes on the engine *name* alone.  An
    /// earlier engine that even claims the model id must be skipped
    /// when the source names a different backend.  Plain `dispatch`
    /// still picks that earlier engine.
    #[test]
    fn dispatch_with_source_routes_by_engine_name_not_by_capability_claims() {
        let multi = MultiEngine::new(vec![
            stub("llama", TaskKind::Image, &["flux"]),
            stub("sdcpp", TaskKind::Image, &[]),
        ]);
        let by_source = image_bytes(
            multi
                .dispatch_with_source("flux", image_task(), &sd_cpp_source())
                .unwrap(),
        );
        assert_eq!(by_source, b"sdcpp");

        let by_capability = image_bytes(multi.dispatch("flux", image_task()).unwrap());
        assert_eq!(by_capability, b"llama");
    }

    /// `ModelSource.engine == Onnx` must route strictly to the engine
    /// named `onnx` (the LaMa object-removal backend that serves
    /// Find-the-Differences removals).  This arm had no test, so a
    /// typo'd engine string or a dropped/reordered match arm would have
    /// shipped silently and mis-routed every removal job.
    #[test]
    fn dispatch_with_source_routes_onnx_to_the_onnx_backend() {
        let onnx: Box<dyn Engine> = Box::new(StubEngine {
            name: "onnx",
            kinds: vec![TaskKind::Image],
            models: vec![],
        });
        let multi = MultiEngine::new(vec![onnx]);
        let source = source_for(crate::types::ModelEngine::Onnx);
        let result = multi
            .dispatch_with_source("lama", image_task(), &source)
            .unwrap();
        match result {
            TaskResult::Image { bytes, .. } => assert_eq!(bytes, b"onnx"),
            _ => panic!("expected the image to route to the onnx stub"),
        }
    }

    /// `ModelSource.engine == LlamaCpp` must route strictly to the
    /// engine named `llama`.  Symmetric cover for the second
    /// previously-untested routing arm.
    #[test]
    fn dispatch_with_source_routes_llamacpp_to_the_llama_backend() {
        let llama: Box<dyn Engine> = Box::new(StubEngine {
            name: "llama",
            kinds: vec![TaskKind::Llm],
            models: vec![],
        });
        let multi = MultiEngine::new(vec![llama]);
        let source = source_for(crate::types::ModelEngine::LlamaCpp);
        let result = multi
            .dispatch_with_source("some-gguf", llm_task(), &source)
            .unwrap();
        match result {
            TaskResult::Llm { json } => assert_eq!(json["from"], "llama"),
            _ => panic!("expected the llm to route to the llama stub"),
        }
    }
}