1use futures::channel::mpsc::Receiver;
2
3use crate::domain_data::{
4 DomainData, DomainDataMetadata, DownloadQuery, delete_by_id, download_by_id,
5 download_metadata_v1, download_v1_stream,
6};
7#[cfg(target_family = "wasm")]
8use crate::domain_data::{UploadDomainData, upload_v1};
9
10mod auth;
11pub mod config;
12pub mod discovery;
13pub mod domain_data;
14pub mod reconstruction;
15#[cfg(target_family = "wasm")]
16pub mod wasm;
17
18use crate::auth::TokenCache;
19use crate::discovery::{DiscoveryService, DomainWithServer};
20pub use crate::reconstruction::JobRequest;
21
22#[derive(Debug, Clone)]
23pub struct DomainClient {
24 discovery_client: DiscoveryService,
25 pub client_id: String,
26}
27
28impl DomainClient {
29 pub fn new(api_url: &str, dds_url: &str, client_id: &str) -> Self {
30 if client_id.is_empty() {
31 panic!("client_id is empty");
32 }
33 Self {
34 discovery_client: DiscoveryService::new(api_url, dds_url, client_id),
35 client_id: client_id.to_string(),
36 }
37 }
38
39 pub async fn new_with_app_credential(
40 api_url: &str,
41 dds_url: &str,
42 client_id: &str,
43 app_key: &str,
44 app_secret: &str,
45 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
46 let mut dc = DomainClient::new(api_url, dds_url, client_id);
47 let _ = dc
48 .discovery_client
49 .sign_in_as_auki_app(app_key, app_secret)
50 .await?;
51 Ok(dc)
52 }
53
54 pub async fn new_with_user_credential(
55 api_url: &str,
56 dds_url: &str,
57 client_id: &str,
58 email: &str,
59 password: &str,
60 remember_password: bool,
61 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
62 let mut dc = DomainClient::new(api_url, dds_url, client_id);
63 let _ = dc
64 .discovery_client
65 .sign_in_with_auki_account(email, password, remember_password)
66 .await?;
67 Ok(dc)
68 }
69
70 pub fn with_oidc_access_token(&self, token: &str) -> Self {
71 Self {
72 discovery_client: self.discovery_client.with_oidc_access_token(token),
73 client_id: self.client_id.clone(),
74 }
75 }
76
77 pub async fn download_domain_data(
78 &self,
79 domain_id: &str,
80 query: &DownloadQuery,
81 ) -> Result<
82 Receiver<Result<DomainData, Box<dyn std::error::Error + Send + Sync>>>,
83 Box<dyn std::error::Error + Send + Sync>,
84 > {
85 let domain = self.discovery_client.auth_domain(domain_id).await?;
86 let rx = download_v1_stream(
87 &domain.domain.domain_server.url,
88 &self.client_id,
89 &domain.get_access_token(),
90 domain_id,
91 query,
92 )
93 .await?;
94 Ok(rx)
95 }
96
97 #[cfg(not(target_family = "wasm"))]
98 pub async fn upload_domain_data(
99 &self,
100 domain_id: &str,
101 data: Receiver<domain_data::UploadDomainData>,
102 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
103 use crate::{auth::TokenCache, domain_data::upload_v1_stream};
104 let domain = self.discovery_client.auth_domain(domain_id).await?;
105 upload_v1_stream(
106 &domain.domain.domain_server.url,
107 &domain.get_access_token(),
108 domain_id,
109 data,
110 )
111 .await
112 }
113
114 #[cfg(target_family = "wasm")]
115 pub async fn upload_domain_data(
116 &self,
117 domain_id: &str,
118 data: Vec<UploadDomainData>,
119 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
120 let domain = self.discovery_client.auth_domain(domain_id).await?;
121 upload_v1(
122 &domain.domain.domain_server.url,
123 &domain.get_access_token(),
124 domain_id,
125 data,
126 )
127 .await
128 }
129
130 pub async fn download_metadata(
131 &self,
132 domain_id: &str,
133 query: &DownloadQuery,
134 ) -> Result<Vec<DomainDataMetadata>, Box<dyn std::error::Error + Send + Sync>> {
135 let domain = self.discovery_client.auth_domain(domain_id).await?;
136 download_metadata_v1(
137 &domain.domain.domain_server.url,
138 &self.client_id,
139 &domain.get_access_token(),
140 domain_id,
141 query,
142 )
143 .await
144 }
145
146 pub async fn download_domain_data_by_id(
147 &self,
148 domain_id: &str,
149 id: &str,
150 ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
151 let domain = self.discovery_client.auth_domain(domain_id).await?;
152 download_by_id(
153 &domain.domain.domain_server.url,
154 &self.client_id,
155 &domain.get_access_token(),
156 domain_id,
157 id,
158 )
159 .await
160 }
161
162 pub async fn delete_domain_data_by_id(
163 &self,
164 domain_id: &str,
165 id: &str,
166 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
167 let domain = self.discovery_client.auth_domain(domain_id).await?;
168 delete_by_id(
169 &domain.domain.domain_server.url,
170 &domain.get_access_token(),
171 domain_id,
172 id,
173 )
174 .await
175 }
176
177 pub async fn submit_job_request_v1(
178 &self,
179 domain_id: &str,
180 request: &JobRequest,
181 ) -> Result<reqwest::Response, Box<dyn std::error::Error + Send + Sync>> {
182 let domain = self.discovery_client.auth_domain(domain_id).await?;
183 crate::reconstruction::forward_job_request_v1(
184 &domain.domain.domain_server.url,
185 &self.client_id,
186 &domain.get_access_token(),
187 domain_id,
188 request,
189 )
190 .await
191 }
192
193 pub async fn list_domains(
194 &self,
195 org: &str,
196 ) -> Result<Vec<DomainWithServer>, Box<dyn std::error::Error + Send + Sync>> {
197 self.discovery_client.list_domains(org).await
198 }
199}
200
201#[cfg(not(target_family = "wasm"))]
202#[cfg(test)]
203mod tests {
204 use std::time::Duration;
205
206 use crate::domain_data::{CreateDomainData, DomainAction, UpdateDomainData, UploadDomainData};
207
208 use super::*;
209 use futures::{StreamExt, channel::mpsc};
210 use tokio::{spawn, time::sleep};
211
212 fn get_config() -> (config::Config, String) {
213 if std::path::Path::new("../.env.local").exists() {
214 dotenvy::from_filename("../.env.local").ok();
215 }
216 dotenvy::dotenv().ok();
217 let config = config::Config::from_env().unwrap();
218 (config, std::env::var("DOMAIN_ID").unwrap())
219 }
220
221 #[tokio::test]
222 async fn test_download_domain_data_with_app_credential() {
223 let config = get_config();
225 let client = DomainClient::new_with_app_credential(
226 &config.0.api_url,
227 &config.0.dds_url,
228 &config.0.client_id,
229 &config.0.app_key.unwrap(),
230 &config.0.app_secret.unwrap(),
231 )
232 .await
233 .expect("Failed to create client");
234
235 let query = DownloadQuery {
237 ids: vec![],
238 name: None,
239 data_type: Some("dmt_accel_csv".to_string()),
240 };
241
242 let result = client.download_domain_data(&config.1, &query).await;
244
245 assert!(result.is_ok(), "error message : {:?}", result.err());
246
247 let mut rx = result.unwrap();
248 let mut count = 0;
249 while let Some(Ok(data)) = rx.next().await {
250 count += 1;
251 assert_eq!(data.metadata.data_type, "dmt_accel_csv");
252 }
253 assert!(count > 0);
254 }
255
256 #[tokio::test]
257 async fn test_upload_domain_data_with_user_credential() {
258 use futures::SinkExt;
259 let config = get_config();
260 let client = DomainClient::new_with_user_credential(
261 &config.0.api_url,
262 &config.0.dds_url,
263 &config.0.client_id,
264 &config.0.email.unwrap(),
265 &config.0.password.unwrap(),
266 true,
267 )
268 .await
269 .expect("Failed to create client");
270
271 let data = vec![
272 UploadDomainData {
273 action: DomainAction::Create(CreateDomainData {
274 name: "to be deleted".to_string(),
275 data_type: "test".to_string(),
276 }),
277 data: "{\"test\": \"test\"}".as_bytes().to_vec(),
278 },
279 UploadDomainData {
280 action: DomainAction::Update(UpdateDomainData {
281 id: "a84a36e5-312b-4f80-974a-06f5d19c1e16".to_string(),
282 }),
283 data: "{\"test\": \"test updated\"}".as_bytes().to_vec(),
284 },
285 ];
286 let (mut tx, rx) = mpsc::channel(10);
287 spawn(async move {
288 for d in data {
289 tx.send(d).await.unwrap();
290 }
291 tx.close().await.unwrap();
292 });
293 let result = client.upload_domain_data(&config.1, rx).await;
294
295 assert!(result.is_ok(), "error message : {:?}", result.err());
296
297 sleep(Duration::from_secs(5)).await;
298 let result = result.unwrap();
299 assert_eq!(result.len(), 2);
300
301 let ids = result.iter().map(|d| d.id.clone()).collect::<Vec<String>>();
302 assert_eq!(ids.len(), 2);
303 let query = DownloadQuery {
305 ids: ids,
306 name: None,
307 data_type: None,
308 };
309
310 let result = client.download_domain_data(&config.1, &query).await;
312
313 assert!(result.is_ok(), "error message : {:?}", result.err());
314
315 let mut to_delete = None;
316 let mut count = 0;
317 let mut rx = result.unwrap();
318 while let Some(Ok(data)) = rx.next().await {
319 count += 1;
320 if data.metadata.id == "a84a36e5-312b-4f80-974a-06f5d19c1e16" {
321 assert_eq!(data.data, b"{\"test\": \"test updated\"}");
322 continue;
323 } else {
324 assert_eq!(data.data, b"{\"test\": \"test\"}");
325 }
326 to_delete = Some(data.metadata.id.clone());
327 }
328 assert_eq!(count, 2);
329 assert_eq!(to_delete.is_some(), true);
330
331 let delete_result = client
333 .delete_domain_data_by_id(&config.1, &to_delete.unwrap())
334 .await;
335 assert!(
336 delete_result.is_ok(),
337 "Failed to delete data by id: {:?}",
338 delete_result.err()
339 );
340 }
341
342 #[tokio::test]
343 async fn test_download_domain_data_by_id() {
344 let config = get_config();
345 let client = DomainClient::new_with_app_credential(
346 &config.0.api_url,
347 &config.0.dds_url,
348 &config.0.client_id,
349 &config.0.app_key.unwrap(),
350 &config.0.app_secret.unwrap(),
351 )
352 .await
353 .expect("Failed to create client");
354
355 let download_result = client
357 .download_domain_data_by_id(&config.1, "a84a36e5-312b-4f80-974a-06f5d19c1e16")
358 .await;
359
360 assert!(
361 download_result.is_ok(),
362 "download by id failed: {:?}",
363 download_result.err()
364 );
365 let downloaded_bytes = download_result.unwrap();
366 assert_eq!(downloaded_bytes, b"{\"test\": \"test updated\"}".to_vec());
367 }
368
369 #[tokio::test]
370 async fn test_download_domain_metadata() {
371 let config = get_config();
372 let client = DomainClient::new_with_app_credential(
373 &config.0.api_url,
374 &config.0.dds_url,
375 &config.0.client_id,
376 &config.0.app_key.clone().unwrap(),
377 &config.0.app_secret.clone().unwrap(),
378 )
379 .await
380 .expect("Failed to create client");
381
382 let result = client
384 .download_metadata(
385 &config.1,
386 &DownloadQuery {
387 ids: vec![],
388 name: None,
389 data_type: Some("test".to_string()),
390 },
391 )
392 .await;
393 assert!(
394 result.is_ok(),
395 "Failed to download domain metadata: {:?}",
396 result.err()
397 );
398 let result = result.unwrap();
399 assert!(result.len() > 0);
400 for meta in result {
401 assert!(!meta.id.is_empty());
402 assert_eq!(meta.domain_id, config.1);
403 assert!(!meta.name.is_empty());
404 assert_eq!(meta.data_type, "test");
405 }
406 }
407
408 #[tokio::test]
409 async fn test_load_domain_with_oidc_access_token() {
410 let config = get_config();
411 let oidc_access_token =
413 std::env::var("AUTH_TEST_TOKEN").expect("AUTH_TEST_TOKEN env var not set");
414 if oidc_access_token.is_empty() {
415 eprintln!("Missing AUTH_TEST_TOKEN, skipping test");
416 return;
417 }
418
419 let client =
420 DiscoveryService::new(&config.0.api_url, &config.0.dds_url, &config.0.client_id);
421
422 let domain = client
423 .with_oidc_access_token(&oidc_access_token)
424 .auth_domain(&config.1)
425 .await;
426 assert!(domain.is_ok(), "Failed to get domain: {:?}", domain.err());
427 assert_eq!(domain.unwrap().domain.id, config.1);
428 }
429
430 #[tokio::test]
431 async fn test_list_domains() {
432 let config = get_config();
433 let client = DomainClient::new_with_app_credential(
434 &config.0.api_url,
435 &config.0.dds_url,
436 &config.0.client_id,
437 &config.0.app_key.unwrap(),
438 &config.0.app_secret.unwrap(),
439 )
440 .await
441 .expect("Failed to create client");
442
443 let org = std::env::var("TEST_ORGANIZATION").unwrap_or("own".to_string());
444 let result = client.list_domains(&org).await.unwrap();
445 assert!(result.len() > 0, "No domains found");
446 }
447
448 #[tokio::test]
449 async fn test_submit_job_request_v1_with_invalid_processing_type() {
450 let config = get_config();
451 let client = DomainClient::new_with_user_credential(
452 &config.0.api_url,
453 &config.0.dds_url,
454 &config.0.client_id,
455 &config.0.email.unwrap(),
456 &config.0.password.unwrap(),
457 true,
458 )
459 .await
460 .expect("Failed to create client");
461
462 let mut job_request= JobRequest::default();
463 job_request.processing_type = "invalid_processing_type".to_string();
464 let res = client.submit_job_request_v1(&config.1, &job_request).await.expect_err("Failed to submit job request");
465 assert_eq!(res.to_string(), "Failed to process domain. Status: 400 Bad Request - invalid processing type");
466 }
467}