// camel_processor/claim_check.rs

//! Claim Check EIP processor.
//!
//! Stashes the message body into a `ClaimCheckRepository` and replaces it with
//! a lightweight key reference (a text body containing the key). Later steps
//! can retrieve the body (Get), retrieve and remove it (GetAndRemove), or use
//! LIFO stack operations (Push/Pop).
//!
//! # Process mode
//!
//! This is a **Process-mode** Tower `Service<Exchange>`: it transforms the
//! body in place and has no child sub-pipeline. It needs no `StepLifecycle`,
//! because it holds only an `Arc<dyn ClaimCheckRepository>`, the configured
//! operation and a key expression, and performs no background work.
13
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use tower::Service;
20
21use camel_api::body::Body;
22use camel_api::{CamelError, ClaimCheckRepository, Exchange};
23
24/// Extracts a claim-check key string from the exchange (e.g. from header/property/body).
25/// Returns an error if the key cannot be resolved (null or empty).
26pub type KeyExpression = Arc<dyn Fn(&Exchange) -> Result<String, CamelError> + Send + Sync>;
27
/// The repository operation a `ClaimCheckService` performs on each exchange.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimCheckOp {
    /// Store the current body under the key, then replace the exchange body
    /// with a text body containing the key.
    Set,
    /// Look the body up by key and install it as the exchange body.
    Get,
    /// Like `Get`, but delegates to the repository's combined
    /// get-and-remove call so the entry is gone afterwards.
    GetAndRemove,
    /// Push the current body onto the LIFO stack kept for the key, then
    /// replace the exchange body with a text body containing the key.
    Push,
    /// Pop the most recently pushed body for the key and install it as the
    /// exchange body.
    Pop,
}
42
43/// Claim Check EIP processor.
44///
45/// Transforms the exchange body to/from a `ClaimCheckRepository` using the
46/// configured operation and key expression.
47#[derive(Clone)]
48pub struct ClaimCheckService {
49    repository: Arc<dyn ClaimCheckRepository>,
50    operation: ClaimCheckOp,
51    key_expression: KeyExpression,
52}
53
54impl std::fmt::Debug for ClaimCheckService {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("ClaimCheckService")
57            .field("repository", &self.repository.name())
58            .field("operation", &self.operation)
59            .finish()
60    }
61}
62
63impl ClaimCheckService {
64    /// Create a new `ClaimCheckService`.
65    pub fn new(
66        repository: Arc<dyn ClaimCheckRepository>,
67        operation: ClaimCheckOp,
68        key_expression: KeyExpression,
69    ) -> Self {
70        Self {
71            repository,
72            operation,
73            key_expression,
74        }
75    }
76}
77
78impl Service<Exchange> for ClaimCheckService {
79    type Response = Exchange;
80    type Error = CamelError;
81    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
82
83    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84        Poll::Ready(Ok(()))
85    }
86
87    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
88        let repository = self.repository.clone();
89        let operation = self.operation.clone();
90        let key_result = (self.key_expression)(&exchange);
91
92        Box::pin(async move {
93            let key = key_result?;
94            match operation {
95                ClaimCheckOp::Set => {
96                    let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
97                    repository.set(&key, body).await?;
98                    exchange.input.body = Body::Text(key);
99                    Ok(exchange)
100                }
101                ClaimCheckOp::Get => {
102                    let body = repository.get(&key).await?;
103                    exchange.input.body = body;
104                    Ok(exchange)
105                }
106                ClaimCheckOp::GetAndRemove => {
107                    let body = repository.get_and_remove(&key).await?;
108                    exchange.input.body = body;
109                    Ok(exchange)
110                }
111                ClaimCheckOp::Push => {
112                    let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
113                    repository.push(&key, body).await?;
114                    exchange.input.body = Body::Text(key);
115                    Ok(exchange)
116                }
117                ClaimCheckOp::Pop => {
118                    let body = repository.pop(&key).await?;
119                    exchange.input.body = body;
120                    Ok(exchange)
121                }
122            }
123        })
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Exchange, Message};
    use std::collections::{HashMap, VecDeque};
    use std::sync::Mutex;
    use tower::ServiceExt;

    /// In-memory test repository: a keyed map for set/get/remove plus a
    /// per-key stack for push/pop, each guarded by its own `Mutex`.
    #[derive(Debug)]
    struct TestRepo {
        name: String,
        keys: Mutex<HashMap<String, Body>>,
        stacks: Mutex<HashMap<String, VecDeque<Body>>>,
    }

    impl TestRepo {
        /// Creates an empty repository with the given name.
        fn new(name: impl Into<String>) -> Self {
            Self {
                name: name.into(),
                keys: Mutex::new(HashMap::new()),
                stacks: Mutex::new(HashMap::new()),
            }
        }
    }

    #[async_trait::async_trait]
    impl ClaimCheckRepository for TestRepo {
        fn name(&self) -> &str {
            &self.name
        }

        async fn set(&self, key: &str, payload: Body) -> Result<(), CamelError> {
            self.keys
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .insert(key.to_string(), payload);
            Ok(())
        }

        async fn get(&self, key: &str) -> Result<Body, CamelError> {
            self.keys
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .get(key)
                .cloned()
                .ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
        }

        async fn get_and_remove(&self, key: &str) -> Result<Body, CamelError> {
            self.keys
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .remove(key)
                .ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
        }

        async fn remove(&self, key: &str) -> Result<(), CamelError> {
            self.keys
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .remove(key);
            Ok(())
        }

        async fn push(&self, key: &str, payload: Body) -> Result<(), CamelError> {
            // The back of the deque is the top of the stack.
            self.stacks
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .entry(key.to_string())
                .or_default()
                .push_back(payload);
            Ok(())
        }

        async fn pop(&self, key: &str) -> Result<Body, CamelError> {
            self.stacks
                .lock()
                .expect("mutex poisoned") // allow-unwrap
                .get_mut(key)
                .and_then(|s| s.pop_back())
                .ok_or_else(|| {
                    CamelError::RouteError(format!("Claim check stack empty for key: {key}"))
                })
        }
    }

    /// Key expression that always resolves to `key`.
    fn make_key_expr(key: &str) -> KeyExpression {
        let k = key.to_string();
        Arc::new(move |_ex: &Exchange| Ok(k.clone()))
    }

    /// Wraps `body` in a fresh exchange.
    fn make_exchange(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    #[tokio::test]
    async fn set_moves_body_to_repo() {
        let repo = Arc::new(TestRepo::new("test"));
        let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("mykey"));
        let exchange = make_exchange(Body::Text("secret-data".to_string()));

        let result = svc.oneshot(exchange).await.unwrap();
        // Exchange body is now the key reference
        assert_eq!(result.input.body, Body::Text("mykey".to_string()));

        // The original body is stashed in the repo
        let stashed = repo.get("mykey").await.unwrap();
        assert_eq!(stashed, Body::Text("secret-data".to_string()));
    }

    #[tokio::test]
    async fn get_restores_body() {
        let repo = Arc::new(TestRepo::new("test"));
        repo.set("k1", Body::Text("stashed-body".to_string()))
            .await
            .unwrap();

        let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Get, make_key_expr("k1"));
        let exchange = make_exchange(Body::Empty);

        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body, Body::Text("stashed-body".to_string()));
    }

    #[tokio::test]
    async fn get_and_remove_restores_and_deletes() {
        let repo = Arc::new(TestRepo::new("test"));
        repo.set("k2", Body::Text("will-be-removed".to_string()))
            .await
            .unwrap();

        let svc = ClaimCheckService::new(
            repo.clone(),
            ClaimCheckOp::GetAndRemove,
            make_key_expr("k2"),
        );
        let exchange = make_exchange(Body::Empty);

        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body, Body::Text("will-be-removed".to_string()));

        // Repo should be empty after get_and_remove
        let err = repo.get("k2").await.unwrap_err();
        assert!(
            matches!(&err, CamelError::RouteError(msg) if msg.contains("not found")),
            "expected RouteError with 'not found', got: {err:?}"
        );
    }

    #[tokio::test]
    async fn push_pop_lifo() {
        let repo = Arc::new(TestRepo::new("test"));
        let first = Body::Text("first".to_string());
        let second = Body::Text("second".to_string());

        // `oneshot` consumes the service, so clone one per call.
        let push_svc =
            ClaimCheckService::new(repo.clone(), ClaimCheckOp::Push, make_key_expr("stack-key"));
        let pop_svc =
            ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("stack-key"));

        // Push first; the exchange body becomes the key reference
        let pushed = push_svc
            .clone()
            .oneshot(make_exchange(first))
            .await
            .unwrap();
        assert_eq!(pushed.input.body, Body::Text("stack-key".to_string()));

        // Push second
        let pushed = push_svc
            .clone()
            .oneshot(make_exchange(second.clone()))
            .await
            .unwrap();
        assert_eq!(pushed.input.body, Body::Text("stack-key".to_string()));

        // Pop should return second (LIFO)
        let result = pop_svc
            .clone()
            .oneshot(make_exchange(Body::Empty))
            .await
            .unwrap();
        assert_eq!(result.input.body, second);

        // Pop should return first
        let result = pop_svc
            .clone()
            .oneshot(make_exchange(Body::Empty))
            .await
            .unwrap();
        assert_eq!(result.input.body, Body::Text("first".to_string()));

        // The stack is now drained; a further pop must surface the repo error
        let result = pop_svc.oneshot(make_exchange(Body::Empty)).await;
        assert!(
            matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("empty")),
            "expected Err with 'empty', got: {result:?}"
        );
    }

    #[tokio::test]
    async fn get_missing_propagates_error() {
        let repo = Arc::new(TestRepo::new("test"));
        let svc = ClaimCheckService::new(
            repo.clone(),
            ClaimCheckOp::Get,
            make_key_expr("nonexistent"),
        );
        let exchange = make_exchange(Body::Empty);
        let result = svc.oneshot(exchange).await;
        assert!(
            matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("not found")),
            "expected Err with 'not found', got: {result:?}"
        );
    }

    #[tokio::test]
    async fn set_overwrites_existing() {
        let repo = Arc::new(TestRepo::new("test"));
        repo.set("k", Body::Text("old".to_string())).await.unwrap();

        let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("k"));
        let exchange = make_exchange(Body::Text("new".to_string()));
        svc.oneshot(exchange).await.unwrap();

        let stashed = repo.get("k").await.unwrap();
        assert_eq!(stashed, Body::Text("new".to_string()));
    }

    #[tokio::test]
    async fn pop_empty_stack_propagates_error() {
        let repo = Arc::new(TestRepo::new("test"));
        let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("empty"));
        let exchange = make_exchange(Body::Empty);
        let result = svc.oneshot(exchange).await;
        assert!(
            matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("empty")),
            "expected Err with 'empty', got: {result:?}"
        );
    }

    #[tokio::test]
    async fn key_expression_error_propagates() {
        let repo = Arc::new(TestRepo::new("test"));
        let failing: KeyExpression = Arc::new(|_ex: &Exchange| -> Result<String, CamelError> {
            Err(CamelError::RouteError(
                "claim check key unresolved".to_string(),
            ))
        });
        let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, failing);

        let result = svc
            .oneshot(make_exchange(Body::Text("payload".to_string())))
            .await;
        assert!(
            matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("unresolved")),
            "expected Err with 'unresolved', got: {result:?}"
        );

        // The repository must not be touched when no key could be resolved
        let stored = repo.keys.lock().expect("mutex poisoned"); // allow-unwrap
        assert!(stored.is_empty());
    }
}
346}