1#![allow(rustdoc::invalid_html_tags)]
72#![allow(clippy::nonminimal_bool)]
73pub mod data_catalog;
74pub mod errors;
75pub mod kernel;
76pub mod logstore;
77pub mod operations;
78pub mod protocol;
79pub use kernel::schema;
80pub mod table;
81
82#[cfg(any(test, feature = "integration_test"))]
83pub mod test_utils;
84
85#[cfg(feature = "datafusion")]
86pub mod delta_datafusion;
87pub mod writer;
88
89use std::collections::HashMap;
90use std::sync::OnceLock;
91
92pub use self::data_catalog::{DataCatalog, DataCatalogError};
93pub use self::errors::*;
94pub use self::schema::partitions::*;
95pub use self::schema::*;
96pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
97pub use self::table::config::TableProperty;
98pub use self::table::DeltaTable;
99pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
100pub use operations::DeltaOps;
101
102pub use protocol::checkpoints;
103
104pub use arrow;
106#[cfg(feature = "datafusion")]
107pub use datafusion;
108pub use parquet;
109
// Build-time guard: this crate requires at least one TLS backend feature.
// When neither `rustls` nor `native-tls` is enabled, abort compilation with an
// explicit message rather than letting the build continue without one.
#[cfg(not(any(feature = "rustls", feature = "native-tls")))]
compile_error!("You must enable at least one of the features: `rustls` or `native-tls`.");
112
113pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
118 let table = DeltaTableBuilder::from_valid_uri(table_uri)?.load().await?;
119 Ok(table)
120}
121
122pub async fn open_table_with_storage_options(
127 table_uri: impl AsRef<str>,
128 storage_options: HashMap<String, String>,
129) -> Result<DeltaTable, DeltaTableError> {
130 let table = DeltaTableBuilder::from_valid_uri(table_uri)?
131 .with_storage_options(storage_options)
132 .load()
133 .await?;
134 Ok(table)
135}
136
137pub async fn open_table_with_version(
142 table_uri: impl AsRef<str>,
143 version: i64,
144) -> Result<DeltaTable, DeltaTableError> {
145 let table = DeltaTableBuilder::from_valid_uri(table_uri)?
146 .with_version(version)
147 .load()
148 .await?;
149 Ok(table)
150}
151
152pub async fn open_table_with_ds(
159 table_uri: impl AsRef<str>,
160 ds: impl AsRef<str>,
161) -> Result<DeltaTable, DeltaTableError> {
162 let table = DeltaTableBuilder::from_valid_uri(table_uri)?
163 .with_datestring(ds)?
164 .load()
165 .await?;
166 Ok(table)
167}
168
169static CLIENT_VERSION: OnceLock<String> = OnceLock::new();
170
171pub fn init_client_version(version: &str) {
172 let _ = CLIENT_VERSION.set(version.to_string());
173}
174
175pub fn crate_version() -> &'static str {
177 CLIENT_VERSION
178 .get()
179 .map(|s| s.as_str())
180 .unwrap_or(env!("CARGO_PKG_VERSION"))
181}
182
/// Tests that open the fixture tables under `../test/tests/data` through the
/// `open_table*` entry points and check versions, protocol, file lists,
/// tombstones, partition filtering, commit polling and history.
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::table::PeekCommit;
    use std::collections::HashMap;

    // Newest version of the 0.2.0 fixture: three live files, four tombstones.
    #[tokio::test]
    async fn read_delta_2_0_table_without_version() {
        let table = crate::open_table("../test/tests/data/delta-0.2.0")
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );
        let tombstones = snapshot
            .all_tombstones(&table.log_store())
            .await
            .unwrap()
            .collect_vec();
        assert_eq!(tombstones.len(), 4);
    }

    // Opening at version 0 and calling `update` must converge on the same file
    // set as opening the newest version directly.
    #[tokio::test]
    async fn read_delta_table_with_update() {
        let path = "../test/tests/data/simple_table_with_checkpoint/";
        let table_newest_version = crate::open_table(path).await.unwrap();
        let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
        table_to_update.update().await.unwrap();
        table_to_update.update().await.unwrap();
        table_to_update.update().await.unwrap();

        assert_eq!(
            table_newest_version
                .snapshot()
                .unwrap()
                .file_paths_iter()
                .collect_vec(),
            table_to_update
                .snapshot()
                .unwrap()
                .file_paths_iter()
                .collect_vec()
        );
    }

    // Explicit versions 0, 2 and 3 of the 0.2.0 fixture each expose their own
    // file set.
    #[tokio::test]
    async fn read_delta_2_0_table_with_version() {
        let mut table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 0)
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 0);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
                Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
            ],
        );

        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 2)
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 2);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );

        table = crate::open_table_with_version("../test/tests/data/delta-0.2.0", 3)
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
                Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
                Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
            ]
        );
    }

    // Newest version of the 0.8.0 fixture: files, per-file stats and the one
    // tombstone left by the second commit.
    #[tokio::test]
    async fn read_delta_8_0_table_without_version() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0")
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 1);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
            ]
        );
        assert_eq!(snapshot.log_data().num_files(), 2);

        let stats = snapshot.add_actions_table(true).unwrap();

        let num_records = stats.column_by_name("num_records").unwrap();
        let num_records = num_records
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        let total_records = num_records.values().iter().sum::<i64>();
        assert_eq!(total_records, 4);

        let null_counts = stats.column_by_name("null_count.value").unwrap();
        let null_counts = null_counts
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));

        let tombstones = snapshot
            .all_tombstones(&table.log_store())
            .await
            .unwrap()
            .collect_vec();
        assert_eq!(tombstones.len(), 1);
        assert!(tombstones.contains(&crate::kernel::Remove {
            path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
            deletion_timestamp: Some(1615043776198),
            data_change: true,
            extended_file_metadata: Some(true),
            partition_values: Some(HashMap::new()),
            size: Some(445),
            base_row_id: None,
            default_row_commit_version: None,
            deletion_vector: None,
            tags: Some(HashMap::new()),
        }));
    }

    // `load_version` on an already-open table moves it back to an older state.
    #[tokio::test]
    async fn read_delta_8_0_table_with_load_version() {
        let mut table = crate::open_table("../test/tests/data/delta-0.8.0")
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 1);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
            ]
        );
        table.load_version(0).await.unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 0);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(
            snapshot.file_paths_iter().collect_vec(),
            vec![
                Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
                Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
            ]
        );
    }

    // Equal / NotEqual / In / NotIn partition filters, returned both as table
    // paths and as file URIs.
    #[tokio::test]
    async fn read_delta_8_0_table_with_partitions() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned")
            .await
            .unwrap();

        let filters = vec![
            crate::PartitionFilter {
                key: "month".to_string(),
                value: crate::PartitionValue::Equal("2".to_string()),
            },
            crate::PartitionFilter {
                key: "year".to_string(),
                value: crate::PartitionValue::Equal("2020".to_string()),
            },
        ];

        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![
                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
            ]
        );
        assert_eq!(
            table.get_file_uris_by_partitions(&filters).await.unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
            vec![
                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
                std::fs::canonicalize("../test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
            ]
        );

        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::NotEqual("2".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![
                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"),
                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
            ]
        );

        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![
                Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
                Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"),
                Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet")
            ]
        );

        let filters = vec![crate::PartitionFilter {
            key: "month".to_string(),
            value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![
                Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"),
                Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet")
            ]
        );
    }

    // An empty-string filter value selects the `__HIVE_DEFAULT_PARTITION__`
    // directory.
    #[tokio::test]
    async fn read_delta_8_0_table_with_null_partition() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-null-partition")
            .await
            .unwrap();

        let filters = vec![crate::PartitionFilter {
            key: "k".to_string(),
            value: crate::PartitionValue::Equal("A".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![Path::from(
                "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"
            )]
        );

        let filters = vec![crate::PartitionFilter {
            key: "k".to_string(),
            value: crate::PartitionValue::Equal("".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![
                Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet")
            ]
        );
    }

    // Partition values containing `/` and spaces are percent-encoded in paths
    // but filtered by their decoded value.
    #[tokio::test]
    async fn read_delta_8_0_table_with_special_partition() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-special-partition")
            .await
            .unwrap();

        assert_eq!(
            table.snapshot().unwrap().file_paths_iter().collect_vec(),
            vec![
                Path::parse(
                    "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
                )
                .unwrap(),
                Path::parse(
                    "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
                )
                .unwrap()
            ]
        );

        let filters = vec![crate::PartitionFilter {
            key: "x".to_string(),
            value: crate::PartitionValue::Equal("A/A".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![Path::parse(
                "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
            )
            .unwrap()]
        );
    }

    // Ordering filters (`<=`, `<`) on numeric partition columns.
    #[tokio::test]
    async fn read_delta_8_0_table_partition_with_compare_op() {
        let table = crate::open_table("../test/tests/data/delta-0.8.0-numeric-partition")
            .await
            .unwrap();

        let filters = vec![crate::PartitionFilter {
            key: "x".to_string(),
            value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![Path::from(
                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
            )]
        );

        let filters = vec![crate::PartitionFilter {
            key: "y".to_string(),
            value: crate::PartitionValue::LessThan("10.0".to_string()),
        }];
        assert_eq!(
            table.get_files_by_partitions(&filters).await.unwrap(),
            vec![Path::from(
                "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
            )]
        );
    }

    // History is the same whichever version the table was opened at, and
    // `Some(limit)` caps the number of entries.
    #[tokio::test]
    async fn test_table_history() {
        let path = "../test/tests/data/simple_table_with_checkpoint";
        let latest_table = crate::open_table(path).await.unwrap();

        let table = crate::open_table_with_version(path, 1).await.unwrap();

        let history1 = table.history(None).await.expect("Cannot get table history");
        let history2 = latest_table
            .history(None)
            .await
            .expect("Cannot get table history");

        assert_eq!(history1, history2);

        let history3 = latest_table
            .history(Some(5))
            .await
            .expect("Cannot get table history");
        assert_eq!(history3.len(), 5);
    }

    // Peeking at the next commit does not advance the table;
    // `update_incremental` then applies it, after which the log is up to date.
    #[tokio::test]
    async fn test_poll_table_commits() {
        let path = "../test/tests/data/simple_table_with_checkpoint";
        let new_file =
            Path::from("part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet");
        let mut table = crate::open_table_with_version(path, 9).await.unwrap();
        assert_eq!(table.version(), Some(9));
        let peek = table
            .log_store()
            .peek_next_commit(table.version().unwrap())
            .await
            .unwrap();
        let PeekCommit::New(version, actions) = peek else {
            panic!("expected a new commit after version 9");
        };

        // The peek alone must leave the loaded table at version 9.
        assert_eq!(table.version(), Some(9));
        assert!(!table
            .snapshot()
            .unwrap()
            .file_paths_iter()
            .any(|f| f == new_file));

        assert_eq!(version, 10);
        assert_eq!(actions.len(), 2);

        table.update_incremental(None).await.unwrap();

        assert_eq!(table.version(), Some(10));
        assert!(table
            .snapshot()
            .unwrap()
            .file_paths_iter()
            .any(|f| f == new_file));

        let peek = table
            .log_store()
            .peek_next_commit(table.version().unwrap())
            .await
            .unwrap();
        assert!(matches!(peek, PeekCommit::UpToDate));
    }

    // A table whose older log entries were removed still opens at its newest
    // version.
    #[tokio::test]
    async fn test_read_vacuumed_log() {
        let path = "../test/tests/data/checkpoints_vacuumed";
        let table = crate::open_table(path).await.unwrap();
        assert_eq!(table.version(), Some(12));
    }

    // History of the vacuumed fixture: a limit above the number of available
    // entries returns only what is available (8).
    #[tokio::test]
    async fn test_read_vacuumed_log_history() {
        let path = "../test/tests/data/checkpoints_vacuumed";
        let table = crate::open_table(path).await.unwrap();

        let history = table
            .history(Some(5))
            .await
            .expect("Cannot get table history");

        assert_eq!(history.len(), 5);

        let history = table
            .history(Some(10))
            .await
            .expect("Cannot get table history");

        assert_eq!(history.len(), 8);
    }

    // A directory without a Delta log is reported as `NotATable`, with or
    // without a date string.
    #[tokio::test]
    async fn read_empty_folder() {
        let dir = std::env::temp_dir();
        let dir = dir
            .to_str()
            .expect("temporary directory path should be valid UTF-8");

        let result = crate::open_table(dir).await;
        assert!(matches!(
            result.unwrap_err(),
            crate::errors::DeltaTableError::NotATable(_),
        ));

        let result = crate::open_table_with_ds(dir, "2021-08-09T13:18:31+08:00").await;
        assert!(matches!(
            result.unwrap_err(),
            crate::errors::DeltaTableError::NotATable(_),
        ));
    }

    // A table with change-data-feed files lists only its data files.
    #[tokio::test]
    async fn read_delta_table_with_cdc() {
        let table = crate::open_table("../test/tests/data/simple_table_with_cdc")
            .await
            .unwrap();
        assert_eq!(table.version(), Some(2));
        assert_eq!(
            table.snapshot().unwrap().file_paths_iter().collect_vec(),
            vec![Path::from(
                "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
            ),]
        );
    }

    // Opening at version 0 yields the same history as opening the newest
    // version.
    #[tokio::test]
    async fn test_version_zero_table_load() {
        let path = "../test/tests/data/COVID-19_NYT";
        let latest_table: DeltaTable = crate::open_table(path).await.unwrap();

        let version_0_table = crate::open_table_with_version(path, 0).await.unwrap();

        let version_0_history = version_0_table
            .history(None)
            .await
            .expect("Cannot get table history");
        let latest_table_history = latest_table
            .history(None)
            .await
            .expect("Cannot get table history");

        assert_eq!(latest_table_history, version_0_history);
    }

    // A local path that does not exist fails immediately with
    // `InvalidTableLocation` carrying the expected message.
    #[tokio::test]
    async fn test_fail_fast_on_not_existing_path() {
        use std::path::Path as FolderPath;

        let non_existing_path_str = "../test/tests/data/folder_doesnt_exist";

        let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
        assert!(path_doesnt_exist);

        let error = crate::open_table(non_existing_path_str).await.unwrap_err();
        let expected_error_msg = format!(
            "Local path \"{non_existing_path_str}\" does not exist or you don't have access!"
        );
        // A bare identifier inside the pattern would bind a new variable and match
        // any message; the guard compares the message for real.
        assert!(
            matches!(
                &error,
                DeltaTableError::InvalidTableLocation(msg) if *msg == expected_error_msg
            ),
            "unexpected error: {error:?}"
        );
    }

    // Regression fixture `issue-2152` (a table with an identity column): it only
    // needs to load successfully.
    #[tokio::test]
    async fn test_identity_column() {
        let path = "../test/tests/data/issue-2152";
        let _ = crate::open_table(path)
            .await
            .expect("Failed to load the table");
    }
}