mod common;
use common::create_mocked_test_app;
use delta_sharing::protocol::*;
use std::path::Path;
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};
/// Verifies that `list_shares` yields every share contained in the canned
/// response registered for `GET /shares`.
#[tokio::test]
async fn list_shares() {
    // Canned server payload describing two shares.
    const RESPONSE: &str =
        r#"{"items": [ { "name": "share_1","id": "1" }, { "name": "share_2", "id": "2" } ]}"#;
    let expected_count = 2;

    let app = create_mocked_test_app(RESPONSE, "/shares", method("GET")).await;
    let shares = app.client.list_shares().await.unwrap();

    assert_eq!(
        shares.len(),
        expected_count,
        "Expected {} items, got {}",
        expected_count,
        shares.len()
    );
}
/// Verifies that `list_schemas` yields both schemas contained in the canned
/// response registered for `GET /shares/{share}/schemas`.
#[tokio::test]
async fn list_schemas() {
    let share = Share {
        name: String::from("share_1"),
    };
    let endpoint = format!("/shares/{}/schemas", share.name);
    // Both schema entries in the payload point back at the same share.
    let response = format!(
        r#"{{ "items": [ {{ "name":"schema_1", "share": "{0}" }}, {{ "name": "schema_2", "share": "{0}" }} ] }}"#,
        share.name
    );
    let expected_count = 2;

    let app = create_mocked_test_app(&response, &endpoint, method("GET")).await;
    let schemas = app.client.list_schemas(&share).await.unwrap();

    assert_eq!(
        schemas.len(),
        expected_count,
        "Expected {} items, got {}",
        expected_count,
        schemas.len()
    );
}
/// Verifies that `list_tables` yields both tables contained in the canned
/// response registered for `GET shares/{share}/schemas/{schema}/tables`.
#[tokio::test]
async fn list_tables() {
    // The schema is `schema_1` inside share `share_1`. The fixture previously
    // had these two values swapped, so the test exercised a nonsensical
    // `shares/schema_1/schemas/share_1/tables` endpoint.
    let schema = Schema {
        name: "schema_1".to_string(),
        share: "share_1".to_string(),
    };
    // `{0}` is the share name and `{1}` the schema name in both table entries.
    let body = &format!(
        r#"{{ "items": [ {{ "name":"table_1", "share": "{0}", "schema": "{1}" }}, {{ "name": "table_2", "share": "{0}", "schema": "{1}" }} ] }}"#,
        schema.share, schema.name
    );
    let url = format!("shares/{}/schemas/{}/tables", schema.share, schema.name);
    let app = create_mocked_test_app(body, &url, method("GET")).await;

    let tables = app.client.list_tables(&schema).await.unwrap();

    assert_eq!(
        tables.len(),
        2,
        "Expected {} items, got {}",
        2,
        tables.len()
    );
}
/// Verifies that `list_all_tables` yields both tables contained in the canned
/// response registered for `GET shares/{share}/all-tables`.
#[tokio::test]
async fn list_all_tables() {
    let share = Share {
        name: String::from("share_1"),
    };
    let endpoint = format!("shares/{}/all-tables", share.name);
    // `{0}` is the share name, `{1}` the (fixed) schema name of both tables.
    let response = format!(
        r#"{{ "items": [ {{ "name":"table_1", "share": "{0}", "schema": "{1}" }}, {{ "name": "table_2", "share": "{0}", "schema": "{1}" }} ] }}"#,
        share.name, "schema_1"
    );
    let expected_count = 2;

    let app = create_mocked_test_app(&response, &endpoint, method("GET")).await;
    let tables = app.client.list_all_tables(&share).await.unwrap();

    assert_eq!(
        tables.len(),
        expected_count,
        "Expected {} items, got {}",
        expected_count,
        tables.len()
    );
}
/// Verifies that `get_table_metadata` deserializes a two-line
/// protocol + metadata response into the Parquet variants and exposes the
/// expected field values.
#[tokio::test]
async fn get_table_metadata() {
    let table = Table {
        name: "table_1".to_string(),
        share: "share_1".to_string(),
        schema: "schema_1".to_string(),
    };
    // The response is newline-delimited: one JSON object per line.
    let body = &format!(
        r#"{{ "protocol": {} }}
{{ "metaData": {} }}"#,
        common::TEST_PROTOCOL_RESPONSE,
        common::TEST_METADATA_RESPONSE,
    );
    let url = format!(
        "shares/{}/schemas/{}/tables/{}/metadata",
        table.share, table.schema, table.name
    );
    let app = create_mocked_test_app(body, &url, method("GET")).await;

    let meta = app.client.get_table_metadata(&table).await.unwrap();

    // Fail explicitly with `panic!` rather than `assert!(false, ..)` when the
    // wrong variant was produced.
    match meta.protocol {
        Protocol::Delta { .. } => panic!("Wrong protocol deserialization"),
        Protocol::Parquet { min_reader_version } => {
            assert_eq!(min_reader_version, 1, "Protocol mismatch")
        }
    };
    match meta.metadata {
        Metadata::Delta { .. } => panic!("Wrong metadata deserialization"),
        Metadata::Parquet(ParquetMetadata {
            id,
            format,
            name,
            partition_columns,
            configuration,
            ..
        }) => {
            assert_eq!(
                id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8",
                "Metadata ID mismatch"
            );
            assert_eq!(
                format.provider, "parquet",
                "Metadata format provider mismatch"
            );
            assert_eq!(name, None, "Metadata name value should be missing");
            assert_eq!(partition_columns.len(), 0, "There should be no partitions");
            assert_eq!(
                configuration["conf_1_name"], "conf_1_value",
                "Configuration value expected"
            );
        }
    };
}
/// Verifies that `get_table_version` reports the value the server sends in
/// the `delta-table-version` header of a `HEAD` response for the table.
#[tokio::test]
async fn get_table_version() {
    let expected_version = "3";
    let table = Table {
        share: String::from("share_1"),
        schema: String::from("schema_1"),
        name: String::from("table_1"),
    };
    let app = common::create_test_app().await;

    let table_url = format!(
        "shares/{}/schemas/{}/tables/{}",
        table.share, table.schema, table.name
    );
    // The version travels in a response header; the mock sets no body.
    let head_response =
        ResponseTemplate::new(200).insert_header("delta-table-version", expected_version);
    Mock::given(path(table_url))
        .and(method("HEAD"))
        .respond_with(head_response)
        .expect(1)
        .mount(&app.server)
        .await;

    let version = app.client.get_table_version(&table).await;

    assert_eq!(
        version.to_string(),
        expected_version,
        "Table version mismatch"
    );
}
/// Verifies that `list_table_files` returns both file entries from a
/// newline-delimited query response and deserializes them as Parquet files.
#[tokio::test]
async fn list_all_table_files() {
    let table = Table {
        name: "table_1".to_string(),
        share: "share_1".to_string(),
        schema: "schema_1".to_string(),
    };
    // One JSON object per line: protocol, metadata, then two file entries.
    // The second file entry is the first one with its id rewritten to "2".
    let body = &format!(
        r#"{{ "protocol": {} }}
{{ "metaData": {} }}
{{ "file": {} }}
{{ "file": {} }}"#,
        common::TEST_PROTOCOL_RESPONSE,
        common::TEST_METADATA_RESPONSE,
        common::TEST_FILE_RESPONSE,
        common::TEST_FILE_RESPONSE.replace("\"id\": \"1\"", "\"id\": \"2\"")
    );
    let url = format!(
        "shares/{}/schemas/{}/tables/{}/query",
        table.share, table.schema, table.name
    );
    let app = create_mocked_test_app(body, &url, method("POST")).await;

    let files = app.client.list_table_files(&table, None).await.unwrap();

    assert_eq!(files.files.len(), 2, "File count mismatch");
    // Fail explicitly with `panic!` rather than `assert!(false, ..)` when the
    // wrong variant was produced.
    match &files.files[1] {
        File::Parquet(ParquetFile { id, .. }) => assert_eq!(id, "2", "File id mismatch"),
        File::Delta(DeltaFile { .. }) => panic!("Wrong file deserialization"),
    };
}
#[tokio::test]
async fn get_files() {
use std::path::Path;
let table = Table {
name: "table_1".to_string(),
share: "share_1".to_string(),
schema: "schema_1".to_string(),
};
let app = common::create_test_app().await;
let list_files_url = format!(
"shares/{}/schemas/{}/tables/{}/query",
table.share, table.schema, table.name
);
let mut file: ParquetFile =
serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info");
let file_url_path = "/shares/test.parquet";
file.url = format!("{}{}", &app.server.uri(), &file_url_path);
let list_files_body = &format!(
r#"{{ "protocol": {} }}
{{ "metaData": {} }}
{{ "file": {} }}"#,
common::TEST_PROTOCOL_RESPONSE,
common::TEST_METADATA_RESPONSE,
serde_json::to_string(&file).unwrap()
);
let list_files_response = ResponseTemplate::new(200).set_body_string(list_files_body);
Mock::given(path(list_files_url))
.and(method("POST"))
.respond_with(list_files_response)
.expect(1)
.mount(&app.server)
.await;
let parquet_local_path =
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources/test/test.parquet");
let file_content = std::fs::read(parquet_local_path).unwrap();
let file_response = ResponseTemplate::new(200).set_body_bytes(file_content);
Mock::given(path(file_url_path))
.and(method("GET"))
.respond_with(file_response)
.expect(1)
.mount(&app.server)
.await;
let mut c = app.client;
c.data_root = common::get_random_location(Path::new(env!("CARGO_TARGET_TMPDIR")))
.to_str()
.unwrap()
.to_string();
let expected_path = Path::new(&c.data_root)
.join(table.fully_qualified_name())
.join(format!("{}.snappy.parquet", &file.id));
assert!(!Path::exists(&expected_path), "File should not exist");
let files = c.get_files(&table, None).await.unwrap();
assert_eq!(files.len(), 1, "File count mismatch");
assert_eq!(files[0], expected_path, "File path mismatch");
assert!(Path::exists(&expected_path), "File should exist");
}
/// Verifies that `get_dataframe` produces a 5x3 frame from the parquet
/// fixture on two consecutive calls.
///
/// The mocks expect the query endpoint to be hit twice (once per call) but
/// the file itself to be fetched only once.
#[tokio::test]
async fn get_dataframe() {
    let table = Table {
        share: String::from("share_1"),
        schema: String::from("schema_1"),
        name: String::from("table_1"),
    };
    let app = common::create_test_app().await;

    // Point the file entry at the mock server so the download stays local.
    let file_url_path = "/shares/test.parquet";
    let mut file: ParquetFile =
        serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info");
    file.url = format!("{}{}", app.server.uri(), file_url_path);

    // Query endpoint: protocol, metadata and one file entry, one per line.
    let query_url = format!(
        "shares/{}/schemas/{}/tables/{}/query",
        table.share, table.schema, table.name
    );
    let query_body = format!(
        r#"{{ "protocol": {} }}
{{ "metaData": {} }}
{{ "file": {} }}"#,
        common::TEST_PROTOCOL_RESPONSE,
        common::TEST_METADATA_RESPONSE,
        serde_json::to_string(&file).unwrap()
    );
    Mock::given(path(query_url))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200).set_body_string(&query_body))
        .expect(2)
        .mount(&app.server)
        .await;

    // File endpoint: serves the bytes of the parquet fixture from the repo.
    let fixture_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("resources/test/test.parquet");
    let fixture_bytes = std::fs::read(fixture_path).unwrap();
    Mock::given(path(file_url_path))
        .and(method("GET"))
        .respond_with(ResponseTemplate::new(200).set_body_bytes(fixture_bytes))
        .expect(1)
        .mount(&app.server)
        .await;

    // Redirect the client's data root to a location obtained from the test helper.
    let download_root = common::get_random_location(Path::new(env!("CARGO_TARGET_TMPDIR")));
    let mut client = app.client;
    client.data_root = download_root.to_str().unwrap().to_string();

    // First call.
    let first_lazy = client.get_dataframe(&table, None).await.unwrap();
    let first = first_lazy.collect().unwrap();
    assert_eq!(first.shape(), (5, 3), "Dataframe shape mismatch");

    // Second call for the same table.
    let second_lazy = client.get_dataframe(&table, None).await.unwrap();
    let second = second_lazy.collect().unwrap();
    assert_eq!(second.shape(), (5, 3), "Dataframe shape mismatch");
    assert_eq!(
        second.get_row(0).unwrap().0[1],
        polars::datatypes::AnyValue::String("One"),
        "Row value mismatch"
    );
}