deltalake_core/logstore/
mod.rs

1//! # DeltaLake storage system
2//!
3//! Interacting with storage systems is a crucial part of any table format.
//! On one hand, the storage abstractions need to provide certain guarantees
//! (e.g. atomic rename, ...) and meet certain assumptions (e.g. sorted list results);
//! on the other hand, we can exploit our knowledge about the general file layout
//! and access patterns to optimize our operations in terms of cost and performance.
8//!
9//! Two distinct phases are involved in querying a Delta table:
10//! - **Metadata**: Fetching metadata about the table, such as schema, partitioning, and statistics.
11//! - **Data**: Reading and processing data files based on the metadata.
12//!
13//! When writing to a table, we see the same phases, just in inverse order:
14//! - **Data**: Writing data files that should become part of the table.
15//! - **Metadata**: Updating table metadata to incorporate updates.
16//!
17//! Two main abstractions govern the file operations [`LogStore`] and [`ObjectStore`].
18//!
19//! [`LogStore`]s are scoped to individual tables and are responsible for maintaining proper
20//! behaviours and ensuring consistency during the metadata phase. The correctness is predicated
21//! on the atomicity and durability guarantees of the implementation of this interface.
22//!
23//! - Atomic visibility: Partial writes must not be visible to readers.
24//! - Mutual exclusion: Only one writer must be able to write to a specific log file.
//! - Consistent listing: Once a file has been written, any future list files operation must
//!   immediately include the underlying file system entry in its results.
27//!
28//! <div class="warning">
29//!
30//! While most object stores today provide the required guarantees, the specific
31//! locking mechanics are a table level responsibility. Specific implementations may
32//! decide to refer to a central catalog or other mechanisms for coordination.
33//!
34//! </div>
35//!
36//! [`ObjectStore`]s are responsible for direct interactions with storage systems. Either
37//! during the data phase, where additional requirements are imposed on the storage system,
38//! or by specific LogStore implementations for their internal object store interactions.
39//!
40//! ## Managing LogStores and ObjectStores.
41//!
42//! Aside from very basic implementations (i.e. in-memory and local file system) we rely
43//! on external integrations to provide [`ObjectStore`] and/or [`LogStore`] implementations.
44//!
//! At runtime, deltalake needs to produce appropriate [`ObjectStore`]s to access the files
//! discovered in a table. This is done via factories registered for the relevant URL schemes.
47//!
48//! ## Configuration
49//!
50use std::collections::HashMap;
51use std::sync::{Arc, LazyLock};
52
53use bytes::Bytes;
54#[cfg(feature = "datafusion")]
55use datafusion::datasource::object_store::ObjectStoreUrl;
56use delta_kernel::engine::default::executor::tokio::{
57    TokioBackgroundExecutor, TokioMultiThreadExecutor,
58};
59use delta_kernel::engine::default::DefaultEngine;
60use delta_kernel::log_segment::LogSegment;
61use delta_kernel::path::{LogPathFileType, ParsedLogPath};
62use delta_kernel::{AsAny, Engine};
63use futures::StreamExt;
64use object_store::ObjectStoreScheme;
65use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
66use regex::Regex;
67use serde::de::{Error, SeqAccess, Visitor};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use serde_json::Deserializer;
71use tokio::runtime::RuntimeFlavor;
72use tokio::task::spawn_blocking;
73use tracing::*;
74use url::Url;
75use uuid::Uuid;
76
77use crate::kernel::transaction::TransactionError;
78use crate::kernel::Action;
79use crate::{DeltaResult, DeltaTableError};
80
81pub use self::config::StorageConfig;
82pub use self::factories::{
83    logstore_factories, object_store_factories, store_for, LogStoreFactory,
84    LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
85};
86pub use self::storage::utils::commit_uri_from_version;
87pub use self::storage::{
88    DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
89    ObjectStoreRegistry, ObjectStoreRetryExt,
90};
91/// Convenience re-export of the object store crate
92pub use ::object_store;
93
94pub mod config;
95pub(crate) mod default_logstore;
96pub(crate) mod factories;
97pub(crate) mod storage;
98
/// Internal trait to handle object store configuration and initialization.
trait LogStoreFactoryExt {
    /// Create a new log store with the given options.
    ///
    /// ## Parameters
    ///
    /// - `root_store`: an instance of [`ObjectStoreRef`] with no prefix or similar applied,
    ///   i.e. pointing to the root of the object store.
    /// - `location`: The location of the delta table (where the `_delta_log` directory is).
    /// - `options`: The options for the log store.
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef>;
}
116
117impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
118    fn with_options_internal(
119        &self,
120        root_store: ObjectStoreRef,
121        location: &Url,
122        options: &StorageConfig,
123    ) -> DeltaResult<LogStoreRef> {
124        let prefixed_store = options.decorate_store(root_store.clone(), location)?;
125        let log_store =
126            self.with_options(Arc::new(prefixed_store), root_store, location, options)?;
127        Ok(log_store)
128    }
129}
130
131impl<T: LogStoreFactory> LogStoreFactoryExt for Arc<T> {
132    fn with_options_internal(
133        &self,
134        root_store: ObjectStoreRef,
135        location: &Url,
136        options: &StorageConfig,
137    ) -> DeltaResult<LogStoreRef> {
138        T::with_options_internal(self, root_store, location, options)
139    }
140}
141
142/// Return the [DefaultLogStore] implementation with the provided configuration options
143pub fn default_logstore(
144    prefixed_store: ObjectStoreRef,
145    root_store: ObjectStoreRef,
146    location: &Url,
147    options: &StorageConfig,
148) -> Arc<dyn LogStore> {
149    Arc::new(default_logstore::DefaultLogStore::new(
150        prefixed_store,
151        root_store,
152        LogStoreConfig {
153            location: location.clone(),
154            options: options.clone(),
155        },
156    ))
157}
158
/// Sharable reference to [`LogStore`]
pub type LogStoreRef = Arc<dyn LogStore>;

/// Relative path of the transaction log directory within a table root.
static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));

/// Matches commit files (`<version>.json`) and checkpoint files
/// (`<version>.checkpoint[.<part>].parquet`); capture group 1 is the
/// zero-padded 20-digit version number.
pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
166
167/// Return the [LogStoreRef] for the provided [Url] location
168///
169/// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default
170///
171/// ```rust
172/// # use deltalake_core::logstore::*;
173/// # use std::collections::HashMap;
174/// # use url::Url;
175/// let location = Url::parse("memory:///").expect("Failed to make location");
176/// let storage_config = StorageConfig::default();
177/// let logstore = logstore_for(location, storage_config).expect("Failed to get a logstore");
178/// ```
179pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult<LogStoreRef> {
180    // turn location into scheme
181    let scheme = Url::parse(&format!("{}://", location.scheme()))
182        .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
183
184    if let Some(entry) = object_store_factories().get(&scheme) {
185        debug!("Found a storage provider for {scheme} ({location})");
186        let (root_store, _prefix) = entry.value().parse_url_opts(&location, &storage_config)?;
187        return logstore_with(root_store, location, storage_config);
188    }
189
190    Err(DeltaTableError::InvalidTableLocation(location.into()))
191}
192
193/// Return the [LogStoreRef] using the given [ObjectStoreRef]
194pub fn logstore_with(
195    root_store: ObjectStoreRef,
196    location: Url,
197    storage_config: StorageConfig,
198) -> DeltaResult<LogStoreRef> {
199    let scheme = Url::parse(&format!("{}://", location.scheme()))
200        .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;
201
202    if let Some(factory) = logstore_factories().get(&scheme) {
203        debug!("Found a logstore provider for {scheme}");
204        return factory
205            .value()
206            .with_options_internal(root_store, &location, &storage_config);
207    }
208
209    error!("Could not find a logstore for the scheme {scheme}");
210    Err(DeltaTableError::InvalidTableLocation(
211        location.clone().into(),
212    ))
213}
214
/// Holder for either a tmp commit path or raw commit bytes
#[derive(Clone)]
pub enum CommitOrBytes {
    /// Path of the tmp commit, to be used by log stores which use CopyIfNotExists
    TmpCommit(Path),
    /// Bytes of the log, to be used by log stores which use Conditional Put
    LogBytes(Bytes),
}
223
/// The next commit that's available from underlying storage
///
/// Returned by [`LogStore::peek_next_commit`].
#[derive(Debug)]
pub enum PeekCommit {
    /// The next commit version and associated actions
    New(i64, Vec<Action>),
    /// Provided DeltaVersion is up to date
    UpToDate,
}
233
/// Configuration parameters for a log store
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
    /// url corresponding to the storage location.
    pub location: Url,
    /// Options used for configuring backend storage
    pub options: StorageConfig,
}
242
impl LogStoreConfig {
    /// Decorate `store` with the layers configured in [`Self::options`].
    ///
    /// When `table_root` is `None`, this config's own `location` is used as the table root.
    pub fn decorate_store<T: ObjectStore + Clone>(
        &self,
        store: T,
        table_root: Option<&url::Url>,
    ) -> DeltaResult<Box<dyn ObjectStore>> {
        let table_url = table_root.unwrap_or(&self.location);
        self.options.decorate_store(store, table_url)
    }

    /// Return the process-global registry of [`ObjectStoreFactory`]s.
    pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
        self::factories::object_store_factories()
    }
}
257
/// Trait for critical operations required to read and write commit entries in Delta logs.
///
/// The correctness is predicated on the atomicity and durability guarantees of
/// the implementation of this interface. Specifically,
///
/// - Atomic visibility: Any commit created via `write_commit_entry` must become visible atomically.
/// - Mutual exclusion: Only one writer must be able to create a commit for a specific version.
/// - Consistent listing: Once a commit entry for version `v` has been written, any future call to
///   `get_latest_version` must return a version >= `v`, i.e. the underlying file system entry must
///   become visible immediately.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + AsAny {
    /// Return the name of this LogStore implementation
    fn name(&self) -> String;

    /// Trigger a sync operation on the log store, refreshing any cached state.
    ///
    /// The default implementation is a no-op.
    async fn refresh(&self) -> DeltaResult<()> {
        Ok(())
    }

    /// Read data for commit entry with the given version.
    ///
    /// Returns `Ok(None)` when no commit exists for this version.
    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

    /// Write list of actions as delta commit entry for given version.
    ///
    /// This operation can be retried with a higher version in case the write
    /// fails with [`TransactionError::VersionAlreadyExists`].
    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Abort the commit entry for the given version.
    ///
    /// Implementations clean up any temporary state (e.g. tmp commit files)
    /// created while attempting the commit.
    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Find latest version currently stored in the delta log.
    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

    /// Get the list of actions for the next commit
    ///
    /// Returns [`PeekCommit::UpToDate`] when no commit newer than `current_version` exists.
    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        let next_version = current_version + 1;
        let commit_log_bytes = match self.read_commit_entry(next_version).await {
            Ok(Some(bytes)) => Ok(bytes),
            Ok(None) => return Ok(PeekCommit::UpToDate),
            Err(err) => Err(err),
        }?;

        let actions = crate::logstore::get_actions(next_version, &commit_log_bytes);
        Ok(PeekCommit::New(next_version, actions?))
    }

    /// Get object store, can pass operation_id for object stores linked to an operation
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Get the object store rooted at the storage root (no table prefix applied),
    /// optionally scoped to an operation.
    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Build a kernel [`Engine`] backed by the root object store.
    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        let store = self.root_object_store(operation_id);
        get_engine(store)
    }

    /// Convert a table-relative [Path] into a fully qualified uri within the table location.
    fn to_uri(&self, location: &Path) -> String {
        let root = &self.config().location;
        to_uri(root, location)
    }

    /// Get fully qualified uri for table root
    fn root_uri(&self) -> String {
        self.to_uri(&Path::from(""))
    }

    /// [Path] to Delta log
    fn log_path(&self) -> &Path {
        &DELTA_LOG_PATH
    }

    #[deprecated(
        since = "0.1.0",
        note = "DO NOT USE: Just a stop gap to support lakefs during kernel migration"
    )]
    fn transaction_url(&self, _operation_id: Uuid, base: &Url) -> DeltaResult<Url> {
        Ok(base.clone())
    }

    /// Check if the location is a delta table location
    ///
    /// A location is considered a delta table when its `_delta_log` directory contains
    /// at least one recognized log file (commit, checkpoint, or compacted commit).
    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        let object_store = self.object_store(None);
        // ParsedLogPath::try_from requires an absolute URL; the base itself is irrelevant.
        let dummy_url = Url::parse("http://example.com").unwrap();
        let log_path = Path::from("_delta_log");

        let mut stream = object_store.list(Some(&log_path));
        while let Some(res) = stream.next().await {
            match res {
                Ok(meta) => {
                    let file_url = dummy_url.join(meta.location.as_ref()).unwrap();
                    if let Ok(Some(parsed_path)) = ParsedLogPath::try_from(file_url) {
                        if matches!(
                            parsed_path.file_type,
                            LogPathFileType::Commit
                                | LogPathFileType::SinglePartCheckpoint
                                | LogPathFileType::UuidCheckpoint(_)
                                | LogPathFileType::MultiPartCheckpoint { .. }
                                | LogPathFileType::CompactedCommit { .. }
                        ) {
                            return Ok(true);
                        }
                    }
                    // Unrecognized files (e.g. tmp commits, crc files) do not count.
                    continue;
                }
                Err(ObjectStoreError::NotFound { .. }) => return Ok(false),
                Err(err) => return Err(err.into()),
            }
        }

        Ok(false)
    }

    /// Get configuration representing configured log store.
    fn config(&self) -> &LogStoreConfig;

    #[cfg(feature = "datafusion")]
    /// Generate a unique enough url to identify the store in datafusion.
    /// The DF object store registry only cares about the scheme and the host of the url for
    /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
    /// host we convert the location from this `LogStore` to a valid name, combining the
    /// original scheme, host and path with invalid characters replaced.
    fn object_store_url(&self) -> ObjectStoreUrl {
        crate::logstore::object_store_url(&self.config().location)
    }
}
396
397/// Extension trait for LogStore to handle some internal invariants.
398pub(crate) trait LogStoreExt: LogStore {
399    /// The fully qualified table URL
400    ///
401    /// The paths is guaranteed to end with a slash,
402    /// so that it can be used as a prefix for other paths.
403    fn table_root_url(&self) -> Url {
404        let mut base = self.config().location.clone();
405        if !base.path().ends_with("/") {
406            base.set_path(&format!("{}/", base.path()));
407        }
408        base
409    }
410
411    /// The fully qualified table log URL
412    ///
413    /// The paths is guaranteed to end with a slash,
414    /// so that it can be used as a prefix for other paths.
415    fn log_root_url(&self) -> Url {
416        self.table_root_url().join("_delta_log/").unwrap()
417    }
418}
419
420impl<T: LogStore + ?Sized> LogStoreExt for T {}
421
// Forwarding implementation so `Arc<T>` (including `Arc<dyn LogStore>`) can be used
// wherever a `LogStore` is expected; every method delegates to the inner value.
#[async_trait::async_trait]
impl<T: LogStore + ?Sized> LogStore for Arc<T> {
    fn name(&self) -> String {
        T::name(self)
    }

    async fn refresh(&self) -> DeltaResult<()> {
        T::refresh(self).await
    }

    async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
        T::read_commit_entry(self, version).await
    }

    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::write_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::abort_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64> {
        T::get_latest_version(self, start_version).await
    }

    async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
        T::peek_next_commit(self, current_version).await
    }

    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::object_store(self, operation_id)
    }

    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::root_object_store(self, operation_id)
    }

    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        T::engine(self, operation_id)
    }

    fn to_uri(&self, location: &Path) -> String {
        T::to_uri(self, location)
    }

    fn root_uri(&self) -> String {
        T::root_uri(self)
    }

    fn log_path(&self) -> &Path {
        T::log_path(self)
    }

    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        T::is_delta_table_location(self).await
    }

    fn config(&self) -> &LogStoreConfig {
        T::config(self)
    }

    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        T::object_store_url(self)
    }
}
499
/// Build a kernel [`Engine`] backed by the given object store.
///
/// Chooses a task executor matching the flavor of the ambient tokio runtime.
///
/// # Panics
///
/// Panics when called outside of a tokio runtime (`Handle::current`), or when the
/// runtime reports a flavor other than multi-thread or current-thread.
pub(crate) fn get_engine(store: Arc<dyn ObjectStore>) -> Arc<dyn Engine> {
    let handle = tokio::runtime::Handle::current();
    match handle.runtime_flavor() {
        RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioMultiThreadExecutor::new(handle)),
        )),
        RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new(
            store,
            Arc::new(TokioBackgroundExecutor::new()),
        )),
        // RuntimeFlavor is non-exhaustive; fail loudly on flavors we cannot support.
        _ => panic!("unsupported runtime flavor"),
    }
}
514
#[cfg(feature = "datafusion")]
/// Build a datafusion [`ObjectStoreUrl`] that uniquely identifies `location`.
///
/// The scheme is fixed to `delta-rs`; the original scheme, host and path are folded
/// into the host portion, with path delimiters and colons replaced by dashes.
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;
    ObjectStoreUrl::parse(format!(
        "delta-rs://{}-{}{}",
        location.scheme(),
        location.host_str().unwrap_or("-"),
        location.path().replace(DELIMITER, "-").replace(':', "-")
    ))
    .expect("Invalid object store url.")
}
526
/// Parse the path from a URL accounting for special case with S3
// TODO: find out why this is necessary
pub(crate) fn object_store_path(table_root: &Url) -> DeltaResult<Path> {
    // Fall back to the raw URL path when the scheme is not a recognized object store scheme.
    Ok(match ObjectStoreScheme::parse(table_root) {
        Ok((_, path)) => path,
        _ => Path::parse(table_root.path())?,
    })
}
535
/// Convert a table root [`Url`] and a table-relative [`Path`] into a fully qualified uri string.
///
/// For `file://` urls the scheme prefix is stripped so a plain filesystem path is returned;
/// for all other schemes the location is appended to the root url. An empty or "/" location
/// yields the root url itself.
pub fn to_uri(root: &Url, location: &Path) -> String {
    match root.scheme() {
        "file" => {
            // Windows file urls carry a drive letter after "file:///".
            #[cfg(windows)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file:///", "");
            #[cfg(unix)]
            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            )
            .replace("file://", "");
            uri
        }
        _ => {
            if location.as_ref().is_empty() || location.as_ref() == "/" {
                root.as_ref().to_string()
            } else {
                format!("{}/{}", root.as_ref(), location.as_ref())
            }
        }
    }
}
565
566/// Reads a commit and gets list of actions
567pub fn get_actions(
568    version: i64,
569    commit_log_bytes: &bytes::Bytes,
570) -> Result<Vec<Action>, DeltaTableError> {
571    debug!("parsing commit with version {version}...");
572    Deserializer::from_slice(commit_log_bytes)
573        .into_iter::<Action>()
574        .map(|result| {
575            result.map_err(|e| {
576                let line = format!("Error at line {}, column {}", e.line(), e.column());
577                DeltaTableError::InvalidJsonLog {
578                    json_err: e,
579                    line,
580                    version,
581                }
582            })
583        })
584        .collect()
585}
586
// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
impl std::fmt::Debug for dyn LogStore + '_ {
    // Formats as `<name>(<root uri>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}({})", self.name(), self.root_uri())
    }
}
593
impl Serialize for LogStoreConfig {
    // Serializes as a two-element sequence: `[location_string, raw_options_map]`.
    // Must stay in sync with the `Deserialize` implementation below.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut seq = serializer.serialize_seq(None)?;
        seq.serialize_element(&self.location.to_string())?;
        seq.serialize_element(&self.options.raw)?;
        seq.end()
    }
}
605
606impl<'de> Deserialize<'de> for LogStoreConfig {
607    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
608    where
609        D: serde::Deserializer<'de>,
610    {
611        struct LogStoreConfigVisitor {}
612
613        impl<'de> Visitor<'de> for LogStoreConfigVisitor {
614            type Value = LogStoreConfig;
615
616            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
617                formatter.write_str("struct LogStoreConfig")
618            }
619
620            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
621            where
622                A: SeqAccess<'de>,
623            {
624                let location_str: String = seq
625                    .next_element()?
626                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
627                let options: HashMap<String, String> = seq
628                    .next_element()?
629                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
630                let location = Url::parse(&location_str).map_err(A::Error::custom)?;
631                Ok(LogStoreConfig {
632                    location,
633                    options: StorageConfig::parse_options(options).map_err(A::Error::custom)?,
634                })
635            }
636        }
637
638        deserializer.deserialize_seq(LogStoreConfigVisitor {})
639    }
640}
641
642/// Extract version from a file name in the delta log
643pub fn extract_version_from_filename(name: &str) -> Option<i64> {
644    DELTA_LOG_REGEX
645        .captures(name)
646        .map(|captures| captures.get(1).unwrap().as_str().parse().unwrap())
647}
648
/// Default implementation for retrieving the latest version
///
/// Builds a kernel [`LogSegment`] for table changes starting at `current_version`
/// (clamped to 0) and returns that segment's end version.
pub async fn get_latest_version(
    log_store: &dyn LogStore,
    current_version: i64,
) -> DeltaResult<i64> {
    // Negative versions are treated as "from the beginning of the log".
    let current_version = if current_version < 0 {
        0
    } else {
        current_version
    };

    let storage = log_store.engine(None).storage_handler();
    let log_root = log_store.log_root_url();

    // LogSegment construction performs blocking I/O through the storage handler,
    // so move it off the async executor.
    let segment = spawn_blocking(move || {
        LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))?
    .map_err(|e| {
        // NOTE(review): the kernel does not expose a typed "version not found" error,
        // so we match on the message text to map it to InvalidVersion — fragile if the
        // kernel changes its wording; confirm on kernel upgrades.
        if e.to_string()
            .contains(&format!("to have version {current_version}"))
        {
            DeltaTableError::InvalidVersion(current_version)
        } else {
            DeltaTableError::Generic(e.to_string())
        }
    })?;

    Ok(segment.end_version as i64)
}
680
681/// Read delta log for a specific version
682#[instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(version)))]
683pub async fn read_commit_entry(
684    storage: &dyn ObjectStore,
685    version: i64,
686) -> DeltaResult<Option<Bytes>> {
687    let commit_uri = commit_uri_from_version(version);
688    match storage.get(&commit_uri).await {
689        Ok(res) => {
690            let bytes = res.bytes().await?;
691            debug!(size = bytes.len(), "commit entry read successfully");
692            Ok(Some(bytes))
693        }
694        Err(ObjectStoreError::NotFound { .. }) => {
695            debug!("commit entry not found");
696            Ok(None)
697        }
698        Err(err) => {
699            error!(error = %err, version = version, "failed to read commit entry");
700            Err(err.into())
701        }
702    }
703}
704
705/// Default implementation for writing a commit entry
706#[instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(version)))]
707pub async fn write_commit_entry(
708    storage: &dyn ObjectStore,
709    version: i64,
710    tmp_commit: &Path,
711) -> Result<(), TransactionError> {
712    // move temporary commit file to delta log directory
713    // rely on storage to fail if the file already exists -
714    storage
715        .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
716        .await
717        .map_err(|err| -> TransactionError {
718            match err {
719                ObjectStoreError::AlreadyExists { .. } => {
720                    warn!("commit entry already exists");
721                    TransactionError::VersionAlreadyExists(version)
722                }
723                _ => {
724                    error!(error = %err, "failed to write commit entry");
725                    TransactionError::from(err)
726                }
727            }
728        })?;
729    debug!("commit entry written successfully");
730    Ok(())
731}
732
/// Default implementation for aborting a commit entry
///
/// Removes the temporary commit file; deletion is retried up to 15 times so that
/// cleanup is resilient against transient storage errors.
#[instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    storage.delete_with_retries(tmp_commit, 15).await?;
    debug!("commit entry aborted successfully");
    Ok(())
}
744
745#[cfg(test)]
746pub(crate) mod tests {
747    use futures::TryStreamExt;
748
749    use super::*;
750
    #[test]
    fn logstore_with_invalid_url() {
        // No factory is registered for the "nonexistent" scheme, so resolution must fail.
        let location = Url::parse("nonexistent://table").unwrap();

        let store = logstore_for(location, StorageConfig::default());
        assert!(store.is_err());
    }
758
    #[test]
    fn logstore_with_memory() {
        // The in-memory store is one of the built-in factories and must always resolve.
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(location, StorageConfig::default());
        assert!(store.is_ok());
    }
765
    #[test]
    fn logstore_with_memory_and_rt() {
        // Attaching a dedicated IO runtime must not break log store resolution.
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(
            location,
            StorageConfig::default().with_io_runtime(IORuntime::default()),
        );
        assert!(store.is_ok());
    }
775
    #[test]
    fn test_logstore_ext() {
        let location = Url::parse("memory:///table").unwrap();
        let store = logstore_for(location, StorageConfig::default()).unwrap();
        // Both extension URLs must be slash-terminated so they can serve as prefixes.
        let table_url = store.table_root_url();
        assert!(table_url.path().ends_with('/'));
        let log_url = store.log_root_url();
        assert!(log_url.path().ends_with("_delta_log/"));
    }
785
    #[tokio::test]
    async fn test_is_location_a_table() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));

        // Let's put a failed commit into the directory and then see if it's still considered a
        // delta table (it shouldn't be).
        let payload = PutPayload::from_static(b"test-drivin");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/_commit_failed.tmp"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table"));
    }
815
    #[tokio::test]
    async fn test_is_location_a_table_commit() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Save a commit to the transaction log
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.json"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        // The table should be considered a delta table
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }
845
    #[tokio::test]
    async fn test_is_location_a_table_checkpoint() {
        use object_store::path::Path;
        use object_store::{PutOptions, PutPayload};
        let location = Url::parse("memory:///table").unwrap();
        let store =
            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
        // An empty location must not be detected as a delta table.
        assert!(!store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));

        // Save a "checkpoint" file to the transaction log directory
        let payload = PutPayload::from_static(b"test");
        let _put = store
            .object_store(None)
            .put_opts(
                &Path::from("_delta_log/00000000000000000000.checkpoint.parquet"),
                payload,
                PutOptions::default(),
            )
            .await
            .expect("Failed to put");
        // The table should be considered a delta table
        assert!(store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table"));
    }
875
876    #[tokio::test]
877    async fn test_is_location_a_table_crc() {
878        use object_store::path::Path;
879        use object_store::{PutOptions, PutPayload};
880        let location = Url::parse("memory:///table").unwrap();
881        let store =
882            logstore_for(location, StorageConfig::default()).expect("Failed to get logstore");
883        assert!(!store
884            .is_delta_table_location()
885            .await
886            .expect("Failed to identify table"));
887
888        // Save .crc files to the transaction log directory (all 3 formats)
889        let payload = PutPayload::from_static(b"test");
890
891        let _put = store
892            .object_store(None)
893            .put_opts(
894                &Path::from("_delta_log/.00000000000000000000.crc.crc"),
895                payload.clone(),
896                PutOptions::default(),
897            )
898            .await
899            .expect("Failed to put");
900
901        let _put = store
902            .object_store(None)
903            .put_opts(
904                &Path::from("_delta_log/.00000000000000000000.json.crc"),
905                payload.clone(),
906                PutOptions::default(),
907            )
908            .await
909            .expect("Failed to put");
910
911        let _put = store
912            .object_store(None)
913            .put_opts(
914                &Path::from("_delta_log/00000000000000000000.crc"),
915                payload.clone(),
916                PutOptions::default(),
917            )
918            .await
919            .expect("Failed to put");
920
921        // Now add a commit
922        let _put = store
923            .object_store(None)
924            .put_opts(
925                &Path::from("_delta_log/00000000000000000000.json"),
926                payload.clone(),
927                PutOptions::default(),
928            )
929            .await
930            .expect("Failed to put");
931
932        // The table should be considered a delta table
933        assert!(store
934            .is_delta_table_location()
935            .await
936            .expect("Failed to identify table"));
937    }
938
939    /// <https://github.com/delta-io/delta-rs/issues/3297>:w
940    #[tokio::test]
941    async fn test_peek_with_invalid_json() -> DeltaResult<()> {
942        use crate::logstore::object_store::memory::InMemory;
943        let memory_store = Arc::new(InMemory::new());
944        let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
945
946        let log_content = r#"{invalid_json"#;
947
948        memory_store
949            .put(&log_path, log_content.into())
950            .await
951            .expect("Failed to write log file");
952
953        let table_uri = url::Url::parse("memory:///delta-table").unwrap();
954        let table = crate::DeltaTableBuilder::from_uri(table_uri.clone())
955            .unwrap()
956            .with_storage_backend(memory_store, table_uri)
957            .build()?;
958
959        let result = table.log_store().peek_next_commit(0).await;
960        assert!(result.is_err());
961        Ok(())
962    }
963
964    /// Collect list stream
965    pub(crate) async fn flatten_list_stream(
966        storage: &object_store::DynObjectStore,
967        prefix: Option<&Path>,
968    ) -> object_store::Result<Vec<Path>> {
969        storage
970            .list(prefix)
971            .map_ok(|meta| meta.location)
972            .try_collect::<Vec<Path>>()
973            .await
974    }
975}
976
#[cfg(all(test, feature = "datafusion"))]
mod datafusion_tests {
    use super::*;
    use url::Url;

    #[tokio::test]
    async fn test_unique_object_store_url() {
        // Pairs of table locations that must map to distinct object store URLs.
        let cases = [
            // Same scheme, no host, different path
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            // Different scheme/host, same path
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            // Same scheme, different host, same path
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
        ];
        for (left, right) in cases {
            let left_url = Url::parse(left).unwrap();
            let right_url = Url::parse(right).unwrap();
            assert_ne!(
                object_store_url(&left_url).as_str(),
                object_store_url(&right_url).as_str(),
            );
        }
    }

    #[tokio::test]
    async fn test_get_actions_malformed_json() {
        // One valid `add` action followed by a truncated second line.
        let malformed_json = bytes::Bytes::from(
            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
{"invalid json without closing brace"#,
        );

        // The parser must report the failing line and version, with an EOF error.
        match get_actions(0, &malformed_json) {
            Err(DeltaTableError::InvalidJsonLog {
                line,
                version,
                json_err,
            }) => {
                assert_eq!(version, 0);
                assert!(line.contains("line 2"));
                assert!(json_err.is_eof());
            }
            other => panic!("Expected InvalidJsonLog error, got {:?}", other),
        }
    }
}