1pub mod serde {
3 use serde::{Deserialize, Deserializer};
4 use std::path::Path;
5 use url::{ParseError, Url};
6
7 pub fn deserialize_file_location<'de, D>(deserializer: D) -> core::result::Result<Url, D::Error>
10 where
11 D: Deserializer<'de>,
12 {
13 let buf = String::deserialize(deserializer)?;
14
15 let url = match Url::parse(buf.as_str()) {
16 Err(ParseError::RelativeUrlWithoutBase)
17 | Err(ParseError::RelativeUrlWithCannotBeABaseBase)
18 if buf.ends_with('/') || buf.ends_with('\\') =>
19 {
20 let canonicalized = Path::new(buf.as_str()).canonicalize().map_err(|e| {
21 serde::de::Error::custom(format!("error canonicalizing dir path: {e:?}"))
22 })?;
23
24 let url = Url::from_directory_path(canonicalized).map_err(|e| {
25 serde::de::Error::custom(format!("error parsing directory path as url: {e:?}"))
26 })?;
27
28 Ok(url)
29 }
30 Err(ParseError::RelativeUrlWithoutBase)
31 | Err(ParseError::RelativeUrlWithCannotBeABaseBase) => {
32 let (path, file_name) = buf
33 .contains('/')
34 .then_some('/')
35 .or(buf.contains('\\').then_some('\\'))
36 .and_then(|split_char| buf.as_str().rsplit_once(split_char))
37 .ok_or_else(|| {
38 serde::de::Error::custom(
39 "relative paths cannot only contain the file name".to_string(),
40 )
41 })?;
42
43 let canonicalized = {
45 let mut path = Path::new(path).canonicalize().map_err(|e| {
46 serde::de::Error::custom(format!(
47 "error canonicalizing file path '{buf}': {e:?}"
48 ))
49 })?;
50
51 path.push(file_name);
52 path
53 };
54
55 let url = Url::from_file_path(canonicalized).map_err(|e| {
56 serde::de::Error::custom(format!(
57 "error parsing file path as url '{buf}': {e:?}"
58 ))
59 })?;
60
61 Ok(url)
62 }
63 Err(err) => Err(serde::de::Error::custom(format!(
64 "error parsing location: {err:?}"
65 ))),
66 Ok(url) => Ok(url),
67 }?;
68
69 Ok(url)
70 }
71}
72
73pub mod store {
75 use deltalake::{
76 datafusion::prelude::SessionContext, storage::StorageOptions, DeltaTableError,
77 };
78 use std::{collections::HashMap, sync::Arc};
79 use url::Url;
80
81 #[allow(dead_code)]
83 pub fn register_handlers() {
84 #[cfg(feature = "s3")]
85 {
86 deltalake::aws::register_handlers(None);
87 }
88
89 #[cfg(feature = "gcs")]
90 {
91 deltalake::gcp::register_handlers(None);
92 }
93
94 #[cfg(feature = "azure")]
95 {
96 deltalake::azure::register_handlers(None);
97 }
98 }
99
100 pub fn register_object_store(
101 ctx: Arc<SessionContext>,
102 location: &Url,
103 storage_options: &HashMap<String, String>,
104 ) -> Result<(), DeltaTableError> {
105 if location.scheme() == "file" || location.scheme() == "memory" {
106 return Ok(());
107 }
108
109 let scheme = Url::parse(&format!("{}://", location.scheme())).unwrap();
110 if let Some(factory) = deltalake::storage::factories().get(&scheme) {
111 let (store, _prefix) =
112 factory.parse_url_opts(location, &StorageOptions(storage_options.clone()))?;
113 let _ = ctx
114 .runtime_env()
115 .register_object_store(location, Arc::new(store));
116
117 Ok(())
118 } else {
119 Err(DeltaTableError::InvalidTableLocation(
120 location.clone().into(),
121 ))
122 }
123 }
124}