posemesh_domain_http/
domain_data.rs

1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::js_sys::Uint8Array;
6use std::collections::HashMap;
7#[cfg(not(target_family = "wasm"))]
8use tokio::spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen_futures::spawn_local as spawn;
11#[cfg(target_family = "wasm")]
12use wasm_bindgen::prelude::*;
13
14#[derive(Debug, Clone)]
15pub struct DomainServer {
16    pub url: String,
17}
18
19#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
20#[derive(Debug, Deserialize, Serialize)]
21pub struct DomainData {
22    pub id: String,
23    pub domain_id: String,
24    pub name: String,
25    pub data_type: String,
26    pub size: u64,
27    pub created_at: String,
28    pub updated_at: String,
29    #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
30    pub data: Vec<u8>,
31}
32
33#[cfg_attr(target_family = "wasm", wasm_bindgen)]
34impl DomainData {
35    #[cfg(target_family = "wasm")]
36    #[wasm_bindgen]
37    pub fn get_data_bytes(self) -> Uint8Array {
38        Uint8Array::from(self.data.as_slice())
39    }
40}
41
42#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
43#[derive(Debug, Serialize, Clone)]
44pub struct UpdateDomainData {
45    pub id: String,
46}
47
48#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
49#[derive(Debug, Serialize, Clone)]
50pub struct CreateDomainData {
51    pub name: String,
52    pub data_type: String,
53}
54
55#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
56#[derive(Debug, Serialize)]
57pub struct UploadDomainData {
58    #[serde(flatten, skip_serializing_if = "Option::is_none")]
59    pub create: Option<CreateDomainData>,
60    #[serde(flatten, skip_serializing_if = "Option::is_none")]
61    pub update: Option<UpdateDomainData>,
62    pub data: Vec<u8>,
63}
64
65#[derive(Debug, Serialize)]
66pub struct DownloadQuery {
67    pub ids: Vec<String>,
68    pub name: Option<String>,
69    pub data_type: Option<String>,
70}
71
72#[derive(Debug, Deserialize)]
73struct ListDomainData {
74    pub data: Vec<DomainData>,
75}
76
77pub async fn download_metadata_v1(
78    url: &str,
79    client_id: &str,
80    access_token: &str,
81    domain_id: &str,
82    query: &DownloadQuery,
83) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
84    let mut params = HashMap::new();
85    
86    if !query.ids.is_empty() {
87        params.insert("ids", query.ids.join(","));
88    }
89    if let Some(name) = &query.name {
90        params.insert("name", name.clone());
91    }
92    if let Some(data_type) = &query.data_type {
93        params.insert("data_type", data_type.clone());
94    }
95
96    let response = Client::new()
97        .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
98        .bearer_auth(access_token)
99        .header("Accept", "application/json")
100        .header("posemesh-client-id", client_id)
101        .query(&params)
102        .send()
103        .await?;
104
105    if response.status().is_success() {
106        let data = response.json::<ListDomainData>().await?;
107        Ok(data.data)
108    } else {
109        let status = response.status();
110        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
111        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
112    }
113}
114
115pub async fn download_v1(
116    url: &str,
117    client_id: &str,
118    access_token: &str,
119    domain_id: &str,
120    query: &DownloadQuery,
121) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
122    let mut params = HashMap::new();
123    
124    if !query.ids.is_empty() {
125        params.insert("ids", query.ids.join(","));
126    }
127    if let Some(name) = &query.name {
128        params.insert("name", name.clone());
129    }
130    if let Some(data_type) = &query.data_type {
131        params.insert("data_type", data_type.clone());
132    }
133
134    let response = Client::new()
135        .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
136        .bearer_auth(access_token)
137        .header("Accept", "multipart/form-data")
138        .header("posemesh-client-id", client_id)
139        .query(&params)
140        .send()
141        .await?;
142
143    if response.status().is_success() {
144        Ok(response)
145    } else {
146        let status = response.status();
147        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
148        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
149    }
150}
151
152pub async fn download_v1_stream(
153    url: &str,
154    client_id: &str,
155    access_token: &str,
156    domain_id: &str,
157    query: &DownloadQuery,
158) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error + Send + Sync>> {
159    let response = download_v1(url, client_id, access_token, domain_id, query).await?;
160    let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
161
162    let boundary = match response
163        .headers()
164        .get("content-type")
165        .and_then(|ct| ct.to_str().ok())
166        .and_then(|ct| {
167            if ct.starts_with("multipart/form-data; boundary=") {
168                Some(ct.split("boundary=").nth(1)?.to_string())
169            } else {
170                None
171            }
172        }) {
173        Some(b) => b,
174        None => {
175            tracing::error!("Invalid content-type header");
176            let _ = tx.close().await;
177            return Err("Invalid content-type header".into());
178        }
179    };
180
181    spawn(async move {
182        let mut stream = response.bytes_stream();
183        let mut buffer = Vec::new();
184        let mut current_domain_data: Option<DomainData> = None;
185
186        let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
187
188        while let Some(chunk_result) = stream.next().await {
189            let chunk = match chunk_result {
190                Ok(c) if c.is_empty() => {
191                    tx.close().await.ok();
192                    return;
193                },
194                Ok(c) => c,
195                Err(e) => {
196                    let _ = tx.send(Err(e.into())).await;
197                    return;
198                }
199            };
200
201            buffer.extend_from_slice(&chunk);
202
203            if let Some(mut domain_data) = current_domain_data.take() {
204                let expected_size = domain_data.size as usize;
205                if buffer.len() >= expected_size {
206                    domain_data.data.extend_from_slice(&buffer[..expected_size]);
207                    buffer.drain(..expected_size);
208                    if tx.send(Ok(domain_data)).await.is_err() {
209                        return;
210                    }
211                } else {
212                    current_domain_data = Some(domain_data);
213                    continue;
214                }
215            }
216
217            // Find boundary
218            if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
219                // Look for header end after boundary
220                if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
221                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
222                    let part_headers = parse_headers(headers_slice);
223                    if let Ok(domain_data) = part_headers {
224                        current_domain_data = Some(domain_data);
225                    } else {
226                        tracing::error!("Failed to parse headers: {:?}", part_headers.err());
227                        return;
228                    }
229
230                    // Remove processed data from buffer
231                    buffer.drain(..boundary_pos + header_end);
232                    continue;
233                } else {
234                    // Incomplete headers, wait for more chunks
235                    continue;
236                }
237            }
238        }
239    });
240
241    Ok(rx)
242}
243
244#[cfg(not(target_family = "wasm"))]
245pub async fn upload_v1_stream(
246    url: &str,
247    access_token: &str,
248    domain_id: &str,
249    mut rx: mpsc::Receiver<UploadDomainData>,
250) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
251    use futures::channel::oneshot;
252
253    let boundary = "boundary";
254
255    let (mut create_tx, create_rx) = mpsc::channel(100);
256    let (mut update_tx, update_rx) = mpsc::channel(100);
257
258    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
259    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
260
261    let url = url.to_string();
262    let url_2 = url.clone();
263    let access_token = access_token.to_string();
264    let domain_id = domain_id.to_string();
265    let access_token_2 = access_token.clone();
266    let domain_id_2 = domain_id.clone();
267
268    let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
269    let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
270
271    spawn(async move {
272        let create_response = Client::new()
273            .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
274            .bearer_auth(access_token)
275            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
276            .body(create_body)
277            .send()
278            .await;
279
280        if let Err(e) = create_response {
281            tracing::error!("Create failed with error: {}", e);
282            create_signal.send(Err(e.into())).unwrap();
283            return;
284            // return Err(e.into());
285        }
286
287        let create_response = create_response.unwrap();
288        if create_response.status().is_success() {
289            let data = create_response.json::<ListDomainData>().await.unwrap();
290            create_signal.send(Ok(data.data)).unwrap();
291        } else {
292            let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
293            create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
294        }
295    });
296
297    spawn(async move {
298        let update_response = Client::new()
299            .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
300            .bearer_auth(access_token_2)
301            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
302            .body(update_body)
303            .send()
304            .await;
305
306        if let Err(e) = update_response {
307            tracing::error!("Update failed with error: {}", e);
308            update_signal.send(Err(e.into())).unwrap();
309            return;
310            // return Err(e.into());
311        }
312        let update_response = update_response.unwrap();
313        if update_response.status().is_success() {
314            let data = update_response.json::<ListDomainData>().await.unwrap();
315            update_signal.send(Ok(data.data)).unwrap();
316        } else {
317            let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
318            update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
319        }
320    });
321
322    while let Some(datum) = rx.next().await {
323        // Process the first item based on whether it's create or update
324        if let Some(update) = &datum.update {
325            // Create update bytes with boundary format
326            let update_bytes = format!(
327                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
328                boundary, update.id, update.id
329            );
330            let mut update_data = update_bytes.into_bytes();
331            update_data.extend_from_slice(&datum.data);
332            update_data.extend_from_slice("\r\n".as_bytes());
333            
334            update_tx.send(update_data).await?;
335        } else if let Some(create) = &datum.create {
336            // Create create bytes with boundary format
337            let create_bytes = format!(
338                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
339                boundary, create.name, create.data_type
340            );
341            let mut create_data = create_bytes.into_bytes();
342            create_data.extend_from_slice(&datum.data);
343            create_data.extend_from_slice("\r\n".as_bytes());
344            
345            create_tx.clone().send(create_data).await?;
346        }
347    }
348    update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
349    create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
350    update_tx.close().await?;
351    create_tx.close().await?;
352
353    let mut data = Vec::new();
354
355    if let Ok(res) = create_signal_rx.await {
356        match res {
357            Ok(d) => data = d,
358            Err(e) => return Err(e),
359        }
360    } else {
361        return Err("create cancelled".into());
362    }
363
364    if let Ok(res) = update_signal_rx.await {
365        match res {
366            Ok(d) => data.extend(d),
367            Err(e) => return Err(e),
368        }
369    } else {
370        return Err("update cancelled".into());
371    }
372
373    Ok(data)
374}
375
376pub async fn upload_v1(
377    url: &str,
378    access_token: &str,
379    domain_id: &str,
380    data: Vec<UploadDomainData>,
381) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
382    let boundary = "boundary";
383
384    let mut create_body = Vec::new();
385    let mut update_body = Vec::new();
386
387    let url = url.to_string();
388    let url_2 = url.clone();
389    let access_token = access_token.to_string();
390    let domain_id = domain_id.to_string();
391    let access_token_2 = access_token.clone();
392    let domain_id_2 = domain_id.clone();
393
394    // Process the first item to get metadata for the form
395    for datun in data {
396    // Process the first item based on whether it's create or update
397        if let Some(update) = &datun.update {
398            // Create update bytes with boundary format
399            let update_bytes = format!(
400                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
401                boundary, update.id, update.id
402            );
403            let mut update_data = update_bytes.into_bytes();
404            update_data.extend_from_slice(&datun.data);
405            update_data.extend_from_slice("\r\n".as_bytes());
406            
407            update_body.extend_from_slice(&update_data);
408        } else if let Some(create) = &datun.create {
409            // Create create bytes with boundary format
410            let create_bytes = format!(
411                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
412                boundary, create.name, create.data_type
413            );
414            let mut create_data = create_bytes.into_bytes();
415            create_data.extend_from_slice(&datun.data);
416            create_data.extend_from_slice("\r\n".as_bytes());
417            
418            create_body.extend_from_slice(&create_data);
419        }
420    }
421
422    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
423    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
424
425    let create_body = Body::from(create_body);
426    let update_body = Body::from(update_body);
427
428    let create_response = Client::new()
429        .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
430        .bearer_auth(access_token)
431        .header("Content-Type", "multipart/form-data")
432        .body(create_body)
433        .send()
434        .await.unwrap();
435
436    let mut res = Vec::new();
437    if create_response.status().is_success() {
438        let data = create_response.json::<ListDomainData>().await.unwrap();
439        res = data.data;
440    } else {
441        let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
442        return Err(format!("Create failed with status: {}", err).into());
443    }
444
445    let update_response = Client::new()
446        .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
447        .bearer_auth(access_token_2)
448        .header("Content-Type", "multipart/form-data")
449        .body(update_body)
450        .send()
451        .await.unwrap();
452
453    if update_response.status().is_success() {
454        let data = update_response.json::<ListDomainData>().await.unwrap();
455        res.extend(data.data);
456    } else {
457        let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
458        return Err(format!("Update failed with status: {}", err).into());
459    }
460
461    Ok(res)
462} 
463    
464fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
465    let headers_str = String::from_utf8_lossy(headers_slice);
466    let mut domain_data = None;
467    
468    for line in headers_str.lines() {
469        if line.trim().is_empty() {
470            break;
471        }
472        if let Some((key, value)) = line.split_once(':') {
473            let key = key.trim().to_lowercase();
474            if key == "content-disposition" {
475                let mut parsed_domain_data = DomainData {
476                    id: String::new(),
477                    domain_id: String::new(),
478                    name: String::new(),
479                    data_type: String::new(),
480                    size: 0,
481                    created_at: String::new(),
482                    updated_at: String::new(),
483                    data: Vec::new(),
484                };
485                for part in value.split(';') {
486                    let part = part.trim();
487                    if let Some((key, value)) = part.split_once('=') {
488                        let key = key.trim();
489                        let value = value.trim().trim_matches('"');
490                        match key {
491                            "id" => parsed_domain_data.id = value.to_string(),
492                            "domain-id" => parsed_domain_data.domain_id = value.to_string(),
493                            "name" => parsed_domain_data.name = value.to_string(),
494                            "data-type" => parsed_domain_data.data_type = value.to_string(),
495                            "size" => parsed_domain_data.size = value.parse()?,
496                            "created-at" => parsed_domain_data.created_at = value.to_string(),
497                            "updated-at" => parsed_domain_data.updated_at = value.to_string(),
498                            _ => {}
499                        }
500                    }
501                } 
502                domain_data = Some(parsed_domain_data);
503            }
504        }
505    }
506
507    if let Some(domain_data) = domain_data {
508        Ok(domain_data)
509    } else {
510        Err("Missing content-disposition header".into())
511    }
512}
513
514fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
515    let _data = String::from_utf8_lossy(data);
516    let _boundary = String::from_utf8_lossy(boundary);
517    data.windows(boundary.len()).position(|window| window == boundary)
518}
519
520fn find_headers_end(data: &[u8]) -> Option<usize> {
521    // Look for double CRLF which marks the end of headers
522    data.windows(4).position(|window| window == b"\r\n\r\n")
523        .or_else(|| data.windows(2).position(|window| window == b"\n\n"))
524}