deltalake_core/table/
builder.rs

1//! Create or load DeltaTables
2
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use chrono::{DateTime, FixedOffset, Utc};
8use deltalake_derive::DeltaConfig;
9use object_store::DynObjectStore;
10use serde::{Deserialize, Serialize};
11use tracing::debug;
12use url::Url;
13
14use crate::logstore::storage::IORuntime;
15use crate::logstore::{object_store_factories, LogStoreRef};
16use crate::{DeltaResult, DeltaTable, DeltaTableError};
17
18/// possible version specifications for loading a delta table
19#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
20pub enum DeltaVersion {
21    /// load the newest version
22    #[default]
23    Newest,
24    /// specify the version to load
25    Version(i64),
26    /// specify the timestamp in UTC
27    Timestamp(DateTime<Utc>),
28}
29
30/// Configuration options for delta table
31#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
32#[serde(rename_all = "camelCase")]
33pub struct DeltaTableConfig {
34    /// Indicates whether DeltaTable should track files.
35    /// This defaults to `true`
36    ///
37    /// Some append-only applications might have no need of tracking any files.
38    /// Hence, DeltaTable will be loaded with significant memory reduction.
39    pub require_files: bool,
40
41    /// Controls how many files to buffer from the commit log when updating the table.
42    /// This defaults to 4 * number of cpus
43    ///
44    /// Setting a value greater than 1 results in concurrent calls to the storage api.
45    /// This can decrease latency if there are many files in the log since the
46    /// last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should
47    /// also be considered for optimal performance.
48    pub log_buffer_size: usize,
49
50    /// Control the number of records to read / process from the commit / checkpoint files
51    /// when processing record batches.
52    pub log_batch_size: usize,
53
54    #[serde(skip_serializing, skip_deserializing)]
55    #[delta(skip)]
56    /// When a runtime handler is provided, all IO tasks are spawn in that handle
57    pub io_runtime: Option<IORuntime>,
58}
59
60impl Default for DeltaTableConfig {
61    fn default() -> Self {
62        Self {
63            require_files: true,
64            log_buffer_size: num_cpus::get() * 4,
65            log_batch_size: 1024,
66            io_runtime: None,
67        }
68    }
69}
70
71impl PartialEq for DeltaTableConfig {
72    fn eq(&self, other: &Self) -> bool {
73        self.require_files == other.require_files
74            && self.log_buffer_size == other.log_buffer_size
75            && self.log_batch_size == other.log_batch_size
76    }
77}
78
79/// builder for configuring a delta table load.
80#[derive(Debug)]
81pub struct DeltaTableBuilder {
82    /// table root uri
83    table_uri: String,
84    /// backend to access storage system
85    storage_backend: Option<(Arc<DynObjectStore>, Url)>,
86    /// specify the version we are going to load: a time stamp, a version, or just the newest
87    /// available version
88    version: DeltaVersion,
89    storage_options: Option<HashMap<String, String>>,
90    #[allow(unused_variables)]
91    allow_http: Option<bool>,
92    table_config: DeltaTableConfig,
93}
94
95impl DeltaTableBuilder {
96    /// Creates `DeltaTableBuilder` from table uri
97    ///
98    /// Can panic on an invalid URI
99    ///
100    /// ```rust
101    /// # use deltalake_core::table::builder::*;
102    /// let builder = DeltaTableBuilder::from_uri("../test/tests/data/delta-0.8.0");
103    /// assert!(true);
104    /// ```
105    pub fn from_uri(table_uri: impl AsRef<str>) -> Self {
106        let url = ensure_table_uri(&table_uri).expect("The specified table_uri is not valid");
107        DeltaTableBuilder::from_valid_uri(url).expect("Failed to create valid builder")
108    }
109
110    /// Creates `DeltaTableBuilder` from verified table uri.
111    ///
112    /// ```rust
113    /// # use deltalake_core::table::builder::*;
114    /// let builder = DeltaTableBuilder::from_valid_uri("memory:///");
115    /// assert!(builder.is_ok(), "Builder failed with {builder:?}");
116    /// ```
117    pub fn from_valid_uri(table_uri: impl AsRef<str>) -> DeltaResult<Self> {
118        if let Ok(url) = Url::parse(table_uri.as_ref()) {
119            if url.scheme() == "file" {
120                let path = url.to_file_path().map_err(|_| {
121                    DeltaTableError::InvalidTableLocation(table_uri.as_ref().to_string())
122                })?;
123                ensure_file_location_exists(path)?;
124            }
125        } else {
126            ensure_file_location_exists(PathBuf::from(table_uri.as_ref()))?;
127        }
128
129        let url = ensure_table_uri(&table_uri)?;
130        debug!("creating table builder with {url}");
131
132        Ok(Self {
133            table_uri: url.into(),
134            storage_backend: None,
135            version: DeltaVersion::default(),
136            storage_options: None,
137            allow_http: None,
138            table_config: DeltaTableConfig::default(),
139        })
140    }
141
142    /// Sets `require_files=false` to the builder
143    pub fn without_files(mut self) -> Self {
144        self.table_config.require_files = false;
145        self
146    }
147
148    /// Sets `version` to the builder
149    pub fn with_version(mut self, version: i64) -> Self {
150        self.version = DeltaVersion::Version(version);
151        self
152    }
153
154    /// Sets `log_buffer_size` to the builder
155    pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> DeltaResult<Self> {
156        if log_buffer_size == 0 {
157            return Err(DeltaTableError::Generic(String::from(
158                "Log buffer size should be positive",
159            )));
160        }
161        self.table_config.log_buffer_size = log_buffer_size;
162        Ok(self)
163    }
164
165    /// specify the timestamp given as ISO-8601/RFC-3339 timestamp
166    pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
167        let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
168            date_string.as_ref(),
169        )?);
170        Ok(self.with_timestamp(datetime))
171    }
172
173    /// specify a timestamp
174    pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
175        self.version = DeltaVersion::Timestamp(timestamp);
176        self
177    }
178
179    /// Set the storage backend.
180    ///
181    /// If a backend is not provided then it is derived from `table_uri`.
182    ///
183    /// # Arguments
184    ///
185    /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with
186    ///   "/" pointing at delta table root (i.e. where `_delta_log` is located).
187    /// * `location` - A url corresponding to the storagle location of `storage`.
188    pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, location: Url) -> Self {
189        self.storage_backend = Some((storage, location));
190        self
191    }
192
193    /// Set options used to initialize storage backend
194    ///
195    /// Options may be passed in the HashMap or set as environment variables. See documentation of
196    /// underlying object store implementation for details. Trailing slash will be trimmed in
197    /// the option's value to avoid failures. Trimming will only be done if one or more of below
198    /// conditions are met:
199    /// - key ends with `_URL` (e.g., `ENDPOINT_URL`, `S3_URL`, `JDBC_URL`, etc.)
200    /// - value starts with `http://`` or `https://` (e.g., `http://localhost:8000/`)
201    ///
202    /// - [Azure options](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants)
203    /// - [S3 options](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants)
204    /// - [Google options](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants)
205    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
206        self.storage_options = Some(
207            storage_options
208                .clone()
209                .into_iter()
210                .map(|(k, v)| {
211                    let needs_trim = v.starts_with("http://")
212                        || v.starts_with("https://")
213                        || k.to_lowercase().ends_with("_url");
214                    if needs_trim {
215                        (k.to_owned(), v.trim_end_matches('/').to_owned())
216                    } else {
217                        (k, v)
218                    }
219                })
220                .collect(),
221        );
222        self
223    }
224
225    /// Allows unsecure connections via http.
226    ///
227    /// This setting is most useful for testing / development when connecting to emulated services.
228    pub fn with_allow_http(mut self, allow_http: bool) -> Self {
229        self.allow_http = Some(allow_http);
230        self
231    }
232
233    /// Provide a custom runtime handle or runtime config
234    pub fn with_io_runtime(mut self, io_runtime: IORuntime) -> Self {
235        self.table_config.io_runtime = Some(io_runtime);
236        self
237    }
238
239    /// Storage options for configuring backend object store
240    pub fn storage_options(&self) -> HashMap<String, String> {
241        let mut storage_options = self.storage_options.clone().unwrap_or_default();
242        if let Some(allow) = self.allow_http {
243            storage_options.insert(
244                "allow_http".into(),
245                if allow { "true" } else { "false" }.into(),
246            );
247        };
248        storage_options
249    }
250
251    /// Build a delta storage backend for the given config
252    pub fn build_storage(&self) -> DeltaResult<LogStoreRef> {
253        debug!("build_storage() with {}", self.table_uri);
254        let location = Url::parse(&self.table_uri).map_err(|_| {
255            DeltaTableError::NotATable(format!("Could not turn {} into a URL", self.table_uri))
256        })?;
257
258        if let Some((store, _url)) = self.storage_backend.as_ref() {
259            debug!("Loading a logstore with a custom store: {store:?}");
260            crate::logstore::logstore_with(
261                store.clone(),
262                location,
263                self.storage_options(),
264                self.table_config.io_runtime.clone(),
265            )
266        } else {
267            // If there has been no backend defined just default to the normal logstore look up
268            debug!("Loading a logstore based off the location: {location:?}");
269            crate::logstore::logstore_for(
270                location,
271                self.storage_options(),
272                self.table_config.io_runtime.clone(),
273            )
274        }
275    }
276
277    /// Build the [`DeltaTable`] from specified options.
278    ///
279    /// This will not load the log, i.e. the table is not initialized. To get an initialized
280    /// table use the `load` function
281    pub fn build(self) -> DeltaResult<DeltaTable> {
282        Ok(DeltaTable::new(self.build_storage()?, self.table_config))
283    }
284
285    /// Build the [`DeltaTable`] and load its state
286    pub async fn load(self) -> DeltaResult<DeltaTable> {
287        let version = self.version;
288        let mut table = self.build()?;
289        match version {
290            DeltaVersion::Newest => table.load().await?,
291            DeltaVersion::Version(v) => table.load_version(v).await?,
292            DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
293        }
294        Ok(table)
295    }
296}
297
298enum UriType {
299    LocalPath(PathBuf),
300    Url(Url),
301}
302
303/// Utility function to figure out whether string representation of the path
304/// is either local path or some kind or URL.
305///
306/// Will return an error if the path is not valid.
307fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
308    let table_uri = table_uri.as_ref();
309    let known_schemes: Vec<_> = object_store_factories()
310        .iter()
311        .map(|v| v.key().scheme().to_owned())
312        .collect();
313
314    if let Ok(url) = Url::parse(table_uri) {
315        let scheme = url.scheme().to_string();
316        if url.scheme() == "file" {
317            Ok(UriType::LocalPath(url.to_file_path().map_err(|err| {
318                let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
319                DeltaTableError::InvalidTableLocation(msg)
320            })?))
321        // NOTE this check is required to support absolute windows paths which may properly parse as url
322        } else if known_schemes.contains(&scheme) {
323            Ok(UriType::Url(url))
324        // NOTE this check is required to support absolute windows paths which may properly parse as url
325        // we assume here that a single character scheme is a windows drive letter
326        } else if scheme.len() == 1 {
327            Ok(UriType::LocalPath(PathBuf::from(table_uri)))
328        } else {
329            Err(DeltaTableError::InvalidTableLocation(format!(
330                "Unknown scheme: {scheme}. Known schemes: {}",
331                known_schemes.join(",")
332            )))
333        }
334    } else {
335        Ok(UriType::LocalPath(PathBuf::from(table_uri)))
336    }
337}
338
339/// Attempt to create a Url from given table location.
340///
341/// The location could be:
342///  * A valid URL, which will be parsed and returned
343///  * A path to a directory, which will be created and then converted to a URL.
344///
345/// If it is a local path, it will be created if it doesn't exist.
346///
347/// Extra slashes will be removed from the end path as well.
348///
349/// Will return an error if the location is not valid. For example,
350pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
351    let table_uri = table_uri.as_ref();
352
353    let uri_type: UriType = resolve_uri_type(table_uri)?;
354
355    // If it is a local path, we need to create it if it does not exist.
356    let mut url = match uri_type {
357        UriType::LocalPath(path) => {
358            if !path.exists() {
359                std::fs::create_dir_all(&path).map_err(|err| {
360                    let msg =
361                        format!("Could not create local directory: {table_uri}\nError: {err:?}");
362                    DeltaTableError::InvalidTableLocation(msg)
363                })?;
364            }
365            let path = std::fs::canonicalize(path).map_err(|err| {
366                let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
367                DeltaTableError::InvalidTableLocation(msg)
368            })?;
369            Url::from_directory_path(path).map_err(|_| {
370                let msg = format!(
371                    "Could not construct a URL from the canonical path: {table_uri}.\n\
372                    Something must be very wrong with the table path.",
373                );
374                DeltaTableError::InvalidTableLocation(msg)
375            })?
376        }
377        UriType::Url(url) => url,
378    };
379
380    let trimmed_path = url.path().trim_end_matches('/').to_owned();
381    url.set_path(&trimmed_path);
382    Ok(url)
383}
384
385/// Validate that the given [PathBuf] does exist, otherwise return a
386/// [DeltaTableError::InvalidTableLocation]
387fn ensure_file_location_exists(path: PathBuf) -> DeltaResult<()> {
388    if !path.exists() {
389        let msg = format!(
390            "Local path \"{}\" does not exist or you don't have access!",
391            path.as_path().display(),
392        );
393        return Err(DeltaTableError::InvalidTableLocation(msg));
394    }
395    Ok(())
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use crate::logstore::factories::DefaultObjectStoreFactory;
402
403    #[test]
404    fn test_ensure_table_uri() {
405        object_store_factories().insert(
406            Url::parse("s3://").unwrap(),
407            Arc::new(DefaultObjectStoreFactory::default()),
408        );
409
410        // parse an existing relative directory
411        let uri = ensure_table_uri(".");
412        assert!(uri.is_ok());
413        let uri = ensure_table_uri("s3://container/path");
414        assert!(uri.is_ok());
415        #[cfg(not(windows))]
416        {
417            let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
418            assert!(uri.is_ok());
419        }
420        let uri = ensure_table_uri("./nonexistent");
421        assert!(uri.is_ok());
422        let file_path = std::path::Path::new("./nonexistent");
423        std::fs::remove_dir(file_path).unwrap();
424
425        // These cases should all roundtrip to themselves
426        cfg_if::cfg_if! {
427            if #[cfg(windows)] {
428                let roundtrip_cases = &[
429                    "s3://tests/data/delta-0.8.0",
430                    "memory://",
431                    "s3://bucket/my%20table", // Doesn't double-encode
432                ];
433            } else {
434                let roundtrip_cases = &[
435                    "s3://tests/data/delta-0.8.0",
436                    "memory://",
437                    "file:///",
438                    "s3://bucket/my%20table", // Doesn't double-encode
439                ];
440            }
441        }
442
443        for case in roundtrip_cases {
444            let uri = ensure_table_uri(case).unwrap();
445            assert_eq!(case, &uri.as_str());
446        }
447
448        // Other cases
449        let map_cases = &[
450            // extra slashes are removed
451            (
452                "s3://tests/data/delta-0.8.0//",
453                "s3://tests/data/delta-0.8.0",
454            ),
455            ("s3://bucket/my table", "s3://bucket/my%20table"),
456        ];
457
458        for (case, expected) in map_cases {
459            let uri = ensure_table_uri(case).unwrap();
460            assert_eq!(expected, &uri.as_str());
461        }
462    }
463
464    #[test]
465    #[cfg(windows)]
466    fn test_windows_uri() {
467        let map_cases = &[
468            // extra slashes are removed
469            ("c:/", "file:///C:"),
470        ];
471
472        for (case, expected) in map_cases {
473            let uri = ensure_table_uri(case).unwrap();
474            assert_eq!(expected, &uri.as_str());
475        }
476    }
477
478    #[test]
479    fn test_ensure_table_uri_path() {
480        let tmp_dir = tempfile::tempdir().unwrap();
481        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
482        let paths = &[
483            tmp_path.join("data/delta-0.8.0"),
484            tmp_path.join("space in path"),
485            tmp_path.join("special&chars/你好/😊"),
486        ];
487
488        for path in paths {
489            let expected = Url::from_directory_path(path).unwrap();
490            let uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
491            assert_eq!(expected.as_str().trim_end_matches('/'), uri.as_str());
492            assert!(path.exists());
493        }
494
495        // Creates non-existent relative directories
496        let relative_path = std::path::Path::new("_tmp/test %3F");
497        assert!(!relative_path.exists());
498        ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap();
499        assert!(relative_path.exists());
500        std::fs::remove_dir_all(std::path::Path::new("_tmp")).unwrap();
501    }
502
503    #[test]
504    fn test_ensure_table_uri_url() {
505        // Urls should round trips as-is
506        let expected = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
507        let url = ensure_table_uri(&expected).unwrap();
508        assert_eq!(expected, url);
509
510        let tmp_dir = tempfile::tempdir().unwrap();
511        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
512        let path = tmp_path.join("data/delta-0.8.0");
513        let expected = Url::from_directory_path(path).unwrap();
514        let url = ensure_table_uri(&expected).unwrap();
515        assert_eq!(expected.as_str().trim_end_matches('/'), url.as_str());
516    }
517
518    #[test]
519    fn test_invalid_uri() {
520        // Urls should round trips as-is
521        DeltaTableBuilder::from_valid_uri("this://is.nonsense")
522            .expect_err("this should be an error");
523    }
524
525    #[test]
526    fn test_writer_storage_opts_url_trim() {
527        let cases = [
528            // Trim Case 1 - Key indicating a url
529            ("SOMETHING_URL", "something://else/", "something://else"),
530            // Trim Case 2 - Value https url ending with slash
531            (
532                "SOMETHING",
533                "http://something:port/",
534                "http://something:port",
535            ),
536            // Trim Case 3 - Value https url ending with slash
537            (
538                "SOMETHING",
539                "https://something:port/",
540                "https://something:port",
541            ),
542            // No Trim Case 4 - JDBC MySQL url with slash
543            (
544                "SOME_JDBC_PREFIX",
545                "jdbc:mysql://mysql.db.server:3306/",
546                "jdbc:mysql://mysql.db.server:3306/",
547            ),
548            // No Trim Case 5 - S3A file system link
549            ("SOME_S3_LINK", "s3a://bucket-name/", "s3a://bucket-name/"),
550            // No Trim Case 6 - Not a url but ending with slash
551            ("SOME_RANDOM_STRING", "a1b2c3d4e5f#/", "a1b2c3d4e5f#/"),
552            // No Trim Case 7 - Some value not a url
553            (
554                "SOME_VALUE",
555                "/ This is some value 123 /",
556                "/ This is some value 123 /",
557            ),
558        ];
559        for (key, val, expected) in cases {
560            let table_uri = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
561            let mut storage_opts = HashMap::<String, String>::new();
562            storage_opts.insert(key.to_owned(), val.to_owned());
563
564            let table = DeltaTableBuilder::from_uri(table_uri).with_storage_options(storage_opts);
565            let found_opts = table.storage_options();
566            assert_eq!(expected, found_opts.get(key).unwrap());
567        }
568    }
569}