#![expect(unused_crate_dependencies, reason = "used in the library target")]
#[cfg(feature = "x509-source")]
#[expect(
clippy::tests_outside_test_module,
reason = "https://github.com/rust-lang/rust-clippy/issues/11024"
)]
#[expect(
clippy::expect_used,
clippy::unwrap_used,
reason = "https://github.com/rust-lang/rust-clippy/issues/11119"
)]
mod integration_tests_x509_source {
use spiffe::bundle::BundleSource as _;
use spiffe::workload_api::error::WorkloadApiError;
use spiffe::x509_source::X509SourceBuilder;
use spiffe::x509_source::{
MetricsErrorKind, MetricsRecorder, ResourceLimits, SvidPicker, X509Source,
};
use spiffe::{SpiffeId, TrustDomain, WorkloadApiClient, X509Bundle, X509Svid};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
fn spiffe_id_1() -> SpiffeId {
SpiffeId::new("spiffe://example.org/myservice").unwrap()
}
fn spiffe_id_2() -> SpiffeId {
SpiffeId::new("spiffe://example.org/myservice2").unwrap()
}
fn trust_domain() -> TrustDomain {
TrustDomain::new("example.org").unwrap()
}
struct SecondSvidPicker;
impl SvidPicker for SecondSvidPicker {
fn pick_svid(&self, svids: &[Arc<X509Svid>]) -> Option<usize> {
(svids.len() > 1).then_some(1)
}
}
#[derive(Debug)]
struct TestMetricsRecorder {
updates: AtomicU64,
reconnects: AtomicU64,
errors: Arc<Mutex<HashMap<MetricsErrorKind, u64>>>,
update_notify: Arc<tokio::sync::Notify>,
}
impl MetricsRecorder for TestMetricsRecorder {
fn record_update(&self) {
self.updates.fetch_add(1, Ordering::Relaxed);
self.update_notify.notify_one();
}
fn record_reconnect(&self) {
self.reconnects.fetch_add(1, Ordering::Relaxed);
}
fn record_error(&self, kind: MetricsErrorKind) {
let mut errors = self.errors.blocking_lock();
*errors.entry(kind).or_insert(0) += 1;
}
}
impl TestMetricsRecorder {
fn new() -> Self {
Self {
updates: AtomicU64::new(0),
reconnects: AtomicU64::new(0),
errors: Arc::new(Mutex::new(HashMap::new())),
update_notify: Arc::new(tokio::sync::Notify::new()),
}
}
fn update_count(&self) -> u64 {
self.updates.load(Ordering::Relaxed)
}
fn update_notify(&self) -> Arc<tokio::sync::Notify> {
Arc::clone(&self.update_notify)
}
}
async fn get_source() -> X509Source {
X509Source::new()
.await
.expect("Failed to create X509Source")
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_x509_svid() {
let source = get_source().await;
let svid = source.svid().expect("Failed to get X509Svid");
let expected_ids = [spiffe_id_1(), spiffe_id_2()];
assert!(
expected_ids.contains(svid.spiffe_id()),
"Unexpected SPIFFE ID: {:?}",
svid.spiffe_id()
);
assert!(
!svid.cert_chain().is_empty(),
"Certificate chain should not be empty"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_try_svid() {
let source = get_source().await;
let svid = source
.try_svid()
.expect("try_svid() should return Some when healthy");
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Unexpected SPIFFE ID"
);
source.shutdown().await;
assert!(
source.try_svid().is_none(),
"try_svid() should return None after shutdown"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_bundle_for_trust_domain() {
let source = get_source().await;
let bundle: Arc<X509Bundle> = source
.bundle_for_trust_domain(&trust_domain())
.expect("Failed to get X509Bundle")
.expect("No X509Bundle found");
assert_eq!(bundle.trust_domain().as_ref(), trust_domain().as_ref());
assert!(
!bundle.authorities().is_empty(),
"Bundle should have at least one authority"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_try_bundle_for_trust_domain() {
let source = get_source().await;
let bundle = source
.try_bundle_for_trust_domain(&trust_domain())
.expect("try_bundle_for_trust_domain() should return Some when healthy");
assert_eq!(bundle.trust_domain().as_ref(), trust_domain().as_ref());
source.shutdown().await;
assert!(
source
.try_bundle_for_trust_domain(&trust_domain())
.is_none(),
"try_bundle_for_trust_domain() should return None after shutdown"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_bundle_set() {
let source = get_source().await;
let bundle_set = source.bundle_set().expect("Failed to get bundle set");
let bundle = bundle_set.get(&trust_domain());
assert!(bundle.is_some(), "Bundle set should contain trust domain");
assert_eq!(
bundle.unwrap().trust_domain().as_ref(),
trust_domain().as_ref()
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_x509_context() {
let source = get_source().await;
let context = source.x509_context().expect("Failed to get X509Context");
assert!(
!context.svids().is_empty(),
"Context should have at least one SVID"
);
let default_svid = context.default_svid();
assert!(default_svid.is_some(), "Context should have a default SVID");
assert!(
!context.bundle_set().is_empty(),
"Context should have bundles"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_is_healthy() {
let source = get_source().await;
assert!(
source.is_healthy(),
"Source should be healthy after creation"
);
if source.is_healthy() {
let svid_result = source.svid();
assert!(
svid_result.is_ok(),
"If is_healthy() returns true, svid() should succeed"
);
}
source.shutdown().await;
assert!(
!source.is_healthy(),
"Source should be unhealthy after shutdown"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_source_updates() {
let source = get_source().await;
let mut updates = source.updated();
let initial_seq = updates.last();
tokio::time::timeout(
Duration::from_secs(10),
updates.wait_for(|&seq| seq > initial_seq),
)
.await
.expect("Should receive initial update within 10 seconds")
.expect("Update channel should not be closed");
let new_seq = updates.last();
assert!(
new_seq > initial_seq,
"Sequence number should have increased"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_x509_source_with_custom_picker_and_client() {
type ClientFactory = Arc<
dyn Fn() -> Pin<
Box<dyn Future<Output = Result<WorkloadApiClient, WorkloadApiError>> + Send>,
> + Send
+ Sync,
>;
let factory: ClientFactory =
Arc::new(|| Box::pin(async { WorkloadApiClient::connect_env().await }));
let source = X509SourceBuilder::new()
.client_factory(factory)
.picker(SecondSvidPicker)
.build()
.await
.unwrap();
let svid = source.svid().expect("Failed to get X509Svid");
let expected_ids = [spiffe_id_1(), spiffe_id_2()];
assert!(
expected_ids.contains(svid.spiffe_id()),
"Unexpected SPIFFE ID"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_resource_limits() {
let limits = ResourceLimits {
max_svids: Some(10),
max_bundles: Some(10),
max_bundle_der_bytes: Some(1024 * 1024), };
let source = X509SourceBuilder::new()
.resource_limits(limits)
.build()
.await
.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Should get SVID when limits are not exceeded"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_unlimited_resource_limits() {
let limits = ResourceLimits {
max_svids: None,
max_bundles: None,
max_bundle_der_bytes: None,
};
let source = X509SourceBuilder::new()
.resource_limits(limits)
.build()
.await
.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Should get SVID with unlimited limits"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_metrics_recorder() {
let metrics = Arc::new(TestMetricsRecorder::new());
let update_notify = metrics.update_notify();
let source = {
let metrics = Arc::clone(&metrics);
X509SourceBuilder::new()
.metrics(metrics)
.build()
.await
.unwrap()
};
let _svid = source.svid().unwrap();
let update_recorded =
tokio::time::timeout(Duration::from_secs(2), update_notify.notified())
.await
.is_ok();
assert!(
update_recorded && metrics.update_count() > 0,
"Should have recorded at least one update"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_shutdown_with_timeout() {
let source = X509SourceBuilder::new()
.shutdown_timeout(Some(Duration::from_secs(5)))
.build()
.await
.unwrap();
let result =
tokio::time::timeout(Duration::from_secs(10), source.shutdown_configured()).await;
assert!(result.is_ok(), "Shutdown should complete");
result.unwrap().expect("Shutdown should succeed");
assert!(source.svid().is_err(), "svid() should fail after shutdown");
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_shutdown_idempotent() {
let source = X509SourceBuilder::new()
.shutdown_timeout(Some(Duration::from_secs(5)))
.build()
.await
.unwrap();
source.shutdown_configured().await.unwrap();
let result = source.shutdown_configured().await;
assert!(result.is_ok(), "Shutdown should be idempotent");
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_reconnect_backoff_config() {
let source = X509SourceBuilder::new()
.reconnect_backoff(Duration::from_millis(100), Duration::from_secs(5))
.build()
.await
.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Should get SVID with custom backoff config"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_custom_endpoint() {
let source = X509SourceBuilder::new()
.endpoint("unix:/tmp/spire-agent/public/api.sock")
.build()
.await
.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Should get SVID with custom endpoint"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_picker_selection() {
let source = X509SourceBuilder::new()
.picker(SecondSvidPicker)
.build()
.await
.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Picker should select a valid SVID"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_health_check_guarantees_svid_success() {
let source = get_source().await;
if source.is_healthy() {
let svid_result = source.svid();
assert!(
svid_result.is_ok(),
"is_healthy() returning true must guarantee svid() succeeds"
);
}
for _ in 0..10 {
if source.is_healthy() {
assert!(
source.svid().is_ok(),
"is_healthy() must consistently guarantee svid() success"
);
}
}
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_update_notifications_sequence() {
let source = get_source().await;
let mut updates = source.updated();
let initial_seq = updates.last();
tokio::time::timeout(
Duration::from_secs(30),
updates.wait_for(|&seq| seq > initial_seq),
)
.await
.expect("Should receive update within 30 seconds")
.expect("Update channel should not be closed");
let new_seq = updates.last();
assert!(new_seq > initial_seq, "Sequence number should increase");
assert!(
new_seq >= initial_seq,
"Sequence numbers should be monotonic"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_multiple_update_receivers() {
let source = get_source().await;
let mut updates1 = source.updated();
let updates2 = source.updated();
let initial_seq1 = updates1.last();
let initial_seq2 = updates2.last();
assert_eq!(
initial_seq1, initial_seq2,
"Receivers should start with same sequence"
);
tokio::time::timeout(
Duration::from_secs(30),
updates1.wait_for(|&seq| seq > initial_seq1),
)
.await
.expect("Should receive update")
.expect("Update channel should not be closed");
assert_eq!(
updates1.last(),
updates2.last(),
"All receivers should see the same sequence number"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_builder_defaults() {
let source = X509SourceBuilder::new().build().await.unwrap();
let svid = source.svid().unwrap();
assert!(
[spiffe_id_1(), spiffe_id_2()].contains(svid.spiffe_id()),
"Should work with default builder configuration"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_context_after_shutdown() {
let source = get_source().await;
source.shutdown().await;
assert!(
source.x509_context().is_err(),
"x509_context() should fail after shutdown"
);
assert!(
source.bundle_set().is_err(),
"bundle_set() should fail after shutdown"
);
}
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_updated_after_shutdown() {
let source = get_source().await;
let mut updates = source.updated();
source.shutdown().await;
let seq_before = updates.last();
let seq_immediate = updates.last();
assert_eq!(
seq_before, seq_immediate,
"Sequence should not change immediately after shutdown"
);
let update_occurred = tokio::time::timeout(Duration::from_millis(100), updates.changed())
.await
.is_ok();
assert!(!update_occurred, "No updates should occur after shutdown");
let seq_after = updates.last();
assert_eq!(
seq_before, seq_after,
"Sequence should not change after shutdown"
);
}
}