posemesh_compute_node/storage/
client.rs

1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::OnceLock;
11use std::time::Duration;
12use tokio::fs;
13use url::Url;
14use uuid::Uuid;
15// zip extraction moved to reconstruction-specific runner
16
/// One item fetched from the Domain server and written to the local filesystem.
///
/// All parts produced by a single download call share the same `root` directory.
#[derive(Clone, Debug)]
pub struct DownloadedPart {
    /// Identifier of the item, taken from the metadata the server returned.
    pub id: Option<String>,
    /// Name of the item, taken from the returned metadata.
    pub name: Option<String>,
    /// Data type of the item, taken from the returned metadata.
    pub data_type: Option<String>,
    /// Domain the item belongs to, taken from the returned metadata.
    pub domain_id: Option<String>,
    /// Location of the file the payload was written to.
    pub path: PathBuf,
    /// Per-download directory under which `path` was created.
    pub root: PathBuf,
    /// `path` with the `root` prefix stripped (or `path` itself if it is not under `root`).
    pub relative_path: PathBuf,
    /// Files extracted from this part; the download code in this file leaves it empty.
    pub extracted_paths: Vec<PathBuf>,
}
29
/// Parameters for uploading an in-memory artifact to a domain.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and the result must be non-empty.
    pub domain_id: &'a str,
    /// Name to store the artifact under.
    pub name: &'a str,
    /// Data type to store the artifact under.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; the upload code only uses it in log output.
    pub logical_path: &'a str,
    /// Payload to upload.
    pub bytes: &'a [u8],
    /// When set, the existing item with this id is updated instead of creating a new one.
    pub existing_id: Option<&'a str>,
}
39
/// Parameters for uploading an artifact that lives in a file on disk.
#[derive(Debug)]
pub struct UploadFileRequest<'a> {
    /// Target domain; surrounding whitespace is trimmed and the result must be non-empty.
    pub domain_id: &'a str,
    /// Name to store the artifact under.
    pub name: &'a str,
    /// Data type to store the artifact under.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; the upload code only uses it in log output.
    pub logical_path: &'a str,
    /// File whose contents are uploaded.
    pub path: &'a Path,
    /// When set, the existing item with this id is updated instead of creating a new one.
    pub existing_id: Option<&'a str>,
}
49
/// Upload capabilities of a Domain server, flattened from its `/api/v1/info` response.
#[derive(Clone, Debug)]
struct UploadInfoV1 {
    /// Server-reported request size limit in bytes; values `<= 0` are not used as a limit.
    request_max_bytes: i64,
    /// Whether the server reports multipart uploads as enabled.
    multipart_enabled: bool,
}
55
56#[derive(Debug, Deserialize, Clone)]
57struct InfoResponseV1 {
58    upload: InfoUploadV1,
59}
60
61#[derive(Debug, Deserialize, Clone)]
62struct InfoUploadV1 {
63    request_max_bytes: i64,
64    multipart: InfoMultipartV1,
65}
66
67#[derive(Debug, Deserialize, Clone)]
68struct InfoMultipartV1 {
69    enabled: bool,
70}
71
72#[derive(Debug, Serialize)]
73struct InitiateMultipartRequestV1 {
74    name: String,
75    data_type: String,
76    size: Option<i64>,
77    content_type: Option<String>,
78    existing_id: Option<String>,
79}
80
81#[derive(Debug, Deserialize)]
82struct InitiateMultipartResponseV1 {
83    upload_id: String,
84    part_size: i64,
85}
86
87#[derive(Debug, Deserialize)]
88struct UploadPartResultV1 {
89    etag: String,
90}
91
92#[derive(Debug, Serialize)]
93struct CompletedPartV1 {
94    part_number: i32,
95    etag: String,
96}
97
98#[derive(Debug, Serialize)]
99struct CompleteMultipartRequestV1 {
100    parts: Vec<CompletedPartV1>,
101}
102
103#[derive(Debug, Deserialize)]
104struct DomainDataMetadataV1 {
105    id: String,
106}
107
108#[derive(Debug, Clone)]
109struct InfoCacheEntryV1 {
110    value: Option<UploadInfoV1>,
111    expires_at: u64,
112}
113
114const INFO_CACHE_TTL_SECS: u64 = 60;
115static INFO_CACHE_V1: OnceLock<parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>>> =
116    OnceLock::new();
117
118fn info_cache_v1() -> &'static parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>> {
119    INFO_CACHE_V1.get_or_init(|| parking_lot::Mutex::new(HashMap::new()))
120}
121
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
128
129/// Domain server HTTP client (skeleton; HTTP added later).
130#[derive(Clone)]
131pub struct DomainClient {
132    pub base: Url,
133    pub token: TokenRef,
134    client_id: String,
135}
136impl DomainClient {
137    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
138        let client_id = env_client_id();
139        Ok(Self {
140            base,
141            token,
142            client_id,
143        })
144    }
145
146    pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
147        let client_id = env_client_id();
148        Ok(Self {
149            base,
150            token,
151            client_id,
152        })
153    }
154
155    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
156    /// part into a temporary file and returning its metadata.
157    pub async fn download_uri(
158        &self,
159        uri: &str,
160    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
161        let resolved = resolve_domain_url(&self.base, uri)?;
162        let (domain_id, query) = parse_download_target(&resolved, None)?;
163        self.download_domain_data(&resolved, &domain_id, query)
164            .await
165    }
166
167    /// Download domain data referenced by a CID, which can be either:
168    /// - a bare domain-data ID (UUID), or
169    /// - an absolute/relative URL under the domain server.
170    pub async fn download_cid(
171        &self,
172        domain_id: &str,
173        cid: &str,
174    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
175        let cid = cid.trim();
176        if cid.is_empty() {
177            return Err(StorageError::Other("empty cid".into()));
178        }
179
180        if cid.contains("://") || cid.starts_with('/') {
181            let resolved = resolve_domain_url(&self.base, cid)?;
182            let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
183            return self
184                .download_domain_data(&resolved, &domain_id, query)
185                .await;
186        }
187
188        let query = posemesh_domain_http::domain_data::DownloadQuery {
189            ids: vec![cid.to_string()],
190            name: None,
191            data_type: None,
192        };
193        self.download_domain_data(&self.base, domain_id, query)
194            .await
195    }
196
197    async fn download_domain_data(
198        &self,
199        url_for_log: &Url,
200        domain_id: &str,
201        query: posemesh_domain_http::domain_data::DownloadQuery,
202    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
203        let domain_id = domain_id.trim();
204        if domain_id.is_empty() {
205            return Err(StorageError::Other("missing domain_id for download".into()));
206        }
207
208        tracing::debug!(
209            target: "posemesh_compute_node::storage::client",
210            method = "GET",
211            %url_for_log,
212            domain_id = domain_id,
213            ids = ?query.ids,
214            name = ?query.name,
215            data_type = ?query.data_type,
216            "Downloading domain data"
217        );
218
219        let base = self.base.as_str().trim_end_matches('/');
220        let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
221            base,
222            self.client_id.as_str(),
223            self.token.get().as_str(),
224            domain_id,
225            &query,
226        )
227        .await
228        .map_err(map_domain_error)?;
229
230        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
231        fs::create_dir_all(&root)
232            .await
233            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
234        let datasets_root = root.join("datasets");
235        fs::create_dir_all(&datasets_root)
236            .await
237            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
238
239        let mut parts = Vec::new();
240
241        while let Some(item) = rx.next().await {
242            let domain_item = item.map_err(map_domain_error)?;
243            let name = domain_item.metadata.name.clone();
244            let data_type = domain_item.metadata.data_type.clone();
245
246            let scan_folder = extract_timestamp(&name)
247                .map(|ts| sanitize_component(&ts))
248                .unwrap_or_else(|| sanitize_component(&name));
249            let scan_dir = datasets_root.join(&scan_folder);
250            fs::create_dir_all(&scan_dir)
251                .await
252                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
253
254            let file_name = map_filename(&data_type, &name);
255            let file_path = scan_dir.join(&file_name);
256            if let Some(parent) = file_path.parent() {
257                fs::create_dir_all(parent)
258                    .await
259                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
260            }
261            fs::write(&file_path, &domain_item.data)
262                .await
263                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
264
265            let extracted_paths = Vec::new();
266
267            let relative_path = file_path
268                .strip_prefix(&root)
269                .unwrap_or(&file_path)
270                .to_path_buf();
271
272            parts.push(DownloadedPart {
273                id: Some(domain_item.metadata.id),
274                name: Some(name),
275                data_type: Some(data_type),
276                domain_id: Some(domain_item.metadata.domain_id),
277                path: file_path,
278                root: root.clone(),
279                relative_path,
280                extracted_paths,
281            });
282        }
283
284        if parts.is_empty() {
285            return Err(StorageError::NotFound);
286        }
287
288        Ok(parts)
289    }
290
291    pub async fn upload_artifact(
292        &self,
293        request: UploadRequest<'_>,
294    ) -> std::result::Result<Option<String>, StorageError> {
295        let domain_id = request.domain_id.trim();
296        if domain_id.is_empty() {
297            return Err(StorageError::Other(
298                "missing domain_id for artifact upload".into(),
299            ));
300        }
301
302        let base = self.base.as_str().trim_end_matches('/');
303        if let Some(info) = get_upload_info_v1(base).await {
304            if info.multipart_enabled && info.request_max_bytes > 0 {
305                let fits_alone = fits_single_upload_request(
306                    info.request_max_bytes,
307                    request.name,
308                    request.data_type,
309                    request.existing_id,
310                    request.bytes.len(),
311                );
312                if !fits_alone {
313                    return self
314                        .upload_artifact_v1_multipart(base, domain_id, request)
315                        .await;
316                }
317            }
318        }
319
320        let action = if let Some(id) = request.existing_id {
321            posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
322        } else {
323            posemesh_domain_http::domain_data::DomainAction::Create {
324                name: request.name.to_string(),
325                data_type: request.data_type.to_string(),
326            }
327        };
328
329        let upload = posemesh_domain_http::domain_data::UploadDomainData {
330            action,
331            data: request.bytes.to_vec(),
332        };
333        let method = if request.existing_id.is_some() {
334            Method::PUT
335        } else {
336            Method::POST
337        };
338        tracing::debug!(
339            target: "posemesh_compute_node::storage::client",
340            method = %method,
341            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
342            logical_path = request.logical_path,
343            name = request.name,
344            data_type = request.data_type,
345            has_existing_id = request.existing_id.is_some(),
346            "Sending domain upload request"
347        );
348
349        let items = posemesh_domain_http::domain_data::upload_v1(
350            base,
351            self.token.get().as_str(),
352            domain_id,
353            vec![upload],
354        )
355        .await
356        .map_err(map_domain_error)?;
357
358        Ok(items.into_iter().next().map(|d| d.id))
359    }
360
361    pub async fn upload_artifact_file(
362        &self,
363        request: UploadFileRequest<'_>,
364    ) -> std::result::Result<Option<String>, StorageError> {
365        let domain_id = request.domain_id.trim();
366        if domain_id.is_empty() {
367            return Err(StorageError::Other(
368                "missing domain_id for artifact upload".into(),
369            ));
370        }
371
372        let base = self.base.as_str().trim_end_matches('/');
373
374        let mut file = fs::File::open(request.path)
375            .await
376            .map_err(|e| StorageError::Other(format!("open upload file: {}", e)))?;
377
378        let file_size = match file.metadata().await {
379            Ok(meta) => {
380                if meta.len() == 0 {
381                    return Err(StorageError::BadRequest);
382                }
383                i64::try_from(meta.len()).ok()
384            }
385            Err(_) => None,
386        };
387
388        match self
389            .upload_artifact_v1_multipart_file(base, domain_id, &request, &mut file, file_size)
390            .await
391        {
392            Ok(v) => Ok(v),
393            Err(UploadFileFallback::UnsupportedEndpoint) => {
394                // Fall back to legacy multipart/form-data upload (in-memory) for older servers.
395                let bytes_owned = fs::read(request.path)
396                    .await
397                    .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
398                if bytes_owned.is_empty() {
399                    return Err(StorageError::BadRequest);
400                }
401                self.upload_artifact(UploadRequest {
402                    domain_id: request.domain_id,
403                    name: request.name,
404                    data_type: request.data_type,
405                    logical_path: request.logical_path,
406                    bytes: bytes_owned.as_slice(),
407                    existing_id: request.existing_id,
408                })
409                .await
410            }
411            Err(UploadFileFallback::Error(e)) => Err(e),
412        }
413    }
414
415    async fn upload_artifact_v1_multipart(
416        &self,
417        base: &str,
418        domain_id: &str,
419        request: UploadRequest<'_>,
420    ) -> std::result::Result<Option<String>, StorageError> {
421        if request.bytes.is_empty() {
422            return Err(StorageError::BadRequest);
423        }
424
425        let client = reqwest::Client::new();
426        let initiate_endpoint = format!(
427            "{}/api/v1/domains/{}/data/multipart?uploads",
428            base, domain_id
429        );
430
431        let init_req = InitiateMultipartRequestV1 {
432            name: request.name.to_string(),
433            data_type: request.data_type.to_string(),
434            size: Some(request.bytes.len() as i64),
435            content_type: Some("application/octet-stream".to_string()),
436            existing_id: request.existing_id.map(|id| id.to_string()),
437        };
438
439        tracing::debug!(
440            target: "posemesh_compute_node::storage::client",
441            method = "POST",
442            url = %initiate_endpoint,
443            logical_path = request.logical_path,
444            name = request.name,
445            data_type = request.data_type,
446            has_existing_id = request.existing_id.is_some(),
447            "Initiating multipart upload"
448        );
449
450        let init_resp = client
451            .post(&initiate_endpoint)
452            .bearer_auth(self.token.get())
453            .header("posemesh-client-id", self.client_id.as_str())
454            .header("Content-Type", "application/json")
455            .json(&init_req)
456            .send()
457            .await
458            .map_err(|e| StorageError::Network(e.to_string()))?;
459
460        if !init_resp.status().is_success() {
461            let status = init_resp.status();
462            let err = init_resp
463                .text()
464                .await
465                .unwrap_or_else(|_| "Unknown error".to_string());
466            tracing::warn!(
467                target: "posemesh_compute_node::storage::client",
468                %status,
469                error = %err,
470                "Multipart initiation failed"
471            );
472            return Err(map_status(status));
473        }
474
475        let init: InitiateMultipartResponseV1 = init_resp
476            .json()
477            .await
478            .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
479
480        let part_size = usize::try_from(init.part_size)
481            .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
482        if part_size == 0 {
483            return Err(StorageError::Other("invalid multipart part_size".into()));
484        }
485
486        let upload_id = init.upload_id;
487        let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
488
489        let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
490            let mut offset: usize = 0;
491            let mut part_number: i32 = 1;
492            while offset < request.bytes.len() {
493                let end = std::cmp::min(offset + part_size, request.bytes.len());
494                let chunk = request.bytes[offset..end].to_vec();
495
496                let part_endpoint = format!(
497                    "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
498                    base, domain_id, upload_id, part_number
499                );
500                tracing::debug!(
501                    target: "posemesh_compute_node::storage::client",
502                    method = "PUT",
503                    url = %part_endpoint,
504                    part_number,
505                    part_bytes = chunk.len(),
506                    "Uploading multipart part"
507                );
508
509                let resp = client
510                    .put(&part_endpoint)
511                    .bearer_auth(self.token.get())
512                    .header("posemesh-client-id", self.client_id.as_str())
513                    .header("Content-Type", "application/octet-stream")
514                    .body(chunk)
515                    .send()
516                    .await
517                    .map_err(|e| StorageError::Network(e.to_string()))?;
518
519                if !resp.status().is_success() {
520                    let status = resp.status();
521                    let err = resp
522                        .text()
523                        .await
524                        .unwrap_or_else(|_| "Unknown error".to_string());
525                    tracing::warn!(
526                        target: "posemesh_compute_node::storage::client",
527                        %status,
528                        error = %err,
529                        part_number,
530                        "Multipart part upload failed"
531                    );
532                    return Err(map_status(status));
533                }
534
535                let res: UploadPartResultV1 = resp
536                    .json()
537                    .await
538                    .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
539
540                completed_parts.push(CompletedPartV1 {
541                    part_number,
542                    etag: res.etag,
543                });
544
545                offset = end;
546                part_number = part_number
547                    .checked_add(1)
548                    .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
549            }
550
551            let complete_endpoint = format!(
552                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
553                base, domain_id, upload_id
554            );
555            tracing::debug!(
556                target: "posemesh_compute_node::storage::client",
557                method = "POST",
558                url = %complete_endpoint,
559                parts = completed_parts.len(),
560                "Completing multipart upload"
561            );
562            let resp = client
563                .post(&complete_endpoint)
564                .bearer_auth(self.token.get())
565                .header("posemesh-client-id", self.client_id.as_str())
566                .header("Content-Type", "application/json")
567                .json(&CompleteMultipartRequestV1 {
568                    parts: completed_parts,
569                })
570                .send()
571                .await
572                .map_err(|e| StorageError::Network(e.to_string()))?;
573
574            if !resp.status().is_success() {
575                let status = resp.status();
576                let err = resp
577                    .text()
578                    .await
579                    .unwrap_or_else(|_| "Unknown error".to_string());
580                tracing::warn!(
581                    target: "posemesh_compute_node::storage::client",
582                    %status,
583                    error = %err,
584                    "Multipart completion failed"
585                );
586                return Err(map_status(status));
587            }
588
589            resp.json::<DomainDataMetadataV1>()
590                .await
591                .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
592        }
593        .await;
594
595        if upload_res.is_err() {
596            let abort_endpoint = format!(
597                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
598                base, domain_id, upload_id
599            );
600            let _ = client
601                .delete(&abort_endpoint)
602                .bearer_auth(self.token.get())
603                .header("posemesh-client-id", self.client_id.as_str())
604                .send()
605                .await;
606        }
607
608        upload_res.map(|meta| Some(meta.id))
609    }
610
611    async fn upload_artifact_v1_multipart_file(
612        &self,
613        base: &str,
614        domain_id: &str,
615        request: &UploadFileRequest<'_>,
616        file: &mut fs::File,
617        file_size: Option<i64>,
618    ) -> std::result::Result<Option<String>, UploadFileFallback> {
619        use tokio::io::AsyncReadExt;
620
621        let client = reqwest::Client::new();
622        let initiate_endpoint = format!(
623            "{}/api/v1/domains/{}/data/multipart?uploads",
624            base, domain_id
625        );
626
627        let init_req = InitiateMultipartRequestV1 {
628            name: request.name.to_string(),
629            data_type: request.data_type.to_string(),
630            size: file_size,
631            content_type: Some("application/octet-stream".to_string()),
632            existing_id: request.existing_id.map(|id| id.to_string()),
633        };
634
635        tracing::debug!(
636            target: "posemesh_compute_node::storage::client",
637            method = "POST",
638            url = %initiate_endpoint,
639            logical_path = request.logical_path,
640            name = request.name,
641            data_type = request.data_type,
642            has_existing_id = request.existing_id.is_some(),
643            "Initiating multipart file upload"
644        );
645
646        let init_resp = client
647            .post(&initiate_endpoint)
648            .bearer_auth(self.token.get())
649            .header("posemesh-client-id", self.client_id.as_str())
650            .header("Content-Type", "application/json")
651            .json(&init_req)
652            .send()
653            .await
654            .map_err(|e| UploadFileFallback::Error(StorageError::Network(e.to_string())))?;
655
656        if !init_resp.status().is_success() {
657            let status = init_resp.status();
658            let err = init_resp
659                .text()
660                .await
661                .unwrap_or_else(|_| "Unknown error".to_string());
662            tracing::warn!(
663                target: "posemesh_compute_node::storage::client",
664                %status,
665                error = %err,
666                "Multipart file initiation failed"
667            );
668            if is_unsupported_endpoint_status(status) {
669                return Err(UploadFileFallback::UnsupportedEndpoint);
670            }
671            return Err(UploadFileFallback::Error(map_status(status)));
672        }
673
674        let init: InitiateMultipartResponseV1 = init_resp.json().await.map_err(|e| {
675            UploadFileFallback::Error(StorageError::Other(format!(
676                "invalid initiate response: {}",
677                e
678            )))
679        })?;
680
681        let part_size = usize::try_from(init.part_size).map_err(|_| {
682            UploadFileFallback::Error(StorageError::Other("invalid multipart part_size".into()))
683        })?;
684        if part_size == 0 {
685            return Err(UploadFileFallback::Error(StorageError::Other(
686                "invalid multipart part_size".into(),
687            )));
688        }
689
690        let upload_id = init.upload_id;
691        let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
692
693        let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
694            let mut part_number: i32 = 1;
695            loop {
696                let mut chunk = vec![0u8; part_size];
697                let mut read = 0usize;
698                while read < part_size {
699                    let n = file
700                        .read(&mut chunk[read..])
701                        .await
702                        .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
703                    if n == 0 {
704                        break;
705                    }
706                    read += n;
707                }
708                if read == 0 {
709                    break;
710                }
711                chunk.truncate(read);
712
713                let part_endpoint = format!(
714                    "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
715                    base, domain_id, upload_id, part_number
716                );
717                tracing::debug!(
718                    target: "posemesh_compute_node::storage::client",
719                    method = "PUT",
720                    url = %part_endpoint,
721                    part_number,
722                    part_bytes = chunk.len(),
723                    "Uploading multipart file part"
724                );
725
726                let resp = client
727                    .put(&part_endpoint)
728                    .bearer_auth(self.token.get())
729                    .header("posemesh-client-id", self.client_id.as_str())
730                    .header("Content-Type", "application/octet-stream")
731                    .body(chunk)
732                    .send()
733                    .await
734                    .map_err(|e| StorageError::Network(e.to_string()))?;
735
736                if !resp.status().is_success() {
737                    let status = resp.status();
738                    let err = resp
739                        .text()
740                        .await
741                        .unwrap_or_else(|_| "Unknown error".to_string());
742                    tracing::warn!(
743                        target: "posemesh_compute_node::storage::client",
744                        %status,
745                        error = %err,
746                        part_number,
747                        "Multipart file part upload failed"
748                    );
749                    return Err(map_status(status));
750                }
751
752                let res: UploadPartResultV1 = resp
753                    .json()
754                    .await
755                    .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
756
757                completed_parts.push(CompletedPartV1 {
758                    part_number,
759                    etag: res.etag,
760                });
761
762                part_number = part_number
763                    .checked_add(1)
764                    .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
765            }
766
767            if completed_parts.is_empty() {
768                return Err(StorageError::BadRequest);
769            }
770
771            let complete_endpoint = format!(
772                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
773                base, domain_id, upload_id
774            );
775            tracing::debug!(
776                target: "posemesh_compute_node::storage::client",
777                method = "POST",
778                url = %complete_endpoint,
779                parts = completed_parts.len(),
780                "Completing multipart file upload"
781            );
782            let resp = client
783                .post(&complete_endpoint)
784                .bearer_auth(self.token.get())
785                .header("posemesh-client-id", self.client_id.as_str())
786                .header("Content-Type", "application/json")
787                .json(&CompleteMultipartRequestV1 {
788                    parts: completed_parts,
789                })
790                .send()
791                .await
792                .map_err(|e| StorageError::Network(e.to_string()))?;
793
794            if !resp.status().is_success() {
795                let status = resp.status();
796                let err = resp
797                    .text()
798                    .await
799                    .unwrap_or_else(|_| "Unknown error".to_string());
800                tracing::warn!(
801                    target: "posemesh_compute_node::storage::client",
802                    %status,
803                    error = %err,
804                    "Multipart file completion failed"
805                );
806                return Err(map_status(status));
807            }
808
809            resp.json::<DomainDataMetadataV1>()
810                .await
811                .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
812        }
813        .await;
814
815        if upload_res.is_err() {
816            let abort_endpoint = format!(
817                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
818                base, domain_id, upload_id
819            );
820            let _ = client
821                .delete(&abort_endpoint)
822                .bearer_auth(self.token.get())
823                .header("posemesh-client-id", self.client_id.as_str())
824                .send()
825                .await;
826        }
827
828        upload_res
829            .map(|meta| Some(meta.id))
830            .map_err(UploadFileFallback::Error)
831    }
832
833    pub async fn find_artifact_id(
834        &self,
835        domain_id: &str,
836        name: &str,
837        data_type: &str,
838    ) -> std::result::Result<Option<String>, StorageError> {
839        let domain_id = domain_id.trim();
840        if domain_id.is_empty() {
841            return Err(StorageError::Other(
842                "missing domain_id for artifact lookup".into(),
843            ));
844        }
845
846        let query = posemesh_domain_http::domain_data::DownloadQuery {
847            ids: Vec::new(),
848            name: Some(name.to_string()),
849            data_type: Some(data_type.to_string()),
850        };
851
852        let base = self.base.as_str().trim_end_matches('/');
853        let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
854        tracing::debug!(
855            target: "posemesh_compute_node::storage::client",
856            method = "GET",
857            %url,
858            artifact_name = name,
859            artifact_type = data_type,
860            "Looking up existing domain artifact"
861        );
862
863        let results = posemesh_domain_http::domain_data::download_metadata_v1(
864            base,
865            self.client_id.as_str(),
866            self.token.get().as_str(),
867            domain_id,
868            &query,
869        )
870        .await;
871
872        let results = match results {
873            Ok(items) => items,
874            Err(err) => {
875                if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
876                    if resp.status == reqwest::StatusCode::NOT_FOUND {
877                        return Ok(None);
878                    }
879                }
880                return Err(map_domain_error(err));
881            }
882        };
883
884        Ok(results
885            .into_iter()
886            .find(|item| item.name == name && item.data_type == data_type)
887            .map(|item| item.id))
888    }
889}
890
891async fn fetch_info_v1(base: &str) -> Result<Option<UploadInfoV1>, ()> {
892    let resp = reqwest::Client::new()
893        .get(format!("{}/api/v1/info", base))
894        .send()
895        .await
896        .map_err(|_| ())?;
897
898    if resp.status() == reqwest::StatusCode::NOT_FOUND {
899        return Ok(None);
900    }
901    if !resp.status().is_success() {
902        return Err(());
903    }
904
905    let info = resp.json::<InfoResponseV1>().await.map_err(|_| ())?;
906    Ok(Some(UploadInfoV1 {
907        request_max_bytes: info.upload.request_max_bytes,
908        multipart_enabled: info.upload.multipart.enabled,
909    }))
910}
911
912async fn get_upload_info_v1(base: &str) -> Option<UploadInfoV1> {
913    let now = now_unix_secs();
914    {
915        let cache = info_cache_v1().lock();
916        if let Some(entry) = cache.get(base) {
917            if entry.expires_at > now {
918                return entry.value.clone();
919            }
920        }
921    }
922
923    let fetched = match fetch_info_v1(base).await {
924        Ok(v) => v,
925        Err(_) => return None,
926    };
927
928    let mut cache = info_cache_v1().lock();
929    cache.retain(|_, entry| entry.expires_at > now);
930    cache.insert(
931        base.to_string(),
932        InfoCacheEntryV1 {
933            value: fetched.clone(),
934            expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
935        },
936    );
937    fetched
938}
939
/// Reports whether a single-part multipart upload body would stay within
/// `request_max_bytes`.
///
/// The size is estimated as: part header + `data_len` payload bytes + the
/// trailing CRLF of the part + the closing delimiter. When `existing_id` is
/// set the header carries only the `id` parameter; otherwise it carries
/// `name` and `data_type`.
///
/// Returns `false` when the limit is negative, or when the estimated size
/// overflows `usize` or exceeds the limit. The arithmetic is checked so that a
/// very large `data_len` can neither panic nor wrap around into a value that
/// appears to fit.
///
/// NOTE(review): the estimate uses the fixed placeholder boundary
/// `"boundary"`; if the real request is sent with a longer boundary the
/// estimate is slightly low — confirm against the upload code.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    const BOUNDARY: &str = "boundary";
    // CRLF that terminates the part payload.
    const PART_TERMINATOR_LEN: usize = 2;

    // A negative limit can never be satisfied by a non-negative body size.
    if request_max_bytes < 0 {
        return false;
    }
    // Lossless: the value was just checked to be non-negative.
    let limit: u64 = request_max_bytes.unsigned_abs();

    let header = match existing_id {
        Some(id) => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
            BOUNDARY, id
        ),
        None => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
            BOUNDARY, name, data_type
        ),
    };

    // Closing delimiter is "--" + boundary + "--" + CRLF.
    let closing_len = BOUNDARY.len() + 6;

    header
        .len()
        .checked_add(data_len)
        .and_then(|n| n.checked_add(PART_TERMINATOR_LEN))
        .and_then(|n| n.checked_add(closing_len))
        // Widening usize -> u64 is lossless on every supported target.
        .is_some_and(|total| (total as u64) <= limit)
}
963
964fn map_status(status: reqwest::StatusCode) -> StorageError {
965    match status.as_u16() {
966        400 => StorageError::BadRequest,
967        401 => StorageError::Unauthorized,
968        404 => StorageError::NotFound,
969        409 => StorageError::Conflict,
970        n if (500..=599).contains(&n) => StorageError::Server(n),
971        other => StorageError::Other(format!("unexpected status: {}", other)),
972    }
973}
974
975fn is_unsupported_endpoint_status(status: reqwest::StatusCode) -> bool {
976    status == reqwest::StatusCode::NOT_FOUND
977        || status == reqwest::StatusCode::METHOD_NOT_ALLOWED
978        || status == reqwest::StatusCode::NOT_IMPLEMENTED
979}
980
/// Failure outcome of a file-upload attempt.
enum UploadFileFallback {
    /// NOTE(review): presumably signals that the server does not expose the
    /// attempted endpoint, so the caller can fall back to another upload
    /// path — confirm against the call sites.
    UnsupportedEndpoint,
    /// The upload failed with a concrete storage error.
    Error(StorageError),
}
985
986fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
987    use posemesh_domain_http::errors::{AuthError, DomainError};
988
989    match err {
990        DomainError::AukiErrorResponse(resp) => map_status(resp.status),
991        DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
992        DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
993        other => StorageError::Other(other.to_string()),
994    }
995}
996
997fn env_client_id() -> String {
998    std::env::var("CLIENT_ID")
999        .ok()
1000        .map(|v| v.trim().to_string())
1001        .filter(|v| !v.is_empty())
1002        .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
1003}
1004
1005fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
1006    if value.contains("://") {
1007        Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
1008    } else {
1009        base.join(value)
1010            .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
1011    }
1012}
1013
1014fn parse_download_target(
1015    url: &Url,
1016    fallback_domain_id: Option<&str>,
1017) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
1018    let segments: Vec<&str> = url
1019        .path_segments()
1020        .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
1021        .unwrap_or_default();
1022
1023    let mut domain_id_from_path: Option<&str> = None;
1024    let mut data_id_from_path: Option<&str> = None;
1025
1026    for idx in 0..segments.len() {
1027        if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
1028            domain_id_from_path = Some(segments[idx + 1]);
1029            data_id_from_path = segments.get(idx + 3).copied();
1030            break;
1031        }
1032    }
1033
1034    let domain_id = domain_id_from_path
1035        .or(fallback_domain_id)
1036        .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
1037        .to_string();
1038
1039    let mut ids: Vec<String> = Vec::new();
1040    let mut name: Option<String> = None;
1041    let mut data_type: Option<String> = None;
1042
1043    for (key, value) in url.query_pairs() {
1044        match key.as_ref() {
1045            "ids" => ids.extend(
1046                value
1047                    .split(',')
1048                    .map(|s| s.trim())
1049                    .filter(|s| !s.is_empty())
1050                    .map(|s| s.to_string()),
1051            ),
1052            "name" => {
1053                if name.is_none() {
1054                    name = Some(value.to_string());
1055                }
1056            }
1057            "data_type" => {
1058                if data_type.is_none() {
1059                    data_type = Some(value.to_string());
1060                }
1061            }
1062            _ => {}
1063        }
1064    }
1065
1066    if let Some(id) = data_id_from_path {
1067        ids = vec![id.to_string()];
1068    }
1069
1070    Ok((
1071        domain_id,
1072        posemesh_domain_http::domain_data::DownloadQuery {
1073            ids,
1074            name,
1075            data_type,
1076        },
1077    ))
1078}
1079
1080// no parse_disposition_params; headers are parsed in posemesh-domain-http
1081
/// Makes `value` safe to use as a single file-name component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input char maps to exactly one output char, so the result is empty
    // only when the input is.
    if value.is_empty() {
        return "part".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
1099
1100fn extract_timestamp(name: &str) -> Option<String> {
1101    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
1102        .ok()
1103        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
1104}
1105
1106fn map_filename(data_type: &str, name: &str) -> String {
1107    format!(
1108        "{}.{}",
1109        sanitize_component(name),
1110        sanitize_component(data_type)
1111    )
1112}