posemesh_domain_http/
domain_data.rs

1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4#[cfg(not(target_family = "wasm"))]
5use reqwest::StatusCode;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8#[cfg(not(target_family = "wasm"))]
9use tokio::spawn;
10#[cfg(target_family = "wasm")]
11use wasm_bindgen_futures::spawn_local as spawn;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct DomainDataMetadata {
15    pub id: String,
16    pub domain_id: String,
17    pub name: String,
18    pub data_type: String,
19    pub size: u64,
20    pub created_at: String,
21    pub updated_at: String,
22}
23
24#[derive(Debug, Deserialize, Serialize)]
25pub struct DomainData {
26    // #[serde(flatten)] This doesn't work in serde_wasm_bindgen, it generates Map instead of a plain object
27    pub metadata: DomainDataMetadata,
28    pub data: Vec<u8>,
29}
30
31#[derive(Debug, Serialize, Clone, Deserialize)]
32pub struct UpdateDomainData {
33    pub id: String,
34}
35
36#[derive(Debug, Serialize, Clone, Deserialize)]
37pub struct CreateDomainData {
38    pub name: String,
39    pub data_type: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43#[serde(untagged)]
44pub enum DomainAction {
45    Create {
46        name: String,
47        data_type: String,
48    },
49    Update {
50        id: String,
51    },
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct UploadDomainData {
56    #[serde(flatten)]
57    pub action: DomainAction,
58    pub data: Vec<u8>,
59}
60
61#[derive(Debug, Serialize, Deserialize)]
62pub struct DownloadQuery {
63    pub ids: Vec<String>,
64    pub name: Option<String>,
65    pub data_type: Option<String>,
66}
67
68#[derive(Debug, Deserialize)]
69struct ListDomainDataMetadata {
70    pub data: Vec<DomainDataMetadata>,
71}
72
73pub async fn download_by_id(
74    url: &str,
75    client_id: &str,
76    access_token: &str,
77    domain_id: &str,
78    id: &str,
79) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
80    let response = Client::new()
81        .get(format!(
82            "{}/api/v1/domains/{}/data/{}?raw=true",
83            url, domain_id, id
84        ))
85        .bearer_auth(access_token)
86        .header("posemesh-client-id", client_id)
87        .send()
88        .await?;
89
90    if response.status().is_success() {
91        let data = response.bytes().await?;
92        Ok(data.to_vec())
93    } else {
94        let status = response.status();
95        let text = response
96            .text()
97            .await
98            .unwrap_or_else(|_| "Unknown error".to_string());
99        Err(format!(
100            "Failed to download data by id. Status: {} - {}",
101            status, text
102        )
103        .into())
104    }
105}
106
107/// Perform a direct absolute download request to the domain server.
108/// Sets headers: Accept: multipart/form-data, Authorization: Bearer <token>, posemesh-client-id.
109pub async fn request_download_absolute(
110    url: &str,
111    client_id: &str,
112    access_token: &str,
113) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
114    let response = Client::new()
115        .get(url)
116        .bearer_auth(access_token)
117        .header("posemesh-client-id", client_id)
118        .header("Accept", "multipart/form-data")
119        .send()
120        .await?;
121    Ok(response)
122}
123
124pub async fn download_metadata_v1(
125    url: &str,
126    client_id: &str,
127    access_token: &str,
128    domain_id: &str,
129    query: &DownloadQuery,
130) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
131    let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
132    if response.status().is_success() {
133        let data = response.json::<ListDomainDataMetadata>().await?;
134        Ok(data.data)
135    } else {
136        let status = response.status();
137        let text = response
138            .text()
139            .await
140            .unwrap_or_else(|_| "Unknown error".to_string());
141        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
142    }
143}
144
145pub async fn download_v1(
146    url: &str,
147    client_id: &str,
148    access_token: &str,
149    domain_id: &str,
150    query: &DownloadQuery,
151    with_data: bool,
152) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
153    let mut params = HashMap::new();
154
155    if let Some(name) = &query.name {
156        params.insert("name", name.clone());
157    }
158    if let Some(data_type) = &query.data_type {
159        params.insert("data_type", data_type.clone());
160    }
161    let ids = if !query.ids.is_empty() {
162        format!("?ids={}", query.ids.join(","))
163    } else {
164        String::new()
165    };
166
167    let response = Client::new()
168        .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
169        .bearer_auth(access_token)
170        .header(
171            "Accept",
172            if with_data {
173                "multipart/form-data"
174            } else {
175                "application/json"
176            },
177        )
178        .header("posemesh-client-id", client_id)
179        .query(&params)
180        .send()
181        .await?;
182
183    if response.status().is_success() {
184        Ok(response)
185    } else {
186        let status = response.status();
187        let text = response
188            .text()
189            .await
190            .unwrap_or_else(|_| "Unknown error".to_string());
191        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
192    }
193}
194
195pub async fn download_v1_stream(
196    url: &str,
197    client_id: &str,
198    access_token: &str,
199    domain_id: &str,
200    query: &DownloadQuery,
201) -> Result<
202    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
203    Box<dyn std::error::Error + Send + Sync>,
204> {
205    let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
206
207    stream_from_response(response).await
208}
209
210/// Build a stream from an HTTP response returned by the domain download endpoint.
211/// Parses multipart boundaries and yields DomainData items as they arrive.
212pub async fn stream_from_response(
213    response: Response,
214) -> Result<
215    mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
216    Box<dyn std::error::Error + Send + Sync>,
217> {
218    let (mut tx, rx) =
219        mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
220
221    let boundary = match response
222        .headers()
223        .get("content-type")
224        .and_then(|ct| ct.to_str().ok())
225        .and_then(|ct| {
226            if ct.starts_with("multipart/form-data; boundary=") {
227                Some(ct.split("boundary=").nth(1)?.to_string())
228            } else {
229                None
230            }
231        }) {
232        Some(b) => b,
233        None => {
234            tracing::error!("Invalid content-type header");
235            let _ = tx.close().await;
236            return Err("Invalid content-type header".into());
237        }
238    };
239
240    spawn(async move {
241        let stream = response.bytes_stream();
242        handle_domain_data_stream(tx, stream, &boundary).await;
243    });
244
245    Ok(rx)
246}
247
248pub async fn delete_by_id(
249    url: &str,
250    access_token: &str,
251    domain_id: &str,
252    id: &str,
253) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
254    let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
255    let client = Client::new();
256    let resp = client
257        .delete(&endpoint)
258        .bearer_auth(access_token)
259        .send()
260        .await?;
261
262    if resp.status().is_success() {
263        Ok(())
264    } else {
265        let err = resp
266            .text()
267            .await
268            .unwrap_or_else(|_| "Unknown error".to_string());
269        Err(format!("Delete failed with status: {}", err).into())
270    }
271}
272
273#[cfg(not(target_family = "wasm"))]
274pub async fn upload_v1_stream(
275    url: &str,
276    access_token: &str,
277    domain_id: &str,
278    mut rx: mpsc::Receiver<UploadDomainData>,
279) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
280    use futures::channel::oneshot;
281
282    let boundary = "boundary";
283
284    let (mut create_tx, create_rx) = mpsc::channel(100);
285    let (mut update_tx, update_rx) = mpsc::channel(100);
286
287    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
288    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
289
290    let url = url.to_string();
291    let url_2 = url.clone();
292    let access_token = access_token.to_string();
293    let domain_id = domain_id.to_string();
294    let access_token_2 = access_token.clone();
295    let domain_id_2 = domain_id.clone();
296
297    let (create_signal, create_signal_rx) = oneshot::channel::<
298        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
299    >();
300    let (update_signal, update_signal_rx) = oneshot::channel::<
301        Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
302    >();
303
304    spawn(async move {
305        let create_response =
306            create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
307        if let Err(Err(e)) = create_signal.send(create_response) {
308            tracing::error!("Failed to send create response: {}", e);
309        }
310    });
311
312    spawn(async move {
313        let update_response =
314            update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
315        if let Err(Err(e)) = update_signal.send(update_response) {
316            tracing::error!("Failed to send update response: {}", e);
317        }
318    });
319
320    while let Some(datum) = rx.next().await {
321        match datum.action {
322            DomainAction::Create { name, data_type } => {
323                let create_data = write_create_body(boundary, &CreateDomainData { name, data_type }, &datum.data);
324                create_tx.clone().send(create_data).await?;
325            }
326            DomainAction::Update { id } => {
327                let update_data = write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
328                update_tx.send(update_data).await?;
329            }
330        }
331    }
332    update_tx
333        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
334        .await?;
335    create_tx
336        .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
337        .await?;
338    update_tx.close().await?;
339    create_tx.close().await?;
340
341    let mut data = {
342        if let Ok(res) = create_signal_rx.await {
343            match res {
344                Ok(d) => d,
345                Err(e) => return Err(e),
346            }
347        } else {
348            return Err("create cancelled".into());
349        }
350    };
351
352    if let Ok(res) = update_signal_rx.await {
353        match res {
354            Ok(d) => data.extend(d),
355            Err(e) => return Err(e),
356        }
357    } else {
358        return Err("update cancelled".into());
359    }
360
361    Ok(data)
362}
363
364async fn update_v1(
365    url: &str,
366    access_token: &str,
367    domain_id: &str,
368    boundary: &str,
369    body: Body,
370) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
371    let update_response = Client::new()
372        .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
373        .bearer_auth(access_token)
374        .header(
375            "Content-Type",
376            &format!("multipart/form-data; boundary={}", boundary),
377        )
378        .body(body)
379        .send()
380        .await?;
381
382    if update_response.status().is_success() {
383        let data = update_response
384            .json::<ListDomainDataMetadata>()
385            .await
386            .unwrap();
387        Ok(data.data)
388    } else {
389        let err = update_response
390            .text()
391            .await
392            .unwrap_or_else(|_| "Unknown error".to_string());
393        Err(format!("Update failed with status: {}", err).into())
394    }
395}
396
397async fn create_v1(
398    url: &str,
399    access_token: &str,
400    domain_id: &str,
401    boundary: &str,
402    body: Body,
403) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
404    let create_response = Client::new()
405        .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
406        .bearer_auth(access_token)
407        .header(
408            "Content-Type",
409            &format!("multipart/form-data; boundary={}", boundary),
410        )
411        .body(body)
412        .send()
413        .await?;
414
415    if create_response.status().is_success() {
416        let data = create_response
417            .json::<ListDomainDataMetadata>()
418            .await
419            .unwrap();
420        Ok(data.data)
421    } else {
422        let err = create_response
423            .text()
424            .await
425            .unwrap_or_else(|_| "Unknown error".to_string());
426        Err(format!("Create failed with status: {}", err).into())
427    }
428}
429
430fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
431    let create_bytes = format!(
432        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
433        boundary, data.name, data.data_type
434    );
435    let mut create_data = create_bytes.into_bytes();
436    create_data.extend_from_slice(data_bytes);
437    create_data.extend_from_slice("\r\n".as_bytes());
438    create_data
439}
440
441fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
442    let update_bytes = format!(
443        "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
444        boundary, data.id
445    );
446    let mut update_data = update_bytes.into_bytes();
447    update_data.extend_from_slice(data_bytes);
448    update_data.extend_from_slice("\r\n".as_bytes());
449    update_data
450}
451
452pub async fn upload_v1(
453    url: &str,
454    access_token: &str,
455    domain_id: &str,
456    data: Vec<UploadDomainData>,
457) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
458    let boundary = "boundary";
459
460    let mut create_body = Vec::new();
461    let mut update_body = Vec::new();
462    let mut to_update = false;
463    let mut to_create = false;
464
465    // Process the first item to get metadata for the form
466    for datum in data {
467        match datum.action {
468            DomainAction::Create { name, data_type } => {
469                to_create = true;
470                let create_data = write_create_body(boundary, &CreateDomainData { name, data_type }, &datum.data);
471                create_body.extend_from_slice(&create_data);
472            }
473            DomainAction::Update { id } => {
474                to_update = true;
475                let update_data = write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
476                update_body.extend_from_slice(&update_data);
477            }
478        }
479    }
480
481    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
482    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
483
484    let create_body = Body::from(create_body);
485    let update_body = Body::from(update_body);
486    let mut res = Vec::new();
487
488    if to_create {
489        res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
490    }
491    if to_update {
492        let update_response =
493            update_v1(url, access_token, domain_id, boundary, update_body).await?;
494        if !update_response.is_empty() {
495            res.extend(update_response);
496        }
497    }
498
499    Ok(res)
500}
501
502/// Upload a single domain data item (create or update) as one HTTP request.
503/// Returns a list of created/updated DomainData entries (with empty data payloads).
504#[cfg(not(target_family = "wasm"))]
505pub async fn upload_one(
506    url: &str,
507    access_token: &str,
508    domain_id: &str,
509    data: UploadDomainData,
510) -> Result<Vec<DomainData>, (StatusCode, String)> {
511    let boundary = "boundary";
512    let (method, body) = match &data.action {
513        DomainAction::Create { name, data_type } => {
514            let create = CreateDomainData {
515                name: name.clone(),
516                data_type: data_type.clone(),
517            };
518            let mut bytes = write_create_body(boundary, &create, &data.data);
519            // terminate multipart body
520            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
521            (reqwest::Method::POST, Body::from(bytes))
522        }
523        DomainAction::Update { id } => {
524            let update = UpdateDomainData { id: id.clone() };
525            let mut bytes = write_update_body(boundary, &update, &data.data);
526            // terminate multipart body
527            bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
528            (reqwest::Method::PUT, Body::from(bytes))
529        }
530    };
531
532    let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
533    let response = Client::new()
534        .request(method, endpoint)
535        .bearer_auth(access_token)
536        .header(
537            "Content-Type",
538            &format!("multipart/form-data; boundary={}", boundary),
539        )
540        .body(body)
541        .send()
542        .await
543        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
544
545    if response.status().is_success() {
546        // Be tolerant to varying response shapes (full metadata or id-only)
547        let text = response
548            .text()
549            .await
550            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
551        // Try strict metadata first
552        if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
553            let items: Vec<DomainData> = md
554                .data
555                .into_iter()
556                .map(|m| DomainData {
557                    metadata: m,
558                    data: Vec::new(),
559                })
560                .collect();
561            return Ok(items);
562        }
563        // Fallback: extract minimal fields from generic JSON
564        let v: serde_json::Value = serde_json::from_str(&text)
565            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
566        let mut out: Vec<DomainData> = Vec::new();
567        if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
568            for item in arr {
569                let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
570                let domain_id = item
571                    .get("domain_id")
572                    .and_then(|x| x.as_str())
573                    .unwrap_or("")
574                    .to_string();
575                let name = item
576                    .get("name")
577                    .and_then(|x| x.as_str())
578                    .unwrap_or("")
579                    .to_string();
580                let data_type = item
581                    .get("data_type")
582                    .and_then(|x| x.as_str())
583                    .unwrap_or("")
584                    .to_string();
585                out.push(DomainData {
586                    metadata: DomainDataMetadata {
587                        id,
588                        domain_id,
589                        name,
590                        data_type,
591                        size: 0,
592                        created_at: String::new(),
593                        updated_at: String::new(),
594                    },
595                    data: Vec::new(),
596                });
597            }
598            return Ok(out);
599        }
600        Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
601    } else {
602        let status = response.status();
603        let text = response
604            .text()
605            .await
606            .unwrap_or_else(|_| "Unknown error".to_string());
607        Err((status, text))
608    }
609}
610
611fn parse_headers(
612    headers_slice: &[u8],
613) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
614    let headers_str = String::from_utf8_lossy(headers_slice);
615    let mut domain_data = None;
616
617    for line in headers_str.lines() {
618        if line.trim().is_empty() {
619            break;
620        }
621        if let Some((key, value)) = line.split_once(':') {
622            let key = key.trim().to_lowercase();
623            if key == "content-disposition" {
624                let mut parsed_domain_data = DomainData {
625                    metadata: DomainDataMetadata {
626                        id: String::new(),
627                        domain_id: String::new(),
628                        name: String::new(),
629                        data_type: String::new(),
630                        size: 0,
631                        created_at: String::new(),
632                        updated_at: String::new(),
633                    },
634                    data: Vec::new(),
635                };
636                for part in value.split(';') {
637                    let part = part.trim();
638                    if let Some((key, value)) = part.split_once('=') {
639                        let key = key.trim();
640                        let value = value.trim().trim_matches('"');
641                        match key {
642                            "id" => parsed_domain_data.metadata.id = value.to_string(),
643                            "domain-id" => {
644                                parsed_domain_data.metadata.domain_id = value.to_string()
645                            }
646                            "name" => parsed_domain_data.metadata.name = value.to_string(),
647                            "data-type" => {
648                                parsed_domain_data.metadata.data_type = value.to_string()
649                            }
650                            "size" => parsed_domain_data.metadata.size = value.parse()?,
651                            "created-at" => {
652                                parsed_domain_data.metadata.created_at = value.to_string()
653                            }
654                            "updated-at" => {
655                                parsed_domain_data.metadata.updated_at = value.to_string()
656                            }
657                            _ => {}
658                        }
659                    }
660                }
661                domain_data = Some(parsed_domain_data);
662            }
663        }
664    }
665
666    if let Some(domain_data) = domain_data {
667        Ok(domain_data)
668    } else {
669        Err("Missing content-disposition header".into())
670    }
671}
672
673fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
674    let _data = String::from_utf8_lossy(data);
675    let _boundary = String::from_utf8_lossy(boundary);
676    data.windows(boundary.len())
677        .position(|window| window == boundary)
678}
679
680fn find_headers_end(data: &[u8]) -> Option<usize> {
681    if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
682        return Some(i + 4);
683    }
684    data.windows(2)
685        .position(|w| w == b"\n\n")
686        .map(|i| i + 2)
687}
688
689async fn handle_domain_data_stream(
690    mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
691    stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
692    boundary: &str,
693) {
694    use futures::pin_mut;
695
696    let mut buffer = Vec::new();
697    let mut current_domain_data: Option<DomainData> = None;
698    let boundary_bytes = format!("--{}", boundary).into_bytes();
699    let keep_tail = boundary_bytes.len() + 4; // bytes to keep for boundary detection across chunks
700
701    pin_mut!(stream);
702
703    while let Some(chunk_result) = stream.next().await {
704        let chunk = match chunk_result {
705            Ok(c) if c.is_empty() => continue,
706            Ok(c) => c,
707            Err(e) => {
708                let _ = tx.send(Err(e.into())).await;
709                return;
710            }
711        };
712
713        buffer.extend_from_slice(&chunk);
714
715        'consume: loop {
716            match &mut current_domain_data {
717                None => {
718                    let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
719                        if buffer.len() > keep_tail {
720                            buffer.drain(..buffer.len() - keep_tail);
721                        }
722                        break 'consume;
723                    };
724                    let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
725                        break 'consume;
726                    };
727                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
728                    let part_headers = parse_headers(headers_slice);
729                    let domain_data = match part_headers {
730                        Ok(d) => d,
731                        Err(e) => {
732                            tracing::error!("Failed to parse headers: {:?}", e);
733                            return;
734                        }
735                    };
736                    buffer.drain(..boundary_pos + header_end_rel);
737                    current_domain_data = Some(domain_data);
738                }
739                Some(dd) => {
740                    if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
741                        let mut data_end = next_boundary_pos;
742                        if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
743                            data_end -= 2;
744                        } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
745                            data_end -= 1;
746                        }
747                        dd.data.extend_from_slice(&buffer[..data_end]);
748                        buffer.drain(..next_boundary_pos);
749                        let finished = current_domain_data.take().unwrap();
750                        if tx.send(Ok(finished)).await.is_err() {
751                            return;
752                        }
753                    } else {
754                        if buffer.len() > keep_tail {
755                            let take = buffer.len() - keep_tail;
756                            dd.data.extend_from_slice(&buffer[..take]);
757                            buffer.drain(..take);
758                        }
759                        break 'consume;
760                    }
761                }
762            }
763        }
764    }
765
766    let _ = tx.close().await;
767}
768
769#[cfg(test)]
770mod tests {
771    use bytes::Bytes;
772
773    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
774
775    use super::*;
776
777    fn get_config() -> (Config, String) {
778        if std::path::Path::new("../.env.local").exists() {
779            dotenvy::from_filename("../.env.local").ok();
780            dotenvy::dotenv().ok();
781        }
782        let config = Config::from_env().unwrap();
783        (config, std::env::var("DOMAIN_ID").unwrap())
784    }
785
786    #[test]
787    fn test_find_boundary_found() {
788        let data = b"random--boundary--data";
789        let boundary = b"--boundary";
790        assert_eq!(find_boundary(data, boundary), Some(6));
791    }
792
793    #[test]
794    fn test_find_boundary_not_found() {
795        let data = b"random-data";
796        let boundary = b"--boundary";
797        assert_eq!(find_boundary(data, boundary), None);
798    }
799
800    #[test]
801    fn test_find_headers_end_crlf() {
802        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
803        assert_eq!(find_headers_end(data), Some(36));
804    }
805
806    #[test]
807    fn test_find_headers_end_lf() {
808        let data = b"header1: value1\nheader2: value2\n\nbody";
809        assert_eq!(find_headers_end(data), Some(33));
810    }
811
812    #[test]
813    fn test_find_headers_end_none() {
814        let data = b"header1: value1\nheader2: value2\nbody";
815        assert_eq!(find_headers_end(data), None);
816    }
817
818    #[test]
819    fn test_parse_headers_success() {
820        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
821        let parsed = super::parse_headers(headers);
822        assert!(parsed.is_ok());
823        let domain_data = parsed.unwrap();
824        assert_eq!(domain_data.metadata.id, "123");
825        assert_eq!(domain_data.metadata.domain_id, "abc");
826        assert_eq!(domain_data.metadata.name, "test");
827        assert_eq!(domain_data.metadata.data_type, "type");
828        assert_eq!(domain_data.metadata.size, 42);
829        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
830        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
831    }
832
833    #[test]
834    fn test_parse_headers_missing_content_disposition() {
835        let headers = b"content-type: application/octet-stream\r\n\r\n";
836        let parsed = super::parse_headers(headers);
837        assert!(parsed.is_err());
838    }
839
840    #[tokio::test]
841    async fn test_a_chunk_contains_multiple_data() {
842        let (tx, rx) = mpsc::channel(10);
843
844        let payload = br#"
845        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
846Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
847Content-Type: application/octet-stream
848
849{"test": "test"}
850--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
851Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
852Content-Type: application/octet-stream
853
854{"test": "test updated"}
855--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
856        "#;
857        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
858
859        handle_domain_data_stream(
860            tx,
861            stream,
862            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
863        )
864        .await;
865
866        let output: Vec<DomainData> = rx
867            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
868            .await
869            .into_iter()
870            .map(|r| r.unwrap())
871            .collect();
872        assert_eq!(output.len(), 2);
873        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
874        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
875    }
876
877    #[tokio::test]
878    async fn test_chunk_size_is_smaller_than_part() {
879        let (tx, rx) = mpsc::channel(10);
880
881        let payload = br#"
882        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
883Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
884Content-Type: application/octet-stream
885        "#;
886        let payload2 = br#"
887
888{"test": "test"}
889--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
890Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
891Content-Type: application/octet-stream
892
893{"test": "test updated"}
894--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
895"#;
896        let stream = tokio_stream::iter(vec![
897            Ok(Bytes::from_static(payload)),
898            Ok(Bytes::from_static(payload2)),
899        ]);
900
901        handle_domain_data_stream(
902            tx,
903            stream,
904            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
905        )
906        .await;
907
908        let output: Vec<DomainData> = rx
909            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
910            .await
911            .into_iter()
912            .map(|r| r.unwrap())
913            .collect();
914        assert_eq!(output.len(), 2);
915        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
916        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
917    }
918
919    #[tokio::test]
920    async fn test_chunk_size_is_smaller_than_header() {
921        let (tx, rx) = mpsc::channel(10);
922
923        let payload = br#"
924        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
925Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
926Content-Type: application/octet-stream
927        "#;
928        let payload2 = br#"
929e: application/octet-stream
930
931{"test": "test"}
932--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
933Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
934Content-Type: application/octet-stream
935
936{"test": "test updated"}
937--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
938"#;
939        let stream = tokio_stream::iter(vec![
940            Ok(Bytes::from_static(payload)),
941            Ok(Bytes::from_static(payload2)),
942        ]);
943
944        handle_domain_data_stream(
945            tx,
946            stream,
947            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
948        )
949        .await;
950
951        let output: Vec<DomainData> = rx
952            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
953            .await
954            .into_iter()
955            .map(|r| r.unwrap())
956            .collect();
957        assert_eq!(output.len(), 2);
958        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
959        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
960    }
961
962    #[tokio::test]
963    async fn test_chunk_size_doesnt_cover_the_whole_data() {
964        let (tx, rx) = mpsc::channel(10);
965
966        let payload = br#"
967        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
968Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
969Content-Type: application/octet-stream
970
971{"test": "test"#;
972        let payload2 = br#""}
973--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
974Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
975Content-Type: application/octet-stream
976
977{"test": "test updated"}
978--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
979"#;
980        let stream = tokio_stream::iter(vec![
981            Ok(Bytes::from_static(payload)),
982            Ok(Bytes::from_static(payload2)),
983        ]);
984
985        handle_domain_data_stream(
986            tx,
987            stream,
988            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
989        )
990        .await;
991
992        let output: Vec<DomainData> = rx
993            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
994            .await
995            .into_iter()
996            .map(|r| r.unwrap())
997            .collect();
998        assert_eq!(output.len(), 2);
999        assert_eq!(
1000            std::str::from_utf8(&output[1].data).unwrap(),
1001            "{\"test\": \"test updated\"}"
1002        );
1003        assert_eq!(
1004            std::str::from_utf8(&output[0].data).unwrap(),
1005            "{\"test\": \"test\"}"
1006        );
1007    }
1008
1009    #[tokio::test]
1010    #[ignore = "requires live Auki credentials"]
1011    async fn test_upload_v1_with_user_dds_access_token() {
1012        use crate::domain_data::{DomainAction, UploadDomainData};
1013
1014        let (config, domain_id) = get_config();
1015
1016        let mut discovery =
1017            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1018        discovery
1019            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1020            .await
1021            .expect("sign_in_with_auki_account failed");
1022        let domain = discovery
1023            .auth_domain(&domain_id)
1024            .await
1025            .expect("get_domain failed");
1026        // 4. Prepare upload data
1027        let upload_data = vec![
1028            UploadDomainData {
1029                action: DomainAction::Create {
1030                    name: "test_upload".to_string(),
1031                    data_type: "test".to_string(),
1032                },
1033                data: b"hello world".to_vec(),
1034            },
1035            UploadDomainData {
1036                action: DomainAction::Update {
1037                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1038                },
1039                data: b"{\"test\": \"test updated\"}".to_vec(),
1040            },
1041        ];
1042
1043        // 5. Call upload_v1
1044        let result = upload_v1(
1045            &domain.domain.domain_server.url,
1046            &domain.get_access_token(),
1047            &domain_id,
1048            upload_data,
1049        )
1050        .await
1051        .expect("upload_v1 failed");
1052
1053        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1054        for data in result {
1055            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1056                assert_eq!(data.name, "test_upload");
1057                delete_by_id(
1058                    &domain.domain.domain_server.url,
1059                    &domain.get_access_token(),
1060                    &domain_id,
1061                    &data.id,
1062                )
1063                .await
1064                .expect("delete_by_id failed");
1065            }
1066        }
1067    }
1068}