posemesh_compute_node/storage/
client.rs1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::{Context, Result};
4use multer::Multipart;
5use regex::Regex;
6use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};
7use reqwest::{Client, Method, StatusCode};
8use serde::Deserialize;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::time::Duration;
12use tokio::fs;
13use url::Url;
14use uuid::Uuid;
/// One multipart part of a domain download after it has been written to disk.
///
/// Values are produced by `DomainClient::download_uri`; the header-derived
/// fields come from the part's `Content-Disposition` parameters.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Value of the `id` disposition parameter, when the part carried one.
    pub id: Option<String>,
    /// Value of the `name` disposition parameter (`download_uri` substitutes
    /// `"domain-data"` when the parameter is absent).
    pub name: Option<String>,
    /// Value of the `data-type` disposition parameter (`download_uri` stores an
    /// empty string when the parameter is absent).
    pub data_type: Option<String>,
    /// Value of the `domain-id` disposition parameter, when present.
    pub domain_id: Option<String>,
    /// Location of the file the part's payload was written to.
    pub path: PathBuf,
    /// Per-download temporary directory that contains `path`.
    pub root: PathBuf,
    /// `path` expressed relative to `root`.
    pub relative_path: PathBuf,
    /// Additional paths associated with this part; `download_uri` leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}
29
/// Borrowed description of a single artifact upload, consumed by
/// `DomainClient::upload_artifact`.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; trimmed and inserted into the request path and the
    /// part's `domain-id` disposition parameter.
    pub domain_id: &'a str,
    /// Artifact name, sent as the `name` disposition parameter.
    pub name: &'a str,
    /// Artifact type, sent as the `data-type` disposition parameter.
    pub data_type: &'a str,
    /// Caller-side path label; `upload_artifact` only includes it in its debug log.
    pub logical_path: &'a str,
    /// Raw payload placed in the multipart body.
    pub bytes: &'a [u8],
    /// When `Some`, the upload is sent as a `PUT` carrying this value as the
    /// `id` disposition parameter; when `None`, it is sent as a `POST`.
    pub existing_id: Option<&'a str>,
}
39
40#[derive(Clone)]
42pub struct DomainClient {
43 pub base: Url,
44 pub token: TokenRef,
45 http: Client,
46}
47impl DomainClient {
48 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
49 let http = Client::builder()
50 .use_rustls_tls()
51 .timeout(Duration::from_secs(30))
52 .build()
53 .context("build reqwest client")?;
54 Ok(Self { base, token, http })
55 }
56
57 pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
58 let http = Client::builder()
59 .use_rustls_tls()
60 .timeout(timeout)
61 .build()
62 .context("build reqwest client")?;
63 Ok(Self { base, token, http })
64 }
65
66 fn auth_headers(&self) -> HeaderMap {
67 let mut h = HeaderMap::new();
68 let token = format!("Bearer {}", self.token.get());
69 let mut v = HeaderValue::from_str(&token)
70 .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
71 v.set_sensitive(true);
72 h.insert(AUTHORIZATION, v);
73 h
74 }
75
76 pub async fn download_uri(
79 &self,
80 uri: &str,
81 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
82 let url =
83 Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
84 let url_for_log = url.clone();
85 let mut headers = self.auth_headers();
86 headers.insert(ACCEPT, HeaderValue::from_static("multipart/form-data"));
87 tracing::debug!(
88 target: "posemesh_compute_node::storage::client",
89 method = "GET",
90 %url_for_log,
91 "Sending domain request"
92 );
93 let res = self
94 .http
95 .get(url)
96 .headers(headers)
97 .send()
98 .await
99 .map_err(|e| StorageError::Network(e.to_string()))?;
100 let status = res.status();
101 tracing::debug!(
102 target: "posemesh_compute_node::storage::client",
103 method = "GET",
104 %url_for_log,
105 status = %status,
106 "Domain response received"
107 );
108 if !status.is_success() {
109 return Err(map_status(status));
110 }
111
112 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
113 fs::create_dir_all(&root)
114 .await
115 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
116 let datasets_root = root.join("datasets");
117 fs::create_dir_all(&datasets_root)
118 .await
119 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
120
121 let content_type = res
122 .headers()
123 .get(CONTENT_TYPE)
124 .and_then(|v| v.to_str().ok())
125 .ok_or_else(|| {
126 StorageError::Other("missing Content-Type header on domain response".into())
127 })?;
128 let boundary = multer::parse_boundary(content_type).map_err(|e| {
129 StorageError::Other(format!("invalid multipart boundary from domain: {}", e))
130 })?;
131 let mut multipart = Multipart::new(res.bytes_stream(), boundary);
132 let mut parts = Vec::new();
133
134 while let Some(mut field) = multipart
135 .next_field()
136 .await
137 .map_err(|e| StorageError::Other(format!("read multipart field: {}", e)))?
138 {
139 let disposition = field
140 .headers()
141 .get("content-disposition")
142 .and_then(|v| v.to_str().ok())
143 .unwrap_or_default();
144 let params = parse_disposition_params(disposition);
145 let name = params
146 .get("name")
147 .cloned()
148 .unwrap_or_else(|| "domain-data".into());
149 let data_type = params.get("data-type").cloned().unwrap_or_default();
150
151 let mut buf = Vec::new();
152 while let Some(chunk) = field
153 .chunk()
154 .await
155 .map_err(|e| StorageError::Other(format!("stream multipart chunk: {}", e)))?
156 {
157 buf.extend_from_slice(&chunk);
158 }
159
160 let scan_folder = extract_timestamp(&name)
161 .map(|ts| sanitize_component(&ts))
162 .unwrap_or_else(|| sanitize_component(&name));
163 let scan_dir = datasets_root.join(&scan_folder);
164 fs::create_dir_all(&scan_dir)
165 .await
166 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
167
168 let file_name = map_filename(&data_type, &name);
169 let file_path = scan_dir.join(&file_name);
170 if let Some(parent) = file_path.parent() {
171 fs::create_dir_all(parent)
172 .await
173 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
174 }
175 fs::write(&file_path, &buf)
176 .await
177 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
178
179 let extracted_paths = Vec::new();
180
181 let relative_path = file_path
182 .strip_prefix(&root)
183 .unwrap_or(&file_path)
184 .to_path_buf();
185
186 parts.push(DownloadedPart {
187 id: params.get("id").cloned(),
188 name: Some(name),
189 data_type: Some(data_type),
190 domain_id: params.get("domain-id").cloned(),
191 path: file_path,
192 root: root.clone(),
193 relative_path,
194 extracted_paths,
195 });
196 }
197
198 if parts.is_empty() {
199 return Err(StorageError::Other(
200 "domain response did not contain any data parts".into(),
201 ));
202 }
203
204 Ok(parts)
205 }
206
207 pub async fn upload_artifact(
208 &self,
209 request: UploadRequest<'_>,
210 ) -> std::result::Result<Option<String>, StorageError> {
211 let domain_id = request.domain_id.trim();
212 if domain_id.is_empty() {
213 return Err(StorageError::Other(
214 "missing domain_id for artifact upload".into(),
215 ));
216 }
217 let path = format!("api/v1/domains/{}/data", domain_id);
218 let url = self
219 .base
220 .join(&path)
221 .map_err(|e| StorageError::Other(format!("join upload path: {}", e)))?;
222 let boundary = format!("------------------------{}", Uuid::new_v4().simple());
223 let (body, content_type) = build_multipart_body(
224 &boundary,
225 request.name,
226 request.data_type,
227 domain_id,
228 request.existing_id,
229 request.bytes,
230 );
231 let mut headers = self.auth_headers();
232 let ct_value = HeaderValue::from_str(&content_type)
233 .unwrap_or_else(|_| HeaderValue::from_static("multipart/form-data"));
234 headers.insert(CONTENT_TYPE, ct_value);
235 let method = if request.existing_id.is_some() {
236 Method::PUT
237 } else {
238 Method::POST
239 };
240 tracing::debug!(
241 target: "posemesh_compute_node::storage::client",
242 method = %method,
243 %url,
244 logical_path = request.logical_path,
245 name = request.name,
246 data_type = request.data_type,
247 has_existing_id = request.existing_id.is_some(),
248 "Sending domain upload request"
249 );
250 let res = self
251 .http
252 .request(method.clone(), url.clone())
253 .headers(headers)
254 .body(body)
255 .send()
256 .await
257 .map_err(|e| StorageError::Network(e.to_string()))?;
258 let status = res.status();
259 tracing::debug!(
260 target: "posemesh_compute_node::storage::client",
261 method = %method,
262 %url,
263 status = %status,
264 "Domain upload response received"
265 );
266 if !status.is_success() {
267 return Err(map_status(status));
268 }
269 let text = res
270 .text()
271 .await
272 .map_err(|e| StorageError::Network(e.to_string()))?;
273 if text.trim().is_empty() {
274 return Ok(None);
275 }
276 match serde_json::from_str::<PostDomainDataResponse>(&text) {
277 Ok(parsed) => {
278 let id = parsed.data.into_iter().next().map(|d| d.id);
279 Ok(id)
280 }
281 Err(err) => {
282 tracing::debug!(
283 target: "posemesh_compute_node::storage::client",
284 error = %err,
285 body = %text,
286 "Failed to parse domain upload response body as JSON"
287 );
288 Ok(None)
289 }
290 }
291 }
292
293 pub async fn find_artifact_id(
294 &self,
295 domain_id: &str,
296 name: &str,
297 data_type: &str,
298 ) -> std::result::Result<Option<String>, StorageError> {
299 let domain_id = domain_id.trim();
300 if domain_id.is_empty() {
301 return Err(StorageError::Other(
302 "missing domain_id for artifact lookup".into(),
303 ));
304 }
305 let path = format!("api/v1/domains/{}/data", domain_id);
306 let url = self
307 .base
308 .join(&path)
309 .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
310 let mut headers = self.auth_headers();
311 headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
312 tracing::debug!(
313 target: "posemesh_compute_node::storage::client",
314 method = "GET",
315 %url,
316 artifact_name = name,
317 artifact_type = data_type,
318 "Looking up existing domain artifact"
319 );
320 let res = self
321 .http
322 .get(url.clone())
323 .headers(headers)
324 .query(&[("name", name), ("data_type", data_type)])
325 .send()
326 .await
327 .map_err(|e| StorageError::Network(e.to_string()))?;
328 let status = res.status();
329 if status == StatusCode::NOT_FOUND {
330 tracing::debug!(
331 target: "posemesh_compute_node::storage::client",
332 method = "GET",
333 %url,
334 artifact_name = name,
335 artifact_type = data_type,
336 "Artifact lookup returned 404"
337 );
338 return Ok(None);
339 }
340 if !status.is_success() {
341 return Err(map_status(status));
342 }
343 let payload = res
344 .json::<ListDomainDataResponse>()
345 .await
346 .map_err(|e| StorageError::Network(e.to_string()))?;
347 let found = payload
348 .data
349 .into_iter()
350 .find(|item| item.name == name && item.data_type == data_type);
351 Ok(found.map(|item| item.id))
352 }
353}
354
/// Assembles a single-part `multipart/form-data` body for an artifact upload.
///
/// The part has `Content-Type: application/octet-stream` and a
/// `Content-Disposition` carrying `name`, `data-type`, an optional `id`
/// (only when `existing_id` is `Some`) and `domain-id`, in that order. Values
/// are inserted verbatim, without escaping.
///
/// Returns the encoded body together with the matching request `Content-Type`
/// header value (`multipart/form-data; boundary=<boundary>`).
fn build_multipart_body(
    boundary: &str,
    name: &str,
    data_type: &str,
    domain_id: &str,
    existing_id: Option<&str>,
    bytes: &[u8],
) -> (Vec<u8>, String) {
    let disposition = match existing_id {
        Some(id) => format!(
            "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"; id=\"{}\"; domain-id=\"{}\"\r\n",
            name, data_type, id, domain_id
        ),
        None => format!(
            "Content-Disposition: form-data; name=\"{}\"; data-type=\"{}\"; domain-id=\"{}\"\r\n",
            name, data_type, domain_id
        ),
    };
    // Opening delimiter plus part headers, terminated by the blank line.
    let opening = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\n{}\r\n",
        boundary, disposition
    );
    let closing = format!("--{}--\r\n", boundary);

    let mut body = Vec::with_capacity(bytes.len().saturating_add(256));
    for chunk in [opening.as_bytes(), bytes, &b"\r\n"[..], closing.as_bytes()] {
        body.extend_from_slice(chunk);
    }

    (body, format!("multipart/form-data; boundary={}", boundary))
}
386
387#[derive(Debug, Deserialize)]
388struct PostDomainDataResponse {
389 #[serde(default)]
390 data: Vec<PostDomainDataItem>,
391}
392
393#[derive(Debug, Deserialize)]
394struct PostDomainDataItem {
395 #[serde(default)]
396 id: String,
397}
398
399#[derive(Debug, Deserialize)]
400struct ListDomainDataResponse {
401 #[serde(default)]
402 data: Vec<DomainDataSummary>,
403}
404
405#[derive(Debug, Deserialize)]
406struct DomainDataSummary {
407 #[serde(default)]
408 id: String,
409 #[serde(default)]
410 name: String,
411 #[serde(default)]
412 data_type: String,
413}
414
415fn map_status(status: reqwest::StatusCode) -> StorageError {
416 match status.as_u16() {
417 400 => StorageError::BadRequest,
418 401 => StorageError::Unauthorized,
419 404 => StorageError::NotFound,
420 409 => StorageError::Conflict,
421 n if (500..=599).contains(&n) => StorageError::Server(n),
422 other => StorageError::Other(format!("unexpected status: {}", other)),
423 }
424}
425
/// Parses the `key=value` parameters of a `Content-Disposition` header value.
///
/// Keys are trimmed and lower-cased; values are trimmed and stripped of
/// surrounding double quotes. The bare `form-data` token and segments without
/// an `=` are skipped. A `;` inside a double-quoted value is treated as part of
/// the value rather than as a parameter separator.
fn parse_disposition_params(value: &str) -> HashMap<String, String> {
    split_disposition_segments(value)
        .into_iter()
        .filter_map(|segment| {
            let trimmed = segment.trim();
            if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("form-data") {
                return None;
            }
            let (key, val) = trimmed.split_once('=')?;
            let cleaned = val.trim().trim_matches('"').to_string();
            Some((key.trim().to_ascii_lowercase(), cleaned))
        })
        .collect()
}

/// Splits `value` on every `;` that is not inside a double-quoted string.
///
/// A backslash inside quotes escapes the following character. If the quotes
/// turn out to be unbalanced the input is split on every `;` instead, so that
/// malformed headers are still parsed leniently.
fn split_disposition_segments(value: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    let mut start = 0;
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            ';' if !in_quotes => {
                segments.push(&value[start..idx]);
                // `;` is a single byte, so `idx + 1` is a valid char boundary.
                start = idx + 1;
            }
            _ => {}
        }
    }
    if in_quotes {
        return value.split(';').collect();
    }
    segments.push(&value[start..]);
    segments
}
440
/// Turns `value` into a string that is safe to use as a single path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the
    // result can only be empty when the input is.
    if value.is_empty() {
        return "part".into();
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}
458
459fn extract_timestamp(name: &str) -> Option<String> {
460 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
461 .ok()
462 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
463}
464
465fn map_filename(data_type: &str, name: &str) -> String {
466 format!(
467 "{}.{}",
468 sanitize_component(name),
469 sanitize_component(data_type)
470 )
471}