1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::fs;
11use url::Url;
12use uuid::Uuid;
/// One domain-data item that was downloaded and written to a local temp directory.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Server-side id of the item.
    pub id: Option<String>,
    /// Item name as reported by the domain server.
    pub name: Option<String>,
    /// Item data type as reported by the domain server.
    pub data_type: Option<String>,
    /// Domain the item belongs to.
    pub domain_id: Option<String>,
    /// Local file the item's bytes were written to.
    pub path: PathBuf,
    /// Per-download temp directory that contains `path`.
    pub root: PathBuf,
    /// `path` relative to `root` (falls back to `path` itself if it is not under `root`).
    pub relative_path: PathBuf,
    /// Files extracted from this part; this client currently leaves it empty.
    pub extracted_paths: Vec<PathBuf>,
}
27
/// Parameters for uploading an in-memory artifact with `DomainClient::upload_artifact`.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; whitespace is trimmed and it must not be blank.
    pub domain_id: &'a str,
    /// Artifact name.
    pub name: &'a str,
    /// Artifact data type.
    pub data_type: &'a str,
    /// Caller-side logical path; only used for logging by this client.
    pub logical_path: &'a str,
    /// Raw artifact bytes.
    pub bytes: &'a [u8],
    /// When set, the item with this id is updated instead of creating a new one.
    pub existing_id: Option<&'a str>,
}

/// Parameters for uploading an artifact from a local file with
/// `DomainClient::upload_artifact_file`.
#[derive(Debug)]
pub struct UploadFileRequest<'a> {
    /// Target domain; whitespace is trimmed and it must not be blank.
    pub domain_id: &'a str,
    /// Artifact name.
    pub name: &'a str,
    /// Artifact data type.
    pub data_type: &'a str,
    /// Caller-side logical path; only used for logging by this client.
    pub logical_path: &'a str,
    /// Local file whose contents are uploaded.
    pub path: &'a Path,
    /// When set, the item with this id is updated instead of creating a new one.
    pub existing_id: Option<&'a str>,
}
47
/// JSON body of `POST .../data/multipart?uploads` (multipart initiation).
#[derive(Debug, Serialize)]
struct InitiateMultipartRequestV1 {
    name: String,
    data_type: String,
    /// Total upload size in bytes, when known.
    size: Option<i64>,
    content_type: Option<String>,
    /// Id of an existing item to update, if any.
    existing_id: Option<String>,
}

/// Server response to multipart initiation.
#[derive(Debug, Deserialize)]
struct InitiateMultipartResponseV1 {
    /// Sent back as `uploadId` on part, complete and abort requests.
    upload_id: String,
    /// Part size chosen by the server; callers reject zero or negative values.
    part_size: i64,
}

/// Server response to a single part upload.
#[derive(Debug, Deserialize)]
struct UploadPartResultV1 {
    /// ETag of the stored part; echoed back when completing the upload.
    etag: String,
}

/// One entry of the completion request.
#[derive(Debug, Serialize)]
struct CompletedPartV1 {
    /// Part number, starting at 1.
    part_number: i32,
    etag: String,
}

/// JSON body of `POST .../data/multipart?uploadId=...` (completion).
#[derive(Debug, Serialize)]
struct CompleteMultipartRequestV1 {
    parts: Vec<CompletedPartV1>,
}

/// The part of the completion response this client reads: the resulting item id.
#[derive(Debug, Deserialize)]
struct DomainDataMetadataV1 {
    id: String,
}
83
/// HTTP client for a domain server's `/api/v1/domains/{id}/data` endpoints.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL of the domain server; every request is built from it.
    pub base: Url,
    /// Source of the bearer token attached to requests.
    pub token: TokenRef,
    /// Client id passed to domain-data calls and sent as the
    /// `posemesh-client-id` header on multipart requests.
    client_id: String,
}
91impl DomainClient {
92 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
93 let client_id = env_client_id();
94 Ok(Self {
95 base,
96 token,
97 client_id,
98 })
99 }
100
101 pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
102 let client_id = env_client_id();
103 Ok(Self {
104 base,
105 token,
106 client_id,
107 })
108 }
109
110 pub async fn download_uri(
113 &self,
114 uri: &str,
115 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
116 let resolved = resolve_domain_url(&self.base, uri)?;
117 let (domain_id, query) = parse_download_target(&resolved, None)?;
118 self.download_domain_data(&resolved, &domain_id, query)
119 .await
120 }
121
122 pub async fn download_cid(
126 &self,
127 domain_id: &str,
128 cid: &str,
129 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
130 let cid = cid.trim();
131 if cid.is_empty() {
132 return Err(StorageError::Other("empty cid".into()));
133 }
134
135 if cid.contains("://") || cid.starts_with('/') {
136 let resolved = resolve_domain_url(&self.base, cid)?;
137 let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
138 return self
139 .download_domain_data(&resolved, &domain_id, query)
140 .await;
141 }
142
143 let query = posemesh_domain_http::domain_data::DownloadQuery {
144 ids: vec![cid.to_string()],
145 name: None,
146 data_type: None,
147 };
148 self.download_domain_data(&self.base, domain_id, query)
149 .await
150 }
151
152 async fn download_domain_data(
153 &self,
154 url_for_log: &Url,
155 domain_id: &str,
156 query: posemesh_domain_http::domain_data::DownloadQuery,
157 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
158 let domain_id = domain_id.trim();
159 if domain_id.is_empty() {
160 return Err(StorageError::Other("missing domain_id for download".into()));
161 }
162
163 tracing::debug!(
164 target: "posemesh_compute_node::storage::client",
165 method = "GET",
166 %url_for_log,
167 domain_id = domain_id,
168 ids = ?query.ids,
169 name = ?query.name,
170 data_type = ?query.data_type,
171 "Downloading domain data"
172 );
173
174 let base = self.base.as_str().trim_end_matches('/');
175 let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
176 base,
177 self.client_id.as_str(),
178 self.token.get().as_str(),
179 domain_id,
180 &query,
181 )
182 .await
183 .map_err(map_domain_error)?;
184
185 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
186 fs::create_dir_all(&root)
187 .await
188 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
189 let datasets_root = root.join("datasets");
190 fs::create_dir_all(&datasets_root)
191 .await
192 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
193
194 let mut parts = Vec::new();
195
196 while let Some(item) = rx.next().await {
197 let domain_item = item.map_err(map_domain_error)?;
198 let name = domain_item.metadata.name.clone();
199 let data_type = domain_item.metadata.data_type.clone();
200
201 let scan_folder = extract_timestamp(&name)
202 .map(|ts| sanitize_component(&ts))
203 .unwrap_or_else(|| sanitize_component(&name));
204 let scan_dir = datasets_root.join(&scan_folder);
205 fs::create_dir_all(&scan_dir)
206 .await
207 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
208
209 let file_name = map_filename(&data_type, &name);
210 let file_path = scan_dir.join(&file_name);
211 if let Some(parent) = file_path.parent() {
212 fs::create_dir_all(parent)
213 .await
214 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
215 }
216 fs::write(&file_path, &domain_item.data)
217 .await
218 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
219
220 let extracted_paths = Vec::new();
221
222 let relative_path = file_path
223 .strip_prefix(&root)
224 .unwrap_or(&file_path)
225 .to_path_buf();
226
227 parts.push(DownloadedPart {
228 id: Some(domain_item.metadata.id),
229 name: Some(name),
230 data_type: Some(data_type),
231 domain_id: Some(domain_item.metadata.domain_id),
232 path: file_path,
233 root: root.clone(),
234 relative_path,
235 extracted_paths,
236 });
237 }
238
239 if parts.is_empty() {
240 return Err(StorageError::NotFound);
241 }
242
243 Ok(parts)
244 }
245
246 pub async fn upload_artifact(
247 &self,
248 request: UploadRequest<'_>,
249 ) -> std::result::Result<Option<String>, StorageError> {
250 let domain_id = request.domain_id.trim();
251 if domain_id.is_empty() {
252 return Err(StorageError::Other(
253 "missing domain_id for artifact upload".into(),
254 ));
255 }
256
257 let base = self.base.as_str().trim_end_matches('/');
258 if let Some(info) = posemesh_domain_http::domain_data::get_upload_info_v1(base).await {
259 if info.multipart_enabled && info.request_max_bytes > 0 {
260 let fits_alone = fits_single_upload_request(
261 info.request_max_bytes,
262 request.name,
263 request.data_type,
264 request.existing_id,
265 request.bytes.len(),
266 );
267 if !fits_alone {
268 return self
269 .upload_artifact_v1_multipart(base, domain_id, request)
270 .await;
271 }
272 }
273 }
274
275 let action = if let Some(id) = request.existing_id {
276 posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
277 } else {
278 posemesh_domain_http::domain_data::DomainAction::Create {
279 name: request.name.to_string(),
280 data_type: request.data_type.to_string(),
281 }
282 };
283
284 let upload = posemesh_domain_http::domain_data::UploadDomainData {
285 action,
286 data: request.bytes.to_vec(),
287 };
288 let method = if request.existing_id.is_some() {
289 Method::PUT
290 } else {
291 Method::POST
292 };
293 tracing::debug!(
294 target: "posemesh_compute_node::storage::client",
295 method = %method,
296 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
297 logical_path = request.logical_path,
298 name = request.name,
299 data_type = request.data_type,
300 has_existing_id = request.existing_id.is_some(),
301 "Sending domain upload request"
302 );
303
304 let items = posemesh_domain_http::domain_data::upload_v1(
305 base,
306 self.token.get().as_str(),
307 domain_id,
308 vec![upload],
309 )
310 .await
311 .map_err(map_domain_error)?;
312
313 Ok(items.into_iter().next().map(|d| d.id))
314 }
315
316 pub async fn upload_artifact_file(
317 &self,
318 request: UploadFileRequest<'_>,
319 ) -> std::result::Result<Option<String>, StorageError> {
320 let domain_id = request.domain_id.trim();
321 if domain_id.is_empty() {
322 return Err(StorageError::Other(
323 "missing domain_id for artifact upload".into(),
324 ));
325 }
326
327 let base = self.base.as_str().trim_end_matches('/');
328
329 let mut file = fs::File::open(request.path)
330 .await
331 .map_err(|e| StorageError::Other(format!("open upload file: {}", e)))?;
332
333 let file_size = match file.metadata().await {
334 Ok(meta) => {
335 if meta.len() == 0 {
336 return Err(StorageError::BadRequest);
337 }
338 i64::try_from(meta.len()).ok()
339 }
340 Err(_) => None,
341 };
342
343 match self
344 .upload_artifact_v1_multipart_file(base, domain_id, &request, &mut file, file_size)
345 .await
346 {
347 Ok(v) => Ok(v),
348 Err(UploadFileFallback::UnsupportedEndpoint) => {
349 let bytes_owned = fs::read(request.path)
351 .await
352 .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
353 if bytes_owned.is_empty() {
354 return Err(StorageError::BadRequest);
355 }
356 self.upload_artifact(UploadRequest {
357 domain_id: request.domain_id,
358 name: request.name,
359 data_type: request.data_type,
360 logical_path: request.logical_path,
361 bytes: bytes_owned.as_slice(),
362 existing_id: request.existing_id,
363 })
364 .await
365 }
366 Err(UploadFileFallback::Error(e)) => Err(e),
367 }
368 }
369
370 async fn upload_artifact_v1_multipart(
371 &self,
372 base: &str,
373 domain_id: &str,
374 request: UploadRequest<'_>,
375 ) -> std::result::Result<Option<String>, StorageError> {
376 if request.bytes.is_empty() {
377 return Err(StorageError::BadRequest);
378 }
379
380 let client = reqwest::Client::new();
381 let initiate_endpoint = format!(
382 "{}/api/v1/domains/{}/data/multipart?uploads",
383 base, domain_id
384 );
385
386 let init_req = InitiateMultipartRequestV1 {
387 name: request.name.to_string(),
388 data_type: request.data_type.to_string(),
389 size: Some(request.bytes.len() as i64),
390 content_type: Some("application/octet-stream".to_string()),
391 existing_id: request.existing_id.map(|id| id.to_string()),
392 };
393
394 tracing::debug!(
395 target: "posemesh_compute_node::storage::client",
396 method = "POST",
397 url = %initiate_endpoint,
398 logical_path = request.logical_path,
399 name = request.name,
400 data_type = request.data_type,
401 has_existing_id = request.existing_id.is_some(),
402 "Initiating multipart upload"
403 );
404
405 let init_resp = client
406 .post(&initiate_endpoint)
407 .bearer_auth(self.token.get())
408 .header("posemesh-client-id", self.client_id.as_str())
409 .header("Content-Type", "application/json")
410 .json(&init_req)
411 .send()
412 .await
413 .map_err(|e| StorageError::Network(e.to_string()))?;
414
415 if !init_resp.status().is_success() {
416 let status = init_resp.status();
417 let err = init_resp
418 .text()
419 .await
420 .unwrap_or_else(|_| "Unknown error".to_string());
421 tracing::warn!(
422 target: "posemesh_compute_node::storage::client",
423 %status,
424 error = %err,
425 "Multipart initiation failed"
426 );
427 return Err(map_status(status));
428 }
429
430 let init: InitiateMultipartResponseV1 = init_resp
431 .json()
432 .await
433 .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
434
435 let part_size = usize::try_from(init.part_size)
436 .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
437 if part_size == 0 {
438 return Err(StorageError::Other("invalid multipart part_size".into()));
439 }
440
441 let upload_id = init.upload_id;
442 let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
443
444 let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
445 let mut offset: usize = 0;
446 let mut part_number: i32 = 1;
447 while offset < request.bytes.len() {
448 let end = std::cmp::min(offset + part_size, request.bytes.len());
449 let chunk = request.bytes[offset..end].to_vec();
450
451 let part_endpoint = format!(
452 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
453 base, domain_id, upload_id, part_number
454 );
455 tracing::debug!(
456 target: "posemesh_compute_node::storage::client",
457 method = "PUT",
458 url = %part_endpoint,
459 part_number,
460 part_bytes = chunk.len(),
461 "Uploading multipart part"
462 );
463
464 let resp = client
465 .put(&part_endpoint)
466 .bearer_auth(self.token.get())
467 .header("posemesh-client-id", self.client_id.as_str())
468 .header("Content-Type", "application/octet-stream")
469 .body(chunk)
470 .send()
471 .await
472 .map_err(|e| StorageError::Network(e.to_string()))?;
473
474 if !resp.status().is_success() {
475 let status = resp.status();
476 let err = resp
477 .text()
478 .await
479 .unwrap_or_else(|_| "Unknown error".to_string());
480 tracing::warn!(
481 target: "posemesh_compute_node::storage::client",
482 %status,
483 error = %err,
484 part_number,
485 "Multipart part upload failed"
486 );
487 return Err(map_status(status));
488 }
489
490 let res: UploadPartResultV1 = resp
491 .json()
492 .await
493 .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
494
495 completed_parts.push(CompletedPartV1 {
496 part_number,
497 etag: res.etag,
498 });
499
500 offset = end;
501 part_number = part_number
502 .checked_add(1)
503 .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
504 }
505
506 let complete_endpoint = format!(
507 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
508 base, domain_id, upload_id
509 );
510 tracing::debug!(
511 target: "posemesh_compute_node::storage::client",
512 method = "POST",
513 url = %complete_endpoint,
514 parts = completed_parts.len(),
515 "Completing multipart upload"
516 );
517 let resp = client
518 .post(&complete_endpoint)
519 .bearer_auth(self.token.get())
520 .header("posemesh-client-id", self.client_id.as_str())
521 .header("Content-Type", "application/json")
522 .json(&CompleteMultipartRequestV1 {
523 parts: completed_parts,
524 })
525 .send()
526 .await
527 .map_err(|e| StorageError::Network(e.to_string()))?;
528
529 if !resp.status().is_success() {
530 let status = resp.status();
531 let err = resp
532 .text()
533 .await
534 .unwrap_or_else(|_| "Unknown error".to_string());
535 tracing::warn!(
536 target: "posemesh_compute_node::storage::client",
537 %status,
538 error = %err,
539 "Multipart completion failed"
540 );
541 return Err(map_status(status));
542 }
543
544 resp.json::<DomainDataMetadataV1>()
545 .await
546 .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
547 }
548 .await;
549
550 if upload_res.is_err() {
551 let abort_endpoint = format!(
552 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
553 base, domain_id, upload_id
554 );
555 let _ = client
556 .delete(&abort_endpoint)
557 .bearer_auth(self.token.get())
558 .header("posemesh-client-id", self.client_id.as_str())
559 .send()
560 .await;
561 }
562
563 upload_res.map(|meta| Some(meta.id))
564 }
565
566 async fn upload_artifact_v1_multipart_file(
567 &self,
568 base: &str,
569 domain_id: &str,
570 request: &UploadFileRequest<'_>,
571 file: &mut fs::File,
572 file_size: Option<i64>,
573 ) -> std::result::Result<Option<String>, UploadFileFallback> {
574 use tokio::io::AsyncReadExt;
575
576 let client = reqwest::Client::new();
577 let initiate_endpoint = format!(
578 "{}/api/v1/domains/{}/data/multipart?uploads",
579 base, domain_id
580 );
581
582 let init_req = InitiateMultipartRequestV1 {
583 name: request.name.to_string(),
584 data_type: request.data_type.to_string(),
585 size: file_size,
586 content_type: Some("application/octet-stream".to_string()),
587 existing_id: request.existing_id.map(|id| id.to_string()),
588 };
589
590 tracing::debug!(
591 target: "posemesh_compute_node::storage::client",
592 method = "POST",
593 url = %initiate_endpoint,
594 logical_path = request.logical_path,
595 name = request.name,
596 data_type = request.data_type,
597 has_existing_id = request.existing_id.is_some(),
598 "Initiating multipart file upload"
599 );
600
601 let init_resp = client
602 .post(&initiate_endpoint)
603 .bearer_auth(self.token.get())
604 .header("posemesh-client-id", self.client_id.as_str())
605 .header("Content-Type", "application/json")
606 .json(&init_req)
607 .send()
608 .await
609 .map_err(|e| UploadFileFallback::Error(StorageError::Network(e.to_string())))?;
610
611 if !init_resp.status().is_success() {
612 let status = init_resp.status();
613 let err = init_resp
614 .text()
615 .await
616 .unwrap_or_else(|_| "Unknown error".to_string());
617 tracing::warn!(
618 target: "posemesh_compute_node::storage::client",
619 %status,
620 error = %err,
621 "Multipart file initiation failed"
622 );
623 if is_unsupported_endpoint_status(status) {
624 return Err(UploadFileFallback::UnsupportedEndpoint);
625 }
626 return Err(UploadFileFallback::Error(map_status(status)));
627 }
628
629 let init: InitiateMultipartResponseV1 = init_resp.json().await.map_err(|e| {
630 UploadFileFallback::Error(StorageError::Other(format!(
631 "invalid initiate response: {}",
632 e
633 )))
634 })?;
635
636 let part_size = usize::try_from(init.part_size).map_err(|_| {
637 UploadFileFallback::Error(StorageError::Other("invalid multipart part_size".into()))
638 })?;
639 if part_size == 0 {
640 return Err(UploadFileFallback::Error(StorageError::Other(
641 "invalid multipart part_size".into(),
642 )));
643 }
644
645 let upload_id = init.upload_id;
646 let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
647
648 let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
649 let mut part_number: i32 = 1;
650 loop {
651 let mut chunk = vec![0u8; part_size];
652 let mut read = 0usize;
653 while read < part_size {
654 let n = file
655 .read(&mut chunk[read..])
656 .await
657 .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
658 if n == 0 {
659 break;
660 }
661 read += n;
662 }
663 if read == 0 {
664 break;
665 }
666 chunk.truncate(read);
667
668 let part_endpoint = format!(
669 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
670 base, domain_id, upload_id, part_number
671 );
672 tracing::debug!(
673 target: "posemesh_compute_node::storage::client",
674 method = "PUT",
675 url = %part_endpoint,
676 part_number,
677 part_bytes = chunk.len(),
678 "Uploading multipart file part"
679 );
680
681 let resp = client
682 .put(&part_endpoint)
683 .bearer_auth(self.token.get())
684 .header("posemesh-client-id", self.client_id.as_str())
685 .header("Content-Type", "application/octet-stream")
686 .body(chunk)
687 .send()
688 .await
689 .map_err(|e| StorageError::Network(e.to_string()))?;
690
691 if !resp.status().is_success() {
692 let status = resp.status();
693 let err = resp
694 .text()
695 .await
696 .unwrap_or_else(|_| "Unknown error".to_string());
697 tracing::warn!(
698 target: "posemesh_compute_node::storage::client",
699 %status,
700 error = %err,
701 part_number,
702 "Multipart file part upload failed"
703 );
704 return Err(map_status(status));
705 }
706
707 let res: UploadPartResultV1 = resp
708 .json()
709 .await
710 .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
711
712 completed_parts.push(CompletedPartV1 {
713 part_number,
714 etag: res.etag,
715 });
716
717 part_number = part_number
718 .checked_add(1)
719 .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
720 }
721
722 if completed_parts.is_empty() {
723 return Err(StorageError::BadRequest);
724 }
725
726 let complete_endpoint = format!(
727 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
728 base, domain_id, upload_id
729 );
730 tracing::debug!(
731 target: "posemesh_compute_node::storage::client",
732 method = "POST",
733 url = %complete_endpoint,
734 parts = completed_parts.len(),
735 "Completing multipart file upload"
736 );
737 let resp = client
738 .post(&complete_endpoint)
739 .bearer_auth(self.token.get())
740 .header("posemesh-client-id", self.client_id.as_str())
741 .header("Content-Type", "application/json")
742 .json(&CompleteMultipartRequestV1 {
743 parts: completed_parts,
744 })
745 .send()
746 .await
747 .map_err(|e| StorageError::Network(e.to_string()))?;
748
749 if !resp.status().is_success() {
750 let status = resp.status();
751 let err = resp
752 .text()
753 .await
754 .unwrap_or_else(|_| "Unknown error".to_string());
755 tracing::warn!(
756 target: "posemesh_compute_node::storage::client",
757 %status,
758 error = %err,
759 "Multipart file completion failed"
760 );
761 return Err(map_status(status));
762 }
763
764 resp.json::<DomainDataMetadataV1>()
765 .await
766 .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
767 }
768 .await;
769
770 if upload_res.is_err() {
771 let abort_endpoint = format!(
772 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
773 base, domain_id, upload_id
774 );
775 let _ = client
776 .delete(&abort_endpoint)
777 .bearer_auth(self.token.get())
778 .header("posemesh-client-id", self.client_id.as_str())
779 .send()
780 .await;
781 }
782
783 upload_res
784 .map(|meta| Some(meta.id))
785 .map_err(UploadFileFallback::Error)
786 }
787
788 pub async fn find_artifact_id(
789 &self,
790 domain_id: &str,
791 name: &str,
792 data_type: &str,
793 ) -> std::result::Result<Option<String>, StorageError> {
794 let domain_id = domain_id.trim();
795 if domain_id.is_empty() {
796 return Err(StorageError::Other(
797 "missing domain_id for artifact lookup".into(),
798 ));
799 }
800
801 let query = posemesh_domain_http::domain_data::DownloadQuery {
802 ids: Vec::new(),
803 name: Some(name.to_string()),
804 data_type: Some(data_type.to_string()),
805 };
806
807 let base = self.base.as_str().trim_end_matches('/');
808 let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
809 tracing::debug!(
810 target: "posemesh_compute_node::storage::client",
811 method = "GET",
812 %url,
813 artifact_name = name,
814 artifact_type = data_type,
815 "Looking up existing domain artifact"
816 );
817
818 let results = posemesh_domain_http::domain_data::download_metadata_v1(
819 base,
820 self.client_id.as_str(),
821 self.token.get().as_str(),
822 domain_id,
823 &query,
824 )
825 .await;
826
827 let results = match results {
828 Ok(items) => items,
829 Err(err) => {
830 if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
831 if resp.status == reqwest::StatusCode::NOT_FOUND {
832 return Ok(None);
833 }
834 }
835 return Err(map_domain_error(err));
836 }
837 };
838
839 Ok(results
840 .into_iter()
841 .find(|item| item.name == name && item.data_type == data_type)
842 .map(|item| item.id))
843 }
844}
845
/// Estimates whether a single (non-multipart) upload of `data_len` bytes fits
/// within `request_max_bytes`.
///
/// The estimate models a one-part `multipart/form-data` body:
/// - a part header carrying `id` when `existing_id` is set, otherwise `name`
///   and `data-type`;
/// - the data itself, followed by a CRLF;
/// - the closing boundary line.
///
/// Sizes are computed with the placeholder boundary `boundary`.
/// NOTE(review): confirm the real upload uses a boundary no longer than this,
/// otherwise the estimate is low.
///
/// Returns `false` when the total cannot be represented (overflow) or exceeds
/// the limit.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    let boundary = "boundary";
    let header = match existing_id {
        Some(id) => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
            boundary, id
        ),
        None => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
            boundary, name, data_type
        ),
    };
    let closing = format!("--{}--\r\n", boundary);

    // Checked arithmetic: a huge `data_len` must report "does not fit" instead
    // of overflowing. Overflow panics in debug builds; in release it wraps, and
    // the old `as i64` cast could turn the total negative and report "fits".
    header
        .len()
        .checked_add(data_len)
        .and_then(|n| n.checked_add(2)) // CRLF terminating the part data
        .and_then(|n| n.checked_add(closing.len()))
        .and_then(|total| i64::try_from(total).ok())
        .map_or(false, |total| total <= request_max_bytes)
}
869
870fn map_status(status: reqwest::StatusCode) -> StorageError {
871 match status.as_u16() {
872 400 => StorageError::BadRequest,
873 401 => StorageError::Unauthorized,
874 404 => StorageError::NotFound,
875 409 => StorageError::Conflict,
876 n if (500..=599).contains(&n) => StorageError::Server(n),
877 other => StorageError::Other(format!("unexpected status: {}", other)),
878 }
879}
880
881fn is_unsupported_endpoint_status(status: reqwest::StatusCode) -> bool {
882 status == reqwest::StatusCode::NOT_FOUND
883 || status == reqwest::StatusCode::METHOD_NOT_ALLOWED
884 || status == reqwest::StatusCode::NOT_IMPLEMENTED
885}
886
/// Failure outcome of a multipart file upload; tells the caller whether to fall back.
enum UploadFileFallback {
    /// Multipart initiation answered 404, 405 or 501; the caller may retry
    /// with a single-request upload.
    UnsupportedEndpoint,
    /// Any other failure; the caller returns it as-is.
    Error(StorageError),
}
891
892fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
893 use posemesh_domain_http::errors::{AuthError, DomainError};
894
895 match err {
896 DomainError::AukiErrorResponse(resp) => map_status(resp.status),
897 DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
898 DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
899 other => StorageError::Other(other.to_string()),
900 }
901}
902
903fn env_client_id() -> String {
904 std::env::var("CLIENT_ID")
905 .ok()
906 .map(|v| v.trim().to_string())
907 .filter(|v| !v.is_empty())
908 .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
909}
910
911fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
912 if value.contains("://") {
913 Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
914 } else {
915 base.join(value)
916 .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
917 }
918}
919
920fn parse_download_target(
921 url: &Url,
922 fallback_domain_id: Option<&str>,
923) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
924 let segments: Vec<&str> = url
925 .path_segments()
926 .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
927 .unwrap_or_default();
928
929 let mut domain_id_from_path: Option<&str> = None;
930 let mut data_id_from_path: Option<&str> = None;
931
932 for idx in 0..segments.len() {
933 if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
934 domain_id_from_path = Some(segments[idx + 1]);
935 data_id_from_path = segments.get(idx + 3).copied();
936 break;
937 }
938 }
939
940 let domain_id = domain_id_from_path
941 .or(fallback_domain_id)
942 .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
943 .to_string();
944
945 let mut ids: Vec<String> = Vec::new();
946 let mut name: Option<String> = None;
947 let mut data_type: Option<String> = None;
948
949 for (key, value) in url.query_pairs() {
950 match key.as_ref() {
951 "ids" => ids.extend(
952 value
953 .split(',')
954 .map(|s| s.trim())
955 .filter(|s| !s.is_empty())
956 .map(|s| s.to_string()),
957 ),
958 "name" => {
959 if name.is_none() {
960 name = Some(value.to_string());
961 }
962 }
963 "data_type" => {
964 if data_type.is_none() {
965 data_type = Some(value.to_string());
966 }
967 }
968 _ => {}
969 }
970 }
971
972 if let Some(id) = data_id_from_path {
973 ids = vec![id.to_string()];
974 }
975
976 Ok((
977 domain_id,
978 posemesh_domain_http::domain_data::DownloadQuery {
979 ids,
980 name,
981 data_type,
982 },
983 ))
984}
985
/// Makes `value` safe to use as a single path component.
///
/// ASCII letters, digits, `-` and `_` are kept. Every other character,
/// including `.`, `/` and any non-ASCII character, becomes one `_`. An empty
/// input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    if out.is_empty() {
        out.push_str("part");
    }
    out
}
1005
1006fn extract_timestamp(name: &str) -> Option<String> {
1007 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
1008 .ok()
1009 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
1010}
1011
1012fn map_filename(data_type: &str, name: &str) -> String {
1013 format!(
1014 "{}.{}",
1015 sanitize_component(name),
1016 sanitize_component(data_type)
1017 )
1018}