Skip to main content

camel_processor/
security_policy_layer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::{Layer, Service};
7
8use camel_api::security_policy::{
9    AuthorizationDecision, SecurityPolicy, store_principal_properties,
10};
11use camel_api::{CamelError, Exchange};
12
13#[derive(Clone)]
14pub struct SecurityPolicyLayer {
15    policy: Arc<dyn SecurityPolicy>,
16}
17
18impl SecurityPolicyLayer {
19    pub fn new(policy: Arc<dyn SecurityPolicy>) -> Self {
20        Self { policy }
21    }
22}
23
24impl<S> Layer<S> for SecurityPolicyLayer {
25    type Service = SecurityPolicyService<S>;
26
27    fn layer(&self, inner: S) -> Self::Service {
28        SecurityPolicyService {
29            inner,
30            policy: Arc::clone(&self.policy),
31        }
32    }
33}
34
35pub struct SecurityPolicyService<S> {
36    inner: S,
37    policy: Arc<dyn SecurityPolicy>,
38}
39
40impl<S: Clone> Clone for SecurityPolicyService<S> {
41    fn clone(&self) -> Self {
42        Self {
43            inner: self.inner.clone(),
44            policy: Arc::clone(&self.policy),
45        }
46    }
47}
48
49impl<S> Service<Exchange> for SecurityPolicyService<S>
50where
51    S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
52    S::Future: Send,
53{
54    type Response = Exchange;
55    type Error = CamelError;
56    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
57
58    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59        self.inner.poll_ready(cx)
60    }
61
62    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
63        let policy = Arc::clone(&self.policy);
64        let clone = self.inner.clone();
65        let mut inner = std::mem::replace(&mut self.inner, clone);
66
67        Box::pin(async move {
68            match policy.evaluate(&mut exchange).await {
69                Ok(AuthorizationDecision::Granted { principal }) => {
70                    store_principal_properties(&mut exchange, &principal);
71                    inner.call(exchange).await
72                }
73                Ok(AuthorizationDecision::Denied {
74                    reason,
75                    required,
76                    actual,
77                }) => {
78                    let msg = format!(
79                        "Access denied: {reason}. Required: {required:?}, actual: {actual:?}"
80                    );
81                    Err(CamelError::Unauthorized(msg))
82                }
83                Err(e) => Err(e),
84            }
85        })
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92    use async_trait::async_trait;
93    use camel_api::security_policy::{
94        PRINCIPAL_AUDIENCE_KEY, PRINCIPAL_CLAIMS_KEY, PRINCIPAL_ISSUER_KEY, PRINCIPAL_KEY,
95        PRINCIPAL_ROLES_KEY, PRINCIPAL_SCOPES_KEY, PRINCIPAL_SUBJECT_KEY, Principal,
96    };
97    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
98    use std::sync::atomic::{AtomicU32, Ordering};
99    use tower::ServiceExt;
100
101    fn make_exchange() -> Exchange {
102        Exchange::new(Message::new("test"))
103    }
104
105    fn ok_processor() -> BoxProcessor {
106        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
107    }
108
109    fn test_principal() -> Principal {
110        Principal {
111            subject: "user1".into(),
112            issuer: "test-issuer".into(),
113            audience: vec!["api".into()],
114            scopes: vec!["read".into()],
115            roles: vec!["admin".into()],
116            claims: serde_json::json!({"sub": "user1"}),
117        }
118    }
119
120    struct GrantPolicy;
121    #[async_trait]
122    impl SecurityPolicy for GrantPolicy {
123        async fn evaluate(
124            &self,
125            _exchange: &mut Exchange,
126        ) -> Result<AuthorizationDecision, CamelError> {
127            Ok(AuthorizationDecision::Granted {
128                principal: test_principal(),
129            })
130        }
131    }
132
133    struct DenyPolicy;
134    #[async_trait]
135    impl SecurityPolicy for DenyPolicy {
136        async fn evaluate(
137            &self,
138            _exchange: &mut Exchange,
139        ) -> Result<AuthorizationDecision, CamelError> {
140            Ok(AuthorizationDecision::Denied {
141                reason: "missing role".into(),
142                required: vec!["admin".into()],
143                actual: vec!["user".into()],
144            })
145        }
146    }
147
148    struct FailPolicy;
149    #[async_trait]
150    impl SecurityPolicy for FailPolicy {
151        async fn evaluate(
152            &self,
153            _exchange: &mut Exchange,
154        ) -> Result<AuthorizationDecision, CamelError> {
155            Err(CamelError::Unauthenticated("invalid token".into()))
156        }
157    }
158
159    #[tokio::test]
160    async fn test_granted_stores_properties() {
161        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
162        let mut svc = layer.layer(ok_processor());
163        let result = svc.ready().await.unwrap().call(make_exchange()).await;
164        assert!(result.is_ok());
165        let ex = result.unwrap();
166        assert_eq!(
167            ex.property(PRINCIPAL_SUBJECT_KEY),
168            Some(&serde_json::Value::String("user1".into()))
169        );
170        assert_eq!(
171            ex.property(PRINCIPAL_ISSUER_KEY),
172            Some(&serde_json::Value::String("test-issuer".into()))
173        );
174        assert!(ex.property(PRINCIPAL_KEY).is_some());
175    }
176
177    #[tokio::test]
178    async fn test_denied_returns_unauthorized_error() {
179        let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
180        let mut svc = layer.layer(ok_processor());
181        let result = svc.ready().await.unwrap().call(make_exchange()).await;
182        assert!(result.is_err());
183        match result.unwrap_err() {
184            CamelError::Unauthorized(msg) => assert!(msg.contains("missing role")),
185            other => panic!("expected Unauthorized, got: {other:?}"),
186        }
187    }
188
189    #[tokio::test]
190    async fn test_denied_error_contains_required_actual() {
191        let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
192        let mut svc = layer.layer(ok_processor());
193        let result = svc.ready().await.unwrap().call(make_exchange()).await;
194        let msg = match result.unwrap_err() {
195            CamelError::Unauthorized(msg) => msg,
196            other => panic!("expected Unauthorized, got: {other:?}"),
197        };
198        assert!(msg.contains("admin"));
199        assert!(msg.contains("user"));
200    }
201
202    #[tokio::test]
203    async fn test_evaluate_error_propagates() {
204        let layer = SecurityPolicyLayer::new(Arc::new(FailPolicy));
205        let mut svc = layer.layer(ok_processor());
206        let result = svc.ready().await.unwrap().call(make_exchange()).await;
207        match result.unwrap_err() {
208            CamelError::Unauthenticated(msg) => assert!(msg.contains("invalid token")),
209            other => panic!("expected Unauthenticated, got: {other:?}"),
210        }
211    }
212
213    #[tokio::test]
214    async fn test_multiple_calls_share_policy() {
215        let count = Arc::new(AtomicU32::new(0));
216        struct CountingPolicy {
217            count: Arc<AtomicU32>,
218        }
219        #[async_trait]
220        impl SecurityPolicy for CountingPolicy {
221            async fn evaluate(
222                &self,
223                _exchange: &mut Exchange,
224            ) -> Result<AuthorizationDecision, CamelError> {
225                self.count.fetch_add(1, Ordering::SeqCst);
226                Ok(AuthorizationDecision::Granted {
227                    principal: Principal {
228                        subject: "user1".into(),
229                        issuer: "test".into(),
230                        audience: vec![],
231                        scopes: vec![],
232                        roles: vec![],
233                        claims: serde_json::Value::Null,
234                    },
235                })
236            }
237        }
238        let policy = Arc::new(CountingPolicy {
239            count: Arc::clone(&count),
240        });
241        let layer = SecurityPolicyLayer::new(Arc::clone(&policy) as Arc<dyn SecurityPolicy>);
242        let mut svc = layer.layer(ok_processor());
243        for _ in 0..3 {
244            let result = svc.ready().await.unwrap().call(make_exchange()).await;
245            assert!(result.is_ok());
246        }
247        assert_eq!(count.load(Ordering::SeqCst), 3);
248    }
249
250    #[tokio::test]
251    async fn test_granted_all_property_json_formats() {
252        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
253        let mut svc = layer.layer(ok_processor());
254        let result = svc.ready().await.unwrap().call(make_exchange()).await;
255        let ex = result.unwrap();
256
257        let roles: Vec<String> =
258            serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
259                .unwrap();
260        assert_eq!(roles, vec!["admin"]);
261
262        let scopes: Vec<String> =
263            serde_json::from_str(ex.property(PRINCIPAL_SCOPES_KEY).unwrap().as_str().unwrap())
264                .unwrap();
265        assert_eq!(scopes, vec!["read"]);
266
267        let audience: Vec<String> = serde_json::from_str(
268            ex.property(PRINCIPAL_AUDIENCE_KEY)
269                .unwrap()
270                .as_str()
271                .unwrap(),
272        )
273        .unwrap();
274        assert_eq!(audience, vec!["api"]);
275
276        let claims: serde_json::Value =
277            serde_json::from_str(ex.property(PRINCIPAL_CLAIMS_KEY).unwrap().as_str().unwrap())
278                .unwrap();
279        assert_eq!(claims["sub"], "user1");
280    }
281
282    #[tokio::test]
283    async fn test_granted_empty_principal_fields() {
284        struct EmptyPrincipalPolicy;
285        #[async_trait]
286        impl SecurityPolicy for EmptyPrincipalPolicy {
287            async fn evaluate(
288                &self,
289                _exchange: &mut Exchange,
290            ) -> Result<AuthorizationDecision, CamelError> {
291                Ok(AuthorizationDecision::Granted {
292                    principal: Principal {
293                        subject: "minimal".into(),
294                        issuer: String::new(),
295                        audience: vec![],
296                        scopes: vec![],
297                        roles: vec![],
298                        claims: serde_json::Value::Null,
299                    },
300                })
301            }
302        }
303        let layer = SecurityPolicyLayer::new(Arc::new(EmptyPrincipalPolicy));
304        let mut svc = layer.layer(ok_processor());
305        let result = svc.ready().await.unwrap().call(make_exchange()).await;
306        let ex = result.unwrap();
307
308        assert_eq!(
309            ex.property(PRINCIPAL_SUBJECT_KEY),
310            Some(&serde_json::Value::String("minimal".into()))
311        );
312        assert_eq!(
313            ex.property(PRINCIPAL_ISSUER_KEY),
314            Some(&serde_json::Value::String(String::new()))
315        );
316        let roles: Vec<String> =
317            serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
318                .unwrap();
319        assert!(roles.is_empty());
320    }
321
322    #[tokio::test]
323    async fn test_layer_clone_produces_working_service() {
324        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
325        let mut svc1 = layer.layer(ok_processor());
326        let svc2 = svc1.clone();
327
328        let r1 = svc1.ready().await.unwrap().call(make_exchange()).await;
329        let mut svc2 = svc2;
330        let r2 = svc2.ready().await.unwrap().call(make_exchange()).await;
331        assert!(r1.is_ok());
332        assert!(r2.is_ok());
333    }
334
335    #[tokio::test]
336    async fn test_granted_preserves_original_exchange_properties() {
337        struct GrantPolicy;
338        #[async_trait]
339        impl SecurityPolicy for GrantPolicy {
340            async fn evaluate(
341                &self,
342                _exchange: &mut Exchange,
343            ) -> Result<AuthorizationDecision, CamelError> {
344                Ok(AuthorizationDecision::Granted {
345                    principal: Principal {
346                        subject: "u".into(),
347                        issuer: "i".into(),
348                        audience: vec![],
349                        scopes: vec![],
350                        roles: vec![],
351                        claims: serde_json::Value::Null,
352                    },
353                })
354            }
355        }
356        let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
357        let mut svc = layer.layer(ok_processor());
358        let mut ex = make_exchange();
359        ex.set_property("custom.key", "custom-value");
360        let result = svc.ready().await.unwrap().call(ex).await;
361        let ex = result.unwrap();
362        assert_eq!(
363            ex.property("custom.key"),
364            Some(&serde_json::Value::String("custom-value".into()))
365        );
366        assert!(ex.property(PRINCIPAL_SUBJECT_KEY).is_some());
367    }
368}