1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::js_sys::Uint8Array;
6use std::collections::HashMap;
7#[cfg(not(target_family = "wasm"))]
8use tokio::spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen_futures::spawn_local as spawn;
11#[cfg(target_family = "wasm")]
12use wasm_bindgen::prelude::*;
13
/// Describes a domain server by its URL.
#[derive(Clone, Debug)]
pub struct DomainServer {
    /// URL of the server.
    pub url: String,
}
18
19#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
20#[derive(Debug, Deserialize, Serialize)]
21pub struct DomainData {
22 pub id: String,
23 pub domain_id: String,
24 pub name: String,
25 pub data_type: String,
26 pub size: u64,
27 pub created_at: String,
28 pub updated_at: String,
29 #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
30 pub data: Vec<u8>,
31}
32
33#[cfg_attr(target_family = "wasm", wasm_bindgen)]
34impl DomainData {
35 #[cfg(target_family = "wasm")]
36 #[wasm_bindgen]
37 pub fn get_data_bytes(&self) -> Uint8Array {
38 Uint8Array::from(self.data.as_slice())
39 }
40}
41
42#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
43#[derive(Debug, Serialize, Clone)]
44pub struct UpdateDomainData {
45 pub id: String,
46}
47
48#[cfg_attr(target_family = "wasm", wasm_bindgen)]
49impl UpdateDomainData {
50 #[cfg(target_family = "wasm")]
51 #[wasm_bindgen(constructor)]
52 pub fn new(id: String) -> Self {
53 Self { id }
54 }
55}
56#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
57#[derive(Debug, Serialize, Clone)]
58pub struct CreateDomainData {
59 pub name: String,
60 pub data_type: String,
61}
62#[cfg_attr(target_family = "wasm", wasm_bindgen)]
63impl CreateDomainData {
64 #[cfg(target_family = "wasm")]
65 #[wasm_bindgen(constructor)]
66 pub fn new(name: String, data_type: String) -> Self {
67 Self { name, data_type }
68 }
69}
70
71#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
72#[derive(Debug, Serialize)]
73pub struct UploadDomainData {
74 #[serde(flatten, skip_serializing_if = "Option::is_none")]
75 pub create: Option<CreateDomainData>,
76 #[serde(flatten, skip_serializing_if = "Option::is_none")]
77 pub update: Option<UpdateDomainData>,
78 pub data: Vec<u8>,
79}
80
81#[cfg_attr(target_family = "wasm", wasm_bindgen)]
82impl UploadDomainData {
83 #[cfg(target_family = "wasm")]
84 #[wasm_bindgen(constructor)]
85 pub fn new(create: Option<CreateDomainData>, update: Option<UpdateDomainData>, data: Uint8Array) -> Self {
86 Self { create, update, data: data.to_vec() }
87 }
88}
89
90#[derive(Debug, Serialize, Deserialize)]
91pub struct DownloadQuery {
92 pub ids: Vec<String>,
93 pub name: Option<String>,
94 pub data_type: Option<String>,
95}
96
97#[derive(Debug, Deserialize)]
98struct ListDomainData {
99 pub data: Vec<DomainData>,
100}
101
102pub async fn download_by_id(
103 url: &str,
104 client_id: &str,
105 access_token: &str,
106 domain_id: &str,
107 id: &str,
108) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
109 let response = Client::new()
110 .get(&format!("{}/api/v1/domains/{}/data/{}?raw=true", url, domain_id, id))
111 .bearer_auth(access_token)
112 .header("posemesh-client-id", client_id)
113 .send()
114 .await?;
115
116 if response.status().is_success() {
117 let data = response.bytes().await?;
118 Ok(data.to_vec())
119 } else {
120 let status = response.status();
121 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
122 Err(format!("Failed to download data by id. Status: {} - {}", status, text).into())
123 }
124}
125
126pub async fn download_metadata_v1(
127 url: &str,
128 client_id: &str,
129 access_token: &str,
130 domain_id: &str,
131 query: &DownloadQuery,
132) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
133 let mut params = HashMap::new();
134
135 if !query.ids.is_empty() {
136 params.insert("ids", query.ids.join(","));
137 }
138 if let Some(name) = &query.name {
139 params.insert("name", name.clone());
140 }
141 if let Some(data_type) = &query.data_type {
142 params.insert("data_type", data_type.clone());
143 }
144
145 let response = Client::new()
146 .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
147 .bearer_auth(access_token)
148 .header("Accept", "application/json")
149 .header("posemesh-client-id", client_id)
150 .query(¶ms)
151 .send()
152 .await?;
153
154 if response.status().is_success() {
155 let data = response.json::<ListDomainData>().await?;
156 Ok(data.data)
157 } else {
158 let status = response.status();
159 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
160 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
161 }
162}
163
164pub async fn download_v1(
165 url: &str,
166 client_id: &str,
167 access_token: &str,
168 domain_id: &str,
169 query: &DownloadQuery,
170) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
171 let mut params = HashMap::new();
172
173 if !query.ids.is_empty() {
174 params.insert("ids", query.ids.join(","));
175 }
176 if let Some(name) = &query.name {
177 params.insert("name", name.clone());
178 }
179 if let Some(data_type) = &query.data_type {
180 params.insert("data_type", data_type.clone());
181 }
182
183 let response = Client::new()
184 .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
185 .bearer_auth(access_token)
186 .header("Accept", "multipart/form-data")
187 .header("posemesh-client-id", client_id)
188 .query(¶ms)
189 .send()
190 .await?;
191
192 if response.status().is_success() {
193 Ok(response)
194 } else {
195 let status = response.status();
196 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
197 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
198 }
199}
200
201pub async fn download_v1_stream(
202 url: &str,
203 client_id: &str,
204 access_token: &str,
205 domain_id: &str,
206 query: &DownloadQuery,
207) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error + Send + Sync>> {
208 let response = download_v1(url, client_id, access_token, domain_id, query).await?;
209 let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
210
211 let boundary = match response
212 .headers()
213 .get("content-type")
214 .and_then(|ct| ct.to_str().ok())
215 .and_then(|ct| {
216 if ct.starts_with("multipart/form-data; boundary=") {
217 Some(ct.split("boundary=").nth(1)?.to_string())
218 } else {
219 None
220 }
221 }) {
222 Some(b) => b,
223 None => {
224 tracing::error!("Invalid content-type header");
225 let _ = tx.close().await;
226 return Err("Invalid content-type header".into());
227 }
228 };
229
230 spawn(async move {
231 let mut stream = response.bytes_stream();
232 let mut buffer = Vec::new();
233 let mut current_domain_data: Option<DomainData> = None;
234
235 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
236
237 while let Some(chunk_result) = stream.next().await {
238 let chunk = match chunk_result {
239 Ok(c) if c.is_empty() => {
240 tx.close().await.ok();
241 return;
242 },
243 Ok(c) => c,
244 Err(e) => {
245 let _ = tx.send(Err(e.into())).await;
246 return;
247 }
248 };
249
250 buffer.extend_from_slice(&chunk);
251
252 if let Some(mut domain_data) = current_domain_data.take() {
253 let expected_size = domain_data.size as usize;
254 if buffer.len() >= expected_size {
255 domain_data.data.extend_from_slice(&buffer[..expected_size]);
256 buffer.drain(..expected_size);
257 if tx.send(Ok(domain_data)).await.is_err() {
258 return;
259 }
260 } else {
261 current_domain_data = Some(domain_data);
262 continue;
263 }
264 }
265
266 if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
268 if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
270 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
271 let part_headers = parse_headers(headers_slice);
272 if let Ok(domain_data) = part_headers {
273 current_domain_data = Some(domain_data);
274 } else {
275 tracing::error!("Failed to parse headers: {:?}", part_headers.err());
276 return;
277 }
278
279 buffer.drain(..boundary_pos + header_end);
281 continue;
282 } else {
283 continue;
285 }
286 }
287 }
288 });
289
290 Ok(rx)
291}
292
293#[cfg(not(target_family = "wasm"))]
294pub async fn upload_v1_stream(
295 url: &str,
296 access_token: &str,
297 domain_id: &str,
298 mut rx: mpsc::Receiver<UploadDomainData>,
299) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
300 use futures::channel::oneshot;
301
302 let boundary = "boundary";
303
304 let (mut create_tx, create_rx) = mpsc::channel(100);
305 let (mut update_tx, update_rx) = mpsc::channel(100);
306
307 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
308 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
309
310 let url = url.to_string();
311 let url_2 = url.clone();
312 let access_token = access_token.to_string();
313 let domain_id = domain_id.to_string();
314 let access_token_2 = access_token.clone();
315 let domain_id_2 = domain_id.clone();
316
317 let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
318 let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
319
320 spawn(async move {
321 let create_response = Client::new()
322 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
323 .bearer_auth(access_token)
324 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
325 .body(create_body)
326 .send()
327 .await;
328
329 if let Err(e) = create_response {
330 tracing::error!("Create failed with error: {}", e);
331 create_signal.send(Err(e.into())).unwrap();
332 return;
333 }
334
335 let create_response = create_response.unwrap();
336 if create_response.status().is_success() {
337 let data = create_response.json::<ListDomainData>().await.unwrap();
338 create_signal.send(Ok(data.data)).unwrap();
339 } else {
340 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
341 create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
342 }
343 });
344
345 spawn(async move {
346 let update_response = Client::new()
347 .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
348 .bearer_auth(access_token_2)
349 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
350 .body(update_body)
351 .send()
352 .await;
353
354 if let Err(e) = update_response {
355 tracing::error!("Update failed with error: {}", e);
356 update_signal.send(Err(e.into())).unwrap();
357 return;
358 }
359 let update_response = update_response.unwrap();
360 if update_response.status().is_success() {
361 let data = update_response.json::<ListDomainData>().await.unwrap();
362 update_signal.send(Ok(data.data)).unwrap();
363 } else {
364 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
365 update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
366 }
367 });
368
369 while let Some(datum) = rx.next().await {
370 if let Some(update) = &datum.update {
372 let update_bytes = format!(
374 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
375 boundary, update.id, update.id
376 );
377 let mut update_data = update_bytes.into_bytes();
378 update_data.extend_from_slice(&datum.data);
379 update_data.extend_from_slice("\r\n".as_bytes());
380
381 update_tx.send(update_data).await?;
382 } else if let Some(create) = &datum.create {
383 let create_bytes = format!(
385 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
386 boundary, create.name, create.data_type
387 );
388 let mut create_data = create_bytes.into_bytes();
389 create_data.extend_from_slice(&datum.data);
390 create_data.extend_from_slice("\r\n".as_bytes());
391
392 create_tx.clone().send(create_data).await?;
393 }
394 }
395 update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
396 create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
397 update_tx.close().await?;
398 create_tx.close().await?;
399
400 let mut data = Vec::new();
401
402 if let Ok(res) = create_signal_rx.await {
403 match res {
404 Ok(d) => data = d,
405 Err(e) => return Err(e),
406 }
407 } else {
408 return Err("create cancelled".into());
409 }
410
411 if let Ok(res) = update_signal_rx.await {
412 match res {
413 Ok(d) => data.extend(d),
414 Err(e) => return Err(e),
415 }
416 } else {
417 return Err("update cancelled".into());
418 }
419
420 Ok(data)
421}
422
423pub async fn upload_v1(
424 url: &str,
425 access_token: &str,
426 domain_id: &str,
427 data: Vec<UploadDomainData>,
428) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
429 let boundary = "boundary";
430
431 let mut create_body = Vec::new();
432 let mut update_body = Vec::new();
433
434 let url = url.to_string();
435 let url_2 = url.clone();
436 let access_token = access_token.to_string();
437 let domain_id = domain_id.to_string();
438 let access_token_2 = access_token.clone();
439 let domain_id_2 = domain_id.clone();
440
441 for datun in data {
443 if let Some(update) = &datun.update {
445 let update_bytes = format!(
447 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
448 boundary, update.id, update.id
449 );
450 let mut update_data = update_bytes.into_bytes();
451 update_data.extend_from_slice(&datun.data);
452 update_data.extend_from_slice("\r\n".as_bytes());
453
454 update_body.extend_from_slice(&update_data);
455 } else if let Some(create) = &datun.create {
456 let create_bytes = format!(
458 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
459 boundary, create.name, create.data_type
460 );
461 let mut create_data = create_bytes.into_bytes();
462 create_data.extend_from_slice(&datun.data);
463 create_data.extend_from_slice("\r\n".as_bytes());
464
465 create_body.extend_from_slice(&create_data);
466 }
467 }
468
469 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
470 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
471
472 let create_body = Body::from(create_body);
473 let update_body = Body::from(update_body);
474
475 let create_response = Client::new()
476 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
477 .bearer_auth(access_token)
478 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
479 .body(create_body)
480 .send()
481 .await?;
482
483 let mut res = Vec::new();
484 if create_response.status().is_success() {
485 let data = create_response.json::<ListDomainData>().await.unwrap();
486 res = data.data;
487 } else {
488 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
489 return Err(format!("Create failed with status: {}", err).into());
490 }
491
492 let update_response = Client::new()
493 .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
494 .bearer_auth(access_token_2)
495 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
496 .body(update_body)
497 .send()
498 .await.unwrap();
499
500 if update_response.status().is_success() {
501 let data = update_response.json::<ListDomainData>().await.unwrap();
502 res.extend(data.data);
503 } else {
504 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
505 return Err(format!("Update failed with status: {}", err).into());
506 }
507
508 Ok(res)
509}
510
511fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
512 let headers_str = String::from_utf8_lossy(headers_slice);
513 let mut domain_data = None;
514
515 for line in headers_str.lines() {
516 if line.trim().is_empty() {
517 break;
518 }
519 if let Some((key, value)) = line.split_once(':') {
520 let key = key.trim().to_lowercase();
521 if key == "content-disposition" {
522 let mut parsed_domain_data = DomainData {
523 id: String::new(),
524 domain_id: String::new(),
525 name: String::new(),
526 data_type: String::new(),
527 size: 0,
528 created_at: String::new(),
529 updated_at: String::new(),
530 data: Vec::new(),
531 };
532 for part in value.split(';') {
533 let part = part.trim();
534 if let Some((key, value)) = part.split_once('=') {
535 let key = key.trim();
536 let value = value.trim().trim_matches('"');
537 match key {
538 "id" => parsed_domain_data.id = value.to_string(),
539 "domain-id" => parsed_domain_data.domain_id = value.to_string(),
540 "name" => parsed_domain_data.name = value.to_string(),
541 "data-type" => parsed_domain_data.data_type = value.to_string(),
542 "size" => parsed_domain_data.size = value.parse()?,
543 "created-at" => parsed_domain_data.created_at = value.to_string(),
544 "updated-at" => parsed_domain_data.updated_at = value.to_string(),
545 _ => {}
546 }
547 }
548 }
549 domain_data = Some(parsed_domain_data);
550 }
551 }
552 }
553
554 if let Some(domain_data) = domain_data {
555 Ok(domain_data)
556 } else {
557 Err("Missing content-disposition header".into())
558 }
559}
560
/// Returns the offset of the first occurrence of `boundary` in `data`.
///
/// Returns `None` when `boundary` does not occur, when it is longer than
/// `data`, or when it is empty (an empty delimiter is never meaningful and
/// `slice::windows` would panic on a zero window size).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len()).position(|window| window == boundary)
}
566
/// Returns the offset just past the blank line that ends a header section.
///
/// Both `\r\n\r\n` and `\n\n` are accepted as terminators. When both occur,
/// the one that ends first is used, so an LF-terminated header section is not
/// skipped over in favour of a `\r\n\r\n` sequence later in the body.
/// Returns `None` when `data` holds no terminator.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}