posemesh_compute_node/storage/
output.rs1use super::client::{DomainClient, UploadRequest};
2use anyhow::{anyhow, Result};
3use async_trait::async_trait;
4use parking_lot::Mutex;
5use serde::Serialize;
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::fs;
10
11use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
12#[derive(Clone, Debug)]
13struct UploadDescriptor {
14 logical_path: String,
15 name: String,
16 data_type: String,
17}
18
19#[derive(Clone, Debug, Serialize)]
20pub struct UploadedArtifact {
21 pub logical_path: String,
22 pub name: String,
23 pub data_type: String,
24 pub id: Option<String>,
25}
26
27#[derive(Clone)]
29pub struct DomainOutput {
30 client: DomainClient,
31 domain_id: String,
32 outputs_prefix: Option<String>,
33 task_id: String,
34 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
35}
36
37impl DomainOutput {
38 pub fn new(
39 client: DomainClient,
40 domain_id: String,
41 outputs_prefix: Option<String>,
42 task_id: String,
43 ) -> Self {
44 Self::with_store(
45 client,
46 domain_id,
47 outputs_prefix,
48 task_id,
49 Arc::new(Mutex::new(HashMap::new())),
50 )
51 }
52
53 pub fn with_store(
54 client: DomainClient,
55 domain_id: String,
56 outputs_prefix: Option<String>,
57 task_id: String,
58 uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
59 ) -> Self {
60 Self {
61 client,
62 domain_id,
63 outputs_prefix,
64 task_id,
65 uploads,
66 }
67 }
68
69 fn name_suffix(&self) -> String {
70 self.task_id.clone()
71 }
72
73 fn apply_outputs_prefix(&self, rel_path: &str) -> String {
74 let trimmed_rel = rel_path.trim_start_matches('/');
75 match self
76 .outputs_prefix
77 .as_ref()
78 .map(|p| p.trim_matches('/'))
79 .filter(|p| !p.is_empty())
80 {
81 Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
82 Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
83 None => trimmed_rel.to_string(),
84 }
85 }
86
87 fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
88 let logical_path = self.apply_outputs_prefix(rel_path);
89 let sanitized = sanitize_component(&logical_path.replace('/', "_"));
90 let data_type = infer_data_type(rel_path);
91 UploadDescriptor {
92 logical_path,
93 name: format!("{}_{}", sanitized, self.name_suffix()),
94 data_type,
95 }
96 }
97}
98
99#[async_trait]
100impl compute_runner_api::ArtifactSink for DomainOutput {
101 async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
102 let descriptor = self.descriptor_for(rel_path);
103 self.put_domain_artifact(DomainArtifactRequest {
104 rel_path,
105 name: &descriptor.name,
106 data_type: &descriptor.data_type,
107 existing_id: None,
108 content: DomainArtifactContent::Bytes(bytes),
109 })
110 .await
111 .map(|_| ())
112 }
113
114 async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
115 let descriptor = self.descriptor_for(rel_path);
116 self.put_domain_artifact(DomainArtifactRequest {
117 rel_path,
118 name: &descriptor.name,
119 data_type: &descriptor.data_type,
120 existing_id: None,
121 content: DomainArtifactContent::File(file_path),
122 })
123 .await
124 .map(|_| ())
125 }
126
127 async fn open_multipart(
128 &self,
129 _rel_path: &str,
130 ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
131 unimplemented!("multipart not implemented yet")
133 }
134
135 async fn put_domain_artifact(
136 &self,
137 request: DomainArtifactRequest<'_>,
138 ) -> Result<Option<String>> {
139 let logical_path = self.apply_outputs_prefix(request.rel_path);
140 let key = logical_path.clone();
141 let mut existing_id = request.existing_id.map(|s| s.to_string());
142 if existing_id.is_none() {
143 let uploads = self.uploads.lock();
144 existing_id = uploads.get(&key).and_then(|record| record.id.clone());
145 }
146 if existing_id.is_none() {
147 existing_id = self
148 .client
149 .find_artifact_id(&self.domain_id, request.name, request.data_type)
150 .await
151 .map_err(|e| anyhow!(e))?;
152 }
153
154 let bytes_owned;
155 let bytes = match request.content {
156 DomainArtifactContent::Bytes(b) => b,
157 DomainArtifactContent::File(path) => {
158 bytes_owned = fs::read(path).await?;
159 bytes_owned.as_slice()
160 }
161 };
162
163 let upload_req = UploadRequest {
164 domain_id: &self.domain_id,
165 name: request.name,
166 data_type: request.data_type,
167 logical_path: &logical_path,
168 bytes,
169 existing_id: existing_id.as_deref(),
170 };
171
172 let maybe_id = self
173 .client
174 .upload_artifact(upload_req)
175 .await
176 .map_err(|e| anyhow!(e))?;
177 let final_id = maybe_id.or(existing_id);
178
179 let mut uploads = self.uploads.lock();
180 uploads.insert(
181 key,
182 UploadedArtifact {
183 logical_path,
184 name: request.name.to_string(),
185 data_type: request.data_type.to_string(),
186 id: final_id.clone(),
187 },
188 );
189
190 Ok(final_id)
191 }
192}
193
194impl DomainOutput {
195 pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
196 let guard = self.uploads.lock();
197 guard.values().cloned().collect()
198 }
199
200 pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
201 let descriptor = self.descriptor_for(rel_path);
202 let mut uploads = self.uploads.lock();
203 uploads.insert(
204 descriptor.logical_path.clone(),
205 UploadedArtifact {
206 logical_path: descriptor.logical_path,
207 name: descriptor.name,
208 data_type: descriptor.data_type,
209 id: Some(id.into()),
210 },
211 );
212 }
213}
214
215fn infer_data_type(rel_path: &str) -> String {
216 let ext = Path::new(rel_path)
217 .extension()
218 .and_then(|ext| ext.to_str())
219 .map(|ext| ext.trim().to_ascii_lowercase());
220 match ext.as_deref() {
221 Some("json") => "json".into(),
222 Some("ply") => "ply".into(),
223 Some("drc") => "ply_draco".into(),
224 Some("glb") => "glb".into(),
225 Some("obj") => "obj".into(),
226 Some("csv") => "csv".into(),
227 Some("mp4") => "mp4".into(),
228 Some(other) => format!("{}_data", sanitize_component(other)),
229 None => "binary".into(),
230 }
231}
232
233fn sanitize_component(value: &str) -> String {
234 let sanitized: String = value
235 .chars()
236 .map(|c| {
237 if c.is_ascii_alphanumeric() || matches!(c, '-' | '_') {
238 c
239 } else {
240 '_'
241 }
242 })
243 .collect();
244 if sanitized.is_empty() {
245 "artifact".into()
246 } else {
247 sanitized
248 }
249}