1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, oneshot};
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use camel_api::security_policy::SecurityPolicy;
9use camel_api::{CamelError, Exchange};
10use camel_auth::{CredentialSource, TokenAuthenticator};
11
12pub struct ExchangeEnvelope {
18 pub exchange: Exchange,
19 pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
20}
21
22#[derive(Clone)]
24pub struct ConsumerContext {
25 sender: mpsc::Sender<ExchangeEnvelope>,
26 cancel_token: CancellationToken,
27}
28
29impl ConsumerContext {
30 pub fn new(sender: mpsc::Sender<ExchangeEnvelope>, cancel_token: CancellationToken) -> Self {
32 Self {
33 sender,
34 cancel_token,
35 }
36 }
37
38 pub async fn cancelled(&self) {
41 self.cancel_token.cancelled().await
42 }
43
44 pub fn is_cancelled(&self) -> bool {
46 self.cancel_token.is_cancelled()
47 }
48
49 pub fn cancel_token(&self) -> CancellationToken {
54 self.cancel_token.clone()
55 }
56
57 pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
63 self.sender.clone()
64 }
65
66 pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
68 self.sender
69 .send(ExchangeEnvelope {
70 exchange,
71 reply_tx: None,
72 })
73 .await
74 .map_err(|_| CamelError::ChannelClosed)
75 }
76
77 pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
82 let (reply_tx, reply_rx) = oneshot::channel();
83 self.sender
84 .send(ExchangeEnvelope {
85 exchange,
86 reply_tx: Some(reply_tx),
87 })
88 .await
89 .map_err(|_| CamelError::ChannelClosed)?;
90 reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
91 }
92}
93
94pub struct SecurityContext {
100 pub policy: Arc<dyn SecurityPolicy>,
101 pub authenticator: Arc<dyn TokenAuthenticator>,
102 pub credential_sources: Vec<CredentialSource>,
103}
104
105impl SecurityContext {
106 pub fn new(
107 policy: impl SecurityPolicy + 'static,
108 authenticator: Arc<dyn TokenAuthenticator>,
109 ) -> Self {
110 Self {
111 policy: Arc::new(policy),
112 authenticator,
113 credential_sources: vec![CredentialSource::AuthorizationHeader],
114 }
115 }
116
117 pub fn from_arc(
118 policy: Arc<dyn SecurityPolicy>,
119 authenticator: Arc<dyn TokenAuthenticator>,
120 ) -> Self {
121 Self {
122 policy,
123 authenticator,
124 credential_sources: vec![CredentialSource::AuthorizationHeader],
125 }
126 }
127
128 pub fn with_credential_sources(mut self, sources: Vec<CredentialSource>) -> Self {
129 self.credential_sources = sources;
130 self
131 }
132}
133
134impl Clone for SecurityContext {
135 fn clone(&self) -> Self {
136 Self {
137 policy: Arc::clone(&self.policy),
138 authenticator: Arc::clone(&self.authenticator),
139 credential_sources: self.credential_sources.clone(),
140 }
141 }
142}
143
144impl std::fmt::Debug for SecurityContext {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct("SecurityContext")
147 .field("policy", &"<SecurityPolicy>")
148 .field("authenticator", &"<TokenAuthenticator>")
149 .field("credential_sources", &self.credential_sources)
150 .finish()
151 }
152}
153
/// Concurrency model a [`Consumer`] reports through
/// [`Consumer::concurrency_model`].
// NOTE(review): the variant semantics below are inferred from the names;
// confirm against the runtime that reads this value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConcurrencyModel {
    /// Presumably: exchanges are handled one at a time.
    Sequential,
    /// Presumably: exchanges may be handled in parallel.
    Concurrent {
        /// Optional bound; `None` presumably means no explicit limit.
        max: Option<usize>,
    },
}
165
166#[async_trait]
190pub trait Consumer: Send + Sync {
191 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
193
194 async fn stop(&mut self) -> Result<(), CamelError>;
199
200 async fn suspend(&self) -> Result<(), CamelError> {
204 Ok(())
205 }
206
207 async fn resume(&self) -> Result<(), CamelError> {
211 Ok(())
212 }
213
214 fn concurrency_model(&self) -> ConcurrencyModel {
223 ConcurrencyModel::Sequential
224 }
225
226 fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
238 None
239 }
240
241 fn set_security_context(&mut self, _ctx: SecurityContext) {}
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::security_policy::{AuthorizationDecision, Principal};

    // ---- shared fixtures -------------------------------------------------

    /// Consumer that implements only the required methods, so every default
    /// provided by the trait stays in effect.
    struct DummyConsumer;

    #[async_trait::async_trait]
    impl Consumer for DummyConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Consumer that overrides `concurrency_model` with a bounded concurrent
    /// model.
    struct ConcurrentConsumer;

    #[async_trait::async_trait]
    impl Consumer for ConcurrentConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Concurrent { max: Some(16) }
        }
    }

    /// Principal returned by both stubs below.
    fn stub_principal() -> Principal {
        Principal {
            subject: "stub".into(),
            issuer: "stub".into(),
            audience: vec![],
            scopes: vec![],
            roles: vec![],
            claims: serde_json::json!({}),
        }
    }

    /// Policy that grants every exchange to the stub principal.
    struct StubPolicy;

    #[async_trait::async_trait]
    impl SecurityPolicy for StubPolicy {
        async fn evaluate(
            &self,
            _exchange: &mut Exchange,
        ) -> Result<AuthorizationDecision, CamelError> {
            Ok(AuthorizationDecision::Granted {
                principal: stub_principal(),
            })
        }
    }

    /// Authenticator that accepts any bearer token as the stub principal.
    struct StubAuthenticator;

    #[async_trait::async_trait]
    impl TokenAuthenticator for StubAuthenticator {
        async fn authenticate_bearer(&self, _token: &str) -> Result<Principal, CamelError> {
            Ok(stub_principal())
        }
    }

    /// Context built through `SecurityContext::new` from the two stubs.
    fn stub_context() -> SecurityContext {
        SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator))
    }

    // ---- ConsumerContext -------------------------------------------------

    #[tokio::test]
    async fn test_consumer_context_cancelled() {
        let (tx, _rx) = mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());
        assert!(!ctx.is_cancelled());

        token.cancel();

        ctx.cancelled().await;
        assert!(ctx.is_cancelled());
    }

    // ---- Consumer defaults -----------------------------------------------

    #[test]
    fn test_concurrency_model_default_is_sequential() {
        let model = DummyConsumer.concurrency_model();
        assert_eq!(model, ConcurrencyModel::Sequential);
    }

    #[test]
    fn test_concurrency_model_concurrent_override() {
        let model = ConcurrentConsumer.concurrency_model();
        assert_eq!(model, ConcurrencyModel::Concurrent { max: Some(16) });
    }

    #[tokio::test]
    async fn test_consumer_default_suspend_resume() {
        let consumer = DummyConsumer;
        assert!(consumer.suspend().await.is_ok());
        assert!(consumer.resume().await.is_ok());
    }

    // ---- SecurityContext -------------------------------------------------

    #[test]
    fn test_security_context_new() {
        let ctx = stub_context();

        assert_eq!(Arc::strong_count(&ctx.policy), 1);
        assert_eq!(Arc::strong_count(&ctx.authenticator), 1);
        assert_eq!(
            ctx.credential_sources,
            vec![CredentialSource::AuthorizationHeader]
        );
    }

    #[test]
    fn test_security_context_from_arc() {
        let policy: Arc<dyn SecurityPolicy> = Arc::new(StubPolicy);
        let authenticator: Arc<dyn TokenAuthenticator> = Arc::new(StubAuthenticator);

        let ctx = SecurityContext::from_arc(Arc::clone(&policy), Arc::clone(&authenticator));

        assert!(Arc::ptr_eq(&ctx.policy, &policy));
        assert!(Arc::ptr_eq(&ctx.authenticator, &authenticator));
        assert_eq!(
            ctx.credential_sources,
            vec![CredentialSource::AuthorizationHeader]
        );
    }

    #[test]
    fn test_security_context_clone_independent() {
        let original = stub_context();
        let copy = original.clone();

        // The clone shares the same trait-object allocations and carries an
        // equal list of credential sources.
        assert!(Arc::ptr_eq(&original.policy, &copy.policy));
        assert!(Arc::ptr_eq(&original.authenticator, &copy.authenticator));
        assert_eq!(original.credential_sources, copy.credential_sources);
    }

    #[test]
    fn test_security_context_debug_redacts_traits() {
        let rendered = format!("{:?}", stub_context());

        for needle in ["<SecurityPolicy>", "<TokenAuthenticator>", "credential_sources"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_security_context_with_credential_sources() {
        let sources = vec![
            CredentialSource::Cookie {
                name: "session".into(),
            },
            CredentialSource::AuthorizationHeader,
        ];

        let ctx = stub_context().with_credential_sources(sources);

        assert_eq!(ctx.credential_sources.len(), 2);
        assert!(matches!(
            &ctx.credential_sources[0],
            CredentialSource::Cookie { .. }
        ));
    }
}