1use std::collections::BTreeMap;
2#[cfg(feature = "jobs")]
3use std::path::Path;
4use std::path::PathBuf;
5#[cfg(feature = "jobs")]
6use std::process::Command;
7#[cfg(feature = "jobs")]
8use std::sync::{Arc, Mutex};
9
10use jobs_core::{ArtifactKind, ArtifactRef, JobSpec};
11#[cfg(feature = "jobs")]
12use jobs_core::{BackgroundJobRunner, JobArtifact, JobError, JobProgress};
13use runtime_core::{
14 Diagnostic, OperationId, RuntimeRequirement, SurfaceArtifactExpectation, SurfaceExecutionMode,
15 SurfaceExecutionPlan, SurfaceSideEffect,
16};
17use serde::{Deserialize, Serialize};
18
19#[cfg(feature = "jobs")]
20use crate::{ModelBundle, ModelBundleStore};
21use crate::{ModelFileRequest, ModelRuntimeBackend, ModelSource, ModelSpec};
22
/// Category of work a model job performs.
///
/// Each variant maps to a stable string tag through [`ModelJobKind::as_str`];
/// that tag is used as the job kind and as the prefix of generated job ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ModelJobKind {
    /// Tagged `model-download`; planned with network and file-write side effects.
    Download,
    /// Tagged `model-materialize-bundle`; planned with file read and write side effects.
    MaterializeBundle,
    /// Tagged `model-validate-bundle`; planned with a file-read side effect.
    ValidateBundle,
    /// Tagged `model-warmup`; planned without expected artifacts.
    Warmup,
    /// Tagged `model-inference`; planned with a single JSON output artifact.
    Inference,
    /// Tagged `model-batch-inference`; progress is measured in `inputs`.
    BatchInference,
    /// Tagged `model-external-command`; planned with an external-process side effect.
    ExternalCommand,
}
41
/// Request describing one model job to plan or run.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessJobRequest {
    /// Explicit job id. When `None`, an id is derived from the job kind, the
    /// model's safe name and its revision.
    pub id: Option<String>,
    /// What kind of work the job performs.
    pub kind: ModelJobKind,
    /// Model the job operates on.
    pub spec: ModelSpec,
    /// Runtime backend recorded in job and artifact metadata.
    pub backend: ModelRuntimeBackend,
    /// Job inputs; defaults to an empty list when absent from the payload.
    #[serde(default)]
    pub inputs: Vec<ModelJobInput>,
    /// Optional prefix prepended (as `{prefix}-{suffix}`) to generated artifact ids.
    pub output_artifact_prefix: Option<String>,
    /// Extra caller-supplied metadata copied onto the job; defaults to empty.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
63
/// A single input handed to a model job.
///
/// Serialized adjacently tagged: the variant name (camelCase) goes under
/// `kind` and the payload under `value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "kind", content = "value")]
pub enum ModelJobInput {
    /// Inline JSON payload.
    Json(serde_json::Value),
    /// Reference to an existing artifact.
    Artifact(ArtifactRef),
    /// Path on the local filesystem; the validate job checks that it exists.
    LocalPath(PathBuf),
}
75
/// Outcome of a completed model job.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessJobResult {
    /// Id of the job that produced this result.
    pub job_id: jobs_core::JobId,
    /// Kind copied from the originating request.
    pub kind: ModelJobKind,
    /// Model spec copied from the originating request.
    pub spec: ModelSpec,
    /// Backend copied from the originating request.
    pub backend: ModelRuntimeBackend,
    /// Artifacts the job produced or validated; may be empty.
    pub artifacts: Vec<ArtifactRef>,
    /// Diagnostics collected while running; the jobs in this file leave it empty.
    pub diagnostics: Vec<Diagnostic>,
    /// Optional job-specific JSON payload.
    pub output: Option<serde_json::Value>,
}
95
/// Planned layout of a model bundle, as produced by `plan_model_bundle`.
///
/// All paths are `/`-joined strings rooted at `{safe_name}/{revision}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ModelBundlePlan {
    /// Model the plan was computed for.
    pub spec: ModelSpec,
    /// Location of the bundle manifest (`{root}/manifest.json`).
    pub manifest_path: String,
    /// Directory holding the model files (`{root}/files`).
    pub files_directory: String,
    /// One entry per resolved file request.
    pub files: Vec<ModelBundlePlanFile>,
    /// One artifact reference per entry in `files`, in the same order.
    pub artifact_refs: Vec<ArtifactRef>,
    /// `true` when at least one required file is not present locally.
    pub downloads_required: bool,
}
107
/// A single file within a planned model bundle.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ModelBundlePlanFile {
    /// Path of the file as requested in the model spec.
    pub remote_path: String,
    /// Path relative to the bundle root (`files/{remote_path}`).
    pub local_path: String,
    /// Whether `remote_path` appears in the caller-provided list of local files.
    pub present_locally: bool,
    /// `false` only for files requested as optional.
    pub required: bool,
    /// Media type guessed from the file extension.
    pub media_type: String,
}
118
/// Dry-run description of a model job, as produced by `plan_model_access`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ModelAccessPlan {
    /// Job spec that would be submitted for the request.
    pub job_spec: jobs_core::JobSpec,
    /// Kind copied from the request.
    pub kind: ModelJobKind,
    /// Backend copied from the request.
    pub backend: ModelRuntimeBackend,
    /// Execution mode, side effects, requirements and artifact expectations.
    pub execution_plan: SurfaceExecutionPlan,
    /// Artifacts the job is expected to produce.
    pub expected_artifacts: Vec<ArtifactRef>,
}
129
130impl ModelJobKind {
131 pub fn as_str(self) -> &'static str {
133 match self {
134 Self::Download => "model-download",
135 Self::MaterializeBundle => "model-materialize-bundle",
136 Self::ValidateBundle => "model-validate-bundle",
137 Self::Warmup => "model-warmup",
138 Self::Inference => "model-inference",
139 Self::BatchInference => "model-batch-inference",
140 Self::ExternalCommand => "model-external-command",
141 }
142 }
143}
144
145pub fn model_job_spec(
147 id: impl Into<String>,
148 kind: ModelJobKind,
149 spec: &ModelSpec,
150 backend: ModelRuntimeBackend,
151) -> jobs_core::Result<JobSpec> {
152 let mut job = JobSpec::new(id, format!("{} {}", kind.as_str(), spec.name))?
153 .with_kind(kind.as_str())?
154 .with_metadata("model.name", spec.name.clone())?
155 .with_metadata("model.task", spec.task.as_protocol_str().to_string())?
156 .with_metadata("model.source", spec.source.kind().to_string())?
157 .with_metadata("model.runtime", backend.as_str().to_string())?;
158 if let Some(revision) = spec.revision_value() {
159 job = job.with_metadata("model.revision", revision.to_string())?;
160 }
161 if let Some(repo_id) = spec.repo_id_value() {
162 job = job.with_metadata("model.repoId", repo_id.to_string())?;
163 }
164 Ok(job)
165}
166
167pub fn plan_model_bundle(
169 spec: &ModelSpec,
170 local_files: &[String],
171) -> crate::Result<ModelBundlePlan> {
172 validate_model_spec(spec)?;
173 let safe_name = spec.safe_name();
174 let revision = spec.revision_value().unwrap_or("main");
175 let bundle_root = format!("{safe_name}/{revision}");
176 let files = resolve_requested_files(&spec.files, local_files);
177 let artifact_refs = files
178 .iter()
179 .map(|file| {
180 let mut artifact = ArtifactRef::new(
181 format!("model:{}", file.remote_path.replace(['/', '\\'], "_")),
182 model_file_kind(&file.remote_path),
183 file.media_type.clone(),
184 format!("{bundle_root}/{}", file.local_path),
185 );
186 artifact.metadata = model_metadata(spec, ModelRuntimeBackend::External);
187 artifact.metadata.insert(
188 "model.fileRole".to_string(),
189 model_file_role(&file.remote_path).to_string(),
190 );
191 artifact
192 })
193 .collect::<Vec<_>>();
194 let downloads_required = files
195 .iter()
196 .any(|file| file.required && !file.present_locally);
197 Ok(ModelBundlePlan {
198 spec: spec.clone(),
199 manifest_path: format!("{bundle_root}/manifest.json"),
200 files_directory: format!("{bundle_root}/files"),
201 files,
202 artifact_refs,
203 downloads_required,
204 })
205}
206
207pub fn plan_model_access(request: &ModelAccessJobRequest) -> jobs_core::Result<ModelAccessPlan> {
209 validate_model_spec_for_job(&request.spec)?;
210 let job_spec = job_spec_for_request(request)?;
211 let bundle_plan = plan_model_bundle(&request.spec, &[])
212 .map_err(|error| jobs_core::JobError::InvalidArgument(error.to_string()))?;
213 let expected_artifacts = expected_artifacts_for_request(request, &bundle_plan);
214 let execution_plan = SurfaceExecutionPlan {
215 operation: OperationId::new("model.executionPlan"),
216 mode: execution_mode_for_request(request),
217 side_effects: side_effects_for_request(request),
218 cancellable: true,
219 progress_unit: Some(progress_unit_for_kind(request.kind).to_string()),
220 expected_artifacts: expected_artifacts
221 .iter()
222 .map(|artifact| SurfaceArtifactExpectation {
223 id: artifact.id.as_str().to_string(),
224 kind: artifact_kind_name(&artifact.kind),
225 media_type: artifact.media_type.clone(),
226 required: true,
227 description: Some(format!("Expected {} artifact", artifact.id.as_str())),
228 })
229 .collect(),
230 requirements: runtime_requirements_for_request(request),
231 max_recommended_input_bytes: Some(1_048_576),
232 };
233 Ok(ModelAccessPlan {
234 job_spec,
235 kind: request.kind,
236 backend: request.backend.clone(),
237 execution_plan,
238 expected_artifacts,
239 })
240}
241
242fn validate_model_spec(spec: &ModelSpec) -> crate::Result<()> {
243 if spec.name.trim().is_empty() {
244 return Err(crate::ModelRuntimeError::InvalidArgument(
245 "model name must not be empty".to_string(),
246 ));
247 }
248 for file in &spec.files {
249 match file {
250 ModelFileRequest::Required(path) | ModelFileRequest::Optional(path) => {
251 validate_remote_file_path(path)?;
252 }
253 ModelFileRequest::FirstAvailable(paths) => {
254 if paths.is_empty() {
255 return Err(crate::ModelRuntimeError::InvalidArgument(
256 "first_available model file requests must include at least one path"
257 .to_string(),
258 ));
259 }
260 for path in paths {
261 validate_remote_file_path(path)?;
262 }
263 }
264 }
265 }
266 Ok(())
267}
268
269fn validate_model_spec_for_job(spec: &ModelSpec) -> jobs_core::Result<()> {
270 validate_model_spec(spec)
271 .map_err(|error| jobs_core::JobError::InvalidArgument(error.to_string()))
272}
273
274fn validate_remote_file_path(path: &str) -> crate::Result<()> {
275 if path.trim().is_empty() || path.starts_with('/') || path.contains("..") {
276 return Err(crate::ModelRuntimeError::InvalidArgument(format!(
277 "model file path `{path}` must be a relative file path"
278 )));
279 }
280 Ok(())
281}
282
283fn resolve_requested_files(
284 files: &[ModelFileRequest],
285 local_files: &[String],
286) -> Vec<ModelBundlePlanFile> {
287 files
288 .iter()
289 .filter_map(|request| match request {
290 ModelFileRequest::Required(path) => Some((path.clone(), true)),
291 ModelFileRequest::Optional(path) => Some((path.clone(), false)),
292 ModelFileRequest::FirstAvailable(paths) => paths
293 .iter()
294 .find(|path| local_files.iter().any(|local| local == *path))
295 .or_else(|| paths.first())
296 .map(|path| (path.clone(), true)),
297 })
298 .map(|(remote_path, required)| ModelBundlePlanFile {
299 local_path: format!("files/{remote_path}"),
300 present_locally: local_files.iter().any(|local| local == &remote_path),
301 media_type: model_file_media_type(&remote_path).to_string(),
302 remote_path,
303 required,
304 })
305 .collect()
306}
307
308fn expected_artifacts_for_request(
309 request: &ModelAccessJobRequest,
310 bundle_plan: &ModelBundlePlan,
311) -> Vec<ArtifactRef> {
312 match request.kind {
313 ModelJobKind::Download | ModelJobKind::MaterializeBundle => {
314 let mut artifacts = bundle_plan.artifact_refs.clone();
315 artifacts.push(model_manifest_artifact(request, bundle_plan));
316 artifacts
317 }
318 ModelJobKind::ValidateBundle => vec![model_manifest_artifact(request, bundle_plan)],
319 ModelJobKind::Warmup => Vec::new(),
320 ModelJobKind::Inference | ModelJobKind::BatchInference | ModelJobKind::ExternalCommand => {
321 vec![planned_output_artifact(request)]
322 }
323 }
324}
325
326fn model_manifest_artifact(
327 request: &ModelAccessJobRequest,
328 bundle_plan: &ModelBundlePlan,
329) -> ArtifactRef {
330 let mut artifact = ArtifactRef::new(
331 artifact_id(request, "manifest"),
332 ArtifactKind::Json,
333 "application/json",
334 bundle_plan.manifest_path.clone(),
335 );
336 artifact.metadata = model_metadata(&request.spec, request.backend.clone());
337 artifact
338}
339
340fn planned_output_artifact(request: &ModelAccessJobRequest) -> ArtifactRef {
341 let mut artifact = ArtifactRef::new(
342 artifact_id(request, "output"),
343 ArtifactKind::Json,
344 "application/json",
345 format!("memory://{}/output.json", default_job_id(request)),
346 );
347 artifact.metadata = model_metadata(&request.spec, request.backend.clone());
348 artifact
349}
350
351fn execution_mode_for_request(request: &ModelAccessJobRequest) -> SurfaceExecutionMode {
352 if request.kind == ModelJobKind::ExternalCommand
353 || matches!(request.backend, ModelRuntimeBackend::External)
354 || matches!(request.spec.source, ModelSource::ExternalCommand { .. })
355 {
356 SurfaceExecutionMode::ExternalCommand
357 } else {
358 SurfaceExecutionMode::PlannedJob
359 }
360}
361
362fn side_effects_for_request(request: &ModelAccessJobRequest) -> Vec<SurfaceSideEffect> {
363 match request.kind {
364 ModelJobKind::Download => vec![SurfaceSideEffect::Network, SurfaceSideEffect::WritesFiles],
365 ModelJobKind::MaterializeBundle => vec![
366 SurfaceSideEffect::ReadsFiles,
367 SurfaceSideEffect::WritesFiles,
368 ],
369 ModelJobKind::ValidateBundle => vec![SurfaceSideEffect::ReadsFiles],
370 ModelJobKind::ExternalCommand => vec![SurfaceSideEffect::ExternalProcess],
371 ModelJobKind::Warmup | ModelJobKind::Inference | ModelJobKind::BatchInference => {
372 vec![SurfaceSideEffect::None]
373 }
374 }
375}
376
377fn progress_unit_for_kind(kind: ModelJobKind) -> &'static str {
378 match kind {
379 ModelJobKind::BatchInference => "inputs",
380 ModelJobKind::Download | ModelJobKind::MaterializeBundle | ModelJobKind::ValidateBundle => {
381 "files"
382 }
383 ModelJobKind::Warmup | ModelJobKind::Inference | ModelJobKind::ExternalCommand => "steps",
384 }
385}
386
387fn runtime_requirements_for_request(request: &ModelAccessJobRequest) -> Vec<RuntimeRequirement> {
388 let mut requirements = Vec::new();
389 if request.kind == ModelJobKind::Download {
390 requirements.push(RuntimeRequirement {
391 name: "network".to_string(),
392 description: Some("Model file download requires network access".to_string()),
393 required: true,
394 });
395 }
396 if matches!(request.kind, ModelJobKind::ExternalCommand)
397 || matches!(request.backend, ModelRuntimeBackend::External)
398 {
399 requirements.push(RuntimeRequirement {
400 name: "external-command".to_string(),
401 description: Some("Execution requires a caller-provided command".to_string()),
402 required: true,
403 });
404 }
405 requirements
406}
407
408fn model_file_kind(remote_path: &str) -> ArtifactKind {
409 match model_file_role(remote_path) {
410 "config" | "tokenizer" => ArtifactKind::Json,
411 "vocabulary" => ArtifactKind::Text,
412 _ => ArtifactKind::Binary,
413 }
414}
415
/// Guesses a media type from the text after the last `.` in the path.
///
/// `json` maps to `application/json`, `txt` and `md` map to `text/plain`,
/// and anything else (including paths without a dot) maps to
/// `application/octet-stream`. The comparison is case-sensitive.
fn model_file_media_type(remote_path: &str) -> &'static str {
    let extension = remote_path.rsplit_once('.').map(|(_, suffix)| suffix);
    match extension {
        Some("json") => "application/json",
        Some("txt") | Some("md") => "text/plain",
        _ => "application/octet-stream",
    }
}
425
/// Derives a coarse role for a model file from its file name.
///
/// Only the segment after the last `/` is inspected. Checks run in order:
/// `config.json` is `config`; a name containing `tokenizer` is `tokenizer`;
/// `vocab.txt` and `merges.txt` are `vocabulary`; names ending in `.onnx`,
/// `.safetensors`, `.bin` or `.pt` are `weights`; everything else is
/// `artifact`.
fn model_file_role(remote_path: &str) -> &'static str {
    const WEIGHT_SUFFIXES: [&str; 4] = [".onnx", ".safetensors", ".bin", ".pt"];

    let file_name = remote_path
        .rsplit_once('/')
        .map_or(remote_path, |(_, name)| name);

    match file_name {
        "config.json" => "config",
        name if name.contains("tokenizer") => "tokenizer",
        "vocab.txt" | "merges.txt" => "vocabulary",
        name if WEIGHT_SUFFIXES.iter().any(|suffix| name.ends_with(*suffix)) => "weights",
        _ => "artifact",
    }
}
444
445fn artifact_kind_name(kind: &ArtifactKind) -> String {
446 match kind {
447 ArtifactKind::File => "file",
448 ArtifactKind::Directory => "directory",
449 ArtifactKind::Image => "image",
450 ArtifactKind::Audio => "audio",
451 ArtifactKind::Video => "video",
452 ArtifactKind::Text => "text",
453 ArtifactKind::Json => "json",
454 ArtifactKind::Log => "log",
455 ArtifactKind::Archive => "archive",
456 ArtifactKind::Binary => "binary",
457 ArtifactKind::Other(value) => value.as_str(),
458 }
459 .to_string()
460}
461
/// Handle to a spawned model job that also carries the job's typed result.
///
/// The job closure stores its result in the shared `value` slot; the handle
/// reads it back after the job has been joined.
#[cfg(feature = "jobs")]
#[derive(Debug)]
pub struct ModelJobJoinHandle<T> {
    /// Join handle of the underlying background job.
    inner: jobs_core::JobJoinHandle,
    /// Slot shared with the job closure; `None` until the job stores a result.
    value: Arc<Mutex<Option<T>>>,
}
469
#[cfg(feature = "jobs")]
impl<T: Clone> ModelJobJoinHandle<T> {
    /// Returns the handle of the underlying job.
    pub fn job(&self) -> &jobs_core::JobHandle {
        let Self { inner, .. } = self;
        inner.job()
    }

    /// Returns the id of the underlying job.
    pub fn id(&self) -> &jobs_core::JobId {
        let Self { inner, .. } = self;
        inner.id()
    }

    /// Asks the underlying job to cancel.
    pub fn request_cancel(&self) -> jobs_core::Result<()> {
        let Self { inner, .. } = self;
        inner.request_cancel()
    }

    /// Joins the job and returns a clone of the result it stored.
    ///
    /// # Errors
    ///
    /// Propagates the join error, returns `JobError::StateUnavailable` when
    /// the result lock is poisoned, and `JobError::Failed` when the job
    /// finished without storing a result.
    pub fn join_result(&mut self) -> jobs_core::Result<T> {
        self.inner.join()?;

        let slot = self.value.lock().map_err(|_| {
            JobError::StateUnavailable("model job result lock poisoned".to_string())
        })?;
        match slot.as_ref() {
            Some(stored) => Ok(stored.clone()),
            None => Err(JobError::Failed(
                "model job did not produce a result".to_string(),
            )),
        }
    }
}
497
/// Spawns a background job that downloads a model bundle through `store`.
///
/// The job id is `model-download-{safe_name}-{revision}`, with the revision
/// falling back to `local`. The job reports three progress points (0, 1 and
/// 2 of 2 steps), registers the bundle manifest as a job artifact and stores
/// the downloaded bundle so it can be read back from the returned handle.
///
/// # Errors
///
/// Returns an error when the job spec cannot be built or the runner refuses
/// to spawn the job. Failures inside the job surface when the handle is
/// joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_download_job(
    runner: &BackgroundJobRunner,
    spec: ModelSpec,
    store: ModelBundleStore,
) -> jobs_core::Result<ModelJobJoinHandle<ModelBundle>> {
    // Slot shared between the job closure (writer) and the handle (reader).
    let value = Arc::new(Mutex::new(None));
    let value_for_job = Arc::clone(&value);
    let job_spec = model_job_spec(
        format!(
            "model-download-{}-{}",
            spec.safe_name(),
            spec.revision_value().unwrap_or("local")
        ),
        ModelJobKind::Download,
        &spec,
        ModelRuntimeBackend::External,
    )?;
    let inner = runner.spawn(job_spec, move |context| {
        context.info(format!("materializing model bundle `{}`", spec.name))?;
        context.progress(
            JobProgress::new(0, Some(2))?
                .unit("steps")?
                .message("starting model download"),
        )?;
        // Cancellation is only observed before and after the download call.
        context.check_cancelled()?;
        let bundle = store
            .download(&spec)
            .map_err(|err| JobError::Failed(err.to_string()))?;
        context.check_cancelled()?;
        context.progress(
            JobProgress::new(1, Some(2))?
                .unit("steps")?
                .message("model files materialized"),
        )?;
        context.artifact(
            JobArtifact::new("manifest", "Model bundle manifest")
                .kind("model-bundle")
                .path(bundle.manifest_path()),
        )?;
        // Publish the bundle before the final progress event.
        *value_for_job.lock().map_err(|_| {
            JobError::StateUnavailable("model job result lock poisoned".to_string())
        })? = Some(bundle);
        context.progress(
            JobProgress::new(2, Some(2))?
                .unit("steps")?
                .message("model bundle ready"),
        )?;
        Ok(())
    })?;
    Ok(ModelJobJoinHandle { inner, value })
}
551
/// Spawns a background job that materializes a model bundle through `store`.
///
/// The result lists the bundle's artifact references followed by a manifest
/// artifact, and its output carries `bundleRoot` and `manifestPath`.
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. Failures inside the job
/// surface when the returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_materialize_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
    store: ModelBundleStore,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        context.info(format!(
            "materializing model bundle `{}`",
            request.spec.name
        ))?;
        context.progress(
            JobProgress::new(0, Some(2))?
                .unit("steps")?
                .message("starting model materialization"),
        )?;
        // Cancellation is only observed before and after the download call.
        context.check_cancelled()?;
        let bundle = store
            .download(&request.spec)
            .map_err(|err| JobError::Failed(err.to_string()))?;
        context.check_cancelled()?;
        let mut artifacts = bundle.artifact_refs();
        let manifest_artifact = artifact_ref_for_path(
            artifact_id(&request, "manifest"),
            ArtifactKind::Json,
            "application/json",
            &bundle.manifest_path(),
            model_metadata(&request.spec, request.backend.clone()),
        );
        context.artifact(
            JobArtifact::new("manifest", "Model bundle manifest")
                .kind("model-bundle")
                .path(bundle.manifest_path()),
        )?;
        artifacts.push(manifest_artifact);
        // Progress goes straight from 0 to 2 of 2; no intermediate step is reported.
        context.progress(
            JobProgress::new(2, Some(2))?
                .unit("steps")?
                .message("model bundle materialized"),
        )?;
        Ok(ModelAccessJobResult {
            job_id: context.id().clone(),
            kind: request.kind,
            spec: request.spec,
            backend: request.backend,
            artifacts,
            diagnostics: Vec::new(),
            output: Some(serde_json::json!({
                "bundleRoot": bundle.root,
                "manifestPath": bundle.manifest_path(),
            })),
        })
    })
}
607
/// Spawns a background job that validates the local-path inputs of a request.
///
/// Every `ModelJobInput::LocalPath` input must exist on disk; each one is
/// registered as a job artifact and returned as a file or directory artifact
/// reference named `input-{index}`. Other input variants are ignored. On
/// success the output is `{ "valid": true }`.
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. A missing input path
/// makes the job fail with `JobError::NotFound`, which surfaces when the
/// returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_validate_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        context.info(format!(
            "validating model access for `{}`",
            request.spec.name
        ))?;
        context.check_cancelled()?;
        let mut artifacts = Vec::new();
        // `index` is the position in the full input list, so artifact names
        // may skip numbers when non-path inputs are interleaved.
        for (index, input) in request.inputs.iter().enumerate() {
            if let ModelJobInput::LocalPath(path) = input {
                if !path.exists() {
                    return Err(JobError::NotFound(format!(
                        "model input path `{}` does not exist",
                        path.display()
                    )));
                }
                let kind = if path.is_dir() {
                    ArtifactKind::Directory
                } else {
                    ArtifactKind::File
                };
                let artifact = artifact_ref_for_path(
                    artifact_id(&request, &format!("input-{index}")),
                    kind,
                    "application/octet-stream",
                    path,
                    model_metadata(&request.spec, request.backend.clone()),
                );
                context.artifact(
                    JobArtifact::new(format!("input-{index}"), "Validated model input").path(path),
                )?;
                artifacts.push(artifact);
            }
        }
        context.check_cancelled()?;
        Ok(ModelAccessJobResult {
            job_id: context.id().clone(),
            kind: request.kind,
            spec: request.spec,
            backend: request.backend,
            artifacts,
            diagnostics: Vec::new(),
            output: Some(serde_json::json!({ "valid": true })),
        })
    })
}
659
/// Spawns a background job that records a warmup of the request's backend.
///
/// The job logs the backend name, reports a single completed step and
/// returns `{ "warmed": true }` with no artifacts.
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. Failures inside the job
/// surface when the returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_warmup_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        let backend_name = request.backend.as_str();
        context.info(format!("warming model runtime `{backend_name}`"))?;
        context.check_cancelled()?;

        let done = JobProgress::new(1, Some(1))?
            .unit("steps")?
            .message("model warmup recorded");
        context.progress(done)?;

        let job_id = context.id().clone();
        let output = serde_json::json!({ "warmed": true });
        Ok(empty_access_result(job_id, request, Some(output)))
    })
}
684
/// Spawns a background job for a single model inference request.
///
/// The job logs the model name and returns an output containing the number
/// of inputs (`inputCount`) and the backend name (`backend`), with no
/// artifacts. Cancellation is checked before and after the output is built.
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. Failures inside the job
/// surface when the returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_inference_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        let model_name = &request.spec.name;
        context.info(format!("running model inference `{model_name}`"))?;
        context.check_cancelled()?;

        let input_count = request.inputs.len();
        let backend_name = request.backend.as_str();
        let output = serde_json::json!({
            "inputCount": input_count,
            "backend": backend_name,
        });
        context.check_cancelled()?;

        let job_id = context.id().clone();
        Ok(empty_access_result(job_id, request, Some(output)))
    })
}
706
/// Spawns a background job for a batch inference request.
///
/// The job reports one progress event counting the accepted inputs. The
/// total is clamped to at least one, so an empty batch reports 0 of 1. The
/// output carries `inputCount`; no artifacts are produced.
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. Failures inside the job
/// surface when the returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_model_batch_inference_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        let model_name = &request.spec.name;
        context.info(format!("running batch model inference `{model_name}`"))?;
        context.check_cancelled()?;

        let input_count = request.inputs.len();
        let completed = input_count as u64;
        let total = input_count.max(1) as u64;
        let accepted = JobProgress::new(completed, Some(total))?
            .unit("inputs")?
            .message("batch inputs accepted");
        context.progress(accepted)?;
        context.check_cancelled()?;

        let job_id = context.id().clone();
        let output = serde_json::json!({ "inputCount": input_count });
        Ok(empty_access_result(job_id, request, Some(output)))
    })
}
733
/// Spawns a background job that runs the external command named by the
/// model source.
///
/// The request's model source must be `ModelSource::ExternalCommand`. The
/// command is run without arguments and its exit code, stdout and stderr are
/// returned in the output (both streams decoded lossily as UTF-8).
///
/// # Errors
///
/// Returns an error when the job cannot be spawned. Inside the job, a
/// different model source yields `JobError::InvalidArgument`, and a command
/// that cannot be started or exits unsuccessfully yields `JobError::Failed`;
/// these surface when the returned handle is joined.
#[cfg(feature = "jobs")]
pub fn spawn_external_model_command_job(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>> {
    spawn_access_job(runner, request, move |context, request| {
        let command = match &request.spec.source {
            ModelSource::ExternalCommand { command } => command.clone(),
            _ => {
                return Err(JobError::InvalidArgument(
                    "external model command jobs require ModelSource::ExternalCommand".to_string(),
                ));
            }
        };
        context.info(format!(
            "running external model command `{}`",
            command.display()
        ))?;
        context.check_cancelled()?;
        // `output()` blocks until the process exits and buffers both streams,
        // so cancellation is only observed before and after the run.
        let output = Command::new(&command).output().map_err(|err| {
            JobError::Failed(format!("failed to run `{}`: {err}", command.display()))
        })?;
        context.check_cancelled()?;
        if !output.status.success() {
            return Err(JobError::Failed(format!(
                "`{}` exited with status {}",
                command.display(),
                output.status
            )));
        }
        Ok(empty_access_result(
            context.id().clone(),
            request,
            Some(serde_json::json!({
                "status": output.status.code(),
                "stdout": String::from_utf8_lossy(&output.stdout),
                "stderr": String::from_utf8_lossy(&output.stderr),
            })),
        ))
    })
}
776
777pub fn run_model_job_inline_for_tests(
779 request: ModelAccessJobRequest,
780) -> jobs_core::Result<ModelAccessJobResult> {
781 let job_id = jobs_core::JobId::new(
782 request
783 .id
784 .clone()
785 .unwrap_or_else(|| default_job_id(&request)),
786 )?;
787 Ok(empty_access_result(
788 job_id,
789 request,
790 Some(serde_json::json!({ "inline": true })),
791 ))
792}
793
/// Spawns a model access job and wires its result into a join handle.
///
/// Before `run` is invoked, the job records the model-derived metadata
/// followed by the caller-supplied metadata from the request. The value
/// returned by `run` is stored so it can be read from the returned handle.
///
/// # Errors
///
/// Returns an error when the job spec cannot be built or the runner refuses
/// to spawn the job. Failures inside the job surface when the handle is
/// joined.
#[cfg(feature = "jobs")]
fn spawn_access_job<F>(
    runner: &BackgroundJobRunner,
    request: ModelAccessJobRequest,
    run: F,
) -> jobs_core::Result<ModelJobJoinHandle<ModelAccessJobResult>>
where
    F: FnOnce(
            jobs_core::JobContext,
            ModelAccessJobRequest,
        ) -> jobs_core::Result<ModelAccessJobResult>
        + Send
        + 'static,
{
    let slot = Arc::new(Mutex::new(None));
    let slot_for_job = Arc::clone(&slot);
    let job_spec = job_spec_for_request(&request)?;

    let inner = runner.spawn(job_spec, move |context| {
        // Model-derived entries first, then caller-supplied ones.
        let derived = model_metadata(&request.spec, request.backend.clone());
        let supplied = request
            .metadata
            .iter()
            .map(|(key, entry)| (key.clone(), entry.clone()));
        for (key, entry) in derived.into_iter().chain(supplied) {
            context.metadata(key, entry)?;
        }

        let outcome = run(context, request)?;
        {
            let mut stored = slot_for_job.lock().map_err(|_| {
                JobError::StateUnavailable("model job result lock poisoned".to_string())
            })?;
            *stored = Some(outcome);
        }
        Ok(())
    })?;

    Ok(ModelJobJoinHandle { inner, value: slot })
}
826
827fn job_spec_for_request(request: &ModelAccessJobRequest) -> jobs_core::Result<JobSpec> {
828 let id = request
829 .id
830 .clone()
831 .unwrap_or_else(|| default_job_id(request));
832 let mut job = model_job_spec(id, request.kind, &request.spec, request.backend.clone())?;
833 for (key, value) in &request.metadata {
834 job = job.with_metadata(key.clone(), value.clone())?;
835 }
836 Ok(job)
837}
838
839fn default_job_id(request: &ModelAccessJobRequest) -> String {
840 format!(
841 "{}-{}-{}",
842 request.kind.as_str(),
843 request.spec.safe_name(),
844 request.spec.revision_value().unwrap_or("local")
845 )
846}
847
848fn empty_access_result(
849 job_id: jobs_core::JobId,
850 request: ModelAccessJobRequest,
851 output: Option<serde_json::Value>,
852) -> ModelAccessJobResult {
853 ModelAccessJobResult {
854 job_id,
855 kind: request.kind,
856 spec: request.spec,
857 backend: request.backend,
858 artifacts: Vec::new(),
859 diagnostics: Vec::new(),
860 output,
861 }
862}
863
864fn model_metadata(spec: &ModelSpec, backend: ModelRuntimeBackend) -> BTreeMap<String, String> {
865 let mut metadata = BTreeMap::new();
866 metadata.insert("model.name".to_string(), spec.name.clone());
867 metadata.insert(
868 "model.task".to_string(),
869 spec.task.as_protocol_str().to_string(),
870 );
871 metadata.insert("model.source".to_string(), spec.source.kind().to_string());
872 metadata.insert("model.runtime".to_string(), backend.as_str().to_string());
873 if let Some(revision) = spec.revision_value() {
874 metadata.insert("model.revision".to_string(), revision.to_string());
875 }
876 if let Some(repo_id) = spec.repo_id_value() {
877 metadata.insert("model.repoId".to_string(), repo_id.to_string());
878 }
879 metadata
880}
881
882fn artifact_id(request: &ModelAccessJobRequest, suffix: &str) -> String {
883 match &request.output_artifact_prefix {
884 Some(prefix) => format!("{prefix}-{suffix}"),
885 None => suffix.to_string(),
886 }
887}
888
/// Builds an artifact reference pointing at a local filesystem path.
///
/// The location is the path rendered as a `file://` URI. The size is read
/// from the filesystem and left unset when the metadata lookup fails.
#[cfg(feature = "jobs")]
fn artifact_ref_for_path(
    id: impl Into<runtime_core::ArtifactId>,
    kind: ArtifactKind,
    media_type: impl Into<String>,
    path: &Path,
    metadata: BTreeMap<String, String>,
) -> ArtifactRef {
    let location = file_uri(path);
    let size_bytes = std::fs::metadata(path).map(|stat| stat.len()).ok();

    let mut reference = ArtifactRef::new(id, kind, media_type, location);
    reference.size_bytes = size_bytes;
    reference.metadata = metadata;
    reference
}
902
903#[cfg(feature = "jobs")]
/// Renders a path as a `file://` URI by prefixing its display form.
///
/// The path is inserted verbatim: nothing is percent-encoded, separators
/// are not normalized and relative paths are not made absolute.
fn file_uri(path: &Path) -> String {
    let rendered = path.display().to_string();
    let mut uri = String::with_capacity("file://".len() + rendered.len());
    uri.push_str("file://");
    uri.push_str(&rendered);
    uri
}
906}