//! Databricks Unity Catalog support for `deltalake`: a client for the
//! `/api/2.1/unity-catalog` REST API plus object store and log store factories for `uc://`
//! table URIs.
#![warn(clippy::all)]
#![warn(rust_2018_idioms)]
// Refuse to build without at least one storage backend feature; the message below states
// the requirement to the user.
#[cfg(not(any(feature = "aws", feature = "azure", feature = "gcp", feature = "r2")))]
compile_error!(
    "At least one of the following crate features `aws`, `azure`, `gcp`, or `r2` must be enabled \
     for this crate to function properly."
);
9
10use deltalake_core::logstore::{
11 default_logstore, logstore_factories, object_store::RetryConfig, LogStore, LogStoreFactory,
12 StorageConfig,
13};
14use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
15use reqwest::Url;
16use std::collections::HashMap;
17use std::future::Future;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use crate::credential::{
22 AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
23};
24use crate::models::{
25 ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse,
26 ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest,
27 TokenErrorResponse,
28};
29
30use deltalake_core::data_catalog::DataCatalogResult;
31use deltalake_core::{
32 ensure_table_uri, DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder,
33 DeltaTableError, ObjectStoreError, Path,
34};
35
36use crate::client::retry::*;
37use deltalake_core::logstore::{
38 config::str_is_truthy, object_store_factories, ObjectStoreFactory, ObjectStoreRef,
39};
/// HTTP client options and retry helpers used by the catalog client.
pub mod client;
/// Credential providers (bearer token, OAuth flows, Azure CLI) used to authenticate requests.
pub mod credential;

/// Store name reported in the `ObjectStoreError::Generic` errors raised by this crate.
const STORE_NAME: &str = "UnityCatalogObjectStore";
/// DataFusion integration; only compiled with the `datafusion` feature.
#[cfg(feature = "datafusion")]
pub mod datafusion;
/// Request and response models for the Unity Catalog REST API.
pub mod models;
/// Presumably re-exports of commonly used items — confirm against `prelude.rs`.
pub mod prelude;
48
49#[derive(thiserror::Error, Debug)]
51pub enum UnityCatalogError {
52 #[error("GET request error: {source}")]
53 RequestError {
55 #[from]
57 source: reqwest::Error,
58 },
59
60 #[error("Error in middleware: {source}")]
61 RequestMiddlewareError {
62 #[from]
64 source: reqwest_middleware::Error,
65 },
66
67 #[error("Invalid table error: {error_code}: {message}")]
69 InvalidTable {
70 error_code: String,
72 message: String,
74 },
75
76 #[error("Invalid token for auth header: {header_error}")]
77 InvalidHeader {
78 #[from]
79 header_error: InvalidHeaderValue,
80 },
81
82 #[error("Invalid Unity Catalog Table URI: {table_uri}")]
84 InvalidTableURI {
85 table_uri: String,
87 },
88
89 #[error("Missing configuration key: {0}")]
91 MissingConfiguration(String),
92
93 #[error("Failed to get a credential from UnityCatalog client configuration.")]
95 MissingCredential,
96
97 #[error("Unable to get temporary credentials from Unity Catalog.")]
99 TemporaryCredentialsFetchFailure,
100
101 #[error("Azure CLI error: {message}")]
102 AzureCli {
103 message: String,
105 },
106
107 #[error("Missing or corrupted federated token file for WorkloadIdentity.")]
108 FederatedTokenFile,
109
110 #[cfg(feature = "datafusion")]
111 #[error("Datafusion error: {0}")]
112 DatafusionError(#[from] ::datafusion::common::DataFusionError),
113
114 #[error("Unable to initialize Unity Catalog, potentially a threading issue")]
116 InitializationError,
117
118 #[error("An error occurred in catalog: {source}")]
120 Generic {
121 source: Box<dyn std::error::Error + Send + Sync + 'static>,
123 },
124
125 #[error("Non-200 returned on token acquisition: {0}")]
126 InvalidCredentials(TokenErrorResponse),
127}
128
129impl From<ErrorResponse> for UnityCatalogError {
130 fn from(value: ErrorResponse) -> Self {
131 UnityCatalogError::InvalidTable {
132 error_code: value.error_code,
133 message: value.message,
134 }
135 }
136}
137
138impl From<DataCatalogError> for UnityCatalogError {
139 fn from(value: DataCatalogError) -> Self {
140 UnityCatalogError::Generic {
141 source: Box::new(value),
142 }
143 }
144}
145
146impl From<UnityCatalogError> for DataCatalogError {
147 fn from(value: UnityCatalogError) -> Self {
148 DataCatalogError::Generic {
149 catalog: "Unity",
150 source: Box::new(value),
151 }
152 }
153}
154
155impl From<UnityCatalogError> for DeltaTableError {
156 fn from(value: UnityCatalogError) -> Self {
157 DeltaTableError::GenericError {
158 source: Box::new(value),
159 }
160 }
161}
162
/// Configuration options understood by [`UnityCatalogBuilder`].
///
/// String keys are mapped to variants by this type's `FromStr` implementation;
/// [`UnityCatalogBuilder::from_env`] lower-cases `UNITY_*` / `DATABRICKS_*` environment
/// variable names and parses them the same way.
pub enum UnityCatalogConfigKey {
    /// URL of the Databricks workspace.
    ///
    /// Supported keys: `workspace_url`, `unity_workspace_url`, `databricks_workspace_url`,
    /// `databricks_host`.
    #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")]
    WorkspaceUrl,

    /// Host of the Databricks workspace; sets the same builder field as `WorkspaceUrl`.
    ///
    /// Supported keys: `host`.
    Host,

    /// Access (bearer) token.
    ///
    /// Supported keys: `access_token`, `unity_access_token`, `databricks_access_token`,
    /// `databricks_token`.
    #[deprecated(
        since = "0.17.0",
        note = "Please use the DATABRICKS_TOKEN env variable"
    )]
    AccessToken,

    /// Bearer token; sets the same builder field as `AccessToken`.
    ///
    /// Supported keys: `token`.
    Token,

    /// OAuth client id.
    ///
    /// Supported keys: `client_id`, `unity_client_id`, `databricks_client_id`.
    ClientId,

    /// OAuth client secret.
    ///
    /// Supported keys: `client_secret`, `unity_client_secret`, `databricks_client_secret`.
    ClientSecret,

    /// Tenant (authority) id for the client-secret OAuth provider.
    ///
    /// Supported keys: `authority_id`, `unity_authority_id`, `databricks_authority_id`.
    AuthorityId,

    /// Authority host passed to the client-secret OAuth provider.
    ///
    /// Supported keys: `authority_host`, `unity_authority_host`, `databricks_authority_host`.
    AuthorityHost,

    /// MSI endpoint.
    ///
    /// Supported keys: `msi_endpoint`, `unity_msi_endpoint`, `databricks_msi_endpoint`.
    /// NOTE(review): stored by the builder but not used for credential selection in lib.rs.
    MsiEndpoint,

    /// Object id.
    ///
    /// Supported keys: `object_id`, `unity_object_id`, `databricks_object_id`.
    /// NOTE(review): stored by the builder but not used for credential selection in lib.rs.
    ObjectId,

    /// MSI resource id.
    ///
    /// Supported keys: `msi_resource_id`, `unity_msi_resource_id`,
    /// `databricks_msi_resource_id`.
    /// NOTE(review): stored by the builder but not used for credential selection in lib.rs.
    MsiResourceId,

    /// Path to a federated token file.
    ///
    /// Supported keys: `federated_token_file`, `unity_federated_token_file`,
    /// `databricks_federated_token_file`.
    /// NOTE(review): stored by the builder but not used for credential selection in lib.rs.
    FederatedTokenFile,

    /// Truthy value enables authentication through the Azure CLI.
    ///
    /// Supported keys: `use_azure_cli`, `unity_use_azure_cli`, `databricks_use_azure_cli`.
    UseAzureCli,

    /// Truthy value allows plain `http://` workspace URLs.
    ///
    /// Supported keys: `allow_http_url`, `unity_allow_http_url`.
    AllowHttpUrl,
}
266
267impl FromStr for UnityCatalogConfigKey {
268 type Err = DataCatalogError;
269
270 #[allow(deprecated)]
271 fn from_str(s: &str) -> Result<Self, Self::Err> {
272 match s {
273 "access_token"
274 | "unity_access_token"
275 | "databricks_access_token"
276 | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken),
277 "authority_host" | "unity_authority_host" | "databricks_authority_host" => {
278 Ok(UnityCatalogConfigKey::AuthorityHost)
279 }
280 "authority_id" | "unity_authority_id" | "databricks_authority_id" => {
281 Ok(UnityCatalogConfigKey::AuthorityId)
282 }
283 "client_id" | "unity_client_id" | "databricks_client_id" => {
284 Ok(UnityCatalogConfigKey::ClientId)
285 }
286 "client_secret" | "unity_client_secret" | "databricks_client_secret" => {
287 Ok(UnityCatalogConfigKey::ClientSecret)
288 }
289 "federated_token_file"
290 | "unity_federated_token_file"
291 | "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile),
292 "host" => Ok(UnityCatalogConfigKey::Host),
293 "msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => {
294 Ok(UnityCatalogConfigKey::MsiEndpoint)
295 }
296 "msi_resource_id" | "unity_msi_resource_id" | "databricks_msi_resource_id" => {
297 Ok(UnityCatalogConfigKey::MsiResourceId)
298 }
299 "object_id" | "unity_object_id" | "databricks_object_id" => {
300 Ok(UnityCatalogConfigKey::ObjectId)
301 }
302 "token" => Ok(UnityCatalogConfigKey::Token),
303 "use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => {
304 Ok(UnityCatalogConfigKey::UseAzureCli)
305 }
306 "workspace_url"
307 | "unity_workspace_url"
308 | "databricks_workspace_url"
309 | "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
310 "allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
311 _ => Err(DataCatalogError::UnknownConfigKey {
312 catalog: "unity",
313 key: s.to_string(),
314 }),
315 }
316 }
317}
318
319#[allow(deprecated)]
320impl AsRef<str> for UnityCatalogConfigKey {
321 fn as_ref(&self) -> &str {
322 match self {
323 UnityCatalogConfigKey::AccessToken => "unity_access_token",
324 UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
325 UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
326 UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
327 UnityCatalogConfigKey::ClientId => "unity_client_id",
328 UnityCatalogConfigKey::ClientSecret => "unity_client_secret",
329 UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file",
330 UnityCatalogConfigKey::Host => "databricks_host",
331 UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint",
332 UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id",
333 UnityCatalogConfigKey::ObjectId => "unity_object_id",
334 UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli",
335 UnityCatalogConfigKey::Token => "databricks_token",
336 UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url",
337 }
338 }
339}
340
/// Builder for [`UnityCatalog`].
///
/// Options can be set with the `with_*` methods, from string key/value pairs
/// ([`UnityCatalogBuilder::try_with_options`]) or from the environment
/// ([`UnityCatalogBuilder::from_env`]).
#[derive(Default)]
pub struct UnityCatalogBuilder {
    /// Workspace base URL; required by `build`, which strips any trailing `/`.
    workspace_url: Option<String>,

    /// Bearer token; when set it is used in preference to every other credential option.
    bearer_token: Option<String>,

    /// OAuth client id, used together with `client_secret` by the OAuth providers.
    client_id: Option<String>,

    /// OAuth client secret, used together with `client_id` by the OAuth providers.
    client_secret: Option<String>,

    /// Tenant (authority) id for the client-secret OAuth provider.
    authority_id: Option<String>,

    /// Optional authority host passed to the client-secret OAuth provider.
    authority_host: Option<String>,

    /// NOTE(review): stored from configuration but not read anywhere in this file.
    msi_endpoint: Option<String>,

    /// NOTE(review): stored from configuration but not read anywhere in this file.
    object_id: Option<String>,

    /// NOTE(review): stored from configuration but not read anywhere in this file.
    msi_resource_id: Option<String>,

    /// NOTE(review): stored from configuration but not read anywhere in this file.
    federated_token_file: Option<String>,

    /// Use the Azure CLI as the credential source when no other credential is configured.
    use_azure_cli: bool,

    /// Allow plain `http://` workspace URLs (applied to the client options by `build`).
    allow_http_url: bool,

    /// Retry settings set by `with_retry_config`.
    /// NOTE(review): `build` does not read this field — confirm whether it should reach
    /// the HTTP client.
    retry_config: RetryConfig,

    /// Options used by `build` to construct the HTTP client.
    client_options: client::ClientOptions,
}
386
387#[allow(deprecated)]
388impl UnityCatalogBuilder {
389 pub fn new() -> Self {
391 Default::default()
392 }
393
394 pub fn try_with_option(
396 mut self,
397 key: impl AsRef<str>,
398 value: impl Into<String>,
399 ) -> DataCatalogResult<Self> {
400 match UnityCatalogConfigKey::from_str(key.as_ref())? {
401 UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
402 UnityCatalogConfigKey::AllowHttpUrl => {
403 self.allow_http_url = str_is_truthy(&value.into())
404 }
405 UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
406 UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
407 UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
408 UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()),
409 UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()),
410 UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()),
411 UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()),
412 UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()),
413 UnityCatalogConfigKey::FederatedTokenFile => {
414 self.federated_token_file = Some(value.into())
415 }
416 UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()),
417 UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()),
418 UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
419 };
420 Ok(self)
421 }
422
423 pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
425 mut self,
426 options: I,
427 ) -> DataCatalogResult<Self> {
428 for (key, value) in options {
429 self = self.try_with_option(key, value)?;
430 }
431 Ok(self)
432 }
433
434 pub fn from_env() -> Self {
438 let mut builder = Self::default();
439 for (os_key, os_value) in std::env::vars_os() {
440 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
441 if key.starts_with("UNITY_") || key.starts_with("DATABRICKS_") {
442 tracing::debug!("Found relevant env: {key}");
443 if let Ok(config_key) =
444 UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase())
445 {
446 tracing::debug!("Trying: {key} with {value}");
447 builder = builder.try_with_option(config_key, value).unwrap();
448 }
449 }
450 }
451 }
452
453 builder
454 }
455
456 pub fn with_workspace_url(mut self, url: impl Into<String>) -> Self {
458 self.workspace_url = Some(url.into());
459 self
460 }
461
462 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
464 self.client_id = Some(client_id.into());
465 self
466 }
467
468 pub fn with_client_secret(mut self, client_secret: impl Into<String>) -> Self {
470 self.client_secret = Some(client_secret.into());
471 self
472 }
473
474 pub fn with_authority_id(mut self, tenant_id: impl Into<String>) -> Self {
476 self.authority_id = Some(tenant_id.into());
477 self
478 }
479
480 pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
482 self.bearer_token = Some(bearer_token.into());
483 self
484 }
485
486 pub fn with_access_token(self, access_token: impl Into<String>) -> Self {
488 self.with_bearer_token(access_token)
489 }
490
491 pub fn with_client_options(mut self, options: client::ClientOptions) -> Self {
493 self.client_options = options;
494 self
495 }
496
497 pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
499 self.retry_config = config;
500 self
501 }
502
503 fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
504 where
505 T: Send,
506 F: Future<Output = T> + Send,
507 {
508 match tokio::runtime::Handle::try_current() {
509 Ok(handle) => match handle.runtime_flavor() {
510 tokio::runtime::RuntimeFlavor::MultiThread => {
511 Ok(tokio::task::block_in_place(move || handle.block_on(future)))
512 }
513 _ => {
514 let mut cfg: Option<T> = None;
515 std::thread::scope(|scope| {
516 scope.spawn(|| {
517 cfg = Some(handle.block_on(future));
518 });
519 });
520 cfg.ok_or(DeltaTableError::ObjectStore {
521 source: ObjectStoreError::Generic {
522 store: STORE_NAME,
523 source: Box::new(UnityCatalogError::InitializationError),
524 },
525 })
526 }
527 },
528 Err(_) => {
529 let runtime = tokio::runtime::Builder::new_current_thread()
530 .enable_all()
531 .build()
532 .expect("a tokio runtime is required by the Unity Catalog Builder");
533 Ok(runtime.block_on(future))
534 }
535 }
536 }
537
538 pub async fn get_uc_location_and_token(
542 table_uri: &str,
543 storage_options: Option<&HashMap<String, String>>,
544 ) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
545 let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
546 if uri_parts.len() != 3 {
547 return Err(UnityCatalogError::InvalidTableURI {
548 table_uri: table_uri.to_string(),
549 });
550 }
551
552 let catalog_id = uri_parts[0];
553 let database_name = uri_parts[1];
554 let table_name = uri_parts[2];
555
556 let unity_catalog = if let Some(options) = storage_options {
557 let mut builder = UnityCatalogBuilder::from_env();
558 builder =
559 builder.try_with_options(options.iter().map(|(k, v)| (k.as_str(), v.as_str())))?;
560 builder.build()?
561 } else {
562 UnityCatalogBuilder::from_env().build()?
563 };
564
565 let storage_location = unity_catalog
566 .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
567 .await?;
568 let temp_creds_res = unity_catalog
569 .get_temp_table_credentials(catalog_id, database_name, table_name)
570 .await?;
571 let credentials = match temp_creds_res {
572 TableTempCredentialsResponse::Success(temp_creds) => temp_creds
573 .get_credentials()
574 .ok_or_else(|| UnityCatalogError::MissingCredential)?,
575 TableTempCredentialsResponse::Error(_error) => {
576 return Err(UnityCatalogError::TemporaryCredentialsFetchFailure)
577 }
578 };
579 Ok((storage_location, credentials))
580 }
581
582 fn get_credential_provider(&self) -> Option<CredentialProvider> {
583 if let Some(token) = self.bearer_token.as_ref() {
584 return Some(CredentialProvider::BearerToken(token.clone()));
585 }
586
587 if let (Some(client_id), Some(client_secret), Some(workspace_host)) =
588 (&self.client_id, &self.client_secret, &self.workspace_url)
589 {
590 return Some(CredentialProvider::TokenCredential(
591 Default::default(),
592 Box::new(WorkspaceOAuthProvider::new(
593 client_id,
594 client_secret,
595 workspace_host,
596 )),
597 ));
598 }
599
600 if let (Some(client_id), Some(client_secret), Some(authority_id)) = (
601 self.client_id.as_ref(),
602 self.client_secret.as_ref(),
603 self.authority_id.as_ref(),
604 ) {
605 return Some(CredentialProvider::TokenCredential(
606 Default::default(),
607 Box::new(ClientSecretOAuthProvider::new(
608 client_id,
609 client_secret,
610 authority_id,
611 self.authority_host.as_ref(),
612 )),
613 ));
614 }
615 if self.use_azure_cli {
616 return Some(CredentialProvider::TokenCredential(
617 Default::default(),
618 Box::new(AzureCliCredential::new()),
619 ));
620 }
621
622 None
623 }
624
625 pub fn build(self) -> DataCatalogResult<UnityCatalog> {
627 let credential = self
628 .get_credential_provider()
629 .ok_or(UnityCatalogError::MissingCredential)?;
630
631 let workspace_url = self
632 .workspace_url
633 .ok_or(UnityCatalogError::MissingConfiguration(
634 "workspace_url".into(),
635 ))?
636 .trim_end_matches('/')
637 .to_string();
638
639 let client_options = if self.allow_http_url {
640 self.client_options.with_allow_http(true)
641 } else {
642 self.client_options
643 };
644 let client = client_options.client()?;
645
646 Ok(UnityCatalog {
647 client,
648 workspace_url,
649 credential,
650 })
651 }
652}
653
/// Client for the Databricks Unity Catalog REST API (`<workspace>/api/2.1/unity-catalog`).
///
/// Created by [`UnityCatalogBuilder::build`].
pub struct UnityCatalog {
    /// HTTP client, wrapped with `reqwest_middleware`, used for every request.
    client: reqwest_middleware::ClientWithMiddleware,
    /// Source of the `Authorization` header attached to every request.
    credential: CredentialProvider,
    /// Workspace base URL without a trailing `/`.
    workspace_url: String,
}
660
661impl UnityCatalog {
662 async fn get_credential(&self) -> Result<HeaderValue, UnityCatalogError> {
663 match &self.credential {
664 CredentialProvider::BearerToken(token) => {
665 Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
668 }
669 CredentialProvider::TokenCredential(cache, cred) => {
670 let token = cache
671 .get_or_insert_with(|| cred.fetch_token(&self.client))
672 .await?;
673
674 Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
677 }
678 }
679 }
680
681 fn catalog_url(&self) -> String {
682 format!("{}/api/2.1/unity-catalog", self.workspace_url)
683 }
684
685 pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
690 let token = self.get_credential().await?;
691 let resp = self
693 .client
694 .get(format!("{}/catalogs", self.catalog_url()))
695 .header(AUTHORIZATION, token)
696 .send()
697 .await?;
698 Ok(resp.json().await?)
699 }
700
701 pub async fn list_schemas(
711 &self,
712 catalog_name: impl AsRef<str>,
713 ) -> Result<ListSchemasResponse, UnityCatalogError> {
714 let token = self.get_credential().await?;
715 let resp = self
717 .client
718 .get(format!("{}/schemas", self.catalog_url()))
719 .header(AUTHORIZATION, token)
720 .query(&[("catalog_name", catalog_name.as_ref())])
721 .send()
722 .await?;
723 Ok(resp.json().await?)
724 }
725
726 pub async fn get_schema(
731 &self,
732 catalog_name: impl AsRef<str>,
733 schema_name: impl AsRef<str>,
734 ) -> Result<GetSchemaResponse, UnityCatalogError> {
735 let token = self.get_credential().await?;
736 let resp = self
738 .client
739 .get(format!(
740 "{}/schemas/{}.{}",
741 self.catalog_url(),
742 catalog_name.as_ref(),
743 schema_name.as_ref()
744 ))
745 .header(AUTHORIZATION, token)
746 .send()
747 .await?;
748 Ok(resp.json().await?)
749 }
750
751 pub async fn list_table_summaries(
763 &self,
764 catalog_name: impl AsRef<str>,
765 schema_name_pattern: impl AsRef<str>,
766 ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
767 let token = self.get_credential().await?;
768 let resp = self
770 .client
771 .get(format!("{}/table-summaries", self.catalog_url()))
772 .query(&[
773 ("catalog_name", catalog_name.as_ref()),
774 ("schema_name_pattern", schema_name_pattern.as_ref()),
775 ])
776 .header(AUTHORIZATION, token)
777 .send()
778 .await?;
779
780 Ok(resp.json().await?)
781 }
782
783 pub async fn get_table(
791 &self,
792 catalog_id: impl AsRef<str>,
793 database_name: impl AsRef<str>,
794 table_name: impl AsRef<str>,
795 ) -> Result<GetTableResponse, UnityCatalogError> {
796 let token = self.get_credential().await?;
797 let resp = self
799 .client
800 .get(format!(
801 "{}/tables/{}.{}.{}",
802 self.catalog_url(),
803 catalog_id.as_ref(),
804 database_name.as_ref(),
805 table_name.as_ref(),
806 ))
807 .header(AUTHORIZATION, token)
808 .send()
809 .await?;
810
811 Ok(resp.json().await?)
812 }
813
814 pub async fn get_temp_table_credentials(
815 &self,
816 catalog_id: impl AsRef<str>,
817 database_name: impl AsRef<str>,
818 table_name: impl AsRef<str>,
819 ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
820 let token = self.get_credential().await?;
821 let table_info = self
822 .get_table(catalog_id, database_name, table_name)
823 .await?;
824 let response = match table_info {
825 GetTableResponse::Success(table) => {
826 let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ");
827 Ok(self
828 .client
829 .post(format!(
830 "{}/temporary-table-credentials",
831 self.catalog_url()
832 ))
833 .header(AUTHORIZATION, token)
834 .json(&request)
835 .send()
836 .await?)
837 }
838 GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
839 error_code: err.error_code,
840 message: err.message,
841 }),
842 }?;
843
844 Ok(response.json().await?)
845 }
846}
847
/// Factory registered for the `uc://` scheme. It resolves Unity Catalog table URIs to
/// their underlying object store and supplies the log store for them.
#[derive(Debug, Default, Clone)]
pub struct UnityCatalogFactory {}
850
851impl ObjectStoreFactory for UnityCatalogFactory {
852 fn parse_url_opts(
853 &self,
854 table_uri: &Url,
855 config: &StorageConfig,
856 ) -> DeltaResult<(ObjectStoreRef, Path)> {
857 let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
858 UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str(), Some(&config.raw)),
859 )??;
860
861 let mut storage_options = config.raw.clone();
862 storage_options.extend(temp_creds);
863
864 let table_url = ensure_table_uri(&table_path)?;
867 let mut builder = DeltaTableBuilder::from_uri(table_url)?;
868
869 if let Some(runtime) = &config.runtime {
870 builder = builder.with_io_runtime(runtime.clone());
871 }
872
873 if !storage_options.is_empty() {
874 builder = builder.with_storage_options(storage_options.clone());
875 }
876 let prefix = Path::parse(table_uri.path())?;
877 let store = builder.build_storage()?.object_store(None);
878
879 Ok((store, prefix))
880 }
881}
882
883impl LogStoreFactory for UnityCatalogFactory {
884 fn with_options(
885 &self,
886 prefixed_store: ObjectStoreRef,
887 root_store: ObjectStoreRef,
888 location: &Url,
889 options: &StorageConfig,
890 ) -> DeltaResult<Arc<dyn LogStore>> {
891 Ok(default_logstore(
892 prefixed_store,
893 root_store,
894 location,
895 options,
896 ))
897 }
898}
899
900pub fn register_handlers(_additional_prefixes: Option<Url>) {
902 let factory = Arc::new(UnityCatalogFactory::default());
903 let url = Url::parse("uc://").unwrap();
904 object_store_factories().insert(url.clone(), factory.clone());
905 logstore_factories().insert(url.clone(), factory.clone());
906}
907
908#[async_trait::async_trait]
909impl DataCatalog for UnityCatalog {
910 type Error = UnityCatalogError;
911 async fn get_table_storage_location(
913 &self,
914 catalog_id: Option<String>,
915 database_name: &str,
916 table_name: &str,
917 ) -> Result<String, UnityCatalogError> {
918 match self
919 .get_table(
920 catalog_id.unwrap_or("main".into()),
921 database_name,
922 table_name,
923 )
924 .await?
925 {
926 GetTableResponse::Success(table) => Ok(table.storage_location),
927 GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
928 error_code: err.error_code,
929 message: err.message,
930 }),
931 }
932 }
933}
934
935impl std::fmt::Debug for UnityCatalog {
936 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
937 write!(fmt, "UnityCatalog")
938 }
939}
940
/// Unit tests for option parsing in `UnityCatalogBuilder` and for the REST client, the latter
/// exercised against an `httpmock` server.
#[cfg(test)]
mod tests {
    use crate::client::ClientOptions;
    use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
    use crate::models::*;
    use crate::UnityCatalogBuilder;
    use deltalake_core::DataCatalog;
    use httpmock::prelude::*;
    use std::collections::HashMap;

    /// Schema and table lookups against canned responses from a local mock server.
    #[tokio::test]
    async fn test_unity_client() {
        let server = MockServer::start_async().await;

        // The mock server URL is presumably `http://`, hence allowing plain HTTP.
        let options = ClientOptions::default().with_allow_http(true);

        let client = UnityCatalogBuilder::new()
            .with_workspace_url(server.url(""))
            .with_bearer_token("bearer_token")
            .with_client_options(options)
            .build()
            .unwrap();

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas").method("GET");
                then.body(LIST_SCHEMAS_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas/catalog_name.schema_name")
                    .method("GET");
                then.body(GET_SCHEMA_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/tables/catalog_name.schema_name.table_name")
                    .method("GET");
                then.body(GET_TABLE_RESPONSE);
            })
            .await;

        let list_schemas_response = client.list_schemas("catalog_name").await.unwrap();
        assert!(matches!(
            list_schemas_response,
            ListSchemasResponse::Success { .. }
        ));

        let get_schema_response = client
            .get_schema("catalog_name", "schema_name")
            .await
            .unwrap();
        assert!(matches!(get_schema_response, GetSchemaResponse::Success(_)));

        let get_table_response = client
            .get_table("catalog_name", "schema_name", "table_name")
            .await;
        assert!(matches!(
            get_table_response.unwrap(),
            GetTableResponse::Success(_)
        ));

        // Presumably the GET_TABLE_RESPONSE fixture uses the placeholder location "string".
        let storage_location = client
            .get_table_storage_location(
                Some("catalog_name".to_string()),
                "schema_name",
                "table_name",
            )
            .await
            .unwrap();
        assert!(storage_location.eq_ignore_ascii_case("string"));
    }

    /// `databricks_host` / `databricks_token` populate the workspace URL and bearer token.
    #[test]
    fn test_unitycatalogbuilder_with_storage_options() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("databricks_token".to_string(), "test_token".to_string());

        let builder = UnityCatalogBuilder::new()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("test_token".to_string()));
    }

    /// Client-credential options are stored on the builder.
    #[test]
    fn test_unitycatalogbuilder_client_credentials() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("unity_client_id".to_string(), "test_client_id".to_string());
        storage_options.insert("unity_client_secret".to_string(), "test_secret".to_string());
        storage_options.insert("unity_authority_id".to_string(), "test_tenant".to_string());

        let builder = UnityCatalogBuilder::new()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.client_id, Some("test_client_id".to_string()));
        assert_eq!(builder.client_secret, Some("test_secret".to_string()));
        assert_eq!(builder.authority_id, Some("test_tenant".to_string()));
    }

    /// Explicit options override values picked up from the environment.
    ///
    /// NOTE(review): this mutates process-wide environment variables. Tests run in
    /// parallel by default, so any concurrently running test that calls `from_env` may
    /// observe these values.
    #[test]
    fn test_env_with_storage_options_override() {
        std::env::set_var("DATABRICKS_HOST", "https://env.databricks.com");
        std::env::set_var("DATABRICKS_TOKEN", "env_token");

        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://override.databricks.com".to_string(),
        );

        let builder = UnityCatalogBuilder::from_env()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://override.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("env_token".to_string()));

        std::env::remove_var("DATABRICKS_HOST");
        std::env::remove_var("DATABRICKS_TOKEN");
    }

    /// Aliased keys map to the expected builder fields.
    #[test]
    fn test_storage_options_key_variations() {
        let test_cases = vec![
            ("databricks_host", "workspace_url"),
            ("unity_workspace_url", "workspace_url"),
            ("databricks_workspace_url", "workspace_url"),
            ("databricks_token", "bearer_token"),
            ("token", "bearer_token"),
            ("unity_client_id", "client_id"),
            ("databricks_client_id", "client_id"),
            ("client_id", "client_id"),
        ];

        for (key, field) in test_cases {
            let mut storage_options = HashMap::new();
            let test_value = format!("test_value_for_{}", key);
            storage_options.insert(key.to_string(), test_value.clone());

            let result = UnityCatalogBuilder::new().try_with_options(&storage_options);
            assert!(result.is_ok(), "Failed to parse key: {}", key);

            let builder = result.unwrap();
            match field {
                "workspace_url" => assert_eq!(builder.workspace_url, Some(test_value)),
                "bearer_token" => assert_eq!(builder.bearer_token, Some(test_value)),
                "client_id" => assert_eq!(builder.client_id, Some(test_value)),
                _ => {}
            }
        }
    }

    /// Unknown keys are rejected by `try_with_options`.
    #[test]
    fn test_invalid_config_key() {
        let mut storage_options = HashMap::new();
        storage_options.insert("invalid_key".to_string(), "test_value".to_string());

        let result = UnityCatalogBuilder::new().try_with_options(&storage_options);
        assert!(result.is_err());
    }

    /// Boolean options are parsed with `str_is_truthy`.
    #[test]
    fn test_boolean_options() {
        let test_cases = vec![
            ("true", true),
            ("false", false),
            ("1", true),
            ("0", false),
            ("yes", true),
            ("no", false),
        ];

        for (value, expected) in test_cases {
            let mut storage_options = HashMap::new();
            storage_options.insert("unity_allow_http_url".to_string(), value.to_string());
            storage_options.insert("unity_use_azure_cli".to_string(), value.to_string());

            let builder = UnityCatalogBuilder::new()
                .try_with_options(&storage_options)
                .unwrap();

            assert_eq!(
                builder.allow_http_url, expected,
                "Failed for value: {}",
                value
            );
            assert_eq!(
                builder.use_azure_cli, expected,
                "Failed for value: {}",
                value
            );
        }
    }

    /// Malformed table URIs are rejected. The exact `InvalidTableURI` variant is asserted
    /// only for `uc://` URIs with a non-empty remainder.
    #[tokio::test]
    async fn test_invalid_table_uri() {
        let test_cases = vec![
            "uc://invalid",
            "uc://",
            "uc://catalog",
            "uc://catalog.schema",
            "uc://catalog.schema.table.extra",
            "invalid://catalog.schema.table",
        ];

        for uri in test_cases {
            let result = UnityCatalogBuilder::get_uc_location_and_token(uri, None).await;
            assert!(result.is_err(), "Expected error for URI: {}", uri);

            if let Err(e) = result {
                if uri.starts_with("uc://") && uri.len() > 5 {
                    assert!(matches!(
                        e,
                        crate::UnityCatalogError::InvalidTableURI { .. }
                    ));
                }
            }
        }
    }
}