1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response, StatusCode};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
/// Metadata describing a single domain data item.
///
/// Instances are deserialized from JSON listing responses and are also built
/// from the `Content-Disposition` parameters of multipart download parts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainDataMetadata {
    /// Identifier of the data item.
    pub id: String,
    /// Identifier of the domain the item belongs to.
    pub domain_id: String,
    /// Name of the item.
    pub name: String,
    /// Type label of the item.
    pub data_type: String,
    /// Size reported for the item (presumably in bytes; confirm against the server).
    pub size: u64,
    /// Creation timestamp, kept as the string the server sent.
    pub created_at: String,
    /// Last-update timestamp, kept as the string the server sent.
    pub updated_at: String,
}
21
/// A domain data item: its metadata together with its payload bytes.
#[derive(Debug, Deserialize, Serialize)]
pub struct DomainData {
    /// Descriptive metadata of the item.
    pub metadata: DomainDataMetadata,
    /// Payload bytes; left empty by code paths that only return metadata.
    pub data: Vec<u8>,
}
28
/// Describes an update of an existing data item.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct UpdateDomainData {
    /// Identifier of the item to update; written as the `id` parameter of the
    /// multipart part header.
    pub id: String,
}
33
/// Describes a data item to be created.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CreateDomainData {
    /// Name of the new item; written as the `name` parameter of the part header.
    pub name: String,
    /// Type label of the new item; written as the `data-type` parameter of the
    /// part header.
    pub data_type: String,
}
39
/// The operation an upload performs for one item.
///
/// The enum is `untagged`, so it (de)serializes as the bare fields of the
/// selected variant; on deserialization serde tries the variants in
/// declaration order (`Create` first, then `Update`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DomainAction {
    /// Create a new item.
    Create(CreateDomainData),
    /// Update an existing item.
    Update(UpdateDomainData),
}
46
/// One item to upload: the action to perform plus the payload bytes.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadDomainData {
    /// Create-or-update descriptor; its fields are flattened into this struct
    /// when (de)serialized.
    #[serde(flatten)]
    pub action: DomainAction,
    /// Payload bytes sent as the body of the multipart part.
    pub data: Vec<u8>,
}
53
/// Filters applied when listing or downloading domain data.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadQuery {
    /// Item identifiers; when non-empty they are joined with `,` into the
    /// `ids` query parameter.
    pub ids: Vec<String>,
    /// Optional `name` query parameter.
    pub name: Option<String>,
    /// Optional `data_type` query parameter.
    pub data_type: Option<String>,
}
60
/// JSON envelope of metadata responses: an object whose `data` field holds
/// the list of item metadata.
#[derive(Debug, Deserialize)]
struct ListDomainDataMetadata {
    /// Metadata entries returned by the server.
    pub data: Vec<DomainDataMetadata>,
}
65
66pub async fn download_by_id(
67 url: &str,
68 client_id: &str,
69 access_token: &str,
70 domain_id: &str,
71 id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73 let response = Client::new()
74 .get(format!(
75 "{}/api/v1/domains/{}/data/{}?raw=true",
76 url, domain_id, id
77 ))
78 .bearer_auth(access_token)
79 .header("posemesh-client-id", client_id)
80 .send()
81 .await?;
82
83 if response.status().is_success() {
84 let data = response.bytes().await?;
85 Ok(data.to_vec())
86 } else {
87 let status = response.status();
88 let text = response
89 .text()
90 .await
91 .unwrap_or_else(|_| "Unknown error".to_string());
92 Err(format!(
93 "Failed to download data by id. Status: {} - {}",
94 status, text
95 )
96 .into())
97 }
98}
99
100pub async fn request_download_absolute(
103 url: &str,
104 client_id: &str,
105 access_token: &str,
106) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
107 let response = Client::new()
108 .get(url)
109 .bearer_auth(access_token)
110 .header("posemesh-client-id", client_id)
111 .header("Accept", "multipart/form-data")
112 .send()
113 .await?;
114 Ok(response)
115}
116
117pub async fn download_metadata_v1(
118 url: &str,
119 client_id: &str,
120 access_token: &str,
121 domain_id: &str,
122 query: &DownloadQuery,
123) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
124 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
125 if response.status().is_success() {
126 let data = response.json::<ListDomainDataMetadata>().await?;
127 Ok(data.data)
128 } else {
129 let status = response.status();
130 let text = response
131 .text()
132 .await
133 .unwrap_or_else(|_| "Unknown error".to_string());
134 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
135 }
136}
137
138pub async fn download_v1(
139 url: &str,
140 client_id: &str,
141 access_token: &str,
142 domain_id: &str,
143 query: &DownloadQuery,
144 with_data: bool,
145) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
146 let mut params = HashMap::new();
147
148 if let Some(name) = &query.name {
149 params.insert("name", name.clone());
150 }
151 if let Some(data_type) = &query.data_type {
152 params.insert("data_type", data_type.clone());
153 }
154 let ids = if !query.ids.is_empty() {
155 format!("?ids={}", query.ids.join(","))
156 } else {
157 String::new()
158 };
159
160 let response = Client::new()
161 .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
162 .bearer_auth(access_token)
163 .header(
164 "Accept",
165 if with_data {
166 "multipart/form-data"
167 } else {
168 "application/json"
169 },
170 )
171 .header("posemesh-client-id", client_id)
172 .query(¶ms)
173 .send()
174 .await?;
175
176 if response.status().is_success() {
177 Ok(response)
178 } else {
179 let status = response.status();
180 let text = response
181 .text()
182 .await
183 .unwrap_or_else(|_| "Unknown error".to_string());
184 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
185 }
186}
187
188pub async fn download_v1_stream(
189 url: &str,
190 client_id: &str,
191 access_token: &str,
192 domain_id: &str,
193 query: &DownloadQuery,
194) -> Result<
195 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
196 Box<dyn std::error::Error + Send + Sync>,
197> {
198 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
199
200 stream_from_response(response).await
201}
202
203pub async fn stream_from_response(
206 response: Response,
207) -> Result<
208 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
209 Box<dyn std::error::Error + Send + Sync>,
210> {
211 let (mut tx, rx) =
212 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
213
214 let boundary = match response
215 .headers()
216 .get("content-type")
217 .and_then(|ct| ct.to_str().ok())
218 .and_then(|ct| {
219 if ct.starts_with("multipart/form-data; boundary=") {
220 Some(ct.split("boundary=").nth(1)?.to_string())
221 } else {
222 None
223 }
224 }) {
225 Some(b) => b,
226 None => {
227 tracing::error!("Invalid content-type header");
228 let _ = tx.close().await;
229 return Err("Invalid content-type header".into());
230 }
231 };
232
233 spawn(async move {
234 let stream = response.bytes_stream();
235 handle_domain_data_stream(tx, stream, &boundary).await;
236 });
237
238 Ok(rx)
239}
240
241pub async fn delete_by_id(
242 url: &str,
243 access_token: &str,
244 domain_id: &str,
245 id: &str,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
248 let client = Client::new();
249 let resp = client
250 .delete(&endpoint)
251 .bearer_auth(access_token)
252 .send()
253 .await?;
254
255 if resp.status().is_success() {
256 Ok(())
257 } else {
258 let err = resp
259 .text()
260 .await
261 .unwrap_or_else(|_| "Unknown error".to_string());
262 Err(format!("Delete failed with status: {}", err).into())
263 }
264}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn upload_v1_stream(
268 url: &str,
269 access_token: &str,
270 domain_id: &str,
271 mut rx: mpsc::Receiver<UploadDomainData>,
272) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
273 use futures::channel::oneshot;
274
275 let boundary = "boundary";
276
277 let (mut create_tx, create_rx) = mpsc::channel(100);
278 let (mut update_tx, update_rx) = mpsc::channel(100);
279
280 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
281 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
282
283 let url = url.to_string();
284 let url_2 = url.clone();
285 let access_token = access_token.to_string();
286 let domain_id = domain_id.to_string();
287 let access_token_2 = access_token.clone();
288 let domain_id_2 = domain_id.clone();
289
290 let (create_signal, create_signal_rx) = oneshot::channel::<
291 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
292 >();
293 let (update_signal, update_signal_rx) = oneshot::channel::<
294 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
295 >();
296
297 spawn(async move {
298 let create_response =
299 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
300 if let Err(Err(e)) = create_signal.send(create_response) {
301 tracing::error!("Failed to send create response: {}", e);
302 }
303 });
304
305 spawn(async move {
306 let update_response =
307 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
308 if let Err(Err(e)) = update_signal.send(update_response) {
309 tracing::error!("Failed to send update response: {}", e);
310 }
311 });
312
313 while let Some(datum) = rx.next().await {
314 match datum.action {
315 DomainAction::Create(create) => {
316 let create_data = write_create_body(boundary, &create, &datum.data);
317 create_tx.clone().send(create_data).await?;
318 }
319 DomainAction::Update(update) => {
320 let update_data = write_update_body(boundary, &update, &datum.data);
321 update_tx.send(update_data).await?;
322 }
323 }
324 }
325 update_tx
326 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
327 .await?;
328 create_tx
329 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
330 .await?;
331 update_tx.close().await?;
332 create_tx.close().await?;
333
334 let mut data = {
335 if let Ok(res) = create_signal_rx.await {
336 match res {
337 Ok(d) => d,
338 Err(e) => return Err(e),
339 }
340 } else {
341 return Err("create cancelled".into());
342 }
343 };
344
345 if let Ok(res) = update_signal_rx.await {
346 match res {
347 Ok(d) => data.extend(d),
348 Err(e) => return Err(e),
349 }
350 } else {
351 return Err("update cancelled".into());
352 }
353
354 Ok(data)
355}
356
357async fn update_v1(
358 url: &str,
359 access_token: &str,
360 domain_id: &str,
361 boundary: &str,
362 body: Body,
363) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
364 let update_response = Client::new()
365 .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
366 .bearer_auth(access_token)
367 .header(
368 "Content-Type",
369 &format!("multipart/form-data; boundary={}", boundary),
370 )
371 .body(body)
372 .send()
373 .await?;
374
375 if update_response.status().is_success() {
376 let data = update_response
377 .json::<ListDomainDataMetadata>()
378 .await
379 .unwrap();
380 Ok(data.data)
381 } else {
382 let err = update_response
383 .text()
384 .await
385 .unwrap_or_else(|_| "Unknown error".to_string());
386 Err(format!("Update failed with status: {}", err).into())
387 }
388}
389
390async fn create_v1(
391 url: &str,
392 access_token: &str,
393 domain_id: &str,
394 boundary: &str,
395 body: Body,
396) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
397 let create_response = Client::new()
398 .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
399 .bearer_auth(access_token)
400 .header(
401 "Content-Type",
402 &format!("multipart/form-data; boundary={}", boundary),
403 )
404 .body(body)
405 .send()
406 .await?;
407
408 if create_response.status().is_success() {
409 let data = create_response
410 .json::<ListDomainDataMetadata>()
411 .await
412 .unwrap();
413 Ok(data.data)
414 } else {
415 let err = create_response
416 .text()
417 .await
418 .unwrap_or_else(|_| "Unknown error".to_string());
419 Err(format!("Create failed with status: {}", err).into())
420 }
421}
422
423fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
424 let create_bytes = format!(
425 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
426 boundary, data.name, data.data_type
427 );
428 let mut create_data = create_bytes.into_bytes();
429 create_data.extend_from_slice(data_bytes);
430 create_data.extend_from_slice("\r\n".as_bytes());
431 create_data
432}
433
434fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
435 let update_bytes = format!(
436 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
437 boundary, data.id
438 );
439 let mut update_data = update_bytes.into_bytes();
440 update_data.extend_from_slice(data_bytes);
441 update_data.extend_from_slice("\r\n".as_bytes());
442 update_data
443}
444
445pub async fn upload_v1(
446 url: &str,
447 access_token: &str,
448 domain_id: &str,
449 data: Vec<UploadDomainData>,
450) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
451 let boundary = "boundary";
452
453 let mut create_body = Vec::new();
454 let mut update_body = Vec::new();
455 let mut to_update = false;
456 let mut to_create = false;
457
458 for datum in data {
460 match datum.action {
461 DomainAction::Create(create) => {
462 to_create = true;
463 let create_data = write_create_body(boundary, &create, &datum.data);
464 create_body.extend_from_slice(&create_data);
465 }
466 DomainAction::Update(update) => {
467 to_update = true;
468 let update_data = write_update_body(boundary, &update, &datum.data);
469 update_body.extend_from_slice(&update_data);
470 }
471 }
472 }
473
474 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
475 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
476
477 let create_body = Body::from(create_body);
478 let update_body = Body::from(update_body);
479 let mut res = Vec::new();
480
481 if to_create {
482 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
483 }
484 if to_update {
485 let update_response =
486 update_v1(url, access_token, domain_id, boundary, update_body).await?;
487 if !update_response.is_empty() {
488 res.extend(update_response);
489 }
490 }
491
492 Ok(res)
493}
494
495#[cfg(not(target_family = "wasm"))]
498pub async fn upload_one(
499 url: &str,
500 access_token: &str,
501 domain_id: &str,
502 data: UploadDomainData,
503) -> Result<Vec<DomainData>, (StatusCode, String)> {
504 let boundary = "boundary";
505 let (method, body) = match &data.action {
506 DomainAction::Create(create) => {
507 let mut bytes = write_create_body(boundary, create, &data.data);
508 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
510 (reqwest::Method::POST, Body::from(bytes))
511 }
512 DomainAction::Update(update) => {
513 let mut bytes = write_update_body(boundary, update, &data.data);
514 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
516 (reqwest::Method::PUT, Body::from(bytes))
517 }
518 };
519
520 let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
521 let response = Client::new()
522 .request(method, endpoint)
523 .bearer_auth(access_token)
524 .header(
525 "Content-Type",
526 &format!("multipart/form-data; boundary={}", boundary),
527 )
528 .body(body)
529 .send()
530 .await
531 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
532
533 if response.status().is_success() {
534 let text = response
536 .text()
537 .await
538 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
539 if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
541 let items: Vec<DomainData> = md
542 .data
543 .into_iter()
544 .map(|m| DomainData {
545 metadata: m,
546 data: Vec::new(),
547 })
548 .collect();
549 return Ok(items);
550 }
551 let v: serde_json::Value = serde_json::from_str(&text)
553 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
554 let mut out: Vec<DomainData> = Vec::new();
555 if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
556 for item in arr {
557 let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
558 let domain_id = item
559 .get("domain_id")
560 .and_then(|x| x.as_str())
561 .unwrap_or("")
562 .to_string();
563 let name = item
564 .get("name")
565 .and_then(|x| x.as_str())
566 .unwrap_or("")
567 .to_string();
568 let data_type = item
569 .get("data_type")
570 .and_then(|x| x.as_str())
571 .unwrap_or("")
572 .to_string();
573 out.push(DomainData {
574 metadata: DomainDataMetadata {
575 id,
576 domain_id,
577 name,
578 data_type,
579 size: 0,
580 created_at: String::new(),
581 updated_at: String::new(),
582 },
583 data: Vec::new(),
584 });
585 }
586 return Ok(out);
587 }
588 Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
589 } else {
590 let status = response.status();
591 let text = response
592 .text()
593 .await
594 .unwrap_or_else(|_| "Unknown error".to_string());
595 Err((status, text))
596 }
597}
598
599fn parse_headers(
600 headers_slice: &[u8],
601) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
602 let headers_str = String::from_utf8_lossy(headers_slice);
603 let mut domain_data = None;
604
605 for line in headers_str.lines() {
606 if line.trim().is_empty() {
607 break;
608 }
609 if let Some((key, value)) = line.split_once(':') {
610 let key = key.trim().to_lowercase();
611 if key == "content-disposition" {
612 let mut parsed_domain_data = DomainData {
613 metadata: DomainDataMetadata {
614 id: String::new(),
615 domain_id: String::new(),
616 name: String::new(),
617 data_type: String::new(),
618 size: 0,
619 created_at: String::new(),
620 updated_at: String::new(),
621 },
622 data: Vec::new(),
623 };
624 for part in value.split(';') {
625 let part = part.trim();
626 if let Some((key, value)) = part.split_once('=') {
627 let key = key.trim();
628 let value = value.trim().trim_matches('"');
629 match key {
630 "id" => parsed_domain_data.metadata.id = value.to_string(),
631 "domain-id" => {
632 parsed_domain_data.metadata.domain_id = value.to_string()
633 }
634 "name" => parsed_domain_data.metadata.name = value.to_string(),
635 "data-type" => {
636 parsed_domain_data.metadata.data_type = value.to_string()
637 }
638 "size" => parsed_domain_data.metadata.size = value.parse()?,
639 "created-at" => {
640 parsed_domain_data.metadata.created_at = value.to_string()
641 }
642 "updated-at" => {
643 parsed_domain_data.metadata.updated_at = value.to_string()
644 }
645 _ => {}
646 }
647 }
648 }
649 domain_data = Some(parsed_domain_data);
650 }
651 }
652 }
653
654 if let Some(domain_data) = domain_data {
655 Ok(domain_data)
656 } else {
657 Err("Missing content-disposition header".into())
658 }
659}
660
/// Returns the index of the first occurrence of `boundary` in `data`.
///
/// Returns `None` when `boundary` does not occur, when it is longer than
/// `data`, or when it is empty (an empty boundary cannot delimit anything, and
/// `slice::windows` would panic on a zero window size).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
667
/// Returns the offset just past the blank line that ends a header section.
///
/// A CRLF-terminated blank line (`\r\n\r\n`) anywhere in `data` takes
/// precedence; only if none exists is an LF-only blank line (`\n\n`) looked
/// for. Returns `None` when neither terminator is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    const CRLF_TERMINATOR: &[u8] = b"\r\n\r\n";
    const LF_TERMINATOR: &[u8] = b"\n\n";

    // Offset of the first byte after the first occurrence of `terminator`.
    let end_after = |terminator: &[u8]| {
        data.windows(terminator.len())
            .position(|window| window == terminator)
            .map(|start| start + terminator.len())
    };

    end_after(CRLF_TERMINATOR).or_else(|| end_after(LF_TERMINATOR))
}
676
677async fn handle_domain_data_stream(
678 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
679 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
680 boundary: &str,
681) {
682 use futures::pin_mut;
683
684 let mut buffer = Vec::new();
685 let mut current_domain_data: Option<DomainData> = None;
686 let boundary_bytes = format!("--{}", boundary).into_bytes();
687 let keep_tail = boundary_bytes.len() + 4; pin_mut!(stream);
690
691 while let Some(chunk_result) = stream.next().await {
692 let chunk = match chunk_result {
693 Ok(c) if c.is_empty() => continue,
694 Ok(c) => c,
695 Err(e) => {
696 let _ = tx.send(Err(e.into())).await;
697 return;
698 }
699 };
700
701 buffer.extend_from_slice(&chunk);
702
703 'consume: loop {
704 match &mut current_domain_data {
705 None => {
706 let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
707 if buffer.len() > keep_tail {
708 buffer.drain(..buffer.len() - keep_tail);
709 }
710 break 'consume;
711 };
712 let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
713 break 'consume;
714 };
715 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
716 let part_headers = parse_headers(headers_slice);
717 let domain_data = match part_headers {
718 Ok(d) => d,
719 Err(e) => {
720 tracing::error!("Failed to parse headers: {:?}", e);
721 return;
722 }
723 };
724 buffer.drain(..boundary_pos + header_end_rel);
725 current_domain_data = Some(domain_data);
726 }
727 Some(dd) => {
728 if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
729 let mut data_end = next_boundary_pos;
730 if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
731 data_end -= 2;
732 } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
733 data_end -= 1;
734 }
735 dd.data.extend_from_slice(&buffer[..data_end]);
736 buffer.drain(..next_boundary_pos);
737 let finished = current_domain_data.take().unwrap();
738 if tx.send(Ok(finished)).await.is_err() {
739 return;
740 }
741 } else {
742 if buffer.len() > keep_tail {
743 let take = buffer.len() - keep_tail;
744 dd.data.extend_from_slice(&buffer[..take]);
745 buffer.drain(..take);
746 }
747 break 'consume;
748 }
749 }
750 }
751 }
752 }
753
754 let _ = tx.close().await;
755}
756
757#[cfg(test)]
758mod tests {
759 use bytes::Bytes;
760
761 use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
762
763 use super::*;
764
765 fn get_config() -> (Config, String) {
766 if std::path::Path::new("../.env.local").exists() {
767 dotenvy::from_filename("../.env.local").ok();
768 dotenvy::dotenv().ok();
769 }
770 let config = Config::from_env().unwrap();
771 (config, std::env::var("DOMAIN_ID").unwrap())
772 }
773
774 #[test]
775 fn test_find_boundary_found() {
776 let data = b"random--boundary--data";
777 let boundary = b"--boundary";
778 assert_eq!(find_boundary(data, boundary), Some(6));
779 }
780
781 #[test]
782 fn test_find_boundary_not_found() {
783 let data = b"random-data";
784 let boundary = b"--boundary";
785 assert_eq!(find_boundary(data, boundary), None);
786 }
787
788 #[test]
789 fn test_find_headers_end_crlf() {
790 let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
791 assert_eq!(find_headers_end(data), Some(36));
792 }
793
794 #[test]
795 fn test_find_headers_end_lf() {
796 let data = b"header1: value1\nheader2: value2\n\nbody";
797 assert_eq!(find_headers_end(data), Some(33));
798 }
799
800 #[test]
801 fn test_find_headers_end_none() {
802 let data = b"header1: value1\nheader2: value2\nbody";
803 assert_eq!(find_headers_end(data), None);
804 }
805
806 #[test]
807 fn test_parse_headers_success() {
808 let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
809 let parsed = super::parse_headers(headers);
810 assert!(parsed.is_ok());
811 let domain_data = parsed.unwrap();
812 assert_eq!(domain_data.metadata.id, "123");
813 assert_eq!(domain_data.metadata.domain_id, "abc");
814 assert_eq!(domain_data.metadata.name, "test");
815 assert_eq!(domain_data.metadata.data_type, "type");
816 assert_eq!(domain_data.metadata.size, 42);
817 assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
818 assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
819 }
820
821 #[test]
822 fn test_parse_headers_missing_content_disposition() {
823 let headers = b"content-type: application/octet-stream\r\n\r\n";
824 let parsed = super::parse_headers(headers);
825 assert!(parsed.is_err());
826 }
827
 /// A single chunk holding two complete parts and the closing delimiter
 /// produces both items with their exact payloads.
 #[tokio::test]
 async fn test_a_chunk_contains_multiple_data() {
     let (tx, rx) = mpsc::channel(10);

     // Whole multipart body (LF line endings) delivered as one chunk.
     let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
 "#;
     let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);

     // The parser runs to completion and closes the channel before collecting.
     handle_domain_data_stream(
         tx,
         stream,
         "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
     )
     .await;

     let output: Vec<DomainData> = rx
         .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
         .await
         .into_iter()
         .map(|r| r.unwrap())
         .collect();
     assert_eq!(output.len(), 2);
     assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
     assert_eq!(output[0].data, b"{\"test\": \"test\"}");
 }
864
 /// The first chunk ends after the first part's header lines, before the
 /// blank line that terminates them; both items are still parsed.
 #[tokio::test]
 async fn test_chunk_size_is_smaller_than_part() {
     let (tx, rx) = mpsc::channel(10);

     // First chunk: boundary and header lines of the first part only.
     let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
     // Second chunk: blank line, first payload, and the complete second part.
     let payload2 = br#"

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
     let stream = tokio_stream::iter(vec![
         Ok(Bytes::from_static(payload)),
         Ok(Bytes::from_static(payload2)),
     ]);

     handle_domain_data_stream(
         tx,
         stream,
         "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
     )
     .await;

     let output: Vec<DomainData> = rx
         .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
         .await
         .into_iter()
         .map(|r| r.unwrap())
         .collect();
     assert_eq!(output.len(), 2);
     assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
     assert_eq!(output[0].data, b"{\"test\": \"test\"}");
 }
906
 /// The header section of the first part is spread over two chunks; both
 /// items are still parsed with their exact payloads.
 #[tokio::test]
 async fn test_chunk_size_is_smaller_than_header() {
     let (tx, rx) = mpsc::channel(10);

     // First chunk: boundary and header lines, no terminating blank line.
     let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
     // Second chunk: one more header-like line, then the blank line, the
     // first payload, and the complete second part.
     let payload2 = br#"
e: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
     let stream = tokio_stream::iter(vec![
         Ok(Bytes::from_static(payload)),
         Ok(Bytes::from_static(payload2)),
     ]);

     handle_domain_data_stream(
         tx,
         stream,
         "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
     )
     .await;

     let output: Vec<DomainData> = rx
         .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
         .await
         .into_iter()
         .map(|r| r.unwrap())
         .collect();
     assert_eq!(output.len(), 2);
     assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
     assert_eq!(output[0].data, b"{\"test\": \"test\"}");
 }
949
 /// The first part's payload is cut in the middle by the chunk border; the
 /// payload is reassembled without loss or duplication.
 #[tokio::test]
 async fn test_chunk_size_doesnt_cover_the_whole_data() {
     let (tx, rx) = mpsc::channel(10);

     // First chunk ends inside the first payload.
     let payload = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"#;
     // Second chunk: rest of the first payload and the complete second part.
     let payload2 = br#""}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
     let stream = tokio_stream::iter(vec![
         Ok(Bytes::from_static(payload)),
         Ok(Bytes::from_static(payload2)),
     ]);

     handle_domain_data_stream(
         tx,
         stream,
         "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
     )
     .await;

     let output: Vec<DomainData> = rx
         .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
         .await
         .into_iter()
         .map(|r| r.unwrap())
         .collect();
     assert_eq!(output.len(), 2);
     // Compared as strings so that a mismatch prints readable text.
     assert_eq!(
         std::str::from_utf8(&output[1].data).unwrap(),
         "{\"test\": \"test updated\"}"
     );
     assert_eq!(
         std::str::from_utf8(&output[0].data).unwrap(),
         "{\"test\": \"test\"}"
     );
 }
996
 /// End-to-end check of `upload_v1` with one create and one update.
 ///
 /// Requires the environment read by `get_config` (credentials and
 /// `DOMAIN_ID`) and talks to the real services. Presumably the item with the
 /// hard-coded id must already exist in that domain - confirm before running.
 #[tokio::test]
 async fn test_upload_v1_with_user_dds_access_token() {
     use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};

     let (config, domain_id) = get_config();

     // Sign in and obtain domain access.
     let mut discovery =
         DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
     discovery
         .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
         .await
         .expect("sign_in_with_auki_account failed");
     let domain = discovery
         .auth_domain(&domain_id)
         .await
         .expect("get_domain failed");
     // One item to create and one existing item to update.
     let upload_data = vec![
         UploadDomainData {
             action: DomainAction::Create(CreateDomainData {
                 name: "test_upload".to_string(),
                 data_type: "test".to_string(),
             }),
             data: b"hello world".to_vec(),
         },
         UploadDomainData {
             action: DomainAction::Update(UpdateDomainData {
                 id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
             }),
             data: b"{\"test\": \"test updated\"}".to_vec(),
         },
     ];

     let result = upload_v1(
         &domain.domain.domain_server.url,
         &domain.get_access_token(),
         &domain_id,
         upload_data,
     )
     .await
     .expect("upload_v1 failed");

     assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
     // Every entry other than the updated one is the newly created item:
     // verify its name and delete it again so the test leaves no residue.
     for data in result {
         if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
             assert_eq!(data.name, "test_upload");
             delete_by_id(
                 &domain.domain.domain_server.url,
                 &domain.get_access_token(),
                 &domain_id,
                 &data.id,
             )
             .await
             .expect("delete_by_id failed");
         }
     }
 }
1055}