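//! `posemesh_compute_node/storage/client.rs` — HTTP client for the Domain
//! server data API (multipart downloads, artifact uploads and lookups).
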
use crate::errors::StorageError;
use crate::storage::token::TokenRef;
use anyhow::{Context, Result};
use multer::Multipart;
use regex::Regex;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
use reqwest::{Client, Method, StatusCode};
use serde::Deserialize;
use std::collections::HashMap;
use std::io::{Cursor, Read};
use std::path::{Component, Path, PathBuf};
use std::time::Duration;
use tokio::fs;
use tokio::task;
use url::Url;
use uuid::Uuid;
use zip::ZipArchive;

/// One multipart section of a Domain download, already written to disk.
///
/// Produced by [`DomainClient::download_uri`]; all parts of a single download
/// share the same `root` directory.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Value of the part's `id` disposition parameter, if it had one.
    pub id: Option<String>,
    /// Part name from the `name` disposition parameter (`download_uri` falls
    /// back to `"domain-data"` when it is missing).
    pub name: Option<String>,
    /// Value of the `data-type` disposition parameter (`download_uri` stores an
    /// empty string when it is missing).
    pub data_type: Option<String>,
    /// Value of the `domain-id` disposition parameter, if it had one.
    pub domain_id: Option<String>,
    /// Location of the file holding this part's bytes.
    pub path: PathBuf,
    /// Temporary download directory that `path` lives under.
    pub root: PathBuf,
    /// `path` expressed relative to `root`.
    pub relative_path: PathBuf,
    /// Files unpacked from this part when it is a `refined_scan_zip`; empty otherwise.
    pub extracted_paths: Vec<PathBuf>,
}

/// Borrowed parameters for [`DomainClient::upload_artifact`].
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Domain that owns the artifact; `upload_artifact` trims it and rejects an
    /// empty value.
    pub domain_id: &'a str,
    /// Artifact name, sent as the `name` disposition parameter.
    pub name: &'a str,
    /// Artifact type, sent as the `data-type` disposition parameter.
    pub data_type: &'a str,
    /// Caller-side logical path; only logged, never sent to the server.
    pub logical_path: &'a str,
    /// Raw artifact content.
    pub bytes: &'a [u8],
    /// Existing Domain data id: `Some` turns the upload into a `PUT` update,
    /// `None` into a `POST` create.
    pub existing_id: Option<&'a str>,
}

42/// Domain server HTTP client (skeleton; HTTP added later).
43#[derive(Clone)]
44pub struct DomainClient {
45    pub base: Url,
46    pub token: TokenRef,
47    http: Client,
48}
49impl DomainClient {
50    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
51        let http = Client::builder()
52            .use_rustls_tls()
53            .timeout(Duration::from_secs(30))
54            .build()
55            .context("build reqwest client")?;
56        Ok(Self { base, token, http })
57    }
58
59    pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
60        let http = Client::builder()
61            .use_rustls_tls()
62            .timeout(timeout)
63            .build()
64            .context("build reqwest client")?;
65        Ok(Self { base, token, http })
66    }
67
68    fn auth_headers(&self) -> HeaderMap {
69        let mut h = HeaderMap::new();
70        let token = format!("Bearer {}", self.token.get());
71        let mut v = HeaderValue::from_str(&token)
72            .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
73        v.set_sensitive(true);
74        h.insert(AUTHORIZATION, v);
75        h
76    }
77
78    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
79    /// part into a temporary file and returning its metadata.
80    pub async fn download_uri(
81        &self,
82        uri: &str,
83    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
84        let url =
85            Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
86        let url_for_log = url.clone();
87        let mut headers = self.auth_headers();
88        headers.insert(ACCEPT, HeaderValue::from_static("multipart/form-data"));
89        tracing::debug!(
90            target: "posemesh_compute_node::storage::client",
91            method = "GET",
92            %url_for_log,
93            "Sending domain request"
94        );
95        let res = self
96            .http
97            .get(url)
98            .headers(headers)
99            .send()
100            .await
101            .map_err(|e| StorageError::Network(e.to_string()))?;
102        let status = res.status();
103        tracing::debug!(
104            target: "posemesh_compute_node::storage::client",
105            method = "GET",
106            %url_for_log,
107            status = %status,
108            "Domain response received"
109        );
110        if !status.is_success() {
111            return Err(map_status(status));
112        }
113
114        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
115        fs::create_dir_all(&root)
116            .await
117            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
118        let datasets_root = root.join("datasets");
119        fs::create_dir_all(&datasets_root)
120            .await
121            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
122
123        let content_type = res
124            .headers()
125            .get(CONTENT_TYPE)
126            .and_then(|v| v.to_str().ok())
127            .ok_or_else(|| {
128                StorageError::Other("missing Content-Type header on domain response".into())
129            })?;
130        let boundary = multer::parse_boundary(content_type).map_err(|e| {
131            StorageError::Other(format!("invalid multipart boundary from domain: {}", e))
132        })?;
133        let mut multipart = Multipart::new(res.bytes_stream(), boundary);
134        let mut parts = Vec::new();
135
136        while let Some(mut field) = multipart
137            .next_field()
138            .await
139            .map_err(|e| StorageError::Other(format!("read multipart field: {}", e)))?
140        {
141            let disposition = field
142                .headers()
143                .get("content-disposition")
144                .and_then(|v| v.to_str().ok())
145                .unwrap_or_default();
146            let params = parse_disposition_params(disposition);
147            let name = params
148                .get("name")
149                .cloned()
150                .unwrap_or_else(|| "domain-data".into());
151            let data_type = params.get("data-type").cloned().unwrap_or_default();
152
153            let mut buf = Vec::new();
154            while let Some(chunk) = field
155                .chunk()
156                .await
157                .map_err(|e| StorageError::Other(format!("stream multipart chunk: {}", e)))?
158            {
159                buf.extend_from_slice(&chunk);
160            }
161
162            let scan_folder = extract_timestamp(&name)
163                .map(|ts| sanitize_component(&ts))
164                .unwrap_or_else(|| sanitize_component(&name));
165            let scan_dir = datasets_root.join(&scan_folder);
166            fs::create_dir_all(&scan_dir)
167                .await
168                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
169
170            let file_name = map_filename(&data_type, &name);
171            let file_path = scan_dir.join(&file_name);
172            if let Some(parent) = file_path.parent() {
173                fs::create_dir_all(parent)
174                    .await
175                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
176            }
177            fs::write(&file_path, &buf)
178                .await
179                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
180
181            let mut extracted_paths = Vec::new();
182            if data_type == "refined_scan_zip" {
183                let unzip_root = root
184                    .join("refined")
185                    .join("local")
186                    .join(&scan_folder)
187                    .join("sfm");
188                extracted_paths = unzip_refined_scan(buf.clone(), unzip_root).await?;
189            }
190
191            let relative_path = file_path
192                .strip_prefix(&root)
193                .unwrap_or(&file_path)
194                .to_path_buf();
195
196            parts.push(DownloadedPart {
197                id: params.get("id").cloned(),
198                name: Some(name),
199                data_type: Some(data_type),
200                domain_id: params.get("domain-id").cloned(),
201                path: file_path,
202                root: root.clone(),
203                relative_path,
204                extracted_paths,
205            });
206        }
207
208        if parts.is_empty() {
209            return Err(StorageError::Other(
210                "domain response did not contain any data parts".into(),
211            ));
212        }
213
214        Ok(parts)
215    }
216
217    pub async fn upload_artifact(
218        &self,
219        request: UploadRequest<'_>,
220    ) -> std::result::Result<Option<String>, StorageError> {
221        let domain_id = request.domain_id.trim();
222        if domain_id.is_empty() {
223            return Err(StorageError::Other(
224                "missing domain_id for artifact upload".into(),
225            ));
226        }
227        let path = format!("api/v1/domains/{}/data", domain_id);
228        let url = self
229            .base
230            .join(&path)
231            .map_err(|e| StorageError::Other(format!("join upload path: {}", e)))?;
232        let boundary = format!("------------------------{}", Uuid::new_v4().simple());
233        let (body, content_type) = build_multipart_body(
234            &boundary,
235            request.name,
236            request.data_type,
237            domain_id,
238            request.existing_id,
239            request.bytes,
240        );
241        let mut headers = self.auth_headers();
242        let ct_value = HeaderValue::from_str(&content_type)
243            .unwrap_or_else(|_| HeaderValue::from_static("multipart/form-data"));
244        headers.insert(CONTENT_TYPE, ct_value);
245        let method = if request.existing_id.is_some() {
246            Method::PUT
247        } else {
248            Method::POST
249        };
250        tracing::debug!(
251            target: "posemesh_compute_node::storage::client",
252            method = %method,
253            %url,
254            logical_path = request.logical_path,
255            name = request.name,
256            data_type = request.data_type,
257            has_existing_id = request.existing_id.is_some(),
258            "Sending domain upload request"
259        );
260        let res = self
261            .http
262            .request(method.clone(), url.clone())
263            .headers(headers)
264            .body(body)
265            .send()
266            .await
267            .map_err(|e| StorageError::Network(e.to_string()))?;
268        let status = res.status();
269        tracing::debug!(
270            target: "posemesh_compute_node::storage::client",
271            method = %method,
272            %url,
273            status = %status,
274            "Domain upload response received"
275        );
276        if !status.is_success() {
277            return Err(map_status(status));
278        }
279        let text = res
280            .text()
281            .await
282            .map_err(|e| StorageError::Network(e.to_string()))?;
283        if text.trim().is_empty() {
284            return Ok(None);
285        }
286        match serde_json::from_str::<PostDomainDataResponse>(&text) {
287            Ok(parsed) => {
288                let id = parsed.data.into_iter().next().map(|d| d.id);
289                Ok(id)
290            }
291            Err(err) => {
292                tracing::debug!(
293                    target: "posemesh_compute_node::storage::client",
294                    error = %err,
295                    body = %text,
296                    "Failed to parse domain upload response body as JSON"
297                );
298                Ok(None)
299            }
300        }
301    }
302
303    pub async fn find_artifact_id(
304        &self,
305        domain_id: &str,
306        name: &str,
307        data_type: &str,
308    ) -> std::result::Result<Option<String>, StorageError> {
309        let domain_id = domain_id.trim();
310        if domain_id.is_empty() {
311            return Err(StorageError::Other(
312                "missing domain_id for artifact lookup".into(),
313            ));
314        }
315        let path = format!("api/v1/domains/{}/data", domain_id);
316        let url = self
317            .base
318            .join(&path)
319            .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
320        let mut headers = self.auth_headers();
321        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
322        tracing::debug!(
323            target: "posemesh_compute_node::storage::client",
324            method = "GET",
325            %url,
326            artifact_name = name,
327            artifact_type = data_type,
328            "Looking up existing domain artifact"
329        );
330        let res = self
331            .http
332            .get(url.clone())
333            .headers(headers)
334            .query(&[("name", name), ("data_type", data_type)])
335            .send()
336            .await
337            .map_err(|e| StorageError::Network(e.to_string()))?;
338        let status = res.status();
339        if status == StatusCode::NOT_FOUND {
340            tracing::debug!(
341                target: "posemesh_compute_node::storage::client",
342                method = "GET",
343                %url,
344                artifact_name = name,
345                artifact_type = data_type,
346                "Artifact lookup returned 404"
347            );
348            return Ok(None);
349        }
350        if !status.is_success() {
351            return Err(map_status(status));
352        }
353        let payload = res
354            .json::<ListDomainDataResponse>()
355            .await
356            .map_err(|e| StorageError::Network(e.to_string()))?;
357        let found = payload
358            .data
359            .into_iter()
360            .find(|item| item.name == name && item.data_type == data_type);
361        Ok(found.map(|item| item.id))
362    }
363}
364
/// Build a single-part `multipart/form-data` body for a Domain upload.
///
/// The part carries `bytes` as `application/octet-stream` and describes the
/// artifact through `Content-Disposition` parameters: `name`, `data-type`,
/// `id` (only when `existing_id` is `Some`) and `domain-id`, in that order.
///
/// Parameter values are emitted as quoted-strings: `"` and `\` are
/// backslash-escaped and CR/LF are replaced by spaces. A value therefore can
/// neither end the quoted string early nor inject extra header lines.
/// NOTE(review): assumes the Domain server unescapes quoted-strings (Go's
/// `mime.ParseMediaType` does) — confirm against the server implementation.
///
/// `boundary` must not occur inside `bytes`; the visible caller uses a random
/// UUID-based boundary. Returns the encoded body and the matching
/// `Content-Type` header value.
fn build_multipart_body(
    boundary: &str,
    name: &str,
    data_type: &str,
    domain_id: &str,
    existing_id: Option<&str>,
    bytes: &[u8],
) -> (Vec<u8>, String) {
    let mut disposition = format!(
        "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"",
        quote_param_value(name),
        quote_param_value(data_type)
    );
    if let Some(id) = existing_id {
        disposition.push_str(&format!("; id=\"{}\"", quote_param_value(id)));
    }
    disposition.push_str(&format!("; domain-id=\"{}\"", quote_param_value(domain_id)));

    // Part headers, blank line, payload, then the closing delimiter.
    let header = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\n{}\r\n\r\n",
        boundary, disposition
    );
    let trailer = format!("\r\n--{}--\r\n", boundary);

    let mut body = Vec::with_capacity(header.len() + bytes.len() + trailer.len());
    body.extend_from_slice(header.as_bytes());
    body.extend_from_slice(bytes);
    body.extend_from_slice(trailer.as_bytes());
    let content_type = format!("multipart/form-data; boundary={}", boundary);
    (body, content_type)
}

/// Escape `value` for use between the double quotes of a header parameter.
fn quote_param_value(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' | '\\' => {
                out.push('\\');
                out.push(ch);
            }
            '\r' | '\n' => out.push(' '),
            _ => out.push(ch),
        }
    }
    out
}

397#[derive(Debug, Deserialize)]
398struct PostDomainDataResponse {
399    #[serde(default)]
400    data: Vec<PostDomainDataItem>,
401}
402
403#[derive(Debug, Deserialize)]
404struct PostDomainDataItem {
405    #[serde(default)]
406    id: String,
407}
408
409#[derive(Debug, Deserialize)]
410struct ListDomainDataResponse {
411    #[serde(default)]
412    data: Vec<DomainDataSummary>,
413}
414
415#[derive(Debug, Deserialize)]
416struct DomainDataSummary {
417    #[serde(default)]
418    id: String,
419    #[serde(default)]
420    name: String,
421    #[serde(default)]
422    data_type: String,
423}
424
425fn map_status(status: reqwest::StatusCode) -> StorageError {
426    match status.as_u16() {
427        400 => StorageError::BadRequest,
428        401 => StorageError::Unauthorized,
429        404 => StorageError::NotFound,
430        409 => StorageError::Conflict,
431        n if (500..=599).contains(&n) => StorageError::Server(n),
432        other => StorageError::Other(format!("unexpected status: {}", other)),
433    }
434}
435
/// Parse the parameters of a `Content-Disposition` header value into a map.
///
/// Keys are lower-cased. The `form-data` disposition type and any segment
/// without `=` are ignored. Values may be bare tokens or double-quoted strings.
/// A `;` inside quotes does not split parameters, and `\"` / `\\` escapes
/// inside quotes are decoded (other backslashes are kept literally). When a key
/// repeats, the last occurrence wins.
fn parse_disposition_params(value: &str) -> HashMap<String, String> {
    split_unquoted_params(value)
        .into_iter()
        .filter_map(|segment| {
            let trimmed = segment.trim();
            if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("form-data") {
                return None;
            }
            let (key, val) = trimmed.split_once('=')?;
            Some((key.trim().to_ascii_lowercase(), unquote_param_value(val.trim())))
        })
        .collect()
}

/// Split `value` on `;` separators that are not inside a double-quoted string.
fn split_unquoted_params(value: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            ';' if !in_quotes => {
                segments.push(&value[start..idx]);
                start = idx + 1;
            }
            _ => {}
        }
    }
    segments.push(&value[start..]);
    segments
}

/// Decode a parameter value: a quoted-string is unquoted and unescaped; a bare
/// value just has stray quotes trimmed from both ends.
fn unquote_param_value(raw: &str) -> String {
    let inner = match raw.strip_prefix('"') {
        Some(inner) => inner,
        None => return raw.trim_matches('"').to_string(),
    };
    let mut out = String::with_capacity(inner.len());
    let mut chars = inner.chars();
    while let Some(ch) = chars.next() {
        match ch {
            '"' => break,
            '\\' => match chars.next() {
                Some(next @ ('"' | '\\')) => out.push(next),
                Some(other) => {
                    out.push('\\');
                    out.push(other);
                }
                None => out.push('\\'),
            },
            _ => out.push(ch),
        }
    }
    out
}

/// Make `value` safe to use as a single path component.
///
/// Keeps ASCII letters, digits, `-` and `_`; every other character (including
/// `.`, `/` and any non-ASCII character) becomes `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    if value.is_empty() {
        return String::from("part");
    }
    let allowed = |c: char| c.is_ascii_alphanumeric() || c == '-' || c == '_';
    value
        .chars()
        .map(|c| if allowed(c) { c } else { '_' })
        .collect()
}

469fn extract_timestamp(name: &str) -> Option<String> {
470    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
471        .ok()
472        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
473}
474
475fn map_filename(data_type: &str, name: &str) -> String {
476    match data_type {
477        "dmt_manifest_json" => "Manifest.json".into(),
478        "dmt_featurepoints_ply" | "dmt_pointcloud_ply" => "FeaturePoints.ply".into(),
479        "dmt_arposes_csv" => "ARposes.csv".into(),
480        "dmt_portal_detections_csv" | "dmt_observations_csv" => "PortalDetections.csv".into(),
481        "dmt_intrinsics_csv" | "dmt_cameraintrinsics_csv" => "CameraIntrinsics.csv".into(),
482        "dmt_frames_csv" => "Frames.csv".into(),
483        "dmt_gyro_csv" => "Gyro.csv".into(),
484        "dmt_accel_csv" => "Accel.csv".into(),
485        "dmt_gyroaccel_csv" => "gyro_accel.csv".into(),
486        "dmt_recording_mp4" => "Frames.mp4".into(),
487        "refined_scan_zip" => "RefinedScan.zip".into(),
488        _ => format!(
489            "{}.{}",
490            sanitize_component(name),
491            sanitize_component(data_type)
492        ),
493    }
494}
495
496async fn unzip_refined_scan(
497    zip_bytes: Vec<u8>,
498    unzip_root: PathBuf,
499) -> Result<Vec<PathBuf>, StorageError> {
500    task::spawn_blocking(move || {
501        std::fs::create_dir_all(&unzip_root)
502            .map_err(|e| StorageError::Other(format!("create unzip dir: {}", e)))?;
503        let mut archive = ZipArchive::new(Cursor::new(zip_bytes))
504            .map_err(|e| StorageError::Other(format!("open zip: {}", e)))?;
505        let mut extracted = Vec::new();
506        for idx in 0..archive.len() {
507            let mut file = archive
508                .by_index(idx)
509                .map_err(|e| StorageError::Other(format!("read zip entry: {}", e)))?;
510            if file.is_dir() {
511                continue;
512            }
513            let mut buf = Vec::new();
514            file.read_to_end(&mut buf)
515                .map_err(|e| StorageError::Other(format!("read zip data: {}", e)))?;
516            let out_path = unzip_root.join(file.name());
517            if let Some(parent) = out_path.parent() {
518                std::fs::create_dir_all(parent)
519                    .map_err(|e| StorageError::Other(format!("create unzip parent: {}", e)))?;
520            }
521            std::fs::write(&out_path, &buf)
522                .map_err(|e| StorageError::Other(format!("write unzip file: {}", e)))?;
523            extracted.push(out_path);
524        }
525        Ok(extracted)
526    })
527    .await
528    .map_err(|e| StorageError::Other(format!("zip task join: {}", e)))?
529}