1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DomainDataMetadata {
13 pub id: String,
14 pub domain_id: String,
15 pub name: String,
16 pub data_type: String,
17 pub size: u64,
18 pub created_at: String,
19 pub updated_at: String,
20}
21
22#[derive(Debug, Deserialize, Serialize)]
23pub struct DomainData {
24 pub metadata: DomainDataMetadata,
26 pub data: Vec<u8>,
27}
28
29#[derive(Debug, Serialize, Clone, Deserialize)]
30pub struct UpdateDomainData {
31 pub id: String,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct CreateDomainData {
36 pub name: String,
37 pub data_type: String,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(untagged)]
42pub enum DomainAction {
43 Create(CreateDomainData),
44 Update(UpdateDomainData),
45}
46
47#[derive(Debug, Serialize, Deserialize)]
48pub struct UploadDomainData {
49 #[serde(flatten)]
50 pub action: DomainAction,
51 pub data: Vec<u8>,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct DownloadQuery {
56 pub ids: Vec<String>,
57 pub name: Option<String>,
58 pub data_type: Option<String>,
59}
60
61#[derive(Debug, Deserialize)]
62struct ListDomainDataMetadata {
63 pub data: Vec<DomainDataMetadata>,
64}
65
66pub async fn download_by_id(
67 url: &str,
68 client_id: &str,
69 access_token: &str,
70 domain_id: &str,
71 id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73 let base = url.trim_end_matches('/');
74 let response = Client::new()
75 .get(format!(
76 "{}/api/v1/domains/{}/data/{}?raw=true",
77 base, domain_id, id
78 ))
79 .bearer_auth(access_token)
80 .header("posemesh-client-id", client_id)
81 .send()
82 .await?;
83
84 if response.status().is_success() {
85 let data = response.bytes().await?;
86 Ok(data.to_vec())
87 } else {
88 let status = response.status();
89 let text = response
90 .text()
91 .await
92 .unwrap_or_else(|_| "Unknown error".to_string());
93 Err(format!(
94 "Failed to download data by id. Status: {} - {}",
95 status, text
96 )
97 .into())
98 }
99}
100
101pub async fn request_download_absolute(
104 url: &str,
105 client_id: &str,
106 access_token: &str,
107) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
108 let response = Client::new()
109 .get(url)
110 .bearer_auth(access_token)
111 .header("posemesh-client-id", client_id)
112 .header("Accept", "multipart/form-data")
113 .send()
114 .await?;
115 Ok(response)
116}
117
118pub async fn download_metadata_v1(
119 url: &str,
120 client_id: &str,
121 access_token: &str,
122 domain_id: &str,
123 query: &DownloadQuery,
124) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
125 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
126 if response.status().is_success() {
127 let data = response.json::<ListDomainDataMetadata>().await?;
128 Ok(data.data)
129 } else {
130 let status = response.status();
131 let text = response
132 .text()
133 .await
134 .unwrap_or_else(|_| "Unknown error".to_string());
135 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
136 }
137}
138
139pub async fn download_v1(
140 url: &str,
141 client_id: &str,
142 access_token: &str,
143 domain_id: &str,
144 query: &DownloadQuery,
145 with_data: bool,
146) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
147 let mut params = HashMap::new();
148
149 if let Some(name) = &query.name {
150 params.insert("name", name.clone());
151 }
152 if let Some(data_type) = &query.data_type {
153 params.insert("data_type", data_type.clone());
154 }
155 let ids = if !query.ids.is_empty() {
156 format!("?ids={}", query.ids.join(","))
157 } else {
158 String::new()
159 };
160
161 let base = url.trim_end_matches('/');
162 let response = Client::new()
163 .get(format!("{}/api/v1/domains/{}/data{}", base, domain_id, ids))
164 .bearer_auth(access_token)
165 .header(
166 "Accept",
167 if with_data {
168 "multipart/form-data"
169 } else {
170 "application/json"
171 },
172 )
173 .header("posemesh-client-id", client_id)
174 .query(¶ms)
175 .send()
176 .await?;
177
178 if response.status().is_success() {
179 Ok(response)
180 } else {
181 let status = response.status();
182 let text = response
183 .text()
184 .await
185 .unwrap_or_else(|_| "Unknown error".to_string());
186 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
187 }
188}
189
190pub async fn download_v1_stream(
191 url: &str,
192 client_id: &str,
193 access_token: &str,
194 domain_id: &str,
195 query: &DownloadQuery,
196) -> Result<
197 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
198 Box<dyn std::error::Error + Send + Sync>,
199> {
200 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
201
202 stream_from_response(response).await
203}
204
205pub async fn stream_from_response(
208 response: Response,
209) -> Result<
210 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
211 Box<dyn std::error::Error + Send + Sync>,
212> {
213 let (mut tx, rx) =
214 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
215
216 let boundary = match response
217 .headers()
218 .get("content-type")
219 .and_then(|ct| ct.to_str().ok())
220 .and_then(|ct| {
221 if ct.starts_with("multipart/form-data; boundary=") {
222 Some(ct.split("boundary=").nth(1)?.to_string())
223 } else {
224 None
225 }
226 }) {
227 Some(b) => b,
228 None => {
229 tracing::error!("Invalid content-type header");
230 let _ = tx.close().await;
231 return Err("Invalid content-type header".into());
232 }
233 };
234
235 spawn(async move {
236 let stream = response.bytes_stream();
237 handle_domain_data_stream(tx, stream, &boundary).await;
238 });
239
240 Ok(rx)
241}
242
243pub async fn delete_by_id(
244 url: &str,
245 access_token: &str,
246 domain_id: &str,
247 id: &str,
248) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
249 let base = url.trim_end_matches('/');
250 let endpoint = format!("{}/api/v1/domains/{}/data/{}", base, domain_id, id);
251 let client = Client::new();
252 let resp = client
253 .delete(&endpoint)
254 .bearer_auth(access_token)
255 .send()
256 .await?;
257
258 if resp.status().is_success() {
259 Ok(())
260 } else {
261 let err = resp
262 .text()
263 .await
264 .unwrap_or_else(|_| "Unknown error".to_string());
265 Err(format!("Delete failed with status: {}", err).into())
266 }
267}
268
269#[cfg(not(target_family = "wasm"))]
270pub async fn upload_v1_stream(
271 url: &str,
272 access_token: &str,
273 domain_id: &str,
274 mut rx: mpsc::Receiver<UploadDomainData>,
275) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
276 use futures::channel::oneshot;
277
278 let boundary = "boundary";
279
280 let (mut create_tx, create_rx) = mpsc::channel(100);
281 let (mut update_tx, update_rx) = mpsc::channel(100);
282
283 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
284 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
285
286 let url = url.to_string();
287 let url_2 = url.clone();
288 let access_token = access_token.to_string();
289 let domain_id = domain_id.to_string();
290 let access_token_2 = access_token.clone();
291 let domain_id_2 = domain_id.clone();
292
293 let (create_signal, create_signal_rx) = oneshot::channel::<
294 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295 >();
296 let (update_signal, update_signal_rx) = oneshot::channel::<
297 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
298 >();
299
300 spawn(async move {
301 let create_response =
302 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
303 if let Err(Err(e)) = create_signal.send(create_response) {
304 tracing::error!("Failed to send create response: {}", e);
305 }
306 });
307
308 spawn(async move {
309 let update_response =
310 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
311 if let Err(Err(e)) = update_signal.send(update_response) {
312 tracing::error!("Failed to send update response: {}", e);
313 }
314 });
315
316 while let Some(datum) = rx.next().await {
317 match datum.action {
318 DomainAction::Create(create) => {
319 let create_data = write_create_body(boundary, &create, &datum.data);
320 create_tx.clone().send(create_data).await?;
321 }
322 DomainAction::Update(update) => {
323 let update_data = write_update_body(boundary, &update, &datum.data);
324 update_tx.send(update_data).await?;
325 }
326 }
327 }
328 update_tx
329 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330 .await?;
331 create_tx
332 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
333 .await?;
334 update_tx.close().await?;
335 create_tx.close().await?;
336
337 let mut data = {
338 if let Ok(res) = create_signal_rx.await {
339 match res {
340 Ok(d) => d,
341 Err(e) => return Err(e),
342 }
343 } else {
344 return Err("create cancelled".into());
345 }
346 };
347
348 if let Ok(res) = update_signal_rx.await {
349 match res {
350 Ok(d) => data.extend(d),
351 Err(e) => return Err(e),
352 }
353 } else {
354 return Err("update cancelled".into());
355 }
356
357 Ok(data)
358}
359
360async fn update_v1(
361 url: &str,
362 access_token: &str,
363 domain_id: &str,
364 boundary: &str,
365 body: Body,
366) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
367 let base = url.trim_end_matches('/');
368 let update_response = Client::new()
369 .put(format!("{}/api/v1/domains/{}/data", base, domain_id))
370 .bearer_auth(access_token)
371 .header(
372 "Content-Type",
373 &format!("multipart/form-data; boundary={}", boundary),
374 )
375 .body(body)
376 .send()
377 .await?;
378
379 if update_response.status().is_success() {
380 let data = update_response
381 .json::<ListDomainDataMetadata>()
382 .await
383 .unwrap();
384 Ok(data.data)
385 } else {
386 let err = update_response
387 .text()
388 .await
389 .unwrap_or_else(|_| "Unknown error".to_string());
390 Err(format!("Update failed with status: {}", err).into())
391 }
392}
393
394async fn create_v1(
395 url: &str,
396 access_token: &str,
397 domain_id: &str,
398 boundary: &str,
399 body: Body,
400) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
401 let base = url.trim_end_matches('/');
402 let create_response = Client::new()
403 .post(format!("{}/api/v1/domains/{}/data", base, domain_id))
404 .bearer_auth(access_token)
405 .header(
406 "Content-Type",
407 &format!("multipart/form-data; boundary={}", boundary),
408 )
409 .body(body)
410 .send()
411 .await?;
412
413 if create_response.status().is_success() {
414 let data = create_response
415 .json::<ListDomainDataMetadata>()
416 .await
417 .unwrap();
418 Ok(data.data)
419 } else {
420 let err = create_response
421 .text()
422 .await
423 .unwrap_or_else(|_| "Unknown error".to_string());
424 Err(format!("Create failed with status: {}", err).into())
425 }
426}
427
428fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
429 let create_bytes = format!(
430 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
431 boundary, data.name, data.data_type
432 );
433 let mut create_data = create_bytes.into_bytes();
434 create_data.extend_from_slice(data_bytes);
435 create_data.extend_from_slice("\r\n".as_bytes());
436 create_data
437}
438
439fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
440 let update_bytes = format!(
441 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
442 boundary, data.id
443 );
444 let mut update_data = update_bytes.into_bytes();
445 update_data.extend_from_slice(data_bytes);
446 update_data.extend_from_slice("\r\n".as_bytes());
447 update_data
448}
449
450pub async fn upload_v1(
451 url: &str,
452 access_token: &str,
453 domain_id: &str,
454 data: Vec<UploadDomainData>,
455) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
456 let boundary = "boundary";
457
458 let mut create_body = Vec::new();
459 let mut update_body = Vec::new();
460 let mut to_update = false;
461 let mut to_create = false;
462
463 for datum in data {
465 match datum.action {
466 DomainAction::Create(create) => {
467 to_create = true;
468 let create_data = write_create_body(boundary, &create, &datum.data);
469 create_body.extend_from_slice(&create_data);
470 }
471 DomainAction::Update(update) => {
472 to_update = true;
473 let update_data = write_update_body(boundary, &update, &datum.data);
474 update_body.extend_from_slice(&update_data);
475 }
476 }
477 }
478
479 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
480 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
481
482 let create_body = Body::from(create_body);
483 let update_body = Body::from(update_body);
484 let mut res = Vec::new();
485
486 if to_create {
487 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
488 }
489 if to_update {
490 let update_response =
491 update_v1(url, access_token, domain_id, boundary, update_body).await?;
492 if !update_response.is_empty() {
493 res.extend(update_response);
494 }
495 }
496
497 Ok(res)
498}
499
500#[cfg(not(target_family = "wasm"))]
503pub async fn upload_one(
504 url: &str,
505 access_token: &str,
506 domain_id: &str,
507 data: UploadDomainData,
508) -> Result<Vec<DomainData>, (StatusCode, String)> {
509 let boundary = "boundary";
510 let (method, body) = match &data.action {
511 DomainAction::Create(create) => {
512 let mut bytes = write_create_body(boundary, create, &data.data);
513 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
515 (reqwest::Method::POST, Body::from(bytes))
516 }
517 DomainAction::Update(update) => {
518 let mut bytes = write_update_body(boundary, update, &data.data);
519 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
521 (reqwest::Method::PUT, Body::from(bytes))
522 }
523 };
524
525 let base = url.trim_end_matches('/');
526 let endpoint = format!("{}/api/v1/domains/{}/data", base, domain_id);
527 let response = Client::new()
528 .request(method, endpoint)
529 .bearer_auth(access_token)
530 .header(
531 "Content-Type",
532 &format!("multipart/form-data; boundary={}", boundary),
533 )
534 .body(body)
535 .send()
536 .await
537 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
538
539 if response.status().is_success() {
540 let text = response
542 .text()
543 .await
544 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
545 if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
547 let items: Vec<DomainData> = md
548 .data
549 .into_iter()
550 .map(|m| DomainData {
551 metadata: m,
552 data: Vec::new(),
553 })
554 .collect();
555 return Ok(items);
556 }
557 let v: serde_json::Value = serde_json::from_str(&text)
559 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
560 let mut out: Vec<DomainData> = Vec::new();
561 if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
562 for item in arr {
563 let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
564 let domain_id = item
565 .get("domain_id")
566 .and_then(|x| x.as_str())
567 .unwrap_or("")
568 .to_string();
569 let name = item
570 .get("name")
571 .and_then(|x| x.as_str())
572 .unwrap_or("")
573 .to_string();
574 let data_type = item
575 .get("data_type")
576 .and_then(|x| x.as_str())
577 .unwrap_or("")
578 .to_string();
579 out.push(DomainData {
580 metadata: DomainDataMetadata {
581 id,
582 domain_id,
583 name,
584 data_type,
585 size: 0,
586 created_at: String::new(),
587 updated_at: String::new(),
588 },
589 data: Vec::new(),
590 });
591 }
592 return Ok(out);
593 }
594 Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
595 } else {
596 let status = response.status();
597 let text = response
598 .text()
599 .await
600 .unwrap_or_else(|_| "Unknown error".to_string());
601 Err((status, text))
602 }
603}
604
605fn parse_headers(
606 headers_slice: &[u8],
607) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
608 let headers_str = String::from_utf8_lossy(headers_slice);
609 let mut domain_data = None;
610
611 for line in headers_str.lines() {
612 if line.trim().is_empty() {
613 break;
614 }
615 if let Some((key, value)) = line.split_once(':') {
616 let key = key.trim().to_lowercase();
617 if key == "content-disposition" {
618 let mut parsed_domain_data = DomainData {
619 metadata: DomainDataMetadata {
620 id: String::new(),
621 domain_id: String::new(),
622 name: String::new(),
623 data_type: String::new(),
624 size: 0,
625 created_at: String::new(),
626 updated_at: String::new(),
627 },
628 data: Vec::new(),
629 };
630 for part in value.split(';') {
631 let part = part.trim();
632 if let Some((key, value)) = part.split_once('=') {
633 let key = key.trim();
634 let value = value.trim().trim_matches('"');
635 match key {
636 "id" => parsed_domain_data.metadata.id = value.to_string(),
637 "domain-id" => {
638 parsed_domain_data.metadata.domain_id = value.to_string()
639 }
640 "name" => parsed_domain_data.metadata.name = value.to_string(),
641 "data-type" => {
642 parsed_domain_data.metadata.data_type = value.to_string()
643 }
644 "size" => parsed_domain_data.metadata.size = value.parse()?,
645 "created-at" => {
646 parsed_domain_data.metadata.created_at = value.to_string()
647 }
648 "updated-at" => {
649 parsed_domain_data.metadata.updated_at = value.to_string()
650 }
651 _ => {}
652 }
653 }
654 }
655 domain_data = Some(parsed_domain_data);
656 }
657 }
658 }
659
660 if let Some(domain_data) = domain_data {
661 Ok(domain_data)
662 } else {
663 Err("Missing content-disposition header".into())
664 }
665}
666
/// Returns the byte offset of the first occurrence of `boundary` in `data`.
///
/// Yields `None` when `boundary` does not occur, when `data` is shorter than
/// `boundary`, or when `boundary` is empty (an empty delimiter is never a
/// valid match, and `slice::windows` would panic on a zero window size).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
673
/// Returns the offset just past the blank line that ends a header section.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as terminators and the one that
/// occurs FIRST wins. Preferring `\r\n\r\n` unconditionally would skip past
/// LF-terminated headers whenever the following body happens to contain a
/// CRLF-CRLF sequence, swallowing part of the body into the headers.
///
/// Returns `None` when no terminator is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}
682
683async fn handle_domain_data_stream(
684 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
685 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
686 boundary: &str,
687) {
688 use futures::pin_mut;
689
690 let mut buffer = Vec::new();
691 let mut current_domain_data: Option<DomainData> = None;
692 let boundary_bytes = format!("--{}", boundary).into_bytes();
693 let keep_tail = boundary_bytes.len() + 4; pin_mut!(stream);
696
697 while let Some(chunk_result) = stream.next().await {
698 let chunk = match chunk_result {
699 Ok(c) if c.is_empty() => continue,
700 Ok(c) => c,
701 Err(e) => {
702 let _ = tx.send(Err(e.into())).await;
703 return;
704 }
705 };
706
707 buffer.extend_from_slice(&chunk);
708
709 'consume: loop {
710 match &mut current_domain_data {
711 None => {
712 let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
713 if buffer.len() > keep_tail {
714 buffer.drain(..buffer.len() - keep_tail);
715 }
716 break 'consume;
717 };
718 let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
719 break 'consume;
720 };
721 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
722 let part_headers = parse_headers(headers_slice);
723 let domain_data = match part_headers {
724 Ok(d) => d,
725 Err(e) => {
726 tracing::error!("Failed to parse headers: {:?}", e);
727 return;
728 }
729 };
730 buffer.drain(..boundary_pos + header_end_rel);
731 current_domain_data = Some(domain_data);
732 }
733 Some(dd) => {
734 if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
735 let mut data_end = next_boundary_pos;
736 if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
737 data_end -= 2;
738 } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
739 data_end -= 1;
740 }
741 dd.data.extend_from_slice(&buffer[..data_end]);
742 buffer.drain(..next_boundary_pos);
743 let finished = current_domain_data.take().unwrap();
744 if tx.send(Ok(finished)).await.is_err() {
745 return;
746 }
747 } else {
748 if buffer.len() > keep_tail {
749 let take = buffer.len() - keep_tail;
750 dd.data.extend_from_slice(&buffer[..take]);
751 buffer.drain(..take);
752 }
753 break 'consume;
754 }
755 }
756 }
757 }
758 }
759
760 let _ = tx.close().await;
761}
762
// Unit tests for the multipart parsing helpers, plus one integration test
// that uploads to a real domain server.
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};

    use super::*;

    // Loads the test configuration from the environment. When `../.env.local`
    // exists, it is loaded first, followed by the default `.env` lookup.
    // Panics when the configuration or the `DOMAIN_ID` variable is missing.
    fn get_config() -> (Config, String) {
        if std::path::Path::new("../.env.local").exists() {
            dotenvy::from_filename("../.env.local").ok();
            dotenvy::dotenv().ok();
        }
        let config = Config::from_env().unwrap();
        (config, std::env::var("DOMAIN_ID").unwrap())
    }

    // The delimiter is located at its byte offset inside surrounding data.
    #[test]
    fn test_find_boundary_found() {
        let data = b"random--boundary--data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), Some(6));
    }

    // No match yields `None`.
    #[test]
    fn test_find_boundary_not_found() {
        let data = b"random-data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), None);
    }

    // CRLF-terminated headers: the offset points just past `\r\n\r\n`.
    #[test]
    fn test_find_headers_end_crlf() {
        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
        assert_eq!(find_headers_end(data), Some(36));
    }

    // LF-terminated headers: the offset points just past `\n\n`.
    #[test]
    fn test_find_headers_end_lf() {
        let data = b"header1: value1\nheader2: value2\n\nbody";
        assert_eq!(find_headers_end(data), Some(33));
    }

    // Without a blank line there is no header terminator.
    #[test]
    fn test_find_headers_end_none() {
        let data = b"header1: value1\nheader2: value2\nbody";
        assert_eq!(find_headers_end(data), None);
    }

    // Every supported Content-Disposition parameter lands in its metadata field.
    #[test]
    fn test_parse_headers_success() {
        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_ok());
        let domain_data = parsed.unwrap();
        assert_eq!(domain_data.metadata.id, "123");
        assert_eq!(domain_data.metadata.domain_id, "abc");
        assert_eq!(domain_data.metadata.name, "test");
        assert_eq!(domain_data.metadata.data_type, "type");
        assert_eq!(domain_data.metadata.size, 42);
        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
    }

    // Headers without Content-Disposition are rejected.
    #[test]
    fn test_parse_headers_missing_content_disposition() {
        let headers = b"content-type: application/octet-stream\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_err());
    }

    // A single chunk that holds two complete parts yields both items.
    #[tokio::test]
    async fn test_a_chunk_contains_multiple_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
 "#;
        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first chunk ends after the first part's header lines; the blank
    // line and the payload arrive in the second chunk.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_part() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
        let payload2 = br#"

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The header section itself is split across two chunks; the second chunk
    // starts with a further header-like line before the blank line.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_header() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
        let payload2 = br#"
e: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first part's payload is cut in the middle and completed by the
    // second chunk.
    #[tokio::test]
    async fn test_chunk_size_doesnt_cover_the_whole_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"#;
        let payload2 = br#""}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(
            std::str::from_utf8(&output[1].data).unwrap(),
            "{\"test\": \"test updated\"}"
        );
        assert_eq!(
            std::str::from_utf8(&output[0].data).unwrap(),
            "{\"test\": \"test\"}"
        );
    }

    // Integration test: signs in, authenticates against the domain, uploads
    // one new item and one update, then deletes the created item again.
    // NOTE(review): needs valid credentials from `get_config` and appears to
    // assume that the hard-coded update id already exists in the target
    // domain — confirm before running in CI.
    #[tokio::test]
    async fn test_upload_v1_with_user_dds_access_token() {
        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};

        let (config, domain_id) = get_config();

        let mut discovery =
            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
        discovery
            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
            .await
            .expect("sign_in_with_auki_account failed");
        let domain = discovery
            .auth_domain(&domain_id)
            .await
            .expect("get_domain failed");
        let upload_data = vec![
            UploadDomainData {
                action: DomainAction::Create(CreateDomainData {
                    name: "test_upload".to_string(),
                    data_type: "test".to_string(),
                }),
                data: b"hello world".to_vec(),
            },
            UploadDomainData {
                action: DomainAction::Update(UpdateDomainData {
                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
                }),
                data: b"{\"test\": \"test updated\"}".to_vec(),
            },
        ];

        let result = upload_v1(
            &domain.domain.domain_server.url,
            &domain.get_access_token(),
            &domain_id,
            upload_data,
        )
        .await
        .expect("upload_v1 failed");

        // One metadata entry is expected per uploaded item.
        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
        for data in result {
            // Only the newly created item is cleaned up; the updated one is kept.
            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
                assert_eq!(data.name, "test_upload");
                delete_by_id(
                    &domain.domain.domain_server.url,
                    &domain.get_access_token(),
                    &domain_id,
                    &data.id,
                )
                .await
                .expect("delete_by_id failed");
            }
        }
    }
}