1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use reqwest::{Client, Method, Response, Url};
7use serde_json::Deserializer;
8use tracing::{debug, info, trace, warn};
9
10use crate::{
11 client::response::GetShareResponse,
12 error::DeltaSharingError,
13 profile::{DeltaSharingProfileExt, Profile},
14 securable::{Schema, Share, Table},
15};
16
17use {
18 action::{File, Metadata, Protocol},
19 pagination::{Pagination, PaginationExt},
20 response::{
21 ErrorResponse, ListSchemasResponse, ListSharesResponse, ListTablesResponse, ParquetResponse,
22 },
23};
24
25pub mod action;
26pub mod pagination;
27pub mod response;
28
29const QUERY_PARAM_VERSION_TIMESTAMP: &str = "startingTimestamp";
30
31#[derive(Debug, Clone)]
33pub struct DeltaSharingClient {
34 client: Client,
35 profile: Profile,
36}
37
38impl DeltaSharingClient {
39 pub fn new(profile: Profile) -> Self {
41 Self {
42 client: Client::new(),
43 profile,
44 }
45 }
46
47 pub fn profile(&self) -> &Profile {
49 &self.profile
50 }
51
52 pub async fn list_shares_paginated(
54 &self,
55 pagination: &Pagination,
56 ) -> Result<ListSharesResponse, DeltaSharingError> {
57 let mut url = self.profile.endpoint().clone();
58 url.path_segments_mut()
59 .expect("valid base")
60 .pop_if_empty()
61 .push("shares");
62 url.set_pagination(pagination);
63 trace!("URL: {}", url);
64
65 let response = self.request(Method::GET, url).await?;
66 let status = response.status();
67
68 if status.is_success() {
69 info!("list shares status: {:?}", status);
70 Ok(response.json::<ListSharesResponse>().await?)
71 } else {
72 warn!("list shares status: {:?}", status);
73 let err = response.json::<ErrorResponse>().await?;
74 if status.is_client_error() {
75 Err(DeltaSharingError::client(err.to_string()))
76 } else {
77 Err(DeltaSharingError::server(err.to_string()))
78 }
79 }
80 }
81
82 pub async fn list_shares(&self) -> Result<Vec<Share>, DeltaSharingError> {
84 let mut shares = vec![];
85 let mut pagination = Pagination::default();
86 loop {
87 let response = self.list_shares_paginated(&pagination).await?;
88 pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
89 shares.extend(response);
90 if pagination.is_finished() {
91 break;
92 }
93 }
94 Ok(shares)
95 }
96
97 pub async fn get_share(&self, share: &Share) -> Result<Share, DeltaSharingError> {
99 let url = url_for_share(self.profile.endpoint().clone(), share, None);
100 trace!("URL: {}", url);
101
102 let response = self.request(Method::GET, url).await?;
103 let status = response.status();
104
105 if status.is_success() {
106 info!("get share status: {:?}", status);
107 let res = response.json::<GetShareResponse>().await?;
108 Ok(res.share().clone())
109 } else {
110 warn!("get share status: {:?}", status);
111 let err = response.json::<ErrorResponse>().await?;
112 if status.is_client_error() {
113 Err(DeltaSharingError::client(err.to_string()))
114 } else {
115 Err(DeltaSharingError::server(err.to_string()))
116 }
117 }
118 }
119
120 pub async fn list_schemas_paginated(
122 &self,
123 share: &Share,
124 pagination: &Pagination,
125 ) -> Result<ListSchemasResponse, DeltaSharingError> {
126 let mut url = url_for_share(self.profile.endpoint().clone(), share, Some("schemas"));
127 url.set_pagination(pagination);
128 trace!("URL: {}", url);
129
130 let response = self.request(Method::GET, url).await?;
131 let status = response.status();
132
133 if status.is_success() {
134 info!("list schemas status: {:?}", status);
135 Ok(response.json::<ListSchemasResponse>().await?)
136 } else {
137 warn!("list schemas status: {:?}", status);
138 let err = response.json::<ErrorResponse>().await?;
139 if status.is_client_error() {
140 Err(DeltaSharingError::client(err.to_string()))
141 } else {
142 Err(DeltaSharingError::server(err.to_string()))
143 }
144 }
145 }
146
147 pub async fn list_schemas(&self, share: &Share) -> Result<Vec<Schema>, DeltaSharingError> {
149 let mut schemas = vec![];
150 let mut pagination = Pagination::default();
151 loop {
152 let response = self.list_schemas_paginated(share, &pagination).await?;
153 pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
154 schemas.extend(response);
155 if pagination.is_finished() {
156 break;
157 }
158 }
159 Ok(schemas)
160 }
161
162 pub async fn list_tables_in_schema_paginated(
164 &self,
165 schema: &Schema,
166 pagination: &Pagination,
167 ) -> Result<ListTablesResponse, DeltaSharingError> {
168 let mut url = url_for_schema(self.profile.endpoint().clone(), schema, Some("tables"));
169 url.set_pagination(pagination);
170 trace!("URL: {}", url);
171
172 let response = self.request(Method::GET, url).await?;
173 let status = response.status();
174
175 if status.is_success() {
176 info!("list tables in schema status: {:?}", status);
177 Ok(response.json::<ListTablesResponse>().await?)
178 } else {
179 warn!("list tables in schema status: {:?}", status);
180 let err = response.json::<ErrorResponse>().await?;
181 if status.is_client_error() {
182 Err(DeltaSharingError::client(err.to_string()))
183 } else {
184 Err(DeltaSharingError::server(err.to_string()))
185 }
186 }
187 }
188
189 pub async fn list_tables(&self, schema: &Schema) -> Result<Vec<Table>, DeltaSharingError> {
191 let mut tables = vec![];
192 let mut pagination = Pagination::default();
193 loop {
194 let response = self
195 .list_tables_in_schema_paginated(schema, &pagination)
196 .await?;
197 pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
198 tables.extend(response);
199 if pagination.is_finished() {
200 break;
201 }
202 }
203 Ok(tables)
204 }
205
206 async fn list_tables_in_share_paginated(
208 &self,
209 share: &Share,
210 pagination: &Pagination,
211 ) -> Result<ListTablesResponse, DeltaSharingError> {
212 let mut url = url_for_share(self.profile.endpoint().clone(), share, Some("all-tables"));
213 url.set_pagination(pagination);
214 trace!("URL: {}", url);
215
216 let response = self.request(Method::GET, url).await?;
217 let status = response.status();
218
219 if status.is_success() {
220 info!("list tables in share status: {:?}", status);
221 Ok(response.json::<ListTablesResponse>().await?)
222 } else {
223 warn!("list tables in share status: {:?}", status);
224 let err = response.json::<ErrorResponse>().await?;
225 if status.is_client_error() {
226 Err(DeltaSharingError::client(err.to_string()))
227 } else {
228 Err(DeltaSharingError::server(err.to_string()))
229 }
230 }
231 }
232
233 pub async fn list_all_tables(&self, share: &Share) -> Result<Vec<Table>, DeltaSharingError> {
235 let mut tables = vec![];
236 let mut pagination = Pagination::default();
237 loop {
238 let response = self
239 .list_tables_in_share_paginated(share, &pagination)
240 .await?;
241 pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
242 tables.extend(response);
243 if pagination.is_finished() {
244 break;
245 }
246 }
247 Ok(tables)
248 }
249
250 pub async fn get_table_version(
252 &self,
253 table: &Table,
254 starting_timestamp: Option<DateTime<Utc>>,
255 ) -> Result<u64, DeltaSharingError> {
256 let mut url = url_for_table(self.profile.endpoint().clone(), table, Some("version"));
257 if let Some(ts) = starting_timestamp {
258 url.query_pairs_mut().append_pair(
259 QUERY_PARAM_VERSION_TIMESTAMP,
260 &ts.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
261 );
262 }
263 trace!("URL: {}", url);
264
265 let response = self.request(Method::GET, url).await?;
266 let status = response.status();
267
268 if status.is_success() {
269 info!("get table version status: {:?}", status);
270 parse_table_version(&response)
271 } else {
272 warn!("get table version status: {:?}", status);
273 let err = response.json::<ErrorResponse>().await?;
274 if status.is_client_error() {
275 Err(DeltaSharingError::client(err.to_string()))
276 } else {
277 Err(DeltaSharingError::server(err.to_string()))
278 }
279 }
280 }
281
282 pub async fn get_table_metadata(
284 &self,
285 table: &Table,
286 ) -> Result<(Protocol, Metadata), DeltaSharingError> {
287 let url = url_for_table(self.profile.endpoint().clone(), table, Some("metadata"));
288 trace!("requesting: {}", url);
289
290 let response = self.request(Method::GET, url).await?;
291 let status = response.status();
292
293 if !status.is_success() {
294 warn!("get table metadata status: {:?}", status);
295 let res = response.json::<ErrorResponse>().await.unwrap();
296 if status.is_client_error() {
297 Err(DeltaSharingError::client(res.to_string()))
298 } else {
299 Err(DeltaSharingError::server(res.to_string()))
300 }
301 } else {
302 info!("get table metadata status: {:?}", status);
303 let full = response.bytes().await?;
304 let mut lines = Deserializer::from_slice(&full).into_iter::<ParquetResponse>();
305
306 let protocol = lines
307 .next()
308 .and_then(Result::ok)
309 .and_then(ParquetResponse::to_protocol)
310 .ok_or(DeltaSharingError::parse_response("parsing protocol failed"))?;
311 let metadata = lines
312 .next()
313 .and_then(Result::ok)
314 .and_then(ParquetResponse::to_metadata)
315 .ok_or(DeltaSharingError::parse_response("parsing metadata failed"))?;
316
317 Ok((protocol, metadata))
318 }
319 }
320
321 pub async fn get_table_data(
323 &self,
324 table: &Table,
325 predicates: Option<String>,
326 limit: Option<u32>,
327 ) -> Result<Vec<File>, DeltaSharingError> {
328 let url = url_for_table(self.profile.endpoint().clone(), table, Some("query"));
329 trace!("requesting: {}", url);
330
331 let mut body: HashMap<String, String> = HashMap::new();
332 if let Some(pred) = predicates {
333 body.insert("jsonPredicateHints".to_string(), pred);
334 }
335 if let Some(lim) = limit {
336 body.insert("limitHint".to_string(), lim.to_string());
337 }
338 debug!("body: {:?}", body);
339
340 let response = self
341 .client
342 .request(Method::POST, url)
343 .json(&body)
344 .authorize_with_profile(&self.profile)?
345 .send()
346 .await?;
347 let status = response.status();
348
349 if !status.is_success() {
350 warn!("get table data status: {:?}", status);
351 let err = response.json::<ErrorResponse>().await.unwrap();
352 if status.is_client_error() {
353 Err(DeltaSharingError::client(err.to_string()))
354 } else {
355 Err(DeltaSharingError::server(err.to_string()))
356 }
357 } else {
358 let full = response.bytes().await?;
359 let mut lines = Deserializer::from_slice(&full).into_iter::<ParquetResponse>();
360
361 let _ = lines
362 .next()
363 .and_then(Result::ok)
364 .and_then(ParquetResponse::to_protocol)
365 .ok_or(DeltaSharingError::parse_response("parsing protocol failed"))?;
366 let _ = lines
367 .next()
368 .and_then(Result::ok)
369 .and_then(ParquetResponse::to_metadata)
370 .ok_or(DeltaSharingError::parse_response("parsing metadata failed"))?;
371
372 let mut files = vec![];
373 for line in lines {
374 let file = line.ok().and_then(ParquetResponse::to_file);
375 if let Some(f) = file {
376 files.push(f);
377 }
378 }
379
380 Ok(files)
381 }
382 }
383
384 async fn _get_table_changes(&self, _table: &Table) {
385 todo!()
386 }
387
388 async fn request(&self, method: Method, url: Url) -> Result<Response, DeltaSharingError> {
389 self.client
390 .request(method, url)
391 .authorize_with_profile(&self.profile)
392 .unwrap()
393 .send()
394 .await
395 .map_err(Into::into)
396 }
397}
398
399fn url_for_share(endpoint: Url, share: &Share, res: Option<&str>) -> Url {
400 let mut url = endpoint;
401 url.path_segments_mut()
402 .expect("valid base")
403 .pop_if_empty()
404 .push("shares")
405 .push(share.name());
406 if let Some(r) = res {
407 url.path_segments_mut().expect("valid base").push(r);
408 }
409 url
410}
411
412fn url_for_schema(endpoint: Url, schema: &Schema, res: Option<&str>) -> Url {
413 let mut url = endpoint;
414 url.path_segments_mut()
415 .expect("valid base")
416 .pop_if_empty()
417 .extend(&["shares", schema.share_name(), "schemas", schema.name()]);
418 if let Some(r) = res {
419 url.path_segments_mut().expect("valid base").push(r);
420 }
421 url
422}
423
424fn url_for_table(endpoint: Url, table: &Table, res: Option<&str>) -> Url {
425 let mut url = endpoint;
426 url.path_segments_mut()
427 .expect("valid base")
428 .pop_if_empty()
429 .extend(&[
430 "shares",
431 table.share_name(),
432 "schemas",
433 table.schema_name(),
434 "tables",
435 table.name(),
436 ]);
437 if let Some(r) = res {
438 url.path_segments_mut().expect("valid base").push(r);
439 }
440 url
441}
442
443fn parse_table_version(response: &Response) -> Result<u64, DeltaSharingError> {
444 response
445 .headers()
446 .get("Delta-Table-Version")
447 .and_then(|v| v.to_str().ok())
448 .and_then(|v| v.parse::<u64>().ok())
449 .ok_or(DeltaSharingError::parse_response("parsing version failed"))
450}
451
452#[cfg(test)]
453mod test {
454 use chrono::{TimeZone, Utc};
455 use httpmock::MockServer;
456 use serde_json::json;
457 use tracing_test::traced_test;
458
459 use crate::profile::ProfileType;
460
461 use super::*;
462
463 #[test]
464 fn test_url_for_share() {
465 let endpoint = Url::parse("https://example.com/prefix").unwrap();
466 let share = Share::new("my-share", None);
467 let url = url_for_share(endpoint.clone(), &share, None);
468 assert_eq!(url.as_str(), "https://example.com/prefix/shares/my-share");
469
470 let url = url_for_share(endpoint.clone(), &share, Some("res"));
471 assert_eq!(
472 url.as_str(),
473 "https://example.com/prefix/shares/my-share/res"
474 );
475 }
476
477 #[test]
478 fn test_url_for_schema() {
479 let endpoint = Url::parse("https://example.com/prefix/").unwrap();
480 let schema = Schema::new("my-share", "my-schema");
481 let url = url_for_schema(endpoint.clone(), &schema, None);
482 assert_eq!(
483 url.as_str(),
484 "https://example.com/prefix/shares/my-share/schemas/my-schema"
485 );
486
487 let url = url_for_schema(endpoint.clone(), &schema, Some("res"));
488 assert_eq!(
489 url.as_str(),
490 "https://example.com/prefix/shares/my-share/schemas/my-schema/res"
491 );
492 }
493
494 #[test]
495 fn test_url_for_table() {
496 let endpoint = Url::parse("https://example.com/prefix").unwrap();
497 let table = Table::new("my-share", "my-schema", "my-table", None, None);
498 let url = url_for_table(endpoint.clone(), &table, None);
499 assert_eq!(
500 url.as_str(),
501 "https://example.com/prefix/shares/my-share/schemas/my-schema/tables/my-table"
502 );
503
504 let url = url_for_table(endpoint.clone(), &table, Some("res"));
505 assert_eq!(
506 url.as_str(),
507 "https://example.com/prefix/shares/my-share/schemas/my-schema/tables/my-table/res"
508 );
509 }
510
511 fn build_sharing_client(server: &MockServer) -> DeltaSharingClient {
512 let profile_type = ProfileType::new_bearer_token("test-token", None);
513 let mock_server_url = server.base_url().parse::<Url>().unwrap();
514 let profile: Profile = Profile::from_profile_type(1, mock_server_url, profile_type);
515 DeltaSharingClient::new(profile)
516 }
517
518 #[traced_test]
519 #[tokio::test]
520 async fn list_shares_paginated() {
521 let server = MockServer::start();
522 let mock = server.mock(|when, then| {
523 when.method("GET")
524 .path("/shares")
525 .query_param("maxResults", "1")
526 .query_param("pageToken", "foo")
527 .header_exists("authorization");
528 then.status(200)
529 .header("content-type", "application/json")
530 .header("charset", "utf-8")
531 .body_from_file("./src/client/resources/list_shares.json");
532 });
533 let client = build_sharing_client(&server);
534
535 let result = client
536 .list_shares_paginated(&Pagination::start(Some(1), Some("foo".into())))
537 .await
538 .unwrap();
539
540 mock.assert();
541 assert_eq!(
542 result.items(),
543 vec![
544 Share::new(
545 "vaccine_share",
546 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f")
547 ),
548 Share::new("sales_share", Some("3e979c79-6399-4dac-bcf8-54e268f48515"))
549 ]
550 );
551 assert_eq!(result.next_page_token(), Some("..."));
552 }
553
554 #[traced_test]
555 #[tokio::test]
556 async fn get_share() {
557 let server = MockServer::start();
558 let mock = server.mock(|when, then| {
559 when.method("GET")
560 .path("/shares/vaccine_share")
561 .header_exists("authorization");
562 then.status(200)
563 .header("content-type", "application/json")
564 .header("charset", "utf-8")
565 .body_from_file("./src/client/resources/get_share.json");
566 });
567 let client = build_sharing_client(&server);
568 let share = Share::new(
569 "vaccine_share",
570 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f"),
571 );
572
573 let result = client.get_share(&share).await.unwrap();
574
575 mock.assert();
576 assert_eq!(result, share);
577 }
578
579 #[traced_test]
580 #[tokio::test]
581 async fn list_schemas_paginated() {
582 let server = MockServer::start();
583 let mock = server.mock(|when, then| {
584 when.method("GET")
585 .path("/shares/vaccine_share/schemas")
586 .query_param("maxResults", "1")
587 .query_param("pageToken", "foo")
588 .header_exists("authorization");
589 then.status(200)
590 .header("content-type", "application/json")
591 .header("charset", "utf-8")
592 .body_from_file("./src/client/resources/list_schemas.json");
593 });
594 let client = build_sharing_client(&server);
595 let share = Share::new("vaccine_share", None);
596
597 let result = client
598 .list_schemas_paginated(&share, &Pagination::start(Some(1), Some("foo".into())))
599 .await
600 .unwrap();
601
602 mock.assert();
603 assert_eq!(
604 result.items(),
605 vec![Schema::new("vaccine_share", "acme_vaccine_data")]
606 );
607 assert_eq!(result.next_page_token(), Some("..."));
608 }
609
610 #[traced_test]
611 #[tokio::test]
612 async fn list_tables_in_schema_paginated() {
613 let server = MockServer::start();
614 let mock = server.mock(|when, then| {
615 when.method("GET")
616 .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables")
617 .query_param("maxResults", "1")
618 .query_param("pageToken", "foo")
619 .header_exists("authorization");
620 then.status(200)
621 .header("content-type", "application/json")
622 .header("charset", "utf-8")
623 .body_from_file("./src/client/resources/list_tables_in_schema.json");
624 });
625 let client = build_sharing_client(&server);
626 let schema = Schema::new("vaccine_share", "acme_vaccine_data");
627
628 let result = client
629 .list_tables_in_schema_paginated(
630 &schema,
631 &Pagination::start(Some(1), Some("foo".into())),
632 )
633 .await
634 .unwrap();
635
636 mock.assert();
637 assert_eq!(
638 result.items(),
639 vec![
640 Table::new(
641 "vaccine_share",
642 "acme_vaccine_data",
643 "vaccine_ingredients",
644 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
645 Some("dcb1e680-7da4-4041-9be8-88aff508d001".into())
646 ),
647 Table::new(
648 "vaccine_share",
649 "acme_vaccine_data",
650 "vaccine_patients",
651 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
652 Some("c48f3e19-2c29-4ea3-b6f7-3899e53338fa".into())
653 )
654 ]
655 );
656 assert_eq!(result.next_page_token(), Some("..."));
657 }
658
659 #[traced_test]
660 #[tokio::test]
661 async fn list_tables_in_share_paginated() {
662 let server = MockServer::start();
663 let mock = server.mock(|when, then| {
664 when.method("GET")
665 .path("/shares/vaccine_share/all-tables")
666 .query_param("maxResults", "1")
667 .query_param("pageToken", "foo")
668 .header_exists("authorization");
669 then.status(200)
670 .header("content-type", "application/json")
671 .header("charset", "utf-8")
672 .body_from_file("./src/client/resources/list_tables_in_share.json");
673 });
674 let client = build_sharing_client(&server);
675 let share = Share::new("vaccine_share", None);
676
677 let result = client
678 .list_tables_in_share_paginated(&share, &Pagination::start(Some(1), Some("foo".into())))
679 .await
680 .unwrap();
681
682 mock.assert();
683 assert_eq!(
684 result.items(),
685 vec![
686 Table::new(
687 "vaccine_share",
688 "acme_vaccine_ingredient_data",
689 "vaccine_ingredients",
690 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
691 Some("2f9729e9-6fcf-4d34-96df-bf72b26dfbe9".into())
692 ),
693 Table::new(
694 "vaccine_share",
695 "acme_vaccine_patient_data",
696 "vaccine_patients",
697 Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
698 Some("74be6365-0fc8-4a2f-8720-0de125bb5832".into())
699 )
700 ]
701 );
702 assert_eq!(result.next_page_token(), Some("..."));
703 }
704
705 #[traced_test]
706 #[tokio::test]
707 async fn get_table_version() {
708 let server = MockServer::start();
709 let mock = server.mock(|when, then| {
710 when.method("GET")
711 .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/version")
712 .query_param("startingTimestamp", "2022-01-01T00:00:00Z")
713 .header_exists("authorization");
714 then.status(200)
715 .header("delta-table-version", "123")
716 .header("content-type", "application/json")
717 .header("charset", "utf-8")
718 .body("");
719 });
720 let client = build_sharing_client(&server);
721 let table = Table::new(
722 "vaccine_share",
723 "acme_vaccine_data",
724 "vaccine_patients",
725 None,
726 None,
727 );
728 let starting_timestamp = Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap();
729
730 let result = client
731 .get_table_version(&table, Some(starting_timestamp))
732 .await
733 .unwrap();
734
735 mock.assert();
736 assert_eq!(result, 123);
737 }
738
739 #[traced_test]
740 #[tokio::test]
741 async fn get_table_metadata() {
742 let server = MockServer::start();
743 let mock = server.mock(|when, then| {
744 when.method("GET")
745 .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/metadata")
746 .header_exists("authorization");
747 then.status(200)
748 .header("content-type", "application/x-ndjson")
749 .header("charset", "utf-8")
750 .header("delta-table-version", "123")
751 .body_from_file("./src/client/resources/get_table_metadata.ndjson");
752 });
753 let client = build_sharing_client(&server);
754 let table = Table::new(
755 "vaccine_share",
756 "acme_vaccine_data",
757 "vaccine_patients",
758 None,
759 None,
760 );
761
762 let (protocol, metadata) = client.get_table_metadata(&table).await.unwrap();
763
764 mock.assert();
765 assert_eq!(protocol.min_reader_version(), 1);
766 assert_eq!(metadata.id(), "table_id");
767 assert_eq!(metadata.format().provider(), "parquet");
768 assert_eq!(metadata.schema_string(), "schema_as_string");
769 assert_eq!(metadata.partition_columns(), &["date"]);
770 }
771
772 #[traced_test]
773 #[tokio::test]
774 async fn get_table_data() {
775 let server = MockServer::start();
776 let mock = server.mock(|when, then| {
777 when.method("POST")
778 .path(
779 "/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/query",
780 )
781 .json_body(json!({
782 "jsonPredicateHints": "{\"foo\": \"bar\"}",
783 "limitHint": "100"
784 }))
785 .header_exists("authorization");
786 then.status(200)
787 .header("content-type", "application/x-ndjson")
788 .header("charset", "utf-8")
789 .header("delta-table-version", "123")
790 .body_from_file("./src/client/resources/get_table_data.ndjson");
791 });
792 let client = build_sharing_client(&server);
793 let table = Table::new(
794 "vaccine_share",
795 "acme_vaccine_data",
796 "vaccine_patients",
797 None,
798 None,
799 );
800
801 let result = client
802 .get_table_data(&table, Some("{\"foo\": \"bar\"}".into()), Some(100))
803 .await
804 .unwrap();
805
806 mock.assert();
807 assert_eq!(result.len(), 2);
808 }
809
810 #[traced_test]
811 #[tokio::test]
812 async fn get_table_data_not_found() {
813 let server = MockServer::start();
814 let mock = server.mock(|when, then| {
815 when.method("POST")
816 .path("/shares/my_share/schemas/my_schema/tables/fake_table/query");
817 then.status(404)
818 .header("content-type", "application/json")
819 .header("charset", "utf-8")
820 .body(r#"{"errorCode": "RESOURCE_DOES_NOT_EXIST", "message": "[Share/Schema/Table] 'my_share/my_schema/fake_table' does not exist, please contact your share provider for further information."}"#);
821 });
822 let client = build_sharing_client(&server);
823 let table = "my_share.my_schema.fake_table".parse::<Table>().unwrap();
824
825 let result = client.get_table_data(&table, None, None).await;
826
827 mock.assert();
828 assert!(result.is_err());
829 assert!(result.unwrap_err().to_string().starts_with("[SHARING_CLIENT_ERROR] [RESOURCE_DOES_NOT_EXIST] [Share/Schema/Table] 'my_share/my_schema/fake_table' does not exist"));
830 }
831}