1use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use chrono::{DateTime, FixedOffset, Utc};
8use deltalake_derive::DeltaConfig;
9use object_store::DynObjectStore;
10use serde::{Deserialize, Serialize};
11use tracing::debug;
12use url::Url;
13
14use crate::logstore::storage::IORuntime;
15use crate::logstore::{object_store_factories, LogStoreRef};
16use crate::{DeltaResult, DeltaTable, DeltaTableError};
17
/// Which version of a Delta table [`DeltaTableBuilder::load`] should load.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum DeltaVersion {
    /// The latest version of the table (the default).
    #[default]
    Newest,
    /// A specific version number.
    Version(i64),
    /// The table as of the given UTC timestamp.
    Timestamp(DateTime<Utc>),
}
29
/// Configuration options for a [`DeltaTable`].
///
/// Serialized with camelCase field names. `PartialEq` is implemented by hand and does not
/// compare `io_runtime`.
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
    /// Whether the table requires files; defaults to `true` and is set to `false` by
    /// `DeltaTableBuilder::without_files`.
    pub require_files: bool,

    /// Log buffer size; defaults to four times the CPU count and must be non-zero when set
    /// through `DeltaTableBuilder::with_log_buffer_size`.
    /// NOTE(review): presumably the number of commit-log entries buffered while loading — confirm.
    pub log_buffer_size: usize,

    /// Log batch size; defaults to 1024.
    /// NOTE(review): presumably the batch size used when reading the log — confirm.
    pub log_batch_size: usize,

    /// Optional I/O runtime, set via `DeltaTableBuilder::with_io_runtime`.
    /// Never (de)serialized and marked `#[delta(skip)]` for the `DeltaConfig` derive.
    #[serde(skip_serializing, skip_deserializing)]
    #[delta(skip)]
    pub io_runtime: Option<IORuntime>,
}
59
60impl Default for DeltaTableConfig {
61 fn default() -> Self {
62 Self {
63 require_files: true,
64 log_buffer_size: num_cpus::get() * 4,
65 log_batch_size: 1024,
66 io_runtime: None,
67 }
68 }
69}
70
71impl PartialEq for DeltaTableConfig {
72 fn eq(&self, other: &Self) -> bool {
73 self.require_files == other.require_files
74 && self.log_buffer_size == other.log_buffer_size
75 && self.log_batch_size == other.log_batch_size
76 }
77}
78
79#[derive(Debug)]
81pub struct DeltaTableBuilder {
82 table_uri: String,
84 storage_backend: Option<(Arc<DynObjectStore>, Url)>,
86 version: DeltaVersion,
89 storage_options: Option<HashMap<String, String>>,
90 #[allow(unused_variables)]
91 allow_http: Option<bool>,
92 table_config: DeltaTableConfig,
93}
94
95impl DeltaTableBuilder {
96 pub fn from_uri(table_uri: impl AsRef<str>) -> Self {
106 let url = ensure_table_uri(&table_uri).expect("The specified table_uri is not valid");
107 DeltaTableBuilder::from_valid_uri(url).expect("Failed to create valid builder")
108 }
109
110 pub fn from_valid_uri(table_uri: impl AsRef<str>) -> DeltaResult<Self> {
118 if let Ok(url) = Url::parse(table_uri.as_ref()) {
119 if url.scheme() == "file" {
120 let path = url.to_file_path().map_err(|_| {
121 DeltaTableError::InvalidTableLocation(table_uri.as_ref().to_string())
122 })?;
123 ensure_file_location_exists(path)?;
124 }
125 } else {
126 ensure_file_location_exists(PathBuf::from(table_uri.as_ref()))?;
127 }
128
129 let url = ensure_table_uri(&table_uri)?;
130 debug!("creating table builder with {url}");
131
132 Ok(Self {
133 table_uri: url.into(),
134 storage_backend: None,
135 version: DeltaVersion::default(),
136 storage_options: None,
137 allow_http: None,
138 table_config: DeltaTableConfig::default(),
139 })
140 }
141
142 pub fn without_files(mut self) -> Self {
144 self.table_config.require_files = false;
145 self
146 }
147
148 pub fn with_version(mut self, version: i64) -> Self {
150 self.version = DeltaVersion::Version(version);
151 self
152 }
153
154 pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> DeltaResult<Self> {
156 if log_buffer_size == 0 {
157 return Err(DeltaTableError::Generic(String::from(
158 "Log buffer size should be positive",
159 )));
160 }
161 self.table_config.log_buffer_size = log_buffer_size;
162 Ok(self)
163 }
164
165 pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
167 let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
168 date_string.as_ref(),
169 )?);
170 Ok(self.with_timestamp(datetime))
171 }
172
173 pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
175 self.version = DeltaVersion::Timestamp(timestamp);
176 self
177 }
178
179 pub fn with_storage_backend(mut self, storage: Arc<DynObjectStore>, location: Url) -> Self {
189 self.storage_backend = Some((storage, location));
190 self
191 }
192
193 pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
206 self.storage_options = Some(
207 storage_options
208 .clone()
209 .into_iter()
210 .map(|(k, v)| {
211 let needs_trim = v.starts_with("http://")
212 || v.starts_with("https://")
213 || k.to_lowercase().ends_with("_url");
214 if needs_trim {
215 (k.to_owned(), v.trim_end_matches('/').to_owned())
216 } else {
217 (k, v)
218 }
219 })
220 .collect(),
221 );
222 self
223 }
224
225 pub fn with_allow_http(mut self, allow_http: bool) -> Self {
229 self.allow_http = Some(allow_http);
230 self
231 }
232
233 pub fn with_io_runtime(mut self, io_runtime: IORuntime) -> Self {
235 self.table_config.io_runtime = Some(io_runtime);
236 self
237 }
238
239 pub fn storage_options(&self) -> HashMap<String, String> {
241 let mut storage_options = self.storage_options.clone().unwrap_or_default();
242 if let Some(allow) = self.allow_http {
243 storage_options.insert(
244 "allow_http".into(),
245 if allow { "true" } else { "false" }.into(),
246 );
247 };
248 storage_options
249 }
250
251 pub fn build_storage(&self) -> DeltaResult<LogStoreRef> {
253 debug!("build_storage() with {}", self.table_uri);
254 let location = Url::parse(&self.table_uri).map_err(|_| {
255 DeltaTableError::NotATable(format!("Could not turn {} into a URL", self.table_uri))
256 })?;
257
258 if let Some((store, _url)) = self.storage_backend.as_ref() {
259 debug!("Loading a logstore with a custom store: {store:?}");
260 crate::logstore::logstore_with(
261 store.clone(),
262 location,
263 self.storage_options(),
264 self.table_config.io_runtime.clone(),
265 )
266 } else {
267 debug!("Loading a logstore based off the location: {location:?}");
269 crate::logstore::logstore_for(
270 location,
271 self.storage_options(),
272 self.table_config.io_runtime.clone(),
273 )
274 }
275 }
276
277 pub fn build(self) -> DeltaResult<DeltaTable> {
282 Ok(DeltaTable::new(self.build_storage()?, self.table_config))
283 }
284
285 pub async fn load(self) -> DeltaResult<DeltaTable> {
287 let version = self.version;
288 let mut table = self.build()?;
289 match version {
290 DeltaVersion::Newest => table.load().await?,
291 DeltaVersion::Version(v) => table.load_version(v).await?,
292 DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
293 }
294 Ok(table)
295 }
296}
297
/// Classification of a table URI, as produced by `resolve_uri_type`.
enum UriType {
    /// A location on the local filesystem.
    LocalPath(PathBuf),
    /// A URL whose scheme has a registered object store factory.
    Url(Url),
}
302
303fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
308 let table_uri = table_uri.as_ref();
309 let known_schemes: Vec<_> = object_store_factories()
310 .iter()
311 .map(|v| v.key().scheme().to_owned())
312 .collect();
313
314 if let Ok(url) = Url::parse(table_uri) {
315 let scheme = url.scheme().to_string();
316 if url.scheme() == "file" {
317 Ok(UriType::LocalPath(url.to_file_path().map_err(|err| {
318 let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
319 DeltaTableError::InvalidTableLocation(msg)
320 })?))
321 } else if known_schemes.contains(&scheme) {
323 Ok(UriType::Url(url))
324 } else if scheme.len() == 1 {
327 Ok(UriType::LocalPath(PathBuf::from(table_uri)))
328 } else {
329 Err(DeltaTableError::InvalidTableLocation(format!(
330 "Unknown scheme: {scheme}. Known schemes: {}",
331 known_schemes.join(",")
332 )))
333 }
334 } else {
335 Ok(UriType::LocalPath(PathBuf::from(table_uri)))
336 }
337}
338
339pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
351 let table_uri = table_uri.as_ref();
352
353 let uri_type: UriType = resolve_uri_type(table_uri)?;
354
355 let mut url = match uri_type {
357 UriType::LocalPath(path) => {
358 if !path.exists() {
359 std::fs::create_dir_all(&path).map_err(|err| {
360 let msg =
361 format!("Could not create local directory: {table_uri}\nError: {err:?}");
362 DeltaTableError::InvalidTableLocation(msg)
363 })?;
364 }
365 let path = std::fs::canonicalize(path).map_err(|err| {
366 let msg = format!("Invalid table location: {table_uri}\nError: {err:?}");
367 DeltaTableError::InvalidTableLocation(msg)
368 })?;
369 Url::from_directory_path(path).map_err(|_| {
370 let msg = format!(
371 "Could not construct a URL from the canonical path: {table_uri}.\n\
372 Something must be very wrong with the table path.",
373 );
374 DeltaTableError::InvalidTableLocation(msg)
375 })?
376 }
377 UriType::Url(url) => url,
378 };
379
380 let trimmed_path = url.path().trim_end_matches('/').to_owned();
381 url.set_path(&trimmed_path);
382 Ok(url)
383}
384
385fn ensure_file_location_exists(path: PathBuf) -> DeltaResult<()> {
388 if !path.exists() {
389 let msg = format!(
390 "Local path \"{}\" does not exist or you don't have access!",
391 path.as_path().display(),
392 );
393 return Err(DeltaTableError::InvalidTableLocation(msg));
394 }
395 Ok(())
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logstore::factories::DefaultObjectStoreFactory;

    /// `ensure_table_uri` accepts local paths and registered schemes, creates missing local
    /// directories, and either roundtrips or normalizes URLs.
    #[test]
    fn test_ensure_table_uri() {
        // Register a factory for `s3://` so s3 URLs resolve as a known scheme.
        object_store_factories().insert(
            Url::parse("s3://").unwrap(),
            Arc::new(DefaultObjectStoreFactory::default()),
        );

        let uri = ensure_table_uri(".");
        assert!(uri.is_ok());
        let uri = ensure_table_uri("s3://container/path");
        assert!(uri.is_ok());
        #[cfg(not(windows))]
        {
            // NOTE(review): this creates /tmp/nonexistent/some/path and never removes it.
            let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
            assert!(uri.is_ok());
        }
        // A missing relative directory is created; remove it so reruns start clean.
        let uri = ensure_table_uri("./nonexistent");
        assert!(uri.is_ok());
        let file_path = std::path::Path::new("./nonexistent");
        std::fs::remove_dir(file_path).unwrap();

        // Inputs that must come back unchanged; `file:///` is only checked off Windows.
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0",
                    "memory://",
                    "s3://bucket/my%20table", ];
            } else {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0",
                    "memory://",
                    "file:///",
                    "s3://bucket/my%20table", ];
            }
        }

        for case in roundtrip_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(case, &uri.as_str());
        }

        // Inputs that get normalized: trailing slashes trimmed, spaces percent-encoded.
        let map_cases = &[
            (
                "s3://tests/data/delta-0.8.0//",
                "s3://tests/data/delta-0.8.0",
            ),
            ("s3://bucket/my table", "s3://bucket/my%20table"),
        ];

        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    /// Drive-letter paths become `file://` URLs with the trailing slash trimmed.
    #[test]
    #[cfg(windows)]
    fn test_windows_uri() {
        let map_cases = &[
            ("c:/", "file:///C:"),
        ];

        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    /// Absolute paths (including spaces and non-ASCII characters) are created and mapped to
    /// directory URLs; relative paths are created relative to the working directory.
    #[test]
    fn test_ensure_table_uri_path() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let paths = &[
            tmp_path.join("data/delta-0.8.0"),
            tmp_path.join("space in path"),
            tmp_path.join("special&chars/你好/😊"),
        ];

        for path in paths {
            let expected = Url::from_directory_path(path).unwrap();
            let uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
            assert_eq!(expected.as_str().trim_end_matches('/'), uri.as_str());
            assert!(path.exists());
        }

        // `%3F` must be treated literally as part of the directory name, not URL-decoded.
        let relative_path = std::path::Path::new("_tmp/test %3F");
        assert!(!relative_path.exists());
        ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap();
        assert!(relative_path.exists());
        std::fs::remove_dir_all(std::path::Path::new("_tmp")).unwrap();
    }

    /// `Url` values are accepted directly; `file://` URLs are created and lose the trailing
    /// slash.
    #[test]
    fn test_ensure_table_uri_url() {
        let expected = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected, url);

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let path = tmp_path.join("data/delta-0.8.0");
        let expected = Url::from_directory_path(path).unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected.as_str().trim_end_matches('/'), url.as_str());
    }

    /// A URL with an unregistered scheme is rejected.
    #[test]
    fn test_invalid_uri() {
        DeltaTableBuilder::from_valid_uri("this://is.nonsense")
            .expect_err("this should be an error");
    }

    /// Trailing slashes are trimmed only from HTTP(S) values and from keys ending in `_url`.
    #[test]
    fn test_writer_storage_opts_url_trim() {
        let cases = [
            ("SOMETHING_URL", "something://else/", "something://else"),
            (
                "SOMETHING",
                "http://something:port/",
                "http://something:port",
            ),
            (
                "SOMETHING",
                "https://something:port/",
                "https://something:port",
            ),
            (
                "SOME_JDBC_PREFIX",
                "jdbc:mysql://mysql.db.server:3306/",
                "jdbc:mysql://mysql.db.server:3306/",
            ),
            ("SOME_S3_LINK", "s3a://bucket-name/", "s3a://bucket-name/"),
            ("SOME_RANDOM_STRING", "a1b2c3d4e5f#/", "a1b2c3d4e5f#/"),
            (
                "SOME_VALUE",
                "/ This is some value 123 /",
                "/ This is some value 123 /",
            ),
        ];
        for (key, val, expected) in cases {
            let table_uri = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
            let mut storage_opts = HashMap::<String, String>::new();
            storage_opts.insert(key.to_owned(), val.to_owned());

            let table = DeltaTableBuilder::from_uri(table_uri).with_storage_options(storage_opts);
            let found_opts = table.storage_options();
            assert_eq!(expected, found_opts.get(key).unwrap());
        }
    }
}