1use bytes::Bytes;
2use futures::lock::Mutex;
3use futures::{SinkExt, Stream, channel::mpsc, stream::StreamExt};
4use reqwest::{Body, Client, Response, StatusCode};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::OnceLock;
8#[cfg(not(target_family = "wasm"))]
9use tokio::spawn;
10#[cfg(target_family = "wasm")]
11use wasm_bindgen_futures::spawn_local as spawn;
12
13use posemesh_utils::now_unix_secs;
14
15use crate::errors::{AukiErrorResponse, DomainError};
16
17#[derive(Debug, Deserialize, Clone)]
18struct InfoResponse {
19 upload: InfoUpload,
20}
21
22#[derive(Debug, Deserialize, Clone)]
23struct InfoUpload {
24 request_max_bytes: i64,
25 multipart: InfoMultipart,
26}
27
28#[derive(Debug, Deserialize, Clone)]
29struct InfoMultipart {
30 enabled: bool,
31}
32
33#[derive(Debug, Clone)]
34pub struct UploadInfoV1 {
35 pub request_max_bytes: i64,
36 pub multipart_enabled: bool,
37}
38
39#[derive(Debug, Clone)]
40struct InfoCacheEntry {
41 value: Option<UploadInfoV1>,
42 expires_at: u64,
43}
44
45const INFO_CACHE_TTL_SECS: u64 = 60;
46static INFO_CACHE: OnceLock<Mutex<HashMap<String, InfoCacheEntry>>> = OnceLock::new();
47
48fn info_cache() -> &'static Mutex<HashMap<String, InfoCacheEntry>> {
49 INFO_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
50}
51
52async fn fetch_info_v1(url: &str) -> Result<Option<UploadInfoV1>, ()> {
53 let resp = Client::new()
54 .get(format!("{}/api/v1/info", url))
55 .send()
56 .await
57 .map_err(|_| ())?;
58
59 if resp.status() == StatusCode::NOT_FOUND {
60 return Ok(None);
61 }
62 if !resp.status().is_success() {
63 return Err(());
64 }
65 let info = resp.json::<InfoResponse>().await.map_err(|_| ())?;
66 Ok(Some(UploadInfoV1 {
67 request_max_bytes: info.upload.request_max_bytes,
68 multipart_enabled: info.upload.multipart.enabled,
69 }))
70}
71
72pub async fn get_upload_info_v1(url: &str) -> Option<UploadInfoV1> {
73 let now = now_unix_secs();
74 {
75 let cache = info_cache().lock().await;
76 if let Some(entry) = cache.get(url)
77 && entry.expires_at > now
78 {
79 return entry.value.clone();
80 }
81 }
82
83 let fetched = match fetch_info_v1(url).await {
84 Ok(v) => v,
85 Err(_) => return None,
86 };
87
88 let mut cache = info_cache().lock().await;
89 cache.retain(|_, entry| entry.expires_at > now);
90 cache.insert(
91 url.to_string(),
92 InfoCacheEntry {
93 value: fetched.clone(),
94 expires_at: now.saturating_add(INFO_CACHE_TTL_SECS),
95 },
96 );
97 fetched
98}
99
100fn is_unsupported_endpoint_status(status: StatusCode) -> bool {
101 status == StatusCode::NOT_FOUND
102 || status == StatusCode::METHOD_NOT_ALLOWED
103 || status == StatusCode::NOT_IMPLEMENTED
104}
105
106fn is_unsupported_endpoint_error(err: &DomainError) -> bool {
107 match err {
108 DomainError::AukiErrorResponse(resp) => is_unsupported_endpoint_status(resp.status),
109 _ => false,
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct DomainDataMetadata {
115 pub id: String,
116 pub domain_id: String,
117 pub name: String,
118 pub data_type: String,
119 pub size: u64,
120 pub created_at: String,
121 pub updated_at: String,
122}
123
124#[derive(Debug, Deserialize, Serialize)]
125pub struct DomainData {
126 pub metadata: DomainDataMetadata,
128 pub data: Vec<u8>,
129}
130
131#[derive(Debug, Serialize, Clone, Deserialize)]
132pub struct UpdateDomainData {
133 pub id: String,
134}
135
136#[derive(Debug, Serialize, Clone, Deserialize)]
137pub struct CreateDomainData {
138 pub name: String,
139 pub data_type: String,
140}
141
142#[derive(Debug, Serialize, Deserialize)]
143#[serde(untagged)]
144pub enum DomainAction {
145 Create { name: String, data_type: String },
146 Update { id: String },
147}
148
149#[derive(Debug, Serialize)]
150struct InitiateMultipartRequest {
151 name: String,
152 data_type: String,
153 size: Option<i64>,
154 content_type: Option<String>,
155 existing_id: Option<String>,
156}
157
158#[derive(Debug, Deserialize)]
159struct InitiateMultipartResponse {
160 upload_id: String,
161 part_size: i64,
162}
163
164#[derive(Debug, Serialize)]
165struct CompletedPart {
166 part_number: i32,
167 etag: String,
168}
169
170#[derive(Debug, Serialize)]
171struct CompleteMultipartRequest {
172 parts: Vec<CompletedPart>,
173}
174
175#[derive(Debug, Deserialize)]
176struct UploadPartResult {
177 etag: String,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181pub struct UploadDomainData {
182 #[serde(flatten)]
183 pub action: DomainAction,
184 pub data: Vec<u8>,
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188pub struct DownloadQuery {
189 pub ids: Vec<String>,
190 pub name: Option<String>,
191 pub data_type: Option<String>,
192}
193
194#[derive(Debug, Deserialize)]
195struct ListDomainDataMetadata {
196 pub data: Vec<DomainDataMetadata>,
197}
198
199pub async fn download_by_id(
200 url: &str,
201 client_id: &str,
202 access_token: &str,
203 domain_id: &str,
204 id: &str,
205) -> Result<Vec<u8>, DomainError> {
206 let response = Client::new()
207 .get(format!(
208 "{}/api/v1/domains/{}/data/{}?raw=true",
209 url, domain_id, id
210 ))
211 .bearer_auth(access_token)
212 .header("posemesh-client-id", client_id)
213 .send()
214 .await?;
215
216 if response.status().is_success() {
217 let data = response.bytes().await?;
218 Ok(data.to_vec())
219 } else {
220 let status = response.status();
221 let error = response
222 .text()
223 .await
224 .unwrap_or_else(|_| "Unknown error".to_string());
225 Err(AukiErrorResponse {
226 status,
227 error: format!("Failed to download data by id. {}", error),
228 }
229 .into())
230 }
231}
232
233pub async fn download_metadata_v1(
234 url: &str,
235 client_id: &str,
236 access_token: &str,
237 domain_id: &str,
238 query: &DownloadQuery,
239) -> Result<Vec<DomainDataMetadata>, DomainError> {
240 let response = download_v1(url, client_id, access_token, domain_id, query, false).await?;
241 if response.status().is_success() {
242 let data = response.json::<ListDomainDataMetadata>().await?;
243 Ok(data.data)
244 } else {
245 let status = response.status();
246 let text = response
247 .text()
248 .await
249 .unwrap_or_else(|_| "Unknown error".to_string());
250 Err(AukiErrorResponse {
251 status,
252 error: format!("Failed to download metadata. {}", text),
253 }
254 .into())
255 }
256}
257
258pub async fn download_v1(
259 url: &str,
260 client_id: &str,
261 access_token: &str,
262 domain_id: &str,
263 query: &DownloadQuery,
264 with_data: bool,
265) -> Result<Response, DomainError> {
266 let mut params = HashMap::new();
267
268 if let Some(name) = &query.name {
269 params.insert("name", name.clone());
270 }
271 if let Some(data_type) = &query.data_type {
272 params.insert("data_type", data_type.clone());
273 }
274 let ids = {
275 if !query.ids.is_empty() {
276 let ids = query.ids.join(",");
277 if params.is_empty() {
278 &format!("?ids={}", ids)
279 } else {
280 &format!("&ids={}", ids)
281 }
282 } else {
283 ""
284 }
285 };
286
287 let response = Client::new()
288 .get(format!("{}/api/v1/domains/{}/data{}", url, domain_id, ids))
289 .bearer_auth(access_token)
290 .header(
291 "Accept",
292 if with_data {
293 "multipart/form-data"
294 } else {
295 "application/json"
296 },
297 )
298 .header("posemesh-client-id", client_id)
299 .query(¶ms)
300 .send()
301 .await?;
302
303 if response.status().is_success() {
304 Ok(response)
305 } else {
306 let status = response.status();
307 let text = response
308 .text()
309 .await
310 .unwrap_or_else(|_| "Unknown error".to_string());
311 Err(AukiErrorResponse {
312 status,
313 error: format!("Failed to download data. {}", text),
314 }
315 .into())
316 }
317}
318
319pub async fn download_v1_stream(
320 url: &str,
321 client_id: &str,
322 access_token: &str,
323 domain_id: &str,
324 query: &DownloadQuery,
325) -> Result<mpsc::Receiver<Result<DomainData, DomainError>>, DomainError> {
326 let response = download_v1(url, client_id, access_token, domain_id, query, true).await?;
327
328 let (mut tx, rx) = mpsc::channel::<Result<DomainData, DomainError>>(100);
329
330 let boundary = match response
331 .headers()
332 .get("content-type")
333 .and_then(|ct| ct.to_str().ok())
334 .and_then(|ct| {
335 if ct.starts_with("multipart/form-data; boundary=") {
336 Some(ct.split("boundary=").nth(1)?.to_string())
337 } else {
338 None
339 }
340 }) {
341 Some(b) => b,
342 None => {
343 tracing::error!("Invalid content-type header");
344 let _ = tx.close().await;
345 return Err(DomainError::InvalidContentTypeHeader);
346 }
347 };
348
349 spawn(async move {
350 let stream = response.bytes_stream();
351 handle_domain_data_stream(tx, stream, &boundary).await;
352 });
353
354 Ok(rx)
355}
356
357pub async fn delete_by_id(
358 url: &str,
359 access_token: &str,
360 domain_id: &str,
361 id: &str,
362) -> Result<(), DomainError> {
363 let endpoint = format!("{}/api/v1/domains/{}/data/{}", url, domain_id, id);
364 let client = Client::new();
365 let resp = client
366 .delete(&endpoint)
367 .bearer_auth(access_token)
368 .send()
369 .await?;
370
371 if resp.status().is_success() {
372 Ok(())
373 } else {
374 let status = resp.status();
375 let err = resp
376 .text()
377 .await
378 .unwrap_or_else(|_| "Unknown error".to_string());
379 Err(AukiErrorResponse {
380 status,
381 error: format!("Failed to delete data by id. {}", err),
382 }
383 .into())
384 }
385}
386
387async fn initiate_domain_data_multipart_upload(
388 client: &Client,
389 url: &str,
390 access_token: &str,
391 domain_id: &str,
392 req: &InitiateMultipartRequest,
393) -> Result<InitiateMultipartResponse, DomainError> {
394 let resp = client
395 .post(format!(
396 "{}/api/v1/domains/{}/data/multipart?uploads",
397 url, domain_id
398 ))
399 .bearer_auth(access_token)
400 .header("Content-Type", "application/json")
401 .json(req)
402 .send()
403 .await?;
404
405 if resp.status().is_success() {
406 Ok(resp.json::<InitiateMultipartResponse>().await?)
407 } else {
408 let status = resp.status();
409 let err = resp
410 .text()
411 .await
412 .unwrap_or_else(|_| "Unknown error".to_string());
413 Err(AukiErrorResponse {
414 status,
415 error: format!("Failed to initiate multipart upload. {}", err),
416 }
417 .into())
418 }
419}
420
421async fn upload_domain_data_multipart_part(
422 client: &Client,
423 url: &str,
424 access_token: &str,
425 domain_id: &str,
426 upload_id: &str,
427 part_number: i32,
428 bytes: Bytes,
429) -> Result<UploadPartResult, DomainError> {
430 let resp = client
431 .put(format!(
432 "{}/api/v1/domains/{}/data/multipart?uploadId={}&partNumber={}",
433 url, domain_id, upload_id, part_number
434 ))
435 .bearer_auth(access_token)
436 .header("Content-Type", "application/octet-stream")
437 .body(bytes)
438 .send()
439 .await?;
440
441 if resp.status().is_success() {
442 Ok(resp.json::<UploadPartResult>().await?)
443 } else {
444 let status = resp.status();
445 let err = resp
446 .text()
447 .await
448 .unwrap_or_else(|_| "Unknown error".to_string());
449 Err(AukiErrorResponse {
450 status,
451 error: format!("Failed to upload multipart part. {}", err),
452 }
453 .into())
454 }
455}
456
457async fn complete_domain_data_multipart_upload(
458 client: &Client,
459 url: &str,
460 access_token: &str,
461 domain_id: &str,
462 upload_id: &str,
463 parts: Vec<CompletedPart>,
464) -> Result<DomainDataMetadata, DomainError> {
465 let resp = client
466 .post(format!(
467 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
468 url, domain_id, upload_id
469 ))
470 .bearer_auth(access_token)
471 .header("Content-Type", "application/json")
472 .json(&CompleteMultipartRequest { parts })
473 .send()
474 .await?;
475
476 if resp.status().is_success() {
477 Ok(resp.json::<DomainDataMetadata>().await?)
478 } else {
479 let status = resp.status();
480 let err = resp
481 .text()
482 .await
483 .unwrap_or_else(|_| "Unknown error".to_string());
484 Err(AukiErrorResponse {
485 status,
486 error: format!("Failed to complete multipart upload. {}", err),
487 }
488 .into())
489 }
490}
491
492async fn abort_domain_data_multipart_upload(
493 client: &Client,
494 url: &str,
495 access_token: &str,
496 domain_id: &str,
497 upload_id: &str,
498) -> Result<(), DomainError> {
499 let resp = client
500 .delete(format!(
501 "{}/api/v1/domains/{}/data/multipart?uploadId={}",
502 url, domain_id, upload_id
503 ))
504 .bearer_auth(access_token)
505 .send()
506 .await?;
507
508 if resp.status().is_success() {
509 Ok(())
510 } else {
511 let status = resp.status();
512 let err = resp
513 .text()
514 .await
515 .unwrap_or_else(|_| "Unknown error".to_string());
516 Err(AukiErrorResponse {
517 status,
518 error: format!("Failed to abort multipart upload. {}", err),
519 }
520 .into())
521 }
522}
523
524async fn upload_domain_data_multipart_bytes(
525 url: &str,
526 access_token: &str,
527 domain_id: &str,
528 action: DomainAction,
529 bytes: Bytes,
530) -> Result<DomainDataMetadata, DomainError> {
531 if bytes.is_empty() {
532 return Err(DomainError::InvalidRequest(
533 "multipart upload requires non-empty data",
534 ));
535 }
536
537 let (name, data_type, existing_id) = match action {
538 DomainAction::Create { name, data_type } => (name, data_type, None),
539 DomainAction::Update { id } => ("".to_string(), "".to_string(), Some(id)),
540 };
541
542 let client = Client::new();
543 let init_res = initiate_domain_data_multipart_upload(
544 &client,
545 url,
546 access_token,
547 domain_id,
548 &InitiateMultipartRequest {
549 name,
550 data_type,
551 size: Some(bytes.len() as i64),
552 content_type: Some("application/octet-stream".to_string()),
553 existing_id,
554 },
555 )
556 .await?;
557
558 let part_size = usize::try_from(init_res.part_size)
559 .map_err(|_| DomainError::InvalidRequest("invalid multipart part_size"))?;
560 if part_size == 0 {
561 return Err(DomainError::InvalidRequest("invalid multipart part_size"));
562 }
563
564 let upload_id = init_res.upload_id;
565 let mut parts = Vec::new();
566
567 let upload_res = async {
568 let mut offset = 0usize;
569 let mut part_number: i32 = 1;
570
571 while offset < bytes.len() {
572 let end = std::cmp::min(offset + part_size, bytes.len());
573 let chunk = bytes.slice(offset..end);
574
575 let res = upload_domain_data_multipart_part(
576 &client,
577 url,
578 access_token,
579 domain_id,
580 &upload_id,
581 part_number,
582 chunk,
583 )
584 .await?;
585
586 parts.push(CompletedPart {
587 part_number,
588 etag: res.etag,
589 });
590
591 offset = end;
592 part_number = part_number
593 .checked_add(1)
594 .ok_or(DomainError::InvalidRequest(
595 "multipart upload too many parts",
596 ))?;
597 }
598
599 complete_domain_data_multipart_upload(
600 &client,
601 url,
602 access_token,
603 domain_id,
604 &upload_id,
605 parts,
606 )
607 .await
608 }
609 .await;
610
611 if upload_res.is_err() {
612 let _ =
613 abort_domain_data_multipart_upload(&client, url, access_token, domain_id, &upload_id)
614 .await;
615 }
616
617 upload_res
618}
619
620#[cfg(not(target_family = "wasm"))]
621pub async fn upload_v1_stream(
622 url: &str,
623 access_token: &str,
624 domain_id: &str,
625 mut rx: mpsc::Receiver<UploadDomainData>,
626) -> Result<Vec<DomainDataMetadata>, DomainError> {
627 use futures::channel::oneshot;
628
629 let boundary = "boundary";
630
631 let info = get_upload_info_v1(url).await;
632 let request_max_bytes = info.as_ref().map(|i| i.request_max_bytes).unwrap_or(0);
633 let multipart_enabled = info.as_ref().map(|i| i.multipart_enabled).unwrap_or(false);
634
635 if request_max_bytes <= 0 || !multipart_enabled {
637 let (mut create_tx, create_rx) = mpsc::channel(100);
638 let (mut update_tx, update_rx) = mpsc::channel(100);
639
640 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
641 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
642
643 let url = url.to_string();
644 let url_2 = url.clone();
645 let access_token = access_token.to_string();
646 let domain_id = domain_id.to_string();
647 let access_token_2 = access_token.clone();
648 let domain_id_2 = domain_id.clone();
649
650 let (create_signal, create_signal_rx) =
651 oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
652 let (update_signal, update_signal_rx) =
653 oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
654
655 spawn(async move {
656 let create_response =
657 create_v1(&url, &access_token, &domain_id, boundary, create_body).await;
658 if let Err(Err(e)) = create_signal.send(create_response) {
659 tracing::error!("Failed to send create response: {}", e);
660 }
661 });
662
663 spawn(async move {
664 let update_response =
665 update_v1(&url_2, &access_token_2, &domain_id_2, boundary, update_body).await;
666 if let Err(Err(e)) = update_signal.send(update_response) {
667 tracing::error!("Failed to send update response: {}", e);
668 }
669 });
670
671 while let Some(datum) = rx.next().await {
672 match datum.action {
673 DomainAction::Create { name, data_type } => {
674 let create_data = write_create_body(
675 boundary,
676 &CreateDomainData { name, data_type },
677 &datum.data,
678 );
679 create_tx.clone().send(create_data).await?;
680 }
681 DomainAction::Update { id } => {
682 let update_data =
683 write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
684 update_tx.send(update_data).await?;
685 }
686 }
687 }
688 update_tx
689 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
690 .await?;
691 create_tx
692 .send(format!("--{}--\r\n", boundary).as_bytes().to_vec())
693 .await?;
694 update_tx.close().await?;
695 create_tx.close().await?;
696
697 let mut data = {
698 match create_signal_rx.await {
699 Ok(res) => match res {
700 Ok(d) => d,
701 Err(e) => return Err(e),
702 },
703 Err(e) => return Err(DomainError::StreamCancelled(e)),
704 }
705 };
706
707 match update_signal_rx.await {
708 Ok(res) => match res {
709 Ok(d) => data.extend(d),
710 Err(e) => return Err(e),
711 },
712 Err(e) => return Err(DomainError::StreamCancelled(e)),
713 }
714
715 return Ok(data);
716 }
717
718 let closing = format!("--{}--\r\n", boundary).into_bytes();
719 let closing_len = closing.len();
720
721 struct Batch {
722 tx: mpsc::Sender<Vec<u8>>,
723 done: oneshot::Receiver<Result<Vec<DomainDataMetadata>, DomainError>>,
724 size: usize,
725 }
726
727 let mut create_batch: Option<Batch> = None;
728 let mut update_batch: Option<Batch> = None;
729 let mut create_done = Vec::new();
730 let mut update_done = Vec::new();
731 let mut create_res = Vec::new();
732 let mut update_res = Vec::new();
733
734 let spawn_create_batch = |url: String, access_token: String, domain_id: String| {
735 let (tx, rx) = mpsc::channel(100);
736 let body = Body::wrap_stream(rx.map(Ok::<Vec<u8>, std::io::Error>));
737 let (signal, signal_rx) =
738 oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
739 spawn(async move {
740 let create_response = create_v1(&url, &access_token, &domain_id, boundary, body).await;
741 if let Err(Err(e)) = signal.send(create_response) {
742 tracing::error!("Failed to send create response: {}", e);
743 }
744 });
745 Batch {
746 tx,
747 done: signal_rx,
748 size: 0,
749 }
750 };
751
752 let spawn_update_batch = |url: String, access_token: String, domain_id: String| {
753 let (tx, rx) = mpsc::channel(100);
754 let body = Body::wrap_stream(rx.map(Ok::<Vec<u8>, std::io::Error>));
755 let (signal, signal_rx) =
756 oneshot::channel::<Result<Vec<DomainDataMetadata>, DomainError>>();
757 spawn(async move {
758 let update_response = update_v1(&url, &access_token, &domain_id, boundary, body).await;
759 if let Err(Err(e)) = signal.send(update_response) {
760 tracing::error!("Failed to send update response: {}", e);
761 }
762 });
763 Batch {
764 tx,
765 done: signal_rx,
766 size: 0,
767 }
768 };
769
770 let base_url = url.to_string();
771 let token = access_token.to_string();
772 let did = domain_id.to_string();
773
774 while let Some(datum) = rx.next().await {
775 let bytes = Bytes::from(datum.data);
776 match datum.action {
777 DomainAction::Create { name, data_type } => {
778 let header = format!(
779 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
780 boundary, name, data_type
781 );
782 let part_len = header.len() + bytes.len() + 2;
783
784 let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
785 if multipart_enabled && !fits_alone {
786 match upload_domain_data_multipart_bytes(
787 &base_url,
788 &token,
789 &did,
790 DomainAction::Create {
791 name: name.clone(),
792 data_type: data_type.clone(),
793 },
794 bytes.clone(),
795 )
796 .await
797 {
798 Ok(meta) => {
799 create_res.push(meta);
800 continue;
801 }
802 Err(e) => {
803 if is_unsupported_endpoint_error(&e) {
804 let mut body = Vec::with_capacity(part_len + closing.len());
806 body.extend_from_slice(header.as_bytes());
807 body.extend_from_slice(bytes.as_ref());
808 body.extend_from_slice("\r\n".as_bytes());
809 body.extend_from_slice(&closing);
810 let res =
811 create_v1(&base_url, &token, &did, boundary, Body::from(body))
812 .await?;
813 create_res.extend(res);
814 continue;
815 }
816 return Err(e);
817 }
818 }
819 }
820
821 if create_batch.is_none() {
822 create_batch = Some(spawn_create_batch(
823 base_url.clone(),
824 token.clone(),
825 did.clone(),
826 ));
827 }
828 let mut batch = create_batch.take().unwrap();
829 if batch.size > 0
830 && (batch.size + part_len + closing_len) as i64 > request_max_bytes
831 {
832 batch.tx.send(closing.clone()).await?;
833 batch.tx.close().await?;
834 create_done.push(batch.done);
835 batch = spawn_create_batch(base_url.clone(), token.clone(), did.clone());
836 }
837 let mut part = Vec::with_capacity(part_len);
838 part.extend_from_slice(header.as_bytes());
839 part.extend_from_slice(bytes.as_ref());
840 part.extend_from_slice("\r\n".as_bytes());
841 batch.size += part.len();
842 batch.tx.send(part).await?;
843 create_batch = Some(batch);
844 }
845 DomainAction::Update { id } => {
846 let header = format!(
847 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
848 boundary, id
849 );
850 let part_len = header.len() + bytes.len() + 2;
851
852 let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
853 if multipart_enabled && !fits_alone {
854 match upload_domain_data_multipart_bytes(
855 &base_url,
856 &token,
857 &did,
858 DomainAction::Update { id: id.clone() },
859 bytes.clone(),
860 )
861 .await
862 {
863 Ok(meta) => {
864 update_res.push(meta);
865 continue;
866 }
867 Err(e) => {
868 if is_unsupported_endpoint_error(&e) {
869 let mut body = Vec::with_capacity(part_len + closing.len());
870 body.extend_from_slice(header.as_bytes());
871 body.extend_from_slice(bytes.as_ref());
872 body.extend_from_slice("\r\n".as_bytes());
873 body.extend_from_slice(&closing);
874 let res =
875 update_v1(&base_url, &token, &did, boundary, Body::from(body))
876 .await?;
877 update_res.extend(res);
878 continue;
879 }
880 return Err(e);
881 }
882 }
883 }
884
885 if update_batch.is_none() {
886 update_batch = Some(spawn_update_batch(
887 base_url.clone(),
888 token.clone(),
889 did.clone(),
890 ));
891 }
892 let mut batch = update_batch.take().unwrap();
893 if batch.size > 0
894 && (batch.size + part_len + closing_len) as i64 > request_max_bytes
895 {
896 batch.tx.send(closing.clone()).await?;
897 batch.tx.close().await?;
898 update_done.push(batch.done);
899 batch = spawn_update_batch(base_url.clone(), token.clone(), did.clone());
900 }
901 let mut part = Vec::with_capacity(part_len);
902 part.extend_from_slice(header.as_bytes());
903 part.extend_from_slice(bytes.as_ref());
904 part.extend_from_slice("\r\n".as_bytes());
905 batch.size += part.len();
906 batch.tx.send(part).await?;
907 update_batch = Some(batch);
908 }
909 }
910 }
911
912 if let Some(mut batch) = create_batch {
913 batch.tx.send(closing.clone()).await?;
914 batch.tx.close().await?;
915 create_done.push(batch.done);
916 }
917 if let Some(mut batch) = update_batch {
918 batch.tx.send(closing.clone()).await?;
919 batch.tx.close().await?;
920 update_done.push(batch.done);
921 }
922
923 for done in create_done {
924 match done.await {
925 Ok(Ok(v)) => create_res.extend(v),
926 Ok(Err(e)) => return Err(e),
927 Err(e) => return Err(DomainError::StreamCancelled(e)),
928 }
929 }
930
931 for done in update_done {
932 match done.await {
933 Ok(Ok(v)) => update_res.extend(v),
934 Ok(Err(e)) => return Err(e),
935 Err(e) => return Err(DomainError::StreamCancelled(e)),
936 }
937 }
938
939 let mut out = Vec::new();
940 out.extend(create_res);
941 out.extend(update_res);
942 Ok(out)
943}
944
945async fn update_v1(
946 url: &str,
947 access_token: &str,
948 domain_id: &str,
949 boundary: &str,
950 body: Body,
951) -> Result<Vec<DomainDataMetadata>, DomainError> {
952 let update_response = Client::new()
953 .put(format!("{}/api/v1/domains/{}/data", url, domain_id))
954 .bearer_auth(access_token)
955 .header(
956 "Content-Type",
957 &format!("multipart/form-data; boundary={}", boundary),
958 )
959 .body(body)
960 .send()
961 .await?;
962
963 if update_response.status().is_success() {
964 let data = update_response
965 .json::<ListDomainDataMetadata>()
966 .await
967 .unwrap();
968 Ok(data.data)
969 } else {
970 let status = update_response.status();
971 let err = update_response
972 .text()
973 .await
974 .unwrap_or_else(|_| "Unknown error".to_string());
975 Err(AukiErrorResponse {
976 status,
977 error: format!("Failed to update data. {}", err),
978 }
979 .into())
980 }
981}
982
983async fn create_v1(
984 url: &str,
985 access_token: &str,
986 domain_id: &str,
987 boundary: &str,
988 body: Body,
989) -> Result<Vec<DomainDataMetadata>, DomainError> {
990 let create_response = Client::new()
991 .post(format!("{}/api/v1/domains/{}/data", url, domain_id))
992 .bearer_auth(access_token)
993 .header(
994 "Content-Type",
995 &format!("multipart/form-data; boundary={}", boundary),
996 )
997 .body(body)
998 .send()
999 .await?;
1000
1001 if create_response.status().is_success() {
1002 let data = create_response
1003 .json::<ListDomainDataMetadata>()
1004 .await
1005 .unwrap();
1006 Ok(data.data)
1007 } else {
1008 let status = create_response.status();
1009 let err = create_response
1010 .text()
1011 .await
1012 .unwrap_or_else(|_| "Unknown error".to_string());
1013 Err(AukiErrorResponse {
1014 status,
1015 error: format!("Failed to create data. {}", err),
1016 }
1017 .into())
1018 }
1019}
1020
1021fn write_create_body(boundary: &str, data: &CreateDomainData, data_bytes: &[u8]) -> Vec<u8> {
1022 let create_bytes = format!(
1023 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
1024 boundary, data.name, data.data_type
1025 );
1026 let mut create_data = create_bytes.into_bytes();
1027 create_data.extend_from_slice(data_bytes);
1028 create_data.extend_from_slice("\r\n".as_bytes());
1029 create_data
1030}
1031
1032fn write_update_body(boundary: &str, data: &UpdateDomainData, data_bytes: &[u8]) -> Vec<u8> {
1033 let update_bytes = format!(
1034 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
1035 boundary, data.id
1036 );
1037 let mut update_data = update_bytes.into_bytes();
1038 update_data.extend_from_slice(data_bytes);
1039 update_data.extend_from_slice("\r\n".as_bytes());
1040 update_data
1041}
1042
1043pub async fn upload_v1(
1044 url: &str,
1045 access_token: &str,
1046 domain_id: &str,
1047 data: Vec<UploadDomainData>,
1048) -> Result<Vec<DomainDataMetadata>, DomainError> {
1049 let boundary = "boundary";
1050
1051 let info = get_upload_info_v1(url).await;
1052 let request_max_bytes = info.as_ref().map(|i| i.request_max_bytes).unwrap_or(0);
1053 let multipart_enabled = info.as_ref().map(|i| i.multipart_enabled).unwrap_or(false);
1054
1055 if request_max_bytes <= 0 || !multipart_enabled {
1057 let mut create_body = Vec::new();
1058 let mut update_body = Vec::new();
1059 let mut to_update = false;
1060 let mut to_create = false;
1061
1062 for datum in data {
1063 match datum.action {
1064 DomainAction::Create { name, data_type } => {
1065 to_create = true;
1066 let create_data = write_create_body(
1067 boundary,
1068 &CreateDomainData { name, data_type },
1069 &datum.data,
1070 );
1071 create_body.extend_from_slice(&create_data);
1072 }
1073 DomainAction::Update { id } => {
1074 to_update = true;
1075 let update_data =
1076 write_update_body(boundary, &UpdateDomainData { id }, &datum.data);
1077 update_body.extend_from_slice(&update_data);
1078 }
1079 }
1080 }
1081
1082 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
1083 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
1084
1085 let create_body = Body::from(create_body);
1086 let update_body = Body::from(update_body);
1087 let mut res = Vec::new();
1088
1089 if to_create {
1090 res = create_v1(url, access_token, domain_id, boundary, create_body).await?;
1091 }
1092 if to_update {
1093 let update_response =
1094 update_v1(url, access_token, domain_id, boundary, update_body).await?;
1095 if !update_response.is_empty() {
1096 res.extend(update_response);
1097 }
1098 }
1099
1100 return Ok(res);
1101 }
1102
1103 let closing = format!("--{}--\r\n", boundary).into_bytes();
1104 let closing_len = closing.len();
1105
1106 let mut create_res = Vec::new();
1107 let mut update_res = Vec::new();
1108
1109 let mut create_batch = Vec::new();
1110 let mut update_batch = Vec::new();
1111
1112 let mut create_size = 0usize;
1113 let mut update_size = 0usize;
1114
1115 for datum in data {
1116 let bytes = Bytes::from(datum.data);
1117 match datum.action {
1118 DomainAction::Create { name, data_type } => {
1119 let header = format!(
1120 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
1121 boundary, name, data_type
1122 );
1123 let part_len = header.len() + bytes.len() + 2;
1124 let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
1125
1126 if multipart_enabled && !fits_alone {
1127 if !create_batch.is_empty() {
1128 let mut body = std::mem::take(&mut create_batch);
1129 body.extend_from_slice(&closing);
1130 create_res.extend(
1131 create_v1(url, access_token, domain_id, boundary, Body::from(body))
1132 .await?,
1133 );
1134 create_size = 0;
1135 }
1136 match upload_domain_data_multipart_bytes(
1137 url,
1138 access_token,
1139 domain_id,
1140 DomainAction::Create {
1141 name: name.clone(),
1142 data_type: data_type.clone(),
1143 },
1144 bytes.clone(),
1145 )
1146 .await
1147 {
1148 Ok(meta) => {
1149 create_res.push(meta);
1150 }
1151 Err(e) => {
1152 if is_unsupported_endpoint_error(&e) {
1153 let mut body = Vec::with_capacity(part_len + closing.len());
1155 body.extend_from_slice(header.as_bytes());
1156 body.extend_from_slice(bytes.as_ref());
1157 body.extend_from_slice("\r\n".as_bytes());
1158 body.extend_from_slice(&closing);
1159 create_res.extend(
1160 create_v1(
1161 url,
1162 access_token,
1163 domain_id,
1164 boundary,
1165 Body::from(body),
1166 )
1167 .await?,
1168 );
1169 } else {
1170 return Err(e);
1171 }
1172 }
1173 }
1174 continue;
1175 }
1176
1177 if !create_batch.is_empty()
1178 && (create_size + part_len + closing_len) as i64 > request_max_bytes
1179 {
1180 let mut body = std::mem::take(&mut create_batch);
1181 body.extend_from_slice(&closing);
1182 create_res.extend(
1183 create_v1(url, access_token, domain_id, boundary, Body::from(body)).await?,
1184 );
1185 create_size = 0;
1186 }
1187 create_batch.extend_from_slice(header.as_bytes());
1188 create_batch.extend_from_slice(bytes.as_ref());
1189 create_batch.extend_from_slice("\r\n".as_bytes());
1190 create_size += part_len;
1191 }
1192 DomainAction::Update { id } => {
1193 let header = format!(
1194 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; id=\"{}\"\r\n\r\n",
1195 boundary, id
1196 );
1197 let part_len = header.len() + bytes.len() + 2;
1198 let fits_alone = (part_len + closing_len) as i64 <= request_max_bytes;
1199
1200 if multipart_enabled && !fits_alone {
1201 if !update_batch.is_empty() {
1202 let mut body = std::mem::take(&mut update_batch);
1203 body.extend_from_slice(&closing);
1204 update_res.extend(
1205 update_v1(url, access_token, domain_id, boundary, Body::from(body))
1206 .await?,
1207 );
1208 update_size = 0;
1209 }
1210 match upload_domain_data_multipart_bytes(
1211 url,
1212 access_token,
1213 domain_id,
1214 DomainAction::Update { id: id.clone() },
1215 bytes.clone(),
1216 )
1217 .await
1218 {
1219 Ok(meta) => {
1220 update_res.push(meta);
1221 }
1222 Err(e) => {
1223 if is_unsupported_endpoint_error(&e) {
1224 let mut body = Vec::with_capacity(part_len + closing.len());
1225 body.extend_from_slice(header.as_bytes());
1226 body.extend_from_slice(bytes.as_ref());
1227 body.extend_from_slice("\r\n".as_bytes());
1228 body.extend_from_slice(&closing);
1229 update_res.extend(
1230 update_v1(
1231 url,
1232 access_token,
1233 domain_id,
1234 boundary,
1235 Body::from(body),
1236 )
1237 .await?,
1238 );
1239 } else {
1240 return Err(e);
1241 }
1242 }
1243 }
1244 continue;
1245 }
1246
1247 if !update_batch.is_empty()
1248 && (update_size + part_len + closing_len) as i64 > request_max_bytes
1249 {
1250 let mut body = std::mem::take(&mut update_batch);
1251 body.extend_from_slice(&closing);
1252 update_res.extend(
1253 update_v1(url, access_token, domain_id, boundary, Body::from(body)).await?,
1254 );
1255 update_size = 0;
1256 }
1257 update_batch.extend_from_slice(header.as_bytes());
1258 update_batch.extend_from_slice(bytes.as_ref());
1259 update_batch.extend_from_slice("\r\n".as_bytes());
1260 update_size += part_len;
1261 }
1262 }
1263 }
1264
1265 if !create_batch.is_empty() {
1266 let mut body = create_batch;
1267 body.extend_from_slice(&closing);
1268 create_res
1269 .extend(create_v1(url, access_token, domain_id, boundary, Body::from(body)).await?);
1270 }
1271 if !update_batch.is_empty() {
1272 let mut body = update_batch;
1273 body.extend_from_slice(&closing);
1274 update_res
1275 .extend(update_v1(url, access_token, domain_id, boundary, Body::from(body)).await?);
1276 }
1277
1278 let mut res = Vec::new();
1279 res.extend(create_res);
1280 res.extend(update_res);
1281 Ok(res)
1282}
1283
1284fn parse_headers(
1285 headers_slice: &[u8],
1286) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
1287 let headers_str = String::from_utf8_lossy(headers_slice);
1288 let mut domain_data = None;
1289
1290 for line in headers_str.lines() {
1291 if line.trim().is_empty() {
1292 break;
1293 }
1294 if let Some((key, value)) = line.split_once(':') {
1295 let key = key.trim().to_lowercase();
1296 if key == "content-disposition" {
1297 let mut parsed_domain_data = DomainData {
1298 metadata: DomainDataMetadata {
1299 id: String::new(),
1300 domain_id: String::new(),
1301 name: String::new(),
1302 data_type: String::new(),
1303 size: 0,
1304 created_at: String::new(),
1305 updated_at: String::new(),
1306 },
1307 data: Vec::new(),
1308 };
1309 for part in value.split(';') {
1310 let part = part.trim();
1311 if let Some((key, value)) = part.split_once('=') {
1312 let key = key.trim();
1313 let value = value.trim().trim_matches('"');
1314 match key {
1315 "id" => parsed_domain_data.metadata.id = value.to_string(),
1316 "domain-id" => {
1317 parsed_domain_data.metadata.domain_id = value.to_string()
1318 }
1319 "name" => parsed_domain_data.metadata.name = value.to_string(),
1320 "data-type" => {
1321 parsed_domain_data.metadata.data_type = value.to_string()
1322 }
1323 "size" => parsed_domain_data.metadata.size = value.parse()?,
1324 "created-at" => {
1325 parsed_domain_data.metadata.created_at = value.to_string()
1326 }
1327 "updated-at" => {
1328 parsed_domain_data.metadata.updated_at = value.to_string()
1329 }
1330 _ => {}
1331 }
1332 }
1333 }
1334 domain_data = Some(parsed_domain_data);
1335 }
1336 }
1337 }
1338
1339 if let Some(domain_data) = domain_data {
1340 Ok(domain_data)
1341 } else {
1342 Err("Missing content-disposition header".into())
1343 }
1344}
1345
1346fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
1347 let _data = String::from_utf8_lossy(data);
1348 let _boundary = String::from_utf8_lossy(boundary);
1349 data.windows(boundary.len())
1350 .position(|window| window == boundary)
1351}
1352
1353fn find_headers_end(data: &[u8]) -> Option<usize> {
1354 if let Some(i) = data.windows(4).position(|w| w == b"\r\n\r\n") {
1355 Some(i + 4) } else {
1357 data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2)
1358 }
1359}
1360
1361async fn handle_domain_data_stream(
1362 mut tx: mpsc::Sender<Result<DomainData, DomainError>>,
1363 stream: impl Stream<Item = Result<Bytes, reqwest::Error>>,
1364 boundary: &str,
1365) {
1366 use futures::pin_mut;
1367
1368 let mut buffer = Vec::new();
1369 let mut current_domain_data: Option<DomainData> = None;
1370 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
1371
1372 pin_mut!(stream);
1373
1374 while let Some(chunk_result) = stream.next().await {
1375 let chunk = match chunk_result {
1377 Ok(c) if c.is_empty() => {
1378 tx.close().await.ok();
1379 return;
1380 }
1381 Ok(c) => c,
1382 Err(e) => {
1383 let _ = tx.send(Err(e.into())).await;
1384 return;
1385 }
1386 };
1387
1388 buffer.extend_from_slice(&chunk);
1389
1390 if let Some(mut domain_data) = current_domain_data.take() {
1392 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
1393 if buffer.len() >= expected_size {
1394 domain_data.data.extend_from_slice(&buffer[..expected_size]);
1395 buffer.drain(..expected_size);
1396 if tx.send(Ok(domain_data)).await.is_err() {
1397 return;
1398 }
1399 } else {
1400 domain_data.data.extend_from_slice(&buffer);
1401 buffer.clear();
1402 current_domain_data = Some(domain_data);
1403 continue;
1404 }
1405 }
1406
1407 while let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
1409 let header_end = match find_headers_end(&buffer[boundary_pos..]) {
1411 Some(end) => end,
1412 None => break, };
1414
1415 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
1416 let part_headers = parse_headers(headers_slice);
1417
1418 let mut domain_data = match part_headers {
1419 Ok(data) => data,
1420 Err(e) => {
1421 tracing::error!("Failed to parse headers: {:?}", e);
1422 return;
1423 }
1424 };
1425
1426 buffer.drain(..boundary_pos + header_end);
1428
1429 let expected_size = domain_data.metadata.size as usize - domain_data.data.len();
1430 if buffer.len() >= expected_size {
1431 domain_data.data.extend_from_slice(&buffer[..expected_size]);
1432 buffer.drain(..expected_size);
1433 if tx.send(Ok(domain_data)).await.is_err() {
1434 return;
1435 }
1436 } else {
1437 domain_data.data.extend_from_slice(&buffer);
1438 buffer.clear();
1439 current_domain_data = Some(domain_data);
1440 break;
1441 }
1442 }
1443 }
1444}
1445
1446#[cfg(test)]
1447mod tests {
1448 use super::*;
1449 use bytes::Bytes;
1450
1451 #[test]
1452 fn test_find_boundary_found() {
1453 let data = b"random--boundary--data";
1454 let boundary = b"--boundary";
1455 assert_eq!(find_boundary(data, boundary), Some(6));
1456 }
1457
1458 #[test]
1459 fn test_find_boundary_not_found() {
1460 let data = b"random-data";
1461 let boundary = b"--boundary";
1462 assert_eq!(find_boundary(data, boundary), None);
1463 }
1464
1465 #[test]
1466 fn test_find_headers_end_crlf() {
1467 let data = b"header1: value1\r\nheader2: value2\r\n\r\nbody";
1468 assert_eq!(find_headers_end(data), Some(36));
1469 }
1470
1471 #[test]
1472 fn test_find_headers_end_lf() {
1473 let data = b"header1: value1\nheader2: value2\n\nbody";
1474 assert_eq!(find_headers_end(data), Some(33));
1475 }
1476
1477 #[test]
1478 fn test_find_headers_end_none() {
1479 let data = b"header1: value1\nheader2: value2\nbody";
1480 assert_eq!(find_headers_end(data), None);
1481 }
1482
1483 #[test]
1484 fn test_parse_headers_success() {
1485 let headers = b"content-disposition: form-data; id=\"123\"; domain-id=\"abc\"; name=\"test\"; data-type=\"type\"; size=\"42\"; created-at=\"2024-01-01T00:00:00Z\"; updated-at=\"2024-01-02T00:00:00Z\"\r\n\r\n";
1486 let parsed = super::parse_headers(headers);
1487 assert!(parsed.is_ok());
1488 let domain_data = parsed.unwrap();
1489 assert_eq!(domain_data.metadata.id, "123");
1490 assert_eq!(domain_data.metadata.domain_id, "abc");
1491 assert_eq!(domain_data.metadata.name, "test");
1492 assert_eq!(domain_data.metadata.data_type, "type");
1493 assert_eq!(domain_data.metadata.size, 42);
1494 assert_eq!(domain_data.metadata.created_at, "2024-01-01T00:00:00Z");
1495 assert_eq!(domain_data.metadata.updated_at, "2024-01-02T00:00:00Z");
1496 }
1497
1498 #[test]
1499 fn test_parse_headers_missing_content_disposition() {
1500 let headers = b"content-type: application/octet-stream\r\n\r\n";
1501 let parsed = super::parse_headers(headers);
1502 assert!(parsed.is_err());
1503 }
1504
1505 #[tokio::test]
1506 async fn test_a_chunk_contains_multiple_data() {
1507 let (tx, rx) = mpsc::channel(10);
1508
1509 let payload = br#"
1510 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1511Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1512Content-Type: application/octet-stream
1513
1514{"test": "test"}
1515--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1516Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1517Content-Type: application/octet-stream
1518
1519{"test": "test updated"}
1520--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1521 "#;
1522 let stream = tokio_stream::iter(vec![Ok(Bytes::from_static(payload))]);
1523
1524 handle_domain_data_stream(
1525 tx,
1526 stream,
1527 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1528 )
1529 .await;
1530
1531 let output: Vec<DomainData> = rx
1532 .collect::<Vec<Result<DomainData, DomainError>>>()
1533 .await
1534 .into_iter()
1535 .map(|r| r.unwrap())
1536 .collect();
1537 assert_eq!(output.len(), 2);
1538 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1539 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1540 }
1541
1542 #[tokio::test]
1543 async fn test_chunk_size_is_smaller_than_part() {
1544 let (tx, rx) = mpsc::channel(10);
1545
1546 let payload = br#"
1547 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1548Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1549Content-Type: application/octet-stream
1550 "#;
1551 let payload2 = br#"
1552
1553{"test": "test"}
1554--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1555Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1556Content-Type: application/octet-stream
1557
1558{"test": "test updated"}
1559--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1560"#;
1561 let stream = tokio_stream::iter(vec![
1562 Ok(Bytes::from_static(payload)),
1563 Ok(Bytes::from_static(payload2)),
1564 ]);
1565
1566 handle_domain_data_stream(
1567 tx,
1568 stream,
1569 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1570 )
1571 .await;
1572
1573 let output: Vec<DomainData> = rx
1574 .collect::<Vec<Result<DomainData, DomainError>>>()
1575 .await
1576 .into_iter()
1577 .map(|r| r.unwrap())
1578 .collect();
1579 assert_eq!(output.len(), 2);
1580 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1581 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1582 }
1583
1584 #[tokio::test]
1585 async fn test_chunk_size_is_smaller_than_header() {
1586 let (tx, rx) = mpsc::channel(10);
1587
1588 let payload = br#"
1589 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1590Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1591Content-Type: application/octet-stream
1592 "#;
1593 let payload2 = br#"
1594e: application/octet-stream
1595
1596{"test": "test"}
1597--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1598Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1599Content-Type: application/octet-stream
1600
1601{"test": "test updated"}
1602--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1603"#;
1604 let stream = tokio_stream::iter(vec![
1605 Ok(Bytes::from_static(payload)),
1606 Ok(Bytes::from_static(payload2)),
1607 ]);
1608
1609 handle_domain_data_stream(
1610 tx,
1611 stream,
1612 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1613 )
1614 .await;
1615
1616 let output: Vec<DomainData> = rx
1617 .collect::<Vec<Result<DomainData, DomainError>>>()
1618 .await
1619 .into_iter()
1620 .map(|r| r.unwrap())
1621 .collect();
1622 assert_eq!(output.len(), 2);
1623 assert_eq!(output[1].data, b"{\"test\": \"test updated\"}");
1624 assert_eq!(output[0].data, b"{\"test\": \"test\"}");
1625 }
1626
1627 #[tokio::test]
1628 async fn test_chunk_size_doesnt_cover_the_whole_data() {
1629 let (tx, rx) = mpsc::channel(10);
1630
1631 let payload = br#"
1632 --0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1633Content-Disposition: form-data; name="to be deleted"; data-type="test"; id="3c5bbdbc-65b9-4f11-93b6-a3e535d63990"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="16"; created-at="2025-09-25T02:54:26.124336Z"; updated-at="2025-09-25T02:54:26.124336Z"
1634Content-Type: application/octet-stream
1635
1636{"test": "test"#;
1637 let payload2 = br#""}
1638--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda
1639Content-Disposition: form-data; name="test"; data-type="test"; id="a84a36e5-312b-4f80-974a-06f5d19c1e16"; domain-id="23d60e61-6978-4f6b-a59d-9ffa027755fc"; size="24"; created-at="2025-08-05T10:29:56.448595Z"; updated-at="2025-09-25T02:54:26.154224Z"
1640Content-Type: application/octet-stream
1641
1642{"test": "test updated"}
1643--0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda--
1644"#;
1645 let stream = tokio_stream::iter(vec![
1646 Ok(Bytes::from_static(payload)),
1647 Ok(Bytes::from_static(payload2)),
1648 ]);
1649
1650 handle_domain_data_stream(
1651 tx,
1652 stream,
1653 "0f336dec6f61e466706eb557cda40d8caa86c28df397bd7348766b5b5eda",
1654 )
1655 .await;
1656
1657 let output: Vec<DomainData> = rx
1658 .collect::<Vec<Result<DomainData, DomainError>>>()
1659 .await
1660 .into_iter()
1661 .map(|r| r.unwrap())
1662 .collect();
1663 assert_eq!(output.len(), 2);
1664 assert_eq!(
1665 std::str::from_utf8(&output[1].data).unwrap(),
1666 "{\"test\": \"test updated\"}"
1667 );
1668 assert_eq!(
1669 std::str::from_utf8(&output[0].data).unwrap(),
1670 "{\"test\": \"test\"}"
1671 );
1672 }
1673}