1use bytes::Bytes;
2use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
3use reqwest::{Body, Client, Response};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6#[cfg(not(target_family = "wasm"))]
7use tokio::spawn;
8#[cfg(target_family = "wasm")]
9use wasm_bindgen_futures::spawn_local as spawn;
10
/// Metadata describing one piece of data stored in a domain.
///
/// Serialized and deserialized with serde using the field names as written.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainDataMetadata {
    /// Identifier of the data item.
    pub id: String,
    /// Identifier of the domain the item belongs to.
    pub domain_id: String,
    /// Name of the data item.
    pub name: String,
    /// Free-form type label of the data item.
    pub data_type: String,
    /// Payload size; the multipart reader in this file uses it as the number of payload
    /// bytes to collect for the item.
    pub size: u64,
    /// Creation timestamp, kept as the string that was received.
    pub created_at: String,
    /// Last-update timestamp, kept as the string that was received.
    pub updated_at: String,
}
21
/// A data item together with its payload bytes.
#[derive(Debug, Deserialize, Serialize)]
pub struct DomainData {
    /// Descriptive metadata of the item.
    pub metadata: DomainDataMetadata,
    /// Raw payload bytes of the item.
    pub data: Vec<u8>,
}
28
/// Identifies an existing data item whose payload should be replaced.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct UpdateDomainData {
    /// Identifier of the item to update; written as the `id` parameter of the multipart
    /// part's `Content-Disposition` header.
    pub id: String,
}
33
/// Describes a new data item to create.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CreateDomainData {
    /// Name of the new item; written as the `name` parameter of the multipart part's
    /// `Content-Disposition` header.
    pub name: String,
    /// Type label of the new item; written as the `data-type` parameter of the same header.
    pub data_type: String,
}
39
/// What to do with an uploaded payload: create a new item or update an existing one.
///
/// The enum is `#[serde(untagged)]`, so no variant tag is written or expected; the variant is
/// recognised from the fields that are present, trying the variants in declaration order.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DomainAction {
    /// Create a new item from a name and a data type.
    Create(CreateDomainData),
    /// Update the item with the given id.
    Update(UpdateDomainData),
}
46
/// One upload request: an action (create or update) plus the payload bytes.
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadDomainData {
    /// The action to perform; `#[serde(flatten)]` places its fields at the same level as `data`.
    #[serde(flatten)]
    pub action: DomainAction,
    /// Payload bytes to upload.
    pub data: Vec<u8>,
}
53
/// Filters applied when listing or downloading domain data.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadQuery {
    /// Item ids to fetch; `download_v1` joins them with commas into an `ids` query parameter
    /// and omits the parameter when the list is empty.
    pub ids: Vec<String>,
    /// Optional `name` query parameter; not sent when `None`.
    pub name: Option<String>,
    /// Optional `data_type` query parameter; not sent when `None`.
    pub data_type: Option<String>,
}
60
/// JSON envelope used by this file to decode list/create/update responses: an object whose
/// `data` field holds the metadata entries.
#[derive(Debug, Deserialize)]
struct ListDomainDataMetadata {
    /// The metadata entries carried by the response.
    pub data: Vec<DomainDataMetadata>,
}
65
66pub async fn download_by_id(
67 url: &str,
68 client_id: &str,
69 access_token: &str,
70 domain_id: &str,
71 id: &str,
72) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
73 let response = Client::new()
74 .get(&format!(
75 "{}/api/v1/domains/{}/data/{}?raw=true",
76 url, domain_id, id
77 ))
78 .bearer_auth(access_token)
79 .header("posemesh-client-id", client_id)
80 .send()
81 .await?;
82
83 if response.status().is_success() {
84 let data = response.bytes().await?;
85 Ok(data.to_vec())
86 } else {
87 let status = response.status();
88 let text = response
89 .text()
90 .await
91 .unwrap_or_else(|_| "Unknown error".to_string());
92 Err(format!(
93 "Failed to download data by id. Status: {} - {}",
94 status, text
95 )
96 .into())
97 }
98}
99
100pub async fn download_metadata_v1(
101 url: &str,
102 client_id: &str,
103 access_token: &str,
104 domain_id: &str,
105 query: &DownloadQuery,
106) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
107 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
108 if response.status().is_success() {
109 let data = response.json::<ListDomainDataMetadata>().await?;
110 Ok(data.data)
111 } else {
112 let status = response.status();
113 let text = response
114 .text()
115 .await
116 .unwrap_or_else(|_| "Unknown error".to_string());
117 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
118 }
119}
120
121pub async fn download_v1(
122 url: &str,
123 client_id: &str,
124 access_token: &str,
125 domain_id: &str,
126 query: &DownloadQuery,
127 with_data: bool,
128) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
129 let mut params = HashMap::new();
130
131 if let Some(name) = &query.name {
132 params.insert("name", name.clone());
133 }
134 if let Some(data_type) = &query.data_type {
135 params.insert("data_type", data_type.clone());
136 }
137 let ids = {
138 if !query.ids.is_empty() {
139 let ids = query.ids.join(",");
140 if params.is_empty() {
141 &format!("?ids={}", ids)
142 } else {
143 &format!("?ids={}", ids)
144 }
145 } else {
146 ""
147 }
148 };
149
150 let response = Client::new()
151 .get(&format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
152 .bearer_auth(access_token)
153 .header(
154 "Accept",
155 if with_data {
156 "multipart/form-data"
157 } else {
158 "application/json"
159 },
160 )
161 .header("posemesh-client-id", client_id)
162 .query(¶ms)
163 .send()
164 .await?;
165
166 if response.status().is_success() {
167 Ok(response)
168 } else {
169 let status = response.status();
170 let text = response
171 .text()
172 .await
173 .unwrap_or_else(|_| "Unknown error".to_string());
174 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
175 }
176}
177
178pub async fn download_v1_stream(
179 url: &str,
180 client_id: &str,
181 access_token: &str,
182 domain_id: &str,
183 query: &DownloadQuery,
184) -> Result<
185 mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
186 Box<dyn std::error::Error + Send + Sync>,
187> {
188 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
189
190 let (mut tx, rx) =
191 mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
192
193 let boundary = match response
194 .headers()
195 .get("content-type")
196 .and_then(|ct| ct.to_str().ok())
197 .and_then(|ct| {
198 if ct.starts_with("multipart/form-data; boundary=") {
199 Some(ct.split("boundary=").nth(1)?.to_string())
200 } else {
201 None
202 }
203 }) {
204 Some(b) => b,
205 None => {
206 tracing::error!("Invalid content-type header");
207 let _ = tx.close().await;
208 return Err("Invalid content-type header".into());
209 }
210 };
211
212 spawn(async move {
213 let stream = response.bytes_stream();
214 handle_domain_data_stream(tx, stream, &boundary).await;
215 });
216
217 Ok(rx)
218}
219
220pub async fn delete_by_id(
221 url: &str,
222 access_token: &str,
223 domain_id: &str,
224 id: &str,
225) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
226 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
227 let client = Client::new();
228 let resp = client
229 .delete(&endpoint)
230 .bearer_auth(access_token)
231 .send()
232 .await?;
233
234 if resp.status().is_success() {
235 Ok(())
236 } else {
237 let err = resp
238 .text()
239 .await
240 .unwrap_or_else(|_| "Unknown error".to_string());
241 Err(format!("Delete failed with status: {}", err).into())
242 }
243}
244
245#[cfg(not(target_family = "wasm"))]
246pub async fn upload_v1_stream(
247 url: &str,
248 access_token: &str,
249 domain_id: &str,
250 mut rx: mpsc::Receiver<UploadDomainData>,
251) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
252 use futures::channel::oneshot;
253
254 let boundary = "boundary";
255
256 let (mut create_tx, create_rx) = mpsc::channel(100);
257 let (mut update_tx, update_rx) = mpsc::channel(100);
258
259 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
260 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
261
262 let url = url.to_string();
263 let url_2 = url.clone();
264 let access_token = access_token.to_string();
265 let domain_id = domain_id.to_string();
266 let access_token_2 = access_token.clone();
267 let domain_id_2 = domain_id.clone();
268
269 let (create_signal, create_signal_rx) = oneshot::channel::<
270 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
271 >();
272 let (update_signal, update_signal_rx) = oneshot::channel::<
273 Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>>,
274 >();
275
276 spawn(async move {
277 let create_response =
278 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
279 if let Err(Err(e)) = create_signal.send(create_response) {
280 tracing::error!("Failed to send create response: {}", e);
281 }
282 });
283
284 spawn(async move {
285 let update_response =
286 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
287 if let Err(Err(e)) = update_signal.send(update_response) {
288 tracing::error!("Failed to send update response: {}", e);
289 }
290 });
291
292 while let Some(datum) = rx.next().await {
293 match datum.action {
294 DomainAction::Create(create) => {
295 let create_data = write_create_body(boundary, &create, &datum.data);
296 create_tx.clone().send(create_data).await?;
297 }
298 DomainAction::Update(update) => {
299 let update_data = write_update_body(boundary, &update, &datum.data);
300 update_tx.send(update_data).await?;
301 }
302 }
303 }
304 update_tx
305 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
306 .await?;
307 create_tx
308 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
309 .await?;
310 update_tx.close().await?;
311 create_tx.close().await?;
312
313 let mut data = {
314 if let Ok(res) = create_signal_rx.await {
315 match res {
316 Ok(d) => d,
317 Err(e) => return Err(e),
318 }
319 } else {
320 return Err("create cancelled".into());
321 }
322 };
323
324 if let Ok(res) = update_signal_rx.await {
325 match res {
326 Ok(d) => data.extend(d),
327 Err(e) => return Err(e),
328 }
329 } else {
330 return Err("update cancelled".into());
331 }
332
333 Ok(data)
334}
335
336async fn update_v1(
337 url: &str,
338 access_token: &str,
339 domain_id: &str,
340 boundary: &str,
341 body: Body,
342) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
343 let update_response = Client::new()
344 .put(&format!("{}/api/v1/domains/{}/data", url, domain_id))
345 .bearer_auth(access_token)
346 .header(
347 "Content-Type",
348 &format!("multipart/form-data; boundary={}", boundary),
349 )
350 .body(body)
351 .send()
352 .await?;
353
354 if update_response.status().is_success() {
355 let data = update_response
356 .json::<ListDomainDataMetadata>()
357 .await
358 .unwrap();
359 Ok(data.data)
360 } else {
361 let err = update_response
362 .text()
363 .await
364 .unwrap_or_else(|_| "Unknown error".to_string());
365 Err(format!("Update failed with status: {}", err).into())
366 }
367}
368
369async fn create_v1(
370 url: &str,
371 access_token: &str,
372 domain_id: &str,
373 boundary: &str,
374 body: Body,
375) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
376 let create_response = Client::new()
377 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
378 .bearer_auth(access_token)
379 .header(
380 "Content-Type",
381 &format!("multipart/form-data; boundary={}", boundary),
382 )
383 .body(body)
384 .send()
385 .await?;
386
387 if create_response.status().is_success() {
388 let data = create_response
389 .json::<ListDomainDataMetadata>()
390 .await
391 .unwrap();
392 Ok(data.data)
393 } else {
394 let err = create_response
395 .text()
396 .await
397 .unwrap_or_else(|_| "Unknown error".to_string());
398 Err(format!("Create failed with status: {}", err).into())
399 }
400}
401
402fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
403 let create_bytes = format!(
404 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
405 boundary, data.name, data.data_type
406 );
407 let mut create_data = create_bytes.into_bytes();
408 create_data.extend_from_slice(data_bytes);
409 create_data.extend_from_slice("\r\n".as_bytes());
410 create_data
411}
412
413fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
414 let update_bytes = format!(
415 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
416 boundary, data.id
417 );
418 let mut update_data = update_bytes.into_bytes();
419 update_data.extend_from_slice(data_bytes);
420 update_data.extend_from_slice("\r\n".as_bytes());
421 update_data
422}
423
424pub async fn upload_v1(
425 url: &str,
426 access_token: &str,
427 domain_id: &str,
428 data: Vec<UploadDomainData>,
429) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
430 let boundary = "boundary";
431
432 let mut create_body = Vec::new();
433 let mut update_body = Vec::new();
434 let mut to_update = false;
435 let mut to_create = false;
436
437 for datum in data {
439 match datum.action {
440 DomainAction::Create(create) => {
441 to_create = true;
442 let create_data = write_create_body(boundary, &create, &datum.data);
443 create_body.extend_from_slice(&create_data);
444 }
445 DomainAction::Update(update) => {
446 to_update = true;
447 let update_data = write_update_body(boundary, &update, &datum.data);
448 update_body.extend_from_slice(&update_data);
449 }
450 }
451 }
452
453 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
454 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
455
456 let create_body = Body::from(create_body);
457 let update_body = Body::from(update_body);
458 let mut res = Vec::new();
459
460 if to_create {
461 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
462 }
463 if to_update {
464 let update_response =
465 update_v1(url, access_token, domain_id, boundary, update_body).await?;
466 if !update_response.is_empty() {
467 res.extend(update_response);
468 }
469 }
470
471 Ok(res)
472}
473
474fn parse_headers(
475 headers_slice: &[u8],
476) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
477 let headers_str = String::from_utf8_lossy(headers_slice);
478 let mut domain_data = None;
479
480 for line in headers_str.lines() {
481 if line.trim().is_empty() {
482 break;
483 }
484 if let Some((key, value)) = line.split_once(':') {
485 let key = key.trim().to_lowercase();
486 if key == "content-disposition" {
487 let mut parsed_domain_data = DomainData {
488 metadata: DomainDataMetadata {
489 id: String::new(),
490 domain_id: String::new(),
491 name: String::new(),
492 data_type: String::new(),
493 size: 0,
494 created_at: String::new(),
495 updated_at: String::new(),
496 },
497 data: Vec::new(),
498 };
499 for part in value.split(';') {
500 let part = part.trim();
501 if let Some((key, value)) = part.split_once('=') {
502 let key = key.trim();
503 let value = value.trim().trim_matches('"');
504 match key {
505 "id" => parsed_domain_data.metadata.id = value.to_string(),
506 "domain-id" => {
507 parsed_domain_data.metadata.domain_id = value.to_string()
508 }
509 "name" => parsed_domain_data.metadata.name = value.to_string(),
510 "data-type" => {
511 parsed_domain_data.metadata.data_type = value.to_string()
512 }
513 "size" => parsed_domain_data.metadata.size = value.parse()?,
514 "created-at" => {
515 parsed_domain_data.metadata.created_at = value.to_string()
516 }
517 "updated-at" => {
518 parsed_domain_data.metadata.updated_at = value.to_string()
519 }
520 _ => {}
521 }
522 }
523 }
524 domain_data = Some(parsed_domain_data);
525 }
526 }
527 }
528
529 if let Some(domain_data) = domain_data {
530 Ok(domain_data)
531 } else {
532 Err("Missing content-disposition header".into())
533 }
534}
535
/// Returns the index of the first occurrence of `boundary` in `data`, if any.
///
/// An empty `boundary` can never delimit a part, so it is reported as not found (this also
/// avoids the panic `slice::windows` raises for a window size of zero).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
542
/// Returns the offset just past the blank line that ends a header section, if present.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as terminators. When both occur in `data`, the
/// one that ends first is used, so an LF-terminated header section is not overshot just
/// because the bytes that follow it happen to contain `\r\n\r\n`.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}
552
553async fn handle_domain_data_stream(
554 mut tx: mpsc::Sender<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
555 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
556 boundary: &str,
557) {
558 use futures::pin_mut;
559
560 let mut buffer = Vec::new();
561 let mut current_domain_data: Option<DomainData> = None;
562 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
563
564 pin_mut!(stream);
565
566 while let Some(chunk_result) = stream.next().await {
567 let chunk = match chunk_result {
569 Ok(c) if c.is_empty() => {
570 tx.close().await.ok();
571 return;
572 }
573 Ok(c) => c,
574 Err(e) => {
575 let _ = tx.send(Err(e.into())).await;
576 return;
577 }
578 };
579
580 buffer.extend_from_slice(&chunk);
581
582 if let Some(mut domain_data) = current_domain_data.take() {
584 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
585 if buffer.len() >= expected_size {
586 domain_data.data.extend_from_slice(&buffer[..expected_size]);
587 buffer.drain(..expected_size);
588 if tx.send(Ok(domain_data)).await.is_err() {
589 return;
590 }
591 } else {
592 domain_data.data.extend_from_slice(&buffer);
593 buffer.clear();
594 current_domain_data = Some(domain_data);
595 continue;
596 }
597 }
598
599 loop {
601 let boundary_pos = match find_boundary(&buffer, &boundary_bytes) {
603 Some(pos) => pos,
604 None => break, };
606
607 let header_end = match find_headers_end(&buffer[boundary_pos..]) {
609 Some(end) => end,
610 None => break, };
612
613 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
614 let part_headers = parse_headers(headers_slice);
615
616 let mut domain_data = match part_headers {
617 Ok(data) => data,
618 Err(e) => {
619 tracing::error!("Failed to parse headers: {:?}", e);
620 return;
621 }
622 };
623
624 buffer.drain(..boundary_pos + header_end);
626
627 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
628 if buffer.len() >= expected_size {
629 domain_data.data.extend_from_slice(&buffer[..expected_size]);
630 buffer.drain(..expected_size);
631 if tx.send(Ok(domain_data)).await.is_err() {
632 return;
633 }
634 } else {
635 domain_data.data.extend_from_slice(&buffer);
636 buffer.clear();
637 current_domain_data = Some(domain_data);
638 break;
639 }
640
641 }
643 }
644}
645
// Unit tests for the multipart parsing helpers, plus one integration test that talks to a
// real server using credentials taken from the environment.
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{auth::TokenCache, config::Config, discovery::DiscoveryService};

    use super::*;

    // Loads the configuration from the environment (after reading `../.env.local` and the
    // default dotenv file when `../.env.local` exists) and returns it together with the
    // `DOMAIN_ID` environment variable. Panics if either is unavailable.
    fn get_config() -> (Config, String) {
        if std::path::Path::new("../.env.local").exists() {
            dotenvy::from_filename("../.env.local").ok();
            dotenvy::dotenv().ok();
        }
        let config = Config::from_env().unwrap();
        (config, std::env::var("DOMAIN_ID").unwrap())
    }

    // The boundary is found at the index of its first occurrence.
    #[test]
    fn test_find_boundary_found() {
        let data = b"random--boundary--data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), Some(6));
    }

    // A boundary that does not occur yields `None`.
    #[test]
    fn test_find_boundary_not_found() {
        let data = b"random-data";
        let boundary = b"--boundary";
        assert_eq!(find_boundary(data, boundary), None);
    }

    // CRLF-terminated headers: the offset points just past `\r\n\r\n`.
    #[test]
    fn test_find_headers_end_crlf() {
        let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
        assert_eq!(find_headers_end(data), Some(36));
    }

    // LF-terminated headers: the offset points just past `\n\n`.
    #[test]
    fn test_find_headers_end_lf() {
        let data = b"header1: value1\nheader2: value2\n\nbody";
        assert_eq!(find_headers_end(data), Some(33));
    }

    // Without a blank line there is no header end.
    #[test]
    fn test_find_headers_end_none() {
        let data = b"header1: value1\nheader2: value2\nbody";
        assert_eq!(find_headers_end(data), None);
    }

    // Every recognised Content-Disposition parameter lands in the matching metadata field.
    #[test]
    fn test_parse_headers_success() {
        let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_ok());
        let domain_data = parsed.unwrap();
        assert_eq!(domain_data.metadata.id, "123");
        assert_eq!(domain_data.metadata.domain_id, "abc");
        assert_eq!(domain_data.metadata.name, "test");
        assert_eq!(domain_data.metadata.data_type, "type");
        assert_eq!(domain_data.metadata.size, 42);
        assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
        assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
    }

    // Headers without Content-Disposition are rejected.
    #[test]
    fn test_parse_headers_missing_content_disposition() {
        let headers = b"content-type: application/octet-stream\r\n\r\n";
        let parsed = super::parse_headers(headers);
        assert!(parsed.is_err());
    }

    // A single chunk carrying two complete parts yields two items, in order.
    #[tokio::test]
    async fn test_a_chunk_contains_multiple_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
        "#;
        let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first chunk ends before the blank line that terminates the first part's headers;
    // the second chunk supplies the rest.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_part() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
        "#;
        let payload2 = br#"

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The header section of the first part is split across two chunks.
    #[tokio::test]
    async fn test_chunk_size_is_smaller_than_header() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream
        "#;
        let payload2 = br#"
e: application/octet-stream

{"test": "test"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
        assert_eq!(output[0].data, b"{\"test\": \"test\"}");
    }

    // The first chunk ends in the middle of the first part's payload; the remainder arrives
    // in the second chunk.
    #[tokio::test]
    async fn test_chunk_size_doesnt_cover_the_whole_data() {
        let (tx, rx) = mpsc::channel(10);

        let payload = br#"
        --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
Content-Type: application/octet-stream

{"test": "test"#;
        let payload2 = br#""}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
Content-Type: application/octet-stream

{"test": "test updated"}
--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
"#;
        let stream = tokio_stream::iter(vec![
            Ok(Bytes::from_static(payload)),
            Ok(Bytes::from_static(payload2)),
        ]);

        handle_domain_data_stream(
            tx,
            stream,
            "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
        )
        .await;

        let output: Vec<DomainData> = rx
            .collect::<Vec<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(output.len(), 2);
        assert_eq!(
            std::str::from_utf8(&output[1].data).unwrap(),
            "{\"test\": \"test updated\"}"
        );
        assert_eq!(
            std::str::from_utf8(&output[0].data).unwrap(),
            "{\"test\": \"test\"}"
        );
    }

    // Integration test: signs in, uploads one create and one update through `upload_v1`,
    // then deletes the item it created. Needs the environment read by `get_config` and a
    // reachable server.
    #[tokio::test]
    async fn test_upload_v1_with_user_dds_access_token() {
        use crate::domain_data::{CreateDomainData, DomainAction, UploadDomainData};

        let (config, domain_id) = get_config();

        let mut discovery =
            DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);
        discovery
            .sign_in_with_auki_account(&config.email.unwrap(), &config.password.unwrap(), false)
            .await
            .expect("sign_in_with_auki_account failed");
        let domain = discovery
            .auth_domain(&domain_id)
            .await
            .expect("get_domain failed");
        let upload_data = vec![
            UploadDomainData {
                action: DomainAction::Create(CreateDomainData {
                    name: "test_upload".to_string(),
                    data_type: "test".to_string(),
                }),
                data: b"hello world".to_vec(),
            },
            UploadDomainData {
                action: DomainAction::Update(UpdateDomainData {
                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
                }),
                data: b"{\"test\": \"test updated\"}".to_vec(),
            },
        ];

        let result = upload_v1(
            &domain.domain.domain_server.url,
            &domain.get_access_token(),
            &domain_id,
            upload_data,
        )
        .await
        .expect("upload_v1 failed");

        assert_eq!(result.len(), 2, "No metadata returned from upload_v1");
        // Clean up: delete only the freshly created item, not the pre-existing updated one.
        for data in result {
            if data.id != "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
                assert_eq!(data.name, "test_upload");
                delete_by_id(
                    &domain.domain.domain_server.url,
                    &domain.get_access_token(),
                    &domain_id,
                    &data.id,
                )
                .await
                .expect("delete_by_id failed");
            }
        }
    }
}