deltalake_catalog_unity/
lib.rs

1#![warn(clippy::all)]
2#![warn(rust_2018_idioms)]
3//! Databricks Unity Catalog.
4#[cfg(not(any(feature = "aws", feature = "azure", feature = "gcp", feature = "r2")))]
5compile_error!(
6    "At least one of the following crate features `aws`, `azure`, `gcp`, or `r2` must be enabled \
7    for this crate to function properly."
8);
9
10use deltalake_core::logstore::{
11    default_logstore, logstore_factories, object_store::RetryConfig, LogStore, LogStoreFactory,
12    StorageConfig,
13};
14use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
15use reqwest::Url;
16use std::collections::HashMap;
17use std::future::Future;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use crate::credential::{
22    AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
23};
24use crate::models::{
25    ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse,
26    ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest,
27    TokenErrorResponse,
28};
29
30use deltalake_core::data_catalog::DataCatalogResult;
31use deltalake_core::{
32    ensure_table_uri, DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder,
33    DeltaTableError, ObjectStoreError, Path,
34};
35
36use crate::client::retry::*;
37use deltalake_core::logstore::{
38    config::str_is_truthy, object_store_factories, ObjectStoreFactory, ObjectStoreRef,
39};
40pub mod client;
41pub mod credential;
42
/// Store name reported in `ObjectStoreError::Generic` errors raised by this crate.
const STORE_NAME: &str = "UnityCatalogObjectStore";
44#[cfg(feature = "datafusion")]
45pub mod datafusion;
46pub mod models;
47pub mod prelude;
48
49/// Possible errors from the unity-catalog/tables API call
50#[derive(thiserror::Error, Debug)]
51pub enum UnityCatalogError {
52    #[error("GET request error: {source}")]
53    /// Error from reqwest library
54    RequestError {
55        /// The underlying reqwest_middleware::Error
56        #[from]
57        source: reqwest::Error,
58    },
59
60    #[error("Error in middleware: {source}")]
61    RequestMiddlewareError {
62        /// The underlying reqwest_middleware::Error
63        #[from]
64        source: reqwest_middleware::Error,
65    },
66
67    /// Request returned error response
68    #[error("Invalid table error: {error_code}: {message}")]
69    InvalidTable {
70        /// Error code
71        error_code: String,
72        /// Error description
73        message: String,
74    },
75
76    #[error("Invalid token for auth header: {header_error}")]
77    InvalidHeader {
78        #[from]
79        header_error: InvalidHeaderValue,
80    },
81
82    /// Invalid Table URI
83    #[error("Invalid Unity Catalog Table URI: {table_uri}")]
84    InvalidTableURI {
85        /// Table URI
86        table_uri: String,
87    },
88
89    /// Unknown configuration key
90    #[error("Missing configuration key: {0}")]
91    MissingConfiguration(String),
92
93    /// Unknown configuration key
94    #[error("Failed to get a credential from UnityCatalog client configuration.")]
95    MissingCredential,
96
97    /// Temporary Credentials Fetch Failure
98    #[error("Unable to get temporary credentials from Unity Catalog.")]
99    TemporaryCredentialsFetchFailure,
100
101    #[error("Azure CLI error: {message}")]
102    AzureCli {
103        /// Error description
104        message: String,
105    },
106
107    #[error("Missing or corrupted federated token file for WorkloadIdentity.")]
108    FederatedTokenFile,
109
110    #[cfg(feature = "datafusion")]
111    #[error("Datafusion error: {0}")]
112    DatafusionError(#[from] ::datafusion::common::DataFusionError),
113
114    /// Cannot initialize DynamoDbConfiguration due to some sort of threading issue
115    #[error("Unable to initialize Unity Catalog, potentially a threading issue")]
116    InitializationError,
117
118    /// A generic error from a source
119    #[error("An error occurred in catalog: {source}")]
120    Generic {
121        /// Error message
122        source: Box<dyn std::error::Error + Send + Sync + 'static>,
123    },
124
125    #[error("Non-200 returned on token acquisition: {0}")]
126    InvalidCredentials(TokenErrorResponse),
127}
128
129impl From<ErrorResponse> for UnityCatalogError {
130    fn from(value: ErrorResponse) -> Self {
131        UnityCatalogError::InvalidTable {
132            error_code: value.error_code,
133            message: value.message,
134        }
135    }
136}
137
138impl From<DataCatalogError> for UnityCatalogError {
139    fn from(value: DataCatalogError) -> Self {
140        UnityCatalogError::Generic {
141            source: Box::new(value),
142        }
143    }
144}
145
146impl From<UnityCatalogError> for DataCatalogError {
147    fn from(value: UnityCatalogError) -> Self {
148        DataCatalogError::Generic {
149            catalog: "Unity",
150            source: Box::new(value),
151        }
152    }
153}
154
155impl From<UnityCatalogError> for DeltaTableError {
156    fn from(value: UnityCatalogError) -> Self {
157        DeltaTableError::GenericError {
158            source: Box::new(value),
159        }
160    }
161}
162
163/// Configuration options for unity catalog client
164pub enum UnityCatalogConfigKey {
165    /// Url of a Databricks workspace
166    ///
167    /// Supported keys:
168    /// - `unity_workspace_url`
169    /// - `databricks_workspace_url`
170    /// - `workspace_url`
171    #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")]
172    WorkspaceUrl,
173
174    /// Host of the Databricks workspace
175    Host,
176
177    /// Access token to authorize API requests
178    ///
179    /// Supported keys:
180    /// - `unity_access_token`
181    /// - `databricks_access_token`
182    /// - `access_token`
183    #[deprecated(
184        since = "0.17.0",
185        note = "Please use the DATABRICKS_TOKEN env variable"
186    )]
187    AccessToken,
188
189    /// Token to use for Databricks Unity
190    Token,
191
192    /// Service principal client id for authorizing requests
193    ///
194    /// Supported keys:
195    /// - `azure_client_id`
196    /// - `unity_client_id`
197    /// - `client_id`
198    ClientId,
199
200    /// Service principal client secret for authorizing requests
201    ///
202    /// Supported keys:
203    /// - `azure_client_secret`
204    /// - `unity_client_secret`
205    /// - `client_secret`
206    ClientSecret,
207
208    /// Authority (tenant) id used in oauth flows
209    ///
210    /// Supported keys:
211    /// - `azure_tenant_id`
212    /// - `unity_tenant_id`
213    /// - `tenant_id`
214    AuthorityId,
215
216    /// Authority host used in oauth flows
217    ///
218    /// Supported keys:
219    /// - `azure_authority_host`
220    /// - `unity_authority_host`
221    /// - `authority_host`
222    AuthorityHost,
223
224    /// Endpoint to request a imds managed identity token
225    ///
226    /// Supported keys:
227    /// - `azure_msi_endpoint`
228    /// - `azure_identity_endpoint`
229    /// - `identity_endpoint`
230    /// - `msi_endpoint`
231    MsiEndpoint,
232
233    /// Object id for use with managed identity authentication
234    ///
235    /// Supported keys:
236    /// - `azure_object_id`
237    /// - `object_id`
238    ObjectId,
239
240    /// Msi resource id for use with managed identity authentication
241    ///
242    /// Supported keys:
243    /// - `azure_msi_resource_id`
244    /// - `msi_resource_id`
245    MsiResourceId,
246
247    /// File containing token for Azure AD workload identity federation
248    ///
249    /// Supported keys:
250    /// - `azure_federated_token_file`
251    /// - `federated_token_file`
252    FederatedTokenFile,
253
254    /// Use azure cli for acquiring access token
255    ///
256    /// Supported keys:
257    /// - `azure_use_azure_cli`
258    /// - `use_azure_cli`
259    UseAzureCli,
260
261    /// Allow http url (e.g. http://localhost:8080/api/2.1/...)
262    /// Supported keys:
263    /// - `unity_allow_http_url`
264    AllowHttpUrl,
265}
266
267impl FromStr for UnityCatalogConfigKey {
268    type Err = DataCatalogError;
269
270    #[allow(deprecated)]
271    fn from_str(s: &str) -> Result<Self, Self::Err> {
272        match s {
273            "access_token"
274            | "unity_access_token"
275            | "databricks_access_token"
276            | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken),
277            "authority_host" | "unity_authority_host" | "databricks_authority_host" => {
278                Ok(UnityCatalogConfigKey::AuthorityHost)
279            }
280            "authority_id" | "unity_authority_id" | "databricks_authority_id" => {
281                Ok(UnityCatalogConfigKey::AuthorityId)
282            }
283            "client_id" | "unity_client_id" | "databricks_client_id" => {
284                Ok(UnityCatalogConfigKey::ClientId)
285            }
286            "client_secret" | "unity_client_secret" | "databricks_client_secret" => {
287                Ok(UnityCatalogConfigKey::ClientSecret)
288            }
289            "federated_token_file"
290            | "unity_federated_token_file"
291            | "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile),
292            "host" => Ok(UnityCatalogConfigKey::Host),
293            "msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => {
294                Ok(UnityCatalogConfigKey::MsiEndpoint)
295            }
296            "msi_resource_id" | "unity_msi_resource_id" | "databricks_msi_resource_id" => {
297                Ok(UnityCatalogConfigKey::MsiResourceId)
298            }
299            "object_id" | "unity_object_id" | "databricks_object_id" => {
300                Ok(UnityCatalogConfigKey::ObjectId)
301            }
302            "token" => Ok(UnityCatalogConfigKey::Token),
303            "use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => {
304                Ok(UnityCatalogConfigKey::UseAzureCli)
305            }
306            "workspace_url"
307            | "unity_workspace_url"
308            | "databricks_workspace_url"
309            | "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
310            "allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
311            _ => Err(DataCatalogError::UnknownConfigKey {
312                catalog: "unity",
313                key: s.to_string(),
314            }),
315        }
316    }
317}
318
319#[allow(deprecated)]
320impl AsRef<str> for UnityCatalogConfigKey {
321    fn as_ref(&self) -> &str {
322        match self {
323            UnityCatalogConfigKey::AccessToken => "unity_access_token",
324            UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
325            UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
326            UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
327            UnityCatalogConfigKey::ClientId => "unity_client_id",
328            UnityCatalogConfigKey::ClientSecret => "unity_client_secret",
329            UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file",
330            UnityCatalogConfigKey::Host => "databricks_host",
331            UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint",
332            UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id",
333            UnityCatalogConfigKey::ObjectId => "unity_object_id",
334            UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli",
335            UnityCatalogConfigKey::Token => "databricks_token",
336            UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url",
337        }
338    }
339}
340
341/// Builder for creating a UnityCatalogClient
342#[derive(Default)]
343pub struct UnityCatalogBuilder {
344    /// Url of a Databricks workspace
345    workspace_url: Option<String>,
346
347    /// Bearer token
348    bearer_token: Option<String>,
349
350    /// Client id
351    client_id: Option<String>,
352
353    /// Client secret
354    client_secret: Option<String>,
355
356    /// Tenant id
357    authority_id: Option<String>,
358
359    /// Authority host
360    authority_host: Option<String>,
361
362    /// Msi endpoint for acquiring managed identity token
363    msi_endpoint: Option<String>,
364
365    /// Object id for use with managed identity authentication
366    object_id: Option<String>,
367
368    /// Msi resource id for use with managed identity authentication
369    msi_resource_id: Option<String>,
370
371    /// File containing token for Azure AD workload identity federation
372    federated_token_file: Option<String>,
373
374    /// When set to true, azure cli has to be used for acquiring access token
375    use_azure_cli: bool,
376
377    /// When set to true, http will be allowed in the catalog url
378    allow_http_url: bool,
379
380    /// Retry config
381    retry_config: RetryConfig,
382
383    /// Options for the underlying http client
384    client_options: client::ClientOptions,
385}
386
387#[allow(deprecated)]
388impl UnityCatalogBuilder {
389    /// Create a new [`UnityCatalogBuilder`] with default values.
390    pub fn new() -> Self {
391        Default::default()
392    }
393
394    /// Set an option on the builder via a key - value pair.
395    pub fn try_with_option(
396        mut self,
397        key: impl AsRef<str>,
398        value: impl Into<String>,
399    ) -> DataCatalogResult<Self> {
400        match UnityCatalogConfigKey::from_str(key.as_ref())? {
401            UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
402            UnityCatalogConfigKey::AllowHttpUrl => {
403                self.allow_http_url = str_is_truthy(&value.into())
404            }
405            UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
406            UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
407            UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
408            UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()),
409            UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()),
410            UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()),
411            UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()),
412            UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()),
413            UnityCatalogConfigKey::FederatedTokenFile => {
414                self.federated_token_file = Some(value.into())
415            }
416            UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()),
417            UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()),
418            UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
419        };
420        Ok(self)
421    }
422
423    /// Hydrate builder from key value pairs
424    pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
425        mut self,
426        options: I,
427    ) -> DataCatalogResult<Self> {
428        for (key, value) in options {
429            self = self.try_with_option(key, value)?;
430        }
431        Ok(self)
432    }
433
434    /// Parse configuration from the environment.
435    ///
436    /// Environment keys prefixed with "UNITY_" or "DATABRICKS_" will be considered
437    pub fn from_env() -> Self {
438        let mut builder = Self::default();
439        for (os_key, os_value) in std::env::vars_os() {
440            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
441                if key.starts_with("UNITY_") || key.starts_with("DATABRICKS_") {
442                    tracing::debug!("Found relevant env: {key}");
443                    if let Ok(config_key) =
444                        UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase())
445                    {
446                        tracing::debug!("Trying: {key} with {value}");
447                        builder = builder.try_with_option(config_key, value).unwrap();
448                    }
449                }
450            }
451        }
452
453        builder
454    }
455
456    /// Set the URL of a Databricks workspace.
457    pub fn with_workspace_url(mut self, url: impl Into<String>) -> Self {
458        self.workspace_url = Some(url.into());
459        self
460    }
461
462    /// Sets the client id for use in client secret or k8s federated credential flow
463    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
464        self.client_id = Some(client_id.into());
465        self
466    }
467
468    /// Sets the client secret for use in client secret flow
469    pub fn with_client_secret(mut self, client_secret: impl Into<String>) -> Self {
470        self.client_secret = Some(client_secret.into());
471        self
472    }
473
474    /// Sets the authority id for use service principal credential based authentication
475    pub fn with_authority_id(mut self, tenant_id: impl Into<String>) -> Self {
476        self.authority_id = Some(tenant_id.into());
477        self
478    }
479
480    /// Set a static bearer token to be used for authorizing requests
481    pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
482        self.bearer_token = Some(bearer_token.into());
483        self
484    }
485
486    /// Set a personal access token (PAT) to be used for authorizing requests
487    pub fn with_access_token(self, access_token: impl Into<String>) -> Self {
488        self.with_bearer_token(access_token)
489    }
490
491    /// Sets the client options, overriding any already set
492    pub fn with_client_options(mut self, options: client::ClientOptions) -> Self {
493        self.client_options = options;
494        self
495    }
496
497    /// Sets the retry config, overriding any already set
498    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
499        self.retry_config = config;
500        self
501    }
502
503    fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
504    where
505        T: Send,
506        F: Future<Output = T> + Send,
507    {
508        match tokio::runtime::Handle::try_current() {
509            Ok(handle) => match handle.runtime_flavor() {
510                tokio::runtime::RuntimeFlavor::MultiThread => {
511                    Ok(tokio::task::block_in_place(move || handle.block_on(future)))
512                }
513                _ => {
514                    let mut cfg: Option<T> = None;
515                    std::thread::scope(|scope| {
516                        scope.spawn(|| {
517                            cfg = Some(handle.block_on(future));
518                        });
519                    });
520                    cfg.ok_or(DeltaTableError::ObjectStore {
521                        source: ObjectStoreError::Generic {
522                            store: STORE_NAME,
523                            source: Box::new(UnityCatalogError::InitializationError),
524                        },
525                    })
526                }
527            },
528            Err(_) => {
529                let runtime = tokio::runtime::Builder::new_current_thread()
530                    .enable_all()
531                    .build()
532                    .expect("a tokio runtime is required by the Unity Catalog Builder");
533                Ok(runtime.block_on(future))
534            }
535        }
536    }
537
538    /// Returns the storage location and temporary token for the Unity Catalog table.
539    ///
540    /// If storage options are provided, they override environment variables for authentication.
541    pub async fn get_uc_location_and_token(
542        table_uri: &str,
543        storage_options: Option<&HashMap<String, String>>,
544    ) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
545        let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
546        if uri_parts.len() != 3 {
547            return Err(UnityCatalogError::InvalidTableURI {
548                table_uri: table_uri.to_string(),
549            });
550        }
551
552        let catalog_id = uri_parts[0];
553        let database_name = uri_parts[1];
554        let table_name = uri_parts[2];
555
556        let unity_catalog = if let Some(options) = storage_options {
557            let mut builder = UnityCatalogBuilder::from_env();
558            builder =
559                builder.try_with_options(options.iter().map(|(k, v)| (k.as_str(), v.as_str())))?;
560            builder.build()?
561        } else {
562            UnityCatalogBuilder::from_env().build()?
563        };
564
565        let storage_location = unity_catalog
566            .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
567            .await?;
568        let temp_creds_res = unity_catalog
569            .get_temp_table_credentials(catalog_id, database_name, table_name)
570            .await?;
571        let credentials = match temp_creds_res {
572            TableTempCredentialsResponse::Success(temp_creds) => temp_creds
573                .get_credentials()
574                .ok_or_else(|| UnityCatalogError::MissingCredential)?,
575            TableTempCredentialsResponse::Error(_error) => {
576                return Err(UnityCatalogError::TemporaryCredentialsFetchFailure)
577            }
578        };
579        Ok((storage_location, credentials))
580    }
581
582    fn get_credential_provider(&self) -> Option<CredentialProvider> {
583        if let Some(token) = self.bearer_token.as_ref() {
584            return Some(CredentialProvider::BearerToken(token.clone()));
585        }
586
587        if let (Some(client_id), Some(client_secret), Some(workspace_host)) =
588            (&self.client_id, &self.client_secret, &self.workspace_url)
589        {
590            return Some(CredentialProvider::TokenCredential(
591                Default::default(),
592                Box::new(WorkspaceOAuthProvider::new(
593                    client_id,
594                    client_secret,
595                    workspace_host,
596                )),
597            ));
598        }
599
600        if let (Some(client_id), Some(client_secret), Some(authority_id)) = (
601            self.client_id.as_ref(),
602            self.client_secret.as_ref(),
603            self.authority_id.as_ref(),
604        ) {
605            return Some(CredentialProvider::TokenCredential(
606                Default::default(),
607                Box::new(ClientSecretOAuthProvider::new(
608                    client_id,
609                    client_secret,
610                    authority_id,
611                    self.authority_host.as_ref(),
612                )),
613            ));
614        }
615        if self.use_azure_cli {
616            return Some(CredentialProvider::TokenCredential(
617                Default::default(),
618                Box::new(AzureCliCredential::new()),
619            ));
620        }
621
622        None
623    }
624
625    /// Build an instance of [`UnityCatalog`]
626    pub fn build(self) -> DataCatalogResult<UnityCatalog> {
627        let credential = self
628            .get_credential_provider()
629            .ok_or(UnityCatalogError::MissingCredential)?;
630
631        let workspace_url = self
632            .workspace_url
633            .ok_or(UnityCatalogError::MissingConfiguration(
634                "workspace_url".into(),
635            ))?
636            .trim_end_matches('/')
637            .to_string();
638
639        let client_options = if self.allow_http_url {
640            self.client_options.with_allow_http(true)
641        } else {
642            self.client_options
643        };
644        let client = client_options.client()?;
645
646        Ok(UnityCatalog {
647            client,
648            workspace_url,
649            credential,
650        })
651    }
652}
653
654/// Databricks Unity Catalog
655pub struct UnityCatalog {
656    client: reqwest_middleware::ClientWithMiddleware,
657    credential: CredentialProvider,
658    workspace_url: String,
659}
660
661impl UnityCatalog {
662    async fn get_credential(&self) -> Result<HeaderValue, UnityCatalogError> {
663        match &self.credential {
664            CredentialProvider::BearerToken(token) => {
665                // we do the conversion to a HeaderValue here, since it is fallible,
666                // and we want to use it in an infallible function
667                Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
668            }
669            CredentialProvider::TokenCredential(cache, cred) => {
670                let token = cache
671                    .get_or_insert_with(|| cred.fetch_token(&self.client))
672                    .await?;
673
674                // we do the conversion to a HeaderValue here, since it is fallible,
675                // and we want to use it in an infallible function
676                Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
677            }
678        }
679    }
680
681    fn catalog_url(&self) -> String {
682        format!("{}/api/2.1/unity-catalog", self.workspace_url)
683    }
684
685    /// Gets an array of catalogs in the metastore. If the caller is the metastore admin,
686    /// all catalogs will be retrieved. Otherwise, only catalogs owned by the caller
687    /// (or for which the caller has the USE_CATALOG privilege) will be retrieved.
688    /// There is no guarantee of a specific ordering of the elements in the array.
689    pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
690        let token = self.get_credential().await?;
691        // https://docs.databricks.com/api-explorer/workspace/schemas/list
692        let resp = self
693            .client
694            .get(format!("{}/catalogs", self.catalog_url()))
695            .header(AUTHORIZATION, token)
696            .send()
697            .await?;
698        Ok(resp.json().await?)
699    }
700
701    /// List all schemas for a catalog in the metastore.
702    ///
703    /// If the caller is the metastore admin or the owner of the parent catalog, all schemas
704    /// for the catalog will be retrieved. Otherwise, only schemas owned by the caller
705    /// (or for which the caller has the USE_SCHEMA privilege) will be retrieved.
706    /// There is no guarantee of a specific ordering of the elements in the array.
707    ///
708    /// # Parameters
709    /// - catalog_name: Parent catalog for schemas of interest.
710    pub async fn list_schemas(
711        &self,
712        catalog_name: impl AsRef<str>,
713    ) -> Result<ListSchemasResponse, UnityCatalogError> {
714        let token = self.get_credential().await?;
715        // https://docs.databricks.com/api-explorer/workspace/schemas/list
716        let resp = self
717            .client
718            .get(format!("{}/schemas", self.catalog_url()))
719            .header(AUTHORIZATION, token)
720            .query(&[("catalog_name", catalog_name.as_ref())])
721            .send()
722            .await?;
723        Ok(resp.json().await?)
724    }
725
726    /// Gets the specified schema within the metastore.#
727    ///
728    /// The caller must be a metastore admin, the owner of the schema,
729    /// or a user that has the USE_SCHEMA privilege on the schema.
730    pub async fn get_schema(
731        &self,
732        catalog_name: impl AsRef<str>,
733        schema_name: impl AsRef<str>,
734    ) -> Result<GetSchemaResponse, UnityCatalogError> {
735        let token = self.get_credential().await?;
736        // https://docs.databricks.com/api-explorer/workspace/schemas/get
737        let resp = self
738            .client
739            .get(format!(
740                "{}/schemas/{}.{}",
741                self.catalog_url(),
742                catalog_name.as_ref(),
743                schema_name.as_ref()
744            ))
745            .header(AUTHORIZATION, token)
746            .send()
747            .await?;
748        Ok(resp.json().await?)
749    }
750
751    /// Gets an array of summaries for tables for a schema and catalog within the metastore.
752    ///
753    /// The table summaries returned are either:
754    /// - summaries for all tables (within the current metastore and parent catalog and schema),
755    ///   when the user is a metastore admin, or:
756    /// - summaries for all tables and schemas (within the current metastore and parent catalog)
757    ///   for which the user has ownership or the SELECT privilege on the table and ownership or
758    ///   USE_SCHEMA privilege on the schema, provided that the user also has ownership or the
759    ///   USE_CATALOG privilege on the parent catalog.
760    ///
761    /// There is no guarantee of a specific ordering of the elements in the array.
762    pub async fn list_table_summaries(
763        &self,
764        catalog_name: impl AsRef<str>,
765        schema_name_pattern: impl AsRef<str>,
766    ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
767        let token = self.get_credential().await?;
768        // https://docs.databricks.com/api-explorer/workspace/tables/listsummaries
769        let resp = self
770            .client
771            .get(format!("{}/table-summaries", self.catalog_url()))
772            .query(&[
773                ("catalog_name", catalog_name.as_ref()),
774                ("schema_name_pattern", schema_name_pattern.as_ref()),
775            ])
776            .header(AUTHORIZATION, token)
777            .send()
778            .await?;
779
780        Ok(resp.json().await?)
781    }
782
783    /// Gets a table from the metastore for a specific catalog and schema.
784    ///
785    /// The caller must be a metastore admin, be the owner of the table and have the
786    /// USE_CATALOG privilege on the parent catalog and the USE_SCHEMA privilege on
787    /// the parent schema, or be the owner of the table and have the SELECT privilege on it as well.
788    ///
789    /// # Parameters
790    pub async fn get_table(
791        &self,
792        catalog_id: impl AsRef<str>,
793        database_name: impl AsRef<str>,
794        table_name: impl AsRef<str>,
795    ) -> Result<GetTableResponse, UnityCatalogError> {
796        let token = self.get_credential().await?;
797        // https://docs.databricks.com/api-explorer/workspace/tables/get
798        let resp = self
799            .client
800            .get(format!(
801                "{}/tables/{}.{}.{}",
802                self.catalog_url(),
803                catalog_id.as_ref(),
804                database_name.as_ref(),
805                table_name.as_ref(),
806            ))
807            .header(AUTHORIZATION, token)
808            .send()
809            .await?;
810
811        Ok(resp.json().await?)
812    }
813
814    pub async fn get_temp_table_credentials(
815        &self,
816        catalog_id: impl AsRef<str>,
817        database_name: impl AsRef<str>,
818        table_name: impl AsRef<str>,
819    ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
820        let token = self.get_credential().await?;
821        let table_info = self
822            .get_table(catalog_id, database_name, table_name)
823            .await?;
824        let response = match table_info {
825            GetTableResponse::Success(table) => {
826                let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ");
827                Ok(self
828                    .client
829                    .post(format!(
830                        "{}/temporary-table-credentials",
831                        self.catalog_url()
832                    ))
833                    .header(AUTHORIZATION, token)
834                    .json(&request)
835                    .send()
836                    .await?)
837            }
838            GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
839                error_code: err.error_code,
840                message: err.message,
841            }),
842        }?;
843
844        Ok(response.json().await?)
845    }
846}
847
/// Stateless factory registered for the `uc://` scheme.
///
/// It implements both `ObjectStoreFactory` and `LogStoreFactory` for Unity Catalog table URIs.
#[derive(Debug, Default, Clone)]
pub struct UnityCatalogFactory {}
850
851impl ObjectStoreFactory for UnityCatalogFactory {
852    fn parse_url_opts(
853        &self,
854        table_uri: &Url,
855        config: &StorageConfig,
856    ) -> DeltaResult<(ObjectStoreRef, Path)> {
857        let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
858            UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str(), Some(&config.raw)),
859        )??;
860
861        let mut storage_options = config.raw.clone();
862        storage_options.extend(temp_creds);
863
864        // TODO(roeap): we should not have to go through the table here.
865        // ideally we just create the right storage ...
866        let table_url = ensure_table_uri(&table_path)?;
867        let mut builder = DeltaTableBuilder::from_uri(table_url)?;
868
869        if let Some(runtime) = &config.runtime {
870            builder = builder.with_io_runtime(runtime.clone());
871        }
872
873        if !storage_options.is_empty() {
874            builder = builder.with_storage_options(storage_options.clone());
875        }
876        let prefix = Path::parse(table_uri.path())?;
877        let store = builder.build_storage()?.object_store(None);
878
879        Ok((store, prefix))
880    }
881}
882
883impl LogStoreFactory for UnityCatalogFactory {
884    fn with_options(
885        &self,
886        prefixed_store: ObjectStoreRef,
887        root_store: ObjectStoreRef,
888        location: &Url,
889        options: &StorageConfig,
890    ) -> DeltaResult<Arc<dyn LogStore>> {
891        Ok(default_logstore(
892            prefixed_store,
893            root_store,
894            location,
895            options,
896        ))
897    }
898}
899
900/// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes
901pub fn register_handlers(_additional_prefixes: Option<Url>) {
902    let factory = Arc::new(UnityCatalogFactory::default());
903    let url = Url::parse("uc://").unwrap();
904    object_store_factories().insert(url.clone(), factory.clone());
905    logstore_factories().insert(url.clone(), factory.clone());
906}
907
908#[async_trait::async_trait]
909impl DataCatalog for UnityCatalog {
910    type Error = UnityCatalogError;
911    /// Get the table storage location from the UnityCatalog
912    async fn get_table_storage_location(
913        &self,
914        catalog_id: Option<String>,
915        database_name: &str,
916        table_name: &str,
917    ) -> Result<String, UnityCatalogError> {
918        match self
919            .get_table(
920                catalog_id.unwrap_or("main".into()),
921                database_name,
922                table_name,
923            )
924            .await?
925        {
926            GetTableResponse::Success(table) => Ok(table.storage_location),
927            GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
928                error_code: err.error_code,
929                message: err.message,
930            }),
931        }
932    }
933}
934
935impl std::fmt::Debug for UnityCatalog {
936    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
937        write!(fmt, "UnityCatalog")
938    }
939}
940
#[cfg(test)]
mod tests {
    use crate::client::ClientOptions;
    use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
    use crate::models::*;
    use crate::UnityCatalogBuilder;
    use deltalake_core::DataCatalog;
    use httpmock::prelude::*;
    use std::collections::HashMap;
    use std::ffi::OsString;

    /// Sets environment variables for as long as the guard lives.
    ///
    /// On drop, each variable is restored to its previous value, or removed if it was
    /// previously unset. `Drop` also runs while a failing assertion unwinds, so test
    /// configuration cannot leak into later tests in the same process.
    ///
    /// NOTE(review): the environment is process-global and the test harness runs tests in
    /// parallel. Other tests reading these variables can still observe them while the guard
    /// is alive.
    struct EnvGuard {
        saved: Vec<(&'static str, Option<OsString>)>,
    }

    impl EnvGuard {
        fn set(vars: &[(&'static str, &str)]) -> Self {
            let mut saved = Vec::with_capacity(vars.len());
            for &(key, value) in vars {
                saved.push((key, std::env::var_os(key)));
                std::env::set_var(key, value);
            }
            Self { saved }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse order so a key listed twice ends at its original value.
            for (key, previous) in self.saved.drain(..).rev() {
                match previous {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    #[tokio::test]
    async fn test_unity_client() {
        let server = MockServer::start_async().await;

        let options = ClientOptions::default().with_allow_http(true);

        let client = UnityCatalogBuilder::new()
            .with_workspace_url(server.url(""))
            .with_bearer_token("bearer_token")
            .with_client_options(options)
            .build()
            .unwrap();

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas").method("GET");
                then.body(LIST_SCHEMAS_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas/catalog_name.schema_name")
                    .method("GET");
                then.body(GET_SCHEMA_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/tables/catalog_name.schema_name.table_name")
                    .method("GET");
                then.body(GET_TABLE_RESPONSE);
            })
            .await;

        let list_schemas_response = client.list_schemas("catalog_name").await.unwrap();
        assert!(matches!(
            list_schemas_response,
            ListSchemasResponse::Success { .. }
        ));

        let get_schema_response = client
            .get_schema("catalog_name", "schema_name")
            .await
            .unwrap();
        assert!(matches!(get_schema_response, GetSchemaResponse::Success(_)));

        let get_table_response = client
            .get_table("catalog_name", "schema_name", "table_name")
            .await;
        assert!(matches!(
            get_table_response.unwrap(),
            GetTableResponse::Success(_)
        ));

        let storage_location = client
            .get_table_storage_location(
                Some("catalog_name".to_string()),
                "schema_name",
                "table_name",
            )
            .await
            .unwrap();
        assert!(storage_location.eq_ignore_ascii_case("string"));
    }

    #[test]
    fn test_unitycatalogbuilder_with_storage_options() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("databricks_token".to_string(), "test_token".to_string());

        let builder = UnityCatalogBuilder::new()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("test_token".to_string()));
    }

    #[test]
    fn test_unitycatalogbuilder_client_credentials() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("unity_client_id".to_string(), "test_client_id".to_string());
        storage_options.insert("unity_client_secret".to_string(), "test_secret".to_string());
        storage_options.insert("unity_authority_id".to_string(), "test_tenant".to_string());

        let builder = UnityCatalogBuilder::new()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.client_id, Some("test_client_id".to_string()));
        assert_eq!(builder.client_secret, Some("test_secret".to_string()));
        assert_eq!(builder.authority_id, Some("test_tenant".to_string()));
    }

    #[test]
    fn test_env_with_storage_options_override() {
        // Restored on drop, even if an assertion below fails.
        let _env = EnvGuard::set(&[
            ("DATABRICKS_HOST", "https://env.databricks.com"),
            ("DATABRICKS_TOKEN", "env_token"),
        ]);

        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://override.databricks.com".to_string(),
        );

        let builder = UnityCatalogBuilder::from_env()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://override.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("env_token".to_string()));
    }

    #[test]
    fn test_storage_options_key_variations() {
        let test_cases = vec![
            ("databricks_host", "workspace_url"),
            ("unity_workspace_url", "workspace_url"),
            ("databricks_workspace_url", "workspace_url"),
            ("databricks_token", "bearer_token"),
            ("token", "bearer_token"),
            ("unity_client_id", "client_id"),
            ("databricks_client_id", "client_id"),
            ("client_id", "client_id"),
        ];

        for (key, field) in test_cases {
            let mut storage_options = HashMap::new();
            let test_value = format!("test_value_for_{}", key);
            storage_options.insert(key.to_string(), test_value.clone());

            let result = UnityCatalogBuilder::new().try_with_options(&storage_options);
            assert!(result.is_ok(), "Failed to parse key: {}", key);

            let builder = result.unwrap();
            match field {
                "workspace_url" => assert_eq!(builder.workspace_url, Some(test_value)),
                "bearer_token" => assert_eq!(builder.bearer_token, Some(test_value)),
                "client_id" => assert_eq!(builder.client_id, Some(test_value)),
                _ => {}
            }
        }
    }

    #[test]
    fn test_invalid_config_key() {
        let mut storage_options = HashMap::new();
        storage_options.insert("invalid_key".to_string(), "test_value".to_string());

        let result = UnityCatalogBuilder::new().try_with_options(&storage_options);
        assert!(result.is_err());
    }

    #[test]
    fn test_boolean_options() {
        let test_cases = vec![
            ("true", true),
            ("false", false),
            ("1", true),
            ("0", false),
            ("yes", true),
            ("no", false),
        ];

        for (value, expected) in test_cases {
            let mut storage_options = HashMap::new();
            storage_options.insert("unity_allow_http_url".to_string(), value.to_string());
            storage_options.insert("unity_use_azure_cli".to_string(), value.to_string());

            let builder = UnityCatalogBuilder::new()
                .try_with_options(&storage_options)
                .unwrap();

            assert_eq!(
                builder.allow_http_url, expected,
                "Failed for value: {}",
                value
            );
            assert_eq!(
                builder.use_azure_cli, expected,
                "Failed for value: {}",
                value
            );
        }
    }

    #[tokio::test]
    async fn test_invalid_table_uri() {
        let test_cases = vec![
            "uc://invalid",
            "uc://",
            "uc://catalog",
            "uc://catalog.schema",
            "uc://catalog.schema.table.extra",
            "invalid://catalog.schema.table",
        ];

        for uri in test_cases {
            let result = UnityCatalogBuilder::get_uc_location_and_token(uri, None).await;
            assert!(result.is_err(), "Expected error for URI: {}", uri);

            if let Err(e) = result {
                // Only non-empty `uc://` URIs are expected to fail specifically with
                // `InvalidTableURI`; the bare scheme and foreign schemes just need to error.
                if uri.starts_with("uc://") && uri.len() > 5 {
                    assert!(matches!(
                        e,
                        crate::UnityCatalogError::InvalidTableURI { .. }
                    ));
                }
            }
        }
    }
}