posemesh_compute_node/storage/
mod.rs

1//! Storage module: TokenRef, client, and InputSource/ArtifactSink wrappers.
2
3use anyhow::{anyhow, Result};
4use compute_runner_api::LeaseEnvelope;
5use parking_lot::Mutex;
6use serde_json::Value;
7use std::{collections::HashMap, sync::Arc};
8
9pub mod client;
10pub mod input;
11pub mod output;
12pub mod token;
13
14use output::{DomainOutput, UploadedArtifact};
15pub use token::TokenRef;
16
17/// Pair of input/output ports backed by the domain client.
18pub struct Ports {
19    pub input: Box<dyn compute_runner_api::InputSource>,
20    pub output: Box<dyn compute_runner_api::ArtifactSink>,
21    uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
22}
23
24impl Ports {
25    pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
26        let guard = self.uploads.lock();
27        guard.values().cloned().collect()
28    }
29}
30/// Build storage ports from a lease and a TokenRef.
31pub fn build_ports(lease: &LeaseEnvelope, token: TokenRef) -> Result<Ports> {
32    let base = lease
33        .domain_server_url
34        .clone()
35        .ok_or_else(|| anyhow!("lease missing domain_server_url"))?;
36    let outputs_prefix = lease.task.outputs_prefix.clone();
37    if lease.task.outputs_prefix.is_none() {
38        tracing::debug!(
39            task_id = %lease.task.id,
40            "Lease missing outputs_prefix; defaulting to empty prefix"
41        );
42    }
43    let domain_id = lease
44        .domain_id
45        .map(|id| id.to_string())
46        .ok_or_else(|| anyhow!("lease missing domain_id"))?;
47    let override_job_name = lease
48        .task
49        .meta
50        .get("legacy")
51        .and_then(|value| value.get("override_job_name"))
52        .and_then(Value::as_str)
53        .map(str::trim)
54        .filter(|s| !s.is_empty())
55        .map(String::from);
56    let override_manifest_id = lease
57        .task
58        .meta
59        .get("legacy")
60        .and_then(|value| value.get("override_manifest_id"))
61        .and_then(Value::as_str)
62        .map(str::trim)
63        .filter(|s| !s.is_empty())
64        .map(String::from);
65
66    let job_id = lease
67        .task
68        .job_id
69        .map(|id| id.to_string())
70        .or_else(|| override_job_name.clone());
71    let task_id = lease.task.id.to_string();
72
73    let client = client::DomainClient::new(base, token)?;
74    let uploads = Arc::new(Mutex::new(HashMap::new()));
75    let output = DomainOutput::with_store(
76        client.clone(),
77        domain_id,
78        outputs_prefix,
79        job_id,
80        task_id,
81        Arc::clone(&uploads),
82    );
83    if let Some(override_id) = override_manifest_id {
84        output.seed_uploaded_artifact("job_manifest.json", override_id);
85    }
86    Ok(Ports {
87        input: Box::new(input::DomainInput::new(client.clone())),
88        output: Box::new(output),
89        uploads,
90    })
91}