posemesh_domain_http/
lib.rs

1use futures::channel::mpsc::Receiver;
2
3use crate::domain_data::{
4    DomainData, DomainDataMetadata, DownloadQuery, delete_by_id, download_by_id,
5    download_metadata_v1, download_v1_stream,
6};
7#[cfg(target_family = "wasm")]
8use crate::domain_data::{UploadDomainData, upload_v1};
9
10mod auth;
11pub mod config;
12pub mod discovery;
13pub mod domain_data;
14#[cfg(target_family = "wasm")]
15pub mod wasm;
16
17use crate::auth::TokenCache;
18use crate::discovery::DiscoveryService;
19#[derive(Debug, Clone)]
20pub struct DomainClient {
21    discovery_client: DiscoveryService,
22    pub client_id: String,
23}
24
25impl DomainClient {
26    pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
27        if client_id.is_empty() {
28            panic!("client_id is empty");
29        }
30        Self {
31            discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
32            client_id: client_id.to_string(),
33        }
34    }
35
36    pub async fn new_with_app_credential(
37        api_url: &str,
38        dds_url: &str,
39        client_id: &str,
40        app_key: &str,
41        app_secret: &str,
42    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
43        let mut dc = DomainClient::new(api_url, dds_url, client_id);
44        let _ = dc
45            .discovery_client
46            .sign_in_as_auki_app(app_key, app_secret)
47            .await?;
48        Ok(dc)
49    }
50
51    pub async fn new_with_user_credential(
52        api_url: &str,
53        dds_url: &str,
54        client_id: &str,
55        email: &str,
56        password: &str,
57        remember_password: bool,
58    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
59        let mut dc = DomainClient::new(api_url, dds_url, client_id);
60        let _ = dc
61            .discovery_client
62            .sign_in_with_auki_account(email, password, remember_password)
63            .await?;
64        Ok(dc)
65    }
66
67    pub fn with_oidc_access_token(&self, token: &str) -> Self {
68        Self {
69            discovery_client: self.discovery_client.with_oidc_access_token(token),
70            client_id: self.client_id.clone(),
71        }
72    }
73
74    pub async fn download_domain_data(
75        &self,
76        domain_id: &str,
77        query: &DownloadQuery,
78    ) -> Result<
79        Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
80        Box<dyn std::error::Error + Send + Sync>,
81    > {
82        let domain = self.discovery_client.auth_domain(domain_id).await?;
83        let rx = download_v1_stream(
84            &domain.domain.domain_server.url,
85            &self.client_id,
86            &domain.get_access_token(),
87            domain_id,
88            query,
89        )
90        .await?;
91        Ok(rx)
92    }
93
94    #[cfg(not(target_family = "wasm"))]
95    pub async fn upload_domain_data(
96        &self,
97        domain_id: &str,
98        data: Receiver<domain_data::UploadDomainData>,
99    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
100        use crate::{auth::TokenCache, domain_data::upload_v1_stream};
101        let domain = self.discovery_client.auth_domain(domain_id).await?;
102        upload_v1_stream(
103            &domain.domain.domain_server.url,
104            &domain.get_access_token(),
105            domain_id,
106            data,
107        )
108        .await
109    }
110
111    #[cfg(target_family = "wasm")]
112    pub async fn upload_domain_data(
113        &self,
114        domain_id: &str,
115        data: Vec<UploadDomainData>,
116    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
117        let domain = self.discovery_client.auth_domain(domain_id).await?;
118        upload_v1(
119            &domain.domain.domain_server.url,
120            &domain.get_access_token(),
121            domain_id,
122            data,
123        )
124        .await
125    }
126
127    pub async fn download_metadata(
128        &self,
129        domain_id: &str,
130        query: &DownloadQuery,
131    ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
132        let domain = self.discovery_client.auth_domain(domain_id).await?;
133        download_metadata_v1(
134            &domain.domain.domain_server.url,
135            &self.client_id,
136            &domain.get_access_token(),
137            domain_id,
138            query,
139        )
140        .await
141    }
142
143    pub async fn download_domain_data_by_id(
144        &self,
145        domain_id: &str,
146        id: &str,
147    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
148        let domain = self.discovery_client.auth_domain(domain_id).await?;
149        download_by_id(
150            &domain.domain.domain_server.url,
151            &self.client_id,
152            &domain.get_access_token(),
153            domain_id,
154            id,
155        )
156        .await
157    }
158
159    pub async fn delete_domain_data_by_id(
160        &self,
161        domain_id: &str,
162        id: &str,
163    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
164        let domain = self.discovery_client.auth_domain(domain_id).await?;
165        delete_by_id(
166            &domain.domain.domain_server.url,
167            &domain.get_access_token(),
168            domain_id,
169            id,
170        )
171        .await
172    }
173}
174
175#[cfg(not(target_family = "wasm"))]
176#[cfg(test)]
177mod tests {
178    use std::time::Duration;
179
180    use crate::domain_data::{CreateDomainData, DomainAction, UpdateDomainData, UploadDomainData};
181
182    use super::*;
183    use futures::{StreamExt, channel::mpsc};
184    use tokio::{spawn, time::sleep};
185
186    fn get_config() -> (config::Config, String) {
187        if std::path::Path::new("../.env.local").exists() {
188            dotenvy::from_filename("../.env.local").ok();
189            dotenvy::dotenv().ok();
190        }
191        let config = config::Config::from_env().unwrap();
192        (config, std::env::var("DOMAIN_ID").unwrap())
193    }
194
195    #[tokio::test]
196    async fn test_download_domain_data_with_app_credential() {
197        // Create a test client
198        let config = get_config();
199        let client = DomainClient::new_with_app_credential(
200            &config.0.api_url,
201            &config.0.dds_url,
202            &config.0.client_id,
203            &config.0.app_key.unwrap(),
204            &config.0.app_secret.unwrap(),
205        )
206        .await
207        .expect("Failed to create client");
208
209        // Create a test query
210        let query = DownloadQuery {
211            ids: vec![],
212            name: None,
213            data_type: Some("dmt_accel_csv".to_string()),
214        };
215
216        // Test the download function
217        let result = client.download_domain_data(&config.1, &query).await;
218
219        assert!(result.is_ok(), "error message : {:?}", result.err());
220
221        let mut rx = result.unwrap();
222        let mut count = 0;
223        while let Some(Ok(data)) = rx.next().await {
224            count += 1;
225            assert_eq!(data.metadata.data_type, "dmt_accel_csv");
226        }
227        assert!(count > 0);
228    }
229
230    #[tokio::test]
231    async fn test_upload_domain_data_with_user_credential() {
232        use futures::SinkExt;
233        let config = get_config();
234        let client = DomainClient::new_with_user_credential(
235            &config.0.api_url,
236            &config.0.dds_url,
237            &config.0.client_id,
238            &config.0.email.unwrap(),
239            &config.0.password.unwrap(),
240            true,
241        )
242        .await
243        .expect("Failed to create client");
244
245        let data = vec![
246            UploadDomainData {
247                action: DomainAction::Create(CreateDomainData {
248                    name: "to be deleted".to_string(),
249                    data_type: "test".to_string(),
250                }),
251                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
252            },
253            UploadDomainData {
254                action: DomainAction::Update(UpdateDomainData {
255                    id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
256                }),
257                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
258            },
259        ];
260        let (mut tx, rx) = mpsc::channel(10);
261        spawn(async move {
262            for d in data {
263                tx.send(d).await.unwrap();
264            }
265            tx.close().await.unwrap();
266        });
267        let result = client.upload_domain_data(&config.1, rx).await;
268
269        assert!(result.is_ok(), "error message : {:?}", result.err());
270
271        sleep(Duration::from_secs(5)).await;
272        let result = result.unwrap();
273        assert_eq!(result.len(), 2);
274
275        let ids = result.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
276        assert_eq!(ids.len(), 2);
277        // Create a test query
278        let query = DownloadQuery {
279            ids: ids,
280            name: None,
281            data_type: None,
282        };
283
284        // Test the download function
285        let result = client.download_domain_data(&config.1, &query).await;
286
287        assert!(result.is_ok(), "error message : {:?}", result.err());
288
289        let mut to_delete = None;
290        let mut count = 0;
291        let mut rx = result.unwrap();
292        while let Some(Ok(data)) = rx.next().await {
293            count += 1;
294            if data.metadata.id == "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
295                assert_eq!(data.data, b"{\"test\": \"test updated\"}");
296                continue;
297            } else {
298                assert_eq!(data.data, b"{\"test\": \"test\"}");
299            }
300            to_delete = Some(data.metadata.id.clone());
301        }
302        assert_eq!(count, 2);
303        assert_eq!(to_delete.is_some(), true);
304
305        // Delete the one whose id is not "a8"
306        let delete_result = client
307            .delete_domain_data_by_id(&config.1, &to_delete.unwrap())
308            .await;
309        assert!(
310            delete_result.is_ok(),
311            "Failed to delete data by id: {:?}",
312            delete_result.err()
313        );
314    }
315
316    #[tokio::test]
317    async fn test_download_domain_data_by_id() {
318        let config = get_config();
319        let client = DomainClient::new_with_app_credential(
320            &config.0.api_url,
321            &config.0.dds_url,
322            &config.0.client_id,
323            &config.0.app_key.unwrap(),
324            &config.0.app_secret.unwrap(),
325        )
326        .await
327        .expect("Failed to create client");
328
329        // Now test download by id
330        let download_result = client
331            .download_domain_data_by_id(&config.1, "a84a36e5-312b-4f80-974a-06f5d19c1e16")
332            .await;
333
334        assert!(
335            download_result.is_ok(),
336            "download by id failed: {:?}",
337            download_result.err()
338        );
339        let downloaded_bytes = download_result.unwrap();
340        assert_eq!(downloaded_bytes, b"{\"test\": \"test updated\"}".to_vec());
341    }
342
343    #[tokio::test]
344    async fn test_download_domain_metadata() {
345        let config = get_config();
346        let client = DomainClient::new_with_app_credential(
347            &config.0.api_url,
348            &config.0.dds_url,
349            &config.0.client_id,
350            &config.0.app_key.clone().unwrap(),
351            &config.0.app_secret.clone().unwrap(),
352        )
353        .await
354        .expect("Failed to create client");
355
356        // Download all metadata for the domain
357        let result = client
358            .download_metadata(
359                &config.1,
360                &DownloadQuery {
361                    ids: vec![],
362                    name: None,
363                    data_type: Some("test".to_string()),
364                },
365            )
366            .await;
367        assert!(
368            result.is_ok(),
369            "Failed to download domain metadata: {:?}",
370            result.err()
371        );
372        let result = result.unwrap();
373        assert!(result.len() > 0);
374        for meta in result {
375            assert!(!meta.id.is_empty());
376            assert_eq!(meta.domain_id, config.1);
377            assert!(!meta.name.is_empty());
378            assert_eq!(meta.data_type, "test");
379        }
380    }
381
382    #[tokio::test]
383    async fn test_load_domain_with_oidc_access_token() {
384        let config = get_config();
385        // Assume we have a function to get a valid oidc_access_token for testing
386        let oidc_access_token =
387            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
388
389        let client =
390            DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
391
392        let domain = client
393            .with_oidc_access_token(&oidc_access_token)
394            .auth_domain(&config.1)
395            .await;
396        assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
397        assert_eq!(domain.unwrap().domain.domain.id, config.1);
398    }
399}