use std::sync::Arc;
use magnetar_admin::AdminClient;
use magnetar_auth_oauth2::{ClientCredentialsFlow, Credentials};
use wiremock::matchers::{body_string_contains, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// A client configured with a static token must present it as a bearer credential.
///
/// The mock broker only answers `GET /admin/v2/clusters` when the request carries
/// `authorization: Bearer secret-token-123`, and it is set up to expect exactly one
/// such request (wiremock checks that expectation when the server is dropped).
#[tokio::test]
async fn token_auth_attaches_bearer_header() {
    let broker = MockServer::start().await;

    // Canned cluster list handed back to a correctly authenticated request.
    let cluster_reply = ResponseTemplate::new(200).set_body_json(serde_json::json!(["cl-1"]));
    let authorized_list = Mock::given(method("GET"))
        .and(path("/admin/v2/clusters"))
        .and(header("authorization", "Bearer secret-token-123"))
        .respond_with(cluster_reply)
        .expect(1);
    authorized_list.mount(&broker).await;

    // Point the client at the mock broker and give it the same token the mock requires.
    let admin = AdminClient::builder()
        .service_url(broker.uri().parse().unwrap())
        .token(String::from("secret-token-123"))
        .build()
        .expect("build admin client");

    let listed = admin.cluster_list().await.expect("cluster list returns 200");

    assert_eq!(listed, vec![String::from("cl-1")]);
}
/// A client configured with an OAuth2 client-credentials flow must first obtain a
/// token from the identity provider and then present it to the broker.
///
/// Two mock servers are involved:
/// * the IdP answers `POST /token` whose body contains
///   `grant_type=client_credentials` and `client_id=admin-client`;
/// * the broker answers `GET /admin/v2/clusters` only when the request carries
///   `authorization: Bearer idp-issued-access-token`.
///
/// Each mock is set up to expect exactly one matching request.
#[tokio::test]
async fn oauth2_auth_fetches_token_then_attaches_bearer() {
    // --- Identity provider -------------------------------------------------
    let idp = MockServer::start().await;
    let token_body = serde_json::json!({
        "access_token": "idp-issued-access-token",
        "expires_in": 3600,
        "token_type": "Bearer"
    });
    let token_grant = Mock::given(method("POST"))
        .and(path("/token"))
        .and(body_string_contains("grant_type=client_credentials"))
        .and(body_string_contains("client_id=admin-client"))
        .respond_with(ResponseTemplate::new(200).set_body_json(token_body))
        .expect(1);
    token_grant.mount(&idp).await;

    // --- Broker ------------------------------------------------------------
    let broker = MockServer::start().await;
    let cluster_reply = ResponseTemplate::new(200).set_body_json(serde_json::json!(["cl-oauth"]));
    let authorized_list = Mock::given(method("GET"))
        .and(path("/admin/v2/clusters"))
        .and(header("authorization", "Bearer idp-issued-access-token"))
        .respond_with(cluster_reply)
        .expect(1);
    authorized_list.mount(&broker).await;

    // The token endpoint is set explicitly to the mock IdP. The issuer URL names a
    // different host; NOTE(review): presumably it is never contacted because the
    // endpoint is overridden — confirm against the flow implementation.
    let flow = ClientCredentialsFlow::builder()
        .issuer_url("https://idp.example/realms/test".parse().unwrap())
        .token_endpoint(format!("{}/token", idp.uri()).parse().unwrap())
        .audience("urn:pulsar:broker")
        .credentials(Credentials::ClientSecret {
            client_id: String::from("admin-client"),
            client_secret: String::from("admin-secret"),
        })
        .build()
        .expect("build oauth2 flow");

    let admin = AdminClient::builder()
        .service_url(broker.uri().parse().unwrap())
        .oauth2(Arc::new(flow))
        .build()
        .expect("build admin client");

    let listed = admin.cluster_list().await.expect("cluster list returns 200");

    assert_eq!(listed, vec![String::from("cl-oauth")]);
}
/// Setting the TLS builder options must not prevent the client from being built
/// or from completing an ordinary request.
///
/// The mock broker answers any `GET /admin/v2/clusters` with an empty JSON array
/// and expects exactly one such request. NOTE(review): the client is pointed at
/// `broker.uri()`, which for wiremock's `MockServer` is presumably a plain
/// `http://` address, so no TLS handshake is exercised here — confirm.
#[tokio::test]
async fn tls_builder_options_apply_cleanly() {
    let broker = MockServer::start().await;

    let empty_reply = ResponseTemplate::new(200).set_body_json(serde_json::json!([]));
    let list_clusters = Mock::given(method("GET"))
        .and(path("/admin/v2/clusters"))
        .respond_with(empty_reply)
        .expect(1);
    list_clusters.mount(&broker).await;

    // Supply both a custom trust certificate (as owned PEM bytes) and the
    // allow-insecure flag in the same builder chain.
    let admin = AdminClient::builder()
        .service_url(broker.uri().parse().unwrap())
        .tls_trust_cert_pem(TEST_CA_PEM.as_bytes().to_vec())
        .tls_allow_insecure(true)
        .build()
        .expect("build admin client with TLS options");

    let listed = admin.cluster_list().await.expect("cluster list returns 200");

    assert!(listed.is_empty());
}
/// PEM-encoded certificate passed to `tls_trust_cert_pem` by the TLS builder test.
///
/// Written as one string fragment per PEM line, each ending in an explicit `\n`,
/// so the line structure (including the trailing newline after the END marker)
/// is visible in the source. NOTE(review): the encoded subject looks like
/// `magnetar-test-ca` — test-only material, not a production trust anchor.
const TEST_CA_PEM: &str = concat!(
    "-----BEGIN CERTIFICATE-----\n",
    "MIIBjDCCATGgAwIBAgIUemNWluoHpjirTKja7Gw+wVFlFWIwCgYIKoZIzj0EAwIw\n",
    "GzEZMBcGA1UEAwwQbWFnbmV0YXItdGVzdC1jYTAeFw0yNjA2MTUyMDQ0MTNaFw0z\n",
    "NjA2MTIyMDQ0MTNaMBsxGTAXBgNVBAMMEG1hZ25ldGFyLXRlc3QtY2EwWTATBgcq\n",
    "hkjOPQIBBggqhkjOPQMBBwNCAATwIEPGpbT9u0SPTBR+SAhIUBy9GSrhPmlMH3Xp\n",
    "4fQ8MUFi5KDxDV1s/JPw4Iv0JJx5WE7X0Wn/eZG9kFnZ6I5To1MwUTAdBgNVHQ4E\n",
    "FgQUMolXG6aL9uxcHt4DlzEb+HGogtMwHwYDVR0jBBgwFoAUMolXG6aL9uxcHt4D\n",
    "lzEb+HGogtMwDwYDVR0TAQH/BAUwAwEB/zAKBggqhkjOPQQDAgNJADBGAiEA1z/z\n",
    "Yg9awrqi95eIIAnLH3jCzopiA7vtxyR54zbT5LsCIQCwOK9PVJwjwhMxwtDC3h44\n",
    "yxkLerhB+WdzfB1b3bMgUA==\n",
    "-----END CERTIFICATE-----\n",
);