1use futures::channel::mpsc::Receiver;
2
3use crate::domain_data::{
4 DomainData, DomainDataMetadata, DownloadQuery, delete_by_id, download_by_id,
5 download_metadata_v1, download_v1_stream,
6};
7#[cfg(target_family = "wasm")]
8use crate::domain_data::{UploadDomainData, upload_v1};
9
10mod auth;
11pub mod config;
12pub mod discovery;
13pub mod domain_data;
14pub mod reconstruction;
15#[cfg(target_family = "wasm")]
16pub mod wasm;
17
18use crate::auth::TokenCache;
19use crate::discovery::{DiscoveryService, DomainWithServer};
20pub use crate::reconstruction::JobRequest;
21
22#[derive(Debug, Clone)]
23pub struct DomainClient {
24 discovery_client: DiscoveryService,
25 pub client_id: String,
26}
27
28impl DomainClient {
29 pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
30 if client_id.is_empty() {
31 panic!("client_id is empty");
32 }
33 Self {
34 discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
35 client_id: client_id.to_string(),
36 }
37 }
38
39 pub async fn new_with_app_credential(
40 api_url: &str,
41 dds_url: &str,
42 client_id: &str,
43 app_key: &str,
44 app_secret: &str,
45 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
46 let mut dc = DomainClient::new(api_url, dds_url, client_id);
47 dc.discovery_client
48 .sign_in_as_auki_app(app_key, app_secret)
49 .await?;
50 Ok(dc)
51 }
52
53 pub async fn new_with_user_credential(
54 api_url: &str,
55 dds_url: &str,
56 client_id: &str,
57 email: &str,
58 password: &str,
59 remember_password: bool,
60 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
61 let mut dc = DomainClient::new(api_url, dds_url, client_id);
62 dc.discovery_client
63 .sign_in_with_auki_account(email, password, remember_password)
64 .await?;
65 Ok(dc)
66 }
67
68 pub fn with_oidc_access_token(&self, token: &str) -> Self {
69 Self {
70 discovery_client: self.discovery_client.with_oidc_access_token(token),
71 client_id: self.client_id.clone(),
72 }
73 }
74
75 pub async fn download_domain_data(
76 &self,
77 domain_id: &str,
78 query: &DownloadQuery,
79 ) -> Result<
80 Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
81 Box<dyn std::error::Error + Send + Sync>,
82 > {
83 let domain = self.discovery_client.auth_domain(domain_id).await?;
84 let rx = download_v1_stream(
85 &domain.domain.domain_server.url,
86 &self.client_id,
87 &domain.get_access_token(),
88 domain_id,
89 query,
90 )
91 .await?;
92 Ok(rx)
93 }
94
95 #[cfg(not(target_family = "wasm"))]
96 pub async fn upload_domain_data(
97 &self,
98 domain_id: &str,
99 data: Receiver<domain_data::UploadDomainData>,
100 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
101 use crate::{auth::TokenCache, domain_data::upload_v1_stream};
102 let domain = self.discovery_client.auth_domain(domain_id).await?;
103 upload_v1_stream(
104 &domain.domain.domain_server.url,
105 &domain.get_access_token(),
106 domain_id,
107 data,
108 )
109 .await
110 }
111
112 #[cfg(target_family = "wasm")]
113 pub async fn upload_domain_data(
114 &self,
115 domain_id: &str,
116 data: Vec<UploadDomainData>,
117 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
118 let domain = self.discovery_client.auth_domain(domain_id).await?;
119 upload_v1(
120 &domain.domain.domain_server.url,
121 &domain.get_access_token(),
122 domain_id,
123 data,
124 )
125 .await
126 }
127
128 pub async fn download_metadata(
129 &self,
130 domain_id: &str,
131 query: &DownloadQuery,
132 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
133 let domain = self.discovery_client.auth_domain(domain_id).await?;
134 download_metadata_v1(
135 &domain.domain.domain_server.url,
136 &self.client_id,
137 &domain.get_access_token(),
138 domain_id,
139 query,
140 )
141 .await
142 }
143
144 pub async fn download_domain_data_by_id(
145 &self,
146 domain_id: &str,
147 id: &str,
148 ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
149 let domain = self.discovery_client.auth_domain(domain_id).await?;
150 download_by_id(
151 &domain.domain.domain_server.url,
152 &self.client_id,
153 &domain.get_access_token(),
154 domain_id,
155 id,
156 )
157 .await
158 }
159
160 pub async fn delete_domain_data_by_id(
161 &self,
162 domain_id: &str,
163 id: &str,
164 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
165 let domain = self.discovery_client.auth_domain(domain_id).await?;
166 delete_by_id(
167 &domain.domain.domain_server.url,
168 &domain.get_access_token(),
169 domain_id,
170 id,
171 )
172 .await
173 }
174
175 pub async fn submit_job_request_v1(
176 &self,
177 domain_id: &str,
178 request: &JobRequest,
179 ) -> Result<reqwest::Response, Box<dyn std::error::Error + Send + Sync>> {
180 let domain = self.discovery_client.auth_domain(domain_id).await?;
181 crate::reconstruction::forward_job_request_v1(
182 &domain.domain.domain_server.url,
183 &self.client_id,
184 &domain.get_access_token(),
185 domain_id,
186 request,
187 )
188 .await
189 }
190
191 pub async fn list_domains(
192 &self,
193 org: &str,
194 ) -> Result<Vec<DomainWithServer>, Box<dyn std::error::Error + Send + Sync>> {
195 self.discovery_client.list_domains(org).await
196 }
197}
198
// Integration tests for `DomainClient`. They call the services configured
// through the environment / `.env` files, so they need network access and
// valid credentials to pass.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::time::Duration;

    use crate::domain_data::{CreateDomainData, DomainAction, UpdateDomainData, UploadDomainData};

    use super::*;
    use futures::{StreamExt, channel::mpsc};
    use tokio::{spawn, time::sleep};

    /// Id of the data item that the upload test updates and the
    /// download-by-id test reads back.
    /// NOTE(review): presumably this item must already exist in the test
    /// domain — confirm against the test environment setup.
    const EXISTING_DATA_ID: &str = "a84a36e5-312b-4f80-974a-06f5d19c1e16";

    /// Loads `../.env.local` (when present) and `.env`, then returns the
    /// crate config together with the `DOMAIN_ID` environment variable.
    ///
    /// Panics when the config cannot be built or `DOMAIN_ID` is not set.
    fn get_config() -> (config::Config, String) {
        if std::path::Path::new("../.env.local").exists() {
            dotenvy::from_filename("../.env.local").ok();
        }
        // Best effort: a missing `.env` is not an error.
        dotenvy::dotenv().ok();
        let config = config::Config::from_env().expect("failed to load config from environment");
        let domain_id = std::env::var("DOMAIN_ID").expect("DOMAIN_ID env var not set");
        (config, domain_id)
    }

    /// Builds a client signed in with the app key/secret from `cfg`.
    async fn app_client(cfg: &config::Config) -> DomainClient {
        DomainClient::new_with_app_credential(
            &cfg.api_url,
            &cfg.dds_url,
            &cfg.client_id,
            &cfg.app_key.clone().expect("app_key missing from config"),
            &cfg.app_secret
                .clone()
                .expect("app_secret missing from config"),
        )
        .await
        .expect("Failed to create client")
    }

    /// Builds a client signed in with the email/password from `cfg`.
    async fn user_client(cfg: &config::Config) -> DomainClient {
        DomainClient::new_with_user_credential(
            &cfg.api_url,
            &cfg.dds_url,
            &cfg.client_id,
            &cfg.email.clone().expect("email missing from config"),
            &cfg.password.clone().expect("password missing from config"),
            true,
        )
        .await
        .expect("Failed to create client")
    }

    #[tokio::test]
    async fn test_download_domain_data_with_app_credential() {
        let (config, domain_id) = get_config();
        let client = app_client(&config).await;

        let query = DownloadQuery {
            ids: vec![],
            name: None,
            data_type: Some("dmt_accel_csv".to_string()),
        };

        let mut rx = client
            .download_domain_data(&domain_id, &query)
            .await
            .expect("failed to start download");

        let mut count = 0;
        while let Some(item) = rx.next().await {
            // Fail on a stream error instead of silently ending the loop.
            let data = item.expect("download stream yielded an error");
            count += 1;
            assert_eq!(data.metadata.data_type, "dmt_accel_csv");
        }
        assert!(count > 0, "no domain data was downloaded");
    }

    #[tokio::test]
    async fn test_upload_domain_data_with_user_credential() {
        use futures::SinkExt;

        let (config, domain_id) = get_config();
        let client = user_client(&config).await;

        let uploads = vec![
            UploadDomainData {
                action: DomainAction::Create(CreateDomainData {
                    name: "to be deleted".to_string(),
                    data_type: "test".to_string(),
                }),
                data: "{\"test\": \"test\"}".as_bytes().to_vec(),
            },
            UploadDomainData {
                action: DomainAction::Update(UpdateDomainData {
                    id: EXISTING_DATA_ID.to_string(),
                }),
                data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
            },
        ];

        let (mut tx, rx) = mpsc::channel(10);
        let producer = spawn(async move {
            for item in uploads {
                tx.send(item).await.unwrap();
            }
            tx.close().await.unwrap();
        });

        let uploaded = client
            .upload_domain_data(&domain_id, rx)
            .await
            .expect("upload failed");
        // Surface a panic inside the producer task instead of dropping it.
        producer.await.expect("upload producer task panicked");

        // NOTE(review): presumably gives the server time to make the upload
        // visible before it is read back — confirm whether this is required.
        sleep(Duration::from_secs(5)).await;
        assert_eq!(uploaded.len(), 2);

        let ids: Vec<String> = uploaded.iter().map(|meta| meta.id.clone()).collect();
        let query = DownloadQuery {
            ids,
            name: None,
            data_type: None,
        };

        let mut rx = client
            .download_domain_data(&domain_id, &query)
            .await
            .expect("failed to start download");

        let mut to_delete = None;
        let mut count = 0;
        while let Some(item) = rx.next().await {
            let data = item.expect("download stream yielded an error");
            count += 1;
            if data.metadata.id == EXISTING_DATA_ID {
                // The long-lived item is only updated, never deleted.
                assert_eq!(data.data, b"{\"test\": \"test updated\"}");
            } else {
                assert_eq!(data.data, b"{\"test\": \"test\"}");
                to_delete = Some(data.metadata.id.clone());
            }
        }
        assert_eq!(count, 2);

        // Clean up the item created by this run.
        let to_delete = to_delete.expect("newly created item was not downloaded");
        client
            .delete_domain_data_by_id(&domain_id, &to_delete)
            .await
            .expect("Failed to delete data by id");
    }

    #[tokio::test]
    async fn test_download_domain_data_by_id() {
        let (config, domain_id) = get_config();
        let client = app_client(&config).await;

        let downloaded_bytes = client
            .download_domain_data_by_id(&domain_id, EXISTING_DATA_ID)
            .await
            .expect("download by id failed");

        assert_eq!(downloaded_bytes, b"{\"test\": \"test updated\"}".to_vec());
    }

    #[tokio::test]
    async fn test_download_domain_metadata() {
        let (config, domain_id) = get_config();
        let client = app_client(&config).await;

        let query = DownloadQuery {
            ids: vec![],
            name: None,
            data_type: Some("test".to_string()),
        };
        let metadata = client
            .download_metadata(&domain_id, &query)
            .await
            .expect("Failed to download domain metadata");

        assert!(!metadata.is_empty(), "no metadata returned");
        for meta in metadata {
            assert!(!meta.id.is_empty());
            assert_eq!(meta.domain_id, domain_id);
            assert!(!meta.name.is_empty());
            assert_eq!(meta.data_type, "test");
        }
    }

    #[tokio::test]
    async fn test_load_domain_with_oidc_access_token() {
        let (config, domain_id) = get_config();
        // An unset variable fails the test; a set-but-empty one skips it.
        let oidc_access_token =
            std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
        if oidc_access_token.is_empty() {
            eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
            return;
        }

        let discovery = DiscoveryService::new(&config.api_url, &config.dds_url, &config.client_id);

        let domain = discovery
            .with_oidc_access_token(&oidc_access_token)
            .auth_domain(&domain_id)
            .await
            .expect("Failed to get domain");
        assert_eq!(domain.domain.id, domain_id);
    }

    #[tokio::test]
    async fn test_list_domains() {
        let (config, _) = get_config();
        let client = app_client(&config).await;

        let org = std::env::var("TEST_ORGANIZATION").unwrap_or_else(|_| "own".to_string());
        let domains = client
            .list_domains(&org)
            .await
            .expect("Failed to list domains");
        assert!(!domains.is_empty(), "No domains found");
    }

    #[tokio::test]
    async fn test_submit_job_request_v1_with_invalid_processing_type() {
        let (config, domain_id) = get_config();
        let client = user_client(&config).await;

        let mut job_request = JobRequest::default();
        job_request.processing_type = "invalid_processing_type".to_string();

        let err = client
            .submit_job_request_v1(&domain_id, &job_request)
            .await
            .expect_err("job request with an invalid processing type should be rejected");
        assert_eq!(
            err.to_string(),
            "Failed to process domain. Status: 400 Bad Request - invalid processing type"
        );
    }
}