posemesh_domain_http/
lib.rs

1use futures::channel::mpsc::Receiver;
2
3use crate::domain_data::{
4    DomainData, DomainDataMetadata, DownloadQuery, delete_by_id, download_by_id,
5    download_metadata_v1, download_v1_stream,
6};
7#[cfg(target_family = "wasm")]
8use crate::domain_data::{UploadDomainData, upload_v1};
9
10mod auth;
11pub mod config;
12pub mod discovery;
13pub mod domain_data;
14pub mod reconstruction;
15#[cfg(target_family = "wasm")]
16pub mod wasm;
17
18use crate::auth::TokenCache;
19use crate::discovery::{DiscoveryService, DomainWithServer};
20pub use crate::reconstruction::JobRequest;
21
22#[derive(Debug, Clone)]
23pub struct DomainClient {
24    discovery_client: DiscoveryService,
25    pub client_id: String,
26}
27
28impl DomainClient {
29    pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
30        if client_id.is_empty() {
31            panic!("client_id is empty");
32        }
33        Self {
34            discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
35            client_id: client_id.to_string(),
36        }
37    }
38
39    pub async fn new_with_app_credential(
40        api_url: &str,
41        dds_url: &str,
42        client_id: &str,
43        app_key: &str,
44        app_secret: &str,
45    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
46        let mut dc = DomainClient::new(api_url, dds_url, client_id);
47        dc.discovery_client
48            .sign_in_as_auki_app(app_key, app_secret)
49            .await?;
50        Ok(dc)
51    }
52
53    pub async fn new_with_user_credential(
54        api_url: &str,
55        dds_url: &str,
56        client_id: &str,
57        email: &str,
58        password: &str,
59        remember_password: bool,
60    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
61        let mut dc = DomainClient::new(api_url, dds_url, client_id);
62        dc.discovery_client
63            .sign_in_with_auki_account(email, password, remember_password)
64            .await?;
65        Ok(dc)
66    }
67
68    pub fn with_oidc_access_token(&self, token: &str) -> Self {
69        Self {
70            discovery_client: self.discovery_client.with_oidc_access_token(token),
71            client_id: self.client_id.clone(),
72        }
73    }
74
75    pub async fn download_domain_data(
76        &self,
77        domain_id: &str,
78        query: &DownloadQuery,
79    ) -> Result<
80        Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
81        Box<dyn std::error::Error + Send + Sync>,
82    > {
83        let domain = self.discovery_client.auth_domain(domain_id).await?;
84        let rx = download_v1_stream(
85            &domain.domain.domain_server.url,
86            &self.client_id,
87            &domain.get_access_token(),
88            domain_id,
89            query,
90        )
91        .await?;
92        Ok(rx)
93    }
94
95    #[cfg(not(target_family = "wasm"))]
96    pub async fn upload_domain_data(
97        &self,
98        domain_id: &str,
99        data: Receiver<domain_data::UploadDomainData>,
100    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
101        use crate::{auth::TokenCache, domain_data::upload_v1_stream};
102        let domain = self.discovery_client.auth_domain(domain_id).await?;
103        upload_v1_stream(
104            &domain.domain.domain_server.url,
105            &domain.get_access_token(),
106            domain_id,
107            data,
108        )
109        .await
110    }
111
112    #[cfg(target_family = "wasm")]
113    pub async fn upload_domain_data(
114        &self,
115        domain_id: &str,
116        data: Vec<UploadDomainData>,
117    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
118        let domain = self.discovery_client.auth_domain(domain_id).await?;
119        upload_v1(
120            &domain.domain.domain_server.url,
121            &domain.get_access_token(),
122            domain_id,
123            data,
124        )
125        .await
126    }
127
128    pub async fn download_metadata(
129        &self,
130        domain_id: &str,
131        query: &DownloadQuery,
132    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
133        let domain = self.discovery_client.auth_domain(domain_id).await?;
134        download_metadata_v1(
135            &domain.domain.domain_server.url,
136            &self.client_id,
137            &domain.get_access_token(),
138            domain_id,
139            query,
140        )
141        .await
142    }
143
144    pub async fn download_domain_data_by_id(
145        &self,
146        domain_id: &str,
147        id: &str,
148    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
149        let domain = self.discovery_client.auth_domain(domain_id).await?;
150        download_by_id(
151            &domain.domain.domain_server.url,
152            &self.client_id,
153            &domain.get_access_token(),
154            domain_id,
155            id,
156        )
157        .await
158    }
159
160    pub async fn delete_domain_data_by_id(
161        &self,
162        domain_id: &str,
163        id: &str,
164    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
165        let domain = self.discovery_client.auth_domain(domain_id).await?;
166        delete_by_id(
167            &domain.domain.domain_server.url,
168            &domain.get_access_token(),
169            domain_id,
170            id,
171        )
172        .await
173    }
174
175    pub async fn submit_job_request_v1(
176        &self,
177        domain_id: &str,
178        request: &JobRequest,
179    ) -> Result<reqwest::Response, Box<dyn std::error::Error + Send + Sync>> {
180        let domain = self.discovery_client.auth_domain(domain_id).await?;
181        crate::reconstruction::forward_job_request_v1(
182            &domain.domain.domain_server.url,
183            &self.client_id,
184            &domain.get_access_token(),
185            domain_id,
186            request,
187        )
188        .await
189    }
190  
191    pub async fn list_domains(
192        &self,
193        org: &str,
194    ) -> Result<Vec<DomainWithServer>, Box<dyn std::error::Error + Send + Sync>> {
195        self.discovery_client.list_domains(org).await
196    }
197}
198
199#[cfg(not(target_family = "wasm"))]
200#[cfg(test)]
201mod tests {
202    use std::time::Duration;
203
204    use crate::domain_data::{CreateDomainData, DomainAction, UpdateDomainData, UploadDomainData};
205
206    use super::*;
207    use futures::{StreamExt, channel::mpsc};
208    use tokio::{spawn, time::sleep};
209
210    fn get_config() -> (config::Config, String) {
211        if std::path::Path::new("../.env.local").exists() {
212            dotenvy::from_filename("../.env.local").ok();
213        }
214        dotenvy::dotenv().ok();
215        let config = config::Config::from_env().unwrap();
216        (config, std::env::var("DOMAIN_ID").unwrap())
217    }
218
219    #[tokio::test]
220    async fn test_download_domain_data_with_app_credential() {
221        // Create a test client
222        let config = get_config();
223        let client = DomainClient::new_with_app_credential(
224            &config.0.api_url,
225            &config.0.dds_url,
226            &config.0.client_id,
227            &config.0.app_key.unwrap(),
228            &config.0.app_secret.unwrap(),
229        )
230        .await
231        .expect("Failed to create client");
232
233        // Create a test query
234        let query = DownloadQuery {
235            ids: vec![],
236            name: None,
237            data_type: Some("dmt_accel_csv".to_string()),
238        };
239
240        // Test the download function
241        let result = client.download_domain_data(&config.1, &query).await;
242
243        assert!(result.is_ok(), "error message : {:?}", result.err());
244
245        let mut rx = result.unwrap();
246        let mut count = 0;
247        while let Some(Ok(data)) = rx.next().await {
248            count += 1;
249            assert_eq!(data.metadata.data_type, "dmt_accel_csv");
250        }
251        assert!(count > 0);
252    }
253
254    #[tokio::test]
255    async fn test_upload_domain_data_with_user_credential() {
256        use futures::SinkExt;
257        let config = get_config();
258        let client = DomainClient::new_with_user_credential(
259            &config.0.api_url,
260            &config.0.dds_url,
261            &config.0.client_id,
262            &config.0.email.unwrap(),
263            &config.0.password.unwrap(),
264            true,
265        )
266        .await
267        .expect("Failed to create client");
268
269        let data = vec![
270            UploadDomainData {
271                action: DomainAction::Create(CreateDomainData {
272                    name: "to be deleted".to_string(),
273                    data_type: "test".to_string(),
274                }),
275                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
276            },
277            UploadDomainData {
278                action: DomainAction::Update(UpdateDomainData {
279                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
280                }),
281                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
282            },
283        ];
284        let (mut tx, rx) = mpsc::channel(10);
285        spawn(async move {
286            for d in data {
287                tx.send(d).await.unwrap();
288            }
289            tx.close().await.unwrap();
290        });
291        let result = client.upload_domain_data(&config.1, rx).await;
292
293        assert!(result.is_ok(), "error message : {:?}", result.err());
294
295        sleep(Duration::from_secs(5)).await;
296        let result = result.unwrap();
297        assert_eq!(result.len(), 2);
298
299        let ids = result.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
300        assert_eq!(ids.len(), 2);
301        // Create a test query
302        let query = DownloadQuery {
303            ids: ids,
304            name: None,
305            data_type: None,
306        };
307
308        // Test the download function
309        let result = client.download_domain_data(&config.1, &query).await;
310
311        assert!(result.is_ok(), "error message : {:?}", result.err());
312
313        let mut to_delete = None;
314        let mut count = 0;
315        let mut rx = result.unwrap();
316        while let Some(Ok(data)) = rx.next().await {
317            count += 1;
318            if data.metadata.id == "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
319                assert_eq!(data.data, b"{\"test\": \"test updated\"}");
320                continue;
321            } else {
322                assert_eq!(data.data, b"{\"test\": \"test\"}");
323            }
324            to_delete = Some(data.metadata.id.clone());
325        }
326        assert_eq!(count, 2);
327        assert_eq!(to_delete.is_some(), true);
328
329        // Delete the one whose id is not "a8"
330        let delete_result = client
331            .delete_domain_data_by_id(&config.1, &to_delete.unwrap())
332            .await;
333        assert!(
334            delete_result.is_ok(),
335            "Failed to delete data by id: {:?}",
336            delete_result.err()
337        );
338    }
339
340    #[tokio::test]
341    async fn test_download_domain_data_by_id() {
342        let config = get_config();
343        let client = DomainClient::new_with_app_credential(
344            &config.0.api_url,
345            &config.0.dds_url,
346            &config.0.client_id,
347            &config.0.app_key.unwrap(),
348            &config.0.app_secret.unwrap(),
349        )
350        .await
351        .expect("Failed to create client");
352
353        // Now test download by id
354        let download_result = client
355            .download_domain_data_by_id(&config.1, "a84a36e5-312b-4f80-974a-06f5d19c1e16")
356            .await;
357
358        assert!(
359            download_result.is_ok(),
360            "download by id failed: {:?}",
361            download_result.err()
362        );
363        let downloaded_bytes = download_result.unwrap();
364        assert_eq!(downloaded_bytes, b"{\"test\": \"test updated\"}".to_vec());
365    }
366
367    #[tokio::test]
368    async fn test_download_domain_metadata() {
369        let config = get_config();
370        let client = DomainClient::new_with_app_credential(
371            &config.0.api_url,
372            &config.0.dds_url,
373            &config.0.client_id,
374            &config.0.app_key.clone().unwrap(),
375            &config.0.app_secret.clone().unwrap(),
376        )
377        .await
378        .expect("Failed to create client");
379
380        // Download all metadata for the domain
381        let result = client
382            .download_metadata(
383                &config.1,
384                &DownloadQuery {
385                    ids: vec![],
386                    name: None,
387                    data_type: Some("test".to_string()),
388                },
389            )
390            .await;
391        assert!(
392            result.is_ok(),
393            "Failed to download domain metadata: {:?}",
394            result.err()
395        );
396        let result = result.unwrap();
397        assert!(result.len() > 0);
398        for meta in result {
399            assert!(!meta.id.is_empty());
400            assert_eq!(meta.domain_id, config.1);
401            assert!(!meta.name.is_empty());
402            assert_eq!(meta.data_type, "test");
403        }
404    }
405
406    #[tokio::test]
407    async fn test_load_domain_with_oidc_access_token() {
408        let config = get_config();
409        // Assume we have a function to get a valid oidc_access_token for testing
410        let oidc_access_token =
411            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
412        if oidc_access_token.is_empty() {
413            eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
414            return;
415        }
416
417        let client =
418            DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
419
420        let domain = client
421            .with_oidc_access_token(&oidc_access_token)
422            .auth_domain(&config.1)
423            .await;
424        assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
425        assert_eq!(domain.unwrap().domain.id, config.1);
426    }
427
428    #[tokio::test]
429    async fn test_list_domains() {
430        let config = get_config();
431        let client = DomainClient::new_with_app_credential(
432            &config.0.api_url,
433            &config.0.dds_url,
434            &config.0.client_id,
435            &config.0.app_key.unwrap(),
436            &config.0.app_secret.unwrap(),
437        )
438        .await
439        .expect("Failed to create client");
440
441        let org = std::env::var("TEST_ORGANIZATION").unwrap_or("own".to_string());
442        let result = client.list_domains(&org).await.unwrap();
443        assert!(result.len() > 0, "No domains found");
444    }
445
446    #[tokio::test]
447    async fn test_submit_job_request_v1_with_invalid_processing_type() {
448        let config = get_config();
449        let client = DomainClient::new_with_user_credential(
450            &config.0.api_url,
451            &config.0.dds_url,
452            &config.0.client_id,
453            &config.0.email.unwrap(),
454            &config.0.password.unwrap(),
455            true,
456        )
457        .await
458        .expect("Failed to create client");
459
460        let mut job_request= JobRequest::default();
461        job_request.processing_type = "invalid_processing_type".to_string();
462        let res = client.submit_job_request_v1(&config.1, &job_request).await.expect_err("Failed to submit job request");
463        assert_eq!(res.to_string(), "Failed to process domain. Status: 400 Bad Request - invalid processing type");
464    }
465}