Skip to main content

model_runtime/
jobs.rs

1use std::collections::BTreeMap;
2#[cfg(feature = "jobs")]
3use std::path::Path;
4use std::path::PathBuf;
5#[cfg(feature = "jobs")]
6use std::process::Command;
7#[cfg(feature = "jobs")]
8use std::sync::{Arc, Mutex};
9
10use jobs_core::{ArtifactKind, ArtifactRef, JobSpec};
11#[cfg(feature = "jobs")]
12use jobs_core::{BackgroundJobRunner, JobArtifact, JobError, JobProgress};
13use runtime_core::{
14    Diagnostic, OperationId, RuntimeRequirement, SurfaceArtifactExpectation, SurfaceExecutionMode,
15    SurfaceExecutionPlan, SurfaceSideEffect,
16};
17use serde::{Deserialize, Serialize};
18
19#[cfg(feature = "jobs")]
20use crate::{ModelBundle, ModelBundleStore};
21use crate::{ModelFileRequest, ModelRuntimeBackend, ModelSource, ModelSpec};
22
/// Long-running model operation kind.
///
/// Each variant maps to a stable job kind string via [`ModelJobKind::as_str`]
/// (for example `Download` -> `"model-download"`). Note that serde uses the default
/// unit-variant names here (e.g. `"Download"`), because the enum has no `rename_all`
/// attribute, so the serialized form differs from the `as_str` strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ModelJobKind {
    /// Download remote model files.
    Download,
    /// Materialize a local model bundle.
    MaterializeBundle,
    /// Validate an existing model bundle.
    ValidateBundle,
    /// Warm a runtime/session.
    Warmup,
    /// Run one inference request.
    Inference,
    /// Run batch inference.
    BatchInference,
    /// Run an external model command.
    ExternalCommand,
}
41
/// Generic request for long-running or side-effectful model access.
///
/// Serialized with camelCase field names (e.g. `outputArtifactPrefix`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessJobRequest {
    /// Optional caller-supplied job id. When `None`, the job helpers in this module derive
    /// `<kind>-<safe model name>-<revision or "local">`.
    pub id: Option<String>,
    /// The model operation kind.
    pub kind: ModelJobKind,
    /// Model spec or preset-derived spec.
    pub spec: ModelSpec,
    /// Runtime backend that will handle the request.
    pub backend: ModelRuntimeBackend,
    /// Input values or artifacts for the job (empty when absent from the serialized form).
    #[serde(default)]
    pub inputs: Vec<ModelJobInput>,
    /// Optional artifact id prefix for outputs. Artifact ids become `<prefix>-<suffix>`
    /// (e.g. `<prefix>-manifest`), or just the suffix when this is unset.
    pub output_artifact_prefix: Option<String>,
    /// Caller-defined metadata, copied onto the job spec (empty when absent from the
    /// serialized form).
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
63
/// Input accepted by generic model jobs.
///
/// Serialized adjacently tagged with camelCase variant names, e.g.
/// `{"kind": "localPath", "value": "/path/to/file"}` or `{"kind": "json", "value": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "kind", content = "value")]
pub enum ModelJobInput {
    /// JSON request payload.
    Json(serde_json::Value),
    /// Artifact produced by a previous job.
    Artifact(ArtifactRef),
    /// Local path supplied by an explicit CLI/server/job route.
    LocalPath(PathBuf),
}
75
/// Generic result returned by model access jobs.
///
/// Serialized with camelCase field names. The kind, spec and backend are echoed from the
/// originating [`ModelAccessJobRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessJobResult {
    /// Stable job identifier.
    pub job_id: jobs_core::JobId,
    /// Model operation kind.
    pub kind: ModelJobKind,
    /// Model spec used by the job.
    pub spec: ModelSpec,
    /// Runtime backend used by the job.
    pub backend: ModelRuntimeBackend,
    /// Artifacts produced by the job.
    pub artifacts: Vec<ArtifactRef>,
    /// Diagnostics emitted by the job.
    pub diagnostics: Vec<Diagnostic>,
    /// Optional structured output; its shape is specific to each job kind.
    pub output: Option<serde_json::Value>,
}
95
/// Pure plan for a model bundle layout before files are materialized.
///
/// As built by [`plan_model_bundle`], every path is relative and rooted at
/// `<safe model name>/<revision>` (revision defaults to `main`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ModelBundlePlan {
    /// The (validated) model spec this plan was derived from.
    pub spec: ModelSpec,
    /// Relative path of the bundle manifest: `<root>/manifest.json`.
    pub manifest_path: String,
    /// Relative directory holding the bundle files: `<root>/files`.
    pub files_directory: String,
    /// Resolved bundle files, in the order they were requested by the spec.
    pub files: Vec<ModelBundlePlanFile>,
    /// One planned artifact per entry in `files`, in the same order.
    pub artifact_refs: Vec<ArtifactRef>,
    /// `true` when at least one required file is not present locally.
    pub downloads_required: bool,
}
107
/// One planned model bundle file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ModelBundlePlanFile {
    /// File path as requested in the model spec.
    pub remote_path: String,
    /// Path relative to the bundle root: `files/<remote_path>`.
    pub local_path: String,
    /// Whether `remote_path` appeared in the caller's list of local files.
    pub present_locally: bool,
    /// `false` only for `ModelFileRequest::Optional` requests.
    pub required: bool,
    /// Media type guessed from the file extension (`.json`, `.txt`/`.md`, else binary).
    pub media_type: String,
}
118
/// Pure access plan for a model job request.
///
/// Produced by [`plan_model_access`] without spawning work or invoking model runtimes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessPlan {
    /// Job spec the request would run under (id, kind string, model metadata).
    pub job_spec: jobs_core::JobSpec,
    /// Model operation kind.
    pub kind: ModelJobKind,
    /// Runtime backend that would handle the request.
    pub backend: ModelRuntimeBackend,
    /// Surface-level description of mode, side effects, progress and requirements.
    pub execution_plan: SurfaceExecutionPlan,
    /// Artifacts the job is expected to produce.
    pub expected_artifacts: Vec<ArtifactRef>,
}
129
130impl ModelJobKind {
131    /// Returns the stable job kind string.
132    pub fn as_str(self) -> &'static str {
133        match self {
134            Self::Download => "model-download",
135            Self::MaterializeBundle => "model-materialize-bundle",
136            Self::ValidateBundle => "model-validate-bundle",
137            Self::Warmup => "model-warmup",
138            Self::Inference => "model-inference",
139            Self::BatchInference => "model-batch-inference",
140            Self::ExternalCommand => "model-external-command",
141        }
142    }
143}
144
145/// Builds a standard model job spec with model/runtime metadata.
146pub fn model_job_spec(
147    id: impl Into<String>,
148    kind: ModelJobKind,
149    spec: &ModelSpec,
150    backend: ModelRuntimeBackend,
151) -> jobs_core::Result<JobSpec> {
152    let mut job = JobSpec::new(id, format!("{} {}", kind.as_str(), spec.name))?
153        .with_kind(kind.as_str())?
154        .with_metadata("model.name", spec.name.clone())?
155        .with_metadata("model.task", spec.task.as_protocol_str().to_string())?
156        .with_metadata("model.source", spec.source.kind().to_string())?
157        .with_metadata("model.runtime", backend.as_str().to_string())?;
158    if let Some(revision) = spec.revision_value() {
159        job = job.with_metadata("model.revision", revision.to_string())?;
160    }
161    if let Some(repo_id) = spec.repo_id_value() {
162        job = job.with_metadata("model.repoId", repo_id.to_string())?;
163    }
164    Ok(job)
165}
166
167/// Plans a model bundle without downloading or materializing files.
168pub fn plan_model_bundle(
169    spec: &ModelSpec,
170    local_files: &[String],
171) -> crate::Result<ModelBundlePlan> {
172    validate_model_spec(spec)?;
173    let safe_name = spec.safe_name();
174    let revision = spec.revision_value().unwrap_or("main");
175    let bundle_root = format!("{safe_name}/{revision}");
176    let files = resolve_requested_files(&spec.files, local_files);
177    let artifact_refs = files
178        .iter()
179        .map(|file| {
180            let mut artifact = ArtifactRef::new(
181                format!("model:{}", file.remote_path.replace(['/', '\\'], "_")),
182                model_file_kind(&file.remote_path),
183                file.media_type.clone(),
184                format!("{bundle_root}/{}", file.local_path),
185            );
186            artifact.metadata = model_metadata(spec, ModelRuntimeBackend::External);
187            artifact.metadata.insert(
188                "model.fileRole".to_string(),
189                model_file_role(&file.remote_path).to_string(),
190            );
191            artifact
192        })
193        .collect::<Vec<_>>();
194    let downloads_required = files
195        .iter()
196        .any(|file| file.required && !file.present_locally);
197    Ok(ModelBundlePlan {
198        spec: spec.clone(),
199        manifest_path: format!("{bundle_root}/manifest.json"),
200        files_directory: format!("{bundle_root}/files"),
201        files,
202        artifact_refs,
203        downloads_required,
204    })
205}
206
207/// Plans model access as a job without spawning work or invoking model runtimes.
208pub fn plan_model_access(request: &ModelAccessJobRequest) -> jobs_core::Result<ModelAccessPlan> {
209    validate_model_spec_for_job(&request.spec)?;
210    let job_spec = job_spec_for_request(request)?;
211    let bundle_plan = plan_model_bundle(&request.spec, &[])
212        .map_err(|error| jobs_core::JobError::InvalidArgument(error.to_string()))?;
213    let expected_artifacts = expected_artifacts_for_request(request, &bundle_plan);
214    let execution_plan = SurfaceExecutionPlan {
215        operation: OperationId::new("model.executionPlan"),
216        mode: execution_mode_for_request(request),
217        side_effects: side_effects_for_request(request),
218        cancellable: true,
219        progress_unit: Some(progress_unit_for_kind(request.kind).to_string()),
220        expected_artifacts: expected_artifacts
221            .iter()
222            .map(|artifact| SurfaceArtifactExpectation {
223                id: artifact.id.as_str().to_string(),
224                kind: artifact_kind_name(&artifact.kind),
225                media_type: artifact.media_type.clone(),
226                required: true,
227                description: Some(format!("Expected {} artifact", artifact.id.as_str())),
228            })
229            .collect(),
230        requirements: runtime_requirements_for_request(request),
231        max_recommended_input_bytes: Some(1_048_576),
232    };
233    Ok(ModelAccessPlan {
234        job_spec,
235        kind: request.kind,
236        backend: request.backend.clone(),
237        execution_plan,
238        expected_artifacts,
239    })
240}
241
242fn validate_model_spec(spec: &ModelSpec) -> crate::Result<()> {
243    if spec.name.trim().is_empty() {
244        return Err(crate::ModelRuntimeError::InvalidArgument(
245            "model name must not be empty".to_string(),
246        ));
247    }
248    for file in &spec.files {
249        match file {
250            ModelFileRequest::Required(path) | ModelFileRequest::Optional(path) => {
251                validate_remote_file_path(path)?;
252            }
253            ModelFileRequest::FirstAvailable(paths) => {
254                if paths.is_empty() {
255                    return Err(crate::ModelRuntimeError::InvalidArgument(
256                        "first_available model file requests must include at least one path"
257                            .to_string(),
258                    ));
259                }
260                for path in paths {
261                    validate_remote_file_path(path)?;
262                }
263            }
264        }
265    }
266    Ok(())
267}
268
269fn validate_model_spec_for_job(spec: &ModelSpec) -> jobs_core::Result<()> {
270    validate_model_spec(spec)
271        .map_err(|error| jobs_core::JobError::InvalidArgument(error.to_string()))
272}
273
274fn validate_remote_file_path(path: &str) -> crate::Result<()> {
275    if path.trim().is_empty() || path.starts_with('/') || path.contains("..") {
276        return Err(crate::ModelRuntimeError::InvalidArgument(format!(
277            "model file path `{path}` must be a relative file path"
278        )));
279    }
280    Ok(())
281}
282
283fn resolve_requested_files(
284    files: &[ModelFileRequest],
285    local_files: &[String],
286) -> Vec<ModelBundlePlanFile> {
287    files
288        .iter()
289        .filter_map(|request| match request {
290            ModelFileRequest::Required(path) => Some((path.clone(), true)),
291            ModelFileRequest::Optional(path) => Some((path.clone(), false)),
292            ModelFileRequest::FirstAvailable(paths) => paths
293                .iter()
294                .find(|path| local_files.iter().any(|local| local == *path))
295                .or_else(|| paths.first())
296                .map(|path| (path.clone(), true)),
297        })
298        .map(|(remote_path, required)| ModelBundlePlanFile {
299            local_path: format!("files/{remote_path}"),
300            present_locally: local_files.iter().any(|local| local == &remote_path),
301            media_type: model_file_media_type(&remote_path).to_string(),
302            remote_path,
303            required,
304        })
305        .collect()
306}
307
308fn expected_artifacts_for_request(
309    request: &ModelAccessJobRequest,
310    bundle_plan: &ModelBundlePlan,
311) -> Vec<ArtifactRef> {
312    match request.kind {
313        ModelJobKind::Download | ModelJobKind::MaterializeBundle => {
314            let mut artifacts = bundle_plan.artifact_refs.clone();
315            artifacts.push(model_manifest_artifact(request, bundle_plan));
316            artifacts
317        }
318        ModelJobKind::ValidateBundle => vec![model_manifest_artifact(request, bundle_plan)],
319        ModelJobKind::Warmup => Vec::new(),
320        ModelJobKind::Inference | ModelJobKind::BatchInference | ModelJobKind::ExternalCommand => {
321            vec![planned_output_artifact(request)]
322        }
323    }
324}
325
326fn model_manifest_artifact(
327    request: &ModelAccessJobRequest,
328    bundle_plan: &ModelBundlePlan,
329) -> ArtifactRef {
330    let mut artifact = ArtifactRef::new(
331        artifact_id(request, "manifest"),
332        ArtifactKind::Json,
333        "application/json",
334        bundle_plan.manifest_path.clone(),
335    );
336    artifact.metadata = model_metadata(&request.spec, request.backend.clone());
337    artifact
338}
339
340fn planned_output_artifact(request: &ModelAccessJobRequest) -> ArtifactRef {
341    let mut artifact = ArtifactRef::new(
342        artifact_id(request, "output"),
343        ArtifactKind::Json,
344        "application/json",
345        format!("memory://{}/output.json", default_job_id(request)),
346    );
347    artifact.metadata = model_metadata(&request.spec, request.backend.clone());
348    artifact
349}
350
351fn execution_mode_for_request(request: &ModelAccessJobRequest) -> SurfaceExecutionMode {
352    if request.kind == ModelJobKind::ExternalCommand
353        || matches!(request.backend, ModelRuntimeBackend::External)
354        || matches!(request.spec.source, ModelSource::ExternalCommand { .. })
355    {
356        SurfaceExecutionMode::ExternalCommand
357    } else {
358        SurfaceExecutionMode::PlannedJob
359    }
360}
361
362fn side_effects_for_request(request: &ModelAccessJobRequest) -> Vec<SurfaceSideEffect> {
363    match request.kind {
364        ModelJobKind::Download => vec![SurfaceSideEffect::Network, SurfaceSideEffect::WritesFiles],
365        ModelJobKind::MaterializeBundle => vec![
366            SurfaceSideEffect::ReadsFiles,
367            SurfaceSideEffect::WritesFiles,
368        ],
369        ModelJobKind::ValidateBundle => vec![SurfaceSideEffect::ReadsFiles],
370        ModelJobKind::ExternalCommand => vec![SurfaceSideEffect::ExternalProcess],
371        ModelJobKind::Warmup | ModelJobKind::Inference | ModelJobKind::BatchInference => {
372            vec![SurfaceSideEffect::None]
373        }
374    }
375}
376
377fn progress_unit_for_kind(kind: ModelJobKind) -> &'static str {
378    match kind {
379        ModelJobKind::BatchInference => "inputs",
380        ModelJobKind::Download | ModelJobKind::MaterializeBundle | ModelJobKind::ValidateBundle => {
381            "files"
382        }
383        ModelJobKind::Warmup | ModelJobKind::Inference | ModelJobKind::ExternalCommand => "steps",
384    }
385}
386
387fn runtime_requirements_for_request(request: &ModelAccessJobRequest) -> Vec<RuntimeRequirement> {
388    let mut requirements = Vec::new();
389    if request.kind == ModelJobKind::Download {
390        requirements.push(RuntimeRequirement {
391            name: "network".to_string(),
392            description: Some("Model file download requires network access".to_string()),
393            required: true,
394        });
395    }
396    if matches!(request.kind, ModelJobKind::ExternalCommand)
397        || matches!(request.backend, ModelRuntimeBackend::External)
398    {
399        requirements.push(RuntimeRequirement {
400            name: "external-command".to_string(),
401            description: Some("Execution requires a caller-provided command".to_string()),
402            required: true,
403        });
404    }
405    requirements
406}
407
408fn model_file_kind(remote_path: &str) -> ArtifactKind {
409    match model_file_role(remote_path) {
410        "config" | "tokenizer" => ArtifactKind::Json,
411        "vocabulary" => ArtifactKind::Text,
412        _ => ArtifactKind::Binary,
413    }
414}
415
/// Guesses a media type from a file path's (case-sensitive) extension.
///
/// `.json` maps to `application/json`, and `.txt`/`.md` map to `text/plain`. Anything
/// else is treated as opaque bytes.
fn model_file_media_type(remote_path: &str) -> &'static str {
    const TEXT_SUFFIXES: [&str; 2] = [".txt", ".md"];
    match remote_path {
        path if path.ends_with(".json") => "application/json",
        path if TEXT_SUFFIXES.iter().any(|suffix| path.ends_with(*suffix)) => "text/plain",
        _ => "application/octet-stream",
    }
}
425
/// Classifies a model file by its file name (the last path component).
///
/// Both `/` and `\` are treated as path separators, consistent with how this module
/// sanitizes remote paths into artifact ids. Roles are checked in this order:
/// * `config` for `config.json`;
/// * `tokenizer` when the name contains `tokenizer`;
/// * `vocabulary` for `vocab.txt` and `merges.txt`;
/// * `weights` for `.onnx`, `.safetensors`, `.bin`, `.pt`, `.pth` and `.gguf`;
/// * `artifact` otherwise.
fn model_file_role(remote_path: &str) -> &'static str {
    const WEIGHT_SUFFIXES: [&str; 6] = [".onnx", ".safetensors", ".bin", ".pt", ".pth", ".gguf"];
    // `rsplit` always yields at least one item, so the fallback is defensive only.
    let file_name = remote_path
        .rsplit(['/', '\\'])
        .next()
        .unwrap_or(remote_path);
    if file_name == "config.json" {
        "config"
    } else if file_name.contains("tokenizer") {
        "tokenizer"
    } else if matches!(file_name, "vocab.txt" | "merges.txt") {
        "vocabulary"
    } else if WEIGHT_SUFFIXES
        .iter()
        .any(|suffix| file_name.ends_with(*suffix))
    {
        "weights"
    } else {
        "artifact"
    }
}
444
445fn artifact_kind_name(kind: &ArtifactKind) -> String {
446    match kind {
447        ArtifactKind::File => "file",
448        ArtifactKind::Directory => "directory",
449        ArtifactKind::Image => "image",
450        ArtifactKind::Audio => "audio",
451        ArtifactKind::Video => "video",
452        ArtifactKind::Text => "text",
453        ArtifactKind::Json => "json",
454        ArtifactKind::Log => "log",
455        ArtifactKind::Archive => "archive",
456        ArtifactKind::Binary => "binary",
457        ArtifactKind::Other(value) => value.as_str(),
458    }
459    .to_string()
460}
461
/// Join handle for model jobs that produce a typed value.
///
/// Wraps the underlying `jobs_core::JobJoinHandle` together with a shared slot that the
/// job body fills with its result. [`ModelJobJoinHandle::join_result`] joins the job and
/// then reads that slot.
#[cfg(feature = "jobs")]
#[derive(Debug)]
pub struct ModelJobJoinHandle<T> {
    /// Handle to the spawned background job.
    inner: jobs_core::JobJoinHandle,
    /// Result slot written by the job closure once it has produced a value.
    value: Arc<Mutex<Option<T>>>,
}
469
#[cfg(feature = "jobs")]
impl<T: Clone> ModelJobJoinHandle<T> {
    /// Borrows the tracked job behind this handle.
    pub fn job(&self) -> &jobs_core::JobHandle {
        self.inner.job()
    }

    /// Borrows the identifier of the tracked job.
    pub fn id(&self) -> &jobs_core::JobId {
        self.inner.id()
    }

    /// Forwards a cancellation request to the underlying job.
    pub fn request_cancel(&self) -> jobs_core::Result<()> {
        self.inner.request_cancel()
    }

    /// Waits for the job to finish, then returns a clone of the value it stored.
    ///
    /// # Errors
    /// * any error reported by joining the underlying job;
    /// * `JobError::StateUnavailable` if the result mutex was poisoned;
    /// * `JobError::Failed` if the job finished without storing a value.
    pub fn join_result(&mut self) -> jobs_core::Result<T> {
        self.inner.join()?;
        let slot = self.value.lock().map_err(|_| {
            JobError::StateUnavailable("model job result lock poisoned".to_string())
        })?;
        match slot.as_ref() {
            Some(produced) => Ok(produced.clone()),
            None => Err(JobError::Failed(
                "model job did not produce a result".to_string(),
            )),
        }
    }
}
497
/// Spawns a model download/materialization job.
///
/// The spec is validated up front (non-empty name, plain relative file paths). Malformed
/// specs, in particular file paths that could escape the bundle directory, are therefore
/// rejected before any work is scheduled, matching what `plan_model_access` enforces.
///
/// The job id is `model-download-<safe name>-<revision or "local">`. The job reports
/// `steps` progress at 0/2, 1/2 and 2/2, registers the bundle manifest as a
/// `model-bundle` artifact, and stores the downloaded [`ModelBundle`] for
/// [`ModelJobJoinHandle::join_result`].
///
/// # Errors
/// Returns `JobError::InvalidArgument` for an invalid spec, or any error from building
/// the job spec or spawning the job. Failures inside the job (e.g. a failed download)
/// are returned by the job body rather than by this function.
#[cfg(feature = "jobs")]
pub fn spawn_model_download_job(
    runner: &BackgroundJobRunner,
    spec: ModelSpec,
    store: ModelBundleStore,
) -> jobs_core::Result<ModelJobJoinHandle<ModelBundle>> {
    // Reject malformed specs (e.g. file paths escaping the bundle root) before scheduling.
    validate_model_spec_for_job(&spec)?;
    let value = Arc::new(Mutex::new(None));
    let value_for_job = Arc::clone(&value);
    let job_spec = model_job_spec(
        format!(
            "model-download-{}-{}",
            spec.safe_name(),
            spec.revision_value().unwrap_or("local")
        ),
        ModelJobKind::Download,
        &spec,
        ModelRuntimeBackend::External,
    )?;
    let inner = runner.spawn(job_spec, move |context| {
        context.info(format!("materializing model bundle `{}`", spec.name))?;
        context.progress(
            JobProgress::new(0, Some(2))?
                .unit("steps")?
                .message("starting model download"),
        )?;
        context.check_cancelled()?;
        let bundle = store
            .download(&spec)
            .map_err(|err| JobError::Failed(err.to_string()))?;
        context.check_cancelled()?;
        context.progress(
            JobProgress::new(1, Some(2))?
                .unit("steps")?
                .message("model files materialized"),
        )?;
        context.artifact(
            JobArtifact::new("manifest", "Model bundle manifest")
                .kind("model-bundle")
                .path(bundle.manifest_path()),
        )?;
        *value_for_job.lock().map_err(|_| {
            JobError::StateUnavailable("model job result lock poisoned".to_string())
        })? = Some(bundle);
        context.progress(
            JobProgress::new(2, Some(2))?
                .unit("steps")?
                .message("model bundle ready"),
        )?;
        Ok(())
    })?;
    Ok(ModelJobJoinHandle { inner, value })
}
551
/// Spawns a model bundle materialization job.
///
/// Runs through `spawn_access_job`, so the job id, job metadata and result slot are
/// derived from `request`. The job body materializes the bundle through `store` and
/// returns a [`ModelAccessJobResult`]. Its artifacts are the bundle's own artifact refs
/// followed by a `file://` manifest artifact (id derived from `output_artifact_prefix`),
/// and its output records `bundleRoot` and `manifestPath`.
#[cfg(feature = "jobs")]
pub fn spawn_model_materialize_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
    store: ModelBundleStore,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        context.info(format!(
            "materializing model bundle `{}`",
            request.spec.name
        ))?;
        // Progress goes from 0/2 straight to 2/2; this job has no 1/2 report.
        context.progress(
            JobProgress::new(0, Some(2))?
                .unit("steps")?
                .message("starting model materialization"),
        )?;
        context.check_cancelled()?;
        // Store errors are surfaced as `JobError::Failed` with the store's message.
        let bundle = store
            .download(&request.spec)
            .map_err(|err| JobError::Failed(err.to_string()))?;
        context.check_cancelled()?;
        let mut artifacts = bundle.artifact_refs();
        // The manifest is recorded twice: as an `ArtifactRef` in the result (prefixed id,
        // `file://` URI) and as a `JobArtifact` on the job context (fixed id `manifest`).
        let manifest_artifact = artifact_ref_for_path(
            artifact_id(&request, "manifest"),
            ArtifactKind::Json,
            "application/json",
            &bundle.manifest_path(),
            model_metadata(&request.spec, request.backend.clone()),
        );
        context.artifact(
            JobArtifact::new("manifest", "Model bundle manifest")
                .kind("model-bundle")
                .path(bundle.manifest_path()),
        )?;
        artifacts.push(manifest_artifact);
        context.progress(
            JobProgress::new(2, Some(2))?
                .unit("steps")?
                .message("model bundle materialized"),
        )?;
        Ok(ModelAccessJobResult {
            job_id: context.id().clone(),
            kind: request.kind,
            spec: request.spec,
            backend: request.backend,
            artifacts,
            diagnostics: Vec::new(),
            output: Some(serde_json::json!({
                "bundleRoot": bundle.root,
                "manifestPath": bundle.manifest_path(),
            })),
        })
    })
}
607
/// Spawns a model bundle validation job.
///
/// Only `ModelJobInput::LocalPath` inputs are checked. Each must exist, or the job fails
/// with `JobError::NotFound`. Existing paths are recorded both as result artifacts
/// (`Directory` or `File` kind, `file://` URI) and as job-context artifacts. JSON and
/// artifact inputs are skipped. On success the output is always `{"valid": true}`.
///
/// NOTE(review): the bundle contents themselves are not inspected here; confirm this is
/// the intended scope of "validation".
#[cfg(feature = "jobs")]
pub fn spawn_model_validate_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        context.info(format!(
            "validating model access for `{}`",
            request.spec.name
        ))?;
        context.check_cancelled()?;
        let mut artifacts = Vec::new();
        // `index` counts every input, so artifact ids (`input-<index>`) reflect the
        // position in `request.inputs`, including skipped non-path inputs.
        for (index, input) in request.inputs.iter().enumerate() {
            if let ModelJobInput::LocalPath(path) = input {
                if !path.exists() {
                    return Err(JobError::NotFound(format!(
                        "model input path `{}` does not exist",
                        path.display()
                    )));
                }
                let kind = if path.is_dir() {
                    ArtifactKind::Directory
                } else {
                    ArtifactKind::File
                };
                let artifact = artifact_ref_for_path(
                    artifact_id(&request, &format!("input-{index}")),
                    kind,
                    "application/octet-stream",
                    path,
                    model_metadata(&request.spec, request.backend.clone()),
                );
                context.artifact(
                    JobArtifact::new(format!("input-{index}"), "Validated model input").path(path),
                )?;
                artifacts.push(artifact);
            }
        }
        context.check_cancelled()?;
        Ok(ModelAccessJobResult {
            job_id: context.id().clone(),
            kind: request.kind,
            spec: request.spec,
            backend: request.backend,
            artifacts,
            diagnostics: Vec::new(),
            output: Some(serde_json::json!({ "valid": true })),
        })
    })
}
659
/// Spawns a runtime warmup job.
///
/// The job only logs the backend being warmed and records a single completed `steps`
/// progress update. It returns a result with no artifacts and `{"warmed": true}` output.
#[cfg(feature = "jobs")]
pub fn spawn_model_warmup_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, |context, request| {
        let backend_name = request.backend.as_str();
        context.info(format!("warming model runtime `{backend_name}`"))?;
        context.check_cancelled()?;
        let done = JobProgress::new(1, Some(1))?
            .unit("steps")?
            .message("model warmup recorded");
        context.progress(done)?;
        let warmed = serde_json::json!({ "warmed": true });
        Ok(empty_access_result(context.id().clone(), request, Some(warmed)))
    })
}
684
/// Spawns a single model inference job.
///
/// The job body does not invoke a model runtime. Its output only summarizes the request
/// (input count and backend name), and cancellation is checked before and after.
#[cfg(feature = "jobs")]
pub fn spawn_model_inference_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, |context, request| {
        context.info(format!("running model inference `{}`", request.spec.name))?;
        context.check_cancelled()?;
        let summary = serde_json::json!({
            "inputCount": request.inputs.len(),
            "backend": request.backend.as_str(),
        });
        context.check_cancelled()?;
        let job_id = context.id().clone();
        Ok(empty_access_result(job_id, request, Some(summary)))
    })
}
706
/// Spawns a batch model inference job.
///
/// The job body does not invoke a model runtime. It reports one `inputs` progress update
/// (`accepted / max(accepted, 1)`) and returns the number of inputs as output.
#[cfg(feature = "jobs")]
pub fn spawn_model_batch_inference_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, |context, request| {
        context.info(format!(
            "running batch model inference `{}`",
            request.spec.name
        ))?;
        context.check_cancelled()?;
        let accepted = request.inputs.len();
        // Total is clamped to at least 1, presumably so an empty batch never reports a
        // zero total (which JobProgress may reject) — TODO confirm.
        let total = accepted.max(1) as u64;
        let update = JobProgress::new(accepted as u64, Some(total))?
            .unit("inputs")?
            .message("batch inputs accepted");
        context.progress(update)?;
        context.check_cancelled()?;
        let summary = serde_json::json!({ "inputCount": accepted });
        Ok(empty_access_result(context.id().clone(), request, Some(summary)))
    })
}
733
/// Spawns an external model command job.
///
/// The spec's source must be `ModelSource::ExternalCommand`; otherwise the job fails with
/// `JobError::InvalidArgument`. The command is run with no arguments and its stdout and
/// stderr are captured.
///
/// On success, the result output holds the exit code and the lossily decoded
/// stdout/stderr. On a non-zero exit, the job fails with `JobError::Failed`, and the
/// message includes the exit status and the trimmed stderr so the cause is not lost.
///
/// Cancellation is checked before and after the command runs. The running process itself
/// is not interrupted.
#[cfg(feature = "jobs")]
pub fn spawn_external_model_command_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        let command = match &request.spec.source {
            ModelSource::ExternalCommand { command } => command.clone(),
            _ => {
                return Err(JobError::InvalidArgument(
                    "external model command jobs require ModelSource::ExternalCommand".to_string(),
                ));
            }
        };
        context.info(format!(
            "running external model command `{}`",
            command.display()
        ))?;
        context.check_cancelled()?;
        let output = Command::new(&command).output().map_err(|err| {
            JobError::Failed(format!("failed to run `{}`: {err}", command.display()))
        })?;
        context.check_cancelled()?;
        if !output.status.success() {
            // Keep the child's stderr in the error: it is usually the only explanation
            // of why the command failed.
            let stderr = String::from_utf8_lossy(&output.stderr);
            let stderr = stderr.trim();
            let detail = if stderr.is_empty() {
                String::new()
            } else {
                format!(": {stderr}")
            };
            return Err(JobError::Failed(format!(
                "`{}` exited with status {}{detail}",
                command.display(),
                output.status
            )));
        }
        Ok(empty_access_result(
            context.id().clone(),
            request,
            Some(serde_json::json!({
                "status": output.status.code(),
                "stdout": String::from_utf8_lossy(&output.stdout),
                "stderr": String::from_utf8_lossy(&output.stderr),
            })),
        ))
    })
}
776
777/// Runs a model job request inline for tests and deterministic contract checks.
778pub fn run_model_job_inline_for_tests(
779    request: ModelAccessJobRequest,
780) -> jobs_core::Result<ModelAccessJobResult> {
781    let job_id = jobs_core::JobId::new(
782        request
783            .id
784            .clone()
785            .unwrap_or_else(|| default_job_id(&request)),
786    )?;
787    Ok(empty_access_result(
788        job_id,
789        request,
790        Some(serde_json::json!({ "inline": true })),
791    ))
792}
793
/// Spawns a generic model access job whose body produces a [`ModelAccessJobResult`].
///
/// Before spawning, the request's model spec is validated (as `plan_model_access` does).
/// Invalid specs (empty names, empty `FirstAvailable` lists, or file paths that are not
/// plain relative paths) are therefore rejected with `JobError::InvalidArgument` instead
/// of reaching job bodies such as bundle materialization.
///
/// Inside the job, the model metadata and the caller's metadata are recorded on the job
/// context. Then `run` is invoked and its result is stored for
/// [`ModelJobJoinHandle::join_result`].
#[cfg(feature = "jobs")]
fn spawn_access_job<F>(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
    run: F,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>>
where
    F: FnOnce(
            jobs_core::JobContext,
            ModelAccessJobRequest,
        ) -> jobs_core::Result<ModelAccessJobResult>
        + Send
        + 'static,
{
    validate_model_spec_for_job(&request.spec)?;
    let value = Arc::new(Mutex::new(None));
    let value_for_job = Arc::clone(&value);
    let job_spec = job_spec_for_request(&request)?;
    let inner = runner.spawn(job_spec, move |context| {
        // Loop bindings are named `meta_value` so they do not shadow the result slot.
        for (key, meta_value) in model_metadata(&request.spec, request.backend.clone()) {
            context.metadata(key, meta_value)?;
        }
        for (key, meta_value) in &request.metadata {
            context.metadata(key.clone(), meta_value.clone())?;
        }
        let result = run(context, request)?;
        *value_for_job.lock().map_err(|_| {
            JobError::StateUnavailable("model job result lock poisoned".to_string())
        })? = Some(result);
        Ok(())
    })?;
    Ok(ModelJobJoinHandle { inner, value })
}
826
827fn job_spec_for_request(request: &ModelAccessJobRequest) -> jobs_core::Result<JobSpec> {
828    let id = request
829        .id
830        .clone()
831        .unwrap_or_else(|| default_job_id(request));
832    let mut job = model_job_spec(id, request.kind, &request.spec, request.backend.clone())?;
833    for (key, value) in &request.metadata {
834        job = job.with_metadata(key.clone(), value.clone())?;
835    }
836    Ok(job)
837}
838
839fn default_job_id(request: &ModelAccessJobRequest) -> String {
840    format!(
841        "{}-{}-{}",
842        request.kind.as_str(),
843        request.spec.safe_name(),
844        request.spec.revision_value().unwrap_or("local")
845    )
846}
847
848fn empty_access_result(
849    job_id: jobs_core::JobId,
850    request: ModelAccessJobRequest,
851    output: Option<serde_json::Value>,
852) -> ModelAccessJobResult {
853    ModelAccessJobResult {
854        job_id,
855        kind: request.kind,
856        spec: request.spec,
857        backend: request.backend,
858        artifacts: Vec::new(),
859        diagnostics: Vec::new(),
860        output,
861    }
862}
863
864fn model_metadata(spec: &ModelSpec, backend: ModelRuntimeBackend) -> BTreeMap<String, String> {
865    let mut metadata = BTreeMap::new();
866    metadata.insert("model.name".to_string(), spec.name.clone());
867    metadata.insert(
868        "model.task".to_string(),
869        spec.task.as_protocol_str().to_string(),
870    );
871    metadata.insert("model.source".to_string(), spec.source.kind().to_string());
872    metadata.insert("model.runtime".to_string(), backend.as_str().to_string());
873    if let Some(revision) = spec.revision_value() {
874        metadata.insert("model.revision".to_string(), revision.to_string());
875    }
876    if let Some(repo_id) = spec.repo_id_value() {
877        metadata.insert("model.repoId".to_string(), repo_id.to_string());
878    }
879    metadata
880}
881
882fn artifact_id(request: &ModelAccessJobRequest, suffix: &str) -> String {
883    match &request.output_artifact_prefix {
884        Some(prefix) => format!("{prefix}-{suffix}"),
885        None => suffix.to_string(),
886    }
887}
888
/// Builds an artifact reference for a local path.
///
/// The location is the path's `file://` URI. The size is the path's metadata length when
/// the metadata can be read, and `None` otherwise. The given metadata map is attached
/// as-is.
#[cfg(feature = "jobs")]
fn artifact_ref_for_path(
    id: impl Into<runtime_core::ArtifactId>,
    kind: ArtifactKind,
    media_type: impl Into<String>,
    path: &Path,
    metadata: BTreeMap<String, String>,
) -> ArtifactRef {
    let uri = file_uri(path);
    let mut artifact = ArtifactRef::new(id, kind, media_type, uri);
    artifact.size_bytes = match path.metadata() {
        Ok(info) => Some(info.len()),
        Err(_) => None,
    };
    artifact.metadata = metadata;
    artifact
}
902
/// Builds a `file://` URI for a local path.
///
/// Relative paths are first resolved against the current working directory. Without this,
/// a relative path such as `models/x` would yield `file://models/x`, which URI parsers
/// read as host `models` with path `/x`. If the working directory cannot be read, the
/// original (unresolved) form is returned unchanged.
///
/// On Windows, backslashes become `/` and drive paths get the extra slash that RFC 8089
/// requires (`file:///C:/x`).
///
/// NOTE: the path is not percent-encoded; characters such as spaces or `#` are emitted
/// verbatim.
#[cfg(feature = "jobs")]
fn file_uri(path: &Path) -> String {
    let resolved = if path.is_absolute() {
        path.to_path_buf()
    } else {
        match std::env::current_dir() {
            Ok(cwd) => cwd.join(path),
            // Best effort: keep the previous behavior when the path cannot be resolved.
            Err(_) => return format!("file://{}", path.display()),
        }
    };
    let text = resolved.display().to_string();
    #[cfg(windows)]
    let text = text.replace('\\', "/");
    if text.starts_with('/') {
        format!("file://{text}")
    } else {
        // Absolute paths that do not start with `/` (e.g. `C:/x`) need an empty authority.
        format!("file:///{text}")
    }
}