// deltalake_core/table/mod.rs
1//! Delta Table read and write implementation
2
3use std::cmp::{Ordering, min};
4use std::fmt;
5use std::fmt::Formatter;
6use std::sync::Arc;
7
8use chrono::{DateTime, Utc};
9use futures::future::ready;
10use futures::stream::{BoxStream, once};
11use futures::{StreamExt, TryStreamExt};
12use object_store::{ObjectStore, ObjectStoreExt as _, path::Path};
13use serde::de::{Error, SeqAccess, Visitor};
14use serde::ser::SerializeSeq;
15use serde::{Deserialize, Deserializer, Serialize, Serializer};
16use url::Url;
17
18use self::builder::DeltaTableConfig;
19use self::state::DeltaTableState;
20use crate::kernel::{CommitInfo, DataCheck, LogicalFileView, Version};
21use crate::logstore::{
22    LogStoreConfig, LogStoreExt, LogStoreRef, ObjectStoreRef, commit_uri_from_version,
23    extract_version_from_filename,
24};
25use crate::partitions::PartitionFilter;
26use crate::{DeltaResult, DeltaTableBuilder, DeltaTableError};
27
28pub mod builder;
29pub mod config;
30pub mod state;
31
32mod columns;
33
34// Re-exposing for backwards compatibility
35pub use columns::*;
36
/// In memory representation of a Delta Table
///
/// A DeltaTable is a purely logical concept that represents a dataset that can evolve over time.
/// To attain concrete information about a table a snapshot needs to be loaded.
/// Most commonly this is the latest state of the table, but it may also be loaded for a specific
/// version or point in time.
#[derive(Clone)]
pub struct DeltaTable {
    /// The state of the table as of the most recent loaded Delta log entry.
    pub state: Option<DeltaTableState>,
    /// the load options used during load
    pub config: DeltaTableConfig,
    /// log store
    pub(crate) log_store: LogStoreRef,
}
52
53impl Serialize for DeltaTable {
54    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55    where
56        S: Serializer,
57    {
58        let mut seq = serializer.serialize_seq(None)?;
59        seq.serialize_element(&self.state)?;
60        seq.serialize_element(&self.config)?;
61        seq.serialize_element(self.log_store.config())?;
62        seq.end()
63    }
64}
65
66impl<'de> Deserialize<'de> for DeltaTable {
67    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
68    where
69        D: Deserializer<'de>,
70    {
71        struct DeltaTableVisitor {}
72
73        impl<'de> Visitor<'de> for DeltaTableVisitor {
74            type Value = DeltaTable;
75
76            fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
77                formatter.write_str("struct DeltaTable")
78            }
79
80            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
81            where
82                A: SeqAccess<'de>,
83            {
84                let state = seq
85                    .next_element()?
86                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
87                let config = seq
88                    .next_element()?
89                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
90                let storage_config: LogStoreConfig = seq
91                    .next_element()?
92                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
93                let log_store = crate::logstore::logstore_for(
94                    storage_config.location(),
95                    storage_config.options().clone(),
96                )
97                .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?;
98
99                let table = DeltaTable {
100                    state,
101                    config,
102                    log_store,
103                };
104                Ok(table)
105            }
106        }
107
108        deserializer.deserialize_seq(DeltaTableVisitor {})
109    }
110}
111
impl DeltaTable {
    /// Create a new Delta Table struct without loading any data from backing storage.
    ///
    /// NOTE: This is for advanced users. If you don't know why you need to use this method, please
    /// call one of the `open_table` helper methods instead.
    pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self {
        Self {
            state: None,
            log_store,
            config,
        }
    }

    /// Create a new [`DeltaTable`] instance, backed by an un-initialized in memory table
    ///
    /// Using this will not persist any changes beyond the lifetime of the table object.
    /// The main purpose of in-memory tables is for use in testing.
    ///
    /// ```
    /// use deltalake_core::DeltaTable;
    /// let table = DeltaTable::new_in_memory();
    /// ```
    pub fn new_in_memory() -> Self {
        // "memory:///" always parses and the builder accepts it, so these
        // unwraps cannot fail in practice.
        let url = Url::parse("memory:///").unwrap();
        DeltaTableBuilder::from_url(url).unwrap().build().unwrap()
    }

    /// Create a new [`DeltaTable`] from a [`DeltaTableState`] without loading any
    /// data from backing storage.
    ///
    /// NOTE: This is for advanced users. If you don't know why you need to use this method,
    /// please call one of the `open_table` helper methods instead.
    pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
        // Reuse the config that the state was originally loaded with.
        let config = state.load_config().clone();
        Self {
            state: Some(state),
            log_store,
            config,
        }
    }

    /// get a shared reference to the delta object store
    pub fn object_store(&self) -> ObjectStoreRef {
        self.log_store.object_store(None)
    }

    /// Check if the [`DeltaTable`] exists
    pub async fn verify_deltatable_existence(&self) -> DeltaResult<bool> {
        self.log_store.is_delta_table_location().await
    }

    /// The URI of the underlying data
    pub fn table_url(&self) -> &Url {
        self.log_store.root_url()
    }

    /// get a shared reference to the log store
    pub fn log_store(&self) -> LogStoreRef {
        self.log_store.clone()
    }

    /// returns the latest available version of the table
    pub async fn get_latest_version(&self) -> Result<Version, DeltaTableError> {
        // Start the search from the currently loaded version, or 0 when no
        // state has been loaded yet.
        self.log_store
            .get_latest_version(self.version().unwrap_or(0))
            .await
    }

    /// Currently loaded version of the table - if any.
    ///
    /// This will return the latest version of the table if it has been loaded.
    /// Returns `None` if the table has not been loaded.
    pub fn version(&self) -> Option<Version> {
        self.state.as_ref().map(|s| s.version())
    }

    /// Load DeltaTable with data from latest checkpoint
    pub async fn load(&mut self) -> Result<(), DeltaTableError> {
        self.update_incremental(None).await
    }

    /// Updates the DeltaTable to the most recent state committed to the transaction log by
    /// loading the last checkpoint and incrementally applying each version since.
    pub async fn update_state(&mut self) -> Result<(), DeltaTableError> {
        self.update_incremental(None).await
    }

    /// Updates the DeltaTable by incrementally applying newer versions, optionally bounded by
    /// `max_version`.
    ///
    /// This API is forward-only. Use [`DeltaTable::load_version`] to load an older version.
    pub async fn update_incremental(
        &mut self,
        max_version: Option<Version>,
    ) -> Result<(), DeltaTableError> {
        // No state loaded yet: build a fresh snapshot up to `max_version` (or latest).
        let Some(state) = self.state.as_mut() else {
            self.state = Some(
                DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version).await?,
            );
            return Ok(());
        };

        // Forward-only: refuse to move backwards from the already-loaded version.
        let current_version = state.version();
        if let Some(requested_version) = max_version
            && requested_version < current_version
        {
            return Err(DeltaTableError::VersionDowngrade {
                current_version,
                requested_version,
            });
        }

        state.update(&self.log_store, max_version).await?;
        Ok(())
    }

    /// Loads the DeltaTable state for the given version.
    pub async fn load_version(&mut self, version: Version) -> Result<(), DeltaTableError> {
        // `update_incremental` is forward-only, so drop the loaded state first
        // when the requested version is older than the current one.
        if let Some(snapshot) = &self.state
            && snapshot.version() > version
        {
            self.state = None;
        }
        self.update_incremental(Some(version)).await
    }

    /// Returns the commit timestamp (milliseconds since epoch) for `version`.
    ///
    /// Prefers the timestamp cached in the loaded state; otherwise falls back to
    /// the `last_modified` metadata of the commit file in object storage.
    pub(crate) async fn get_version_timestamp(
        &self,
        version: Version,
    ) -> Result<i64, DeltaTableError> {
        match self
            .state
            .as_ref()
            .and_then(|s| s.version_timestamp(version))
        {
            Some(ts) => Ok(ts),
            None => {
                let meta = self
                    .object_store()
                    .head(&commit_uri_from_version(Some(version)))
                    .await?;
                let ts = meta.last_modified.timestamp_millis();
                Ok(ts)
            }
        }
    }

    /// Returns provenance information, including the operation, user, and so on, for each write to a table.
    /// The table history retention is based on the `logRetentionDuration` property of the Delta Table, 30 days by default.
    /// If `limit` is given, this returns the information of the latest `limit` commits made to this table. Otherwise,
    /// it returns all commits from the earliest commit.
    pub async fn history(
        &self,
        limit: Option<usize>,
    ) -> Result<impl Iterator<Item = CommitInfo> + use<>, DeltaTableError> {
        // Walk from the table state down to the underlying kernel snapshot to
        // stream the commit infos, then collect eagerly.
        let infos = self
            .snapshot()?
            .snapshot()
            .snapshot()
            .commit_infos(&self.log_store(), limit)
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        // `flatten` drops any empty entries yielded by the stream.
        Ok(infos.into_iter().flatten())
    }

    #[cfg(test)]
    /// We have enough internal tests that just need to check the last commit of the table.
    ///
    /// This is a silly convenience function to reduce some copy-paste in tests
    pub(crate) async fn last_commit(&self) -> Result<CommitInfo, DeltaTableError> {
        let mut infos: Vec<_> = self.history(Some(1)).await?.collect();
        infos.pop().ok_or(DeltaTableError::Generic(
            "Somehow there is nothing in the history!".into(),
        ))
    }

    /// Stream all logical files matching the provided `PartitionFilter`s.
    pub fn get_active_add_actions_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
        // Without loaded state we can only yield a single-error stream.
        let Some(state) = self.state.as_ref() else {
            return Box::pin(futures::stream::once(async {
                Err(DeltaTableError::NotInitialized)
            }));
        };

        // No filters: stream every active file unfiltered.
        if filters.is_empty() {
            return state.snapshot().file_views(&self.log_store, None);
        }

        // Translate partition filters into a kernel predicate; surface any
        // conversion error as a single-element error stream.
        let predicate =
            match crate::to_kernel_predicate(filters, state.snapshot().schema().as_ref()) {
                Ok(predicate) => Arc::new(predicate),
                Err(err) => return Box::pin(once(ready(Err(err)))),
            };
        state
            .snapshot()
            .file_views(&self.log_store, Some(predicate))
    }

    /// Returns the file list tracked in current table state filtered by provided
    /// `PartitionFilter`s.
    pub async fn get_files_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> Result<Vec<Path>, DeltaTableError> {
        Ok(self
            .get_active_add_actions_by_partitions(filters)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|add| add.object_store_path())
            .collect())
    }

    /// Return the file uris as strings for the partition(s)
    pub async fn get_file_uris_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> Result<Vec<String>, DeltaTableError> {
        let files = self.get_files_by_partitions(filters).await?;
        Ok(files
            .iter()
            .map(|fname| self.log_store.to_uri(fname))
            .collect())
    }

    /// Returns URIs for all active files present in the current table version.
    pub fn get_file_uris(&self) -> DeltaResult<impl Iterator<Item = String> + '_> {
        Ok(self
            .state
            .as_ref()
            .ok_or(DeltaTableError::NotInitialized)?
            .log_data()
            .into_iter()
            .map(|add| add.object_store_path())
            .map(|path| self.log_store.to_uri(&path)))
    }

    /// Returns the currently loaded state snapshot.
    ///
    /// This method provides access to the currently loaded state of the Delta table.
    ///
    /// ## Returns
    ///
    /// A reference to the current state of the Delta table.
    ///
    /// ## Errors
    ///
    /// Returns [`NotInitialized`](DeltaTableError::NotInitialized) if the table has not been initialized.
    pub fn snapshot(&self) -> DeltaResult<&DeltaTableState> {
        self.state.as_ref().ok_or(DeltaTableError::NotInitialized)
    }

    /// Time travel Delta table to the latest version that's created at or before provided
    /// `datetime` argument.
    ///
    /// Internally, this method performs a binary search on all Delta transaction logs.
    pub async fn load_with_datetime(
        &mut self,
        datetime: DateTime<Utc>,
    ) -> Result<(), DeltaTableError> {
        // Lowest commit version found in the log listing; -1 means "none seen yet".
        let mut min_version: i64 = -1;
        let log_store = self.log_store();
        let prefix = log_store.log_path();
        let offset_path = commit_uri_from_version(None);
        let object_store = log_store.object_store(None);
        let mut files = object_store.list_with_offset(Some(prefix), &offset_path);

        while let Some(obj_meta) = files.next().await {
            let obj_meta = obj_meta?;
            let location_path: Path = obj_meta.location.clone();
            let part_count = location_path.prefix_match(prefix).unwrap().count();
            if part_count > 1 {
                // Per the spec, ignore any files in subdirectories.
                // Spark may create these as uncommitted transactions which we don't want
                //
                // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
                // "Delta files are stored as JSON in a directory at the *root* of the table
                // named _delta_log, and ... make up the log of all changes that have occurred to a table."
                continue;
            }
            if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
                if min_version == -1 {
                    min_version = log_version as i64;
                } else {
                    min_version = min(min_version, log_version as i64);
                }
            }
            // Version 0 is the lowest possible; no need to keep scanning.
            if min_version == 0 {
                break;
            }
        }
        let latest_default_version = if min_version < 0 {
            0
        } else {
            min_version.try_into().unwrap()
        };
        // Resolve the newest available version; an invalid latest version means
        // the location is not a Delta table at all.
        let mut max_version = match self
            .log_store
            .get_latest_version(self.version().unwrap_or(latest_default_version))
            .await
        {
            Ok(version) => version,
            Err(DeltaTableError::InvalidVersion(_)) => {
                return Err(DeltaTableError::NotATable(
                    log_store.table_root_url().to_string(),
                ));
            }
            Err(e) => return Err(e),
        } as i64;
        let mut version = min_version;
        let lowest_table_version = min_version;
        let target_ts = datetime.timestamp_millis();

        // Binary search [min_version, max_version] for the newest commit whose
        // timestamp is at or before `target_ts`.
        while min_version <= max_version {
            let pivot = (max_version + min_version) / 2;
            version = pivot;
            let pts: i64 = self
                .get_version_timestamp(pivot.try_into().unwrap())
                .await?;
            match pts.cmp(&target_ts) {
                Ordering::Equal => {
                    // Exact timestamp match - this is the version to load.
                    break;
                }
                Ordering::Less => {
                    // Pivot commit is older than the target; search above it.
                    min_version = pivot + 1;
                }
                Ordering::Greater => {
                    // Pivot commit is newer than the target; the best candidate
                    // so far is the version just below the pivot.
                    max_version = pivot - 1;
                    version = max_version
                }
            }
        }

        // Never go below the earliest version still present in the log.
        if version < lowest_table_version {
            version = lowest_table_version;
        }
        assert!(
            version >= 0,
            "load_with_datetime() came up with a negative version which shouldn't be possible"
        );

        self.load_version(version.try_into().unwrap()).await
    }
}
461
462impl fmt::Display for DeltaTable {
463    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
464        writeln!(f, "DeltaTable({})", self.table_url())?;
465        writeln!(f, "\tversion: {:?}", self.version())
466    }
467}
468
469impl std::fmt::Debug for DeltaTable {
470    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
471        write!(f, "DeltaTable <{}>", self.table_url())
472    }
473}
474
475/// Normalize a given [Url] to **always** contain a trailing slash. This is critically important
476/// for assumptions about [Url] equivalency and more importantly for **joining** on a Url`.
477///
478/// This function will also remove redundant slashes in the ]Url] path which can cause other
479/// equivalency failures
480///
481/// ```ignore
482///  left.join("_delta_log"); // produces `s3://bucket/prefix/_delta_log`
483///  right.join("_delta_log"); // produces `s3://bucket/_delta_log`
484/// ```
485pub fn normalize_table_url(url: &Url) -> Url {
486    let mut new_segments = vec![];
487    for segment in url.path().split('/') {
488        if !segment.is_empty() {
489            new_segments.push(segment);
490        }
491    }
492    // Add a trailing slash segment
493    new_segments.push("");
494
495    let mut url = url.clone();
496    url.set_path(&new_segments.join("/"));
497    url
498}
499
#[cfg(test)]
mod tests {
    use arrow_ipc::writer::FileWriter;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use tempfile::TempDir;

    use super::*;
    use crate::kernel::{DataType, PrimitiveType, StructField};
    use crate::operations::create::CreateBuilder;

    // Rebuild the legacy serialized payload of an `EagerSnapshot`: the snapshot
    // serialized as a sequence with its trailing field removed, paired with the
    // materialized file data encoded as Arrow IPC bytes.
    // NOTE(review): assumes the legacy wire format is `[snapshot_fields, ipc_bytes]`
    // as accepted by the current deserializer - confirm against kernel serde.
    fn legacy_eager_snapshot_payload(snapshot: &crate::kernel::EagerSnapshot) -> serde_json::Value {
        let mut snapshot_value = serde_json::to_value(snapshot.snapshot()).unwrap();
        let snapshot_fields = snapshot_value
            .as_array_mut()
            .expect("snapshot serde should use a sequence");
        // Drop the trailing field; the legacy format carried file data separately.
        snapshot_fields.pop();

        let materialized_files = snapshot
            .snapshot()
            .materialized_files()
            .expect("expected materialized files for legacy eager snapshot payload");
        // Encode all record batches into a single Arrow IPC file buffer
        // (empty byte vector when there are no batches).
        let bytes = if materialized_files.batches.is_empty() {
            Vec::new()
        } else {
            let mut buffer = vec![];
            let mut writer =
                FileWriter::try_new(&mut buffer, materialized_files.batches[0].schema().as_ref())
                    .unwrap();
            for batch in materialized_files.batches.iter() {
                writer.write(batch).unwrap();
            }
            writer.finish().unwrap();
            drop(writer);
            buffer
        };

        json!([snapshot_value, bytes])
    }

    // Normalization must add a trailing slash, collapse redundant slashes, and
    // leave percent-encoded segments intact.
    #[test]
    fn test_normalize_table_url() {
        for (u, path) in [
            (Url::parse("s3://bucket/prefix/").unwrap(), "/prefix/"),
            (Url::parse("s3://bucket/prefix").unwrap(), "/prefix/"),
            (
                Url::parse("s3://bucket/prefix with space/").unwrap(),
                "/prefix%20with%20space/",
            ),
            (
                Url::parse("s3://bucket/special&chars/你好/😊").unwrap(),
                "/special&chars/%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A/",
            ),
            (
                Url::parse("s3://bucket/prefix/with/redundant/slashes//").unwrap(),
                "/prefix/with/redundant/slashes/",
            ),
        ] {
            assert_eq!(
                normalize_table_url(&u).path(),
                path,
                "Failed to normalize: {}",
                u.as_str()
            );
        }
    }

    // A table must survive a serialize -> deserialize round trip with its
    // loaded version intact.
    #[tokio::test]
    async fn table_round_trip() {
        let (dt, tmp_dir) = create_test_table().await;
        let bytes = serde_json::to_vec(&dt).unwrap();
        let actual: DeltaTable = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(actual.version(), dt.version());
        drop(tmp_dir);
    }

    // The deserializer must still accept the legacy eager-snapshot payload
    // shape and reconstruct the same file listing.
    #[tokio::test]
    async fn table_round_trip_preserves_legacy_eager_snapshot_payload() {
        let (dt, tmp_dir) = create_test_table().await;
        let mut value = serde_json::to_value(&dt).unwrap();
        let table_fields = value.as_array_mut().unwrap();
        let state = table_fields[0].as_object_mut().unwrap();
        // Overwrite the state's snapshot with the legacy-format payload.
        state.insert(
            "snapshot".to_string(),
            legacy_eager_snapshot_payload(dt.state.as_ref().unwrap().snapshot()),
        );

        let actual: DeltaTable = serde_json::from_value(value).unwrap();
        assert_eq!(
            actual.snapshot().unwrap().log_data().num_files(),
            dt.snapshot().unwrap().log_data().num_files()
        );
        drop(tmp_dir);
    }

    // Loading a table with `without_files()` must not panic when the log data
    // is queried for its file count.
    #[tokio::test]
    async fn table_without_files_does_not_panic_on_log_data() {
        let (dt, _tmp_dir) = create_test_table().await;
        let url = dt.table_url().clone();

        let table = DeltaTableBuilder::from_url(url)
            .unwrap()
            .without_files()
            .load()
            .await
            .unwrap();

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            table.snapshot().unwrap().log_data().num_files()
        }));

        assert!(result.is_ok());
    }

    // Create a small two-column table in a temp directory; the TempDir is
    // returned so callers control when the backing files are removed.
    async fn create_test_table() -> (DeltaTable, TempDir) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let table_dir = tmp_dir.path().join("test_create");
        std::fs::create_dir(&table_dir).unwrap();

        let dt = CreateBuilder::new()
            .with_location(table_dir.to_str().unwrap())
            .with_table_name("Test Table Create")
            .with_comment("This table is made to test the create function for a DeltaTable")
            .with_columns(vec![
                StructField::new(
                    "Id".to_string(),
                    DataType::Primitive(PrimitiveType::Integer),
                    true,
                ),
                StructField::new(
                    "Name".to_string(),
                    DataType::Primitive(PrimitiveType::String),
                    true,
                ),
            ])
            .await
            .unwrap();
        (dt, tmp_dir)
    }
}
639}