camel-component-keycloak 0.18.0

Keycloak integration for rust-camel — claim presets, URL builders (Admin API and event consumer in future phases)
Documentation
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use camel_component_api::{Body, CamelError, Exchange, Message, RuntimeObservability};

use crate::admin_endpoint_config::AdminEndpointConfig;

#[cfg(test)]
use camel_component_api::test_support::PanicRuntimeObservability;
/// Test-only helper that wraps a `PanicRuntimeObservability` in an `Arc` trait object,
/// for constructing values that require a runtime handle.
#[cfg(test)]
fn rt() -> Arc<dyn camel_component_api::RuntimeObservability> {
    let observability: Arc<dyn camel_component_api::RuntimeObservability> =
        Arc::new(PanicRuntimeObservability);
    observability
}

/// Key used to look up a per-exchange user id when the endpoint configuration does not
/// supply one.
///
/// NOTE(review): despite the `HEADER` suffix, `KeycloakAdminProducer::execute` reads this key
/// via `exchange.property(...)`, not from message headers — confirm which is intended.
const USER_ID_HEADER: &str = "camel.keycloak.userId";

/// Producer that sends one Keycloak admin HTTP request per exchange.
///
/// It is driven through its `tower::Service<Exchange>` implementation.
pub struct KeycloakAdminProducer {
    /// Endpoint settings: server URL, target realm, operation, optional user id and the
    /// token provider.
    config: AdminEndpointConfig,
    /// HTTP client used to send requests; `new` clones it from `config.http`.
    http: reqwest::Client,
    /// Runtime observability handle; stored and cloned with the producer, but not read by
    /// any code in this file.
    runtime: Arc<dyn RuntimeObservability>,
}

impl KeycloakAdminProducer {
    pub fn new(config: AdminEndpointConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
        Self {
            http: config.http.clone(),
            config,
            runtime,
        }
    }

    async fn execute(&self, exchange: &mut Exchange) -> Result<(), CamelError> {
        let token = self.config.token_provider.get_token().await.map_err(|e| {
            CamelError::ProcessorError(format!("failed to acquire admin auth material: {e}"))
        })?;

        let user_id = self
            .config
            .user_id
            .as_deref()
            .or_else(|| exchange.property(USER_ID_HEADER).and_then(|v| v.as_str()));

        let target_realm = self.config.target_realm.as_deref().unwrap_or("");
        let path = self
            .config
            .operation
            .build_path(target_realm, user_id)
            .map_err(|e| CamelError::ProcessorError(format!("keycloak admin path error: {e}")))?;
        let url = format!("{}{}", self.config.server_url, path);

        let request_body = Self::extract_body(exchange);

        let mut builder = match self.config.operation.http_method() {
            "POST" => self.http.post(&url),
            "GET" => self.http.get(&url),
            "DELETE" => self.http.delete(&url),
            _ => unreachable!(),
        };

        builder = builder.bearer_auth(&token);

        if let Some(body) = request_body {
            builder = builder
                .header("Content-Type", "application/json")
                .body(body);
        }

        let response = builder.send().await.map_err(|e| {
            CamelError::ProcessorError(format!("keycloak admin request failed: {e}"))
        })?;

        let status = response.status();
        if status.is_success() {
            let response_body = response
                .text()
                .await
                .map_err(|e| CamelError::ProcessorError(format!("failed to read response: {e}")))?;
            if response_body.is_empty() {
                exchange.output = Some(Message::new(Body::Empty));
            } else {
                let body = if response_body.starts_with('{') || response_body.starts_with('[') {
                    let val: serde_json::Value =
                        serde_json::from_str(&response_body).map_err(|e| {
                            CamelError::ProcessorError(format!(
                                "failed to parse JSON response: {e}"
                            ))
                        })?;
                    Body::Json(val)
                } else {
                    Body::Text(response_body)
                };
                exchange.output = Some(Message::new(body));
            }
            Ok(())
        } else {
            let body = response.text().await.unwrap_or_default();
            Err(CamelError::ProcessorError(format!(
                "keycloak admin {} failed: {} {}",
                self.config.operation, status, body
            )))
        }
    }

    fn extract_body(exchange: &Exchange) -> Option<String> {
        match &exchange.input.body {
            Body::Text(s) => Some(s.clone()),
            Body::Json(v) => Some(serde_json::to_string(v).unwrap_or_default()),
            _ => None,
        }
    }
}

impl Clone for KeycloakAdminProducer {
    /// Clones the configuration and HTTP client, and shares the runtime handle by cloning
    /// its `Arc`.
    fn clone(&self) -> Self {
        let Self {
            config,
            http,
            runtime,
        } = self;
        Self {
            config: config.clone(),
            http: http.clone(),
            runtime: Arc::clone(runtime),
        }
    }
}

/// Exposes the producer as a `tower::Service` that takes an exchange, runs the admin
/// operation on it and hands the same exchange back.
impl tower::Service<Exchange> for KeycloakAdminProducer {
    type Response = Exchange;
    type Error = CamelError;
    type Future =
        Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Always reports ready; this service applies no backpressure of its own.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Runs `execute` on the exchange and resolves to that exchange on success, or to the
    /// error `execute` produced.
    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
        // The returned future must own everything it touches, so it works on a clone of
        // the producer instead of borrowing `self`.
        let worker = self.clone();
        Box::pin(async move {
            let outcome = worker.execute(&mut exchange).await;
            outcome.map(|()| exchange)
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an exchange whose input message carries `body` and returns what
    /// `extract_body` produces for it.
    fn extracted(body: Body) -> Option<String> {
        let exchange = Exchange::new(Message::new(body));
        KeycloakAdminProducer::extract_body(&exchange)
    }

    /// Text bodies are forwarded unchanged.
    #[test]
    fn extract_body_from_text() {
        let payload = extracted(Body::Text("hello".to_string()));
        assert_eq!(payload, Some("hello".to_string()));
    }

    /// JSON bodies are serialized in compact form.
    #[test]
    fn extract_body_from_json() {
        let payload = extracted(Body::Json(serde_json::json!({"key": "val"})));
        assert_eq!(payload, Some(r#"{"key":"val"}"#.to_string()));
    }

    /// An empty body yields no payload.
    #[test]
    fn extract_body_from_empty() {
        let payload = extracted(Body::Empty);
        assert_eq!(payload, None);
    }
}