1#![allow(rustdoc::invalid_html_tags)]
80#![allow(clippy::nonminimal_bool)]
81pub mod data_catalog;
82pub mod errors;
83pub mod kernel;
84pub mod logstore;
85pub mod operations;
86pub mod protocol;
87pub use kernel::schema;
88pub mod table;
89
90#[cfg(any(test, feature = "integration_test"))]
91pub mod test_utils;
92
93#[cfg(feature = "datafusion")]
94pub mod delta_datafusion;
95pub mod writer;
96
97use std::collections::HashMap;
98use std::sync::OnceLock;
99use url::Url;
100
101pub use self::data_catalog::{DataCatalog, DataCatalogError};
102pub use self::errors::*;
103pub use self::schema::partitions::*;
104pub use self::schema::*;
105pub use self::table::DeltaTable;
106pub use self::table::builder::{
107 DeltaTableBuilder, DeltaTableConfig, DeltaVersion, ensure_table_uri,
108};
109pub use self::table::config::TableProperty;
110pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path};
111#[allow(deprecated)]
112pub use operations::DeltaOps;
113
114pub use protocol::checkpoints;
115
116pub use arrow;
118#[cfg(feature = "datafusion")]
119pub use datafusion;
120pub use parquet;
121
// Abort compilation with an explicit message when neither the `rustls` nor the
// `native-tls` feature is enabled.
#[cfg(not(any(feature = "rustls", feature = "native-tls")))]
compile_error!("You must enable at least one of the features: `rustls` or `native-tls`.");
124
125pub async fn open_table(table_url: Url) -> Result<DeltaTable, DeltaTableError> {
130 let table = DeltaTableBuilder::from_url(table_url)?.load().await?;
131 Ok(table)
132}
133
134pub async fn open_table_with_storage_options(
139 table_url: Url,
140 storage_options: HashMap<String, String>,
141) -> Result<DeltaTable, DeltaTableError> {
142 let table = DeltaTableBuilder::from_url(table_url)?
143 .with_storage_options(storage_options)
144 .load()
145 .await?;
146 Ok(table)
147}
148
149pub async fn open_table_with_version(
154 table_url: Url,
155 version: i64,
156) -> Result<DeltaTable, DeltaTableError> {
157 let table = DeltaTableBuilder::from_url(table_url)?
158 .with_version(version)
159 .load()
160 .await?;
161 Ok(table)
162}
163
164pub async fn open_table_with_ds(
171 table_url: Url,
172 ds: impl AsRef<str>,
173) -> Result<DeltaTable, DeltaTableError> {
174 let table = DeltaTableBuilder::from_url(table_url)?
175 .with_datestring(ds)?
176 .load()
177 .await?;
178 Ok(table)
179}
180
/// Process-wide client version string; stays unset until [`init_client_version`]
/// stores a value.
static CLIENT_VERSION: OnceLock<String> = OnceLock::new();

/// Registers `version` as the client version string.
///
/// The value is stored at most once per process: if a version has already been
/// registered, this call leaves it untouched and returns without reporting it.
pub fn init_client_version(version: &str) {
    let requested = String::from(version);
    // `OnceLock::set` signals an already-initialized cell with `Err`; that case is
    // deliberately ignored so the first registration stays in effect.
    let _ = CLIENT_VERSION.set(requested);
}
186
187pub fn crate_version() -> &'static str {
189 CLIENT_VERSION
190 .get()
191 .map(|s| s.as_str())
192 .unwrap_or(env!("CARGO_PKG_VERSION"))
193}
194
195#[cfg(test)]
196mod tests {
197 use futures::TryStreamExt as _;
198
199 use super::*;
200 use test_utils::file_paths_from;
201
202 #[tokio::test]
203 async fn read_delta_2_0_table_without_version() -> DeltaResult<()> {
204 let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
205 .canonicalize()
206 .unwrap();
207 let table_url = url::Url::from_directory_path(table_path).unwrap();
208 let table = crate::open_table(table_url).await.unwrap();
209 let snapshot = table.snapshot().unwrap();
210 assert_eq!(snapshot.version(), 3);
211 assert_eq!(snapshot.protocol().min_writer_version(), 2);
212 assert_eq!(snapshot.protocol().min_reader_version(), 1);
213 assert_eq!(
214 file_paths_from(snapshot, &table.log_store()).await?,
215 vec![
216 "part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet",
217 "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
218 "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
219 ]
220 );
221 let tombstones = table
222 .snapshot()
223 .unwrap()
224 .all_tombstones(&table.log_store())
225 .try_collect::<Vec<_>>()
226 .await
227 .unwrap();
228 assert_eq!(tombstones.len(), 4);
229 Ok(())
242 }
243
244 #[tokio::test]
245 async fn read_delta_table_with_update() -> DeltaResult<()> {
246 let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint/")
247 .canonicalize()
248 .unwrap();
249 let table_url = url::Url::from_directory_path(table_path).unwrap();
250 let table_newest_version = crate::open_table(table_url.clone()).await.unwrap();
251 let mut table_to_update = crate::open_table_with_version(table_url, 0).await.unwrap();
252 table_to_update.update_state().await.unwrap();
254 table_to_update.update_state().await.unwrap();
255 table_to_update.update_state().await.unwrap();
256
257 assert_eq!(
258 file_paths_from(
259 table_newest_version.snapshot()?,
260 &table_newest_version.log_store()
261 )
262 .await?,
263 file_paths_from(table_to_update.snapshot()?, &table_to_update.log_store()).await?
264 );
265 Ok(())
266 }
267
268 #[tokio::test]
269 async fn read_delta_2_0_table_with_version() -> DeltaResult<()> {
270 let table_path = std::path::Path::new("../test/tests/data/delta-0.2.0")
271 .canonicalize()
272 .unwrap();
273 let table_url = url::Url::from_directory_path(table_path).unwrap();
274 let mut table = crate::open_table_with_version(table_url.clone(), 0)
275 .await
276 .unwrap();
277 let snapshot = table.snapshot().unwrap();
278 assert_eq!(snapshot.version(), 0);
279 assert_eq!(snapshot.protocol().min_writer_version(), 2);
280 assert_eq!(snapshot.protocol().min_reader_version(), 1);
281 assert_eq!(
282 file_paths_from(snapshot, &table.log_store()).await?,
283 vec![
284 "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
285 "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
286 ],
287 );
288
289 table = crate::open_table_with_version(table_url.clone(), 2)
290 .await
291 .unwrap();
292 let snapshot = table.snapshot().unwrap();
293 assert_eq!(snapshot.version(), 2);
294 assert_eq!(snapshot.protocol().min_writer_version(), 2);
295 assert_eq!(snapshot.protocol().min_reader_version(), 1);
296 assert_eq!(
297 file_paths_from(snapshot, &table.log_store()).await?,
298 vec![
299 "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
300 "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
301 ]
302 );
303
304 table = crate::open_table_with_version(table_url, 3).await.unwrap();
305 let snapshot = table.snapshot().unwrap();
306 assert_eq!(snapshot.version(), 3);
307 assert_eq!(snapshot.protocol().min_writer_version(), 2);
308 assert_eq!(snapshot.protocol().min_reader_version(), 1);
309 assert_eq!(
310 file_paths_from(snapshot, &table.log_store()).await?,
311 vec![
312 "part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet",
313 "part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet",
314 "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
315 ]
316 );
317 Ok(())
318 }
319
320 #[tokio::test]
321 async fn read_delta_8_0_table_without_version() {
322 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
323 .canonicalize()
324 .unwrap();
325 let table_url = url::Url::from_directory_path(table_path).unwrap();
326 let table = crate::open_table(table_url).await.unwrap();
327 let snapshot = table.snapshot().unwrap();
328 assert_eq!(snapshot.version(), 1);
329 assert_eq!(snapshot.protocol().min_writer_version(), 2);
330 assert_eq!(snapshot.protocol().min_reader_version(), 1);
331 assert_eq!(
332 file_paths_from(snapshot, &table.log_store()).await.unwrap(),
333 vec![
334 "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
335 "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
336 ]
337 );
338 assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
339
340 let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
341
342 let num_records = stats.column_by_name("num_records").unwrap();
343 let num_records = num_records
344 .as_any()
345 .downcast_ref::<arrow::array::Int64Array>()
346 .unwrap();
347 let total_records = num_records.values().iter().sum::<i64>();
348 assert_eq!(total_records, 4);
349
350 let null_counts = stats.column_by_name("null_count.value").unwrap();
351 let null_counts = null_counts
352 .as_any()
353 .downcast_ref::<arrow::array::Int64Array>()
354 .unwrap();
355 null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
356
357 let tombstones = table
358 .snapshot()
359 .unwrap()
360 .all_tombstones(&table.log_store())
361 .try_collect::<Vec<_>>()
362 .await
363 .unwrap();
364 assert_eq!(tombstones.len(), 1);
365 let tombstone = tombstones.first().unwrap();
366 assert_eq!(
367 tombstone.path(),
368 "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"
369 );
370 assert_eq!(tombstone.deletion_timestamp(), Some(1615043776198));
371 }
372
373 #[tokio::test]
374 async fn read_delta_8_0_table_with_load_version() {
375 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0")
376 .canonicalize()
377 .unwrap();
378 let table_url = url::Url::from_directory_path(table_path).unwrap();
379 let mut table = crate::open_table(table_url).await.unwrap();
380 let snapshot = table.snapshot().unwrap();
381 assert_eq!(snapshot.version(), 1);
382 assert_eq!(snapshot.protocol().min_writer_version(), 2);
383 assert_eq!(snapshot.protocol().min_reader_version(), 1);
384 assert_eq!(
385 file_paths_from(snapshot, &table.log_store()).await.unwrap(),
386 vec![
387 "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
388 "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
389 ]
390 );
391 table.load_version(0).await.unwrap();
392 let snapshot = table.snapshot().unwrap();
393 assert_eq!(snapshot.version(), 0);
394 assert_eq!(snapshot.protocol().min_writer_version(), 2);
395 assert_eq!(snapshot.protocol().min_reader_version(), 1);
396 assert_eq!(
397 file_paths_from(snapshot, &table.log_store()).await.unwrap(),
398 vec![
399 "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
400 "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet",
401 ]
402 );
403 }
404
405 #[tokio::test]
406 async fn read_delta_8_0_table_with_partitions() {
407 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-partitioned")
408 .canonicalize()
409 .unwrap();
410 let table_url = url::Url::from_directory_path(table_path).unwrap();
411 let table = crate::open_table(table_url).await.unwrap();
412
413 let filters = vec![
414 crate::PartitionFilter {
415 key: "month".to_string(),
416 value: crate::PartitionValue::Equal("2".to_string()),
417 },
418 crate::PartitionFilter {
419 key: "year".to_string(),
420 value: crate::PartitionValue::Equal("2020".to_string()),
421 },
422 ];
423
424 assert_eq!(
425 table.get_files_by_partitions(&filters).await.unwrap(),
426 vec![
427 Path::from(
428 "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
429 ),
430 Path::from(
431 "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"
432 )
433 ]
434 );
435 assert_eq!(
436 table.get_file_uris_by_partitions(&filters).await.unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
437 vec![
438 std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
439 std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
440 ]
441 );
442
443 let filters = vec![crate::PartitionFilter {
444 key: "month".to_string(),
445 value: crate::PartitionValue::NotEqual("2".to_string()),
446 }];
447 assert_eq!(
448 table.get_files_by_partitions(&filters).await.unwrap(),
449 vec![
450 Path::from(
451 "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"
452 ),
453 Path::from(
454 "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"
455 ),
456 Path::from(
457 "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"
458 ),
459 Path::from(
460 "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"
461 )
462 ]
463 );
464
465 let filters = vec![crate::PartitionFilter {
466 key: "month".to_string(),
467 value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
468 }];
469 assert_eq!(
470 table.get_files_by_partitions(&filters).await.unwrap(),
471 vec![
472 Path::from(
473 "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"
474 ),
475 Path::from(
476 "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"
477 ),
478 Path::from(
479 "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"
480 ),
481 Path::from(
482 "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"
483 )
484 ]
485 );
486
487 let filters = vec![crate::PartitionFilter {
488 key: "month".to_string(),
489 value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
490 }];
491 assert_eq!(
492 table.get_files_by_partitions(&filters).await.unwrap(),
493 vec![
494 Path::from(
495 "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"
496 ),
497 Path::from(
498 "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet"
499 )
500 ]
501 );
502 }
503
504 #[tokio::test]
505 async fn read_delta_8_0_table_with_null_partition() {
506 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-null-partition")
507 .canonicalize()
508 .unwrap();
509 let table_url = url::Url::from_directory_path(table_path).unwrap();
510 let table = crate::open_table(table_url).await.unwrap();
511
512 let filters = vec![crate::PartitionFilter {
513 key: "k".to_string(),
514 value: crate::PartitionValue::Equal("A".to_string()),
515 }];
516 assert_eq!(
517 table.get_files_by_partitions(&filters).await.unwrap(),
518 vec![Path::from(
519 "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
520 )]
521 );
522
523 let filters = vec![crate::PartitionFilter {
524 key: "k".to_string(),
525 value: crate::PartitionValue::Equal("".to_string()),
526 }];
527 assert_eq!(
528 table.get_files_by_partitions(&filters).await.unwrap(),
529 vec![Path::from(
530 "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"
531 )]
532 );
533 }
534
535 #[tokio::test]
536 async fn read_delta_8_0_table_with_special_partition() -> DeltaResult<()> {
537 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-special-partition")
538 .canonicalize()
539 .unwrap();
540 let table_url = url::Url::from_directory_path(table_path).unwrap();
541 let _table = crate::open_table(table_url).await.unwrap();
542
543 Ok(())
566 }
567
568 #[tokio::test]
569 async fn read_delta_8_0_table_partition_with_compare_op() {
570 let table_path = std::path::Path::new("../test/tests/data/delta-0.8.0-numeric-partition")
571 .canonicalize()
572 .unwrap();
573 let table_url = url::Url::from_directory_path(table_path).unwrap();
574 let table = crate::open_table(table_url).await.unwrap();
575
576 let filters = vec![crate::PartitionFilter {
577 key: "x".to_string(),
578 value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
579 }];
580 assert_eq!(
581 table.get_files_by_partitions(&filters).await.unwrap().len(),
582 1
583 );
584
585 let filters = vec![crate::PartitionFilter {
586 key: "y".to_string(),
587 value: crate::PartitionValue::LessThan("10.0".to_string()),
588 }];
589 assert_eq!(
590 table.get_files_by_partitions(&filters).await.unwrap().len(),
591 1
592 );
593 }
594
595 #[tokio::test]
596 async fn test_table_history() {
597 let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint")
598 .canonicalize()
599 .unwrap();
600 let table_url = url::Url::from_directory_path(table_path).unwrap();
601 let latest_table = crate::open_table(table_url.clone()).await.unwrap();
602
603 let table = crate::open_table_with_version(table_url, 1).await.unwrap();
604
605 let history1: Vec<_> = table
606 .history(None)
607 .await
608 .expect("Cannot get table history")
609 .collect();
610 let history2: Vec<_> = latest_table
611 .history(None)
612 .await
613 .expect("Cannot get table history")
614 .collect();
615
616 assert_eq!(history1, history2);
617
618 let history3: Vec<_> = latest_table
619 .history(Some(5))
620 .await
621 .expect("Cannot get table history")
622 .collect();
623 assert_eq!(history3.len(), 5);
624 }
625
626 #[tokio::test]
627 async fn test_read_vacuumed_log() {
628 let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
629 .canonicalize()
630 .unwrap();
631 let table_url = url::Url::from_directory_path(table_path).unwrap();
632 let table = crate::open_table(table_url).await.unwrap();
633 assert_eq!(table.version(), Some(12));
634 }
635
636 #[tokio::test]
637 async fn test_read_vacuumed_log_history() {
638 let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")
639 .canonicalize()
640 .unwrap();
641 let table_url = url::Url::from_directory_path(table_path).unwrap();
642 let table = crate::open_table(table_url).await.unwrap();
643
644 let history: Vec<_> = table
646 .history(Some(5))
647 .await
648 .expect("Cannot get table history")
649 .collect();
650
651 assert_eq!(history.len(), 5);
652
653 let history: Vec<_> = table
655 .history(Some(10))
656 .await
657 .expect("Cannot get table history")
658 .collect();
659
660 assert_eq!(history.len(), 8);
661 }
662
663 #[tokio::test]
664 async fn read_empty_folder() {
665 let dir = std::env::temp_dir();
666 let table_url = url::Url::from_directory_path(&dir).unwrap();
667 let result = crate::open_table(table_url).await;
668
669 assert!(matches!(
670 result.unwrap_err(),
671 crate::errors::DeltaTableError::NotATable(_),
672 ));
673
674 let dir = std::env::temp_dir();
675 let table_url = url::Url::from_directory_path(&dir).unwrap();
676 let result = crate::open_table_with_ds(table_url, "2021-08-09T13:18:31+08:00").await;
677
678 assert!(matches!(
679 result.unwrap_err(),
680 crate::errors::DeltaTableError::NotATable(_),
681 ));
682 }
683
684 #[tokio::test]
685 async fn read_delta_table_with_cdc() {
686 let table_path = std::path::Path::new("../test/tests/data/simple_table_with_cdc")
687 .canonicalize()
688 .unwrap();
689 let table_url = url::Url::from_directory_path(table_path).unwrap();
690 let table = crate::open_table(table_url).await.unwrap();
691 assert_eq!(table.version(), Some(2));
692 assert_eq!(
693 file_paths_from(table.snapshot().unwrap(), &table.log_store())
694 .await
695 .unwrap(),
696 vec!["part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"]
697 );
698 }
699
700 #[tokio::test()]
701 async fn test_version_zero_table_load() {
702 let table_path = std::path::Path::new("../test/tests/data/COVID-19_NYT")
703 .canonicalize()
704 .unwrap();
705 let table_url = url::Url::from_directory_path(table_path).unwrap();
706 let latest_table: DeltaTable = crate::open_table(table_url.clone()).await.unwrap();
707
708 let version_0_table = crate::open_table_with_version(table_url, 0).await.unwrap();
709
710 let version_0_history: Vec<_> = version_0_table
711 .history(None)
712 .await
713 .expect("Cannot get table history")
714 .collect();
715 let latest_table_history: Vec<_> = latest_table
716 .history(None)
717 .await
718 .expect("Cannot get table history")
719 .collect();
720
721 assert_eq!(latest_table_history, version_0_history);
722 }
723
724 #[tokio::test()]
725 async fn test_fail_fast_on_not_existing_path() {
726 use std::path::Path as FolderPath;
727
728 let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";
729
730 let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
732 assert!(path_doesnt_exist);
733
734 let table_path = std::path::Path::new(non_existing_path_str);
735 let abs_path = std::fs::canonicalize(".").unwrap().join(table_path);
736 let table_url = url::Url::from_directory_path(abs_path).unwrap();
737 let error = crate::open_table(table_url).await.unwrap_err();
738 let _expected_error_msg = format!(
739 "Local path \"{non_existing_path_str}\" does not exist or you don't have access!"
740 );
741 assert!(matches!(
742 error,
743 DeltaTableError::InvalidTableLocation(_expected_error_msg),
744 ))
745 }
746
747 #[tokio::test]
749 async fn test_identity_column() {
750 let table_path = std::path::Path::new("../test/tests/data/issue-2152")
751 .canonicalize()
752 .unwrap();
753 let table_url = url::Url::from_directory_path(table_path).unwrap();
754 let _ = crate::open_table(table_url)
755 .await
756 .expect("Failed to load the table");
757 }
758}