posemesh_compute_node/storage/
mod.rs1use anyhow::{anyhow, Result};
4use compute_runner_api::LeaseEnvelope;
5use parking_lot::Mutex;
6use std::{collections::HashMap, sync::Arc};
7
8pub mod client;
9pub mod input;
10pub mod output;
11pub mod token;
12
13use output::{DomainOutput, UploadedArtifact};
14pub use token::TokenRef;
15
16pub struct Ports {
18 pub input: Box<dyn compute_runner_api::InputSource>,
19 pub output: Box<dyn compute_runner_api::ArtifactSink>,
20 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
21}
22
23impl Ports {
24 pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
25 let guard = self.uploads.lock();
26 guard.values().cloned().collect()
27 }
28}
29pub fn build_ports(lease: &LeaseEnvelope, token: TokenRef) -> Result<Ports> {
31 let base = lease
32 .domain_server_url
33 .clone()
34 .ok_or_else(|| anyhow!("lease missing domain_server_url"))?;
35 let outputs_prefix = lease.task.outputs_prefix.clone();
36 if lease.task.outputs_prefix.is_none() {
37 tracing::debug!(
38 task_id = %lease.task.id,
39 "Lease missing outputs_prefix; defaulting to empty prefix"
40 );
41 }
42 let domain_id = lease
43 .domain_id
44 .map(|id| id.to_string())
45 .ok_or_else(|| anyhow!("lease missing domain_id"))?;
46 let task_id = lease.task.id.to_string();
47
48 let client = client::DomainClient::new(base, token)?;
49 let uploads = Arc::new(Mutex::new(HashMap::new()));
50 let output = DomainOutput::with_store(
51 client.clone(),
52 domain_id,
53 outputs_prefix,
54 task_id,
55 Arc::clone(&uploads),
56 );
57 Ok(Ports {
58 input: Box::new(input::DomainInput::new(client.clone())),
59 output: Box::new(output),
60 uploads,
61 })
62}