Skip to main content

posemesh_compute_node/storage/
client.rs

1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::fs;
11use url::Url;
12use uuid::Uuid;
13// zip extraction moved to reconstruction-specific runner
14
/// A single Domain data item that was fetched from the server and written to local disk.
///
/// Values of this type are produced by the download methods of `DomainClient`; all parts
/// returned by one download call share the same temporary `root` directory.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Domain data id as reported in the item's metadata.
    pub id: Option<String>,
    /// Item name as reported in the item's metadata.
    pub name: Option<String>,
    /// Item data type as reported in the item's metadata.
    pub data_type: Option<String>,
    /// Id of the domain the item belongs to, as reported in the item's metadata.
    pub domain_id: Option<String>,
    /// Absolute path of the file the item's bytes were written to.
    pub path: PathBuf,
    /// Temporary directory that holds every file of the download this part belongs to.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (falls back to `path` itself if it is not under it).
    pub relative_path: PathBuf,
    /// Paths produced by unpacking this part; `DomainClient` itself always leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}
27
/// Parameters for uploading an in-memory artifact to a domain.
///
/// When `existing_id` is `Some`, the upload updates that item instead of creating a new one.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and it must not be empty.
    pub domain_id: &'a str,
    /// Name of the item to create (ignored by the single-request path when updating).
    pub name: &'a str,
    /// Data type of the item to create.
    pub data_type: &'a str,
    /// Caller-side logical path; only used for logging.
    pub logical_path: &'a str,
    /// Payload to upload.
    pub bytes: &'a [u8],
    /// Id of an existing item to overwrite, if any.
    pub existing_id: Option<&'a str>,
}
37
/// Parameters for uploading an artifact that lives on disk.
///
/// Mirrors `UploadRequest`, but the payload is read from `path` instead of being held in
/// memory by the caller.
#[derive(Debug)]
pub struct UploadFileRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and it must not be empty.
    pub domain_id: &'a str,
    /// Name of the item to create.
    pub name: &'a str,
    /// Data type of the item to create.
    pub data_type: &'a str,
    /// Caller-side logical path; only used for logging.
    pub logical_path: &'a str,
    /// File whose contents are uploaded; an empty file is rejected.
    pub path: &'a Path,
    /// Id of an existing item to overwrite, if any.
    pub existing_id: Option<&'a str>,
}
47
48#[derive(Debug, Serialize)]
49struct InitiateMultipartRequestV1 {
50    name: String,
51    data_type: String,
52    size: Option<i64>,
53    content_type: Option<String>,
54    existing_id: Option<String>,
55}
56
57#[derive(Debug, Deserialize)]
58struct InitiateMultipartResponseV1 {
59    upload_id: String,
60    part_size: i64,
61}
62
63#[derive(Debug, Deserialize)]
64struct UploadPartResultV1 {
65    etag: String,
66}
67
68#[derive(Debug, Serialize)]
69struct CompletedPartV1 {
70    part_number: i32,
71    etag: String,
72}
73
74#[derive(Debug, Serialize)]
75struct CompleteMultipartRequestV1 {
76    parts: Vec<CompletedPartV1>,
77}
78
79#[derive(Debug, Deserialize)]
80struct DomainDataMetadataV1 {
81    id: String,
82}
83
84/// Domain server HTTP client (skeleton; HTTP added later).
85#[derive(Clone)]
86pub struct DomainClient {
87    pub base: Url,
88    pub token: TokenRef,
89    client_id: String,
90}
91impl DomainClient {
92    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
93        let client_id = env_client_id();
94        Ok(Self {
95            base,
96            token,
97            client_id,
98        })
99    }
100
101    pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
102        let client_id = env_client_id();
103        Ok(Self {
104            base,
105            token,
106            client_id,
107        })
108    }
109
110    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
111    /// part into a temporary file and returning its metadata.
112    pub async fn download_uri(
113        &self,
114        uri: &str,
115    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
116        let resolved = resolve_domain_url(&self.base, uri)?;
117        let (domain_id, query) = parse_download_target(&resolved, None)?;
118        self.download_domain_data(&resolved, &domain_id, query)
119            .await
120    }
121
122    /// Download domain data referenced by a CID, which can be either:
123    /// - a bare domain-data ID (UUID), or
124    /// - an absolute/relative URL under the domain server.
125    pub async fn download_cid(
126        &self,
127        domain_id: &str,
128        cid: &str,
129    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
130        let cid = cid.trim();
131        if cid.is_empty() {
132            return Err(StorageError::Other("empty cid".into()));
133        }
134
135        if cid.contains("://") || cid.starts_with('/') {
136            let resolved = resolve_domain_url(&self.base, cid)?;
137            let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
138            return self
139                .download_domain_data(&resolved, &domain_id, query)
140                .await;
141        }
142
143        let query = posemesh_domain_http::domain_data::DownloadQuery {
144            ids: vec![cid.to_string()],
145            name: None,
146            data_type: None,
147        };
148        self.download_domain_data(&self.base, domain_id, query)
149            .await
150    }
151
152    async fn download_domain_data(
153        &self,
154        url_for_log: &Url,
155        domain_id: &str,
156        query: posemesh_domain_http::domain_data::DownloadQuery,
157    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
158        let domain_id = domain_id.trim();
159        if domain_id.is_empty() {
160            return Err(StorageError::Other("missing domain_id for download".into()));
161        }
162
163        tracing::debug!(
164            target: "posemesh_compute_node::storage::client",
165            method = "GET",
166            %url_for_log,
167            domain_id = domain_id,
168            ids = ?query.ids,
169            name = ?query.name,
170            data_type = ?query.data_type,
171            "Downloading domain data"
172        );
173
174        let base = self.base.as_str().trim_end_matches('/');
175        let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
176            base,
177            self.client_id.as_str(),
178            self.token.get().as_str(),
179            domain_id,
180            &query,
181        )
182        .await
183        .map_err(map_domain_error)?;
184
185        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
186        fs::create_dir_all(&root)
187            .await
188            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
189        let datasets_root = root.join("datasets");
190        fs::create_dir_all(&datasets_root)
191            .await
192            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
193
194        let mut parts = Vec::new();
195
196        while let Some(item) = rx.next().await {
197            let domain_item = item.map_err(map_domain_error)?;
198            let name = domain_item.metadata.name.clone();
199            let data_type = domain_item.metadata.data_type.clone();
200
201            let scan_folder = extract_timestamp(&name)
202                .map(|ts| sanitize_component(&ts))
203                .unwrap_or_else(|| sanitize_component(&name));
204            let scan_dir = datasets_root.join(&scan_folder);
205            fs::create_dir_all(&scan_dir)
206                .await
207                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
208
209            let file_name = map_filename(&data_type, &name);
210            let file_path = scan_dir.join(&file_name);
211            if let Some(parent) = file_path.parent() {
212                fs::create_dir_all(parent)
213                    .await
214                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
215            }
216            fs::write(&file_path, &domain_item.data)
217                .await
218                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
219
220            let extracted_paths = Vec::new();
221
222            let relative_path = file_path
223                .strip_prefix(&root)
224                .unwrap_or(&file_path)
225                .to_path_buf();
226
227            parts.push(DownloadedPart {
228                id: Some(domain_item.metadata.id),
229                name: Some(name),
230                data_type: Some(data_type),
231                domain_id: Some(domain_item.metadata.domain_id),
232                path: file_path,
233                root: root.clone(),
234                relative_path,
235                extracted_paths,
236            });
237        }
238
239        if parts.is_empty() {
240            return Err(StorageError::NotFound);
241        }
242
243        Ok(parts)
244    }
245
246    pub async fn upload_artifact(
247        &self,
248        request: UploadRequest<'_>,
249    ) -> std::result::Result<Option<String>, StorageError> {
250        let domain_id = request.domain_id.trim();
251        if domain_id.is_empty() {
252            return Err(StorageError::Other(
253                "missing domain_id for artifact upload".into(),
254            ));
255        }
256
257        let base = self.base.as_str().trim_end_matches('/');
258        if let Some(info) = posemesh_domain_http::domain_data::get_upload_info_v1(base).await {
259            if info.multipart_enabled && info.request_max_bytes > 0 {
260                let fits_alone = fits_single_upload_request(
261                    info.request_max_bytes,
262                    request.name,
263                    request.data_type,
264                    request.existing_id,
265                    request.bytes.len(),
266                );
267                if !fits_alone {
268                    return self
269                        .upload_artifact_v1_multipart(base, domain_id, request)
270                        .await;
271                }
272            }
273        }
274
275        let action = if let Some(id) = request.existing_id {
276            posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
277        } else {
278            posemesh_domain_http::domain_data::DomainAction::Create {
279                name: request.name.to_string(),
280                data_type: request.data_type.to_string(),
281            }
282        };
283
284        let upload = posemesh_domain_http::domain_data::UploadDomainData {
285            action,
286            data: request.bytes.to_vec(),
287        };
288        let method = if request.existing_id.is_some() {
289            Method::PUT
290        } else {
291            Method::POST
292        };
293        tracing::debug!(
294            target: "posemesh_compute_node::storage::client",
295            method = %method,
296            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
297            logical_path = request.logical_path,
298            name = request.name,
299            data_type = request.data_type,
300            has_existing_id = request.existing_id.is_some(),
301            "Sending domain upload request"
302        );
303
304        let items = posemesh_domain_http::domain_data::upload_v1(
305            base,
306            self.token.get().as_str(),
307            domain_id,
308            vec![upload],
309        )
310        .await
311        .map_err(map_domain_error)?;
312
313        Ok(items.into_iter().next().map(|d| d.id))
314    }
315
316    pub async fn upload_artifact_file(
317        &self,
318        request: UploadFileRequest<'_>,
319    ) -> std::result::Result<Option<String>, StorageError> {
320        let domain_id = request.domain_id.trim();
321        if domain_id.is_empty() {
322            return Err(StorageError::Other(
323                "missing domain_id for artifact upload".into(),
324            ));
325        }
326
327        let base = self.base.as_str().trim_end_matches('/');
328
329        let mut file = fs::File::open(request.path)
330            .await
331            .map_err(|e| StorageError::Other(format!("open upload file: {}", e)))?;
332
333        let file_size = match file.metadata().await {
334            Ok(meta) => {
335                if meta.len() == 0 {
336                    return Err(StorageError::BadRequest);
337                }
338                i64::try_from(meta.len()).ok()
339            }
340            Err(_) => None,
341        };
342
343        match self
344            .upload_artifact_v1_multipart_file(base, domain_id, &request, &mut file, file_size)
345            .await
346        {
347            Ok(v) => Ok(v),
348            Err(UploadFileFallback::UnsupportedEndpoint) => {
349                // Fall back to legacy multipart/form-data upload (in-memory) for older servers.
350                let bytes_owned = fs::read(request.path)
351                    .await
352                    .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
353                if bytes_owned.is_empty() {
354                    return Err(StorageError::BadRequest);
355                }
356                self.upload_artifact(UploadRequest {
357                    domain_id: request.domain_id,
358                    name: request.name,
359                    data_type: request.data_type,
360                    logical_path: request.logical_path,
361                    bytes: bytes_owned.as_slice(),
362                    existing_id: request.existing_id,
363                })
364                .await
365            }
366            Err(UploadFileFallback::Error(e)) => Err(e),
367        }
368    }
369
370    async fn upload_artifact_v1_multipart(
371        &self,
372        base: &str,
373        domain_id: &str,
374        request: UploadRequest<'_>,
375    ) -> std::result::Result<Option<String>, StorageError> {
376        if request.bytes.is_empty() {
377            return Err(StorageError::BadRequest);
378        }
379
380        let client = reqwest::Client::new();
381        let initiate_endpoint = format!(
382            "{}/api/v1/domains/{}/data/multipart?uploads",
383            base, domain_id
384        );
385
386        let init_req = InitiateMultipartRequestV1 {
387            name: request.name.to_string(),
388            data_type: request.data_type.to_string(),
389            size: Some(request.bytes.len() as i64),
390            content_type: Some("application/octet-stream".to_string()),
391            existing_id: request.existing_id.map(|id| id.to_string()),
392        };
393
394        tracing::debug!(
395            target: "posemesh_compute_node::storage::client",
396            method = "POST",
397            url = %initiate_endpoint,
398            logical_path = request.logical_path,
399            name = request.name,
400            data_type = request.data_type,
401            has_existing_id = request.existing_id.is_some(),
402            "Initiating multipart upload"
403        );
404
405        let init_resp = client
406            .post(&initiate_endpoint)
407            .bearer_auth(self.token.get())
408            .header("posemesh-client-id", self.client_id.as_str())
409            .header("Content-Type", "application/json")
410            .json(&init_req)
411            .send()
412            .await
413            .map_err(|e| StorageError::Network(e.to_string()))?;
414
415        if !init_resp.status().is_success() {
416            let status = init_resp.status();
417            let err = init_resp
418                .text()
419                .await
420                .unwrap_or_else(|_| "Unknown error".to_string());
421            tracing::warn!(
422                target: "posemesh_compute_node::storage::client",
423                %status,
424                error = %err,
425                "Multipart initiation failed"
426            );
427            return Err(map_status(status));
428        }
429
430        let init: InitiateMultipartResponseV1 = init_resp
431            .json()
432            .await
433            .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
434
435        let part_size = usize::try_from(init.part_size)
436            .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
437        if part_size == 0 {
438            return Err(StorageError::Other("invalid multipart part_size".into()));
439        }
440
441        let upload_id = init.upload_id;
442        let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
443
444        let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
445            let mut offset: usize = 0;
446            let mut part_number: i32 = 1;
447            while offset < request.bytes.len() {
448                let end = std::cmp::min(offset + part_size, request.bytes.len());
449                let chunk = request.bytes[offset..end].to_vec();
450
451                let part_endpoint = format!(
452                    "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
453                    base, domain_id, upload_id, part_number
454                );
455                tracing::debug!(
456                    target: "posemesh_compute_node::storage::client",
457                    method = "PUT",
458                    url = %part_endpoint,
459                    part_number,
460                    part_bytes = chunk.len(),
461                    "Uploading multipart part"
462                );
463
464                let resp = client
465                    .put(&part_endpoint)
466                    .bearer_auth(self.token.get())
467                    .header("posemesh-client-id", self.client_id.as_str())
468                    .header("Content-Type", "application/octet-stream")
469                    .body(chunk)
470                    .send()
471                    .await
472                    .map_err(|e| StorageError::Network(e.to_string()))?;
473
474                if !resp.status().is_success() {
475                    let status = resp.status();
476                    let err = resp
477                        .text()
478                        .await
479                        .unwrap_or_else(|_| "Unknown error".to_string());
480                    tracing::warn!(
481                        target: "posemesh_compute_node::storage::client",
482                        %status,
483                        error = %err,
484                        part_number,
485                        "Multipart part upload failed"
486                    );
487                    return Err(map_status(status));
488                }
489
490                let res: UploadPartResultV1 = resp
491                    .json()
492                    .await
493                    .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
494
495                completed_parts.push(CompletedPartV1 {
496                    part_number,
497                    etag: res.etag,
498                });
499
500                offset = end;
501                part_number = part_number
502                    .checked_add(1)
503                    .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
504            }
505
506            let complete_endpoint = format!(
507                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
508                base, domain_id, upload_id
509            );
510            tracing::debug!(
511                target: "posemesh_compute_node::storage::client",
512                method = "POST",
513                url = %complete_endpoint,
514                parts = completed_parts.len(),
515                "Completing multipart upload"
516            );
517            let resp = client
518                .post(&complete_endpoint)
519                .bearer_auth(self.token.get())
520                .header("posemesh-client-id", self.client_id.as_str())
521                .header("Content-Type", "application/json")
522                .json(&CompleteMultipartRequestV1 {
523                    parts: completed_parts,
524                })
525                .send()
526                .await
527                .map_err(|e| StorageError::Network(e.to_string()))?;
528
529            if !resp.status().is_success() {
530                let status = resp.status();
531                let err = resp
532                    .text()
533                    .await
534                    .unwrap_or_else(|_| "Unknown error".to_string());
535                tracing::warn!(
536                    target: "posemesh_compute_node::storage::client",
537                    %status,
538                    error = %err,
539                    "Multipart completion failed"
540                );
541                return Err(map_status(status));
542            }
543
544            resp.json::<DomainDataMetadataV1>()
545                .await
546                .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
547        }
548        .await;
549
550        if upload_res.is_err() {
551            let abort_endpoint = format!(
552                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
553                base, domain_id, upload_id
554            );
555            let _ = client
556                .delete(&abort_endpoint)
557                .bearer_auth(self.token.get())
558                .header("posemesh-client-id", self.client_id.as_str())
559                .send()
560                .await;
561        }
562
563        upload_res.map(|meta| Some(meta.id))
564    }
565
566    async fn upload_artifact_v1_multipart_file(
567        &self,
568        base: &str,
569        domain_id: &str,
570        request: &UploadFileRequest<'_>,
571        file: &mut fs::File,
572        file_size: Option<i64>,
573    ) -> std::result::Result<Option<String>, UploadFileFallback> {
574        use tokio::io::AsyncReadExt;
575
576        let client = reqwest::Client::new();
577        let initiate_endpoint = format!(
578            "{}/api/v1/domains/{}/data/multipart?uploads",
579            base, domain_id
580        );
581
582        let init_req = InitiateMultipartRequestV1 {
583            name: request.name.to_string(),
584            data_type: request.data_type.to_string(),
585            size: file_size,
586            content_type: Some("application/octet-stream".to_string()),
587            existing_id: request.existing_id.map(|id| id.to_string()),
588        };
589
590        tracing::debug!(
591            target: "posemesh_compute_node::storage::client",
592            method = "POST",
593            url = %initiate_endpoint,
594            logical_path = request.logical_path,
595            name = request.name,
596            data_type = request.data_type,
597            has_existing_id = request.existing_id.is_some(),
598            "Initiating multipart file upload"
599        );
600
601        let init_resp = client
602            .post(&initiate_endpoint)
603            .bearer_auth(self.token.get())
604            .header("posemesh-client-id", self.client_id.as_str())
605            .header("Content-Type", "application/json")
606            .json(&init_req)
607            .send()
608            .await
609            .map_err(|e| UploadFileFallback::Error(StorageError::Network(e.to_string())))?;
610
611        if !init_resp.status().is_success() {
612            let status = init_resp.status();
613            let err = init_resp
614                .text()
615                .await
616                .unwrap_or_else(|_| "Unknown error".to_string());
617            tracing::warn!(
618                target: "posemesh_compute_node::storage::client",
619                %status,
620                error = %err,
621                "Multipart file initiation failed"
622            );
623            if is_unsupported_endpoint_status(status) {
624                return Err(UploadFileFallback::UnsupportedEndpoint);
625            }
626            return Err(UploadFileFallback::Error(map_status(status)));
627        }
628
629        let init: InitiateMultipartResponseV1 = init_resp.json().await.map_err(|e| {
630            UploadFileFallback::Error(StorageError::Other(format!(
631                "invalid initiate response: {}",
632                e
633            )))
634        })?;
635
636        let part_size = usize::try_from(init.part_size).map_err(|_| {
637            UploadFileFallback::Error(StorageError::Other("invalid multipart part_size".into()))
638        })?;
639        if part_size == 0 {
640            return Err(UploadFileFallback::Error(StorageError::Other(
641                "invalid multipart part_size".into(),
642            )));
643        }
644
645        let upload_id = init.upload_id;
646        let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
647
648        let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
649            let mut part_number: i32 = 1;
650            loop {
651                let mut chunk = vec![0u8; part_size];
652                let mut read = 0usize;
653                while read < part_size {
654                    let n = file
655                        .read(&mut chunk[read..])
656                        .await
657                        .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
658                    if n == 0 {
659                        break;
660                    }
661                    read += n;
662                }
663                if read == 0 {
664                    break;
665                }
666                chunk.truncate(read);
667
668                let part_endpoint = format!(
669                    "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
670                    base, domain_id, upload_id, part_number
671                );
672                tracing::debug!(
673                    target: "posemesh_compute_node::storage::client",
674                    method = "PUT",
675                    url = %part_endpoint,
676                    part_number,
677                    part_bytes = chunk.len(),
678                    "Uploading multipart file part"
679                );
680
681                let resp = client
682                    .put(&part_endpoint)
683                    .bearer_auth(self.token.get())
684                    .header("posemesh-client-id", self.client_id.as_str())
685                    .header("Content-Type", "application/octet-stream")
686                    .body(chunk)
687                    .send()
688                    .await
689                    .map_err(|e| StorageError::Network(e.to_string()))?;
690
691                if !resp.status().is_success() {
692                    let status = resp.status();
693                    let err = resp
694                        .text()
695                        .await
696                        .unwrap_or_else(|_| "Unknown error".to_string());
697                    tracing::warn!(
698                        target: "posemesh_compute_node::storage::client",
699                        %status,
700                        error = %err,
701                        part_number,
702                        "Multipart file part upload failed"
703                    );
704                    return Err(map_status(status));
705                }
706
707                let res: UploadPartResultV1 = resp
708                    .json()
709                    .await
710                    .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
711
712                completed_parts.push(CompletedPartV1 {
713                    part_number,
714                    etag: res.etag,
715                });
716
717                part_number = part_number
718                    .checked_add(1)
719                    .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
720            }
721
722            if completed_parts.is_empty() {
723                return Err(StorageError::BadRequest);
724            }
725
726            let complete_endpoint = format!(
727                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
728                base, domain_id, upload_id
729            );
730            tracing::debug!(
731                target: "posemesh_compute_node::storage::client",
732                method = "POST",
733                url = %complete_endpoint,
734                parts = completed_parts.len(),
735                "Completing multipart file upload"
736            );
737            let resp = client
738                .post(&complete_endpoint)
739                .bearer_auth(self.token.get())
740                .header("posemesh-client-id", self.client_id.as_str())
741                .header("Content-Type", "application/json")
742                .json(&CompleteMultipartRequestV1 {
743                    parts: completed_parts,
744                })
745                .send()
746                .await
747                .map_err(|e| StorageError::Network(e.to_string()))?;
748
749            if !resp.status().is_success() {
750                let status = resp.status();
751                let err = resp
752                    .text()
753                    .await
754                    .unwrap_or_else(|_| "Unknown error".to_string());
755                tracing::warn!(
756                    target: "posemesh_compute_node::storage::client",
757                    %status,
758                    error = %err,
759                    "Multipart file completion failed"
760                );
761                return Err(map_status(status));
762            }
763
764            resp.json::<DomainDataMetadataV1>()
765                .await
766                .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
767        }
768        .await;
769
770        if upload_res.is_err() {
771            let abort_endpoint = format!(
772                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
773                base, domain_id, upload_id
774            );
775            let _ = client
776                .delete(&abort_endpoint)
777                .bearer_auth(self.token.get())
778                .header("posemesh-client-id", self.client_id.as_str())
779                .send()
780                .await;
781        }
782
783        upload_res
784            .map(|meta| Some(meta.id))
785            .map_err(UploadFileFallback::Error)
786    }
787
788    pub async fn find_artifact_id(
789        &self,
790        domain_id: &str,
791        name: &str,
792        data_type: &str,
793    ) -> std::result::Result<Option<String>, StorageError> {
794        let domain_id = domain_id.trim();
795        if domain_id.is_empty() {
796            return Err(StorageError::Other(
797                "missing domain_id for artifact lookup".into(),
798            ));
799        }
800
801        let query = posemesh_domain_http::domain_data::DownloadQuery {
802            ids: Vec::new(),
803            name: Some(name.to_string()),
804            data_type: Some(data_type.to_string()),
805        };
806
807        let base = self.base.as_str().trim_end_matches('/');
808        let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
809        tracing::debug!(
810            target: "posemesh_compute_node::storage::client",
811            method = "GET",
812            %url,
813            artifact_name = name,
814            artifact_type = data_type,
815            "Looking up existing domain artifact"
816        );
817
818        let results = posemesh_domain_http::domain_data::download_metadata_v1(
819            base,
820            self.client_id.as_str(),
821            self.token.get().as_str(),
822            domain_id,
823            &query,
824        )
825        .await;
826
827        let results = match results {
828            Ok(items) => items,
829            Err(err) => {
830                if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
831                    if resp.status == reqwest::StatusCode::NOT_FOUND {
832                        return Ok(None);
833                    }
834                }
835                return Err(map_domain_error(err));
836            }
837        };
838
839        Ok(results
840            .into_iter()
841            .find(|item| item.name == name && item.data_type == data_type)
842            .map(|item| item.id))
843    }
844}
845
/// Estimates whether a single multipart/form-data upload of `data_len` payload bytes, plus
/// its part headers and closing delimiter, stays within `request_max_bytes`.
///
/// The real request body is built by `posemesh_domain_http`, so this is only an estimate.
/// It assumes the longest boundary RFC 2046 allows (70 characters), so it errs toward
/// using the multipart protocol rather than sending a request the server rejects as too
/// large. Returns `false` when the total cannot be represented as an `i64`.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    // RFC 2046 §5.1.1: boundaries are at most 70 characters; use that as the worst case.
    const MAX_BOUNDARY_LEN: usize = 70;
    let boundary = "b".repeat(MAX_BOUNDARY_LEN);
    let header = if let Some(id) = existing_id {
        format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
            boundary, id
        )
    } else {
        format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
            boundary, name, data_type
        )
    };
    let closing = format!("--{}--\r\n", boundary);
    // `+ 2` covers the CRLF that ends the part data before the closing delimiter.
    let total = header
        .len()
        .saturating_add(data_len)
        .saturating_add(2)
        .saturating_add(closing.len());
    match i64::try_from(total) {
        Ok(total) => total <= request_max_bytes,
        Err(_) => false,
    }
}
869
870fn map_status(status: reqwest::StatusCode) -> StorageError {
871    match status.as_u16() {
872        400 => StorageError::BadRequest,
873        401 => StorageError::Unauthorized,
874        404 => StorageError::NotFound,
875        409 => StorageError::Conflict,
876        n if (500..=599).contains(&n) => StorageError::Server(n),
877        other => StorageError::Other(format!("unexpected status: {}", other)),
878    }
879}
880
881fn is_unsupported_endpoint_status(status: reqwest::StatusCode) -> bool {
882    status == reqwest::StatusCode::NOT_FOUND
883        || status == reqwest::StatusCode::METHOD_NOT_ALLOWED
884        || status == reqwest::StatusCode::NOT_IMPLEMENTED
885}
886
/// Reason a file-upload attempt did not succeed.
///
/// NOTE(review): inferred from the variant names. The file-upload path presumably uses
/// this to decide whether to fall back to another upload method (endpoint missing) or
/// to surface the error. Confirm against the call sites.
enum UploadFileFallback {
    /// The server does not offer the endpoint that was tried.
    UnsupportedEndpoint,
    /// Any other failure, carried as a `StorageError`.
    Error(StorageError),
}
891
892fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
893    use posemesh_domain_http::errors::{AuthError, DomainError};
894
895    match err {
896        DomainError::AukiErrorResponse(resp) => map_status(resp.status),
897        DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
898        DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
899        other => StorageError::Other(other.to_string()),
900    }
901}
902
903fn env_client_id() -> String {
904    std::env::var("CLIENT_ID")
905        .ok()
906        .map(|v| v.trim().to_string())
907        .filter(|v| !v.is_empty())
908        .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
909}
910
911fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
912    if value.contains("://") {
913        Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
914    } else {
915        base.join(value)
916            .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
917    }
918}
919
920fn parse_download_target(
921    url: &Url,
922    fallback_domain_id: Option<&str>,
923) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
924    let segments: Vec<&str> = url
925        .path_segments()
926        .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
927        .unwrap_or_default();
928
929    let mut domain_id_from_path: Option<&str> = None;
930    let mut data_id_from_path: Option<&str> = None;
931
932    for idx in 0..segments.len() {
933        if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
934            domain_id_from_path = Some(segments[idx + 1]);
935            data_id_from_path = segments.get(idx + 3).copied();
936            break;
937        }
938    }
939
940    let domain_id = domain_id_from_path
941        .or(fallback_domain_id)
942        .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
943        .to_string();
944
945    let mut ids: Vec<String> = Vec::new();
946    let mut name: Option<String> = None;
947    let mut data_type: Option<String> = None;
948
949    for (key, value) in url.query_pairs() {
950        match key.as_ref() {
951            "ids" => ids.extend(
952                value
953                    .split(',')
954                    .map(|s| s.trim())
955                    .filter(|s| !s.is_empty())
956                    .map(|s| s.to_string()),
957            ),
958            "name" => {
959                if name.is_none() {
960                    name = Some(value.to_string());
961                }
962            }
963            "data_type" => {
964                if data_type.is_none() {
965                    data_type = Some(value.to_string());
966                }
967            }
968            _ => {}
969        }
970    }
971
972    if let Some(id) = data_id_from_path {
973        ids = vec![id.to_string()];
974    }
975
976    Ok((
977        domain_id,
978        posemesh_domain_http::domain_data::DownloadQuery {
979            ids,
980            name,
981            data_type,
982        },
983    ))
984}
985
// No local `parse_disposition_params` helper is needed: response headers are parsed in posemesh-domain-http.
987
/// Makes `value` safe to use as a single file-name component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept. Every other `char`, including each
/// non-ASCII character, is replaced by `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so only empty input gives empty output.
    if value.is_empty() {
        return "part".into();
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}
1005
1006fn extract_timestamp(name: &str) -> Option<String> {
1007    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
1008        .ok()
1009        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
1010}
1011
1012fn map_filename(data_type: &str, name: &str) -> String {
1013    format!(
1014        "{}.{}",
1015        sanitize_component(name),
1016        sanitize_component(data_type)
1017    )
1018}