// posemesh_domain_http/domain_data.rs

1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5#[cfg(not(target_family = "wasm"))]
6use tokio::spawn;
7#[cfg(target_family = "wasm")]
8use wasm_bindgen_futures::spawn_local as spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen::prelude::*;
11
/// Describes a domain server by its address.
#[derive(Clone, Debug)]
pub struct DomainServer {
    /// Address of the server.
    ///
    /// NOTE(review): presumably the base URL that is passed as `url` to the
    /// request helpers in this module — confirm against callers.
    pub url: String,
}
16
17#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
18#[derive(Debug, Deserialize, Serialize)]
19pub struct DomainData {
20    pub id: String,
21    pub domain_id: String,
22    pub name: String,
23    pub data_type: String,
24    pub size: u64,
25    pub created_at: String,
26    pub updated_at: String,
27    #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
28    pub data: Vec<u8>,
29}
30
31#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
32#[derive(Debug, Serialize, Clone)]
33pub struct UpdateDomainData {
34    pub id: String,
35}
36
37#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
38#[derive(Debug, Serialize, Clone)]
39pub struct CreateDomainData {
40    pub name: String,
41    pub data_type: String,
42}
43
44#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
45#[derive(Debug, Serialize)]
46pub struct UploadDomainData {
47    #[serde(flatten, skip_serializing_if = "Option::is_none")]
48    pub create: Option<CreateDomainData>,
49    #[serde(flatten, skip_serializing_if = "Option::is_none")]
50    pub update: Option<UpdateDomainData>,
51    pub data: Vec<u8>,
52}
53
54#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
55#[derive(Debug, Serialize)]
56pub struct DownloadQuery {
57    pub ids: Vec<String>,
58    pub name: Option<String>,
59    pub data_type: Option<String>,
60}
61
62#[derive(Debug, Deserialize)]
63struct ListDomainData {
64    pub data: Vec<DomainData>,
65}
66
67pub async fn download_v1(
68    url: &str,
69    client_id: &str,
70    access_token: &str,
71    domain_id: &str,
72    query: &DownloadQuery,
73) -> Result<Response, Box<dyn std::error::Error>> {
74    let mut params = HashMap::new();
75    
76    if !query.ids.is_empty() {
77        params.insert("ids", query.ids.join(","));
78    }
79    if let Some(name) = &query.name {
80        params.insert("name", name.clone());
81    }
82    if let Some(data_type) = &query.data_type {
83        params.insert("data_type", data_type.clone());
84    }
85
86    let response = Client::new()
87        .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
88        .bearer_auth(access_token)
89        .header("Accept", "multipart/form-data")
90        .header("posemesh-client-id", client_id)
91        .query(&params)
92        .send()
93        .await?;
94
95    if response.status().is_success() {
96        Ok(response)
97    } else {
98        let status = response.status();
99        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
100        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
101    }
102}
103
104pub async fn download_v1_stream(
105    url: &str,
106    client_id: &str,
107    access_token: &str,
108    domain_id: &str,
109    query: &DownloadQuery,
110) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error>> {
111    let response = download_v1(url, client_id, access_token, domain_id, query).await?;
112    let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
113
114    let boundary = match response
115        .headers()
116        .get("content-type")
117        .and_then(|ct| ct.to_str().ok())
118        .and_then(|ct| {
119            if ct.starts_with("multipart/form-data; boundary=") {
120                Some(ct.split("boundary=").nth(1)?.to_string())
121            } else {
122                None
123            }
124        }) {
125        Some(b) => b,
126        None => {
127            tracing::error!("Invalid content-type header");
128            let _ = tx.close().await;
129            return Err("Invalid content-type header".into());
130        }
131    };
132
133    spawn(async move {
134        let mut stream = response.bytes_stream();
135        let mut buffer = Vec::new();
136        let mut current_domain_data: Option<DomainData> = None;
137
138        let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
139
140        while let Some(chunk_result) = stream.next().await {
141            let chunk = match chunk_result {
142                Ok(c) if c.is_empty() => {
143                    tx.close().await.ok();
144                    return;
145                },
146                Ok(c) => c,
147                Err(e) => {
148                    let _ = tx.send(Err(e.into())).await;
149                    return;
150                }
151            };
152
153            buffer.extend_from_slice(&chunk);
154
155            if let Some(mut domain_data) = current_domain_data.take() {
156                let expected_size = domain_data.size as usize;
157                if buffer.len() >= expected_size {
158                    domain_data.data.extend_from_slice(&buffer[..expected_size]);
159                    buffer.drain(..expected_size);
160                    if tx.send(Ok(domain_data)).await.is_err() {
161                        return;
162                    }
163                } else {
164                    current_domain_data = Some(domain_data);
165                    continue;
166                }
167            }
168
169            // Find boundary
170            if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
171                // Look for header end after boundary
172                if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
173                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
174                    let part_headers = parse_headers(headers_slice);
175                    if let Ok(domain_data) = part_headers {
176                        current_domain_data = Some(domain_data);
177                    } else {
178                        tracing::error!("Failed to parse headers: {:?}", part_headers.err());
179                        return;
180                    }
181
182                    // Remove processed data from buffer
183                    buffer.drain(..boundary_pos + header_end);
184                    continue;
185                } else {
186                    // Incomplete headers, wait for more chunks
187                    continue;
188                }
189            }
190        }
191    });
192
193    Ok(rx)
194}
195
196#[cfg(not(target_family = "wasm"))]
197pub async fn upload_v1_stream(
198    url: &str,
199    access_token: &str,
200    domain_id: &str,
201    mut rx: mpsc::Receiver<UploadDomainData>,
202) -> Result<Vec<DomainData>, Box<dyn std::error::Error>> {
203    use futures::channel::oneshot;
204
205    let boundary = "boundary";
206
207    let (mut create_tx, create_rx) = mpsc::channel(100);
208    let (mut update_tx, update_rx) = mpsc::channel(100);
209
210    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
211    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
212
213    let url = url.to_string();
214    let url_2 = url.clone();
215    let access_token = access_token.to_string();
216    let domain_id = domain_id.to_string();
217    let access_token_2 = access_token.clone();
218    let domain_id_2 = domain_id.clone();
219
220    let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
221    let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
222
223    spawn(async move {
224        let create_response = Client::new()
225            .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
226            .bearer_auth(access_token)
227            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
228            .body(create_body)
229            .send()
230            .await;
231
232        if let Err(e) = create_response {
233            tracing::error!("Create failed with error: {}", e);
234            create_signal.send(Err(e.into())).unwrap();
235            return;
236            // return Err(e.into());
237        }
238
239        let create_response = create_response.unwrap();
240        if create_response.status().is_success() {
241            let data = create_response.json::<ListDomainData>().await.unwrap();
242            create_signal.send(Ok(data.data)).unwrap();
243        } else {
244            let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
245            create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
246        }
247    });
248
249    spawn(async move {
250        let update_response = Client::new()
251            .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
252            .bearer_auth(access_token_2)
253            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
254            .body(update_body)
255            .send()
256            .await;
257
258        if let Err(e) = update_response {
259            tracing::error!("Update failed with error: {}", e);
260            update_signal.send(Err(e.into())).unwrap();
261            return;
262            // return Err(e.into());
263        }
264        let update_response = update_response.unwrap();
265        if update_response.status().is_success() {
266            let data = update_response.json::<ListDomainData>().await.unwrap();
267            update_signal.send(Ok(data.data)).unwrap();
268        } else {
269            let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
270            update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
271        }
272    });
273
274    while let Some(datum) = rx.next().await {
275        // Process the first item based on whether it's create or update
276        if let Some(update) = &datum.update {
277            // Create update bytes with boundary format
278            let update_bytes = format!(
279                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
280                boundary, update.id, update.id
281            );
282            let mut update_data = update_bytes.into_bytes();
283            update_data.extend_from_slice(&datum.data);
284            update_data.extend_from_slice("\r\n".as_bytes());
285            
286            update_tx.send(update_data).await?;
287        } else if let Some(create) = &datum.create {
288            // Create create bytes with boundary format
289            let create_bytes = format!(
290                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
291                boundary, create.name, create.data_type
292            );
293            let mut create_data = create_bytes.into_bytes();
294            create_data.extend_from_slice(&datum.data);
295            create_data.extend_from_slice("\r\n".as_bytes());
296            
297            create_tx.clone().send(create_data).await?;
298        }
299    }
300    update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
301    create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
302    update_tx.close().await?;
303    create_tx.close().await?;
304
305    let mut data = Vec::new();
306
307    if let Ok(res) = create_signal_rx.await {
308        match res {
309            Ok(d) => data = d,
310            Err(e) => return Err(e),
311        }
312    } else {
313        return Err("create cancelled".into());
314    }
315
316    if let Ok(res) = update_signal_rx.await {
317        match res {
318            Ok(d) => data.extend(d),
319            Err(e) => return Err(e),
320        }
321    } else {
322        return Err("update cancelled".into());
323    }
324
325    Ok(data)
326}
327
328pub async fn upload_v1(
329    url: &str,
330    access_token: &str,
331    domain_id: &str,
332    data: Vec<UploadDomainData>,
333) -> Result<Vec<DomainData>, Box<dyn std::error::Error>> {
334    let boundary = "boundary";
335
336    let mut create_body = Vec::new();
337    let mut update_body = Vec::new();
338
339    let url = url.to_string();
340    let url_2 = url.clone();
341    let access_token = access_token.to_string();
342    let domain_id = domain_id.to_string();
343    let access_token_2 = access_token.clone();
344    let domain_id_2 = domain_id.clone();
345
346    // Process the first item to get metadata for the form
347    for datun in data {
348    // Process the first item based on whether it's create or update
349        if let Some(update) = &datun.update {
350            // Create update bytes with boundary format
351            let update_bytes = format!(
352                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
353                boundary, update.id, update.id
354            );
355            let mut update_data = update_bytes.into_bytes();
356            update_data.extend_from_slice(&datun.data);
357            update_data.extend_from_slice("\r\n".as_bytes());
358            
359            update_body.extend_from_slice(&update_data);
360        } else if let Some(create) = &datun.create {
361            // Create create bytes with boundary format
362            let create_bytes = format!(
363                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
364                boundary, create.name, create.data_type
365            );
366            let mut create_data = create_bytes.into_bytes();
367            create_data.extend_from_slice(&datun.data);
368            create_data.extend_from_slice("\r\n".as_bytes());
369            
370            create_body.extend_from_slice(&create_data);
371        }
372    }
373
374    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
375    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
376
377    let create_body = Body::from(create_body);
378    let update_body = Body::from(update_body);
379
380    let create_response = Client::new()
381        .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
382        .bearer_auth(access_token)
383        .header("Content-Type", "multipart/form-data")
384        .body(create_body)
385        .send()
386        .await.unwrap();
387
388    let mut res = Vec::new();
389    if create_response.status().is_success() {
390        let data = create_response.json::<ListDomainData>().await.unwrap();
391        res = data.data;
392    } else {
393        let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
394        return Err(format!("Create failed with status: {}", err).into());
395    }
396
397    let update_response = Client::new()
398        .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
399        .bearer_auth(access_token_2)
400        .header("Content-Type", "multipart/form-data")
401        .body(update_body)
402        .send()
403        .await.unwrap();
404
405    if update_response.status().is_success() {
406        let data = update_response.json::<ListDomainData>().await.unwrap();
407        res.extend(data.data);
408    } else {
409        let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
410        return Err(format!("Update failed with status: {}", err).into());
411    }
412
413    Ok(res)
414} 
415    
416fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
417    let headers_str = String::from_utf8_lossy(headers_slice);
418    let mut domain_data = None;
419    
420    for line in headers_str.lines() {
421        if line.trim().is_empty() {
422            break;
423        }
424        if let Some((key, value)) = line.split_once(':') {
425            let key = key.trim().to_lowercase();
426            if key == "content-disposition" {
427                let mut parsed_domain_data = DomainData {
428                    id: String::new(),
429                    domain_id: String::new(),
430                    name: String::new(),
431                    data_type: String::new(),
432                    size: 0,
433                    created_at: String::new(),
434                    updated_at: String::new(),
435                    data: Vec::new(),
436                };
437                for part in value.split(';') {
438                    let part = part.trim();
439                    if let Some((key, value)) = part.split_once('=') {
440                        let key = key.trim();
441                        let value = value.trim().trim_matches('"');
442                        match key {
443                            "id" => parsed_domain_data.id = value.to_string(),
444                            "domain-id" => parsed_domain_data.domain_id = value.to_string(),
445                            "name" => parsed_domain_data.name = value.to_string(),
446                            "data-type" => parsed_domain_data.data_type = value.to_string(),
447                            "size" => parsed_domain_data.size = value.parse()?,
448                            "created-at" => parsed_domain_data.created_at = value.to_string(),
449                            "updated-at" => parsed_domain_data.updated_at = value.to_string(),
450                            _ => {}
451                        }
452                    }
453                } 
454                domain_data = Some(parsed_domain_data);
455            }
456        }
457    }
458
459    if let Some(domain_data) = domain_data {
460        Ok(domain_data)
461    } else {
462        Err("Missing content-disposition header".into())
463    }
464}
465
/// Returns the offset of the first occurrence of `boundary` in `data`.
///
/// The comparison is byte-wise, so `data` may contain arbitrary binary
/// content. Returns `None` when `boundary` does not occur, when it is longer
/// than `data`, or when it is empty (an empty delimiter can never mark a part).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    // `windows(0)` panics, and a needle longer than the haystack cannot match.
    if boundary.is_empty() || boundary.len() > data.len() {
        return None;
    }
    data.windows(boundary.len())
        .position(|window| window == boundary)
}
471
/// Locates the blank line that terminates a block of headers.
///
/// Returns the offset at which the terminator STARTS (not the offset just
/// past it). A `\r\n\r\n` sequence anywhere in `data` takes precedence; only
/// when none exists is a bare `\n\n` looked for. Returns `None` when neither
/// sequence is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    const CRLF_CRLF: &[u8] = b"\r\n\r\n";
    const LF_LF: &[u8] = b"\n\n";

    let locate = |needle: &[u8]| {
        data.windows(needle.len())
            .position(|candidate| candidate == needle)
    };

    match locate(CRLF_CRLF) {
        Some(offset) => Some(offset),
        None => locate(LF_LF),
    }
}