posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
/// Metadata describing a single domain data item.
///
/// Instances are deserialized from the JSON listings returned by the domain
/// server and are also rebuilt from the `Content-Disposition` parameters of
/// multipart download parts (see `parse_headers`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainDataMetadata {
    /// Identifier of the data item.
    pub id: String,
    /// Identifier of the domain the item belongs to.
    pub domain_id: String,
    /// Name of the item.
    pub name: String,
    /// Type label of the item (`data-type` in multipart headers).
    pub data_type: String,
    /// Payload length in bytes; the multipart stream reader uses it to decide
    /// how many body bytes belong to the item.
    pub size: u64,
    /// Creation timestamp, stored as the string received from the server.
    pub created_at: String,
    /// Last-update timestamp, stored as the string received from the server.
    pub updated_at: String,
}
21
/// A domain data item: its metadata together with the payload bytes.
#[derive(Debug, Deserialize, Serialize)]
pub struct DomainData {
    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
    /// Descriptive metadata of the item.
    pub metadata: DomainDataMetadata,
    /// Raw payload bytes of the item.
    pub data: Vec<u8>,
}
28
/// Identifies an existing data item whose payload should be replaced.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct UpdateDomainData {
    /// Identifier of the item to update; written as the `id` parameter of the
    /// multipart part header.
    pub id: String,
}
33
/// Describes a new data item to create.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CreateDomainData {
    /// Name of the new item; written as the `name` parameter of the multipart
    /// part header.
    pub name: String,
    /// Type label of the new item; written as the `data-type` parameter of the
    /// multipart part header.
    pub data_type: String,
}
39
/// What an upload should do with its payload.
///
/// The enum is `untagged`, so (de)serialization distinguishes the variants by
/// the fields present rather than by an explicit tag.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DomainAction {
    /// Create a new item (sent with the POST request in this module).
    Create(CreateDomainData),
    /// Replace the payload of an existing item (sent with the PUT request in
    /// this module).
    Update(UpdateDomainData),
}
46
/// One upload request: the action to perform plus the payload bytes.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadDomainData {
    /// Create-or-update descriptor; flattened so its fields sit next to `data`
    /// in the serialized form.
    #[serde(flatten)]
    pub action: DomainAction,
    /// Raw payload bytes to upload.
    pub data: Vec<u8>,
}
53
/// Filters applied when listing or downloading domain data.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadQuery {
    /// Item identifiers to fetch; when non-empty they are joined with commas
    /// into a single `ids` query parameter.
    pub ids: Vec<String>,
    /// Optional `name` query parameter.
    pub name: Option<String>,
    /// Optional `data_type` query parameter.
    pub data_type: Option<String>,
}
60
/// JSON envelope returned by the list, create and update endpoints: the
/// metadata entries live under the `data` key.
#[derive(Debug, Deserialize)]
struct ListDomainDataMetadata {
    /// Metadata entries carried by the response.
    pub data: Vec<DomainDataMetadata>,
}
65
66pub async fn download_by_id(
67    url: &str,
68    client_id: &str,
69    access_token: &str,
70    domain_id: &str,
71    id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73    let response = Client::new()
74        .get(&format!(
75            "{}/api/v1/domains/{}/data/{}?raw=true",
76            url, domain_id, id
77        ))
78        .bearer_auth(access_token)
79        .header("posemesh-client-id", client_id)
80        .send()
81        .await?;
82
83    if response.status().is_success() {
84        let data = response.bytes().await?;
85        Ok(data.to_vec())
86    } else {
87        let status = response.status();
88        let text = response
89            .text()
90            .await
91            .unwrap_or_else(|_| "Unknown error".to_string());
92        Err(format!(
93            "Failed to download data by id. Status: {} - {}",
94            status, text
95        )
96        .into())
97    }
98}
99
100pub async fn download_metadata_v1(
101    url: &str,
102    client_id: &str,
103    access_token: &str,
104    domain_id: &str,
105    query: &DownloadQuery,
106) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
107    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
108    if response.status().is_success() {
109        let data = response.json::<ListDomainDataMetadata>().await?;
110        Ok(data.data)
111    } else {
112        let status = response.status();
113        let text = response
114            .text()
115            .await
116            .unwrap_or_else(|_| "Unknown error".to_string());
117        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
118    }
119}
120
121pub async fn download_v1(
122    url: &str,
123    client_id: &str,
124    access_token: &str,
125    domain_id: &str,
126    query: &DownloadQuery,
127    with_data: bool,
128) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
129    let mut params = HashMap::new();
130
131    if let Some(name) = &query.name {
132        params.insert("name", name.clone());
133    }
134    if let Some(data_type) = &query.data_type {
135        params.insert("data_type", data_type.clone());
136    }
137    let ids = {
138        if !query.ids.is_empty() {
139            let ids = query.ids.join(",");
140            if params.is_empty() {
141                &format!("?ids={}", ids)
142            } else {
143                &format!("?ids={}", ids)
144            }
145        } else {
146            ""
147        }
148    };
149
150    let response = Client::new()
151        .get(&format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
152        .bearer_auth(access_token)
153        .header(
154            "Accept",
155            if with_data {
156                "multipart/form-data"
157            } else {
158                "application/json"
159            },
160        )
161        .header("posemesh-client-id", client_id)
162        .query(&params)
163        .send()
164        .await?;
165
166    if response.status().is_success() {
167        Ok(response)
168    } else {
169        let status = response.status();
170        let text = response
171            .text()
172            .await
173            .unwrap_or_else(|_| "Unknown error".to_string());
174        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
175    }
176}
177
178pub async fn download_v1_stream(
179    url: &str,
180    client_id: &str,
181    access_token: &str,
182    domain_id: &str,
183    query: &DownloadQuery,
184) -> Result<
185    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
186    Box<dyn std::error::Error + Send + Sync>,
187> {
188    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
189
190    let (mut tx, rx) =
191        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
192
193    let boundary = match response
194        .headers()
195        .get("content-type")
196        .and_then(|ct| ct.to_str().ok())
197        .and_then(|ct| {
198            if ct.starts_with("multipart/form-data; boundary=") {
199                Some(ct.split("boundary=").nth(1)?.to_string())
200            } else {
201                None
202            }
203        }) {
204        Some(b) => b,
205        None => {
206            tracing::error!("Invalid content-type header");
207            let _ = tx.close().await;
208            return Err("Invalid content-type header".into());
209        }
210    };
211
212    spawn(async move {
213        let stream = response.bytes_stream();
214        handle_domain_data_stream(tx, stream, &boundary).await;
215    });
216
217    Ok(rx)
218}
219
220pub async fn delete_by_id(
221    url: &str,
222    access_token: &str,
223    domain_id: &str,
224    id: &str,
225) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
226    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
227    let client = Client::new();
228    let resp = client
229        .delete(&endpoint)
230        .bearer_auth(access_token)
231        .send()
232        .await?;
233
234    if resp.status().is_success() {
235        Ok(())
236    } else {
237        let err = resp
238            .text()
239            .await
240            .unwrap_or_else(|_| "Unknown error".to_string());
241        Err(format!("Delete failed with status: {}", err).into())
242    }
243}
244
245#[cfg(not(target_family = "wasm"))]
246pub async fn upload_v1_stream(
247    url: &str,
248    access_token: &str,
249    domain_id: &str,
250    mut rx: mpsc::Receiver<UploadDomainData>,
251) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
252    use futures::channel::oneshot;
253
254    let boundary = "boundary";
255
256    let (mut create_tx, create_rx) = mpsc::channel(100);
257    let (mut update_tx, update_rx) = mpsc::channel(100);
258
259    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
260    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
261
262    let url = url.to_string();
263    let url_2 = url.clone();
264    let access_token = access_token.to_string();
265    let domain_id = domain_id.to_string();
266    let access_token_2 = access_token.clone();
267    let domain_id_2 = domain_id.clone();
268
269    let (create_signal, create_signal_rx) = oneshot::channel::<
270        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
271    >();
272    let (update_signal, update_signal_rx) = oneshot::channel::<
273        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
274    >();
275
276    spawn(async move {
277        let create_response =
278            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
279        if let Err(Err(e)) = create_signal.send(create_response) {
280            tracing::error!("Failed to send create response: {}", e);
281        }
282    });
283
284    spawn(async move {
285        let update_response =
286            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
287        if let Err(Err(e)) = update_signal.send(update_response) {
288            tracing::error!("Failed to send update response: {}", e);
289        }
290    });
291
292    while let Some(datum) = rx.next().await {
293        match datum.action {
294            DomainAction::Create(create) => {
295                let create_data = write_create_body(boundary, &create, &datum.data);
296                create_tx.clone().send(create_data).await?;
297            }
298            DomainAction::Update(update) => {
299                let update_data = write_update_body(boundary, &update, &datum.data);
300                update_tx.send(update_data).await?;
301            }
302        }
303    }
304    update_tx
305        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
306        .await?;
307    create_tx
308        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
309        .await?;
310    update_tx.close().await?;
311    create_tx.close().await?;
312
313    let mut data = {
314        if let Ok(res) = create_signal_rx.await {
315            match res {
316                Ok(d) => d,
317                Err(e) => return Err(e),
318            }
319        } else {
320            return Err("create cancelled".into());
321        }
322    };
323
324    if let Ok(res) = update_signal_rx.await {
325        match res {
326            Ok(d) => data.extend(d),
327            Err(e) => return Err(e),
328        }
329    } else {
330        return Err("update cancelled".into());
331    }
332
333    Ok(data)
334}
335
336async fn update_v1(
337    url: &str,
338    access_token: &str,
339    domain_id: &str,
340    boundary: &str,
341    body: Body,
342) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
343    let update_response = Client::new()
344        .put(&format!("{}/api/v1/domains/{}/data", url, domain_id))
345        .bearer_auth(access_token)
346        .header(
347            "Content-Type",
348            &format!("multipart/form-data; boundary={}", boundary),
349        )
350        .body(body)
351        .send()
352        .await?;
353
354    if update_response.status().is_success() {
355        let data = update_response
356            .json::<ListDomainDataMetadata>()
357            .await
358            .unwrap();
359        Ok(data.data)
360    } else {
361        let err = update_response
362            .text()
363            .await
364            .unwrap_or_else(|_| "Unknown error".to_string());
365        Err(format!("Update failed with status: {}", err).into())
366    }
367}
368
369async fn create_v1(
370    url: &str,
371    access_token: &str,
372    domain_id: &str,
373    boundary: &str,
374    body: Body,
375) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
376    let create_response = Client::new()
377        .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
378        .bearer_auth(access_token)
379        .header(
380            "Content-Type",
381            &format!("multipart/form-data; boundary={}", boundary),
382        )
383        .body(body)
384        .send()
385        .await?;
386
387    if create_response.status().is_success() {
388        let data = create_response
389            .json::<ListDomainDataMetadata>()
390            .await
391            .unwrap();
392        Ok(data.data)
393    } else {
394        let err = create_response
395            .text()
396            .await
397            .unwrap_or_else(|_| "Unknown error".to_string());
398        Err(format!("Create failed with status: {}", err).into())
399    }
400}
401
402fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
403    let create_bytes = format!(
404        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
405        boundary, data.name, data.data_type
406    );
407    let mut create_data = create_bytes.into_bytes();
408    create_data.extend_from_slice(data_bytes);
409    create_data.extend_from_slice("\r\n".as_bytes());
410    create_data
411}
412
413fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
414    let update_bytes = format!(
415        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
416        boundary, data.id
417    );
418    let mut update_data = update_bytes.into_bytes();
419    update_data.extend_from_slice(data_bytes);
420    update_data.extend_from_slice("\r\n".as_bytes());
421    update_data
422}
423
424pub async fn upload_v1(
425    url: &str,
426    access_token: &str,
427    domain_id: &str,
428    data: Vec<UploadDomainData>,
429) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
430    let boundary = "boundary";
431
432    let mut create_body = Vec::new();
433    let mut update_body = Vec::new();
434    let mut to_update = false;
435    let mut to_create = false;
436
437    // Process the first item to get metadata for the form
438    for datum in data {
439        match datum.action {
440            DomainAction::Create(create) => {
441                to_create = true;
442                let create_data = write_create_body(boundary, &create, &datum.data);
443                create_body.extend_from_slice(&create_data);
444            }
445            DomainAction::Update(update) => {
446                to_update = true;
447                let update_data = write_update_body(boundary, &update, &datum.data);
448                update_body.extend_from_slice(&update_data);
449            }
450        }
451    }
452
453    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
454    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
455
456    let create_body = Body::from(create_body);
457    let update_body = Body::from(update_body);
458    let mut res = Vec::new();
459
460    if to_create {
461        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
462    }
463    if to_update {
464        let update_response =
465            update_v1(url, access_token, domain_id, boundary, update_body).await?;
466        if !update_response.is_empty() {
467            res.extend(update_response);
468        }
469    }
470
471    Ok(res)
472}
473
474fn parse_headers(
475    headers_slice: &[u8],
476) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
477    let headers_str = String::from_utf8_lossy(headers_slice);
478    let mut domain_data = None;
479
480    for line in headers_str.lines() {
481        if line.trim().is_empty() {
482            break;
483        }
484        if let Some((key, value)) = line.split_once(':') {
485            let key = key.trim().to_lowercase();
486            if key == "content-disposition" {
487                let mut parsed_domain_data = DomainData {
488                    metadata: DomainDataMetadata {
489                        id: String::new(),
490                        domain_id: String::new(),
491                        name: String::new(),
492                        data_type: String::new(),
493                        size: 0,
494                        created_at: String::new(),
495                        updated_at: String::new(),
496                    },
497                    data: Vec::new(),
498                };
499                for part in value.split(';') {
500                    let part = part.trim();
501                    if let Some((key, value)) = part.split_once('=') {
502                        let key = key.trim();
503                        let value = value.trim().trim_matches('"');
504                        match key {
505                            "id" => parsed_domain_data.metadata.id = value.to_string(),
506                            "domain-id" => {
507                                parsed_domain_data.metadata.domain_id = value.to_string()
508                            }
509                            "name" => parsed_domain_data.metadata.name = value.to_string(),
510                            "data-type" => {
511                                parsed_domain_data.metadata.data_type = value.to_string()
512                            }
513                            "size" => parsed_domain_data.metadata.size = value.parse()?,
514                            "created-at" => {
515                                parsed_domain_data.metadata.created_at = value.to_string()
516                            }
517                            "updated-at" => {
518                                parsed_domain_data.metadata.updated_at = value.to_string()
519                            }
520                            _ => {}
521                        }
522                    }
523                }
524                domain_data = Some(parsed_domain_data);
525            }
526        }
527    }
528
529    if let Some(domain_data) = domain_data {
530        Ok(domain_data)
531    } else {
532        Err("Missing content-disposition header".into())
533    }
534}
535
/// Returns the offset of the first occurrence of `boundary` inside `data`.
///
/// Returns `None` when `boundary` does not occur, when `data` is shorter than
/// `boundary`, or when `boundary` is empty (an empty delimiter cannot mark a
/// part, and `windows(0)` would panic).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
542
/// Returns the offset at which the body starts, i.e. the position just after
/// the first blank line in `data`.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as the blank line and whichever
/// occurs first wins, so a `\r\n\r\n` sequence later in the body can no longer
/// shadow an earlier `\n\n` header terminator. Returns `None` when no blank
/// line is present yet.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    (0..data.len()).find_map(|start| {
        let rest = &data[start..];
        if rest.starts_with(b"\r\n\r\n") {
            Some(start + 4) // body starts after \r\n\r\n
        } else if rest.starts_with(b"\n\n") {
            Some(start + 2) // body starts after \n\n
        } else {
            None
        }
    })
}
552
553async fn handle_domain_data_stream(
554    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
555    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
556    boundary: &str,
557) {
558    use futures::pin_mut;
559
560    let mut buffer = Vec::new();
561    let mut current_domain_data: Option<DomainData> = None;
562    let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
563
564    pin_mut!(stream);
565
566    while let Some(chunk_result) = stream.next().await {
567        // Handle chunk result
568        let chunk = match chunk_result {
569            Ok(c) if c.is_empty() => {
570                tx.close().await.ok();
571                return;
572            }
573            Ok(c) => c,
574            Err(e) => {
575                let _ = tx.send(Err(e.into())).await;
576                return;
577            }
578        };
579
580        buffer.extend_from_slice(&chunk);
581
582        // If we are in the middle of reading a domain_data part, continue filling it
583        if let Some(mut domain_data) = current_domain_data.take() {
584            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
585            if buffer.len() >= expected_size {
586                domain_data.data.extend_from_slice(&buffer[..expected_size]);
587                buffer.drain(..expected_size);
588                if tx.send(Ok(domain_data)).await.is_err() {
589                    return;
590                }
591            } else {
592                domain_data.data.extend_from_slice(&buffer);
593                buffer.clear();
594                current_domain_data = Some(domain_data);
595                continue;
596            }
597        }
598
599        // Process all boundaries in the current buffer
600        loop {
601            // Find the next boundary in the buffer
602            let boundary_pos = match find_boundary(&buffer, &boundary_bytes) {
603                Some(pos) => pos,
604                None => break, // No more boundaries found in current buffer
605            };
606
607            // Look for header end after boundary
608            let header_end = match find_headers_end(&buffer[boundary_pos..]) {
609                Some(end) => end,
610                None => break, // Incomplete headers, wait for more chunks
611            };
612
613            let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
614            let part_headers = parse_headers(headers_slice);
615
616            let mut domain_data = match part_headers {
617                Ok(data) => data,
618                Err(e) => {
619                    tracing::error!("Failed to parse headers: {:?}", e);
620                    return;
621                }
622            };
623
624            // Remove processed data (boundary + headers) from buffer
625            buffer.drain(..boundary_pos + header_end);
626
627            let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
628            if buffer.len() >= expected_size {
629                domain_data.data.extend_from_slice(&buffer[..expected_size]);
630                buffer.drain(..expected_size);
631                if tx.send(Ok(domain_data)).await.is_err() {
632                    return;
633                }
634            } else {
635                domain_data.data.extend_from_slice(&buffer);
636                buffer.clear();
637                current_domain_data = Some(domain_data);
638                break;
639            }
640
641            // Continue to process the next boundary in the same buffer
642        }
643    }
644}
645
/// Unit tests for the multipart helpers plus one upload test that talks to
/// the configured services.
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};

    use super::*;

    /// Builds the configuration from the environment (loading `../.env.local`
    /// first when that file exists) and returns it with the `DOMAIN_ID`
    /// variable. Panics when the configuration or `DOMAIN_ID` is missing.
    fn get_config() -> (Config, String) {
        if std::path::Path::new("../.env.local").exists() {
            dotenvy::from_filename("../.env.local").ok();
            dotenvy::dotenv().ok();
        }
        let config = Config::from_env().unwrap();
        (config, std::env::var("DOMAIN_ID").unwrap())
    }

    // `find_boundary` returns the offset of the first match.
    #[test]
    fn test_find_boundary_found() {
        let data = b"random--boundary--data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), Some(6));
    }

    // `find_boundary` returns `None` when the boundary is absent.
    #[test]
    fn test_find_boundary_not_found() {
        let data = b"random-data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), None);
    }

    // CRLF blank line: the body starts right after `\r\n\r\n`.
    #[test]
    fn test_find_headers_end_crlf() {
        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
        assert_eq!(find_headers_end(data), Some(36));
    }

    // LF blank line: the body starts right after `\n\n`.
    #[test]
    fn test_find_headers_end_lf() {
        let data = b"header1: value1\nheader2: value2\n\nbody";
        assert_eq!(find_headers_end(data), Some(33));
    }

    // No blank line means the headers are not complete yet.
    #[test]
    fn test_find_headers_end_none() {
        let data = b"header1: value1\nheader2: value2\nbody";
        assert_eq!(find_headers_end(data), None);
    }

    // Every supported `Content-Disposition` parameter ends up in the metadata.
    #[test]
    fn test_parse_headers_success() {
        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_ok());
        let domain_data = parsed.unwrap();
        assert_eq!(domain_data.metadata.id, "123");
        assert_eq!(domain_data.metadata.domain_id, "abc");
        assert_eq!(domain_data.metadata.name, "test");
        assert_eq!(domain_data.metadata.data_type, "type");
        assert_eq!(domain_data.metadata.size, 42);
        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
    }

    // Headers without `Content-Disposition` are rejected.
    #[test]
    fn test_parse_headers_missing_content_disposition() {
        let headers = b"content-type: application/octet-stream\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_err());
    }

    // A single chunk carrying two complete parts yields two items.
    #[tokio::test]
    async fn test_a_chunk_contains_multiple_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
        "#;
        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first chunk ends before the blank line that terminates the first
    // part's headers; the second chunk supplies it.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_part() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
        "#;
        let payload2 = br#"

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first part's headers are spread over two chunks, with extra header
    // text arriving in the second chunk before the blank line.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_header() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
        "#;
        let payload2 = br#"
e: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first part's body is cut in the middle: its last bytes arrive at the
    // start of the second chunk.
    #[tokio::test]
    async fn test_chunk_size_doesnt_cover_the_whole_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"#;
        let payload2 = br#""}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(
            std::str::from_utf8(&output[1].data).unwrap(),
            "{\"test\": \"test updated\"}"
        );
        assert_eq!(
            std::str::from_utf8(&output[0].data).unwrap(),
            "{\"test\": \"test\"}"
        );
    }

    // Signs in, uploads one new item and one update through `upload_v1`, then
    // deletes the item that was created.
    // NOTE(review): appears to need credentials from the environment and
    // reachable services, plus an existing item with the hard-coded id — confirm.
    #[tokio::test]
    async fn test_upload_v1_with_user_dds_access_token() {
        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};

        let (config, domain_id) = get_config();

        let mut discovery =
            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
        discovery
            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
            .await
            .expect("sign_in_with_auki_account failed");
        let domain = discovery
            .auth_domain(&domain_id)
            .await
            .expect("get_domain failed");
        // Prepare the upload batch: one create and one update.
        let upload_data = vec![
            UploadDomainData {
                action: DomainAction::Create(CreateDomainData {
                    name: "test_upload".to_string(),
                    data_type: "test".to_string(),
                }),
                data: b"hello world".to_vec(),
            },
            UploadDomainData {
                action: DomainAction::Update(UpdateDomainData {
                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
                }),
                data: b"{\"test\": \"test updated\"}".to_vec(),
            },
        ];

        // Upload the batch in one call.
        let result = upload_v1(
            &domain.domain.domain_server.url,
            &domain.get_access_token(),
            &domain_id,
            upload_data,
        )
        .await
        .expect("upload_v1 failed");

        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
        // Clean up: delete the newly created item, keep the updated one.
        for data in result {
            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
                assert_eq!(data.name, "test_upload");
                delete_by_id(
                    &domain.domain.domain_server.url,
                    &domain.get_access_token(),
                    &domain_id,
                    &data.id,
                )
                .await
                .expect("delete_by_id failed");
            }
        }
    }
}