#![allow(rustdoc::invalid_html_tags)]
#![allow(clippy::nonminimal_bool)]
pub mod data_catalog;
pub mod errors;
pub mod kernel;
pub mod logstore;
pub mod operations;
pub mod protocol;
pub mod schema;
pub mod storage;
pub mod table;
#[cfg(test)]
pub mod test_utils;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
pub mod writer;
use std::collections::HashMap;
pub use self::data_catalog::{DataCatalog, DataCatalogError};
pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
pub use self::table::config::TableProperty;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;
pub use arrow;
#[cfg(feature = "datafusion")]
pub use datafusion;
pub use parquet;
pub use protocol::checkpoints;
pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
    let table = DeltaTableBuilder::from_valid_uri(table_uri)?.load().await?;
    Ok(table)
}
pub async fn open_table_with_storage_options(
    table_uri: impl AsRef<str>,
    storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
        .with_storage_options(storage_options)
        .load()
        .await?;
    Ok(table)
}
pub async fn open_table_with_version(
    table_uri: impl AsRef<str>,
    version: i64,
) -> Result<DeltaTable, DeltaTableError> {
    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
        .with_version(version)
        .load()
        .await?;
    Ok(table)
}
pub async fn open_table_with_ds(
    table_uri: impl AsRef<str>,
    ds: impl AsRef<str>,
) -> Result<DeltaTable, DeltaTableError> {
    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
        .with_datestring(ds)?
        .load()
        .await?;
    Ok(table)
}
pub fn crate_version() -> &'static str {
    env!("CARGO_PKG_VERSION")
}
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use super::*;
    use crate::table::PeekCommit;
    use std::collections::HashMap;
    #[tokio::test]
    async fn read_delta_2_0_table_without_version() {
        let table = crate::open_table("../test/tests/data/delta-0.2.0")
            .await
            .unwrap();
        assert_eq!(table.version(), 3);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );
        let tombstones = table
            .snapshot()
            .unwrap()
            .all_tombstones(table.object_store().clone())
            .await
            .unwrap()
            .collect_vec();
        assert_eq!(tombstones.len(), 4);
        assert!(tombstones.contains(&crate::kernel::Remove {
            path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
            deletion_timestamp: Some(1564524298213),
            data_change: false,
            extended_file_metadata: None,
            deletion_vector: None,
            partition_values: None,
            tags: None,
            base_row_id: None,
            default_row_commit_version: None,
            size: None,
        }));
    }
    #[tokio::test]
    async fn read_delta_table_with_update() {
        let path = "../test/tests/data/simple_table_with_checkpoint/";
        let table_newest_version = crate::open_table(path).await.unwrap();
        let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
        table_to_update.update().await.unwrap();
        table_to_update.update().await.unwrap();
        table_to_update.update().await.unwrap();
        assert_eq!(
            table_newest_version.get_files_iter().unwrap().collect_vec(),
            table_to_update.get_files_iter().unwrap().collect_vec()
        );
    }
    #[tokio::test]
    async fn read_delta_2_0_table_with_version() {
        let mut table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 0)
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
                Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
            ],
        );
        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 2)
            .await
            .unwrap();
        assert_eq!(table.version(), 2);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );
        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 3)
            .await
            .unwrap();
        assert_eq!(table.version(), 3);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );
    }
    #[tokio::test]
    async fn read_delta_8_0_table_without_version() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0")
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
            ]
        );
        assert_eq!(table.get_files_count(), 2);
        let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
        let num_records = stats.column_by_name("num_records").unwrap();
        let num_records = num_records
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        let total_records = num_records.values().iter().sum::<i64>();
        assert_eq!(total_records, 4);
        let null_counts = stats.column_by_name("null_count.value").unwrap();
        let null_counts = null_counts
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
        let tombstones = table
            .snapshot()
            .unwrap()
            .all_tombstones(table.object_store().clone())
            .await
            .unwrap()
            .collect_vec();
        assert_eq!(tombstones.len(), 1);
        assert!(tombstones.contains(&crate::kernel::Remove {
            path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
            deletion_timestamp: Some(1615043776198),
            data_change: true,
            extended_file_metadata: Some(true),
            partition_values: Some(HashMap::new()),
            size: Some(445),
            base_row_id: None,
            default_row_commit_version: None,
            deletion_vector: None,
            tags: Some(HashMap::new()),
        }));
    }
    #[tokio::test]
    async fn read_delta_8_0_table_with_load_version() {
        let mut table = crate::open_table("../test/tests/data/delta-0.8.0")
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
            ]
        );
        table.load_version(0).await.unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.protocol().unwrap().min_writer_version, 2);
        assert_eq!(table.protocol().unwrap().min_reader_version, 1);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
                Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
            ]
        );
    }
    #[tokio::test]
    async fn read_delta_8_0_table_with_partitions() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned")
            .await
            .unwrap();
        let filters = vec![
            crate::PartitionFilter {
                key: "month".to_string(),
                value: crate::PartitionValue::Equal("2".to_string()),
            },
            crate::PartitionFilter {
                key: "year".to_string(),
                value: crate::PartitionValue::Equal("2020".to_string()),
            },
        ];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![
                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
            ]
        );
        assert_eq!(
            table.get_file_uris_by_partitions(&filters).unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
            vec![
                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
            ]
        );
        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::NotEqual("2".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![
                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"),
                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
            ]
        );
        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![
                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet")
            ]
        );
        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![
                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
            ]
        );
    }
    #[tokio::test]
    async fn read_delta_8_0_table_with_null_partition() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-null-partition")
            .await
            .unwrap();
        let filters = vec![crate::PartitionFilter {
            key: "k".to_string(),
            value: crate::PartitionValue::Equal("A".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![Path::from(
                "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
            )]
        );
        let filters = vec![crate::PartitionFilter {
            key: "k".to_string(),
            value: crate::PartitionValue::Equal("".to_string()),
        }];
        assert_eq!(
        table.get_files_by_partitions(&filters).unwrap(),
        vec![
            Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet")
        ]
    );
    }
    #[tokio::test]
    async fn read_delta_8_0_table_with_special_partition() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-special-partition")
            .await
            .unwrap();
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![
                Path::parse(
                    "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
                )
                .unwrap(),
                Path::parse(
                    "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
                )
                .unwrap()
            ]
        );
        let filters = vec![crate::PartitionFilter {
            key: "x".to_string(),
            value: crate::PartitionValue::Equal("A/A".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![Path::parse(
                "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
            )
            .unwrap()]
        );
    }
    #[tokio::test]
    async fn read_delta_8_0_table_partition_with_compare_op() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-numeric-partition")
            .await
            .unwrap();
        let filters = vec![crate::PartitionFilter {
            key: "x".to_string(),
            value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![Path::from(
                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
            )]
        );
        let filters = vec![crate::PartitionFilter {
            key: "y".to_string(),
            value: crate::PartitionValue::LessThan("10.0".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).unwrap(),
            vec![Path::from(
                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
            )]
        );
    }
    #[tokio::test]
    async fn test_table_history() {
        let path = "../test/tests/data/simple_table_with_checkpoint";
        let latest_table = crate::open_table(path).await.unwrap();
        let table = crate::open_table_with_version(path, 1).await.unwrap();
        let history1 = table.history(None).await.expect("Cannot get table history");
        let history2 = latest_table
            .history(None)
            .await
            .expect("Cannot get table history");
        assert_eq!(history1, history2);
        let history3 = latest_table
            .history(Some(5))
            .await
            .expect("Cannot get table history");
        assert_eq!(history3.len(), 5);
    }
    #[tokio::test]
    async fn test_poll_table_commits() {
        let path = "../test/tests/data/simple_table_with_checkpoint";
        let mut table = crate::open_table_with_version(path, 9).await.unwrap();
        let peek = table.peek_next_commit(table.version()).await.unwrap();
        assert!(matches!(peek, PeekCommit::New(..)));
        if let PeekCommit::New(version, actions) = peek {
            assert_eq!(table.version(), 9);
            assert!(!table.get_files_iter().unwrap().any(|f| f
                == Path::from(
                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
                )));
            assert_eq!(version, 10);
            assert_eq!(actions.len(), 2);
            table.update_incremental(None).await.unwrap();
            assert_eq!(table.version(), 10);
            assert!(table.get_files_iter().unwrap().any(|f| f
                == Path::from(
                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
                )));
        };
        let peek = table.peek_next_commit(table.version()).await.unwrap();
        assert!(matches!(peek, PeekCommit::UpToDate));
    }
    #[tokio::test]
    async fn test_read_vacuumed_log() {
        let path = "../test/tests/data/checkpoints_vacuumed";
        let table = crate::open_table(path).await.unwrap();
        assert_eq!(table.version(), 12);
    }
    #[tokio::test]
    async fn test_read_vacuumed_log_history() {
        let path = "../test/tests/data/checkpoints_vacuumed";
        let table = crate::open_table(path).await.unwrap();
        let history = table
            .history(Some(5))
            .await
            .expect("Cannot get table history");
        assert_eq!(history.len(), 5);
        let history = table
            .history(Some(10))
            .await
            .expect("Cannot get table history");
        assert_eq!(history.len(), 8);
    }
    #[tokio::test]
    async fn read_empty_folder() {
        let dir = std::env::temp_dir();
        let result = crate::open_table(&dir.into_os_string().into_string().unwrap()).await;
        assert!(matches!(
            result.unwrap_err(),
            crate::errors::DeltaTableError::NotATable(_),
        ));
        let dir = std::env::temp_dir();
        let result = crate::open_table_with_ds(
            &dir.into_os_string().into_string().unwrap(),
            "2021-08-09T13:18:31+08:00",
        )
        .await;
        assert!(matches!(
            result.unwrap_err(),
            crate::errors::DeltaTableError::NotATable(_),
        ));
    }
    #[tokio::test]
    async fn read_delta_table_with_cdc() {
        let table = crate::open_table("../test/tests/data/simple_table_with_cdc")
            .await
            .unwrap();
        assert_eq!(table.version(), 2);
        assert_eq!(
            table.get_files_iter().unwrap().collect_vec(),
            vec![Path::from(
                "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
            ),]
        );
    }
    #[tokio::test()]
    async fn test_version_zero_table_load() {
        let path = "../test/tests/data/COVID-19_NYT";
        let latest_table: DeltaTable = crate::open_table(path).await.unwrap();
        let version_0_table = crate::open_table_with_version(path, 0).await.unwrap();
        let version_0_history = version_0_table
            .history(None)
            .await
            .expect("Cannot get table history");
        let latest_table_history = latest_table
            .history(None)
            .await
            .expect("Cannot get table history");
        assert_eq!(latest_table_history, version_0_history);
    }
    #[tokio::test()]
    async fn test_fail_fast_on_not_existing_path() {
        use std::path::Path as FolderPath;
        let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";
        let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
        assert!(path_doesnt_exist);
        let error = crate::open_table(non_existing_path_str).await.unwrap_err();
        let _expected_error_msg = format!(
            "Local path \"{}\" does not exist or you don't have access!",
            non_existing_path_str
        );
        assert!(matches!(
            error,
            DeltaTableError::InvalidTableLocation(_expected_error_msg),
        ))
    }
    #[tokio::test]
    async fn test_identity_column() {
        let path = "../test/tests/data/issue-2152";
        let _ = crate::open_table(path)
            .await
            .expect("Failed to load the table");
    }
}