posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DomainDataMetadata {
13    pub id: String,
14    pub domain_id: String,
15    pub name: String,
16    pub data_type: String,
17    pub size: u64,
18    pub created_at: String,
19    pub updated_at: String,
20}
21
22#[derive(Debug, Deserialize, Serialize)]
23pub struct DomainData {
24    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
25    pub metadata: DomainDataMetadata,
26    pub data: Vec<u8>,
27}
28
29#[derive(Debug, Serialize, Clone, Deserialize)]
30pub struct UpdateDomainData {
31    pub id: String,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct CreateDomainData {
36    pub name: String,
37    pub data_type: String,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(untagged)]
42pub enum DomainAction {
43    Create(CreateDomainData),
44    Update(UpdateDomainData),
45}
46
47#[derive(Debug, Serialize, Deserialize)]
48pub struct UploadDomainData {
49    #[serde(flatten)]
50    pub action: DomainAction,
51    pub data: Vec<u8>,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct DownloadQuery {
56    pub ids: Vec<String>,
57    pub name: Option<String>,
58    pub data_type: Option<String>,
59}
60
61#[derive(Debug, Deserialize)]
62struct ListDomainDataMetadata {
63    pub data: Vec<DomainDataMetadata>,
64}
65
66pub async fn download_by_id(
67    url: &str,
68    client_id: &str,
69    access_token: &str,
70    domain_id: &str,
71    id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73    let response = Client::new()
74        .get(format!(
75            "{}/api/v1/domains/{}/data/{}?raw=true",
76            url, domain_id, id
77        ))
78        .bearer_auth(access_token)
79        .header("posemesh-client-id", client_id)
80        .send()
81        .await?;
82
83    if response.status().is_success() {
84        let data = response.bytes().await?;
85        Ok(data.to_vec())
86    } else {
87        let status = response.status();
88        let text = response
89            .text()
90            .await
91            .unwrap_or_else(|_| "Unknown error".to_string());
92        Err(format!(
93            "Failed to download data by id. Status: {} - {}",
94            status, text
95        )
96        .into())
97    }
98}
99
100/// Perform a direct absolute download request to the domain server.
101/// Sets headers: Accept: multipart/form-data, Authorization: Bearer <token>, posemesh-client-id.
102pub async fn request_download_absolute(
103    url: &str,
104    client_id: &str,
105    access_token: &str,
106) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
107    let response = Client::new()
108        .get(url)
109        .bearer_auth(access_token)
110        .header("posemesh-client-id", client_id)
111        .header("Accept", "multipart/form-data")
112        .send()
113        .await?;
114    Ok(response)
115}
116
117pub async fn download_metadata_v1(
118    url: &str,
119    client_id: &str,
120    access_token: &str,
121    domain_id: &str,
122    query: &DownloadQuery,
123) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
124    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
125    if response.status().is_success() {
126        let data = response.json::<ListDomainDataMetadata>().await?;
127        Ok(data.data)
128    } else {
129        let status = response.status();
130        let text = response
131            .text()
132            .await
133            .unwrap_or_else(|_| "Unknown error".to_string());
134        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
135    }
136}
137
138pub async fn download_v1(
139    url: &str,
140    client_id: &str,
141    access_token: &str,
142    domain_id: &str,
143    query: &DownloadQuery,
144    with_data: bool,
145) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
146    let mut params = HashMap::new();
147
148    if let Some(name) = &query.name {
149        params.insert("name", name.clone());
150    }
151    if let Some(data_type) = &query.data_type {
152        params.insert("data_type", data_type.clone());
153    }
154    let ids = if !query.ids.is_empty() {
155        format!("?ids={}", query.ids.join(","))
156    } else {
157        String::new()
158    };
159
160    let response = Client::new()
161        .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
162        .bearer_auth(access_token)
163        .header(
164            "Accept",
165            if with_data {
166                "multipart/form-data"
167            } else {
168                "application/json"
169            },
170        )
171        .header("posemesh-client-id", client_id)
172        .query(&params)
173        .send()
174        .await?;
175
176    if response.status().is_success() {
177        Ok(response)
178    } else {
179        let status = response.status();
180        let text = response
181            .text()
182            .await
183            .unwrap_or_else(|_| "Unknown error".to_string());
184        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
185    }
186}
187
188pub async fn download_v1_stream(
189    url: &str,
190    client_id: &str,
191    access_token: &str,
192    domain_id: &str,
193    query: &DownloadQuery,
194) -> Result<
195    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
196    Box<dyn std::error::Error + Send + Sync>,
197> {
198    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
199
200    stream_from_response(response).await
201}
202
203/// Build a stream from an HTTP response returned by the domain download endpoint.
204/// Parses multipart boundaries and yields DomainData items as they arrive.
205pub async fn stream_from_response(
206    response: Response,
207) -> Result<
208    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
209    Box<dyn std::error::Error + Send + Sync>,
210> {
211    let (mut tx, rx) =
212        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
213
214    let boundary = match response
215        .headers()
216        .get("content-type")
217        .and_then(|ct| ct.to_str().ok())
218        .and_then(|ct| {
219            if ct.starts_with("multipart/form-data; boundary=") {
220                Some(ct.split("boundary=").nth(1)?.to_string())
221            } else {
222                None
223            }
224        }) {
225        Some(b) => b,
226        None => {
227            tracing::error!("Invalid content-type header");
228            let _ = tx.close().await;
229            return Err("Invalid content-type header".into());
230        }
231    };
232
233    spawn(async move {
234        let stream = response.bytes_stream();
235        handle_domain_data_stream(tx, stream, &boundary).await;
236    });
237
238    Ok(rx)
239}
240
241pub async fn delete_by_id(
242    url: &str,
243    access_token: &str,
244    domain_id: &str,
245    id: &str,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
248    let client = Client::new();
249    let resp = client
250        .delete(&endpoint)
251        .bearer_auth(access_token)
252        .send()
253        .await?;
254
255    if resp.status().is_success() {
256        Ok(())
257    } else {
258        let err = resp
259            .text()
260            .await
261            .unwrap_or_else(|_| "Unknown error".to_string());
262        Err(format!("Delete failed with status: {}", err).into())
263    }
264}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn upload_v1_stream(
268    url: &str,
269    access_token: &str,
270    domain_id: &str,
271    mut rx: mpsc::Receiver<UploadDomainData>,
272) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
273    use futures::channel::oneshot;
274
275    let boundary = "boundary";
276
277    let (mut create_tx, create_rx) = mpsc::channel(100);
278    let (mut update_tx, update_rx) = mpsc::channel(100);
279
280    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
281    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
282
283    let url = url.to_string();
284    let url_2 = url.clone();
285    let access_token = access_token.to_string();
286    let domain_id = domain_id.to_string();
287    let access_token_2 = access_token.clone();
288    let domain_id_2 = domain_id.clone();
289
290    let (create_signal, create_signal_rx) = oneshot::channel::<
291        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
292    >();
293    let (update_signal, update_signal_rx) = oneshot::channel::<
294        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295    >();
296
297    spawn(async move {
298        let create_response =
299            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
300        if let Err(Err(e)) = create_signal.send(create_response) {
301            tracing::error!("Failed to send create response: {}", e);
302        }
303    });
304
305    spawn(async move {
306        let update_response =
307            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
308        if let Err(Err(e)) = update_signal.send(update_response) {
309            tracing::error!("Failed to send update response: {}", e);
310        }
311    });
312
313    while let Some(datum) = rx.next().await {
314        match datum.action {
315            DomainAction::Create(create) => {
316                let create_data = write_create_body(boundary, &create, &datum.data);
317                create_tx.clone().send(create_data).await?;
318            }
319            DomainAction::Update(update) => {
320                let update_data = write_update_body(boundary, &update, &datum.data);
321                update_tx.send(update_data).await?;
322            }
323        }
324    }
325    update_tx
326        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
327        .await?;
328    create_tx
329        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330        .await?;
331    update_tx.close().await?;
332    create_tx.close().await?;
333
334    let mut data = {
335        if let Ok(res) = create_signal_rx.await {
336            match res {
337                Ok(d) => d,
338                Err(e) => return Err(e),
339            }
340        } else {
341            return Err("create cancelled".into());
342        }
343    };
344
345    if let Ok(res) = update_signal_rx.await {
346        match res {
347            Ok(d) => data.extend(d),
348            Err(e) => return Err(e),
349        }
350    } else {
351        return Err("update cancelled".into());
352    }
353
354    Ok(data)
355}
356
357async fn update_v1(
358    url: &str,
359    access_token: &str,
360    domain_id: &str,
361    boundary: &str,
362    body: Body,
363) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
364    let update_response = Client::new()
365        .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
366        .bearer_auth(access_token)
367        .header(
368            "Content-Type",
369            &format!("multipart/form-data; boundary={}", boundary),
370        )
371        .body(body)
372        .send()
373        .await?;
374
375    if update_response.status().is_success() {
376        let data = update_response
377            .json::<ListDomainDataMetadata>()
378            .await
379            .unwrap();
380        Ok(data.data)
381    } else {
382        let err = update_response
383            .text()
384            .await
385            .unwrap_or_else(|_| "Unknown error".to_string());
386        Err(format!("Update failed with status: {}", err).into())
387    }
388}
389
390async fn create_v1(
391    url: &str,
392    access_token: &str,
393    domain_id: &str,
394    boundary: &str,
395    body: Body,
396) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
397    let create_response = Client::new()
398        .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
399        .bearer_auth(access_token)
400        .header(
401            "Content-Type",
402            &format!("multipart/form-data; boundary={}", boundary),
403        )
404        .body(body)
405        .send()
406        .await?;
407
408    if create_response.status().is_success() {
409        let data = create_response
410            .json::<ListDomainDataMetadata>()
411            .await
412            .unwrap();
413        Ok(data.data)
414    } else {
415        let err = create_response
416            .text()
417            .await
418            .unwrap_or_else(|_| "Unknown error".to_string());
419        Err(format!("Create failed with status: {}", err).into())
420    }
421}
422
423fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
424    let create_bytes = format!(
425        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
426        boundary, data.name, data.data_type
427    );
428    let mut create_data = create_bytes.into_bytes();
429    create_data.extend_from_slice(data_bytes);
430    create_data.extend_from_slice("\r\n".as_bytes());
431    create_data
432}
433
434fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
435    let update_bytes = format!(
436        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
437        boundary, data.id
438    );
439    let mut update_data = update_bytes.into_bytes();
440    update_data.extend_from_slice(data_bytes);
441    update_data.extend_from_slice("\r\n".as_bytes());
442    update_data
443}
444
445pub async fn upload_v1(
446    url: &str,
447    access_token: &str,
448    domain_id: &str,
449    data: Vec<UploadDomainData>,
450) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
451    let boundary = "boundary";
452
453    let mut create_body = Vec::new();
454    let mut update_body = Vec::new();
455    let mut to_update = false;
456    let mut to_create = false;
457
458    // Process the first item to get metadata for the form
459    for datum in data {
460        match datum.action {
461            DomainAction::Create(create) => {
462                to_create = true;
463                let create_data = write_create_body(boundary, &create, &datum.data);
464                create_body.extend_from_slice(&create_data);
465            }
466            DomainAction::Update(update) => {
467                to_update = true;
468                let update_data = write_update_body(boundary, &update, &datum.data);
469                update_body.extend_from_slice(&update_data);
470            }
471        }
472    }
473
474    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
475    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
476
477    let create_body = Body::from(create_body);
478    let update_body = Body::from(update_body);
479    let mut res = Vec::new();
480
481    if to_create {
482        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
483    }
484    if to_update {
485        let update_response =
486            update_v1(url, access_token, domain_id, boundary, update_body).await?;
487        if !update_response.is_empty() {
488            res.extend(update_response);
489        }
490    }
491
492    Ok(res)
493}
494
495/// Upload a single domain data item (create or update) as one HTTP request.
496/// Returns a list of created/updated DomainData entries (with empty data payloads).
497#[cfg(not(target_family = "wasm"))]
498pub async fn upload_one(
499    url: &str,
500    access_token: &str,
501    domain_id: &str,
502    data: UploadDomainData,
503) -> Result<Vec<DomainData>, (StatusCode, String)> {
504    let boundary = "boundary";
505    let (method, body) = match &data.action {
506        DomainAction::Create(create) => {
507            let mut bytes = write_create_body(boundary, create, &data.data);
508            // terminate multipart body
509            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
510            (reqwest::Method::POST, Body::from(bytes))
511        }
512        DomainAction::Update(update) => {
513            let mut bytes = write_update_body(boundary, update, &data.data);
514            // terminate multipart body
515            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
516            (reqwest::Method::PUT, Body::from(bytes))
517        }
518    };
519
520    let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
521    let response = Client::new()
522        .request(method, endpoint)
523        .bearer_auth(access_token)
524        .header(
525            "Content-Type",
526            &format!("multipart/form-data; boundary={}", boundary),
527        )
528        .body(body)
529        .send()
530        .await
531        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
532
533    if response.status().is_success() {
534        // Be tolerant to varying response shapes (full metadata or id-only)
535        let text = response
536            .text()
537            .await
538            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
539        // Try strict metadata first
540        if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
541            let items: Vec<DomainData> = md
542                .data
543                .into_iter()
544                .map(|m| DomainData {
545                    metadata: m,
546                    data: Vec::new(),
547                })
548                .collect();
549            return Ok(items);
550        }
551        // Fallback: extract minimal fields from generic JSON
552        let v: serde_json::Value = serde_json::from_str(&text)
553            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
554        let mut out: Vec<DomainData> = Vec::new();
555        if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
556            for item in arr {
557                let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
558                let domain_id = item
559                    .get("domain_id")
560                    .and_then(|x| x.as_str())
561                    .unwrap_or("")
562                    .to_string();
563                let name = item
564                    .get("name")
565                    .and_then(|x| x.as_str())
566                    .unwrap_or("")
567                    .to_string();
568                let data_type = item
569                    .get("data_type")
570                    .and_then(|x| x.as_str())
571                    .unwrap_or("")
572                    .to_string();
573                out.push(DomainData {
574                    metadata: DomainDataMetadata {
575                        id,
576                        domain_id,
577                        name,
578                        data_type,
579                        size: 0,
580                        created_at: String::new(),
581                        updated_at: String::new(),
582                    },
583                    data: Vec::new(),
584                });
585            }
586            return Ok(out);
587        }
588        Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
589    } else {
590        let status = response.status();
591        let text = response
592            .text()
593            .await
594            .unwrap_or_else(|_| "Unknown error".to_string());
595        Err((status, text))
596    }
597}
598
599fn parse_headers(
600    headers_slice: &[u8],
601) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
602    let headers_str = String::from_utf8_lossy(headers_slice);
603    let mut domain_data = None;
604
605    for line in headers_str.lines() {
606        if line.trim().is_empty() {
607            break;
608        }
609        if let Some((key, value)) = line.split_once(':') {
610            let key = key.trim().to_lowercase();
611            if key == "content-disposition" {
612                let mut parsed_domain_data = DomainData {
613                    metadata: DomainDataMetadata {
614                        id: String::new(),
615                        domain_id: String::new(),
616                        name: String::new(),
617                        data_type: String::new(),
618                        size: 0,
619                        created_at: String::new(),
620                        updated_at: String::new(),
621                    },
622                    data: Vec::new(),
623                };
624                for part in value.split(';') {
625                    let part = part.trim();
626                    if let Some((key, value)) = part.split_once('=') {
627                        let key = key.trim();
628                        let value = value.trim().trim_matches('"');
629                        match key {
630                            "id" => parsed_domain_data.metadata.id = value.to_string(),
631                            "domain-id" => {
632                                parsed_domain_data.metadata.domain_id = value.to_string()
633                            }
634                            "name" => parsed_domain_data.metadata.name = value.to_string(),
635                            "data-type" => {
636                                parsed_domain_data.metadata.data_type = value.to_string()
637                            }
638                            "size" => parsed_domain_data.metadata.size = value.parse()?,
639                            "created-at" => {
640                                parsed_domain_data.metadata.created_at = value.to_string()
641                            }
642                            "updated-at" => {
643                                parsed_domain_data.metadata.updated_at = value.to_string()
644                            }
645                            _ => {}
646                        }
647                    }
648                }
649                domain_data = Some(parsed_domain_data);
650            }
651        }
652    }
653
654    if let Some(domain_data) = domain_data {
655        Ok(domain_data)
656    } else {
657        Err("Missing content-disposition header".into())
658    }
659}
660
661fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
662    let _data = String::from_utf8_lossy(data);
663    let _boundary = String::from_utf8_lossy(boundary);
664    data.windows(boundary.len())
665        .position(|window| window == boundary)
666}
667
668fn find_headers_end(data: &[u8]) -> Option<usize> {
669    if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
670        return Some(i + 4);
671    }
672    data.windows(2)
673        .position(|w| w == b"\n\n")
674        .map(|i| i + 2)
675}
676
677async fn handle_domain_data_stream(
678    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
679    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
680    boundary: &str,
681) {
682    use futures::pin_mut;
683
684    let mut buffer = Vec::new();
685    let mut current_domain_data: Option<DomainData> = None;
686    let boundary_bytes = format!("--{}", boundary).into_bytes();
687    let keep_tail = boundary_bytes.len() + 4; // bytes to keep for boundary detection across chunks
688
689    pin_mut!(stream);
690
691    while let Some(chunk_result) = stream.next().await {
692        let chunk = match chunk_result {
693            Ok(c) if c.is_empty() => continue,
694            Ok(c) => c,
695            Err(e) => {
696                let _ = tx.send(Err(e.into())).await;
697                return;
698            }
699        };
700
701        buffer.extend_from_slice(&chunk);
702
703        'consume: loop {
704            match &mut current_domain_data {
705                None => {
706                    let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
707                        if buffer.len() > keep_tail {
708                            buffer.drain(..buffer.len() - keep_tail);
709                        }
710                        break 'consume;
711                    };
712                    let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
713                        break 'consume;
714                    };
715                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
716                    let part_headers = parse_headers(headers_slice);
717                    let domain_data = match part_headers {
718                        Ok(d) => d,
719                        Err(e) => {
720                            tracing::error!("Failed to parse headers: {:?}", e);
721                            return;
722                        }
723                    };
724                    buffer.drain(..boundary_pos + header_end_rel);
725                    current_domain_data = Some(domain_data);
726                }
727                Some(dd) => {
728                    if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
729                        let mut data_end = next_boundary_pos;
730                        if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
731                            data_end -= 2;
732                        } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
733                            data_end -= 1;
734                        }
735                        dd.data.extend_from_slice(&buffer[..data_end]);
736                        buffer.drain(..next_boundary_pos);
737                        let finished = current_domain_data.take().unwrap();
738                        if tx.send(Ok(finished)).await.is_err() {
739                            return;
740                        }
741                    } else {
742                        if buffer.len() > keep_tail {
743                            let take = buffer.len() - keep_tail;
744                            dd.data.extend_from_slice(&buffer[..take]);
745                            buffer.drain(..take);
746                        }
747                        break 'consume;
748                    }
749                }
750            }
751        }
752    }
753
754    let _ = tx.close().await;
755}
756
757#[cfg(test)]
758mod tests {
759    use bytes::Bytes;
760
761    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
762
763    use super::*;
764
765    fn get_config() -> (Config, String) {
766        if std::path::Path::new("../.env.local").exists() {
767            dotenvy::from_filename("../.env.local").ok();
768            dotenvy::dotenv().ok();
769        }
770        let config = Config::from_env().unwrap();
771        (config, std::env::var("DOMAIN_ID").unwrap())
772    }
773
774    #[test]
775    fn test_find_boundary_found() {
776        let data = b"random--boundary--data";
777        let boundary = b"--boundary";
778        assert_eq!(find_boundary(data, boundary), Some(6));
779    }
780
781    #[test]
782    fn test_find_boundary_not_found() {
783        let data = b"random-data";
784        let boundary = b"--boundary";
785        assert_eq!(find_boundary(data, boundary), None);
786    }
787
788    #[test]
789    fn test_find_headers_end_crlf() {
790        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
791        assert_eq!(find_headers_end(data), Some(36));
792    }
793
794    #[test]
795    fn test_find_headers_end_lf() {
796        let data = b"header1: value1\nheader2: value2\n\nbody";
797        assert_eq!(find_headers_end(data), Some(33));
798    }
799
800    #[test]
801    fn test_find_headers_end_none() {
802        let data = b"header1: value1\nheader2: value2\nbody";
803        assert_eq!(find_headers_end(data), None);
804    }
805
806    #[test]
807    fn test_parse_headers_success() {
808        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
809        let parsed = super::parse_headers(headers);
810        assert!(parsed.is_ok());
811        let domain_data = parsed.unwrap();
812        assert_eq!(domain_data.metadata.id, "123");
813        assert_eq!(domain_data.metadata.domain_id, "abc");
814        assert_eq!(domain_data.metadata.name, "test");
815        assert_eq!(domain_data.metadata.data_type, "type");
816        assert_eq!(domain_data.metadata.size, 42);
817        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
818        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
819    }
820
821    #[test]
822    fn test_parse_headers_missing_content_disposition() {
823        let headers = b"content-type: application/octet-stream\r\n\r\n";
824        let parsed = super::parse_headers(headers);
825        assert!(parsed.is_err());
826    }
827
828    #[tokio::test]
829    async fn test_a_chunk_contains_multiple_data() {
830        let (tx, rx) = mpsc::channel(10);
831
832        let payload = br#"
833        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
834Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
835Content-Type: application/octet-stream
836
837{"test": "test"}
838--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
839Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
840Content-Type: application/octet-stream
841
842{"test": "test updated"}
843--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
844        "#;
845        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
846
847        handle_domain_data_stream(
848            tx,
849            stream,
850            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
851        )
852        .await;
853
854        let output: Vec<DomainData> = rx
855            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
856            .await
857            .into_iter()
858            .map(|r| r.unwrap())
859            .collect();
860        assert_eq!(output.len(), 2);
861        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
862        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
863    }
864
865    #[tokio::test]
866    async fn test_chunk_size_is_smaller_than_part() {
867        let (tx, rx) = mpsc::channel(10);
868
869        let payload = br#"
870        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
871Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
872Content-Type: application/octet-stream
873        "#;
874        let payload2 = br#"
875
876{"test": "test"}
877--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
878Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
879Content-Type: application/octet-stream
880
881{"test": "test updated"}
882--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
883"#;
884        let stream = tokio_stream::iter(vec![
885            Ok(Bytes::from_static(payload)),
886            Ok(Bytes::from_static(payload2)),
887        ]);
888
889        handle_domain_data_stream(
890            tx,
891            stream,
892            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
893        )
894        .await;
895
896        let output: Vec<DomainData> = rx
897            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
898            .await
899            .into_iter()
900            .map(|r| r.unwrap())
901            .collect();
902        assert_eq!(output.len(), 2);
903        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
904        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
905    }
906
907    #[tokio::test]
908    async fn test_chunk_size_is_smaller_than_header() {
909        let (tx, rx) = mpsc::channel(10);
910
911        let payload = br#"
912        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
913Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
914Content-Type: application/octet-stream
915        "#;
916        let payload2 = br#"
917e: application/octet-stream
918
919{"test": "test"}
920--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
921Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
922Content-Type: application/octet-stream
923
924{"test": "test updated"}
925--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
926"#;
927        let stream = tokio_stream::iter(vec![
928            Ok(Bytes::from_static(payload)),
929            Ok(Bytes::from_static(payload2)),
930        ]);
931
932        handle_domain_data_stream(
933            tx,
934            stream,
935            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
936        )
937        .await;
938
939        let output: Vec<DomainData> = rx
940            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
941            .await
942            .into_iter()
943            .map(|r| r.unwrap())
944            .collect();
945        assert_eq!(output.len(), 2);
946        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
947        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
948    }
949
950    #[tokio::test]
951    async fn test_chunk_size_doesnt_cover_the_whole_data() {
952        let (tx, rx) = mpsc::channel(10);
953
954        let payload = br#"
955        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
956Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
957Content-Type: application/octet-stream
958
959{"test": "test"#;
960        let payload2 = br#""}
961--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
962Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
963Content-Type: application/octet-stream
964
965{"test": "test updated"}
966--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
967"#;
968        let stream = tokio_stream::iter(vec![
969            Ok(Bytes::from_static(payload)),
970            Ok(Bytes::from_static(payload2)),
971        ]);
972
973        handle_domain_data_stream(
974            tx,
975            stream,
976            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
977        )
978        .await;
979
980        let output: Vec<DomainData> = rx
981            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
982            .await
983            .into_iter()
984            .map(|r| r.unwrap())
985            .collect();
986        assert_eq!(output.len(), 2);
987        assert_eq!(
988            std::str::from_utf8(&output[1].data).unwrap(),
989            "{\"test\": \"test updated\"}"
990        );
991        assert_eq!(
992            std::str::from_utf8(&output[0].data).unwrap(),
993            "{\"test\": \"test\"}"
994        );
995    }
996
997    #[tokio::test]
998    async fn test_upload_v1_with_user_dds_access_token() {
999        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};
1000
1001        let (config, domain_id) = get_config();
1002
1003        let mut discovery =
1004            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1005        discovery
1006            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1007            .await
1008            .expect("sign_in_with_auki_account failed");
1009        let domain = discovery
1010            .auth_domain(&domain_id)
1011            .await
1012            .expect("get_domain failed");
1013        // 4. Prepare upload data
1014        let upload_data = vec![
1015            UploadDomainData {
1016                action: DomainAction::Create(CreateDomainData {
1017                    name: "test_upload".to_string(),
1018                    data_type: "test".to_string(),
1019                }),
1020                data: b"hello world".to_vec(),
1021            },
1022            UploadDomainData {
1023                action: DomainAction::Update(UpdateDomainData {
1024                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1025                }),
1026                data: b"{\"test\": \"test updated\"}".to_vec(),
1027            },
1028        ];
1029
1030        // 5. Call upload_v1
1031        let result = upload_v1(
1032            &domain.domain.domain_server.url,
1033            &domain.get_access_token(),
1034            &domain_id,
1035            upload_data,
1036        )
1037        .await
1038        .expect("upload_v1 failed");
1039
1040        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1041        for data in result {
1042            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1043                assert_eq!(data.name, "test_upload");
1044                delete_by_id(
1045                    &domain.domain.domain_server.url,
1046                    &domain.get_access_token(),
1047                    &domain_id,
1048                    &data.id,
1049                )
1050                .await
1051                .expect("delete_by_id failed");
1052            }
1053        }
1054    }
1055}