use std::sync::Arc;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_msk_iam_sasl_signer::generate_auth_token_from_credentials_provider;
use rdkafka::client::{ClientContext, OAuthToken};
use rdkafka::consumer::ConsumerContext;
use crate::ShoveError;
use crate::error::Result;
#[allow(dead_code)]
pub(super) struct MskIamTokenProvider {
region: aws_config::Region,
credentials: SharedCredentialsProvider,
handle: tokio::runtime::Handle,
}
#[allow(dead_code)]
impl MskIamTokenProvider {
pub(super) async fn new(region: String, profile: Option<String>) -> Result<Self> {
let region = aws_config::Region::new(region);
let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(region.clone());
if let Some(p) = profile {
loader = loader.profile_name(p);
}
let cfg = loader.load().await;
let credentials = cfg.credentials_provider().ok_or_else(|| {
ShoveError::Connection("no AWS credentials provider available for MSK IAM auth".into())
})?;
let handle = tokio::runtime::Handle::current();
Ok(Self {
region,
credentials,
handle,
})
}
}
#[allow(dead_code)]
#[derive(Clone)]
pub(super) struct MskIamContext {
provider: Arc<MskIamTokenProvider>,
}
#[allow(dead_code)]
impl MskIamContext {
pub(super) fn new(provider: Arc<MskIamTokenProvider>) -> Self {
Self { provider }
}
}
impl ConsumerContext for MskIamContext {}
impl ClientContext for MskIamContext {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;
fn generate_oauth_token(
&self,
_principal_name: Option<&str>,
) -> std::result::Result<OAuthToken, Box<dyn std::error::Error>> {
let provider = self.provider.clone();
let handle = provider.handle.clone();
let (token, expiry_ms) = handle.block_on(async move {
generate_auth_token_from_credentials_provider(
provider.region.clone(),
provider.credentials.clone(),
)
.await
})?;
Ok(OAuthToken {
token,
principal_name: String::new(),
lifetime_ms: expiry_ms,
})
}
}