posemesh_compute_node/storage/
output.rs

1use super::client::{DomainClient, UploadRequest};
2use anyhow::{anyhow, Result};
3use async_trait::async_trait;
4use parking_lot::Mutex;
5use serde::Serialize;
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::fs;
10
11use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
/// Upload metadata derived from an artifact's relative path.
#[derive(Debug, Clone)]
struct UploadDescriptor {
    /// Relative path after the optional outputs prefix has been applied.
    logical_path: String,
    /// Artifact name sent with the upload.
    name: String,
    /// Data-type label sent with the upload.
    data_type: String,
}
18
19#[derive(Clone, Debug, Serialize)]
20pub struct UploadedArtifact {
21    pub logical_path: String,
22    pub name: String,
23    pub data_type: String,
24    pub id: Option<String>,
25}
26
27/// Domain ArtifactSink implementation (skeleton).
28#[derive(Clone)]
29pub struct DomainOutput {
30    client: DomainClient,
31    domain_id: String,
32    outputs_prefix: Option<String>,
33    task_id: String,
34    uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
35}
36
37impl DomainOutput {
38    pub fn new(
39        client: DomainClient,
40        domain_id: String,
41        outputs_prefix: Option<String>,
42        task_id: String,
43    ) -> Self {
44        Self::with_store(
45            client,
46            domain_id,
47            outputs_prefix,
48            task_id,
49            Arc::new(Mutex::new(HashMap::new())),
50        )
51    }
52
53    pub fn with_store(
54        client: DomainClient,
55        domain_id: String,
56        outputs_prefix: Option<String>,
57        task_id: String,
58        uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
59    ) -> Self {
60        Self {
61            client,
62            domain_id,
63            outputs_prefix,
64            task_id,
65            uploads,
66        }
67    }
68
69    fn name_suffix(&self) -> String {
70        self.task_id.clone()
71    }
72
73    fn apply_outputs_prefix(&self, rel_path: &str) -> String {
74        let trimmed_rel = rel_path.trim_start_matches('/');
75        match self
76            .outputs_prefix
77            .as_ref()
78            .map(|p| p.trim_matches('/'))
79            .filter(|p| !p.is_empty())
80        {
81            Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
82            Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
83            None => trimmed_rel.to_string(),
84        }
85    }
86
87    fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
88        let logical_path = self.apply_outputs_prefix(rel_path);
89        if let Some(descriptor) = self.known_descriptor(rel_path, &logical_path) {
90            return descriptor;
91        }
92
93        let sanitized = sanitize_component(&logical_path.replace('/', "_"));
94        let data_type = infer_data_type(rel_path);
95        UploadDescriptor {
96            logical_path,
97            name: format!("{}_{}", sanitized, self.name_suffix()),
98            data_type,
99        }
100    }
101
102    fn known_descriptor(&self, rel_path: &str, logical_path: &str) -> Option<UploadDescriptor> {
103        let suffix = self.name_suffix();
104        let trimmed = rel_path.trim_start_matches('/');
105        if let Some(scan_id) = trimmed
106            .strip_prefix("refined/local/")
107            .and_then(|rest| rest.strip_suffix("/RefinedScan.zip"))
108        {
109            let sanitized_scan = sanitize_component(scan_id);
110            return Some(UploadDescriptor {
111                logical_path: logical_path.to_string(),
112                name: format!("refined_scan_{}_{}", sanitized_scan, suffix),
113                data_type: "refined_scan_zip".into(),
114            });
115        }
116        let descriptor = match trimmed {
117            "job_manifest.json" => UploadDescriptor {
118                logical_path: logical_path.to_string(),
119                name: format!("job_manifest_{}", suffix),
120                data_type: "job_manifest_json".into(),
121            },
122            "refined/global/refined_manifest.json" => UploadDescriptor {
123                logical_path: logical_path.to_string(),
124                name: format!("refined_manifest_{}", suffix),
125                data_type: "refined_manifest_json".into(),
126            },
127            "refined/global/RefinedPointCloudReduced.ply" => UploadDescriptor {
128                logical_path: logical_path.to_string(),
129                name: format!("refined_pointcloud_reduced_{}", suffix),
130                data_type: "refined_pointcloud_ply".into(),
131            },
132            "refined/global/RefinedPointCloud.ply.drc" => UploadDescriptor {
133                logical_path: logical_path.to_string(),
134                name: format!("refined_pointcloud_full_draco_{}", suffix),
135                data_type: "refined_pointcloud_ply_draco".into(),
136            },
137            "refined/global/topology/topology_downsampled_0.111.obj" => UploadDescriptor {
138                logical_path: logical_path.to_string(),
139                name: format!("topologymesh_v1_lowpoly_obj_{}", suffix),
140                data_type: "obj".into(),
141            },
142            "refined/global/topology/topology_downsampled_0.111.glb" => UploadDescriptor {
143                logical_path: logical_path.to_string(),
144                name: format!("topologymesh_v1_lowpoly_glb_{}", suffix),
145                data_type: "glb".into(),
146            },
147            "refined/global/topology/topology_downsampled_0.333.obj" => UploadDescriptor {
148                logical_path: logical_path.to_string(),
149                name: format!("topologymesh_v1_midpoly_obj_{}", suffix),
150                data_type: "obj".into(),
151            },
152            "refined/global/topology/topology_downsampled_0.333.glb" => UploadDescriptor {
153                logical_path: logical_path.to_string(),
154                name: format!("topologymesh_v1_midpoly_glb_{}", suffix),
155                data_type: "glb".into(),
156            },
157            "refined/global/topology/topology.obj" => UploadDescriptor {
158                logical_path: logical_path.to_string(),
159                name: format!("topologymesh_v1_highpoly_obj_{}", suffix),
160                data_type: "obj".into(),
161            },
162            "refined/global/topology/topology.glb" => UploadDescriptor {
163                logical_path: logical_path.to_string(),
164                name: format!("topologymesh_v1_highpoly_glb_{}", suffix),
165                data_type: "glb".into(),
166            },
167            "outputs_index.json" => UploadDescriptor {
168                logical_path: logical_path.to_string(),
169                name: format!("outputs_index_{}", suffix),
170                data_type: "json".into(),
171            },
172            "result.json" => UploadDescriptor {
173                logical_path: logical_path.to_string(),
174                name: format!("result_{}", suffix),
175                data_type: "json".into(),
176            },
177            "scan_data_summary.json" => UploadDescriptor {
178                logical_path: logical_path.to_string(),
179                name: format!("scan_data_summary_{}", suffix),
180                data_type: "json".into(),
181            },
182            _ => return None,
183        };
184        Some(descriptor)
185    }
186}
187
188#[async_trait]
189impl compute_runner_api::ArtifactSink for DomainOutput {
190    async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
191        let descriptor = self.descriptor_for(rel_path);
192        self.put_domain_artifact(DomainArtifactRequest {
193            rel_path,
194            name: &descriptor.name,
195            data_type: &descriptor.data_type,
196            existing_id: None,
197            content: DomainArtifactContent::Bytes(bytes),
198        })
199        .await
200        .map(|_| ())
201    }
202
203    async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
204        let descriptor = self.descriptor_for(rel_path);
205        self.put_domain_artifact(DomainArtifactRequest {
206            rel_path,
207            name: &descriptor.name,
208            data_type: &descriptor.data_type,
209            existing_id: None,
210            content: DomainArtifactContent::File(file_path),
211        })
212        .await
213        .map(|_| ())
214    }
215
216    async fn open_multipart(
217        &self,
218        _rel_path: &str,
219    ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
220        // Implemented in later prompt.
221        unimplemented!("multipart not implemented yet")
222    }
223
224    async fn put_domain_artifact(
225        &self,
226        request: DomainArtifactRequest<'_>,
227    ) -> Result<Option<String>> {
228        let logical_path = self.apply_outputs_prefix(request.rel_path);
229        let key = logical_path.clone();
230        let mut existing_id = request.existing_id.map(|s| s.to_string());
231        if existing_id.is_none() {
232            let uploads = self.uploads.lock();
233            existing_id = uploads.get(&key).and_then(|record| record.id.clone());
234        }
235        if existing_id.is_none() {
236            existing_id = self
237                .client
238                .find_artifact_id(&self.domain_id, request.name, request.data_type)
239                .await
240                .map_err(|e| anyhow!(e))?;
241        }
242
243        let bytes_owned;
244        let bytes = match request.content {
245            DomainArtifactContent::Bytes(b) => b,
246            DomainArtifactContent::File(path) => {
247                bytes_owned = fs::read(path).await?;
248                bytes_owned.as_slice()
249            }
250        };
251
252        let upload_req = UploadRequest {
253            domain_id: &self.domain_id,
254            name: request.name,
255            data_type: request.data_type,
256            logical_path: &logical_path,
257            bytes,
258            existing_id: existing_id.as_deref(),
259        };
260
261        let maybe_id = self
262            .client
263            .upload_artifact(upload_req)
264            .await
265            .map_err(|e| anyhow!(e))?;
266        let final_id = maybe_id.or(existing_id);
267
268        let mut uploads = self.uploads.lock();
269        uploads.insert(
270            key,
271            UploadedArtifact {
272                logical_path,
273                name: request.name.to_string(),
274                data_type: request.data_type.to_string(),
275                id: final_id.clone(),
276            },
277        );
278
279        Ok(final_id)
280    }
281}
282
283impl DomainOutput {
284    pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
285        let guard = self.uploads.lock();
286        guard.values().cloned().collect()
287    }
288
289    pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
290        let descriptor = self.descriptor_for(rel_path);
291        let mut uploads = self.uploads.lock();
292        uploads.insert(
293            descriptor.logical_path.clone(),
294            UploadedArtifact {
295                logical_path: descriptor.logical_path,
296                name: descriptor.name,
297                data_type: descriptor.data_type,
298                id: Some(id.into()),
299            },
300        );
301    }
302}
303
304fn infer_data_type(rel_path: &str) -> String {
305    let ext = Path::new(rel_path)
306        .extension()
307        .and_then(|ext| ext.to_str())
308        .map(|ext| ext.trim().to_ascii_lowercase());
309    match ext.as_deref() {
310        Some("json") => "json".into(),
311        Some("ply") => "ply".into(),
312        Some("drc") => "ply_draco".into(),
313        Some("glb") => "glb".into(),
314        Some("obj") => "obj".into(),
315        Some("csv") => "csv".into(),
316        Some("mp4") => "mp4".into(),
317        Some(other) => format!("{}_data", sanitize_component(other)),
318        None => "binary".into(),
319    }
320}
321
/// Makes `value` safe to embed in an artifact name.
///
/// ASCII letters, ASCII digits, `-`, and `_` are kept; every other character is replaced
/// with `_`. An empty input yields `"artifact"`.
fn sanitize_component(value: &str) -> String {
    // The mapping below is one character in, one character out, so the result can only
    // be empty when the input is.
    if value.is_empty() {
        return "artifact".into();
    }

    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
            _ => '_',
        })
        .collect()
}