posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DomainDataMetadata {
13    pub id: String,
14    pub domain_id: String,
15    pub name: String,
16    pub data_type: String,
17    pub size: u64,
18    pub created_at: String,
19    pub updated_at: String,
20}
21
22#[derive(Debug, Deserialize, Serialize)]
23pub struct DomainData {
24    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
25    pub metadata: DomainDataMetadata,
26    pub data: Vec<u8>,
27}
28
29#[derive(Debug, Serialize, Clone, Deserialize)]
30pub struct UpdateDomainData {
31    pub id: String,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct CreateDomainData {
36    pub name: String,
37    pub data_type: String,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(untagged)]
42pub enum DomainAction {
43    Create(CreateDomainData),
44    Update(UpdateDomainData),
45}
46
47#[derive(Debug, Serialize, Deserialize)]
48pub struct UploadDomainData {
49    #[serde(flatten)]
50    pub action: DomainAction,
51    pub data: Vec<u8>,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct DownloadQuery {
56    pub ids: Vec<String>,
57    pub name: Option<String>,
58    pub data_type: Option<String>,
59}
60
61#[derive(Debug, Deserialize)]
62struct ListDomainDataMetadata {
63    pub data: Vec<DomainDataMetadata>,
64}
65
66pub async fn download_by_id(
67    url: &str,
68    client_id: &str,
69    access_token: &str,
70    domain_id: &str,
71    id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73    let response = Client::new()
74        .get(format!(
75            "{}/api/v1/domains/{}/data/{}?raw=true",
76            url, domain_id, id
77        ))
78        .bearer_auth(access_token)
79        .header("posemesh-client-id", client_id)
80        .send()
81        .await?;
82
83    if response.status().is_success() {
84        let data = response.bytes().await?;
85        Ok(data.to_vec())
86    } else {
87        let status = response.status();
88        let text = response
89            .text()
90            .await
91            .unwrap_or_else(|_| "Unknown error".to_string());
92        Err(format!(
93            "Failed to download data by id. Status: {} - {}",
94            status, text
95        )
96        .into())
97    }
98}
99
100/// Perform a direct absolute download request to the domain server.
101/// Sets headers: Accept: multipart/form-data, Authorization: Bearer <token>, posemesh-client-id.
102pub async fn request_download_absolute(
103    url: &str,
104    client_id: &str,
105    access_token: &str,
106) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
107    let response = Client::new()
108        .get(url)
109        .bearer_auth(access_token)
110        .header("posemesh-client-id", client_id)
111        .header("Accept", "multipart/form-data")
112        .send()
113        .await?;
114    Ok(response)
115}
116
117pub async fn download_metadata_v1(
118    url: &str,
119    client_id: &str,
120    access_token: &str,
121    domain_id: &str,
122    query: &DownloadQuery,
123) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
124    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
125    if response.status().is_success() {
126        let data = response.json::<ListDomainDataMetadata>().await?;
127        Ok(data.data)
128    } else {
129        let status = response.status();
130        let text = response
131            .text()
132            .await
133            .unwrap_or_else(|_| "Unknown error".to_string());
134        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
135    }
136}
137
138pub async fn download_v1(
139    url: &str,
140    client_id: &str,
141    access_token: &str,
142    domain_id: &str,
143    query: &DownloadQuery,
144    with_data: bool,
145) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
146    let mut params = HashMap::new();
147
148    if let Some(name) = &query.name {
149        params.insert("name", name.clone());
150    }
151    if let Some(data_type) = &query.data_type {
152        params.insert("data_type", data_type.clone());
153    }
154    let ids = if !query.ids.is_empty() {
155        format!("?ids={}", query.ids.join(","))
156    } else {
157        String::new()
158    };
159
160    let response = Client::new()
161        .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
162        .bearer_auth(access_token)
163        .header(
164            "Accept",
165            if with_data {
166                "multipart/form-data"
167            } else {
168                "application/json"
169            },
170        )
171        .header("posemesh-client-id", client_id)
172        .query(&params)
173        .send()
174        .await?;
175
176    if response.status().is_success() {
177        Ok(response)
178    } else {
179        let status = response.status();
180        let text = response
181            .text()
182            .await
183            .unwrap_or_else(|_| "Unknown error".to_string());
184        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
185    }
186}
187
188pub async fn download_v1_stream(
189    url: &str,
190    client_id: &str,
191    access_token: &str,
192    domain_id: &str,
193    query: &DownloadQuery,
194) -> Result<
195    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
196    Box<dyn std::error::Error + Send + Sync>,
197> {
198    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
199
200    stream_from_response(response).await
201}
202
203/// Build a stream from an HTTP response returned by the domain download endpoint.
204/// Parses multipart boundaries and yields DomainData items as they arrive.
205pub async fn stream_from_response(
206    response: Response,
207) -> Result<
208    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
209    Box<dyn std::error::Error + Send + Sync>,
210> {
211    let (mut tx, rx) =
212        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
213
214    let boundary = match response
215        .headers()
216        .get("content-type")
217        .and_then(|ct| ct.to_str().ok())
218        .and_then(|ct| {
219            if ct.starts_with("multipart/form-data; boundary=") {
220                Some(ct.split("boundary=").nth(1)?.to_string())
221            } else {
222                None
223            }
224        }) {
225        Some(b) => b,
226        None => {
227            tracing::error!("Invalid content-type header");
228            let _ = tx.close().await;
229            return Err("Invalid content-type header".into());
230        }
231    };
232
233    spawn(async move {
234        let stream = response.bytes_stream();
235        handle_domain_data_stream(tx, stream, &boundary).await;
236    });
237
238    Ok(rx)
239}
240
241pub async fn delete_by_id(
242    url: &str,
243    access_token: &str,
244    domain_id: &str,
245    id: &str,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
248    let client = Client::new();
249    let resp = client
250        .delete(&endpoint)
251        .bearer_auth(access_token)
252        .send()
253        .await?;
254
255    if resp.status().is_success() {
256        Ok(())
257    } else {
258        let err = resp
259            .text()
260            .await
261            .unwrap_or_else(|_| "Unknown error".to_string());
262        Err(format!("Delete failed with status: {}", err).into())
263    }
264}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn upload_v1_stream(
268    url: &str,
269    access_token: &str,
270    domain_id: &str,
271    mut rx: mpsc::Receiver<UploadDomainData>,
272) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
273    use futures::channel::oneshot;
274
275    let boundary = "boundary";
276
277    let (mut create_tx, create_rx) = mpsc::channel(100);
278    let (mut update_tx, update_rx) = mpsc::channel(100);
279
280    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
281    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
282
283    let url = url.to_string();
284    let url_2 = url.clone();
285    let access_token = access_token.to_string();
286    let domain_id = domain_id.to_string();
287    let access_token_2 = access_token.clone();
288    let domain_id_2 = domain_id.clone();
289
290    let (create_signal, create_signal_rx) = oneshot::channel::<
291        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
292    >();
293    let (update_signal, update_signal_rx) = oneshot::channel::<
294        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295    >();
296
297    spawn(async move {
298        let create_response =
299            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
300        if let Err(Err(e)) = create_signal.send(create_response) {
301            tracing::error!("Failed to send create response: {}", e);
302        }
303    });
304
305    spawn(async move {
306        let update_response =
307            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
308        if let Err(Err(e)) = update_signal.send(update_response) {
309            tracing::error!("Failed to send update response: {}", e);
310        }
311    });
312
313    while let Some(datum) = rx.next().await {
314        match datum.action {
315            DomainAction::Create(create) => {
316                let create_data = write_create_body(boundary, &create, &datum.data);
317                create_tx.clone().send(create_data).await?;
318            }
319            DomainAction::Update(update) => {
320                let update_data = write_update_body(boundary, &update, &datum.data);
321                update_tx.send(update_data).await?;
322            }
323        }
324    }
325    update_tx
326        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
327        .await?;
328    create_tx
329        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330        .await?;
331    update_tx.close().await?;
332    create_tx.close().await?;
333
334    let mut data = {
335        if let Ok(res) = create_signal_rx.await {
336            match res {
337                Ok(d) => d,
338                Err(e) => return Err(e),
339            }
340        } else {
341            return Err("create cancelled".into());
342        }
343    };
344
345    if let Ok(res) = update_signal_rx.await {
346        match res {
347            Ok(d) => data.extend(d),
348            Err(e) => return Err(e),
349        }
350    } else {
351        return Err("update cancelled".into());
352    }
353
354    Ok(data)
355}
356
357async fn update_v1(
358    url: &str,
359    access_token: &str,
360    domain_id: &str,
361    boundary: &str,
362    body: Body,
363) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
364    let update_response = Client::new()
365        .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
366        .bearer_auth(access_token)
367        .header(
368            "Content-Type",
369            &format!("multipart/form-data; boundary={}", boundary),
370        )
371        .body(body)
372        .send()
373        .await?;
374
375    if update_response.status().is_success() {
376        let data = update_response
377            .json::<ListDomainDataMetadata>()
378            .await
379            .unwrap();
380        Ok(data.data)
381    } else {
382        let err = update_response
383            .text()
384            .await
385            .unwrap_or_else(|_| "Unknown error".to_string());
386        Err(format!("Update failed with status: {}", err).into())
387    }
388}
389
390async fn create_v1(
391    url: &str,
392    access_token: &str,
393    domain_id: &str,
394    boundary: &str,
395    body: Body,
396) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
397    let create_response = Client::new()
398        .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
399        .bearer_auth(access_token)
400        .header(
401            "Content-Type",
402            &format!("multipart/form-data; boundary={}", boundary),
403        )
404        .body(body)
405        .send()
406        .await?;
407
408    if create_response.status().is_success() {
409        let data = create_response
410            .json::<ListDomainDataMetadata>()
411            .await
412            .unwrap();
413        Ok(data.data)
414    } else {
415        let err = create_response
416            .text()
417            .await
418            .unwrap_or_else(|_| "Unknown error".to_string());
419        Err(format!("Create failed with status: {}", err).into())
420    }
421}
422
423fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
424    let create_bytes = format!(
425        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
426        boundary, data.name, data.data_type
427    );
428    let mut create_data = create_bytes.into_bytes();
429    create_data.extend_from_slice(data_bytes);
430    create_data.extend_from_slice("\r\n".as_bytes());
431    create_data
432}
433
434fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
435    let update_bytes = format!(
436        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
437        boundary, data.id
438    );
439    let mut update_data = update_bytes.into_bytes();
440    update_data.extend_from_slice(data_bytes);
441    update_data.extend_from_slice("\r\n".as_bytes());
442    update_data
443}
444
445pub async fn upload_v1(
446    url: &str,
447    access_token: &str,
448    domain_id: &str,
449    data: Vec<UploadDomainData>,
450) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
451    let boundary = "boundary";
452
453    let mut create_body = Vec::new();
454    let mut update_body = Vec::new();
455    let mut to_update = false;
456    let mut to_create = false;
457
458    // Process the first item to get metadata for the form
459    for datum in data {
460        match datum.action {
461            DomainAction::Create(create) => {
462                to_create = true;
463                let create_data = write_create_body(boundary, &create, &datum.data);
464                create_body.extend_from_slice(&create_data);
465            }
466            DomainAction::Update(update) => {
467                to_update = true;
468                let update_data = write_update_body(boundary, &update, &datum.data);
469                update_body.extend_from_slice(&update_data);
470            }
471        }
472    }
473
474    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
475    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
476
477    let create_body = Body::from(create_body);
478    let update_body = Body::from(update_body);
479    let mut res = Vec::new();
480
481    if to_create {
482        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
483    }
484    if to_update {
485        let update_response =
486            update_v1(url, access_token, domain_id, boundary, update_body).await?;
487        if !update_response.is_empty() {
488            res.extend(update_response);
489        }
490    }
491
492    Ok(res)
493}
494
495/// Upload a single domain data item (create or update) as one HTTP request.
496/// Returns a list of created/updated DomainData entries (with empty data payloads).
497#[cfg(not(target_family = "wasm"))]
498pub async fn upload_one(
499    url: &str,
500    access_token: &str,
501    domain_id: &str,
502    data: UploadDomainData,
503) -> Result<Vec<DomainData>, (StatusCode, String)> {
504    let boundary = "boundary";
505    let (method, body) = match &data.action {
506        DomainAction::Create(create) => {
507            let bytes = write_create_body(boundary, create, &data.data);
508            (reqwest::Method::POST, Body::from(bytes))
509        }
510        DomainAction::Update(update) => {
511            let bytes = write_update_body(boundary, update, &data.data);
512            (reqwest::Method::PUT, Body::from(bytes))
513        }
514    };
515
516    let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
517    let response = Client::new()
518        .request(method, endpoint)
519        .bearer_auth(access_token)
520        .header(
521            "Content-Type",
522            &format!("multipart/form-data; boundary={}", boundary),
523        )
524        .body(body)
525        .send()
526        .await
527        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
528
529    if response.status().is_success() {
530        // Be tolerant to varying response shapes (full metadata or id-only)
531        let text = response
532            .text()
533            .await
534            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
535        // Try strict metadata first
536        if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
537            let items: Vec<DomainData> = md
538                .data
539                .into_iter()
540                .map(|m| DomainData {
541                    metadata: m,
542                    data: Vec::new(),
543                })
544                .collect();
545            return Ok(items);
546        }
547        // Fallback: extract minimal fields from generic JSON
548        let v: serde_json::Value = serde_json::from_str(&text)
549            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
550        let mut out: Vec<DomainData> = Vec::new();
551        if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
552            for item in arr {
553                let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
554                let domain_id = item
555                    .get("domain_id")
556                    .and_then(|x| x.as_str())
557                    .unwrap_or("")
558                    .to_string();
559                let name = item
560                    .get("name")
561                    .and_then(|x| x.as_str())
562                    .unwrap_or("")
563                    .to_string();
564                let data_type = item
565                    .get("data_type")
566                    .and_then(|x| x.as_str())
567                    .unwrap_or("")
568                    .to_string();
569                out.push(DomainData {
570                    metadata: DomainDataMetadata {
571                        id,
572                        domain_id,
573                        name,
574                        data_type,
575                        size: 0,
576                        created_at: String::new(),
577                        updated_at: String::new(),
578                    },
579                    data: Vec::new(),
580                });
581            }
582            return Ok(out);
583        }
584        Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
585    } else {
586        let status = response.status();
587        let text = response
588            .text()
589            .await
590            .unwrap_or_else(|_| "Unknown error".to_string());
591        Err((status, text))
592    }
593}
594
595fn parse_headers(
596    headers_slice: &[u8],
597) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
598    let headers_str = String::from_utf8_lossy(headers_slice);
599    let mut domain_data = None;
600
601    for line in headers_str.lines() {
602        if line.trim().is_empty() {
603            break;
604        }
605        if let Some((key, value)) = line.split_once(':') {
606            let key = key.trim().to_lowercase();
607            if key == "content-disposition" {
608                let mut parsed_domain_data = DomainData {
609                    metadata: DomainDataMetadata {
610                        id: String::new(),
611                        domain_id: String::new(),
612                        name: String::new(),
613                        data_type: String::new(),
614                        size: 0,
615                        created_at: String::new(),
616                        updated_at: String::new(),
617                    },
618                    data: Vec::new(),
619                };
620                for part in value.split(';') {
621                    let part = part.trim();
622                    if let Some((key, value)) = part.split_once('=') {
623                        let key = key.trim();
624                        let value = value.trim().trim_matches('"');
625                        match key {
626                            "id" => parsed_domain_data.metadata.id = value.to_string(),
627                            "domain-id" => {
628                                parsed_domain_data.metadata.domain_id = value.to_string()
629                            }
630                            "name" => parsed_domain_data.metadata.name = value.to_string(),
631                            "data-type" => {
632                                parsed_domain_data.metadata.data_type = value.to_string()
633                            }
634                            "size" => parsed_domain_data.metadata.size = value.parse()?,
635                            "created-at" => {
636                                parsed_domain_data.metadata.created_at = value.to_string()
637                            }
638                            "updated-at" => {
639                                parsed_domain_data.metadata.updated_at = value.to_string()
640                            }
641                            _ => {}
642                        }
643                    }
644                }
645                domain_data = Some(parsed_domain_data);
646            }
647        }
648    }
649
650    if let Some(domain_data) = domain_data {
651        Ok(domain_data)
652    } else {
653        Err("Missing content-disposition header".into())
654    }
655}
656
657fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
658    let _data = String::from_utf8_lossy(data);
659    let _boundary = String::from_utf8_lossy(boundary);
660    data.windows(boundary.len())
661        .position(|window| window == boundary)
662}
663
664fn find_headers_end(data: &[u8]) -> Option<usize> {
665    if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
666        return Some(i + 4);
667    }
668    data.windows(2)
669        .position(|w| w == b"\n\n")
670        .map(|i| i + 2)
671}
672
673async fn handle_domain_data_stream(
674    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
675    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
676    boundary: &str,
677) {
678    use futures::pin_mut;
679
680    let mut buffer = Vec::new();
681    let mut current_domain_data: Option<DomainData> = None;
682    let boundary_bytes = format!("--{}", boundary).into_bytes();
683    let keep_tail = boundary_bytes.len() + 4; // bytes to keep for boundary detection across chunks
684
685    pin_mut!(stream);
686
687    while let Some(chunk_result) = stream.next().await {
688        let chunk = match chunk_result {
689            Ok(c) if c.is_empty() => continue,
690            Ok(c) => c,
691            Err(e) => {
692                let _ = tx.send(Err(e.into())).await;
693                return;
694            }
695        };
696
697        buffer.extend_from_slice(&chunk);
698
699        'consume: loop {
700            match &mut current_domain_data {
701                None => {
702                    let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
703                        if buffer.len() > keep_tail {
704                            buffer.drain(..buffer.len() - keep_tail);
705                        }
706                        break 'consume;
707                    };
708                    let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
709                        break 'consume;
710                    };
711                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
712                    let part_headers = parse_headers(headers_slice);
713                    let domain_data = match part_headers {
714                        Ok(d) => d,
715                        Err(e) => {
716                            tracing::error!("Failed to parse headers: {:?}", e);
717                            return;
718                        }
719                    };
720                    buffer.drain(..boundary_pos + header_end_rel);
721                    current_domain_data = Some(domain_data);
722                }
723                Some(dd) => {
724                    if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
725                        let mut data_end = next_boundary_pos;
726                        if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
727                            data_end -= 2;
728                        } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
729                            data_end -= 1;
730                        }
731                        dd.data.extend_from_slice(&buffer[..data_end]);
732                        buffer.drain(..next_boundary_pos);
733                        let finished = current_domain_data.take().unwrap();
734                        if tx.send(Ok(finished)).await.is_err() {
735                            return;
736                        }
737                    } else {
738                        if buffer.len() > keep_tail {
739                            let take = buffer.len() - keep_tail;
740                            dd.data.extend_from_slice(&buffer[..take]);
741                            buffer.drain(..take);
742                        }
743                        break 'consume;
744                    }
745                }
746            }
747        }
748    }
749
750    let _ = tx.close().await;
751}
752
753#[cfg(test)]
754mod tests {
755    use bytes::Bytes;
756
757    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
758
759    use super::*;
760
761    fn get_config() -> (Config, String) {
762        if std::path::Path::new("../.env.local").exists() {
763            dotenvy::from_filename("../.env.local").ok();
764            dotenvy::dotenv().ok();
765        }
766        let config = Config::from_env().unwrap();
767        (config, std::env::var("DOMAIN_ID").unwrap())
768    }
769
770    #[test]
771    fn test_find_boundary_found() {
772        let data = b"random--boundary--data";
773        let boundary = b"--boundary";
774        assert_eq!(find_boundary(data, boundary), Some(6));
775    }
776
777    #[test]
778    fn test_find_boundary_not_found() {
779        let data = b"random-data";
780        let boundary = b"--boundary";
781        assert_eq!(find_boundary(data, boundary), None);
782    }
783
784    #[test]
785    fn test_find_headers_end_crlf() {
786        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
787        assert_eq!(find_headers_end(data), Some(36));
788    }
789
790    #[test]
791    fn test_find_headers_end_lf() {
792        let data = b"header1: value1\nheader2: value2\n\nbody";
793        assert_eq!(find_headers_end(data), Some(33));
794    }
795
796    #[test]
797    fn test_find_headers_end_none() {
798        let data = b"header1: value1\nheader2: value2\nbody";
799        assert_eq!(find_headers_end(data), None);
800    }
801
802    #[test]
803    fn test_parse_headers_success() {
804        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
805        let parsed = super::parse_headers(headers);
806        assert!(parsed.is_ok());
807        let domain_data = parsed.unwrap();
808        assert_eq!(domain_data.metadata.id, "123");
809        assert_eq!(domain_data.metadata.domain_id, "abc");
810        assert_eq!(domain_data.metadata.name, "test");
811        assert_eq!(domain_data.metadata.data_type, "type");
812        assert_eq!(domain_data.metadata.size, 42);
813        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
814        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
815    }
816
817    #[test]
818    fn test_parse_headers_missing_content_disposition() {
819        let headers = b"content-type: application/octet-stream\r\n\r\n";
820        let parsed = super::parse_headers(headers);
821        assert!(parsed.is_err());
822    }
823
824    #[tokio::test]
825    async fn test_a_chunk_contains_multiple_data() {
826        let (tx, rx) = mpsc::channel(10);
827
828        let payload = br#"
829        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
830Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
831Content-Type: application/octet-stream
832
833{"test": "test"}
834--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
835Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
836Content-Type: application/octet-stream
837
838{"test": "test updated"}
839--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
840        "#;
841        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
842
843        handle_domain_data_stream(
844            tx,
845            stream,
846            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
847        )
848        .await;
849
850        let output: Vec<DomainData> = rx
851            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
852            .await
853            .into_iter()
854            .map(|r| r.unwrap())
855            .collect();
856        assert_eq!(output.len(), 2);
857        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
858        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
859    }
860
861    #[tokio::test]
862    async fn test_chunk_size_is_smaller_than_part() {
863        let (tx, rx) = mpsc::channel(10);
864
865        let payload = br#"
866        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
867Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
868Content-Type: application/octet-stream
869        "#;
870        let payload2 = br#"
871
872{"test": "test"}
873--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
874Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
875Content-Type: application/octet-stream
876
877{"test": "test updated"}
878--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
879"#;
880        let stream = tokio_stream::iter(vec![
881            Ok(Bytes::from_static(payload)),
882            Ok(Bytes::from_static(payload2)),
883        ]);
884
885        handle_domain_data_stream(
886            tx,
887            stream,
888            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
889        )
890        .await;
891
892        let output: Vec<DomainData> = rx
893            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
894            .await
895            .into_iter()
896            .map(|r| r.unwrap())
897            .collect();
898        assert_eq!(output.len(), 2);
899        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
900        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
901    }
902
903    #[tokio::test]
904    async fn test_chunk_size_is_smaller_than_header() {
905        let (tx, rx) = mpsc::channel(10);
906
907        let payload = br#"
908        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
909Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
910Content-Type: application/octet-stream
911        "#;
912        let payload2 = br#"
913e: application/octet-stream
914
915{"test": "test"}
916--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
917Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
918Content-Type: application/octet-stream
919
920{"test": "test updated"}
921--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
922"#;
923        let stream = tokio_stream::iter(vec![
924            Ok(Bytes::from_static(payload)),
925            Ok(Bytes::from_static(payload2)),
926        ]);
927
928        handle_domain_data_stream(
929            tx,
930            stream,
931            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
932        )
933        .await;
934
935        let output: Vec<DomainData> = rx
936            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
937            .await
938            .into_iter()
939            .map(|r| r.unwrap())
940            .collect();
941        assert_eq!(output.len(), 2);
942        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
943        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
944    }
945
946    #[tokio::test]
947    async fn test_chunk_size_doesnt_cover_the_whole_data() {
948        let (tx, rx) = mpsc::channel(10);
949
950        let payload = br#"
951        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
952Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
953Content-Type: application/octet-stream
954
955{"test": "test"#;
956        let payload2 = br#""}
957--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
958Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
959Content-Type: application/octet-stream
960
961{"test": "test updated"}
962--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
963"#;
964        let stream = tokio_stream::iter(vec![
965            Ok(Bytes::from_static(payload)),
966            Ok(Bytes::from_static(payload2)),
967        ]);
968
969        handle_domain_data_stream(
970            tx,
971            stream,
972            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
973        )
974        .await;
975
976        let output: Vec<DomainData> = rx
977            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
978            .await
979            .into_iter()
980            .map(|r| r.unwrap())
981            .collect();
982        assert_eq!(output.len(), 2);
983        assert_eq!(
984            std::str::from_utf8(&output[1].data).unwrap(),
985            "{\"test\": \"test updated\"}"
986        );
987        assert_eq!(
988            std::str::from_utf8(&output[0].data).unwrap(),
989            "{\"test\": \"test\"}"
990        );
991    }
992
993    #[tokio::test]
994    async fn test_upload_v1_with_user_dds_access_token() {
995        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};
996
997        let (config, domain_id) = get_config();
998
999        let mut discovery =
1000            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1001        discovery
1002            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1003            .await
1004            .expect("sign_in_with_auki_account failed");
1005        let domain = discovery
1006            .auth_domain(&domain_id)
1007            .await
1008            .expect("get_domain failed");
1009        // 4. Prepare upload data
1010        let upload_data = vec![
1011            UploadDomainData {
1012                action: DomainAction::Create(CreateDomainData {
1013                    name: "test_upload".to_string(),
1014                    data_type: "test".to_string(),
1015                }),
1016                data: b"hello world".to_vec(),
1017            },
1018            UploadDomainData {
1019                action: DomainAction::Update(UpdateDomainData {
1020                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1021                }),
1022                data: b"{\"test\": \"test updated\"}".to_vec(),
1023            },
1024        ];
1025
1026        // 5. Call upload_v1
1027        let result = upload_v1(
1028            &domain.domain.domain_server.url,
1029            &domain.get_access_token(),
1030            &domain_id,
1031            upload_data,
1032        )
1033        .await
1034        .expect("upload_v1 failed");
1035
1036        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1037        for data in result {
1038            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1039                assert_eq!(data.name, "test_upload");
1040                delete_by_id(
1041                    &domain.domain.domain_server.url,
1042                    &domain.get_access_token(),
1043                    &domain_id,
1044                    &data.id,
1045                )
1046                .await
1047                .expect("delete_by_id failed");
1048            }
1049        }
1050    }
1051}