Skip to main content

posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::lock::Mutex;
3use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
4use reqwest::{Body, Client, Response, StatusCode};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::OnceLock;
8#[cfg(not(target_family = "wasm"))]
9use tokio::spawn;
10#[cfg(target_family = "wasm")]
11use wasm_bindgen_futures::spawn_local as spawn;
12
13use posemesh_utils::now_unix_secs;
14
15use crate::errors::{AukiErrorResponse, DomainError};
16
17#[derive(Debug, Deserialize, Clone)]
18struct InfoResponse {
19    upload: InfoUpload,
20}
21
22#[derive(Debug, Deserialize, Clone)]
23struct InfoUpload {
24    request_max_bytes: i64,
25    multipart: InfoMultipart,
26}
27
28#[derive(Debug, Deserialize, Clone)]
29struct InfoMultipart {
30    enabled: bool,
31}
32
33#[derive(Debug, Clone)]
34pub struct UploadInfoV1 {
35    pub request_max_bytes: i64,
36    pub multipart_enabled: bool,
37}
38
39#[derive(Debug, Clone)]
40struct InfoCacheEntry {
41    value: Option<UploadInfoV1>,
42    expires_at: u64,
43}
44
45const INFO_CACHE_TTL_SECS: u64 = 60;
46static INFO_CACHE: OnceLock<Mutex<HashMap<String, InfoCacheEntry>>> = OnceLock::new();
47
48fn info_cache() -> &'static Mutex<HashMap<String, InfoCacheEntry>> {
49    INFO_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
50}
51
52async fn fetch_info_v1(url: &str) -> Result<Option<UploadInfoV1>, ()> {
53    let resp = Client::new()
54        .get(format!("{}/api/v1/info", url))
55        .send()
56        .await
57        .map_err(|_| ())?;
58
59    if resp.status() == StatusCode::NOT_FOUND {
60        return Ok(None);
61    }
62    if !resp.status().is_success() {
63        return Err(());
64    }
65    let info = resp.json::<InfoResponse>().await.map_err(|_| ())?;
66    Ok(Some(UploadInfoV1 {
67        request_max_bytes: info.upload.request_max_bytes,
68        multipart_enabled: info.upload.multipart.enabled,
69    }))
70}
71
72pub async fn get_upload_info_v1(url: &str) -> Option<UploadInfoV1> {
73    let now = now_unix_secs();
74    {
75        let cache = info_cache().lock().await;
76        if let Some(entry) = cache.get(url)
77            && entry.expires_at > now
78        {
79            return entry.value.clone();
80        }
81    }
82
83    let fetched = match fetch_info_v1(url).await {
84        Ok(v) => v,
85        Err(_) => return None,
86    };
87
88    let mut cache = info_cache().lock().await;
89    cache.retain(|_, entry| entry.expires_at > now);
90    cache.insert(
91        url.to_string(),
92        InfoCacheEntry {
93            value: fetched.clone(),
94            expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
95        },
96    );
97    fetched
98}
99
100fn is_unsupported_endpoint_status(status: StatusCode) -> bool {
101    status == StatusCode::NOT_FOUND
102        || status == StatusCode::METHOD_NOT_ALLOWED
103        || status == StatusCode::NOT_IMPLEMENTED
104}
105
106fn is_unsupported_endpoint_error(err: &DomainError) -> bool {
107    match err {
108        DomainError::AukiErrorResponse(resp) => is_unsupported_endpoint_status(resp.status),
109        _ => false,
110    }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct DomainDataMetadata {
115    pub id: String,
116    pub domain_id: String,
117    pub name: String,
118    pub data_type: String,
119    pub size: u64,
120    pub created_at: String,
121    pub updated_at: String,
122}
123
124#[derive(Debug, Deserialize, Serialize)]
125pub struct DomainData {
126    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
127    pub metadata: DomainDataMetadata,
128    pub data: Vec<u8>,
129}
130
131#[derive(Debug, Serialize, Clone, Deserialize)]
132pub struct UpdateDomainData {
133    pub id: String,
134}
135
136#[derive(Debug, Serialize, Clone, Deserialize)]
137pub struct CreateDomainData {
138    pub name: String,
139    pub data_type: String,
140}
141
142#[derive(Debug, Serialize, Deserialize)]
143#[serde(untagged)]
144pub enum DomainAction {
145    Create { name: String, data_type: String },
146    Update { id: String },
147}
148
149#[derive(Debug, Serialize)]
150struct InitiateMultipartRequest {
151    name: String,
152    data_type: String,
153    size: Option<i64>,
154    content_type: Option<String>,
155    existing_id: Option<String>,
156}
157
158#[derive(Debug, Deserialize)]
159struct InitiateMultipartResponse {
160    upload_id: String,
161    part_size: i64,
162}
163
164#[derive(Debug, Serialize)]
165struct CompletedPart {
166    part_number: i32,
167    etag: String,
168}
169
170#[derive(Debug, Serialize)]
171struct CompleteMultipartRequest {
172    parts: Vec<CompletedPart>,
173}
174
175#[derive(Debug, Deserialize)]
176struct UploadPartResult {
177    etag: String,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181pub struct UploadDomainData {
182    #[serde(flatten)]
183    pub action: DomainAction,
184    pub data: Vec<u8>,
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188pub struct DownloadQuery {
189    pub ids: Vec<String>,
190    pub name: Option<String>,
191    pub data_type: Option<String>,
192}
193
194#[derive(Debug, Deserialize)]
195struct ListDomainDataMetadata {
196    pub data: Vec<DomainDataMetadata>,
197}
198
199pub async fn download_by_id(
200    url: &str,
201    client_id: &str,
202    access_token: &str,
203    domain_id: &str,
204    id: &str,
205) -> Result<Vec<u8>, DomainError> {
206    let response = Client::new()
207        .get(format!(
208            "{}/api/v1/domains/{}/data/{}?raw=true",
209            url, domain_id, id
210        ))
211        .bearer_auth(access_token)
212        .header("posemesh-client-id", client_id)
213        .send()
214        .await?;
215
216    if response.status().is_success() {
217        let data = response.bytes().await?;
218        Ok(data.to_vec())
219    } else {
220        let status = response.status();
221        let error = response
222            .text()
223            .await
224            .unwrap_or_else(|_| "Unknown error".to_string());
225        Err(AukiErrorResponse {
226            status,
227            error: format!("Failed to download data by id. {}", error),
228        }
229        .into())
230    }
231}
232
233pub async fn download_metadata_v1(
234    url: &str,
235    client_id: &str,
236    access_token: &str,
237    domain_id: &str,
238    query: &DownloadQuery,
239) -> Result<Vec<DomainDataMetadata>, DomainError> {
240    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
241    if response.status().is_success() {
242        let data = response.json::<ListDomainDataMetadata>().await?;
243        Ok(data.data)
244    } else {
245        let status = response.status();
246        let text = response
247            .text()
248            .await
249            .unwrap_or_else(|_| "Unknown error".to_string());
250        Err(AukiErrorResponse {
251            status,
252            error: format!("Failed to download metadata. {}", text),
253        }
254        .into())
255    }
256}
257
258pub async fn download_v1(
259    url: &str,
260    client_id: &str,
261    access_token: &str,
262    domain_id: &str,
263    query: &DownloadQuery,
264    with_data: bool,
265) -> Result<Response, DomainError> {
266    let mut params = HashMap::new();
267
268    if let Some(name) = &query.name {
269        params.insert("name", name.clone());
270    }
271    if let Some(data_type) = &query.data_type {
272        params.insert("data_type", data_type.clone());
273    }
274    let ids = {
275        if !query.ids.is_empty() {
276            let ids = query.ids.join(",");
277            if params.is_empty() {
278                &format!("?ids={}", ids)
279            } else {
280                &format!("&ids={}", ids)
281            }
282        } else {
283            ""
284        }
285    };
286
287    let response = Client::new()
288        .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
289        .bearer_auth(access_token)
290        .header(
291            "Accept",
292            if with_data {
293                "multipart/form-data"
294            } else {
295                "application/json"
296            },
297        )
298        .header("posemesh-client-id", client_id)
299        .query(&params)
300        .send()
301        .await?;
302
303    if response.status().is_success() {
304        Ok(response)
305    } else {
306        let status = response.status();
307        let text = response
308            .text()
309            .await
310            .unwrap_or_else(|_| "Unknown error".to_string());
311        Err(AukiErrorResponse {
312            status,
313            error: format!("Failed to download data. {}", text),
314        }
315        .into())
316    }
317}
318
319pub async fn download_v1_stream(
320    url: &str,
321    client_id: &str,
322    access_token: &str,
323    domain_id: &str,
324    query: &DownloadQuery,
325) -> Result<mpsc::Receiver<Result<DomainData, DomainError>>, DomainError> {
326    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
327
328    let (mut tx, rx) = mpsc::channel::<Result<DomainData, DomainError>>(100);
329
330    let boundary = match response
331        .headers()
332        .get("content-type")
333        .and_then(|ct| ct.to_str().ok())
334        .and_then(|ct| {
335            if ct.starts_with("multipart/form-data; boundary=") {
336                Some(ct.split("boundary=").nth(1)?.to_string())
337            } else {
338                None
339            }
340        }) {
341        Some(b) => b,
342        None => {
343            tracing::error!("Invalid content-type header");
344            let _ = tx.close().await;
345            return Err(DomainError::InvalidContentTypeHeader);
346        }
347    };
348
349    spawn(async move {
350        let stream = response.bytes_stream();
351        handle_domain_data_stream(tx, stream, &boundary).await;
352    });
353
354    Ok(rx)
355}
356
357pub async fn delete_by_id(
358    url: &str,
359    access_token: &str,
360    domain_id: &str,
361    id: &str,
362) -> Result<(), DomainError> {
363    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
364    let client = Client::new();
365    let resp = client
366        .delete(&endpoint)
367        .bearer_auth(access_token)
368        .send()
369        .await?;
370
371    if resp.status().is_success() {
372        Ok(())
373    } else {
374        let status = resp.status();
375        let err = resp
376            .text()
377            .await
378            .unwrap_or_else(|_| "Unknown error".to_string());
379        Err(AukiErrorResponse {
380            status,
381            error: format!("Failed to delete data by id. {}", err),
382        }
383        .into())
384    }
385}
386
387async fn initiate_domain_data_multipart_upload(
388    client: &Client,
389    url: &str,
390    access_token: &str,
391    domain_id: &str,
392    req: &InitiateMultipartRequest,
393) -> Result<InitiateMultipartResponse, DomainError> {
394    let resp = client
395        .post(format!(
396            "{}/api/v1/domains/{}/data/multipart?uploads",
397            url, domain_id
398        ))
399        .bearer_auth(access_token)
400        .header("Content-Type", "application/json")
401        .json(req)
402        .send()
403        .await?;
404
405    if resp.status().is_success() {
406        Ok(resp.json::<InitiateMultipartResponse>().await?)
407    } else {
408        let status = resp.status();
409        let err = resp
410            .text()
411            .await
412            .unwrap_or_else(|_| "Unknown error".to_string());
413        Err(AukiErrorResponse {
414            status,
415            error: format!("Failed to initiate multipart upload. {}", err),
416        }
417        .into())
418    }
419}
420
421async fn upload_domain_data_multipart_part(
422    client: &Client,
423    url: &str,
424    access_token: &str,
425    domain_id: &str,
426    upload_id: &str,
427    part_number: i32,
428    bytes: Bytes,
429) -> Result<UploadPartResult, DomainError> {
430    let resp = client
431        .put(format!(
432            "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
433            url, domain_id, upload_id, part_number
434        ))
435        .bearer_auth(access_token)
436        .header("Content-Type", "application/octet-stream")
437        .body(bytes)
438        .send()
439        .await?;
440
441    if resp.status().is_success() {
442        Ok(resp.json::<UploadPartResult>().await?)
443    } else {
444        let status = resp.status();
445        let err = resp
446            .text()
447            .await
448            .unwrap_or_else(|_| "Unknown error".to_string());
449        Err(AukiErrorResponse {
450            status,
451            error: format!("Failed to upload multipart part. {}", err),
452        }
453        .into())
454    }
455}
456
457async fn complete_domain_data_multipart_upload(
458    client: &Client,
459    url: &str,
460    access_token: &str,
461    domain_id: &str,
462    upload_id: &str,
463    parts: Vec<CompletedPart>,
464) -> Result<DomainDataMetadata, DomainError> {
465    let resp = client
466        .post(format!(
467            "{}/api/v1/domains/{}/data/multipart?uploadId={}",
468            url, domain_id, upload_id
469        ))
470        .bearer_auth(access_token)
471        .header("Content-Type", "application/json")
472        .json(&CompleteMultipartRequest { parts })
473        .send()
474        .await?;
475
476    if resp.status().is_success() {
477        Ok(resp.json::<DomainDataMetadata>().await?)
478    } else {
479        let status = resp.status();
480        let err = resp
481            .text()
482            .await
483            .unwrap_or_else(|_| "Unknown error".to_string());
484        Err(AukiErrorResponse {
485            status,
486            error: format!("Failed to complete multipart upload. {}", err),
487        }
488        .into())
489    }
490}
491
492async fn abort_domain_data_multipart_upload(
493    client: &Client,
494    url: &str,
495    access_token: &str,
496    domain_id: &str,
497    upload_id: &str,
498) -> Result<(), DomainError> {
499    let resp = client
500        .delete(format!(
501            "{}/api/v1/domains/{}/data/multipart?uploadId={}",
502            url, domain_id, upload_id
503        ))
504        .bearer_auth(access_token)
505        .send()
506        .await?;
507
508    if resp.status().is_success() {
509        Ok(())
510    } else {
511        let status = resp.status();
512        let err = resp
513            .text()
514            .await
515            .unwrap_or_else(|_| "Unknown error".to_string());
516        Err(AukiErrorResponse {
517            status,
518            error: format!("Failed to abort multipart upload. {}", err),
519        }
520        .into())
521    }
522}
523
524async fn upload_domain_data_multipart_bytes(
525    url: &str,
526    access_token: &str,
527    domain_id: &str,
528    action: DomainAction,
529    bytes: Bytes,
530) -> Result<DomainDataMetadata, DomainError> {
531    if bytes.is_empty() {
532        return Err(DomainError::InvalidRequest(
533            "multipart upload requires non-empty data",
534        ));
535    }
536
537    let (name, data_type, existing_id) = match action {
538        DomainAction::Create { name, data_type } => (name, data_type, None),
539        DomainAction::Update { id } => ("".to_string(), "".to_string(), Some(id)),
540    };
541
542    let client = Client::new();
543    let init_res = initiate_domain_data_multipart_upload(
544        &client,
545        url,
546        access_token,
547        domain_id,
548        &InitiateMultipartRequest {
549            name,
550            data_type,
551            size: Some(bytes.len() as i64),
552            content_type: Some("application/octet-stream".to_string()),
553            existing_id,
554        },
555    )
556    .await?;
557
558    let part_size = usize::try_from(init_res.part_size)
559        .map_err(|_| DomainError::InvalidRequest("invalid multipart part_size"))?;
560    if part_size == 0 {
561        return Err(DomainError::InvalidRequest("invalid multipart part_size"));
562    }
563
564    let upload_id = init_res.upload_id;
565    let mut parts = Vec::new();
566
567    let upload_res = async {
568        let mut offset = 0usize;
569        let mut part_number: i32 = 1;
570
571        while offset < bytes.len() {
572            let end = std::cmp::min(offset + part_size, bytes.len());
573            let chunk = bytes.slice(offset..end);
574
575            let res = upload_domain_data_multipart_part(
576                &client,
577                url,
578                access_token,
579                domain_id,
580                &upload_id,
581                part_number,
582                chunk,
583            )
584            .await?;
585
586            parts.push(CompletedPart {
587                part_number,
588                etag: res.etag,
589            });
590
591            offset = end;
592            part_number = part_number
593                .checked_add(1)
594                .ok_or(DomainError::InvalidRequest(
595                    "multipart upload too many parts",
596                ))?;
597        }
598
599        complete_domain_data_multipart_upload(
600            &client,
601            url,
602            access_token,
603            domain_id,
604            &upload_id,
605            parts,
606        )
607        .await
608    }
609    .await;
610
611    if upload_res.is_err() {
612        let _ =
613            abort_domain_data_multipart_upload(&client, url, access_token, domain_id, &upload_id)
614                .await;
615    }
616
617    upload_res
618}
619
620#[cfg(not(target_family = "wasm"))]
621pub async fn upload_v1_stream(
622    url: &str,
623    access_token: &str,
624    domain_id: &str,
625    mut rx: mpsc::Receiver<UploadDomainData>,
626) -> Result<Vec<DomainDataMetadata>, DomainError> {
627    use futures::channel::oneshot;
628
629    let boundary = "boundary";
630
631    let info = get_upload_info_v1(url).await;
632    let request_max_bytes = info.as_ref().map(|i| i.request_max_bytes).unwrap_or(0);
633    let multipart_enabled = info.as_ref().map(|i| i.multipart_enabled).unwrap_or(false);
634
635    // If we can't determine a meaningful request size limit, keep the existing streaming behavior.
636    if request_max_bytes <= 0 || !multipart_enabled {
637        let (mut create_tx, create_rx) = mpsc::channel(100);
638        let (mut update_tx, update_rx) = mpsc::channel(100);
639
640        let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
641        let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
642
643        let url = url.to_string();
644        let url_2 = url.clone();
645        let access_token = access_token.to_string();
646        let domain_id = domain_id.to_string();
647        let access_token_2 = access_token.clone();
648        let domain_id_2 = domain_id.clone();
649
650        let (create_signal, create_signal_rx) =
651            oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
652        let (update_signal, update_signal_rx) =
653            oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
654
655        spawn(async move {
656            let create_response =
657                create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
658            if let Err(Err(e)) = create_signal.send(create_response) {
659                tracing::error!("Failed to send create response: {}", e);
660            }
661        });
662
663        spawn(async move {
664            let update_response =
665                update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
666            if let Err(Err(e)) = update_signal.send(update_response) {
667                tracing::error!("Failed to send update response: {}", e);
668            }
669        });
670
671        while let Some(datum) = rx.next().await {
672            match datum.action {
673                DomainAction::Create { name, data_type } => {
674                    let create_data = write_create_body(
675                        boundary,
676                        &CreateDomainData { name, data_type },
677                        &datum.data,
678                    );
679                    create_tx.clone().send(create_data).await?;
680                }
681                DomainAction::Update { id } => {
682                    let update_data =
683                        write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
684                    update_tx.send(update_data).await?;
685                }
686            }
687        }
688        update_tx
689            .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
690            .await?;
691        create_tx
692            .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
693            .await?;
694        update_tx.close().await?;
695        create_tx.close().await?;
696
697        let mut data = {
698            match create_signal_rx.await {
699                Ok(res) => match res {
700                    Ok(d) => d,
701                    Err(e) => return Err(e),
702                },
703                Err(e) => return Err(DomainError::StreamCancelled(e)),
704            }
705        };
706
707        match update_signal_rx.await {
708            Ok(res) => match res {
709                Ok(d) => data.extend(d),
710                Err(e) => return Err(e),
711            },
712            Err(e) => return Err(DomainError::StreamCancelled(e)),
713        }
714
715        return Ok(data);
716    }
717
718    let closing = format!("--{}--\r\n", boundary).into_bytes();
719    let closing_len = closing.len();
720
721    struct Batch {
722        tx: mpsc::Sender<Vec<u8>>,
723        done: oneshot::Receiver<Result<Vec<DomainDataMetadata>, DomainError>>,
724        size: usize,
725    }
726
727    let mut create_batch: Option<Batch> = None;
728    let mut update_batch: Option<Batch> = None;
729    let mut create_done = Vec::new();
730    let mut update_done = Vec::new();
731    let mut create_res = Vec::new();
732    let mut update_res = Vec::new();
733
734    let spawn_create_batch = |url: String, access_token: String, domain_id: String| {
735        let (tx, rx) = mpsc::channel(100);
736        let body = Body::wrap_stream(rx.map(Ok::<Vec<u8>, std::io::Error>));
737        let (signal, signal_rx) =
738            oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
739        spawn(async move {
740            let create_response = create_v1(&url, &access_token, &domain_id, boundary, body).await;
741            if let Err(Err(e)) = signal.send(create_response) {
742                tracing::error!("Failed to send create response: {}", e);
743            }
744        });
745        Batch {
746            tx,
747            done: signal_rx,
748            size: 0,
749        }
750    };
751
752    let spawn_update_batch = |url: String, access_token: String, domain_id: String| {
753        let (tx, rx) = mpsc::channel(100);
754        let body = Body::wrap_stream(rx.map(Ok::<Vec<u8>, std::io::Error>));
755        let (signal, signal_rx) =
756            oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
757        spawn(async move {
758            let update_response = update_v1(&url, &access_token, &domain_id, boundary, body).await;
759            if let Err(Err(e)) = signal.send(update_response) {
760                tracing::error!("Failed to send update response: {}", e);
761            }
762        });
763        Batch {
764            tx,
765            done: signal_rx,
766            size: 0,
767        }
768    };
769
770    let base_url = url.to_string();
771    let token = access_token.to_string();
772    let did = domain_id.to_string();
773
774    while let Some(datum) = rx.next().await {
775        let bytes = Bytes::from(datum.data);
776        match datum.action {
777            DomainAction::Create { name, data_type } => {
778                let header = format!(
779                    "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
780                    boundary, name, data_type
781                );
782                let part_len = header.len() + bytes.len() + 2;
783
784                let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
785                if multipart_enabled && !fits_alone {
786                    match upload_domain_data_multipart_bytes(
787                        &base_url,
788                        &token,
789                        &did,
790                        DomainAction::Create {
791                            name: name.clone(),
792                            data_type: data_type.clone(),
793                        },
794                        bytes.clone(),
795                    )
796                    .await
797                    {
798                        Ok(meta) => {
799                            create_res.push(meta);
800                            continue;
801                        }
802                        Err(e) => {
803                            if is_unsupported_endpoint_error(&e) {
804                                // Endpoint not supported: fall back to single upload (will likely 413).
805                                let mut body = Vec::with_capacity(part_len + closing.len());
806                                body.extend_from_slice(header.as_bytes());
807                                body.extend_from_slice(bytes.as_ref());
808                                body.extend_from_slice("\r\n".as_bytes());
809                                body.extend_from_slice(&closing);
810                                let res =
811                                    create_v1(&base_url, &token, &did, boundary, Body::from(body))
812                                        .await?;
813                                create_res.extend(res);
814                                continue;
815                            }
816                            return Err(e);
817                        }
818                    }
819                }
820
821                if create_batch.is_none() {
822                    create_batch = Some(spawn_create_batch(
823                        base_url.clone(),
824                        token.clone(),
825                        did.clone(),
826                    ));
827                }
828                let mut batch = create_batch.take().unwrap();
829                if batch.size > 0
830                    && (batch.size + part_len + closing_len) as i64 > request_max_bytes
831                {
832                    batch.tx.send(closing.clone()).await?;
833                    batch.tx.close().await?;
834                    create_done.push(batch.done);
835                    batch = spawn_create_batch(base_url.clone(), token.clone(), did.clone());
836                }
837                let mut part = Vec::with_capacity(part_len);
838                part.extend_from_slice(header.as_bytes());
839                part.extend_from_slice(bytes.as_ref());
840                part.extend_from_slice("\r\n".as_bytes());
841                batch.size += part.len();
842                batch.tx.send(part).await?;
843                create_batch = Some(batch);
844            }
845            DomainAction::Update { id } => {
846                let header = format!(
847                    "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
848                    boundary, id
849                );
850                let part_len = header.len() + bytes.len() + 2;
851
852                let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
853                if multipart_enabled && !fits_alone {
854                    match upload_domain_data_multipart_bytes(
855                        &base_url,
856                        &token,
857                        &did,
858                        DomainAction::Update { id: id.clone() },
859                        bytes.clone(),
860                    )
861                    .await
862                    {
863                        Ok(meta) => {
864                            update_res.push(meta);
865                            continue;
866                        }
867                        Err(e) => {
868                            if is_unsupported_endpoint_error(&e) {
869                                let mut body = Vec::with_capacity(part_len + closing.len());
870                                body.extend_from_slice(header.as_bytes());
871                                body.extend_from_slice(bytes.as_ref());
872                                body.extend_from_slice("\r\n".as_bytes());
873                                body.extend_from_slice(&closing);
874                                let res =
875                                    update_v1(&base_url, &token, &did, boundary, Body::from(body))
876                                        .await?;
877                                update_res.extend(res);
878                                continue;
879                            }
880                            return Err(e);
881                        }
882                    }
883                }
884
885                if update_batch.is_none() {
886                    update_batch = Some(spawn_update_batch(
887                        base_url.clone(),
888                        token.clone(),
889                        did.clone(),
890                    ));
891                }
892                let mut batch = update_batch.take().unwrap();
893                if batch.size > 0
894                    && (batch.size + part_len + closing_len) as i64 > request_max_bytes
895                {
896                    batch.tx.send(closing.clone()).await?;
897                    batch.tx.close().await?;
898                    update_done.push(batch.done);
899                    batch = spawn_update_batch(base_url.clone(), token.clone(), did.clone());
900                }
901                let mut part = Vec::with_capacity(part_len);
902                part.extend_from_slice(header.as_bytes());
903                part.extend_from_slice(bytes.as_ref());
904                part.extend_from_slice("\r\n".as_bytes());
905                batch.size += part.len();
906                batch.tx.send(part).await?;
907                update_batch = Some(batch);
908            }
909        }
910    }
911
912    if let Some(mut batch) = create_batch {
913        batch.tx.send(closing.clone()).await?;
914        batch.tx.close().await?;
915        create_done.push(batch.done);
916    }
917    if let Some(mut batch) = update_batch {
918        batch.tx.send(closing.clone()).await?;
919        batch.tx.close().await?;
920        update_done.push(batch.done);
921    }
922
923    for done in create_done {
924        match done.await {
925            Ok(Ok(v)) => create_res.extend(v),
926            Ok(Err(e)) => return Err(e),
927            Err(e) => return Err(DomainError::StreamCancelled(e)),
928        }
929    }
930
931    for done in update_done {
932        match done.await {
933            Ok(Ok(v)) => update_res.extend(v),
934            Ok(Err(e)) => return Err(e),
935            Err(e) => return Err(DomainError::StreamCancelled(e)),
936        }
937    }
938
939    let mut out = Vec::new();
940    out.extend(create_res);
941    out.extend(update_res);
942    Ok(out)
943}
944
945async fn update_v1(
946    url: &str,
947    access_token: &str,
948    domain_id: &str,
949    boundary: &str,
950    body: Body,
951) -> Result<Vec<DomainDataMetadata>, DomainError> {
952    let update_response = Client::new()
953        .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
954        .bearer_auth(access_token)
955        .header(
956            "Content-Type",
957            &format!("multipart/form-data; boundary={}", boundary),
958        )
959        .body(body)
960        .send()
961        .await?;
962
963    if update_response.status().is_success() {
964        let data = update_response
965            .json::<ListDomainDataMetadata>()
966            .await
967            .unwrap();
968        Ok(data.data)
969    } else {
970        let status = update_response.status();
971        let err = update_response
972            .text()
973            .await
974            .unwrap_or_else(|_| "Unknown error".to_string());
975        Err(AukiErrorResponse {
976            status,
977            error: format!("Failed to update data. {}", err),
978        }
979        .into())
980    }
981}
982
983async fn create_v1(
984    url: &str,
985    access_token: &str,
986    domain_id: &str,
987    boundary: &str,
988    body: Body,
989) -> Result<Vec<DomainDataMetadata>, DomainError> {
990    let create_response = Client::new()
991        .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
992        .bearer_auth(access_token)
993        .header(
994            "Content-Type",
995            &format!("multipart/form-data; boundary={}", boundary),
996        )
997        .body(body)
998        .send()
999        .await?;
1000
1001    if create_response.status().is_success() {
1002        let data = create_response
1003            .json::<ListDomainDataMetadata>()
1004            .await
1005            .unwrap();
1006        Ok(data.data)
1007    } else {
1008        let status = create_response.status();
1009        let err = create_response
1010            .text()
1011            .await
1012            .unwrap_or_else(|_| "Unknown error".to_string());
1013        Err(AukiErrorResponse {
1014            status,
1015            error: format!("Failed to create data. {}", err),
1016        }
1017        .into())
1018    }
1019}
1020
1021fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
1022    let create_bytes = format!(
1023        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
1024        boundary, data.name, data.data_type
1025    );
1026    let mut create_data = create_bytes.into_bytes();
1027    create_data.extend_from_slice(data_bytes);
1028    create_data.extend_from_slice("\r\n".as_bytes());
1029    create_data
1030}
1031
1032fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
1033    let update_bytes = format!(
1034        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
1035        boundary, data.id
1036    );
1037    let mut update_data = update_bytes.into_bytes();
1038    update_data.extend_from_slice(data_bytes);
1039    update_data.extend_from_slice("\r\n".as_bytes());
1040    update_data
1041}
1042
1043pub async fn upload_v1(
1044    url: &str,
1045    access_token: &str,
1046    domain_id: &str,
1047    data: Vec<UploadDomainData>,
1048) -> Result<Vec<DomainDataMetadata>, DomainError> {
1049    let boundary = "boundary";
1050
1051    let info = get_upload_info_v1(url).await;
1052    let request_max_bytes = info.as_ref().map(|i| i.request_max_bytes).unwrap_or(0);
1053    let multipart_enabled = info.as_ref().map(|i| i.multipart_enabled).unwrap_or(false);
1054
1055    // If we can't determine a meaningful request size limit, keep existing single-request behavior.
1056    if request_max_bytes <= 0 || !multipart_enabled {
1057        let mut create_body = Vec::new();
1058        let mut update_body = Vec::new();
1059        let mut to_update = false;
1060        let mut to_create = false;
1061
1062        for datum in data {
1063            match datum.action {
1064                DomainAction::Create { name, data_type } => {
1065                    to_create = true;
1066                    let create_data = write_create_body(
1067                        boundary,
1068                        &CreateDomainData { name, data_type },
1069                        &datum.data,
1070                    );
1071                    create_body.extend_from_slice(&create_data);
1072                }
1073                DomainAction::Update { id } => {
1074                    to_update = true;
1075                    let update_data =
1076                        write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
1077                    update_body.extend_from_slice(&update_data);
1078                }
1079            }
1080        }
1081
1082        create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
1083        update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
1084
1085        let create_body = Body::from(create_body);
1086        let update_body = Body::from(update_body);
1087        let mut res = Vec::new();
1088
1089        if to_create {
1090            res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
1091        }
1092        if to_update {
1093            let update_response =
1094                update_v1(url, access_token, domain_id, boundary, update_body).await?;
1095            if !update_response.is_empty() {
1096                res.extend(update_response);
1097            }
1098        }
1099
1100        return Ok(res);
1101    }
1102
1103    let closing = format!("--{}--\r\n", boundary).into_bytes();
1104    let closing_len = closing.len();
1105
1106    let mut create_res = Vec::new();
1107    let mut update_res = Vec::new();
1108
1109    let mut create_batch = Vec::new();
1110    let mut update_batch = Vec::new();
1111
1112    let mut create_size = 0usize;
1113    let mut update_size = 0usize;
1114
1115    for datum in data {
1116        let bytes = Bytes::from(datum.data);
1117        match datum.action {
1118            DomainAction::Create { name, data_type } => {
1119                let header = format!(
1120                    "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
1121                    boundary, name, data_type
1122                );
1123                let part_len = header.len() + bytes.len() + 2;
1124                let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
1125
1126                if multipart_enabled && !fits_alone {
1127                    if !create_batch.is_empty() {
1128                        let mut body = std::mem::take(&mut create_batch);
1129                        body.extend_from_slice(&closing);
1130                        create_res.extend(
1131                            create_v1(url, access_token, domain_id, boundary, Body::from(body))
1132                                .await?,
1133                        );
1134                        create_size = 0;
1135                    }
1136                    match upload_domain_data_multipart_bytes(
1137                        url,
1138                        access_token,
1139                        domain_id,
1140                        DomainAction::Create {
1141                            name: name.clone(),
1142                            data_type: data_type.clone(),
1143                        },
1144                        bytes.clone(),
1145                    )
1146                    .await
1147                    {
1148                        Ok(meta) => {
1149                            create_res.push(meta);
1150                        }
1151                        Err(e) => {
1152                            if is_unsupported_endpoint_error(&e) {
1153                                // Fall back to single upload (will likely 413).
1154                                let mut body = Vec::with_capacity(part_len + closing.len());
1155                                body.extend_from_slice(header.as_bytes());
1156                                body.extend_from_slice(bytes.as_ref());
1157                                body.extend_from_slice("\r\n".as_bytes());
1158                                body.extend_from_slice(&closing);
1159                                create_res.extend(
1160                                    create_v1(
1161                                        url,
1162                                        access_token,
1163                                        domain_id,
1164                                        boundary,
1165                                        Body::from(body),
1166                                    )
1167                                    .await?,
1168                                );
1169                            } else {
1170                                return Err(e);
1171                            }
1172                        }
1173                    }
1174                    continue;
1175                }
1176
1177                if !create_batch.is_empty()
1178                    && (create_size + part_len + closing_len) as i64 > request_max_bytes
1179                {
1180                    let mut body = std::mem::take(&mut create_batch);
1181                    body.extend_from_slice(&closing);
1182                    create_res.extend(
1183                        create_v1(url, access_token, domain_id, boundary, Body::from(body)).await?,
1184                    );
1185                    create_size = 0;
1186                }
1187                create_batch.extend_from_slice(header.as_bytes());
1188                create_batch.extend_from_slice(bytes.as_ref());
1189                create_batch.extend_from_slice("\r\n".as_bytes());
1190                create_size += part_len;
1191            }
1192            DomainAction::Update { id } => {
1193                let header = format!(
1194                    "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
1195                    boundary, id
1196                );
1197                let part_len = header.len() + bytes.len() + 2;
1198                let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
1199
1200                if multipart_enabled && !fits_alone {
1201                    if !update_batch.is_empty() {
1202                        let mut body = std::mem::take(&mut update_batch);
1203                        body.extend_from_slice(&closing);
1204                        update_res.extend(
1205                            update_v1(url, access_token, domain_id, boundary, Body::from(body))
1206                                .await?,
1207                        );
1208                        update_size = 0;
1209                    }
1210                    match upload_domain_data_multipart_bytes(
1211                        url,
1212                        access_token,
1213                        domain_id,
1214                        DomainAction::Update { id: id.clone() },
1215                        bytes.clone(),
1216                    )
1217                    .await
1218                    {
1219                        Ok(meta) => {
1220                            update_res.push(meta);
1221                        }
1222                        Err(e) => {
1223                            if is_unsupported_endpoint_error(&e) {
1224                                let mut body = Vec::with_capacity(part_len + closing.len());
1225                                body.extend_from_slice(header.as_bytes());
1226                                body.extend_from_slice(bytes.as_ref());
1227                                body.extend_from_slice("\r\n".as_bytes());
1228                                body.extend_from_slice(&closing);
1229                                update_res.extend(
1230                                    update_v1(
1231                                        url,
1232                                        access_token,
1233                                        domain_id,
1234                                        boundary,
1235                                        Body::from(body),
1236                                    )
1237                                    .await?,
1238                                );
1239                            } else {
1240                                return Err(e);
1241                            }
1242                        }
1243                    }
1244                    continue;
1245                }
1246
1247                if !update_batch.is_empty()
1248                    && (update_size + part_len + closing_len) as i64 > request_max_bytes
1249                {
1250                    let mut body = std::mem::take(&mut update_batch);
1251                    body.extend_from_slice(&closing);
1252                    update_res.extend(
1253                        update_v1(url, access_token, domain_id, boundary, Body::from(body)).await?,
1254                    );
1255                    update_size = 0;
1256                }
1257                update_batch.extend_from_slice(header.as_bytes());
1258                update_batch.extend_from_slice(bytes.as_ref());
1259                update_batch.extend_from_slice("\r\n".as_bytes());
1260                update_size += part_len;
1261            }
1262        }
1263    }
1264
1265    if !create_batch.is_empty() {
1266        let mut body = create_batch;
1267        body.extend_from_slice(&closing);
1268        create_res
1269            .extend(create_v1(url, access_token, domain_id, boundary, Body::from(body)).await?);
1270    }
1271    if !update_batch.is_empty() {
1272        let mut body = update_batch;
1273        body.extend_from_slice(&closing);
1274        update_res
1275            .extend(update_v1(url, access_token, domain_id, boundary, Body::from(body)).await?);
1276    }
1277
1278    let mut res = Vec::new();
1279    res.extend(create_res);
1280    res.extend(update_res);
1281    Ok(res)
1282}
1283
1284fn parse_headers(
1285    headers_slice: &[u8],
1286) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
1287    let headers_str = String::from_utf8_lossy(headers_slice);
1288    let mut domain_data = None;
1289
1290    for line in headers_str.lines() {
1291        if line.trim().is_empty() {
1292            break;
1293        }
1294        if let Some((key, value)) = line.split_once(':') {
1295            let key = key.trim().to_lowercase();
1296            if key == "content-disposition" {
1297                let mut parsed_domain_data = DomainData {
1298                    metadata: DomainDataMetadata {
1299                        id: String::new(),
1300                        domain_id: String::new(),
1301                        name: String::new(),
1302                        data_type: String::new(),
1303                        size: 0,
1304                        created_at: String::new(),
1305                        updated_at: String::new(),
1306                    },
1307                    data: Vec::new(),
1308                };
1309                for part in value.split(';') {
1310                    let part = part.trim();
1311                    if let Some((key, value)) = part.split_once('=') {
1312                        let key = key.trim();
1313                        let value = value.trim().trim_matches('"');
1314                        match key {
1315                            "id" => parsed_domain_data.metadata.id = value.to_string(),
1316                            "domain-id" => {
1317                                parsed_domain_data.metadata.domain_id = value.to_string()
1318                            }
1319                            "name" => parsed_domain_data.metadata.name = value.to_string(),
1320                            "data-type" => {
1321                                parsed_domain_data.metadata.data_type = value.to_string()
1322                            }
1323                            "size" => parsed_domain_data.metadata.size = value.parse()?,
1324                            "created-at" => {
1325                                parsed_domain_data.metadata.created_at = value.to_string()
1326                            }
1327                            "updated-at" => {
1328                                parsed_domain_data.metadata.updated_at = value.to_string()
1329                            }
1330                            _ => {}
1331                        }
1332                    }
1333                }
1334                domain_data = Some(parsed_domain_data);
1335            }
1336        }
1337    }
1338
1339    if let Some(domain_data) = domain_data {
1340        Ok(domain_data)
1341    } else {
1342        Err("Missing content-disposition header".into())
1343    }
1344}
1345
1346fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
1347    let _data = String::from_utf8_lossy(data);
1348    let _boundary = String::from_utf8_lossy(boundary);
1349    data.windows(boundary.len())
1350        .position(|window| window == boundary)
1351}
1352
1353fn find_headers_end(data: &[u8]) -> Option<usize> {
1354    if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
1355        Some(i + 4) // body starts after \r\n\r\n
1356    } else {
1357        data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2)
1358    }
1359}
1360
1361async fn handle_domain_data_stream(
1362    mut tx: mpsc::Sender<Result<DomainData, DomainError>>,
1363    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
1364    boundary: &str,
1365) {
1366    use futures::pin_mut;
1367
1368    let mut buffer = Vec::new();
1369    let mut current_domain_data: Option<DomainData> = None;
1370    let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
1371
1372    pin_mut!(stream);
1373
1374    while let Some(chunk_result) = stream.next().await {
1375        // Handle chunk result
1376        let chunk = match chunk_result {
1377            Ok(c) if c.is_empty() => {
1378                tx.close().await.ok();
1379                return;
1380            }
1381            Ok(c) => c,
1382            Err(e) => {
1383                let _ = tx.send(Err(e.into())).await;
1384                return;
1385            }
1386        };
1387
1388        buffer.extend_from_slice(&chunk);
1389
1390        // If we are in the middle of reading a domain_data part, continue filling it
1391        if let Some(mut domain_data) = current_domain_data.take() {
1392            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
1393            if buffer.len() >= expected_size {
1394                domain_data.data.extend_from_slice(&buffer[..expected_size]);
1395                buffer.drain(..expected_size);
1396                if tx.send(Ok(domain_data)).await.is_err() {
1397                    return;
1398                }
1399            } else {
1400                domain_data.data.extend_from_slice(&buffer);
1401                buffer.clear();
1402                current_domain_data = Some(domain_data);
1403                continue;
1404            }
1405        }
1406
1407        // Process all boundaries in the current buffer
1408        while let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
1409            // Look for header end after boundary
1410            let header_end = match find_headers_end(&buffer[boundary_pos..]) {
1411                Some(end) => end,
1412                None => break, // Incomplete headers, wait for more chunks
1413            };
1414
1415            let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
1416            let part_headers = parse_headers(headers_slice);
1417
1418            let mut domain_data = match part_headers {
1419                Ok(data) => data,
1420                Err(e) => {
1421                    tracing::error!("Failed to parse headers: {:?}", e);
1422                    return;
1423                }
1424            };
1425
1426            // Remove processed data (boundary + headers) from buffer
1427            buffer.drain(..boundary_pos + header_end);
1428
1429            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
1430            if buffer.len() >= expected_size {
1431                domain_data.data.extend_from_slice(&buffer[..expected_size]);
1432                buffer.drain(..expected_size);
1433                if tx.send(Ok(domain_data)).await.is_err() {
1434                    return;
1435                }
1436            } else {
1437                domain_data.data.extend_from_slice(&buffer);
1438                buffer.clear();
1439                current_domain_data = Some(domain_data);
1440                break;
1441            }
1442        }
1443    }
1444}
1445
1446#[cfg(test)]
1447mod tests {
1448    use super::*;
1449    use bytes::Bytes;
1450
1451    #[test]
1452    fn test_find_boundary_found() {
1453        let data = b"random--boundary--data";
1454        let boundary = b"--boundary";
1455        assert_eq!(find_boundary(data, boundary), Some(6));
1456    }
1457
1458    #[test]
1459    fn test_find_boundary_not_found() {
1460        let data = b"random-data";
1461        let boundary = b"--boundary";
1462        assert_eq!(find_boundary(data, boundary), None);
1463    }
1464
1465    #[test]
1466    fn test_find_headers_end_crlf() {
1467        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
1468        assert_eq!(find_headers_end(data), Some(36));
1469    }
1470
1471    #[test]
1472    fn test_find_headers_end_lf() {
1473        let data = b"header1: value1\nheader2: value2\n\nbody";
1474        assert_eq!(find_headers_end(data), Some(33));
1475    }
1476
1477    #[test]
1478    fn test_find_headers_end_none() {
1479        let data = b"header1: value1\nheader2: value2\nbody";
1480        assert_eq!(find_headers_end(data), None);
1481    }
1482
1483    #[test]
1484    fn test_parse_headers_success() {
1485        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
1486        let parsed = super::parse_headers(headers);
1487        assert!(parsed.is_ok());
1488        let domain_data = parsed.unwrap();
1489        assert_eq!(domain_data.metadata.id, "123");
1490        assert_eq!(domain_data.metadata.domain_id, "abc");
1491        assert_eq!(domain_data.metadata.name, "test");
1492        assert_eq!(domain_data.metadata.data_type, "type");
1493        assert_eq!(domain_data.metadata.size, 42);
1494        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
1495        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
1496    }
1497
1498    #[test]
1499    fn test_parse_headers_missing_content_disposition() {
1500        let headers = b"content-type: application/octet-stream\r\n\r\n";
1501        let parsed = super::parse_headers(headers);
1502        assert!(parsed.is_err());
1503    }
1504
1505    #[tokio::test]
1506    async fn test_a_chunk_contains_multiple_data() {
1507        let (tx, rx) = mpsc::channel(10);
1508
1509        let payload = br#"
1510        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1511Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1512Content-Type: application/octet-stream
1513
1514{"test": "test"}
1515--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1516Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1517Content-Type: application/octet-stream
1518
1519{"test": "test updated"}
1520--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1521        "#;
1522        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
1523
1524        handle_domain_data_stream(
1525            tx,
1526            stream,
1527            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1528        )
1529        .await;
1530
1531        let output: Vec<DomainData> = rx
1532            .collect::<Vec<Result<DomainData, DomainError>>>()
1533            .await
1534            .into_iter()
1535            .map(|r| r.unwrap())
1536            .collect();
1537        assert_eq!(output.len(), 2);
1538        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1539        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1540    }
1541
1542    #[tokio::test]
1543    async fn test_chunk_size_is_smaller_than_part() {
1544        let (tx, rx) = mpsc::channel(10);
1545
1546        let payload = br#"
1547        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1548Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1549Content-Type: application/octet-stream
1550        "#;
1551        let payload2 = br#"
1552
1553{"test": "test"}
1554--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1555Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1556Content-Type: application/octet-stream
1557
1558{"test": "test updated"}
1559--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1560"#;
1561        let stream = tokio_stream::iter(vec![
1562            Ok(Bytes::from_static(payload)),
1563            Ok(Bytes::from_static(payload2)),
1564        ]);
1565
1566        handle_domain_data_stream(
1567            tx,
1568            stream,
1569            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1570        )
1571        .await;
1572
1573        let output: Vec<DomainData> = rx
1574            .collect::<Vec<Result<DomainData, DomainError>>>()
1575            .await
1576            .into_iter()
1577            .map(|r| r.unwrap())
1578            .collect();
1579        assert_eq!(output.len(), 2);
1580        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1581        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1582    }
1583
1584    #[tokio::test]
1585    async fn test_chunk_size_is_smaller_than_header() {
1586        let (tx, rx) = mpsc::channel(10);
1587
1588        let payload = br#"
1589        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1590Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1591Content-Type: application/octet-stream
1592        "#;
1593        let payload2 = br#"
1594e: application/octet-stream
1595
1596{"test": "test"}
1597--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1598Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1599Content-Type: application/octet-stream
1600
1601{"test": "test updated"}
1602--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1603"#;
1604        let stream = tokio_stream::iter(vec![
1605            Ok(Bytes::from_static(payload)),
1606            Ok(Bytes::from_static(payload2)),
1607        ]);
1608
1609        handle_domain_data_stream(
1610            tx,
1611            stream,
1612            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1613        )
1614        .await;
1615
1616        let output: Vec<DomainData> = rx
1617            .collect::<Vec<Result<DomainData, DomainError>>>()
1618            .await
1619            .into_iter()
1620            .map(|r| r.unwrap())
1621            .collect();
1622        assert_eq!(output.len(), 2);
1623        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1624        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1625    }
1626
1627    #[tokio::test]
1628    async fn test_chunk_size_doesnt_cover_the_whole_data() {
1629        let (tx, rx) = mpsc::channel(10);
1630
1631        let payload = br#"
1632        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1633Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1634Content-Type: application/octet-stream
1635
1636{"test": "test"#;
1637        let payload2 = br#""}
1638--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1639Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1640Content-Type: application/octet-stream
1641
1642{"test": "test updated"}
1643--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1644"#;
1645        let stream = tokio_stream::iter(vec![
1646            Ok(Bytes::from_static(payload)),
1647            Ok(Bytes::from_static(payload2)),
1648        ]);
1649
1650        handle_domain_data_stream(
1651            tx,
1652            stream,
1653            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1654        )
1655        .await;
1656
1657        let output: Vec<DomainData> = rx
1658            .collect::<Vec<Result<DomainData, DomainError>>>()
1659            .await
1660            .into_iter()
1661            .map(|r| r.unwrap())
1662            .collect();
1663        assert_eq!(output.len(), 2);
1664        assert_eq!(
1665            std::str::from_utf8(&output[1].data).unwrap(),
1666            "{\"test\": \"test updated\"}"
1667        );
1668        assert_eq!(
1669            std::str::from_utf8(&output[0].data).unwrap(),
1670            "{\"test\": \"test\"}"
1671        );
1672    }
1673}