1use futures::{channel::mpsc::{self, Receiver}, StreamExt, SinkExt};
2#[cfg(not(target_family = "wasm"))]
3use tokio::spawn;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen_futures::spawn_local as spawn;
6use serde::{Deserialize, Serialize};
7use crate::domain_data::{
8 DomainData, DomainDataMetadata, DownloadQuery, UploadDomainData, delete_by_id, download_by_id, download_metadata_v1, download_v1_stream, upload_v1
9};
10
11use crate::auth::TokenCache;
12use crate::discovery::{DiscoveryService, DomainWithToken, ListDomainsResponse};
13use crate::errors::DomainError;
14pub use crate::reconstruction::JobRequest;
15pub use crate::config;
16pub use crate::auth;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ListDomainsQuery {
20 pub portal_id: Option<String>,
21 pub portal_short_id: Option<String>,
22 pub org: String,
23}
24
25#[derive(Debug, Clone)]
26pub struct DomainClient {
27 discovery_client: DiscoveryService,
28 pub client_id: String,
29}
30
31impl DomainClient {
32 pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
33 if client_id.is_empty() {
34 panic!("client_id is empty");
35 }
36 Self {
37 discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
38 client_id: client_id.to_string(),
39 }
40 }
41
42 pub async fn new_with_app_credential(
43 api_url: &str,
44 dds_url: &str,
45 client_id: &str,
46 app_key: &str,
47 app_secret: &str,
48 ) -> Result<Self, DomainError> {
49 let mut dc = DomainClient::new(api_url, dds_url, client_id);
50 dc.discovery_client
51 .sign_in_as_auki_app(app_key, app_secret)
52 .await?;
53 Ok(dc)
54 }
55
56 pub async fn new_with_user_credential(
57 api_url: &str,
58 dds_url: &str,
59 client_id: &str,
60 email: &str,
61 password: &str,
62 remember_password: bool,
63 ) -> Result<Self, DomainError> {
64 let mut dc = DomainClient::new(api_url, dds_url, client_id);
65 dc.discovery_client
66 .sign_in_with_auki_account(email, password, remember_password)
67 .await?;
68 Ok(dc)
69 }
70
71 pub fn with_oidc_access_token(&self, token: &str) -> Self {
72 Self {
73 discovery_client: self.discovery_client.with_oidc_access_token(token),
74 client_id: self.client_id.clone(),
75 }
76 }
77
78 pub async fn download_domain_data_stream(
79 &self,
80 domain_id: &str,
81 query: &DownloadQuery,
82 ) -> Result<
83 Receiver<Result<DomainData, DomainError>>,
84 DomainError,
85 > {
86 let domain = self.discovery_client.auth_domain(domain_id).await?;
87 let rx_box = download_v1_stream(
88 &domain.domain.domain_server.url,
89 &self.client_id,
90 &domain.get_access_token(),
91 domain_id,
92 query,
93 )
94 .await?;
95 let (mut tx, rx) = mpsc::channel(100);
97 spawn(async move {
98 let mut rx_box = rx_box;
99 while let Some(item) = rx_box.next().await {
100 let mapped = item.map_err(DomainError::from);
101 if tx.send(mapped).await.is_err() {
102 break;
103 }
104 }
105 });
106 Ok(rx)
107 }
108
109 pub async fn download_domain_data(
110 &self,
111 domain_id: &str,
112 query: &DownloadQuery,
113 ) -> Result<
114 Vec<DomainData>,
115 DomainError,
116 > {
117 use futures::StreamExt;
118 let mut rx = self.download_domain_data_stream(domain_id, query).await?;
119
120 let mut results = Vec::new();
121 while let Some(result) = rx.next().await {
122 results.push(result?);
123 }
124 Ok(results)
125 }
126
127 #[cfg(not(target_family = "wasm"))]
128 pub async fn upload_domain_data_stream(
129 &self,
130 domain_id: &str,
131 data: Receiver<UploadDomainData>,
132 ) -> Result<Vec<DomainDataMetadata>, DomainError> {
133 use crate::{auth::TokenCache, domain_data::upload_v1_stream};
134 let domain = self.discovery_client.auth_domain(domain_id).await?;
135 upload_v1_stream(
136 &domain.domain.domain_server.url,
137 &domain.get_access_token(),
138 domain_id,
139 data,
140 )
141 .await
142 .map_err(DomainError::from)
143 }
144
145 pub async fn upload_domain_data(
146 &self,
147 domain_id: &str,
148 data: Vec<UploadDomainData>,
149 ) -> Result<Vec<DomainDataMetadata>, DomainError> {
150 let domain = self.discovery_client.auth_domain(domain_id).await?;
151 upload_v1(
152 &domain.domain.domain_server.url,
153 &domain.get_access_token(),
154 domain_id,
155 data,
156 )
157 .await
158 .map_err(DomainError::from)
159 }
160
161 pub async fn download_metadata(
162 &self,
163 domain_id: &str,
164 query: &DownloadQuery,
165 ) -> Result<Vec<DomainDataMetadata>, DomainError> {
166 let domain = self.discovery_client.auth_domain(domain_id).await?;
167 download_metadata_v1(
168 &domain.domain.domain_server.url,
169 &self.client_id,
170 &domain.get_access_token(),
171 domain_id,
172 query,
173 )
174 .await
175 .map_err(DomainError::from)
176 }
177
178 pub async fn download_domain_data_by_id(
179 &self,
180 domain_id: &str,
181 id: &str,
182 ) -> Result<Vec<u8>, DomainError> {
183 let domain = self.discovery_client.auth_domain(domain_id).await?;
184 download_by_id(
185 &domain.domain.domain_server.url,
186 &self.client_id,
187 &domain.get_access_token(),
188 domain_id,
189 id,
190 )
191 .await
192 .map_err(DomainError::from)
193 }
194
195 pub async fn delete_domain_data_by_id(
196 &self,
197 domain_id: &str,
198 id: &str,
199 ) -> Result<(), DomainError> {
200 let domain = self.discovery_client.auth_domain(domain_id).await?;
201 delete_by_id(
202 &domain.domain.domain_server.url,
203 &domain.get_access_token(),
204 domain_id,
205 id,
206 )
207 .await
208 .map_err(DomainError::from)
209 }
210
211 pub async fn submit_job_request_v1(
212 &self,
213 domain_id: &str,
214 request: &JobRequest,
215 ) -> Result<reqwest::Response, DomainError> {
216 let domain = self.discovery_client.auth_domain(domain_id).await?;
217 crate::reconstruction::forward_job_request_v1(
218 &domain.domain.domain_server.url,
219 &self.client_id,
220 &domain.get_access_token(),
221 domain_id,
222 request,
223 )
224 .await
225 .map_err(DomainError::from)
226 }
227
228 pub async fn list_domains(
229 &self,
230 query: &ListDomainsQuery,
231 ) -> Result<ListDomainsResponse, DomainError> {
232 if query.portal_id.is_none() && query.portal_short_id.is_none() {
233 self.discovery_client.list_domains(&query.org).await
234 } else {
235 self.discovery_client.list_domains_by_portal(query.portal_id.as_deref(), query.portal_short_id.as_deref(), &query.org).await
236 }
237 }
238
239 pub async fn create_domain(
240 &self,
241 name: &str,
242 domain_server_id: Option<String>,
243 domain_server_url: Option<String>,
244 redirect_url: Option<String>,
245 ) -> Result<DomainWithToken, DomainError> {
246 self.discovery_client.create_domain(name, domain_server_id, domain_server_url, redirect_url).await
247 }
248
249 pub async fn delete_domain(
250 &self,
251 domain_id: &str,
252 ) -> Result<(), DomainError> {
253 let domain = self.discovery_client.auth_domain(domain_id).await?;
254 self.discovery_client.delete_domain(&domain.get_access_token(), domain_id).await
255 }
256}
257
258#[cfg(not(target_family = "wasm"))]
259#[cfg(test)]
260mod tests {
261 use crate::{auth::AuthClient, domain_data::{DomainAction, UploadDomainData}};
262
263 use super::*;
264 use futures::channel::mpsc;
265 use tokio::spawn;
266
267 fn get_config() -> (config::Config, String) {
268 if std::path::Path::new("../.env.local").exists() {
269 dotenvy::from_filename("../.env.local").ok();
270 }
271 dotenvy::dotenv().ok();
272 let config = config::Config::from_env().unwrap();
273 (config, std::env::var("DOMAIN_ID").unwrap())
274 }
275
276 async fn create_test_domain(config: &config::Config) -> Result<DomainWithToken, DomainError> {
277 let client = DomainClient::new_with_user_credential(
278 &config.api_url,
279 &config.dds_url,
280 &config.client_id,
281 &config.email.clone().unwrap(),
282 &config.password.clone().unwrap(),
283 true,
284 )
285 .await
286 .expect("Failed to create test client");
287 client.create_domain(
288 &format!("test_domain_{}", uuid::Uuid::new_v4()),
289 None,
290 Some(std::env::var("TEST_DOMAIN_SERVER_URL").unwrap()),
291 None,
292 )
293 .await
294 }
295
296 async fn delete_test_domain(config: &config::Config, domain_id: &str) -> Result<(), DomainError> {
297 let client = DomainClient::new_with_user_credential(
298 &config.api_url,
299 &config.dds_url,
300 &config.client_id,
301 &config.email.clone().unwrap(),
302 &config.password.clone().unwrap(),
303 true,
304 )
305 .await
306 .expect("Failed to create test client");
307 client.delete_domain(domain_id).await
308 }
309
310 async fn create_test_domain_data(config: &config::Config, domain_id: &str) -> Result<Vec<DomainDataMetadata>, DomainError> {
311 let client = DomainClient::new_with_user_credential(
312 &config.api_url,
313 &config.dds_url,
314 &config.client_id,
315 &config.email.clone().unwrap(),
316 &config.password.clone().unwrap(),
317 true,
318 )
319 .await
320 .expect("Failed to create test client");
321
322 let data = vec![
323 UploadDomainData {
324 action: DomainAction::Create {
325 name: "to be deleted".to_string(),
326 data_type: "test".to_string(),
327 },
328 data: "{\"test\": \"test\"}".as_bytes().to_vec(),
329 },
330 ];
331 client.upload_domain_data(domain_id, data).await
332 }
333
334 #[tokio::test]
335 #[ignore = "requires live Auki credentials"]
336 async fn get_organization_id() {
337 let config = get_config();
338 let mut client = AuthClient::new(
339 &config.0.api_url,
340 &config.0.client_id,
341 );
342 client.sign_in_with_app_credentials(&config.0.app_key.unwrap(), &config.0.app_secret.unwrap()).await.expect("Failed to sign in with app credentials");
343 let token = client.get_dds_access_token(None).await.expect("Failed to get DDS access token");
344 let claims = auth::parse_jwt(&token).expect("Failed to parse JWT");
345 assert_ne!(claims.org.is_some(), false);
346 }
347
348 #[tokio::test]
349 #[ignore = "requires live Auki credentials"]
350 async fn test_download_domain_data_with_app_credential() {
351 let config = get_config();
353 let config = config.0.clone();
354 let client = DomainClient::new_with_app_credential(
355 &config.api_url,
356 &config.dds_url,
357 &config.client_id,
358 &config.app_key.clone().unwrap(),
359 &config.app_secret.clone().unwrap(),
360 )
361 .await
362 .expect("Failed to create client");
363
364 let domain = create_test_domain(&config).await.expect("Failed to create test domain");
365 let domain_id = domain.domain.id.clone();
366
367 let created = create_test_domain_data(&config, &domain_id).await.expect("Failed to create test domain data");
368 assert_eq!(created.len(), 1);
369 assert_eq!(created[0].name, "to be deleted");
370 assert_eq!(created[0].data_type, "test");
371
372 let query = DownloadQuery {
374 ids: vec![],
375 name: None,
376 data_type: Some("test".to_string()),
377 };
378
379 let result = client.download_domain_data(&domain_id, &query).await;
381
382 assert!(result.is_ok(), "error message : {:?}", result.err());
383
384 let results = result.unwrap();
385 assert!(results.len() > 0);
386 for result in results {
387 assert_eq!(result.metadata.data_type, "test");
388 }
389
390 delete_test_domain(&config, &domain_id).await.expect("Failed to delete test domain");
392 }
393
394 #[tokio::test]
395 #[ignore = "requires live Auki credentials"]
396 async fn test_upload_domain_data_with_user_credential() {
397 use futures::SinkExt;
398 let config = get_config();
399 let client = DomainClient::new_with_user_credential(
400 &config.0.api_url,
401 &config.0.dds_url,
402 &config.0.client_id,
403 &config.0.email.clone().unwrap(),
404 &config.0.password.clone().unwrap(),
405 true,
406 )
407 .await
408 .expect("Failed to create client");
409
410 let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
411 let domain_id = domain.domain.id.clone();
412
413 let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
414 assert_eq!(created.len(), 1);
415 assert_eq!(created[0].name, "to be deleted");
416 assert_eq!(created[0].data_type, "test");
417
418 let data = vec![
419 UploadDomainData {
420 action: DomainAction::Update {
421 id: created[0].id.clone(),
422 },
423 data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
424 },
425 UploadDomainData {
426 action: DomainAction::Create {
427 name: "to be deleted2".to_string(),
428 data_type: "test".to_string(),
429 },
430 data: "{\"test\": \"test\"}".as_bytes().to_vec(),
431 },
432 ];
433 let (mut tx, rx) = mpsc::channel(10);
434 spawn(async move {
435 for d in data {
436 tx.send(d).await.unwrap();
437 }
438 tx.close().await.unwrap();
439 });
440 let result = client.upload_domain_data_stream(&domain_id, rx).await;
441 assert!(result.is_ok(), "error message : {:?}", result.err());
442 let created2 = result.unwrap();
443 assert_eq!(created2.len(), 2);
444
445 let ids = created2.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
446 assert_eq!(ids.len(), 2);
447 let query = DownloadQuery {
449 ids: ids,
450 name: None,
451 data_type: None,
452 };
453
454 let result = client.download_domain_data(&domain_id, &query).await;
456
457 assert!(result.is_ok(), "error message : {:?}", result.err());
458
459 let mut to_delete = None;
460 let mut count = 0;
461 let results = result.unwrap();
462 for result in results {
463 count += 1;
464 if result.metadata.id == created[0].id {
465 assert_eq!(result.data, b"{\"test\": \"test updated\"}");
466 continue;
467 } else {
468 assert_eq!(result.data, b"{\"test\": \"test\"}");
469 }
470 to_delete = Some(result.metadata.id.clone());
471 }
472 assert_eq!(count, 2);
473 assert_eq!(to_delete.is_some(), true);
474
475 let delete_result = client
477 .delete_domain_data_by_id(&domain_id, &to_delete.unwrap())
478 .await;
479 assert!(
480 delete_result.is_ok(),
481 "Failed to delete data by id: {:?}",
482 delete_result.err()
483 );
484
485 delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
487 }
488
489 #[tokio::test]
490 #[ignore = "requires live Auki credentials"]
491 async fn test_download_domain_data_by_id() {
492 let config = get_config();
493 let client = DomainClient::new_with_app_credential(
494 &config.0.api_url,
495 &config.0.dds_url,
496 &config.0.client_id,
497 &config.0.app_key.clone().unwrap(),
498 &config.0.app_secret.clone().unwrap(),
499 )
500 .await
501 .expect("Failed to create client");
502
503 let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
504 let domain_id = domain.domain.id.clone();
505
506 let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
507 assert_eq!(created.len(), 1);
508 assert_eq!(created[0].name, "to be deleted");
509 assert_eq!(created[0].data_type, "test");
510
511 let download_result = client
513 .download_domain_data_by_id(&domain_id, &created[0].id)
514 .await;
515
516 assert!(
517 download_result.is_ok(),
518 "download by id failed: {:?}",
519 download_result.err()
520 );
521 let downloaded_bytes = download_result.unwrap();
522 assert_eq!(downloaded_bytes, b"{\"test\": \"test\"}".to_vec());
523
524 delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
526 }
527
528 #[tokio::test]
529 #[ignore = "requires live Auki credentials"]
530 async fn test_download_domain_metadata() {
531 let config = get_config();
532 let client = DomainClient::new_with_app_credential(
533 &config.0.api_url,
534 &config.0.dds_url,
535 &config.0.client_id,
536 &config.0.app_key.clone().unwrap(),
537 &config.0.app_secret.clone().unwrap(),
538 )
539 .await
540 .expect("Failed to create client");
541
542 let domain = create_test_domain(&config.0).await.expect("Failed to create test domain");
543 let domain_id = domain.domain.id.clone();
544
545 let created = create_test_domain_data(&config.0, &domain_id).await.expect("Failed to create test domain data");
546 assert_eq!(created.len(), 1);
547 assert_eq!(created[0].name, "to be deleted");
548 assert_eq!(created[0].data_type, "test");
549
550 let result = client
552 .download_metadata(
553 &domain_id,
554 &DownloadQuery {
555 ids: vec![],
556 name: None,
557 data_type: Some("test".to_string()),
558 },
559 )
560 .await;
561 assert!(
562 result.is_ok(),
563 "Failed to download domain metadata: {:?}",
564 result.err()
565 );
566 let result = result.unwrap();
567 assert!(result.len() > 0);
568 for meta in result {
569 assert!(!meta.id.is_empty());
570 assert_eq!(meta.domain_id, domain_id);
571 assert!(!meta.name.is_empty());
572 assert_eq!(meta.data_type, "test");
573 }
574
575 delete_test_domain(&config.0, &domain_id).await.expect("Failed to delete test domain");
577 }
578
579 #[tokio::test]
580 async fn test_load_domain_with_oidc_access_token() {
581 let config = get_config();
582 let oidc_access_token =
584 std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
585 if oidc_access_token.is_empty() {
586 eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
587 return;
588 }
589
590 let client =
591 DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
592
593 let domain = client
594 .with_oidc_access_token(&oidc_access_token)
595 .auth_domain(&config.1)
596 .await;
597 assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
598 assert_eq!(domain.unwrap().domain.id, config.1);
599 }
600
601 #[tokio::test]
602 #[ignore = "requires live Auki credentials"]
603 async fn test_list_domains() {
604 let config = get_config();
605 let client = DomainClient::new_with_app_credential(
606 &config.0.api_url,
607 &config.0.dds_url,
608 &config.0.client_id,
609 &config.0.app_key.unwrap(),
610 &config.0.app_secret.unwrap(),
611 )
612 .await
613 .expect("Failed to create client");
614
615 let org = std::env::var("TEST_ORGANIZATION").unwrap_or("own".to_string());
616 let result = client.list_domains(&ListDomainsQuery {
617 portal_id: None,
618 portal_short_id: None,
619 org: org,
620 }).await.unwrap();
621 assert!(result.domains.len() > 0, "No domains found");
622 }
623
624 #[tokio::test]
625 #[ignore = "requires live Auki credentials"]
626 async fn test_submit_job_request_v1_with_invalid_processing_type() {
627 let config = get_config();
628 let client = DomainClient::new_with_user_credential(
629 &config.0.api_url,
630 &config.0.dds_url,
631 &config.0.client_id,
632 &config.0.email.unwrap(),
633 &config.0.password.unwrap(),
634 true,
635 )
636 .await
637 .expect("Failed to create client");
638
639 let mut job_request= JobRequest::default();
640 job_request.processing_type = "invalid_processing_type".to_string();
641 let res = client.submit_job_request_v1(&config.1, &job_request).await.expect_err("Failed to submit job request");
642 assert_eq!(res.to_string(), "Auki response - status: 400 Bad Request, error: Failed to process domain. invalid processing type");
643 }
644}