Skip to main content

deltalake_catalog_unity/
lib.rs

1#![warn(clippy::all)]
2#![warn(rust_2018_idioms)]
3//! Databricks Unity Catalog.
4#[cfg(not(any(feature = "aws", feature = "azure", feature = "gcp", feature = "r2")))]
5compile_error!(
6    "At least one of the following crate features `aws`, `azure`, `gcp`, or `r2` must be enabled \
7    for this crate to function properly."
8);
9
10use deltalake_core::logstore::{
11    LogStore, LogStoreFactory, StorageConfig, default_logstore, logstore_factories,
12    object_store::RetryConfig,
13};
14use reqwest::Url;
15use reqwest::header::{AUTHORIZATION, HeaderValue, InvalidHeaderValue};
16use std::collections::HashMap;
17use std::future::Future;
18use std::str::FromStr;
19use std::sync::Arc;
20use typed_builder::TypedBuilder;
21
22use crate::credential::{
23    AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
24};
25use crate::models::{
26    ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse,
27    ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest,
28    TokenErrorResponse,
29};
30
31use deltalake_core::data_catalog::DataCatalogResult;
32use deltalake_core::{
33    DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError,
34    ObjectStoreError, Path, ensure_table_uri,
35};
36
37use crate::client::retry::*;
38use deltalake_core::logstore::{
39    ObjectStoreFactory, ObjectStoreRef, config::str_is_truthy, object_store_factories,
40};
41pub mod client;
42pub mod credential;
43
44const STORE_NAME: &str = "UnityCatalogObjectStore";
45#[cfg(feature = "datafusion")]
46pub mod datafusion;
47pub mod models;
48pub mod prelude;
49
/// Possible errors from the unity-catalog/tables API call
#[derive(thiserror::Error, Debug)]
pub enum UnityCatalogError {
    #[error("GET request error: {source}")]
    /// Error raised by the `reqwest` HTTP client
    RequestError {
        /// The underlying reqwest::Error
        #[from]
        source: reqwest::Error,
    },

    /// Error raised by the `reqwest_middleware` layer wrapping the HTTP client
    #[error("Error in middleware: {source}")]
    RequestMiddlewareError {
        /// The underlying reqwest_middleware::Error
        #[from]
        source: reqwest_middleware::Error,
    },

    /// Request returned error response
    #[error("Invalid table error: {error_code}: {message}")]
    InvalidTable {
        /// Error code
        error_code: String,
        /// Error description
        message: String,
    },

    /// The credential could not be turned into a valid `Authorization` header value
    #[error("Invalid token for auth header: {header_error}")]
    InvalidHeader {
        /// The underlying header conversion error
        #[from]
        header_error: InvalidHeaderValue,
    },

    /// Invalid Table URI
    #[error("Invalid Unity Catalog Table URI: {table_uri}")]
    InvalidTableURI {
        /// Table URI
        table_uri: String,
    },

    /// A required configuration key was not provided; the payload names the key
    #[error("Missing configuration key: {0}")]
    MissingConfiguration(String),

    /// No credential could be derived from the client configuration
    #[error("Failed to get a credential from UnityCatalog client configuration.")]
    MissingCredential,

    /// Temporary Credentials Fetch Failure
    #[error("Unable to get temporary credentials from Unity Catalog: {error_code}: {message}")]
    TemporaryCredentialsFetchFailure {
        /// Error code from Unity Catalog
        error_code: String,
        /// Error message from Unity Catalog
        message: String,
    },

    /// Error reported while using the Azure CLI
    #[error("Azure CLI error: {message}")]
    AzureCli {
        /// Error description
        message: String,
    },

    /// The federated token file for workload identity is missing or corrupted
    #[error("Missing or corrupted federated token file for WorkloadIdentity.")]
    FederatedTokenFile,

    /// Error propagated from DataFusion
    #[cfg(feature = "datafusion")]
    #[error("Datafusion error: {0}")]
    DatafusionError(#[from] ::datafusion::common::DataFusionError),

    /// Unity Catalog could not be initialized: the helper thread driving the
    /// catalog future produced no result (potentially a threading issue)
    #[error("Unable to initialize Unity Catalog, potentially a threading issue")]
    InitializationError,

    /// A generic error from a source
    #[error("An error occurred in catalog: {source}")]
    Generic {
        /// The wrapped source error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// The token endpoint answered with a non-200 response; carries the error payload
    #[error("Non-200 returned on token acquisition: {0}")]
    InvalidCredentials(TokenErrorResponse),
}
134
135impl From<ErrorResponse> for UnityCatalogError {
136    fn from(value: ErrorResponse) -> Self {
137        UnityCatalogError::InvalidTable {
138            error_code: value.error_code,
139            message: value.message,
140        }
141    }
142}
143
144impl From<DataCatalogError> for UnityCatalogError {
145    fn from(value: DataCatalogError) -> Self {
146        UnityCatalogError::Generic {
147            source: Box::new(value),
148        }
149    }
150}
151
152impl From<UnityCatalogError> for DataCatalogError {
153    fn from(value: UnityCatalogError) -> Self {
154        DataCatalogError::Generic {
155            catalog: "Unity",
156            source: Box::new(value),
157        }
158    }
159}
160
161impl From<UnityCatalogError> for DeltaTableError {
162    fn from(value: UnityCatalogError) -> Self {
163        DeltaTableError::GenericError {
164            source: Box::new(value),
165        }
166    }
167}
168
/// Configuration options for unity catalog client
///
/// The "Supported keys" lists below are the exact strings accepted by the
/// `FromStr` implementation of this type.
pub enum UnityCatalogConfigKey {
    /// Url of a Databricks workspace
    ///
    /// Supported keys:
    /// - `workspace_url`
    /// - `unity_workspace_url`
    /// - `databricks_workspace_url`
    /// - `databricks_host`
    #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")]
    WorkspaceUrl,

    /// Host of the Databricks workspace
    ///
    /// Supported keys:
    /// - `host`
    Host,

    /// Access token to authorize API requests
    ///
    /// Supported keys:
    /// - `access_token`
    /// - `unity_access_token`
    /// - `databricks_access_token`
    /// - `databricks_token`
    #[deprecated(
        since = "0.17.0",
        note = "Please use the DATABRICKS_TOKEN env variable"
    )]
    AccessToken,

    /// Token to use for Databricks Unity
    ///
    /// Supported keys:
    /// - `token`
    Token,

    /// Service principal client id for authorizing requests
    ///
    /// Supported keys:
    /// - `client_id`
    /// - `unity_client_id`
    /// - `databricks_client_id`
    ClientId,

    /// Service principal client secret for authorizing requests
    ///
    /// Supported keys:
    /// - `client_secret`
    /// - `unity_client_secret`
    /// - `databricks_client_secret`
    ClientSecret,

    /// Authority (tenant) id used in oauth flows
    ///
    /// Supported keys:
    /// - `authority_id`
    /// - `unity_authority_id`
    /// - `databricks_authority_id`
    AuthorityId,

    /// Authority host used in oauth flows
    ///
    /// Supported keys:
    /// - `authority_host`
    /// - `unity_authority_host`
    /// - `databricks_authority_host`
    AuthorityHost,

    /// Endpoint to request a imds managed identity token
    ///
    /// Supported keys:
    /// - `msi_endpoint`
    /// - `unity_msi_endpoint`
    /// - `databricks_msi_endpoint`
    MsiEndpoint,

    /// Object id for use with managed identity authentication
    ///
    /// Supported keys:
    /// - `object_id`
    /// - `unity_object_id`
    /// - `databricks_object_id`
    ObjectId,

    /// Msi resource id for use with managed identity authentication
    ///
    /// Supported keys:
    /// - `msi_resource_id`
    /// - `unity_msi_resource_id`
    /// - `databricks_msi_resource_id`
    MsiResourceId,

    /// File containing token for Azure AD workload identity federation
    ///
    /// Supported keys:
    /// - `federated_token_file`
    /// - `unity_federated_token_file`
    /// - `databricks_federated_token_file`
    FederatedTokenFile,

    /// Use azure cli for acquiring access token
    ///
    /// Supported keys:
    /// - `use_azure_cli`
    /// - `unity_use_azure_cli`
    /// - `databricks_use_azure_cli`
    UseAzureCli,

    /// Allow http url (e.g. http://localhost:8080/api/2.1/...)
    ///
    /// Supported keys:
    /// - `allow_http_url`
    /// - `unity_allow_http_url`
    AllowHttpUrl,
}
272
273impl FromStr for UnityCatalogConfigKey {
274    type Err = DataCatalogError;
275
276    #[allow(deprecated)]
277    fn from_str(s: &str) -> Result<Self, Self::Err> {
278        match s {
279            "access_token"
280            | "unity_access_token"
281            | "databricks_access_token"
282            | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken),
283            "authority_host" | "unity_authority_host" | "databricks_authority_host" => {
284                Ok(UnityCatalogConfigKey::AuthorityHost)
285            }
286            "authority_id" | "unity_authority_id" | "databricks_authority_id" => {
287                Ok(UnityCatalogConfigKey::AuthorityId)
288            }
289            "client_id" | "unity_client_id" | "databricks_client_id" => {
290                Ok(UnityCatalogConfigKey::ClientId)
291            }
292            "client_secret" | "unity_client_secret" | "databricks_client_secret" => {
293                Ok(UnityCatalogConfigKey::ClientSecret)
294            }
295            "federated_token_file"
296            | "unity_federated_token_file"
297            | "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile),
298            "host" => Ok(UnityCatalogConfigKey::Host),
299            "msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => {
300                Ok(UnityCatalogConfigKey::MsiEndpoint)
301            }
302            "msi_resource_id" | "unity_msi_resource_id" | "databricks_msi_resource_id" => {
303                Ok(UnityCatalogConfigKey::MsiResourceId)
304            }
305            "object_id" | "unity_object_id" | "databricks_object_id" => {
306                Ok(UnityCatalogConfigKey::ObjectId)
307            }
308            "token" => Ok(UnityCatalogConfigKey::Token),
309            "use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => {
310                Ok(UnityCatalogConfigKey::UseAzureCli)
311            }
312            "workspace_url"
313            | "unity_workspace_url"
314            | "databricks_workspace_url"
315            | "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
316            "allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
317            _ => Err(DataCatalogError::UnknownConfigKey {
318                catalog: "unity",
319                key: s.to_string(),
320            }),
321        }
322    }
323}
324
325#[allow(deprecated)]
326impl AsRef<str> for UnityCatalogConfigKey {
327    fn as_ref(&self) -> &str {
328        match self {
329            UnityCatalogConfigKey::AccessToken => "unity_access_token",
330            UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
331            UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
332            UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
333            UnityCatalogConfigKey::ClientId => "unity_client_id",
334            UnityCatalogConfigKey::ClientSecret => "unity_client_secret",
335            UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file",
336            UnityCatalogConfigKey::Host => "databricks_host",
337            UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint",
338            UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id",
339            UnityCatalogConfigKey::ObjectId => "unity_object_id",
340            UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli",
341            UnityCatalogConfigKey::Token => "databricks_token",
342            UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url",
343        }
344    }
345}
346
/// Builder for creating a UnityCatalogClient
///
/// Construct it through the generated typed builder, from the environment via
/// `from_env`, or by applying key-value options with `try_with_option(s)`;
/// finish with `build` to obtain a `UnityCatalog`.
#[derive(TypedBuilder)]
#[builder(doc)]
pub struct UnityCatalogBuilder {
    /// Url of a Databricks workspace; required by `build`, which trims trailing `/`
    #[builder(default, setter(strip_option, into))]
    workspace_url: Option<String>,

    /// Bearer token; when set it is preferred over every other credential source
    #[builder(default, setter(strip_option, into))]
    bearer_token: Option<String>,

    /// Client id, used together with the client secret for OAuth flows
    #[builder(default, setter(strip_option, into))]
    client_id: Option<String>,

    /// Client secret, used together with the client id for OAuth flows
    #[builder(default, setter(strip_option, into))]
    client_secret: Option<String>,

    /// Tenant id
    #[builder(default, setter(strip_option, into))]
    authority_id: Option<String>,

    /// Authority host
    #[builder(default, setter(strip_option, into))]
    authority_host: Option<String>,

    /// Msi endpoint for acquiring managed identity token
    #[builder(default, setter(strip_option, into))]
    msi_endpoint: Option<String>,

    /// Object id for use with managed identity authentication
    #[builder(default, setter(strip_option, into))]
    object_id: Option<String>,

    /// Msi resource id for use with managed identity authentication
    #[builder(default, setter(strip_option, into))]
    msi_resource_id: Option<String>,

    /// File containing token for Azure AD workload identity federation
    #[builder(default, setter(strip_option, into))]
    federated_token_file: Option<String>,

    /// When set to true, azure cli has to be used for acquiring access token
    #[builder(default)]
    use_azure_cli: bool,

    /// When set to true, http will be allowed in the catalog url
    #[builder(default)]
    allow_http_url: bool,

    /// Retry config
    // NOTE(review): carries a dead_code allowance, so it appears to be unused at present.
    #[builder(default)]
    #[allow(dead_code)]
    retry_config: RetryConfig,

    /// Options for the underlying http client
    #[builder(default)]
    client_options: client::ClientOptions,
}
408
409#[allow(deprecated)]
410impl UnityCatalogBuilder {
411    /// Set an option on the builder via a key - value pair.
412    pub fn try_with_option(
413        mut self,
414        key: impl AsRef<str>,
415        value: impl Into<String>,
416    ) -> DataCatalogResult<Self> {
417        match UnityCatalogConfigKey::from_str(key.as_ref())? {
418            UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
419            UnityCatalogConfigKey::AllowHttpUrl => {
420                self.allow_http_url = str_is_truthy(&value.into())
421            }
422            UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
423            UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
424            UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
425            UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()),
426            UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()),
427            UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()),
428            UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()),
429            UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()),
430            UnityCatalogConfigKey::FederatedTokenFile => {
431                self.federated_token_file = Some(value.into())
432            }
433            UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()),
434            UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()),
435            UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
436        };
437        Ok(self)
438    }
439
440    /// Hydrate builder from key value pairs
441    pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
442        mut self,
443        options: I,
444    ) -> DataCatalogResult<Self> {
445        for (key, value) in options {
446            self = self.try_with_option(key, value)?;
447        }
448        Ok(self)
449    }
450
451    /// Parse configuration from the environment.
452    ///
453    /// Environment keys prefixed with "UNITY_" or "DATABRICKS_" will be considered
454    pub fn from_env() -> Self {
455        let mut builder = Self::builder().build();
456        for (os_key, os_value) in std::env::vars_os() {
457            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
458                && (key.starts_with("UNITY_") || key.starts_with("DATABRICKS_"))
459            {
460                tracing::debug!("Found relevant env: {key}");
461                if let Ok(config_key) = UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase()) {
462                    tracing::debug!("Trying: {key} with {value}");
463                    builder = builder.try_with_option(config_key, value).unwrap();
464                }
465            }
466        }
467
468        builder
469    }
470
471    fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
472    where
473        T: Send,
474        F: Future<Output = T> + Send,
475    {
476        match tokio::runtime::Handle::try_current() {
477            Ok(handle) => match handle.runtime_flavor() {
478                tokio::runtime::RuntimeFlavor::MultiThread => {
479                    Ok(tokio::task::block_in_place(move || handle.block_on(future)))
480                }
481                _ => {
482                    let mut cfg: Option<T> = None;
483                    std::thread::scope(|scope| {
484                        scope.spawn(|| {
485                            cfg = Some(handle.block_on(future));
486                        });
487                    });
488                    cfg.ok_or(DeltaTableError::ObjectStore {
489                        source: ObjectStoreError::Generic {
490                            store: STORE_NAME,
491                            source: Box::new(UnityCatalogError::InitializationError),
492                        },
493                    })
494                }
495            },
496            Err(_) => {
497                let runtime = tokio::runtime::Builder::new_current_thread()
498                    .enable_all()
499                    .build()
500                    .expect("a tokio runtime is required by the Unity Catalog Builder");
501                Ok(runtime.block_on(future))
502            }
503        }
504    }
505
506    /// Returns the storage location and temporary token for the Unity Catalog table.
507    ///
508    /// If storage options are provided, they override environment variables for authentication.
509    pub async fn get_uc_location_and_token(
510        table_uri: &str,
511        storage_options: Option<&HashMap<String, String>>,
512    ) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
513        let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
514        if uri_parts.len() != 3 {
515            return Err(UnityCatalogError::InvalidTableURI {
516                table_uri: table_uri.to_string(),
517            });
518        }
519
520        let catalog_id = uri_parts[0];
521        let database_name = uri_parts[1];
522        let table_name = uri_parts[2];
523
524        let unity_catalog = if let Some(options) = storage_options {
525            let mut builder = UnityCatalogBuilder::from_env();
526            builder =
527                builder.try_with_options(options.iter().map(|(k, v)| (k.as_str(), v.as_str())))?;
528            builder.build()?
529        } else {
530            UnityCatalogBuilder::from_env().build()?
531        };
532
533        let storage_location = unity_catalog
534            .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
535            .await?;
536        // Attempt to get read/write permissions to begin with.
537        let temp_creds_res = unity_catalog
538            .get_temp_table_credentials_with_permission(
539                catalog_id,
540                database_name,
541                table_name,
542                "READ_WRITE",
543            )
544            .await?;
545        let credentials = match temp_creds_res {
546            TableTempCredentialsResponse::Success(temp_creds) => temp_creds
547                .get_credentials()
548                .ok_or_else(|| UnityCatalogError::MissingCredential)?,
549            TableTempCredentialsResponse::Error(rw_error) => {
550                // If that fails attempt to get just read permissions.
551                match unity_catalog
552                    .get_temp_table_credentials(catalog_id, database_name, table_name)
553                    .await?
554                {
555                    TableTempCredentialsResponse::Success(temp_creds) => temp_creds
556                        .get_credentials()
557                        .ok_or_else(|| UnityCatalogError::MissingCredential)?,
558                    TableTempCredentialsResponse::Error(read_error) => {
559                        return Err(UnityCatalogError::TemporaryCredentialsFetchFailure {
560                            error_code: read_error.error_code,
561                            message: format!(
562                                "READ_WRITE failed: {}. READ failed: {}",
563                                rw_error.message, read_error.message
564                            ),
565                        });
566                    }
567                }
568            }
569        };
570        Ok((storage_location, credentials))
571    }
572
573    fn get_credential_provider(&self) -> Option<CredentialProvider> {
574        if let Some(token) = self.bearer_token.as_ref() {
575            return Some(CredentialProvider::BearerToken(token.clone()));
576        }
577
578        if let (Some(client_id), Some(client_secret), Some(workspace_host)) =
579            (&self.client_id, &self.client_secret, &self.workspace_url)
580        {
581            return Some(CredentialProvider::TokenCredential(
582                Default::default(),
583                Box::new(WorkspaceOAuthProvider::new(
584                    client_id,
585                    client_secret,
586                    workspace_host,
587                )),
588            ));
589        }
590
591        if let (Some(client_id), Some(client_secret), Some(authority_id)) = (
592            self.client_id.as_ref(),
593            self.client_secret.as_ref(),
594            self.authority_id.as_ref(),
595        ) {
596            return Some(CredentialProvider::TokenCredential(
597                Default::default(),
598                Box::new(ClientSecretOAuthProvider::new(
599                    client_id,
600                    client_secret,
601                    authority_id,
602                    self.authority_host.as_ref(),
603                )),
604            ));
605        }
606        if self.use_azure_cli {
607            return Some(CredentialProvider::TokenCredential(
608                Default::default(),
609                Box::new(AzureCliCredential::new()),
610            ));
611        }
612
613        None
614    }
615
616    /// Build an instance of [`UnityCatalog`]
617    pub fn build(self) -> DataCatalogResult<UnityCatalog> {
618        let credential = self
619            .get_credential_provider()
620            .ok_or(UnityCatalogError::MissingCredential)?;
621
622        let workspace_url = self
623            .workspace_url
624            .ok_or(UnityCatalogError::MissingConfiguration(
625                "workspace_url".into(),
626            ))?
627            .trim_end_matches('/')
628            .to_string();
629
630        let mut client_options = self.client_options;
631        if self.allow_http_url {
632            client_options.allow_http = true;
633        }
634        let client = client_options.client()?;
635
636        Ok(UnityCatalog {
637            client,
638            workspace_url,
639            credential,
640        })
641    }
642}
643
/// Databricks Unity Catalog
///
/// Client for the Unity Catalog REST API of a single workspace.
pub struct UnityCatalog {
    /// Http client (with middleware) used for every API request
    client: reqwest_middleware::ClientWithMiddleware,
    /// Source of the credential sent in the `Authorization` header
    credential: CredentialProvider,
    /// Workspace base url that the API path is appended to
    workspace_url: String,
}
650
651impl UnityCatalog {
652    async fn get_credential(&self) -> Result<HeaderValue, UnityCatalogError> {
653        match &self.credential {
654            CredentialProvider::BearerToken(token) => {
655                // we do the conversion to a HeaderValue here, since it is fallible,
656                // and we want to use it in an infallible function
657                Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
658            }
659            CredentialProvider::TokenCredential(cache, cred) => {
660                let token = cache
661                    .get_or_insert_with(|| cred.fetch_token(&self.client))
662                    .await?;
663
664                // we do the conversion to a HeaderValue here, since it is fallible,
665                // and we want to use it in an infallible function
666                Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
667            }
668        }
669    }
670
671    fn catalog_url(&self) -> String {
672        format!("{}/api/2.1/unity-catalog", self.workspace_url)
673    }
674
675    /// Gets an array of catalogs in the metastore. If the caller is the metastore admin,
676    /// all catalogs will be retrieved. Otherwise, only catalogs owned by the caller
677    /// (or for which the caller has the USE_CATALOG privilege) will be retrieved.
678    /// There is no guarantee of a specific ordering of the elements in the array.
679    pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
680        let token = self.get_credential().await?;
681        // https://docs.databricks.com/api-explorer/workspace/schemas/list
682        let resp = self
683            .client
684            .get(format!("{}/catalogs", self.catalog_url()))
685            .header(AUTHORIZATION, token)
686            .send()
687            .await?;
688        Ok(resp.json().await?)
689    }
690
691    /// List all schemas for a catalog in the metastore.
692    ///
693    /// If the caller is the metastore admin or the owner of the parent catalog, all schemas
694    /// for the catalog will be retrieved. Otherwise, only schemas owned by the caller
695    /// (or for which the caller has the USE_SCHEMA privilege) will be retrieved.
696    /// There is no guarantee of a specific ordering of the elements in the array.
697    ///
698    /// # Parameters
699    /// - catalog_name: Parent catalog for schemas of interest.
700    pub async fn list_schemas(
701        &self,
702        catalog_name: impl AsRef<str>,
703    ) -> Result<ListSchemasResponse, UnityCatalogError> {
704        let token = self.get_credential().await?;
705        // https://docs.databricks.com/api-explorer/workspace/schemas/list
706        let resp = self
707            .client
708            .get(format!("{}/schemas", self.catalog_url()))
709            .header(AUTHORIZATION, token)
710            .query(&[("catalog_name", catalog_name.as_ref())])
711            .send()
712            .await?;
713        let status = resp.status();
714        let body = resp.text().await?;
715        serde_json::from_str(&body).map_err(|e| {
716            tracing::error!(
717                "Failed to parse list_schemas response (status {}): {}",
718                status,
719                body
720            );
721            UnityCatalogError::Generic {
722                source: Box::new(e),
723            }
724        })
725    }
726
727    /// Gets the specified schema within the metastore.#
728    ///
729    /// The caller must be a metastore admin, the owner of the schema,
730    /// or a user that has the USE_SCHEMA privilege on the schema.
731    pub async fn get_schema(
732        &self,
733        catalog_name: impl AsRef<str>,
734        schema_name: impl AsRef<str>,
735    ) -> Result<GetSchemaResponse, UnityCatalogError> {
736        let token = self.get_credential().await?;
737        // https://docs.databricks.com/api-explorer/workspace/schemas/get
738        let resp = self
739            .client
740            .get(format!(
741                "{}/schemas/{}.{}",
742                self.catalog_url(),
743                catalog_name.as_ref(),
744                schema_name.as_ref()
745            ))
746            .header(AUTHORIZATION, token)
747            .send()
748            .await?;
749        Ok(resp.json().await?)
750    }
751
752    /// Gets an array of summaries for tables for a schema and catalog within the metastore.
753    ///
754    /// The table summaries returned are either:
755    /// - summaries for all tables (within the current metastore and parent catalog and schema),
756    ///   when the user is a metastore admin, or:
757    /// - summaries for all tables and schemas (within the current metastore and parent catalog)
758    ///   for which the user has ownership or the SELECT privilege on the table and ownership or
759    ///   USE_SCHEMA privilege on the schema, provided that the user also has ownership or the
760    ///   USE_CATALOG privilege on the parent catalog.
761    ///
762    /// There is no guarantee of a specific ordering of the elements in the array.
763    pub async fn list_table_summaries(
764        &self,
765        catalog_name: impl AsRef<str>,
766        schema_name_pattern: impl AsRef<str>,
767    ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
768        let token = self.get_credential().await?;
769        // https://docs.databricks.com/api-explorer/workspace/tables/listsummaries
770        let resp = self
771            .client
772            .get(format!("{}/table-summaries", self.catalog_url()))
773            .query(&[
774                ("catalog_name", catalog_name.as_ref()),
775                ("schema_name_pattern", schema_name_pattern.as_ref()),
776            ])
777            .header(AUTHORIZATION, token)
778            .send()
779            .await?;
780
781        Ok(resp.json().await?)
782    }
783
784    /// Gets a table from the metastore for a specific catalog and schema.
785    ///
786    /// The caller must be a metastore admin, be the owner of the table and have the
787    /// USE_CATALOG privilege on the parent catalog and the USE_SCHEMA privilege on
788    /// the parent schema, or be the owner of the table and have the SELECT privilege on it as well.
789    ///
790    /// # Parameters
791    pub async fn get_table(
792        &self,
793        catalog_id: impl AsRef<str>,
794        database_name: impl AsRef<str>,
795        table_name: impl AsRef<str>,
796    ) -> Result<GetTableResponse, UnityCatalogError> {
797        let token = self.get_credential().await?;
798        // https://docs.databricks.com/api-explorer/workspace/tables/get
799        let resp = self
800            .client
801            .get(format!(
802                "{}/tables/{}.{}.{}",
803                self.catalog_url(),
804                catalog_id.as_ref(),
805                database_name.as_ref(),
806                table_name.as_ref(),
807            ))
808            .header(AUTHORIZATION, token)
809            .send()
810            .await?;
811
812        Ok(resp.json().await?)
813    }
814
815    pub async fn get_temp_table_credentials(
816        &self,
817        catalog_id: impl AsRef<str>,
818        database_name: impl AsRef<str>,
819        table_name: impl AsRef<str>,
820    ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
821        self.get_temp_table_credentials_with_permission(
822            catalog_id,
823            database_name,
824            table_name,
825            "READ",
826        )
827        .await
828    }
829
830    pub async fn get_temp_table_credentials_with_permission(
831        &self,
832        catalog_id: impl AsRef<str>,
833        database_name: impl AsRef<str>,
834        table_name: impl AsRef<str>,
835        operation: impl AsRef<str>,
836    ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
837        let token = self.get_credential().await?;
838        let table_info = self
839            .get_table(catalog_id, database_name, table_name)
840            .await?;
841        let response = match table_info {
842            GetTableResponse::Success(table) => {
843                let request =
844                    TemporaryTableCredentialsRequest::new(&table.table_id, operation.as_ref());
845                Ok(self
846                    .client
847                    .post(format!(
848                        "{}/temporary-table-credentials",
849                        self.catalog_url()
850                    ))
851                    .header(AUTHORIZATION, token)
852                    .json(&request)
853                    .send()
854                    .await?)
855            }
856            GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
857                error_code: err.error_code,
858                message: err.message,
859            }),
860        }?;
861
862        Ok(response.json().await?)
863    }
864}
865
/// Field-less factory that this crate registers (see [`register_handlers`]) as both
/// the object store factory and the log store factory for the `uc://` URL scheme.
#[derive(Clone, Default, Debug)]
pub struct UnityCatalogFactory {}
868
869impl ObjectStoreFactory for UnityCatalogFactory {
870    fn parse_url_opts(
871        &self,
872        table_uri: &Url,
873        config: &StorageConfig,
874    ) -> DeltaResult<(ObjectStoreRef, Path)> {
875        let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
876            UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str(), Some(&config.raw)),
877        )??;
878
879        let mut storage_options = config.raw.clone();
880        storage_options.extend(temp_creds);
881
882        // TODO(roeap): we should not have to go through the table here.
883        // ideally we just create the right storage ...
884        let table_url = ensure_table_uri(&table_path)?;
885        let mut builder = DeltaTableBuilder::from_url(table_url)?;
886
887        if let Some(runtime) = &config.runtime {
888            builder = builder.with_io_runtime(runtime.clone());
889        }
890
891        if !storage_options.is_empty() {
892            builder = builder.with_storage_options(storage_options.clone());
893        }
894        let prefix = Path::parse(table_uri.path())?;
895        let store = builder.build_storage()?.object_store(None);
896
897        Ok((store, prefix))
898    }
899}
900
901impl LogStoreFactory for UnityCatalogFactory {
902    fn with_options(
903        &self,
904        prefixed_store: ObjectStoreRef,
905        root_store: ObjectStoreRef,
906        location: &Url,
907        options: &StorageConfig,
908    ) -> DeltaResult<Arc<dyn LogStore>> {
909        Ok(default_logstore(
910            prefixed_store,
911            root_store,
912            location,
913            options,
914        ))
915    }
916}
917
918/// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes
919pub fn register_handlers(_additional_prefixes: Option<Url>) {
920    let factory = Arc::new(UnityCatalogFactory::default());
921    let url = Url::parse("uc://").unwrap();
922    object_store_factories().insert(url.clone(), factory.clone());
923    logstore_factories().insert(url.clone(), factory.clone());
924}
925
926#[async_trait::async_trait]
927impl DataCatalog for UnityCatalog {
928    type Error = UnityCatalogError;
929    /// Get the table storage location from the UnityCatalog
930    async fn get_table_storage_location(
931        &self,
932        catalog_id: Option<String>,
933        database_name: &str,
934        table_name: &str,
935    ) -> Result<String, UnityCatalogError> {
936        match self
937            .get_table(
938                catalog_id.unwrap_or("main".into()),
939                database_name,
940                table_name,
941            )
942            .await?
943        {
944            GetTableResponse::Success(table) => Ok(table.storage_location),
945            GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
946                error_code: err.error_code,
947                message: err.message,
948            }),
949        }
950    }
951}
952
953impl std::fmt::Debug for UnityCatalog {
954    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
955        write!(fmt, "UnityCatalog")
956    }
957}
958
// Unit tests for the Unity Catalog client and for `UnityCatalogBuilder` option parsing.
#[cfg(test)]
mod tests {
    use crate::UnityCatalogBuilder;
    use crate::client::ClientOptions;
    use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
    use crate::models::*;
    use deltalake_core::DataCatalog;
    use httpmock::prelude::*;
    use std::collections::HashMap;

    /// Exercises `list_schemas`, `get_schema`, `get_table` and
    /// `get_table_storage_location` against a local mock HTTP server that serves
    /// canned response bodies.
    #[tokio::test]
    async fn test_unity_client() {
        let server = MockServer::start_async().await;

        // The mock server is addressed over plain HTTP, so HTTP must be allowed.
        let options = ClientOptions::builder().allow_http(true).build();

        // NOTE(review): the first `build()` presumably finalizes the typed builder and
        // the second one constructs the client - confirm against `UnityCatalogBuilder`.
        let client = UnityCatalogBuilder::builder()
            .workspace_url(server.url(""))
            .bearer_token("bearer_token")
            .client_options(options)
            .build()
            .build()
            .unwrap();

        // Mock: list schemas.
        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas").method("GET");
                then.body(LIST_SCHEMAS_RESPONSE);
            })
            .await;

        // Mock: get a single schema by its dotted `catalog.schema` name.
        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas/catalog_name.schema_name")
                    .method("GET");
                then.body(GET_SCHEMA_RESPONSE);
            })
            .await;

        // Mock: get a single table by its dotted `catalog.schema.table` name.
        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/tables/catalog_name.schema_name.table_name")
                    .method("GET");
                then.body(GET_TABLE_RESPONSE);
            })
            .await;

        let list_schemas_response = client.list_schemas("catalog_name").await.unwrap();
        assert!(matches!(
            list_schemas_response,
            ListSchemasResponse::Success { .. }
        ));

        let get_schema_response = client
            .get_schema("catalog_name", "schema_name")
            .await
            .unwrap();
        assert!(matches!(get_schema_response, GetSchemaResponse::Success(_)));

        let get_table_response = client
            .get_table("catalog_name", "schema_name", "table_name")
            .await;
        assert!(matches!(
            get_table_response.unwrap(),
            GetTableResponse::Success(_)
        ));

        // The trait method goes through the same table endpoint and extracts the
        // storage location; the expected value is the literal text "string"
        // (compared case-insensitively).
        let storage_location = client
            .get_table_storage_location(
                Some("catalog_name".to_string()),
                "schema_name",
                "table_name",
            )
            .await
            .unwrap();
        assert!(storage_location.eq_ignore_ascii_case("string"));
    }

    /// `databricks_host` and `databricks_token` storage options populate the
    /// builder's `workspace_url` and `bearer_token`.
    #[test]
    fn test_unitycatalogbuilder_with_storage_options() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("databricks_token".to_string(), "test_token".to_string());

        let builder = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("test_token".to_string()));
    }

    /// Client-credential storage options (`unity_client_id`, `unity_client_secret`,
    /// `unity_authority_id`) populate the matching builder fields.
    #[test]
    fn test_unitycatalogbuilder_client_credentials() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("unity_client_id".to_string(), "test_client_id".to_string());
        storage_options.insert("unity_client_secret".to_string(), "test_secret".to_string());
        storage_options.insert("unity_authority_id".to_string(), "test_tenant".to_string());

        let builder = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.client_id, Some("test_client_id".to_string()));
        assert_eq!(builder.client_secret, Some("test_secret".to_string()));
        assert_eq!(builder.authority_id, Some("test_tenant".to_string()));
    }

    /// Storage options override values taken from the environment, while values
    /// that are only present in the environment are kept.
    #[test]
    fn test_env_with_storage_options_override() {
        // SAFETY: NOTE(review) - mutating the process environment is only sound while
        // no other thread accesses it concurrently. The test harness runs tests in
        // parallel by default, so confirm that no other test touches the environment
        // at the same time.
        unsafe {
            std::env::set_var("DATABRICKS_HOST", "https://env.databricks.com");
            std::env::set_var("DATABRICKS_TOKEN", "env_token");
        }

        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://override.databricks.com".to_string(),
        );

        let builder = UnityCatalogBuilder::from_env()
            .try_with_options(&storage_options)
            .unwrap();

        // Host comes from the storage options, token still from the environment.
        assert_eq!(
            builder.workspace_url,
            Some("https://override.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("env_token".to_string()));

        // NOTE(review): this cleanup is skipped if one of the assertions above panics,
        // leaving both variables set for the rest of the test process.
        // SAFETY: same caveat as for the `set_var` calls above.
        unsafe {
            std::env::remove_var("DATABRICKS_HOST");
            std::env::remove_var("DATABRICKS_TOKEN");
        }
    }

    /// Every listed option-key alias is accepted and lands in the expected builder field.
    #[test]
    fn test_storage_options_key_variations() {
        // (storage option key, builder field the value is expected to end up in)
        let test_cases = vec![
            ("databricks_host", "workspace_url"),
            ("unity_workspace_url", "workspace_url"),
            ("databricks_workspace_url", "workspace_url"),
            ("databricks_token", "bearer_token"),
            ("token", "bearer_token"),
            ("unity_client_id", "client_id"),
            ("databricks_client_id", "client_id"),
            ("client_id", "client_id"),
        ];

        for (key, field) in test_cases {
            let mut storage_options = HashMap::new();
            let test_value = format!("test_value_for_{}", key);
            storage_options.insert(key.to_string(), test_value.clone());

            let result = UnityCatalogBuilder::builder()
                .build()
                .try_with_options(&storage_options);
            assert!(result.is_ok(), "Failed to parse key: {}", key);

            let builder = result.unwrap();
            match field {
                "workspace_url" => assert_eq!(builder.workspace_url, Some(test_value)),
                "bearer_token" => assert_eq!(builder.bearer_token, Some(test_value)),
                "client_id" => assert_eq!(builder.client_id, Some(test_value)),
                // NOTE(review): not reachable with the table above, but a misspelled
                // field name in a future test case would pass silently here.
                _ => {}
            }
        }
    }

    /// An unknown storage option key is rejected with an error.
    #[test]
    fn test_invalid_config_key() {
        let mut storage_options = HashMap::new();
        storage_options.insert("invalid_key".to_string(), "test_value".to_string());

        let result = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options);
        assert!(result.is_err());
    }

    /// Boolean-valued options accept `true`/`1`/`yes` as true and `false`/`0`/`no` as false.
    #[test]
    fn test_boolean_options() {
        // (raw option value, expected parsed boolean)
        let test_cases = vec![
            ("true", true),
            ("false", false),
            ("1", true),
            ("0", false),
            ("yes", true),
            ("no", false),
        ];

        for (value, expected) in test_cases {
            let mut storage_options = HashMap::new();
            storage_options.insert("unity_allow_http_url".to_string(), value.to_string());
            storage_options.insert("unity_use_azure_cli".to_string(), value.to_string());

            let builder = UnityCatalogBuilder::builder()
                .build()
                .try_with_options(&storage_options)
                .unwrap();

            assert_eq!(
                builder.allow_http_url, expected,
                "Failed for value: {}",
                value
            );
            assert_eq!(
                builder.use_azure_cli, expected,
                "Failed for value: {}",
                value
            );
        }
    }

    /// Malformed table URIs are rejected by `get_uc_location_and_token`.
    #[tokio::test]
    async fn test_invalid_table_uri() {
        let test_cases = vec![
            "uc://invalid",
            "uc://",
            "uc://catalog",
            "uc://catalog.schema",
            "uc://catalog.schema.table.extra",
            "invalid://catalog.schema.table",
        ];

        for uri in test_cases {
            let result = UnityCatalogBuilder::get_uc_location_and_token(uri, None).await;
            assert!(result.is_err(), "Expected error for URI: {}", uri);

            // The specific error variant is only checked for `uc://` URIs that have
            // something after the scheme: the bare "uc://" (length 5) and the
            // non-`uc` scheme are only required to fail, with any error.
            if let Err(e) = result
                && uri.starts_with("uc://")
                && uri.len() > 5
            {
                assert!(matches!(
                    e,
                    crate::UnityCatalogError::InvalidTableURI { .. }
                ));
            }
        }
    }
}