// posemesh_compute_node/storage/client.rs

1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio::fs;
10use url::Url;
11use uuid::Uuid;
// Zip extraction is not done in this module; it lives in the reconstruction-specific runner.
13
/// One multipart item returned by a Domain download, saved to a local file.
///
/// The item's bytes live at `path`, inside the per-download temporary directory `root`.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Domain data id reported for this item.
    pub id: Option<String>,
    /// Item name reported for this item.
    pub name: Option<String>,
    /// Item data type reported for this item.
    pub data_type: Option<String>,
    /// Domain the item belongs to, as reported for this item.
    pub domain_id: Option<String>,
    /// Location of the file the item's bytes were written to.
    pub path: PathBuf,
    /// Temporary directory shared by every part of the same download.
    pub root: PathBuf,
    /// `path` expressed relative to `root`.
    pub relative_path: PathBuf,
    /// Files extracted from this part. The download code in this module leaves it empty;
    /// any extraction happens elsewhere.
    pub extracted_paths: Vec<PathBuf>,
}
26
/// Arguments for `DomainClient::upload_artifact`; every field borrows from the caller.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; a blank value is rejected by the upload.
    pub domain_id: &'a str,
    /// Artifact name, sent when a new item is created.
    pub name: &'a str,
    /// Artifact data type, sent when a new item is created.
    pub data_type: &'a str,
    /// Logical path of the artifact; the upload only logs it.
    pub logical_path: &'a str,
    /// Payload bytes to upload.
    pub bytes: &'a [u8],
    /// When set, the existing item with this id is updated instead of creating a new one.
    pub existing_id: Option<&'a str>,
}
36
37/// Domain server HTTP client (skeleton; HTTP added later).
38#[derive(Clone)]
39pub struct DomainClient {
40    pub base: Url,
41    pub token: TokenRef,
42    client_id: String,
43}
44impl DomainClient {
45    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
46        let client_id = env_client_id();
47        Ok(Self {
48            base,
49            token,
50            client_id,
51        })
52    }
53
54    pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
55        let client_id = env_client_id();
56        Ok(Self {
57            base,
58            token,
59            client_id,
60        })
61    }
62
63    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
64    /// part into a temporary file and returning its metadata.
65    pub async fn download_uri(
66        &self,
67        uri: &str,
68    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
69        let resolved = resolve_domain_url(&self.base, uri)?;
70        let (domain_id, query) = parse_download_target(&resolved, None)?;
71        self.download_domain_data(&resolved, &domain_id, query)
72            .await
73    }
74
75    /// Download domain data referenced by a CID, which can be either:
76    /// - a bare domain-data ID (UUID), or
77    /// - an absolute/relative URL under the domain server.
78    pub async fn download_cid(
79        &self,
80        domain_id: &str,
81        cid: &str,
82    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
83        let cid = cid.trim();
84        if cid.is_empty() {
85            return Err(StorageError::Other("empty cid".into()));
86        }
87
88        if cid.contains("://") || cid.starts_with('/') {
89            let resolved = resolve_domain_url(&self.base, cid)?;
90            let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
91            return self
92                .download_domain_data(&resolved, &domain_id, query)
93                .await;
94        }
95
96        let query = posemesh_domain_http::domain_data::DownloadQuery {
97            ids: vec![cid.to_string()],
98            name: None,
99            data_type: None,
100        };
101        self.download_domain_data(&self.base, domain_id, query)
102            .await
103    }
104
105    async fn download_domain_data(
106        &self,
107        url_for_log: &Url,
108        domain_id: &str,
109        query: posemesh_domain_http::domain_data::DownloadQuery,
110    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
111        let domain_id = domain_id.trim();
112        if domain_id.is_empty() {
113            return Err(StorageError::Other("missing domain_id for download".into()));
114        }
115
116        tracing::debug!(
117            target: "posemesh_compute_node::storage::client",
118            method = "GET",
119            %url_for_log,
120            domain_id = domain_id,
121            ids = ?query.ids,
122            name = ?query.name,
123            data_type = ?query.data_type,
124            "Downloading domain data"
125        );
126
127        let base = self.base.as_str().trim_end_matches('/');
128        let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
129            base,
130            self.client_id.as_str(),
131            self.token.get().as_str(),
132            domain_id,
133            &query,
134        )
135        .await
136        .map_err(map_domain_error)?;
137
138        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
139        fs::create_dir_all(&root)
140            .await
141            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
142        let datasets_root = root.join("datasets");
143        fs::create_dir_all(&datasets_root)
144            .await
145            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
146
147        let mut parts = Vec::new();
148
149        while let Some(item) = rx.next().await {
150            let domain_item = item.map_err(map_domain_error)?;
151            let name = domain_item.metadata.name.clone();
152            let data_type = domain_item.metadata.data_type.clone();
153
154            let scan_folder = extract_timestamp(&name)
155                .map(|ts| sanitize_component(&ts))
156                .unwrap_or_else(|| sanitize_component(&name));
157            let scan_dir = datasets_root.join(&scan_folder);
158            fs::create_dir_all(&scan_dir)
159                .await
160                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
161
162            let file_name = map_filename(&data_type, &name);
163            let file_path = scan_dir.join(&file_name);
164            if let Some(parent) = file_path.parent() {
165                fs::create_dir_all(parent)
166                    .await
167                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
168            }
169            fs::write(&file_path, &domain_item.data)
170                .await
171                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
172
173            let extracted_paths = Vec::new();
174
175            let relative_path = file_path
176                .strip_prefix(&root)
177                .unwrap_or(&file_path)
178                .to_path_buf();
179
180            parts.push(DownloadedPart {
181                id: Some(domain_item.metadata.id),
182                name: Some(name),
183                data_type: Some(data_type),
184                domain_id: Some(domain_item.metadata.domain_id),
185                path: file_path,
186                root: root.clone(),
187                relative_path,
188                extracted_paths,
189            });
190        }
191
192        if parts.is_empty() {
193            return Err(StorageError::NotFound);
194        }
195
196        Ok(parts)
197    }
198
199    pub async fn upload_artifact(
200        &self,
201        request: UploadRequest<'_>,
202    ) -> std::result::Result<Option<String>, StorageError> {
203        let domain_id = request.domain_id.trim();
204        if domain_id.is_empty() {
205            return Err(StorageError::Other(
206                "missing domain_id for artifact upload".into(),
207            ));
208        }
209        let action = if let Some(id) = request.existing_id {
210            posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
211        } else {
212            posemesh_domain_http::domain_data::DomainAction::Create {
213                name: request.name.to_string(),
214                data_type: request.data_type.to_string(),
215            }
216        };
217
218        let upload = posemesh_domain_http::domain_data::UploadDomainData {
219            action,
220            data: request.bytes.to_vec(),
221        };
222
223        let base = self.base.as_str().trim_end_matches('/');
224        let method = if request.existing_id.is_some() {
225            Method::PUT
226        } else {
227            Method::POST
228        };
229        tracing::debug!(
230            target: "posemesh_compute_node::storage::client",
231            method = %method,
232            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
233            logical_path = request.logical_path,
234            name = request.name,
235            data_type = request.data_type,
236            has_existing_id = request.existing_id.is_some(),
237            "Sending domain upload request"
238        );
239
240        let items = posemesh_domain_http::domain_data::upload_v1(
241            base,
242            self.token.get().as_str(),
243            domain_id,
244            vec![upload],
245        )
246        .await
247        .map_err(map_domain_error)?;
248
249        Ok(items.into_iter().next().map(|d| d.id))
250    }
251
252    pub async fn find_artifact_id(
253        &self,
254        domain_id: &str,
255        name: &str,
256        data_type: &str,
257    ) -> std::result::Result<Option<String>, StorageError> {
258        let domain_id = domain_id.trim();
259        if domain_id.is_empty() {
260            return Err(StorageError::Other(
261                "missing domain_id for artifact lookup".into(),
262            ));
263        }
264
265        let query = posemesh_domain_http::domain_data::DownloadQuery {
266            ids: Vec::new(),
267            name: Some(name.to_string()),
268            data_type: Some(data_type.to_string()),
269        };
270
271        let base = self.base.as_str().trim_end_matches('/');
272        let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
273        tracing::debug!(
274            target: "posemesh_compute_node::storage::client",
275            method = "GET",
276            %url,
277            artifact_name = name,
278            artifact_type = data_type,
279            "Looking up existing domain artifact"
280        );
281
282        let results = posemesh_domain_http::domain_data::download_metadata_v1(
283            base,
284            self.client_id.as_str(),
285            self.token.get().as_str(),
286            domain_id,
287            &query,
288        )
289        .await;
290
291        let results = match results {
292            Ok(items) => items,
293            Err(err) => {
294                if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
295                    if resp.status == reqwest::StatusCode::NOT_FOUND {
296                        return Ok(None);
297                    }
298                }
299                return Err(map_domain_error(err));
300            }
301        };
302
303        Ok(results
304            .into_iter()
305            .find(|item| item.name == name && item.data_type == data_type)
306            .map(|item| item.id))
307    }
308}
309
310fn map_status(status: reqwest::StatusCode) -> StorageError {
311    match status.as_u16() {
312        400 => StorageError::BadRequest,
313        401 => StorageError::Unauthorized,
314        404 => StorageError::NotFound,
315        409 => StorageError::Conflict,
316        n if (500..=599).contains(&n) => StorageError::Server(n),
317        other => StorageError::Other(format!("unexpected status: {}", other)),
318    }
319}
320
321fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
322    use posemesh_domain_http::errors::{AuthError, DomainError};
323
324    match err {
325        DomainError::AukiErrorResponse(resp) => map_status(resp.status),
326        DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
327        DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
328        other => StorageError::Other(other.to_string()),
329    }
330}
331
332fn env_client_id() -> String {
333    std::env::var("CLIENT_ID")
334        .ok()
335        .map(|v| v.trim().to_string())
336        .filter(|v| !v.is_empty())
337        .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
338}
339
340fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
341    if value.contains("://") {
342        Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
343    } else {
344        base.join(value)
345            .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
346    }
347}
348
349fn parse_download_target(
350    url: &Url,
351    fallback_domain_id: Option<&str>,
352) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
353    let segments: Vec<&str> = url
354        .path_segments()
355        .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
356        .unwrap_or_default();
357
358    let mut domain_id_from_path: Option<&str> = None;
359    let mut data_id_from_path: Option<&str> = None;
360
361    for idx in 0..segments.len() {
362        if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
363            domain_id_from_path = Some(segments[idx + 1]);
364            data_id_from_path = segments.get(idx + 3).copied();
365            break;
366        }
367    }
368
369    let domain_id = domain_id_from_path
370        .or(fallback_domain_id)
371        .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
372        .to_string();
373
374    let mut ids: Vec<String> = Vec::new();
375    let mut name: Option<String> = None;
376    let mut data_type: Option<String> = None;
377
378    for (key, value) in url.query_pairs() {
379        match key.as_ref() {
380            "ids" => ids.extend(
381                value
382                    .split(',')
383                    .map(|s| s.trim())
384                    .filter(|s| !s.is_empty())
385                    .map(|s| s.to_string()),
386            ),
387            "name" => {
388                if name.is_none() {
389                    name = Some(value.to_string());
390                }
391            }
392            "data_type" => {
393                if data_type.is_none() {
394                    data_type = Some(value.to_string());
395                }
396            }
397            _ => {}
398        }
399    }
400
401    if let Some(id) = data_id_from_path {
402        ids = vec![id.to_string()];
403    }
404
405    Ok((
406        domain_id,
407        posemesh_domain_http::domain_data::DownloadQuery {
408            ids,
409            name,
410            data_type,
411        },
412    ))
413}
414
// Multipart part headers are parsed by posemesh-domain-http, so no local `parse_disposition_params` is needed.
416
/// Make `value` safe to use as a single file-system path component.
///
/// Every character other than ASCII letters, ASCII digits, `-` and `_` is replaced by one
/// `_`. The result therefore never contains separators or dots. An empty input yields
/// `"part"`.
fn sanitize_component(value: &str) -> String {
    // One output char per input char, so the result is empty exactly when the input is.
    if value.is_empty() {
        return "part".into();
    }
    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
            _ => '_',
        })
        .collect()
}
434
435fn extract_timestamp(name: &str) -> Option<String> {
436    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
437        .ok()
438        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
439}
440
441fn map_filename(data_type: &str, name: &str) -> String {
442    format!(
443        "{}.{}",
444        sanitize_component(name),
445        sanitize_component(data_type)
446    )
447}