1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::DeltaIOStorageBackend;
6use deltalake_core::logstore::{
7 config::str_is_truthy, default_logstore, logstore_factories, object_store_factories, LogStore,
8 LogStoreFactory, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
9};
10use deltalake_core::{DeltaResult, DeltaTableError, Path};
11use object_store::local::LocalFileSystem;
12use object_store::DynObjectStore;
13use url::Url;
14
15mod config;
16pub mod error;
17mod file;
18
19trait MountOptions {
20 fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String>;
21}
22
23impl MountOptions for HashMap<String, String> {
24 fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String> {
25 self.iter()
26 .filter_map(|(key, value)| {
27 Some((
28 config::MountConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
29 value.clone(),
30 ))
31 })
32 .collect()
33 }
34}
35
/// Stateless factory implementing both [`ObjectStoreFactory`] and
/// [`LogStoreFactory`] for the `dbfs` and `file` URL schemes.
#[derive(Debug, Default, Clone)]
pub struct MountFactory {}
38
39impl ObjectStoreFactory for MountFactory {
40 fn parse_url_opts(
41 &self,
42 url: &Url,
43 config: &StorageConfig,
44 ) -> DeltaResult<(ObjectStoreRef, Path)> {
45 let mount_config =
46 config::MountConfigHelper::try_new(config.raw.as_mount_options())?.build()?;
47
48 let allow_unsafe_rename = str_is_truthy(
49 mount_config
50 .get(&crate::config::MountConfigKey::AllowUnsafeRename)
51 .unwrap_or(&String::new()),
52 );
53
54 let (mut store, prefix) = match url.scheme() {
55 "dbfs" => {
56 if !allow_unsafe_rename {
57 return Err(error::Error::AllowUnsafeRenameNotSpecified.into());
59 }
60 Url::parse(&format!("file:///dbfs{}", url.path())).unwrap();
62 let store = Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
63 Ok((store, Path::from("/")))
64 }
65 "file" => {
66 if allow_unsafe_rename {
67 let store =
68 Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
69 let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
70 Ok((store, prefix))
71 } else {
72 let store = Arc::new(LocalFileSystem::new()) as ObjectStoreRef;
73 let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
74 Ok((store, prefix))
75 }
76 }
77 _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
78 }?;
79
80 if let Some(runtime) = &config.runtime {
81 store =
82 Arc::new(DeltaIOStorageBackend::new(store, runtime.clone())) as Arc<DynObjectStore>;
83 }
84 Ok((store, prefix))
85 }
86}
87
88impl LogStoreFactory for MountFactory {
89 fn with_options(
90 &self,
91 prefixed_store: ObjectStoreRef,
92 root_store: ObjectStoreRef,
93 location: &Url,
94 options: &StorageConfig,
95 ) -> DeltaResult<Arc<dyn LogStore>> {
96 Ok(default_logstore(
97 prefixed_store,
98 root_store,
99 location,
100 options,
101 ))
102 }
103}
104
105pub fn register_handlers(_additional_prefixes: Option<Url>) {
107 let factory = Arc::new(MountFactory {});
108 for scheme in ["dbfs", "file"].iter() {
109 let url = Url::parse(&format!("{scheme}://")).unwrap();
110 object_store_factories().insert(url.clone(), factory.clone());
111 logstore_factories().insert(url.clone(), factory.clone());
112 }
113}