camel-processor 0.14.0

Message processors for rust-camel
Documentation
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use tower::{Layer, Service};

use camel_api::security_policy::{
    AuthorizationDecision, SecurityPolicy, store_principal_properties,
};
use camel_api::{CamelError, Exchange};

#[derive(Clone)]
pub struct SecurityPolicyLayer {
    policy: Arc<dyn SecurityPolicy>,
}

impl SecurityPolicyLayer {
    pub fn new(policy: Arc<dyn SecurityPolicy>) -> Self {
        Self { policy }
    }
}

impl<S> Layer<S> for SecurityPolicyLayer {
    type Service = SecurityPolicyService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        SecurityPolicyService {
            inner,
            policy: Arc::clone(&self.policy),
        }
    }
}

pub struct SecurityPolicyService<S> {
    inner: S,
    policy: Arc<dyn SecurityPolicy>,
}

impl<S: Clone> Clone for SecurityPolicyService<S> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            policy: Arc::clone(&self.policy),
        }
    }
}

impl<S> Service<Exchange> for SecurityPolicyService<S>
where
    S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
    S::Future: Send,
{
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
        let policy = Arc::clone(&self.policy);
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);

        Box::pin(async move {
            match policy.evaluate(&mut exchange).await {
                Ok(AuthorizationDecision::Granted { principal }) => {
                    store_principal_properties(&mut exchange, &principal);
                    inner.call(exchange).await
                }
                Ok(AuthorizationDecision::Denied {
                    reason,
                    required,
                    actual,
                }) => {
                    let msg = format!(
                        "Access denied: {reason}. Required: {required:?}, actual: {actual:?}"
                    );
                    Err(CamelError::Unauthorized(msg))
                }
                Err(e) => Err(e),
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::security_policy::{
        PRINCIPAL_AUDIENCE_KEY, PRINCIPAL_CLAIMS_KEY, PRINCIPAL_ISSUER_KEY, PRINCIPAL_KEY,
        PRINCIPAL_ROLES_KEY, PRINCIPAL_SCOPES_KEY, PRINCIPAL_SUBJECT_KEY, Principal,
    };
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicU32, Ordering};
    use tower::ServiceExt;

    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    fn test_principal() -> Principal {
        Principal {
            subject: "user1".into(),
            issuer: "test-issuer".into(),
            audience: vec!["api".into()],
            scopes: vec!["read".into()],
            roles: vec!["admin".into()],
            claims: serde_json::json!({"sub": "user1"}),
        }
    }

    struct GrantPolicy;
    #[async_trait]
    impl SecurityPolicy for GrantPolicy {
        async fn evaluate(
            &self,
            _exchange: &mut Exchange,
        ) -> Result<AuthorizationDecision, CamelError> {
            Ok(AuthorizationDecision::Granted {
                principal: test_principal(),
            })
        }
    }

    struct DenyPolicy;
    #[async_trait]
    impl SecurityPolicy for DenyPolicy {
        async fn evaluate(
            &self,
            _exchange: &mut Exchange,
        ) -> Result<AuthorizationDecision, CamelError> {
            Ok(AuthorizationDecision::Denied {
                reason: "missing role".into(),
                required: vec!["admin".into()],
                actual: vec!["user".into()],
            })
        }
    }

    struct FailPolicy;
    #[async_trait]
    impl SecurityPolicy for FailPolicy {
        async fn evaluate(
            &self,
            _exchange: &mut Exchange,
        ) -> Result<AuthorizationDecision, CamelError> {
            Err(CamelError::Unauthenticated("invalid token".into()))
        }
    }

    #[tokio::test]
    async fn test_granted_stores_properties() {
        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_ok());
        let ex = result.unwrap();
        assert_eq!(
            ex.property(PRINCIPAL_SUBJECT_KEY),
            Some(&serde_json::Value::String("user1".into()))
        );
        assert_eq!(
            ex.property(PRINCIPAL_ISSUER_KEY),
            Some(&serde_json::Value::String("test-issuer".into()))
        );
        assert!(ex.property(PRINCIPAL_KEY).is_some());
    }

    #[tokio::test]
    async fn test_denied_returns_unauthorized_error() {
        let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            CamelError::Unauthorized(msg) => assert!(msg.contains("missing role")),
            other => panic!("expected Unauthorized, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_denied_error_contains_required_actual() {
        let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        let msg = match result.unwrap_err() {
            CamelError::Unauthorized(msg) => msg,
            other => panic!("expected Unauthorized, got: {other:?}"),
        };
        assert!(msg.contains("admin"));
        assert!(msg.contains("user"));
    }

    #[tokio::test]
    async fn test_evaluate_error_propagates() {
        let layer = SecurityPolicyLayer::new(Arc::new(FailPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        match result.unwrap_err() {
            CamelError::Unauthenticated(msg) => assert!(msg.contains("invalid token")),
            other => panic!("expected Unauthenticated, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_multiple_calls_share_policy() {
        let count = Arc::new(AtomicU32::new(0));
        struct CountingPolicy {
            count: Arc<AtomicU32>,
        }
        #[async_trait]
        impl SecurityPolicy for CountingPolicy {
            async fn evaluate(
                &self,
                _exchange: &mut Exchange,
            ) -> Result<AuthorizationDecision, CamelError> {
                self.count.fetch_add(1, Ordering::SeqCst);
                Ok(AuthorizationDecision::Granted {
                    principal: Principal {
                        subject: "user1".into(),
                        issuer: "test".into(),
                        audience: vec![],
                        scopes: vec![],
                        roles: vec![],
                        claims: serde_json::Value::Null,
                    },
                })
            }
        }
        let policy = Arc::new(CountingPolicy {
            count: Arc::clone(&count),
        });
        let layer = SecurityPolicyLayer::new(Arc::clone(&policy) as Arc<dyn SecurityPolicy>);
        let mut svc = layer.layer(ok_processor());
        for _ in 0..3 {
            let result = svc.ready().await.unwrap().call(make_exchange()).await;
            assert!(result.is_ok());
        }
        assert_eq!(count.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_granted_all_property_json_formats() {
        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        let ex = result.unwrap();

        let roles: Vec<String> =
            serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
                .unwrap();
        assert_eq!(roles, vec!["admin"]);

        let scopes: Vec<String> =
            serde_json::from_str(ex.property(PRINCIPAL_SCOPES_KEY).unwrap().as_str().unwrap())
                .unwrap();
        assert_eq!(scopes, vec!["read"]);

        let audience: Vec<String> = serde_json::from_str(
            ex.property(PRINCIPAL_AUDIENCE_KEY)
                .unwrap()
                .as_str()
                .unwrap(),
        )
        .unwrap();
        assert_eq!(audience, vec!["api"]);

        let claims: serde_json::Value =
            serde_json::from_str(ex.property(PRINCIPAL_CLAIMS_KEY).unwrap().as_str().unwrap())
                .unwrap();
        assert_eq!(claims["sub"], "user1");
    }

    #[tokio::test]
    async fn test_granted_empty_principal_fields() {
        struct EmptyPrincipalPolicy;
        #[async_trait]
        impl SecurityPolicy for EmptyPrincipalPolicy {
            async fn evaluate(
                &self,
                _exchange: &mut Exchange,
            ) -> Result<AuthorizationDecision, CamelError> {
                Ok(AuthorizationDecision::Granted {
                    principal: Principal {
                        subject: "minimal".into(),
                        issuer: String::new(),
                        audience: vec![],
                        scopes: vec![],
                        roles: vec![],
                        claims: serde_json::Value::Null,
                    },
                })
            }
        }
        let layer = SecurityPolicyLayer::new(Arc::new(EmptyPrincipalPolicy));
        let mut svc = layer.layer(ok_processor());
        let result = svc.ready().await.unwrap().call(make_exchange()).await;
        let ex = result.unwrap();

        assert_eq!(
            ex.property(PRINCIPAL_SUBJECT_KEY),
            Some(&serde_json::Value::String("minimal".into()))
        );
        assert_eq!(
            ex.property(PRINCIPAL_ISSUER_KEY),
            Some(&serde_json::Value::String(String::new()))
        );
        let roles: Vec<String> =
            serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
                .unwrap();
        assert!(roles.is_empty());
    }

    #[tokio::test]
    async fn test_layer_clone_produces_working_service() {
        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
        let mut svc1 = layer.layer(ok_processor());
        let svc2 = svc1.clone();

        let r1 = svc1.ready().await.unwrap().call(make_exchange()).await;
        let mut svc2 = svc2;
        let r2 = svc2.ready().await.unwrap().call(make_exchange()).await;
        assert!(r1.is_ok());
        assert!(r2.is_ok());
    }

    #[tokio::test]
    async fn test_granted_preserves_original_exchange_properties() {
        struct GrantPolicy;
        #[async_trait]
        impl SecurityPolicy for GrantPolicy {
            async fn evaluate(
                &self,
                _exchange: &mut Exchange,
            ) -> Result<AuthorizationDecision, CamelError> {
                Ok(AuthorizationDecision::Granted {
                    principal: Principal {
                        subject: "u".into(),
                        issuer: "i".into(),
                        audience: vec![],
                        scopes: vec![],
                        roles: vec![],
                        claims: serde_json::Value::Null,
                    },
                })
            }
        }
        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
        let mut svc = layer.layer(ok_processor());
        let mut ex = make_exchange();
        ex.set_property("custom.key", "custom-value");
        let result = svc.ready().await.unwrap().call(ex).await;
        let ex = result.unwrap();
        assert_eq!(
            ex.property("custom.key"),
            Some(&serde_json::Value::String("custom-value".into()))
        );
        assert!(ex.property(PRINCIPAL_SUBJECT_KEY).is_some());
    }
}