// posemesh_compute_node/storage/client.rs

use crate::errors::StorageError;
use crate::storage::token::TokenRef;
use anyhow::{Context, Result};
use multer::Multipart;
use regex::Regex;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
use reqwest::{Client, Method, StatusCode};
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::time::Duration;
use tokio::fs;
use url::Url;
use uuid::Uuid;
// zip extraction moved to reconstruction-specific runner

/// Representation of one multipart section downloaded from Domain and written to disk.
///
/// The field descriptions below reflect how `DomainClient::download_uri` fills the struct;
/// other producers may populate it differently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadedPart {
    /// `id` parameter of the part's `Content-Disposition` header, when present.
    pub id: Option<String>,
    /// `name` parameter of the part's `Content-Disposition` header
    /// (`download_uri` substitutes `"domain-data"` when the parameter is missing).
    pub name: Option<String>,
    /// `data-type` parameter of the part's `Content-Disposition` header
    /// (`download_uri` stores an empty string when the parameter is missing).
    pub data_type: Option<String>,
    /// `domain-id` parameter of the part's `Content-Disposition` header, when present.
    pub domain_id: Option<String>,
    /// Full path of the file the part's bytes were written to.
    pub path: PathBuf,
    /// Temporary directory created for the whole download; shared by every part of one call.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (falls back to `path` if it is not under `root`).
    pub relative_path: PathBuf,
    /// Paths produced by unpacking this part. `download_uri` always leaves this empty;
    /// extraction is performed elsewhere.
    pub extracted_paths: Vec<PathBuf>,
}

/// Borrowed description of one artifact to upload to a Domain.
///
/// Every field is a shared reference, so the request is a cheap, copyable view over data
/// owned by the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UploadRequest<'a> {
    /// Identifier of the target domain; surrounding whitespace is trimmed by the uploader.
    pub domain_id: &'a str,
    /// Value sent as the `name` parameter of the part's `Content-Disposition` header.
    pub name: &'a str,
    /// Value sent as the `data-type` parameter of the part's `Content-Disposition` header.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; the uploader only includes it in its log event.
    pub logical_path: &'a str,
    /// Raw payload placed in the multipart body.
    pub bytes: &'a [u8],
    /// Identifier of an already-stored artifact. When set, the upload is sent as `PUT` and
    /// carries the id; otherwise it is sent as `POST`.
    pub existing_id: Option<&'a str>,
}

40/// Domain server HTTP client (skeleton; HTTP added later).
41#[derive(Clone)]
42pub struct DomainClient {
43    pub base: Url,
44    pub token: TokenRef,
45    http: Client,
46}
47impl DomainClient {
48    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
49        let http = Client::builder()
50            .use_rustls_tls()
51            .timeout(Duration::from_secs(30))
52            .build()
53            .context("build reqwest client")?;
54        Ok(Self { base, token, http })
55    }
56
57    pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
58        let http = Client::builder()
59            .use_rustls_tls()
60            .timeout(timeout)
61            .build()
62            .context("build reqwest client")?;
63        Ok(Self { base, token, http })
64    }
65
66    fn auth_headers(&self) -> HeaderMap {
67        let mut h = HeaderMap::new();
68        let token = format!("Bearer {}", self.token.get());
69        let mut v = HeaderValue::from_str(&token)
70            .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
71        v.set_sensitive(true);
72        h.insert(AUTHORIZATION, v);
73        h
74    }
75
76    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
77    /// part into a temporary file and returning its metadata.
78    pub async fn download_uri(
79        &self,
80        uri: &str,
81    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
82        let url =
83            Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
84        let url_for_log = url.clone();
85        let mut headers = self.auth_headers();
86        headers.insert(ACCEPT, HeaderValue::from_static("multipart/form-data"));
87        tracing::debug!(
88            target: "posemesh_compute_node::storage::client",
89            method = "GET",
90            %url_for_log,
91            "Sending domain request"
92        );
93        let res = self
94            .http
95            .get(url)
96            .headers(headers)
97            .send()
98            .await
99            .map_err(|e| StorageError::Network(e.to_string()))?;
100        let status = res.status();
101        tracing::debug!(
102            target: "posemesh_compute_node::storage::client",
103            method = "GET",
104            %url_for_log,
105            status = %status,
106            "Domain response received"
107        );
108        if !status.is_success() {
109            return Err(map_status(status));
110        }
111
112        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
113        fs::create_dir_all(&root)
114            .await
115            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
116        let datasets_root = root.join("datasets");
117        fs::create_dir_all(&datasets_root)
118            .await
119            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
120
121        let content_type = res
122            .headers()
123            .get(CONTENT_TYPE)
124            .and_then(|v| v.to_str().ok())
125            .ok_or_else(|| {
126                StorageError::Other("missing Content-Type header on domain response".into())
127            })?;
128        let boundary = multer::parse_boundary(content_type).map_err(|e| {
129            StorageError::Other(format!("invalid multipart boundary from domain: {}", e))
130        })?;
131        let mut multipart = Multipart::new(res.bytes_stream(), boundary);
132        let mut parts = Vec::new();
133
134        while let Some(mut field) = multipart
135            .next_field()
136            .await
137            .map_err(|e| StorageError::Other(format!("read multipart field: {}", e)))?
138        {
139            let disposition = field
140                .headers()
141                .get("content-disposition")
142                .and_then(|v| v.to_str().ok())
143                .unwrap_or_default();
144            let params = parse_disposition_params(disposition);
145            let name = params
146                .get("name")
147                .cloned()
148                .unwrap_or_else(|| "domain-data".into());
149            let data_type = params.get("data-type").cloned().unwrap_or_default();
150
151            let mut buf = Vec::new();
152            while let Some(chunk) = field
153                .chunk()
154                .await
155                .map_err(|e| StorageError::Other(format!("stream multipart chunk: {}", e)))?
156            {
157                buf.extend_from_slice(&chunk);
158            }
159
160            let scan_folder = extract_timestamp(&name)
161                .map(|ts| sanitize_component(&ts))
162                .unwrap_or_else(|| sanitize_component(&name));
163            let scan_dir = datasets_root.join(&scan_folder);
164            fs::create_dir_all(&scan_dir)
165                .await
166                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
167
168            let file_name = map_filename(&data_type, &name);
169            let file_path = scan_dir.join(&file_name);
170            if let Some(parent) = file_path.parent() {
171                fs::create_dir_all(parent)
172                    .await
173                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
174            }
175            fs::write(&file_path, &buf)
176                .await
177                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
178
179            let extracted_paths = Vec::new();
180
181            let relative_path = file_path
182                .strip_prefix(&root)
183                .unwrap_or(&file_path)
184                .to_path_buf();
185
186            parts.push(DownloadedPart {
187                id: params.get("id").cloned(),
188                name: Some(name),
189                data_type: Some(data_type),
190                domain_id: params.get("domain-id").cloned(),
191                path: file_path,
192                root: root.clone(),
193                relative_path,
194                extracted_paths,
195            });
196        }
197
198        if parts.is_empty() {
199            return Err(StorageError::Other(
200                "domain response did not contain any data parts".into(),
201            ));
202        }
203
204        Ok(parts)
205    }
206
207    pub async fn upload_artifact(
208        &self,
209        request: UploadRequest<'_>,
210    ) -> std::result::Result<Option<String>, StorageError> {
211        let domain_id = request.domain_id.trim();
212        if domain_id.is_empty() {
213            return Err(StorageError::Other(
214                "missing domain_id for artifact upload".into(),
215            ));
216        }
217        let path = format!("api/v1/domains/{}/data", domain_id);
218        let url = self
219            .base
220            .join(&path)
221            .map_err(|e| StorageError::Other(format!("join upload path: {}", e)))?;
222        let boundary = format!("------------------------{}", Uuid::new_v4().simple());
223        let (body, content_type) = build_multipart_body(
224            &boundary,
225            request.name,
226            request.data_type,
227            domain_id,
228            request.existing_id,
229            request.bytes,
230        );
231        let mut headers = self.auth_headers();
232        let ct_value = HeaderValue::from_str(&content_type)
233            .unwrap_or_else(|_| HeaderValue::from_static("multipart/form-data"));
234        headers.insert(CONTENT_TYPE, ct_value);
235        let method = if request.existing_id.is_some() {
236            Method::PUT
237        } else {
238            Method::POST
239        };
240        tracing::debug!(
241            target: "posemesh_compute_node::storage::client",
242            method = %method,
243            %url,
244            logical_path = request.logical_path,
245            name = request.name,
246            data_type = request.data_type,
247            has_existing_id = request.existing_id.is_some(),
248            "Sending domain upload request"
249        );
250        let res = self
251            .http
252            .request(method.clone(), url.clone())
253            .headers(headers)
254            .body(body)
255            .send()
256            .await
257            .map_err(|e| StorageError::Network(e.to_string()))?;
258        let status = res.status();
259        tracing::debug!(
260            target: "posemesh_compute_node::storage::client",
261            method = %method,
262            %url,
263            status = %status,
264            "Domain upload response received"
265        );
266        if !status.is_success() {
267            return Err(map_status(status));
268        }
269        let text = res
270            .text()
271            .await
272            .map_err(|e| StorageError::Network(e.to_string()))?;
273        if text.trim().is_empty() {
274            return Ok(None);
275        }
276        match serde_json::from_str::<PostDomainDataResponse>(&text) {
277            Ok(parsed) => {
278                let id = parsed.data.into_iter().next().map(|d| d.id);
279                Ok(id)
280            }
281            Err(err) => {
282                tracing::debug!(
283                    target: "posemesh_compute_node::storage::client",
284                    error = %err,
285                    body = %text,
286                    "Failed to parse domain upload response body as JSON"
287                );
288                Ok(None)
289            }
290        }
291    }
292
293    pub async fn find_artifact_id(
294        &self,
295        domain_id: &str,
296        name: &str,
297        data_type: &str,
298    ) -> std::result::Result<Option<String>, StorageError> {
299        let domain_id = domain_id.trim();
300        if domain_id.is_empty() {
301            return Err(StorageError::Other(
302                "missing domain_id for artifact lookup".into(),
303            ));
304        }
305        let path = format!("api/v1/domains/{}/data", domain_id);
306        let url = self
307            .base
308            .join(&path)
309            .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
310        let mut headers = self.auth_headers();
311        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
312        tracing::debug!(
313            target: "posemesh_compute_node::storage::client",
314            method = "GET",
315            %url,
316            artifact_name = name,
317            artifact_type = data_type,
318            "Looking up existing domain artifact"
319        );
320        let res = self
321            .http
322            .get(url.clone())
323            .headers(headers)
324            .query(&[("name", name), ("data_type", data_type)])
325            .send()
326            .await
327            .map_err(|e| StorageError::Network(e.to_string()))?;
328        let status = res.status();
329        if status == StatusCode::NOT_FOUND {
330            tracing::debug!(
331                target: "posemesh_compute_node::storage::client",
332                method = "GET",
333                %url,
334                artifact_name = name,
335                artifact_type = data_type,
336                "Artifact lookup returned 404"
337            );
338            return Ok(None);
339        }
340        if !status.is_success() {
341            return Err(map_status(status));
342        }
343        let payload = res
344            .json::<ListDomainDataResponse>()
345            .await
346            .map_err(|e| StorageError::Network(e.to_string()))?;
347        let found = payload
348            .data
349            .into_iter()
350            .find(|item| item.name == name && item.data_type == data_type);
351        Ok(found.map(|item| item.id))
352    }
353}
354
/// Builds a single-part `multipart/form-data` body for a Domain upload.
///
/// The part has `Content-Type: application/octet-stream` and a `Content-Disposition` header
/// carrying `name`, `data-type`, optionally `id` (only when `existing_id` is set) and
/// `domain-id` as quoted parameters, followed by `bytes` and the closing boundary.
///
/// Parameter values are escaped the way the WHATWG multipart/form-data encoding does for
/// field names (`"` -> `%22`, CR -> `%0D`, LF -> `%0A`), so a value can neither terminate its
/// quoted string early nor inject additional header lines. Values without those characters
/// are emitted unchanged.
///
/// Returns the encoded body together with the matching `Content-Type` header value
/// (`multipart/form-data; boundary=<boundary>`). `boundary` is used verbatim.
fn build_multipart_body(
    boundary: &str,
    name: &str,
    data_type: &str,
    domain_id: &str,
    existing_id: Option<&str>,
    bytes: &[u8],
) -> (Vec<u8>, String) {
    /// Percent-escapes the characters that would break a quoted header parameter.
    fn escape_param(value: &str) -> String {
        let mut escaped = String::with_capacity(value.len());
        for ch in value.chars() {
            match ch {
                '"' => escaped.push_str("%22"),
                '\r' => escaped.push_str("%0D"),
                '\n' => escaped.push_str("%0A"),
                other => escaped.push(other),
            }
        }
        escaped
    }

    let mut disposition = format!(
        "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"",
        escape_param(name),
        escape_param(data_type)
    );
    if let Some(id) = existing_id {
        disposition.push_str(&format!("; id=\"{}\"", escape_param(id)));
    }
    disposition.push_str(&format!("; domain-id=\"{}\"\r\n", escape_param(domain_id)));

    // `disposition` already ends with CRLF; the extra CRLF below is the blank line that
    // separates the part headers from the payload.
    let header = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\n{}\r\n",
        boundary, disposition
    );

    let mut body = Vec::with_capacity(bytes.len().saturating_add(256));
    body.extend_from_slice(header.as_bytes());
    body.extend_from_slice(bytes);
    body.extend_from_slice(b"\r\n");
    body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
    let content_type = format!("multipart/form-data; boundary={}", boundary);
    (body, content_type)
}

387#[derive(Debug, Deserialize)]
388struct PostDomainDataResponse {
389    #[serde(default)]
390    data: Vec<PostDomainDataItem>,
391}
392
393#[derive(Debug, Deserialize)]
394struct PostDomainDataItem {
395    #[serde(default)]
396    id: String,
397}
398
399#[derive(Debug, Deserialize)]
400struct ListDomainDataResponse {
401    #[serde(default)]
402    data: Vec<DomainDataSummary>,
403}
404
405#[derive(Debug, Deserialize)]
406struct DomainDataSummary {
407    #[serde(default)]
408    id: String,
409    #[serde(default)]
410    name: String,
411    #[serde(default)]
412    data_type: String,
413}
414
415fn map_status(status: reqwest::StatusCode) -> StorageError {
416    match status.as_u16() {
417        400 => StorageError::BadRequest,
418        401 => StorageError::Unauthorized,
419        404 => StorageError::NotFound,
420        409 => StorageError::Conflict,
421        n if (500..=599).contains(&n) => StorageError::Server(n),
422        other => StorageError::Other(format!("unexpected status: {}", other)),
423    }
424}
425
/// Parses the `key=value` parameters of a `Content-Disposition` header value.
///
/// Keys are trimmed and lower-cased; values are trimmed and stripped of surrounding double
/// quotes. The bare `form-data` token and segments without `=` are ignored. When a key
/// occurs more than once, the last occurrence wins.
///
/// Segments are separated by `;`, but a `;` inside a double-quoted value belongs to the value
/// (`name="a;b"` yields `a;b`). If the quotes in `value` are unbalanced, quoting is ignored
/// and the header is split on every `;`. Backslash escapes inside quoted values are not
/// interpreted.
fn parse_disposition_params(value: &str) -> HashMap<String, String> {
    let mut segments: Vec<&str> = Vec::new();
    let mut segment_start = 0;
    let mut in_quotes = false;
    for (idx, ch) in value.char_indices() {
        match ch {
            '"' => in_quotes = !in_quotes,
            // `;` is a single byte, so `idx + 1` is always a valid char boundary.
            ';' if !in_quotes => {
                segments.push(&value[segment_start..idx]);
                segment_start = idx + 1;
            }
            _ => {}
        }
    }
    if in_quotes {
        // Malformed header: keep the lenient behavior of splitting on every `;` rather than
        // swallowing all remaining parameters into one value.
        segments = value.split(';').collect();
    } else {
        segments.push(&value[segment_start..]);
    }

    segments
        .into_iter()
        .filter_map(|segment| {
            let trimmed = segment.trim();
            if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("form-data") {
                return None;
            }
            let (key, val) = trimmed.split_once('=')?;
            let cleaned = val.trim().trim_matches('"').to_string();
            Some((key.trim().to_ascii_lowercase(), cleaned))
        })
        .collect()
}

/// Turns an arbitrary string into a single safe path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character (including path
/// separators, dots and non-ASCII characters) is replaced by one `_`. An empty input yields
/// `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the result is empty
    // only when the input is.
    if value.is_empty() {
        return String::from("part");
    }
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}

459fn extract_timestamp(name: &str) -> Option<String> {
460    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
461        .ok()
462        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
463}
464
465fn map_filename(data_type: &str, name: &str) -> String {
466    format!(
467        "{}.{}",
468        sanitize_component(name),
469        sanitize_component(data_type)
470    )
471}