posemesh_domain_http/
domain_data.rs

1use futures::{channel::mpsc, stream::StreamExt, SinkExt};
2use reqwest::{Client, Response, Body};
3use serde::{Deserialize, Serialize};
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::js_sys::Uint8Array;
6use std::collections::HashMap;
7#[cfg(not(target_family = "wasm"))]
8use tokio::spawn;
9#[cfg(target_family = "wasm")]
10use wasm_bindgen_futures::spawn_local as spawn;
11#[cfg(target_family = "wasm")]
12use wasm_bindgen::prelude::*;
13
/// Describes a domain server by its URL.
14#[derive(Debug, Clone)]
15pub struct DomainServer {
    /// URL of the server.
    /// NOTE(review): presumably the base URL handed to the request functions as `url` — confirm.
16    pub url: String,
17}
18
/// One domain data record: its metadata plus, optionally, the raw payload.
///
/// On wasm targets the struct is exported through `wasm_bindgen` with cloning getters.
19#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
20#[derive(Debug, Deserialize, Serialize)]
21pub struct DomainData {
    /// Identifier of the record.
22    pub id: String,
    /// Identifier of the domain the record belongs to.
23    pub domain_id: String,
    /// Name of the record.
24    pub name: String,
    /// Type label of the record.
25    pub data_type: String,
    /// Payload size; `download_v1_stream` reads exactly this many bytes as the part body.
26    pub size: u64,
    /// Creation timestamp, kept as the string the server sent.
27    pub created_at: String,
    /// Last-update timestamp, kept as the string the server sent.
28    pub updated_at: String,
    /// Raw payload. Never read from incoming data (`skip_deserializing`) and left out of
    /// serialized output when empty.
29    #[serde(skip_serializing_if = "Vec::is_empty", skip_deserializing)]
30    pub data: Vec<u8>,
31}
32
33#[cfg_attr(target_family = "wasm", wasm_bindgen)]
34impl DomainData {
    /// Returns the payload in `data` as a JavaScript `Uint8Array` (wasm targets only).
35    #[cfg(target_family = "wasm")]
36    #[wasm_bindgen]
37    pub fn get_data_bytes(&self) -> Uint8Array {
38        Uint8Array::from(self.data.as_slice())
39    }
40}
41
/// Metadata identifying an existing record whose payload should be replaced.
///
/// The upload functions write `id` into both the `name` and `id` parameters of the
/// multipart part's `Content-Disposition` header.
42#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
43#[derive(Debug, Serialize, Clone)]
44pub struct UpdateDomainData {
    /// Identifier of the record to update.
45    pub id: String,
46}
47
48#[cfg_attr(target_family = "wasm", wasm_bindgen)]
49impl UpdateDomainData {
    /// JavaScript-facing constructor (compiled for wasm targets only).
50    #[cfg(target_family = "wasm")]
51    #[wasm_bindgen(constructor)]
52    pub fn new(id: String) -> Self {
53        Self { id }
54    }
55}
/// Metadata for a record that should be newly created.
///
/// The upload functions write `name` and `data_type` into the `name` and `data-type`
/// parameters of the multipart part's `Content-Disposition` header.
56#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
57#[derive(Debug, Serialize, Clone)]
58pub struct CreateDomainData {
    /// Name of the new record.
59    pub name: String,
    /// Type label of the new record.
60    pub data_type: String,
61}
62#[cfg_attr(target_family = "wasm", wasm_bindgen)]
63impl CreateDomainData {
    /// JavaScript-facing constructor (compiled for wasm targets only).
64    #[cfg(target_family = "wasm")]
65    #[wasm_bindgen(constructor)]
66    pub fn new(name: String, data_type: String) -> Self {
67        Self { name, data_type }
68    }
69}
70
/// One item to upload: a payload plus either creation or update metadata.
///
/// The upload functions check `update` first and fall back to `create`; an item with
/// neither set is skipped by them.
71#[cfg_attr(target_family = "wasm", wasm_bindgen(getter_with_clone))]
72#[derive(Debug, Serialize)]
73pub struct UploadDomainData {
    /// Set when the payload should become a new record. Flattened into the parent when
    /// serialized and omitted when `None`.
74    #[serde(flatten, skip_serializing_if = "Option::is_none")]
75    pub create: Option<CreateDomainData>,
    /// Set when the payload replaces an existing record. Flattened into the parent when
    /// serialized and omitted when `None`.
76    #[serde(flatten, skip_serializing_if = "Option::is_none")]
77    pub update: Option<UpdateDomainData>,
    /// Raw payload bytes.
78    pub data: Vec<u8>,
79}
80
81#[cfg_attr(target_family = "wasm", wasm_bindgen)]
82impl UploadDomainData {
    /// JavaScript-facing constructor (wasm targets only); converts the `Uint8Array`
    /// into a `Vec<u8>` via `to_vec`.
83    #[cfg(target_family = "wasm")]
84    #[wasm_bindgen(constructor)]
85    pub fn new(create: Option<CreateDomainData>, update: Option<UpdateDomainData>, data: Uint8Array) -> Self {
86        Self { create, update, data: data.to_vec() }
87    }
88}
89
/// Filters applied when listing or downloading domain data.
90#[derive(Debug, Serialize, Deserialize)]
91pub struct DownloadQuery {
    /// Record ids to fetch; sent comma-joined as the `ids` query parameter when non-empty.
92    pub ids: Vec<String>,
    /// Optional filter sent as the `name` query parameter.
93    pub name: Option<String>,
    /// Optional filter sent as the `data_type` query parameter.
94    pub data_type: Option<String>,
95}
96
/// JSON envelope used to decode server responses that carry a list of records.
97#[derive(Debug, Deserialize)]
98struct ListDomainData {
    /// The records contained in the response.
99    pub data: Vec<DomainData>,
100}
101
102pub async fn download_by_id(
103    url: &str,
104    client_id: &str,
105    access_token: &str,
106    domain_id: &str,
107    id: &str,
108) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
109    let response = Client::new()
110        .get(&format!("{}/api/v1/domains/{}/data/{}?raw=true", url, domain_id, id))
111        .bearer_auth(access_token)
112        .header("posemesh-client-id", client_id)
113        .send()
114        .await?;
115
116    if response.status().is_success() {
117        let data = response.bytes().await?;
118        Ok(data.to_vec())
119    } else {
120        let status = response.status();
121        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
122        Err(format!("Failed to download data by id. Status: {} - {}", status, text).into())
123    }
124}
125
126pub async fn download_metadata_v1(
127    url: &str,
128    client_id: &str,
129    access_token: &str,
130    domain_id: &str,
131    query: &DownloadQuery,
132) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
133    let mut params = HashMap::new();
134    
135    if !query.ids.is_empty() {
136        params.insert("ids", query.ids.join(","));
137    }
138    if let Some(name) = &query.name {
139        params.insert("name", name.clone());
140    }
141    if let Some(data_type) = &query.data_type {
142        params.insert("data_type", data_type.clone());
143    }
144
145    let response = Client::new()
146        .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
147        .bearer_auth(access_token)
148        .header("Accept", "application/json")
149        .header("posemesh-client-id", client_id)
150        .query(&params)
151        .send()
152        .await?;
153
154    if response.status().is_success() {
155        let data = response.json::<ListDomainData>().await?;
156        Ok(data.data)
157    } else {
158        let status = response.status();
159        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
160        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
161    }
162}
163
164pub async fn download_v1(
165    url: &str,
166    client_id: &str,
167    access_token: &str,
168    domain_id: &str,
169    query: &DownloadQuery,
170) -> Result<Response, Box<dyn std::error::Error + Send + Sync>> {
171    let mut params = HashMap::new();
172    
173    if !query.ids.is_empty() {
174        params.insert("ids", query.ids.join(","));
175    }
176    if let Some(name) = &query.name {
177        params.insert("name", name.clone());
178    }
179    if let Some(data_type) = &query.data_type {
180        params.insert("data_type", data_type.clone());
181    }
182
183    let response = Client::new()
184        .get(&format!("{}/api/v1/domains/{}/data", url, domain_id))
185        .bearer_auth(access_token)
186        .header("Accept", "multipart/form-data")
187        .header("posemesh-client-id", client_id)
188        .query(&params)
189        .send()
190        .await?;
191
192    if response.status().is_success() {
193        Ok(response)
194    } else {
195        let status = response.status();
196        let text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
197        Err(format!("Failed to download data. Status: {} - {}", status, text).into())
198    }
199}
200
201pub async fn download_v1_stream(
202    url: &str,
203    client_id: &str,
204    access_token: &str,
205    domain_id: &str,
206    query: &DownloadQuery,
207) -> Result<mpsc::Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>, Box<dyn std::error::Error + Send + Sync>> {
208    let response = download_v1(url, client_id, access_token, domain_id, query).await?;
209    let (mut tx, rx) = mpsc::channel::<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>(100);
210
211    let boundary = match response
212        .headers()
213        .get("content-type")
214        .and_then(|ct| ct.to_str().ok())
215        .and_then(|ct| {
216            if ct.starts_with("multipart/form-data; boundary=") {
217                Some(ct.split("boundary=").nth(1)?.to_string())
218            } else {
219                None
220            }
221        }) {
222        Some(b) => b,
223        None => {
224            tracing::error!("Invalid content-type header");
225            let _ = tx.close().await;
226            return Err("Invalid content-type header".into());
227        }
228    };
229
230    spawn(async move {
231        let mut stream = response.bytes_stream();
232        let mut buffer = Vec::new();
233        let mut current_domain_data: Option<DomainData> = None;
234
235        let boundary_bytes = format!("--{}", boundary).as_bytes().to_vec();
236
237        while let Some(chunk_result) = stream.next().await {
238            let chunk = match chunk_result {
239                Ok(c) if c.is_empty() => {
240                    tx.close().await.ok();
241                    return;
242                },
243                Ok(c) => c,
244                Err(e) => {
245                    let _ = tx.send(Err(e.into())).await;
246                    return;
247                }
248            };
249
250            buffer.extend_from_slice(&chunk);
251
252            if let Some(mut domain_data) = current_domain_data.take() {
253                let expected_size = domain_data.size as usize;
254                if buffer.len() >= expected_size {
255                    domain_data.data.extend_from_slice(&buffer[..expected_size]);
256                    buffer.drain(..expected_size);
257                    if tx.send(Ok(domain_data)).await.is_err() {
258                        return;
259                    }
260                } else {
261                    current_domain_data = Some(domain_data);
262                    continue;
263                }
264            }
265
266            // Find boundary
267            if let Some(boundary_pos) = find_boundary(&buffer, &boundary_bytes) {
268                // Look for header end after boundary
269                if let Some(header_end) = find_headers_end(&buffer[boundary_pos..]) {
270                    let headers_slice = &buffer[boundary_pos..boundary_pos + header_end];
271                    let part_headers = parse_headers(headers_slice);
272                    if let Ok(domain_data) = part_headers {
273                        current_domain_data = Some(domain_data);
274                    } else {
275                        tracing::error!("Failed to parse headers: {:?}", part_headers.err());
276                        return;
277                    }
278
279                    // Remove processed data from buffer
280                    buffer.drain(..boundary_pos + header_end);
281                    continue;
282                } else {
283                    // Incomplete headers, wait for more chunks
284                    continue;
285                }
286            }
287        }
288    });
289
290    Ok(rx)
291}
292
293#[cfg(not(target_family = "wasm"))]
294pub async fn upload_v1_stream(
295    url: &str,
296    access_token: &str,
297    domain_id: &str,
298    mut rx: mpsc::Receiver<UploadDomainData>,
299) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
300    use futures::channel::oneshot;
301
302    let boundary = "boundary";
303
304    let (mut create_tx, create_rx) = mpsc::channel(100);
305    let (mut update_tx, update_rx) = mpsc::channel(100);
306
307    let create_body = Body::wrap_stream(create_rx.map(Ok::<Vec<u8>, std::io::Error>));
308    let update_body = Body::wrap_stream(update_rx.map(Ok::<Vec<u8>, std::io::Error>));
309
310    let url = url.to_string();
311    let url_2 = url.clone();
312    let access_token = access_token.to_string();
313    let domain_id = domain_id.to_string();
314    let access_token_2 = access_token.clone();
315    let domain_id_2 = domain_id.clone();
316
317    let (create_signal, create_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
318    let (update_signal, update_signal_rx) = oneshot::channel::<Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>>>();
319
320    spawn(async move {
321        let create_response = Client::new()
322            .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
323            .bearer_auth(access_token)
324            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
325            .body(create_body)
326            .send()
327            .await;
328
329        if let Err(e) = create_response {
330            tracing::error!("Create failed with error: {}", e);
331            create_signal.send(Err(e.into())).unwrap();
332            return;
333        }
334
335        let create_response = create_response.unwrap();
336        if create_response.status().is_success() {
337            let data = create_response.json::<ListDomainData>().await.unwrap();
338            create_signal.send(Ok(data.data)).unwrap();
339        } else {
340            let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
341            create_signal.send(Err(format!("Create failed with status: {}", err).into())).unwrap();
342        }
343    });
344
345    spawn(async move {
346        let update_response = Client::new()
347            .put(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
348            .bearer_auth(access_token_2)
349            .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
350            .body(update_body)
351            .send()
352            .await;
353
354        if let Err(e) = update_response {
355            tracing::error!("Update failed with error: {}", e);
356            update_signal.send(Err(e.into())).unwrap();
357            return;
358        }
359        let update_response = update_response.unwrap();
360        if update_response.status().is_success() {
361            let data = update_response.json::<ListDomainData>().await.unwrap();
362            update_signal.send(Ok(data.data)).unwrap();
363        } else {
364            let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
365            update_signal.send(Err(format!("Update failed with status: {}", err).into())).unwrap();
366        }
367    });
368
369    while let Some(datum) = rx.next().await {
370        // Process the first item based on whether it's create or update
371        if let Some(update) = &datum.update {
372            // Create update bytes with boundary format
373            let update_bytes = format!(
374                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
375                boundary, update.id, update.id
376            );
377            let mut update_data = update_bytes.into_bytes();
378            update_data.extend_from_slice(&datum.data);
379            update_data.extend_from_slice("\r\n".as_bytes());
380            
381            update_tx.send(update_data).await?;
382        } else if let Some(create) = &datum.create {
383            // Create create bytes with boundary format
384            let create_bytes = format!(
385                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
386                boundary, create.name, create.data_type
387            );
388            let mut create_data = create_bytes.into_bytes();
389            create_data.extend_from_slice(&datum.data);
390            create_data.extend_from_slice("\r\n".as_bytes());
391            
392            create_tx.clone().send(create_data).await?;
393        }
394    }
395    update_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
396    create_tx.send(format!("--{}--\r\n", boundary).as_bytes().to_vec()).await?;
397    update_tx.close().await?;
398    create_tx.close().await?;
399
400    let mut data = Vec::new();
401
402    if let Ok(res) = create_signal_rx.await {
403        match res {
404            Ok(d) => data = d,
405            Err(e) => return Err(e),
406        }
407    } else {
408        return Err("create cancelled".into());
409    }
410
411    if let Ok(res) = update_signal_rx.await {
412        match res {
413            Ok(d) => data.extend(d),
414            Err(e) => return Err(e),
415        }
416    } else {
417        return Err("update cancelled".into());
418    }
419
420    Ok(data)
421}
422
423pub async fn upload_v1(
424    url: &str,
425    access_token: &str,
426    domain_id: &str,
427    data: Vec<UploadDomainData>,
428) -> Result<Vec<DomainData>, Box<dyn std::error::Error + Send + Sync>> {
429    let boundary = "boundary";
430
431    let mut create_body = Vec::new();
432    let mut update_body = Vec::new();
433
434    let url = url.to_string();
435    let url_2 = url.clone();
436    let access_token = access_token.to_string();
437    let domain_id = domain_id.to_string();
438    let access_token_2 = access_token.clone();
439    let domain_id_2 = domain_id.clone();
440
441    // Process the first item to get metadata for the form
442    for datun in data {
443    // Process the first item based on whether it's create or update
444        if let Some(update) = &datun.update {
445            // Create update bytes with boundary format
446            let update_bytes = format!(
447                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; id=\"{}\"\r\n\r\n",
448                boundary, update.id, update.id
449            );
450            let mut update_data = update_bytes.into_bytes();
451            update_data.extend_from_slice(&datun.data);
452            update_data.extend_from_slice("\r\n".as_bytes());
453            
454            update_body.extend_from_slice(&update_data);
455        } else if let Some(create) = &datun.create {
456            // Create create bytes with boundary format
457            let create_bytes = format!(
458                "--{}\r\nContent-Type: application/octet-stream\r\nContent-Disposition: form-data; name=\"{}\"; data-type=\"{}\"\r\n\r\n",
459                boundary, create.name, create.data_type
460            );
461            let mut create_data = create_bytes.into_bytes();
462            create_data.extend_from_slice(&datun.data);
463            create_data.extend_from_slice("\r\n".as_bytes());
464            
465            create_body.extend_from_slice(&create_data);
466        }
467    }
468
469    create_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
470    update_body.extend_from_slice(format!("--{}--\r\n", boundary).as_bytes());
471
472    let create_body = Body::from(create_body);
473    let update_body = Body::from(update_body);
474
475    let create_response = Client::new()
476        .post(&format!("{}/api/v1/domains/{}/data", url, domain_id))
477        .bearer_auth(access_token)
478        .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
479        .body(create_body)
480        .send()
481        .await?;
482
483    let mut res = Vec::new();
484    if create_response.status().is_success() {
485        let data = create_response.json::<ListDomainData>().await.unwrap();
486        res = data.data;
487    } else {
488        let err = create_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
489        return Err(format!("Create failed with status: {}", err).into());
490    }
491
492    let update_response = Client::new()
493        .post(&format!("{}/api/v1/domains/{}/data", url_2, domain_id_2))
494        .bearer_auth(access_token_2)
495        .header("Content-Type", &format!("multipart/form-data; boundary={}", boundary))
496        .body(update_body)
497        .send()
498        .await.unwrap();
499
500    if update_response.status().is_success() {
501        let data = update_response.json::<ListDomainData>().await.unwrap();
502        res.extend(data.data);
503    } else {
504        let err = update_response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
505        return Err(format!("Update failed with status: {}", err).into());
506    }
507
508    Ok(res)
509} 
510    
511fn parse_headers(headers_slice: &[u8]) -> Result<DomainData, Box<dyn std::error::Error + Send + Sync>> {
512    let headers_str = String::from_utf8_lossy(headers_slice);
513    let mut domain_data = None;
514    
515    for line in headers_str.lines() {
516        if line.trim().is_empty() {
517            break;
518        }
519        if let Some((key, value)) = line.split_once(':') {
520            let key = key.trim().to_lowercase();
521            if key == "content-disposition" {
522                let mut parsed_domain_data = DomainData {
523                    id: String::new(),
524                    domain_id: String::new(),
525                    name: String::new(),
526                    data_type: String::new(),
527                    size: 0,
528                    created_at: String::new(),
529                    updated_at: String::new(),
530                    data: Vec::new(),
531                };
532                for part in value.split(';') {
533                    let part = part.trim();
534                    if let Some((key, value)) = part.split_once('=') {
535                        let key = key.trim();
536                        let value = value.trim().trim_matches('"');
537                        match key {
538                            "id" => parsed_domain_data.id = value.to_string(),
539                            "domain-id" => parsed_domain_data.domain_id = value.to_string(),
540                            "name" => parsed_domain_data.name = value.to_string(),
541                            "data-type" => parsed_domain_data.data_type = value.to_string(),
542                            "size" => parsed_domain_data.size = value.parse()?,
543                            "created-at" => parsed_domain_data.created_at = value.to_string(),
544                            "updated-at" => parsed_domain_data.updated_at = value.to_string(),
545                            _ => {}
546                        }
547                    }
548                } 
549                domain_data = Some(parsed_domain_data);
550            }
551        }
552    }
553
554    if let Some(domain_data) = domain_data {
555        Ok(domain_data)
556    } else {
557        Err("Missing content-disposition header".into())
558    }
559}
560
/// Returns the offset of the first occurrence of `boundary` in `data`.
///
/// Returns `None` when `boundary` does not occur, and also when `boundary` is empty
/// (`slice::windows` panics on a window size of zero).
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
    if boundary.is_empty() {
        return None;
    }
    // Compare raw bytes: the buffer usually holds binary payload, so no text
    // conversion is attempted.
    data.windows(boundary.len()).position(|window| window == boundary)
}
566
/// Returns the offset at which the body starts, i.e. the index just past the blank
/// line that ends a header block.
///
/// A `\r\n\r\n` terminator anywhere in `data` takes precedence; `\n\n` is only
/// considered when no `\r\n\r\n` exists. Returns `None` when neither is present.
fn find_headers_end(data: &[u8]) -> Option<usize> {
    const CRLF_CRLF: &[u8] = b"\r\n\r\n";
    const LF_LF: &[u8] = b"\n\n";

    // Offset just past the first occurrence of `needle`, if any.
    let end_after = |needle: &[u8]| {
        data.windows(needle.len())
            .position(|window| window == needle)
            .map(|start| start + needle.len())
    };

    end_after(CRLF_CRLF).or_else(|| end_after(LF_LF))
}