#![warn(clippy::all)]
#![warn(rust_2018_idioms)]
// Fail the build early, with an explicit message, when none of the storage backend
// features is enabled.
#[cfg(not(any(feature = "aws", feature = "azure", feature = "gcp", feature = "r2")))]
compile_error!(
    "At least one of the following crate features `aws`, `azure`, `gcp`, or `r2` must be enabled \
    for this crate to function properly."
);
9
10use deltalake_core::logstore::{
11 LogStore, LogStoreFactory, StorageConfig, default_logstore, logstore_factories,
12 object_store::RetryConfig,
13};
14use reqwest::Url;
15use reqwest::header::{AUTHORIZATION, HeaderValue, InvalidHeaderValue};
16use std::collections::HashMap;
17use std::future::Future;
18use std::str::FromStr;
19use std::sync::Arc;
20use typed_builder::TypedBuilder;
21
22use crate::credential::{
23 AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
24};
25use crate::models::{
26 ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse,
27 ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest,
28 TokenErrorResponse,
29};
30
31use deltalake_core::data_catalog::DataCatalogResult;
32use deltalake_core::{
33 DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError,
34 ObjectStoreError, Path, ensure_table_uri,
35};
36
37use crate::client::retry::*;
38use deltalake_core::logstore::{
39 ObjectStoreFactory, ObjectStoreRef, config::str_is_truthy, object_store_factories,
40};
/// HTTP client configuration (`ClientOptions`) and retry support.
pub mod client;
/// Credential providers used to authorise Unity Catalog requests.
pub mod credential;

/// Store name reported in `ObjectStoreError::Generic` errors raised by this crate.
const STORE_NAME: &str = "UnityCatalogObjectStore";
/// DataFusion integration; only compiled with the `datafusion` feature.
#[cfg(feature = "datafusion")]
pub mod datafusion;
/// Request and response types for the Unity Catalog REST API.
pub mod models;
/// Convenience re-exports (presumably the commonly used types — see the module itself).
pub mod prelude;
49
50#[derive(thiserror::Error, Debug)]
52pub enum UnityCatalogError {
53 #[error("GET request error: {source}")]
54 RequestError {
56 #[from]
58 source: reqwest::Error,
59 },
60
61 #[error("Error in middleware: {source}")]
62 RequestMiddlewareError {
63 #[from]
65 source: reqwest_middleware::Error,
66 },
67
68 #[error("Invalid table error: {error_code}: {message}")]
70 InvalidTable {
71 error_code: String,
73 message: String,
75 },
76
77 #[error("Invalid token for auth header: {header_error}")]
78 InvalidHeader {
79 #[from]
80 header_error: InvalidHeaderValue,
81 },
82
83 #[error("Invalid Unity Catalog Table URI: {table_uri}")]
85 InvalidTableURI {
86 table_uri: String,
88 },
89
90 #[error("Missing configuration key: {0}")]
92 MissingConfiguration(String),
93
94 #[error("Failed to get a credential from UnityCatalog client configuration.")]
96 MissingCredential,
97
98 #[error("Unable to get temporary credentials from Unity Catalog: {error_code}: {message}")]
100 TemporaryCredentialsFetchFailure {
101 error_code: String,
103 message: String,
105 },
106
107 #[error("Azure CLI error: {message}")]
108 AzureCli {
109 message: String,
111 },
112
113 #[error("Missing or corrupted federated token file for WorkloadIdentity.")]
114 FederatedTokenFile,
115
116 #[cfg(feature = "datafusion")]
117 #[error("Datafusion error: {0}")]
118 DatafusionError(#[from] ::datafusion::common::DataFusionError),
119
120 #[error("Unable to initialize Unity Catalog, potentially a threading issue")]
122 InitializationError,
123
124 #[error("An error occurred in catalog: {source}")]
126 Generic {
127 source: Box<dyn std::error::Error + Send + Sync + 'static>,
129 },
130
131 #[error("Non-200 returned on token acquisition: {0}")]
132 InvalidCredentials(TokenErrorResponse),
133}
134
135impl From<ErrorResponse> for UnityCatalogError {
136 fn from(value: ErrorResponse) -> Self {
137 UnityCatalogError::InvalidTable {
138 error_code: value.error_code,
139 message: value.message,
140 }
141 }
142}
143
144impl From<DataCatalogError> for UnityCatalogError {
145 fn from(value: DataCatalogError) -> Self {
146 UnityCatalogError::Generic {
147 source: Box::new(value),
148 }
149 }
150}
151
152impl From<UnityCatalogError> for DataCatalogError {
153 fn from(value: UnityCatalogError) -> Self {
154 DataCatalogError::Generic {
155 catalog: "Unity",
156 source: Box::new(value),
157 }
158 }
159}
160
161impl From<UnityCatalogError> for DeltaTableError {
162 fn from(value: UnityCatalogError) -> Self {
163 DeltaTableError::GenericError {
164 source: Box::new(value),
165 }
166 }
167}
168
/// Configuration keys understood by [`UnityCatalogBuilder`].
///
/// Keys are parsed from option/env-var names via `FromStr` and rendered back to a canonical
/// option name via `AsRef<str>`.
pub enum UnityCatalogConfigKey {
    /// Workspace URL; stored in the builder's `workspace_url` field.
    #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")]
    WorkspaceUrl,

    /// Databricks workspace host; stored in the builder's `workspace_url` field.
    Host,

    /// Access token; stored in the builder's `bearer_token` field.
    #[deprecated(
        since = "0.17.0",
        note = "Please use the DATABRICKS_TOKEN env variable"
    )]
    AccessToken,

    /// Bearer token; stored in the builder's `bearer_token` field.
    Token,

    /// OAuth client id used by the client-credential providers.
    ClientId,

    /// OAuth client secret used by the client-credential providers.
    ClientSecret,

    /// Authority id passed to the client-secret provider (presumably the Azure AD tenant).
    AuthorityId,

    /// Optional authority host passed to the client-secret provider.
    AuthorityHost,

    /// Managed-identity endpoint (stored by the builder; not used for provider selection
    /// in this file).
    MsiEndpoint,

    /// Managed-identity object id (stored by the builder; not used for provider selection
    /// in this file).
    ObjectId,

    /// Managed-identity resource id (stored by the builder; not used for provider selection
    /// in this file).
    MsiResourceId,

    /// Federated token file path (stored by the builder; not used for provider selection
    /// in this file).
    FederatedTokenFile,

    /// Boolean flag: authenticate through the Azure CLI.
    UseAzureCli,

    /// Boolean flag: allow plain `http://` workspace URLs.
    AllowHttpUrl,
}
272
273impl FromStr for UnityCatalogConfigKey {
274 type Err = DataCatalogError;
275
276 #[allow(deprecated)]
277 fn from_str(s: &str) -> Result<Self, Self::Err> {
278 match s {
279 "access_token"
280 | "unity_access_token"
281 | "databricks_access_token"
282 | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken),
283 "authority_host" | "unity_authority_host" | "databricks_authority_host" => {
284 Ok(UnityCatalogConfigKey::AuthorityHost)
285 }
286 "authority_id" | "unity_authority_id" | "databricks_authority_id" => {
287 Ok(UnityCatalogConfigKey::AuthorityId)
288 }
289 "client_id" | "unity_client_id" | "databricks_client_id" => {
290 Ok(UnityCatalogConfigKey::ClientId)
291 }
292 "client_secret" | "unity_client_secret" | "databricks_client_secret" => {
293 Ok(UnityCatalogConfigKey::ClientSecret)
294 }
295 "federated_token_file"
296 | "unity_federated_token_file"
297 | "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile),
298 "host" => Ok(UnityCatalogConfigKey::Host),
299 "msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => {
300 Ok(UnityCatalogConfigKey::MsiEndpoint)
301 }
302 "msi_resource_id" | "unity_msi_resource_id" | "databricks_msi_resource_id" => {
303 Ok(UnityCatalogConfigKey::MsiResourceId)
304 }
305 "object_id" | "unity_object_id" | "databricks_object_id" => {
306 Ok(UnityCatalogConfigKey::ObjectId)
307 }
308 "token" => Ok(UnityCatalogConfigKey::Token),
309 "use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => {
310 Ok(UnityCatalogConfigKey::UseAzureCli)
311 }
312 "workspace_url"
313 | "unity_workspace_url"
314 | "databricks_workspace_url"
315 | "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
316 "allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
317 _ => Err(DataCatalogError::UnknownConfigKey {
318 catalog: "unity",
319 key: s.to_string(),
320 }),
321 }
322 }
323}
324
325#[allow(deprecated)]
326impl AsRef<str> for UnityCatalogConfigKey {
327 fn as_ref(&self) -> &str {
328 match self {
329 UnityCatalogConfigKey::AccessToken => "unity_access_token",
330 UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
331 UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
332 UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
333 UnityCatalogConfigKey::ClientId => "unity_client_id",
334 UnityCatalogConfigKey::ClientSecret => "unity_client_secret",
335 UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file",
336 UnityCatalogConfigKey::Host => "databricks_host",
337 UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint",
338 UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id",
339 UnityCatalogConfigKey::ObjectId => "unity_object_id",
340 UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli",
341 UnityCatalogConfigKey::Token => "databricks_token",
342 UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url",
343 }
344 }
345}
346
/// Configuration for a [`UnityCatalog`] client.
///
/// Construct it with the generated `UnityCatalogBuilder::builder()`, with
/// `UnityCatalogBuilder::from_env()`, or by applying string options via `try_with_option(s)`,
/// then call `build()`.
#[derive(TypedBuilder)]
#[builder(doc)]
pub struct UnityCatalogBuilder {
    /// Base URL of the Databricks workspace (a trailing `/` is trimmed by `build`).
    #[builder(default, setter(strip_option, into))]
    workspace_url: Option<String>,

    /// Static token sent as `Authorization: Bearer <token>`; takes precedence over every
    /// other credential source.
    #[builder(default, setter(strip_option, into))]
    bearer_token: Option<String>,

    /// OAuth client id used by the client-credential providers.
    #[builder(default, setter(strip_option, into))]
    client_id: Option<String>,

    /// OAuth client secret used by the client-credential providers.
    #[builder(default, setter(strip_option, into))]
    client_secret: Option<String>,

    /// Authority id for the client-secret provider (presumably the Azure AD tenant).
    #[builder(default, setter(strip_option, into))]
    authority_id: Option<String>,

    /// Optional authority host passed to the client-secret provider.
    #[builder(default, setter(strip_option, into))]
    authority_host: Option<String>,

    /// Managed-identity endpoint (stored; not consulted when selecting a provider here).
    #[builder(default, setter(strip_option, into))]
    msi_endpoint: Option<String>,

    /// Managed-identity object id (stored; not consulted when selecting a provider here).
    #[builder(default, setter(strip_option, into))]
    object_id: Option<String>,

    /// Managed-identity resource id (stored; not consulted when selecting a provider here).
    #[builder(default, setter(strip_option, into))]
    msi_resource_id: Option<String>,

    /// Federated token file path (stored; not consulted when selecting a provider here).
    #[builder(default, setter(strip_option, into))]
    federated_token_file: Option<String>,

    /// Authenticate via the Azure CLI when no other credential is configured.
    #[builder(default)]
    use_azure_cli: bool,

    /// Allow plain `http://` workspace URLs (forces `client_options.allow_http`).
    #[builder(default)]
    allow_http_url: bool,

    /// Retry configuration; kept for API compatibility but not read by this file.
    #[builder(default)]
    #[allow(dead_code)]
    retry_config: RetryConfig,

    /// Options used to construct the HTTP client.
    #[builder(default)]
    client_options: client::ClientOptions,
}
408
409#[allow(deprecated)]
410impl UnityCatalogBuilder {
411 pub fn try_with_option(
413 mut self,
414 key: impl AsRef<str>,
415 value: impl Into<String>,
416 ) -> DataCatalogResult<Self> {
417 match UnityCatalogConfigKey::from_str(key.as_ref())? {
418 UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
419 UnityCatalogConfigKey::AllowHttpUrl => {
420 self.allow_http_url = str_is_truthy(&value.into())
421 }
422 UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
423 UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
424 UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
425 UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()),
426 UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()),
427 UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()),
428 UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()),
429 UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()),
430 UnityCatalogConfigKey::FederatedTokenFile => {
431 self.federated_token_file = Some(value.into())
432 }
433 UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()),
434 UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()),
435 UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
436 };
437 Ok(self)
438 }
439
440 pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
442 mut self,
443 options: I,
444 ) -> DataCatalogResult<Self> {
445 for (key, value) in options {
446 self = self.try_with_option(key, value)?;
447 }
448 Ok(self)
449 }
450
451 pub fn from_env() -> Self {
455 let mut builder = Self::builder().build();
456 for (os_key, os_value) in std::env::vars_os() {
457 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
458 && (key.starts_with("UNITY_") || key.starts_with("DATABRICKS_"))
459 {
460 tracing::debug!("Found relevant env: {key}");
461 if let Ok(config_key) = UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase()) {
462 tracing::debug!("Trying: {key} with {value}");
463 builder = builder.try_with_option(config_key, value).unwrap();
464 }
465 }
466 }
467
468 builder
469 }
470
471 fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
472 where
473 T: Send,
474 F: Future<Output = T> + Send,
475 {
476 match tokio::runtime::Handle::try_current() {
477 Ok(handle) => match handle.runtime_flavor() {
478 tokio::runtime::RuntimeFlavor::MultiThread => {
479 Ok(tokio::task::block_in_place(move || handle.block_on(future)))
480 }
481 _ => {
482 let mut cfg: Option<T> = None;
483 std::thread::scope(|scope| {
484 scope.spawn(|| {
485 cfg = Some(handle.block_on(future));
486 });
487 });
488 cfg.ok_or(DeltaTableError::ObjectStore {
489 source: ObjectStoreError::Generic {
490 store: STORE_NAME,
491 source: Box::new(UnityCatalogError::InitializationError),
492 },
493 })
494 }
495 },
496 Err(_) => {
497 let runtime = tokio::runtime::Builder::new_current_thread()
498 .enable_all()
499 .build()
500 .expect("a tokio runtime is required by the Unity Catalog Builder");
501 Ok(runtime.block_on(future))
502 }
503 }
504 }
505
506 pub async fn get_uc_location_and_token(
510 table_uri: &str,
511 storage_options: Option<&HashMap<String, String>>,
512 ) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
513 let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
514 if uri_parts.len() != 3 {
515 return Err(UnityCatalogError::InvalidTableURI {
516 table_uri: table_uri.to_string(),
517 });
518 }
519
520 let catalog_id = uri_parts[0];
521 let database_name = uri_parts[1];
522 let table_name = uri_parts[2];
523
524 let unity_catalog = if let Some(options) = storage_options {
525 let mut builder = UnityCatalogBuilder::from_env();
526 builder =
527 builder.try_with_options(options.iter().map(|(k, v)| (k.as_str(), v.as_str())))?;
528 builder.build()?
529 } else {
530 UnityCatalogBuilder::from_env().build()?
531 };
532
533 let storage_location = unity_catalog
534 .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
535 .await?;
536 let temp_creds_res = unity_catalog
538 .get_temp_table_credentials_with_permission(
539 catalog_id,
540 database_name,
541 table_name,
542 "READ_WRITE",
543 )
544 .await?;
545 let credentials = match temp_creds_res {
546 TableTempCredentialsResponse::Success(temp_creds) => temp_creds
547 .get_credentials()
548 .ok_or_else(|| UnityCatalogError::MissingCredential)?,
549 TableTempCredentialsResponse::Error(rw_error) => {
550 match unity_catalog
552 .get_temp_table_credentials(catalog_id, database_name, table_name)
553 .await?
554 {
555 TableTempCredentialsResponse::Success(temp_creds) => temp_creds
556 .get_credentials()
557 .ok_or_else(|| UnityCatalogError::MissingCredential)?,
558 TableTempCredentialsResponse::Error(read_error) => {
559 return Err(UnityCatalogError::TemporaryCredentialsFetchFailure {
560 error_code: read_error.error_code,
561 message: format!(
562 "READ_WRITE failed: {}. READ failed: {}",
563 rw_error.message, read_error.message
564 ),
565 });
566 }
567 }
568 }
569 };
570 Ok((storage_location, credentials))
571 }
572
573 fn get_credential_provider(&self) -> Option<CredentialProvider> {
574 if let Some(token) = self.bearer_token.as_ref() {
575 return Some(CredentialProvider::BearerToken(token.clone()));
576 }
577
578 if let (Some(client_id), Some(client_secret), Some(workspace_host)) =
579 (&self.client_id, &self.client_secret, &self.workspace_url)
580 {
581 return Some(CredentialProvider::TokenCredential(
582 Default::default(),
583 Box::new(WorkspaceOAuthProvider::new(
584 client_id,
585 client_secret,
586 workspace_host,
587 )),
588 ));
589 }
590
591 if let (Some(client_id), Some(client_secret), Some(authority_id)) = (
592 self.client_id.as_ref(),
593 self.client_secret.as_ref(),
594 self.authority_id.as_ref(),
595 ) {
596 return Some(CredentialProvider::TokenCredential(
597 Default::default(),
598 Box::new(ClientSecretOAuthProvider::new(
599 client_id,
600 client_secret,
601 authority_id,
602 self.authority_host.as_ref(),
603 )),
604 ));
605 }
606 if self.use_azure_cli {
607 return Some(CredentialProvider::TokenCredential(
608 Default::default(),
609 Box::new(AzureCliCredential::new()),
610 ));
611 }
612
613 None
614 }
615
616 pub fn build(self) -> DataCatalogResult<UnityCatalog> {
618 let credential = self
619 .get_credential_provider()
620 .ok_or(UnityCatalogError::MissingCredential)?;
621
622 let workspace_url = self
623 .workspace_url
624 .ok_or(UnityCatalogError::MissingConfiguration(
625 "workspace_url".into(),
626 ))?
627 .trim_end_matches('/')
628 .to_string();
629
630 let mut client_options = self.client_options;
631 if self.allow_http_url {
632 client_options.allow_http = true;
633 }
634 let client = client_options.client()?;
635
636 Ok(UnityCatalog {
637 client,
638 workspace_url,
639 credential,
640 })
641 }
642}
643
/// Client for the Unity Catalog REST API (`{workspace_url}/api/2.1/unity-catalog`).
///
/// Create it with [`UnityCatalogBuilder::build`].
pub struct UnityCatalog {
    /// HTTP client (with middleware) used for every request.
    client: reqwest_middleware::ClientWithMiddleware,
    /// Source of the `Authorization: Bearer` header value.
    credential: CredentialProvider,
    /// Workspace base URL without a trailing `/`.
    workspace_url: String,
}
650
651impl UnityCatalog {
652 async fn get_credential(&self) -> Result<HeaderValue, UnityCatalogError> {
653 match &self.credential {
654 CredentialProvider::BearerToken(token) => {
655 Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
658 }
659 CredentialProvider::TokenCredential(cache, cred) => {
660 let token = cache
661 .get_or_insert_with(|| cred.fetch_token(&self.client))
662 .await?;
663
664 Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
667 }
668 }
669 }
670
671 fn catalog_url(&self) -> String {
672 format!("{}/api/2.1/unity-catalog", self.workspace_url)
673 }
674
675 pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
680 let token = self.get_credential().await?;
681 let resp = self
683 .client
684 .get(format!("{}/catalogs", self.catalog_url()))
685 .header(AUTHORIZATION, token)
686 .send()
687 .await?;
688 Ok(resp.json().await?)
689 }
690
691 pub async fn list_schemas(
701 &self,
702 catalog_name: impl AsRef<str>,
703 ) -> Result<ListSchemasResponse, UnityCatalogError> {
704 let token = self.get_credential().await?;
705 let resp = self
707 .client
708 .get(format!("{}/schemas", self.catalog_url()))
709 .header(AUTHORIZATION, token)
710 .query(&[("catalog_name", catalog_name.as_ref())])
711 .send()
712 .await?;
713 let status = resp.status();
714 let body = resp.text().await?;
715 serde_json::from_str(&body).map_err(|e| {
716 tracing::error!(
717 "Failed to parse list_schemas response (status {}): {}",
718 status,
719 body
720 );
721 UnityCatalogError::Generic {
722 source: Box::new(e),
723 }
724 })
725 }
726
727 pub async fn get_schema(
732 &self,
733 catalog_name: impl AsRef<str>,
734 schema_name: impl AsRef<str>,
735 ) -> Result<GetSchemaResponse, UnityCatalogError> {
736 let token = self.get_credential().await?;
737 let resp = self
739 .client
740 .get(format!(
741 "{}/schemas/{}.{}",
742 self.catalog_url(),
743 catalog_name.as_ref(),
744 schema_name.as_ref()
745 ))
746 .header(AUTHORIZATION, token)
747 .send()
748 .await?;
749 Ok(resp.json().await?)
750 }
751
752 pub async fn list_table_summaries(
764 &self,
765 catalog_name: impl AsRef<str>,
766 schema_name_pattern: impl AsRef<str>,
767 ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
768 let token = self.get_credential().await?;
769 let resp = self
771 .client
772 .get(format!("{}/table-summaries", self.catalog_url()))
773 .query(&[
774 ("catalog_name", catalog_name.as_ref()),
775 ("schema_name_pattern", schema_name_pattern.as_ref()),
776 ])
777 .header(AUTHORIZATION, token)
778 .send()
779 .await?;
780
781 Ok(resp.json().await?)
782 }
783
784 pub async fn get_table(
792 &self,
793 catalog_id: impl AsRef<str>,
794 database_name: impl AsRef<str>,
795 table_name: impl AsRef<str>,
796 ) -> Result<GetTableResponse, UnityCatalogError> {
797 let token = self.get_credential().await?;
798 let resp = self
800 .client
801 .get(format!(
802 "{}/tables/{}.{}.{}",
803 self.catalog_url(),
804 catalog_id.as_ref(),
805 database_name.as_ref(),
806 table_name.as_ref(),
807 ))
808 .header(AUTHORIZATION, token)
809 .send()
810 .await?;
811
812 Ok(resp.json().await?)
813 }
814
815 pub async fn get_temp_table_credentials(
816 &self,
817 catalog_id: impl AsRef<str>,
818 database_name: impl AsRef<str>,
819 table_name: impl AsRef<str>,
820 ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
821 self.get_temp_table_credentials_with_permission(
822 catalog_id,
823 database_name,
824 table_name,
825 "READ",
826 )
827 .await
828 }
829
830 pub async fn get_temp_table_credentials_with_permission(
831 &self,
832 catalog_id: impl AsRef<str>,
833 database_name: impl AsRef<str>,
834 table_name: impl AsRef<str>,
835 operation: impl AsRef<str>,
836 ) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
837 let token = self.get_credential().await?;
838 let table_info = self
839 .get_table(catalog_id, database_name, table_name)
840 .await?;
841 let response = match table_info {
842 GetTableResponse::Success(table) => {
843 let request =
844 TemporaryTableCredentialsRequest::new(&table.table_id, operation.as_ref());
845 Ok(self
846 .client
847 .post(format!(
848 "{}/temporary-table-credentials",
849 self.catalog_url()
850 ))
851 .header(AUTHORIZATION, token)
852 .json(&request)
853 .send()
854 .await?)
855 }
856 GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
857 error_code: err.error_code,
858 message: err.message,
859 }),
860 }?;
861
862 Ok(response.json().await?)
863 }
864}
865
/// Object-store and log-store factory registered for `uc://` URLs by `register_handlers`.
#[derive(Clone, Default, Debug)]
pub struct UnityCatalogFactory {}
868
869impl ObjectStoreFactory for UnityCatalogFactory {
870 fn parse_url_opts(
871 &self,
872 table_uri: &Url,
873 config: &StorageConfig,
874 ) -> DeltaResult<(ObjectStoreRef, Path)> {
875 let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
876 UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str(), Some(&config.raw)),
877 )??;
878
879 let mut storage_options = config.raw.clone();
880 storage_options.extend(temp_creds);
881
882 let table_url = ensure_table_uri(&table_path)?;
885 let mut builder = DeltaTableBuilder::from_url(table_url)?;
886
887 if let Some(runtime) = &config.runtime {
888 builder = builder.with_io_runtime(runtime.clone());
889 }
890
891 if !storage_options.is_empty() {
892 builder = builder.with_storage_options(storage_options.clone());
893 }
894 let prefix = Path::parse(table_uri.path())?;
895 let store = builder.build_storage()?.object_store(None);
896
897 Ok((store, prefix))
898 }
899}
900
901impl LogStoreFactory for UnityCatalogFactory {
902 fn with_options(
903 &self,
904 prefixed_store: ObjectStoreRef,
905 root_store: ObjectStoreRef,
906 location: &Url,
907 options: &StorageConfig,
908 ) -> DeltaResult<Arc<dyn LogStore>> {
909 Ok(default_logstore(
910 prefixed_store,
911 root_store,
912 location,
913 options,
914 ))
915 }
916}
917
918pub fn register_handlers(_additional_prefixes: Option<Url>) {
920 let factory = Arc::new(UnityCatalogFactory::default());
921 let url = Url::parse("uc://").unwrap();
922 object_store_factories().insert(url.clone(), factory.clone());
923 logstore_factories().insert(url.clone(), factory.clone());
924}
925
926#[async_trait::async_trait]
927impl DataCatalog for UnityCatalog {
928 type Error = UnityCatalogError;
929 async fn get_table_storage_location(
931 &self,
932 catalog_id: Option<String>,
933 database_name: &str,
934 table_name: &str,
935 ) -> Result<String, UnityCatalogError> {
936 match self
937 .get_table(
938 catalog_id.unwrap_or("main".into()),
939 database_name,
940 table_name,
941 )
942 .await?
943 {
944 GetTableResponse::Success(table) => Ok(table.storage_location),
945 GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
946 error_code: err.error_code,
947 message: err.message,
948 }),
949 }
950 }
951}
952
953impl std::fmt::Debug for UnityCatalog {
954 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
955 write!(fmt, "UnityCatalog")
956 }
957}
958
#[cfg(test)]
mod tests {
    use crate::UnityCatalogBuilder;
    use crate::client::ClientOptions;
    use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
    use crate::models::*;
    use deltalake_core::DataCatalog;
    use httpmock::prelude::*;
    use std::collections::HashMap;

    /// Exercises the read endpoints against an `httpmock` server that returns canned bodies
    /// from `crate::models::tests`.
    #[tokio::test]
    async fn test_unity_client() {
        let server = MockServer::start_async().await;

        // The mock server URL is presumably plain `http://`, so HTTP must be allowed.
        let options = ClientOptions::builder().allow_http(true).build();

        // First `build()` finishes the typed builder; the second builds the `UnityCatalog`.
        let client = UnityCatalogBuilder::builder()
            .workspace_url(server.url(""))
            .bearer_token("bearer_token")
            .client_options(options)
            .build()
            .build()
            .unwrap();

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas").method("GET");
                then.body(LIST_SCHEMAS_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/schemas/catalog_name.schema_name")
                    .method("GET");
                then.body(GET_SCHEMA_RESPONSE);
            })
            .await;

        server
            .mock_async(|when, then| {
                when.path("/api/2.1/unity-catalog/tables/catalog_name.schema_name.table_name")
                    .method("GET");
                then.body(GET_TABLE_RESPONSE);
            })
            .await;

        let list_schemas_response = client.list_schemas("catalog_name").await.unwrap();
        assert!(matches!(
            list_schemas_response,
            ListSchemasResponse::Success { .. }
        ));

        let get_schema_response = client
            .get_schema("catalog_name", "schema_name")
            .await
            .unwrap();
        assert!(matches!(get_schema_response, GetSchemaResponse::Success(_)));

        let get_table_response = client
            .get_table("catalog_name", "schema_name", "table_name")
            .await;
        assert!(matches!(
            get_table_response.unwrap(),
            GetTableResponse::Success(_)
        ));

        // The table fixture's storage location is expected to be the placeholder "string".
        let storage_location = client
            .get_table_storage_location(
                Some("catalog_name".to_string()),
                "schema_name",
                "table_name",
            )
            .await
            .unwrap();
        assert!(storage_location.eq_ignore_ascii_case("string"));
    }

    /// Host and token supplied as storage options land in the builder fields.
    #[test]
    fn test_unitycatalogbuilder_with_storage_options() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("databricks_token".to_string(), "test_token".to_string());

        let builder = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("test_token".to_string()));
    }

    /// Client-credential options are stored verbatim (provider selection is not exercised).
    #[test]
    fn test_unitycatalogbuilder_client_credentials() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://test.databricks.com".to_string(),
        );
        storage_options.insert("unity_client_id".to_string(), "test_client_id".to_string());
        storage_options.insert("unity_client_secret".to_string(), "test_secret".to_string());
        storage_options.insert("unity_authority_id".to_string(), "test_tenant".to_string());

        let builder = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://test.databricks.com".to_string())
        );
        assert_eq!(builder.client_id, Some("test_client_id".to_string()));
        assert_eq!(builder.client_secret, Some("test_secret".to_string()));
        assert_eq!(builder.authority_id, Some("test_tenant".to_string()));
    }

    /// Explicit storage options override values picked up from the environment, while
    /// env-only values (the token) survive.
    #[test]
    fn test_env_with_storage_options_override() {
        // SAFETY: `set_var`/`remove_var` are unsafe because other threads may read the
        // environment concurrently. The test harness runs tests in parallel, so this is only
        // sound while no other test reads these variables at the same time.
        // NOTE(review): consider serialising env-mutating tests.
        unsafe {
            std::env::set_var("DATABRICKS_HOST", "https://env.databricks.com");
            std::env::set_var("DATABRICKS_TOKEN", "env_token");
        }

        let mut storage_options = HashMap::new();
        storage_options.insert(
            "databricks_host".to_string(),
            "https://override.databricks.com".to_string(),
        );

        let builder = UnityCatalogBuilder::from_env()
            .try_with_options(&storage_options)
            .unwrap();

        assert_eq!(
            builder.workspace_url,
            Some("https://override.databricks.com".to_string())
        );
        assert_eq!(builder.bearer_token, Some("env_token".to_string()));

        // SAFETY: see the note on the `set_var` block above.
        unsafe {
            std::env::remove_var("DATABRICKS_HOST");
            std::env::remove_var("DATABRICKS_TOKEN");
        }
    }

    /// Each alias spelling is accepted and routed to the expected builder field.
    #[test]
    fn test_storage_options_key_variations() {
        let test_cases = vec![
            ("databricks_host", "workspace_url"),
            ("unity_workspace_url", "workspace_url"),
            ("databricks_workspace_url", "workspace_url"),
            ("databricks_token", "bearer_token"),
            ("token", "bearer_token"),
            ("unity_client_id", "client_id"),
            ("databricks_client_id", "client_id"),
            ("client_id", "client_id"),
        ];

        for (key, field) in test_cases {
            let mut storage_options = HashMap::new();
            let test_value = format!("test_value_for_{}", key);
            storage_options.insert(key.to_string(), test_value.clone());

            let result = UnityCatalogBuilder::builder()
                .build()
                .try_with_options(&storage_options);
            assert!(result.is_ok(), "Failed to parse key: {}", key);

            let builder = result.unwrap();
            match field {
                "workspace_url" => assert_eq!(builder.workspace_url, Some(test_value)),
                "bearer_token" => assert_eq!(builder.bearer_token, Some(test_value)),
                "client_id" => assert_eq!(builder.client_id, Some(test_value)),
                _ => {}
            }
        }
    }

    /// Unknown option keys are rejected rather than ignored.
    #[test]
    fn test_invalid_config_key() {
        let mut storage_options = HashMap::new();
        storage_options.insert("invalid_key".to_string(), "test_value".to_string());

        let result = UnityCatalogBuilder::builder()
            .build()
            .try_with_options(&storage_options);
        assert!(result.is_err());
    }

    /// Boolean options are parsed with `str_is_truthy` semantics.
    #[test]
    fn test_boolean_options() {
        let test_cases = vec![
            ("true", true),
            ("false", false),
            ("1", true),
            ("0", false),
            ("yes", true),
            ("no", false),
        ];

        for (value, expected) in test_cases {
            let mut storage_options = HashMap::new();
            storage_options.insert("unity_allow_http_url".to_string(), value.to_string());
            storage_options.insert("unity_use_azure_cli".to_string(), value.to_string());

            let builder = UnityCatalogBuilder::builder()
                .build()
                .try_with_options(&storage_options)
                .unwrap();

            assert_eq!(
                builder.allow_http_url, expected,
                "Failed for value: {}",
                value
            );
            assert_eq!(
                builder.use_azure_cli, expected,
                "Failed for value: {}",
                value
            );
        }
    }

    /// Malformed table URIs must fail. The error variant is only pinned for non-empty
    /// `uc://` URIs.
    #[tokio::test]
    async fn test_invalid_table_uri() {
        let test_cases = vec![
            "uc://invalid",
            "uc://",
            "uc://catalog",
            "uc://catalog.schema",
            "uc://catalog.schema.table.extra",
            "invalid://catalog.schema.table",
        ];

        for uri in test_cases {
            let result = UnityCatalogBuilder::get_uc_location_and_token(uri, None).await;
            assert!(result.is_err(), "Expected error for URI: {}", uri);

            if let Err(e) = result
                && uri.starts_with("uc://")
                && uri.len() > 5
            {
                assert!(matches!(
                    e,
                    crate::UnityCatalogError::InvalidTableURI { .. }
                ));
            }
        }
    }
}