// posemesh_compute_node/storage/client.rs

1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::{Context, Result};
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
7use reqwest::{Client, Method, StatusCode};
8use serde::Deserialize;
9use std::path::PathBuf;
10use std::time::Duration;
11use tokio::fs;
12use url::Url;
13use uuid::Uuid;
// Zip extraction has moved to the reconstruction-specific runner; downloads are not unpacked here.
15
/// Representation of one multipart section downloaded from Domain.
///
/// Produced by `DomainClient::download_uri`, which writes each part's bytes to its own file
/// under a per-download temporary root and fills every `Option` field with `Some`.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Domain data id of the part, taken from the part's metadata.
    pub id: Option<String>,
    /// Part name as reported by Domain (not sanitized).
    pub name: Option<String>,
    /// Domain data type of the part as reported by Domain (not sanitized).
    pub data_type: Option<String>,
    /// Id of the domain the part belongs to, taken from the part's metadata.
    pub domain_id: Option<String>,
    /// File the part's bytes were written to (`<root>/datasets/<scan>/<name>.<data_type>`).
    pub path: PathBuf,
    /// Temporary directory shared by every part of the same download. Nothing in this
    /// module removes it after a successful download — presumably the consumer cleans it
    /// up; confirm with callers.
    pub root: PathBuf,
    /// `path` relative to `root` (e.g. `datasets/<scan>/<file>`).
    pub relative_path: PathBuf,
    /// Paths of files unpacked from this part. `download_uri` always leaves this empty;
    /// presumably populated by later processing — TODO confirm.
    pub extracted_paths: Vec<PathBuf>,
}
28
/// Parameters for `DomainClient::upload_artifact`.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Domain to upload into; surrounding whitespace is trimmed and a blank value is rejected.
    pub domain_id: &'a str,
    /// Name given to a newly created item (not sent when updating `existing_id`).
    pub name: &'a str,
    /// Data type given to a newly created item (not sent when updating `existing_id`).
    pub data_type: &'a str,
    /// Logical path of the artifact; `upload_artifact` only uses it for logging.
    pub logical_path: &'a str,
    /// Raw payload to upload.
    pub bytes: &'a [u8],
    /// Id of an existing item to update instead of creating a new one.
    pub existing_id: Option<&'a str>,
}
38
/// HTTP client for the Domain server.
///
/// Downloads data items by absolute URI, uploads artifacts, and looks up existing artifacts
/// by name and data type.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL of the Domain server. Artifact upload and lookup requests target
    /// `api/v1/domains/{domain_id}/data` beneath it; `download_uri` takes absolute URIs
    /// and does not use it.
    pub base: Url,
    /// Bearer token source; `get()` is called on every request (presumably so a refreshed
    /// token is picked up — confirm against `storage::token`).
    pub token: TokenRef,
    /// `reqwest` client (rustls TLS, per-request timeout) used for direct HTTP calls.
    http: Client,
}
46impl DomainClient {
47    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
48        let http = Client::builder()
49            .use_rustls_tls()
50            .timeout(Duration::from_secs(30))
51            .build()
52            .context("build reqwest client")?;
53        Ok(Self { base, token, http })
54    }
55
56    pub fn with_timeout(base: Url, token: TokenRef, timeout: Duration) -> Result<Self> {
57        let http = Client::builder()
58            .use_rustls_tls()
59            .timeout(timeout)
60            .build()
61            .context("build reqwest client")?;
62        Ok(Self { base, token, http })
63    }
64
65    fn auth_headers(&self) -> HeaderMap {
66        let mut h = HeaderMap::new();
67        let token = format!("Bearer {}", self.token.get());
68        let mut v = HeaderValue::from_str(&token)
69            .unwrap_or_else(|_| HeaderValue::from_static("Bearer INVALID"));
70        v.set_sensitive(true);
71        h.insert(AUTHORIZATION, v);
72        h
73    }
74
75    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
76    /// part into a temporary file and returning its metadata.
77    pub async fn download_uri(
78        &self,
79        uri: &str,
80    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
81        let url =
82            Url::parse(uri).map_err(|e| StorageError::Other(format!("parse domain uri: {}", e)))?;
83        let url_for_log = url.clone();
84        tracing::debug!(
85            target: "posemesh_compute_node::storage::client",
86            method = "GET",
87            %url_for_log,
88            "Sending domain request"
89        );
90        let client_id = std::env::var("CLIENT_ID")
91            .unwrap_or_else(|_| format!("posemesh-compute-node/{}", uuid::Uuid::new_v4()));
92        let res = posemesh_domain_http::domain_data::request_download_absolute(
93            url.as_str(),
94            &client_id,
95            &self.token.get(),
96        )
97        .await
98        .map_err(|e| StorageError::Network(e.to_string()))?;
99        let status = res.status();
100        tracing::debug!(
101            target: "posemesh_compute_node::storage::client",
102            method = "GET",
103            %url_for_log,
104            status = %status,
105            "Domain response received"
106        );
107        if !status.is_success() {
108            return Err(map_status(status));
109        }
110
111        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
112        fs::create_dir_all(&root)
113            .await
114            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
115        let datasets_root = root.join("datasets");
116        fs::create_dir_all(&datasets_root)
117            .await
118            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
119
120        let mut parts = Vec::new();
121        let mut rx = posemesh_domain_http::domain_data::stream_from_response(res)
122            .await
123            .map_err(|e| StorageError::Other(e.to_string()))?;
124
125        while let Some(item) = rx.next().await {
126            let domain_item = item.map_err(|e| StorageError::Other(e.to_string()))?;
127            let name = domain_item.metadata.name.clone();
128            let data_type = domain_item.metadata.data_type.clone();
129
130            let scan_folder = extract_timestamp(&name)
131                .map(|ts| sanitize_component(&ts))
132                .unwrap_or_else(|| sanitize_component(&name));
133            let scan_dir = datasets_root.join(&scan_folder);
134            fs::create_dir_all(&scan_dir)
135                .await
136                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
137
138            let file_name = map_filename(&data_type, &name);
139            let file_path = scan_dir.join(&file_name);
140            if let Some(parent) = file_path.parent() {
141                fs::create_dir_all(parent)
142                    .await
143                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
144            }
145            fs::write(&file_path, &domain_item.data)
146                .await
147                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
148
149            let extracted_paths = Vec::new();
150
151            let relative_path = file_path
152                .strip_prefix(&root)
153                .unwrap_or(&file_path)
154                .to_path_buf();
155
156            parts.push(DownloadedPart {
157                id: Some(domain_item.metadata.id),
158                name: Some(name),
159                data_type: Some(data_type),
160                domain_id: Some(domain_item.metadata.domain_id),
161                path: file_path,
162                root: root.clone(),
163                relative_path,
164                extracted_paths,
165            });
166        }
167
168        if parts.is_empty() {
169            return Err(StorageError::Other(
170                "domain response did not contain any data parts".into(),
171            ));
172        }
173
174        Ok(parts)
175    }
176
177    pub async fn upload_artifact(
178        &self,
179        request: UploadRequest<'_>,
180    ) -> std::result::Result<Option<String>, StorageError> {
181        let domain_id = request.domain_id.trim();
182        if domain_id.is_empty() {
183            return Err(StorageError::Other(
184                "missing domain_id for artifact upload".into(),
185            ));
186        }
187        let action = if let Some(id) = request.existing_id {
188            posemesh_domain_http::domain_data::DomainAction::Update(
189                posemesh_domain_http::domain_data::UpdateDomainData { id: id.to_string() },
190            )
191        } else {
192            posemesh_domain_http::domain_data::DomainAction::Create(
193                posemesh_domain_http::domain_data::CreateDomainData {
194                    name: request.name.to_string(),
195                    data_type: request.data_type.to_string(),
196                },
197            )
198        };
199
200        let upload = posemesh_domain_http::domain_data::UploadDomainData {
201            action,
202            data: request.bytes.to_vec(),
203        };
204
205        let base = self.base.as_str().trim_end_matches('/');
206        let method = if matches!(
207            upload.action,
208            posemesh_domain_http::domain_data::DomainAction::Update(_)
209        ) {
210            Method::PUT
211        } else {
212            Method::POST
213        };
214        tracing::debug!(
215            target: "posemesh_compute_node::storage::client",
216            method = %method,
217            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
218            logical_path = request.logical_path,
219            name = request.name,
220            data_type = request.data_type,
221            has_existing_id = request.existing_id.is_some(),
222            "Sending domain upload request"
223        );
224
225        match posemesh_domain_http::domain_data::upload_one(
226            base,
227            &self.token.get(),
228            domain_id,
229            upload,
230        )
231        .await
232        {
233            Ok(mut items) => {
234                let id = items.drain(..).next().map(|d| d.metadata.id);
235                Ok(id)
236            }
237            Err((status, _body)) => Err(map_status(status)),
238        }
239    }
240
241    pub async fn find_artifact_id(
242        &self,
243        domain_id: &str,
244        name: &str,
245        data_type: &str,
246    ) -> std::result::Result<Option<String>, StorageError> {
247        let domain_id = domain_id.trim();
248        if domain_id.is_empty() {
249            return Err(StorageError::Other(
250                "missing domain_id for artifact lookup".into(),
251            ));
252        }
253        let path = format!("api/v1/domains/{}/data", domain_id);
254        let url = self
255            .base
256            .join(&path)
257            .map_err(|e| StorageError::Other(format!("join lookup path: {}", e)))?;
258        let mut headers = self.auth_headers();
259        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
260        tracing::debug!(
261            target: "posemesh_compute_node::storage::client",
262            method = "GET",
263            %url,
264            artifact_name = name,
265            artifact_type = data_type,
266            "Looking up existing domain artifact"
267        );
268        let res = self
269            .http
270            .get(url.clone())
271            .headers(headers)
272            .query(&[("name", name), ("data_type", data_type)])
273            .send()
274            .await
275            .map_err(|e| StorageError::Network(e.to_string()))?;
276        let status = res.status();
277        if status == StatusCode::NOT_FOUND {
278            tracing::debug!(
279                target: "posemesh_compute_node::storage::client",
280                method = "GET",
281                %url,
282                artifact_name = name,
283                artifact_type = data_type,
284                "Artifact lookup returned 404"
285            );
286            return Ok(None);
287        }
288        if !status.is_success() {
289            return Err(map_status(status));
290        }
291        let payload = res
292            .json::<ListDomainDataResponse>()
293            .await
294            .map_err(|e| StorageError::Network(e.to_string()))?;
295        let found = payload
296            .data
297            .into_iter()
298            .find(|item| item.name == name && item.data_type == data_type);
299        Ok(found.map(|item| item.id))
300    }
301}
302
/// JSON body returned by the artifact lookup (`GET .../api/v1/domains/{id}/data`).
#[derive(Debug, Deserialize)]
struct ListDomainDataResponse {
    /// Listed data items; a missing `data` field deserializes as an empty list.
    #[serde(default)]
    data: Vec<DomainDataSummary>,
}
308
/// The subset of a listed domain data item needed to match an artifact by name and type.
///
/// Each field falls back to an empty string when absent from the JSON.
#[derive(Debug, Deserialize)]
struct DomainDataSummary {
    /// Data item id.
    #[serde(default)]
    id: String,
    /// Data item name, compared exactly against the requested artifact name.
    #[serde(default)]
    name: String,
    /// Data item type, compared exactly against the requested artifact type.
    #[serde(default)]
    data_type: String,
}
318
319fn map_status(status: reqwest::StatusCode) -> StorageError {
320    match status.as_u16() {
321        400 => StorageError::BadRequest,
322        401 => StorageError::Unauthorized,
323        404 => StorageError::NotFound,
324        409 => StorageError::Conflict,
325        n if (500..=599).contains(&n) => StorageError::Server(n),
326        other => StorageError::Other(format!("unexpected status: {}", other)),
327    }
328}
329
// There is no local `parse_disposition_params`: headers are parsed by posemesh-domain-http.
331
/// Reduce `value` to a single filesystem-safe path component.
///
/// Every character other than an ASCII letter, an ASCII digit, `-` or `_` is replaced by `_`
/// (one `_` per `char`). The result therefore never contains separators or `.`. An empty
/// input yields the placeholder `"part"`.
fn sanitize_component(value: &str) -> String {
    // The mapping is char-for-char, so the output is empty exactly when the input is.
    if value.is_empty() {
        return String::from("part");
    }
    let is_allowed = |ch: char| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
    value
        .chars()
        .map(|ch| if is_allowed(ch) { ch } else { '_' })
        .collect()
}
349
350fn extract_timestamp(name: &str) -> Option<String> {
351    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
352        .ok()
353        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
354}
355
356fn map_filename(data_type: &str, name: &str) -> String {
357    format!(
358        "{}.{}",
359        sanitize_component(name),
360        sanitize_component(data_type)
361    )
362}