posemesh_compute_node/storage/
output.rs

1use super::client::{DomainClient, UploadRequest};
2use anyhow::{anyhow, Result};
3use async_trait::async_trait;
4use parking_lot::Mutex;
5use serde::Serialize;
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::fs;
10
11use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
/// Naming metadata derived from a relative output path before an upload.
#[derive(Debug, Clone)]
struct UploadDescriptor {
    /// Relative path after the optional outputs prefix has been applied.
    logical_path: String,
    /// Artifact name built from the sanitized logical path and a task suffix.
    name: String,
    /// Data type label inferred from the path's file extension.
    data_type: String,
}
18
19#[derive(Clone, Debug, Serialize)]
20pub struct UploadedArtifact {
21    pub logical_path: String,
22    pub name: String,
23    pub data_type: String,
24    pub id: Option<String>,
25}
26
27/// Domain ArtifactSink implementation (skeleton).
28#[derive(Clone)]
29pub struct DomainOutput {
30    client: DomainClient,
31    domain_id: String,
32    outputs_prefix: Option<String>,
33    task_id: String,
34    uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
35}
36
37impl DomainOutput {
38    pub fn new(
39        client: DomainClient,
40        domain_id: String,
41        outputs_prefix: Option<String>,
42        task_id: String,
43    ) -> Self {
44        Self::with_store(
45            client,
46            domain_id,
47            outputs_prefix,
48            task_id,
49            Arc::new(Mutex::new(HashMap::new())),
50        )
51    }
52
53    pub fn with_store(
54        client: DomainClient,
55        domain_id: String,
56        outputs_prefix: Option<String>,
57        task_id: String,
58        uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
59    ) -> Self {
60        Self {
61            client,
62            domain_id,
63            outputs_prefix,
64            task_id,
65            uploads,
66        }
67    }
68
69    fn name_suffix(&self) -> String {
70        self.task_id.clone()
71    }
72
73    fn apply_outputs_prefix(&self, rel_path: &str) -> String {
74        let trimmed_rel = rel_path.trim_start_matches('/');
75        match self
76            .outputs_prefix
77            .as_ref()
78            .map(|p| p.trim_matches('/'))
79            .filter(|p| !p.is_empty())
80        {
81            Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
82            Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
83            None => trimmed_rel.to_string(),
84        }
85    }
86
87    fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
88        let logical_path = self.apply_outputs_prefix(rel_path);
89        let sanitized = sanitize_component(&logical_path.replace('/', "_"));
90        let data_type = infer_data_type(rel_path);
91        UploadDescriptor {
92            logical_path,
93            name: format!("{}_{}", sanitized, self.name_suffix()),
94            data_type,
95        }
96    }
97}
98
99#[async_trait]
100impl compute_runner_api::ArtifactSink for DomainOutput {
101    async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
102        let descriptor = self.descriptor_for(rel_path);
103        self.put_domain_artifact(DomainArtifactRequest {
104            rel_path,
105            name: &descriptor.name,
106            data_type: &descriptor.data_type,
107            existing_id: None,
108            content: DomainArtifactContent::Bytes(bytes),
109        })
110        .await
111        .map(|_| ())
112    }
113
114    async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
115        let descriptor = self.descriptor_for(rel_path);
116        self.put_domain_artifact(DomainArtifactRequest {
117            rel_path,
118            name: &descriptor.name,
119            data_type: &descriptor.data_type,
120            existing_id: None,
121            content: DomainArtifactContent::File(file_path),
122        })
123        .await
124        .map(|_| ())
125    }
126
127    async fn open_multipart(
128        &self,
129        _rel_path: &str,
130    ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
131        // Implemented in later prompt.
132        unimplemented!("multipart not implemented yet")
133    }
134
135    async fn put_domain_artifact(
136        &self,
137        request: DomainArtifactRequest<'_>,
138    ) -> Result<Option<String>> {
139        let logical_path = self.apply_outputs_prefix(request.rel_path);
140        let key = logical_path.clone();
141        let mut existing_id = request.existing_id.map(|s| s.to_string());
142        if existing_id.is_none() {
143            let uploads = self.uploads.lock();
144            existing_id = uploads.get(&key).and_then(|record| record.id.clone());
145        }
146        if existing_id.is_none() {
147            existing_id = self
148                .client
149                .find_artifact_id(&self.domain_id, request.name, request.data_type)
150                .await
151                .map_err(|e| anyhow!(e))?;
152        }
153
154        let bytes_owned;
155        let bytes = match request.content {
156            DomainArtifactContent::Bytes(b) => b,
157            DomainArtifactContent::File(path) => {
158                bytes_owned = fs::read(path).await?;
159                bytes_owned.as_slice()
160            }
161        };
162
163        let upload_req = UploadRequest {
164            domain_id: &self.domain_id,
165            name: request.name,
166            data_type: request.data_type,
167            logical_path: &logical_path,
168            bytes,
169            existing_id: existing_id.as_deref(),
170        };
171
172        let maybe_id = self
173            .client
174            .upload_artifact(upload_req)
175            .await
176            .map_err(|e| anyhow!(e))?;
177        let final_id = maybe_id.or(existing_id);
178
179        let mut uploads = self.uploads.lock();
180        uploads.insert(
181            key,
182            UploadedArtifact {
183                logical_path,
184                name: request.name.to_string(),
185                data_type: request.data_type.to_string(),
186                id: final_id.clone(),
187            },
188        );
189
190        Ok(final_id)
191    }
192}
193
194impl DomainOutput {
195    pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
196        let guard = self.uploads.lock();
197        guard.values().cloned().collect()
198    }
199
200    pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
201        let descriptor = self.descriptor_for(rel_path);
202        let mut uploads = self.uploads.lock();
203        uploads.insert(
204            descriptor.logical_path.clone(),
205            UploadedArtifact {
206                logical_path: descriptor.logical_path,
207                name: descriptor.name,
208                data_type: descriptor.data_type,
209                id: Some(id.into()),
210            },
211        );
212    }
213}
214
215fn infer_data_type(rel_path: &str) -> String {
216    let ext = Path::new(rel_path)
217        .extension()
218        .and_then(|ext| ext.to_str())
219        .map(|ext| ext.trim().to_ascii_lowercase());
220    match ext.as_deref() {
221        Some("json") => "json".into(),
222        Some("ply") => "ply".into(),
223        Some("drc") => "ply_draco".into(),
224        Some("glb") => "glb".into(),
225        Some("obj") => "obj".into(),
226        Some("csv") => "csv".into(),
227        Some("mp4") => "mp4".into(),
228        Some(other) => format!("{}_data", sanitize_component(other)),
229        None => "binary".into(),
230    }
231}
232
/// Makes `value` safe to embed in an artifact name.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character
/// is replaced by a single `_`. An empty input yields `"artifact"`.
fn sanitize_component(value: &str) -> String {
    // Each input character maps to exactly one output character, so the
    // result is empty only when the input is.
    if value.is_empty() {
        return "artifact".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}