1#[cfg(feature = "credential-vendor-aws")]
66pub mod aws;
67
68#[cfg(feature = "credential-vendor-azure")]
69pub mod azure;
70
71#[cfg(feature = "credential-vendor-gcp")]
72pub mod gcp;
73
74#[cfg(any(
77 feature = "credential-vendor-aws",
78 feature = "credential-vendor-azure",
79 feature = "credential-vendor-gcp"
80))]
81pub mod cache;
82
83use std::collections::HashMap;
84use std::str::FromStr;
85
86use async_trait::async_trait;
87use lance_core::Result;
88use lance_io::object_store::uri_to_url;
89use lance_namespace::models::Identity;
90
/// Fallback credential lifetime in milliseconds (3600 * 1000 ms, i.e. one hour).
///
/// `parse_duration_millis` returns this value when the requested duration
/// property is absent or does not parse as a `u64`.
pub const DEFAULT_CREDENTIAL_DURATION_MILLIS: u64 = 3600 * 1000;
93
/// Produces a log-safe rendering of a credential.
///
/// * An empty input yields the literal `"[empty]"`.
/// * Inputs shorter than 16 characters yield at most their first 8
///   characters followed by `***`. Note that an input of 8 characters or
///   fewer is therefore shown in full before the `***`.
/// * Longer inputs yield the first 8 characters, `***`, and the last 4
///   characters.
///
/// Lengths are measured in characters and all slicing happens on character
/// boundaries, so credentials containing multi-byte UTF-8 sequences never
/// cause a slicing panic. For pure-ASCII input the output is the same as
/// byte-based slicing would give.
pub fn redact_credential(credential: &str) -> String {
    const SHOW_START: usize = 8;
    const SHOW_END: usize = 4;
    const MIN_LENGTH_FOR_BOTH_ENDS: usize = SHOW_START + SHOW_END + 4;

    if credential.is_empty() {
        return "[empty]".to_string();
    }

    // Byte offset just past the first SHOW_START characters, or the end of
    // the string when it has fewer characters than that.
    let prefix_end = credential
        .char_indices()
        .nth(SHOW_START)
        .map_or(credential.len(), |(idx, _)| idx);
    let prefix = &credential[..prefix_end];

    if credential.chars().count() < MIN_LENGTH_FOR_BOTH_ENDS {
        format!("{}***", prefix)
    } else {
        // Byte offset where the last SHOW_END characters begin; walking
        // from the back keeps the cut on a character boundary.
        let suffix_start = credential
            .char_indices()
            .rev()
            .nth(SHOW_END - 1)
            .map_or(0, |(idx, _)| idx);
        format!("{}***{}", prefix, &credential[suffix_start..])
    }
}
128
/// Permission level attached to vended credentials.
///
/// `Read` is the `Default` variant. The `FromStr` impl accepts the variant
/// names in any letter case, and the `Display` impl renders them in lowercase
/// (`read`, `write`, `admin`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VendedPermission {
    /// Default level: neither `can_write` nor `can_delete` returns `true`.
    #[default]
    Read,
    /// `can_write` returns `true`; `can_delete` returns `false`.
    Write,
    /// Both `can_write` and `can_delete` return `true`.
    Admin,
}
155
156impl VendedPermission {
157 pub fn can_write(&self) -> bool {
159 matches!(self, Self::Write | Self::Admin)
160 }
161
162 pub fn can_delete(&self) -> bool {
164 matches!(self, Self::Admin)
165 }
166}
167
168impl FromStr for VendedPermission {
169 type Err = String;
170
171 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
172 match s.to_lowercase().as_str() {
173 "read" => Ok(Self::Read),
174 "write" => Ok(Self::Write),
175 "admin" => Ok(Self::Admin),
176 _ => Err(format!(
177 "Invalid permission '{}'. Must be one of: read, write, admin",
178 s
179 )),
180 }
181 }
182}
183
184impl std::fmt::Display for VendedPermission {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 match self {
187 Self::Read => write!(f, "read"),
188 Self::Write => write!(f, "write"),
189 Self::Admin => write!(f, "admin"),
190 }
191 }
192}
193
/// Namespace prefix for credential-vendor properties (`credential_vendor.`).
///
/// Error messages in this file name properties with this prefix (for example
/// `credential_vendor.aws_role_arn`), while the lookups use the bare key
/// names below. NOTE(review): presumably the caller strips the prefix before
/// passing the property map in — confirm.
pub const PROPERTY_PREFIX: &str = "credential_vendor.";

/// Key read by `has_credential_vendor_config`: vending counts as configured
/// only when the value equals `true` (ASCII case-insensitive).
pub const ENABLED: &str = "enabled";

/// Key read by `parse_permission`. Accepted values are `read`, `write` and
/// `admin`; a missing or unparseable value falls back to the default
/// permission.
pub const PERMISSION: &str = "permission";

/// Key read by `create_credential_vendor_for_location`: the caching wrapper
/// is applied unless the value equals `false` (ASCII case-insensitive).
pub const CACHE_ENABLED: &str = "cache_enabled";

/// Key for an API-key salt. It is not read in the visible part of this file.
/// NOTE(review): presumably consumed by API-key handling elsewhere — confirm.
pub const API_KEY_SALT: &str = "api_key_salt";

/// Key prefix for API-key hash entries. It is not read in the visible part of
/// this file. NOTE(review): presumably followed by a per-key suffix — confirm.
pub const API_KEY_HASH_PREFIX: &str = "api_key_hash.";
215
/// Property keys read by `create_aws_vendor`.
#[cfg(feature = "credential-vendor-aws")]
pub mod aws_props {
    /// Required: `create_aws_vendor` returns an `InvalidInput` error when absent.
    pub const ROLE_ARN: &str = "aws_role_arn";
    /// Optional: passed to the config's `with_external_id` when present.
    pub const EXTERNAL_ID: &str = "aws_external_id";
    /// Optional: passed to the config's `with_region` when present.
    pub const REGION: &str = "aws_region";
    /// Optional: passed to the config's `with_role_session_name` when present.
    pub const ROLE_SESSION_NAME: &str = "aws_role_session_name";
    /// Optional credential lifetime in milliseconds, parsed by
    /// `parse_duration_millis` (default applies when missing or invalid).
    pub const DURATION_MILLIS: &str = "aws_duration_millis";
}
227
/// Property keys read by `create_gcp_vendor`; every key is optional there.
#[cfg(feature = "credential-vendor-gcp")]
pub mod gcp_props {
    /// Passed to the config's `with_service_account` when present.
    pub const SERVICE_ACCOUNT: &str = "gcp_service_account";

    /// Passed to the config's `with_workload_identity_provider` when present.
    pub const WORKLOAD_IDENTITY_PROVIDER: &str = "gcp_workload_identity_provider";

    /// Passed to the config's `with_impersonation_service_account` when present.
    pub const IMPERSONATION_SERVICE_ACCOUNT: &str = "gcp_impersonation_service_account";
}
241
/// Property keys read by `create_azure_vendor`.
#[cfg(feature = "credential-vendor-azure")]
pub mod azure_props {
    /// Optional: passed to the config's `with_tenant_id` when present.
    pub const TENANT_ID: &str = "azure_tenant_id";
    /// Required: `create_azure_vendor` returns an `InvalidInput` error when absent.
    pub const ACCOUNT_NAME: &str = "azure_account_name";
    /// Optional credential lifetime in milliseconds, parsed by
    /// `parse_duration_millis` (default applies when missing or invalid).
    pub const DURATION_MILLIS: &str = "azure_duration_millis";

    /// Optional: passed to the config's `with_federated_client_id` when present.
    pub const FEDERATED_CLIENT_ID: &str = "azure_federated_client_id";
}
256
/// Credentials returned by `CredentialVendor::vend_credentials`, together
/// with their expiry time.
///
/// `Debug` is implemented by hand so that formatting this type prints only
/// the number of storage-option entries, never their values.
#[derive(Clone)]
pub struct VendedCredentials {
    /// Storage option key/value pairs carrying the credential material
    /// (the tests in this file use keys such as `aws_access_key_id`).
    pub storage_options: HashMap<String, String>,

    /// Expiry instant in milliseconds since the Unix epoch; `is_expired`
    /// compares it against the current system time.
    pub expires_at_millis: u64,
}
269
270impl std::fmt::Debug for VendedCredentials {
271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 f.debug_struct("VendedCredentials")
273 .field(
274 "storage_options",
275 &format!("[{} keys redacted]", self.storage_options.len()),
276 )
277 .field("expires_at_millis", &self.expires_at_millis)
278 .finish()
279 }
280}
281
282impl VendedCredentials {
283 pub fn new(storage_options: HashMap<String, String>, expires_at_millis: u64) -> Self {
285 Self {
286 storage_options,
287 expires_at_millis,
288 }
289 }
290
291 pub fn is_expired(&self) -> bool {
293 let now_millis = std::time::SystemTime::now()
294 .duration_since(std::time::UNIX_EPOCH)
295 .expect("time went backwards")
296 .as_millis() as u64;
297 now_millis >= self.expires_at_millis
298 }
299}
300
/// Interface implemented by every credential vendor.
///
/// Implementors must be `Send + Sync` and `Debug`; the factory functions in
/// this file hand vendors out as `Box<dyn CredentialVendor>`.
#[async_trait]
pub trait CredentialVendor: Send + Sync + std::fmt::Debug {
    /// Produces credentials for the table stored at `table_location`.
    ///
    /// `identity` is optional. NOTE(review): presumably it lets an
    /// implementation scope credentials to the calling identity — confirm
    /// against the concrete vendors.
    ///
    /// # Errors
    ///
    /// Returns a `lance_core` error when the implementation cannot produce
    /// credentials; the exact conditions are implementation-specific.
    async fn vend_credentials(
        &self,
        table_location: &str,
        identity: Option<&Identity>,
    ) -> Result<VendedCredentials>;

    /// Static name of the provider behind this vendor.
    /// NOTE(review): presumably one of the names `detect_provider_from_uri`
    /// returns (`aws`, `gcp`, `azure`) — confirm.
    fn provider_name(&self) -> &'static str;

    /// Permission level this vendor issues credentials with.
    fn permission(&self) -> VendedPermission;
}
342
343pub fn detect_provider_from_uri(uri: &str) -> &'static str {
352 let Ok(url) = uri_to_url(uri) else {
353 return "unknown";
354 };
355
356 match url.scheme() {
357 "s3" => "aws",
358 "gs" => "gcp",
359 "az" | "abfss" => "azure",
360 _ => "unknown",
361 }
362}
363
364pub fn has_credential_vendor_config(properties: &HashMap<String, String>) -> bool {
369 properties
370 .get(ENABLED)
371 .map(|v| v.eq_ignore_ascii_case("true"))
372 .unwrap_or(false)
373}
374
375#[allow(unused_variables)]
397pub async fn create_credential_vendor_for_location(
398 table_location: &str,
399 properties: &HashMap<String, String>,
400) -> Result<Option<Box<dyn CredentialVendor>>> {
401 let provider = detect_provider_from_uri(table_location);
402
403 let vendor: Option<Box<dyn CredentialVendor>> = match provider {
404 #[cfg(feature = "credential-vendor-aws")]
405 "aws" => create_aws_vendor(properties).await?,
406
407 #[cfg(feature = "credential-vendor-gcp")]
408 "gcp" => create_gcp_vendor(properties).await?,
409
410 #[cfg(feature = "credential-vendor-azure")]
411 "azure" => create_azure_vendor(properties)?,
412
413 _ => None,
414 };
415
416 #[cfg(any(
418 feature = "credential-vendor-aws",
419 feature = "credential-vendor-azure",
420 feature = "credential-vendor-gcp"
421 ))]
422 if let Some(v) = vendor {
423 let cache_enabled = properties
424 .get(CACHE_ENABLED)
425 .map(|s| !s.eq_ignore_ascii_case("false"))
426 .unwrap_or(true);
427
428 if cache_enabled {
429 return Ok(Some(Box::new(cache::CachingCredentialVendor::new(v))));
430 } else {
431 return Ok(Some(v));
432 }
433 }
434
435 #[cfg(not(any(
436 feature = "credential-vendor-aws",
437 feature = "credential-vendor-azure",
438 feature = "credential-vendor-gcp"
439 )))]
440 let _ = vendor;
441
442 Ok(None)
443}
444
/// Reads the `permission` property and converts it to a `VendedPermission`.
///
/// A missing key or a value that fails to parse yields the default
/// permission rather than an error.
#[cfg(any(
    test,
    feature = "credential-vendor-aws",
    feature = "credential-vendor-azure",
    feature = "credential-vendor-gcp"
))]
fn parse_permission(properties: &HashMap<String, String>) -> VendedPermission {
    match properties.get(PERMISSION) {
        Some(raw) => raw.parse::<VendedPermission>().unwrap_or_default(),
        None => VendedPermission::default(),
    }
}
458
/// Reads a credential lifetime in milliseconds from `properties[key]`.
///
/// Falls back to `DEFAULT_CREDENTIAL_DURATION_MILLIS` when the key is missing
/// or its value is not a valid `u64` (for example negative, empty, or
/// non-numeric text).
#[cfg(any(
    test,
    feature = "credential-vendor-aws",
    feature = "credential-vendor-azure",
    feature = "credential-vendor-gcp"
))]
fn parse_duration_millis(properties: &HashMap<String, String>, key: &str) -> u64 {
    let Some(raw) = properties.get(key) else {
        return DEFAULT_CREDENTIAL_DURATION_MILLIS;
    };

    raw.parse::<u64>()
        .unwrap_or(DEFAULT_CREDENTIAL_DURATION_MILLIS)
}
472
/// Builds an AWS credential vendor from `properties`.
///
/// `aws_role_arn` is mandatory. Duration and permission are always applied
/// (with their defaults when unset); external id, region and role session
/// name are applied only when present.
///
/// # Errors
///
/// Returns `NamespaceError::InvalidInput` when the role ARN is missing, and
/// propagates any error from `AwsCredentialVendor::new`.
#[cfg(feature = "credential-vendor-aws")]
async fn create_aws_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use aws::{AwsCredentialVendor, AwsCredentialVendorConfig};
    use lance_namespace::error::NamespaceError;

    let Some(role_arn) = properties.get(aws_props::ROLE_ARN) else {
        return Err(lance_core::Error::from(NamespaceError::InvalidInput {
            message: "AWS credential vending requires 'credential_vendor.aws_role_arn' to be set"
                .to_string(),
        }));
    };

    let lifetime_millis = parse_duration_millis(properties, aws_props::DURATION_MILLIS);
    let granted = parse_permission(properties);

    let config = AwsCredentialVendorConfig::new(role_arn)
        .with_duration_millis(lifetime_millis)
        .with_permission(granted);

    // Optional settings: each one is layered on only when the key exists.
    let config = match properties.get(aws_props::EXTERNAL_ID) {
        Some(external_id) => config.with_external_id(external_id),
        None => config,
    };
    let config = match properties.get(aws_props::REGION) {
        Some(region) => config.with_region(region),
        None => config,
    };
    let config = match properties.get(aws_props::ROLE_SESSION_NAME) {
        Some(session_name) => config.with_role_session_name(session_name),
        None => config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(AwsCredentialVendor::new(config).await?);
    Ok(Some(boxed))
}
509
/// Builds a GCP credential vendor from `properties`.
///
/// The permission is always applied (default when unset); service account,
/// workload identity provider and impersonation service account are applied
/// only when present.
///
/// # Errors
///
/// Propagates any error from `GcpCredentialVendor::new`.
#[cfg(feature = "credential-vendor-gcp")]
async fn create_gcp_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use gcp::{GcpCredentialVendor, GcpCredentialVendorConfig};

    let granted = parse_permission(properties);
    let config = GcpCredentialVendorConfig::new().with_permission(granted);

    // Optional settings: each one is layered on only when the key exists.
    let config = match properties.get(gcp_props::SERVICE_ACCOUNT) {
        Some(sa) => config.with_service_account(sa),
        None => config,
    };
    let config = match properties.get(gcp_props::WORKLOAD_IDENTITY_PROVIDER) {
        Some(provider) => config.with_workload_identity_provider(provider),
        None => config,
    };
    let config = match properties.get(gcp_props::IMPERSONATION_SERVICE_ACCOUNT) {
        Some(service_account) => config.with_impersonation_service_account(service_account),
        None => config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(GcpCredentialVendor::new(config)?);
    Ok(Some(boxed))
}
533
/// Builds an Azure credential vendor from `properties`.
///
/// `azure_account_name` is mandatory. Duration and permission are always
/// applied (with their defaults when unset); tenant id and federated client
/// id are applied only when present.
///
/// # Errors
///
/// Returns `NamespaceError::InvalidInput` when the account name is missing.
#[cfg(feature = "credential-vendor-azure")]
fn create_azure_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use azure::{AzureCredentialVendor, AzureCredentialVendorConfig};
    use lance_namespace::error::NamespaceError;

    let Some(account_name) = properties.get(azure_props::ACCOUNT_NAME) else {
        return Err(lance_core::Error::from(NamespaceError::InvalidInput {
            message:
                "Azure credential vending requires 'credential_vendor.azure_account_name' to be set"
                    .to_string(),
        }));
    };

    let lifetime_millis = parse_duration_millis(properties, azure_props::DURATION_MILLIS);
    let granted = parse_permission(properties);

    let config = AzureCredentialVendorConfig::new()
        .with_account_name(account_name)
        .with_duration_millis(lifetime_millis)
        .with_permission(granted);

    // Optional settings: each one is layered on only when the key exists.
    let config = match properties.get(azure_props::TENANT_ID) {
        Some(tenant_id) => config.with_tenant_id(tenant_id),
        None => config,
    };
    let config = match properties.get(azure_props::FEDERATED_CLIENT_ID) {
        Some(client_id) => config.with_federated_client_id(client_id),
        None => config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(AzureCredentialVendor::new(config));
    Ok(Some(boxed))
}
568
#[cfg(test)]
mod tests {
    use super::*;

    /// Current system time in milliseconds since the Unix epoch.
    fn now_millis() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Recognised schemes map to their provider (in either letter case);
    /// local, in-memory and unsupported cloud schemes map to "unknown".
    #[test]
    fn test_detect_provider_from_uri() {
        let cases = [
            ("s3://bucket/path", "aws"),
            ("S3://bucket/path", "aws"),
            ("gs://bucket/path", "gcp"),
            ("GS://bucket/path", "gcp"),
            ("az://container/path", "azure"),
            (
                "az://container@account.blob.core.windows.net/path",
                "azure",
            ),
            (
                "abfss://container@account.dfs.core.windows.net/path",
                "azure",
            ),
            ("/local/path", "unknown"),
            ("file:///local/path", "unknown"),
            ("memory://test", "unknown"),
            ("s3a://bucket/path", "unknown"),
            (
                "wasbs://container@account.blob.core.windows.net/path",
                "unknown",
            ),
        ];

        for (uri, expected) in cases {
            assert_eq!(detect_provider_from_uri(uri), expected);
        }
    }

    /// Valid names parse in any letter case; everything else is rejected
    /// with a message that names the problem.
    #[test]
    fn test_vended_permission_from_str() {
        let accepted = [
            ("read", VendedPermission::Read),
            ("READ", VendedPermission::Read),
            ("write", VendedPermission::Write),
            ("WRITE", VendedPermission::Write),
            ("admin", VendedPermission::Admin),
            ("Admin", VendedPermission::Admin),
        ];
        for (text, expected) in accepted {
            assert_eq!(text.parse::<VendedPermission>().unwrap(), expected);
        }

        let err = "invalid".parse::<VendedPermission>().unwrap_err();
        assert!(err.contains("Invalid permission"));
        assert!(err.contains("invalid"));

        for rejected in ["", "readwrite"] {
            let err = rejected.parse::<VendedPermission>().unwrap_err();
            assert!(err.contains("Invalid permission"));
        }
    }

    /// Each variant displays as its lowercase name.
    #[test]
    fn test_vended_permission_display() {
        let cases = [
            (VendedPermission::Read, "read"),
            (VendedPermission::Write, "write"),
            (VendedPermission::Admin, "admin"),
        ];
        for (permission, expected) in cases {
            assert_eq!(permission.to_string(), expected);
        }
    }

    /// Unparseable, empty and missing permission values all fall back to Read.
    #[test]
    fn test_parse_permission_with_invalid_values() {
        for bad in ["invalid", ""] {
            let props = HashMap::from([(PERMISSION.to_string(), bad.to_string())]);
            assert_eq!(parse_permission(&props), VendedPermission::Read);
        }

        let empty_props: HashMap<String, String> = HashMap::new();
        assert_eq!(parse_permission(&empty_props), VendedPermission::Read);
    }

    /// Invalid or missing durations fall back to the default; a valid value
    /// is returned unchanged.
    #[test]
    fn test_parse_duration_millis_with_invalid_values() {
        const TEST_KEY: &str = "test_duration_millis";

        let with_value =
            |raw: &str| HashMap::from([(TEST_KEY.to_string(), raw.to_string())]);

        for bad in ["not_a_number", "-1000", ""] {
            assert_eq!(
                parse_duration_millis(&with_value(bad), TEST_KEY),
                DEFAULT_CREDENTIAL_DURATION_MILLIS
            );
        }

        let empty_props: HashMap<String, String> = HashMap::new();
        assert_eq!(
            parse_duration_millis(&empty_props, TEST_KEY),
            DEFAULT_CREDENTIAL_DURATION_MILLIS
        );

        assert_eq!(
            parse_duration_millis(&with_value("7200000"), TEST_KEY),
            7200000
        );
    }

    /// Only a case-insensitive "true" enables vending.
    #[test]
    fn test_has_credential_vendor_config() {
        let cases = [
            ("true", true),
            ("TRUE", true),
            ("false", false),
            ("yes", false),
        ];
        for (value, expected) in cases {
            let props = HashMap::from([(ENABLED.to_string(), value.to_string())]);
            assert_eq!(has_credential_vendor_config(&props), expected);
        }

        let empty_props: HashMap<String, String> = HashMap::new();
        assert!(!has_credential_vendor_config(&empty_props));
    }

    /// Debug output shows the key count and expiry but none of the secrets.
    #[test]
    fn test_vended_credentials_debug_redacts_secrets() {
        let secrets = [
            ("aws_access_key_id", "AKIAIOSFODNN7EXAMPLE"),
            (
                "aws_secret_access_key",
                "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            ),
            ("aws_session_token", "FwoGZXIvYXdzE..."),
        ];
        let storage_options: HashMap<String, String> = secrets
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let rendered = format!("{:?}", VendedCredentials::new(storage_options, 1234567890));

        for leaked in ["AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI", "FwoGZXIvYXdzE"] {
            assert!(!rendered.contains(leaked));
        }
        for expected in ["redacted", "3 keys", "1234567890"] {
            assert!(rendered.contains(expected));
        }
    }

    /// An expiry one second in the past is expired; one hour ahead is not.
    #[test]
    fn test_vended_credentials_is_expired() {
        let expired_creds = VendedCredentials::new(HashMap::new(), now_millis() - 1000);
        assert!(expired_creds.is_expired());

        let valid_creds = VendedCredentials::new(HashMap::new(), now_millis() + 3600000);
        assert!(!valid_creds.is_expired());
    }

    /// Long values keep 8 leading and 4 trailing characters, short values
    /// keep only a prefix, and the empty string gets a placeholder.
    #[test]
    fn test_redact_credential() {
        let cases = [
            ("AKIAIOSFODNN7EXAMPLE", "AKIAIOSF***MPLE"),
            ("1234567890123456", "12345678***3456"),
            ("short1234567", "short123***"),
            ("short123", "short123***"),
            ("tiny", "tiny***"),
            ("ab", "ab***"),
            ("a", "a***"),
            ("", "[empty]"),
            (
                "ya29.a0AfH6SMBx1234567890abcdefghijklmnopqrstuvwxyz",
                "ya29.a0A***wxyz",
            ),
            (
                "sv=2021-06-08&ss=b&srt=sco&sp=rwdlacuiytfx&se=2024-12-31",
                "sv=2021-***2-31",
            ),
        ];

        for (input, expected) in cases {
            assert_eq!(redact_credential(input), expected);
        }
    }
}