posemesh_compute_node/storage/
output.rs

1use super::client::{DomainClient, UploadFileRequest, UploadRequest};
2use anyhow::{anyhow, Result};
3use async_trait::async_trait;
4use parking_lot::Mutex;
5use serde::Serialize;
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9
10use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest};
/// Upload metadata derived from a relative output path.
///
/// Produced by `DomainOutput::descriptor_for`.
#[derive(Clone, Debug)]
struct UploadDescriptor {
    /// The relative path with the configured outputs prefix applied.
    logical_path: String,
    /// Artifact name: the sanitized logical path followed by `_` and the task id.
    name: String,
    /// Data type label inferred from the relative path's file extension.
    data_type: String,
}
17
/// Serializable record of one artifact handled by `DomainOutput`.
#[derive(Clone, Debug, Serialize)]
pub struct UploadedArtifact {
    /// Prefixed path of the artifact; also the key it is stored under in the upload map.
    pub logical_path: String,
    /// Artifact name that was sent with the upload (or derived when seeding).
    pub name: String,
    /// Data type label that was sent with the upload (or derived when seeding).
    pub data_type: String,
    /// Artifact id, when one is known: the id returned by the upload, a
    /// previously known id, or a seeded id. `None` if no id was obtained.
    pub id: Option<String>,
}
25
/// Domain-backed `ArtifactSink` implementation (skeleton).
///
/// Uploads artifacts through a `DomainClient` and keeps a record of every
/// upload keyed by logical path. The record lives behind an `Arc`, so clones
/// of a `DomainOutput` share the same upload map.
#[derive(Clone)]
pub struct DomainOutput {
    /// Client used for artifact id lookups and uploads.
    client: DomainClient,
    /// Domain id passed to every client call.
    domain_id: String,
    /// Optional prefix prepended to relative paths to form logical paths.
    outputs_prefix: Option<String>,
    /// Task id; appended to generated artifact names.
    task_id: String,
    /// Uploaded (or seeded) artifacts, keyed by logical path.
    uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
}
35
36impl DomainOutput {
37    pub fn new(
38        client: DomainClient,
39        domain_id: String,
40        outputs_prefix: Option<String>,
41        task_id: String,
42    ) -> Self {
43        Self::with_store(
44            client,
45            domain_id,
46            outputs_prefix,
47            task_id,
48            Arc::new(Mutex::new(HashMap::new())),
49        )
50    }
51
52    pub fn with_store(
53        client: DomainClient,
54        domain_id: String,
55        outputs_prefix: Option<String>,
56        task_id: String,
57        uploads: Arc<Mutex<HashMap<String, UploadedArtifact>>>,
58    ) -> Self {
59        Self {
60            client,
61            domain_id,
62            outputs_prefix,
63            task_id,
64            uploads,
65        }
66    }
67
68    fn name_suffix(&self) -> String {
69        self.task_id.clone()
70    }
71
72    fn apply_outputs_prefix(&self, rel_path: &str) -> String {
73        let trimmed_rel = rel_path.trim_start_matches('/');
74        match self
75            .outputs_prefix
76            .as_ref()
77            .map(|p| p.trim_matches('/'))
78            .filter(|p| !p.is_empty())
79        {
80            Some(prefix) if trimmed_rel.is_empty() => prefix.to_string(),
81            Some(prefix) => format!("{}/{}", prefix, trimmed_rel),
82            None => trimmed_rel.to_string(),
83        }
84    }
85
86    fn descriptor_for(&self, rel_path: &str) -> UploadDescriptor {
87        let logical_path = self.apply_outputs_prefix(rel_path);
88        let sanitized = sanitize_component(&logical_path.replace('/', "_"));
89        let data_type = infer_data_type(rel_path);
90        UploadDescriptor {
91            logical_path,
92            name: format!("{}_{}", sanitized, self.name_suffix()),
93            data_type,
94        }
95    }
96}
97
98#[async_trait]
99impl compute_runner_api::ArtifactSink for DomainOutput {
100    async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()> {
101        let descriptor = self.descriptor_for(rel_path);
102        self.put_domain_artifact(DomainArtifactRequest {
103            rel_path,
104            name: &descriptor.name,
105            data_type: &descriptor.data_type,
106            existing_id: None,
107            content: DomainArtifactContent::Bytes(bytes),
108        })
109        .await
110        .map(|_| ())
111    }
112
113    async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()> {
114        let descriptor = self.descriptor_for(rel_path);
115        self.put_domain_artifact(DomainArtifactRequest {
116            rel_path,
117            name: &descriptor.name,
118            data_type: &descriptor.data_type,
119            existing_id: None,
120            content: DomainArtifactContent::File(file_path),
121        })
122        .await
123        .map(|_| ())
124    }
125
126    async fn open_multipart(
127        &self,
128        _rel_path: &str,
129    ) -> Result<Box<dyn compute_runner_api::runner::MultipartUpload>> {
130        // Implemented in later prompt.
131        unimplemented!("multipart not implemented yet")
132    }
133
134    async fn put_domain_artifact(
135        &self,
136        request: DomainArtifactRequest<'_>,
137    ) -> Result<Option<String>> {
138        let logical_path = self.apply_outputs_prefix(request.rel_path);
139        let key = logical_path.clone();
140        let mut existing_id = request.existing_id.map(|s| s.to_string());
141        if existing_id.is_none() {
142            let uploads = self.uploads.lock();
143            existing_id = uploads.get(&key).and_then(|record| record.id.clone());
144        }
145        if existing_id.is_none() {
146            existing_id = self
147                .client
148                .find_artifact_id(&self.domain_id, request.name, request.data_type)
149                .await
150                .map_err(|e| anyhow!(e))?;
151        }
152
153        let maybe_id = match request.content {
154            DomainArtifactContent::Bytes(bytes) => {
155                let upload_req = UploadRequest {
156                    domain_id: &self.domain_id,
157                    name: request.name,
158                    data_type: request.data_type,
159                    logical_path: &logical_path,
160                    bytes,
161                    existing_id: existing_id.as_deref(),
162                };
163                self.client
164                    .upload_artifact(upload_req)
165                    .await
166                    .map_err(|e| anyhow!(e))?
167            }
168            DomainArtifactContent::File(path) => {
169                let upload_req = UploadFileRequest {
170                    domain_id: &self.domain_id,
171                    name: request.name,
172                    data_type: request.data_type,
173                    logical_path: &logical_path,
174                    path,
175                    existing_id: existing_id.as_deref(),
176                };
177                self.client
178                    .upload_artifact_file(upload_req)
179                    .await
180                    .map_err(|e| anyhow!(e))?
181            }
182        };
183        let final_id = maybe_id.or(existing_id);
184
185        let mut uploads = self.uploads.lock();
186        uploads.insert(
187            key,
188            UploadedArtifact {
189                logical_path,
190                name: request.name.to_string(),
191                data_type: request.data_type.to_string(),
192                id: final_id.clone(),
193            },
194        );
195
196        Ok(final_id)
197    }
198}
199
200impl DomainOutput {
201    pub fn uploaded_artifacts(&self) -> Vec<UploadedArtifact> {
202        let guard = self.uploads.lock();
203        guard.values().cloned().collect()
204    }
205
206    pub fn seed_uploaded_artifact(&self, rel_path: &str, id: impl Into<String>) {
207        let descriptor = self.descriptor_for(rel_path);
208        let mut uploads = self.uploads.lock();
209        uploads.insert(
210            descriptor.logical_path.clone(),
211            UploadedArtifact {
212                logical_path: descriptor.logical_path,
213                name: descriptor.name,
214                data_type: descriptor.data_type,
215                id: Some(id.into()),
216            },
217        );
218    }
219}
220
221fn infer_data_type(rel_path: &str) -> String {
222    let ext = Path::new(rel_path)
223        .extension()
224        .and_then(|ext| ext.to_str())
225        .map(|ext| ext.trim().to_ascii_lowercase());
226    match ext.as_deref() {
227        Some("json") => "json".into(),
228        Some("ply") => "ply".into(),
229        Some("drc") => "ply_draco".into(),
230        Some("glb") => "glb".into(),
231        Some("obj") => "obj".into(),
232        Some("csv") => "csv".into(),
233        Some("mp4") => "mp4".into(),
234        Some(other) => format!("{}_data", sanitize_component(other)),
235        None => "binary".into(),
236    }
237}
238
/// Rewrites `value` so it contains only ASCII alphanumerics, `-` and `_`.
///
/// Every other character is replaced by a single `_`. An empty input yields
/// the placeholder `"artifact"`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so the result is empty
    // only when the input is.
    if value.is_empty() {
        return "artifact".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}