posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
11#[derive(Debug, Clone)]
12pub struct DomainServer {
13    pub url: String,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct DomainDataMetadata {
18    pub id: String,
19    pub domain_id: String,
20    pub name: String,
21    pub data_type: String,
22    pub size: u64,
23    pub created_at: String,
24    pub updated_at: String,
25}
26
27#[derive(Debug, Deserialize, Serialize)]
28pub struct DomainData {
29    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
30    pub metadata: DomainDataMetadata,
31    pub data: Vec<u8>,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct UpdateDomainData {
36    pub id: String,
37}
38
39#[derive(Debug, Serialize, Clone, Deserialize)]
40pub struct CreateDomainData {
41    pub name: String,
42    pub data_type: String,
43}
44
45#[derive(Debug, Serialize, Deserialize)]
46#[serde(untagged)]
47pub enum DomainAction {
48    Create(CreateDomainData),
49    Update(UpdateDomainData),
50}
51
52#[derive(Debug, Serialize, Deserialize)]
53pub struct UploadDomainData {
54    #[serde(flatten)]
55    pub action: DomainAction,
56    pub data: Vec<u8>,
57}
58
59#[derive(Debug, Serialize, Deserialize)]
60pub struct DownloadQuery {
61    pub ids: Vec<String>,
62    pub name: Option<String>,
63    pub data_type: Option<String>,
64}
65
66#[derive(Debug, Deserialize)]
67struct ListDomainDataMetadata {
68    pub data: Vec<DomainDataMetadata>,
69}
70
71pub async fn download_by_id(
72    url: &str,
73    client_id: &str,
74    access_token: &str,
75    domain_id: &str,
76    id: &str,
77) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
78    let response = Client::new()
79        .get(&format!(
80            "{}/api/v1/domains/{}/data/{}?raw=true",
81            url, domain_id, id
82        ))
83        .bearer_auth(access_token)
84        .header("posemesh-client-id", client_id)
85        .send()
86        .await?;
87
88    if response.status().is_success() {
89        let data = response.bytes().await?;
90        Ok(data.to_vec())
91    } else {
92        let status = response.status();
93        let text = response
94            .text()
95            .await
96            .unwrap_or_else(|_| "Unknown error".to_string());
97        Err(format!(
98            "Failed to download data by id. Status: {} - {}",
99            status, text
100        )
101        .into())
102    }
103}
104
105pub async fn download_metadata_v1(
106    url: &str,
107    client_id: &str,
108    access_token: &str,
109    domain_id: &str,
110    query: &DownloadQuery,
111) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
112    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
113    if response.status().is_success() {
114        let data = response.json::<ListDomainDataMetadata>().await?;
115        Ok(data.data)
116    } else {
117        let status = response.status();
118        let text = response
119            .text()
120            .await
121            .unwrap_or_else(|_| "Unknown error".to_string());
122        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
123    }
124}
125
126pub async fn download_v1(
127    url: &str,
128    client_id: &str,
129    access_token: &str,
130    domain_id: &str,
131    query: &DownloadQuery,
132    with_data: bool,
133) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
134    let mut params = HashMap::new();
135
136    if let Some(name) = &query.name {
137        params.insert("name", name.clone());
138    }
139    if let Some(data_type) = &query.data_type {
140        params.insert("data_type", data_type.clone());
141    }
142    let ids = {
143        if !query.ids.is_empty() {
144            let ids = query.ids.join(",");
145            if params.is_empty() {
146                &format!("?ids={}", ids)
147            } else {
148                &format!("?ids={}", ids)
149            }
150        } else {
151            ""
152        }
153    };
154
155    let response = Client::new()
156        .get(&format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
157        .bearer_auth(access_token)
158        .header(
159            "Accept",
160            if with_data {
161                "multipart/form-data"
162            } else {
163                "application/json"
164            },
165        )
166        .header("posemesh-client-id", client_id)
167        .query(&params)
168        .send()
169        .await?;
170
171    if response.status().is_success() {
172        Ok(response)
173    } else {
174        let status = response.status();
175        let text = response
176            .text()
177            .await
178            .unwrap_or_else(|_| "Unknown error".to_string());
179        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
180    }
181}
182
183pub async fn download_v1_stream(
184    url: &str,
185    client_id: &str,
186    access_token: &str,
187    domain_id: &str,
188    query: &DownloadQuery,
189) -> Result<
190    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
191    Box<dyn std::error::Error + Send + Sync>,
192> {
193    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
194
195    let (mut tx, rx) =
196        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
197
198    let boundary = match response
199        .headers()
200        .get("content-type")
201        .and_then(|ct| ct.to_str().ok())
202        .and_then(|ct| {
203            if ct.starts_with("multipart/form-data; boundary=") {
204                Some(ct.split("boundary=").nth(1)?.to_string())
205            } else {
206                None
207            }
208        }) {
209        Some(b) => b,
210        None => {
211            tracing::error!("Invalid content-type header");
212            let _ = tx.close().await;
213            return Err("Invalid content-type header".into());
214        }
215    };
216
217    spawn(async move {
218        let stream = response.bytes_stream();
219        handle_domain_data_stream(tx, stream, &boundary).await;
220    });
221
222    Ok(rx)
223}
224
225pub async fn delete_by_id(
226    url: &str,
227    access_token: &str,
228    domain_id: &str,
229    id: &str,
230) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
231    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
232    let client = Client::new();
233    let resp = client
234        .delete(&endpoint)
235        .bearer_auth(access_token)
236        .send()
237        .await?;
238
239    if resp.status().is_success() {
240        Ok(())
241    } else {
242        let err = resp
243            .text()
244            .await
245            .unwrap_or_else(|_| "Unknown error".to_string());
246        Err(format!("Delete failed with status: {}", err).into())
247    }
248}
249
250#[cfg(not(target_family = "wasm"))]
251pub async fn upload_v1_stream(
252    url: &str,
253    access_token: &str,
254    domain_id: &str,
255    mut rx: mpsc::Receiver<UploadDomainData>,
256) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
257    use futures::channel::oneshot;
258
259    let boundary = "boundary";
260
261    let (mut create_tx, create_rx) = mpsc::channel(100);
262    let (mut update_tx, update_rx) = mpsc::channel(100);
263
264    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
265    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
266
267    let url = url.to_string();
268    let url_2 = url.clone();
269    let access_token = access_token.to_string();
270    let domain_id = domain_id.to_string();
271    let access_token_2 = access_token.clone();
272    let domain_id_2 = domain_id.clone();
273
274    let (create_signal, create_signal_rx) = oneshot::channel::<
275        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
276    >();
277    let (update_signal, update_signal_rx) = oneshot::channel::<
278        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
279    >();
280
281    spawn(async move {
282        let create_response =
283            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
284        if let Err(Err(e)) = create_signal.send(create_response) {
285            tracing::error!("Failed to send create response: {}", e);
286        }
287    });
288
289    spawn(async move {
290        let update_response =
291            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
292        if let Err(Err(e)) = update_signal.send(update_response) {
293            tracing::error!("Failed to send update response: {}", e);
294        }
295    });
296
297    while let Some(datum) = rx.next().await {
298        match datum.action {
299            DomainAction::Create(create) => {
300                let create_data = write_create_body(boundary, &create, &datum.data);
301                create_tx.clone().send(create_data).await?;
302            }
303            DomainAction::Update(update) => {
304                let update_data = write_update_body(boundary, &update, &datum.data);
305                update_tx.send(update_data).await?;
306            }
307        }
308    }
309    update_tx
310        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
311        .await?;
312    create_tx
313        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
314        .await?;
315    update_tx.close().await?;
316    create_tx.close().await?;
317
318    let mut data = {
319        if let Ok(res) = create_signal_rx.await {
320            match res {
321                Ok(d) => d,
322                Err(e) => return Err(e),
323            }
324        } else {
325            return Err("create cancelled".into());
326        }
327    };
328
329    if let Ok(res) = update_signal_rx.await {
330        match res {
331            Ok(d) => data.extend(d),
332            Err(e) => return Err(e),
333        }
334    } else {
335        return Err("update cancelled".into());
336    }
337
338    Ok(data)
339}
340
341async fn update_v1(
342    url: &str,
343    access_token: &str,
344    domain_id: &str,
345    boundary: &str,
346    body: Body,
347) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
348    let update_response = Client::new()
349        .put(&format!("{}/api/v1/domains/{}/data", url, domain_id))
350        .bearer_auth(access_token)
351        .header(
352            "Content-Type",
353            &format!("multipart/form-data; boundary={}", boundary),
354        )
355        .body(body)
356        .send()
357        .await?;
358
359    if update_response.status().is_success() {
360        let data = update_response
361            .json::<ListDomainDataMetadata>()
362            .await
363            .unwrap();
364        Ok(data.data)
365    } else {
366        let err = update_response
367            .text()
368            .await
369            .unwrap_or_else(|_| "Unknown error".to_string());
370        Err(format!("Update failed with status: {}", err).into())
371    }
372}
373
374async fn create_v1(
375    url: &str,
376    access_token: &str,
377    domain_id: &str,
378    boundary: &str,
379    body: Body,
380) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
381    let create_response = Client::new()
382        .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
383        .bearer_auth(access_token)
384        .header(
385            "Content-Type",
386            &format!("multipart/form-data; boundary={}", boundary),
387        )
388        .body(body)
389        .send()
390        .await?;
391
392    if create_response.status().is_success() {
393        let data = create_response
394            .json::<ListDomainDataMetadata>()
395            .await
396            .unwrap();
397        Ok(data.data)
398    } else {
399        let err = create_response
400            .text()
401            .await
402            .unwrap_or_else(|_| "Unknown error".to_string());
403        Err(format!("Create failed with status: {}", err).into())
404    }
405}
406
407fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
408    let create_bytes = format!(
409        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
410        boundary, data.name, data.data_type
411    );
412    let mut create_data = create_bytes.into_bytes();
413    create_data.extend_from_slice(data_bytes);
414    create_data.extend_from_slice("\r\n".as_bytes());
415    create_data
416}
417
418fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
419    let update_bytes = format!(
420        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
421        boundary, data.id
422    );
423    let mut update_data = update_bytes.into_bytes();
424    update_data.extend_from_slice(data_bytes);
425    update_data.extend_from_slice("\r\n".as_bytes());
426    update_data
427}
428
429pub async fn upload_v1(
430    url: &str,
431    access_token: &str,
432    domain_id: &str,
433    data: Vec<UploadDomainData>,
434) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
435    let boundary = "boundary";
436
437    let mut create_body = Vec::new();
438    let mut update_body = Vec::new();
439    let mut to_update = false;
440    let mut to_create = false;
441
442    // Process the first item to get metadata for the form
443    for datum in data {
444        match datum.action {
445            DomainAction::Create(create) => {
446                to_create = true;
447                let create_data = write_create_body(boundary, &create, &datum.data);
448                create_body.extend_from_slice(&create_data);
449            }
450            DomainAction::Update(update) => {
451                to_update = true;
452                let update_data = write_update_body(boundary, &update, &datum.data);
453                update_body.extend_from_slice(&update_data);
454            }
455        }
456    }
457
458    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
459    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
460
461    let create_body = Body::from(create_body);
462    let update_body = Body::from(update_body);
463    let mut res = Vec::new();
464
465    if to_create {
466        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
467    }
468    if to_update {
469        let update_response =
470            update_v1(url, access_token, domain_id, boundary, update_body).await?;
471        if !update_response.is_empty() {
472            res.extend(update_response);
473        }
474    }
475
476    Ok(res)
477}
478
479fn parse_headers(
480    headers_slice: &[u8],
481) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
482    let headers_str = String::from_utf8_lossy(headers_slice);
483    let mut domain_data = None;
484
485    for line in headers_str.lines() {
486        if line.trim().is_empty() {
487            break;
488        }
489        if let Some((key, value)) = line.split_once(':') {
490            let key = key.trim().to_lowercase();
491            if key == "content-disposition" {
492                let mut parsed_domain_data = DomainData {
493                    metadata: DomainDataMetadata {
494                        id: String::new(),
495                        domain_id: String::new(),
496                        name: String::new(),
497                        data_type: String::new(),
498                        size: 0,
499                        created_at: String::new(),
500                        updated_at: String::new(),
501                    },
502                    data: Vec::new(),
503                };
504                for part in value.split(';') {
505                    let part = part.trim();
506                    if let Some((key, value)) = part.split_once('=') {
507                        let key = key.trim();
508                        let value = value.trim().trim_matches('"');
509                        match key {
510                            "id" => parsed_domain_data.metadata.id = value.to_string(),
511                            "domain-id" => {
512                                parsed_domain_data.metadata.domain_id = value.to_string()
513                            }
514                            "name" => parsed_domain_data.metadata.name = value.to_string(),
515                            "data-type" => {
516                                parsed_domain_data.metadata.data_type = value.to_string()
517                            }
518                            "size" => parsed_domain_data.metadata.size = value.parse()?,
519                            "created-at" => {
520                                parsed_domain_data.metadata.created_at = value.to_string()
521                            }
522                            "updated-at" => {
523                                parsed_domain_data.metadata.updated_at = value.to_string()
524                            }
525                            _ => {}
526                        }
527                    }
528                }
529                domain_data = Some(parsed_domain_data);
530            }
531        }
532    }
533
534    if let Some(domain_data) = domain_data {
535        Ok(domain_data)
536    } else {
537        Err("Missing content-disposition header".into())
538    }
539}
540
541fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
542    let _data = String::from_utf8_lossy(data);
543    let _boundary = String::from_utf8_lossy(boundary);
544    data.windows(boundary.len())
545        .position(|window| window == boundary)
546}
547
548fn find_headers_end(data: &[u8]) -> Option<usize> {
549    if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
550        Some(i + 4) // body starts after \r\n\r\n
551    } else if let Some(i) = data.windows(2).position(|w| w == b"\n\n") {
552        Some(i + 2) // body starts after \n\n
553    } else {
554        None
555    }
556}
557
558async fn handle_domain_data_stream(
559    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
560    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
561    boundary: &str,
562) {
563    use futures::pin_mut;
564
565    let mut buffer = Vec::new();
566    let mut current_domain_data: Option<DomainData> = None;
567    let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
568
569    pin_mut!(stream);
570
571    while let Some(chunk_result) = stream.next().await {
572        // Handle chunk result
573        let chunk = match chunk_result {
574            Ok(c) if c.is_empty() => {
575                tx.close().await.ok();
576                return;
577            }
578            Ok(c) => c,
579            Err(e) => {
580                let _ = tx.send(Err(e.into())).await;
581                return;
582            }
583        };
584
585        buffer.extend_from_slice(&chunk);
586
587        // If we are in the middle of reading a domain_data part, continue filling it
588        if let Some(mut domain_data) = current_domain_data.take() {
589            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
590            if buffer.len() >= expected_size {
591                domain_data.data.extend_from_slice(&buffer[..expected_size]);
592                buffer.drain(..expected_size);
593                if tx.send(Ok(domain_data)).await.is_err() {
594                    return;
595                }
596            } else {
597                domain_data.data.extend_from_slice(&buffer);
598                buffer.clear();
599                current_domain_data = Some(domain_data);
600                continue;
601            }
602        }
603
604        // Process all boundaries in the current buffer
605        loop {
606            // Find the next boundary in the buffer
607            let boundary_pos = match find_boundary(&buffer, &boundary_bytes) {
608                Some(pos) => pos,
609                None => break, // No more boundaries found in current buffer
610            };
611
612            // Look for header end after boundary
613            let header_end = match find_headers_end(&buffer[boundary_pos..]) {
614                Some(end) => end,
615                None => break, // Incomplete headers, wait for more chunks
616            };
617
618            let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
619            let part_headers = parse_headers(headers_slice);
620
621            let mut domain_data = match part_headers {
622                Ok(data) => data,
623                Err(e) => {
624                    tracing::error!("Failed to parse headers: {:?}", e);
625                    return;
626                }
627            };
628
629            // Remove processed data (boundary + headers) from buffer
630            buffer.drain(..boundary_pos + header_end);
631
632            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
633            if buffer.len() >= expected_size {
634                domain_data.data.extend_from_slice(&buffer[..expected_size]);
635                buffer.drain(..expected_size);
636                if tx.send(Ok(domain_data)).await.is_err() {
637                    return;
638                }
639            } else {
640                domain_data.data.extend_from_slice(&buffer);
641                buffer.clear();
642                current_domain_data = Some(domain_data);
643                break;
644            }
645
646            // Continue to process the next boundary in the same buffer
647        }
648    }
649}
650
651#[cfg(test)]
652mod tests {
653    use bytes::Bytes;
654
655    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
656
657    use super::*;
658
659    fn get_config() -> (Config, String) {
660        if std::path::Path::new("../.env.local").exists() {
661            dotenvy::from_filename("../.env.local").ok();
662            dotenvy::dotenv().ok();
663        }
664        let config = Config::from_env().unwrap();
665        (config, std::env::var("DOMAIN_ID").unwrap())
666    }
667
668    #[test]
669    fn test_find_boundary_found() {
670        let data = b"random--boundary--data";
671        let boundary = b"--boundary";
672        assert_eq!(find_boundary(data, boundary), Some(6));
673    }
674
675    #[test]
676    fn test_find_boundary_not_found() {
677        let data = b"random-data";
678        let boundary = b"--boundary";
679        assert_eq!(find_boundary(data, boundary), None);
680    }
681
682    #[test]
683    fn test_find_headers_end_crlf() {
684        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
685        assert_eq!(find_headers_end(data), Some(36));
686    }
687
688    #[test]
689    fn test_find_headers_end_lf() {
690        let data = b"header1: value1\nheader2: value2\n\nbody";
691        assert_eq!(find_headers_end(data), Some(33));
692    }
693
694    #[test]
695    fn test_find_headers_end_none() {
696        let data = b"header1: value1\nheader2: value2\nbody";
697        assert_eq!(find_headers_end(data), None);
698    }
699
700    #[test]
701    fn test_parse_headers_success() {
702        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
703        let parsed = super::parse_headers(headers);
704        assert!(parsed.is_ok());
705        let domain_data = parsed.unwrap();
706        assert_eq!(domain_data.metadata.id, "123");
707        assert_eq!(domain_data.metadata.domain_id, "abc");
708        assert_eq!(domain_data.metadata.name, "test");
709        assert_eq!(domain_data.metadata.data_type, "type");
710        assert_eq!(domain_data.metadata.size, 42);
711        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
712        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
713    }
714
715    #[test]
716    fn test_parse_headers_missing_content_disposition() {
717        let headers = b"content-type: application/octet-stream\r\n\r\n";
718        let parsed = super::parse_headers(headers);
719        assert!(parsed.is_err());
720    }
721
722    #[tokio::test]
723    async fn test_a_chunk_contains_multiple_data() {
724        let (tx, rx) = mpsc::channel(10);
725
726        let payload = br#"
727        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
728Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
729Content-Type: application/octet-stream
730
731{"test": "test"}
732--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
733Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
734Content-Type: application/octet-stream
735
736{"test": "test updated"}
737--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
738        "#;
739        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
740
741        handle_domain_data_stream(
742            tx,
743            stream,
744            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
745        )
746        .await;
747
748        let output: Vec<DomainData> = rx
749            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
750            .await
751            .into_iter()
752            .map(|r| r.unwrap())
753            .collect();
754        assert_eq!(output.len(), 2);
755        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
756        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
757    }
758
759    #[tokio::test]
760    async fn test_chunk_size_is_smaller_than_part() {
761        let (tx, rx) = mpsc::channel(10);
762
763        let payload = br#"
764        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
765Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
766Content-Type: application/octet-stream
767        "#;
768        let payload2 = br#"
769
770{"test": "test"}
771--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
772Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
773Content-Type: application/octet-stream
774
775{"test": "test updated"}
776--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
777"#;
778        let stream = tokio_stream::iter(vec![
779            Ok(Bytes::from_static(payload)),
780            Ok(Bytes::from_static(payload2)),
781        ]);
782
783        handle_domain_data_stream(
784            tx,
785            stream,
786            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
787        )
788        .await;
789
790        let output: Vec<DomainData> = rx
791            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
792            .await
793            .into_iter()
794            .map(|r| r.unwrap())
795            .collect();
796        assert_eq!(output.len(), 2);
797        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
798        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
799    }
800
801    #[tokio::test]
802    async fn test_chunk_size_is_smaller_than_header() {
803        let (tx, rx) = mpsc::channel(10);
804
805        let payload = br#"
806        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
807Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
808Content-Type: application/octet-stream
809        "#;
810        let payload2 = br#"
811e: application/octet-stream
812
813{"test": "test"}
814--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
815Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
816Content-Type: application/octet-stream
817
818{"test": "test updated"}
819--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
820"#;
821        let stream = tokio_stream::iter(vec![
822            Ok(Bytes::from_static(payload)),
823            Ok(Bytes::from_static(payload2)),
824        ]);
825
826        handle_domain_data_stream(
827            tx,
828            stream,
829            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
830        )
831        .await;
832
833        let output: Vec<DomainData> = rx
834            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
835            .await
836            .into_iter()
837            .map(|r| r.unwrap())
838            .collect();
839        assert_eq!(output.len(), 2);
840        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
841        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
842    }
843
844    #[tokio::test]
845    async fn test_chunk_size_doesnt_cover_the_whole_data() {
846        let (tx, rx) = mpsc::channel(10);
847
848        let payload = br#"
849        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
850Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
851Content-Type: application/octet-stream
852
853{"test": "test"#;
854        let payload2 = br#""}
855--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
856Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
857Content-Type: application/octet-stream
858
859{"test": "test updated"}
860--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
861"#;
862        let stream = tokio_stream::iter(vec![
863            Ok(Bytes::from_static(payload)),
864            Ok(Bytes::from_static(payload2)),
865        ]);
866
867        handle_domain_data_stream(
868            tx,
869            stream,
870            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
871        )
872        .await;
873
874        let output: Vec<DomainData> = rx
875            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
876            .await
877            .into_iter()
878            .map(|r| r.unwrap())
879            .collect();
880        assert_eq!(output.len(), 2);
881        assert_eq!(
882            std::str::from_utf8(&output[1].data).unwrap(),
883            "{\"test\": \"test updated\"}"
884        );
885        assert_eq!(
886            std::str::from_utf8(&output[0].data).unwrap(),
887            "{\"test\": \"test\"}"
888        );
889    }
890
891    #[tokio::test]
892    async fn test_upload_v1_with_user_dds_access_token() {
893        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};
894
895        let (config, domain_id) = get_config();
896
897        let mut discovery =
898            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
899        discovery
900            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
901            .await
902            .expect("sign_in_with_auki_account failed");
903        let domain = discovery
904            .auth_domain(&domain_id)
905            .await
906            .expect("get_domain failed");
907        // 4. Prepare upload data
908        let upload_data = vec![
909            UploadDomainData {
910                action: DomainAction::Create(CreateDomainData {
911                    name: "test_upload".to_string(),
912                    data_type: "test".to_string(),
913                }),
914                data: b"hello world".to_vec(),
915            },
916            UploadDomainData {
917                action: DomainAction::Update(UpdateDomainData {
918                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
919                }),
920                data: b"{\"test\": \"test updated\"}".to_vec(),
921            },
922        ];
923
924        // 5. Call upload_v1
925        let result = upload_v1(
926            &domain.domain.domain_server.url,
927            &domain.get_access_token(),
928            &domain_id,
929            upload_data,
930        )
931        .await
932        .expect("upload_v1 failed");
933
934        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
935        for data in result {
936            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
937                assert_eq!(data.name, "test_upload");
938                delete_by_id(
939                    &domain.domain.domain_server.url,
940                    &domain.get_access_token(),
941                    &domain_id,
942                    &data.id,
943                )
944                .await
945                .expect("delete_by_id failed");
946            }
947        }
948    }
949}