1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DomainDataMetadata {
13 pub id: String,
14 pub domain_id: String,
15 pub name: String,
16 pub data_type: String,
17 pub size: u64,
18 pub created_at: String,
19 pub updated_at: String,
20}
21
22#[derive(Debug, Deserialize, Serialize)]
23pub struct DomainData {
24 pub metadata: DomainDataMetadata,
26 pub data: Vec<u8>,
27}
28
29#[derive(Debug, Serialize, Clone, Deserialize)]
30pub struct UpdateDomainData {
31 pub id: String,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct CreateDomainData {
36 pub name: String,
37 pub data_type: String,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(untagged)]
42pub enum DomainAction {
43 Create(CreateDomainData),
44 Update(UpdateDomainData),
45}
46
47#[derive(Debug, Serialize, Deserialize)]
48pub struct UploadDomainData {
49 #[serde(flatten)]
50 pub action: DomainAction,
51 pub data: Vec<u8>,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct DownloadQuery {
56 pub ids: Vec<String>,
57 pub name: Option<String>,
58 pub data_type: Option<String>,
59}
60
61#[derive(Debug, Deserialize)]
62struct ListDomainDataMetadata {
63 pub data: Vec<DomainDataMetadata>,
64}
65
66pub async fn download_by_id(
67 url: &str,
68 client_id: &str,
69 access_token: &str,
70 domain_id: &str,
71 id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73 let response = Client::new()
74 .get(format!(
75 "{}/api/v1/domains/{}/data/{}?raw=true",
76 url, domain_id, id
77 ))
78 .bearer_auth(access_token)
79 .header("posemesh-client-id", client_id)
80 .send()
81 .await?;
82
83 if response.status().is_success() {
84 let data = response.bytes().await?;
85 Ok(data.to_vec())
86 } else {
87 let status = response.status();
88 let text = response
89 .text()
90 .await
91 .unwrap_or_else(|_| "Unknown error".to_string());
92 Err(format!(
93 "Failed to download data by id. Status: {} - {}",
94 status, text
95 )
96 .into())
97 }
98}
99
100pub async fn request_download_absolute(
103 url: &str,
104 client_id: &str,
105 access_token: &str,
106) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
107 let response = Client::new()
108 .get(url)
109 .bearer_auth(access_token)
110 .header("posemesh-client-id", client_id)
111 .header("Accept", "multipart/form-data")
112 .send()
113 .await?;
114 Ok(response)
115}
116
117pub async fn download_metadata_v1(
118 url: &str,
119 client_id: &str,
120 access_token: &str,
121 domain_id: &str,
122 query: &DownloadQuery,
123) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
124 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
125 if response.status().is_success() {
126 let data = response.json::<ListDomainDataMetadata>().await?;
127 Ok(data.data)
128 } else {
129 let status = response.status();
130 let text = response
131 .text()
132 .await
133 .unwrap_or_else(|_| "Unknown error".to_string());
134 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
135 }
136}
137
138pub async fn download_v1(
139 url: &str,
140 client_id: &str,
141 access_token: &str,
142 domain_id: &str,
143 query: &DownloadQuery,
144 with_data: bool,
145) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
146 let mut params = HashMap::new();
147
148 if let Some(name) = &query.name {
149 params.insert("name", name.clone());
150 }
151 if let Some(data_type) = &query.data_type {
152 params.insert("data_type", data_type.clone());
153 }
154 let ids = if !query.ids.is_empty() {
155 format!("?ids={}", query.ids.join(","))
156 } else {
157 String::new()
158 };
159
160 let response = Client::new()
161 .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
162 .bearer_auth(access_token)
163 .header(
164 "Accept",
165 if with_data {
166 "multipart/form-data"
167 } else {
168 "application/json"
169 },
170 )
171 .header("posemesh-client-id", client_id)
172 .query(¶ms)
173 .send()
174 .await?;
175
176 if response.status().is_success() {
177 Ok(response)
178 } else {
179 let status = response.status();
180 let text = response
181 .text()
182 .await
183 .unwrap_or_else(|_| "Unknown error".to_string());
184 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
185 }
186}
187
188pub async fn download_v1_stream(
189 url: &str,
190 client_id: &str,
191 access_token: &str,
192 domain_id: &str,
193 query: &DownloadQuery,
194) -> Result<
195 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
196 Box<dyn std::error::Error + Send + Sync>,
197> {
198 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
199
200 stream_from_response(response).await
201}
202
203pub async fn stream_from_response(
206 response: Response,
207) -> Result<
208 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
209 Box<dyn std::error::Error + Send + Sync>,
210> {
211 let (mut tx, rx) =
212 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
213
214 let boundary = match response
215 .headers()
216 .get("content-type")
217 .and_then(|ct| ct.to_str().ok())
218 .and_then(|ct| {
219 if ct.starts_with("multipart/form-data; boundary=") {
220 Some(ct.split("boundary=").nth(1)?.to_string())
221 } else {
222 None
223 }
224 }) {
225 Some(b) => b,
226 None => {
227 tracing::error!("Invalid content-type header");
228 let _ = tx.close().await;
229 return Err("Invalid content-type header".into());
230 }
231 };
232
233 spawn(async move {
234 let stream = response.bytes_stream();
235 handle_domain_data_stream(tx, stream, &boundary).await;
236 });
237
238 Ok(rx)
239}
240
241pub async fn delete_by_id(
242 url: &str,
243 access_token: &str,
244 domain_id: &str,
245 id: &str,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
248 let client = Client::new();
249 let resp = client
250 .delete(&endpoint)
251 .bearer_auth(access_token)
252 .send()
253 .await?;
254
255 if resp.status().is_success() {
256 Ok(())
257 } else {
258 let err = resp
259 .text()
260 .await
261 .unwrap_or_else(|_| "Unknown error".to_string());
262 Err(format!("Delete failed with status: {}", err).into())
263 }
264}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn upload_v1_stream(
268 url: &str,
269 access_token: &str,
270 domain_id: &str,
271 mut rx: mpsc::Receiver<UploadDomainData>,
272) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
273 use futures::channel::oneshot;
274
275 let boundary = "boundary";
276
277 let (mut create_tx, create_rx) = mpsc::channel(100);
278 let (mut update_tx, update_rx) = mpsc::channel(100);
279
280 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
281 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
282
283 let url = url.to_string();
284 let url_2 = url.clone();
285 let access_token = access_token.to_string();
286 let domain_id = domain_id.to_string();
287 let access_token_2 = access_token.clone();
288 let domain_id_2 = domain_id.clone();
289
290 let (create_signal, create_signal_rx) = oneshot::channel::<
291 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
292 >();
293 let (update_signal, update_signal_rx) = oneshot::channel::<
294 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295 >();
296
297 spawn(async move {
298 let create_response =
299 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
300 if let Err(Err(e)) = create_signal.send(create_response) {
301 tracing::error!("Failed to send create response: {}", e);
302 }
303 });
304
305 spawn(async move {
306 let update_response =
307 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
308 if let Err(Err(e)) = update_signal.send(update_response) {
309 tracing::error!("Failed to send update response: {}", e);
310 }
311 });
312
313 while let Some(datum) = rx.next().await {
314 match datum.action {
315 DomainAction::Create(create) => {
316 let create_data = write_create_body(boundary, &create, &datum.data);
317 create_tx.clone().send(create_data).await?;
318 }
319 DomainAction::Update(update) => {
320 let update_data = write_update_body(boundary, &update, &datum.data);
321 update_tx.send(update_data).await?;
322 }
323 }
324 }
325 update_tx
326 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
327 .await?;
328 create_tx
329 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330 .await?;
331 update_tx.close().await?;
332 create_tx.close().await?;
333
334 let mut data = {
335 if let Ok(res) = create_signal_rx.await {
336 match res {
337 Ok(d) => d,
338 Err(e) => return Err(e),
339 }
340 } else {
341 return Err("create cancelled".into());
342 }
343 };
344
345 if let Ok(res) = update_signal_rx.await {
346 match res {
347 Ok(d) => data.extend(d),
348 Err(e) => return Err(e),
349 }
350 } else {
351 return Err("update cancelled".into());
352 }
353
354 Ok(data)
355}
356
357async fn update_v1(
358 url: &str,
359 access_token: &str,
360 domain_id: &str,
361 boundary: &str,
362 body: Body,
363) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
364 let update_response = Client::new()
365 .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
366 .bearer_auth(access_token)
367 .header(
368 "Content-Type",
369 &format!("multipart/form-data; boundary={}", boundary),
370 )
371 .body(body)
372 .send()
373 .await?;
374
375 if update_response.status().is_success() {
376 let data = update_response
377 .json::<ListDomainDataMetadata>()
378 .await
379 .unwrap();
380 Ok(data.data)
381 } else {
382 let err = update_response
383 .text()
384 .await
385 .unwrap_or_else(|_| "Unknown error".to_string());
386 Err(format!("Update failed with status: {}", err).into())
387 }
388}
389
390async fn create_v1(
391 url: &str,
392 access_token: &str,
393 domain_id: &str,
394 boundary: &str,
395 body: Body,
396) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
397 let create_response = Client::new()
398 .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
399 .bearer_auth(access_token)
400 .header(
401 "Content-Type",
402 &format!("multipart/form-data; boundary={}", boundary),
403 )
404 .body(body)
405 .send()
406 .await?;
407
408 if create_response.status().is_success() {
409 let data = create_response
410 .json::<ListDomainDataMetadata>()
411 .await
412 .unwrap();
413 Ok(data.data)
414 } else {
415 let err = create_response
416 .text()
417 .await
418 .unwrap_or_else(|_| "Unknown error".to_string());
419 Err(format!("Create failed with status: {}", err).into())
420 }
421}
422
423fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
424 let create_bytes = format!(
425 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
426 boundary, data.name, data.data_type
427 );
428 let mut create_data = create_bytes.into_bytes();
429 create_data.extend_from_slice(data_bytes);
430 create_data.extend_from_slice("\r\n".as_bytes());
431 create_data
432}
433
434fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
435 let update_bytes = format!(
436 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
437 boundary, data.id
438 );
439 let mut update_data = update_bytes.into_bytes();
440 update_data.extend_from_slice(data_bytes);
441 update_data.extend_from_slice("\r\n".as_bytes());
442 update_data
443}
444
445pub async fn upload_v1(
446 url: &str,
447 access_token: &str,
448 domain_id: &str,
449 data: Vec<UploadDomainData>,
450) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
451 let boundary = "boundary";
452
453 let mut create_body = Vec::new();
454 let mut update_body = Vec::new();
455 let mut to_update = false;
456 let mut to_create = false;
457
458 for datum in data {
460 match datum.action {
461 DomainAction::Create(create) => {
462 to_create = true;
463 let create_data = write_create_body(boundary, &create, &datum.data);
464 create_body.extend_from_slice(&create_data);
465 }
466 DomainAction::Update(update) => {
467 to_update = true;
468 let update_data = write_update_body(boundary, &update, &datum.data);
469 update_body.extend_from_slice(&update_data);
470 }
471 }
472 }
473
474 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
475 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
476
477 let create_body = Body::from(create_body);
478 let update_body = Body::from(update_body);
479 let mut res = Vec::new();
480
481 if to_create {
482 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
483 }
484 if to_update {
485 let update_response =
486 update_v1(url, access_token, domain_id, boundary, update_body).await?;
487 if !update_response.is_empty() {
488 res.extend(update_response);
489 }
490 }
491
492 Ok(res)
493}
494
495#[cfg(not(target_family = "wasm"))]
498pub async fn upload_one(
499 url: &str,
500 access_token: &str,
501 domain_id: &str,
502 data: UploadDomainData,
503) -> Result<Vec<DomainData>, (StatusCode, String)> {
504 let boundary = "boundary";
505 let (method, body) = match &data.action {
506 DomainAction::Create(create) => {
507 let bytes = write_create_body(boundary, create, &data.data);
508 (reqwest::Method::POST, Body::from(bytes))
509 }
510 DomainAction::Update(update) => {
511 let bytes = write_update_body(boundary, update, &data.data);
512 (reqwest::Method::PUT, Body::from(bytes))
513 }
514 };
515
516 let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
517 let response = Client::new()
518 .request(method, endpoint)
519 .bearer_auth(access_token)
520 .header(
521 "Content-Type",
522 &format!("multipart/form-data; boundary={}", boundary),
523 )
524 .body(body)
525 .send()
526 .await
527 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
528
529 if response.status().is_success() {
530 let text = response
532 .text()
533 .await
534 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
535 if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
537 let items: Vec<DomainData> = md
538 .data
539 .into_iter()
540 .map(|m| DomainData {
541 metadata: m,
542 data: Vec::new(),
543 })
544 .collect();
545 return Ok(items);
546 }
547 let v: serde_json::Value = serde_json::from_str(&text)
549 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
550 let mut out: Vec<DomainData> = Vec::new();
551 if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
552 for item in arr {
553 let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
554 let domain_id = item
555 .get("domain_id")
556 .and_then(|x| x.as_str())
557 .unwrap_or("")
558 .to_string();
559 let name = item
560 .get("name")
561 .and_then(|x| x.as_str())
562 .unwrap_or("")
563 .to_string();
564 let data_type = item
565 .get("data_type")
566 .and_then(|x| x.as_str())
567 .unwrap_or("")
568 .to_string();
569 out.push(DomainData {
570 metadata: DomainDataMetadata {
571 id,
572 domain_id,
573 name,
574 data_type,
575 size: 0,
576 created_at: String::new(),
577 updated_at: String::new(),
578 },
579 data: Vec::new(),
580 });
581 }
582 return Ok(out);
583 }
584 Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
585 } else {
586 let status = response.status();
587 let text = response
588 .text()
589 .await
590 .unwrap_or_else(|_| "Unknown error".to_string());
591 Err((status, text))
592 }
593}
594
595fn parse_headers(
596 headers_slice: &[u8],
597) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
598 let headers_str = String::from_utf8_lossy(headers_slice);
599 let mut domain_data = None;
600
601 for line in headers_str.lines() {
602 if line.trim().is_empty() {
603 break;
604 }
605 if let Some((key, value)) = line.split_once(':') {
606 let key = key.trim().to_lowercase();
607 if key == "content-disposition" {
608 let mut parsed_domain_data = DomainData {
609 metadata: DomainDataMetadata {
610 id: String::new(),
611 domain_id: String::new(),
612 name: String::new(),
613 data_type: String::new(),
614 size: 0,
615 created_at: String::new(),
616 updated_at: String::new(),
617 },
618 data: Vec::new(),
619 };
620 for part in value.split(';') {
621 let part = part.trim();
622 if let Some((key, value)) = part.split_once('=') {
623 let key = key.trim();
624 let value = value.trim().trim_matches('"');
625 match key {
626 "id" => parsed_domain_data.metadata.id = value.to_string(),
627 "domain-id" => {
628 parsed_domain_data.metadata.domain_id = value.to_string()
629 }
630 "name" => parsed_domain_data.metadata.name = value.to_string(),
631 "data-type" => {
632 parsed_domain_data.metadata.data_type = value.to_string()
633 }
634 "size" => parsed_domain_data.metadata.size = value.parse()?,
635 "created-at" => {
636 parsed_domain_data.metadata.created_at = value.to_string()
637 }
638 "updated-at" => {
639 parsed_domain_data.metadata.updated_at = value.to_string()
640 }
641 _ => {}
642 }
643 }
644 }
645 domain_data = Some(parsed_domain_data);
646 }
647 }
648 }
649
650 if let Some(domain_data) = domain_data {
651 Ok(domain_data)
652 } else {
653 Err("Missing content-disposition header".into())
654 }
655}
656
657fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
658 let _data = String::from_utf8_lossy(data);
659 let _boundary = String::from_utf8_lossy(boundary);
660 data.windows(boundary.len())
661 .position(|window| window == boundary)
662}
663
664fn find_headers_end(data: &[u8]) -> Option<usize> {
665 if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
666 return Some(i + 4);
667 }
668 data.windows(2)
669 .position(|w| w == b"\n\n")
670 .map(|i| i + 2)
671}
672
673async fn handle_domain_data_stream(
674 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
675 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
676 boundary: &str,
677) {
678 use futures::pin_mut;
679
680 let mut buffer = Vec::new();
681 let mut current_domain_data: Option<DomainData> = None;
682 let boundary_bytes = format!("--{}", boundary).into_bytes();
683 let keep_tail = boundary_bytes.len() + 4; pin_mut!(stream);
686
687 while let Some(chunk_result) = stream.next().await {
688 let chunk = match chunk_result {
689 Ok(c) if c.is_empty() => continue,
690 Ok(c) => c,
691 Err(e) => {
692 let _ = tx.send(Err(e.into())).await;
693 return;
694 }
695 };
696
697 buffer.extend_from_slice(&chunk);
698
699 'consume: loop {
700 match &mut current_domain_data {
701 None => {
702 let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
703 if buffer.len() > keep_tail {
704 buffer.drain(..buffer.len() - keep_tail);
705 }
706 break 'consume;
707 };
708 let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
709 break 'consume;
710 };
711 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
712 let part_headers = parse_headers(headers_slice);
713 let domain_data = match part_headers {
714 Ok(d) => d,
715 Err(e) => {
716 tracing::error!("Failed to parse headers: {:?}", e);
717 return;
718 }
719 };
720 buffer.drain(..boundary_pos + header_end_rel);
721 current_domain_data = Some(domain_data);
722 }
723 Some(dd) => {
724 if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
725 let mut data_end = next_boundary_pos;
726 if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
727 data_end -= 2;
728 } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
729 data_end -= 1;
730 }
731 dd.data.extend_from_slice(&buffer[..data_end]);
732 buffer.drain(..next_boundary_pos);
733 let finished = current_domain_data.take().unwrap();
734 if tx.send(Ok(finished)).await.is_err() {
735 return;
736 }
737 } else {
738 if buffer.len() > keep_tail {
739 let take = buffer.len() - keep_tail;
740 dd.data.extend_from_slice(&buffer[..take]);
741 buffer.drain(..take);
742 }
743 break 'consume;
744 }
745 }
746 }
747 }
748 }
749
750 let _ = tx.close().await;
751}
752
753#[cfg(test)]
754mod tests {
755 use bytes::Bytes;
756
757 use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
758
759 use super::*;
760
761 fn get_config() -> (Config, String) {
762 if std::path::Path::new("../.env.local").exists() {
763 dotenvy::from_filename("../.env.local").ok();
764 dotenvy::dotenv().ok();
765 }
766 let config = Config::from_env().unwrap();
767 (config, std::env::var("DOMAIN_ID").unwrap())
768 }
769
770 #[test]
771 fn test_find_boundary_found() {
772 let data = b"random--boundary--data";
773 let boundary = b"--boundary";
774 assert_eq!(find_boundary(data, boundary), Some(6));
775 }
776
777 #[test]
778 fn test_find_boundary_not_found() {
779 let data = b"random-data";
780 let boundary = b"--boundary";
781 assert_eq!(find_boundary(data, boundary), None);
782 }
783
784 #[test]
785 fn test_find_headers_end_crlf() {
786 let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
787 assert_eq!(find_headers_end(data), Some(36));
788 }
789
790 #[test]
791 fn test_find_headers_end_lf() {
792 let data = b"header1: value1\nheader2: value2\n\nbody";
793 assert_eq!(find_headers_end(data), Some(33));
794 }
795
796 #[test]
797 fn test_find_headers_end_none() {
798 let data = b"header1: value1\nheader2: value2\nbody";
799 assert_eq!(find_headers_end(data), None);
800 }
801
802 #[test]
803 fn test_parse_headers_success() {
804 let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
805 let parsed = super::parse_headers(headers);
806 assert!(parsed.is_ok());
807 let domain_data = parsed.unwrap();
808 assert_eq!(domain_data.metadata.id, "123");
809 assert_eq!(domain_data.metadata.domain_id, "abc");
810 assert_eq!(domain_data.metadata.name, "test");
811 assert_eq!(domain_data.metadata.data_type, "type");
812 assert_eq!(domain_data.metadata.size, 42);
813 assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
814 assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
815 }
816
817 #[test]
818 fn test_parse_headers_missing_content_disposition() {
819 let headers = b"content-type: application/octet-stream\r\n\r\n";
820 let parsed = super::parse_headers(headers);
821 assert!(parsed.is_err());
822 }
823
824 #[tokio::test]
825 async fn test_a_chunk_contains_multiple_data() {
826 let (tx, rx) = mpsc::channel(10);
827
828 let payload = br#"
829 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
830Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
831Content-Type: application/octet-stream
832
833{"test": "test"}
834--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
835Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
836Content-Type: application/octet-stream
837
838{"test": "test updated"}
839--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
840 "#;
841 let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
842
843 handle_domain_data_stream(
844 tx,
845 stream,
846 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
847 )
848 .await;
849
850 let output: Vec<DomainData> = rx
851 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
852 .await
853 .into_iter()
854 .map(|r| r.unwrap())
855 .collect();
856 assert_eq!(output.len(), 2);
857 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
858 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
859 }
860
861 #[tokio::test]
862 async fn test_chunk_size_is_smaller_than_part() {
863 let (tx, rx) = mpsc::channel(10);
864
865 let payload = br#"
866 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
867Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
868Content-Type: application/octet-stream
869 "#;
870 let payload2 = br#"
871
872{"test": "test"}
873--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
874Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
875Content-Type: application/octet-stream
876
877{"test": "test updated"}
878--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
879"#;
880 let stream = tokio_stream::iter(vec![
881 Ok(Bytes::from_static(payload)),
882 Ok(Bytes::from_static(payload2)),
883 ]);
884
885 handle_domain_data_stream(
886 tx,
887 stream,
888 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
889 )
890 .await;
891
892 let output: Vec<DomainData> = rx
893 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
894 .await
895 .into_iter()
896 .map(|r| r.unwrap())
897 .collect();
898 assert_eq!(output.len(), 2);
899 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
900 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
901 }
902
903 #[tokio::test]
904 async fn test_chunk_size_is_smaller_than_header() {
905 let (tx, rx) = mpsc::channel(10);
906
907 let payload = br#"
908 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
909Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
910Content-Type: application/octet-stream
911 "#;
912 let payload2 = br#"
913e: application/octet-stream
914
915{"test": "test"}
916--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
917Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
918Content-Type: application/octet-stream
919
920{"test": "test updated"}
921--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
922"#;
923 let stream = tokio_stream::iter(vec![
924 Ok(Bytes::from_static(payload)),
925 Ok(Bytes::from_static(payload2)),
926 ]);
927
928 handle_domain_data_stream(
929 tx,
930 stream,
931 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
932 )
933 .await;
934
935 let output: Vec<DomainData> = rx
936 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
937 .await
938 .into_iter()
939 .map(|r| r.unwrap())
940 .collect();
941 assert_eq!(output.len(), 2);
942 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
943 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
944 }
945
946 #[tokio::test]
947 async fn test_chunk_size_doesnt_cover_the_whole_data() {
948 let (tx, rx) = mpsc::channel(10);
949
950 let payload = br#"
951 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
952Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
953Content-Type: application/octet-stream
954
955{"test": "test"#;
956 let payload2 = br#""}
957--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
958Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
959Content-Type: application/octet-stream
960
961{"test": "test updated"}
962--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
963"#;
964 let stream = tokio_stream::iter(vec![
965 Ok(Bytes::from_static(payload)),
966 Ok(Bytes::from_static(payload2)),
967 ]);
968
969 handle_domain_data_stream(
970 tx,
971 stream,
972 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
973 )
974 .await;
975
976 let output: Vec<DomainData> = rx
977 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
978 .await
979 .into_iter()
980 .map(|r| r.unwrap())
981 .collect();
982 assert_eq!(output.len(), 2);
983 assert_eq!(
984 std::str::from_utf8(&output[1].data).unwrap(),
985 "{\"test\": \"test updated\"}"
986 );
987 assert_eq!(
988 std::str::from_utf8(&output[0].data).unwrap(),
989 "{\"test\": \"test\"}"
990 );
991 }
992
993 #[tokio::test]
994 async fn test_upload_v1_with_user_dds_access_token() {
995 use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};
996
997 let (config, domain_id) = get_config();
998
999 let mut discovery =
1000 DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1001 discovery
1002 .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1003 .await
1004 .expect("sign_in_with_auki_account failed");
1005 let domain = discovery
1006 .auth_domain(&domain_id)
1007 .await
1008 .expect("get_domain failed");
1009 let upload_data = vec![
1011 UploadDomainData {
1012 action: DomainAction::Create(CreateDomainData {
1013 name: "test_upload".to_string(),
1014 data_type: "test".to_string(),
1015 }),
1016 data: b"hello world".to_vec(),
1017 },
1018 UploadDomainData {
1019 action: DomainAction::Update(UpdateDomainData {
1020 id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1021 }),
1022 data: b"{\"test\": \"test updated\"}".to_vec(),
1023 },
1024 ];
1025
1026 let result = upload_v1(
1028 &domain.domain.domain_server.url,
1029 &domain.get_access_token(),
1030 &domain_id,
1031 upload_data,
1032 )
1033 .await
1034 .expect("upload_v1 failed");
1035
1036 assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1037 for data in result {
1038 if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1039 assert_eq!(data.name, "test_upload");
1040 delete_by_id(
1041 &domain.domain.domain_server.url,
1042 &domain.get_access_token(),
1043 &domain_id,
1044 &data.id,
1045 )
1046 .await
1047 .expect("delete_by_id failed");
1048 }
1049 }
1050 }
1051}