posemesh_compute_node/storage/
mod.rs1use anyhow::{anyhow, Result};
4use compute_runner_api::LeaseEnvelope;
5use parking_lot::Mutex;
6use serde_json::Value;
7use std::{collections::HashMap, sync::Arc};
8
9pub mod client;
10pub mod input;
11pub mod output;
12pub mod token;
13
14use output::{DomainOutput, UploadedArtifact};
15pub use token::TokenRef;
16
17pub struct Ports {
19 pub input: Box<dyn compute_runner_api::InputSource>,
20 pub output: Box<dyn compute_runner_api::ArtifactSink>,
21 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
22}
23
24impl Ports {
25 pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
26 let guard = self.uploads.lock();
27 guard.values().cloned().collect()
28 }
29}
30pub fn build_ports(lease: &LeaseEnvelope, token: TokenRef) -> Result<Ports> {
32 let base = lease
33 .domain_server_url
34 .clone()
35 .ok_or_else(|| anyhow!("lease missing domain_server_url"))?;
36 let outputs_prefix = lease.task.outputs_prefix.clone();
37 if lease.task.outputs_prefix.is_none() {
38 tracing::debug!(
39 task_id = %lease.task.id,
40 "Lease missing outputs_prefix; defaulting to empty prefix"
41 );
42 }
43 let domain_id = lease
44 .domain_id
45 .map(|id| id.to_string())
46 .ok_or_else(|| anyhow!("lease missing domain_id"))?;
47 let override_manifest_id = lease
48 .task
49 .meta
50 .get("legacy")
51 .and_then(|value| value.get("override_manifest_id"))
52 .and_then(Value::as_str)
53 .map(str::trim)
54 .filter(|s| !s.is_empty())
55 .map(String::from);
56
57 let task_id = lease.task.id.to_string();
58
59 let client = client::DomainClient::new(base, token)?;
60 let uploads = Arc::new(Mutex::new(HashMap::new()));
61 let output = DomainOutput::with_store(
62 client.clone(),
63 domain_id,
64 outputs_prefix,
65 task_id,
66 Arc::clone(&uploads),
67 );
68 if let Some(override_id) = override_manifest_id {
69 output.seed_uploaded_artifact("job_manifest.json", override_id);
70 }
71 Ok(Ports {
72 input: Box::new(input::DomainInput::new(client.clone())),
73 output: Box::new(output),
74 uploads,
75 })
76}