1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::js_sys::Uint8Array;
6use std::collections::HashMap;
7#[cfg(not(target_family = "wasm"))]
8use tokio::spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen_futures::spawn_local as spawn;
11#[cfg(target_family = "wasm")]
12use wasm_bindgen::prelude::*;
13
/// Location of a domain server.
///
/// The struct only stores the URL; nothing here validates or normalizes it.
#[derive(Debug, Clone)]
pub struct DomainServer {
    /// Server URL, stored exactly as provided.
    // NOTE(review): presumably the base URL handed to the request helpers as
    // `url` (they append `/api/v1/...`) — confirm against callers.
    pub url: String,
}
18
19#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
20#[derive(Debug, Deserialize, Serialize)]
21pub struct DomainData {
22 pub id: String,
23 pub domain_id: String,
24 pub name: String,
25 pub data_type: String,
26 pub size: u64,
27 pub created_at: String,
28 pub updated_at: String,
29 #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
30 pub data: Vec<u8>,
31}
32
33#[cfg_attr(target_family = "wasm", wasm_bindgen)]
34impl DomainData {
35 #[cfg(target_family = "wasm")]
36 #[wasm_bindgen]
37 pub fn get_data_bytes(&self) -> Uint8Array {
38 Uint8Array::from(self.data.as_slice())
39 }
40}
41
42#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
43#[derive(Debug, Serialize, Clone)]
44pub struct UpdateDomainData {
45 pub id: String,
46}
47
48#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
49#[derive(Debug, Serialize, Clone)]
50pub struct CreateDomainData {
51 pub name: String,
52 pub data_type: String,
53}
54
55#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
56#[derive(Debug, Serialize)]
57pub struct UploadDomainData {
58 #[serde(flatten, skip_serializing_if = "Option::is_none")]
59 pub create: Option<CreateDomainData>,
60 #[serde(flatten, skip_serializing_if = "Option::is_none")]
61 pub update: Option<UpdateDomainData>,
62 pub data: Vec<u8>,
63}
64
65#[derive(Debug, Serialize, Deserialize)]
66pub struct DownloadQuery {
67 pub ids: Vec<String>,
68 pub name: Option<String>,
69 pub data_type: Option<String>,
70}
71
72#[derive(Debug, Deserialize)]
73struct ListDomainData {
74 pub data: Vec<DomainData>,
75}
76
77pub async fn download_metadata_v1(
78 url: &str,
79 client_id: &str,
80 access_token: &str,
81 domain_id: &str,
82 query: &DownloadQuery,
83) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
84 let mut params = HashMap::new();
85
86 if !query.ids.is_empty() {
87 params.insert("ids", query.ids.join(","));
88 }
89 if let Some(name) = &query.name {
90 params.insert("name", name.clone());
91 }
92 if let Some(data_type) = &query.data_type {
93 params.insert("data_type", data_type.clone());
94 }
95
96 let response = Client::new()
97 .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
98 .bearer_auth(access_token)
99 .header("Accept", "application/json")
100 .header("posemesh-client-id", client_id)
101 .query(¶ms)
102 .send()
103 .await?;
104
105 if response.status().is_success() {
106 let data = response.json::<ListDomainData>().await?;
107 Ok(data.data)
108 } else {
109 let status = response.status();
110 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
111 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
112 }
113}
114
115pub async fn download_v1(
116 url: &str,
117 client_id: &str,
118 access_token: &str,
119 domain_id: &str,
120 query: &DownloadQuery,
121) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
122 let mut params = HashMap::new();
123
124 if !query.ids.is_empty() {
125 params.insert("ids", query.ids.join(","));
126 }
127 if let Some(name) = &query.name {
128 params.insert("name", name.clone());
129 }
130 if let Some(data_type) = &query.data_type {
131 params.insert("data_type", data_type.clone());
132 }
133
134 let response = Client::new()
135 .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
136 .bearer_auth(access_token)
137 .header("Accept", "multipart/form-data")
138 .header("posemesh-client-id", client_id)
139 .query(¶ms)
140 .send()
141 .await?;
142
143 if response.status().is_success() {
144 Ok(response)
145 } else {
146 let status = response.status();
147 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
148 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
149 }
150}
151
152pub async fn download_v1_stream(
153 url: &str,
154 client_id: &str,
155 access_token: &str,
156 domain_id: &str,
157 query: &DownloadQuery,
158) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error + Send + Sync>> {
159 let response = download_v1(url, client_id, access_token, domain_id, query).await?;
160 let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
161
162 let boundary = match response
163 .headers()
164 .get("content-type")
165 .and_then(|ct| ct.to_str().ok())
166 .and_then(|ct| {
167 if ct.starts_with("multipart/form-data; boundary=") {
168 Some(ct.split("boundary=").nth(1)?.to_string())
169 } else {
170 None
171 }
172 }) {
173 Some(b) => b,
174 None => {
175 tracing::error!("Invalid content-type header");
176 let _ = tx.close().await;
177 return Err("Invalid content-type header".into());
178 }
179 };
180
181 spawn(async move {
182 let mut stream = response.bytes_stream();
183 let mut buffer = Vec::new();
184 let mut current_domain_data: Option<DomainData> = None;
185
186 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
187
188 while let Some(chunk_result) = stream.next().await {
189 let chunk = match chunk_result {
190 Ok(c) if c.is_empty() => {
191 tx.close().await.ok();
192 return;
193 },
194 Ok(c) => c,
195 Err(e) => {
196 let _ = tx.send(Err(e.into())).await;
197 return;
198 }
199 };
200
201 buffer.extend_from_slice(&chunk);
202
203 if let Some(mut domain_data) = current_domain_data.take() {
204 let expected_size = domain_data.size as usize;
205 if buffer.len() >= expected_size {
206 domain_data.data.extend_from_slice(&buffer[..expected_size]);
207 buffer.drain(..expected_size);
208 if tx.send(Ok(domain_data)).await.is_err() {
209 return;
210 }
211 } else {
212 current_domain_data = Some(domain_data);
213 continue;
214 }
215 }
216
217 if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
219 if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
221 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
222 let part_headers = parse_headers(headers_slice);
223 if let Ok(domain_data) = part_headers {
224 current_domain_data = Some(domain_data);
225 } else {
226 tracing::error!("Failed to parse headers: {:?}", part_headers.err());
227 return;
228 }
229
230 buffer.drain(..boundary_pos + header_end);
232 continue;
233 } else {
234 continue;
236 }
237 }
238 }
239 });
240
241 Ok(rx)
242}
243
244#[cfg(not(target_family = "wasm"))]
245pub async fn upload_v1_stream(
246 url: &str,
247 access_token: &str,
248 domain_id: &str,
249 mut rx: mpsc::Receiver<UploadDomainData>,
250) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
251 use futures::channel::oneshot;
252
253 let boundary = "boundary";
254
255 let (mut create_tx, create_rx) = mpsc::channel(100);
256 let (mut update_tx, update_rx) = mpsc::channel(100);
257
258 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
259 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
260
261 let url = url.to_string();
262 let url_2 = url.clone();
263 let access_token = access_token.to_string();
264 let domain_id = domain_id.to_string();
265 let access_token_2 = access_token.clone();
266 let domain_id_2 = domain_id.clone();
267
268 let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
269 let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
270
271 spawn(async move {
272 let create_response = Client::new()
273 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
274 .bearer_auth(access_token)
275 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
276 .body(create_body)
277 .send()
278 .await;
279
280 if let Err(e) = create_response {
281 tracing::error!("Create failed with error: {}", e);
282 create_signal.send(Err(e.into())).unwrap();
283 return;
284 }
286
287 let create_response = create_response.unwrap();
288 if create_response.status().is_success() {
289 let data = create_response.json::<ListDomainData>().await.unwrap();
290 create_signal.send(Ok(data.data)).unwrap();
291 } else {
292 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
293 create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
294 }
295 });
296
297 spawn(async move {
298 let update_response = Client::new()
299 .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
300 .bearer_auth(access_token_2)
301 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
302 .body(update_body)
303 .send()
304 .await;
305
306 if let Err(e) = update_response {
307 tracing::error!("Update failed with error: {}", e);
308 update_signal.send(Err(e.into())).unwrap();
309 return;
310 }
312 let update_response = update_response.unwrap();
313 if update_response.status().is_success() {
314 let data = update_response.json::<ListDomainData>().await.unwrap();
315 update_signal.send(Ok(data.data)).unwrap();
316 } else {
317 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
318 update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
319 }
320 });
321
322 while let Some(datum) = rx.next().await {
323 if let Some(update) = &datum.update {
325 let update_bytes = format!(
327 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
328 boundary, update.id, update.id
329 );
330 let mut update_data = update_bytes.into_bytes();
331 update_data.extend_from_slice(&datum.data);
332 update_data.extend_from_slice("\r\n".as_bytes());
333
334 update_tx.send(update_data).await?;
335 } else if let Some(create) = &datum.create {
336 let create_bytes = format!(
338 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
339 boundary, create.name, create.data_type
340 );
341 let mut create_data = create_bytes.into_bytes();
342 create_data.extend_from_slice(&datum.data);
343 create_data.extend_from_slice("\r\n".as_bytes());
344
345 create_tx.clone().send(create_data).await?;
346 }
347 }
348 update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
349 create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
350 update_tx.close().await?;
351 create_tx.close().await?;
352
353 let mut data = Vec::new();
354
355 if let Ok(res) = create_signal_rx.await {
356 match res {
357 Ok(d) => data = d,
358 Err(e) => return Err(e),
359 }
360 } else {
361 return Err("create cancelled".into());
362 }
363
364 if let Ok(res) = update_signal_rx.await {
365 match res {
366 Ok(d) => data.extend(d),
367 Err(e) => return Err(e),
368 }
369 } else {
370 return Err("update cancelled".into());
371 }
372
373 Ok(data)
374}
375
376pub async fn upload_v1(
377 url: &str,
378 access_token: &str,
379 domain_id: &str,
380 data: Vec<UploadDomainData>,
381) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
382 let boundary = "boundary";
383
384 let mut create_body = Vec::new();
385 let mut update_body = Vec::new();
386
387 let url = url.to_string();
388 let url_2 = url.clone();
389 let access_token = access_token.to_string();
390 let domain_id = domain_id.to_string();
391 let access_token_2 = access_token.clone();
392 let domain_id_2 = domain_id.clone();
393
394 for datun in data {
396 if let Some(update) = &datun.update {
398 let update_bytes = format!(
400 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
401 boundary, update.id, update.id
402 );
403 let mut update_data = update_bytes.into_bytes();
404 update_data.extend_from_slice(&datun.data);
405 update_data.extend_from_slice("\r\n".as_bytes());
406
407 update_body.extend_from_slice(&update_data);
408 } else if let Some(create) = &datun.create {
409 let create_bytes = format!(
411 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
412 boundary, create.name, create.data_type
413 );
414 let mut create_data = create_bytes.into_bytes();
415 create_data.extend_from_slice(&datun.data);
416 create_data.extend_from_slice("\r\n".as_bytes());
417
418 create_body.extend_from_slice(&create_data);
419 }
420 }
421
422 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
423 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
424
425 let create_body = Body::from(create_body);
426 let update_body = Body::from(update_body);
427
428 let create_response = Client::new()
429 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
430 .bearer_auth(access_token)
431 .header("Content-Type", "multipart/form-data")
432 .body(create_body)
433 .send()
434 .await.unwrap();
435
436 let mut res = Vec::new();
437 if create_response.status().is_success() {
438 let data = create_response.json::<ListDomainData>().await.unwrap();
439 res = data.data;
440 } else {
441 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
442 return Err(format!("Create failed with status: {}", err).into());
443 }
444
445 let update_response = Client::new()
446 .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
447 .bearer_auth(access_token_2)
448 .header("Content-Type", "multipart/form-data")
449 .body(update_body)
450 .send()
451 .await.unwrap();
452
453 if update_response.status().is_success() {
454 let data = update_response.json::<ListDomainData>().await.unwrap();
455 res.extend(data.data);
456 } else {
457 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
458 return Err(format!("Update failed with status: {}", err).into());
459 }
460
461 Ok(res)
462}
463
464fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
465 let headers_str = String::from_utf8_lossy(headers_slice);
466 let mut domain_data = None;
467
468 for line in headers_str.lines() {
469 if line.trim().is_empty() {
470 break;
471 }
472 if let Some((key, value)) = line.split_once(':') {
473 let key = key.trim().to_lowercase();
474 if key == "content-disposition" {
475 let mut parsed_domain_data = DomainData {
476 id: String::new(),
477 domain_id: String::new(),
478 name: String::new(),
479 data_type: String::new(),
480 size: 0,
481 created_at: String::new(),
482 updated_at: String::new(),
483 data: Vec::new(),
484 };
485 for part in value.split(';') {
486 let part = part.trim();
487 if let Some((key, value)) = part.split_once('=') {
488 let key = key.trim();
489 let value = value.trim().trim_matches('"');
490 match key {
491 "id" => parsed_domain_data.id = value.to_string(),
492 "domain-id" => parsed_domain_data.domain_id = value.to_string(),
493 "name" => parsed_domain_data.name = value.to_string(),
494 "data-type" => parsed_domain_data.data_type = value.to_string(),
495 "size" => parsed_domain_data.size = value.parse()?,
496 "created-at" => parsed_domain_data.created_at = value.to_string(),
497 "updated-at" => parsed_domain_data.updated_at = value.to_string(),
498 _ => {}
499 }
500 }
501 }
502 domain_data = Some(parsed_domain_data);
503 }
504 }
505 }
506
507 if let Some(domain_data) = domain_data {
508 Ok(domain_data)
509 } else {
510 Err("Missing content-disposition header".into())
511 }
512}
513
/// Returns the index of the first occurrence of `boundary` within `data`.
///
/// Returns `None` when `boundary` does not occur, when `data` is shorter than
/// `boundary`, or when `boundary` is empty (an empty delimiter cannot mark a
/// position, and `windows(0)` would panic).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
519
/// Returns the offset just past the first blank line in `data`.
///
/// A blank line is either `\r\n\r\n` or `\n\n`; whichever terminator ends
/// first decides the result, so LF-only headers are not confused by a
/// `\r\n\r\n` sequence appearing later in the body. Returns `None` when
/// `data` contains no blank line.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    let crlf_end = data
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4);
    let lf_end = data
        .windows(2)
        .position(|w| w == b"\n\n")
        .map(|i| i + 2);

    match (crlf_end, lf_end) {
        (Some(crlf), Some(lf)) => Some(crlf.min(lf)),
        (crlf, lf) => crlf.or(lf),
    }
}