1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4#[cfg(not(target_family = "wasm"))]
5use reqwest::StatusCode;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8#[cfg(not(target_family = "wasm"))]
9use tokio::spawn;
10#[cfg(target_family = "wasm")]
11use wasm_bindgen_futures::spawn_local as spawn;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct DomainDataMetadata {
15 pub id: String,
16 pub domain_id: String,
17 pub name: String,
18 pub data_type: String,
19 pub size: u64,
20 pub created_at: String,
21 pub updated_at: String,
22}
23
24#[derive(Debug, Deserialize, Serialize)]
25pub struct DomainData {
26 pub metadata: DomainDataMetadata,
28 pub data: Vec<u8>,
29}
30
31#[derive(Debug, Serialize, Clone, Deserialize)]
32pub struct UpdateDomainData {
33 pub id: String,
34}
35
36#[derive(Debug, Serialize, Clone, Deserialize)]
37pub struct CreateDomainData {
38 pub name: String,
39 pub data_type: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43#[serde(untagged)]
44pub enum DomainAction {
45 Create {
46 name: String,
47 data_type: String,
48 },
49 Update {
50 id: String,
51 },
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct UploadDomainData {
56 #[serde(flatten)]
57 pub action: DomainAction,
58 pub data: Vec<u8>,
59}
60
61#[derive(Debug, Serialize, Deserialize)]
62pub struct DownloadQuery {
63 pub ids: Vec<String>,
64 pub name: Option<String>,
65 pub data_type: Option<String>,
66}
67
68#[derive(Debug, Deserialize)]
69struct ListDomainDataMetadata {
70 pub data: Vec<DomainDataMetadata>,
71}
72
73pub async fn download_by_id(
74 url: &str,
75 client_id: &str,
76 access_token: &str,
77 domain_id: &str,
78 id: &str,
79) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
80 let response = Client::new()
81 .get(format!(
82 "{}/api/v1/domains/{}/data/{}?raw=true",
83 url, domain_id, id
84 ))
85 .bearer_auth(access_token)
86 .header("posemesh-client-id", client_id)
87 .send()
88 .await?;
89
90 if response.status().is_success() {
91 let data = response.bytes().await?;
92 Ok(data.to_vec())
93 } else {
94 let status = response.status();
95 let text = response
96 .text()
97 .await
98 .unwrap_or_else(|_| "Unknown error".to_string());
99 Err(format!(
100 "Failed to download data by id. Status: {} - {}",
101 status, text
102 )
103 .into())
104 }
105}
106
107pub async fn request_download_absolute(
110 url: &str,
111 client_id: &str,
112 access_token: &str,
113) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
114 let response = Client::new()
115 .get(url)
116 .bearer_auth(access_token)
117 .header("posemesh-client-id", client_id)
118 .header("Accept", "multipart/form-data")
119 .send()
120 .await?;
121 Ok(response)
122}
123
124pub async fn download_metadata_v1(
125 url: &str,
126 client_id: &str,
127 access_token: &str,
128 domain_id: &str,
129 query: &DownloadQuery,
130) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
131 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
132 if response.status().is_success() {
133 let data = response.json::<ListDomainDataMetadata>().await?;
134 Ok(data.data)
135 } else {
136 let status = response.status();
137 let text = response
138 .text()
139 .await
140 .unwrap_or_else(|_| "Unknown error".to_string());
141 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
142 }
143}
144
145pub async fn download_v1(
146 url: &str,
147 client_id: &str,
148 access_token: &str,
149 domain_id: &str,
150 query: &DownloadQuery,
151 with_data: bool,
152) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
153 let mut params = HashMap::new();
154
155 if let Some(name) = &query.name {
156 params.insert("name", name.clone());
157 }
158 if let Some(data_type) = &query.data_type {
159 params.insert("data_type", data_type.clone());
160 }
161 let ids = if !query.ids.is_empty() {
162 format!("?ids={}", query.ids.join(","))
163 } else {
164 String::new()
165 };
166
167 let response = Client::new()
168 .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
169 .bearer_auth(access_token)
170 .header(
171 "Accept",
172 if with_data {
173 "multipart/form-data"
174 } else {
175 "application/json"
176 },
177 )
178 .header("posemesh-client-id", client_id)
179 .query(¶ms)
180 .send()
181 .await?;
182
183 if response.status().is_success() {
184 Ok(response)
185 } else {
186 let status = response.status();
187 let text = response
188 .text()
189 .await
190 .unwrap_or_else(|_| "Unknown error".to_string());
191 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
192 }
193}
194
195pub async fn download_v1_stream(
196 url: &str,
197 client_id: &str,
198 access_token: &str,
199 domain_id: &str,
200 query: &DownloadQuery,
201) -> Result<
202 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
203 Box<dyn std::error::Error + Send + Sync>,
204> {
205 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
206
207 stream_from_response(response).await
208}
209
210pub async fn stream_from_response(
213 response: Response,
214) -> Result<
215 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
216 Box<dyn std::error::Error + Send + Sync>,
217> {
218 let (mut tx, rx) =
219 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
220
221 let boundary = match response
222 .headers()
223 .get("content-type")
224 .and_then(|ct| ct.to_str().ok())
225 .and_then(|ct| {
226 if ct.starts_with("multipart/form-data; boundary=") {
227 Some(ct.split("boundary=").nth(1)?.to_string())
228 } else {
229 None
230 }
231 }) {
232 Some(b) => b,
233 None => {
234 tracing::error!("Invalid content-type header");
235 let _ = tx.close().await;
236 return Err("Invalid content-type header".into());
237 }
238 };
239
240 spawn(async move {
241 let stream = response.bytes_stream();
242 handle_domain_data_stream(tx, stream, &boundary).await;
243 });
244
245 Ok(rx)
246}
247
248pub async fn delete_by_id(
249 url: &str,
250 access_token: &str,
251 domain_id: &str,
252 id: &str,
253) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
254 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
255 let client = Client::new();
256 let resp = client
257 .delete(&endpoint)
258 .bearer_auth(access_token)
259 .send()
260 .await?;
261
262 if resp.status().is_success() {
263 Ok(())
264 } else {
265 let err = resp
266 .text()
267 .await
268 .unwrap_or_else(|_| "Unknown error".to_string());
269 Err(format!("Delete failed with status: {}", err).into())
270 }
271}
272
273#[cfg(not(target_family = "wasm"))]
274pub async fn upload_v1_stream(
275 url: &str,
276 access_token: &str,
277 domain_id: &str,
278 mut rx: mpsc::Receiver<UploadDomainData>,
279) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
280 use futures::channel::oneshot;
281
282 let boundary = "boundary";
283
284 let (mut create_tx, create_rx) = mpsc::channel(100);
285 let (mut update_tx, update_rx) = mpsc::channel(100);
286
287 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
288 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
289
290 let url = url.to_string();
291 let url_2 = url.clone();
292 let access_token = access_token.to_string();
293 let domain_id = domain_id.to_string();
294 let access_token_2 = access_token.clone();
295 let domain_id_2 = domain_id.clone();
296
297 let (create_signal, create_signal_rx) = oneshot::channel::<
298 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
299 >();
300 let (update_signal, update_signal_rx) = oneshot::channel::<
301 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
302 >();
303
304 spawn(async move {
305 let create_response =
306 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
307 if let Err(Err(e)) = create_signal.send(create_response) {
308 tracing::error!("Failed to send create response: {}", e);
309 }
310 });
311
312 spawn(async move {
313 let update_response =
314 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
315 if let Err(Err(e)) = update_signal.send(update_response) {
316 tracing::error!("Failed to send update response: {}", e);
317 }
318 });
319
320 while let Some(datum) = rx.next().await {
321 match datum.action {
322 DomainAction::Create { name, data_type } => {
323 let create_data = write_create_body(boundary, &CreateDomainData { name, data_type }, &datum.data);
324 create_tx.clone().send(create_data).await?;
325 }
326 DomainAction::Update { id } => {
327 let update_data = write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
328 update_tx.send(update_data).await?;
329 }
330 }
331 }
332 update_tx
333 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
334 .await?;
335 create_tx
336 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
337 .await?;
338 update_tx.close().await?;
339 create_tx.close().await?;
340
341 let mut data = {
342 if let Ok(res) = create_signal_rx.await {
343 match res {
344 Ok(d) => d,
345 Err(e) => return Err(e),
346 }
347 } else {
348 return Err("create cancelled".into());
349 }
350 };
351
352 if let Ok(res) = update_signal_rx.await {
353 match res {
354 Ok(d) => data.extend(d),
355 Err(e) => return Err(e),
356 }
357 } else {
358 return Err("update cancelled".into());
359 }
360
361 Ok(data)
362}
363
364async fn update_v1(
365 url: &str,
366 access_token: &str,
367 domain_id: &str,
368 boundary: &str,
369 body: Body,
370) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
371 let update_response = Client::new()
372 .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
373 .bearer_auth(access_token)
374 .header(
375 "Content-Type",
376 &format!("multipart/form-data; boundary={}", boundary),
377 )
378 .body(body)
379 .send()
380 .await?;
381
382 if update_response.status().is_success() {
383 let data = update_response
384 .json::<ListDomainDataMetadata>()
385 .await
386 .unwrap();
387 Ok(data.data)
388 } else {
389 let err = update_response
390 .text()
391 .await
392 .unwrap_or_else(|_| "Unknown error".to_string());
393 Err(format!("Update failed with status: {}", err).into())
394 }
395}
396
397async fn create_v1(
398 url: &str,
399 access_token: &str,
400 domain_id: &str,
401 boundary: &str,
402 body: Body,
403) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
404 let create_response = Client::new()
405 .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
406 .bearer_auth(access_token)
407 .header(
408 "Content-Type",
409 &format!("multipart/form-data; boundary={}", boundary),
410 )
411 .body(body)
412 .send()
413 .await?;
414
415 if create_response.status().is_success() {
416 let data = create_response
417 .json::<ListDomainDataMetadata>()
418 .await
419 .unwrap();
420 Ok(data.data)
421 } else {
422 let err = create_response
423 .text()
424 .await
425 .unwrap_or_else(|_| "Unknown error".to_string());
426 Err(format!("Create failed with status: {}", err).into())
427 }
428}
429
430fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
431 let create_bytes = format!(
432 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
433 boundary, data.name, data.data_type
434 );
435 let mut create_data = create_bytes.into_bytes();
436 create_data.extend_from_slice(data_bytes);
437 create_data.extend_from_slice("\r\n".as_bytes());
438 create_data
439}
440
441fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
442 let update_bytes = format!(
443 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
444 boundary, data.id
445 );
446 let mut update_data = update_bytes.into_bytes();
447 update_data.extend_from_slice(data_bytes);
448 update_data.extend_from_slice("\r\n".as_bytes());
449 update_data
450}
451
452pub async fn upload_v1(
453 url: &str,
454 access_token: &str,
455 domain_id: &str,
456 data: Vec<UploadDomainData>,
457) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
458 let boundary = "boundary";
459
460 let mut create_body = Vec::new();
461 let mut update_body = Vec::new();
462 let mut to_update = false;
463 let mut to_create = false;
464
465 for datum in data {
467 match datum.action {
468 DomainAction::Create { name, data_type } => {
469 to_create = true;
470 let create_data = write_create_body(boundary, &CreateDomainData { name, data_type }, &datum.data);
471 create_body.extend_from_slice(&create_data);
472 }
473 DomainAction::Update { id } => {
474 to_update = true;
475 let update_data = write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
476 update_body.extend_from_slice(&update_data);
477 }
478 }
479 }
480
481 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
482 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
483
484 let create_body = Body::from(create_body);
485 let update_body = Body::from(update_body);
486 let mut res = Vec::new();
487
488 if to_create {
489 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
490 }
491 if to_update {
492 let update_response =
493 update_v1(url, access_token, domain_id, boundary, update_body).await?;
494 if !update_response.is_empty() {
495 res.extend(update_response);
496 }
497 }
498
499 Ok(res)
500}
501
502#[cfg(not(target_family = "wasm"))]
505pub async fn upload_one(
506 url: &str,
507 access_token: &str,
508 domain_id: &str,
509 data: UploadDomainData,
510) -> Result<Vec<DomainData>, (StatusCode, String)> {
511 let boundary = "boundary";
512 let (method, body) = match &data.action {
513 DomainAction::Create { name, data_type } => {
514 let create = CreateDomainData {
515 name: name.clone(),
516 data_type: data_type.clone(),
517 };
518 let mut bytes = write_create_body(boundary, &create, &data.data);
519 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
521 (reqwest::Method::POST, Body::from(bytes))
522 }
523 DomainAction::Update { id } => {
524 let update = UpdateDomainData { id: id.clone() };
525 let mut bytes = write_update_body(boundary, &update, &data.data);
526 bytes.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
528 (reqwest::Method::PUT, Body::from(bytes))
529 }
530 };
531
532 let endpoint = format!("{}/api/v1/domains/{}/data", url, domain_id);
533 let response = Client::new()
534 .request(method, endpoint)
535 .bearer_auth(access_token)
536 .header(
537 "Content-Type",
538 &format!("multipart/form-data; boundary={}", boundary),
539 )
540 .body(body)
541 .send()
542 .await
543 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
544
545 if response.status().is_success() {
546 let text = response
548 .text()
549 .await
550 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
551 if let Ok(md) = serde_json::from_str::<ListDomainDataMetadata>(&text) {
553 let items: Vec<DomainData> = md
554 .data
555 .into_iter()
556 .map(|m| DomainData {
557 metadata: m,
558 data: Vec::new(),
559 })
560 .collect();
561 return Ok(items);
562 }
563 let v: serde_json::Value = serde_json::from_str(&text)
565 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
566 let mut out: Vec<DomainData> = Vec::new();
567 if let Some(arr) = v.get("data").and_then(|d| d.as_array()) {
568 for item in arr {
569 let id = item.get("id").and_then(|x| x.as_str()).unwrap_or("").to_string();
570 let domain_id = item
571 .get("domain_id")
572 .and_then(|x| x.as_str())
573 .unwrap_or("")
574 .to_string();
575 let name = item
576 .get("name")
577 .and_then(|x| x.as_str())
578 .unwrap_or("")
579 .to_string();
580 let data_type = item
581 .get("data_type")
582 .and_then(|x| x.as_str())
583 .unwrap_or("")
584 .to_string();
585 out.push(DomainData {
586 metadata: DomainDataMetadata {
587 id,
588 domain_id,
589 name,
590 data_type,
591 size: 0,
592 created_at: String::new(),
593 updated_at: String::new(),
594 },
595 data: Vec::new(),
596 });
597 }
598 return Ok(out);
599 }
600 Err((StatusCode::INTERNAL_SERVER_ERROR, "invalid response".to_string()))
601 } else {
602 let status = response.status();
603 let text = response
604 .text()
605 .await
606 .unwrap_or_else(|_| "Unknown error".to_string());
607 Err((status, text))
608 }
609}
610
611fn parse_headers(
612 headers_slice: &[u8],
613) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
614 let headers_str = String::from_utf8_lossy(headers_slice);
615 let mut domain_data = None;
616
617 for line in headers_str.lines() {
618 if line.trim().is_empty() {
619 break;
620 }
621 if let Some((key, value)) = line.split_once(':') {
622 let key = key.trim().to_lowercase();
623 if key == "content-disposition" {
624 let mut parsed_domain_data = DomainData {
625 metadata: DomainDataMetadata {
626 id: String::new(),
627 domain_id: String::new(),
628 name: String::new(),
629 data_type: String::new(),
630 size: 0,
631 created_at: String::new(),
632 updated_at: String::new(),
633 },
634 data: Vec::new(),
635 };
636 for part in value.split(';') {
637 let part = part.trim();
638 if let Some((key, value)) = part.split_once('=') {
639 let key = key.trim();
640 let value = value.trim().trim_matches('"');
641 match key {
642 "id" => parsed_domain_data.metadata.id = value.to_string(),
643 "domain-id" => {
644 parsed_domain_data.metadata.domain_id = value.to_string()
645 }
646 "name" => parsed_domain_data.metadata.name = value.to_string(),
647 "data-type" => {
648 parsed_domain_data.metadata.data_type = value.to_string()
649 }
650 "size" => parsed_domain_data.metadata.size = value.parse()?,
651 "created-at" => {
652 parsed_domain_data.metadata.created_at = value.to_string()
653 }
654 "updated-at" => {
655 parsed_domain_data.metadata.updated_at = value.to_string()
656 }
657 _ => {}
658 }
659 }
660 }
661 domain_data = Some(parsed_domain_data);
662 }
663 }
664 }
665
666 if let Some(domain_data) = domain_data {
667 Ok(domain_data)
668 } else {
669 Err("Missing content-disposition header".into())
670 }
671}
672
/// Returns the index of the first occurrence of `boundary` within `data`.
///
/// Returns `None` when `boundary` does not occur, when it is longer than
/// `data`, or when it is empty. The empty case is rejected up front because
/// `slice::windows(0)` would panic.
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
679
/// Returns the offset just past the blank line that ends a header section.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as terminators, and whichever ends
/// first in `data` wins. Taking the earliest one matters when LF-terminated
/// headers are followed by a body that happens to contain `\r\n\r\n`: the
/// later sequence must not be mistaken for the end of the headers.
///
/// Returns `None` when `data` contains neither terminator.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}
688
689async fn handle_domain_data_stream(
690 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
691 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
692 boundary: &str,
693) {
694 use futures::pin_mut;
695
696 let mut buffer = Vec::new();
697 let mut current_domain_data: Option<DomainData> = None;
698 let boundary_bytes = format!("--{}", boundary).into_bytes();
699 let keep_tail = boundary_bytes.len() + 4; pin_mut!(stream);
702
703 while let Some(chunk_result) = stream.next().await {
704 let chunk = match chunk_result {
705 Ok(c) if c.is_empty() => continue,
706 Ok(c) => c,
707 Err(e) => {
708 let _ = tx.send(Err(e.into())).await;
709 return;
710 }
711 };
712
713 buffer.extend_from_slice(&chunk);
714
715 'consume: loop {
716 match &mut current_domain_data {
717 None => {
718 let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) else {
719 if buffer.len() > keep_tail {
720 buffer.drain(..buffer.len() - keep_tail);
721 }
722 break 'consume;
723 };
724 let Some(header_end_rel) = find_headers_end(&buffer[boundary_pos..]) else {
725 break 'consume;
726 };
727 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end_rel];
728 let part_headers = parse_headers(headers_slice);
729 let domain_data = match part_headers {
730 Ok(d) => d,
731 Err(e) => {
732 tracing::error!("Failed to parse headers: {:?}", e);
733 return;
734 }
735 };
736 buffer.drain(..boundary_pos + header_end_rel);
737 current_domain_data = Some(domain_data);
738 }
739 Some(dd) => {
740 if let Some(next_boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
741 let mut data_end = next_boundary_pos;
742 if data_end >= 2 && &buffer[data_end - 2..data_end] == b"\r\n" {
743 data_end -= 2;
744 } else if data_end >= 1 && buffer[data_end - 1] == b'\n' {
745 data_end -= 1;
746 }
747 dd.data.extend_from_slice(&buffer[..data_end]);
748 buffer.drain(..next_boundary_pos);
749 let finished = current_domain_data.take().unwrap();
750 if tx.send(Ok(finished)).await.is_err() {
751 return;
752 }
753 } else {
754 if buffer.len() > keep_tail {
755 let take = buffer.len() - keep_tail;
756 dd.data.extend_from_slice(&buffer[..take]);
757 buffer.drain(..take);
758 }
759 break 'consume;
760 }
761 }
762 }
763 }
764 }
765
766 let _ = tx.close().await;
767}
768
// Unit tests for the multipart parsing helpers of this module, plus one
// ignored integration test that uploads to a live server.
769#[cfg(test)]
770mod tests {
771 use bytes::Bytes;
772
773 use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};
774
775 use super::*;
776
    // Loads the test configuration from the environment. When `../.env.local`
    // exists, it is loaded first, followed by the default `.env` lookup.
    // Returns the configuration together with the `DOMAIN_ID` variable.
    // Panics when the configuration or `DOMAIN_ID` is missing.
777 fn get_config() -> (Config, String) {
778 if std::path::Path::new("../.env.local").exists() {
779 dotenvy::from_filename("../.env.local").ok();
780 dotenvy::dotenv().ok();
781 }
782 let config = Config::from_env().unwrap();
783 (config, std::env::var("DOMAIN_ID").unwrap())
784 }
785
    // The boundary is located at its first occurrence inside the data.
786 #[test]
787 fn test_find_boundary_found() {
788 let data = b"random--boundary--data";
789 let boundary = b"--boundary";
790 assert_eq!(find_boundary(data, boundary), Some(6));
791 }
792
    // Data without the boundary yields `None`.
793 #[test]
794 fn test_find_boundary_not_found() {
795 let data = b"random-data";
796 let boundary = b"--boundary";
797 assert_eq!(find_boundary(data, boundary), None);
798 }
799
    // A CRLF blank line ends the headers; the offset points just past it.
800 #[test]
801 fn test_find_headers_end_crlf() {
802 let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
803 assert_eq!(find_headers_end(data), Some(36));
804 }
805
    // A bare-LF blank line is accepted as the end of the headers as well.
806 #[test]
807 fn test_find_headers_end_lf() {
808 let data = b"header1: value1\nheader2: value2\n\nbody";
809 assert_eq!(find_headers_end(data), Some(33));
810 }
811
    // Without a blank line there is no end of headers.
812 #[test]
813 fn test_find_headers_end_none() {
814 let data = b"header1: value1\nheader2: value2\nbody";
815 assert_eq!(find_headers_end(data), None);
816 }
817
    // Every supported content-disposition parameter lands in the metadata.
818 #[test]
819 fn test_parse_headers_success() {
820 let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
821 let parsed = super::parse_headers(headers);
822 assert!(parsed.is_ok());
823 let domain_data = parsed.unwrap();
824 assert_eq!(domain_data.metadata.id, "123");
825 assert_eq!(domain_data.metadata.domain_id, "abc");
826 assert_eq!(domain_data.metadata.name, "test");
827 assert_eq!(domain_data.metadata.data_type, "type");
828 assert_eq!(domain_data.metadata.size, 42);
829 assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
830 assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
831 }
832
    // Headers without content-disposition are rejected.
833 #[test]
834 fn test_parse_headers_missing_content_disposition() {
835 let headers = b"content-type: application/octet-stream\r\n\r\n";
836 let parsed = super::parse_headers(headers);
837 assert!(parsed.is_err());
838 }
839
    // A single chunk that carries two complete parts yields two items, in order.
840 #[tokio::test]
841 async fn test_a_chunk_contains_multiple_data() {
842 let (tx, rx) = mpsc::channel(10);
843
844 let payload = br#"
845 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
846Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
847Content-Type: application/octet-stream
848
849{"test": "test"}
850--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
851Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
852Content-Type: application/octet-stream
853
854{"test": "test updated"}
855--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
856 "#;
857 let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
858
859 handle_domain_data_stream(
860 tx,
861 stream,
862 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
863 )
864 .await;
865
866 let output: Vec<DomainData> = rx
867 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
868 .await
869 .into_iter()
870 .map(|r| r.unwrap())
871 .collect();
872 assert_eq!(output.len(), 2);
873 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
874 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
875 }
876
    // The first chunk ends inside the header section of the first part; the
    // blank line and the payload only arrive with the second chunk.
877 #[tokio::test]
878 async fn test_chunk_size_is_smaller_than_part() {
879 let (tx, rx) = mpsc::channel(10);
880
881 let payload = br#"
882 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
883Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
884Content-Type: application/octet-stream
885 "#;
886 let payload2 = br#"
887
888{"test": "test"}
889--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
890Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
891Content-Type: application/octet-stream
892
893{"test": "test updated"}
894--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
895"#;
896 let stream = tokio_stream::iter(vec![
897 Ok(Bytes::from_static(payload)),
898 Ok(Bytes::from_static(payload2)),
899 ]);
900
901 handle_domain_data_stream(
902 tx,
903 stream,
904 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
905 )
906 .await;
907
908 let output: Vec<DomainData> = rx
909 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
910 .await
911 .into_iter()
912 .map(|r| r.unwrap())
913 .collect();
914 assert_eq!(output.len(), 2);
915 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
916 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
917 }
918
    // The header section is split across two chunks: the second chunk starts
    // with further header text before the blank line.
919 #[tokio::test]
920 async fn test_chunk_size_is_smaller_than_header() {
921 let (tx, rx) = mpsc::channel(10);
922
923 let payload = br#"
924 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
925Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
926Content-Type: application/octet-stream
927 "#;
928 let payload2 = br#"
929e: application/octet-stream
930
931{"test": "test"}
932--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
933Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
934Content-Type: application/octet-stream
935
936{"test": "test updated"}
937--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
938"#;
939 let stream = tokio_stream::iter(vec![
940 Ok(Bytes::from_static(payload)),
941 Ok(Bytes::from_static(payload2)),
942 ]);
943
944 handle_domain_data_stream(
945 tx,
946 stream,
947 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
948 )
949 .await;
950
951 let output: Vec<DomainData> = rx
952 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
953 .await
954 .into_iter()
955 .map(|r| r.unwrap())
956 .collect();
957 assert_eq!(output.len(), 2);
958 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
959 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
960 }
961
    // The first chunk ends in the middle of the first payload; the payload
    // must be reassembled from both chunks.
962 #[tokio::test]
963 async fn test_chunk_size_doesnt_cover_the_whole_data() {
964 let (tx, rx) = mpsc::channel(10);
965
966 let payload = br#"
967 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
968Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
969Content-Type: application/octet-stream
970
971{"test": "test"#;
972 let payload2 = br#""}
973--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
974Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
975Content-Type: application/octet-stream
976
977{"test": "test updated"}
978--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
979"#;
980 let stream = tokio_stream::iter(vec![
981 Ok(Bytes::from_static(payload)),
982 Ok(Bytes::from_static(payload2)),
983 ]);
984
985 handle_domain_data_stream(
986 tx,
987 stream,
988 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
989 )
990 .await;
991
992 let output: Vec<DomainData> = rx
993 .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
994 .await
995 .into_iter()
996 .map(|r| r.unwrap())
997 .collect();
998 assert_eq!(output.len(), 2);
999 assert_eq!(
1000 std::str::from_utf8(&output[1].data).unwrap(),
1001 "{\"test\": \"test updated\"}"
1002 );
1003 assert_eq!(
1004 std::str::from_utf8(&output[0].data).unwrap(),
1005 "{\"test\": \"test\"}"
1006 );
1007 }
1008
    // Live integration test (ignored by default): signs in, uploads one new
    // item and one update through `upload_v1`, then deletes the created item.
1009 #[tokio::test]
1010 #[ignore = "requires live Auki credentials"]
1011 async fn test_upload_v1_with_user_dds_access_token() {
1012 use crate::domain_data::{DomainAction, UploadDomainData};
1013
1014 let (config, domain_id) = get_config();
1015
1016 let mut discovery =
1017 DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
1018 discovery
1019 .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
1020 .await
1021 .expect("sign_in_with_auki_account failed");
1022 let domain = discovery
1023 .auth_domain(&domain_id)
1024 .await
1025 .expect("get_domain failed");
1026 let upload_data = vec![
1028 UploadDomainData {
1029 action: DomainAction::Create {
1030 name: "test_upload".to_string(),
1031 data_type: "test".to_string(),
1032 },
1033 data: b"hello world".to_vec(),
1034 },
1035 UploadDomainData {
1036 action: DomainAction::Update {
1037 id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
1038 },
1039 data: b"{\"test\": \"test updated\"}".to_vec(),
1040 },
1041 ];
1042
1043 let result = upload_v1(
1045 &domain.domain.domain_server.url,
1046 &domain.get_access_token(),
1047 &domain_id,
1048 upload_data,
1049 )
1050 .await
1051 .expect("upload_v1 failed");
1052
    // Expect one metadata entry per uploaded item. Only the newly created
    // item is deleted again; the pre-existing updated item is left in place.
1053 assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
1054 for data in result {
1055 if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
1056 assert_eq!(data.name, "test_upload");
1057 delete_by_id(
1058 &domain.domain.domain_server.url,
1059 &domain.get_access_token(),
1060 &domain_id,
1061 &data.id,
1062 )
1063 .await
1064 .expect("delete_by_id failed");
1065 }
1066 }
1067 }
1068}
1068}