deltalake_core/
lib.rs

1//! Native Delta Lake implementation in Rust
2//!
3//! # Usage
4//!
5//! Load a Delta Table by path:
6//!
7//! ```rust
8//! async {
9//!   let table = deltalake_core::open_table("../test/tests/data/simple_table").await.unwrap();
10//!   let version = table.version();
11//! };
12//! ```
13//!
14//! Load a specific version of Delta Table by path then filter files by partitions:
15//!
16//! ```rust
17//! async {
18//!   let table = deltalake_core::open_table_with_version("../test/tests/data/simple_table", 0).await.unwrap();
19//!   let filter = [deltalake_core::PartitionFilter {
20//!       key: "month".to_string(),
21//!       value: deltalake_core::PartitionValue::Equal("12".to_string()),
22//!   }];
23//!   let files = table.get_files_by_partitions(&filter).await.unwrap();
24//! };
25//! ```
26//!
27//! Load a specific version of Delta Table by path and datetime:
28//!
29//! ```rust
30//! async {
31//!   let table = deltalake_core::open_table_with_ds(
32//!       "../test/tests/data/simple_table",
33//!       "2020-05-02T23:47:31-07:00",
34//!   ).await.unwrap();
35//!   let version = table.version();
36//! };
37//! ```
38//!
39//! # Optional cargo package features
40//!
41//! - `s3`, `gcs`, `azure` - enable the storage backends for AWS S3, Google Cloud Storage (GCS),
42//!   or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-native-tls` to use native TLS
43//!   instead of Rust TLS implementation.
44//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
45//!   for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
46//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
47//!
48//! # Querying Delta Tables with Datafusion
49//!
50//! Querying from local filesystem:
51//! ```
52//! use std::sync::Arc;
53//!
54//! # #[cfg(feature="datafusion")]
55//! async {
56//!   use datafusion::prelude::SessionContext;
57//!   let mut ctx = SessionContext::new();
58//!   let table = deltalake_core::open_table("../test/tests/data/simple_table")
59//!       .await
60//!       .unwrap();
61//!   ctx.register_table("demo", Arc::new(table)).unwrap();
62//!
63//!   let batches = ctx
64//!       .sql("SELECT * FROM demo").await.unwrap()
65//!       .collect()
66//!       .await.unwrap();
67//! };
68//! ```
69
70// #![deny(missing_docs)]
71#![allow(rustdoc::invalid_html_tags)]
72#![allow(clippy::nonminimal_bool)]
73pub mod data_catalog;
74pub mod errors;
75pub mod kernel;
76pub mod logstore;
77pub mod operations;
78pub mod protocol;
79pub use kernel::schema;
80pub mod table;
81
82#[cfg(any(test, feature = "integration_test"))]
83pub mod test_utils;
84
85#[cfg(feature = "datafusion")]
86pub mod delta_datafusion;
87pub mod writer;
88
89use std::collections::HashMap;
90use std::sync::OnceLock;
91
92pub use self::data_catalog::{DataCatalog, DataCatalogError};
93pub use self::errors::*;
94pub use self::schema::partitions::*;
95pub use self::schema::*;
96pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
97pub use self::table::config::TableProperty;
98pub use self::table::DeltaTable;
99pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
100pub use operations::DeltaOps;
101
102pub use protocol::checkpoints;
103
104// convenience exports for consumers to avoid aligning crate versions
105pub use arrow;
106#[cfg(feature = "datafusion")]
107pub use datafusion;
108pub use parquet;
109
110#[cfg(not(any(feature = "rustls", feature = "native-tls")))]
111compile_error!("You must enable at least one of the features: `rustls` or `native-tls`.");
112
113/// Creates and loads a DeltaTable from the given path with current metadata.
114/// Infers the storage backend to use from the scheme in the given table path.
115///
116/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
117pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
118    let table = DeltaTableBuilder::from_valid_uri(table_uri)?.load().await?;
119    Ok(table)
120}
121
122/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
123/// `StorageService`.
124///
125/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
126pub async fn open_table_with_storage_options(
127    table_uri: impl AsRef<str>,
128    storage_options: HashMap<String, String>,
129) -> Result<DeltaTable, DeltaTableError> {
130    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
131        .with_storage_options(storage_options)
132        .load()
133        .await?;
134    Ok(table)
135}
136
137/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
138/// Infers the storage backend to use from the scheme in the given table path.
139///
140/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
141pub async fn open_table_with_version(
142    table_uri: impl AsRef<str>,
143    version: i64,
144) -> Result<DeltaTable, DeltaTableError> {
145    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
146        .with_version(version)
147        .load()
148        .await?;
149    Ok(table)
150}
151
152/// Creates a DeltaTable from the given path.
153///
154/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
155/// Infers the storage backend to use from the scheme in the given table path.
156///
157/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
158pub async fn open_table_with_ds(
159    table_uri: impl AsRef<str>,
160    ds: impl AsRef<str>,
161) -> Result<DeltaTable, DeltaTableError> {
162    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
163        .with_datestring(ds)?
164        .load()
165        .await?;
166    Ok(table)
167}
168
169static CLIENT_VERSION: OnceLock<String> = OnceLock::new();
170
171pub fn init_client_version(version: &str) {
172    let _ = CLIENT_VERSION.set(version.to_string());
173}
174
175/// Returns Rust core version or custom set client_version such as the py-binding
176pub fn crate_version() -> &'static str {
177    CLIENT_VERSION
178        .get()
179        .map(|s| s.as_str())
180        .unwrap_or(env!("CARGO_PKG_VERSION"))
181}
182
183#[cfg(test)]
184mod tests {
185    use itertools::Itertools;
186
187    use super::*;
188    use crate::table::PeekCommit;
189    use std::collections::HashMap;
190
191    #[tokio::test]
192    async fn read_delta_2_0_table_without_version() {
193        let table = crate::open_table("../test/tests/data/delta-0.2.0")
194            .await
195            .unwrap();
196        let snapshot = table.snapshot().unwrap();
197        assert_eq!(snapshot.version(), 3);
198        assert_eq!(snapshot.protocol().min_writer_version(), 2);
199        assert_eq!(snapshot.protocol().min_reader_version(), 1);
200        assert_eq!(
201            snapshot.file_paths_iter().collect_vec(),
202            vec![
203                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
204                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
205                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
206            ]
207        );
208        let tombstones = table
209            .snapshot()
210            .unwrap()
211            .all_tombstones(&table.log_store())
212            .await
213            .unwrap()
214            .collect_vec();
215        assert_eq!(tombstones.len(), 4);
216        // assert!(tombstones.contains(&crate::kernel::Remove {
217        //     path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
218        //     deletion_timestamp: Some(1564524298213),
219        //     data_change: false,
220        //     extended_file_metadata: None,
221        //     deletion_vector: None,
222        //     partition_values: None,
223        //     tags: None,
224        //     base_row_id: None,
225        //     default_row_commit_version: None,
226        //     size: None,
227        // }));
228    }
229
230    #[tokio::test]
231    async fn read_delta_table_with_update() {
232        let path = "../test/tests/data/simple_table_with_checkpoint/";
233        let table_newest_version = crate::open_table(path).await.unwrap();
234        let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
235        // calling update several times should not produce any duplicates
236        table_to_update.update().await.unwrap();
237        table_to_update.update().await.unwrap();
238        table_to_update.update().await.unwrap();
239
240        assert_eq!(
241            table_newest_version
242                .snapshot()
243                .unwrap()
244                .file_paths_iter()
245                .collect_vec(),
246            table_to_update
247                .snapshot()
248                .unwrap()
249                .file_paths_iter()
250                .collect_vec()
251        );
252    }
253    #[tokio::test]
254    async fn read_delta_2_0_table_with_version() {
255        let mut table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 0)
256            .await
257            .unwrap();
258        let snapshot = table.snapshot().unwrap();
259        assert_eq!(snapshot.version(), 0);
260        assert_eq!(snapshot.protocol().min_writer_version(), 2);
261        assert_eq!(snapshot.protocol().min_reader_version(), 1);
262        assert_eq!(
263            snapshot.file_paths_iter().collect_vec(),
264            vec![
265                Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
266                Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
267            ],
268        );
269
270        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 2)
271            .await
272            .unwrap();
273        let snapshot = table.snapshot().unwrap();
274        assert_eq!(snapshot.version(), 2);
275        assert_eq!(snapshot.protocol().min_writer_version(), 2);
276        assert_eq!(snapshot.protocol().min_reader_version(), 1);
277        assert_eq!(
278            snapshot.file_paths_iter().collect_vec(),
279            vec![
280                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
281                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
282            ]
283        );
284
285        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 3)
286            .await
287            .unwrap();
288        let snapshot = table.snapshot().unwrap();
289        assert_eq!(snapshot.version(), 3);
290        assert_eq!(snapshot.protocol().min_writer_version(), 2);
291        assert_eq!(snapshot.protocol().min_reader_version(), 1);
292        assert_eq!(
293            snapshot.file_paths_iter().collect_vec(),
294            vec![
295                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
296                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
297                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
298            ]
299        );
300    }
301
302    #[tokio::test]
303    async fn read_delta_8_0_table_without_version() {
304        let table = crate::open_table("../test/tests/data/delta-0.8.0")
305            .await
306            .unwrap();
307        let snapshot = table.snapshot().unwrap();
308        assert_eq!(snapshot.version(), 1);
309        assert_eq!(snapshot.protocol().min_writer_version(), 2);
310        assert_eq!(snapshot.protocol().min_reader_version(), 1);
311        assert_eq!(
312            snapshot.file_paths_iter().collect_vec(),
313            vec![
314                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
315                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
316            ]
317        );
318        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
319
320        let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
321
322        let num_records = stats.column_by_name("num_records").unwrap();
323        let num_records = num_records
324            .as_any()
325            .downcast_ref::<arrow::array::Int64Array>()
326            .unwrap();
327        let total_records = num_records.values().iter().sum::<i64>();
328        assert_eq!(total_records, 4);
329
330        let null_counts = stats.column_by_name("null_count.value").unwrap();
331        let null_counts = null_counts
332            .as_any()
333            .downcast_ref::<arrow::array::Int64Array>()
334            .unwrap();
335        null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
336
337        let tombstones = table
338            .snapshot()
339            .unwrap()
340            .all_tombstones(&table.log_store())
341            .await
342            .unwrap()
343            .collect_vec();
344        assert_eq!(tombstones.len(), 1);
345        assert!(tombstones.contains(&crate::kernel::Remove {
346            path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
347            deletion_timestamp: Some(1615043776198),
348            data_change: true,
349            extended_file_metadata: Some(true),
350            partition_values: Some(HashMap::new()),
351            size: Some(445),
352            base_row_id: None,
353            default_row_commit_version: None,
354            deletion_vector: None,
355            tags: Some(HashMap::new()),
356        }));
357    }
358
359    #[tokio::test]
360    async fn read_delta_8_0_table_with_load_version() {
361        let mut table = crate::open_table("../test/tests/data/delta-0.8.0")
362            .await
363            .unwrap();
364        let snapshot = table.snapshot().unwrap();
365        assert_eq!(snapshot.version(), 1);
366        assert_eq!(snapshot.protocol().min_writer_version(), 2);
367        assert_eq!(snapshot.protocol().min_reader_version(), 1);
368        assert_eq!(
369            snapshot.file_paths_iter().collect_vec(),
370            vec![
371                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
372                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
373            ]
374        );
375        table.load_version(0).await.unwrap();
376        let snapshot = table.snapshot().unwrap();
377        assert_eq!(snapshot.version(), 0);
378        assert_eq!(snapshot.protocol().min_writer_version(), 2);
379        assert_eq!(snapshot.protocol().min_reader_version(), 1);
380        assert_eq!(
381            snapshot.file_paths_iter().collect_vec(),
382            vec![
383                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
384                Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
385            ]
386        );
387    }
388
389    #[tokio::test]
390    async fn read_delta_8_0_table_with_partitions() {
391        let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned")
392            .await
393            .unwrap();
394
395        let filters = vec![
396            crate::PartitionFilter {
397                key: "month".to_string(),
398                value: crate::PartitionValue::Equal("2".to_string()),
399            },
400            crate::PartitionFilter {
401                key: "year".to_string(),
402                value: crate::PartitionValue::Equal("2020".to_string()),
403            },
404        ];
405
406        assert_eq!(
407            table.get_files_by_partitions(&filters).await.unwrap(),
408            vec![
409                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
410                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
411            ]
412        );
413        assert_eq!(
414            table.get_file_uris_by_partitions(&filters).await.unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
415            vec![
416                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
417                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
418            ]
419        );
420
421        let filters = vec![crate::PartitionFilter {
422            key: "month".to_string(),
423            value: crate::PartitionValue::NotEqual("2".to_string()),
424        }];
425        assert_eq!(
426            table.get_files_by_partitions(&filters).await.unwrap(),
427            vec![
428                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
429                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
430                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"),
431                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
432            ]
433        );
434
435        let filters = vec![crate::PartitionFilter {
436            key: "month".to_string(),
437            value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
438        }];
439        assert_eq!(
440            table.get_files_by_partitions(&filters).await.unwrap(),
441            vec![
442                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
443                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"),
444                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
445                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet")
446            ]
447        );
448
449        let filters = vec![crate::PartitionFilter {
450            key: "month".to_string(),
451            value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
452        }];
453        assert_eq!(
454            table.get_files_by_partitions(&filters).await.unwrap(),
455            vec![
456                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
457                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
458            ]
459        );
460    }
461
462    #[tokio::test]
463    async fn read_delta_8_0_table_with_null_partition() {
464        let table = crate::open_table("../test/tests/data/delta-0.8.0-null-partition")
465            .await
466            .unwrap();
467
468        let filters = vec![crate::PartitionFilter {
469            key: "k".to_string(),
470            value: crate::PartitionValue::Equal("A".to_string()),
471        }];
472        assert_eq!(
473            table.get_files_by_partitions(&filters).await.unwrap(),
474            vec![Path::from(
475                "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
476            )]
477        );
478
479        let filters = vec![crate::PartitionFilter {
480            key: "k".to_string(),
481            value: crate::PartitionValue::Equal("".to_string()),
482        }];
483        assert_eq!(
484            table.get_files_by_partitions(&filters).await.unwrap(),
485            vec![
486                Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet")
487            ]
488        );
489    }
490
491    #[tokio::test]
492    async fn read_delta_8_0_table_with_special_partition() {
493        let table = crate::open_table("../test/tests/data/delta-0.8.0-special-partition")
494            .await
495            .unwrap();
496
497        assert_eq!(
498            table.snapshot().unwrap().file_paths_iter().collect_vec(),
499            vec![
500                Path::parse(
501                    "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
502                )
503                .unwrap(),
504                Path::parse(
505                    "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
506                )
507                .unwrap()
508            ]
509        );
510
511        let filters = vec![crate::PartitionFilter {
512            key: "x".to_string(),
513            value: crate::PartitionValue::Equal("A/A".to_string()),
514        }];
515        assert_eq!(
516            table.get_files_by_partitions(&filters).await.unwrap(),
517            vec![Path::parse(
518                "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
519            )
520            .unwrap()]
521        );
522    }
523
524    #[tokio::test]
525    async fn read_delta_8_0_table_partition_with_compare_op() {
526        let table = crate::open_table("../test/tests/data/delta-0.8.0-numeric-partition")
527            .await
528            .unwrap();
529
530        let filters = vec![crate::PartitionFilter {
531            key: "x".to_string(),
532            value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
533        }];
534        assert_eq!(
535            table.get_files_by_partitions(&filters).await.unwrap(),
536            vec![Path::from(
537                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
538            )]
539        );
540
541        let filters = vec![crate::PartitionFilter {
542            key: "y".to_string(),
543            value: crate::PartitionValue::LessThan("10.0".to_string()),
544        }];
545        assert_eq!(
546            table.get_files_by_partitions(&filters).await.unwrap(),
547            vec![Path::from(
548                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
549            )]
550        );
551    }
552
553    #[tokio::test]
554    async fn test_table_history() {
555        let path = "../test/tests/data/simple_table_with_checkpoint";
556        let latest_table = crate::open_table(path).await.unwrap();
557
558        let table = crate::open_table_with_version(path, 1).await.unwrap();
559
560        let history1 = table.history(None).await.expect("Cannot get table history");
561        let history2 = latest_table
562            .history(None)
563            .await
564            .expect("Cannot get table history");
565
566        assert_eq!(history1, history2);
567
568        let history3 = latest_table
569            .history(Some(5))
570            .await
571            .expect("Cannot get table history");
572        assert_eq!(history3.len(), 5);
573    }
574
575    #[tokio::test]
576    async fn test_poll_table_commits() {
577        let path = "../test/tests/data/simple_table_with_checkpoint";
578        let mut table = crate::open_table_with_version(path, 9).await.unwrap();
579        assert_eq!(table.version(), Some(9));
580        let peek = table
581            .log_store()
582            .peek_next_commit(table.version().unwrap())
583            .await
584            .unwrap();
585        assert!(matches!(peek, PeekCommit::New(..)));
586
587        if let PeekCommit::New(version, actions) = peek {
588            assert_eq!(table.version(), Some(9));
589            assert!(!table.snapshot().unwrap().file_paths_iter().any(|f| f
590                == Path::from(
591                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
592                )));
593
594            assert_eq!(version, 10);
595            assert_eq!(actions.len(), 2);
596
597            table.update_incremental(None).await.unwrap();
598
599            assert_eq!(table.version(), Some(10));
600            assert!(table.snapshot().unwrap().file_paths_iter().any(|f| f
601                == Path::from(
602                    "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"
603                )));
604        };
605
606        let peek = table
607            .log_store()
608            .peek_next_commit(table.version().unwrap())
609            .await
610            .unwrap();
611        assert!(matches!(peek, PeekCommit::UpToDate));
612    }
613
614    #[tokio::test]
615    async fn test_read_vacuumed_log() {
616        let path = "../test/tests/data/checkpoints_vacuumed";
617        let table = crate::open_table(path).await.unwrap();
618        assert_eq!(table.version(), Some(12));
619    }
620
621    #[tokio::test]
622    async fn test_read_vacuumed_log_history() {
623        let path = "../test/tests/data/checkpoints_vacuumed";
624        let table = crate::open_table(path).await.unwrap();
625
626        // load history for table version with available log file
627        let history = table
628            .history(Some(5))
629            .await
630            .expect("Cannot get table history");
631
632        assert_eq!(history.len(), 5);
633
634        // load history for table version without log file
635        let history = table
636            .history(Some(10))
637            .await
638            .expect("Cannot get table history");
639
640        assert_eq!(history.len(), 8);
641    }
642
643    #[tokio::test]
644    async fn read_empty_folder() {
645        let dir = std::env::temp_dir();
646        let result = crate::open_table(&dir.into_os_string().into_string().unwrap()).await;
647
648        assert!(matches!(
649            result.unwrap_err(),
650            crate::errors::DeltaTableError::NotATable(_),
651        ));
652
653        let dir = std::env::temp_dir();
654        let result = crate::open_table_with_ds(
655            &dir.into_os_string().into_string().unwrap(),
656            "2021-08-09T13:18:31+08:00",
657        )
658        .await;
659
660        assert!(matches!(
661            result.unwrap_err(),
662            crate::errors::DeltaTableError::NotATable(_),
663        ));
664    }
665
666    #[tokio::test]
667    async fn read_delta_table_with_cdc() {
668        let table = crate::open_table("../test/tests/data/simple_table_with_cdc")
669            .await
670            .unwrap();
671        assert_eq!(table.version(), Some(2));
672        assert_eq!(
673            table.snapshot().unwrap().file_paths_iter().collect_vec(),
674            vec![Path::from(
675                "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
676            ),]
677        );
678    }
679
680    #[tokio::test()]
681    async fn test_version_zero_table_load() {
682        let path = "../test/tests/data/COVID-19_NYT";
683        let latest_table: DeltaTable = crate::open_table(path).await.unwrap();
684
685        let version_0_table = crate::open_table_with_version(path, 0).await.unwrap();
686
687        let version_0_history = version_0_table
688            .history(None)
689            .await
690            .expect("Cannot get table history");
691        let latest_table_history = latest_table
692            .history(None)
693            .await
694            .expect("Cannot get table history");
695
696        assert_eq!(latest_table_history, version_0_history);
697    }
698
699    #[tokio::test()]
700    async fn test_fail_fast_on_not_existing_path() {
701        use std::path::Path as FolderPath;
702
703        let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";
704
705        // Check that there is no such path at the beginning
706        let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
707        assert!(path_doesnt_exist);
708
709        let error = crate::open_table(non_existing_path_str).await.unwrap_err();
710        let _expected_error_msg = format!(
711            "Local path \"{non_existing_path_str}\" does not exist or you don't have access!"
712        );
713        assert!(matches!(
714            error,
715            DeltaTableError::InvalidTableLocation(_expected_error_msg),
716        ))
717    }
718
719    /// <https://github.com/delta-io/delta-rs/issues/2152>
720    #[tokio::test]
721    async fn test_identity_column() {
722        let path = "../test/tests/data/issue-2152";
723        let _ = crate::open_table(path)
724            .await
725            .expect("Failed to load the table");
726    }
727}