1use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
19use tower::Service;
20
21use camel_api::body::Body;
22use camel_api::{CamelError, ClaimCheckRepository, Exchange};
23
24pub type KeyExpression = Arc<dyn Fn(&Exchange) -> Result<String, CamelError> + Send + Sync>;
27
28#[derive(Clone, Debug, PartialEq, Eq)]
30pub enum ClaimCheckOp {
31 Set,
33 Get,
35 GetAndRemove,
37 Push,
39 Pop,
41}
42
43#[derive(Clone)]
48pub struct ClaimCheckService {
49 repository: Arc<dyn ClaimCheckRepository>,
50 operation: ClaimCheckOp,
51 key_expression: KeyExpression,
52}
53
54impl std::fmt::Debug for ClaimCheckService {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("ClaimCheckService")
57 .field("repository", &self.repository.name())
58 .field("operation", &self.operation)
59 .finish()
60 }
61}
62
63impl ClaimCheckService {
64 pub fn new(
66 repository: Arc<dyn ClaimCheckRepository>,
67 operation: ClaimCheckOp,
68 key_expression: KeyExpression,
69 ) -> Self {
70 Self {
71 repository,
72 operation,
73 key_expression,
74 }
75 }
76}
77
78impl Service<Exchange> for ClaimCheckService {
79 type Response = Exchange;
80 type Error = CamelError;
81 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
82
83 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84 Poll::Ready(Ok(()))
85 }
86
87 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
88 let repository = self.repository.clone();
89 let operation = self.operation.clone();
90 let key_result = (self.key_expression)(&exchange);
91
92 Box::pin(async move {
93 let key = key_result?;
94 match operation {
95 ClaimCheckOp::Set => {
96 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
97 repository.set(&key, body).await?;
98 exchange.input.body = Body::Text(key);
99 Ok(exchange)
100 }
101 ClaimCheckOp::Get => {
102 let body = repository.get(&key).await?;
103 exchange.input.body = body;
104 Ok(exchange)
105 }
106 ClaimCheckOp::GetAndRemove => {
107 let body = repository.get_and_remove(&key).await?;
108 exchange.input.body = body;
109 Ok(exchange)
110 }
111 ClaimCheckOp::Push => {
112 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
113 repository.push(&key, body).await?;
114 exchange.input.body = Body::Text(key);
115 Ok(exchange)
116 }
117 ClaimCheckOp::Pop => {
118 let body = repository.pop(&key).await?;
119 exchange.input.body = body;
120 Ok(exchange)
121 }
122 }
123 })
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130 use camel_api::{Exchange, Message};
131 use std::collections::{HashMap, VecDeque};
132 use std::sync::Mutex;
133 use tower::ServiceExt;
134
135 #[derive(Debug)]
137 struct TestRepo {
138 name: String,
139 keys: Mutex<HashMap<String, Body>>,
140 stacks: Mutex<HashMap<String, VecDeque<Body>>>,
141 }
142
143 impl TestRepo {
144 fn new(name: impl Into<String>) -> Self {
145 Self {
146 name: name.into(),
147 keys: Mutex::new(HashMap::new()),
148 stacks: Mutex::new(HashMap::new()),
149 }
150 }
151 }
152
153 #[async_trait::async_trait]
154 impl ClaimCheckRepository for TestRepo {
155 fn name(&self) -> &str {
156 &self.name
157 }
158
159 async fn set(&self, key: &str, payload: Body) -> Result<(), CamelError> {
160 self.keys
161 .lock()
162 .expect("mutex poisoned") .insert(key.to_string(), payload);
164 Ok(())
165 }
166
167 async fn get(&self, key: &str) -> Result<Body, CamelError> {
168 self.keys
169 .lock()
170 .expect("mutex poisoned") .get(key)
172 .cloned()
173 .ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
174 }
175
176 async fn get_and_remove(&self, key: &str) -> Result<Body, CamelError> {
177 self.keys
178 .lock()
179 .expect("mutex poisoned") .remove(key)
181 .ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
182 }
183
184 async fn remove(&self, key: &str) -> Result<(), CamelError> {
185 self.keys
186 .lock()
187 .expect("mutex poisoned") .remove(key);
189 Ok(())
190 }
191
192 async fn push(&self, key: &str, payload: Body) -> Result<(), CamelError> {
193 self.stacks
194 .lock()
195 .expect("mutex poisoned") .entry(key.to_string())
197 .or_default()
198 .push_back(payload);
199 Ok(())
200 }
201
202 async fn pop(&self, key: &str) -> Result<Body, CamelError> {
203 self.stacks
204 .lock()
205 .expect("mutex poisoned") .get_mut(key)
207 .and_then(|s| s.pop_back())
208 .ok_or_else(|| {
209 CamelError::RouteError(format!("Claim check stack empty for key: {key}"))
210 })
211 }
212 }
213
214 fn make_key_expr(key: &str) -> KeyExpression {
215 let k = key.to_string();
216 Arc::new(move |_ex: &Exchange| Ok(k.clone()))
217 }
218
219 fn make_exchange(body: Body) -> Exchange {
220 Exchange::new(Message::new(body))
221 }
222
223 #[tokio::test]
224 async fn set_moves_body_to_repo() {
225 let repo = Arc::new(TestRepo::new("test"));
226 let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("mykey"));
227 let exchange = make_exchange(Body::Text("secret-data".to_string()));
228
229 let result = svc.oneshot(exchange).await.unwrap();
230 assert_eq!(result.input.body, Body::Text("mykey".to_string()));
232
233 let stashed = repo.get("mykey").await.unwrap();
235 assert_eq!(stashed, Body::Text("secret-data".to_string()));
236 }
237
238 #[tokio::test]
239 async fn get_restores_body() {
240 let repo = Arc::new(TestRepo::new("test"));
241 repo.set("k1", Body::Text("stashed-body".to_string()))
242 .await
243 .unwrap();
244
245 let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Get, make_key_expr("k1"));
246 let exchange = make_exchange(Body::Empty);
247
248 let result = svc.oneshot(exchange).await.unwrap();
249 assert_eq!(result.input.body, Body::Text("stashed-body".to_string()));
250 }
251
252 #[tokio::test]
253 async fn get_and_remove_restores_and_deletes() {
254 let repo = Arc::new(TestRepo::new("test"));
255 repo.set("k2", Body::Text("will-be-removed".to_string()))
256 .await
257 .unwrap();
258
259 let svc = ClaimCheckService::new(
260 repo.clone(),
261 ClaimCheckOp::GetAndRemove,
262 make_key_expr("k2"),
263 );
264 let exchange = make_exchange(Body::Empty);
265
266 let result = svc.oneshot(exchange).await.unwrap();
267 assert_eq!(result.input.body, Body::Text("will-be-removed".to_string()));
268
269 let err = repo.get("k2").await.unwrap_err();
271 assert!(
272 matches!(&err, CamelError::RouteError(msg) if msg.contains("not found")),
273 "expected RouteError with 'not found', got: {err:?}"
274 );
275 }
276
277 #[tokio::test]
278 async fn push_pop_lifo() {
279 let repo = Arc::new(TestRepo::new("test"));
280 let first = Body::Text("first".to_string());
281 let second = Body::Text("second".to_string());
282
283 let svc =
285 ClaimCheckService::new(repo.clone(), ClaimCheckOp::Push, make_key_expr("stack-key"));
286 svc.oneshot(make_exchange(first)).await.unwrap();
287
288 let svc =
290 ClaimCheckService::new(repo.clone(), ClaimCheckOp::Push, make_key_expr("stack-key"));
291 svc.oneshot(make_exchange(second.clone())).await.unwrap();
292
293 let svc =
295 ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("stack-key"));
296 let result = svc.oneshot(make_exchange(Body::Empty)).await.unwrap();
297 assert_eq!(result.input.body, second);
298
299 let svc =
301 ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("stack-key"));
302 let result = svc.oneshot(make_exchange(Body::Empty)).await.unwrap();
303 assert_eq!(result.input.body, Body::Text("first".to_string()));
304 }
305
306 #[tokio::test]
307 async fn get_missing_propagates_error() {
308 let repo = Arc::new(TestRepo::new("test"));
309 let svc = ClaimCheckService::new(
310 repo.clone(),
311 ClaimCheckOp::Get,
312 make_key_expr("nonexistent"),
313 );
314 let exchange = make_exchange(Body::Empty);
315 let result = svc.oneshot(exchange).await;
316 assert!(
317 matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("not found")),
318 "expected Err with 'not found', got: {result:?}"
319 );
320 }
321
322 #[tokio::test]
323 async fn set_overwrites_existing() {
324 let repo = Arc::new(TestRepo::new("test"));
325 repo.set("k", Body::Text("old".to_string())).await.unwrap();
326
327 let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("k"));
328 let exchange = make_exchange(Body::Text("new".to_string()));
329 svc.oneshot(exchange).await.unwrap();
330
331 let stashed = repo.get("k").await.unwrap();
332 assert_eq!(stashed, Body::Text("new".to_string()));
333 }
334
335 #[tokio::test]
336 async fn pop_empty_stack_propagates_error() {
337 let repo = Arc::new(TestRepo::new("test"));
338 let svc = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("empty"));
339 let exchange = make_exchange(Body::Empty);
340 let result = svc.oneshot(exchange).await;
341 assert!(
342 matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("empty")),
343 "expected Err with 'empty', got: {result:?}"
344 );
345 }
346}