posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
/// Metadata describing a single domain data item.
///
/// Deserialized from the JSON listing returned by the domain server and also
/// rebuilt from the `Content-Disposition` parameters of multipart download parts
/// (see `parse_headers`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainDataMetadata {
    /// Identifier of the data item.
    pub id: String,
    /// Identifier of the domain the item belongs to.
    pub domain_id: String,
    /// Name of the data item.
    pub name: String,
    /// Type label of the data item.
    pub data_type: String,
    /// Size reported by the server (presumably the payload length in bytes — TODO confirm).
    pub size: u64,
    /// Creation timestamp, kept as the string sent by the server.
    pub created_at: String,
    /// Last-update timestamp, kept as the string sent by the server.
    pub updated_at: String,
}
21
/// A domain data item: its metadata together with the raw payload bytes.
#[derive(Debug, Deserialize, Serialize)]
pub struct DomainData {
    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
    /// Metadata of the item.
    pub metadata: DomainDataMetadata,
    /// Raw payload bytes; left empty where only metadata is available
    /// (for example in the entries returned by `upload_one`).
    pub data: Vec<u8>,
}
28
/// Parameters for updating an existing data item.
///
/// The `id` is written into the part's `Content-Disposition` header by
/// `write_update_body`, and the part is sent with an HTTP `PUT`.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct UpdateDomainData {
    /// Identifier of the item to update.
    pub id: String,
}
33
/// Parameters for creating a new data item.
///
/// Both fields are written into the part's `Content-Disposition` header by
/// `write_create_body`, and the part is sent with an HTTP `POST`.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CreateDomainData {
    /// Name of the new item.
    pub name: String,
    /// Type label of the new item.
    pub data_type: String,
}
39
/// The operation an upload item requests: create a new item or update an existing one.
///
/// Because the enum is `untagged`, serde tries the variants in declaration order
/// when deserializing: input carrying `name` and `data_type` becomes `Create`,
/// otherwise input carrying `id` becomes `Update`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DomainAction {
    /// Create a new data item.
    Create(CreateDomainData),
    /// Update an existing data item.
    Update(UpdateDomainData),
}
46
/// One item to upload: the requested action plus the payload bytes.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadDomainData {
    /// Create/update parameters; flattened so their fields sit next to `data`
    /// in the serialized form.
    #[serde(flatten)]
    pub action: DomainAction,
    /// Raw payload bytes to upload.
    pub data: Vec<u8>,
}
53
/// Filter used when listing or downloading domain data (see `download_v1`).
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadQuery {
    /// Item ids; when non-empty they are joined with commas into the `ids` query parameter.
    pub ids: Vec<String>,
    /// Optional value for the `name` query parameter.
    pub name: Option<String>,
    /// Optional value for the `data_type` query parameter.
    pub data_type: Option<String>,
}
60
/// JSON envelope of list-style responses: an object whose `data` field holds the metadata entries.
#[derive(Debug, Deserialize)]
struct ListDomainDataMetadata {
    /// Metadata entries carried by the response.
    pub data: Vec<DomainDataMetadata>,
}
65
66pub async fn download_by_id(
67    url: &str,
68    client_id: &str,
69    access_token: &str,
70    domain_id: &str,
71    id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73    let base = url.trim_end_matches('/');
74    let response = Client::new()
75        .get(format!(
76            "{}/api/v1/domains/{}/data/{}?raw=true",
77            base, domain_id, id
78        ))
79        .bearer_auth(access_token)
80        .header("posemesh-client-id", client_id)
81        .send()
82        .await?;
83
84    if response.status().is_success() {
85        let data = response.bytes().await?;
86        Ok(data.to_vec())
87    } else {
88        let status = response.status();
89        let text = response
90            .text()
91            .await
92            .unwrap_or_else(|_| "Unknown error".to_string());
93        Err(format!(
94            "Failed to download data by id. Status: {} - {}",
95            status, text
96        )
97        .into())
98    }
99}
100
101/// Perform a direct absolute download request to the domain server.
102/// Sets headers: Accept: multipart/form-data, Authorization: Bearer <token>, posemesh-client-id.
103pub async fn request_download_absolute(
104    url: &str,
105    client_id: &str,
106    access_token: &str,
107) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
108    let response = Client::new()
109        .get(url)
110        .bearer_auth(access_token)
111        .header("posemesh-client-id", client_id)
112        .header("Accept", "multipart/form-data")
113        .send()
114        .await?;
115    Ok(response)
116}
117
118pub async fn download_metadata_v1(
119    url: &str,
120    client_id: &str,
121    access_token: &str,
122    domain_id: &str,
123    query: &DownloadQuery,
124) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
125    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
126    if response.status().is_success() {
127        let data = response.json::<ListDomainDataMetadata>().await?;
128        Ok(data.data)
129    } else {
130        let status = response.status();
131        let text = response
132            .text()
133            .await
134            .unwrap_or_else(|_| "Unknown error".to_string());
135        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
136    }
137}
138
139pub async fn download_v1(
140    url: &str,
141    client_id: &str,
142    access_token: &str,
143    domain_id: &str,
144    query: &DownloadQuery,
145    with_data: bool,
146) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
147    let mut params = HashMap::new();
148
149    if let Some(name) = &query.name {
150        params.insert("name", name.clone());
151    }
152    if let Some(data_type) = &query.data_type {
153        params.insert("data_type", data_type.clone());
154    }
155    let ids = if !query.ids.is_empty() {
156        format!("?ids={}", query.ids.join(","))
157    } else {
158        String::new()
159    };
160
161    let base = url.trim_end_matches('/');
162    let response = Client::new()
163        .get(format!("{}/api/v1/domains/{}/data{}", base, domain_id, ids))
164        .bearer_auth(access_token)
165        .header(
166            "Accept",
167            if with_data {
168                "multipart/form-data"
169            } else {
170                "application/json"
171            },
172        )
173        .header("posemesh-client-id", client_id)
174        .query(&params)
175        .send()
176        .await?;
177
178    if response.status().is_success() {
179        Ok(response)
180    } else {
181        let status = response.status();
182        let text = response
183            .text()
184            .await
185            .unwrap_or_else(|_| "Unknown error".to_string());
186        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
187    }
188}
189
190pub async fn download_v1_stream(
191    url: &str,
192    client_id: &str,
193    access_token: &str,
194    domain_id: &str,
195    query: &DownloadQuery,
196) -> Result<
197    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
198    Box<dyn std::error::Error + Send + Sync>,
199> {
200    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
201
202    stream_from_response(response).await
203}
204
205/// Build a stream from an HTTP response returned by the domain download endpoint.
206/// Parses multipart boundaries and yields DomainData items as they arrive.
207pub async fn stream_from_response(
208    response: Response,
209) -> Result<
210    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
211    Box<dyn std::error::Error + Send + Sync>,
212> {
213    let (mut tx, rx) =
214        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
215
216    let boundary = match response
217        .headers()
218        .get("content-type")
219        .and_then(|ct| ct.to_str().ok())
220        .and_then(|ct| {
221            if ct.starts_with("multipart/form-data; boundary=") {
222                Some(ct.split("boundary=").nth(1)?.to_string())
223            } else {
224                None
225            }
226        }) {
227        Some(b) => b,
228        None => {
229            tracing::error!("Invalid content-type header");
230            let _ = tx.close().await;
231            return Err("Invalid content-type header".into());
232        }
233    };
234
235    spawn(async move {
236        let stream = response.bytes_stream();
237        handle_domain_data_stream(tx, stream, &boundary).await;
238    });
239
240    Ok(rx)
241}
242
243pub async fn delete_by_id(
244    url: &str,
245    access_token: &str,
246    domain_id: &str,
247    id: &str,
248) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
249    let base = url.trim_end_matches('/');
250    let endpoint = format!("{}/api/v1/domains/{}/data/{}", base, domain_id, id);
251    let client = Client::new();
252    let resp = client
253        .delete(&endpoint)
254        .bearer_auth(access_token)
255        .send()
256        .await?;
257
258    if resp.status().is_success() {
259        Ok(())
260    } else {
261        let err = resp
262            .text()
263            .await
264            .unwrap_or_else(|_| "Unknown error".to_string());
265        Err(format!("Delete failed with status: {}", err).into())
266    }
267}
268
269#[cfg(not(target_family = "wasm"))]
270pub async fn upload_v1_stream(
271    url: &str,
272    access_token: &str,
273    domain_id: &str,
274    mut rx: mpsc::Receiver<UploadDomainData>,
275) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
276    use futures::channel::oneshot;
277
278    let boundary = "boundary";
279
280    let (mut create_tx, create_rx) = mpsc::channel(100);
281    let (mut update_tx, update_rx) = mpsc::channel(100);
282
283    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
284    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
285
286    let url = url.to_string();
287    let url_2 = url.clone();
288    let access_token = access_token.to_string();
289    let domain_id = domain_id.to_string();
290    let access_token_2 = access_token.clone();
291    let domain_id_2 = domain_id.clone();
292
293    let (create_signal, create_signal_rx) = oneshot::channel::<
294        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295    >();
296    let (update_signal, update_signal_rx) = oneshot::channel::<
297        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
298    >();
299
300    spawn(async move {
301        let create_response =
302            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
303        if let Err(Err(e)) = create_signal.send(create_response) {
304            tracing::error!("Failed to send create response: {}", e);
305        }
306    });
307
308    spawn(async move {
309        let update_response =
310            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
311        if let Err(Err(e)) = update_signal.send(update_response) {
312            tracing::error!("Failed to send update response: {}", e);
313        }
314    });
315
316    while let Some(datum) = rx.next().await {
317        match datum.action {
318            DomainAction::Create(create) => {
319                let create_data = write_create_body(boundary, &create, &datum.data);
320                create_tx.clone().send(create_data).await?;
321            }
322            DomainAction::Update(update) => {
323                let update_data = write_update_body(boundary, &update, &datum.data);
324                update_tx.send(update_data).await?;
325            }
326        }
327    }
328    update_tx
329        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330        .await?;
331    create_tx
332        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
333        .await?;
334    update_tx.close().await?;
335    create_tx.close().await?;
336
337    let mut data = {
338        if let Ok(res) = create_signal_rx.await {
339            match res {
340                Ok(d) => d,
341                Err(e) => return Err(e),
342            }
343        } else {
344            return Err("create cancelled".into());
345        }
346    };
347
348    if let Ok(res) = update_signal_rx.await {
349        match res {
350            Ok(d) => data.extend(d),
351            Err(e) => return Err(e),
352        }
353    } else {
354        return Err("update cancelled".into());
355    }
356
357    Ok(data)
358}
359
360async fn update_v1(
361    url: &str,
362    access_token: &str,
363    domain_id: &str,
364    boundary: &str,
365    body: Body,
366) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
367    let base = url.trim_end_matches('/');
368    let update_response = Client::new()
369        .put(format!("{}/api/v1/domains/{}/data", base, domain_id))
370        .bearer_auth(access_token)
371        .header(
372            "Content-Type",
373            &format!("multipart/form-data; boundary={}", boundary),
374        )
375        .body(body)
376        .send()
377        .await?;
378
379    if update_response.status().is_success() {
380        let data = update_response
381            .json::<ListDomainDataMetadata>()
382            .await
383            .unwrap();
384        Ok(data.data)
385    } else {
386        let err = update_response
387            .text()
388            .await
389            .unwrap_or_else(|_| "Unknown error".to_string());
390        Err(format!("Update failed with status: {}", err).into())
391    }
392}
393
394async fn create_v1(
395    url: &str,
396    access_token: &str,
397    domain_id: &str,
398    boundary: &str,
399    body: Body,
400) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
401    let base = url.trim_end_matches('/');
402    let create_response = Client::new()
403        .post(format!("{}/api/v1/domains/{}/data", base, domain_id))
404        .bearer_auth(access_token)
405        .header(
406            "Content-Type",
407            &format!("multipart/form-data; boundary={}", boundary),
408        )
409        .body(body)
410        .send()
411        .await?;
412
413    if create_response.status().is_success() {
414        let data = create_response
415            .json::<ListDomainDataMetadata>()
416            .await
417            .unwrap();
418        Ok(data.data)
419    } else {
420        let err = create_response
421            .text()
422            .await
423            .unwrap_or_else(|_| "Unknown error".to_string());
424        Err(format!("Create failed with status: {}", err).into())
425    }
426}
427
428fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
429    let create_bytes = format!(
430        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
431        boundary, data.name, data.data_type
432    );
433    let mut create_data = create_bytes.into_bytes();
434    create_data.extend_from_slice(data_bytes);
435    create_data.extend_from_slice("\r\n".as_bytes());
436    create_data
437}
438
439fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
440    let update_bytes = format!(
441        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
442        boundary, data.id
443    );
444    let mut update_data = update_bytes.into_bytes();
445    update_data.extend_from_slice(data_bytes);
446    update_data.extend_from_slice("\r\n".as_bytes());
447    update_data
448}
449
450pub async fn upload_v1(
451    url: &str,
452    access_token: &str,
453    domain_id: &str,
454    data: Vec<UploadDomainData>,
455) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
456    let boundary = "boundary";
457
458    let mut create_body = Vec::new();
459    let mut update_body = Vec::new();
460    let mut to_update = false;
461    let mut to_create = false;
462
463    // Process the first item to get metadata for the form
464    for datum in data {
465        match datum.action {
466            DomainAction::Create(create) => {
467                to_create = true;
468                let create_data = write_create_body(boundary, &create, &datum.data);
469                create_body.extend_from_slice(&create_data);
470            }
471            DomainAction::Update(update) => {
472                to_update = true;
473                let update_data = write_update_body(boundary, &update, &datum.data);
474                update_body.extend_from_slice(&update_data);
475            }
476        }
477    }
478
479    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
480    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
481
482    let create_body = Body::from(create_body);
483    let update_body = Body::from(update_body);
484    let mut res = Vec::new();
485
486    if to_create {
487        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
488    }
489    if to_update {
490        let update_response =
491            update_v1(url, access_token, domain_id, boundary, update_body).await?;
492        if !update_response.is_empty() {
493            res.extend(update_response);
494        }
495    }
496
497    Ok(res)
498}
499
500/// Upload a single domain data item (create or update) as one HTTP request.
501/// Returns a list of created/updated DomainData entries (with empty data payloads).
502#[cfg(not(target_family = "wasm"))]
503pub async fn upload_one(
504    url: &str,
505    access_token: &str,
506    domain_id: &str,
507    data: UploadDomainData,
508) -> Result<Vec<DomainData>, (StatusCode, String)> {
509    let boundary = "boundary";
510    let (method, body) = match &data.action {
511        DomainAction::Create(create) => {
512            let mut bytes = write_create_body(boundary, create, &data.data);
513            // terminate multipart body
514            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
515            (reqwest::Method::POST, Body::from(bytes))
516        }
517        DomainAction::Update(update) => {
518            let mut bytes = write_update_body(boundary, update, &data.data);
519            // terminate multipart body
520            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
521            (reqwest::Method::PUT, Body::from(bytes))
522        }
523    };
524
525    let base = url.trim_end_matches('/');
526    let endpoint = format!("{}/api/v1/domains/{}/data", base, domain_id);
527    let response = Client::new()
528        .request(method, endpoint)
529        .bearer_auth(access_token)
530        .header(
531            "Content-Type",
532            &format!("multipart/form-data; boundary={}", boundary),
533        )
534        .body(body)
535        .send()
536        .await
537        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
538
539    if response.status().is_success() {
540        // Be tolerant to varying response shapes (full metadata or id-only)
541        let text = response
542            .text()
543            .await
544            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
545        // Try strict metadata first
546        if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
547            let items: Vec<DomainData> = md
548                .data
549                .into_iter()
550                .map(|m| DomainData {
551                    metadata: m,
552                    data: Vec::new(),
553                })
554                .collect();
555            return Ok(items);
556        }
557        // Fallback: extract minimal fields from generic JSON
558        let v: serde_json::Value = serde_json::from_str(&text)
559            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
560        let mut out: Vec<DomainData> = Vec::new();
561        if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
562            for item in arr {
563                let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
564                let domain_id = item
565                    .get("domain_id")
566                    .and_then(|x| x.as_str())
567                    .unwrap_or("")
568                    .to_string();
569                let name = item
570                    .get("name")
571                    .and_then(|x| x.as_str())
572                    .unwrap_or("")
573                    .to_string();
574                let data_type = item
575                    .get("data_type")
576                    .and_then(|x| x.as_str())
577                    .unwrap_or("")
578                    .to_string();
579                out.push(DomainData {
580                    metadata: DomainDataMetadata {
581                        id,
582                        domain_id,
583                        name,
584                        data_type,
585                        size: 0,
586                        created_at: String::new(),
587                        updated_at: String::new(),
588                    },
589                    data: Vec::new(),
590                });
591            }
592            return Ok(out);
593        }
594        Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
595    } else {
596        let status = response.status();
597        let text = response
598            .text()
599            .await
600            .unwrap_or_else(|_| "Unknown error".to_string());
601        Err((status, text))
602    }
603}
604
605fn parse_headers(
606    headers_slice: &[u8],
607) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
608    let headers_str = String::from_utf8_lossy(headers_slice);
609    let mut domain_data = None;
610
611    for line in headers_str.lines() {
612        if line.trim().is_empty() {
613            break;
614        }
615        if let Some((key, value)) = line.split_once(':') {
616            let key = key.trim().to_lowercase();
617            if key == "content-disposition" {
618                let mut parsed_domain_data = DomainData {
619                    metadata: DomainDataMetadata {
620                        id: String::new(),
621                        domain_id: String::new(),
622                        name: String::new(),
623                        data_type: String::new(),
624                        size: 0,
625                        created_at: String::new(),
626                        updated_at: String::new(),
627                    },
628                    data: Vec::new(),
629                };
630                for part in value.split(';') {
631                    let part = part.trim();
632                    if let Some((key, value)) = part.split_once('=') {
633                        let key = key.trim();
634                        let value = value.trim().trim_matches('"');
635                        match key {
636                            "id" => parsed_domain_data.metadata.id = value.to_string(),
637                            "domain-id" => {
638                                parsed_domain_data.metadata.domain_id = value.to_string()
639                            }
640                            "name" => parsed_domain_data.metadata.name = value.to_string(),
641                            "data-type" => {
642                                parsed_domain_data.metadata.data_type = value.to_string()
643                            }
644                            "size" => parsed_domain_data.metadata.size = value.parse()?,
645                            "created-at" => {
646                                parsed_domain_data.metadata.created_at = value.to_string()
647                            }
648                            "updated-at" => {
649                                parsed_domain_data.metadata.updated_at = value.to_string()
650                            }
651                            _ => {}
652                        }
653                    }
654                }
655                domain_data = Some(parsed_domain_data);
656            }
657        }
658    }
659
660    if let Some(domain_data) = domain_data {
661        Ok(domain_data)
662    } else {
663        Err("Missing content-disposition header".into())
664    }
665}
666
/// Returns the offset of the first occurrence of `boundary` in `data`.
///
/// Returns `None` when the boundary does not occur, when it is longer than
/// `data`, or when `boundary` is empty (there is nothing to look for).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    // `windows(0)` panics, so an empty needle is rejected up front.
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
673
/// Returns the offset just past the first blank line in `data`, i.e. where the
/// body following a header block starts.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as terminators, and whichever occurs
/// first wins, so a CRLF pair inside the body cannot be mistaken for the end of
/// LF-terminated headers. Returns `None` when no terminator is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    (0..data.len()).find_map(|i| {
        let rest = &data[i..];
        if rest.starts_with(b"\r\n\r\n") {
            Some(i + 4)
        } else if rest.starts_with(b"\n\n") {
            Some(i + 2)
        } else {
            None
        }
    })
}
682
683async fn handle_domain_data_stream(
684    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
685    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
686    boundary: &str,
687) {
688    use futures::pin_mut;
689
690    let mut buffer = Vec::new();
691    let mut current_domain_data: Option<DomainData> = None;
692    let boundary_bytes = format!("--{}", boundary).into_bytes();
693    let keep_tail = boundary_bytes.len() + 4; // bytes to keep for boundary detection across chunks
694
695    pin_mut!(stream);
696
697    while let Some(chunk_result) = stream.next().await {
698        let chunk = match chunk_result {
699            Ok(c) if c.is_empty() => continue,
700            Ok(c) => c,
701            Err(e) => {
702                let _ = tx.send(Err(e.into())).await;
703                return;
704            }
705        };
706
707        buffer.extend_from_slice(&chunk);
708
709        'consume: loop {
710            match &mut current_domain_data {
711                None => {
712                    let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
713                        if buffer.len() > keep_tail {
714                            buffer.drain(..buffer.len() - keep_tail);
715                        }
716                        break 'consume;
717                    };
718                    let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
719                        break 'consume;
720                    };
721                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
722                    let part_headers = parse_headers(headers_slice);
723                    let domain_data = match part_headers {
724                        Ok(d) => d,
725                        Err(e) => {
726                            tracing::error!("Failed to parse headers: {:?}", e);
727                            return;
728                        }
729                    };
730                    buffer.drain(..boundary_pos + header_end_rel);
731                    current_domain_data = Some(domain_data);
732                }
733                Some(dd) => {
734                    if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
735                        let mut data_end = next_boundary_pos;
736                        if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
737                            data_end -= 2;
738                        } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
739                            data_end -= 1;
740                        }
741                        dd.data.extend_from_slice(&buffer[..data_end]);
742                        buffer.drain(..next_boundary_pos);
743                        let finished = current_domain_data.take().unwrap();
744                        if tx.send(Ok(finished)).await.is_err() {
745                            return;
746                        }
747                    } else {
748                        if buffer.len() > keep_tail {
749                            let take = buffer.len() - keep_tail;
750                            dd.data.extend_from_slice(&buffer[..take]);
751                            buffer.drain(..take);
752                        }
753                        break 'consume;
754                    }
755                }
756            }
757        }
758    }
759
760    let _ = tx.close().await;
761}
762
763#[cfg(test)]
764mod tests {
765    use bytes::Bytes;
766
767    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
768
769    use super::*;
770
771    fn get_config() -> (Config, String) {
772        if std::path::Path::new("../.env.local").exists() {
773            dotenvy::from_filename("../.env.local").ok();
774            dotenvy::dotenv().ok();
775        }
776        let config = Config::from_env().unwrap();
777        (config, std::env::var("DOMAIN_ID").unwrap())
778    }
779
780    #[test]
781    fn test_find_boundary_found() {
782        let data = b"random--boundary--data";
783        let boundary = b"--boundary";
784        assert_eq!(find_boundary(data, boundary), Some(6));
785    }
786
787    #[test]
788    fn test_find_boundary_not_found() {
789        let data = b"random-data";
790        let boundary = b"--boundary";
791        assert_eq!(find_boundary(data, boundary), None);
792    }
793
794    #[test]
795    fn test_find_headers_end_crlf() {
796        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
797        assert_eq!(find_headers_end(data), Some(36));
798    }
799
800    #[test]
801    fn test_find_headers_end_lf() {
802        let data = b"header1: value1\nheader2: value2\n\nbody";
803        assert_eq!(find_headers_end(data), Some(33));
804    }
805
806    #[test]
807    fn test_find_headers_end_none() {
808        let data = b"header1: value1\nheader2: value2\nbody";
809        assert_eq!(find_headers_end(data), None);
810    }
811
812    #[test]
813    fn test_parse_headers_success() {
814        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
815        let parsed = super::parse_headers(headers);
816        assert!(parsed.is_ok());
817        let domain_data = parsed.unwrap();
818        assert_eq!(domain_data.metadata.id, "123");
819        assert_eq!(domain_data.metadata.domain_id, "abc");
820        assert_eq!(domain_data.metadata.name, "test");
821        assert_eq!(domain_data.metadata.data_type, "type");
822        assert_eq!(domain_data.metadata.size, 42);
823        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
824        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
825    }
826
827    #[test]
828    fn test_parse_headers_missing_content_disposition() {
829        let headers = b"content-type: application/octet-stream\r\n\r\n";
830        let parsed = super::parse_headers(headers);
831        assert!(parsed.is_err());
832    }
833
834    #[tokio::test]
835    async fn test_a_chunk_contains_multiple_data() {
836        let (tx, rx) = mpsc::channel(10);
837
838        let payload = br#"
839        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
840Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
841Content-Type: application/octet-stream
842
843{"test": "test"}
844--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
845Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
846Content-Type: application/octet-stream
847
848{"test": "test updated"}
849--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
850        "#;
851        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
852
853        handle_domain_data_stream(
854            tx,
855            stream,
856            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
857        )
858        .await;
859
860        let output: Vec<DomainData> = rx
861            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
862            .await
863            .into_iter()
864            .map(|r| r.unwrap())
865            .collect();
866        assert_eq!(output.len(), 2);
867        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
868        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
869    }
870
871    #[tokio::test]
872    async fn test_chunk_size_is_smaller_than_part() {
873        let (tx, rx) = mpsc::channel(10);
874
875        let payload = br#"
876        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
877Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
878Content-Type: application/octet-stream
879        "#;
880        let payload2 = br#"
881
882{"test": "test"}
883--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
884Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
885Content-Type: application/octet-stream
886
887{"test": "test updated"}
888--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
889"#;
890        let stream = tokio_stream::iter(vec![
891            Ok(Bytes::from_static(payload)),
892            Ok(Bytes::from_static(payload2)),
893        ]);
894
895        handle_domain_data_stream(
896            tx,
897            stream,
898            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
899        )
900        .await;
901
902        let output: Vec<DomainData> = rx
903            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
904            .await
905            .into_iter()
906            .map(|r| r.unwrap())
907            .collect();
908        assert_eq!(output.len(), 2);
909        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
910        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
911    }
912
913    #[tokio::test]
914    async fn test_chunk_size_is_smaller_than_header() {
915        let (tx, rx) = mpsc::channel(10);
916
917        let payload = br#"
918        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
919Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
920Content-Type: application/octet-stream
921        "#;
922        let payload2 = br#"
923e: application/octet-stream
924
925{"test": "test"}
926--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
927Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
928Content-Type: application/octet-stream
929
930{"test": "test updated"}
931--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
932"#;
933        let stream = tokio_stream::iter(vec![
934            Ok(Bytes::from_static(payload)),
935            Ok(Bytes::from_static(payload2)),
936        ]);
937
938        handle_domain_data_stream(
939            tx,
940            stream,
941            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
942        )
943        .await;
944
945        let output: Vec<DomainData> = rx
946            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
947            .await
948            .into_iter()
949            .map(|r| r.unwrap())
950            .collect();
951        assert_eq!(output.len(), 2);
952        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
953        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
954    }
955
956    #[tokio::test]
957    async fn test_chunk_size_doesnt_cover_the_whole_data() {
958        let (tx, rx) = mpsc::channel(10);
959
960        let payload = br#"
961        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
962Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
963Content-Type: application/octet-stream
964
965{"test": "test"#;
966        let payload2 = br#""}
967--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
968Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
969Content-Type: application/octet-stream
970
971{"test": "test updated"}
972--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
973"#;
974        let stream = tokio_stream::iter(vec![
975            Ok(Bytes::from_static(payload)),
976            Ok(Bytes::from_static(payload2)),
977        ]);
978
979        handle_domain_data_stream(
980            tx,
981            stream,
982            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
983        )
984        .await;
985
986        let output: Vec<DomainData> = rx
987            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
988            .await
989            .into_iter()
990            .map(|r| r.unwrap())
991            .collect();
992        assert_eq!(output.len(), 2);
993        assert_eq!(
994            std::str::from_utf8(&output[1].data).unwrap(),
995            "{\"test\": \"test updated\"}"
996        );
997        assert_eq!(
998            std::str::from_utf8(&output[0].data).unwrap(),
999            "{\"test\": \"test\"}"
1000        );
1001    }
1002
1003    #[tokio::test]
1004    async fn test_upload_v1_with_user_dds_access_token() {
1005        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};
1006
1007        let (config, domain_id) = get_config();
1008
1009        let mut discovery =
1010            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1011        discovery
1012            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1013            .await
1014            .expect("sign_in_with_auki_account failed");
1015        let domain = discovery
1016            .auth_domain(&domain_id)
1017            .await
1018            .expect("get_domain failed");
1019        // 4. Prepare upload data
1020        let upload_data = vec![
1021            UploadDomainData {
1022                action: DomainAction::Create(CreateDomainData {
1023                    name: "test_upload".to_string(),
1024                    data_type: "test".to_string(),
1025                }),
1026                data: b"hello world".to_vec(),
1027            },
1028            UploadDomainData {
1029                action: DomainAction::Update(UpdateDomainData {
1030                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1031                }),
1032                data: b"{\"test\": \"test updated\"}".to_vec(),
1033            },
1034        ];
1035
1036        // 5. Call upload_v1
1037        let result = upload_v1(
1038            &domain.domain.domain_server.url,
1039            &domain.get_access_token(),
1040            &domain_id,
1041            upload_data,
1042        )
1043        .await
1044        .expect("upload_v1 failed");
1045
1046        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1047        for data in result {
1048            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1049                assert_eq!(data.name, "test_upload");
1050                delete_by_id(
1051                    &domain.domain.domain_server.url,
1052                    &domain.get_access_token(),
1053                    &domain_id,
1054                    &data.id,
1055                )
1056                .await
1057                .expect("delete_by_id failed");
1058            }
1059        }
1060    }
1061}