Skip to main content

deltalake_core/
lib.rs

1//! Native Delta Lake implementation in Rust
2//!
3//! # Usage
4//!
5//! Load a Delta Table by URL:
6//!
7//! ```rust
8//! # use url::Url;
9//! async {
10//!   let table_url = Url::from_directory_path("/abs/test/tests/data/simple_table").unwrap();
11//!   let table = deltalake_core::open_table(table_url).await.unwrap();
12//!   let version = table.version();
13//! };
14//! ```
15//!
16//! Load a specific version of Delta Table by URL then filter files by partitions:
17//!
18//! ```rust
19//! # use url::Url;
20//! async {
21//!   let table_url = Url::from_directory_path("/abs/test/tests/data/simple_table").unwrap();
22//!   let table = deltalake_core::open_table_with_version(table_url, 0).await.unwrap();
23//!   let filter = [deltalake_core::PartitionFilter {
24//!       key: "month".to_string(),
25//!       value: deltalake_core::PartitionValue::Equal("12".to_string()),
26//!   }];
27//!   let files = table.get_files_by_partitions(&filter).await.unwrap();
28//! };
29//! ```
30//!
31//! Load a specific version of Delta Table by URL and datetime:
32//!
33//! ```rust
34//! # use url::Url;
35//! async {
//!   let table_url = Url::from_directory_path("/abs/test/tests/data/simple_table").unwrap();
37//!   let table = deltalake_core::open_table_with_ds(
38//!       table_url,
39//!       "2020-05-02T23:47:31-07:00",
40//!   ).await.unwrap();
41//!   let version = table.version();
42//! };
43//! ```
44//!
45//! # Optional cargo package features
46//!
47//! - `s3`, `gcs`, `azure` - enable the storage backends for AWS S3, Google Cloud Storage (GCS),
48//!   or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-native-tls` to use native TLS
49//!   instead of Rust TLS implementation.
50//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
51//!   for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
52//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
53//!
//! # Querying Delta Tables with DataFusion
55//!
56//! Querying from local filesystem:
57//! ```
58//! use std::sync::Arc;
59//! # use url::Url;
60//!
61//! # #[cfg(feature="datafusion")]
62//! async {
63//!   use datafusion::prelude::SessionContext;
64//!   let mut ctx = SessionContext::new();
65//!   let table_url = Url::from_directory_path("/abs/test/tests/data/simple_table").unwrap();
66//!   let table = deltalake_core::open_table(table_url)
67//!       .await
68//!       .unwrap();
69//!   ctx.register_table("demo", table.table_provider().await.unwrap()).unwrap();
70//!
71//!   let batches = ctx
72//!       .sql("SELECT * FROM demo").await.unwrap()
73//!       .collect()
74//!       .await.unwrap();
75//! };
76//! ```
77
78// #![deny(missing_docs)]
79#![allow(rustdoc::invalid_html_tags)]
80#![allow(clippy::nonminimal_bool)]
81pub mod data_catalog;
82pub mod errors;
83pub mod kernel;
84pub mod logstore;
85pub mod operations;
86pub mod protocol;
87pub use kernel::schema;
88pub mod table;
89
90#[cfg(any(test, feature = "integration_test"))]
91pub mod test_utils;
92
93#[cfg(feature = "datafusion")]
94pub mod delta_datafusion;
95pub mod writer;
96
97use std::collections::HashMap;
98use std::sync::OnceLock;
99use url::Url;
100
101pub use self::data_catalog::{DataCatalog, DataCatalogError};
102pub use self::errors::*;
103pub use self::schema::partitions::*;
104pub use self::schema::*;
105pub use self::table::DeltaTable;
106pub use self::table::builder::{
107    DeltaTableBuilder, DeltaTableConfig, DeltaVersion, ensure_table_uri,
108};
109pub use self::table::config::TableProperty;
110pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path};
111#[allow(deprecated)]
112pub use operations::DeltaOps;
113
114pub use protocol::checkpoints;
115
116// convenience exports for consumers to avoid aligning crate versions
117pub use arrow;
118#[cfg(feature = "datafusion")]
119pub use datafusion;
120pub use parquet;
121
122#[cfg(not(any(feature = "rustls", feature = "native-tls")))]
123compile_error!("You must enable at least one of the features: `rustls` or `native-tls`.");
124
125/// Creates and loads a DeltaTable from the given URL with current metadata.
126/// Infers the storage backend to use from the scheme in the given table URL.
127///
128/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
129pub async fn open_table(table_url: Url) -> Result<DeltaTable, DeltaTableError> {
130    let table = DeltaTableBuilder::from_url(table_url)?.load().await?;
131    Ok(table)
132}
133
134/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
135/// `StorageService`.
136///
137/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
138pub async fn open_table_with_storage_options(
139    table_url: Url,
140    storage_options: HashMap<String, String>,
141) -> Result<DeltaTable, DeltaTableError> {
142    let table = DeltaTableBuilder::from_url(table_url)?
143        .with_storage_options(storage_options)
144        .load()
145        .await?;
146    Ok(table)
147}
148
149/// Creates a DeltaTable from the given URL and loads it with the metadata from the given version.
150/// Infers the storage backend to use from the scheme in the given table URL.
151///
152/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
153pub async fn open_table_with_version(
154    table_url: Url,
155    version: i64,
156) -> Result<DeltaTable, DeltaTableError> {
157    let table = DeltaTableBuilder::from_url(table_url)?
158        .with_version(version)
159        .load()
160        .await?;
161    Ok(table)
162}
163
164/// Creates a DeltaTable from the given URL.
165///
166/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
167/// Infers the storage backend to use from the scheme in the given table URL.
168///
169/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
170pub async fn open_table_with_ds(
171    table_url: Url,
172    ds: impl AsRef<str>,
173) -> Result<DeltaTable, DeltaTableError> {
174    let table = DeltaTableBuilder::from_url(table_url)?
175        .with_datestring(ds)?
176        .load()
177        .await?;
178    Ok(table)
179}
180
/// Process-wide override for the version string reported by `crate_version`.
static CLIENT_VERSION: OnceLock<String> = OnceLock::new();

/// Registers a custom client version string.
///
/// Only the first call has an effect: once a value has been stored, later calls
/// leave it untouched and are silently ignored.
pub fn init_client_version(version: &str) {
    // The closure only runs while the cell is still empty, so the first value wins.
    CLIENT_VERSION.get_or_init(|| version.to_owned());
}
186
187/// Returns Rust core version or custom set client_version such as the py-binding
188pub fn crate_version() -> &'static str {
189    CLIENT_VERSION
190        .get()
191        .map(|s| s.as_str())
192        .unwrap_or(env!("CARGO_PKG_VERSION"))
193}
194
195#[cfg(test)]
196mod tests {
197    use futures::TryStreamExt as _;
198
199    use super::*;
200    use test_utils::file_paths_from;
201
202    #[tokio::test]
203    async fn read_delta_2_0_table_without_version() -> DeltaResult<()> {
204        let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
205            .canonicalize()
206            .unwrap();
207        let table_url = url::Url::from_directory_path(table_path).unwrap();
208        let table = crate::open_table(table_url).await.unwrap();
209        let snapshot = table.snapshot().unwrap();
210        assert_eq!(snapshot.version(), 3);
211        assert_eq!(snapshot.protocol().min_writer_version(), 2);
212        assert_eq!(snapshot.protocol().min_reader_version(), 1);
213        assert_eq!(
214            file_paths_from(snapshot, &table.log_store()).await?,
215            vec![
216                "part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet",
217                "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
218                "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
219            ]
220        );
221        let tombstones = table
222            .snapshot()
223            .unwrap()
224            .all_tombstones(&table.log_store())
225            .try_collect::<Vec<_>>()
226            .await
227            .unwrap();
228        assert_eq!(tombstones.len(), 4);
229        // assert!(tombstones.contains(&crate::kernel::Remove {
230        //     path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
231        //     deletion_timestamp: Some(1564524298213),
232        //     data_change: false,
233        //     extended_file_metadata: None,
234        //     deletion_vector: None,
235        //     partition_values: None,
236        //     tags: None,
237        //     base_row_id: None,
238        //     default_row_commit_version: None,
239        //     size: None,
240        // }));
241        Ok(())
242    }
243
244    #[tokio::test]
245    async fn read_delta_table_with_update() -> DeltaResult<()> {
246        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint/")
247            .canonicalize()
248            .unwrap();
249        let table_url = url::Url::from_directory_path(table_path).unwrap();
250        let table_newest_version = crate::open_table(table_url.clone()).await.unwrap();
251        let mut table_to_update = crate::open_table_with_version(table_url, 0).await.unwrap();
252        // calling update several times should not produce any duplicates
253        table_to_update.update_state().await.unwrap();
254        table_to_update.update_state().await.unwrap();
255        table_to_update.update_state().await.unwrap();
256
257        assert_eq!(
258            file_paths_from(
259                table_newest_version.snapshot()?,
260                &table_newest_version.log_store()
261            )
262            .await?,
263            file_paths_from(table_to_update.snapshot()?, &table_to_update.log_store()).await?
264        );
265        Ok(())
266    }
267
268    #[tokio::test]
269    async fn read_delta_2_0_table_with_version() -> DeltaResult<()> {
270        let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
271            .canonicalize()
272            .unwrap();
273        let table_url = url::Url::from_directory_path(table_path).unwrap();
274        let mut table = crate::open_table_with_version(table_url.clone(), 0)
275            .await
276            .unwrap();
277        let snapshot = table.snapshot().unwrap();
278        assert_eq!(snapshot.version(), 0);
279        assert_eq!(snapshot.protocol().min_writer_version(), 2);
280        assert_eq!(snapshot.protocol().min_reader_version(), 1);
281        assert_eq!(
282            file_paths_from(snapshot, &table.log_store()).await?,
283            vec![
284                "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
285                "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
286            ],
287        );
288
289        table = crate::open_table_with_version(table_url.clone(), 2)
290            .await
291            .unwrap();
292        let snapshot = table.snapshot().unwrap();
293        assert_eq!(snapshot.version(), 2);
294        assert_eq!(snapshot.protocol().min_writer_version(), 2);
295        assert_eq!(snapshot.protocol().min_reader_version(), 1);
296        assert_eq!(
297            file_paths_from(snapshot, &table.log_store()).await?,
298            vec![
299                "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
300                "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
301            ]
302        );
303
304        table = crate::open_table_with_version(table_url, 3).await.unwrap();
305        let snapshot = table.snapshot().unwrap();
306        assert_eq!(snapshot.version(), 3);
307        assert_eq!(snapshot.protocol().min_writer_version(), 2);
308        assert_eq!(snapshot.protocol().min_reader_version(), 1);
309        assert_eq!(
310            file_paths_from(snapshot, &table.log_store()).await?,
311            vec![
312                "part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet",
313                "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
314                "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
315            ]
316        );
317        Ok(())
318    }
319
320    #[tokio::test]
321    async fn read_delta_8_0_table_without_version() {
322        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
323            .canonicalize()
324            .unwrap();
325        let table_url = url::Url::from_directory_path(table_path).unwrap();
326        let table = crate::open_table(table_url).await.unwrap();
327        let snapshot = table.snapshot().unwrap();
328        assert_eq!(snapshot.version(), 1);
329        assert_eq!(snapshot.protocol().min_writer_version(), 2);
330        assert_eq!(snapshot.protocol().min_reader_version(), 1);
331        assert_eq!(
332            file_paths_from(snapshot, &table.log_store()).await.unwrap(),
333            vec![
334                "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
335                "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
336            ]
337        );
338        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
339
340        let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
341
342        let num_records = stats.column_by_name("num_records").unwrap();
343        let num_records = num_records
344            .as_any()
345            .downcast_ref::<arrow::array::Int64Array>()
346            .unwrap();
347        let total_records = num_records.values().iter().sum::<i64>();
348        assert_eq!(total_records, 4);
349
350        let null_counts = stats.column_by_name("null_count.value").unwrap();
351        let null_counts = null_counts
352            .as_any()
353            .downcast_ref::<arrow::array::Int64Array>()
354            .unwrap();
355        null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
356
357        let tombstones = table
358            .snapshot()
359            .unwrap()
360            .all_tombstones(&table.log_store())
361            .try_collect::<Vec<_>>()
362            .await
363            .unwrap();
364        assert_eq!(tombstones.len(), 1);
365        let tombstone = tombstones.first().unwrap();
366        assert_eq!(
367            tombstone.path(),
368            "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
369        );
370        assert_eq!(tombstone.deletion_timestamp(), Some(1615043776198));
371    }
372
373    #[tokio::test]
374    async fn read_delta_8_0_table_with_load_version() {
375        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
376            .canonicalize()
377            .unwrap();
378        let table_url = url::Url::from_directory_path(table_path).unwrap();
379        let mut table = crate::open_table(table_url).await.unwrap();
380        let snapshot = table.snapshot().unwrap();
381        assert_eq!(snapshot.version(), 1);
382        assert_eq!(snapshot.protocol().min_writer_version(), 2);
383        assert_eq!(snapshot.protocol().min_reader_version(), 1);
384        assert_eq!(
385            file_paths_from(snapshot, &table.log_store()).await.unwrap(),
386            vec![
387                "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
388                "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
389            ]
390        );
391        table.load_version(0).await.unwrap();
392        let snapshot = table.snapshot().unwrap();
393        assert_eq!(snapshot.version(), 0);
394        assert_eq!(snapshot.protocol().min_writer_version(), 2);
395        assert_eq!(snapshot.protocol().min_reader_version(), 1);
396        assert_eq!(
397            file_paths_from(snapshot, &table.log_store()).await.unwrap(),
398            vec![
399                "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
400                "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet",
401            ]
402        );
403    }
404
405    #[tokio::test]
406    async fn read_delta_8_0_table_with_partitions() {
407        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-partitioned")
408            .canonicalize()
409            .unwrap();
410        let table_url = url::Url::from_directory_path(table_path).unwrap();
411        let table = crate::open_table(table_url).await.unwrap();
412
413        let filters = vec![
414            crate::PartitionFilter {
415                key: "month".to_string(),
416                value: crate::PartitionValue::Equal("2".to_string()),
417            },
418            crate::PartitionFilter {
419                key: "year".to_string(),
420                value: crate::PartitionValue::Equal("2020".to_string()),
421            },
422        ];
423
424        assert_eq!(
425            table.get_files_by_partitions(&filters).await.unwrap(),
426            vec![
427                Path::from(
428                    "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
429                ),
430                Path::from(
431                    "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"
432                )
433            ]
434        );
435        assert_eq!(
436            table.get_file_uris_by_partitions(&filters).await.unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
437            vec![
438                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
439                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
440            ]
441        );
442
443        let filters = vec![crate::PartitionFilter {
444            key: "month".to_string(),
445            value: crate::PartitionValue::NotEqual("2".to_string()),
446        }];
447        assert_eq!(
448            table.get_files_by_partitions(&filters).await.unwrap(),
449            vec![
450                Path::from(
451                    "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"
452                ),
453                Path::from(
454                    "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"
455                ),
456                Path::from(
457                    "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"
458                ),
459                Path::from(
460                    "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"
461                )
462            ]
463        );
464
465        let filters = vec![crate::PartitionFilter {
466            key: "month".to_string(),
467            value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
468        }];
469        assert_eq!(
470            table.get_files_by_partitions(&filters).await.unwrap(),
471            vec![
472                Path::from(
473                    "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
474                ),
475                Path::from(
476                    "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"
477                ),
478                Path::from(
479                    "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"
480                ),
481                Path::from(
482                    "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"
483                )
484            ]
485        );
486
487        let filters = vec![crate::PartitionFilter {
488            key: "month".to_string(),
489            value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
490        }];
491        assert_eq!(
492            table.get_files_by_partitions(&filters).await.unwrap(),
493            vec![
494                Path::from(
495                    "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"
496                ),
497                Path::from(
498                    "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"
499                )
500            ]
501        );
502    }
503
504    #[tokio::test]
505    async fn read_delta_8_0_table_with_null_partition() {
506        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-null-partition")
507            .canonicalize()
508            .unwrap();
509        let table_url = url::Url::from_directory_path(table_path).unwrap();
510        let table = crate::open_table(table_url).await.unwrap();
511
512        let filters = vec![crate::PartitionFilter {
513            key: "k".to_string(),
514            value: crate::PartitionValue::Equal("A".to_string()),
515        }];
516        assert_eq!(
517            table.get_files_by_partitions(&filters).await.unwrap(),
518            vec![Path::from(
519                "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
520            )]
521        );
522
523        let filters = vec![crate::PartitionFilter {
524            key: "k".to_string(),
525            value: crate::PartitionValue::Equal("".to_string()),
526        }];
527        assert_eq!(
528            table.get_files_by_partitions(&filters).await.unwrap(),
529            vec![Path::from(
530                "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"
531            )]
532        );
533    }
534
535    #[tokio::test]
536    async fn read_delta_8_0_table_with_special_partition() -> DeltaResult<()> {
537        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-special-partition")
538            .canonicalize()
539            .unwrap();
540        let table_url = url::Url::from_directory_path(table_path).unwrap();
541        let _table = crate::open_table(table_url).await.unwrap();
542
543        /* All of this is bad
544         * TODO: determine whether this test makes sense for delta-rs to have any more
545        assert_eq!(
546            file_paths_from(table.snapshot().unwrap(), &table.log_store()).await.unwrap(),
547            vec![
548                "x=A-A/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet",
549                "x=B B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet",
550            ]
551        );
552
553        let filters = vec![crate::PartitionFilter {
554            key: "x".to_string(),
555            value: crate::PartitionValue::Equal("A/A".to_string()),
556        }];
557        assert_eq!(
558            table.get_files_by_partitions(&filters).await.unwrap(),
559            vec![Path::parse(
560                "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
561            )
562            .unwrap()]
563        );
564        */
565        Ok(())
566    }
567
568    #[tokio::test]
569    async fn read_delta_8_0_table_partition_with_compare_op() {
570        let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-numeric-partition")
571            .canonicalize()
572            .unwrap();
573        let table_url = url::Url::from_directory_path(table_path).unwrap();
574        let table = crate::open_table(table_url).await.unwrap();
575
576        let filters = vec![crate::PartitionFilter {
577            key: "x".to_string(),
578            value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
579        }];
580        assert_eq!(
581            table.get_files_by_partitions(&filters).await.unwrap().len(),
582            1
583        );
584
585        let filters = vec![crate::PartitionFilter {
586            key: "y".to_string(),
587            value: crate::PartitionValue::LessThan("10.0".to_string()),
588        }];
589        assert_eq!(
590            table.get_files_by_partitions(&filters).await.unwrap().len(),
591            1
592        );
593    }
594
595    #[tokio::test]
596    async fn test_table_history() {
597        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint")
598            .canonicalize()
599            .unwrap();
600        let table_url = url::Url::from_directory_path(table_path).unwrap();
601        let latest_table = crate::open_table(table_url.clone()).await.unwrap();
602
603        let table = crate::open_table_with_version(table_url, 1).await.unwrap();
604
605        let history1: Vec<_> = table
606            .history(None)
607            .await
608            .expect("Cannot get table history")
609            .collect();
610        let history2: Vec<_> = latest_table
611            .history(None)
612            .await
613            .expect("Cannot get table history")
614            .collect();
615
616        assert_eq!(history1, history2);
617
618        let history3: Vec<_> = latest_table
619            .history(Some(5))
620            .await
621            .expect("Cannot get table history")
622            .collect();
623        assert_eq!(history3.len(), 5);
624    }
625
626    #[tokio::test]
627    async fn test_read_vacuumed_log() {
628        let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
629            .canonicalize()
630            .unwrap();
631        let table_url = url::Url::from_directory_path(table_path).unwrap();
632        let table = crate::open_table(table_url).await.unwrap();
633        assert_eq!(table.version(), Some(12));
634    }
635
636    #[tokio::test]
637    async fn test_read_vacuumed_log_history() {
638        let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
639            .canonicalize()
640            .unwrap();
641        let table_url = url::Url::from_directory_path(table_path).unwrap();
642        let table = crate::open_table(table_url).await.unwrap();
643
644        // load history for table version with available log file
645        let history: Vec<_> = table
646            .history(Some(5))
647            .await
648            .expect("Cannot get table history")
649            .collect();
650
651        assert_eq!(history.len(), 5);
652
653        // load history for table version without log file
654        let history: Vec<_> = table
655            .history(Some(10))
656            .await
657            .expect("Cannot get table history")
658            .collect();
659
660        assert_eq!(history.len(), 8);
661    }
662
663    #[tokio::test]
664    async fn read_empty_folder() {
665        let dir = std::env::temp_dir();
666        let table_url = url::Url::from_directory_path(&dir).unwrap();
667        let result = crate::open_table(table_url).await;
668
669        assert!(matches!(
670            result.unwrap_err(),
671            crate::errors::DeltaTableError::NotATable(_),
672        ));
673
674        let dir = std::env::temp_dir();
675        let table_url = url::Url::from_directory_path(&dir).unwrap();
676        let result = crate::open_table_with_ds(table_url, "2021-08-09T13:18:31+08:00").await;
677
678        assert!(matches!(
679            result.unwrap_err(),
680            crate::errors::DeltaTableError::NotATable(_),
681        ));
682    }
683
684    #[tokio::test]
685    async fn read_delta_table_with_cdc() {
686        let table_path = std::path::Path::new("../test/tests/data/simple_table_with_cdc")
687            .canonicalize()
688            .unwrap();
689        let table_url = url::Url::from_directory_path(table_path).unwrap();
690        let table = crate::open_table(table_url).await.unwrap();
691        assert_eq!(table.version(), Some(2));
692        assert_eq!(
693            file_paths_from(table.snapshot().unwrap(), &table.log_store())
694                .await
695                .unwrap(),
696            vec!["part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"]
697        );
698    }
699
700    #[tokio::test()]
701    async fn test_version_zero_table_load() {
702        let table_path = std::path::Path::new("../test/tests/data/COVID-19_NYT")
703            .canonicalize()
704            .unwrap();
705        let table_url = url::Url::from_directory_path(table_path).unwrap();
706        let latest_table: DeltaTable = crate::open_table(table_url.clone()).await.unwrap();
707
708        let version_0_table = crate::open_table_with_version(table_url, 0).await.unwrap();
709
710        let version_0_history: Vec<_> = version_0_table
711            .history(None)
712            .await
713            .expect("Cannot get table history")
714            .collect();
715        let latest_table_history: Vec<_> = latest_table
716            .history(None)
717            .await
718            .expect("Cannot get table history")
719            .collect();
720
721        assert_eq!(latest_table_history, version_0_history);
722    }
723
724    #[tokio::test()]
725    async fn test_fail_fast_on_not_existing_path() {
726        use std::path::Path as FolderPath;
727
728        let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";
729
730        // Check that there is no such path at the beginning
731        let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
732        assert!(path_doesnt_exist);
733
734        let table_path = std::path::Path::new(non_existing_path_str);
735        let abs_path = std::fs::canonicalize(".").unwrap().join(table_path);
736        let table_url = url::Url::from_directory_path(abs_path).unwrap();
737        let error = crate::open_table(table_url).await.unwrap_err();
738        let _expected_error_msg = format!(
739            "Local path \"{non_existing_path_str}\" does not exist or you don't have access!"
740        );
741        assert!(matches!(
742            error,
743            DeltaTableError::InvalidTableLocation(_expected_error_msg),
744        ))
745    }
746
747    /// <https://github.com/delta-io/delta-rs/issues/2152>
748    #[tokio::test]
749    async fn test_identity_column() {
750        let table_path = std::path::Path::new("../test/tests/data/issue-2152")
751            .canonicalize()
752            .unwrap();
753        let table_url = url::Url::from_directory_path(table_path).unwrap();
754        let _ = crate::open_table(table_url)
755            .await
756            .expect("Failed to load the table");
757    }
758}