aqueducts_utils/
lib.rs

1/// custom serde
2pub mod serde {
3    use serde::{Deserialize, Deserializer};
4    use std::path::Path;
5    use url::{ParseError, Url};
6
7    /// try to deserialize URL
8    /// if URL deserialization fails due to it being a relative path this function will fallback to using the `std::path` API to create a canonical representation of the given path and then parse as a URL
9    pub fn deserialize_file_location<'de, D>(deserializer: D) -> core::result::Result<Url, D::Error>
10    where
11        D: Deserializer<'de>,
12    {
13        let buf = String::deserialize(deserializer)?;
14
15        let url = match Url::parse(buf.as_str()) {
16            Err(ParseError::RelativeUrlWithoutBase)
17            | Err(ParseError::RelativeUrlWithCannotBeABaseBase)
18                if buf.ends_with('/') || buf.ends_with('\\') =>
19            {
20                let canonicalized = Path::new(buf.as_str()).canonicalize().map_err(|e| {
21                    serde::de::Error::custom(format!("error canonicalizing dir path: {e:?}"))
22                })?;
23
24                let url = Url::from_directory_path(canonicalized).map_err(|e| {
25                    serde::de::Error::custom(format!("error parsing directory path as url: {e:?}"))
26                })?;
27
28                Ok(url)
29            }
30            Err(ParseError::RelativeUrlWithoutBase)
31            | Err(ParseError::RelativeUrlWithCannotBeABaseBase) => {
32                let (path, file_name) = buf
33                    .contains('/')
34                    .then_some('/')
35                    .or(buf.contains('\\').then_some('\\'))
36                    .and_then(|split_char| buf.as_str().rsplit_once(split_char))
37                    .ok_or_else(|| {
38                        serde::de::Error::custom(
39                            "relative paths cannot only contain the file name".to_string(),
40                        )
41                    })?;
42
43                // file might not exist in the output case
44                let canonicalized = {
45                    let mut path = Path::new(path).canonicalize().map_err(|e| {
46                        serde::de::Error::custom(format!(
47                            "error canonicalizing file path '{buf}': {e:?}"
48                        ))
49                    })?;
50
51                    path.push(file_name);
52                    path
53                };
54
55                let url = Url::from_file_path(canonicalized).map_err(|e| {
56                    serde::de::Error::custom(format!(
57                        "error parsing file path as url '{buf}': {e:?}"
58                    ))
59                })?;
60
61                Ok(url)
62            }
63            Err(err) => Err(serde::de::Error::custom(format!(
64                "error parsing location: {err:?}"
65            ))),
66            Ok(url) => Ok(url),
67        }?;
68
69        Ok(url)
70    }
71}
72
73/// object store handlers
74pub mod store {
75    use deltalake::{
76        datafusion::prelude::SessionContext, storage::StorageOptions, DeltaTableError,
77    };
78    use std::{collections::HashMap, sync::Arc};
79    use url::Url;
80
81    /// register deltalake object store handlers
82    #[allow(dead_code)]
83    pub fn register_handlers() {
84        #[cfg(feature = "s3")]
85        {
86            deltalake::aws::register_handlers(None);
87        }
88
89        #[cfg(feature = "gcs")]
90        {
91            deltalake::gcp::register_handlers(None);
92        }
93
94        #[cfg(feature = "azure")]
95        {
96            deltalake::azure::register_handlers(None);
97        }
98    }
99
100    pub fn register_object_store(
101        ctx: Arc<SessionContext>,
102        location: &Url,
103        storage_options: &HashMap<String, String>,
104    ) -> Result<(), DeltaTableError> {
105        if location.scheme() == "file" || location.scheme() == "memory" {
106            return Ok(());
107        }
108
109        let scheme = Url::parse(&format!("{}://", location.scheme())).unwrap();
110        if let Some(factory) = deltalake::storage::factories().get(&scheme) {
111            let (store, _prefix) =
112                factory.parse_url_opts(location, &StorageOptions(storage_options.clone()))?;
113            let _ = ctx
114                .runtime_env()
115                .register_object_store(location, Arc::new(store));
116
117            Ok(())
118        } else {
119            Err(DeltaTableError::InvalidTableLocation(
120                location.clone().into(),
121            ))
122        }
123    }
124}