posemesh_compute_node/storage/
client.rs1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::{Context, Result};
4use multer::Multipart;
5use regex::Regex;
6use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
7use reqwest::{Client, Method};
8use serde::Deserialize;
9use std::collections::HashMap;
10use std::io::{Cursor, Read};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tokio::task;
15use url::Url;
16use uuid::Uuid;
17use zip::ZipArchive;
18
/// One multipart part downloaded by `DomainClient::download_uri` and written to disk.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Value of the part's `id` Content-Disposition parameter, if present.
    pub id: Option<String>,
    /// Value of the `name` parameter; `download_uri` substitutes `"domain-data"` when absent.
    pub name: Option<String>,
    /// Value of the `data-type` parameter; `download_uri` substitutes `""` when absent.
    pub data_type: Option<String>,
    /// Value of the `domain-id` parameter, if present.
    pub domain_id: Option<String>,
    /// File the part's raw bytes were written to (under `root/datasets/...`).
    pub path: PathBuf,
    /// Per-download temporary directory shared by all parts of the same download.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (falls back to `path` if it is not under `root`).
    pub relative_path: PathBuf,
    /// Files unpacked from the part when its data type is `refined_scan_zip`; empty otherwise.
    pub extracted_paths: Vec<PathBuf>,
}

/// Parameters for `DomainClient::upload_artifact`.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and an empty value is rejected.
    pub domain_id: &'a str,
    /// Sent as the `name` Content-Disposition parameter of the uploaded part.
    pub name: &'a str,
    /// Sent as the `data-type` Content-Disposition parameter of the uploaded part.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; `upload_artifact` only uses it in debug logs.
    pub logical_path: &'a str,
    /// Raw artifact bytes, sent verbatim as the part body.
    pub bytes: &'a [u8],
    /// When `Some`, the request is sent with `PUT` and carries this value as the `id`
    /// parameter; when `None`, `POST` is used and no `id` parameter is sent.
    pub existing_id: Option<&'a str>,
}

/// HTTP client for a domain server's data API that attaches a bearer token to every request.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL that relative API paths (e.g. `api/v1/domains/{id}/data`) are joined onto.
    /// NOTE: `Url::join` replaces the last path segment unless `base` ends with `/`.
    pub base: Url,
    /// Source of the bearer token; read afresh each time request headers are built.
    pub token: TokenRef,
    /// Shared `reqwest` client (rustls TLS, fixed per-request timeout).
    http: Client,
}

49impl DomainClient {
50 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
51 let http = Client::builder()
52 .use_rustls_tls()
53 .timeout(Duration::from_secs(30))
54 .build()
55 .context("build reqwest client")?;
56 Ok(Self { base, token, http })
57 }
58
59 pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
60 let http = Client::builder()
61 .use_rustls_tls()
62 .timeout(timeout)
63 .build()
64 .context("build reqwest client")?;
65 Ok(Self { base, token, http })
66 }
67
68 fn auth_headers(&self) -> HeaderMap {
69 let mut h = HeaderMap::new();
70 let token = format!("Bearer {}", self.token.get());
71 let mut v = HeaderValue::from_str(&token)
72 .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
73 v.set_sensitive(true);
74 h.insert(AUTHORIZATION, v);
75 h
76 }
77
78 pub async fn download_uri(
81 &self,
82 uri: &str,
83 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
84 let url =
85 Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
86 let url_for_log = url.clone();
87 let mut headers = self.auth_headers();
88 headers.insert(ACCEPT, HeaderValue::from_static("multipart/form-data"));
89 tracing::debug!(
90 target: "posemesh_compute_node::storage::client",
91 method = "GET",
92 %url_for_log,
93 "Sending domain request"
94 );
95 let res = self
96 .http
97 .get(url)
98 .headers(headers)
99 .send()
100 .await
101 .map_err(|e| StorageError::Network(e.to_string()))?;
102 let status = res.status();
103 tracing::debug!(
104 target: "posemesh_compute_node::storage::client",
105 method = "GET",
106 %url_for_log,
107 status = %status,
108 "Domain response received"
109 );
110 if !status.is_success() {
111 return Err(map_status(status));
112 }
113
114 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
115 fs::create_dir_all(&root)
116 .await
117 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
118 let datasets_root = root.join("datasets");
119 fs::create_dir_all(&datasets_root)
120 .await
121 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
122
123 let content_type = res
124 .headers()
125 .get(CONTENT_TYPE)
126 .and_then(|v| v.to_str().ok())
127 .ok_or_else(|| {
128 StorageError::Other("missing Content-Type header on domain response".into())
129 })?;
130 let boundary = multer::parse_boundary(content_type).map_err(|e| {
131 StorageError::Other(format!("invalid multipart boundary from domain: {}", e))
132 })?;
133 let mut multipart = Multipart::new(res.bytes_stream(), boundary);
134 let mut parts = Vec::new();
135
136 while let Some(mut field) = multipart
137 .next_field()
138 .await
139 .map_err(|e| StorageError::Other(format!("read multipart field: {}", e)))?
140 {
141 let disposition = field
142 .headers()
143 .get("content-disposition")
144 .and_then(|v| v.to_str().ok())
145 .unwrap_or_default();
146 let params = parse_disposition_params(disposition);
147 let name = params
148 .get("name")
149 .cloned()
150 .unwrap_or_else(|| "domain-data".into());
151 let data_type = params.get("data-type").cloned().unwrap_or_default();
152
153 let mut buf = Vec::new();
154 while let Some(chunk) = field
155 .chunk()
156 .await
157 .map_err(|e| StorageError::Other(format!("stream multipart chunk: {}", e)))?
158 {
159 buf.extend_from_slice(&chunk);
160 }
161
162 let scan_folder = extract_timestamp(&name)
163 .map(|ts| sanitize_component(&ts))
164 .unwrap_or_else(|| sanitize_component(&name));
165 let scan_dir = datasets_root.join(&scan_folder);
166 fs::create_dir_all(&scan_dir)
167 .await
168 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
169
170 let file_name = map_filename(&data_type, &name);
171 let file_path = scan_dir.join(&file_name);
172 if let Some(parent) = file_path.parent() {
173 fs::create_dir_all(parent)
174 .await
175 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
176 }
177 fs::write(&file_path, &buf)
178 .await
179 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
180
181 let mut extracted_paths = Vec::new();
182 if data_type == "refined_scan_zip" {
183 let unzip_root = root
184 .join("refined")
185 .join("local")
186 .join(&scan_folder)
187 .join("sfm");
188 extracted_paths = unzip_refined_scan(buf.clone(), unzip_root).await?;
189 }
190
191 let relative_path = file_path
192 .strip_prefix(&root)
193 .unwrap_or(&file_path)
194 .to_path_buf();
195
196 parts.push(DownloadedPart {
197 id: params.get("id").cloned(),
198 name: Some(name),
199 data_type: Some(data_type),
200 domain_id: params.get("domain-id").cloned(),
201 path: file_path,
202 root: root.clone(),
203 relative_path,
204 extracted_paths,
205 });
206 }
207
208 if parts.is_empty() {
209 return Err(StorageError::Other(
210 "domain response did not contain any data parts".into(),
211 ));
212 }
213
214 Ok(parts)
215 }
216
217 pub async fn upload_artifact(
218 &self,
219 request: UploadRequest<'_>,
220 ) -> std::result::Result<Option<String>, StorageError> {
221 let domain_id = request.domain_id.trim();
222 if domain_id.is_empty() {
223 return Err(StorageError::Other(
224 "missing domain_id for artifact upload".into(),
225 ));
226 }
227 let path = format!("api/v1/domains/{}/data", domain_id);
228 let url = self
229 .base
230 .join(&path)
231 .map_err(|e| StorageError::Other(format!("join upload path: {}", e)))?;
232 let boundary = format!("------------------------{}", Uuid::new_v4().simple());
233 let (body, content_type) = build_multipart_body(
234 &boundary,
235 request.name,
236 request.data_type,
237 domain_id,
238 request.existing_id,
239 request.bytes,
240 );
241 let mut headers = self.auth_headers();
242 let ct_value = HeaderValue::from_str(&content_type)
243 .unwrap_or_else(|_| HeaderValue::from_static("multipart/form-data"));
244 headers.insert(CONTENT_TYPE, ct_value);
245 let method = if request.existing_id.is_some() {
246 Method::PUT
247 } else {
248 Method::POST
249 };
250 tracing::debug!(
251 target: "posemesh_compute_node::storage::client",
252 method = %method,
253 %url,
254 logical_path = request.logical_path,
255 name = request.name,
256 data_type = request.data_type,
257 has_existing_id = request.existing_id.is_some(),
258 "Sending domain upload request"
259 );
260 let res = self
261 .http
262 .request(method.clone(), url.clone())
263 .headers(headers)
264 .body(body)
265 .send()
266 .await
267 .map_err(|e| StorageError::Network(e.to_string()))?;
268 let status = res.status();
269 tracing::debug!(
270 target: "posemesh_compute_node::storage::client",
271 method = %method,
272 %url,
273 status = %status,
274 "Domain upload response received"
275 );
276 if !status.is_success() {
277 return Err(map_status(status));
278 }
279 let text = res
280 .text()
281 .await
282 .map_err(|e| StorageError::Network(e.to_string()))?;
283 if text.trim().is_empty() {
284 return Ok(None);
285 }
286 match serde_json::from_str::<PostDomainDataResponse>(&text) {
287 Ok(parsed) => {
288 let id = parsed.data.into_iter().next().map(|d| d.id);
289 Ok(id)
290 }
291 Err(err) => {
292 tracing::debug!(
293 target: "posemesh_compute_node::storage::client",
294 error = %err,
295 body = %text,
296 "Failed to parse domain upload response body as JSON"
297 );
298 Ok(None)
299 }
300 }
301 }
302}
303
/// Builds a single-part `multipart/form-data` request body for an artifact upload.
///
/// The part has `Content-Type: application/octet-stream` and a `Content-Disposition`
/// header carrying `name`, `data-type`, `id` (only when `existing_id` is `Some`) and
/// `domain-id` parameters, followed by `bytes` verbatim. Returns the body together with
/// the matching request `Content-Type` header value (`multipart/form-data; boundary=...`).
///
/// Parameter values are emitted as quoted strings. `\` and `"` are backslash-escaped,
/// and CR/LF are replaced with spaces. This keeps a caller-supplied value from closing
/// the quoted string early or injecting additional part headers.
fn build_multipart_body(
    boundary: &str,
    name: &str,
    data_type: &str,
    domain_id: &str,
    existing_id: Option<&str>,
    bytes: &[u8],
) -> (Vec<u8>, String) {
    let mut disposition = format!(
        "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"",
        quote_param(name),
        quote_param(data_type)
    );
    if let Some(id) = existing_id {
        disposition.push_str(&format!("; id=\"{}\"", quote_param(id)));
    }
    disposition.push_str(&format!("; domain-id=\"{}\"", quote_param(domain_id)));

    // Part headers end with an empty line; the closing delimiter is preceded by CRLF.
    let header = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\n{}\r\n\r\n",
        boundary, disposition
    );
    let trailer = format!("\r\n--{}--\r\n", boundary);

    let mut body = Vec::with_capacity(header.len() + bytes.len() + trailer.len());
    body.extend_from_slice(header.as_bytes());
    body.extend_from_slice(bytes);
    body.extend_from_slice(trailer.as_bytes());
    let content_type = format!("multipart/form-data; boundary={}", boundary);
    (body, content_type)
}

/// Escapes `value` for use inside a double-quoted Content-Disposition parameter.
fn quote_param(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' | '"' => {
                out.push('\\');
                out.push(ch);
            }
            // Raw line breaks would terminate the header line.
            '\r' | '\n' => out.push(' '),
            _ => out.push(ch),
        }
    }
    out
}

/// JSON body returned by the domain server for an artifact upload.
#[derive(Debug, Deserialize)]
struct PostDomainDataResponse {
    /// Entries reported by the server; `upload_artifact` only reads the first one.
    /// A missing field deserializes as an empty list.
    #[serde(default)]
    data: Vec<PostDomainDataItem>,
}

/// One entry of the `data` list in `PostDomainDataResponse`.
#[derive(Debug, Deserialize)]
struct PostDomainDataItem {
    /// Data id reported by the server; deserializes as an empty string when absent.
    #[serde(default)]
    id: String,
}

348fn map_status(status: reqwest::StatusCode) -> StorageError {
349 match status.as_u16() {
350 400 => StorageError::BadRequest,
351 401 => StorageError::Unauthorized,
352 404 => StorageError::NotFound,
353 409 => StorageError::Conflict,
354 n if (500..=599).contains(&n) => StorageError::Server(n),
355 other => StorageError::Other(format!("unexpected status: {}", other)),
356 }
357}
358
/// Parses the `key=value` parameters of a `Content-Disposition` header value.
///
/// Keys are trimmed and lower-cased. Values are trimmed and stripped of surrounding
/// double quotes; no unescaping is performed. The bare `form-data` disposition type and
/// segments without `=` are ignored. When a key repeats, the last occurrence wins.
///
/// Semicolons inside a double-quoted value do not split it, so `name="scan;1"` yields
/// `scan;1`. A backslash-escaped quote inside a quoted value does not end the quoting.
/// If the quotes are unbalanced, the value is split on every `;`.
fn parse_disposition_params(value: &str) -> HashMap<String, String> {
    split_unquoted_semicolons(value)
        .into_iter()
        .filter_map(|segment| {
            let trimmed = segment.trim();
            if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("form-data") {
                return None;
            }
            let (key, val) = trimmed.split_once('=')?;
            let cleaned = val.trim().trim_matches('"').to_string();
            Some((key.trim().to_ascii_lowercase(), cleaned))
        })
        .collect()
}

/// Splits `value` on `;` characters that are not inside a double-quoted string.
fn split_unquoted_semicolons(value: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            ';' if !in_quotes => {
                segments.push(&value[start..idx]);
                // `;` is one byte, so `idx + 1` is always a char boundary.
                start = idx + 1;
            }
            _ => {}
        }
    }
    if in_quotes {
        // Malformed header: don't let one stray quote swallow every later parameter.
        return value.split(';').collect();
    }
    segments.push(&value[start..]);
    segments
}

/// Maps `value` to a single filesystem-safe path component.
///
/// ASCII letters, digits, `-` and `_` are kept. Every other character (path separators,
/// dots, spaces, non-ASCII characters, ...) becomes `_`. An empty input yields the
/// placeholder `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so only an empty input
    // can produce an empty result.
    if value.is_empty() {
        return "part".into();
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}

392fn extract_timestamp(name: &str) -> Option<String> {
393 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
394 .ok()
395 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
396}
397
398fn map_filename(data_type: &str, name: &str) -> String {
399 match data_type {
400 "dmt_manifest_json" => "Manifest.json".into(),
401 "dmt_featurepoints_ply" | "dmt_pointcloud_ply" => "FeaturePoints.ply".into(),
402 "dmt_arposes_csv" => "ARposes.csv".into(),
403 "dmt_portal_detections_csv" | "dmt_observations_csv" => "PortalDetections.csv".into(),
404 "dmt_intrinsics_csv" | "dmt_cameraintrinsics_csv" => "CameraIntrinsics.csv".into(),
405 "dmt_frames_csv" => "Frames.csv".into(),
406 "dmt_gyro_csv" => "Gyro.csv".into(),
407 "dmt_accel_csv" => "Accel.csv".into(),
408 "dmt_gyroaccel_csv" => "gyro_accel.csv".into(),
409 "dmt_recording_mp4" => "Frames.mp4".into(),
410 "refined_scan_zip" => "RefinedScan.zip".into(),
411 _ => format!(
412 "{}.{}",
413 sanitize_component(name),
414 sanitize_component(data_type)
415 ),
416 }
417}
418
419async fn unzip_refined_scan(
420 zip_bytes: Vec<u8>,
421 unzip_root: PathBuf,
422) -> Result<Vec<PathBuf>, StorageError> {
423 task::spawn_blocking(move || {
424 std::fs::create_dir_all(&unzip_root)
425 .map_err(|e| StorageError::Other(format!("create unzip dir: {}", e)))?;
426 let mut archive = ZipArchive::new(Cursor::new(zip_bytes))
427 .map_err(|e| StorageError::Other(format!("open zip: {}", e)))?;
428 let mut extracted = Vec::new();
429 for idx in 0..archive.len() {
430 let mut file = archive
431 .by_index(idx)
432 .map_err(|e| StorageError::Other(format!("read zip entry: {}", e)))?;
433 if file.is_dir() {
434 continue;
435 }
436 let mut buf = Vec::new();
437 file.read_to_end(&mut buf)
438 .map_err(|e| StorageError::Other(format!("read zip data: {}", e)))?;
439 let out_path = unzip_root.join(file.name());
440 if let Some(parent) = out_path.parent() {
441 std::fs::create_dir_all(parent)
442 .map_err(|e| StorageError::Other(format!("create unzip parent: {}", e)))?;
443 }
444 std::fs::write(&out_path, &buf)
445 .map_err(|e| StorageError::Other(format!("write unzip file: {}", e)))?;
446 extracted.push(out_path);
447 }
448 Ok(extracted)
449 })
450 .await
451 .map_err(|e| StorageError::Other(format!("zip task join: {}", e)))?
452}