1use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use chrono::{DateTime, FixedOffset, Utc};
8use deltalake_derive::DeltaConfig;
9use object_store::DynObjectStore;
10use serde::{Deserialize, Serialize};
11use tracing::debug;
12use url::Url;
13
14use super::normalize_table_url;
15use crate::logstore::storage::IORuntime;
16use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories};
17use crate::{DeltaResult, DeltaTable, DeltaTableError};
18
19#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
21pub enum DeltaVersion {
22 #[default]
24 Newest,
25 Version(i64),
27 Timestamp(DateTime<Utc>),
29}
30
/// Configuration handed to [`DeltaTable::new`] when a [`DeltaTableBuilder`] builds a table.
///
/// Field names are serialized in camelCase. `io_runtime` is skipped by serde, is marked
/// `#[delta(skip)]`, and is ignored by this type's `PartialEq` implementation.
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
    /// Defaults to `true`; [`DeltaTableBuilder::without_files`] sets it to `false`.
    /// NOTE(review): presumably controls whether per-file metadata is required when loading —
    /// confirm against `DeltaTable`.
    pub require_files: bool,

    /// Defaults to four times the number of CPUs; [`DeltaTableBuilder::with_log_buffer_size`]
    /// rejects `0`. NOTE(review): presumably the number of log reads kept in flight — confirm.
    pub log_buffer_size: usize,

    /// Defaults to `1024`. NOTE(review): presumably the batch size used while replaying the
    /// log — confirm.
    pub log_batch_size: usize,

    /// Optional IO runtime, forwarded to the storage configuration by
    /// [`DeltaTableBuilder::build_storage`]. Never serialized or deserialized.
    #[serde(skip_serializing, skip_deserializing)]
    #[delta(skip)]
    pub io_runtime: Option<IORuntime>,
}
60
61impl Default for DeltaTableConfig {
62 fn default() -> Self {
63 Self {
64 require_files: true,
65 log_buffer_size: num_cpus::get() * 4,
66 log_batch_size: 1024,
67 io_runtime: None,
68 }
69 }
70}
71
72impl PartialEq for DeltaTableConfig {
73 fn eq(&self, other: &Self) -> bool {
74 self.require_files == other.require_files
75 && self.log_buffer_size == other.log_buffer_size
76 && self.log_batch_size == other.log_batch_size
77 }
78}
79
/// Builder for a [`DeltaTable`].
///
/// Collects the table location, storage settings, table configuration and the version to
/// load.
#[derive(Debug)]
pub struct DeltaTableBuilder {
    /// URL of the table root, as given to `from_url`.
    table_url: Url,
    /// Pre-built object store plus a URL, set by `with_storage_backend`.
    /// NOTE(review): `build_storage` only uses the store; the URL element is currently unused.
    storage_backend: Option<(Arc<DynObjectStore>, Url)>,
    /// Version selection used by `load`; defaults to the newest version.
    version: DeltaVersion,
    /// Options supplied via `with_storage_options`, after trailing-slash trimming.
    storage_options: Option<HashMap<String, String>>,
    /// Explicit `allow_http` override, merged into `storage_options()` when set.
    allow_http: Option<bool>,
    /// Configuration passed to [`DeltaTable::new`] by `build`.
    table_config: DeltaTableConfig,
}
94
95impl DeltaTableBuilder {
96 pub fn from_url(table_url: Url) -> DeltaResult<Self> {
105 let table_url = Url::parse(table_url.as_str()).map_err(|_| {
108 DeltaTableError::NotATable(
109 "Received path segments that could not be canonicalized".into(),
110 )
111 })?;
112
113 debug!("creating table builder with {table_url}");
114
115 Ok(Self {
116 table_url,
117 storage_backend: None,
118 version: DeltaVersion::default(),
119 storage_options: None,
120 allow_http: None,
121 table_config: DeltaTableConfig::default(),
122 })
123 }
124
125 pub fn without_files(mut self) -> Self {
127 self.table_config.require_files = false;
128 self
129 }
130
131 pub fn with_version(mut self, version: i64) -> Self {
133 self.version = DeltaVersion::Version(version);
134 self
135 }
136
137 pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> DeltaResult<Self> {
139 if log_buffer_size == 0 {
140 return Err(DeltaTableError::Generic(String::from(
141 "Log buffer size should be positive",
142 )));
143 }
144 self.table_config.log_buffer_size = log_buffer_size;
145 Ok(self)
146 }
147
148 pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
150 let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
151 date_string.as_ref(),
152 )?);
153 Ok(self.with_timestamp(datetime))
154 }
155
156 pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
158 self.version = DeltaVersion::Timestamp(timestamp);
159 self
160 }
161
162 pub fn with_storage_backend(
172 mut self,
173 root_storage: Arc<DynObjectStore>,
174 location: Url,
175 ) -> Self {
176 self.storage_backend = Some((root_storage, location));
177 self
178 }
179
180 pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
193 self.storage_options = Some(
194 storage_options
195 .clone()
196 .into_iter()
197 .map(|(k, v)| {
198 let needs_trim = v.starts_with("http://")
199 || v.starts_with("https://")
200 || k.to_lowercase().ends_with("_url");
201 if needs_trim {
202 (k.to_owned(), v.trim_end_matches('/').to_owned())
203 } else {
204 (k, v)
205 }
206 })
207 .collect(),
208 );
209 self
210 }
211
212 pub fn with_allow_http(mut self, allow_http: bool) -> Self {
216 self.allow_http = Some(allow_http);
217 self
218 }
219
220 pub fn with_io_runtime(mut self, io_runtime: IORuntime) -> Self {
222 self.table_config.io_runtime = Some(io_runtime);
223 self
224 }
225
226 pub fn storage_options(&self) -> HashMap<String, String> {
228 let mut storage_options = self.storage_options.clone().unwrap_or_default();
229 if let Some(allow) = self.allow_http {
230 storage_options.insert(
231 "allow_http".into(),
232 if allow { "true" } else { "false" }.into(),
233 );
234 };
235 storage_options
236 }
237
238 pub fn build_storage(&self) -> DeltaResult<LogStoreRef> {
240 debug!("build_storage() with {}", self.table_url);
241
242 let mut storage_config = StorageConfig::parse_options(self.storage_options())?;
243 if let Some(io_runtime) = self.table_config.io_runtime.clone() {
244 storage_config = storage_config.with_io_runtime(io_runtime);
245 }
246
247 if let Some((store, _url)) = self.storage_backend.as_ref() {
248 debug!("Loading a logstore with a custom store: {store:?}");
249 crate::logstore::logstore_with(store.clone(), &self.table_url, storage_config)
250 } else {
251 debug!(
253 "Loading a logstore based off the location: {:?}",
254 self.table_url
255 );
256 crate::logstore::logstore_for(&self.table_url, storage_config)
257 }
258 }
259
260 pub fn build(self) -> DeltaResult<DeltaTable> {
265 Ok(DeltaTable::new(self.build_storage()?, self.table_config))
266 }
267
268 pub async fn load(self) -> DeltaResult<DeltaTable> {
270 let version = self.version;
271 let mut table = self.build()?;
272 match version {
273 DeltaVersion::Newest => table.load().await?,
274 DeltaVersion::Version(v) => table.load_version(v).await?,
275 DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
276 }
277 Ok(table)
278 }
279}
280
281enum UriType {
282 LocalPath(PathBuf),
283 Url(Url),
284}
285
286fn expand_tilde_path(path: &str) -> DeltaResult<PathBuf> {
288 if path.starts_with("~/") || path == "~" {
289 let home_dir = dirs::home_dir().ok_or_else(|| {
290 DeltaTableError::InvalidTableLocation(
291 "Could not determine home directory for tilde expansion".to_string(),
292 )
293 })?;
294
295 if path == "~" {
296 Ok(home_dir)
297 } else {
298 let relative_path = &path[2..];
299 Ok(home_dir.join(relative_path))
300 }
301 } else {
302 Ok(PathBuf::from(path))
303 }
304}
305
306fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
311 let table_uri = table_uri.as_ref();
312 let known_schemes: Vec<_> = object_store_factories()
313 .iter()
314 .map(|v| v.key().scheme().to_owned())
315 .collect();
316
317 match Url::parse(table_uri) {
318 Ok(url) => {
319 let scheme = url.scheme().to_string();
320 if url.scheme() == "file" {
321 Ok(UriType::LocalPath(url.to_file_path().map_err(|err| {
322 let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
323 DeltaTableError::InvalidTableLocation(msg)
324 })?))
325 } else if known_schemes.contains(&scheme) {
327 Ok(UriType::Url(url))
328 } else if scheme.len() == 1 {
331 Ok(UriType::LocalPath(expand_tilde_path(table_uri)?))
332 } else {
333 Err(DeltaTableError::InvalidTableLocation(format!(
334 "Unknown scheme: {scheme}. Known schemes: {}",
335 known_schemes.join(",")
336 )))
337 }
338 }
339 Err(url_error) => {
340 match url_error {
341 url::ParseError::RelativeUrlWithoutBase => {
344 Ok(UriType::LocalPath(expand_tilde_path(table_uri)?))
345 }
346 _others => Err(DeltaTableError::InvalidTableLocation(format!(
349 "Could not parse {table_uri} as a URL: {url_error}"
350 ))),
351 }
352 }
353 }
354}
355
356pub fn parse_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
367 let table_uri = table_uri.as_ref();
368
369 let uri_type: UriType = resolve_uri_type(table_uri)?;
370
371 let mut url = match uri_type {
372 UriType::LocalPath(path) => {
373 let path = std::fs::canonicalize(path).map_err(|err| {
374 let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
375 DeltaTableError::InvalidTableLocation(msg)
376 })?;
377 Url::from_directory_path(path).map_err(|_| {
378 let msg = format!(
379 "Could not construct a URL from the canonical path: {table_uri}.\n\
380 Something must be very wrong with the table path.",
381 );
382 DeltaTableError::InvalidTableLocation(msg)
383 })?
384 }
385 UriType::Url(url) => url,
386 };
387
388 let trimmed_path = url.path().trim_end_matches('/').to_owned();
389 url.set_path(&trimmed_path);
390 Ok(url)
391}
392
393pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
396 let table_uri = table_uri.as_ref();
397
398 let uri_type: UriType = resolve_uri_type(table_uri)?;
399
400 let url = match uri_type {
402 UriType::LocalPath(path) => {
403 if !path.exists() {
404 std::fs::create_dir_all(&path).map_err(|err| {
405 let msg =
406 format!("Could not create local directory: {table_uri}\nError: {err:?}");
407 DeltaTableError::InvalidTableLocation(msg)
408 })?;
409 }
410 let path = std::fs::canonicalize(path).map_err(|err| {
411 let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
412 DeltaTableError::InvalidTableLocation(msg)
413 })?;
414 Url::from_directory_path(path).map_err(|_| {
415 let msg = format!(
416 "Could not construct a URL from the canonical path: {table_uri}.\n\
417 Something must be very wrong with the table path.",
418 );
419 DeltaTableError::InvalidTableLocation(msg)
420 })?
421 }
422 UriType::Url(url) => url,
423 };
424
425 Ok(normalize_table_url(&url))
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logstore::factories::DefaultObjectStoreFactory;

    /// `ensure_table_uri` on relative paths, registered-scheme URLs, round-trips and
    /// normalization.
    #[test]
    fn test_ensure_table_uri() {
        // Register a factory for `s3://` so `resolve_uri_type` treats s3 as a known scheme.
        object_store_factories().insert(
            Url::parse("s3://").unwrap(),
            Arc::new(DefaultObjectStoreFactory::default()),
        );

        // An existing relative directory resolves.
        let uri = ensure_table_uri(".");
        assert!(uri.is_ok());
        // Registered-scheme URLs come back with a trailing slash.
        let uri = ensure_table_uri("s3://container/path");
        assert!(uri.is_ok());
        assert_eq!(Url::parse("s3://container/path/").unwrap(), uri.unwrap());
        #[cfg(not(windows))]
        {
            // NOTE(review): this creates /tmp/nonexistent/some/path and never removes it.
            let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
            assert!(uri.is_ok());
        }
        // A missing relative directory is created; remove it again afterwards.
        let uri = ensure_table_uri("./nonexistent");
        assert!(uri.is_ok());
        let file_path = std::path::Path::new("./nonexistent");
        std::fs::remove_dir(file_path).unwrap();

        // Inputs that must come back byte-for-byte unchanged; `file:///` is skipped on Windows.
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0/",
                    "memory://",
                    "s3://bucket/my%20table/",
                ];
            } else {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0/",
                    "memory://",
                    "file:///",
                    "s3://bucket/my%20table/",
                ];
            }
        }

        for case in roundtrip_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(case, &uri.as_str());
        }

        // (input, expected) pairs where normalization changes the input.
        let map_cases = &[
            // Doubled trailing slash collapses to one.
            (
                "s3://tests/data/delta-0.8.0//",
                "s3://tests/data/delta-0.8.0/",
            ),
            // Spaces are percent-encoded and a trailing slash is added.
            ("s3://bucket/my table", "s3://bucket/my%20table/"),
        ];

        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    /// A drive-letter "scheme" is treated as a local path on Windows.
    #[test]
    #[cfg(windows)]
    fn test_windows_uri() {
        let map_cases = &[
            ("c://", "file:///C:/"),
        ];

        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    /// Local paths (absolute, with spaces, with non-ASCII characters, and relative) are
    /// created and converted into directory URLs.
    #[test]
    fn test_ensure_table_uri_path() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let paths = &[
            tmp_path.join("data/delta-0.8.0"),
            tmp_path.join("space in path"),
            tmp_path.join("special&chars/你好/😊"),
        ];

        for path in paths {
            let expected = Url::from_directory_path(path).unwrap();
            let uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
            assert_eq!(expected.as_str(), uri.as_str());
            assert!(path.exists());
        }

        // A relative path containing `%3F` is taken literally, not percent-decoded.
        let relative_path = std::path::Path::new("_tmp/test %3F");
        assert!(!relative_path.exists());
        ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap();
        assert!(relative_path.exists());
        std::fs::remove_dir_all(std::path::Path::new("_tmp")).unwrap();
    }

    /// Passing an already-normalized URL (memory and file schemes) returns it unchanged.
    #[test]
    fn test_ensure_table_uri_url() {
        let expected = Url::parse("memory:///test/tests/data/delta-0.8.0/").unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected, url);

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let path = tmp_path.join("data/delta-0.8.0");
        let expected = Url::from_directory_path(path).unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected.as_str(), url.as_str());
    }

    /// `with_storage_options` trims trailing slashes only for `*_URL` keys and http(s) values.
    #[test]
    fn test_writer_storage_opts_url_trim() {
        let cases = [
            // Key ends with `_URL`: trimmed.
            ("SOMETHING_URL", "something://else/", "something://else"),
            (
                // `http://` value: trimmed.
                "SOMETHING",
                "http://something:port/",
                "http://something:port",
            ),
            (
                // `https://` value: trimmed.
                "SOMETHING",
                "https://something:port/",
                "https://something:port",
            ),
            (
                // Neither rule applies: kept as-is.
                "SOME_JDBC_PREFIX",
                "jdbc:mysql://mysql.db.server:3306/",
                "jdbc:mysql://mysql.db.server:3306/",
            ),
            // Non-http scheme and non-`_URL` key: kept as-is.
            ("SOME_S3_LINK", "s3a://bucket-name/", "s3a://bucket-name/"),
            // Arbitrary string: kept as-is.
            ("SOME_RANDOM_STRING", "a1b2c3d4e5f#/", "a1b2c3d4e5f#/"),
            (
                // Slashes in ordinary values are untouched.
                "SOME_VALUE",
                "/ This is some value 123 /",
                "/ This is some value 123 /",
            ),
        ];
        for (key, val, expected) in cases {
            let table_uri = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
            let mut storage_opts = HashMap::<String, String>::new();
            storage_opts.insert(key.to_owned(), val.to_owned());

            let table = DeltaTableBuilder::from_url(table_uri)
                .unwrap()
                .with_storage_options(storage_opts);
            let found_opts = table.storage_options();
            assert_eq!(expected, found_opts.get(key).unwrap());
        }
    }

    /// `~` and `~/...` expand to the home directory; everything else is unchanged.
    #[test]
    fn test_expand_tilde_path() {
        let home_dir = dirs::home_dir().expect("Should have home directory");

        let result = expand_tilde_path("~").unwrap();
        assert_eq!(result, home_dir);

        let result = expand_tilde_path("~/test/path").unwrap();
        assert_eq!(result, home_dir.join("test/path"));

        let result = expand_tilde_path("/absolute/path").unwrap();
        assert_eq!(result, PathBuf::from("/absolute/path"));

        let result = expand_tilde_path("relative/path").unwrap();
        assert_eq!(result, PathBuf::from("relative/path"));

        // `~user` style paths are not expanded.
        let result = expand_tilde_path("~other").unwrap();
        assert_eq!(result, PathBuf::from("~other"));
    }

    /// Scheme-less inputs, with or without `~`, are classified as local paths.
    #[test]
    fn test_resolve_uri_type_with_tilde() {
        let home_dir = dirs::home_dir().expect("Should have home directory");

        match resolve_uri_type("~/test/path").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, home_dir.join("test/path"));
            }
            _ => panic!("Expected LocalPath"),
        }

        match resolve_uri_type("~").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, home_dir);
            }
            _ => panic!("Expected LocalPath"),
        }

        match resolve_uri_type("regular/path").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, PathBuf::from("regular/path"));
            }
            _ => panic!("Expected LocalPath"),
        }
    }

    /// URL-like inputs that fail to parse or use an unknown scheme are rejected.
    ///
    /// They must not fall back to being treated as local paths.
    #[test]
    fn test_invalid_url_but_invalid_file_path_too() -> DeltaResult<()> {
        for wrong in &["s3://arn:aws:s3:::something", "hdfs://"] {
            let result = ensure_table_uri(wrong);
            assert!(
                result.is_err(),
                "Expected {wrong} parsed into {result:#?} to return an error because I gave it something URLish"
            );
        }
        Ok(())
    }

    /// `~`-prefixed inputs are expanded before being turned into a URL.
    ///
    /// NOTE(review): this creates and then removes `delta_test_temp` in the real home
    /// directory.
    #[test]
    fn test_ensure_table_uri_with_tilde() {
        let home_dir = dirs::home_dir().expect("Should have home directory");

        let test_dir = home_dir.join("delta_test_temp");
        std::fs::create_dir_all(&test_dir).ok();

        let tilde_path = "~/delta_test_temp";
        let result = ensure_table_uri(tilde_path);
        assert!(
            result.is_ok(),
            "ensure_table_uri should work with tilde paths"
        );

        let url = result.unwrap();
        assert!(!url.as_str().contains("~"));

        #[cfg(windows)]
        {
            // URLs use forward slashes, so normalize the Windows home path before comparing.
            let home_dir_normalized = home_dir.to_string_lossy().replace('\\', "/");
            assert!(url.as_str().contains(&home_dir_normalized));
        }

        #[cfg(not(windows))]
        {
            assert!(url.as_str().contains(home_dir.to_string_lossy().as_ref()));
        }

        std::fs::remove_dir_all(&test_dir).ok();
    }

    /// `DeltaTableBuilder::from_url` does not require the location to exist, and it keeps
    /// the URL as given.
    #[test]
    fn test_create_builder_from_non_existent_path() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let new_path = tmp_path.join("new_table");
        assert!(!new_path.exists());

        let builder_result =
            DeltaTableBuilder::from_url(Url::from_directory_path(&new_path).unwrap());
        assert!(
            builder_result.is_ok(),
            "Builder should be created successfully even if the path does not exist"
        );

        let builder = builder_result.unwrap();
        assert_eq!(
            builder.table_url.as_str(),
            Url::from_directory_path(&new_path).unwrap().as_str()
        );
    }
}