1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, oneshot};
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use camel_api::security_policy::SecurityPolicy;
9use camel_api::{CamelError, Exchange};
10use camel_auth::{CredentialSource, TokenAuthenticator};
11
12pub struct ExchangeEnvelope {
18 pub exchange: Exchange,
19 pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
20}
21
22#[derive(Clone)]
24pub struct ConsumerContext {
25 sender: mpsc::Sender<ExchangeEnvelope>,
26 cancel_token: CancellationToken,
27 route_id: String,
28}
29
30impl ConsumerContext {
31 pub fn new(
36 sender: mpsc::Sender<ExchangeEnvelope>,
37 cancel_token: CancellationToken,
38 route_id: String,
39 ) -> Self {
40 Self {
41 sender,
42 cancel_token,
43 route_id,
44 }
45 }
46
47 pub async fn cancelled(&self) {
50 self.cancel_token.cancelled().await
51 }
52
53 pub fn is_cancelled(&self) -> bool {
55 self.cancel_token.is_cancelled()
56 }
57
58 pub fn route_id(&self) -> &str {
64 &self.route_id
65 }
66
67 pub fn cancel_token(&self) -> CancellationToken {
72 self.cancel_token.clone()
73 }
74
75 pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
81 self.sender.clone()
82 }
83
84 pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
86 self.sender
87 .send(ExchangeEnvelope {
88 exchange,
89 reply_tx: None,
90 })
91 .await
92 .map_err(|_| CamelError::ChannelClosed)
93 }
94
95 pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
100 let (reply_tx, reply_rx) = oneshot::channel();
101 self.sender
102 .send(ExchangeEnvelope {
103 exchange,
104 reply_tx: Some(reply_tx),
105 })
106 .await
107 .map_err(|_| CamelError::ChannelClosed)?;
108 reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
109 }
110}
111
112pub struct SecurityContext {
118 pub policy: Arc<dyn SecurityPolicy>,
119 pub authenticator: Arc<dyn TokenAuthenticator>,
120 pub credential_sources: Vec<CredentialSource>,
121}
122
123impl SecurityContext {
124 pub fn new(
125 policy: impl SecurityPolicy + 'static,
126 authenticator: Arc<dyn TokenAuthenticator>,
127 ) -> Self {
128 Self {
129 policy: Arc::new(policy),
130 authenticator,
131 credential_sources: vec![CredentialSource::AuthorizationHeader],
132 }
133 }
134
135 pub fn from_arc(
136 policy: Arc<dyn SecurityPolicy>,
137 authenticator: Arc<dyn TokenAuthenticator>,
138 ) -> Self {
139 Self {
140 policy,
141 authenticator,
142 credential_sources: vec![CredentialSource::AuthorizationHeader],
143 }
144 }
145
146 pub fn with_credential_sources(mut self, sources: Vec<CredentialSource>) -> Self {
147 self.credential_sources = sources;
148 self
149 }
150}
151
152impl Clone for SecurityContext {
153 fn clone(&self) -> Self {
154 Self {
155 policy: Arc::clone(&self.policy),
156 authenticator: Arc::clone(&self.authenticator),
157 credential_sources: self.credential_sources.clone(),
158 }
159 }
160}
161
162impl std::fmt::Debug for SecurityContext {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("SecurityContext")
165 .field("policy", &"<SecurityPolicy>")
166 .field("authenticator", &"<TokenAuthenticator>")
167 .field("credential_sources", &self.credential_sources)
168 .finish()
169 }
170}
171
172#[derive(Debug, Clone, PartialEq, Eq)]
174pub enum ConcurrencyModel {
175 Sequential,
178 Concurrent { max: Option<usize> },
182}
183
184#[async_trait]
208pub trait Consumer: Send + Sync {
209 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
211
212 async fn stop(&mut self) -> Result<(), CamelError>;
217
218 async fn suspend(&self) -> Result<(), CamelError> {
222 Ok(())
223 }
224
225 async fn resume(&self) -> Result<(), CamelError> {
229 Ok(())
230 }
231
232 fn concurrency_model(&self) -> ConcurrencyModel {
241 ConcurrencyModel::Sequential
242 }
243
244 fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
256 None
257 }
258
259 fn set_security_context(&mut self, _ctx: SecurityContext) {}
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271
272 #[test]
273 fn consumer_context_exposes_route_id() {
274 let (tx, _rx) = mpsc::channel(1);
275 let ctx = ConsumerContext::new(tx, CancellationToken::new(), "test-route".to_string());
276 assert_eq!(ctx.route_id(), "test-route");
277 }
278
279 #[tokio::test]
280 async fn test_consumer_context_cancelled() {
281 let (tx, _rx) = mpsc::channel(16);
282 let token = CancellationToken::new();
283 let ctx = ConsumerContext::new(tx, token.clone(), "test-route".to_string());
284
285 assert!(!ctx.is_cancelled());
286 token.cancel();
287 ctx.cancelled().await;
288 assert!(ctx.is_cancelled());
289 }
290
291 #[test]
292 fn test_concurrency_model_default_is_sequential() {
293 use super::ConcurrencyModel;
294
295 struct DummyConsumer;
296
297 #[async_trait::async_trait]
298 impl super::Consumer for DummyConsumer {
299 async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
300 Ok(())
301 }
302 async fn stop(&mut self) -> Result<(), CamelError> {
303 Ok(())
304 }
305 }
306
307 let consumer = DummyConsumer;
308 assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
309 }
310
311 #[test]
312 fn test_concurrency_model_concurrent_override() {
313 use super::ConcurrencyModel;
314
315 struct ConcurrentConsumer;
316
317 #[async_trait::async_trait]
318 impl super::Consumer for ConcurrentConsumer {
319 async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
320 Ok(())
321 }
322 async fn stop(&mut self) -> Result<(), CamelError> {
323 Ok(())
324 }
325 fn concurrency_model(&self) -> ConcurrencyModel {
326 ConcurrencyModel::Concurrent { max: Some(16) }
327 }
328 }
329
330 let consumer = ConcurrentConsumer;
331 assert_eq!(
332 consumer.concurrency_model(),
333 ConcurrencyModel::Concurrent { max: Some(16) }
334 );
335 }
336
337 #[tokio::test]
338 async fn test_consumer_default_suspend_resume() {
339 struct DummyConsumer;
340
341 #[async_trait::async_trait]
342 impl super::Consumer for DummyConsumer {
343 async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
344 Ok(())
345 }
346 async fn stop(&mut self) -> Result<(), CamelError> {
347 Ok(())
348 }
349 }
350
351 let consumer = DummyConsumer;
352 assert!(consumer.suspend().await.is_ok());
353 assert!(consumer.resume().await.is_ok());
354 }
355
356 struct StubPolicy;
359
360 #[async_trait::async_trait]
361 impl SecurityPolicy for StubPolicy {
362 async fn evaluate(
363 &self,
364 _exchange: &mut Exchange,
365 ) -> Result<camel_api::security_policy::AuthorizationDecision, CamelError> {
366 Ok(camel_api::security_policy::AuthorizationDecision::Granted {
367 principal: camel_api::security_policy::Principal {
368 subject: "stub".into(),
369 issuer: "stub".into(),
370 audience: vec![],
371 scopes: vec![],
372 roles: vec![],
373 claims: serde_json::json!({}),
374 },
375 })
376 }
377 }
378
379 struct StubAuthenticator;
380
381 #[async_trait::async_trait]
382 impl camel_auth::TokenAuthenticator for StubAuthenticator {
383 async fn authenticate_bearer(
384 &self,
385 _token: &str,
386 ) -> Result<camel_api::security_policy::Principal, CamelError> {
387 Ok(camel_api::security_policy::Principal {
388 subject: "stub".into(),
389 issuer: "stub".into(),
390 audience: vec![],
391 scopes: vec![],
392 roles: vec![],
393 claims: serde_json::json!({}),
394 })
395 }
396 }
397
398 #[test]
399 fn test_security_context_new() {
400 let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
401 assert!(Arc::strong_count(&ctx.policy) == 1);
402 assert!(Arc::strong_count(&ctx.authenticator) == 1);
403 assert_eq!(
404 ctx.credential_sources,
405 vec![camel_auth::CredentialSource::AuthorizationHeader]
406 );
407 }
408
409 #[test]
410 fn test_security_context_from_arc() {
411 let policy: Arc<dyn SecurityPolicy> = Arc::new(StubPolicy);
412 let authenticator: Arc<dyn camel_auth::TokenAuthenticator> = Arc::new(StubAuthenticator);
413 let ctx = SecurityContext::from_arc(Arc::clone(&policy), Arc::clone(&authenticator));
414 assert!(Arc::ptr_eq(&ctx.policy, &policy));
415 assert!(Arc::ptr_eq(&ctx.authenticator, &authenticator));
416 assert_eq!(
417 ctx.credential_sources,
418 vec![camel_auth::CredentialSource::AuthorizationHeader]
419 );
420 }
421
422 #[test]
423 fn test_security_context_clone_independent() {
424 let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
425 let cloned = ctx.clone();
426 assert!(Arc::ptr_eq(&ctx.policy, &cloned.policy));
427 assert!(Arc::ptr_eq(&ctx.authenticator, &cloned.authenticator));
428 assert_eq!(ctx.credential_sources, cloned.credential_sources);
429 }
430
431 #[test]
432 fn test_security_context_debug_redacts_traits() {
433 let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
434 let debug_str = format!("{ctx:?}");
435 assert!(debug_str.contains("<SecurityPolicy>"));
436 assert!(debug_str.contains("<TokenAuthenticator>"));
437 assert!(debug_str.contains("credential_sources"));
438 }
439
440 #[test]
441 fn test_security_context_with_credential_sources() {
442 let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator))
443 .with_credential_sources(vec![
444 camel_auth::CredentialSource::Cookie {
445 name: "session".into(),
446 },
447 camel_auth::CredentialSource::AuthorizationHeader,
448 ]);
449 assert_eq!(ctx.credential_sources.len(), 2);
450 assert!(matches!(
451 &ctx.credential_sources[0],
452 camel_auth::CredentialSource::Cookie { .. }
453 ));
454 }
455}