posemesh_compute_node/storage/client.rs

1use crate::errors::StorageError;
2use crate::storage::token::TokenRef;
3use anyhow::Result;
4use futures::StreamExt;
5use regex::Regex;
6use reqwest::Method;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::OnceLock;
11use std::time::Duration;
12use tokio::fs;
13use url::Url;
14use uuid::Uuid;
15// zip extraction moved to reconstruction-specific runner
16
/// One Domain data item that [`DomainClient`] has written to local disk.
///
/// All parts produced by a single download share the same `root` temp directory.
#[derive(Debug, Clone)]
pub struct DownloadedPart {
    /// Domain data id reported by the server.
    pub id: Option<String>,
    /// Item name as stored on the Domain.
    pub name: Option<String>,
    /// Item data type as stored on the Domain.
    pub data_type: Option<String>,
    /// Domain the item belongs to.
    pub domain_id: Option<String>,
    /// Location of the file holding the item's bytes.
    pub path: PathBuf,
    /// Per-download temporary directory that contains `path`.
    pub root: PathBuf,
    /// `path` expressed relative to `root` (e.g. `datasets/<scan>/<file>`).
    pub relative_path: PathBuf,
    /// Paths of files extracted from this part; the download itself leaves this empty.
    pub extracted_paths: Vec<PathBuf>,
}
29
/// Parameters for [`DomainClient::upload_artifact`].
#[derive(Debug)]
pub struct UploadRequest<'a> {
    /// Domain to upload into (the client trims surrounding whitespace).
    pub domain_id: &'a str,
    /// Name of the data item.
    pub name: &'a str,
    /// Data type of the item.
    pub data_type: &'a str,
    /// Caller-side path of the artifact; only used in log output.
    pub logical_path: &'a str,
    /// Payload to upload.
    pub bytes: &'a [u8],
    /// Id of an existing item to overwrite; `None` creates a new item.
    pub existing_id: Option<&'a str>,
}
39
/// Upload capabilities advertised by a Domain server's `/api/v1/info` endpoint.
#[derive(Debug, Clone)]
struct UploadInfoV1 {
    /// Largest accepted request body in bytes; the client only consults it when positive.
    request_max_bytes: i64,
    /// Whether the server supports multipart uploads.
    multipart_enabled: bool,
}
45
46#[derive(Debug, Deserialize, Clone)]
47struct InfoResponseV1 {
48    upload: InfoUploadV1,
49}
50
51#[derive(Debug, Deserialize, Clone)]
52struct InfoUploadV1 {
53    request_max_bytes: i64,
54    multipart: InfoMultipartV1,
55}
56
57#[derive(Debug, Deserialize, Clone)]
58struct InfoMultipartV1 {
59    enabled: bool,
60}
61
62#[derive(Debug, Serialize)]
63struct InitiateMultipartRequestV1 {
64    name: String,
65    data_type: String,
66    size: Option<i64>,
67    content_type: Option<String>,
68    existing_id: Option<String>,
69}
70
71#[derive(Debug, Deserialize)]
72struct InitiateMultipartResponseV1 {
73    upload_id: String,
74    part_size: i64,
75}
76
77#[derive(Debug, Deserialize)]
78struct UploadPartResultV1 {
79    etag: String,
80}
81
82#[derive(Debug, Serialize)]
83struct CompletedPartV1 {
84    part_number: i32,
85    etag: String,
86}
87
88#[derive(Debug, Serialize)]
89struct CompleteMultipartRequestV1 {
90    parts: Vec<CompletedPartV1>,
91}
92
93#[derive(Debug, Deserialize)]
94struct DomainDataMetadataV1 {
95    id: String,
96}
97
98#[derive(Debug, Clone)]
99struct InfoCacheEntryV1 {
100    value: Option<UploadInfoV1>,
101    expires_at: u64,
102}
103
104const INFO_CACHE_TTL_SECS: u64 = 60;
105static INFO_CACHE_V1: OnceLock<parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>>> =
106    OnceLock::new();
107
108fn info_cache_v1() -> &'static parking_lot::Mutex<HashMap<String, InfoCacheEntryV1>> {
109    INFO_CACHE_V1.get_or_init(|| parking_lot::Mutex::new(HashMap::new()))
110}
111
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// A clock set before 1970 yields `0` instead of an error, so expiry arithmetic never fails.
fn now_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
118
119/// Domain server HTTP client (skeleton; HTTP added later).
120#[derive(Clone)]
121pub struct DomainClient {
122    pub base: Url,
123    pub token: TokenRef,
124    client_id: String,
125}
126impl DomainClient {
127    pub fn new(base: Url, token: TokenRef) -> Result<Self> {
128        let client_id = env_client_id();
129        Ok(Self {
130            base,
131            token,
132            client_id,
133        })
134    }
135
136    pub fn with_timeout(base: Url, token: TokenRef, _timeout: Duration) -> Result<Self> {
137        let client_id = env_client_id();
138        Ok(Self {
139            base,
140            token,
141            client_id,
142        })
143    }
144
145    /// Download a Domain data item referenced by an absolute URI, persisting each multipart
146    /// part into a temporary file and returning its metadata.
147    pub async fn download_uri(
148        &self,
149        uri: &str,
150    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
151        let resolved = resolve_domain_url(&self.base, uri)?;
152        let (domain_id, query) = parse_download_target(&resolved, None)?;
153        self.download_domain_data(&resolved, &domain_id, query)
154            .await
155    }
156
157    /// Download domain data referenced by a CID, which can be either:
158    /// - a bare domain-data ID (UUID), or
159    /// - an absolute/relative URL under the domain server.
160    pub async fn download_cid(
161        &self,
162        domain_id: &str,
163        cid: &str,
164    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
165        let cid = cid.trim();
166        if cid.is_empty() {
167            return Err(StorageError::Other("empty cid".into()));
168        }
169
170        if cid.contains("://") || cid.starts_with('/') {
171            let resolved = resolve_domain_url(&self.base, cid)?;
172            let (domain_id, query) = parse_download_target(&resolved, Some(domain_id))?;
173            return self
174                .download_domain_data(&resolved, &domain_id, query)
175                .await;
176        }
177
178        let query = posemesh_domain_http::domain_data::DownloadQuery {
179            ids: vec![cid.to_string()],
180            name: None,
181            data_type: None,
182        };
183        self.download_domain_data(&self.base, domain_id, query)
184            .await
185    }
186
187    async fn download_domain_data(
188        &self,
189        url_for_log: &Url,
190        domain_id: &str,
191        query: posemesh_domain_http::domain_data::DownloadQuery,
192    ) -> std::result::Result<Vec<DownloadedPart>, StorageError> {
193        let domain_id = domain_id.trim();
194        if domain_id.is_empty() {
195            return Err(StorageError::Other("missing domain_id for download".into()));
196        }
197
198        tracing::debug!(
199            target: "posemesh_compute_node::storage::client",
200            method = "GET",
201            %url_for_log,
202            domain_id = domain_id,
203            ids = ?query.ids,
204            name = ?query.name,
205            data_type = ?query.data_type,
206            "Downloading domain data"
207        );
208
209        let base = self.base.as_str().trim_end_matches('/');
210        let mut rx = posemesh_domain_http::domain_data::download_v1_stream(
211            base,
212            self.client_id.as_str(),
213            self.token.get().as_str(),
214            domain_id,
215            &query,
216        )
217        .await
218        .map_err(map_domain_error)?;
219
220        let root = std::env::temp_dir().join(format!("domain-input-{}", Uuid::new_v4()));
221        fs::create_dir_all(&root)
222            .await
223            .map_err(|e| StorageError::Other(format!("create download root: {}", e)))?;
224        let datasets_root = root.join("datasets");
225        fs::create_dir_all(&datasets_root)
226            .await
227            .map_err(|e| StorageError::Other(format!("create datasets root: {}", e)))?;
228
229        let mut parts = Vec::new();
230
231        while let Some(item) = rx.next().await {
232            let domain_item = item.map_err(map_domain_error)?;
233            let name = domain_item.metadata.name.clone();
234            let data_type = domain_item.metadata.data_type.clone();
235
236            let scan_folder = extract_timestamp(&name)
237                .map(|ts| sanitize_component(&ts))
238                .unwrap_or_else(|| sanitize_component(&name));
239            let scan_dir = datasets_root.join(&scan_folder);
240            fs::create_dir_all(&scan_dir)
241                .await
242                .map_err(|e| StorageError::Other(format!("create scan dir: {}", e)))?;
243
244            let file_name = map_filename(&data_type, &name);
245            let file_path = scan_dir.join(&file_name);
246            if let Some(parent) = file_path.parent() {
247                fs::create_dir_all(parent)
248                    .await
249                    .map_err(|e| StorageError::Other(format!("create parent dir: {}", e)))?;
250            }
251            fs::write(&file_path, &domain_item.data)
252                .await
253                .map_err(|e| StorageError::Other(format!("write temp file: {}", e)))?;
254
255            let extracted_paths = Vec::new();
256
257            let relative_path = file_path
258                .strip_prefix(&root)
259                .unwrap_or(&file_path)
260                .to_path_buf();
261
262            parts.push(DownloadedPart {
263                id: Some(domain_item.metadata.id),
264                name: Some(name),
265                data_type: Some(data_type),
266                domain_id: Some(domain_item.metadata.domain_id),
267                path: file_path,
268                root: root.clone(),
269                relative_path,
270                extracted_paths,
271            });
272        }
273
274        if parts.is_empty() {
275            return Err(StorageError::NotFound);
276        }
277
278        Ok(parts)
279    }
280
281    pub async fn upload_artifact(
282        &self,
283        request: UploadRequest<'_>,
284    ) -> std::result::Result<Option<String>, StorageError> {
285        let domain_id = request.domain_id.trim();
286        if domain_id.is_empty() {
287            return Err(StorageError::Other(
288                "missing domain_id for artifact upload".into(),
289            ));
290        }
291
292        let base = self.base.as_str().trim_end_matches('/');
293        if let Some(info) = get_upload_info_v1(base).await {
294            if info.multipart_enabled && info.request_max_bytes > 0 {
295                let fits_alone = fits_single_upload_request(
296                    info.request_max_bytes,
297                    request.name,
298                    request.data_type,
299                    request.existing_id,
300                    request.bytes.len(),
301                );
302                if !fits_alone {
303                    return self
304                        .upload_artifact_v1_multipart(base, domain_id, request)
305                        .await;
306                }
307            }
308        }
309
310        let action = if let Some(id) = request.existing_id {
311            posemesh_domain_http::domain_data::DomainAction::Update { id: id.to_string() }
312        } else {
313            posemesh_domain_http::domain_data::DomainAction::Create {
314                name: request.name.to_string(),
315                data_type: request.data_type.to_string(),
316            }
317        };
318
319        let upload = posemesh_domain_http::domain_data::UploadDomainData {
320            action,
321            data: request.bytes.to_vec(),
322        };
323        let method = if request.existing_id.is_some() {
324            Method::PUT
325        } else {
326            Method::POST
327        };
328        tracing::debug!(
329            target: "posemesh_compute_node::storage::client",
330            method = %method,
331            url = %format!("{}/api/v1/domains/{}/data", base, domain_id),
332            logical_path = request.logical_path,
333            name = request.name,
334            data_type = request.data_type,
335            has_existing_id = request.existing_id.is_some(),
336            "Sending domain upload request"
337        );
338
339        let items = posemesh_domain_http::domain_data::upload_v1(
340            base,
341            self.token.get().as_str(),
342            domain_id,
343            vec![upload],
344        )
345        .await
346        .map_err(map_domain_error)?;
347
348        Ok(items.into_iter().next().map(|d| d.id))
349    }
350
351    async fn upload_artifact_v1_multipart(
352        &self,
353        base: &str,
354        domain_id: &str,
355        request: UploadRequest<'_>,
356    ) -> std::result::Result<Option<String>, StorageError> {
357        if request.bytes.is_empty() {
358            return Err(StorageError::BadRequest);
359        }
360
361        let client = reqwest::Client::new();
362        let initiate_endpoint = format!(
363            "{}/api/v1/domains/{}/data/multipart?uploads",
364            base, domain_id
365        );
366
367        let init_req = InitiateMultipartRequestV1 {
368            name: request.name.to_string(),
369            data_type: request.data_type.to_string(),
370            size: Some(request.bytes.len() as i64),
371            content_type: Some("application/octet-stream".to_string()),
372            existing_id: request.existing_id.map(|id| id.to_string()),
373        };
374
375        tracing::debug!(
376            target: "posemesh_compute_node::storage::client",
377            method = "POST",
378            url = %initiate_endpoint,
379            logical_path = request.logical_path,
380            name = request.name,
381            data_type = request.data_type,
382            has_existing_id = request.existing_id.is_some(),
383            "Initiating multipart upload"
384        );
385
386        let init_resp = client
387            .post(&initiate_endpoint)
388            .bearer_auth(self.token.get())
389            .header("posemesh-client-id", self.client_id.as_str())
390            .header("Content-Type", "application/json")
391            .json(&init_req)
392            .send()
393            .await
394            .map_err(|e| StorageError::Network(e.to_string()))?;
395
396        if !init_resp.status().is_success() {
397            let status = init_resp.status();
398            let err = init_resp
399                .text()
400                .await
401                .unwrap_or_else(|_| "Unknown error".to_string());
402            tracing::warn!(
403                target: "posemesh_compute_node::storage::client",
404                %status,
405                error = %err,
406                "Multipart initiation failed"
407            );
408            return Err(map_status(status));
409        }
410
411        let init: InitiateMultipartResponseV1 = init_resp
412            .json()
413            .await
414            .map_err(|e| StorageError::Other(format!("invalid initiate response: {}", e)))?;
415
416        let part_size = usize::try_from(init.part_size)
417            .map_err(|_| StorageError::Other("invalid multipart part_size".into()))?;
418        if part_size == 0 {
419            return Err(StorageError::Other("invalid multipart part_size".into()));
420        }
421
422        let upload_id = init.upload_id;
423        let mut completed_parts: Vec<CompletedPartV1> = Vec::new();
424
425        let upload_res: std::result::Result<DomainDataMetadataV1, StorageError> = async {
426            let mut offset: usize = 0;
427            let mut part_number: i32 = 1;
428            while offset < request.bytes.len() {
429                let end = std::cmp::min(offset + part_size, request.bytes.len());
430                let chunk = request.bytes[offset..end].to_vec();
431
432                let part_endpoint = format!(
433                    "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
434                    base, domain_id, upload_id, part_number
435                );
436                tracing::debug!(
437                    target: "posemesh_compute_node::storage::client",
438                    method = "PUT",
439                    url = %part_endpoint,
440                    part_number,
441                    part_bytes = chunk.len(),
442                    "Uploading multipart part"
443                );
444
445                let resp = client
446                    .put(&part_endpoint)
447                    .bearer_auth(self.token.get())
448                    .header("posemesh-client-id", self.client_id.as_str())
449                    .header("Content-Type", "application/octet-stream")
450                    .body(chunk)
451                    .send()
452                    .await
453                    .map_err(|e| StorageError::Network(e.to_string()))?;
454
455                if !resp.status().is_success() {
456                    let status = resp.status();
457                    let err = resp
458                        .text()
459                        .await
460                        .unwrap_or_else(|_| "Unknown error".to_string());
461                    tracing::warn!(
462                        target: "posemesh_compute_node::storage::client",
463                        %status,
464                        error = %err,
465                        part_number,
466                        "Multipart part upload failed"
467                    );
468                    return Err(map_status(status));
469                }
470
471                let res: UploadPartResultV1 = resp
472                    .json()
473                    .await
474                    .map_err(|e| StorageError::Other(format!("invalid part response: {}", e)))?;
475
476                completed_parts.push(CompletedPartV1 {
477                    part_number,
478                    etag: res.etag,
479                });
480
481                offset = end;
482                part_number = part_number
483                    .checked_add(1)
484                    .ok_or_else(|| StorageError::Other("multipart upload too many parts".into()))?;
485            }
486
487            let complete_endpoint = format!(
488                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
489                base, domain_id, upload_id
490            );
491            tracing::debug!(
492                target: "posemesh_compute_node::storage::client",
493                method = "POST",
494                url = %complete_endpoint,
495                parts = completed_parts.len(),
496                "Completing multipart upload"
497            );
498            let resp = client
499                .post(&complete_endpoint)
500                .bearer_auth(self.token.get())
501                .header("posemesh-client-id", self.client_id.as_str())
502                .header("Content-Type", "application/json")
503                .json(&CompleteMultipartRequestV1 {
504                    parts: completed_parts,
505                })
506                .send()
507                .await
508                .map_err(|e| StorageError::Network(e.to_string()))?;
509
510            if !resp.status().is_success() {
511                let status = resp.status();
512                let err = resp
513                    .text()
514                    .await
515                    .unwrap_or_else(|_| "Unknown error".to_string());
516                tracing::warn!(
517                    target: "posemesh_compute_node::storage::client",
518                    %status,
519                    error = %err,
520                    "Multipart completion failed"
521                );
522                return Err(map_status(status));
523            }
524
525            resp.json::<DomainDataMetadataV1>()
526                .await
527                .map_err(|e| StorageError::Other(format!("invalid complete response: {}", e)))
528        }
529        .await;
530
531        if upload_res.is_err() {
532            let abort_endpoint = format!(
533                "{}/api/v1/domains/{}/data/multipart?uploadId={}",
534                base, domain_id, upload_id
535            );
536            let _ = client
537                .delete(&abort_endpoint)
538                .bearer_auth(self.token.get())
539                .header("posemesh-client-id", self.client_id.as_str())
540                .send()
541                .await;
542        }
543
544        upload_res.map(|meta| Some(meta.id))
545    }
546
547    pub async fn find_artifact_id(
548        &self,
549        domain_id: &str,
550        name: &str,
551        data_type: &str,
552    ) -> std::result::Result<Option<String>, StorageError> {
553        let domain_id = domain_id.trim();
554        if domain_id.is_empty() {
555            return Err(StorageError::Other(
556                "missing domain_id for artifact lookup".into(),
557            ));
558        }
559
560        let query = posemesh_domain_http::domain_data::DownloadQuery {
561            ids: Vec::new(),
562            name: Some(name.to_string()),
563            data_type: Some(data_type.to_string()),
564        };
565
566        let base = self.base.as_str().trim_end_matches('/');
567        let url = format!("{}/api/v1/domains/{}/data", base, domain_id);
568        tracing::debug!(
569            target: "posemesh_compute_node::storage::client",
570            method = "GET",
571            %url,
572            artifact_name = name,
573            artifact_type = data_type,
574            "Looking up existing domain artifact"
575        );
576
577        let results = posemesh_domain_http::domain_data::download_metadata_v1(
578            base,
579            self.client_id.as_str(),
580            self.token.get().as_str(),
581            domain_id,
582            &query,
583        )
584        .await;
585
586        let results = match results {
587            Ok(items) => items,
588            Err(err) => {
589                if let posemesh_domain_http::errors::DomainError::AukiErrorResponse(resp) = &err {
590                    if resp.status == reqwest::StatusCode::NOT_FOUND {
591                        return Ok(None);
592                    }
593                }
594                return Err(map_domain_error(err));
595            }
596        };
597
598        Ok(results
599            .into_iter()
600            .find(|item| item.name == name && item.data_type == data_type)
601            .map(|item| item.id))
602    }
603}
604
605async fn fetch_info_v1(base: &str) -> Result<Option<UploadInfoV1>, ()> {
606    let resp = reqwest::Client::new()
607        .get(&format!("{}/api/v1/info", base))
608        .send()
609        .await
610        .map_err(|_| ())?;
611
612    if resp.status() == reqwest::StatusCode::NOT_FOUND {
613        return Ok(None);
614    }
615    if !resp.status().is_success() {
616        return Err(());
617    }
618
619    let info = resp.json::<InfoResponseV1>().await.map_err(|_| ())?;
620    Ok(Some(UploadInfoV1 {
621        request_max_bytes: info.upload.request_max_bytes,
622        multipart_enabled: info.upload.multipart.enabled,
623    }))
624}
625
626async fn get_upload_info_v1(base: &str) -> Option<UploadInfoV1> {
627    let now = now_unix_secs();
628    {
629        let cache = info_cache_v1().lock();
630        if let Some(entry) = cache.get(base) {
631            if entry.expires_at > now {
632                return entry.value.clone();
633            }
634        }
635    }
636
637    let fetched = match fetch_info_v1(base).await {
638        Ok(v) => v,
639        Err(_) => return None,
640    };
641
642    let mut cache = info_cache_v1().lock();
643    cache.retain(|_, entry| entry.expires_at > now);
644    cache.insert(
645        base.to_string(),
646        InfoCacheEntryV1 {
647            value: fetched.clone(),
648            expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
649        },
650    );
651    fetched
652}
653
/// Estimates whether a single `multipart/form-data` upload of `data_len` bytes stays
/// within `request_max_bytes`.
///
/// The estimate counts one form part (part headers, payload, trailing CRLF) plus the
/// closing delimiter, using a placeholder boundary. NOTE(review): the boundary chosen by
/// the real HTTP encoder may be longer, so this can slightly underestimate; confirm
/// against the encoder if uploads near the limit are rejected.
///
/// Sizes that overflow `usize` or `i64` are reported as not fitting.
fn fits_single_upload_request(
    request_max_bytes: i64,
    name: &str,
    data_type: &str,
    existing_id: Option<&str>,
    data_len: usize,
) -> bool {
    const BOUNDARY: &str = "boundary";

    // Updates reference the item by id; creates carry its name and data type.
    let disposition = match existing_id {
        Some(id) => format!("id=\"{}\"", id),
        None => format!("name=\"{}\"; data-type=\"{}\"", name, data_type),
    };
    let header = format!(
        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; {}\r\n\r\n",
        BOUNDARY, disposition
    );
    // CRLF after the payload, then the closing delimiter `--boundary--\r\n`.
    let trailer_len = "\r\n".len() + "--".len() + BOUNDARY.len() + "--\r\n".len();

    header
        .len()
        .checked_add(data_len)
        .and_then(|len| len.checked_add(trailer_len))
        .and_then(|total| i64::try_from(total).ok())
        .map_or(false, |total| total <= request_max_bytes)
}
677
678fn map_status(status: reqwest::StatusCode) -> StorageError {
679    match status.as_u16() {
680        400 => StorageError::BadRequest,
681        401 => StorageError::Unauthorized,
682        404 => StorageError::NotFound,
683        409 => StorageError::Conflict,
684        n if (500..=599).contains(&n) => StorageError::Server(n),
685        other => StorageError::Other(format!("unexpected status: {}", other)),
686    }
687}
688
689fn map_domain_error(err: posemesh_domain_http::errors::DomainError) -> StorageError {
690    use posemesh_domain_http::errors::{AuthError, DomainError};
691
692    match err {
693        DomainError::AukiErrorResponse(resp) => map_status(resp.status),
694        DomainError::ReqwestError(e) => StorageError::Network(e.to_string()),
695        DomainError::AuthError(AuthError::Unauthorized(_)) => StorageError::Unauthorized,
696        other => StorageError::Other(other.to_string()),
697    }
698}
699
700fn env_client_id() -> String {
701    std::env::var("CLIENT_ID")
702        .ok()
703        .map(|v| v.trim().to_string())
704        .filter(|v| !v.is_empty())
705        .unwrap_or_else(|| format!("posemesh-compute-node/{}", Uuid::new_v4()))
706}
707
708fn resolve_domain_url(base: &Url, value: &str) -> std::result::Result<Url, StorageError> {
709    if value.contains("://") {
710        Url::parse(value).map_err(|e| StorageError::Other(format!("parse domain url: {e}")))
711    } else {
712        base.join(value)
713            .map_err(|e| StorageError::Other(format!("join domain url: {e}")))
714    }
715}
716
717fn parse_download_target(
718    url: &Url,
719    fallback_domain_id: Option<&str>,
720) -> std::result::Result<(String, posemesh_domain_http::domain_data::DownloadQuery), StorageError> {
721    let segments: Vec<&str> = url
722        .path_segments()
723        .map(|segments| segments.filter(|seg| !seg.is_empty()).collect())
724        .unwrap_or_default();
725
726    let mut domain_id_from_path: Option<&str> = None;
727    let mut data_id_from_path: Option<&str> = None;
728
729    for idx in 0..segments.len() {
730        if segments[idx] == "domains" && idx + 2 < segments.len() && segments[idx + 2] == "data" {
731            domain_id_from_path = Some(segments[idx + 1]);
732            data_id_from_path = segments.get(idx + 3).copied();
733            break;
734        }
735    }
736
737    let domain_id = domain_id_from_path
738        .or(fallback_domain_id)
739        .ok_or_else(|| StorageError::Other(format!("cid url missing domain_id: {}", url)))?
740        .to_string();
741
742    let mut ids: Vec<String> = Vec::new();
743    let mut name: Option<String> = None;
744    let mut data_type: Option<String> = None;
745
746    for (key, value) in url.query_pairs() {
747        match key.as_ref() {
748            "ids" => ids.extend(
749                value
750                    .split(',')
751                    .map(|s| s.trim())
752                    .filter(|s| !s.is_empty())
753                    .map(|s| s.to_string()),
754            ),
755            "name" => {
756                if name.is_none() {
757                    name = Some(value.to_string());
758                }
759            }
760            "data_type" => {
761                if data_type.is_none() {
762                    data_type = Some(value.to_string());
763                }
764            }
765            _ => {}
766        }
767    }
768
769    if let Some(id) = data_id_from_path {
770        ids = vec![id.to_string()];
771    }
772
773    Ok((
774        domain_id,
775        posemesh_domain_http::domain_data::DownloadQuery {
776            ids,
777            name,
778            data_type,
779        },
780    ))
781}
782
783// no parse_disposition_params; headers are parsed in posemesh-domain-http
784
/// Makes `value` safe to use as a single path component.
///
/// Every character other than an ASCII letter, ASCII digit, `-` or `_` is replaced with
/// `_` (one replacement per character). An empty input yields `"part"`.
fn sanitize_component(value: &str) -> String {
    if value.is_empty() {
        return String::from("part");
    }
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        out.push(if keep { ch } else { '_' });
    }
    out
}
802
803fn extract_timestamp(name: &str) -> Option<String> {
804    Regex::new(r"\d{4}-\d{2}-\d{2}[_-]\d{2}-\d{2}-\d{2}")
805        .ok()
806        .and_then(|re| re.find(name).map(|m| m.as_str().to_string()))
807}
808
809fn map_filename(data_type: &str, name: &str) -> String {
810    format!(
811        "{}.{}",
812        sanitize_component(name),
813        sanitize_component(data_type)
814    )
815}