1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5#[cfg(not(target_family = "wasm"))]
6use tokio::spawn;
7#[cfg(target_family = "wasm")]
8use wasm_bindgen_futures::spawn_local as spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen::prelude::*;
11
12#[derive(Debug, Clone)]
13pub struct DomainServer {
14 pub url: String,
15}
16
17#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
18#[derive(Debug, Deserialize, Serialize)]
19pub struct DomainData {
20 pub id: String,
21 pub domain_id: String,
22 pub name: String,
23 pub data_type: String,
24 pub size: u64,
25 pub created_at: String,
26 pub updated_at: String,
27 #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
28 pub data: Vec<u8>,
29}
30
31#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
32#[derive(Debug, Serialize, Clone)]
33pub struct UpdateDomainData {
34 pub id: String,
35}
36
37#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
38#[derive(Debug, Serialize, Clone)]
39pub struct CreateDomainData {
40 pub name: String,
41 pub data_type: String,
42}
43
44#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
45#[derive(Debug, Serialize)]
46pub struct UploadDomainData {
47 #[serde(flatten, skip_serializing_if = "Option::is_none")]
48 pub create: Option<CreateDomainData>,
49 #[serde(flatten, skip_serializing_if = "Option::is_none")]
50 pub update: Option<UpdateDomainData>,
51 pub data: Vec<u8>,
52}
53
54#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
55#[derive(Debug, Serialize)]
56pub struct DownloadQuery {
57 pub ids: Vec<String>,
58 pub name: Option<String>,
59 pub data_type: Option<String>,
60}
61
62#[derive(Debug, Deserialize)]
63struct ListDomainData {
64 pub data: Vec<DomainData>,
65}
66
67pub async fn download_v1(
68 url: &str,
69 client_id: &str,
70 access_token: &str,
71 domain_id: &str,
72 query: &DownloadQuery,
73) -> Result<Response, Box<dyn std::error::Error>> {
74 let mut params = HashMap::new();
75
76 if !query.ids.is_empty() {
77 params.insert("ids", query.ids.join(","));
78 }
79 if let Some(name) = &query.name {
80 params.insert("name", name.clone());
81 }
82 if let Some(data_type) = &query.data_type {
83 params.insert("data_type", data_type.clone());
84 }
85
86 let response = Client::new()
87 .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
88 .bearer_auth(access_token)
89 .header("Accept", "multipart/form-data")
90 .header("posemesh-client-id", client_id)
91 .query(¶ms)
92 .send()
93 .await?;
94
95 if response.status().is_success() {
96 Ok(response)
97 } else {
98 let status = response.status();
99 let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
100 Err(format!("Failed to download data. Status: {} - {}", status, text).into())
101 }
102}
103
104pub async fn download_v1_stream(
105 url: &str,
106 client_id: &str,
107 access_token: &str,
108 domain_id: &str,
109 query: &DownloadQuery,
110) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error>> {
111 let response = download_v1(url, client_id, access_token, domain_id, query).await?;
112 let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
113
114 let boundary = match response
115 .headers()
116 .get("content-type")
117 .and_then(|ct| ct.to_str().ok())
118 .and_then(|ct| {
119 if ct.starts_with("multipart/form-data; boundary=") {
120 Some(ct.split("boundary=").nth(1)?.to_string())
121 } else {
122 None
123 }
124 }) {
125 Some(b) => b,
126 None => {
127 tracing::error!("Invalid content-type header");
128 let _ = tx.close().await;
129 return Err("Invalid content-type header".into());
130 }
131 };
132
133 spawn(async move {
134 let mut stream = response.bytes_stream();
135 let mut buffer = Vec::new();
136 let mut current_domain_data: Option<DomainData> = None;
137
138 let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
139
140 while let Some(chunk_result) = stream.next().await {
141 let chunk = match chunk_result {
142 Ok(c) if c.is_empty() => {
143 tx.close().await.ok();
144 return;
145 },
146 Ok(c) => c,
147 Err(e) => {
148 let _ = tx.send(Err(e.into())).await;
149 return;
150 }
151 };
152
153 buffer.extend_from_slice(&chunk);
154
155 if let Some(mut domain_data) = current_domain_data.take() {
156 let expected_size = domain_data.size as usize;
157 if buffer.len() >= expected_size {
158 domain_data.data.extend_from_slice(&buffer[..expected_size]);
159 buffer.drain(..expected_size);
160 if tx.send(Ok(domain_data)).await.is_err() {
161 return;
162 }
163 } else {
164 current_domain_data = Some(domain_data);
165 continue;
166 }
167 }
168
169 if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
171 if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
173 let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
174 let part_headers = parse_headers(headers_slice);
175 if let Ok(domain_data) = part_headers {
176 current_domain_data = Some(domain_data);
177 } else {
178 tracing::error!("Failed to parse headers: {:?}", part_headers.err());
179 return;
180 }
181
182 buffer.drain(..boundary_pos + header_end);
184 continue;
185 } else {
186 continue;
188 }
189 }
190 }
191 });
192
193 Ok(rx)
194}
195
196#[cfg(not(target_family = "wasm"))]
197pub async fn upload_v1_stream(
198 url: &str,
199 access_token: &str,
200 domain_id: &str,
201 mut rx: mpsc::Receiver<UploadDomainData>,
202) -> Result<Vec<DomainData>, Box<dyn std::error::Error>> {
203 use futures::channel::oneshot;
204
205 let boundary = "boundary";
206
207 let (mut create_tx, create_rx) = mpsc::channel(100);
208 let (mut update_tx, update_rx) = mpsc::channel(100);
209
210 let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
211 let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
212
213 let url = url.to_string();
214 let url_2 = url.clone();
215 let access_token = access_token.to_string();
216 let domain_id = domain_id.to_string();
217 let access_token_2 = access_token.clone();
218 let domain_id_2 = domain_id.clone();
219
220 let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
221 let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
222
223 spawn(async move {
224 let create_response = Client::new()
225 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
226 .bearer_auth(access_token)
227 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
228 .body(create_body)
229 .send()
230 .await;
231
232 if let Err(e) = create_response {
233 tracing::error!("Create failed with error: {}", e);
234 create_signal.send(Err(e.into())).unwrap();
235 return;
236 }
238
239 let create_response = create_response.unwrap();
240 if create_response.status().is_success() {
241 let data = create_response.json::<ListDomainData>().await.unwrap();
242 create_signal.send(Ok(data.data)).unwrap();
243 } else {
244 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
245 create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
246 }
247 });
248
249 spawn(async move {
250 let update_response = Client::new()
251 .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
252 .bearer_auth(access_token_2)
253 .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
254 .body(update_body)
255 .send()
256 .await;
257
258 if let Err(e) = update_response {
259 tracing::error!("Update failed with error: {}", e);
260 update_signal.send(Err(e.into())).unwrap();
261 return;
262 }
264 let update_response = update_response.unwrap();
265 if update_response.status().is_success() {
266 let data = update_response.json::<ListDomainData>().await.unwrap();
267 update_signal.send(Ok(data.data)).unwrap();
268 } else {
269 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
270 update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
271 }
272 });
273
274 while let Some(datum) = rx.next().await {
275 if let Some(update) = &datum.update {
277 let update_bytes = format!(
279 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
280 boundary, update.id, update.id
281 );
282 let mut update_data = update_bytes.into_bytes();
283 update_data.extend_from_slice(&datum.data);
284 update_data.extend_from_slice("\r\n".as_bytes());
285
286 update_tx.send(update_data).await?;
287 } else if let Some(create) = &datum.create {
288 let create_bytes = format!(
290 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
291 boundary, create.name, create.data_type
292 );
293 let mut create_data = create_bytes.into_bytes();
294 create_data.extend_from_slice(&datum.data);
295 create_data.extend_from_slice("\r\n".as_bytes());
296
297 create_tx.clone().send(create_data).await?;
298 }
299 }
300 update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
301 create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
302 update_tx.close().await?;
303 create_tx.close().await?;
304
305 let mut data = Vec::new();
306
307 if let Ok(res) = create_signal_rx.await {
308 match res {
309 Ok(d) => data = d,
310 Err(e) => return Err(e),
311 }
312 } else {
313 return Err("create cancelled".into());
314 }
315
316 if let Ok(res) = update_signal_rx.await {
317 match res {
318 Ok(d) => data.extend(d),
319 Err(e) => return Err(e),
320 }
321 } else {
322 return Err("update cancelled".into());
323 }
324
325 Ok(data)
326}
327
328pub async fn upload_v1(
329 url: &str,
330 access_token: &str,
331 domain_id: &str,
332 data: Vec<UploadDomainData>,
333) -> Result<Vec<DomainData>, Box<dyn std::error::Error>> {
334 let boundary = "boundary";
335
336 let mut create_body = Vec::new();
337 let mut update_body = Vec::new();
338
339 let url = url.to_string();
340 let url_2 = url.clone();
341 let access_token = access_token.to_string();
342 let domain_id = domain_id.to_string();
343 let access_token_2 = access_token.clone();
344 let domain_id_2 = domain_id.clone();
345
346 for datun in data {
348 if let Some(update) = &datun.update {
350 let update_bytes = format!(
352 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
353 boundary, update.id, update.id
354 );
355 let mut update_data = update_bytes.into_bytes();
356 update_data.extend_from_slice(&datun.data);
357 update_data.extend_from_slice("\r\n".as_bytes());
358
359 update_body.extend_from_slice(&update_data);
360 } else if let Some(create) = &datun.create {
361 let create_bytes = format!(
363 "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
364 boundary, create.name, create.data_type
365 );
366 let mut create_data = create_bytes.into_bytes();
367 create_data.extend_from_slice(&datun.data);
368 create_data.extend_from_slice("\r\n".as_bytes());
369
370 create_body.extend_from_slice(&create_data);
371 }
372 }
373
374 create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
375 update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
376
377 let create_body = Body::from(create_body);
378 let update_body = Body::from(update_body);
379
380 let create_response = Client::new()
381 .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
382 .bearer_auth(access_token)
383 .header("Content-Type", "multipart/form-data")
384 .body(create_body)
385 .send()
386 .await.unwrap();
387
388 let mut res = Vec::new();
389 if create_response.status().is_success() {
390 let data = create_response.json::<ListDomainData>().await.unwrap();
391 res = data.data;
392 } else {
393 let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
394 return Err(format!("Create failed with status: {}", err).into());
395 }
396
397 let update_response = Client::new()
398 .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
399 .bearer_auth(access_token_2)
400 .header("Content-Type", "multipart/form-data")
401 .body(update_body)
402 .send()
403 .await.unwrap();
404
405 if update_response.status().is_success() {
406 let data = update_response.json::<ListDomainData>().await.unwrap();
407 res.extend(data.data);
408 } else {
409 let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
410 return Err(format!("Update failed with status: {}", err).into());
411 }
412
413 Ok(res)
414}
415
416fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
417 let headers_str = String::from_utf8_lossy(headers_slice);
418 let mut domain_data = None;
419
420 for line in headers_str.lines() {
421 if line.trim().is_empty() {
422 break;
423 }
424 if let Some((key, value)) = line.split_once(':') {
425 let key = key.trim().to_lowercase();
426 if key == "content-disposition" {
427 let mut parsed_domain_data = DomainData {
428 id: String::new(),
429 domain_id: String::new(),
430 name: String::new(),
431 data_type: String::new(),
432 size: 0,
433 created_at: String::new(),
434 updated_at: String::new(),
435 data: Vec::new(),
436 };
437 for part in value.split(';') {
438 let part = part.trim();
439 if let Some((key, value)) = part.split_once('=') {
440 let key = key.trim();
441 let value = value.trim().trim_matches('"');
442 match key {
443 "id" => parsed_domain_data.id = value.to_string(),
444 "domain-id" => parsed_domain_data.domain_id = value.to_string(),
445 "name" => parsed_domain_data.name = value.to_string(),
446 "data-type" => parsed_domain_data.data_type = value.to_string(),
447 "size" => parsed_domain_data.size = value.parse()?,
448 "created-at" => parsed_domain_data.created_at = value.to_string(),
449 "updated-at" => parsed_domain_data.updated_at = value.to_string(),
450 _ => {}
451 }
452 }
453 }
454 domain_data = Some(parsed_domain_data);
455 }
456 }
457 }
458
459 if let Some(domain_data) = domain_data {
460 Ok(domain_data)
461 } else {
462 Err("Missing content-disposition header".into())
463 }
464}
465
466fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
467 let _data = String::from_utf8_lossy(data);
468 let _boundary = String::from_utf8_lossy(boundary);
469 data.windows(boundary.len()).position(|window| window == boundary)
470}
471
472fn find_headers_end(data: &[u8]) -> Option<usize> {
473 data.windows(4).position(|window| window == b"\r\n\r\n")
475 .or_else(|| data.windows(2).position(|window| window == b"\n\n"))
476}