// AWS credential vendor; compiled only when the `credential-vendor-aws` feature is on.
#[cfg(feature = "credential-vendor-aws")]
pub mod aws;

// Azure credential vendor; compiled only when the `credential-vendor-azure` feature is on.
#[cfg(feature = "credential-vendor-azure")]
pub mod azure;

// GCP credential vendor; compiled only when the `credential-vendor-gcp` feature is on.
#[cfg(feature = "credential-vendor-gcp")]
pub mod gcp;

// Caching wrapper; compiled when at least one cloud vendor feature is enabled.
#[cfg(any(
    feature = "credential-vendor-aws",
    feature = "credential-vendor-azure",
    feature = "credential-vendor-gcp"
))]
pub mod cache;
79
80use std::collections::HashMap;
81use std::str::FromStr;
82
83use async_trait::async_trait;
84use lance_core::Result;
85use lance_io::object_store::uri_to_url;
86use lance_namespace::models::Identity;
87
/// Fallback credential lifetime in milliseconds (`3600 * 1000`, i.e. one hour).
///
/// Returned by `parse_duration_millis` when the requested property is absent
/// or cannot be parsed as a `u64`.
pub const DEFAULT_CREDENTIAL_DURATION_MILLIS: u64 = 3600 * 1000;
90
/// Redacts a credential so it can be logged without exposing the secret.
///
/// * An empty input yields the literal `"[empty]"`.
/// * Inputs shorter than 16 bytes yield at most their first 8 bytes followed
///   by `***`.
/// * Longer inputs yield their first 8 bytes, `***`, and their last 4 bytes.
///
/// All lengths are measured in bytes. If a cut would fall inside a multi-byte
/// UTF-8 character, the cut is moved to the nearest character boundary that
/// reveals *less* of the credential, so the function never panics on
/// non-ASCII input. Output for pure-ASCII credentials is unaffected by this
/// adjustment.
pub fn redact_credential(credential: &str) -> String {
    const SHOW_START: usize = 8;
    const SHOW_END: usize = 4;
    const MIN_LENGTH_FOR_BOTH_ENDS: usize = SHOW_START + SHOW_END + 4;

    if credential.is_empty() {
        return "[empty]".to_string();
    }

    // Round the prefix cut *down* to a char boundary. Index 0 is always a
    // boundary, so the loop terminates.
    let mut head_end = credential.len().min(SHOW_START);
    while !credential.is_char_boundary(head_end) {
        head_end -= 1;
    }
    let head = &credential[..head_end];

    if credential.len() < MIN_LENGTH_FOR_BOTH_ENDS {
        return format!("{}***", head);
    }

    // Round the suffix cut *up* to a char boundary. `credential.len()` is
    // always a boundary, so the loop terminates.
    let mut tail_start = credential.len() - SHOW_END;
    while !credential.is_char_boundary(tail_start) {
        tail_start += 1;
    }

    format!("{}***{}", head, &credential[tail_start..])
}
125
/// Access level attached to vended credentials.
///
/// The textual forms are `read`, `write` and `admin`; parsing is
/// case-insensitive and `Display` always emits the lowercase form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VendedPermission {
    /// The default level; neither `can_write` nor `can_delete` holds.
    #[default]
    Read,
    /// `can_write` holds, `can_delete` does not.
    Write,
    /// Both `can_write` and `can_delete` hold.
    Admin,
}

impl VendedPermission {
    /// Returns `true` for `Write` and `Admin`.
    pub fn can_write(&self) -> bool {
        match self {
            Self::Read => false,
            Self::Write | Self::Admin => true,
        }
    }

    /// Returns `true` only for `Admin`.
    pub fn can_delete(&self) -> bool {
        match self {
            Self::Admin => true,
            Self::Read | Self::Write => false,
        }
    }
}

impl FromStr for VendedPermission {
    type Err = String;

    /// Parses `read`, `write` or `admin`, ignoring case.
    ///
    /// Any other input produces an error message that quotes the input
    /// exactly as it was given.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        let parsed = match normalized.as_str() {
            "read" => Self::Read,
            "write" => Self::Write,
            "admin" => Self::Admin,
            _ => {
                return Err(format!(
                    "Invalid permission '{}'. Must be one of: read, write, admin",
                    s
                ));
            }
        };
        Ok(parsed)
    }
}

impl std::fmt::Display for VendedPermission {
    /// Writes the lowercase name of the permission level.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Read => "read",
            Self::Write => "write",
            Self::Admin => "admin",
        };
        f.write_str(label)
    }
}
190
/// Namespace prefix for credential-vendor properties.
///
/// NOTE(review): the lookups in this file use the bare keys below (for
/// example `properties.get(ENABLED)`), while the error messages quote the
/// prefixed form (`credential_vendor.aws_role_arn`). Presumably the caller
/// strips this prefix before handing the map over — confirm at the call site.
pub const PROPERTY_PREFIX: &str = "credential_vendor.";

/// Key read by `has_credential_vendor_config`; vending counts as configured
/// only when the value equals `true` (ASCII case-insensitive).
pub const ENABLED: &str = "enabled";

/// Key read by `parse_permission`; expected values are `read`, `write` or
/// `admin`.
pub const PERMISSION: &str = "permission";

/// Key read by `create_credential_vendor_for_location`; caching is applied
/// unless the value equals `false` (ASCII case-insensitive).
pub const CACHE_ENABLED: &str = "cache_enabled";

/// Key for an API-key salt. Not read anywhere in this file's visible code;
/// presumably consumed by the vendor implementations — TODO confirm.
pub const API_KEY_SALT: &str = "api_key_salt";

/// Key prefix for API-key hash entries. Not read anywhere in this file's
/// visible code; presumably consumed by the vendor implementations — TODO
/// confirm.
pub const API_KEY_HASH_PREFIX: &str = "api_key_hash.";
212
/// Property keys consumed by `create_aws_vendor`.
#[cfg(feature = "credential-vendor-aws")]
pub mod aws_props {
    /// Required; vendor creation fails with an invalid-input error when missing.
    pub const ROLE_ARN: &str = "aws_role_arn";
    /// Optional; forwarded to the config via `with_external_id` when present.
    pub const EXTERNAL_ID: &str = "aws_external_id";
    /// Optional; forwarded to the config via `with_region` when present.
    pub const REGION: &str = "aws_region";
    /// Optional; forwarded to the config via `with_role_session_name` when present.
    pub const ROLE_SESSION_NAME: &str = "aws_role_session_name";
    /// Optional credential lifetime in milliseconds; falls back to
    /// `DEFAULT_CREDENTIAL_DURATION_MILLIS` when absent or unparsable.
    pub const DURATION_MILLIS: &str = "aws_duration_millis";
}
224
/// Property keys for the GCP credential vendor.
#[cfg(feature = "credential-vendor-gcp")]
pub mod gcp_props {
    /// Optional; forwarded to the config via `with_service_account` when present.
    pub const SERVICE_ACCOUNT: &str = "gcp_service_account";

    // NOTE(review): not read by `create_gcp_vendor` in this file — confirm
    // whether it is consumed elsewhere or should be wired into the config.
    pub const WORKLOAD_IDENTITY_PROVIDER: &str = "gcp_workload_identity_provider";

    // NOTE(review): not read by `create_gcp_vendor` in this file — confirm
    // whether it is consumed elsewhere or should be wired into the config.
    pub const IMPERSONATION_SERVICE_ACCOUNT: &str = "gcp_impersonation_service_account";
}
238
/// Property keys for the Azure credential vendor.
#[cfg(feature = "credential-vendor-azure")]
pub mod azure_props {
    /// Optional; forwarded to the config via `with_tenant_id` when present.
    pub const TENANT_ID: &str = "azure_tenant_id";
    /// Required; vendor creation fails with an invalid-input error when missing.
    pub const ACCOUNT_NAME: &str = "azure_account_name";
    /// Optional credential lifetime in milliseconds; falls back to
    /// `DEFAULT_CREDENTIAL_DURATION_MILLIS` when absent or unparsable.
    pub const DURATION_MILLIS: &str = "azure_duration_millis";

    // NOTE(review): not read by `create_azure_vendor` in this file — confirm
    // whether it is consumed elsewhere or should be wired into the config.
    pub const FEDERATED_CLIENT_ID: &str = "azure_federated_client_id";
}
253
/// Credentials produced by a credential vendor, plus their expiry time.
///
/// `Debug` is implemented by hand so that the contents of `storage_options`
/// are never printed; only the number of entries is shown.
#[derive(Clone)]
pub struct VendedCredentials {
    /// Key/value options carrying the credential material.
    pub storage_options: HashMap<String, String>,

    /// Expiry instant, in milliseconds since the Unix epoch.
    pub expires_at_millis: u64,
}

impl std::fmt::Debug for VendedCredentials {
    /// Formats the struct while replacing `storage_options` with a
    /// `[N keys redacted]` summary.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let redacted_summary = format!("[{} keys redacted]", self.storage_options.len());

        let mut builder = f.debug_struct("VendedCredentials");
        builder.field("storage_options", &redacted_summary);
        builder.field("expires_at_millis", &self.expires_at_millis);
        builder.finish()
    }
}

impl VendedCredentials {
    /// Bundles the given storage options with their expiry timestamp.
    pub fn new(storage_options: HashMap<String, String>, expires_at_millis: u64) -> Self {
        VendedCredentials {
            storage_options,
            expires_at_millis,
        }
    }

    /// Returns `true` once the system clock has reached `expires_at_millis`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn is_expired(&self) -> bool {
        use std::time::{SystemTime, UNIX_EPOCH};

        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time went backwards");
        let now_millis = since_epoch.as_millis() as u64;

        self.expires_at_millis <= now_millis
    }
}
297
/// Interface implemented by every credential vendor.
///
/// Implementors must be `Send + Sync` and `Debug` so they can be used as
/// `Box<dyn CredentialVendor>`.
#[async_trait]
pub trait CredentialVendor: Send + Sync + std::fmt::Debug {
    /// Produces credentials for the given table location.
    ///
    /// `identity` is optional caller identity information; how it is used is
    /// up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns an error when the implementation cannot produce credentials.
    async fn vend_credentials(
        &self,
        table_location: &str,
        identity: Option<&Identity>,
    ) -> Result<VendedCredentials>;

    /// Static name identifying the provider behind this vendor.
    fn provider_name(&self) -> &'static str;

    /// Permission level this vendor reports for itself.
    fn permission(&self) -> VendedPermission;
}
339
340pub fn detect_provider_from_uri(uri: &str) -> &'static str {
349 let Ok(url) = uri_to_url(uri) else {
350 return "unknown";
351 };
352
353 match url.scheme() {
354 "s3" => "aws",
355 "gs" => "gcp",
356 "az" | "abfss" => "azure",
357 _ => "unknown",
358 }
359}
360
361pub fn has_credential_vendor_config(properties: &HashMap<String, String>) -> bool {
366 properties
367 .get(ENABLED)
368 .map(|v| v.eq_ignore_ascii_case("true"))
369 .unwrap_or(false)
370}
371
/// Builds the credential vendor matching the storage provider of
/// `table_location`.
///
/// The provider is derived from the URI via `detect_provider_from_uri` and
/// dispatched to the matching feature-gated constructor. When a vendor is
/// created it is wrapped in `cache::CachingCredentialVendor`, unless the
/// `cache_enabled` property equals `false` (ASCII case-insensitive).
///
/// This function does not look at the `enabled` property itself; callers that
/// care should check `has_credential_vendor_config` first.
///
/// # Returns
///
/// * `Ok(Some(vendor))` when a vendor was created.
/// * `Ok(None)` when the provider is unrecognised or its feature is not
///   compiled in.
///
/// # Errors
///
/// Propagates any error returned by the provider-specific constructor.
// `properties` is unused when no vendor feature is compiled in.
#[allow(unused_variables)]
pub async fn create_credential_vendor_for_location(
    table_location: &str,
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    let provider = detect_provider_from_uri(table_location);

    // Arms for providers whose feature is disabled are compiled out, so those
    // providers fall through to the catch-all and produce `None`.
    let vendor: Option<Box<dyn CredentialVendor>> = match provider {
        #[cfg(feature = "credential-vendor-aws")]
        "aws" => create_aws_vendor(properties).await?,

        #[cfg(feature = "credential-vendor-gcp")]
        "gcp" => create_gcp_vendor(properties).await?,

        #[cfg(feature = "credential-vendor-azure")]
        "azure" => create_azure_vendor(properties)?,

        _ => None,
    };

    // The `cache` module only exists when at least one vendor feature is on,
    // hence the matching cfg gate on this statement.
    #[cfg(any(
        feature = "credential-vendor-aws",
        feature = "credential-vendor-azure",
        feature = "credential-vendor-gcp"
    ))]
    if let Some(v) = vendor {
        // Caching is opt-out: only an explicit "false" disables it.
        let cache_enabled = properties
            .get(CACHE_ENABLED)
            .map(|s| !s.eq_ignore_ascii_case("false"))
            .unwrap_or(true);

        if cache_enabled {
            return Ok(Some(Box::new(cache::CachingCredentialVendor::new(v))));
        } else {
            return Ok(Some(v));
        }
    }

    // With no vendor feature enabled, `vendor` would otherwise be unused.
    #[cfg(not(any(
        feature = "credential-vendor-aws",
        feature = "credential-vendor-azure",
        feature = "credential-vendor-gcp"
    )))]
    let _ = vendor;

    Ok(None)
}
441
442#[allow(dead_code)]
444fn parse_permission(properties: &HashMap<String, String>) -> VendedPermission {
445 properties
446 .get(PERMISSION)
447 .and_then(|s| s.parse().ok())
448 .unwrap_or_default()
449}
450
451#[allow(dead_code)]
453fn parse_duration_millis(properties: &HashMap<String, String>, key: &str) -> u64 {
454 properties
455 .get(key)
456 .and_then(|s| s.parse::<u64>().ok())
457 .unwrap_or(DEFAULT_CREDENTIAL_DURATION_MILLIS)
458}
459
/// Builds an AWS credential vendor from `properties`.
///
/// `aws_role_arn` is required. The duration (`aws_duration_millis`) and
/// permission (`permission`) fall back to their defaults when absent or
/// invalid; external id, region and role session name are applied only when
/// present.
///
/// # Errors
///
/// Returns an invalid-input error when `aws_role_arn` is missing, and
/// propagates any error from `AwsCredentialVendor::new`.
#[cfg(feature = "credential-vendor-aws")]
async fn create_aws_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use aws::{AwsCredentialVendor, AwsCredentialVendorConfig};
    use lance_core::Error;

    let Some(role_arn) = properties.get(aws_props::ROLE_ARN) else {
        return Err(Error::invalid_input_source(
            "AWS credential vending requires 'credential_vendor.aws_role_arn' to be set".into(),
        ));
    };

    let permission = parse_permission(properties);
    let duration_millis = parse_duration_millis(properties, aws_props::DURATION_MILLIS);

    let config = AwsCredentialVendorConfig::new(role_arn)
        .with_duration_millis(duration_millis)
        .with_permission(permission);

    // Apply each optional setting only when its property is present.
    let config = match properties.get(aws_props::EXTERNAL_ID) {
        Some(external_id) => config.with_external_id(external_id),
        None => config,
    };
    let config = match properties.get(aws_props::REGION) {
        Some(region) => config.with_region(region),
        None => config,
    };
    let config = match properties.get(aws_props::ROLE_SESSION_NAME) {
        Some(session_name) => config.with_role_session_name(session_name),
        None => config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(AwsCredentialVendor::new(config).await?);
    Ok(Some(boxed))
}
495
/// Builds a GCP credential vendor from `properties`.
///
/// The permission (`permission`) falls back to its default when absent or
/// invalid; `gcp_service_account` is applied only when present.
///
/// # Errors
///
/// Propagates any error from `GcpCredentialVendor::new`.
#[cfg(feature = "credential-vendor-gcp")]
async fn create_gcp_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use gcp::{GcpCredentialVendor, GcpCredentialVendorConfig};

    let base_config =
        GcpCredentialVendorConfig::new().with_permission(parse_permission(properties));

    let config = match properties.get(gcp_props::SERVICE_ACCOUNT) {
        Some(service_account) => base_config.with_service_account(service_account),
        None => base_config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(GcpCredentialVendor::new(config).await?);
    Ok(Some(boxed))
}
513
/// Builds an Azure credential vendor from `properties`.
///
/// `azure_account_name` is required. The duration (`azure_duration_millis`)
/// and permission (`permission`) fall back to their defaults when absent or
/// invalid; `azure_tenant_id` is applied only when present.
///
/// # Errors
///
/// Returns an invalid-input error when `azure_account_name` is missing.
#[cfg(feature = "credential-vendor-azure")]
fn create_azure_vendor(
    properties: &HashMap<String, String>,
) -> Result<Option<Box<dyn CredentialVendor>>> {
    use azure::{AzureCredentialVendor, AzureCredentialVendorConfig};
    use lance_core::Error;

    let Some(account_name) = properties.get(azure_props::ACCOUNT_NAME) else {
        return Err(Error::invalid_input_source(
            "Azure credential vending requires 'credential_vendor.azure_account_name' to be set"
                .into(),
        ));
    };

    let permission = parse_permission(properties);
    let duration_millis = parse_duration_millis(properties, azure_props::DURATION_MILLIS);

    let base_config = AzureCredentialVendorConfig::new()
        .with_account_name(account_name)
        .with_duration_millis(duration_millis)
        .with_permission(permission);

    let config = match properties.get(azure_props::TENANT_ID) {
        Some(tenant_id) => base_config.with_tenant_id(tenant_id),
        None => base_config,
    };

    let boxed: Box<dyn CredentialVendor> = Box::new(AzureCredentialVendor::new(config));
    Ok(Some(boxed))
}
544
// Unit tests for the provider detection, property parsing, redaction and
// expiry helpers defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // Scheme-to-provider mapping, including upper-case schemes and schemes
    // that are deliberately reported as "unknown".
    #[test]
    fn test_detect_provider_from_uri() {
        // AWS
        assert_eq!(detect_provider_from_uri("s3://bucket/path"), "aws");
        assert_eq!(detect_provider_from_uri("S3://bucket/path"), "aws");

        // GCP
        assert_eq!(detect_provider_from_uri("gs://bucket/path"), "gcp");
        assert_eq!(detect_provider_from_uri("GS://bucket/path"), "gcp");

        // Azure
        assert_eq!(detect_provider_from_uri("az://container/path"), "azure");
        assert_eq!(
            detect_provider_from_uri("az://container@account.blob.core.windows.net/path"),
            "azure"
        );
        assert_eq!(
            detect_provider_from_uri("abfss://container@account.dfs.core.windows.net/path"),
            "azure"
        );

        // Local paths and schemes outside the supported set
        assert_eq!(detect_provider_from_uri("/local/path"), "unknown");
        assert_eq!(detect_provider_from_uri("file:///local/path"), "unknown");
        assert_eq!(detect_provider_from_uri("memory://test"), "unknown");
        assert_eq!(detect_provider_from_uri("s3a://bucket/path"), "unknown");
        assert_eq!(
            detect_provider_from_uri("wasbs://container@account.blob.core.windows.net/path"),
            "unknown"
        );
    }

    // Parsing accepts any letter case and rejects everything else with a
    // message that echoes the input.
    #[test]
    fn test_vended_permission_from_str() {
        assert_eq!(
            "read".parse::<VendedPermission>().unwrap(),
            VendedPermission::Read
        );
        assert_eq!(
            "READ".parse::<VendedPermission>().unwrap(),
            VendedPermission::Read
        );
        assert_eq!(
            "write".parse::<VendedPermission>().unwrap(),
            VendedPermission::Write
        );
        assert_eq!(
            "WRITE".parse::<VendedPermission>().unwrap(),
            VendedPermission::Write
        );
        assert_eq!(
            "admin".parse::<VendedPermission>().unwrap(),
            VendedPermission::Admin
        );
        assert_eq!(
            "Admin".parse::<VendedPermission>().unwrap(),
            VendedPermission::Admin
        );

        // Invalid values
        let err = "invalid".parse::<VendedPermission>().unwrap_err();
        assert!(err.contains("Invalid permission"));
        assert!(err.contains("invalid"));

        let err = "".parse::<VendedPermission>().unwrap_err();
        assert!(err.contains("Invalid permission"));

        let err = "readwrite".parse::<VendedPermission>().unwrap_err();
        assert!(err.contains("Invalid permission"));
    }

    // `Display` emits the lowercase names.
    #[test]
    fn test_vended_permission_display() {
        assert_eq!(VendedPermission::Read.to_string(), "read");
        assert_eq!(VendedPermission::Write.to_string(), "write");
        assert_eq!(VendedPermission::Admin.to_string(), "admin");
    }

    // Unparsable or missing permission values fall back to `Read`.
    #[test]
    fn test_parse_permission_with_invalid_values() {
        let mut props = HashMap::new();
        // Unrecognised value
        props.insert(PERMISSION.to_string(), "invalid".to_string());
        assert_eq!(parse_permission(&props), VendedPermission::Read);

        // Empty value
        props.insert(PERMISSION.to_string(), "".to_string());
        assert_eq!(parse_permission(&props), VendedPermission::Read);

        // Key absent
        let empty_props: HashMap<String, String> = HashMap::new();
        assert_eq!(parse_permission(&empty_props), VendedPermission::Read);
    }

    // Unparsable or missing durations fall back to the default; a valid
    // value is returned as-is.
    #[test]
    fn test_parse_duration_millis_with_invalid_values() {
        const TEST_KEY: &str = "test_duration_millis";

        let mut props = HashMap::new();
        // Non-numeric value
        props.insert(TEST_KEY.to_string(), "not_a_number".to_string());
        assert_eq!(
            parse_duration_millis(&props, TEST_KEY),
            DEFAULT_CREDENTIAL_DURATION_MILLIS
        );

        // Negative value does not parse as u64
        props.insert(TEST_KEY.to_string(), "-1000".to_string());
        assert_eq!(
            parse_duration_millis(&props, TEST_KEY),
            DEFAULT_CREDENTIAL_DURATION_MILLIS
        );

        // Empty value
        props.insert(TEST_KEY.to_string(), "".to_string());
        assert_eq!(
            parse_duration_millis(&props, TEST_KEY),
            DEFAULT_CREDENTIAL_DURATION_MILLIS
        );

        // Key absent
        let empty_props: HashMap<String, String> = HashMap::new();
        assert_eq!(
            parse_duration_millis(&empty_props, TEST_KEY),
            DEFAULT_CREDENTIAL_DURATION_MILLIS
        );

        // Valid value
        props.insert(TEST_KEY.to_string(), "7200000".to_string());
        assert_eq!(parse_duration_millis(&props, TEST_KEY), 7200000);
    }

    // Only a case-insensitive "true" counts as enabled.
    #[test]
    fn test_has_credential_vendor_config() {
        let mut props = HashMap::new();
        // Lowercase true
        props.insert(ENABLED.to_string(), "true".to_string());
        assert!(has_credential_vendor_config(&props));

        // Uppercase true
        props.insert(ENABLED.to_string(), "TRUE".to_string());
        assert!(has_credential_vendor_config(&props));

        // Explicit false
        props.insert(ENABLED.to_string(), "false".to_string());
        assert!(!has_credential_vendor_config(&props));

        // Other truthy-looking strings are not accepted
        props.insert(ENABLED.to_string(), "yes".to_string());
        assert!(!has_credential_vendor_config(&props));

        // Key absent
        let empty_props: HashMap<String, String> = HashMap::new();
        assert!(!has_credential_vendor_config(&empty_props));
    }

    // The `Debug` output must not contain any storage option value.
    #[test]
    fn test_vended_credentials_debug_redacts_secrets() {
        let mut storage_options = HashMap::new();
        storage_options.insert(
            "aws_access_key_id".to_string(),
            "AKIAIOSFODNN7EXAMPLE".to_string(),
        );
        storage_options.insert(
            "aws_secret_access_key".to_string(),
            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
        );
        storage_options.insert(
            "aws_session_token".to_string(),
            "FwoGZXIvYXdzE...".to_string(),
        );

        let creds = VendedCredentials::new(storage_options, 1234567890);
        let debug_output = format!("{:?}", creds);

        // Secret values are absent
        assert!(!debug_output.contains("AKIAIOSFODNN7EXAMPLE"));
        assert!(!debug_output.contains("wJalrXUtnFEMI"));
        assert!(!debug_output.contains("FwoGZXIvYXdzE"));

        // The redaction summary is present
        assert!(debug_output.contains("redacted"));
        assert!(debug_output.contains("3 keys"));

        // The expiry timestamp is still shown
        assert!(debug_output.contains("1234567890"));
    }

    // Expiry is judged against the current system time.
    #[test]
    fn test_vended_credentials_is_expired() {
        // One second in the past
        let past_millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
            - 1000;
        let expired_creds = VendedCredentials::new(HashMap::new(), past_millis);
        assert!(expired_creds.is_expired());

        // One hour in the future
        let future_millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
            + 3600000;
        let valid_creds = VendedCredentials::new(HashMap::new(), future_millis);
        assert!(!valid_creds.is_expired());
    }

    // Redaction keeps the first 8 and last 4 characters of long inputs and
    // only a prefix of short ones.
    #[test]
    fn test_redact_credential() {
        // Long enough to show both ends
        assert_eq!(redact_credential("AKIAIOSFODNN7EXAMPLE"), "AKIAIOSF***MPLE");

        // Exactly 16 characters: the shortest input that shows both ends
        assert_eq!(redact_credential("1234567890123456"), "12345678***3456");

        // Shorter than 16 characters: prefix only
        assert_eq!(redact_credential("short1234567"), "short123***");
        assert_eq!(redact_credential("short123"), "short123***");
        assert_eq!(redact_credential("tiny"), "tiny***");
        assert_eq!(redact_credential("ab"), "ab***");
        assert_eq!(redact_credential("a"), "a***");

        // Empty input
        assert_eq!(redact_credential(""), "[empty]");

        // Same input as the first assertion in this test
        assert_eq!(redact_credential("AKIAIOSFODNN7EXAMPLE"), "AKIAIOSF***MPLE");

        // Longer token
        let long_token = "ya29.a0AfH6SMBx1234567890abcdefghijklmnopqrstuvwxyz";
        assert_eq!(redact_credential(long_token), "ya29.a0A***wxyz");

        // Query-string style token
        let sas_token = "sv=2021-06-08&ss=b&srt=sco&sp=rwdlacuiytfx&se=2024-12-31";
        assert_eq!(redact_credential(sas_token), "sv=2021-***2-31");
    }
}