posemesh_compute_node/storage/
mod.rs

1//! Storage module: TokenRef, client, and InputSource/ArtifactSink wrappers.
2
3use anyhow::{anyhow, Result};
4use compute_runner_api::LeaseEnvelope;
5use parking_lot::Mutex;
6use serde_json::Value;
7use std::{collections::HashMap, sync::Arc};
8
9pub mod client;
10pub mod input;
11pub mod output;
12pub mod token;
13
14use output::{DomainOutput, UploadedArtifact};
15pub use token::TokenRef;
16
/// Pair of input/output ports backed by the domain client.
///
/// Instances are assembled by [`build_ports`], which also wires the private
/// upload registry into the output port so that both sides observe the same
/// map.
pub struct Ports {
    /// Source side of the pair, stored as a boxed `InputSource` trait object.
    pub input: Box<dyn compute_runner_api::InputSource>,
    /// Sink side of the pair, stored as a boxed `ArtifactSink` trait object.
    pub output: Box<dyn compute_runner_api::ArtifactSink>,
    /// Registry of uploaded artifacts; the same `Arc` is handed to the output
    /// port when the pair is built.
    // NOTE(review): keys look like artifact names (e.g. "job_manifest.json") —
    // confirm against the `output` module.
    uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
}
23
24impl Ports {
25    pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
26        let guard = self.uploads.lock();
27        guard.values().cloned().collect()
28    }
29}
30/// Build storage ports from a lease and a TokenRef.
31pub fn build_ports(lease: &LeaseEnvelope, token: TokenRef) -> Result<Ports> {
32    let base = lease
33        .domain_server_url
34        .clone()
35        .ok_or_else(|| anyhow!("lease missing domain_server_url"))?;
36    let outputs_prefix = lease.task.outputs_prefix.clone();
37    if lease.task.outputs_prefix.is_none() {
38        tracing::debug!(
39            task_id = %lease.task.id,
40            "Lease missing outputs_prefix; defaulting to empty prefix"
41        );
42    }
43    let domain_id = lease
44        .domain_id
45        .map(|id| id.to_string())
46        .ok_or_else(|| anyhow!("lease missing domain_id"))?;
47    let override_manifest_id = lease
48        .task
49        .meta
50        .get("legacy")
51        .and_then(|value| value.get("override_manifest_id"))
52        .and_then(Value::as_str)
53        .map(str::trim)
54        .filter(|s| !s.is_empty())
55        .map(String::from);
56
57    let task_id = lease.task.id.to_string();
58
59    let client = client::DomainClient::new(base, token)?;
60    let uploads = Arc::new(Mutex::new(HashMap::new()));
61    let output = DomainOutput::with_store(
62        client.clone(),
63        domain_id,
64        outputs_prefix,
65        task_id,
66        Arc::clone(&uploads),
67    );
68    if let Some(override_id) = override_manifest_id {
69        output.seed_uploaded_artifact("job_manifest.json", override_id);
70    }
71    Ok(Ports {
72        input: Box::new(input::DomainInput::new(client.clone())),
73        output: Box::new(output),
74        uploads,
75    })
76}