1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::OnceLock;
11use std::time::Duration;
12use tokio::fs;
13use url::Url;
14use uuid::Uuid;
/// One domain data item that was downloaded and written to a local file.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Item id taken from the download metadata.
    pub id: Option<String>,
    /// Item name taken from the download metadata.
    pub name: Option<String>,
    /// Item data type taken from the download metadata.
    pub data_type: Option<String>,
    /// Domain id taken from the download metadata.
    pub domain_id: Option<String>,
    /// Full path of the file the item's bytes were written to.
    pub path: PathBuf,
    /// Per-download temporary root directory that contains `path`.
    pub root: PathBuf,
    /// `path` with the `root` prefix stripped (falls back to `path` itself
    /// if the prefix cannot be stripped).
    pub relative_path: PathBuf,
    /// Paths extracted from the item; the download code in this file always
    /// leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}
29
/// Parameters for uploading an in-memory artifact to a domain.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; trimmed before use and rejected when empty.
    pub domain_id: &'a str,
    /// Artifact name, used when a new item is created.
    pub name: &'a str,
    /// Artifact data type, used when a new item is created.
    pub data_type: &'a str,
    /// Logical path of the artifact; in this file it is only written to logs.
    pub logical_path: &'a str,
    /// Payload to upload.
    pub bytes: &'a [u8],
    /// When set, the existing item with this id is updated instead of a new
    /// one being created.
    pub existing_id: Option<&'a str>,
}
39
/// Parameters for uploading an artifact that lives in a file on disk.
#[derive(Debug)]
pub struct UploadFileRequest<'a> {
    /// Target domain; trimmed before use and rejected when empty.
    pub domain_id: &'a str,
    /// Artifact name sent with the upload.
    pub name: &'a str,
    /// Artifact data type sent with the upload.
    pub data_type: &'a str,
    /// Logical path of the artifact; in this file it is only written to logs.
    pub logical_path: &'a str,
    /// Local file whose contents are uploaded.
    pub path: &'a Path,
    /// When set, the id of an existing item is sent along with the upload.
    pub existing_id: Option<&'a str>,
}
49
/// Upload limits of a server, flattened from the `/api/v1/info` response.
#[derive(Debug, Clone)]
struct UploadInfoV1 {
    /// Copied from `upload.request_max_bytes`; compared against the estimated
    /// size of a single-request upload.
    request_max_bytes: i64,
    /// Copied from `upload.multipart.enabled`.
    multipart_enabled: bool,
}
55
/// JSON body returned by `GET {base}/api/v1/info` (only the fields read here).
#[derive(Debug, Deserialize, Clone)]
struct InfoResponseV1 {
    /// The `upload` object of the response.
    upload: InfoUploadV1,
}
60
/// The `upload` object of the info response.
#[derive(Debug, Deserialize, Clone)]
struct InfoUploadV1 {
    /// Request size limit reported by the server.
    request_max_bytes: i64,
    /// The nested `multipart` object.
    multipart: InfoMultipartV1,
}
66
/// The `upload.multipart` object of the info response.
#[derive(Debug, Deserialize, Clone)]
struct InfoMultipartV1 {
    /// Whether the server reports multipart uploads as enabled.
    enabled: bool,
}
71
/// JSON body posted to the `…/data/multipart?uploads` endpoint to start a
/// multipart upload.
#[derive(Debug, Serialize)]
struct InitiateMultipartRequestV1 {
    /// Artifact name.
    name: String,
    /// Artifact data type.
    data_type: String,
    /// Total payload size in bytes, when known.
    size: Option<i64>,
    /// Content type of the payload; callers here send
    /// `application/octet-stream`.
    content_type: Option<String>,
    /// Id of the existing item, if one was supplied by the caller.
    existing_id: Option<String>,
}
80
/// JSON body returned when a multipart upload is initiated.
#[derive(Debug, Deserialize)]
struct InitiateMultipartResponseV1 {
    /// Upload id, passed as the `uploadId` query parameter of later requests.
    upload_id: String,
    /// Part size chosen by the server; must convert to a non-zero `usize`.
    part_size: i64,
}
86
/// JSON body returned after a single part has been uploaded.
#[derive(Debug, Deserialize)]
struct UploadPartResultV1 {
    /// Tag of the stored part; echoed back in the completion request.
    etag: String,
}
91
/// One entry of the completion request: a part number and the etag the
/// server returned for it.
#[derive(Debug, Serialize)]
struct CompletedPartV1 {
    /// Part number; numbering starts at 1.
    part_number: i32,
    /// Etag returned by the part upload.
    etag: String,
}
97
/// JSON body posted to finish a multipart upload.
#[derive(Debug, Serialize)]
struct CompleteMultipartRequestV1 {
    /// All uploaded parts, in upload order.
    parts: Vec<CompletedPartV1>,
}
102
/// JSON body returned when a multipart upload completes (only the field
/// read here).
#[derive(Debug, Deserialize)]
struct DomainDataMetadataV1 {
    /// Id of the stored domain data item.
    id: String,
}
107
/// Cached result of one info lookup.
#[derive(Debug, Clone)]
struct InfoCacheEntryV1 {
    /// Cached upload info; `None` is cached when the info endpoint answered
    /// 404.
    value: Option<UploadInfoV1>,
    /// Unix time in seconds after which the entry is no longer used.
    expires_at: u64,
}
113
/// How long a cached info lookup stays valid, in seconds.
const INFO_CACHE_TTL_SECS: u64 = 60;
/// Process-wide cache of info lookups, keyed by the base URL string the
/// lookup was made with. Initialised lazily by `info_cache_v1`.
static INFO_CACHE_V1: OnceLock<parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>>> =
    OnceLock::new();
117
118fn info_cache_v1() -> &'static parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>> {
119 INFO_CACHE_V1.get_or_init(|| parking_lot::Mutex::new(HashMap::new()))
120}
121
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: treat it as the epoch itself.
        Err(_) => 0,
    }
}
128
/// Client for a domain server's data API: downloads domain data into local
/// temporary files and uploads artifacts, either in one request or as a
/// multipart upload.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL of the domain server; a trailing `/` is trimmed before
    /// endpoint paths are appended.
    pub base: Url,
    /// Access token source; `token.get()` is called for every request.
    pub token: TokenRef,
    /// Client id sent with requests (as the `posemesh-client-id` header on
    /// the requests built in this file). Set from `env_client_id()`.
    client_id: String,
}
136impl DomainClient {
137 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
138 let client_id = env_client_id();
139 Ok(Self {
140 base,
141 token,
142 client_id,
143 })
144 }
145
146 pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
147 let client_id = env_client_id();
148 Ok(Self {
149 base,
150 token,
151 client_id,
152 })
153 }
154
155 pub async fn download_uri(
158 &self,
159 uri: &str,
160 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
161 let resolved = resolve_domain_url(&self.base, uri)?;
162 let (domain_id, query) = parse_download_target(&resolved, None)?;
163 self.download_domain_data(&resolved, &domain_id, query)
164 .await
165 }
166
167 pub async fn download_cid(
171 &self,
172 domain_id: &str,
173 cid: &str,
174 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
175 let cid = cid.trim();
176 if cid.is_empty() {
177 return Err(StorageError::Other("empty cid".into()));
178 }
179
180 if cid.contains("://") || cid.starts_with('/') {
181 let resolved = resolve_domain_url(&self.base, cid)?;
182 let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
183 return self
184 .download_domain_data(&resolved, &domain_id, query)
185 .await;
186 }
187
188 let query = posemesh_domain_http::domain_data::DownloadQuery {
189 ids: vec![cid.to_string()],
190 name: None,
191 data_type: None,
192 };
193 self.download_domain_data(&self.base, domain_id, query)
194 .await
195 }
196
197 async fn download_domain_data(
198 &self,
199 url_for_log: &Url,
200 domain_id: &str,
201 query: posemesh_domain_http::domain_data::DownloadQuery,
202 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
203 let domain_id = domain_id.trim();
204 if domain_id.is_empty() {
205 return Err(StorageError::Other("missing domain_id for download".into()));
206 }
207
208 tracing::debug!(
209 target: "posemesh_compute_node::storage::client",
210 method = "GET",
211 %url_for_log,
212 domain_id = domain_id,
213 ids = ?query.ids,
214 name = ?query.name,
215 data_type = ?query.data_type,
216 "Downloading domain data"
217 );
218
219 let base = self.base.as_str().trim_end_matches('/');
220 let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
221 base,
222 self.client_id.as_str(),
223 self.token.get().as_str(),
224 domain_id,
225 &query,
226 )
227 .await
228 .map_err(map_domain_error)?;
229
230 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
231 fs::create_dir_all(&root)
232 .await
233 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
234 let datasets_root = root.join("datasets");
235 fs::create_dir_all(&datasets_root)
236 .await
237 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
238
239 let mut parts = Vec::new();
240
241 while let Some(item) = rx.next().await {
242 let domain_item = item.map_err(map_domain_error)?;
243 let name = domain_item.metadata.name.clone();
244 let data_type = domain_item.metadata.data_type.clone();
245
246 let scan_folder = extract_timestamp(&name)
247 .map(|ts| sanitize_component(&ts))
248 .unwrap_or_else(|| sanitize_component(&name));
249 let scan_dir = datasets_root.join(&scan_folder);
250 fs::create_dir_all(&scan_dir)
251 .await
252 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
253
254 let file_name = map_filename(&data_type, &name);
255 let file_path = scan_dir.join(&file_name);
256 if let Some(parent) = file_path.parent() {
257 fs::create_dir_all(parent)
258 .await
259 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
260 }
261 fs::write(&file_path, &domain_item.data)
262 .await
263 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
264
265 let extracted_paths = Vec::new();
266
267 let relative_path = file_path
268 .strip_prefix(&root)
269 .unwrap_or(&file_path)
270 .to_path_buf();
271
272 parts.push(DownloadedPart {
273 id: Some(domain_item.metadata.id),
274 name: Some(name),
275 data_type: Some(data_type),
276 domain_id: Some(domain_item.metadata.domain_id),
277 path: file_path,
278 root: root.clone(),
279 relative_path,
280 extracted_paths,
281 });
282 }
283
284 if parts.is_empty() {
285 return Err(StorageError::NotFound);
286 }
287
288 Ok(parts)
289 }
290
291 pub async fn upload_artifact(
292 &self,
293 request: UploadRequest<'_>,
294 ) -> std::result::Result<Option<String>, StorageError> {
295 let domain_id = request.domain_id.trim();
296 if domain_id.is_empty() {
297 return Err(StorageError::Other(
298 "missing domain_id for artifact upload".into(),
299 ));
300 }
301
302 let base = self.base.as_str().trim_end_matches('/');
303 if let Some(info) = get_upload_info_v1(base).await {
304 if info.multipart_enabled && info.request_max_bytes > 0 {
305 let fits_alone = fits_single_upload_request(
306 info.request_max_bytes,
307 request.name,
308 request.data_type,
309 request.existing_id,
310 request.bytes.len(),
311 );
312 if !fits_alone {
313 return self
314 .upload_artifact_v1_multipart(base, domain_id, request)
315 .await;
316 }
317 }
318 }
319
320 let action = if let Some(id) = request.existing_id {
321 posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
322 } else {
323 posemesh_domain_http::domain_data::DomainAction::Create {
324 name: request.name.to_string(),
325 data_type: request.data_type.to_string(),
326 }
327 };
328
329 let upload = posemesh_domain_http::domain_data::UploadDomainData {
330 action,
331 data: request.bytes.to_vec(),
332 };
333 let method = if request.existing_id.is_some() {
334 Method::PUT
335 } else {
336 Method::POST
337 };
338 tracing::debug!(
339 target: "posemesh_compute_node::storage::client",
340 method = %method,
341 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
342 logical_path = request.logical_path,
343 name = request.name,
344 data_type = request.data_type,
345 has_existing_id = request.existing_id.is_some(),
346 "Sending domain upload request"
347 );
348
349 let items = posemesh_domain_http::domain_data::upload_v1(
350 base,
351 self.token.get().as_str(),
352 domain_id,
353 vec![upload],
354 )
355 .await
356 .map_err(map_domain_error)?;
357
358 Ok(items.into_iter().next().map(|d| d.id))
359 }
360
361 pub async fn upload_artifact_file(
362 &self,
363 request: UploadFileRequest<'_>,
364 ) -> std::result::Result<Option<String>, StorageError> {
365 let domain_id = request.domain_id.trim();
366 if domain_id.is_empty() {
367 return Err(StorageError::Other(
368 "missing domain_id for artifact upload".into(),
369 ));
370 }
371
372 let base = self.base.as_str().trim_end_matches('/');
373
374 let mut file = fs::File::open(request.path)
375 .await
376 .map_err(|e| StorageError::Other(format!("open upload file: {}", e)))?;
377
378 let file_size = match file.metadata().await {
379 Ok(meta) => {
380 if meta.len() == 0 {
381 return Err(StorageError::BadRequest);
382 }
383 i64::try_from(meta.len()).ok()
384 }
385 Err(_) => None,
386 };
387
388 match self
389 .upload_artifact_v1_multipart_file(base, domain_id, &request, &mut file, file_size)
390 .await
391 {
392 Ok(v) => Ok(v),
393 Err(UploadFileFallback::UnsupportedEndpoint) => {
394 let bytes_owned = fs::read(request.path)
396 .await
397 .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
398 if bytes_owned.is_empty() {
399 return Err(StorageError::BadRequest);
400 }
401 self.upload_artifact(UploadRequest {
402 domain_id: request.domain_id,
403 name: request.name,
404 data_type: request.data_type,
405 logical_path: request.logical_path,
406 bytes: bytes_owned.as_slice(),
407 existing_id: request.existing_id,
408 })
409 .await
410 }
411 Err(UploadFileFallback::Error(e)) => Err(e),
412 }
413 }
414
415 async fn upload_artifact_v1_multipart(
416 &self,
417 base: &str,
418 domain_id: &str,
419 request: UploadRequest<'_>,
420 ) -> std::result::Result<Option<String>, StorageError> {
421 if request.bytes.is_empty() {
422 return Err(StorageError::BadRequest);
423 }
424
425 let client = reqwest::Client::new();
426 let initiate_endpoint = format!(
427 "{}/api/v1/domains/{}/data/multipart?uploads",
428 base, domain_id
429 );
430
431 let init_req = InitiateMultipartRequestV1 {
432 name: request.name.to_string(),
433 data_type: request.data_type.to_string(),
434 size: Some(request.bytes.len() as i64),
435 content_type: Some("application/octet-stream".to_string()),
436 existing_id: request.existing_id.map(|id| id.to_string()),
437 };
438
439 tracing::debug!(
440 target: "posemesh_compute_node::storage::client",
441 method = "POST",
442 url = %initiate_endpoint,
443 logical_path = request.logical_path,
444 name = request.name,
445 data_type = request.data_type,
446 has_existing_id = request.existing_id.is_some(),
447 "Initiating multipart upload"
448 );
449
450 let init_resp = client
451 .post(&initiate_endpoint)
452 .bearer_auth(self.token.get())
453 .header("posemesh-client-id", self.client_id.as_str())
454 .header("Content-Type", "application/json")
455 .json(&init_req)
456 .send()
457 .await
458 .map_err(|e| StorageError::Network(e.to_string()))?;
459
460 if !init_resp.status().is_success() {
461 let status = init_resp.status();
462 let err = init_resp
463 .text()
464 .await
465 .unwrap_or_else(|_| "Unknown error".to_string());
466 tracing::warn!(
467 target: "posemesh_compute_node::storage::client",
468 %status,
469 error = %err,
470 "Multipart initiation failed"
471 );
472 return Err(map_status(status));
473 }
474
475 let init: InitiateMultipartResponseV1 = init_resp
476 .json()
477 .await
478 .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
479
480 let part_size = usize::try_from(init.part_size)
481 .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
482 if part_size == 0 {
483 return Err(StorageError::Other("invalid multipart part_size".into()));
484 }
485
486 let upload_id = init.upload_id;
487 let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
488
489 let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
490 let mut offset: usize = 0;
491 let mut part_number: i32 = 1;
492 while offset < request.bytes.len() {
493 let end = std::cmp::min(offset + part_size, request.bytes.len());
494 let chunk = request.bytes[offset..end].to_vec();
495
496 let part_endpoint = format!(
497 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
498 base, domain_id, upload_id, part_number
499 );
500 tracing::debug!(
501 target: "posemesh_compute_node::storage::client",
502 method = "PUT",
503 url = %part_endpoint,
504 part_number,
505 part_bytes = chunk.len(),
506 "Uploading multipart part"
507 );
508
509 let resp = client
510 .put(&part_endpoint)
511 .bearer_auth(self.token.get())
512 .header("posemesh-client-id", self.client_id.as_str())
513 .header("Content-Type", "application/octet-stream")
514 .body(chunk)
515 .send()
516 .await
517 .map_err(|e| StorageError::Network(e.to_string()))?;
518
519 if !resp.status().is_success() {
520 let status = resp.status();
521 let err = resp
522 .text()
523 .await
524 .unwrap_or_else(|_| "Unknown error".to_string());
525 tracing::warn!(
526 target: "posemesh_compute_node::storage::client",
527 %status,
528 error = %err,
529 part_number,
530 "Multipart part upload failed"
531 );
532 return Err(map_status(status));
533 }
534
535 let res: UploadPartResultV1 = resp
536 .json()
537 .await
538 .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
539
540 completed_parts.push(CompletedPartV1 {
541 part_number,
542 etag: res.etag,
543 });
544
545 offset = end;
546 part_number = part_number
547 .checked_add(1)
548 .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
549 }
550
551 let complete_endpoint = format!(
552 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
553 base, domain_id, upload_id
554 );
555 tracing::debug!(
556 target: "posemesh_compute_node::storage::client",
557 method = "POST",
558 url = %complete_endpoint,
559 parts = completed_parts.len(),
560 "Completing multipart upload"
561 );
562 let resp = client
563 .post(&complete_endpoint)
564 .bearer_auth(self.token.get())
565 .header("posemesh-client-id", self.client_id.as_str())
566 .header("Content-Type", "application/json")
567 .json(&CompleteMultipartRequestV1 {
568 parts: completed_parts,
569 })
570 .send()
571 .await
572 .map_err(|e| StorageError::Network(e.to_string()))?;
573
574 if !resp.status().is_success() {
575 let status = resp.status();
576 let err = resp
577 .text()
578 .await
579 .unwrap_or_else(|_| "Unknown error".to_string());
580 tracing::warn!(
581 target: "posemesh_compute_node::storage::client",
582 %status,
583 error = %err,
584 "Multipart completion failed"
585 );
586 return Err(map_status(status));
587 }
588
589 resp.json::<DomainDataMetadataV1>()
590 .await
591 .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
592 }
593 .await;
594
595 if upload_res.is_err() {
596 let abort_endpoint = format!(
597 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
598 base, domain_id, upload_id
599 );
600 let _ = client
601 .delete(&abort_endpoint)
602 .bearer_auth(self.token.get())
603 .header("posemesh-client-id", self.client_id.as_str())
604 .send()
605 .await;
606 }
607
608 upload_res.map(|meta| Some(meta.id))
609 }
610
611 async fn upload_artifact_v1_multipart_file(
612 &self,
613 base: &str,
614 domain_id: &str,
615 request: &UploadFileRequest<'_>,
616 file: &mut fs::File,
617 file_size: Option<i64>,
618 ) -> std::result::Result<Option<String>, UploadFileFallback> {
619 use tokio::io::AsyncReadExt;
620
621 let client = reqwest::Client::new();
622 let initiate_endpoint = format!(
623 "{}/api/v1/domains/{}/data/multipart?uploads",
624 base, domain_id
625 );
626
627 let init_req = InitiateMultipartRequestV1 {
628 name: request.name.to_string(),
629 data_type: request.data_type.to_string(),
630 size: file_size,
631 content_type: Some("application/octet-stream".to_string()),
632 existing_id: request.existing_id.map(|id| id.to_string()),
633 };
634
635 tracing::debug!(
636 target: "posemesh_compute_node::storage::client",
637 method = "POST",
638 url = %initiate_endpoint,
639 logical_path = request.logical_path,
640 name = request.name,
641 data_type = request.data_type,
642 has_existing_id = request.existing_id.is_some(),
643 "Initiating multipart file upload"
644 );
645
646 let init_resp = client
647 .post(&initiate_endpoint)
648 .bearer_auth(self.token.get())
649 .header("posemesh-client-id", self.client_id.as_str())
650 .header("Content-Type", "application/json")
651 .json(&init_req)
652 .send()
653 .await
654 .map_err(|e| UploadFileFallback::Error(StorageError::Network(e.to_string())))?;
655
656 if !init_resp.status().is_success() {
657 let status = init_resp.status();
658 let err = init_resp
659 .text()
660 .await
661 .unwrap_or_else(|_| "Unknown error".to_string());
662 tracing::warn!(
663 target: "posemesh_compute_node::storage::client",
664 %status,
665 error = %err,
666 "Multipart file initiation failed"
667 );
668 if is_unsupported_endpoint_status(status) {
669 return Err(UploadFileFallback::UnsupportedEndpoint);
670 }
671 return Err(UploadFileFallback::Error(map_status(status)));
672 }
673
674 let init: InitiateMultipartResponseV1 = init_resp.json().await.map_err(|e| {
675 UploadFileFallback::Error(StorageError::Other(format!(
676 "invalid initiate response: {}",
677 e
678 )))
679 })?;
680
681 let part_size = usize::try_from(init.part_size).map_err(|_| {
682 UploadFileFallback::Error(StorageError::Other("invalid multipart part_size".into()))
683 })?;
684 if part_size == 0 {
685 return Err(UploadFileFallback::Error(StorageError::Other(
686 "invalid multipart part_size".into(),
687 )));
688 }
689
690 let upload_id = init.upload_id;
691 let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
692
693 let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
694 let mut part_number: i32 = 1;
695 loop {
696 let mut chunk = vec![0u8; part_size];
697 let mut read = 0usize;
698 while read < part_size {
699 let n = file
700 .read(&mut chunk[read..])
701 .await
702 .map_err(|e| StorageError::Other(format!("read upload file: {}", e)))?;
703 if n == 0 {
704 break;
705 }
706 read += n;
707 }
708 if read == 0 {
709 break;
710 }
711 chunk.truncate(read);
712
713 let part_endpoint = format!(
714 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
715 base, domain_id, upload_id, part_number
716 );
717 tracing::debug!(
718 target: "posemesh_compute_node::storage::client",
719 method = "PUT",
720 url = %part_endpoint,
721 part_number,
722 part_bytes = chunk.len(),
723 "Uploading multipart file part"
724 );
725
726 let resp = client
727 .put(&part_endpoint)
728 .bearer_auth(self.token.get())
729 .header("posemesh-client-id", self.client_id.as_str())
730 .header("Content-Type", "application/octet-stream")
731 .body(chunk)
732 .send()
733 .await
734 .map_err(|e| StorageError::Network(e.to_string()))?;
735
736 if !resp.status().is_success() {
737 let status = resp.status();
738 let err = resp
739 .text()
740 .await
741 .unwrap_or_else(|_| "Unknown error".to_string());
742 tracing::warn!(
743 target: "posemesh_compute_node::storage::client",
744 %status,
745 error = %err,
746 part_number,
747 "Multipart file part upload failed"
748 );
749 return Err(map_status(status));
750 }
751
752 let res: UploadPartResultV1 = resp
753 .json()
754 .await
755 .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
756
757 completed_parts.push(CompletedPartV1 {
758 part_number,
759 etag: res.etag,
760 });
761
762 part_number = part_number
763 .checked_add(1)
764 .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
765 }
766
767 if completed_parts.is_empty() {
768 return Err(StorageError::BadRequest);
769 }
770
771 let complete_endpoint = format!(
772 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
773 base, domain_id, upload_id
774 );
775 tracing::debug!(
776 target: "posemesh_compute_node::storage::client",
777 method = "POST",
778 url = %complete_endpoint,
779 parts = completed_parts.len(),
780 "Completing multipart file upload"
781 );
782 let resp = client
783 .post(&complete_endpoint)
784 .bearer_auth(self.token.get())
785 .header("posemesh-client-id", self.client_id.as_str())
786 .header("Content-Type", "application/json")
787 .json(&CompleteMultipartRequestV1 {
788 parts: completed_parts,
789 })
790 .send()
791 .await
792 .map_err(|e| StorageError::Network(e.to_string()))?;
793
794 if !resp.status().is_success() {
795 let status = resp.status();
796 let err = resp
797 .text()
798 .await
799 .unwrap_or_else(|_| "Unknown error".to_string());
800 tracing::warn!(
801 target: "posemesh_compute_node::storage::client",
802 %status,
803 error = %err,
804 "Multipart file completion failed"
805 );
806 return Err(map_status(status));
807 }
808
809 resp.json::<DomainDataMetadataV1>()
810 .await
811 .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
812 }
813 .await;
814
815 if upload_res.is_err() {
816 let abort_endpoint = format!(
817 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
818 base, domain_id, upload_id
819 );
820 let _ = client
821 .delete(&abort_endpoint)
822 .bearer_auth(self.token.get())
823 .header("posemesh-client-id", self.client_id.as_str())
824 .send()
825 .await;
826 }
827
828 upload_res
829 .map(|meta| Some(meta.id))
830 .map_err(UploadFileFallback::Error)
831 }
832
833 pub async fn find_artifact_id(
834 &self,
835 domain_id: &str,
836 name: &str,
837 data_type: &str,
838 ) -> std::result::Result<Option<String>, StorageError> {
839 let domain_id = domain_id.trim();
840 if domain_id.is_empty() {
841 return Err(StorageError::Other(
842 "missing domain_id for artifact lookup".into(),
843 ));
844 }
845
846 let query = posemesh_domain_http::domain_data::DownloadQuery {
847 ids: Vec::new(),
848 name: Some(name.to_string()),
849 data_type: Some(data_type.to_string()),
850 };
851
852 let base = self.base.as_str().trim_end_matches('/');
853 let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
854 tracing::debug!(
855 target: "posemesh_compute_node::storage::client",
856 method = "GET",
857 %url,
858 artifact_name = name,
859 artifact_type = data_type,
860 "Looking up existing domain artifact"
861 );
862
863 let results = posemesh_domain_http::domain_data::download_metadata_v1(
864 base,
865 self.client_id.as_str(),
866 self.token.get().as_str(),
867 domain_id,
868 &query,
869 )
870 .await;
871
872 let results = match results {
873 Ok(items) => items,
874 Err(err) => {
875 if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
876 if resp.status == reqwest::StatusCode::NOT_FOUND {
877 return Ok(None);
878 }
879 }
880 return Err(map_domain_error(err));
881 }
882 };
883
884 Ok(results
885 .into_iter()
886 .find(|item| item.name == name && item.data_type == data_type)
887 .map(|item| item.id))
888 }
889}
890
891async fn fetch_info_v1(base: &str) -> Result<Option<UploadInfoV1>, ()> {
892 let resp = reqwest::Client::new()
893 .get(format!("{}/api/v1/info", base))
894 .send()
895 .await
896 .map_err(|_| ())?;
897
898 if resp.status() == reqwest::StatusCode::NOT_FOUND {
899 return Ok(None);
900 }
901 if !resp.status().is_success() {
902 return Err(());
903 }
904
905 let info = resp.json::<InfoResponseV1>().await.map_err(|_| ())?;
906 Ok(Some(UploadInfoV1 {
907 request_max_bytes: info.upload.request_max_bytes,
908 multipart_enabled: info.upload.multipart.enabled,
909 }))
910}
911
912async fn get_upload_info_v1(base: &str) -> Option<UploadInfoV1> {
913 let now = now_unix_secs();
914 {
915 let cache = info_cache_v1().lock();
916 if let Some(entry) = cache.get(base) {
917 if entry.expires_at > now {
918 return entry.value.clone();
919 }
920 }
921 }
922
923 let fetched = match fetch_info_v1(base).await {
924 Ok(v) => v,
925 Err(_) => return None,
926 };
927
928 let mut cache = info_cache_v1().lock();
929 cache.retain(|_, entry| entry.expires_at > now);
930 cache.insert(
931 base.to_string(),
932 InfoCacheEntryV1 {
933 value: fetched.clone(),
934 expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
935 },
936 );
937 fetched
938}
939
/// Estimates whether a single-request upload stays within
/// `request_max_bytes`.
///
/// The estimate is the byte length of one multipart form part (boundary
/// line, content headers, payload, trailing CRLF) plus the closing boundary,
/// computed with the placeholder boundary `"boundary"`. When `existing_id`
/// is set the part header carries the id; otherwise it carries `name` and
/// `data_type`.
///
/// Returns `false` when the estimated size exceeds `request_max_bytes`,
/// including when the size overflows `usize` or does not fit in an `i64`;
/// such a request can never fit.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    let boundary = "boundary";
    let header = if let Some(id) = existing_id {
        format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
            boundary, id
        )
    } else {
        format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
            boundary, name, data_type
        )
    };
    let closing = format!("--{}--\r\n", boundary);

    // header + payload + CRLF after the payload + closing boundary, with
    // every step checked so a huge `data_len` cannot wrap around.
    let total = header
        .len()
        .checked_add(data_len)
        .and_then(|len| len.checked_add(2))
        .and_then(|len| len.checked_add(closing.len()))
        .and_then(|len| i64::try_from(len).ok());

    match total {
        Some(total) => total <= request_max_bytes,
        None => false,
    }
}
963
964fn map_status(status: reqwest::StatusCode) -> StorageError {
965 match status.as_u16() {
966 400 => StorageError::BadRequest,
967 401 => StorageError::Unauthorized,
968 404 => StorageError::NotFound,
969 409 => StorageError::Conflict,
970 n if (500..=599).contains(&n) => StorageError::Server(n),
971 other => StorageError::Other(format!("unexpected status: {}", other)),
972 }
973}
974
975fn is_unsupported_endpoint_status(status: reqwest::StatusCode) -> bool {
976 status == reqwest::StatusCode::NOT_FOUND
977 || status == reqwest::StatusCode::METHOD_NOT_ALLOWED
978 || status == reqwest::StatusCode::NOT_IMPLEMENTED
979}
980
/// Failure modes of the multipart file upload, as seen by its caller.
enum UploadFileFallback {
    /// Initiation was answered with a status that marks the multipart
    /// endpoint as unavailable; the caller falls back to an in-memory upload.
    UnsupportedEndpoint,
    /// Any other failure; returned to the caller as-is.
    Error(StorageError),
}
985
986fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
987 use posemesh_domain_http::errors::{AuthError, DomainError};
988
989 match err {
990 DomainError::AukiErrorResponse(resp) => map_status(resp.status),
991 DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
992 DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
993 other => StorageError::Other(other.to_string()),
994 }
995}
996
997fn env_client_id() -> String {
998 std::env::var("CLIENT_ID")
999 .ok()
1000 .map(|v| v.trim().to_string())
1001 .filter(|v| !v.is_empty())
1002 .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
1003}
1004
1005fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
1006 if value.contains("://") {
1007 Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
1008 } else {
1009 base.join(value)
1010 .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
1011 }
1012}
1013
1014fn parse_download_target(
1015 url: &Url,
1016 fallback_domain_id: Option<&str>,
1017) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
1018 let segments: Vec<&str> = url
1019 .path_segments()
1020 .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
1021 .unwrap_or_default();
1022
1023 let mut domain_id_from_path: Option<&str> = None;
1024 let mut data_id_from_path: Option<&str> = None;
1025
1026 for idx in 0..segments.len() {
1027 if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
1028 domain_id_from_path = Some(segments[idx + 1]);
1029 data_id_from_path = segments.get(idx + 3).copied();
1030 break;
1031 }
1032 }
1033
1034 let domain_id = domain_id_from_path
1035 .or(fallback_domain_id)
1036 .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
1037 .to_string();
1038
1039 let mut ids: Vec<String> = Vec::new();
1040 let mut name: Option<String> = None;
1041 let mut data_type: Option<String> = None;
1042
1043 for (key, value) in url.query_pairs() {
1044 match key.as_ref() {
1045 "ids" => ids.extend(
1046 value
1047 .split(',')
1048 .map(|s| s.trim())
1049 .filter(|s| !s.is_empty())
1050 .map(|s| s.to_string()),
1051 ),
1052 "name" => {
1053 if name.is_none() {
1054 name = Some(value.to_string());
1055 }
1056 }
1057 "data_type" => {
1058 if data_type.is_none() {
1059 data_type = Some(value.to_string());
1060 }
1061 }
1062 _ => {}
1063 }
1064 }
1065
1066 if let Some(id) = data_id_from_path {
1067 ids = vec![id.to_string()];
1068 }
1069
1070 Ok((
1071 domain_id,
1072 posemesh_domain_http::domain_data::DownloadQuery {
1073 ids,
1074 name,
1075 data_type,
1076 },
1077 ))
1078}
1079
/// Makes `value` safe to use as a single path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character
/// (including `.`, `/` and all non-ASCII characters) is replaced by `_`.
/// An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    // Each input character produces exactly one output character, so the
    // result is empty only when the input is.
    if value.is_empty() {
        return "part".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
1099
1100fn extract_timestamp(name: &str) -> Option<String> {
1101 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
1102 .ok()
1103 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
1104}
1105
1106fn map_filename(data_type: &str, name: &str) -> String {
1107 format!(
1108 "{}.{}",
1109 sanitize_component(name),
1110 sanitize_component(data_type)
1111 )
1112}