posemesh_domain_http/
domain_client.rs

1use futures::{channel::mpsc::{self, Receiver}, StreamExt, SinkExt};
2#[cfg(not(target_family = "wasm"))]
3use tokio::spawn;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::spawn_local as spawn;
6use serde::{Deserialize, Serialize};
7use crate::domain_data::{
8    DomainData, DomainDataMetadata, DownloadQuery, UploadDomainData, delete_by_id, download_by_id, download_metadata_v1, download_v1_stream, upload_v1
9};
10
11use crate::auth::TokenCache;
12use crate::discovery::{DiscoveryService, DomainWithToken, ListDomainsResponse};
13use crate::errors::DomainError;
14pub use crate::reconstruction::JobRequest;
15pub use crate::config;
16pub use crate::auth;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ListDomainsQuery {
20    pub portal_id: Option<String>,
21    pub portal_short_id: Option<String>,
22    pub org: String,
23}
24
25#[derive(Debug, Clone)]
26pub struct DomainClient {
27    discovery_client: DiscoveryService,
28    pub client_id: String,
29}
30
31impl DomainClient {
32    pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
33        if client_id.is_empty() {
34            panic!("client_id is empty");
35        }
36        Self {
37            discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
38            client_id: client_id.to_string(),
39        }
40    }
41
42    pub async fn new_with_app_credential(
43        api_url: &str,
44        dds_url: &str,
45        client_id: &str,
46        app_key: &str,
47        app_secret: &str,
48    ) -> Result<Self, DomainError> {
49        let mut dc = DomainClient::new(api_url, dds_url, client_id);
50        dc.discovery_client
51            .sign_in_as_auki_app(app_key, app_secret)
52            .await?;
53        Ok(dc)
54    }
55
56    pub async fn new_with_user_credential(
57        api_url: &str,
58        dds_url: &str,
59        client_id: &str,
60        email: &str,
61        password: &str,
62        remember_password: bool,
63    ) -> Result<Self, DomainError> {
64        let mut dc = DomainClient::new(api_url, dds_url, client_id);
65        dc.discovery_client
66            .sign_in_with_auki_account(email, password, remember_password)
67            .await?;
68        Ok(dc)
69    }
70
71    pub fn with_oidc_access_token(&self, token: &str) -> Self {
72        Self {
73            discovery_client: self.discovery_client.with_oidc_access_token(token),
74            client_id: self.client_id.clone(),
75        }
76    }
77
78    pub async fn download_domain_data_stream(
79        &self,
80        domain_id: &str,
81        query: &DownloadQuery,
82    ) -> Result<
83        Receiver<Result<DomainData, DomainError>>,
84        DomainError,
85    > {
86        let domain = self.discovery_client.auth_domain(domain_id).await?;
87        let rx_box = download_v1_stream(
88            &domain.domain.domain_server.url,
89            &self.client_id,
90            &domain.get_access_token(),
91            domain_id,
92            query,
93        )
94        .await?;
95        // Map inner errors from Box<dyn Error> to DomainError
96        let (mut tx, rx) = mpsc::channel(100);
97        spawn(async move {
98            let mut rx_box = rx_box;
99            while let Some(item) = rx_box.next().await {
100                let mapped = item.map_err(DomainError::from);
101                if tx.send(mapped).await.is_err() {
102                    break;
103                }
104            }
105        });
106        Ok(rx)
107    }
108
109    pub async fn download_domain_data(
110        &self,
111        domain_id: &str,
112        query: &DownloadQuery,
113    ) -> Result<
114        Vec<DomainData>,
115        DomainError,
116    > {
117        use futures::StreamExt;
118        let mut rx = self.download_domain_data_stream(domain_id, query).await?;
119
120        let mut results = Vec::new();
121        while let Some(result) = rx.next().await {
122            results.push(result?);
123        }
124        Ok(results)
125    }
126
127    #[cfg(not(target_family = "wasm"))]
128    pub async fn upload_domain_data_stream(
129        &self,
130        domain_id: &str,
131        data: Receiver<UploadDomainData>,
132    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
133        use crate::{auth::TokenCache, domain_data::upload_v1_stream};
134        let domain = self.discovery_client.auth_domain(domain_id).await?;
135        upload_v1_stream(
136            &domain.domain.domain_server.url,
137            &domain.get_access_token(),
138            domain_id,
139            data,
140        )
141        .await
142        .map_err(DomainError::from)
143    }
144
145    pub async fn upload_domain_data(
146        &self,
147        domain_id: &str,
148        data: Vec<UploadDomainData>,
149    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
150        let domain = self.discovery_client.auth_domain(domain_id).await?;
151        upload_v1(
152            &domain.domain.domain_server.url,
153            &domain.get_access_token(),
154            domain_id,
155            data,
156        )
157        .await
158        .map_err(DomainError::from)
159    }
160
161    pub async fn download_metadata(
162        &self,
163        domain_id: &str,
164        query: &DownloadQuery,
165    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
166        let domain = self.discovery_client.auth_domain(domain_id).await?;
167        download_metadata_v1(
168            &domain.domain.domain_server.url,
169            &self.client_id,
170            &domain.get_access_token(),
171            domain_id,
172            query,
173        )
174        .await
175        .map_err(DomainError::from)
176    }
177
178    pub async fn download_domain_data_by_id(
179        &self,
180        domain_id: &str,
181        id: &str,
182    ) -> Result<Vec<u8>, DomainError> {
183        let domain = self.discovery_client.auth_domain(domain_id).await?;
184        download_by_id(
185            &domain.domain.domain_server.url,
186            &self.client_id,
187            &domain.get_access_token(),
188            domain_id,
189            id,
190        )
191        .await
192        .map_err(DomainError::from)
193    }
194
195    pub async fn delete_domain_data_by_id(
196        &self,
197        domain_id: &str,
198        id: &str,
199    ) -> Result<(), DomainError> {
200        let domain = self.discovery_client.auth_domain(domain_id).await?;
201        delete_by_id(
202            &domain.domain.domain_server.url,
203            &domain.get_access_token(),
204            domain_id,
205            id,
206        )
207        .await
208        .map_err(DomainError::from)
209    }
210
211    pub async fn submit_job_request_v1(
212        &self,
213        domain_id: &str,
214        request: &JobRequest,
215    ) -> Result<reqwest::Response, DomainError> {
216        let domain = self.discovery_client.auth_domain(domain_id).await?;
217        crate::reconstruction::forward_job_request_v1(
218            &domain.domain.domain_server.url,
219            &self.client_id,
220            &domain.get_access_token(),
221            domain_id,
222            request,
223        )
224        .await
225        .map_err(DomainError::from)
226    }
227  
228    pub async fn list_domains(
229        &self,
230        query: &ListDomainsQuery,
231    ) -> Result<ListDomainsResponse, DomainError> {
232        if query.portal_id.is_none() && query.portal_short_id.is_none() {
233            self.discovery_client.list_domains(&query.org).await
234        } else {
235            self.discovery_client.list_domains_by_portal(query.portal_id.as_deref(), query.portal_short_id.as_deref(), &query.org).await
236        }
237    }
238
239    pub async fn create_domain(
240        &self,
241        name: &str,
242        domain_server_id: Option<String>,
243        domain_server_url: Option<String>,
244        redirect_url: Option<String>,
245    ) -> Result<DomainWithToken, DomainError> {
246        self.discovery_client.create_domain(name, domain_server_id, domain_server_url, redirect_url).await
247    }
248
249    pub async fn delete_domain(
250        &self,
251        domain_id: &str,
252    ) -> Result<(), DomainError> {
253        let domain = self.discovery_client.auth_domain(domain_id).await?;
254        self.discovery_client.delete_domain(&domain.get_access_token(), domain_id).await
255    }
256}
257
258#[cfg(not(target_family = "wasm"))]
259#[cfg(test)]
260mod tests {
261    use crate::{auth::AuthClient, domain_data::{DomainAction, UploadDomainData}};
262
263    use super::*;
264    use futures::channel::mpsc;
265    use tokio::spawn;
266
267    fn get_config() -> (config::Config, String) {
268        if std::path::Path::new("../.env.local").exists() {
269            dotenvy::from_filename("../.env.local").ok();
270        }
271        dotenvy::dotenv().ok();
272        let config = config::Config::from_env().unwrap();
273        (config, std::env::var("DOMAIN_ID").unwrap())
274    }
275
276    async fn create_test_domain(config: &config::Config) -> Result<DomainWithToken, DomainError> {
277        let client = DomainClient::new_with_user_credential(
278            &config.api_url,
279            &config.dds_url,
280            &config.client_id,
281            &config.email.clone().unwrap(),
282            &config.password.clone().unwrap(),
283            true,
284        )
285        .await
286        .expect("Failed to create test client");
287        client.create_domain(
288            &format!("test_domain_{}", uuid::Uuid::new_v4()),
289            None,
290            Some(std::env::var("TEST_DOMAIN_SERVER_URL").unwrap()),
291            None,
292        )
293        .await
294    }
295
296    async fn delete_test_domain(config: &config::Config, domain_id: &str) -> Result<(), DomainError> {
297        let client = DomainClient::new_with_user_credential(
298            &config.api_url,
299            &config.dds_url,
300            &config.client_id,
301            &config.email.clone().unwrap(),
302            &config.password.clone().unwrap(),
303            true,
304        )
305        .await
306        .expect("Failed to create test client");
307        client.delete_domain(domain_id).await
308    }
309
310    async fn create_test_domain_data(config: &config::Config, domain_id: &str) -> Result<Vec<DomainDataMetadata>, DomainError> {
311        let client = DomainClient::new_with_user_credential(
312            &config.api_url,
313            &config.dds_url,
314            &config.client_id,
315            &config.email.clone().unwrap(),
316            &config.password.clone().unwrap(),
317            true,
318        )
319        .await
320        .expect("Failed to create test client");
321
322        let data = vec![
323            UploadDomainData {
324                action: DomainAction::Create {
325                    name: "to be deleted".to_string(),
326                    data_type: "test".to_string(),
327                },
328                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
329            },
330        ];
331        client.upload_domain_data(domain_id, data).await
332    }
333
334    #[tokio::test]
335    #[ignore = "requires live Auki credentials"]
336    async fn get_organization_id() {
337        let config = get_config();
338        let mut client = AuthClient::new(
339            &config.0.api_url,
340            &config.0.client_id,
341        );
342        client.sign_in_with_app_credentials(&config.0.app_key.unwrap(), &config.0.app_secret.unwrap()).await.expect("Failed to sign in with app credentials");
343        let token = client.get_dds_access_token(None).await.expect("Failed to get DDS access token");
344        let claims = auth::parse_jwt(&token).expect("Failed to parse JWT");
345        assert_ne!(claims.org.is_some(), false);
346    }
347
348    #[tokio::test]
349    #[ignore = "requires live Auki credentials"]
350    async fn test_download_domain_data_with_app_credential() {
351        // Create a test client
352        let config = get_config();
353        let config = config.0.clone();
354        let client = DomainClient::new_with_app_credential(
355            &config.api_url,
356            &config.dds_url,
357            &config.client_id,
358            &config.app_key.clone().unwrap(),
359            &config.app_secret.clone().unwrap(),
360        )
361        .await
362        .expect("Failed to create client");
363
364        let domain = create_test_domain(&config).await.expect("Failed to create test domain");
365        let domain_id = domain.domain.id.clone();
366
367        let created = create_test_domain_data(&config, &domain_id).await.expect("Failed to create test domain data");
368        assert_eq!(created.len(), 1);
369        assert_eq!(created[0].name, "to be deleted");
370        assert_eq!(created[0].data_type, "test");
371
372        // Create a test query
373        let query = DownloadQuery {
374            ids: vec![],
375            name: None,
376            data_type: Some("test".to_string()),
377        };
378
379        // Test the download function
380        let result = client.download_domain_data(&domain_id, &query).await;
381
382        assert!(result.is_ok(), "error message : {:?}", result.err());
383
384        let results = result.unwrap();
385        assert!(results.len() > 0);
386        for result in results {
387            assert_eq!(result.metadata.data_type, "test");
388        }
389
390        // Delete the domain
391        delete_test_domain(&config, &domain_id).await.expect("Failed to delete test domain");
392    }
393
394    #[tokio::test]
395    #[ignore = "requires live Auki credentials"]
396    async fn test_upload_domain_data_with_user_credential() {
397        use futures::SinkExt;
398        let config = get_config();
399        let client = DomainClient::new_with_user_credential(
400            &config.0.api_url,
401            &config.0.dds_url,
402            &config.0.client_id,
403            &config.0.email.clone().unwrap(),
404            &config.0.password.clone().unwrap(),
405            true,
406        )
407        .await
408        .expect("Failed to create client");
409
410        let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
411        let domain_id = domain.domain.id.clone();
412
413        let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
414        assert_eq!(created.len(), 1);
415        assert_eq!(created[0].name, "to be deleted");
416        assert_eq!(created[0].data_type, "test");
417
418        let data = vec![
419            UploadDomainData {
420                action: DomainAction::Update {
421                    id: created[0].id.clone(),
422                },
423                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
424            },
425            UploadDomainData {
426                action: DomainAction::Create {
427                    name: "to be deleted2".to_string(),
428                    data_type: "test".to_string(),
429                },
430                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
431            },
432        ];
433        let (mut tx, rx) = mpsc::channel(10);
434        spawn(async move {
435            for d in data {
436                tx.send(d).await.unwrap();
437            }
438            tx.close().await.unwrap();
439        });
440        let result = client.upload_domain_data_stream(&domain_id, rx).await;
441        assert!(result.is_ok(), "error message : {:?}", result.err());
442        let created2 = result.unwrap();
443        assert_eq!(created2.len(), 2);
444
445        let ids = created2.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
446        assert_eq!(ids.len(), 2);
447        // Create a test query
448        let query = DownloadQuery {
449            ids: ids,
450            name: None,
451            data_type: None,
452        };
453
454        // Test the download function
455        let result = client.download_domain_data(&domain_id, &query).await;
456
457        assert!(result.is_ok(), "error message : {:?}", result.err());
458
459        let mut to_delete = None;
460        let mut count = 0;
461        let results = result.unwrap();
462        for result in results {
463            count += 1;
464            if result.metadata.id == created[0].id {
465                assert_eq!(result.data, b"{\"test\": \"test updated\"}");
466                continue;
467            } else {
468                assert_eq!(result.data, b"{\"test\": \"test\"}");
469            }
470            to_delete = Some(result.metadata.id.clone());
471        }
472        assert_eq!(count, 2);
473        assert_eq!(to_delete.is_some(), true);
474
475        // Delete the one whose id is not "a8"
476        let delete_result = client
477            .delete_domain_data_by_id(&domain_id, &to_delete.unwrap())
478            .await;
479        assert!(
480            delete_result.is_ok(),
481            "Failed to delete data by id: {:?}",
482            delete_result.err()
483        );
484
485        // Delete the domain
486        delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
487    }
488
489    #[tokio::test]
490    #[ignore = "requires live Auki credentials"]
491    async fn test_download_domain_data_by_id() {
492        let config = get_config();
493        let client = DomainClient::new_with_app_credential(
494            &config.0.api_url,
495            &config.0.dds_url,
496            &config.0.client_id,
497            &config.0.app_key.clone().unwrap(),
498            &config.0.app_secret.clone().unwrap(),
499        )
500        .await
501        .expect("Failed to create client");
502
503        let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
504        let domain_id = domain.domain.id.clone();
505
506        let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
507        assert_eq!(created.len(), 1);
508        assert_eq!(created[0].name, "to be deleted");
509        assert_eq!(created[0].data_type, "test");
510
511        // Now test download by id
512        let download_result = client
513            .download_domain_data_by_id(&domain_id, &created[0].id)
514            .await;
515
516        assert!(
517            download_result.is_ok(),
518            "download by id failed: {:?}",
519            download_result.err()
520        );
521        let downloaded_bytes = download_result.unwrap();
522        assert_eq!(downloaded_bytes, b"{\"test\": \"test\"}".to_vec());
523
524        // Delete the domain
525        delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
526    }
527
528    #[tokio::test]
529    #[ignore = "requires live Auki credentials"]
530    async fn test_download_domain_metadata() {
531        let config = get_config();
532        let client = DomainClient::new_with_app_credential(
533            &config.0.api_url,
534            &config.0.dds_url,
535            &config.0.client_id,
536            &config.0.app_key.clone().unwrap(),
537            &config.0.app_secret.clone().unwrap(),
538        )
539        .await
540        .expect("Failed to create client");
541
542        let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
543        let domain_id = domain.domain.id.clone();
544
545        let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
546        assert_eq!(created.len(), 1);
547        assert_eq!(created[0].name, "to be deleted");
548        assert_eq!(created[0].data_type, "test");
549
550        // Download all metadata for the domain
551        let result = client
552            .download_metadata(
553                &domain_id,
554                &DownloadQuery {
555                    ids: vec![],
556                    name: None,
557                    data_type: Some("test".to_string()),
558                },
559            )
560            .await;
561        assert!(
562            result.is_ok(),
563            "Failed to download domain metadata: {:?}",
564            result.err()
565        );
566        let result = result.unwrap();
567        assert!(result.len() > 0);
568        for meta in result {
569            assert!(!meta.id.is_empty());
570            assert_eq!(meta.domain_id, domain_id);
571            assert!(!meta.name.is_empty());
572            assert_eq!(meta.data_type, "test");
573        }
574
575        // Delete the domain
576        delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
577    }
578
579    #[tokio::test]
580    async fn test_load_domain_with_oidc_access_token() {
581        let config = get_config();
582        // Assume we have a function to get a valid oidc_access_token for testing
583        let oidc_access_token =
584            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
585        if oidc_access_token.is_empty() {
586            eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
587            return;
588        }
589
590        let client =
591            DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
592
593        let domain = client
594            .with_oidc_access_token(&oidc_access_token)
595            .auth_domain(&config.1)
596            .await;
597        assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
598        assert_eq!(domain.unwrap().domain.id, config.1);
599    }
600
601    #[tokio::test]
602    #[ignore = "requires live Auki credentials"]
603    async fn test_list_domains() {
604        let config = get_config();
605        let client = DomainClient::new_with_app_credential(
606            &config.0.api_url,
607            &config.0.dds_url,
608            &config.0.client_id,
609            &config.0.app_key.unwrap(),
610            &config.0.app_secret.unwrap(),
611        )
612        .await
613        .expect("Failed to create client");
614
615        let org = std::env::var("TEST_ORGANIZATION").unwrap_or("own".to_string());
616        let result = client.list_domains(&ListDomainsQuery {
617            portal_id: None,
618            portal_short_id: None,
619            org: org,
620        }).await.unwrap();
621        assert!(result.domains.len() > 0, "No domains found");
622    }
623
624    #[tokio::test]
625    #[ignore = "requires live Auki credentials"]
626    async fn test_submit_job_request_v1_with_invalid_processing_type() {
627        let config = get_config();
628        let client = DomainClient::new_with_user_credential(
629            &config.0.api_url,
630            &config.0.dds_url,
631            &config.0.client_id,
632            &config.0.email.unwrap(),
633            &config.0.password.unwrap(),
634            true,
635        )
636        .await
637        .expect("Failed to create client");
638
639        let mut job_request= JobRequest::default();
640        job_request.processing_type = "invalid_processing_type".to_string();
641        let res = client.submit_job_request_v1(&config.1, &job_request).await.expect_err("Failed to submit job request");
642        assert_eq!(res.to_string(), "Auki response - status: 400 Bad Request, error: Failed to process domain. invalid processing type");
643    }
644}