Skip to main content

posemesh_domain_http/
domain_client.rs

1use crate::domain_data::{
2    DomainData, DomainDataMetadata, DownloadQuery, UploadDomainData, delete_by_id, download_by_id,
3    download_metadata_v1, download_v1_stream, upload_v1,
4};
5use futures::channel::mpsc::Receiver;
6use serde::{Deserialize, Serialize};
7
8pub use crate::auth;
9use crate::auth::TokenCache;
10pub use crate::config;
11use crate::discovery::{DiscoveryService, DomainWithToken, ListDomainsResponse};
12use crate::errors::DomainError;
13pub use crate::reconstruction::JobRequest;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct ListDomainsQuery {
17    pub portal_id: Option<String>,
18    pub portal_short_id: Option<String>,
19    pub org: String,
20    pub domain_server_id: Option<String>,
21}
22
23#[derive(Debug, Clone)]
24pub struct DomainClient {
25    discovery_client: DiscoveryService,
26    pub client_id: String,
27}
28
29impl DomainClient {
30    pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
31        if client_id.is_empty() {
32            panic!("client_id is empty");
33        }
34        Self {
35            discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
36            client_id: client_id.to_string(),
37        }
38    }
39
40    pub async fn new_with_app_credential(
41        api_url: &str,
42        dds_url: &str,
43        client_id: &str,
44        app_key: &str,
45        app_secret: &str,
46    ) -> Result<Self, DomainError> {
47        let mut dc = DomainClient::new(api_url, dds_url, client_id);
48        let _ = dc
49            .discovery_client
50            .sign_in_as_auki_app(app_key, app_secret)
51            .await?;
52        Ok(dc)
53    }
54
55    pub async fn new_with_user_credential(
56        api_url: &str,
57        dds_url: &str,
58        client_id: &str,
59        email: &str,
60        password: &str,
61        remember_password: bool,
62    ) -> Result<Self, DomainError> {
63        let mut dc = DomainClient::new(api_url, dds_url, client_id);
64        let _ = dc
65            .discovery_client
66            .sign_in_with_auki_account(email, password, remember_password)
67            .await?;
68        Ok(dc)
69    }
70
71    pub fn with_oidc_access_token(&self, token: &str) -> Self {
72        Self {
73            discovery_client: self.discovery_client.with_oidc_access_token(token),
74            client_id: self.client_id.clone(),
75        }
76    }
77
78    pub async fn download_domain_data_stream(
79        &self,
80        domain_id: &str,
81        query: &DownloadQuery,
82    ) -> Result<Receiver<Result<DomainData, DomainError>>, DomainError> {
83        let domain = self.discovery_client.auth_domain(domain_id).await?;
84        let rx = download_v1_stream(
85            &domain.domain.domain_server.url,
86            &self.client_id,
87            &domain.get_access_token(),
88            domain_id,
89            query,
90        )
91        .await?;
92        Ok(rx)
93    }
94
95    pub async fn download_domain_data(
96        &self,
97        domain_id: &str,
98        query: &DownloadQuery,
99    ) -> Result<Vec<DomainData>, DomainError> {
100        use futures::StreamExt;
101        let mut rx = self.download_domain_data_stream(domain_id, query).await?;
102
103        let mut results = Vec::new();
104        while let Some(result) = rx.next().await {
105            results.push(result?);
106        }
107        Ok(results)
108    }
109
110    #[cfg(not(target_family = "wasm"))]
111    pub async fn upload_domain_data_stream(
112        &self,
113        domain_id: &str,
114        data: Receiver<UploadDomainData>,
115    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
116        use crate::{auth::TokenCache, domain_data::upload_v1_stream};
117        let domain = self.discovery_client.auth_domain(domain_id).await?;
118        upload_v1_stream(
119            &domain.domain.domain_server.url,
120            &domain.get_access_token(),
121            domain_id,
122            data,
123        )
124        .await
125    }
126
127    pub async fn upload_domain_data(
128        &self,
129        domain_id: &str,
130        data: Vec<UploadDomainData>,
131    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
132        let domain = self.discovery_client.auth_domain(domain_id).await?;
133        upload_v1(
134            &domain.domain.domain_server.url,
135            &domain.get_access_token(),
136            domain_id,
137            data,
138        )
139        .await
140    }
141
142    pub async fn download_metadata(
143        &self,
144        domain_id: &str,
145        query: &DownloadQuery,
146    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
147        let domain = self.discovery_client.auth_domain(domain_id).await?;
148        download_metadata_v1(
149            &domain.domain.domain_server.url,
150            &self.client_id,
151            &domain.get_access_token(),
152            domain_id,
153            query,
154        )
155        .await
156    }
157
158    pub async fn download_domain_data_by_id(
159        &self,
160        domain_id: &str,
161        id: &str,
162    ) -> Result<Vec<u8>, DomainError> {
163        let domain = self.discovery_client.auth_domain(domain_id).await?;
164        download_by_id(
165            &domain.domain.domain_server.url,
166            &self.client_id,
167            &domain.get_access_token(),
168            domain_id,
169            id,
170        )
171        .await
172    }
173
174    pub async fn delete_domain_data_by_id(
175        &self,
176        domain_id: &str,
177        id: &str,
178    ) -> Result<(), DomainError> {
179        let domain = self.discovery_client.auth_domain(domain_id).await?;
180        delete_by_id(
181            &domain.domain.domain_server.url,
182            &domain.get_access_token(),
183            domain_id,
184            id,
185        )
186        .await
187    }
188
189    pub async fn submit_job_request_v1(
190        &self,
191        domain_id: &str,
192        request: &JobRequest,
193    ) -> Result<reqwest::Response, DomainError> {
194        let domain = self.discovery_client.auth_domain(domain_id).await?;
195        crate::reconstruction::forward_job_request_v1(
196            &domain.domain.domain_server.url,
197            &self.client_id,
198            &domain.get_access_token(),
199            domain_id,
200            request,
201        )
202        .await
203    }
204
205    /// Lists domains the caller has access to.
206    ///
207    /// # Arguments
208    /// * `query` - The `ListDomainsQuery` object containing the query parameters.
209    ///
210    /// - org: (required) The organization to list domains from:
211    ///   - "own": returns domains in your own organization.
212    ///   - a UUID: returns domains in that specific organization.
213    ///   - "all": returns domains across all organizations. When filtering by 'portal' (see below), this works without restrictions.
214    ///     Otherwise, 'domain_server_id' is required and the domain server must belong to your org.
215    ///     Not available for app tokens without a portal filter.
216    /// - portal_id: (optional) Full UUID of a portal to filter domains. Mutually exclusive with 'portal_short_id'.
217    /// - portal_short_id: (optional) Short ID of a portal to filter domains. Mutually exclusive with 'portal_id'.
218    /// - domain_server_id: (optional) UUID of the domain server to filter domains. Ignored if a portal filter is active.
219    ///
220    /// # Returns
221    /// * `ListDomainsResponse` - The list of domains the caller has access to.
222    ///
223    pub async fn list_domains(
224        &self,
225        query: &ListDomainsQuery,
226    ) -> Result<ListDomainsResponse, DomainError> {
227        if query.portal_id.is_none() && query.portal_short_id.is_none() {
228            self.discovery_client
229                .list_domains(&query.org, query.domain_server_id.as_deref())
230                .await
231        } else {
232            self.discovery_client
233                .list_domains_by_portal(
234                    query.portal_id.as_deref(),
235                    query.portal_short_id.as_deref(),
236                    &query.org,
237                )
238                .await
239        }
240    }
241
242    pub async fn create_domain(
243        &self,
244        name: &str,
245        domain_server_id: Option<String>,
246        domain_server_url: Option<String>,
247        redirect_url: Option<String>,
248    ) -> Result<DomainWithToken, DomainError> {
249        self.discovery_client
250            .create_domain(name, domain_server_id, domain_server_url, redirect_url)
251            .await
252    }
253
254    pub async fn delete_domain(&self, domain_id: &str) -> Result<(), DomainError> {
255        let domain = self.discovery_client.auth_domain(domain_id).await?;
256        self.discovery_client
257            .delete_domain(&domain.get_access_token(), domain_id)
258            .await
259    }
260}
261
/// Integration tests for `DomainClient`.
///
/// These tests read their configuration from the environment (optionally
/// loaded from `../.env.local` and `.env`) and talk to the configured services.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use super::*;
    use crate::{
        auth::AuthClient,
        discovery::OWN_DOMAINS_ORG,
        domain_data::{DomainAction, UploadDomainData},
    };
    use futures::channel::mpsc;
    use tokio::spawn;

    /// Loads the test configuration and the `DOMAIN_ID` environment variable.
    ///
    /// Panics when the configuration cannot be built or `DOMAIN_ID` is unset.
    fn get_config() -> (config::Config, String) {
        const LOCAL_ENV_FILE: &str = "../.env.local";
        if std::path::Path::new(LOCAL_ENV_FILE).exists() {
            dotenvy::from_filename(LOCAL_ENV_FILE).ok();
        }
        dotenvy::dotenv().ok();
        let config = config::Config::from_env().unwrap();
        let domain_id = std::env::var("DOMAIN_ID").unwrap();
        (config, domain_id)
    }

    /// Signs in with the configured account email and password.
    ///
    /// Panics when either credential is missing from the configuration.
    async fn user_client(config: &config::Config) -> Result<DomainClient, DomainError> {
        DomainClient::new_with_user_credential(
            &config.api_url,
            &config.dds_url,
            &config.client_id,
            config.email.as_deref().unwrap(),
            config.password.as_deref().unwrap(),
            true,
        )
        .await
    }

    /// Signs in with the configured app key and secret.
    ///
    /// Panics when either credential is missing from the configuration.
    async fn app_client(config: &config::Config) -> Result<DomainClient, DomainError> {
        DomainClient::new_with_app_credential(
            &config.api_url,
            &config.dds_url,
            &config.client_id,
            config.app_key.as_deref().unwrap(),
            config.app_secret.as_deref().unwrap(),
        )
        .await
    }

    /// Creates a uniquely named domain on the server given by
    /// `TEST_DOMAIN_SERVER_URL`, signed in as the configured user.
    async fn create_test_domain(config: &config::Config) -> Result<DomainWithToken, DomainError> {
        let client = user_client(config)
            .await
            .expect("Failed to create test client");
        let name = format!("test_domain_{}", uuid::Uuid::new_v4());
        let server_url = std::env::var("TEST_DOMAIN_SERVER_URL").unwrap();
        client
            .create_domain(&name, None, Some(server_url), None)
            .await
    }

    /// Deletes the domain `domain_id`, signed in as the configured user.
    async fn delete_test_domain(
        config: &config::Config,
        domain_id: &str,
    ) -> Result<(), DomainError> {
        let client = user_client(config)
            .await
            .expect("Failed to create test client");
        client.delete_domain(domain_id).await
    }

    /// Uploads a single `"test"`-typed item named `"to be deleted"` into
    /// `domain_id`, signed in as the configured user.
    async fn create_test_domain_data(
        config: &config::Config,
        domain_id: &str,
    ) -> Result<Vec<DomainDataMetadata>, DomainError> {
        let client = user_client(config)
            .await
            .expect("Failed to create test client");

        let seed = UploadDomainData {
            action: DomainAction::Create {
                name: "to be deleted".to_string(),
                data_type: "test".to_string(),
            },
            data: "{\"test\": \"test\"}".as_bytes().to_vec(),
        };
        client.upload_domain_data(domain_id, vec![seed]).await
    }

    /// Creates a test domain, seeds it with one item and checks the metadata
    /// reported for that item.
    async fn seeded_test_domain(
        config: &config::Config,
    ) -> (DomainWithToken, Vec<DomainDataMetadata>) {
        let domain = create_test_domain(config)
            .await
            .expect("Failed to create test domain");
        let created = create_test_domain_data(config, &domain.domain.id)
            .await
            .expect("Failed to create test domain data");
        assert_eq!(created.len(), 1);
        assert_eq!(created[0].name, "to be deleted");
        assert_eq!(created[0].data_type, "test");
        (domain, created)
    }

    /// Builds a query for the caller's own organization with an optional
    /// domain-server filter and no portal filter.
    fn own_org_query(domain_server_id: Option<String>) -> ListDomainsQuery {
        ListDomainsQuery {
            portal_id: None,
            portal_short_id: None,
            org: OWN_DOMAINS_ORG.to_string(),
            domain_server_id,
        }
    }

    #[tokio::test]
    async fn get_organization_id() {
        let (config, _domain_id) = get_config();
        let mut auth_client = AuthClient::new(&config.api_url, &config.client_id);
        auth_client
            .sign_in_with_app_credentials(
                config.app_key.as_deref().unwrap(),
                config.app_secret.as_deref().unwrap(),
            )
            .await
            .expect("Failed to sign in with app credentials");
        let token = auth_client
            .get_dds_access_token(None)
            .await
            .expect("Failed to get DDS access token");
        let claims = auth::parse_jwt(&token).expect("Failed to parse JWT");
        assert!(claims.org.is_some());
    }

    #[tokio::test]
    async fn test_download_domain_data_with_app_credential() {
        let (config, _domain_id) = get_config();
        let client = app_client(&config).await.expect("Failed to create client");

        let (domain, _created) = seeded_test_domain(&config).await;
        let domain_id = domain.domain.id.clone();

        // Everything of type "test" in the fresh domain.
        let query = DownloadQuery {
            ids: vec![],
            name: None,
            data_type: Some("test".to_string()),
        };

        let result = client.download_domain_data(&domain_id, &query).await;
        assert!(result.is_ok(), "error message : {:?}", result.err());

        let downloaded = result.unwrap();
        assert!(!downloaded.is_empty());
        for item in downloaded {
            assert_eq!(item.metadata.data_type, "test");
        }

        delete_test_domain(&config, &domain_id)
            .await
            .expect("Failed to delete test domain");
    }

    #[tokio::test]
    async fn test_upload_domain_data_with_user_credential() {
        use futures::SinkExt;

        let (config, _domain_id) = get_config();
        let client = user_client(&config)
            .await
            .expect("Failed to create client");

        let (domain, created) = seeded_test_domain(&config).await;
        let domain_id = domain.domain.id.clone();

        // One update of the seeded item plus one brand-new item.
        let uploads = vec![
            UploadDomainData {
                action: DomainAction::Update {
                    id: created[0].id.clone(),
                },
                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
            },
            UploadDomainData {
                action: DomainAction::Create {
                    name: "to be deleted2".to_string(),
                    data_type: "test".to_string(),
                },
                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
            },
        ];
        let (mut tx, rx) = mpsc::channel(10);
        spawn(async move {
            for upload in uploads {
                tx.send(upload).await.unwrap();
            }
            tx.close().await.unwrap();
        });

        let upload_result = client.upload_domain_data_stream(&domain_id, rx).await;
        assert!(
            upload_result.is_ok(),
            "error message : {:?}",
            upload_result.err()
        );
        let uploaded = upload_result.unwrap();
        assert_eq!(uploaded.len(), 2);

        let ids: Vec<String> = uploaded.iter().map(|meta| meta.id.clone()).collect();
        assert_eq!(ids.len(), 2);

        // Download exactly the two items that were just uploaded.
        let query = DownloadQuery {
            ids,
            name: None,
            data_type: None,
        };
        let download_result = client.download_domain_data(&domain_id, &query).await;
        assert!(
            download_result.is_ok(),
            "error message : {:?}",
            download_result.err()
        );

        let downloaded = download_result.unwrap();
        let count = downloaded.len();
        let mut to_delete = None;
        for item in downloaded {
            if item.metadata.id == created[0].id {
                // The seeded item must carry the updated payload.
                assert_eq!(item.data, b"{\"test\": \"test updated\"}");
            } else {
                // The newly created item keeps its payload and is the one
                // removed below.
                assert_eq!(item.data, b"{\"test\": \"test\"}");
                to_delete = Some(item.metadata.id.clone());
            }
        }
        assert_eq!(count, 2);
        assert!(to_delete.is_some());

        // Delete the item created by the streamed upload (not the seeded one).
        let delete_result = client
            .delete_domain_data_by_id(&domain_id, &to_delete.unwrap())
            .await;
        assert!(
            delete_result.is_ok(),
            "Failed to delete data by id: {:?}",
            delete_result.err()
        );

        delete_test_domain(&config, &domain_id)
            .await
            .expect("Failed to delete test domain");
    }

    #[tokio::test]
    async fn test_download_domain_data_by_id() {
        let (config, _domain_id) = get_config();
        let client = app_client(&config).await.expect("Failed to create client");

        let (domain, created) = seeded_test_domain(&config).await;
        let domain_id = domain.domain.id.clone();

        let download_result = client
            .download_domain_data_by_id(&domain_id, &created[0].id)
            .await;
        assert!(
            download_result.is_ok(),
            "download by id failed: {:?}",
            download_result.err()
        );
        let downloaded_bytes = download_result.unwrap();
        assert_eq!(downloaded_bytes, b"{\"test\": \"test\"}".to_vec());

        delete_test_domain(&config, &domain_id)
            .await
            .expect("Failed to delete test domain");
    }

    #[tokio::test]
    async fn test_download_domain_metadata() {
        let (config, _domain_id) = get_config();
        let client = app_client(&config).await.expect("Failed to create client");

        let (domain, _created) = seeded_test_domain(&config).await;
        let domain_id = domain.domain.id.clone();

        // Metadata of every "test"-typed item in the domain.
        let query = DownloadQuery {
            ids: vec![],
            name: None,
            data_type: Some("test".to_string()),
        };
        let result = client.download_metadata(&domain_id, &query).await;
        assert!(
            result.is_ok(),
            "Failed to download domain metadata: {:?}",
            result.err()
        );
        let metadata = result.unwrap();
        assert!(!metadata.is_empty());
        for meta in metadata {
            assert!(!meta.id.is_empty());
            assert_eq!(meta.domain_id, domain_id);
            assert!(!meta.name.is_empty());
            assert_eq!(meta.data_type, "test");
        }

        delete_test_domain(&config, &domain_id)
            .await
            .expect("Failed to delete test domain");
    }

    #[tokio::test]
    async fn test_load_domain_with_oidc_access_token() {
        let (config, domain_id) = get_config();
        // The token comes from the environment: an unset variable fails the
        // test, an empty one skips it.
        let oidc_access_token =
            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
        if oidc_access_token.is_empty() {
            eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
            return;
        }

        let discovery = DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);

        let domain = discovery
            .with_oidc_access_token(&oidc_access_token)
            .auth_domain(&domain_id)
            .await;
        assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
        assert_eq!(domain.unwrap().domain.id, domain_id);
    }

    #[tokio::test]
    async fn test_list_domains() {
        let (config, _domain_id) = get_config();
        let client = app_client(&config).await.expect("Failed to create client");

        let org = std::env::var("TEST_ORGANIZATION").unwrap_or_else(|_| "own".to_string());
        let query = ListDomainsQuery {
            portal_id: None,
            portal_short_id: None,
            org,
            domain_server_id: None,
        };
        let result = client.list_domains(&query).await.unwrap();
        assert!(!result.domains.is_empty(), "No domains found");
    }

    #[tokio::test]
    async fn test_list_domains_by_domain_server_id() {
        let (config, _domain_id) = get_config();
        let client = app_client(&config).await.expect("Failed to create client");

        // Pick the domain server of the first listed domain as the filter.
        let all_domains = client
            .list_domains(&own_org_query(None))
            .await
            .expect("Failed to list all domains");
        assert!(!all_domains.domains.is_empty(), "No domains found");

        let domain_server_id = &all_domains.domains[0].domain_server_id;

        let filtered_domains = client
            .list_domains(&own_org_query(Some(domain_server_id.clone())))
            .await
            .expect("Failed to list domains by domain_server_id");

        // Every returned domain must belong to the requested domain server.
        assert!(
            !filtered_domains.domains.is_empty(),
            "Filtered domains should not be empty"
        );
        for domain in &filtered_domains.domains {
            assert_eq!(
                &domain.domain_server_id, domain_server_id,
                "Domain server id should match the filter"
            );
        }
    }

    #[tokio::test]
    async fn test_submit_job_request_v1_with_invalid_processing_type() {
        let (config, domain_id) = get_config();
        let client = user_client(&config)
            .await
            .expect("Failed to create client");

        let job_request = JobRequest {
            processing_type: "invalid_processing_type".to_string(),
            ..Default::default()
        };
        let err = client
            .submit_job_request_v1(&domain_id, &job_request)
            .await
            .expect_err("Failed to submit job request");
        assert_eq!(
            err.to_string(),
            "Auki response - status: 400 Bad Request, error: Failed to process domain. invalid processing type"
        );
    }
}