#![expect(unused_crate_dependencies, reason = "used in the library target")]
#[cfg(feature = "jwt-source")]
#[expect(
clippy::tests_outside_test_module,
reason = "https://github.com/rust-lang/rust-clippy/issues/11024"
)]
#[expect(
clippy::expect_used,
clippy::unwrap_used,
reason = "https://github.com/rust-lang/rust-clippy/issues/11119"
)]
mod integration_tests_jwt_source {
use spiffe::bundle::BundleSource as _;
use spiffe::jwt_source::JwtSourceBuilder;
use spiffe::jwt_source::{JwtSource, MetricsErrorKind, MetricsRecorder, ResourceLimits};
use spiffe::workload_api::error::WorkloadApiError;
use spiffe::{JwtBundle, SpiffeId, TrustDomain, WorkloadApiClient};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
/// Builds the first SPIFFE ID the tests expect the Workload API to issue.
fn spiffe_id_1() -> SpiffeId {
    let uri = "spiffe://example.org/myservice";
    SpiffeId::new(uri).unwrap()
}
/// Builds the second SPIFFE ID the tests expect the Workload API to issue.
fn spiffe_id_2() -> SpiffeId {
    let uri = "spiffe://example.org/myservice2";
    SpiffeId::new(uri).unwrap()
}
/// Builds the trust domain (`example.org`) that every test in this module queries.
fn trust_domain() -> TrustDomain {
    let name = "example.org";
    TrustDomain::new(name).unwrap()
}
/// In-memory `MetricsRecorder` used by the tests to observe what a source reports.
#[derive(Debug)]
struct TestMetricsRecorder {
// Number of `record_update` calls seen so far.
updates: AtomicU64,
// Number of `record_reconnect` calls seen so far.
reconnects: AtomicU64,
// Per-kind count of `record_error` calls.
errors: Arc<Mutex<HashMap<MetricsErrorKind, u64>>>,
// Signalled on every `record_update`; a shared handle is exposed via `update_notify()`.
update_notify: Arc<tokio::sync::Notify>,
}
impl MetricsRecorder for TestMetricsRecorder {
    /// Counts one update and signals `update_notify`.
    fn record_update(&self) {
        self.updates.fetch_add(1, Ordering::Relaxed);
        self.update_notify.notify_one();
    }

    /// Counts one reconnect.
    fn record_reconnect(&self) {
        self.reconnects.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the counter kept for `kind`.
    ///
    /// This is a synchronous callback that may be invoked from inside a Tokio
    /// runtime, where `tokio::sync::Mutex::blocking_lock()` panics. The lock is
    /// therefore taken with a `try_lock` spin instead, which is safe in both
    /// synchronous and asynchronous contexts.
    fn record_error(&self, kind: MetricsErrorKind) {
        // The guard is only ever held for the map update below and never across
        // an `.await`, so any contention is momentary and the spin terminates.
        let mut errors = loop {
            if let Ok(guard) = self.errors.try_lock() {
                break guard;
            }
            std::hint::spin_loop();
        };
        *errors.entry(kind).or_insert(0) += 1;
    }
}
impl TestMetricsRecorder {
    /// Creates a recorder with all counters at zero and an empty error map.
    fn new() -> Self {
        let errors = Arc::new(Mutex::new(HashMap::new()));
        let update_notify = Arc::new(tokio::sync::Notify::new());
        Self {
            updates: AtomicU64::new(0),
            reconnects: AtomicU64::new(0),
            errors,
            update_notify,
        }
    }

    /// Returns how many updates have been recorded so far.
    fn update_count(&self) -> u64 {
        let counter = &self.updates;
        counter.load(Ordering::Relaxed)
    }

    /// Returns a shared handle to the notifier signalled by `record_update`.
    fn update_notify(&self) -> Arc<tokio::sync::Notify> {
        let shared = &self.update_notify;
        Arc::clone(shared)
    }
}
/// Creates a `JwtSource` with `JwtSource::new()`, panicking if creation fails.
async fn get_source() -> JwtSource {
    let created = JwtSource::new().await;
    created.expect("Failed to create JwtSource")
}
/// The source returns a bundle whose trust domain matches the one requested.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_bundle_for_trust_domain() {
    let source = get_source().await;
    let expected_domain = trust_domain();

    let lookup = source.bundle_for_trust_domain(&expected_domain);
    let bundle: Arc<JwtBundle> = lookup
        .expect("Failed to get JwtBundle")
        .expect("No JwtBundle found");

    assert_eq!(bundle.trust_domain().as_ref(), expected_domain.as_ref());
}
/// `try_bundle_for_trust_domain()` yields a bundle on a fresh source and `None` after shutdown.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_try_bundle_for_trust_domain() {
    let source = get_source().await;
    let domain = trust_domain();

    let before_shutdown = source.try_bundle_for_trust_domain(&domain);
    let bundle = before_shutdown
        .expect("try_bundle_for_trust_domain() should return Some when healthy");
    assert_eq!(bundle.trust_domain().as_ref(), domain.as_ref());

    source.shutdown().await;

    let after_shutdown = source.try_bundle_for_trust_domain(&domain);
    assert!(
        after_shutdown.is_none(),
        "try_bundle_for_trust_domain() should return None after shutdown"
    );
}
/// The bundle set exposed by the source contains an entry for the test trust domain.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_bundle_set() {
    let source = get_source().await;
    let domain = trust_domain();

    let all_bundles = source.bundle_set().expect("Failed to get bundle set");
    let entry = all_bundles.get(&domain);
    assert!(entry.is_some(), "Bundle set should contain trust domain");

    let bundle = entry.unwrap();
    assert_eq!(bundle.trust_domain().as_ref(), domain.as_ref());
}
/// Fetching a JWT SVID yields a non-empty token for one of the two expected SPIFFE IDs.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_jwt_svid() {
    let source = get_source().await;
    let audience = vec![String::from("test-audience")];

    let fetched = source.get_jwt_svid(&audience).await;
    let svid = fetched.expect("Failed to get JWT SVID");

    let actual_id = svid.spiffe_id();
    let is_known_id = [spiffe_id_1(), spiffe_id_2()].contains(actual_id);
    assert!(is_known_id, "Unexpected SPIFFE ID: {:?}", actual_id);

    let token_is_empty = svid.token().is_empty();
    assert!(!token_is_empty, "JWT token should not be empty");
}
/// Requesting a JWT SVID for a specific SPIFFE ID returns an SVID carrying that ID.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_jwt_svid_with_id() {
    let source = get_source().await;
    let audience = vec![String::from("test-audience")];
    let requested_id = spiffe_id_1();

    let fetched = source
        .get_jwt_svid_with_id(&audience, Some(&requested_id))
        .await;
    let svid = fetched.expect("Failed to get JWT SVID with ID");

    assert_eq!(svid.spiffe_id(), &requested_id);
    let token_is_empty = svid.token().is_empty();
    assert!(!token_is_empty, "JWT token should not be empty");
}
/// A new source reports healthy (and can serve a bundle); after shutdown it reports unhealthy.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_is_healthy() {
    let source = get_source().await;

    let healthy_at_start = source.is_healthy();
    assert!(healthy_at_start, "Source should be healthy after creation");

    // Health is re-sampled here, so the lookup is only checked while still healthy.
    if source.is_healthy() {
        let domain = trust_domain();
        let lookup = source.bundle_for_trust_domain(&domain);
        assert!(
            lookup.is_ok(),
            "If is_healthy() returns true, bundle_for_trust_domain() should succeed"
        );
    }

    source.shutdown().await;

    let healthy_after_shutdown = source.is_healthy();
    assert!(
        !healthy_after_shutdown,
        "Source should be unhealthy after shutdown"
    );
}
/// The update sequence number advances past its starting value within ten seconds.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_source_updates() {
    let source = get_source().await;
    let mut updates = source.updated();
    let initial_seq = updates.last();

    let deadline = Duration::from_secs(10);
    let next_update = updates.wait_for(|&seq| seq > initial_seq);
    tokio::time::timeout(deadline, next_update)
        .await
        .expect("Should receive initial update within 10 seconds")
        .expect("Update channel should not be closed");

    let advanced = updates.last() > initial_seq;
    assert!(advanced, "Sequence number should have increased");
}
/// A source built with a caller-supplied client factory can still serve the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_jwt_source_with_custom_client() {
    // Boxed, sendable future resolving to a connected client (or a Workload API error).
    type ClientFactory = Arc<
        dyn Fn() -> Pin<
                Box<dyn Future<Output = Result<WorkloadApiClient, WorkloadApiError>> + Send>,
            > + Send
            + Sync,
    >;

    let factory: ClientFactory =
        Arc::new(|| Box::pin(async { WorkloadApiClient::connect_env().await }));

    let builder = JwtSourceBuilder::new().client_factory(factory);
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    let bundle = lookup.expect("No JwtBundle found");
    assert_eq!(bundle.trust_domain().as_ref(), domain.as_ref());
}
/// A source configured with finite resource limits still serves the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_resource_limits() {
    let one_mebibyte = 1024 * 1024;
    let limits = ResourceLimits {
        max_bundles: Some(10),
        max_bundle_jwks_bytes: Some(one_mebibyte),
    };

    let builder = JwtSourceBuilder::new().resource_limits(limits);
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    assert!(
        lookup.is_some(),
        "Should get bundle when limits are not exceeded"
    );
}
/// A source configured with every resource limit set to `None` serves the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_unlimited_resource_limits() {
    let unlimited = ResourceLimits {
        max_bundles: None,
        max_bundle_jwks_bytes: None,
    };

    let builder = JwtSourceBuilder::new().resource_limits(unlimited);
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    assert!(lookup.is_some(), "Should get bundle with unlimited limits");
}
/// When an update is observed within thirty seconds, the recorder must have counted it.
///
/// If no update arrives in time (or the channel closes), the test makes no assertion.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_metrics_recorder() {
    let metrics = Arc::new(TestMetricsRecorder::new());
    let update_notify = metrics.update_notify();

    // The builder gets its own handle; `metrics` stays available for the final check.
    let recorder_for_source = Arc::clone(&metrics);
    let source = JwtSourceBuilder::new()
        .metrics(recorder_for_source)
        .build()
        .await
        .unwrap();

    let _bundle = source.bundle_for_trust_domain(&trust_domain()).unwrap();

    let mut updates = source.updated();
    let initial_seq = updates.last();

    let wait_budget = Duration::from_secs(30);
    let update_result = tokio::time::timeout(wait_budget, async {
        tokio::select! {
            seq_result = updates.wait_for(|&seq| seq > initial_seq) => seq_result,
            () = update_notify.notified() => {
                // The notifier may fire before the sequence is seen to move; keep waiting then.
                if updates.last() > initial_seq {
                    Ok(updates.last())
                } else {
                    updates.wait_for(|&seq| seq > initial_seq).await
                }
            }
        }
    })
    .await;

    if let Ok(Ok(_observed_seq)) = update_result {
        let recorded = metrics.update_count();
        assert!(
            recorded > 0,
            "Should have recorded at least one update after bundle rotation"
        );
    }
}
/// `shutdown_configured()` finishes within the outer deadline, succeeds, and disables bundle lookups.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_shutdown_with_timeout() {
    let configured_timeout = Some(Duration::from_secs(5));
    let source = JwtSourceBuilder::new()
        .shutdown_timeout(configured_timeout)
        .build()
        .await
        .unwrap();

    let outer_deadline = Duration::from_secs(10);
    let outcome = tokio::time::timeout(outer_deadline, source.shutdown_configured()).await;
    assert!(outcome.is_ok(), "Shutdown should complete");

    let shutdown_result = outcome.unwrap();
    shutdown_result.expect("Shutdown should succeed");

    let lookup = source.bundle_for_trust_domain(&trust_domain());
    assert!(
        lookup.is_err(),
        "bundle_for_trust_domain() should fail after shutdown"
    );
}
/// Calling `shutdown_configured()` a second time still returns `Ok`.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_shutdown_idempotent() {
    let configured_timeout = Some(Duration::from_secs(5));
    let source = JwtSourceBuilder::new()
        .shutdown_timeout(configured_timeout)
        .build()
        .await
        .unwrap();

    let first_attempt = source.shutdown_configured().await;
    first_attempt.unwrap();

    let second_attempt = source.shutdown_configured().await;
    assert!(second_attempt.is_ok(), "Shutdown should be idempotent");
}
/// A source built with custom reconnect backoff bounds still serves the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_reconnect_backoff_config() {
    let first_bound = Duration::from_millis(100);
    let second_bound = Duration::from_secs(5);

    let builder = JwtSourceBuilder::new().reconnect_backoff(first_bound, second_bound);
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    assert!(
        lookup.is_some(),
        "Should get bundle with custom backoff config"
    );
}
/// A source pointed at an explicit endpoint string serves the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_custom_endpoint() {
    let socket_uri = "unix:/tmp/spire-agent/public/api.sock";

    let builder = JwtSourceBuilder::new().endpoint(socket_uri);
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    assert!(lookup.is_some(), "Should get bundle with custom endpoint");
}
/// Whenever `is_healthy()` reports true, an immediate bundle lookup must return `Ok`.
///
/// Checked once up front and then ten more times in a row.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_health_check_guarantees_bundle_success() {
    let source = get_source().await;
    let domain = trust_domain();

    if source.is_healthy() {
        let lookup = source.bundle_for_trust_domain(&domain);
        assert!(
            lookup.is_ok(),
            "is_healthy() returning true must guarantee bundle_for_trust_domain() succeeds"
        );
    }

    for _attempt in 0..10 {
        // Only a healthy reading obliges the lookup to succeed.
        if !source.is_healthy() {
            continue;
        }
        let lookup = source.bundle_for_trust_domain(&domain);
        assert!(
            lookup.is_ok(),
            "is_healthy() must consistently guarantee bundle_for_trust_domain() success"
        );
    }
}
/// The update sequence number strictly increases once an update is observed.
///
/// Waits up to thirty seconds for the sequence to move past its starting value.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_update_notifications_sequence() {
    let source = get_source().await;
    let mut updates = source.updated();
    let initial_seq = updates.last();

    tokio::time::timeout(
        Duration::from_secs(30),
        updates.wait_for(|&seq| seq > initial_seq),
    )
    .await
    .expect("Should receive update within 30 seconds")
    .expect("Update channel should not be closed");

    let new_seq = updates.last();
    // A strict increase already implies the sequence is monotonic, so a separate
    // `>=` check would be a tautology that can never fail after this assertion.
    assert!(new_seq > initial_seq, "Sequence number should increase");
}
/// Two update receivers from the same source start at, and later agree on, the same sequence.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_multiple_update_receivers() {
    let source = get_source().await;
    let mut first_receiver = source.updated();
    let second_receiver = source.updated();

    let first_start = first_receiver.last();
    let second_start = second_receiver.last();
    assert_eq!(
        first_start, second_start,
        "Receivers should start with same sequence"
    );

    // Only the first receiver waits; the second is just sampled afterwards.
    let deadline = Duration::from_secs(30);
    let next_update = first_receiver.wait_for(|&seq| seq > first_start);
    tokio::time::timeout(deadline, next_update)
        .await
        .expect("Should receive update")
        .expect("Update channel should not be closed");

    assert_eq!(
        first_receiver.last(),
        second_receiver.last(),
        "All receivers should see the same sequence number"
    );
}
/// A source built from an unconfigured builder serves the trust domain bundle.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_builder_defaults() {
    let builder = JwtSourceBuilder::new();
    let source = builder.build().await.unwrap();

    let domain = trust_domain();
    let lookup = source.bundle_for_trust_domain(&domain).unwrap();
    assert!(
        lookup.is_some(),
        "Should work with default builder configuration"
    );
}
/// `bundle_set()` returns an error once the source has been shut down.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_bundle_set_after_shutdown() {
    let source = get_source().await;
    source.shutdown().await;

    let outcome = source.bundle_set();
    assert!(outcome.is_err(), "bundle_set() should fail after shutdown");
}
/// After shutdown, a receiver obtained beforehand sees no change within 100 ms.
///
/// All sequence readings are taken after `shutdown()` has returned.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_updated_after_shutdown() {
    let source = get_source().await;
    let mut updates = source.updated();
    source.shutdown().await;

    let seq_before = updates.last();
    let seq_immediate = updates.last();
    assert_eq!(
        seq_before, seq_immediate,
        "Sequence should not change immediately after shutdown"
    );

    // A timeout here means `changed()` never resolved, i.e. nothing was published.
    let quiet_window = Duration::from_millis(100);
    let waited = tokio::time::timeout(quiet_window, updates.changed()).await;
    let update_occurred = waited.is_ok();
    assert!(!update_occurred, "No updates should occur after shutdown");

    let seq_after = updates.last();
    assert_eq!(
        seq_before, seq_after,
        "Sequence should not change after shutdown"
    );
}
/// `get_jwt_svid()` returns an error once the source has been shut down.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_jwt_svid_after_shutdown() {
    let source = get_source().await;
    source.shutdown().await;

    let audience = vec![String::from("test-audience")];
    let outcome = source.get_jwt_svid(&audience).await;
    assert!(outcome.is_err(), "get_jwt_svid() should fail after shutdown");
}
/// `get_jwt_svid_with_id()` returns an error once the source has been shut down.
#[tokio::test]
#[ignore = "requires running SPIFFE Workload API"]
async fn test_get_jwt_svid_with_id_after_shutdown() {
    let source = get_source().await;
    source.shutdown().await;

    let audience = vec![String::from("test-audience")];
    let requested_id = spiffe_id_1();
    let outcome = source
        .get_jwt_svid_with_id(&audience, Some(&requested_id))
        .await;

    assert!(
        outcome.is_err(),
        "get_jwt_svid_with_id() should fail after shutdown"
    );
}
}