1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
/// Describes a domain server by the URL it is reachable at.
#[derive(Clone, Debug)]
pub struct DomainServer {
    /// URL of the domain server.
    pub url: String,
}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct DomainDataMetadata {
18 pub id: String,
19 pub domain_id: String,
20 pub name: String,
21 pub data_type: String,
22 pub size: u64,
23 pub created_at: String,
24 pub updated_at: String,
25}
26
27#[derive(Debug, Deserialize, Serialize)]
28pub struct DomainData {
29 pub metadata: DomainDataMetadata,
31 pub data: Vec<u8>,
32}
33
34#[derive(Debug, Serialize, Clone, Deserialize)]
35pub struct UpdateDomainData {
36 pub id: String,
37}
38
39#[derive(Debug, Serialize, Clone, Deserialize)]
40pub struct CreateDomainData {
41 pub name: String,
42 pub data_type: String,
43}
44
45#[derive(Debug, Serialize, Deserialize)]
46#[serde(untagged)]
47pub enum DomainAction {
48 Create(CreateDomainData),
49 Update(UpdateDomainData),
50}
51
52#[derive(Debug, Serialize, Deserialize)]
53pub struct UploadDomainData {
54 #[serde(flatten)]
55 pub action: DomainAction,
56 pub data: Vec<u8>,
57}
58
59#[derive(Debug, Serialize, Deserialize)]
60pub struct DownloadQuery {
61 pub ids: Vec<String>,
62 pub name: Option<String>,
63 pub data_type: Option<String>,
64}
65
66#[derive(Debug, Deserialize)]
67struct ListDomainDataMetadata {
68 pub data: Vec<DomainDataMetadata>,
69}
70
71pub async fn download_by_id(
72 url: &str,
73 client_id: &str,
74 access_token: &str,
75 domain_id: &str,
76 id: &str,
77) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
78 let response = Client::new()
79 .get(&format!(
80 "{}/api/v1/domains/{}/data/{}?raw=true",
81 url, domain_id, id
82 ))
83 .bearer_auth(access_token)
84 .header("posemesh-client-id", client_id)
85 .send()
86 .await?;
87
88 if response.status().is_success() {
89 let data = response.bytes().await?;
90 Ok(data.to_vec())
91 } else {
92 let status = response.status();
93 let text = response
94 .text()
95 .await
96 .unwrap_or_else(|_| "Unknown error".to_string());
97 Err(format!(
98 "Failed to download data by id. Status: {} - {}",
99 status, text
100 )
101 .into())
102 }
103}
104
105pub async fn download_metadata_v1(
106 url: &str,
107 client_id: &str,
108 access_token: &str,
109 domain_id: &str,
110 query: &DownloadQuery,
111) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
112 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
113 if response.status().is_success() {
114 let data = response.json::<ListDomainDataMetadata>().await?;
115 Ok(data.data)
116 } else {
117 let status = response.status();
118 let text = response
119 .text()
120 .await
121 .unwrap_or_else(|_| "Unknown error".to_string());
122 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
123 }
124}
125
126pub async fn download_v1(
127 url: &str,
128 client_id: &str,
129 access_token: &str,
130 domain_id: &str,
131 query: &DownloadQuery,
132 with_data: bool,
133) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
134 let mut params = HashMap::new();
135
136 if let Some(name) = &query.name {
137 params.insert("name", name.clone());
138 }
139 if let Some(data_type) = &query.data_type {
140 params.insert("data_type", data_type.clone());
141 }
142 let ids = {
143 if !query.ids.is_empty() {
144 let ids = query.ids.join(",");
145 if params.is_empty() {
146 &format!("?ids={}", ids)
147 } else {
148 &format!("?ids={}", ids)
149 }
150 } else {
151 ""
152 }
153 };
154
155 let response = Client::new()
156 .get(&format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
157 .bearer_auth(access_token)
158 .header(
159 "Accept",
160 if with_data {
161 "multipart/form-data"
162 } else {
163 "application/json"
164 },
165 )
166 .header("posemesh-client-id", client_id)
167 .query(¶ms)
168 .send()
169 .await?;
170
171 if response.status().is_success() {
172 Ok(response)
173 } else {
174 let status = response.status();
175 let text = response
176 .text()
177 .await
178 .unwrap_or_else(|_| "Unknown error".to_string());
179 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
180 }
181}
182
183pub async fn download_v1_stream(
184 url: &str,
185 client_id: &str,
186 access_token: &str,
187 domain_id: &str,
188 query: &DownloadQuery,
189) -> Result<
190 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
191 Box<dyn std::error::Error + Send + Sync>,
192> {
193 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
194
195 let (mut tx, rx) =
196 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
197
198 let boundary = match response
199 .headers()
200 .get("content-type")
201 .and_then(|ct| ct.to_str().ok())
202 .and_then(|ct| {
203 if ct.starts_with("multipart/form-data; boundary=") {
204 Some(ct.split("boundary=").nth(1)?.to_string())
205 } else {
206 None
207 }
208 }) {
209 Some(b) => b,
210 None => {
211 tracing::error!("Invalid content-type header");
212 let _ = tx.close().await;
213 return Err("Invalid content-type header".into());
214 }
215 };
216
217 spawn(async move {
218 let stream = response.bytes_stream();
219 handle_domain_data_stream(tx, stream, &boundary).await;
220 });
221
222 Ok(rx)
223}
224
225pub async fn delete_by_id(
226 url: &str,
227 access_token: &str,
228 domain_id: &str,
229 id: &str,
230) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
231 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
232 let client = Client::new();
233 let resp = client
234 .delete(&endpoint)
235 .bearer_auth(access_token)
236 .send()
237 .await?;
238
239 if resp.status().is_success() {
240 Ok(())
241 } else {
242 let err = resp
243 .text()
244 .await
245 .unwrap_or_else(|_| "Unknown error".to_string());
246 Err(format!("Delete failed with status: {}", err).into())
247 }
248}
249
250#[cfg(not(target_family = "wasm"))]
251pub async fn upload_v1_stream(
252 url: &str,
253 access_token: &str,
254 domain_id: &str,
255 mut rx: mpsc::Receiver<UploadDomainData>,
256) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
257 use futures::channel::oneshot;
258
259 let boundary = "boundary";
260
261 let (mut create_tx, create_rx) = mpsc::channel(100);
262 let (mut update_tx, update_rx) = mpsc::channel(100);
263
264 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
265 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
266
267 let url = url.to_string();
268 let url_2 = url.clone();
269 let access_token = access_token.to_string();
270 let domain_id = domain_id.to_string();
271 let access_token_2 = access_token.clone();
272 let domain_id_2 = domain_id.clone();
273
274 let (create_signal, create_signal_rx) = oneshot::channel::<
275 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
276 >();
277 let (update_signal, update_signal_rx) = oneshot::channel::<
278 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
279 >();
280
281 spawn(async move {
282 let create_response =
283 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
284 if let Err(Err(e)) = create_signal.send(create_response) {
285 tracing::error!("Failed to send create response: {}", e);
286 }
287 });
288
289 spawn(async move {
290 let update_response =
291 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
292 if let Err(Err(e)) = update_signal.send(update_response) {
293 tracing::error!("Failed to send update response: {}", e);
294 }
295 });
296
297 while let Some(datum) = rx.next().await {
298 match datum.action {
299 DomainAction::Create(create) => {
300 let create_data = write_create_body(boundary, &create, &datum.data);
301 create_tx.clone().send(create_data).await?;
302 }
303 DomainAction::Update(update) => {
304 let update_data = write_update_body(boundary, &update, &datum.data);
305 update_tx.send(update_data).await?;
306 }
307 }
308 }
309 update_tx
310 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
311 .await?;
312 create_tx
313 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
314 .await?;
315 update_tx.close().await?;
316 create_tx.close().await?;
317
318 let mut data = {
319 if let Ok(res) = create_signal_rx.await {
320 match res {
321 Ok(d) => d,
322 Err(e) => return Err(e),
323 }
324 } else {
325 return Err("create cancelled".into());
326 }
327 };
328
329 if let Ok(res) = update_signal_rx.await {
330 match res {
331 Ok(d) => data.extend(d),
332 Err(e) => return Err(e),
333 }
334 } else {
335 return Err("update cancelled".into());
336 }
337
338 Ok(data)
339}
340
341async fn update_v1(
342 url: &str,
343 access_token: &str,
344 domain_id: &str,
345 boundary: &str,
346 body: Body,
347) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
348 let update_response = Client::new()
349 .put(&format!("{}/api/v1/domains/{}/data", url, domain_id))
350 .bearer_auth(access_token)
351 .header(
352 "Content-Type",
353 &format!("multipart/form-data; boundary={}", boundary),
354 )
355 .body(body)
356 .send()
357 .await?;
358
359 if update_response.status().is_success() {
360 let data = update_response
361 .json::<ListDomainDataMetadata>()
362 .await
363 .unwrap();
364 Ok(data.data)
365 } else {
366 let err = update_response
367 .text()
368 .await
369 .unwrap_or_else(|_| "Unknown error".to_string());
370 Err(format!("Update failed with status: {}", err).into())
371 }
372}
373
374async fn create_v1(
375 url: &str,
376 access_token: &str,
377 domain_id: &str,
378 boundary: &str,
379 body: Body,
380) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
381 let create_response = Client::new()
382 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
383 .bearer_auth(access_token)
384 .header(
385 "Content-Type",
386 &format!("multipart/form-data; boundary={}", boundary),
387 )
388 .body(body)
389 .send()
390 .await?;
391
392 if create_response.status().is_success() {
393 let data = create_response
394 .json::<ListDomainDataMetadata>()
395 .await
396 .unwrap();
397 Ok(data.data)
398 } else {
399 let err = create_response
400 .text()
401 .await
402 .unwrap_or_else(|_| "Unknown error".to_string());
403 Err(format!("Create failed with status: {}", err).into())
404 }
405}
406
407fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
408 let create_bytes = format!(
409 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
410 boundary, data.name, data.data_type
411 );
412 let mut create_data = create_bytes.into_bytes();
413 create_data.extend_from_slice(data_bytes);
414 create_data.extend_from_slice("\r\n".as_bytes());
415 create_data
416}
417
418fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
419 let update_bytes = format!(
420 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
421 boundary, data.id
422 );
423 let mut update_data = update_bytes.into_bytes();
424 update_data.extend_from_slice(data_bytes);
425 update_data.extend_from_slice("\r\n".as_bytes());
426 update_data
427}
428
429pub async fn upload_v1(
430 url: &str,
431 access_token: &str,
432 domain_id: &str,
433 data: Vec<UploadDomainData>,
434) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
435 let boundary = "boundary";
436
437 let mut create_body = Vec::new();
438 let mut update_body = Vec::new();
439 let mut to_update = false;
440 let mut to_create = false;
441
442 for datum in data {
444 match datum.action {
445 DomainAction::Create(create) => {
446 to_create = true;
447 let create_data = write_create_body(boundary, &create, &datum.data);
448 create_body.extend_from_slice(&create_data);
449 }
450 DomainAction::Update(update) => {
451 to_update = true;
452 let update_data = write_update_body(boundary, &update, &datum.data);
453 update_body.extend_from_slice(&update_data);
454 }
455 }
456 }
457
458 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
459 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
460
461 let create_body = Body::from(create_body);
462 let update_body = Body::from(update_body);
463 let mut res = Vec::new();
464
465 if to_create {
466 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
467 }
468 if to_update {
469 let update_response =
470 update_v1(url, access_token, domain_id, boundary, update_body).await?;
471 if !update_response.is_empty() {
472 res.extend(update_response);
473 }
474 }
475
476 Ok(res)
477}
478
479fn parse_headers(
480 headers_slice: &[u8],
481) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
482 let headers_str = String::from_utf8_lossy(headers_slice);
483 let mut domain_data = None;
484
485 for line in headers_str.lines() {
486 if line.trim().is_empty() {
487 break;
488 }
489 if let Some((key, value)) = line.split_once(':') {
490 let key = key.trim().to_lowercase();
491 if key == "content-disposition" {
492 let mut parsed_domain_data = DomainData {
493 metadata: DomainDataMetadata {
494 id: String::new(),
495 domain_id: String::new(),
496 name: String::new(),
497 data_type: String::new(),
498 size: 0,
499 created_at: String::new(),
500 updated_at: String::new(),
501 },
502 data: Vec::new(),
503 };
504 for part in value.split(';') {
505 let part = part.trim();
506 if let Some((key, value)) = part.split_once('=') {
507 let key = key.trim();
508 let value = value.trim().trim_matches('"');
509 match key {
510 "id" => parsed_domain_data.metadata.id = value.to_string(),
511 "domain-id" => {
512 parsed_domain_data.metadata.domain_id = value.to_string()
513 }
514 "name" => parsed_domain_data.metadata.name = value.to_string(),
515 "data-type" => {
516 parsed_domain_data.metadata.data_type = value.to_string()
517 }
518 "size" => parsed_domain_data.metadata.size = value.parse()?,
519 "created-at" => {
520 parsed_domain_data.metadata.created_at = value.to_string()
521 }
522 "updated-at" => {
523 parsed_domain_data.metadata.updated_at = value.to_string()
524 }
525 _ => {}
526 }
527 }
528 }
529 domain_data = Some(parsed_domain_data);
530 }
531 }
532 }
533
534 if let Some(domain_data) = domain_data {
535 Ok(domain_data)
536 } else {
537 Err("Missing content-disposition header".into())
538 }
539}
540
/// Returns the offset of the first occurrence of `boundary` in `data`.
///
/// Returns `None` when `boundary` does not occur, when `data` is shorter
/// than `boundary`, or when `boundary` is empty (an empty delimiter can
/// never mark a part).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    // `windows(0)` panics, so reject the empty needle up front.
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
547
/// Returns the offset just past the first blank line in `data`, i.e. where
/// the body starts.
///
/// Both `\r\n\r\n` and `\n\n` terminate the headers; whichever occurs first
/// wins, so a CRLF pair later in the body cannot shadow an earlier LF-only
/// blank line. Returns `None` when no blank line is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}
557
558async fn handle_domain_data_stream(
559 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
560 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
561 boundary: &str,
562) {
563 use futures::pin_mut;
564
565 let mut buffer = Vec::new();
566 let mut current_domain_data: Option<DomainData> = None;
567 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
568
569 pin_mut!(stream);
570
571 while let Some(chunk_result) = stream.next().await {
572 let chunk = match chunk_result {
574 Ok(c) if c.is_empty() => {
575 tx.close().await.ok();
576 return;
577 }
578 Ok(c) => c,
579 Err(e) => {
580 let _ = tx.send(Err(e.into())).await;
581 return;
582 }
583 };
584
585 buffer.extend_from_slice(&chunk);
586
587 if let Some(mut domain_data) = current_domain_data.take() {
589 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
590 if buffer.len() >= expected_size {
591 domain_data.data.extend_from_slice(&buffer[..expected_size]);
592 buffer.drain(..expected_size);
593 if tx.send(Ok(domain_data)).await.is_err() {
594 return;
595 }
596 } else {
597 domain_data.data.extend_from_slice(&buffer);
598 buffer.clear();
599 current_domain_data = Some(domain_data);
600 continue;
601 }
602 }
603
604 loop {
606 let boundary_pos = match find_boundary(&buffer, &boundary_bytes) {
608 Some(pos) => pos,
609 None => break, };
611
612 let header_end = match find_headers_end(&buffer[boundary_pos..]) {
614 Some(end) => end,
615 None => break, };
617
618 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
619 let part_headers = parse_headers(headers_slice);
620
621 let mut domain_data = match part_headers {
622 Ok(data) => data,
623 Err(e) => {
624 tracing::error!("Failed to parse headers: {:?}", e);
625 return;
626 }
627 };
628
629 buffer.drain(..boundary_pos + header_end);
631
632 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
633 if buffer.len() >= expected_size {
634 domain_data.data.extend_from_slice(&buffer[..expected_size]);
635 buffer.drain(..expected_size);
636 if tx.send(Ok(domain_data)).await.is_err() {
637 return;
638 }
639 } else {
640 domain_data.data.extend_from_slice(&buffer);
641 buffer.clear();
642 current_domain_data = Some(domain_data);
643 break;
644 }
645
646 }
648 }
649}
650
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};

    use super::*;

    /// Boundary used by all multipart stream fixtures below.
    const BOUNDARY: &str = "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda";

    /// Loads the test configuration and the `DOMAIN_ID` to run against.
    fn get_config() -> (Config, String) {
        let local_env = std::path::Path::new("../.env.local");
        if local_env.exists() {
            dotenvy::from_filename("../.env.local").ok();
            dotenvy::dotenv().ok();
        }
        let config = Config::from_env().unwrap();
        let domain_id = std::env::var("DOMAIN_ID").unwrap();
        (config, domain_id)
    }

    /// Feeds `chunks` through `handle_domain_data_stream` and collects every
    /// item it produced, panicking on the first error item.
    async fn run_stream(chunks: Vec<&'static [u8]>) -> Vec<DomainData> {
        let (tx, rx) = mpsc::channel(10);
        let stream = tokio_stream::iter(
            chunks
                .into_iter()
                .map(|chunk| Ok::<Bytes, reqwest::Error>(Bytes::from_static(chunk))),
        );

        handle_domain_data_stream(tx, stream, BOUNDARY).await;

        rx.collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect()
    }

    /// Asserts that both fixture parts were parsed with the expected payloads.
    fn assert_both_parts(output: &[DomainData]) {
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    #[test]
    fn test_find_boundary_found() {
        let data = b"random--boundary--data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), Some(6));
    }

    #[test]
    fn test_find_boundary_not_found() {
        let data = b"random-data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), None);
    }

    #[test]
    fn test_find_headers_end_crlf() {
        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
        assert_eq!(find_headers_end(data), Some(36));
    }

    #[test]
    fn test_find_headers_end_lf() {
        let data = b"header1: value1\nheader2: value2\n\nbody";
        assert_eq!(find_headers_end(data), Some(33));
    }

    #[test]
    fn test_find_headers_end_none() {
        let data = b"header1: value1\nheader2: value2\nbody";
        assert_eq!(find_headers_end(data), None);
    }

    #[test]
    fn test_parse_headers_success() {
        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_ok());
        let metadata = parsed.unwrap().metadata;
        assert_eq!(metadata.id, "123");
        assert_eq!(metadata.domain_id, "abc");
        assert_eq!(metadata.name, "test");
        assert_eq!(metadata.data_type, "type");
        assert_eq!(metadata.size, 42);
        assert_eq!(metadata.created_at, "2024-01-01T00:00:00Z");
        assert_eq!(metadata.updated_at, "2024-01-02T00:00:00Z");
    }

    #[test]
    fn test_parse_headers_missing_content_disposition() {
        let headers = b"content-type: application/octet-stream\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_err());
    }

    #[tokio::test]
    async fn test_a_chunk_contains_multiple_data() {
        let payload: &'static [u8] = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
 "#;

        let output = run_stream(vec![payload]).await;
        assert_both_parts(&output);
    }

    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_part() {
        let payload: &'static [u8] = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
        let payload2: &'static [u8] = br#"

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;

        let output = run_stream(vec![payload, payload2]).await;
        assert_both_parts(&output);
    }

    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_header() {
        let payload: &'static [u8] = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
 "#;
        let payload2: &'static [u8] = br#"
e: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;

        let output = run_stream(vec![payload, payload2]).await;
        assert_both_parts(&output);
    }

    #[tokio::test]
    async fn test_chunk_size_doesnt_cover_the_whole_data() {
        let payload: &'static [u8] = br#"
 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"#;
        let payload2: &'static [u8] = br#""}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;

        let output = run_stream(vec![payload, payload2]).await;
        assert_eq!(output.len(), 2);
        assert_eq!(
            std::str::from_utf8(&output[1].data).unwrap(),
            "{\"test\": \"test updated\"}"
        );
        assert_eq!(
            std::str::from_utf8(&output[0].data).unwrap(),
            "{\"test\": \"test\"}"
        );
    }

    /// Integration test: needs the credentials and `DOMAIN_ID` read by
    /// `get_config` and talks to the real services.
    #[tokio::test]
    async fn test_upload_v1_with_user_dds_access_token() {
        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};

        // Id of the pre-existing item that the update part targets.
        let existing_id = "a84a36e5-312b-4f80-974a-06f5d19c1e16";

        let (config, domain_id) = get_config();

        let mut discovery =
            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
        discovery
            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
            .await
            .expect("sign_in_with_auki_account failed");
        let domain = discovery
            .auth_domain(&domain_id)
            .await
            .expect("get_domain failed");

        let create_item = UploadDomainData {
            action: DomainAction::Create(CreateDomainData {
                name: "test_upload".to_string(),
                data_type: "test".to_string(),
            }),
            data: b"hello world".to_vec(),
        };
        let update_item = UploadDomainData {
            action: DomainAction::Update(UpdateDomainData {
                id: existing_id.to_string(),
            }),
            data: b"{\"test\": \"test updated\"}".to_vec(),
        };
        let upload_data = vec![create_item, update_item];

        let result = upload_v1(
            &domain.domain.domain_server.url,
            &domain.get_access_token(),
            &domain_id,
            upload_data,
        )
        .await
        .expect("upload_v1 failed");

        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
        for data in result {
            if data.id == existing_id {
                continue;
            }
            // Everything else was created by this test: verify, then clean up.
            assert_eq!(data.name, "test_upload");
            delete_by_id(
                &domain.domain.domain_server.url,
                &domain.get_access_token(),
                &domain_id,
                &data.id,
            )
            .await
            .expect("delete_by_id failed");
        }
    }
}