1use std::cmp::{Ordering, min};
4use std::fmt;
5use std::fmt::Formatter;
6use std::sync::Arc;
7
8use chrono::{DateTime, Utc};
9use futures::future::ready;
10use futures::stream::{BoxStream, once};
11use futures::{StreamExt, TryStreamExt};
12use object_store::{ObjectStore, ObjectStoreExt as _, path::Path};
13use serde::de::{Error, SeqAccess, Visitor};
14use serde::ser::SerializeSeq;
15use serde::{Deserialize, Deserializer, Serialize, Serializer};
16use url::Url;
17
18use self::builder::DeltaTableConfig;
19use self::state::DeltaTableState;
20use crate::kernel::{CommitInfo, DataCheck, LogicalFileView, Version};
21use crate::logstore::{
22 LogStoreConfig, LogStoreExt, LogStoreRef, ObjectStoreRef, commit_uri_from_version,
23 extract_version_from_filename,
24};
25use crate::partitions::PartitionFilter;
26use crate::{DeltaResult, DeltaTableBuilder, DeltaTableError};
27
28pub mod builder;
29pub mod config;
30pub mod state;
31
32mod columns;
33
34pub use columns::*;
36
/// A handle to a Delta table.
///
/// Holds the loaded table state (if any), the table configuration, and the log
/// store through which the transaction log and the table's object store are accessed.
#[derive(Clone)]
pub struct DeltaTable {
    /// The loaded table state; `None` until the table has been loaded.
    pub state: Option<DeltaTableState>,
    /// Configuration handed to `DeltaTableState::try_new` when state is loaded.
    pub config: DeltaTableConfig,
    /// Log store used to read the transaction log and reach the object store.
    pub(crate) log_store: LogStoreRef,
}
52
53impl Serialize for DeltaTable {
54 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: Serializer,
57 {
58 let mut seq = serializer.serialize_seq(None)?;
59 seq.serialize_element(&self.state)?;
60 seq.serialize_element(&self.config)?;
61 seq.serialize_element(self.log_store.config())?;
62 seq.end()
63 }
64}
65
66impl<'de> Deserialize<'de> for DeltaTable {
67 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
68 where
69 D: Deserializer<'de>,
70 {
71 struct DeltaTableVisitor {}
72
73 impl<'de> Visitor<'de> for DeltaTableVisitor {
74 type Value = DeltaTable;
75
76 fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
77 formatter.write_str("struct DeltaTable")
78 }
79
80 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
81 where
82 A: SeqAccess<'de>,
83 {
84 let state = seq
85 .next_element()?
86 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
87 let config = seq
88 .next_element()?
89 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
90 let storage_config: LogStoreConfig = seq
91 .next_element()?
92 .ok_or_else(|| A::Error::invalid_length(0, &self))?;
93 let log_store = crate::logstore::logstore_for(
94 storage_config.location(),
95 storage_config.options().clone(),
96 )
97 .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?;
98
99 let table = DeltaTable {
100 state,
101 config,
102 log_store,
103 };
104 Ok(table)
105 }
106 }
107
108 deserializer.deserialize_seq(DeltaTableVisitor {})
109 }
110}
111
112impl DeltaTable {
113 pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self {
118 Self {
119 state: None,
120 log_store,
121 config,
122 }
123 }
124
125 pub fn new_in_memory() -> Self {
135 let url = Url::parse("memory:///").unwrap();
136 DeltaTableBuilder::from_url(url).unwrap().build().unwrap()
137 }
138
139 pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
145 let config = state.load_config().clone();
146 Self {
147 state: Some(state),
148 log_store,
149 config,
150 }
151 }
152
153 pub fn object_store(&self) -> ObjectStoreRef {
155 self.log_store.object_store(None)
156 }
157
158 pub async fn verify_deltatable_existence(&self) -> DeltaResult<bool> {
160 self.log_store.is_delta_table_location().await
161 }
162
163 pub fn table_url(&self) -> &Url {
165 self.log_store.root_url()
166 }
167
168 pub fn log_store(&self) -> LogStoreRef {
170 self.log_store.clone()
171 }
172
173 pub async fn get_latest_version(&self) -> Result<Version, DeltaTableError> {
175 self.log_store
176 .get_latest_version(self.version().unwrap_or(0))
177 .await
178 }
179
180 pub fn version(&self) -> Option<Version> {
185 self.state.as_ref().map(|s| s.version())
186 }
187
188 pub async fn load(&mut self) -> Result<(), DeltaTableError> {
190 self.update_incremental(None).await
191 }
192
193 pub async fn update_state(&mut self) -> Result<(), DeltaTableError> {
196 self.update_incremental(None).await
197 }
198
199 pub async fn update_incremental(
204 &mut self,
205 max_version: Option<Version>,
206 ) -> Result<(), DeltaTableError> {
207 let Some(state) = self.state.as_mut() else {
208 self.state = Some(
209 DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version).await?,
210 );
211 return Ok(());
212 };
213
214 let current_version = state.version();
215 if let Some(requested_version) = max_version
216 && requested_version < current_version
217 {
218 return Err(DeltaTableError::VersionDowngrade {
219 current_version,
220 requested_version,
221 });
222 }
223
224 state.update(&self.log_store, max_version).await?;
225 Ok(())
226 }
227
228 pub async fn load_version(&mut self, version: Version) -> Result<(), DeltaTableError> {
230 if let Some(snapshot) = &self.state
231 && snapshot.version() > version
232 {
233 self.state = None;
234 }
235 self.update_incremental(Some(version)).await
236 }
237
238 pub(crate) async fn get_version_timestamp(
239 &self,
240 version: Version,
241 ) -> Result<i64, DeltaTableError> {
242 match self
243 .state
244 .as_ref()
245 .and_then(|s| s.version_timestamp(version))
246 {
247 Some(ts) => Ok(ts),
248 None => {
249 let meta = self
250 .object_store()
251 .head(&commit_uri_from_version(Some(version)))
252 .await?;
253 let ts = meta.last_modified.timestamp_millis();
254 Ok(ts)
255 }
256 }
257 }
258
259 pub async fn history(
264 &self,
265 limit: Option<usize>,
266 ) -> Result<impl Iterator<Item = CommitInfo> + use<>, DeltaTableError> {
267 let infos = self
268 .snapshot()?
269 .snapshot()
270 .snapshot()
271 .commit_infos(&self.log_store(), limit)
272 .await?
273 .try_collect::<Vec<_>>()
274 .await?;
275 Ok(infos.into_iter().flatten())
276 }
277
278 #[cfg(test)]
279 pub(crate) async fn last_commit(&self) -> Result<CommitInfo, DeltaTableError> {
283 let mut infos: Vec<_> = self.history(Some(1)).await?.collect();
284 infos.pop().ok_or(DeltaTableError::Generic(
285 "Somehow there is nothing in the history!".into(),
286 ))
287 }
288
289 pub fn get_active_add_actions_by_partitions(
291 &self,
292 filters: &[PartitionFilter],
293 ) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
294 let Some(state) = self.state.as_ref() else {
295 return Box::pin(futures::stream::once(async {
296 Err(DeltaTableError::NotInitialized)
297 }));
298 };
299
300 if filters.is_empty() {
301 return state.snapshot().file_views(&self.log_store, None);
302 }
303
304 let predicate =
305 match crate::to_kernel_predicate(filters, state.snapshot().schema().as_ref()) {
306 Ok(predicate) => Arc::new(predicate),
307 Err(err) => return Box::pin(once(ready(Err(err)))),
308 };
309 state
310 .snapshot()
311 .file_views(&self.log_store, Some(predicate))
312 }
313
314 pub async fn get_files_by_partitions(
317 &self,
318 filters: &[PartitionFilter],
319 ) -> Result<Vec<Path>, DeltaTableError> {
320 Ok(self
321 .get_active_add_actions_by_partitions(filters)
322 .try_collect::<Vec<_>>()
323 .await?
324 .into_iter()
325 .map(|add| add.object_store_path())
326 .collect())
327 }
328
329 pub async fn get_file_uris_by_partitions(
331 &self,
332 filters: &[PartitionFilter],
333 ) -> Result<Vec<String>, DeltaTableError> {
334 let files = self.get_files_by_partitions(filters).await?;
335 Ok(files
336 .iter()
337 .map(|fname| self.log_store.to_uri(fname))
338 .collect())
339 }
340
341 pub fn get_file_uris(&self) -> DeltaResult<impl Iterator<Item = String> + '_> {
343 Ok(self
344 .state
345 .as_ref()
346 .ok_or(DeltaTableError::NotInitialized)?
347 .log_data()
348 .into_iter()
349 .map(|add| add.object_store_path())
350 .map(|path| self.log_store.to_uri(&path)))
351 }
352
353 pub fn snapshot(&self) -> DeltaResult<&DeltaTableState> {
365 self.state.as_ref().ok_or(DeltaTableError::NotInitialized)
366 }
367
368 pub async fn load_with_datetime(
373 &mut self,
374 datetime: DateTime<Utc>,
375 ) -> Result<(), DeltaTableError> {
376 let mut min_version: i64 = -1;
377 let log_store = self.log_store();
378 let prefix = log_store.log_path();
379 let offset_path = commit_uri_from_version(None);
380 let object_store = log_store.object_store(None);
381 let mut files = object_store.list_with_offset(Some(prefix), &offset_path);
382
383 while let Some(obj_meta) = files.next().await {
384 let obj_meta = obj_meta?;
385 let location_path: Path = obj_meta.location.clone();
386 let part_count = location_path.prefix_match(prefix).unwrap().count();
387 if part_count > 1 {
388 continue;
395 }
396 if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
397 if min_version == -1 {
398 min_version = log_version as i64;
399 } else {
400 min_version = min(min_version, log_version as i64);
401 }
402 }
403 if min_version == 0 {
404 break;
405 }
406 }
407 let latest_default_version = if min_version < 0 {
408 0
409 } else {
410 min_version.try_into().unwrap()
411 };
412 let mut max_version = match self
413 .log_store
414 .get_latest_version(self.version().unwrap_or(latest_default_version))
415 .await
416 {
417 Ok(version) => version,
418 Err(DeltaTableError::InvalidVersion(_)) => {
419 return Err(DeltaTableError::NotATable(
420 log_store.table_root_url().to_string(),
421 ));
422 }
423 Err(e) => return Err(e),
424 } as i64;
425 let mut version = min_version;
426 let lowest_table_version = min_version;
427 let target_ts = datetime.timestamp_millis();
428
429 while min_version <= max_version {
431 let pivot = (max_version + min_version) / 2;
432 version = pivot;
433 let pts: i64 = self
434 .get_version_timestamp(pivot.try_into().unwrap())
435 .await?;
436 match pts.cmp(&target_ts) {
437 Ordering::Equal => {
438 break;
439 }
440 Ordering::Less => {
441 min_version = pivot + 1;
442 }
443 Ordering::Greater => {
444 max_version = pivot - 1;
445 version = max_version
446 }
447 }
448 }
449
450 if version < lowest_table_version {
451 version = lowest_table_version;
452 }
453 assert!(
454 version >= 0,
455 "load_with_datetime() came up with a negative version which shouldn't be possible"
456 );
457
458 self.load_version(version.try_into().unwrap()).await
459 }
460}
461
462impl fmt::Display for DeltaTable {
463 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
464 writeln!(f, "DeltaTable({})", self.table_url())?;
465 writeln!(f, "\tversion: {:?}", self.version())
466 }
467}
468
469impl std::fmt::Debug for DeltaTable {
470 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
471 write!(f, "DeltaTable <{}>", self.table_url())
472 }
473}
474
475pub fn normalize_table_url(url: &Url) -> Url {
486 let mut new_segments = vec![];
487 for segment in url.path().split('/') {
488 if !segment.is_empty() {
489 new_segments.push(segment);
490 }
491 }
492 new_segments.push("");
494
495 let mut url = url.clone();
496 url.set_path(&new_segments.join("/"));
497 url
498}
499
/// Unit tests for `DeltaTable` serde round-trips, URL normalization, and loading
/// a table without file data.
#[cfg(test)]
mod tests {
    use arrow_ipc::writer::FileWriter;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use tempfile::TempDir;

    use super::*;
    use crate::kernel::{DataType, PrimitiveType, StructField};
    use crate::operations::create::CreateBuilder;

    /// Builds a legacy-style serialized payload for an `EagerSnapshot`.
    ///
    /// The payload is a JSON array of two parts:
    /// - the inner snapshot's serde sequence with its last element removed;
    /// - the materialized file batches encoded as an Arrow IPC file (empty bytes
    ///   when there are no batches).
    fn legacy_eager_snapshot_payload(snapshot: &crate::kernel::EagerSnapshot) -> serde_json::Value {
        let mut snapshot_value = serde_json::to_value(snapshot.snapshot()).unwrap();
        let snapshot_fields = snapshot_value
            .as_array_mut()
            .expect("snapshot serde should use a sequence");
        // NOTE(review): assumes the trailing element is the field absent from the
        // legacy layout — confirm against the snapshot's Serialize impl.
        snapshot_fields.pop();

        let materialized_files = snapshot
            .snapshot()
            .materialized_files()
            .expect("expected materialized files for legacy eager snapshot payload");
        let bytes = if materialized_files.batches.is_empty() {
            Vec::new()
        } else {
            let mut buffer = vec![];
            let mut writer =
                FileWriter::try_new(&mut buffer, materialized_files.batches[0].schema().as_ref())
                    .unwrap();
            for batch in materialized_files.batches.iter() {
                writer.write(batch).unwrap();
            }
            writer.finish().unwrap();
            // Release the writer's mutable borrow of `buffer` before returning it.
            drop(writer);
            buffer
        };

        json!([snapshot_value, bytes])
    }

    /// Checks that redundant slashes are removed, a trailing slash is added, and
    /// percent-encoding is preserved.
    #[test]
    fn test_normalize_table_url() {
        for (u, path) in [
            (Url::parse("s3://bucket/prefix/").unwrap(), "/prefix/"),
            (Url::parse("s3://bucket/prefix").unwrap(), "/prefix/"),
            (
                Url::parse("s3://bucket/prefix with space/").unwrap(),
                "/prefix%20with%20space/",
            ),
            (
                Url::parse("s3://bucket/special&chars/你好/😊").unwrap(),
                "/special&chars/%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A/",
            ),
            (
                Url::parse("s3://bucket/prefix/with/redundant/slashes//").unwrap(),
                "/prefix/with/redundant/slashes/",
            ),
        ] {
            assert_eq!(
                normalize_table_url(&u).path(),
                path,
                "Failed to normalize: {}",
                u.as_str()
            );
        }
    }

    /// Serializing and deserializing a table preserves its loaded version.
    #[tokio::test]
    async fn table_round_trip() {
        let (dt, tmp_dir) = create_test_table().await;
        let bytes = serde_json::to_vec(&dt).unwrap();
        let actual: DeltaTable = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(actual.version(), dt.version());
        drop(tmp_dir);
    }

    /// Deserialization still accepts a state whose `snapshot` entry uses the
    /// legacy payload from `legacy_eager_snapshot_payload`, with the same file count.
    #[tokio::test]
    async fn table_round_trip_preserves_legacy_eager_snapshot_payload() {
        let (dt, tmp_dir) = create_test_table().await;
        let mut value = serde_json::to_value(&dt).unwrap();
        let table_fields = value.as_array_mut().unwrap();
        // Element 0 of the serialized table is the state.
        let state = table_fields[0].as_object_mut().unwrap();
        state.insert(
            "snapshot".to_string(),
            legacy_eager_snapshot_payload(dt.state.as_ref().unwrap().snapshot()),
        );

        let actual: DeltaTable = serde_json::from_value(value).unwrap();
        assert_eq!(
            actual.snapshot().unwrap().log_data().num_files(),
            dt.snapshot().unwrap().log_data().num_files()
        );
        drop(tmp_dir);
    }

    /// Loading with `without_files()` must not make `log_data().num_files()` panic.
    #[tokio::test]
    async fn table_without_files_does_not_panic_on_log_data() {
        let (dt, _tmp_dir) = create_test_table().await;
        let url = dt.table_url().clone();

        let table = DeltaTableBuilder::from_url(url)
            .unwrap()
            .without_files()
            .load()
            .await
            .unwrap();

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            table.snapshot().unwrap().log_data().num_files()
        }));

        assert!(result.is_ok());
    }

    /// Creates a two-column table (`Id: integer`, `Name: string`) in a fresh temp
    /// directory.
    ///
    /// Returns the `TempDir` as well, so the directory lives as long as the caller
    /// holds it.
    async fn create_test_table() -> (DeltaTable, TempDir) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let table_dir = tmp_dir.path().join("test_create");
        std::fs::create_dir(&table_dir).unwrap();

        let dt = CreateBuilder::new()
            .with_location(table_dir.to_str().unwrap())
            .with_table_name("Test Table Create")
            .with_comment("This table is made to test the create function for a DeltaTable")
            .with_columns(vec![
                StructField::new(
                    "Id".to_string(),
                    DataType::Primitive(PrimitiveType::Integer),
                    true,
                ),
                StructField::new(
                    "Name".to_string(),
                    DataType::Primitive(PrimitiveType::String),
                    true,
                ),
            ])
            .await
            .unwrap();
        (dt, tmp_dir)
    }
}