use super::client::{DomainClient, UploadRequest};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use parking_lot::Mutex;
use serde::Serialize;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::fs;

use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
/// Naming metadata computed for one artifact before it is uploaded.
#[derive(Debug, Clone)]
struct UploadDescriptor {
    /// Output path after the optional outputs prefix has been applied.
    logical_path: String,
    /// Artifact name to register with the domain (ends with the task suffix).
    name: String,
    /// Data-type tag sent together with the artifact.
    data_type: String,
}
18
19#[derive(Clone, Debug, Serialize)]
20pub struct UploadedArtifact {
21 pub logical_path: String,
22 pub name: String,
23 pub data_type: String,
24 pub id: Option<String>,
25}
26
27#[derive(Clone)]
29pub struct DomainOutput {
30 client: DomainClient,
31 domain_id: String,
32 outputs_prefix: Option<String>,
33 task_id: String,
34 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
35}
36
37impl DomainOutput {
38 pub fn new(
39 client: DomainClient,
40 domain_id: String,
41 outputs_prefix: Option<String>,
42 task_id: String,
43 ) -> Self {
44 Self::with_store(
45 client,
46 domain_id,
47 outputs_prefix,
48 task_id,
49 Arc::new(Mutex::new(HashMap::new())),
50 )
51 }
52
53 pub fn with_store(
54 client: DomainClient,
55 domain_id: String,
56 outputs_prefix: Option<String>,
57 task_id: String,
58 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
59 ) -> Self {
60 Self {
61 client,
62 domain_id,
63 outputs_prefix,
64 task_id,
65 uploads,
66 }
67 }
68
69 fn name_suffix(&self) -> String {
70 self.task_id.clone()
71 }
72
73 fn apply_outputs_prefix(&self, rel_path: &str) -> String {
74 let trimmed_rel = rel_path.trim_start_matches('/');
75 match self
76 .outputs_prefix
77 .as_ref()
78 .map(|p| p.trim_matches('/'))
79 .filter(|p| !p.is_empty())
80 {
81 Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
82 Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
83 None => trimmed_rel.to_string(),
84 }
85 }
86
87 fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
88 let logical_path = self.apply_outputs_prefix(rel_path);
89 if let Some(descriptor) = self.known_descriptor(rel_path, &logical_path) {
90 return descriptor;
91 }
92
93 let sanitized = sanitize_component(&logical_path.replace('/', "_"));
94 let data_type = infer_data_type(rel_path);
95 UploadDescriptor {
96 logical_path,
97 name: format!("{}_{}", sanitized, self.name_suffix()),
98 data_type,
99 }
100 }
101
102 fn known_descriptor(&self, rel_path: &str, logical_path: &str) -> Option<UploadDescriptor> {
103 let suffix = self.name_suffix();
104 let trimmed = rel_path.trim_start_matches('/');
105 if let Some(scan_id) = trimmed
106 .strip_prefix("refined/local/")
107 .and_then(|rest| rest.strip_suffix("/RefinedScan.zip"))
108 {
109 let sanitized_scan = sanitize_component(scan_id);
110 return Some(UploadDescriptor {
111 logical_path: logical_path.to_string(),
112 name: format!("refined_scan_{}_{}", sanitized_scan, suffix),
113 data_type: "refined_scan_zip".into(),
114 });
115 }
116 let descriptor = match trimmed {
117 "job_manifest.json" => UploadDescriptor {
118 logical_path: logical_path.to_string(),
119 name: format!("job_manifest_{}", suffix),
120 data_type: "job_manifest_json".into(),
121 },
122 "refined/global/refined_manifest.json" => UploadDescriptor {
123 logical_path: logical_path.to_string(),
124 name: format!("refined_manifest_{}", suffix),
125 data_type: "refined_manifest_json".into(),
126 },
127 "refined/global/RefinedPointCloudReduced.ply" => UploadDescriptor {
128 logical_path: logical_path.to_string(),
129 name: format!("refined_pointcloud_reduced_{}", suffix),
130 data_type: "refined_pointcloud_ply".into(),
131 },
132 "refined/global/RefinedPointCloud.ply.drc" => UploadDescriptor {
133 logical_path: logical_path.to_string(),
134 name: format!("refined_pointcloud_full_draco_{}", suffix),
135 data_type: "refined_pointcloud_ply_draco".into(),
136 },
137 "refined/global/topology/topology_downsampled_0.111.obj" => UploadDescriptor {
138 logical_path: logical_path.to_string(),
139 name: format!("topologymesh_v1_lowpoly_obj_{}", suffix),
140 data_type: "obj".into(),
141 },
142 "refined/global/topology/topology_downsampled_0.111.glb" => UploadDescriptor {
143 logical_path: logical_path.to_string(),
144 name: format!("topologymesh_v1_lowpoly_glb_{}", suffix),
145 data_type: "glb".into(),
146 },
147 "refined/global/topology/topology_downsampled_0.333.obj" => UploadDescriptor {
148 logical_path: logical_path.to_string(),
149 name: format!("topologymesh_v1_midpoly_obj_{}", suffix),
150 data_type: "obj".into(),
151 },
152 "refined/global/topology/topology_downsampled_0.333.glb" => UploadDescriptor {
153 logical_path: logical_path.to_string(),
154 name: format!("topologymesh_v1_midpoly_glb_{}", suffix),
155 data_type: "glb".into(),
156 },
157 "refined/global/topology/topology.obj" => UploadDescriptor {
158 logical_path: logical_path.to_string(),
159 name: format!("topologymesh_v1_highpoly_obj_{}", suffix),
160 data_type: "obj".into(),
161 },
162 "refined/global/topology/topology.glb" => UploadDescriptor {
163 logical_path: logical_path.to_string(),
164 name: format!("topologymesh_v1_highpoly_glb_{}", suffix),
165 data_type: "glb".into(),
166 },
167 "outputs_index.json" => UploadDescriptor {
168 logical_path: logical_path.to_string(),
169 name: format!("outputs_index_{}", suffix),
170 data_type: "json".into(),
171 },
172 "result.json" => UploadDescriptor {
173 logical_path: logical_path.to_string(),
174 name: format!("result_{}", suffix),
175 data_type: "json".into(),
176 },
177 "scan_data_summary.json" => UploadDescriptor {
178 logical_path: logical_path.to_string(),
179 name: format!("scan_data_summary_{}", suffix),
180 data_type: "json".into(),
181 },
182 _ => return None,
183 };
184 Some(descriptor)
185 }
186}
187
188#[async_trait]
189impl compute_runner_api::ArtifactSink for DomainOutput {
190 async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
191 let descriptor = self.descriptor_for(rel_path);
192 self.put_domain_artifact(DomainArtifactRequest {
193 rel_path,
194 name: &descriptor.name,
195 data_type: &descriptor.data_type,
196 existing_id: None,
197 content: DomainArtifactContent::Bytes(bytes),
198 })
199 .await
200 .map(|_| ())
201 }
202
203 async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
204 let descriptor = self.descriptor_for(rel_path);
205 self.put_domain_artifact(DomainArtifactRequest {
206 rel_path,
207 name: &descriptor.name,
208 data_type: &descriptor.data_type,
209 existing_id: None,
210 content: DomainArtifactContent::File(file_path),
211 })
212 .await
213 .map(|_| ())
214 }
215
216 async fn open_multipart(
217 &self,
218 _rel_path: &str,
219 ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
220 unimplemented!("multipart not implemented yet")
222 }
223
224 async fn put_domain_artifact(
225 &self,
226 request: DomainArtifactRequest<'_>,
227 ) -> Result<Option<String>> {
228 let logical_path = self.apply_outputs_prefix(request.rel_path);
229 let key = logical_path.clone();
230 let mut existing_id = request.existing_id.map(|s| s.to_string());
231 if existing_id.is_none() {
232 let uploads = self.uploads.lock();
233 existing_id = uploads.get(&key).and_then(|record| record.id.clone());
234 }
235 if existing_id.is_none() {
236 existing_id = self
237 .client
238 .find_artifact_id(&self.domain_id, request.name, request.data_type)
239 .await
240 .map_err(|e| anyhow!(e))?;
241 }
242
243 let bytes_owned;
244 let bytes = match request.content {
245 DomainArtifactContent::Bytes(b) => b,
246 DomainArtifactContent::File(path) => {
247 bytes_owned = fs::read(path).await?;
248 bytes_owned.as_slice()
249 }
250 };
251
252 let upload_req = UploadRequest {
253 domain_id: &self.domain_id,
254 name: request.name,
255 data_type: request.data_type,
256 logical_path: &logical_path,
257 bytes,
258 existing_id: existing_id.as_deref(),
259 };
260
261 let maybe_id = self
262 .client
263 .upload_artifact(upload_req)
264 .await
265 .map_err(|e| anyhow!(e))?;
266 let final_id = maybe_id.or(existing_id);
267
268 let mut uploads = self.uploads.lock();
269 uploads.insert(
270 key,
271 UploadedArtifact {
272 logical_path,
273 name: request.name.to_string(),
274 data_type: request.data_type.to_string(),
275 id: final_id.clone(),
276 },
277 );
278
279 Ok(final_id)
280 }
281}
282
283impl DomainOutput {
284 pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
285 let guard = self.uploads.lock();
286 guard.values().cloned().collect()
287 }
288
289 pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
290 let descriptor = self.descriptor_for(rel_path);
291 let mut uploads = self.uploads.lock();
292 uploads.insert(
293 descriptor.logical_path.clone(),
294 UploadedArtifact {
295 logical_path: descriptor.logical_path,
296 name: descriptor.name,
297 data_type: descriptor.data_type,
298 id: Some(id.into()),
299 },
300 );
301 }
302}
303
304fn infer_data_type(rel_path: &str) -> String {
305 let ext = Path::new(rel_path)
306 .extension()
307 .and_then(|ext| ext.to_str())
308 .map(|ext| ext.trim().to_ascii_lowercase());
309 match ext.as_deref() {
310 Some("json") => "json".into(),
311 Some("ply") => "ply".into(),
312 Some("drc") => "ply_draco".into(),
313 Some("glb") => "glb".into(),
314 Some("obj") => "obj".into(),
315 Some("csv") => "csv".into(),
316 Some("mp4") => "mp4".into(),
317 Some(other) => format!("{}_data", sanitize_component(other)),
318 None => "binary".into(),
319 }
320}
321
/// Makes `value` safe to embed in an artifact name.
///
/// Every character outside `[A-Za-z0-9_-]` becomes a single `_`, so the
/// output has as many characters as the input. An empty input yields
/// `"artifact"`.
fn sanitize_component(value: &str) -> String {
    if value.is_empty() {
        return String::from("artifact");
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}
338}