deltalake_core/
lib.rs

1//! Native Delta Lake implementation in Rust
2//!
3//! # Usage
4//!
5//! Load a Delta Table by URL:
6//!
7//! ```rust
8//! # use url::Url;
9//! async {
10//!   let table_url = Url::from_directory_path("../test/tests/data/simple_table").unwrap();
11//!   let table = deltalake_core::open_table(table_url).await.unwrap();
12//!   let version = table.version();
13//! };
14//! ```
15//!
16//! Load a specific version of Delta Table by URL then filter files by partitions:
17//!
18//! ```rust
19//! # use url::Url;
20//! async {
21//!   let table_url = Url::from_directory_path("../test/tests/data/simple_table").unwrap();
22//!   let table = deltalake_core::open_table_with_version(table_url, 0).await.unwrap();
23//!   let filter = [deltalake_core::PartitionFilter {
24//!       key: "month".to_string(),
25//!       value: deltalake_core::PartitionValue::Equal("12".to_string()),
26//!   }];
27//!   let files = table.get_files_by_partitions(&filter).await.unwrap();
28//! };
29//! ```
30//!
31//! Load a specific version of Delta Table by URL and datetime:
32//!
33//! ```rust
34//! # use url::Url;
35//! async {
36//!   let table_url = Url::from_directory_path("../test/tests/data/simple_table").unwrap();
37//!   let table = deltalake_core::open_table_with_ds(
38//!       table_url,
39//!       "2020-05-02T23:47:31-07:00",
40//!   ).await.unwrap();
41//!   let version = table.version();
42//! };
43//! ```
44//!
45//! # Optional cargo package features
46//!
47//! - `s3`, `gcs`, `azure` - enable the storage backends for AWS S3, Google Cloud Storage (GCS),
48//!   or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-native-tls` to use native TLS
49//!   instead of Rust TLS implementation.
50//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
51//!   for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
52//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
53//!
//! # Querying Delta Tables with DataFusion
55//!
56//! Querying from local filesystem:
57//! ```
58//! use std::sync::Arc;
59//! # use url::Url;
60//!
61//! # #[cfg(feature="datafusion")]
62//! async {
63//!   use datafusion::prelude::SessionContext;
64//!   let mut ctx = SessionContext::new();
65//!   let table_url = Url::from_directory_path("../test/tests/data/simple_table").unwrap();
66//!   let table = deltalake_core::open_table(table_url)
67//!       .await
68//!       .unwrap();
69//!   ctx.register_table("demo", Arc::new(table)).unwrap();
70//!
71//!   let batches = ctx
72//!       .sql("SELECT * FROM demo").await.unwrap()
73//!       .collect()
74//!       .await.unwrap();
75//! };
76//! ```
77
78// #![deny(missing_docs)]
79#![allow(rustdoc::invalid_html_tags)]
80#![allow(clippy::nonminimal_bool)]
81pub mod data_catalog;
82pub mod errors;
83pub mod kernel;
84pub mod logstore;
85pub mod operations;
86pub mod protocol;
87pub use kernel::schema;
88pub mod table;
89
90#[cfg(any(test, feature = "integration_test"))]
91pub mod test_utils;
92
93#[cfg(feature = "datafusion")]
94pub mod delta_datafusion;
95pub mod writer;
96
97use std::collections::HashMap;
98use std::sync::OnceLock;
99use url::Url;
100
101pub use self::data_catalog::{DataCatalog, DataCatalogError};
102pub use self::errors::*;
103pub use self::schema::partitions::*;
104pub use self::schema::*;
105pub use self::table::builder::{
106    ensure_table_uri, DeltaTableBuilder, DeltaTableConfig, DeltaVersion,
107};
108pub use self::table::config::TableProperty;
109pub use self::table::DeltaTable;
110pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
111pub use operations::DeltaOps;
112
113pub use protocol::checkpoints;
114
115// convenience exports for consumers to avoid aligning crate versions
116pub use arrow;
117#[cfg(feature = "datafusion")]
118pub use datafusion;
119pub use parquet;
120
121#[cfg(not(any(feature = "rustls", feature = "native-tls")))]
122compile_error!("You must enable at least one of the features: `rustls` or `native-tls`.");
123
124/// Creates and loads a DeltaTable from the given URL with current metadata.
125/// Infers the storage backend to use from the scheme in the given table URL.
126///
127/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
128pub async fn open_table(table_uri: Url) -> Result<DeltaTable, DeltaTableError> {
129    let table = DeltaTableBuilder::from_uri(table_uri)?.load().await?;
130    Ok(table)
131}
132
133/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
134/// `StorageService`.
135///
136/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
137pub async fn open_table_with_storage_options(
138    table_uri: Url,
139    storage_options: HashMap<String, String>,
140) -> Result<DeltaTable, DeltaTableError> {
141    let table = DeltaTableBuilder::from_uri(table_uri)?
142        .with_storage_options(storage_options)
143        .load()
144        .await?;
145    Ok(table)
146}
147
148/// Creates a DeltaTable from the given URL and loads it with the metadata from the given version.
149/// Infers the storage backend to use from the scheme in the given table URL.
150///
151/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
152pub async fn open_table_with_version(
153    table_url: Url,
154    version: i64,
155) -> Result<DeltaTable, DeltaTableError> {
156    let table = DeltaTableBuilder::from_uri(table_url)?
157        .with_version(version)
158        .load()
159        .await?;
160    Ok(table)
161}
162
163/// Creates a DeltaTable from the given URL.
164///
165/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
166/// Infers the storage backend to use from the scheme in the given table URL.
167///
168/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
169pub async fn open_table_with_ds(
170    table_uri: Url,
171    ds: impl AsRef<str>,
172) -> Result<DeltaTable, DeltaTableError> {
173    let table = DeltaTableBuilder::from_uri(table_uri)?
174        .with_datestring(ds)?
175        .load()
176        .await?;
177    Ok(table)
178}
179
/// Process-wide override for the version string reported by this crate.
static CLIENT_VERSION: OnceLock<String> = OnceLock::new();

/// Registers a custom client version string.
///
/// Only the first call has an effect: once a value is stored, later calls leave it
/// untouched and are silently ignored.
pub fn init_client_version(version: &str) {
    // First writer wins; an already-initialised cell keeps its value.
    CLIENT_VERSION.get_or_init(|| version.to_owned());
}
185
186/// Returns Rust core version or custom set client_version such as the py-binding
187pub fn crate_version() -> &'static str {
188    CLIENT_VERSION
189        .get()
190        .map(|s| s.as_str())
191        .unwrap_or(env!("CARGO_PKG_VERSION"))
192}
193
194#[cfg(test)]
195mod tests {
196    use futures::TryStreamExt as _;
197    use itertools::Itertools;
198
199    use super::*;
200    use crate::table::PeekCommit;
201
202    #[tokio::test]
203    async fn read_delta_2_0_table_without_version() {
204        let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
205            .canonicalize()
206            .unwrap();
207        let table_url = url::Url::from_directory_path(table_path).unwrap();
208        let table = crate::open_table(table_url).await.unwrap();
209        let snapshot = table.snapshot().unwrap();
210        assert_eq!(snapshot.version(), 3);
211        assert_eq!(snapshot.protocol().min_writer_version(), 2);
212        assert_eq!(snapshot.protocol().min_reader_version(), 1);
213        assert_eq!(
214            snapshot.file_paths_iter().collect_vec(),
215            vec![
216                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
217                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
218                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
219            ]
220        );
221        let tombstones = table
222            .snapshot()
223            .unwrap()
224            .all_tombstones(&table.log_store())
225            .try_collect::<Vec<_>>()
226            .await
227            .unwrap();
228        assert_eq!(tombstones.len(), 4);
229        // assert!(tombstones.contains(&crate::kernel::Remove {
230        //     path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
231        //     deletion_timestamp: Some(1564524298213),
232        //     data_change: false,
233        //     extended_file_metadata: None,
234        //     deletion_vector: None,
235        //     partition_values: None,
236        //     tags: None,
237        //     base_row_id: None,
238        //     default_row_commit_version: None,
239        //     size: None,
240        // }));
241    }
242
243    #[tokio::test]
244    async fn read_delta_table_with_update() {
245        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint/")
246            .canonicalize()
247            .unwrap();
248        let table_url = url::Url::from_directory_path(table_path).unwrap();
249        let table_newest_version = crate::open_table(table_url.clone()).await.unwrap();
250        let mut table_to_update = crate::open_table_with_version(table_url, 0).await.unwrap();
251        // calling update several times should not produce any duplicates
252        table_to_update.update().await.unwrap();
253        table_to_update.update().await.unwrap();
254        table_to_update.update().await.unwrap();
255
256        assert_eq!(
257            table_newest_version
258                .snapshot()
259                .unwrap()
260                .file_paths_iter()
261                .collect_vec(),
262            table_to_update
263                .snapshot()
264                .unwrap()
265                .file_paths_iter()
266                .collect_vec()
267        );
268    }
269    #[tokio::test]
270    async fn read_delta_2_0_table_with_version() {
271        let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
272            .canonicalize()
273            .unwrap();
274        let table_url = url::Url::from_directory_path(table_path).unwrap();
275        let mut table = crate::open_table_with_version(table_url.clone(), 0)
276            .await
277            .unwrap();
278        let snapshot = table.snapshot().unwrap();
279        assert_eq!(snapshot.version(), 0);
280        assert_eq!(snapshot.protocol().min_writer_version(), 2);
281        assert_eq!(snapshot.protocol().min_reader_version(), 1);
282        assert_eq!(
283            snapshot.file_paths_iter().collect_vec(),
284            vec![
285                Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
286                Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
287            ],
288        );
289
290        table = crate::open_table_with_version(table_url.clone(), 2)
291            .await
292            .unwrap();
293        let snapshot = table.snapshot().unwrap();
294        assert_eq!(snapshot.version(), 2);
295        assert_eq!(snapshot.protocol().min_writer_version(), 2);
296        assert_eq!(snapshot.protocol().min_reader_version(), 1);
297        assert_eq!(
298            snapshot.file_paths_iter().collect_vec(),
299            vec![
300                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
301                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
302            ]
303        );
304
305        table = crate::open_table_with_version(table_url, 3).await.unwrap();
306        let snapshot = table.snapshot().unwrap();
307        assert_eq!(snapshot.version(), 3);
308        assert_eq!(snapshot.protocol().min_writer_version(), 2);
309        assert_eq!(snapshot.protocol().min_reader_version(), 1);
310        assert_eq!(
311            snapshot.file_paths_iter().collect_vec(),
312            vec![
313                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
314                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
315                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
316            ]
317        );
318    }
319
320    #[tokio::test]
321    async fn read_delta_8_0_table_without_version() {
322        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
323            .canonicalize()
324            .unwrap();
325        let table_url = url::Url::from_directory_path(table_path).unwrap();
326        let table = crate::open_table(table_url).await.unwrap();
327        let snapshot = table.snapshot().unwrap();
328        assert_eq!(snapshot.version(), 1);
329        assert_eq!(snapshot.protocol().min_writer_version(), 2);
330        assert_eq!(snapshot.protocol().min_reader_version(), 1);
331        assert_eq!(
332            snapshot.file_paths_iter().collect_vec(),
333            vec![
334                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
335                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
336            ]
337        );
338        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
339
340        let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
341
342        let num_records = stats.column_by_name("num_records").unwrap();
343        let num_records = num_records
344            .as_any()
345            .downcast_ref::<arrow::array::Int64Array>()
346            .unwrap();
347        let total_records = num_records.values().iter().sum::<i64>();
348        assert_eq!(total_records, 4);
349
350        let null_counts = stats.column_by_name("null_count.value").unwrap();
351        let null_counts = null_counts
352            .as_any()
353            .downcast_ref::<arrow::array::Int64Array>()
354            .unwrap();
355        null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
356
357        let tombstones = table
358            .snapshot()
359            .unwrap()
360            .all_tombstones(&table.log_store())
361            .try_collect::<Vec<_>>()
362            .await
363            .unwrap();
364        assert_eq!(tombstones.len(), 1);
365        let tombstone = tombstones.first().unwrap();
366        assert_eq!(
367            tombstone.path(),
368            "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
369        );
370        assert_eq!(tombstone.deletion_timestamp(), Some(1615043776198));
371    }
372
373    #[tokio::test]
374    async fn read_delta_8_0_table_with_load_version() {
375        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
376            .canonicalize()
377            .unwrap();
378        let table_url = url::Url::from_directory_path(table_path).unwrap();
379        let mut table = crate::open_table(table_url).await.unwrap();
380        let snapshot = table.snapshot().unwrap();
381        assert_eq!(snapshot.version(), 1);
382        assert_eq!(snapshot.protocol().min_writer_version(), 2);
383        assert_eq!(snapshot.protocol().min_reader_version(), 1);
384        assert_eq!(
385            snapshot.file_paths_iter().collect_vec(),
386            vec![
387                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
388                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
389            ]
390        );
391        table.load_version(0).await.unwrap();
392        let snapshot = table.snapshot().unwrap();
393        assert_eq!(snapshot.version(), 0);
394        assert_eq!(snapshot.protocol().min_writer_version(), 2);
395        assert_eq!(snapshot.protocol().min_reader_version(), 1);
396        assert_eq!(
397            snapshot.file_paths_iter().collect_vec(),
398            vec![
399                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
400                Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
401            ]
402        );
403    }
404
405    #[tokio::test]
406    async fn read_delta_8_0_table_with_partitions() {
407        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-partitioned")
408            .canonicalize()
409            .unwrap();
410        let table_url = url::Url::from_directory_path(table_path).unwrap();
411        let table = crate::open_table(table_url).await.unwrap();
412
413        let filters = vec![
414            crate::PartitionFilter {
415                key: "month".to_string(),
416                value: crate::PartitionValue::Equal("2".to_string()),
417            },
418            crate::PartitionFilter {
419                key: "year".to_string(),
420                value: crate::PartitionValue::Equal("2020".to_string()),
421            },
422        ];
423
424        assert_eq!(
425            table.get_files_by_partitions(&filters).await.unwrap(),
426            vec![
427                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
428                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
429            ]
430        );
431        assert_eq!(
432            table.get_file_uris_by_partitions(&filters).await.unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
433            vec![
434                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
435                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
436            ]
437        );
438
439        let filters = vec![crate::PartitionFilter {
440            key: "month".to_string(),
441            value: crate::PartitionValue::NotEqual("2".to_string()),
442        }];
443        assert_eq!(
444            table.get_files_by_partitions(&filters).await.unwrap(),
445            vec![
446                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
447                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
448                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"),
449                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
450            ]
451        );
452
453        let filters = vec![crate::PartitionFilter {
454            key: "month".to_string(),
455            value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
456        }];
457        assert_eq!(
458            table.get_files_by_partitions(&filters).await.unwrap(),
459            vec![
460                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
461                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"),
462                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
463                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet")
464            ]
465        );
466
467        let filters = vec![crate::PartitionFilter {
468            key: "month".to_string(),
469            value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
470        }];
471        assert_eq!(
472            table.get_files_by_partitions(&filters).await.unwrap(),
473            vec![
474                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
475                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
476            ]
477        );
478    }
479
480    #[tokio::test]
481    async fn read_delta_8_0_table_with_null_partition() {
482        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-null-partition")
483            .canonicalize()
484            .unwrap();
485        let table_url = url::Url::from_directory_path(table_path).unwrap();
486        let table = crate::open_table(table_url).await.unwrap();
487
488        let filters = vec![crate::PartitionFilter {
489            key: "k".to_string(),
490            value: crate::PartitionValue::Equal("A".to_string()),
491        }];
492        assert_eq!(
493            table.get_files_by_partitions(&filters).await.unwrap(),
494            vec![Path::from(
495                "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
496            )]
497        );
498
499        let filters = vec![crate::PartitionFilter {
500            key: "k".to_string(),
501            value: crate::PartitionValue::Equal("".to_string()),
502        }];
503        assert_eq!(
504            table.get_files_by_partitions(&filters).await.unwrap(),
505            vec![
506                Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet")
507            ]
508        );
509    }
510
511    #[tokio::test]
512    async fn read_delta_8_0_table_with_special_partition() {
513        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-special-partition")
514            .canonicalize()
515            .unwrap();
516        let table_url = url::Url::from_directory_path(table_path).unwrap();
517        let table = crate::open_table(table_url).await.unwrap();
518
519        assert_eq!(
520            table.snapshot().unwrap().file_paths_iter().collect_vec(),
521            vec![
522                Path::parse(
523                    "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
524                )
525                .unwrap(),
526                Path::parse(
527                    "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
528                )
529                .unwrap()
530            ]
531        );
532
533        let filters = vec![crate::PartitionFilter {
534            key: "x".to_string(),
535            value: crate::PartitionValue::Equal("A/A".to_string()),
536        }];
537        assert_eq!(
538            table.get_files_by_partitions(&filters).await.unwrap(),
539            vec![Path::parse(
540                "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
541            )
542            .unwrap()]
543        );
544    }
545
546    #[tokio::test]
547    async fn read_delta_8_0_table_partition_with_compare_op() {
548        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-numeric-partition")
549            .canonicalize()
550            .unwrap();
551        let table_url = url::Url::from_directory_path(table_path).unwrap();
552        let table = crate::open_table(table_url).await.unwrap();
553
554        let filters = vec![crate::PartitionFilter {
555            key: "x".to_string(),
556            value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
557        }];
558        assert_eq!(
559            table.get_files_by_partitions(&filters).await.unwrap(),
560            vec![Path::from(
561                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
562            )]
563        );
564
565        let filters = vec![crate::PartitionFilter {
566            key: "y".to_string(),
567            value: crate::PartitionValue::LessThan("10.0".to_string()),
568        }];
569        assert_eq!(
570            table.get_files_by_partitions(&filters).await.unwrap(),
571            vec![Path::from(
572                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
573            )]
574        );
575    }
576
577    #[tokio::test]
578    async fn test_table_history() {
579        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint")
580            .canonicalize()
581            .unwrap();
582        let table_url = url::Url::from_directory_path(table_path).unwrap();
583        let latest_table = crate::open_table(table_url.clone()).await.unwrap();
584
585        let table = crate::open_table_with_version(table_url, 1).await.unwrap();
586
587        let history1: Vec<_> = table
588            .history(None)
589            .await
590            .expect("Cannot get table history")
591            .collect();
592        let history2: Vec<_> = latest_table
593            .history(None)
594            .await
595            .expect("Cannot get table history")
596            .collect();
597
598        assert_eq!(history1, history2);
599
600        let history3: Vec<_> = latest_table
601            .history(Some(5))
602            .await
603            .expect("Cannot get table history")
604            .collect();
605        assert_eq!(history3.len(), 5);
606    }
607
608    #[tokio::test]
609    async fn test_poll_table_commits() {
610        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint")
611            .canonicalize()
612            .unwrap();
613        let table_url = url::Url::from_directory_path(table_path).unwrap();
614        let mut table = crate::open_table_with_version(table_url, 9).await.unwrap();
615        assert_eq!(table.version(), Some(9));
616        let peek = table
617            .log_store()
618            .peek_next_commit(table.version().unwrap())
619            .await
620            .unwrap();
621        assert!(matches!(peek, PeekCommit::New(..)));
622
623        if let PeekCommit::New(version, actions) = peek {
624            assert_eq!(table.version(), Some(9));
625            assert!(!table.snapshot().unwrap().file_paths_iter().any(|f| f
626                == Path::from(
627                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
628                )));
629
630            assert_eq!(version, 10);
631            assert_eq!(actions.len(), 2);
632
633            table.update_incremental(None).await.unwrap();
634
635            assert_eq!(table.version(), Some(10));
636            assert!(table.snapshot().unwrap().file_paths_iter().any(|f| f
637                == Path::from(
638                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
639                )));
640        };
641
642        let peek = table
643            .log_store()
644            .peek_next_commit(table.version().unwrap())
645            .await
646            .unwrap();
647        assert!(matches!(peek, PeekCommit::UpToDate));
648    }
649
650    #[tokio::test]
651    async fn test_read_vacuumed_log() {
652        let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
653            .canonicalize()
654            .unwrap();
655        let table_url = url::Url::from_directory_path(table_path).unwrap();
656        let table = crate::open_table(table_url).await.unwrap();
657        assert_eq!(table.version(), Some(12));
658    }
659
660    #[tokio::test]
661    async fn test_read_vacuumed_log_history() {
662        let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
663            .canonicalize()
664            .unwrap();
665        let table_url = url::Url::from_directory_path(table_path).unwrap();
666        let table = crate::open_table(table_url).await.unwrap();
667
668        // load history for table version with available log file
669        let history: Vec<_> = table
670            .history(Some(5))
671            .await
672            .expect("Cannot get table history")
673            .collect();
674
675        assert_eq!(history.len(), 5);
676
677        // load history for table version without log file
678        let history: Vec<_> = table
679            .history(Some(10))
680            .await
681            .expect("Cannot get table history")
682            .collect();
683
684        assert_eq!(history.len(), 8);
685    }
686
687    #[tokio::test]
688    async fn read_empty_folder() {
689        let dir = std::env::temp_dir();
690        let table_url = url::Url::from_directory_path(&dir).unwrap();
691        let result = crate::open_table(table_url).await;
692
693        assert!(matches!(
694            result.unwrap_err(),
695            crate::errors::DeltaTableError::NotATable(_),
696        ));
697
698        let dir = std::env::temp_dir();
699        let table_url = url::Url::from_directory_path(&dir).unwrap();
700        let result = crate::open_table_with_ds(table_url, "2021-08-09T13:18:31+08:00").await;
701
702        assert!(matches!(
703            result.unwrap_err(),
704            crate::errors::DeltaTableError::NotATable(_),
705        ));
706    }
707
708    #[tokio::test]
709    async fn read_delta_table_with_cdc() {
710        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_cdc")
711            .canonicalize()
712            .unwrap();
713        let table_url = url::Url::from_directory_path(table_path).unwrap();
714        let table = crate::open_table(table_url).await.unwrap();
715        assert_eq!(table.version(), Some(2));
716        assert_eq!(
717            table.snapshot().unwrap().file_paths_iter().collect_vec(),
718            vec![Path::from(
719                "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
720            ),]
721        );
722    }
723
724    #[tokio::test()]
725    async fn test_version_zero_table_load() {
726        let table_path = std::path::Path::new("../test/tests/data/COVID-19_NYT")
727            .canonicalize()
728            .unwrap();
729        let table_url = url::Url::from_directory_path(table_path).unwrap();
730        let latest_table: DeltaTable = crate::open_table(table_url.clone()).await.unwrap();
731
732        let version_0_table = crate::open_table_with_version(table_url, 0).await.unwrap();
733
734        let version_0_history: Vec<_> = version_0_table
735            .history(None)
736            .await
737            .expect("Cannot get table history")
738            .collect();
739        let latest_table_history: Vec<_> = latest_table
740            .history(None)
741            .await
742            .expect("Cannot get table history")
743            .collect();
744
745        assert_eq!(latest_table_history, version_0_history);
746    }
747
748    #[tokio::test()]
749    async fn test_fail_fast_on_not_existing_path() {
750        use std::path::Path as FolderPath;
751
752        let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";
753
754        // Check that there is no such path at the beginning
755        let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
756        assert!(path_doesnt_exist);
757
758        let table_path = std::path::Path::new(non_existing_path_str);
759        let abs_path = std::fs::canonicalize(".").unwrap().join(table_path);
760        let table_url = url::Url::from_directory_path(abs_path).unwrap();
761        let error = crate::open_table(table_url).await.unwrap_err();
762        let _expected_error_msg = format!(
763            "Local path \"{non_existing_path_str}\" does not exist or you don't have access!"
764        );
765        assert!(matches!(
766            error,
767            DeltaTableError::InvalidTableLocation(_expected_error_msg),
768        ))
769    }
770
771    /// <https://github.com/delta-io/delta-rs/issues/2152>
772    #[tokio::test]
773    async fn test_identity_column() {
774        let table_path = std::path::Path::new("../test/tests/data/issue-2152")
775            .canonicalize()
776            .unwrap();
777        let table_url = url::Url::from_directory_path(table_path).unwrap();
778        let _ = crate::open_table(table_url)
779            .await
780            .expect("Failed to load the table");
781    }
782}