posemesh_compute_node/storage/
output.rs1use super::client::{DomainClient, UploadFileRequest, UploadRequest};
2use anyhow::{anyhow, Result};
3use async_trait::async_trait;
4use parking_lot::Mutex;
5use serde::Serialize;
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9
10use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
/// Naming metadata computed for a single upload before it is sent.
///
/// Produced by `DomainOutput::descriptor_for` from a relative output path.
#[derive(Debug, Clone)]
struct UploadDescriptor {
    /// Relative path after the optional outputs prefix has been applied.
    logical_path: String,
    /// Artifact name: the sanitized logical path followed by `_` and the task suffix.
    name: String,
    /// Data type label inferred from the file extension of the relative path.
    data_type: String,
}
17
18#[derive(Clone, Debug, Serialize)]
19pub struct UploadedArtifact {
20 pub logical_path: String,
21 pub name: String,
22 pub data_type: String,
23 pub id: Option<String>,
24}
25
26#[derive(Clone)]
28pub struct DomainOutput {
29 client: DomainClient,
30 domain_id: String,
31 outputs_prefix: Option<String>,
32 task_id: String,
33 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
34}
35
36impl DomainOutput {
37 pub fn new(
38 client: DomainClient,
39 domain_id: String,
40 outputs_prefix: Option<String>,
41 task_id: String,
42 ) -> Self {
43 Self::with_store(
44 client,
45 domain_id,
46 outputs_prefix,
47 task_id,
48 Arc::new(Mutex::new(HashMap::new())),
49 )
50 }
51
52 pub fn with_store(
53 client: DomainClient,
54 domain_id: String,
55 outputs_prefix: Option<String>,
56 task_id: String,
57 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
58 ) -> Self {
59 Self {
60 client,
61 domain_id,
62 outputs_prefix,
63 task_id,
64 uploads,
65 }
66 }
67
68 fn name_suffix(&self) -> String {
69 self.task_id.clone()
70 }
71
72 fn apply_outputs_prefix(&self, rel_path: &str) -> String {
73 let trimmed_rel = rel_path.trim_start_matches('/');
74 match self
75 .outputs_prefix
76 .as_ref()
77 .map(|p| p.trim_matches('/'))
78 .filter(|p| !p.is_empty())
79 {
80 Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
81 Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
82 None => trimmed_rel.to_string(),
83 }
84 }
85
86 fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
87 let logical_path = self.apply_outputs_prefix(rel_path);
88 let sanitized = sanitize_component(&logical_path.replace('/', "_"));
89 let data_type = infer_data_type(rel_path);
90 UploadDescriptor {
91 logical_path,
92 name: format!("{}_{}", sanitized, self.name_suffix()),
93 data_type,
94 }
95 }
96}
97
98#[async_trait]
99impl compute_runner_api::ArtifactSink for DomainOutput {
100 async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
101 let descriptor = self.descriptor_for(rel_path);
102 self.put_domain_artifact(DomainArtifactRequest {
103 rel_path,
104 name: &descriptor.name,
105 data_type: &descriptor.data_type,
106 existing_id: None,
107 content: DomainArtifactContent::Bytes(bytes),
108 })
109 .await
110 .map(|_| ())
111 }
112
113 async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
114 let descriptor = self.descriptor_for(rel_path);
115 self.put_domain_artifact(DomainArtifactRequest {
116 rel_path,
117 name: &descriptor.name,
118 data_type: &descriptor.data_type,
119 existing_id: None,
120 content: DomainArtifactContent::File(file_path),
121 })
122 .await
123 .map(|_| ())
124 }
125
126 async fn open_multipart(
127 &self,
128 _rel_path: &str,
129 ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
130 unimplemented!("multipart not implemented yet")
132 }
133
134 async fn put_domain_artifact(
135 &self,
136 request: DomainArtifactRequest<'_>,
137 ) -> Result<Option<String>> {
138 let logical_path = self.apply_outputs_prefix(request.rel_path);
139 let key = logical_path.clone();
140 let mut existing_id = request.existing_id.map(|s| s.to_string());
141 if existing_id.is_none() {
142 let uploads = self.uploads.lock();
143 existing_id = uploads.get(&key).and_then(|record| record.id.clone());
144 }
145 if existing_id.is_none() {
146 existing_id = self
147 .client
148 .find_artifact_id(&self.domain_id, request.name, request.data_type)
149 .await
150 .map_err(|e| anyhow!(e))?;
151 }
152
153 let maybe_id = match request.content {
154 DomainArtifactContent::Bytes(bytes) => {
155 let upload_req = UploadRequest {
156 domain_id: &self.domain_id,
157 name: request.name,
158 data_type: request.data_type,
159 logical_path: &logical_path,
160 bytes,
161 existing_id: existing_id.as_deref(),
162 };
163 self.client
164 .upload_artifact(upload_req)
165 .await
166 .map_err(|e| anyhow!(e))?
167 }
168 DomainArtifactContent::File(path) => {
169 let upload_req = UploadFileRequest {
170 domain_id: &self.domain_id,
171 name: request.name,
172 data_type: request.data_type,
173 logical_path: &logical_path,
174 path,
175 existing_id: existing_id.as_deref(),
176 };
177 self.client
178 .upload_artifact_file(upload_req)
179 .await
180 .map_err(|e| anyhow!(e))?
181 }
182 };
183 let final_id = maybe_id.or(existing_id);
184
185 let mut uploads = self.uploads.lock();
186 uploads.insert(
187 key,
188 UploadedArtifact {
189 logical_path,
190 name: request.name.to_string(),
191 data_type: request.data_type.to_string(),
192 id: final_id.clone(),
193 },
194 );
195
196 Ok(final_id)
197 }
198}
199
200impl DomainOutput {
201 pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
202 let guard = self.uploads.lock();
203 guard.values().cloned().collect()
204 }
205
206 pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
207 let descriptor = self.descriptor_for(rel_path);
208 let mut uploads = self.uploads.lock();
209 uploads.insert(
210 descriptor.logical_path.clone(),
211 UploadedArtifact {
212 logical_path: descriptor.logical_path,
213 name: descriptor.name,
214 data_type: descriptor.data_type,
215 id: Some(id.into()),
216 },
217 );
218 }
219}
220
221fn infer_data_type(rel_path: &str) -> String {
222 let ext = Path::new(rel_path)
223 .extension()
224 .and_then(|ext| ext.to_str())
225 .map(|ext| ext.trim().to_ascii_lowercase());
226 match ext.as_deref() {
227 Some("json") => "json".into(),
228 Some("ply") => "ply".into(),
229 Some("drc") => "ply_draco".into(),
230 Some("glb") => "glb".into(),
231 Some("obj") => "obj".into(),
232 Some("csv") => "csv".into(),
233 Some("mp4") => "mp4".into(),
234 Some(other) => format!("{}_data", sanitize_component(other)),
235 None => "binary".into(),
236 }
237}
238
/// Rewrites `value` so it contains only ASCII letters, digits, `-` and `_`.
///
/// Every other character is replaced by a single `_`. Empty input yields the
/// placeholder `artifact`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so the result is empty
    // only when the input is.
    if value.is_empty() {
        return "artifact".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}