1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::OnceLock;
11use std::time::Duration;
12use tokio::fs;
13use url::Url;
14use uuid::Uuid;
/// One domain-data item that a `DomainClient` download wrote to local disk.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Identifier taken from the item's metadata.
    pub id: Option<String>,
    /// Name taken from the item's metadata.
    pub name: Option<String>,
    /// Data type taken from the item's metadata.
    pub data_type: Option<String>,
    /// Domain identifier taken from the item's metadata.
    pub domain_id: Option<String>,
    /// Location of the file that holds the item's bytes.
    pub path: PathBuf,
    /// Temporary directory created for the download call that produced this part.
    pub root: PathBuf,
    /// `path` with the `root` prefix stripped (falls back to `path` if stripping fails).
    pub relative_path: PathBuf,
    /// Extracted paths for this part; the downloader in this file always leaves it empty.
    pub extracted_paths: Vec<PathBuf>,
}
29
/// Borrowed description of a single artifact upload.
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Target domain; trimmed before use and rejected when empty.
    pub domain_id: &'a str,
    /// Artifact name, used when a new item is created.
    pub name: &'a str,
    /// Artifact data type, used when a new item is created.
    pub data_type: &'a str,
    /// Only emitted in debug logs by the code in this file.
    pub logical_path: &'a str,
    /// Payload to upload.
    pub bytes: &'a [u8],
    /// When `Some`, the upload updates that existing item instead of creating one.
    pub existing_id: Option<&'a str>,
}
39
/// Upload capabilities extracted from the `/api/v1/info` response.
#[derive(Debug, Clone)]
struct UploadInfoV1 {
    /// Copied from `upload.request_max_bytes`; compared against the estimated request size.
    request_max_bytes: i64,
    /// Copied from `upload.multipart.enabled`.
    multipart_enabled: bool,
}
45
/// JSON body deserialized from `GET {base}/api/v1/info`; only the `upload` section is read.
#[derive(Debug, Deserialize, Clone)]
struct InfoResponseV1 {
    /// Upload-related settings.
    upload: InfoUploadV1,
}
50
/// The `upload` object of the info response.
#[derive(Debug, Deserialize, Clone)]
struct InfoUploadV1 {
    /// Request size limit reported by the server (presumably in bytes, per the field name).
    request_max_bytes: i64,
    /// Multipart upload settings.
    multipart: InfoMultipartV1,
}
56
/// The `upload.multipart` object of the info response.
#[derive(Debug, Deserialize, Clone)]
struct InfoMultipartV1 {
    /// Whether the server reports multipart upload as enabled.
    enabled: bool,
}
61
/// JSON body sent when initiating a multipart upload.
#[derive(Debug, Serialize)]
struct InitiateMultipartRequestV1 {
    /// Artifact name.
    name: String,
    /// Artifact data type.
    data_type: String,
    /// Total payload length; the caller in this file always sets it.
    size: Option<i64>,
    /// Payload content type; the caller in this file sends `application/octet-stream`.
    content_type: Option<String>,
    /// Identifier of the item to overwrite, when updating.
    existing_id: Option<String>,
}
70
/// JSON body returned by the multipart initiation request.
#[derive(Debug, Deserialize)]
struct InitiateMultipartResponseV1 {
    /// Identifier echoed back as the `uploadId` query parameter on later requests.
    upload_id: String,
    /// Chunk size the payload is split into; must convert to a non-zero `usize`.
    part_size: i64,
}
76
/// JSON body returned after uploading one part.
#[derive(Debug, Deserialize)]
struct UploadPartResultV1 {
    /// Tag for the uploaded part; sent back in the completion request.
    etag: String,
}
81
/// One entry of the completion request: a part number paired with its etag.
#[derive(Debug, Serialize)]
struct CompletedPartV1 {
    /// Part index; the uploader in this file starts at 1.
    part_number: i32,
    /// Etag returned by the server for that part.
    etag: String,
}
87
/// JSON body sent to complete a multipart upload.
#[derive(Debug, Serialize)]
struct CompleteMultipartRequestV1 {
    /// Uploaded parts, in the order they were sent.
    parts: Vec<CompletedPartV1>,
}
92
/// Subset of the multipart completion response that this file reads.
#[derive(Debug, Deserialize)]
struct DomainDataMetadataV1 {
    /// Identifier of the stored item; returned to the upload caller.
    id: String,
}
97
/// Cached result of an info lookup for one base URL.
#[derive(Debug, Clone)]
struct InfoCacheEntryV1 {
    /// Fetched info; `None` records that the info endpoint answered 404.
    value: Option<UploadInfoV1>,
    /// Unix time in seconds after which the entry is ignored and evicted.
    expires_at: u64,
}
103
/// Seconds a fetched upload-info entry stays valid in the cache.
const INFO_CACHE_TTL_SECS: u64 = 60;
/// Process-wide upload-info cache keyed by base URL string; initialised lazily via `OnceLock`.
static INFO_CACHE_V1: OnceLock<parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>>> =
    OnceLock::new();
107
108fn info_cache_v1() -> &'static parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>> {
109 INFO_CACHE_V1.get_or_init(|| parking_lot::Mutex::new(HashMap::new()))
110}
111
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_unix_secs() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
118
/// Client for downloading from and uploading to a domain server.
#[derive(Clone)]
pub struct DomainClient {
    /// Base URL of the domain server; trailing slashes are trimmed before endpoints are built.
    pub base: Url,
    /// Source of the bearer token; read again for every request.
    pub token: TokenRef,
    /// Client identifier passed to the domain HTTP helpers and sent as the
    /// `posemesh-client-id` header on multipart requests.
    client_id: String,
}
126impl DomainClient {
127 pub fn new(base: Url, token: TokenRef) -> Result<Self> {
128 let client_id = env_client_id();
129 Ok(Self {
130 base,
131 token,
132 client_id,
133 })
134 }
135
136 pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
137 let client_id = env_client_id();
138 Ok(Self {
139 base,
140 token,
141 client_id,
142 })
143 }
144
145 pub async fn download_uri(
148 &self,
149 uri: &str,
150 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
151 let resolved = resolve_domain_url(&self.base, uri)?;
152 let (domain_id, query) = parse_download_target(&resolved, None)?;
153 self.download_domain_data(&resolved, &domain_id, query)
154 .await
155 }
156
157 pub async fn download_cid(
161 &self,
162 domain_id: &str,
163 cid: &str,
164 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
165 let cid = cid.trim();
166 if cid.is_empty() {
167 return Err(StorageError::Other("empty cid".into()));
168 }
169
170 if cid.contains("://") || cid.starts_with('/') {
171 let resolved = resolve_domain_url(&self.base, cid)?;
172 let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
173 return self
174 .download_domain_data(&resolved, &domain_id, query)
175 .await;
176 }
177
178 let query = posemesh_domain_http::domain_data::DownloadQuery {
179 ids: vec![cid.to_string()],
180 name: None,
181 data_type: None,
182 };
183 self.download_domain_data(&self.base, domain_id, query)
184 .await
185 }
186
187 async fn download_domain_data(
188 &self,
189 url_for_log: &Url,
190 domain_id: &str,
191 query: posemesh_domain_http::domain_data::DownloadQuery,
192 ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
193 let domain_id = domain_id.trim();
194 if domain_id.is_empty() {
195 return Err(StorageError::Other("missing domain_id for download".into()));
196 }
197
198 tracing::debug!(
199 target: "posemesh_compute_node::storage::client",
200 method = "GET",
201 %url_for_log,
202 domain_id = domain_id,
203 ids = ?query.ids,
204 name = ?query.name,
205 data_type = ?query.data_type,
206 "Downloading domain data"
207 );
208
209 let base = self.base.as_str().trim_end_matches('/');
210 let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
211 base,
212 self.client_id.as_str(),
213 self.token.get().as_str(),
214 domain_id,
215 &query,
216 )
217 .await
218 .map_err(map_domain_error)?;
219
220 let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
221 fs::create_dir_all(&root)
222 .await
223 .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
224 let datasets_root = root.join("datasets");
225 fs::create_dir_all(&datasets_root)
226 .await
227 .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
228
229 let mut parts = Vec::new();
230
231 while let Some(item) = rx.next().await {
232 let domain_item = item.map_err(map_domain_error)?;
233 let name = domain_item.metadata.name.clone();
234 let data_type = domain_item.metadata.data_type.clone();
235
236 let scan_folder = extract_timestamp(&name)
237 .map(|ts| sanitize_component(&ts))
238 .unwrap_or_else(|| sanitize_component(&name));
239 let scan_dir = datasets_root.join(&scan_folder);
240 fs::create_dir_all(&scan_dir)
241 .await
242 .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
243
244 let file_name = map_filename(&data_type, &name);
245 let file_path = scan_dir.join(&file_name);
246 if let Some(parent) = file_path.parent() {
247 fs::create_dir_all(parent)
248 .await
249 .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
250 }
251 fs::write(&file_path, &domain_item.data)
252 .await
253 .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
254
255 let extracted_paths = Vec::new();
256
257 let relative_path = file_path
258 .strip_prefix(&root)
259 .unwrap_or(&file_path)
260 .to_path_buf();
261
262 parts.push(DownloadedPart {
263 id: Some(domain_item.metadata.id),
264 name: Some(name),
265 data_type: Some(data_type),
266 domain_id: Some(domain_item.metadata.domain_id),
267 path: file_path,
268 root: root.clone(),
269 relative_path,
270 extracted_paths,
271 });
272 }
273
274 if parts.is_empty() {
275 return Err(StorageError::NotFound);
276 }
277
278 Ok(parts)
279 }
280
281 pub async fn upload_artifact(
282 &self,
283 request: UploadRequest<'_>,
284 ) -> std::result::Result<Option<String>, StorageError> {
285 let domain_id = request.domain_id.trim();
286 if domain_id.is_empty() {
287 return Err(StorageError::Other(
288 "missing domain_id for artifact upload".into(),
289 ));
290 }
291
292 let base = self.base.as_str().trim_end_matches('/');
293 if let Some(info) = get_upload_info_v1(base).await {
294 if info.multipart_enabled && info.request_max_bytes > 0 {
295 let fits_alone = fits_single_upload_request(
296 info.request_max_bytes,
297 request.name,
298 request.data_type,
299 request.existing_id,
300 request.bytes.len(),
301 );
302 if !fits_alone {
303 return self
304 .upload_artifact_v1_multipart(base, domain_id, request)
305 .await;
306 }
307 }
308 }
309
310 let action = if let Some(id) = request.existing_id {
311 posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
312 } else {
313 posemesh_domain_http::domain_data::DomainAction::Create {
314 name: request.name.to_string(),
315 data_type: request.data_type.to_string(),
316 }
317 };
318
319 let upload = posemesh_domain_http::domain_data::UploadDomainData {
320 action,
321 data: request.bytes.to_vec(),
322 };
323 let method = if request.existing_id.is_some() {
324 Method::PUT
325 } else {
326 Method::POST
327 };
328 tracing::debug!(
329 target: "posemesh_compute_node::storage::client",
330 method = %method,
331 url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
332 logical_path = request.logical_path,
333 name = request.name,
334 data_type = request.data_type,
335 has_existing_id = request.existing_id.is_some(),
336 "Sending domain upload request"
337 );
338
339 let items = posemesh_domain_http::domain_data::upload_v1(
340 base,
341 self.token.get().as_str(),
342 domain_id,
343 vec![upload],
344 )
345 .await
346 .map_err(map_domain_error)?;
347
348 Ok(items.into_iter().next().map(|d| d.id))
349 }
350
351 async fn upload_artifact_v1_multipart(
352 &self,
353 base: &str,
354 domain_id: &str,
355 request: UploadRequest<'_>,
356 ) -> std::result::Result<Option<String>, StorageError> {
357 if request.bytes.is_empty() {
358 return Err(StorageError::BadRequest);
359 }
360
361 let client = reqwest::Client::new();
362 let initiate_endpoint = format!(
363 "{}/api/v1/domains/{}/data/multipart?uploads",
364 base, domain_id
365 );
366
367 let init_req = InitiateMultipartRequestV1 {
368 name: request.name.to_string(),
369 data_type: request.data_type.to_string(),
370 size: Some(request.bytes.len() as i64),
371 content_type: Some("application/octet-stream".to_string()),
372 existing_id: request.existing_id.map(|id| id.to_string()),
373 };
374
375 tracing::debug!(
376 target: "posemesh_compute_node::storage::client",
377 method = "POST",
378 url = %initiate_endpoint,
379 logical_path = request.logical_path,
380 name = request.name,
381 data_type = request.data_type,
382 has_existing_id = request.existing_id.is_some(),
383 "Initiating multipart upload"
384 );
385
386 let init_resp = client
387 .post(&initiate_endpoint)
388 .bearer_auth(self.token.get())
389 .header("posemesh-client-id", self.client_id.as_str())
390 .header("Content-Type", "application/json")
391 .json(&init_req)
392 .send()
393 .await
394 .map_err(|e| StorageError::Network(e.to_string()))?;
395
396 if !init_resp.status().is_success() {
397 let status = init_resp.status();
398 let err = init_resp
399 .text()
400 .await
401 .unwrap_or_else(|_| "Unknown error".to_string());
402 tracing::warn!(
403 target: "posemesh_compute_node::storage::client",
404 %status,
405 error = %err,
406 "Multipart initiation failed"
407 );
408 return Err(map_status(status));
409 }
410
411 let init: InitiateMultipartResponseV1 = init_resp
412 .json()
413 .await
414 .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
415
416 let part_size = usize::try_from(init.part_size)
417 .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
418 if part_size == 0 {
419 return Err(StorageError::Other("invalid multipart part_size".into()));
420 }
421
422 let upload_id = init.upload_id;
423 let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
424
425 let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
426 let mut offset: usize = 0;
427 let mut part_number: i32 = 1;
428 while offset < request.bytes.len() {
429 let end = std::cmp::min(offset + part_size, request.bytes.len());
430 let chunk = request.bytes[offset..end].to_vec();
431
432 let part_endpoint = format!(
433 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
434 base, domain_id, upload_id, part_number
435 );
436 tracing::debug!(
437 target: "posemesh_compute_node::storage::client",
438 method = "PUT",
439 url = %part_endpoint,
440 part_number,
441 part_bytes = chunk.len(),
442 "Uploading multipart part"
443 );
444
445 let resp = client
446 .put(&part_endpoint)
447 .bearer_auth(self.token.get())
448 .header("posemesh-client-id", self.client_id.as_str())
449 .header("Content-Type", "application/octet-stream")
450 .body(chunk)
451 .send()
452 .await
453 .map_err(|e| StorageError::Network(e.to_string()))?;
454
455 if !resp.status().is_success() {
456 let status = resp.status();
457 let err = resp
458 .text()
459 .await
460 .unwrap_or_else(|_| "Unknown error".to_string());
461 tracing::warn!(
462 target: "posemesh_compute_node::storage::client",
463 %status,
464 error = %err,
465 part_number,
466 "Multipart part upload failed"
467 );
468 return Err(map_status(status));
469 }
470
471 let res: UploadPartResultV1 = resp
472 .json()
473 .await
474 .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
475
476 completed_parts.push(CompletedPartV1 {
477 part_number,
478 etag: res.etag,
479 });
480
481 offset = end;
482 part_number = part_number
483 .checked_add(1)
484 .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
485 }
486
487 let complete_endpoint = format!(
488 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
489 base, domain_id, upload_id
490 );
491 tracing::debug!(
492 target: "posemesh_compute_node::storage::client",
493 method = "POST",
494 url = %complete_endpoint,
495 parts = completed_parts.len(),
496 "Completing multipart upload"
497 );
498 let resp = client
499 .post(&complete_endpoint)
500 .bearer_auth(self.token.get())
501 .header("posemesh-client-id", self.client_id.as_str())
502 .header("Content-Type", "application/json")
503 .json(&CompleteMultipartRequestV1 {
504 parts: completed_parts,
505 })
506 .send()
507 .await
508 .map_err(|e| StorageError::Network(e.to_string()))?;
509
510 if !resp.status().is_success() {
511 let status = resp.status();
512 let err = resp
513 .text()
514 .await
515 .unwrap_or_else(|_| "Unknown error".to_string());
516 tracing::warn!(
517 target: "posemesh_compute_node::storage::client",
518 %status,
519 error = %err,
520 "Multipart completion failed"
521 );
522 return Err(map_status(status));
523 }
524
525 resp.json::<DomainDataMetadataV1>()
526 .await
527 .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
528 }
529 .await;
530
531 if upload_res.is_err() {
532 let abort_endpoint = format!(
533 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
534 base, domain_id, upload_id
535 );
536 let _ = client
537 .delete(&abort_endpoint)
538 .bearer_auth(self.token.get())
539 .header("posemesh-client-id", self.client_id.as_str())
540 .send()
541 .await;
542 }
543
544 upload_res.map(|meta| Some(meta.id))
545 }
546
547 pub async fn find_artifact_id(
548 &self,
549 domain_id: &str,
550 name: &str,
551 data_type: &str,
552 ) -> std::result::Result<Option<String>, StorageError> {
553 let domain_id = domain_id.trim();
554 if domain_id.is_empty() {
555 return Err(StorageError::Other(
556 "missing domain_id for artifact lookup".into(),
557 ));
558 }
559
560 let query = posemesh_domain_http::domain_data::DownloadQuery {
561 ids: Vec::new(),
562 name: Some(name.to_string()),
563 data_type: Some(data_type.to_string()),
564 };
565
566 let base = self.base.as_str().trim_end_matches('/');
567 let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
568 tracing::debug!(
569 target: "posemesh_compute_node::storage::client",
570 method = "GET",
571 %url,
572 artifact_name = name,
573 artifact_type = data_type,
574 "Looking up existing domain artifact"
575 );
576
577 let results = posemesh_domain_http::domain_data::download_metadata_v1(
578 base,
579 self.client_id.as_str(),
580 self.token.get().as_str(),
581 domain_id,
582 &query,
583 )
584 .await;
585
586 let results = match results {
587 Ok(items) => items,
588 Err(err) => {
589 if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
590 if resp.status == reqwest::StatusCode::NOT_FOUND {
591 return Ok(None);
592 }
593 }
594 return Err(map_domain_error(err));
595 }
596 };
597
598 Ok(results
599 .into_iter()
600 .find(|item| item.name == name && item.data_type == data_type)
601 .map(|item| item.id))
602 }
603}
604
605async fn fetch_info_v1(base: &str) -> Result<Option<UploadInfoV1>, ()> {
606 let resp = reqwest::Client::new()
607 .get(&format!("{}/api/v1/info", base))
608 .send()
609 .await
610 .map_err(|_| ())?;
611
612 if resp.status() == reqwest::StatusCode::NOT_FOUND {
613 return Ok(None);
614 }
615 if !resp.status().is_success() {
616 return Err(());
617 }
618
619 let info = resp.json::<InfoResponseV1>().await.map_err(|_| ())?;
620 Ok(Some(UploadInfoV1 {
621 request_max_bytes: info.upload.request_max_bytes,
622 multipart_enabled: info.upload.multipart.enabled,
623 }))
624}
625
626async fn get_upload_info_v1(base: &str) -> Option<UploadInfoV1> {
627 let now = now_unix_secs();
628 {
629 let cache = info_cache_v1().lock();
630 if let Some(entry) = cache.get(base) {
631 if entry.expires_at > now {
632 return entry.value.clone();
633 }
634 }
635 }
636
637 let fetched = match fetch_info_v1(base).await {
638 Ok(v) => v,
639 Err(_) => return None,
640 };
641
642 let mut cache = info_cache_v1().lock();
643 cache.retain(|_, entry| entry.expires_at > now);
644 cache.insert(
645 base.to_string(),
646 InfoCacheEntryV1 {
647 value: fetched.clone(),
648 expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
649 },
650 );
651 fetched
652}
653
/// Estimates whether one artifact fits in a single upload request of at most
/// `request_max_bytes` bytes.
///
/// The estimate is the length of one part header (keyed by `existing_id` when
/// present, otherwise by `name` and `data_type`), plus `data_len`, plus the
/// trailing CRLF, plus the closing boundary line, using the fixed placeholder
/// boundary `"boundary"`.
///
/// Returns `false` when `request_max_bytes` is not positive or when the
/// estimate cannot be represented. Sizes are summed with checked arithmetic
/// and compared without a narrowing cast, so a huge `data_len` can never wrap
/// around and be reported as fitting.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    if request_max_bytes <= 0 {
        return false;
    }

    let boundary = "boundary";
    let header = match existing_id {
        Some(id) => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
            boundary, id
        ),
        None => format!(
            "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
            boundary, name, data_type
        ),
    };
    let closing = format!("--{}--\r\n", boundary);

    // header + payload + CRLF after the payload + closing boundary line
    let total = header
        .len()
        .checked_add(data_len)
        .and_then(|len| len.checked_add(2))
        .and_then(|len| len.checked_add(closing.len()));

    match total {
        // Both casts widen losslessly: `usize` fits in `u128`, and the limit is positive.
        Some(total) => (total as u128) <= (request_max_bytes as u128),
        None => false,
    }
}
677
678fn map_status(status: reqwest::StatusCode) -> StorageError {
679 match status.as_u16() {
680 400 => StorageError::BadRequest,
681 401 => StorageError::Unauthorized,
682 404 => StorageError::NotFound,
683 409 => StorageError::Conflict,
684 n if (500..=599).contains(&n) => StorageError::Server(n),
685 other => StorageError::Other(format!("unexpected status: {}", other)),
686 }
687}
688
689fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
690 use posemesh_domain_http::errors::{AuthError, DomainError};
691
692 match err {
693 DomainError::AukiErrorResponse(resp) => map_status(resp.status),
694 DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
695 DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
696 other => StorageError::Other(other.to_string()),
697 }
698}
699
700fn env_client_id() -> String {
701 std::env::var("CLIENT_ID")
702 .ok()
703 .map(|v| v.trim().to_string())
704 .filter(|v| !v.is_empty())
705 .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
706}
707
708fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
709 if value.contains("://") {
710 Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
711 } else {
712 base.join(value)
713 .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
714 }
715}
716
717fn parse_download_target(
718 url: &Url,
719 fallback_domain_id: Option<&str>,
720) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
721 let segments: Vec<&str> = url
722 .path_segments()
723 .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
724 .unwrap_or_default();
725
726 let mut domain_id_from_path: Option<&str> = None;
727 let mut data_id_from_path: Option<&str> = None;
728
729 for idx in 0..segments.len() {
730 if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
731 domain_id_from_path = Some(segments[idx + 1]);
732 data_id_from_path = segments.get(idx + 3).copied();
733 break;
734 }
735 }
736
737 let domain_id = domain_id_from_path
738 .or(fallback_domain_id)
739 .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
740 .to_string();
741
742 let mut ids: Vec<String> = Vec::new();
743 let mut name: Option<String> = None;
744 let mut data_type: Option<String> = None;
745
746 for (key, value) in url.query_pairs() {
747 match key.as_ref() {
748 "ids" => ids.extend(
749 value
750 .split(',')
751 .map(|s| s.trim())
752 .filter(|s| !s.is_empty())
753 .map(|s| s.to_string()),
754 ),
755 "name" => {
756 if name.is_none() {
757 name = Some(value.to_string());
758 }
759 }
760 "data_type" => {
761 if data_type.is_none() {
762 data_type = Some(value.to_string());
763 }
764 }
765 _ => {}
766 }
767 }
768
769 if let Some(id) = data_id_from_path {
770 ids = vec![id.to_string()];
771 }
772
773 Ok((
774 domain_id,
775 posemesh_domain_http::domain_data::DownloadQuery {
776 ids,
777 name,
778 data_type,
779 },
780 ))
781}
782
/// Makes `value` safe to use as a single path component.
///
/// ASCII letters, digits, `-` and `_` are kept; every other character is
/// replaced by `_`. An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    if value.is_empty() {
        return "part".into();
    }

    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
802
803fn extract_timestamp(name: &str) -> Option<String> {
804 Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
805 .ok()
806 .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
807}
808
809fn map_filename(data_type: &str, name: &str) -> String {
810 format!(
811 "{}.{}",
812 sanitize_component(name),
813 sanitize_component(data_type)
814 )
815}