deltalake_mount/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::DeltaIOStorageBackend;
6use deltalake_core::logstore::{
7    config::str_is_truthy, default_logstore, logstore_factories, object_store_factories, LogStore,
8    LogStoreFactory, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
9};
10use deltalake_core::{DeltaResult, DeltaTableError, Path};
11use object_store::local::LocalFileSystem;
12use object_store::DynObjectStore;
13use url::Url;
14
15mod config;
16pub mod error;
17mod file;
18
19trait MountOptions {
20    fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String>;
21}
22
23impl MountOptions for HashMap<String, String> {
24    fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String> {
25        self.iter()
26            .filter_map(|(key, value)| {
27                Some((
28                    config::MountConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
29                    value.clone(),
30                ))
31            })
32            .collect()
33    }
34}
35
36#[derive(Clone, Default, Debug)]
37pub struct MountFactory {}
38
39impl ObjectStoreFactory for MountFactory {
40    fn parse_url_opts(
41        &self,
42        url: &Url,
43        config: &StorageConfig,
44    ) -> DeltaResult<(ObjectStoreRef, Path)> {
45        let mount_config =
46            config::MountConfigHelper::try_new(config.raw.as_mount_options())?.build()?;
47
48        let allow_unsafe_rename = str_is_truthy(
49            mount_config
50                .get(&crate::config::MountConfigKey::AllowUnsafeRename)
51                .unwrap_or(&String::new()),
52        );
53
54        let (mut store, prefix) = match url.scheme() {
55            "dbfs" => {
56                if !allow_unsafe_rename {
57                    // Just let the user know that they need to set the allow_unsafe_rename option
58                    return Err(error::Error::AllowUnsafeRenameNotSpecified.into());
59                }
60                // We need to convert the dbfs url to a file url
61                Url::parse(&format!("file:///dbfs{}", url.path())).unwrap();
62                let store = Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
63                Ok((store, Path::from("/")))
64            }
65            "file" => {
66                if allow_unsafe_rename {
67                    let store =
68                        Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
69                    let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
70                    Ok((store, prefix))
71                } else {
72                    let store = Arc::new(LocalFileSystem::new()) as ObjectStoreRef;
73                    let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
74                    Ok((store, prefix))
75                }
76            }
77            _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
78        }?;
79
80        if let Some(runtime) = &config.runtime {
81            store =
82                Arc::new(DeltaIOStorageBackend::new(store, runtime.clone())) as Arc<DynObjectStore>;
83        }
84        Ok((store, prefix))
85    }
86}
87
88impl LogStoreFactory for MountFactory {
89    fn with_options(
90        &self,
91        prefixed_store: ObjectStoreRef,
92        root_store: ObjectStoreRef,
93        location: &Url,
94        options: &StorageConfig,
95    ) -> DeltaResult<Arc<dyn LogStore>> {
96        Ok(default_logstore(
97            prefixed_store,
98            root_store,
99            location,
100            options,
101        ))
102    }
103}
104
105/// Register an [ObjectStoreFactory] for common Mount [Url] schemes
106pub fn register_handlers(_additional_prefixes: Option<Url>) {
107    let factory = Arc::new(MountFactory {});
108    for scheme in ["dbfs", "file"].iter() {
109        let url = Url::parse(&format!("{scheme}://")).unwrap();
110        object_store_factories().insert(url.clone(), factory.clone());
111        logstore_factories().insert(url.clone(), factory.clone());
112    }
113}