// posemesh_compute_node/storage/client.rs

use crate::errors::StorageError;
use crate::storage::token::TokenRef;
use anyhow::{Context, Result};
use futures::StreamExt;
use regex::Regex;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
use reqwest::{Client, Method, StatusCode};
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::OnceLock;
use std::time::Duration;
use tokio::fs;
use url::Url;
use uuid::Uuid;
// Zip extraction is not done here; it was moved to the reconstruction-specific runner.
15
/// Representation of one multipart section downloaded from Domain.
///
/// Values of this type are produced by `DomainClient::download_uri`, which writes each
/// received part to disk and records where it ended up.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Identifier of the data item, when known.
    pub id: Option<String>,
    /// Name of the data item, when known.
    pub name: Option<String>,
    /// Data type label of the item, when known.
    pub data_type: Option<String>,
    /// Identifier of the domain the item belongs to, when known.
    pub domain_id: Option<String>,
    /// Location of the file holding this part's bytes.
    pub path: PathBuf,
    /// Root directory created for the download this part belongs to.
    pub root: PathBuf,
    /// `path` expressed relative to `root`.
    pub relative_path: PathBuf,
    /// Files extracted from this part, if any (left empty by the download itself).
    pub extracted_paths: Vec<PathBuf>,
}
28
/// Borrowed description of one artifact to upload to a domain.
///
/// All fields borrow from the caller, so building a request copies nothing.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Identifier of the target domain; must not be blank.
    pub domain_id: &'a str,
    /// Name to give the artifact when it is created.
    pub name: &'a str,
    /// Data type label to give the artifact when it is created.
    pub data_type: &'a str,
    /// Caller-side logical path of the artifact; only used for logging.
    pub logical_path: &'a str,
    /// Raw artifact contents.
    pub bytes: &'a [u8],
    /// Identifier of an already stored item to overwrite; `None` creates a new item.
    pub existing_id: Option<&'a str>,
}
38
39/// Domain server HTTP client (skeleton; HTTP added later).
40#[derive(Clone)]
41pub struct DomainClient {
42    pub base: Url,
43    pub token: TokenRef,
44    http: Client,
45}
46impl DomainClient {
47    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
48        let http = Client::builder()
49            .use_rustls_tls()
50            .timeout(Duration::from_secs(30))
51            .build()
52            .context("build reqwest client")?;
53        Ok(Self { base, token, http })
54    }
55
56    pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
57        let http = Client::builder()
58            .use_rustls_tls()
59            .timeout(timeout)
60            .build()
61            .context("build reqwest client")?;
62        Ok(Self { base, token, http })
63    }
64
65    fn auth_headers(&self) -> HeaderMap {
66        let mut h = HeaderMap::new();
67        let token = format!("Bearer {}", self.token.get());
68        let mut v = HeaderValue::from_str(&token)
69            .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
70        v.set_sensitive(true);
71        h.insert(AUTHORIZATION, v);
72        h
73    }
74
75    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
76    /// part into a temporary file and returning its metadata.
77    pub async fn download_uri(
78        &self,
79        uri: &str,
80    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
81        let url =
82            Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
83        let url_for_log = url.clone();
84        tracing::debug!(
85            target: "posemesh_compute_node::storage::client",
86            method = "GET",
87            %url_for_log,
88            "Sending domain request"
89        );
90        let client_id = std::env::var("CLIENT_ID")
91            .unwrap_or_else(|_| format!("posemesh-compute-node/{}", uuid::Uuid::new_v4()));
92        let res = posemesh_domain_http::domain_data::request_download_absolute(
93            url.as_str(),
94            &client_id,
95            &self.token.get(),
96        )
97        .await
98        .map_err(|e| StorageError::Network(e.to_string()))?;
99        let status = res.status();
100        tracing::debug!(
101            target: "posemesh_compute_node::storage::client",
102            method = "GET",
103            %url_for_log,
104            status = %status,
105            "Domain response received"
106        );
107        if !status.is_success() {
108            return Err(map_status(status));
109        }
110
111        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
112        fs::create_dir_all(&root)
113            .await
114            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
115        let datasets_root = root.join("datasets");
116        fs::create_dir_all(&datasets_root)
117            .await
118            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
119
120        let mut parts = Vec::new();
121        let mut rx = posemesh_domain_http::domain_data::stream_from_response(res)
122            .await
123            .map_err(|e| StorageError::Other(e.to_string()))?;
124
125        while let Some(item) = rx.next().await {
126            let domain_item = item.map_err(|e| StorageError::Other(e.to_string()))?;
127            let name = domain_item.metadata.name.clone();
128            let data_type = domain_item.metadata.data_type.clone();
129
130            let scan_folder = extract_timestamp(&name)
131                .map(|ts| sanitize_component(&ts))
132                .unwrap_or_else(|| sanitize_component(&name));
133            let scan_dir = datasets_root.join(&scan_folder);
134            fs::create_dir_all(&scan_dir)
135                .await
136                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
137
138            let file_name = map_filename(&data_type, &name);
139            let file_path = scan_dir.join(&file_name);
140            if let Some(parent) = file_path.parent() {
141                fs::create_dir_all(parent)
142                    .await
143                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
144            }
145            fs::write(&file_path, &domain_item.data)
146                .await
147                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
148
149            let extracted_paths = Vec::new();
150
151            let relative_path = file_path
152                .strip_prefix(&root)
153                .unwrap_or(&file_path)
154                .to_path_buf();
155
156            parts.push(DownloadedPart {
157                id: Some(domain_item.metadata.id),
158                name: Some(name),
159                data_type: Some(data_type),
160                domain_id: Some(domain_item.metadata.domain_id),
161                path: file_path,
162                root: root.clone(),
163                relative_path,
164                extracted_paths,
165            });
166        }
167
168        if parts.is_empty() {
169            return Err(StorageError::Other(
170                "domain response did not contain any data parts".into(),
171            ));
172        }
173
174        Ok(parts)
175    }
176
177    pub async fn upload_artifact(
178        &self,
179        request: UploadRequest<'_>,
180    ) -> std::result::Result<Option<String>, StorageError> {
181        let domain_id = request.domain_id.trim();
182        if domain_id.is_empty() {
183            return Err(StorageError::Other(
184                "missing domain_id for artifact upload".into(),
185            ));
186        }
187        let action = if let Some(id) = request.existing_id {
188            posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
189        } else {
190            posemesh_domain_http::domain_data::DomainAction::Create {
191                name: request.name.to_string(),
192                data_type: request.data_type.to_string(),
193            }
194        };
195
196        let upload = posemesh_domain_http::domain_data::UploadDomainData {
197            action,
198            data: request.bytes.to_vec(),
199        };
200
201        let base = self.base.as_str().trim_end_matches('/');
202        let method = if matches!(
203            upload.action,
204            posemesh_domain_http::domain_data::DomainAction::Update { .. }
205        ) {
206            Method::PUT
207        } else {
208            Method::POST
209        };
210        tracing::debug!(
211            target: "posemesh_compute_node::storage::client",
212            method = %method,
213            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
214            logical_path = request.logical_path,
215            name = request.name,
216            data_type = request.data_type,
217            has_existing_id = request.existing_id.is_some(),
218            "Sending domain upload request"
219        );
220
221        match posemesh_domain_http::domain_data::upload_one(
222            base,
223            &self.token.get(),
224            domain_id,
225            upload,
226        )
227        .await
228        {
229            Ok(mut items) => {
230                let id = items.drain(..).next().map(|d| d.metadata.id);
231                Ok(id)
232            }
233            Err((status, _body)) => Err(map_status(status)),
234        }
235    }
236
237    pub async fn find_artifact_id(
238        &self,
239        domain_id: &str,
240        name: &str,
241        data_type: &str,
242    ) -> std::result::Result<Option<String>, StorageError> {
243        let domain_id = domain_id.trim();
244        if domain_id.is_empty() {
245            return Err(StorageError::Other(
246                "missing domain_id for artifact lookup".into(),
247            ));
248        }
249        let path = format!("api/v1/domains/{}/data", domain_id);
250        let url = self
251            .base
252            .join(&path)
253            .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
254        let mut headers = self.auth_headers();
255        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
256        tracing::debug!(
257            target: "posemesh_compute_node::storage::client",
258            method = "GET",
259            %url,
260            artifact_name = name,
261            artifact_type = data_type,
262            "Looking up existing domain artifact"
263        );
264        let res = self
265            .http
266            .get(url.clone())
267            .headers(headers)
268            .query(&[("name", name), ("data_type", data_type)])
269            .send()
270            .await
271            .map_err(|e| StorageError::Network(e.to_string()))?;
272        let status = res.status();
273        if status == StatusCode::NOT_FOUND {
274            tracing::debug!(
275                target: "posemesh_compute_node::storage::client",
276                method = "GET",
277                %url,
278                artifact_name = name,
279                artifact_type = data_type,
280                "Artifact lookup returned 404"
281            );
282            return Ok(None);
283        }
284        if !status.is_success() {
285            return Err(map_status(status));
286        }
287        let payload = res
288            .json::<ListDomainDataResponse>()
289            .await
290            .map_err(|e| StorageError::Network(e.to_string()))?;
291        let found = payload
292            .data
293            .into_iter()
294            .find(|item| item.name == name && item.data_type == data_type);
295        Ok(found.map(|item| item.id))
296    }
297}
298
299#[derive(Debug, Deserialize)]
300struct ListDomainDataResponse {
301    #[serde(default)]
302    data: Vec<DomainDataSummary>,
303}
304
305#[derive(Debug, Deserialize)]
306struct DomainDataSummary {
307    #[serde(default)]
308    id: String,
309    #[serde(default)]
310    name: String,
311    #[serde(default)]
312    data_type: String,
313}
314
315fn map_status(status: reqwest::StatusCode) -> StorageError {
316    match status.as_u16() {
317        400 => StorageError::BadRequest,
318        401 => StorageError::Unauthorized,
319        404 => StorageError::NotFound,
320        409 => StorageError::Conflict,
321        n if (500..=599).contains(&n) => StorageError::Server(n),
322        other => StorageError::Other(format!("unexpected status: {}", other)),
323    }
324}
325
// There is no `parse_disposition_params` here; part headers are parsed in posemesh-domain-http.
327
/// Turn an arbitrary string into a single safe path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character (including
/// `.`, path separators and non-ASCII characters) is replaced by `_`. An empty input
/// yields `"part"` so the result is never an empty component.
fn sanitize_component(value: &str) -> String {
    // Each input character maps to exactly one output character, so the result can only
    // be empty when the input is.
    if value.is_empty() {
        return "part".into();
    }
    value
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect()
}
345
346fn extract_timestamp(name: &str) -> Option<String> {
347    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
348        .ok()
349        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
350}
351
352fn map_filename(data_type: &str, name: &str) -> String {
353    format!(
354        "{}.{}",
355        sanitize_component(name),
356        sanitize_component(data_type)
357    )
358}