posemesh_domain_http/
lib.rs

1use futures::channel::mpsc::Receiver;
2
3use crate::domain_data::{
4    DomainData, DomainDataMetadata, DownloadQuery, delete_by_id, download_by_id,
5    download_metadata_v1, download_v1_stream,
6};
7#[cfg(target_family = "wasm")]
8use crate::domain_data::{UploadDomainData, upload_v1};
9
10mod auth;
11pub mod config;
12pub mod discovery;
13pub mod domain_data;
14pub mod reconstruction;
15#[cfg(target_family = "wasm")]
16pub mod wasm;
17
18use crate::auth::TokenCache;
19use crate::discovery::{DiscoveryService, DomainWithServer};
20pub use crate::reconstruction::JobRequest;
21
22#[derive(Debug, Clone)]
23pub struct DomainClient {
24    discovery_client: DiscoveryService,
25    pub client_id: String,
26}
27
28impl DomainClient {
29    pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
30        if client_id.is_empty() {
31            panic!("client_id is empty");
32        }
33        Self {
34            discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
35            client_id: client_id.to_string(),
36        }
37    }
38
39    pub async fn new_with_app_credential(
40        api_url: &str,
41        dds_url: &str,
42        client_id: &str,
43        app_key: &str,
44        app_secret: &str,
45    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
46        let mut dc = DomainClient::new(api_url, dds_url, client_id);
47        let _ = dc
48            .discovery_client
49            .sign_in_as_auki_app(app_key, app_secret)
50            .await?;
51        Ok(dc)
52    }
53
54    pub async fn new_with_user_credential(
55        api_url: &str,
56        dds_url: &str,
57        client_id: &str,
58        email: &str,
59        password: &str,
60        remember_password: bool,
61    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
62        let mut dc = DomainClient::new(api_url, dds_url, client_id);
63        let _ = dc
64            .discovery_client
65            .sign_in_with_auki_account(email, password, remember_password)
66            .await?;
67        Ok(dc)
68    }
69
70    pub fn with_oidc_access_token(&self, token: &str) -> Self {
71        Self {
72            discovery_client: self.discovery_client.with_oidc_access_token(token),
73            client_id: self.client_id.clone(),
74        }
75    }
76
77    pub async fn download_domain_data(
78        &self,
79        domain_id: &str,
80        query: &DownloadQuery,
81    ) -> Result<
82        Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
83        Box<dyn std::error::Error + Send + Sync>,
84    > {
85        let domain = self.discovery_client.auth_domain(domain_id).await?;
86        let rx = download_v1_stream(
87            &domain.domain.domain_server.url,
88            &self.client_id,
89            &domain.get_access_token(),
90            domain_id,
91            query,
92        )
93        .await?;
94        Ok(rx)
95    }
96
97    #[cfg(not(target_family = "wasm"))]
98    pub async fn upload_domain_data(
99        &self,
100        domain_id: &str,
101        data: Receiver<domain_data::UploadDomainData>,
102    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
103        use crate::{auth::TokenCache, domain_data::upload_v1_stream};
104        let domain = self.discovery_client.auth_domain(domain_id).await?;
105        upload_v1_stream(
106            &domain.domain.domain_server.url,
107            &domain.get_access_token(),
108            domain_id,
109            data,
110        )
111        .await
112    }
113
114    #[cfg(target_family = "wasm")]
115    pub async fn upload_domain_data(
116        &self,
117        domain_id: &str,
118        data: Vec<UploadDomainData>,
119    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
120        let domain = self.discovery_client.auth_domain(domain_id).await?;
121        upload_v1(
122            &domain.domain.domain_server.url,
123            &domain.get_access_token(),
124            domain_id,
125            data,
126        )
127        .await
128    }
129
130    pub async fn download_metadata(
131        &self,
132        domain_id: &str,
133        query: &DownloadQuery,
134    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
135        let domain = self.discovery_client.auth_domain(domain_id).await?;
136        download_metadata_v1(
137            &domain.domain.domain_server.url,
138            &self.client_id,
139            &domain.get_access_token(),
140            domain_id,
141            query,
142        )
143        .await
144    }
145
146    pub async fn download_domain_data_by_id(
147        &self,
148        domain_id: &str,
149        id: &str,
150    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
151        let domain = self.discovery_client.auth_domain(domain_id).await?;
152        download_by_id(
153            &domain.domain.domain_server.url,
154            &self.client_id,
155            &domain.get_access_token(),
156            domain_id,
157            id,
158        )
159        .await
160    }
161
162    pub async fn delete_domain_data_by_id(
163        &self,
164        domain_id: &str,
165        id: &str,
166    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
167        let domain = self.discovery_client.auth_domain(domain_id).await?;
168        delete_by_id(
169            &domain.domain.domain_server.url,
170            &domain.get_access_token(),
171            domain_id,
172            id,
173        )
174        .await
175    }
176
177    pub async fn submit_job_request_v1(
178        &self,
179        domain_id: &str,
180        request: &JobRequest,
181    ) -> Result<reqwest::Response, Box<dyn std::error::Error + Send + Sync>> {
182        let domain = self.discovery_client.auth_domain(domain_id).await?;
183        crate::reconstruction::forward_job_request_v1(
184            &domain.domain.domain_server.url,
185            &self.client_id,
186            &domain.get_access_token(),
187            domain_id,
188            request,
189        )
190        .await
191    }
192  
193    pub async fn list_domains(
194        &self,
195        org: &str,
196    ) -> Result<Vec<DomainWithServer>, Box<dyn std::error::Error + Send + Sync>> {
197        self.discovery_client.list_domains(org).await
198    }
199}
200
201#[cfg(not(target_family = "wasm"))]
202#[cfg(test)]
203mod tests {
204    use std::time::Duration;
205
206    use crate::domain_data::{CreateDomainData, DomainAction, UpdateDomainData, UploadDomainData};
207
208    use super::*;
209    use futures::{StreamExt, channel::mpsc};
210    use tokio::{spawn, time::sleep};
211
212    fn get_config() -> (config::Config, String) {
213        if std::path::Path::new("../.env.local").exists() {
214            dotenvy::from_filename("../.env.local").ok();
215        }
216        dotenvy::dotenv().ok();
217        let config = config::Config::from_env().unwrap();
218        (config, std::env::var("DOMAIN_ID").unwrap())
219    }
220
221    #[tokio::test]
222    async fn test_download_domain_data_with_app_credential() {
223        // Create a test client
224        let config = get_config();
225        let client = DomainClient::new_with_app_credential(
226            &config.0.api_url,
227            &config.0.dds_url,
228            &config.0.client_id,
229            &config.0.app_key.unwrap(),
230            &config.0.app_secret.unwrap(),
231        )
232        .await
233        .expect("Failed to create client");
234
235        // Create a test query
236        let query = DownloadQuery {
237            ids: vec![],
238            name: None,
239            data_type: Some("dmt_accel_csv".to_string()),
240        };
241
242        // Test the download function
243        let result = client.download_domain_data(&config.1, &query).await;
244
245        assert!(result.is_ok(), "error message : {:?}", result.err());
246
247        let mut rx = result.unwrap();
248        let mut count = 0;
249        while let Some(Ok(data)) = rx.next().await {
250            count += 1;
251            assert_eq!(data.metadata.data_type, "dmt_accel_csv");
252        }
253        assert!(count > 0);
254    }
255
256    #[tokio::test]
257    async fn test_upload_domain_data_with_user_credential() {
258        use futures::SinkExt;
259        let config = get_config();
260        let client = DomainClient::new_with_user_credential(
261            &config.0.api_url,
262            &config.0.dds_url,
263            &config.0.client_id,
264            &config.0.email.unwrap(),
265            &config.0.password.unwrap(),
266            true,
267        )
268        .await
269        .expect("Failed to create client");
270
271        let data = vec![
272            UploadDomainData {
273                action: DomainAction::Create(CreateDomainData {
274                    name: "to be deleted".to_string(),
275                    data_type: "test".to_string(),
276                }),
277                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
278            },
279            UploadDomainData {
280                action: DomainAction::Update(UpdateDomainData {
281                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
282                }),
283                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
284            },
285        ];
286        let (mut tx, rx) = mpsc::channel(10);
287        spawn(async move {
288            for d in data {
289                tx.send(d).await.unwrap();
290            }
291            tx.close().await.unwrap();
292        });
293        let result = client.upload_domain_data(&config.1, rx).await;
294
295        assert!(result.is_ok(), "error message : {:?}", result.err());
296
297        sleep(Duration::from_secs(5)).await;
298        let result = result.unwrap();
299        assert_eq!(result.len(), 2);
300
301        let ids = result.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
302        assert_eq!(ids.len(), 2);
303        // Create a test query
304        let query = DownloadQuery {
305            ids: ids,
306            name: None,
307            data_type: None,
308        };
309
310        // Test the download function
311        let result = client.download_domain_data(&config.1, &query).await;
312
313        assert!(result.is_ok(), "error message : {:?}", result.err());
314
315        let mut to_delete = None;
316        let mut count = 0;
317        let mut rx = result.unwrap();
318        while let Some(Ok(data)) = rx.next().await {
319            count += 1;
320            if data.metadata.id == "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
321                assert_eq!(data.data, b"{\"test\": \"test updated\"}");
322                continue;
323            } else {
324                assert_eq!(data.data, b"{\"test\": \"test\"}");
325            }
326            to_delete = Some(data.metadata.id.clone());
327        }
328        assert_eq!(count, 2);
329        assert_eq!(to_delete.is_some(), true);
330
331        // Delete the one whose id is not "a8"
332        let delete_result = client
333            .delete_domain_data_by_id(&config.1, &to_delete.unwrap())
334            .await;
335        assert!(
336            delete_result.is_ok(),
337            "Failed to delete data by id: {:?}",
338            delete_result.err()
339        );
340    }
341
342    #[tokio::test]
343    async fn test_download_domain_data_by_id() {
344        let config = get_config();
345        let client = DomainClient::new_with_app_credential(
346            &config.0.api_url,
347            &config.0.dds_url,
348            &config.0.client_id,
349            &config.0.app_key.unwrap(),
350            &config.0.app_secret.unwrap(),
351        )
352        .await
353        .expect("Failed to create client");
354
355        // Now test download by id
356        let download_result = client
357            .download_domain_data_by_id(&config.1, "a84a36e5-312b-4f80-974a-06f5d19c1e16")
358            .await;
359
360        assert!(
361            download_result.is_ok(),
362            "download by id failed: {:?}",
363            download_result.err()
364        );
365        let downloaded_bytes = download_result.unwrap();
366        assert_eq!(downloaded_bytes, b"{\"test\": \"test updated\"}".to_vec());
367    }
368
369    #[tokio::test]
370    async fn test_download_domain_metadata() {
371        let config = get_config();
372        let client = DomainClient::new_with_app_credential(
373            &config.0.api_url,
374            &config.0.dds_url,
375            &config.0.client_id,
376            &config.0.app_key.clone().unwrap(),
377            &config.0.app_secret.clone().unwrap(),
378        )
379        .await
380        .expect("Failed to create client");
381
382        // Download all metadata for the domain
383        let result = client
384            .download_metadata(
385                &config.1,
386                &DownloadQuery {
387                    ids: vec![],
388                    name: None,
389                    data_type: Some("test".to_string()),
390                },
391            )
392            .await;
393        assert!(
394            result.is_ok(),
395            "Failed to download domain metadata: {:?}",
396            result.err()
397        );
398        let result = result.unwrap();
399        assert!(result.len() > 0);
400        for meta in result {
401            assert!(!meta.id.is_empty());
402            assert_eq!(meta.domain_id, config.1);
403            assert!(!meta.name.is_empty());
404            assert_eq!(meta.data_type, "test");
405        }
406    }
407
408    #[tokio::test]
409    async fn test_load_domain_with_oidc_access_token() {
410        let config = get_config();
411        // Assume we have a function to get a valid oidc_access_token for testing
412        let oidc_access_token =
413            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
414        if oidc_access_token.is_empty() {
415            eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
416            return;
417        }
418
419        let client =
420            DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
421
422        let domain = client
423            .with_oidc_access_token(&oidc_access_token)
424            .auth_domain(&config.1)
425            .await;
426        assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
427        assert_eq!(domain.unwrap().domain.id, config.1);
428    }
429
430    #[tokio::test]
431    async fn test_list_domains() {
432        let config = get_config();
433        let client = DomainClient::new_with_app_credential(
434            &config.0.api_url,
435            &config.0.dds_url,
436            &config.0.client_id,
437            &config.0.app_key.unwrap(),
438            &config.0.app_secret.unwrap(),
439        )
440        .await
441        .expect("Failed to create client");
442
443        let org = std::env::var("TEST_ORGANIZATION").unwrap_or("own".to_string());
444        let result = client.list_domains(&org).await.unwrap();
445        assert!(result.len() > 0, "No domains found");
446    }
447
448    #[tokio::test]
449    async fn test_submit_job_request_v1_with_invalid_processing_type() {
450        let config = get_config();
451        let client = DomainClient::new_with_user_credential(
452            &config.0.api_url,
453            &config.0.dds_url,
454            &config.0.client_id,
455            &config.0.email.unwrap(),
456            &config.0.password.unwrap(),
457            true,
458        )
459        .await
460        .expect("Failed to create client");
461
462        let mut job_request= JobRequest::default();
463        job_request.processing_type = "invalid_processing_type".to_string();
464        let res = client.submit_job_request_v1(&config.1, &job_request).await.expect_err("Failed to submit job request");
465        assert_eq!(res.to_string(), "Failed to process domain. Status: 400 Bad Request - invalid processing type");
466    }
467}