deltalake_core/logstore/
mod.rs

1//! # DeltaLake storage system
2//!
3//! Interacting with storage systems is a crucial part of any table format.
//! On one hand the storage abstractions need to provide certain guarantees
//! (e.g. atomic rename, ...) and meet certain assumptions (e.g. sorted list results);
//! on the other hand we can exploit our knowledge about the general file layout
//! and access patterns to optimize our operations in terms of cost and performance.
8//!
9//! Two distinct phases are involved in querying a Delta table:
10//! - **Metadata**: Fetching metadata about the table, such as schema, partitioning, and statistics.
11//! - **Data**: Reading and processing data files based on the metadata.
12//!
13//! When writing to a table, we see the same phases, just in inverse order:
14//! - **Data**: Writing data files that should become part of the table.
15//! - **Metadata**: Updating table metadata to incorporate updates.
16//!
17//! Two main abstractions govern the file operations [`LogStore`] and [`ObjectStore`].
18//!
19//! [`LogStore`]s are scoped to individual tables and are responsible for maintaining proper
20//! behaviours and ensuring consistency during the metadata phase. The correctness is predicated
21//! on the atomicity and durability guarantees of the implementation of this interface.
22//!
23//! - Atomic visibility: Partial writes must not be visible to readers.
24//! - Mutual exclusion: Only one writer must be able to write to a specific log file.
//! - Consistent listing: Once a file has been written, any future list files operation must
//!   return the underlying file system entry immediately.
27//!
28//! <div class="warning">
29//!
30//! While most object stores today provide the required guarantees, the specific
31//! locking mechanics are a table level responsibility. Specific implementations may
32//! decide to refer to a central catalog or other mechanisms for coordination.
33//!
34//! </div>
35//!
36//! [`ObjectStore`]s are responsible for direct interactions with storage systems. Either
37//! during the data phase, where additional requirements are imposed on the storage system,
38//! or by specific LogStore implementations for their internal object store interactions.
39//!
40//! ## Managing LogStores and ObjectStores.
41//!
42//! Aside from very basic implementations (i.e. in-memory and local file system) we rely
43//! on external integrations to provide [`ObjectStore`] and/or [`LogStore`] implementations.
44//!
//! At runtime, deltalake needs to produce appropriate [`ObjectStore`]s to access the files
//! discovered in a table. This is done via the object store factories registered for the
//! table location's scheme (see [`object_store_factories`]).
//!
48//! ## Configuration
49//!
50use std::collections::HashMap;
51use std::sync::{Arc, LazyLock};
52
53use bytes::Bytes;
54#[cfg(feature = "datafusion")]
55use datafusion::datasource::object_store::ObjectStoreUrl;
56use delta_kernel::engine::default::executor::tokio::{
57    TokioBackgroundExecutor, TokioMultiThreadExecutor,
58};
59use delta_kernel::engine::default::DefaultEngine;
60use delta_kernel::log_segment::LogSegment;
61use delta_kernel::path::{LogPathFileType, ParsedLogPath};
62use delta_kernel::{AsAny, Engine};
63use futures::StreamExt;
64use object_store::ObjectStoreScheme;
65use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
66use regex::Regex;
67use serde::de::{Error, SeqAccess, Visitor};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use serde_json::Deserializer;
71use tokio::runtime::RuntimeFlavor;
72use tracing::*;
73use url::Url;
74use uuid::Uuid;
75
76use crate::kernel::transaction::TransactionError;
77use crate::kernel::{spawn_blocking_with_span, Action};
78use crate::{DeltaResult, DeltaTableError};
79
80pub use self::config::StorageConfig;
81pub use self::factories::{
82    logstore_factories, object_store_factories, store_for, LogStoreFactory,
83    LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
84};
85pub use self::storage::utils::commit_uri_from_version;
86pub use self::storage::{
87    DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
88    ObjectStoreRegistry, ObjectStoreRetryExt,
89};
90/// Convenience re-export of the object store crate
91pub use ::object_store;
92
93pub mod config;
94pub(crate) mod default_logstore;
95pub(crate) mod factories;
96pub(crate) mod storage;
97
/// Internal trait to handle object store configuration and initialization.
trait LogStoreFactoryExt {
    /// Create a new log store with the given options.
    ///
    /// ## Parameters
    ///
    /// - `root_store`: an instance of [`ObjectStoreRef`] with no prefix or similar applied,
    ///   i.e. pointing to the root of the object store.
    /// - `location`: The location of the delta table (where the `_delta_log` directory is).
    /// - `options`: The options for the log store.
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef>;
}
115
116impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
117    fn with_options_internal(
118        &self,
119        root_store: ObjectStoreRef,
120        location: &Url,
121        options: &StorageConfig,
122    ) -> DeltaResult<LogStoreRef> {
123        let prefixed_store = options.decorate_store(root_store.clone(), location)?;
124        let log_store =
125            self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
126        Ok(log_store)
127    }
128}
129
// Allow `Arc`-wrapped factories to be used wherever the extension trait is
// expected by delegating to the inner factory's implementation.
impl<T: LogStoreFactory> LogStoreFactoryExt for Arc<T> {
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef> {
        T::with_options_internal(self, root_store, location, options)
    }
}
140
141/// Return the [DefaultLogStore] implementation with the provided configuration options
142pub fn default_logstore(
143    prefixed_store: ObjectStoreRef,
144    root_store: ObjectStoreRef,
145    location: &Url,
146    options: &StorageConfig,
147) -> Arc<dyn LogStore> {
148    Arc::new(default_logstore::DefaultLogStore::new(
149        prefixed_store,
150        root_store,
151        LogStoreConfig {
152            location: location.clone(),
153            options: options.clone(),
154        },
155    ))
156}
157
/// Sharable reference to [`LogStore`]
pub type LogStoreRef = Arc<dyn LogStore>;

// Relative path of the transaction log directory within a table root.
static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));

// Matches commit and checkpoint file names in the delta log; the first capture
// group is the 20-digit zero-padded version number.
pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
165
166/// Return the [LogStoreRef] for the provided [Url] location
167///
168/// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default
169///
170/// ```rust
171/// # use deltalake_core::logstore::*;
172/// # use std::collections::HashMap;
173/// # use url::Url;
174/// let location = Url::parse("memory:///").expect("Failed to make location");
175/// let storage_config = StorageConfig::default();
176/// let logstore = logstore_for(location, storage_config).expect("Failed to get a logstore");
177/// ```
178pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult<LogStoreRef> {
179    // turn location into scheme
180    let scheme = Url::parse(&format!("{}://", location.scheme()))
181        .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
182
183    if let Some(entry) = object_store_factories().get(&scheme) {
184        debug!("Found a storage provider for {scheme} ({location})");
185        let (root_store, _prefix) = entry.value().parse_url_opts(&location, &storage_config)?;
186        return logstore_with(root_store, location, storage_config);
187    }
188
189    Err(DeltaTableError::InvalidTableLocation(location.into()))
190}
191
192/// Return the [LogStoreRef] using the given [ObjectStoreRef]
193pub fn logstore_with(
194    root_store: ObjectStoreRef,
195    location: Url,
196    storage_config: StorageConfig,
197) -> DeltaResult<LogStoreRef> {
198    let scheme = Url::parse(&format!("{}://", location.scheme()))
199        .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
200
201    if let Some(factory) = logstore_factories().get(&scheme) {
202        debug!("Found a logstore provider for {scheme}");
203        return factory
204            .value()
205            .with_options_internal(root_store, &location, &storage_config);
206    }
207
208    error!("Could not find a logstore for the scheme {scheme}");
209    Err(DeltaTableError::InvalidTableLocation(
210        location.clone().into(),
211    ))
212}
213
/// Holder whether it's tmp_commit path or commit bytes
#[derive(Clone)]
pub enum CommitOrBytes {
    /// Path of the tmp commit, to be used by logstores which use CopyIfNotExists
    TmpCommit(Path),
    /// Bytes of the log, to be used by logstores which use Conditional Put
    LogBytes(Bytes),
}
222
/// The next commit that's available from underlying storage
///
/// Returned by [`LogStore::peek_next_commit`].
#[derive(Debug)]
pub enum PeekCommit {
    /// The next commit version and associated actions
    New(i64, Vec<Action>),
    /// Provided DeltaVersion is up to date
    UpToDate,
}
232
/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
    /// url corresponding to the storage location.
    pub location: Url,
    /// Options used for configuring backend storage
    pub options: StorageConfig,
}
241
impl LogStoreConfig {
    /// Decorate `store` with the configured storage layers.
    ///
    /// Uses `table_root` as the table location when provided, otherwise falls
    /// back to the location stored in this configuration.
    pub fn decorate_store<T: ObjectStore + Clone>(
        &self,
        store: T,
        table_root: Option<&url::Url>,
    ) -> DeltaResult<Box<dyn ObjectStore>> {
        let table_url = table_root.unwrap_or(&self.location);
        self.options.decorate_store(store, table_url)
    }

    /// Access the process-global registry of object store factories.
    pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
        self::factories::object_store_factories()
    }
}
256
/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
/// the implementation of this interface. Specifically,
///
/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically.
/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version.
/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to
///   `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must
///   become visible immediately.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + AsAny {
    /// Return the name of this LogStore implementation
    fn name(&self) -> String;

    /// Trigger a sync operation on the log store, refreshing any internally
    /// cached state. The default implementation is a no-op.
    async fn refresh(&self) -> DeltaResult<()> {
        Ok(())
    }

    /// Read data for commit entry with the given version.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

    /// Write list of actions as delta commit entry for given version.
    ///
    /// This operation can be retried with a higher version in case the write
    /// fails with [`TransactionError::VersionAlreadyExists`].
    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Abort the commit entry for the given version.
    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Find latest version currently stored in the delta log.
    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

    /// Get the list of actions for the next commit
    ///
    /// Returns [`PeekCommit::UpToDate`] when no commit past `current_version` exists.
    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        let next_version = current_version + 1;
        let commit_log_bytes = match self.read_commit_entry(next_version).await {
            Ok(Some(bytes)) => Ok(bytes),
            Ok(None) => return Ok(PeekCommit::UpToDate),
            Err(err) => Err(err),
        }?;

        let actions = crate::logstore::get_actions(next_version, &commit_log_bytes);
        Ok(PeekCommit::New(next_version, actions?))
    }

    /// Get object store, can pass operation_id for object stores linked to an operation
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Get the object store rooted at the storage root (no table prefix applied),
    /// optionally scoped to the given operation.
    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Build a kernel [`Engine`] backed by this store's root object store.
    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        let store = self.root_object_store(operation_id);
        get_engine(store)
    }

    /// Resolve a [Path] within the table to a fully qualified URI string.
    fn to_uri(&self, location: &Path) -> String {
        let root = &self.config().location;
        to_uri(root, location)
    }

    /// Get fully qualified uri for table root
    fn root_uri(&self) -> String {
        self.to_uri(&Path::from(""))
    }

    /// [Path] to Delta log
    fn log_path(&self) -> &Path {
        &DELTA_LOG_PATH
    }

    #[deprecated(
        since = "0.1.0",
        note = "DO NOT USE: Just a stop gap to support lakefs during kernel migration"
    )]
    fn transaction_url(&self, _operation_id: Uuid, base: &Url) -> DeltaResult<Url> {
        Ok(base.clone())
    }

    /// Check if the location is a delta table location
    ///
    /// Lists the `_delta_log` prefix and returns true as soon as any
    /// recognized log file (commit, checkpoint, or compacted commit) is found.
    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        let object_store = self.object_store(None);
        // Dummy base URL only used so ParsedLogPath can parse the relative file names.
        let dummy_url = Url::parse("http://example.com").unwrap();
        let log_path = Path::from("_delta_log");

        let mut stream = object_store.list(Some(&log_path));
        while let Some(res) = stream.next().await {
            match res {
                Ok(meta) => {
                    let file_url = dummy_url.join(meta.location.as_ref()).unwrap();
                    if let Ok(Some(parsed_path)) = ParsedLogPath::try_from(file_url) {
                        if matches!(
                            parsed_path.file_type,
                            LogPathFileType::Commit
                                | LogPathFileType::SinglePartCheckpoint
                                | LogPathFileType::UuidCheckpoint(_)
                                | LogPathFileType::MultiPartCheckpoint { .. }
                                | LogPathFileType::CompactedCommit { .. }
                        ) {
                            return Ok(true);
                        }
                    }
                    continue;
                }
                // A missing `_delta_log` directory simply means "not a table".
                Err(ObjectStoreError::NotFound { .. }) => return Ok(false),
                Err(err) => return Err(err.into()),
            }
        }

        Ok(false)
    }

    /// Get configuration representing configured log store.
    fn config(&self) -> &LogStoreConfig;

    #[cfg(feature = "datafusion")]
    /// Generate a unique enough url to identify the store in datafusion.
    /// The DF object store registry only cares about the scheme and the host of the url for
    /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
    /// host we convert the location from this `LogStore` to a valid name, combining the
    /// original scheme, host and path with invalid characters replaced.
    fn object_store_url(&self) -> ObjectStoreUrl {
        crate::logstore::object_store_url(&self.config().location)
    }
}
395
/// Extension trait for LogStore to handle some internal invariants.
pub(crate) trait LogStoreExt: LogStore {
    /// The fully qualified table URL
    ///
    /// The path is guaranteed to end with a slash,
    /// so that it can be used as a prefix for other paths.
    fn table_root_url(&self) -> Url {
        let mut base = self.config().location.clone();
        if !base.path().ends_with("/") {
            base.set_path(&format!("{}/", base.path()));
        }
        base
    }

    /// The fully qualified table log URL
    ///
    /// The path is guaranteed to end with a slash,
    /// so that it can be used as a prefix for other paths.
    fn log_root_url(&self) -> Url {
        self.table_root_url().join("_delta_log/").unwrap()
    }
}

// Blanket impl: every `LogStore` automatically gains the extension helpers.
impl<T: LogStore + ?Sized> LogStoreExt for T {}
420
// Blanket implementation so `Arc`-wrapped stores (including `Arc<dyn LogStore>`)
// can be used anywhere a `LogStore` is expected; every method delegates to the
// inner value.
#[async_trait::async_trait]
impl<T: LogStore + ?Sized> LogStore for Arc<T> {
    fn name(&self) -> String {
        T::name(self)
    }

    async fn refresh(&self) -> DeltaResult<()> {
        T::refresh(self).await
    }

    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
        T::read_commit_entry(self, version).await
    }

    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::write_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::abort_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64> {
        T::get_latest_version(self, start_version).await
    }

    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        T::peek_next_commit(self, current_version).await
    }

    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::object_store(self, operation_id)
    }

    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::root_object_store(self, operation_id)
    }

    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        T::engine(self, operation_id)
    }

    fn to_uri(&self, location: &Path) -> String {
        T::to_uri(self, location)
    }

    fn root_uri(&self) -> String {
        T::root_uri(self)
    }

    fn log_path(&self) -> &Path {
        T::log_path(self)
    }

    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        T::is_delta_table_location(self).await
    }

    fn config(&self) -> &LogStoreConfig {
        T::config(self)
    }

    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        T::object_store_url(self)
    }
}
498
/// Build a kernel [`Engine`] on top of the given object store.
///
/// The task executor is chosen to match the flavor of the ambient tokio
/// runtime: a multi-thread runtime shares its handle, while a current-thread
/// runtime gets a dedicated background executor.
///
/// # Panics
///
/// Panics when called outside of a tokio runtime, or when the runtime flavor
/// is not recognized.
pub(crate) fn get_engine(store: Arc<dyn ObjectStore>) -> Arc<dyn Engine> {
    let handle = tokio::runtime::Handle::current();
    match handle.runtime_flavor() {
        RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioMultiThreadExecutor::new(handle)),
        )),
        RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioBackgroundExecutor::new()),
        )),
        // RuntimeFlavor is non-exhaustive; bail out on unknown flavors.
        _ => panic!("unsupported runtime flavor"),
    }
}
513
#[cfg(feature = "datafusion")]
/// Derive a datafusion [`ObjectStoreUrl`] from a table location.
///
/// The scheme is fixed to "delta-rs"; uniqueness comes from folding the
/// original scheme, host and path (with path delimiters and colons replaced)
/// into the host portion of the resulting URL.
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;
    let host = location.host_str().unwrap_or("-");
    let sanitized_path = location.path().replace(DELIMITER, "-").replace(':', "-");
    let url = format!("delta-rs://{}-{}{}", location.scheme(), host, sanitized_path);
    ObjectStoreUrl::parse(url).expect("Invalid object store url.")
}
525
/// Parse the path from a URL accounting for special case with S3
// TODO: find out why this is necessary
pub(crate) fn object_store_path(table_root: &Url) -> DeltaResult<Path> {
    // Prefer the scheme-aware parser; fall back to parsing the raw URL path
    // when the scheme is not a recognized object store scheme.
    Ok(match ObjectStoreScheme::parse(table_root) {
        Ok((_, path)) => path,
        _ => Path::parse(table_root.path())?,
    })
}
534
/// Return a fully qualified URI string for `location` relative to the table `root`.
///
/// For `file://` roots the scheme prefix is stripped so a plain filesystem
/// path is returned (platform-specific handling for the drive-letter form on
/// Windows). For all other schemes, the location is appended to the root URL;
/// an empty or "/" location yields the root itself.
pub fn to_uri(root: &Url, location: &Path) -> String {
    match root.scheme() {
        "file" => {
            #[cfg(windows)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file:///", "");
            #[cfg(unix)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file://", "");
            uri
        }
        _ => {
            if location.as_ref().is_empty() || location.as_ref() == "/" {
                root.as_ref().to_string()
            } else {
                format!("{}/{}", root.as_ref(), location.as_ref())
            }
        }
    }
}
564
565/// Reads a commit and gets list of actions
566pub fn get_actions(
567    version: i64,
568    commit_log_bytes: &bytes::Bytes,
569) -> Result<Vec<Action>, DeltaTableError> {
570    debug!("parsing commit with version {version}...");
571    Deserializer::from_slice(commit_log_bytes)
572        .into_iter::<Action>()
573        .map(|result| {
574            result.map_err(|e| {
575                let line = format!("Error at line {}, column {}", e.line(), e.column());
576                DeltaTableError::InvalidJsonLog {
577                    json_err: e,
578                    line,
579                    version,
580                }
581            })
582        })
583        .collect()
584}
585
// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
impl std::fmt::Debug for dyn LogStore + '_ {
    // Formats as `<name>(<root uri>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}({})", self.name(), self.root_uri())
    }
}
592
// Serialized as a two-element sequence: [location string, raw options map].
// Must stay in sync with the matching `Deserialize` implementation.
impl Serialize for LogStoreConfig {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut seq = serializer.serialize_seq(None)?;
        seq.serialize_element(&self.location.to_string())?;
        seq.serialize_element(&self.options.raw)?;
        seq.end()
    }
}
604
605impl<'de> Deserialize<'de> for LogStoreConfig {
606    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
607    where
608        D: serde::Deserializer<'de>,
609    {
610        struct LogStoreConfigVisitor {}
611
612        impl<'de> Visitor<'de> for LogStoreConfigVisitor {
613            type Value = LogStoreConfig;
614
615            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
616                formatter.write_str("struct LogStoreConfig")
617            }
618
619            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
620            where
621                A: SeqAccess<'de>,
622            {
623                let location_str: String = seq
624                    .next_element()?
625                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
626                let options: HashMap<String, String> = seq
627                    .next_element()?
628                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
629                let location = Url::parse(&location_str).map_err(A::Error::custom)?;
630                Ok(LogStoreConfig {
631                    location,
632                    options: StorageConfig::parse_options(options).map_err(A::Error::custom)?,
633                })
634            }
635        }
636
637        deserializer.deserialize_seq(LogStoreConfigVisitor {})
638    }
639}
640
641/// Extract version from a file name in the delta log
642pub fn extract_version_from_filename(name: &str) -> Option<i64> {
643    DELTA_LOG_REGEX
644        .captures(name)
645        .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
646}
647
/// Default implementation for retrieving the latest version
///
/// Builds a kernel [`LogSegment`] over the table changes starting at
/// `current_version` (negative values are clamped to 0) and returns the
/// segment's end version.
pub async fn get_latest_version(
    log_store: &dyn LogStore,
    current_version: i64,
) -> DeltaResult<i64> {
    // Negative versions are treated as "from the beginning of the log".
    let current_version = if current_version < 0 {
        0
    } else {
        current_version
    };

    let storage = log_store.engine(None).storage_handler();
    let log_root = log_store.log_root_url();

    // LogSegment construction performs blocking I/O; run it off the async thread.
    let segment = spawn_blocking_with_span(move || {
        LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))?
    .map_err(|e| {
        // The kernel only signals a missing start version through its message
        // text; map that case to InvalidVersion so callers can distinguish it.
        if e.to_string()
            .contains(&format!("to have version {current_version}"))
        {
            DeltaTableError::InvalidVersion(current_version)
        } else {
            DeltaTableError::Generic(e.to_string())
        }
    })?;

    Ok(segment.end_version as i64)
}
679
/// Read delta log for a specific version
///
/// Returns `Ok(None)` when the commit file for `version` does not exist (yet);
/// all other storage errors are propagated.
#[instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(version)))]
pub async fn read_commit_entry(
    storage: &dyn ObjectStore,
    version: i64,
) -> DeltaResult<Option<Bytes>> {
    let commit_uri = commit_uri_from_version(version);
    match storage.get(&commit_uri).await {
        Ok(res) => {
            let bytes = res.bytes().await?;
            debug!(size = bytes.len(), "commit entry read successfully");
            Ok(Some(bytes))
        }
        // A missing file is an expected outcome, not an error.
        Err(ObjectStoreError::NotFound { .. }) => {
            debug!("commit entry not found");
            Ok(None)
        }
        Err(err) => {
            error!(error = %err, version = version, "failed to read commit entry");
            Err(err.into())
        }
    }
}
703
/// Default implementation for writing a commit entry
///
/// Renames the temporary commit file into its final versioned location,
/// relying on the store's `rename_if_not_exists` for mutual exclusion between
/// concurrent writers. An already-existing target maps to
/// [`TransactionError::VersionAlreadyExists`] so the caller can retry with a
/// higher version.
#[instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(version)))]
pub async fn write_commit_entry(
    storage: &dyn ObjectStore,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    // move temporary commit file to delta log directory
    // rely on storage to fail if the file already exists -
    storage
        .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
        .await
        .map_err(|err| -> TransactionError {
            match err {
                ObjectStoreError::AlreadyExists { .. } => {
                    warn!("commit entry already exists");
                    TransactionError::VersionAlreadyExists(version)
                }
                _ => {
                    error!(error = %err, "failed to write commit entry");
                    TransactionError::from(err)
                }
            }
        })?;
    debug!("commit entry written successfully");
    Ok(())
}
731
/// Default implementation for aborting a commit entry
///
/// Deletes the temporary commit file, retrying up to 15 times to make a best
/// effort at not leaving stale files behind.
#[instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    storage.delete_with_retries(tmp_commit, 15).await?;
    debug!("commit entry aborted successfully");
    Ok(())
}
743
744#[cfg(test)]
745pub(crate) mod tests {
746    use futures::TryStreamExt;
747
748    use super::*;
749
750    #[test]
751    fn logstore_with_invalid_url() {
752        let location = Url::parse("nonexistent://table").unwrap();
753
754        let store = logstore_for(location, StorageConfig::default());
755        assert!(store.is_err());
756    }
757
758    #[test]
759    fn logstore_with_memory() {
760        let location = Url::parse("memory:///table").unwrap();
761        let store = logstore_for(location, StorageConfig::default());
762        assert!(store.is_ok());
763    }
764
765    #[test]
766    fn logstore_with_memory_and_rt() {
767        let location = Url::parse("memory:///table").unwrap();
768        let store = logstore_for(
769            location,
770            StorageConfig::default().with_io_runtime(IORuntime::default()),
771        );
772        assert!(store.is_ok());
773    }
774
775    #[test]
776    fn test_logstore_ext() {
777        let location = Url::parse("memory:///table").unwrap();
778        let store = logstore_for(location, StorageConfig::default()).unwrap();
779        let table_url = store.table_root_url();
780        assert!(table_url.path().ends_with('/'));
781        let log_url = store.log_root_url();
782        assert!(log_url.path().ends_with("_delta_log/"));
783    }
784
    #[tokio::test]
    async fn test_is_location_a_table() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));

        // Let's put a failed commit into the directory and then see if it's still considered a
        // delta table (it shouldn't be).
        let payload = PutPayload::from_static(b"test-drivin");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/_commit_failed.tmp"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));
    }
814
    #[tokio::test]
    async fn test_is_location_a_table_commit() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Save a commit to the transaction log
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.json"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        // The table should be considered a delta table
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }
844
    #[tokio::test]
    async fn test_is_location_a_table_checkpoint() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Save a "checkpoint" file to the transaction log directory
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.checkpoint.parquet"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        // The table should be considered a delta table
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }
874
875    #[tokio::test]
876    async fn test_is_location_a_table_crc() {
877        use object_store::path::Path;
878        use object_store::{PutOptions, PutPayload};
879        let location = Url::parse("memory:///table").unwrap();
880        let store =
881            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
882        assert!(!store
883            .is_delta_table_location()
884            .await
885            .expect("Failed to identify table"));
886
887        // Save .crc files to the transaction log directory (all 3 formats)
888        let payload = PutPayload::from_static(b"test");
889
890        let _put = store
891            .object_store(None)
892            .put_opts(
893                &Path::from("_delta_log/.00000000000000000000.crc.crc"),
894                payload.clone(),
895                PutOptions::default(),
896            )
897            .await
898            .expect("Failed to put");
899
900        let _put = store
901            .object_store(None)
902            .put_opts(
903                &Path::from("_delta_log/.00000000000000000000.json.crc"),
904                payload.clone(),
905                PutOptions::default(),
906            )
907            .await
908            .expect("Failed to put");
909
910        let _put = store
911            .object_store(None)
912            .put_opts(
913                &Path::from("_delta_log/00000000000000000000.crc"),
914                payload.clone(),
915                PutOptions::default(),
916            )
917            .await
918            .expect("Failed to put");
919
920        // Now add a commit
921        let _put = store
922            .object_store(None)
923            .put_opts(
924                &Path::from("_delta_log/00000000000000000000.json"),
925                payload.clone(),
926                PutOptions::default(),
927            )
928            .await
929            .expect("Failed to put");
930
931        // The table should be considered a delta table
932        assert!(store
933            .is_delta_table_location()
934            .await
935            .expect("Failed to identify table"));
936    }
937
938    /// <https://github.com/delta-io/delta-rs/issues/3297>:w
939    #[tokio::test]
940    async fn test_peek_with_invalid_json() -> DeltaResult<()> {
941        use crate::logstore::object_store::memory::InMemory;
942        let memory_store = Arc::new(InMemory::new());
943        let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
944
945        let log_content = r#"{invalid_json"#;
946
947        memory_store
948            .put(&log_path, log_content.into())
949            .await
950            .expect("Failed to write log file");
951
952        let table_uri = url::Url::parse("memory:///delta-table").unwrap();
953        let table = crate::DeltaTableBuilder::from_uri(table_uri.clone())
954            .unwrap()
955            .with_storage_backend(memory_store, table_uri)
956            .build()?;
957
958        let result = table.log_store().peek_next_commit(0).await;
959        assert!(result.is_err());
960        Ok(())
961    }
962
963    /// Collect list stream
964    pub(crate) async fn flatten_list_stream(
965        storage: &object_store::DynObjectStore,
966        prefix: Option<&Path>,
967    ) -> object_store::Result<Vec<Path>> {
968        storage
969            .list(prefix)
970            .map_ok(|meta| meta.location)
971            .try_collect::<Vec<Path>>()
972            .await
973    }
974}
975
#[cfg(all(test, feature = "datafusion"))]
mod datafusion_tests {
    use super::*;
    use url::Url;

    #[tokio::test]
    async fn test_unique_object_store_url() {
        // Pairs of table locations that must map to distinct object store URLs.
        let cases = [
            // Same scheme, no host, different path
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Different scheme/host, same path
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Same scheme, different host, same path
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ];
        for (left, right) in cases {
            let left_url = Url::parse(left).unwrap();
            let right_url = Url::parse(right).unwrap();
            assert_ne!(
                object_store_url(&left_url).as_str(),
                object_store_url(&right_url).as_str(),
            );
        }
    }

    #[tokio::test]
    async fn test_get_actions_malformed_json() {
        // First line is a well-formed add action; the second line is
        // truncated JSON, so parsing must fail on line 2 with an EOF error.
        let malformed_json = bytes::Bytes::from(
            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
{"invalid json without closing brace"#,
        );

        let result = get_actions(0, &malformed_json);

        match result {
            Err(DeltaTableError::InvalidJsonLog {
                line,
                version,
                json_err,
            }) => {
                assert!(json_err.is_eof());
                assert!(line.contains("line 2"));
                assert_eq!(version, 0);
            }
            unexpected => panic!("Expected InvalidJsonLog error, got {:?}", unexpected),
        }
    }
}
1023}