datafusion_delta_sharing/client/
mod.rs

1//! Delta Sharing Client
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use reqwest::{Client, Method, Response, Url};
7use serde_json::Deserializer;
8use tracing::{debug, info, trace, warn};
9
10use crate::{
11    client::response::GetShareResponse,
12    error::DeltaSharingError,
13    profile::{DeltaSharingProfileExt, Profile},
14    securable::{Schema, Share, Table},
15};
16
17use {
18    action::{File, Metadata, Protocol},
19    pagination::{Pagination, PaginationExt},
20    response::{
21        ErrorResponse, ListSchemasResponse, ListSharesResponse, ListTablesResponse, ParquetResponse,
22    },
23};
24
25pub mod action;
26pub mod pagination;
27pub mod response;
28
/// Name of the query parameter that carries the optional starting timestamp
/// on table version requests.
const QUERY_PARAM_VERSION_TIMESTAMP: &str = "startingTimestamp";
30
31/// Delta Sharing client
32#[derive(Debug, Clone)]
33pub struct DeltaSharingClient {
34    client: Client,
35    profile: Profile,
36}
37
38impl DeltaSharingClient {
39    /// Create a new Delta Sharing client
40    pub fn new(profile: Profile) -> Self {
41        Self {
42            client: Client::new(),
43            profile,
44        }
45    }
46
47    /// Retrieve the profile of the client
48    pub fn profile(&self) -> &Profile {
49        &self.profile
50    }
51
52    /// List shares with pagination
53    pub async fn list_shares_paginated(
54        &self,
55        pagination: &Pagination,
56    ) -> Result<ListSharesResponse, DeltaSharingError> {
57        let mut url = self.profile.endpoint().clone();
58        url.path_segments_mut()
59            .expect("valid base")
60            .pop_if_empty()
61            .push("shares");
62        url.set_pagination(pagination);
63        trace!("URL: {}", url);
64
65        let response = self.request(Method::GET, url).await?;
66        let status = response.status();
67
68        if status.is_success() {
69            info!("list shares status: {:?}", status);
70            Ok(response.json::<ListSharesResponse>().await?)
71        } else {
72            warn!("list shares status: {:?}", status);
73            let err = response.json::<ErrorResponse>().await?;
74            if status.is_client_error() {
75                Err(DeltaSharingError::client(err.to_string()))
76            } else {
77                Err(DeltaSharingError::server(err.to_string()))
78            }
79        }
80    }
81
82    /// List all available shares
83    pub async fn list_shares(&self) -> Result<Vec<Share>, DeltaSharingError> {
84        let mut shares = vec![];
85        let mut pagination = Pagination::default();
86        loop {
87            let response = self.list_shares_paginated(&pagination).await?;
88            pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
89            shares.extend(response);
90            if pagination.is_finished() {
91                break;
92            }
93        }
94        Ok(shares)
95    }
96
97    /// Retrieve a share
98    pub async fn get_share(&self, share: &Share) -> Result<Share, DeltaSharingError> {
99        let url = url_for_share(self.profile.endpoint().clone(), share, None);
100        trace!("URL: {}", url);
101
102        let response = self.request(Method::GET, url).await?;
103        let status = response.status();
104
105        if status.is_success() {
106            info!("get share status: {:?}", status);
107            let res = response.json::<GetShareResponse>().await?;
108            Ok(res.share().clone())
109        } else {
110            warn!("get share status: {:?}", status);
111            let err = response.json::<ErrorResponse>().await?;
112            if status.is_client_error() {
113                Err(DeltaSharingError::client(err.to_string()))
114            } else {
115                Err(DeltaSharingError::server(err.to_string()))
116            }
117        }
118    }
119
120    /// List schemas with pagination
121    pub async fn list_schemas_paginated(
122        &self,
123        share: &Share,
124        pagination: &Pagination,
125    ) -> Result<ListSchemasResponse, DeltaSharingError> {
126        let mut url = url_for_share(self.profile.endpoint().clone(), share, Some("schemas"));
127        url.set_pagination(pagination);
128        trace!("URL: {}", url);
129
130        let response = self.request(Method::GET, url).await?;
131        let status = response.status();
132
133        if status.is_success() {
134            info!("list schemas status: {:?}", status);
135            Ok(response.json::<ListSchemasResponse>().await?)
136        } else {
137            warn!("list schemas status: {:?}", status);
138            let err = response.json::<ErrorResponse>().await?;
139            if status.is_client_error() {
140                Err(DeltaSharingError::client(err.to_string()))
141            } else {
142                Err(DeltaSharingError::server(err.to_string()))
143            }
144        }
145    }
146
147    /// List all available schemas
148    pub async fn list_schemas(&self, share: &Share) -> Result<Vec<Schema>, DeltaSharingError> {
149        let mut schemas = vec![];
150        let mut pagination = Pagination::default();
151        loop {
152            let response = self.list_schemas_paginated(share, &pagination).await?;
153            pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
154            schemas.extend(response);
155            if pagination.is_finished() {
156                break;
157            }
158        }
159        Ok(schemas)
160    }
161
162    /// List tables in schema with pagination
163    pub async fn list_tables_in_schema_paginated(
164        &self,
165        schema: &Schema,
166        pagination: &Pagination,
167    ) -> Result<ListTablesResponse, DeltaSharingError> {
168        let mut url = url_for_schema(self.profile.endpoint().clone(), schema, Some("tables"));
169        url.set_pagination(pagination);
170        trace!("URL: {}", url);
171
172        let response = self.request(Method::GET, url).await?;
173        let status = response.status();
174
175        if status.is_success() {
176            info!("list tables in schema status: {:?}", status);
177            Ok(response.json::<ListTablesResponse>().await?)
178        } else {
179            warn!("list tables in schema status: {:?}", status);
180            let err = response.json::<ErrorResponse>().await?;
181            if status.is_client_error() {
182                Err(DeltaSharingError::client(err.to_string()))
183            } else {
184                Err(DeltaSharingError::server(err.to_string()))
185            }
186        }
187    }
188
189    /// List all available tables in schema
190    pub async fn list_tables(&self, schema: &Schema) -> Result<Vec<Table>, DeltaSharingError> {
191        let mut tables = vec![];
192        let mut pagination = Pagination::default();
193        loop {
194            let response = self
195                .list_tables_in_schema_paginated(schema, &pagination)
196                .await?;
197            pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
198            tables.extend(response);
199            if pagination.is_finished() {
200                break;
201            }
202        }
203        Ok(tables)
204    }
205
206    /// List tables in share with pagination
207    async fn list_tables_in_share_paginated(
208        &self,
209        share: &Share,
210        pagination: &Pagination,
211    ) -> Result<ListTablesResponse, DeltaSharingError> {
212        let mut url = url_for_share(self.profile.endpoint().clone(), share, Some("all-tables"));
213        url.set_pagination(pagination);
214        trace!("URL: {}", url);
215
216        let response = self.request(Method::GET, url).await?;
217        let status = response.status();
218
219        if status.is_success() {
220            info!("list tables in share status: {:?}", status);
221            Ok(response.json::<ListTablesResponse>().await?)
222        } else {
223            warn!("list tables in share status: {:?}", status);
224            let err = response.json::<ErrorResponse>().await?;
225            if status.is_client_error() {
226                Err(DeltaSharingError::client(err.to_string()))
227            } else {
228                Err(DeltaSharingError::server(err.to_string()))
229            }
230        }
231    }
232
233    /// List all available tables in share
234    pub async fn list_all_tables(&self, share: &Share) -> Result<Vec<Table>, DeltaSharingError> {
235        let mut tables = vec![];
236        let mut pagination = Pagination::default();
237        loop {
238            let response = self
239                .list_tables_in_share_paginated(share, &pagination)
240                .await?;
241            pagination.set_next_token(response.next_page_token().map(ToOwned::to_owned));
242            tables.extend(response);
243            if pagination.is_finished() {
244                break;
245            }
246        }
247        Ok(tables)
248    }
249
250    /// Retrieve the version of a table
251    pub async fn get_table_version(
252        &self,
253        table: &Table,
254        starting_timestamp: Option<DateTime<Utc>>,
255    ) -> Result<u64, DeltaSharingError> {
256        let mut url = url_for_table(self.profile.endpoint().clone(), table, Some("version"));
257        if let Some(ts) = starting_timestamp {
258            url.query_pairs_mut().append_pair(
259                QUERY_PARAM_VERSION_TIMESTAMP,
260                &ts.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
261            );
262        }
263        trace!("URL: {}", url);
264
265        let response = self.request(Method::GET, url).await?;
266        let status = response.status();
267
268        if status.is_success() {
269            info!("get table version status: {:?}", status);
270            parse_table_version(&response)
271        } else {
272            warn!("get table version status: {:?}", status);
273            let err = response.json::<ErrorResponse>().await?;
274            if status.is_client_error() {
275                Err(DeltaSharingError::client(err.to_string()))
276            } else {
277                Err(DeltaSharingError::server(err.to_string()))
278            }
279        }
280    }
281
282    /// Retrieve the metadata of a table
283    pub async fn get_table_metadata(
284        &self,
285        table: &Table,
286    ) -> Result<(Protocol, Metadata), DeltaSharingError> {
287        let url = url_for_table(self.profile.endpoint().clone(), table, Some("metadata"));
288        trace!("requesting: {}", url);
289
290        let response = self.request(Method::GET, url).await?;
291        let status = response.status();
292
293        if !status.is_success() {
294            warn!("get table metadata status: {:?}", status);
295            let res = response.json::<ErrorResponse>().await.unwrap();
296            if status.is_client_error() {
297                Err(DeltaSharingError::client(res.to_string()))
298            } else {
299                Err(DeltaSharingError::server(res.to_string()))
300            }
301        } else {
302            info!("get table metadata status: {:?}", status);
303            let full = response.bytes().await?;
304            let mut lines = Deserializer::from_slice(&full).into_iter::<ParquetResponse>();
305
306            let protocol = lines
307                .next()
308                .and_then(Result::ok)
309                .and_then(ParquetResponse::to_protocol)
310                .ok_or(DeltaSharingError::parse_response("parsing protocol failed"))?;
311            let metadata = lines
312                .next()
313                .and_then(Result::ok)
314                .and_then(ParquetResponse::to_metadata)
315                .ok_or(DeltaSharingError::parse_response("parsing metadata failed"))?;
316
317            Ok((protocol, metadata))
318        }
319    }
320
321    /// Retrieve the data of a table
322    pub async fn get_table_data(
323        &self,
324        table: &Table,
325        predicates: Option<String>,
326        limit: Option<u32>,
327    ) -> Result<Vec<File>, DeltaSharingError> {
328        let url = url_for_table(self.profile.endpoint().clone(), table, Some("query"));
329        trace!("requesting: {}", url);
330
331        let mut body: HashMap<String, String> = HashMap::new();
332        if let Some(pred) = predicates {
333            body.insert("jsonPredicateHints".to_string(), pred);
334        }
335        if let Some(lim) = limit {
336            body.insert("limitHint".to_string(), lim.to_string());
337        }
338        debug!("body: {:?}", body);
339
340        let response = self
341            .client
342            .request(Method::POST, url)
343            .json(&body)
344            .authorize_with_profile(&self.profile)?
345            .send()
346            .await?;
347        let status = response.status();
348
349        if !status.is_success() {
350            warn!("get table data status: {:?}", status);
351            let err = response.json::<ErrorResponse>().await.unwrap();
352            if status.is_client_error() {
353                Err(DeltaSharingError::client(err.to_string()))
354            } else {
355                Err(DeltaSharingError::server(err.to_string()))
356            }
357        } else {
358            let full = response.bytes().await?;
359            let mut lines = Deserializer::from_slice(&full).into_iter::<ParquetResponse>();
360
361            let _ = lines
362                .next()
363                .and_then(Result::ok)
364                .and_then(ParquetResponse::to_protocol)
365                .ok_or(DeltaSharingError::parse_response("parsing protocol failed"))?;
366            let _ = lines
367                .next()
368                .and_then(Result::ok)
369                .and_then(ParquetResponse::to_metadata)
370                .ok_or(DeltaSharingError::parse_response("parsing metadata failed"))?;
371
372            let mut files = vec![];
373            for line in lines {
374                let file = line.ok().and_then(ParquetResponse::to_file);
375                if let Some(f) = file {
376                    files.push(f);
377                }
378            }
379
380            Ok(files)
381        }
382    }
383
384    async fn _get_table_changes(&self, _table: &Table) {
385        todo!()
386    }
387
388    async fn request(&self, method: Method, url: Url) -> Result<Response, DeltaSharingError> {
389        self.client
390            .request(method, url)
391            .authorize_with_profile(&self.profile)
392            .unwrap()
393            .send()
394            .await
395            .map_err(Into::into)
396    }
397}
398
399fn url_for_share(endpoint: Url, share: &Share, res: Option<&str>) -> Url {
400    let mut url = endpoint;
401    url.path_segments_mut()
402        .expect("valid base")
403        .pop_if_empty()
404        .push("shares")
405        .push(share.name());
406    if let Some(r) = res {
407        url.path_segments_mut().expect("valid base").push(r);
408    }
409    url
410}
411
412fn url_for_schema(endpoint: Url, schema: &Schema, res: Option<&str>) -> Url {
413    let mut url = endpoint;
414    url.path_segments_mut()
415        .expect("valid base")
416        .pop_if_empty()
417        .extend(&["shares", schema.share_name(), "schemas", schema.name()]);
418    if let Some(r) = res {
419        url.path_segments_mut().expect("valid base").push(r);
420    }
421    url
422}
423
424fn url_for_table(endpoint: Url, table: &Table, res: Option<&str>) -> Url {
425    let mut url = endpoint;
426    url.path_segments_mut()
427        .expect("valid base")
428        .pop_if_empty()
429        .extend(&[
430            "shares",
431            table.share_name(),
432            "schemas",
433            table.schema_name(),
434            "tables",
435            table.name(),
436        ]);
437    if let Some(r) = res {
438        url.path_segments_mut().expect("valid base").push(r);
439    }
440    url
441}
442
443fn parse_table_version(response: &Response) -> Result<u64, DeltaSharingError> {
444    response
445        .headers()
446        .get("Delta-Table-Version")
447        .and_then(|v| v.to_str().ok())
448        .and_then(|v| v.parse::<u64>().ok())
449        .ok_or(DeltaSharingError::parse_response("parsing version failed"))
450}
451
452#[cfg(test)]
453mod test {
454    use chrono::{TimeZone, Utc};
455    use httpmock::MockServer;
456    use serde_json::json;
457    use tracing_test::traced_test;
458
459    use crate::profile::ProfileType;
460
461    use super::*;
462
463    #[test]
464    fn test_url_for_share() {
465        let endpoint = Url::parse("https://example.com/prefix").unwrap();
466        let share = Share::new("my-share", None);
467        let url = url_for_share(endpoint.clone(), &share, None);
468        assert_eq!(url.as_str(), "https://example.com/prefix/shares/my-share");
469
470        let url = url_for_share(endpoint.clone(), &share, Some("res"));
471        assert_eq!(
472            url.as_str(),
473            "https://example.com/prefix/shares/my-share/res"
474        );
475    }
476
477    #[test]
478    fn test_url_for_schema() {
479        let endpoint = Url::parse("https://example.com/prefix/").unwrap();
480        let schema = Schema::new("my-share", "my-schema");
481        let url = url_for_schema(endpoint.clone(), &schema, None);
482        assert_eq!(
483            url.as_str(),
484            "https://example.com/prefix/shares/my-share/schemas/my-schema"
485        );
486
487        let url = url_for_schema(endpoint.clone(), &schema, Some("res"));
488        assert_eq!(
489            url.as_str(),
490            "https://example.com/prefix/shares/my-share/schemas/my-schema/res"
491        );
492    }
493
494    #[test]
495    fn test_url_for_table() {
496        let endpoint = Url::parse("https://example.com/prefix").unwrap();
497        let table = Table::new("my-share", "my-schema", "my-table", None, None);
498        let url = url_for_table(endpoint.clone(), &table, None);
499        assert_eq!(
500            url.as_str(),
501            "https://example.com/prefix/shares/my-share/schemas/my-schema/tables/my-table"
502        );
503
504        let url = url_for_table(endpoint.clone(), &table, Some("res"));
505        assert_eq!(
506            url.as_str(),
507            "https://example.com/prefix/shares/my-share/schemas/my-schema/tables/my-table/res"
508        );
509    }
510
511    fn build_sharing_client(server: &MockServer) -> DeltaSharingClient {
512        let profile_type = ProfileType::new_bearer_token("test-token", None);
513        let mock_server_url = server.base_url().parse::<Url>().unwrap();
514        let profile: Profile = Profile::from_profile_type(1, mock_server_url, profile_type);
515        DeltaSharingClient::new(profile)
516    }
517
518    #[traced_test]
519    #[tokio::test]
520    async fn list_shares_paginated() {
521        let server = MockServer::start();
522        let mock = server.mock(|when, then| {
523            when.method("GET")
524                .path("/shares")
525                .query_param("maxResults", "1")
526                .query_param("pageToken", "foo")
527                .header_exists("authorization");
528            then.status(200)
529                .header("content-type", "application/json")
530                .header("charset", "utf-8")
531                .body_from_file("./src/client/resources/list_shares.json");
532        });
533        let client = build_sharing_client(&server);
534
535        let result = client
536            .list_shares_paginated(&Pagination::start(Some(1), Some("foo".into())))
537            .await
538            .unwrap();
539
540        mock.assert();
541        assert_eq!(
542            result.items(),
543            vec![
544                Share::new(
545                    "vaccine_share",
546                    Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f")
547                ),
548                Share::new("sales_share", Some("3e979c79-6399-4dac-bcf8-54e268f48515"))
549            ]
550        );
551        assert_eq!(result.next_page_token(), Some("..."));
552    }
553
554    #[traced_test]
555    #[tokio::test]
556    async fn get_share() {
557        let server = MockServer::start();
558        let mock = server.mock(|when, then| {
559            when.method("GET")
560                .path("/shares/vaccine_share")
561                .header_exists("authorization");
562            then.status(200)
563                .header("content-type", "application/json")
564                .header("charset", "utf-8")
565                .body_from_file("./src/client/resources/get_share.json");
566        });
567        let client = build_sharing_client(&server);
568        let share = Share::new(
569            "vaccine_share",
570            Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f"),
571        );
572
573        let result = client.get_share(&share).await.unwrap();
574
575        mock.assert();
576        assert_eq!(result, share);
577    }
578
579    #[traced_test]
580    #[tokio::test]
581    async fn list_schemas_paginated() {
582        let server = MockServer::start();
583        let mock = server.mock(|when, then| {
584            when.method("GET")
585                .path("/shares/vaccine_share/schemas")
586                .query_param("maxResults", "1")
587                .query_param("pageToken", "foo")
588                .header_exists("authorization");
589            then.status(200)
590                .header("content-type", "application/json")
591                .header("charset", "utf-8")
592                .body_from_file("./src/client/resources/list_schemas.json");
593        });
594        let client = build_sharing_client(&server);
595        let share = Share::new("vaccine_share", None);
596
597        let result = client
598            .list_schemas_paginated(&share, &Pagination::start(Some(1), Some("foo".into())))
599            .await
600            .unwrap();
601
602        mock.assert();
603        assert_eq!(
604            result.items(),
605            vec![Schema::new("vaccine_share", "acme_vaccine_data")]
606        );
607        assert_eq!(result.next_page_token(), Some("..."));
608    }
609
610    #[traced_test]
611    #[tokio::test]
612    async fn list_tables_in_schema_paginated() {
613        let server = MockServer::start();
614        let mock = server.mock(|when, then| {
615            when.method("GET")
616                .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables")
617                .query_param("maxResults", "1")
618                .query_param("pageToken", "foo")
619                .header_exists("authorization");
620            then.status(200)
621                .header("content-type", "application/json")
622                .header("charset", "utf-8")
623                .body_from_file("./src/client/resources/list_tables_in_schema.json");
624        });
625        let client = build_sharing_client(&server);
626        let schema = Schema::new("vaccine_share", "acme_vaccine_data");
627
628        let result = client
629            .list_tables_in_schema_paginated(
630                &schema,
631                &Pagination::start(Some(1), Some("foo".into())),
632            )
633            .await
634            .unwrap();
635
636        mock.assert();
637        assert_eq!(
638            result.items(),
639            vec![
640                Table::new(
641                    "vaccine_share",
642                    "acme_vaccine_data",
643                    "vaccine_ingredients",
644                    Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
645                    Some("dcb1e680-7da4-4041-9be8-88aff508d001".into())
646                ),
647                Table::new(
648                    "vaccine_share",
649                    "acme_vaccine_data",
650                    "vaccine_patients",
651                    Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
652                    Some("c48f3e19-2c29-4ea3-b6f7-3899e53338fa".into())
653                )
654            ]
655        );
656        assert_eq!(result.next_page_token(), Some("..."));
657    }
658
659    #[traced_test]
660    #[tokio::test]
661    async fn list_tables_in_share_paginated() {
662        let server = MockServer::start();
663        let mock = server.mock(|when, then| {
664            when.method("GET")
665                .path("/shares/vaccine_share/all-tables")
666                .query_param("maxResults", "1")
667                .query_param("pageToken", "foo")
668                .header_exists("authorization");
669            then.status(200)
670                .header("content-type", "application/json")
671                .header("charset", "utf-8")
672                .body_from_file("./src/client/resources/list_tables_in_share.json");
673        });
674        let client = build_sharing_client(&server);
675        let share = Share::new("vaccine_share", None);
676
677        let result = client
678            .list_tables_in_share_paginated(&share, &Pagination::start(Some(1), Some("foo".into())))
679            .await
680            .unwrap();
681
682        mock.assert();
683        assert_eq!(
684            result.items(),
685            vec![
686                Table::new(
687                    "vaccine_share",
688                    "acme_vaccine_ingredient_data",
689                    "vaccine_ingredients",
690                    Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
691                    Some("2f9729e9-6fcf-4d34-96df-bf72b26dfbe9".into())
692                ),
693                Table::new(
694                    "vaccine_share",
695                    "acme_vaccine_patient_data",
696                    "vaccine_patients",
697                    Some("edacc4a7-6600-4fbb-85f3-a62a5ce6761f".into()),
698                    Some("74be6365-0fc8-4a2f-8720-0de125bb5832".into())
699                )
700            ]
701        );
702        assert_eq!(result.next_page_token(), Some("..."));
703    }
704
705    #[traced_test]
706    #[tokio::test]
707    async fn get_table_version() {
708        let server = MockServer::start();
709        let mock = server.mock(|when, then| {
710            when.method("GET")
711                .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/version")
712                .query_param("startingTimestamp", "2022-01-01T00:00:00Z")
713                .header_exists("authorization");
714            then.status(200)
715                .header("delta-table-version", "123")
716                .header("content-type", "application/json")
717                .header("charset", "utf-8")
718                .body("");
719        });
720        let client = build_sharing_client(&server);
721        let table = Table::new(
722            "vaccine_share",
723            "acme_vaccine_data",
724            "vaccine_patients",
725            None,
726            None,
727        );
728        let starting_timestamp = Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap();
729
730        let result = client
731            .get_table_version(&table, Some(starting_timestamp))
732            .await
733            .unwrap();
734
735        mock.assert();
736        assert_eq!(result, 123);
737    }
738
739    #[traced_test]
740    #[tokio::test]
741    async fn get_table_metadata() {
742        let server = MockServer::start();
743        let mock = server.mock(|when, then| {
744            when.method("GET")
745                .path("/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/metadata")
746                .header_exists("authorization");
747            then.status(200)
748                .header("content-type", "application/x-ndjson")
749                .header("charset", "utf-8")
750                .header("delta-table-version", "123")
751                .body_from_file("./src/client/resources/get_table_metadata.ndjson");
752        });
753        let client = build_sharing_client(&server);
754        let table = Table::new(
755            "vaccine_share",
756            "acme_vaccine_data",
757            "vaccine_patients",
758            None,
759            None,
760        );
761
762        let (protocol, metadata) = client.get_table_metadata(&table).await.unwrap();
763
764        mock.assert();
765        assert_eq!(protocol.min_reader_version(), 1);
766        assert_eq!(metadata.id(), "table_id");
767        assert_eq!(metadata.format().provider(), "parquet");
768        assert_eq!(metadata.schema_string(), "schema_as_string");
769        assert_eq!(metadata.partition_columns(), &["date"]);
770    }
771
772    #[traced_test]
773    #[tokio::test]
774    async fn get_table_data() {
775        let server = MockServer::start();
776        let mock = server.mock(|when, then| {
777            when.method("POST")
778                .path(
779                    "/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/query",
780                )
781                .json_body(json!({
782                    "jsonPredicateHints": "{\"foo\": \"bar\"}",
783                    "limitHint": "100"
784                }))
785                .header_exists("authorization");
786            then.status(200)
787                .header("content-type", "application/x-ndjson")
788                .header("charset", "utf-8")
789                .header("delta-table-version", "123")
790                .body_from_file("./src/client/resources/get_table_data.ndjson");
791        });
792        let client = build_sharing_client(&server);
793        let table = Table::new(
794            "vaccine_share",
795            "acme_vaccine_data",
796            "vaccine_patients",
797            None,
798            None,
799        );
800
801        let result = client
802            .get_table_data(&table, Some("{\"foo\": \"bar\"}".into()), Some(100))
803            .await
804            .unwrap();
805
806        mock.assert();
807        assert_eq!(result.len(), 2);
808    }
809
810    #[traced_test]
811    #[tokio::test]
812    async fn get_table_data_not_found() {
813        let server = MockServer::start();
814        let mock = server.mock(|when, then| {
815            when.method("POST")
816                .path("/shares/my_share/schemas/my_schema/tables/fake_table/query");
817            then.status(404)
818                .header("content-type", "application/json")
819                .header("charset", "utf-8")
820                .body(r#"{"errorCode": "RESOURCE_DOES_NOT_EXIST", "message": "[Share/Schema/Table] 'my_share/my_schema/fake_table' does not exist, please contact your share provider for further information."}"#);
821        });
822        let client = build_sharing_client(&server);
823        let table = "my_share.my_schema.fake_table".parse::<Table>().unwrap();
824
825        let result = client.get_table_data(&table, None, None).await;
826
827        mock.assert();
828        assert!(result.is_err());
829        assert!(result.unwrap_err().to_string().starts_with("[SHARING_CLIENT_ERROR] [RESOURCE_DOES_NOT_EXIST] [Share/Schema/Table] 'my_share/my_schema/fake_table' does not exist"));
830    }
831}