1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::{Context, Result};
4use multer::Multipart;
5use regex::Regex;
6use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
7use reqwest::{Client, Method, StatusCode};
8use serde::Deserialize;
9use std::collections::HashMap;
10use std::io::{Cursor, Read};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tokio::task;
15use url::Url;
16use uuid::Uuid;
17use zip::ZipArchive;
18
/// One multipart field from a domain download, persisted to a local temp directory.
///
/// Instances are produced by `DomainClient::download_uri`; every part of a single
/// download shares the same `root`.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// The part's `id` Content-Disposition parameter, if the server sent one.
    pub id: Option<String>,
    /// The part's `name` parameter (`download_uri` substitutes `"domain-data"` when absent).
    pub name: Option<String>,
    /// The part's `data-type` parameter (`download_uri` substitutes `""` when absent).
    pub data_type: Option<String>,
    /// The part's `domain-id` parameter, if the server sent one.
    pub domain_id: Option<String>,
    /// File the part's raw bytes were written to (under `root/datasets/...`).
    pub path: PathBuf,
    /// Per-download temp directory that holds every file written for this download.
    pub root: PathBuf,
    /// `path` expressed relative to `root`.
    pub relative_path: PathBuf,
    /// Files unpacked from the part when its data type is `refined_scan_zip`; empty otherwise.
    pub extracted_paths: Vec<PathBuf>,
}
31
/// Parameters for `DomainClient::upload_artifact`.
///
/// Every field borrows from the caller, so the struct is cheap to copy. It derives
/// `Copy` so that a caller can retry an upload (which takes the request by value) without
/// rebuilding it.
#[derive(Debug, Clone, Copy)]
pub struct UploadRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and it must not be empty.
    pub domain_id: &'a str,
    /// Artifact name, sent as the multipart `name` parameter.
    pub name: &'a str,
    /// Artifact type, sent as the multipart `data-type` parameter.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; `upload_artifact` only uses it for logging.
    pub logical_path: &'a str,
    /// Raw artifact contents.
    pub bytes: &'a [u8],
    /// Id of an existing artifact: `Some` makes the upload a PUT, `None` a POST.
    pub existing_id: Option<&'a str>,
}
41
/// Authenticated HTTP client for a domain server's data API.
///
/// Upload and lookup requests resolve `api/v1/domains/...` against `base`, while downloads
/// take an absolute URI. Every request sends `Authorization: Bearer <token>`, with the
/// token read from `token` each time a request's headers are built.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL that the upload/lookup paths are joined onto.
    pub base: Url,
    /// Bearer-token source, queried whenever request headers are built.
    pub token: TokenRef,
    /// Underlying `reqwest` client, configured with rustls and a request timeout.
    http: Client,
}
49impl DomainClient {
50 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
51 let http = Client::builder()
52 .use_rustls_tls()
53 .timeout(Duration::from_secs(30))
54 .build()
55 .context("build reqwest client")?;
56 Ok(Self { base, token, http })
57 }
58
59 pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
60 let http = Client::builder()
61 .use_rustls_tls()
62 .timeout(timeout)
63 .build()
64 .context("build reqwest client")?;
65 Ok(Self { base, token, http })
66 }
67
68 fn auth_headers(&self) -> HeaderMap {
69 let mut h = HeaderMap::new();
70 let token = format!("Bearer {}", self.token.get());
71 let mut v = HeaderValue::from_str(&token)
72 .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
73 v.set_sensitive(true);
74 h.insert(AUTHORIZATION, v);
75 h
76 }
77
78 pub async fn download_uri(
81 &self,
82 uri: &str,
83 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
84 let url =
85 Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
86 let url_for_log = url.clone();
87 let mut headers = self.auth_headers();
88 headers.insert(ACCEPT, HeaderValue::from_static("multipart/form-data"));
89 tracing::debug!(
90 target: "posemesh_compute_node::storage::client",
91 method = "GET",
92 %url_for_log,
93 "Sending domain request"
94 );
95 let res = self
96 .http
97 .get(url)
98 .headers(headers)
99 .send()
100 .await
101 .map_err(|e| StorageError::Network(e.to_string()))?;
102 let status = res.status();
103 tracing::debug!(
104 target: "posemesh_compute_node::storage::client",
105 method = "GET",
106 %url_for_log,
107 status = %status,
108 "Domain response received"
109 );
110 if !status.is_success() {
111 return Err(map_status(status));
112 }
113
114 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
115 fs::create_dir_all(&root)
116 .await
117 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
118 let datasets_root = root.join("datasets");
119 fs::create_dir_all(&datasets_root)
120 .await
121 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
122
123 let content_type = res
124 .headers()
125 .get(CONTENT_TYPE)
126 .and_then(|v| v.to_str().ok())
127 .ok_or_else(|| {
128 StorageError::Other("missing Content-Type header on domain response".into())
129 })?;
130 let boundary = multer::parse_boundary(content_type).map_err(|e| {
131 StorageError::Other(format!("invalid multipart boundary from domain: {}", e))
132 })?;
133 let mut multipart = Multipart::new(res.bytes_stream(), boundary);
134 let mut parts = Vec::new();
135
136 while let Some(mut field) = multipart
137 .next_field()
138 .await
139 .map_err(|e| StorageError::Other(format!("read multipart field: {}", e)))?
140 {
141 let disposition = field
142 .headers()
143 .get("content-disposition")
144 .and_then(|v| v.to_str().ok())
145 .unwrap_or_default();
146 let params = parse_disposition_params(disposition);
147 let name = params
148 .get("name")
149 .cloned()
150 .unwrap_or_else(|| "domain-data".into());
151 let data_type = params.get("data-type").cloned().unwrap_or_default();
152
153 let mut buf = Vec::new();
154 while let Some(chunk) = field
155 .chunk()
156 .await
157 .map_err(|e| StorageError::Other(format!("stream multipart chunk: {}", e)))?
158 {
159 buf.extend_from_slice(&chunk);
160 }
161
162 let scan_folder = extract_timestamp(&name)
163 .map(|ts| sanitize_component(&ts))
164 .unwrap_or_else(|| sanitize_component(&name));
165 let scan_dir = datasets_root.join(&scan_folder);
166 fs::create_dir_all(&scan_dir)
167 .await
168 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
169
170 let file_name = map_filename(&data_type, &name);
171 let file_path = scan_dir.join(&file_name);
172 if let Some(parent) = file_path.parent() {
173 fs::create_dir_all(parent)
174 .await
175 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
176 }
177 fs::write(&file_path, &buf)
178 .await
179 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
180
181 let mut extracted_paths = Vec::new();
182 if data_type == "refined_scan_zip" {
183 let unzip_root = root
184 .join("refined")
185 .join("local")
186 .join(&scan_folder)
187 .join("sfm");
188 extracted_paths = unzip_refined_scan(buf.clone(), unzip_root).await?;
189 }
190
191 let relative_path = file_path
192 .strip_prefix(&root)
193 .unwrap_or(&file_path)
194 .to_path_buf();
195
196 parts.push(DownloadedPart {
197 id: params.get("id").cloned(),
198 name: Some(name),
199 data_type: Some(data_type),
200 domain_id: params.get("domain-id").cloned(),
201 path: file_path,
202 root: root.clone(),
203 relative_path,
204 extracted_paths,
205 });
206 }
207
208 if parts.is_empty() {
209 return Err(StorageError::Other(
210 "domain response did not contain any data parts".into(),
211 ));
212 }
213
214 Ok(parts)
215 }
216
217 pub async fn upload_artifact(
218 &self,
219 request: UploadRequest<'_>,
220 ) -> std::result::Result<Option<String>, StorageError> {
221 let domain_id = request.domain_id.trim();
222 if domain_id.is_empty() {
223 return Err(StorageError::Other(
224 "missing domain_id for artifact upload".into(),
225 ));
226 }
227 let path = format!("api/v1/domains/{}/data", domain_id);
228 let url = self
229 .base
230 .join(&path)
231 .map_err(|e| StorageError::Other(format!("join upload path: {}", e)))?;
232 let boundary = format!("------------------------{}", Uuid::new_v4().simple());
233 let (body, content_type) = build_multipart_body(
234 &boundary,
235 request.name,
236 request.data_type,
237 domain_id,
238 request.existing_id,
239 request.bytes,
240 );
241 let mut headers = self.auth_headers();
242 let ct_value = HeaderValue::from_str(&content_type)
243 .unwrap_or_else(|_| HeaderValue::from_static("multipart/form-data"));
244 headers.insert(CONTENT_TYPE, ct_value);
245 let method = if request.existing_id.is_some() {
246 Method::PUT
247 } else {
248 Method::POST
249 };
250 tracing::debug!(
251 target: "posemesh_compute_node::storage::client",
252 method = %method,
253 %url,
254 logical_path = request.logical_path,
255 name = request.name,
256 data_type = request.data_type,
257 has_existing_id = request.existing_id.is_some(),
258 "Sending domain upload request"
259 );
260 let res = self
261 .http
262 .request(method.clone(), url.clone())
263 .headers(headers)
264 .body(body)
265 .send()
266 .await
267 .map_err(|e| StorageError::Network(e.to_string()))?;
268 let status = res.status();
269 tracing::debug!(
270 target: "posemesh_compute_node::storage::client",
271 method = %method,
272 %url,
273 status = %status,
274 "Domain upload response received"
275 );
276 if !status.is_success() {
277 return Err(map_status(status));
278 }
279 let text = res
280 .text()
281 .await
282 .map_err(|e| StorageError::Network(e.to_string()))?;
283 if text.trim().is_empty() {
284 return Ok(None);
285 }
286 match serde_json::from_str::<PostDomainDataResponse>(&text) {
287 Ok(parsed) => {
288 let id = parsed.data.into_iter().next().map(|d| d.id);
289 Ok(id)
290 }
291 Err(err) => {
292 tracing::debug!(
293 target: "posemesh_compute_node::storage::client",
294 error = %err,
295 body = %text,
296 "Failed to parse domain upload response body as JSON"
297 );
298 Ok(None)
299 }
300 }
301 }
302
303 pub async fn find_artifact_id(
304 &self,
305 domain_id: &str,
306 name: &str,
307 data_type: &str,
308 ) -> std::result::Result<Option<String>, StorageError> {
309 let domain_id = domain_id.trim();
310 if domain_id.is_empty() {
311 return Err(StorageError::Other(
312 "missing domain_id for artifact lookup".into(),
313 ));
314 }
315 let path = format!("api/v1/domains/{}/data", domain_id);
316 let url = self
317 .base
318 .join(&path)
319 .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
320 let mut headers = self.auth_headers();
321 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
322 tracing::debug!(
323 target: "posemesh_compute_node::storage::client",
324 method = "GET",
325 %url,
326 artifact_name = name,
327 artifact_type = data_type,
328 "Looking up existing domain artifact"
329 );
330 let res = self
331 .http
332 .get(url.clone())
333 .headers(headers)
334 .query(&[("name", name), ("data_type", data_type)])
335 .send()
336 .await
337 .map_err(|e| StorageError::Network(e.to_string()))?;
338 let status = res.status();
339 if status == StatusCode::NOT_FOUND {
340 tracing::debug!(
341 target: "posemesh_compute_node::storage::client",
342 method = "GET",
343 %url,
344 artifact_name = name,
345 artifact_type = data_type,
346 "Artifact lookup returned 404"
347 );
348 return Ok(None);
349 }
350 if !status.is_success() {
351 return Err(map_status(status));
352 }
353 let payload = res
354 .json::<ListDomainDataResponse>()
355 .await
356 .map_err(|e| StorageError::Network(e.to_string()))?;
357 let found = payload
358 .data
359 .into_iter()
360 .find(|item| item.name == name && item.data_type == data_type);
361 Ok(found.map(|item| item.id))
362 }
363}
364
/// Builds a single-part `multipart/form-data` body for a domain upload.
///
/// The part's `Content-Disposition` carries the artifact metadata as parameters
/// (`name`, `data-type`, optional `id`, `domain-id`), followed by the payload verbatim.
/// Parameter values are emitted as quoted-strings with `\` and `"` backslash-escaped.
/// CR and LF, which may not appear inside a header, are replaced with spaces, so a
/// caller-supplied value can neither break the header nor inject another one.
///
/// Returns the body bytes and the matching `Content-Type` header value.
fn build_multipart_body(
    boundary: &str,
    name: &str,
    data_type: &str,
    domain_id: &str,
    existing_id: Option<&str>,
    bytes: &[u8],
) -> (Vec<u8>, String) {
    let name = escape_quoted_param(name);
    let data_type = escape_quoted_param(data_type);
    let domain_id = escape_quoted_param(domain_id);
    let disposition = match existing_id {
        Some(id) => format!(
            "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"; id=\"{}\"; domain-id=\"{}\"\r\n",
            name,
            data_type,
            escape_quoted_param(id),
            domain_id
        ),
        None => format!(
            "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"; domain-id=\"{}\"\r\n",
            name, data_type, domain_id
        ),
    };
    let header = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\n{}\r\n",
        boundary, disposition
    );
    // CRLF terminating the payload, then the closing delimiter.
    let trailer = format!("\r\n--{}--\r\n", boundary);

    let mut body = Vec::with_capacity(header.len() + bytes.len() + trailer.len());
    body.extend_from_slice(header.as_bytes());
    body.extend_from_slice(bytes);
    body.extend_from_slice(trailer.as_bytes());
    let content_type = format!("multipart/form-data; boundary={}", boundary);
    (body, content_type)
}

/// Escapes `value` for use inside a double-quoted header parameter.
fn escape_quoted_param(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' | '"' => {
                out.push('\\');
                out.push(ch);
            }
            '\r' | '\n' => out.push(' '),
            other => out.push(other),
        }
    }
    out
}
396
/// JSON body returned by the domain upload (POST/PUT) endpoint.
///
/// `upload_artifact` only reads the id of the first entry in `data`.
#[derive(Debug, Deserialize)]
struct PostDomainDataResponse {
    /// Items echoed back by the server; a missing field deserializes as an empty list.
    #[serde(default)]
    data: Vec<PostDomainDataItem>,
}

/// One entry of `PostDomainDataResponse::data`.
#[derive(Debug, Deserialize)]
struct PostDomainDataItem {
    /// Server-side id of the stored data (empty string when the field is missing).
    #[serde(default)]
    id: String,
}
408
/// JSON body returned when listing a domain's data (used by `find_artifact_id`).
#[derive(Debug, Deserialize)]
struct ListDomainDataResponse {
    /// Listed items; a missing field deserializes as an empty list.
    #[serde(default)]
    data: Vec<DomainDataSummary>,
}

/// Minimal view of a stored data item: just enough to match it by name and type.
#[derive(Debug, Deserialize)]
struct DomainDataSummary {
    /// Server-side id (empty string when the field is missing).
    #[serde(default)]
    id: String,
    /// Artifact name, compared exactly against the requested name.
    #[serde(default)]
    name: String,
    /// Artifact data type, compared exactly against the requested type.
    #[serde(default)]
    data_type: String,
}
424
425fn map_status(status: reqwest::StatusCode) -> StorageError {
426 match status.as_u16() {
427 400 => StorageError::BadRequest,
428 401 => StorageError::Unauthorized,
429 404 => StorageError::NotFound,
430 409 => StorageError::Conflict,
431 n if (500..=599).contains(&n) => StorageError::Server(n),
432 other => StorageError::Other(format!("unexpected status: {}", other)),
433 }
434}
435
/// Parses the `key=value` parameters of a `Content-Disposition` header value.
///
/// Keys are lower-cased and the bare `form-data` token is ignored. Splitting respects
/// double-quoted values, so a `;` inside quotes stays part of the value. Quoted values
/// are unescaped (`\x` becomes `x`), matching what `build_multipart_body` emits.
/// Segments without `=` are skipped.
fn parse_disposition_params(value: &str) -> HashMap<String, String> {
    split_unquoted_semicolons(value)
        .into_iter()
        .filter_map(|segment| {
            let trimmed = segment.trim();
            if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("form-data") {
                return None;
            }
            let (key, raw) = trimmed.split_once('=')?;
            Some((key.trim().to_ascii_lowercase(), unquote_param(raw.trim())))
        })
        .collect()
}

/// Splits `value` on `;` characters that are not inside a double-quoted string.
fn split_unquoted_semicolons(value: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            ';' if !in_quotes => {
                segments.push(&value[start..idx]);
                start = idx + 1;
            }
            _ => {}
        }
    }
    segments.push(&value[start..]);
    segments
}

/// Returns the content of a quoted-string (unescaped, up to the closing quote), or the
/// token itself with stray surrounding quotes removed when it is not quoted.
fn unquote_param(raw: &str) -> String {
    match raw.strip_prefix('"') {
        Some(inner) => {
            let mut out = String::with_capacity(inner.len());
            let mut chars = inner.chars();
            while let Some(ch) = chars.next() {
                match ch {
                    '\\' => {
                        if let Some(escaped) = chars.next() {
                            out.push(escaped);
                        }
                    }
                    '"' => break,
                    other => out.push(other),
                }
            }
            out
        }
        None => raw.trim_matches('"').to_string(),
    }
}
450
/// Makes `value` safe to use as a single path component.
///
/// ASCII letters, digits, `-` and `_` are kept, and every other character becomes `_`.
/// An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so the result is empty only when
    // the input is; check that up front.
    if value.is_empty() {
        return String::from("part");
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        out.push(if allowed { ch } else { '_' });
    }
    out
}
468
469fn extract_timestamp(name: &str) -> Option<String> {
470 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
471 .ok()
472 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
473}
474
475fn map_filename(data_type: &str, name: &str) -> String {
476 match data_type {
477 "dmt_manifest_json" => "Manifest.json".into(),
478 "dmt_featurepoints_ply" | "dmt_pointcloud_ply" => "FeaturePoints.ply".into(),
479 "dmt_arposes_csv" => "ARposes.csv".into(),
480 "dmt_portal_detections_csv" | "dmt_observations_csv" => "PortalDetections.csv".into(),
481 "dmt_intrinsics_csv" | "dmt_cameraintrinsics_csv" => "CameraIntrinsics.csv".into(),
482 "dmt_frames_csv" => "Frames.csv".into(),
483 "dmt_gyro_csv" => "Gyro.csv".into(),
484 "dmt_accel_csv" => "Accel.csv".into(),
485 "dmt_gyroaccel_csv" => "gyro_accel.csv".into(),
486 "dmt_recording_mp4" => "Frames.mp4".into(),
487 "refined_scan_zip" => "RefinedScan.zip".into(),
488 _ => format!(
489 "{}.{}",
490 sanitize_component(name),
491 sanitize_component(data_type)
492 ),
493 }
494}
495
496async fn unzip_refined_scan(
497 zip_bytes: Vec<u8>,
498 unzip_root: PathBuf,
499) -> Result<Vec<PathBuf>, StorageError> {
500 task::spawn_blocking(move || {
501 std::fs::create_dir_all(&unzip_root)
502 .map_err(|e| StorageError::Other(format!("create unzip dir: {}", e)))?;
503 let mut archive = ZipArchive::new(Cursor::new(zip_bytes))
504 .map_err(|e| StorageError::Other(format!("open zip: {}", e)))?;
505 let mut extracted = Vec::new();
506 for idx in 0..archive.len() {
507 let mut file = archive
508 .by_index(idx)
509 .map_err(|e| StorageError::Other(format!("read zip entry: {}", e)))?;
510 if file.is_dir() {
511 continue;
512 }
513 let mut buf = Vec::new();
514 file.read_to_end(&mut buf)
515 .map_err(|e| StorageError::Other(format!("read zip data: {}", e)))?;
516 let out_path = unzip_root.join(file.name());
517 if let Some(parent) = out_path.parent() {
518 std::fs::create_dir_all(parent)
519 .map_err(|e| StorageError::Other(format!("create unzip parent: {}", e)))?;
520 }
521 std::fs::write(&out_path, &buf)
522 .map_err(|e| StorageError::Other(format!("write unzip file: {}", e)))?;
523 extracted.push(out_path);
524 }
525 Ok(extracted)
526 })
527 .await
528 .map_err(|e| StorageError::Other(format!("zip task join: {}", e)))?
529}