1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::{Layer, Service};
7
8use camel_api::security_policy::{
9 AuthorizationDecision, SecurityPolicy, store_principal_properties,
10};
11use camel_api::{CamelError, Exchange};
12
13#[derive(Clone)]
14pub struct SecurityPolicyLayer {
15 policy: Arc<dyn SecurityPolicy>,
16}
17
18impl SecurityPolicyLayer {
19 pub fn new(policy: Arc<dyn SecurityPolicy>) -> Self {
20 Self { policy }
21 }
22}
23
24impl<S> Layer<S> for SecurityPolicyLayer {
25 type Service = SecurityPolicyService<S>;
26
27 fn layer(&self, inner: S) -> Self::Service {
28 SecurityPolicyService {
29 inner,
30 policy: Arc::clone(&self.policy),
31 }
32 }
33}
34
35pub struct SecurityPolicyService<S> {
36 inner: S,
37 policy: Arc<dyn SecurityPolicy>,
38}
39
40impl<S: Clone> Clone for SecurityPolicyService<S> {
41 fn clone(&self) -> Self {
42 Self {
43 inner: self.inner.clone(),
44 policy: Arc::clone(&self.policy),
45 }
46 }
47}
48
49impl<S> Service<Exchange> for SecurityPolicyService<S>
50where
51 S: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
52 S::Future: Send,
53{
54 type Response = Exchange;
55 type Error = CamelError;
56 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
57
58 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
59 self.inner.poll_ready(cx)
60 }
61
62 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
63 let policy = Arc::clone(&self.policy);
64 let clone = self.inner.clone();
65 let mut inner = std::mem::replace(&mut self.inner, clone);
66
67 Box::pin(async move {
68 match policy.evaluate(&mut exchange).await {
69 Ok(AuthorizationDecision::Granted { principal }) => {
70 store_principal_properties(&mut exchange, &principal);
71 inner.call(exchange).await
72 }
73 Ok(AuthorizationDecision::Denied {
74 reason,
75 required,
76 actual,
77 }) => {
78 let msg = format!(
79 "Access denied: {reason}. Required: {required:?}, actual: {actual:?}"
80 );
81 Err(CamelError::Unauthorized(msg))
82 }
83 Err(e) => Err(e),
84 }
85 })
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use async_trait::async_trait;
93 use camel_api::security_policy::{
94 PRINCIPAL_AUDIENCE_KEY, PRINCIPAL_CLAIMS_KEY, PRINCIPAL_ISSUER_KEY, PRINCIPAL_KEY,
95 PRINCIPAL_ROLES_KEY, PRINCIPAL_SCOPES_KEY, PRINCIPAL_SUBJECT_KEY, Principal,
96 };
97 use camel_api::{BoxProcessor, BoxProcessorExt, Message};
98 use std::sync::atomic::{AtomicU32, Ordering};
99 use tower::ServiceExt;
100
101 fn make_exchange() -> Exchange {
102 Exchange::new(Message::new("test"))
103 }
104
105 fn ok_processor() -> BoxProcessor {
106 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
107 }
108
109 fn test_principal() -> Principal {
110 Principal {
111 subject: "user1".into(),
112 issuer: "test-issuer".into(),
113 audience: vec!["api".into()],
114 scopes: vec!["read".into()],
115 roles: vec!["admin".into()],
116 claims: serde_json::json!({"sub": "user1"}),
117 }
118 }
119
120 struct GrantPolicy;
121 #[async_trait]
122 impl SecurityPolicy for GrantPolicy {
123 async fn evaluate(
124 &self,
125 _exchange: &mut Exchange,
126 ) -> Result<AuthorizationDecision, CamelError> {
127 Ok(AuthorizationDecision::Granted {
128 principal: test_principal(),
129 })
130 }
131 }
132
133 struct DenyPolicy;
134 #[async_trait]
135 impl SecurityPolicy for DenyPolicy {
136 async fn evaluate(
137 &self,
138 _exchange: &mut Exchange,
139 ) -> Result<AuthorizationDecision, CamelError> {
140 Ok(AuthorizationDecision::Denied {
141 reason: "missing role".into(),
142 required: vec!["admin".into()],
143 actual: vec!["user".into()],
144 })
145 }
146 }
147
148 struct FailPolicy;
149 #[async_trait]
150 impl SecurityPolicy for FailPolicy {
151 async fn evaluate(
152 &self,
153 _exchange: &mut Exchange,
154 ) -> Result<AuthorizationDecision, CamelError> {
155 Err(CamelError::Unauthenticated("invalid token".into()))
156 }
157 }
158
159 #[tokio::test]
160 async fn test_granted_stores_properties() {
161 let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
162 let mut svc = layer.layer(ok_processor());
163 let result = svc.ready().await.unwrap().call(make_exchange()).await;
164 assert!(result.is_ok());
165 let ex = result.unwrap();
166 assert_eq!(
167 ex.property(PRINCIPAL_SUBJECT_KEY),
168 Some(&serde_json::Value::String("user1".into()))
169 );
170 assert_eq!(
171 ex.property(PRINCIPAL_ISSUER_KEY),
172 Some(&serde_json::Value::String("test-issuer".into()))
173 );
174 assert!(ex.property(PRINCIPAL_KEY).is_some());
175 }
176
177 #[tokio::test]
178 async fn test_denied_returns_unauthorized_error() {
179 let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
180 let mut svc = layer.layer(ok_processor());
181 let result = svc.ready().await.unwrap().call(make_exchange()).await;
182 assert!(result.is_err());
183 match result.unwrap_err() {
184 CamelError::Unauthorized(msg) => assert!(msg.contains("missing role")),
185 other => panic!("expected Unauthorized, got: {other:?}"),
186 }
187 }
188
189 #[tokio::test]
190 async fn test_denied_error_contains_required_actual() {
191 let layer = SecurityPolicyLayer::new(Arc::new(DenyPolicy));
192 let mut svc = layer.layer(ok_processor());
193 let result = svc.ready().await.unwrap().call(make_exchange()).await;
194 let msg = match result.unwrap_err() {
195 CamelError::Unauthorized(msg) => msg,
196 other => panic!("expected Unauthorized, got: {other:?}"),
197 };
198 assert!(msg.contains("admin"));
199 assert!(msg.contains("user"));
200 }
201
202 #[tokio::test]
203 async fn test_evaluate_error_propagates() {
204 let layer = SecurityPolicyLayer::new(Arc::new(FailPolicy));
205 let mut svc = layer.layer(ok_processor());
206 let result = svc.ready().await.unwrap().call(make_exchange()).await;
207 match result.unwrap_err() {
208 CamelError::Unauthenticated(msg) => assert!(msg.contains("invalid token")),
209 other => panic!("expected Unauthenticated, got: {other:?}"),
210 }
211 }
212
213 #[tokio::test]
214 async fn test_multiple_calls_share_policy() {
215 let count = Arc::new(AtomicU32::new(0));
216 struct CountingPolicy {
217 count: Arc<AtomicU32>,
218 }
219 #[async_trait]
220 impl SecurityPolicy for CountingPolicy {
221 async fn evaluate(
222 &self,
223 _exchange: &mut Exchange,
224 ) -> Result<AuthorizationDecision, CamelError> {
225 self.count.fetch_add(1, Ordering::SeqCst);
226 Ok(AuthorizationDecision::Granted {
227 principal: Principal {
228 subject: "user1".into(),
229 issuer: "test".into(),
230 audience: vec![],
231 scopes: vec![],
232 roles: vec![],
233 claims: serde_json::Value::Null,
234 },
235 })
236 }
237 }
238 let policy = Arc::new(CountingPolicy {
239 count: Arc::clone(&count),
240 });
241 let layer = SecurityPolicyLayer::new(Arc::clone(&policy) as Arc<dyn SecurityPolicy>);
242 let mut svc = layer.layer(ok_processor());
243 for _ in 0..3 {
244 let result = svc.ready().await.unwrap().call(make_exchange()).await;
245 assert!(result.is_ok());
246 }
247 assert_eq!(count.load(Ordering::SeqCst), 3);
248 }
249
250 #[tokio::test]
251 async fn test_granted_all_property_json_formats() {
252 let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
253 let mut svc = layer.layer(ok_processor());
254 let result = svc.ready().await.unwrap().call(make_exchange()).await;
255 let ex = result.unwrap();
256
257 let roles: Vec<String> =
258 serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
259 .unwrap();
260 assert_eq!(roles, vec!["admin"]);
261
262 let scopes: Vec<String> =
263 serde_json::from_str(ex.property(PRINCIPAL_SCOPES_KEY).unwrap().as_str().unwrap())
264 .unwrap();
265 assert_eq!(scopes, vec!["read"]);
266
267 let audience: Vec<String> = serde_json::from_str(
268 ex.property(PRINCIPAL_AUDIENCE_KEY)
269 .unwrap()
270 .as_str()
271 .unwrap(),
272 )
273 .unwrap();
274 assert_eq!(audience, vec!["api"]);
275
276 let claims: serde_json::Value =
277 serde_json::from_str(ex.property(PRINCIPAL_CLAIMS_KEY).unwrap().as_str().unwrap())
278 .unwrap();
279 assert_eq!(claims["sub"], "user1");
280 }
281
282 #[tokio::test]
283 async fn test_granted_empty_principal_fields() {
284 struct EmptyPrincipalPolicy;
285 #[async_trait]
286 impl SecurityPolicy for EmptyPrincipalPolicy {
287 async fn evaluate(
288 &self,
289 _exchange: &mut Exchange,
290 ) -> Result<AuthorizationDecision, CamelError> {
291 Ok(AuthorizationDecision::Granted {
292 principal: Principal {
293 subject: "minimal".into(),
294 issuer: String::new(),
295 audience: vec![],
296 scopes: vec![],
297 roles: vec![],
298 claims: serde_json::Value::Null,
299 },
300 })
301 }
302 }
303 let layer = SecurityPolicyLayer::new(Arc::new(EmptyPrincipalPolicy));
304 let mut svc = layer.layer(ok_processor());
305 let result = svc.ready().await.unwrap().call(make_exchange()).await;
306 let ex = result.unwrap();
307
308 assert_eq!(
309 ex.property(PRINCIPAL_SUBJECT_KEY),
310 Some(&serde_json::Value::String("minimal".into()))
311 );
312 assert_eq!(
313 ex.property(PRINCIPAL_ISSUER_KEY),
314 Some(&serde_json::Value::String(String::new()))
315 );
316 let roles: Vec<String> =
317 serde_json::from_str(ex.property(PRINCIPAL_ROLES_KEY).unwrap().as_str().unwrap())
318 .unwrap();
319 assert!(roles.is_empty());
320 }
321
322 #[tokio::test]
323 async fn test_layer_clone_produces_working_service() {
324 let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
325 let mut svc1 = layer.layer(ok_processor());
326 let svc2 = svc1.clone();
327
328 let r1 = svc1.ready().await.unwrap().call(make_exchange()).await;
329 let mut svc2 = svc2;
330 let r2 = svc2.ready().await.unwrap().call(make_exchange()).await;
331 assert!(r1.is_ok());
332 assert!(r2.is_ok());
333 }
334
335 #[tokio::test]
336 async fn test_granted_preserves_original_exchange_properties() {
337 struct GrantPolicy;
338 #[async_trait]
339 impl SecurityPolicy for GrantPolicy {
340 async fn evaluate(
341 &self,
342 _exchange: &mut Exchange,
343 ) -> Result<AuthorizationDecision, CamelError> {
344 Ok(AuthorizationDecision::Granted {
345 principal: Principal {
346 subject: "u".into(),
347 issuer: "i".into(),
348 audience: vec![],
349 scopes: vec![],
350 roles: vec![],
351 claims: serde_json::Value::Null,
352 },
353 })
354 }
355 }
356 let layer = SecurityPolicyLayer::new(Arc::new(GrantPolicy));
357 let mut svc = layer.layer(ok_processor());
358 let mut ex = make_exchange();
359 ex.set_property("custom.key", "custom-value");
360 let result = svc.ready().await.unwrap().call(ex).await;
361 let ex = result.unwrap();
362 assert_eq!(
363 ex.property("custom.key"),
364 Some(&serde_json::Value::String("custom-value".into()))
365 );
366 assert!(ex.property(PRINCIPAL_SUBJECT_KEY).is_some());
367 }
368}