use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;
use camel_api::body::Body;
use camel_api::{CamelError, ClaimCheckRepository, Exchange};
/// Shared closure that computes the claim-check key for an [`Exchange`].
///
/// It returns the key as a `String`, or a [`CamelError`] when no key can be
/// produced. The `Send + Sync` bounds let the same closure be shared by clones
/// of a service.
pub type KeyExpression = Arc<dyn Fn(&Exchange) -> Result<String, CamelError> + Send + Sync>;
/// Operation a [`ClaimCheckService`] performs against its repository.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimCheckOp {
/// Hand the exchange body to the repository's `set` under the key, then make
/// the key (as text) the new body.
Set,
/// Replace the exchange body with whatever the repository's `get` returns
/// for the key.
Get,
/// Replace the exchange body with whatever the repository's
/// `get_and_remove` returns for the key.
GetAndRemove,
/// Hand the exchange body to the repository's `push` under the key, then make
/// the key (as text) the new body.
Push,
/// Replace the exchange body with whatever the repository's `pop` returns
/// for the key.
Pop,
}
/// Tower service that applies one [`ClaimCheckOp`] to every exchange it receives.
///
/// Cloning is cheap with respect to the repository and key expression: both
/// are held behind `Arc`.
#[derive(Clone)]
pub struct ClaimCheckService {
// Backing store the operation is executed against.
repository: Arc<dyn ClaimCheckRepository>,
// Selects which repository method `call` dispatches to.
operation: ClaimCheckOp,
// Computes the key for each incoming exchange.
key_expression: KeyExpression,
}
impl std::fmt::Debug for ClaimCheckService {
    /// Formats the service as a struct showing the repository's name and the
    /// configured operation. The key expression is a closure and is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let repository_name = self.repository.name();
        let mut builder = f.debug_struct("ClaimCheckService");
        builder.field("repository", &repository_name);
        builder.field("operation", &self.operation);
        builder.finish()
    }
}
impl ClaimCheckService {
pub fn new(
repository: Arc<dyn ClaimCheckRepository>,
operation: ClaimCheckOp,
key_expression: KeyExpression,
) -> Self {
Self {
repository,
operation,
key_expression,
}
}
}
impl Service<Exchange> for ClaimCheckService {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut exchange: Exchange) -> Self::Future {
let repository = self.repository.clone();
let operation = self.operation.clone();
let key_result = (self.key_expression)(&exchange);
Box::pin(async move {
let key = key_result?;
match operation {
ClaimCheckOp::Set => {
let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
repository.set(&key, body).await?;
exchange.input.body = Body::Text(key);
Ok(exchange)
}
ClaimCheckOp::Get => {
let body = repository.get(&key).await?;
exchange.input.body = body;
Ok(exchange)
}
ClaimCheckOp::GetAndRemove => {
let body = repository.get_and_remove(&key).await?;
exchange.input.body = body;
Ok(exchange)
}
ClaimCheckOp::Push => {
let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
repository.push(&key, body).await?;
exchange.input.body = Body::Text(key);
Ok(exchange)
}
ClaimCheckOp::Pop => {
let body = repository.pop(&key).await?;
exchange.input.body = body;
Ok(exchange)
}
}
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_api::{Exchange, Message};
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use tower::ServiceExt;
/// In-memory `ClaimCheckRepository` used by the tests in this module.
#[derive(Debug)]
struct TestRepo {
// Value reported by `name()`.
name: String,
// Single-value slots backing `set` / `get` / `get_and_remove` / `remove`.
keys: Mutex<HashMap<String, Body>>,
// Per-key queues backing `push` / `pop`.
stacks: Mutex<HashMap<String, VecDeque<Body>>>,
}
impl TestRepo {
fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
keys: Mutex::new(HashMap::new()),
stacks: Mutex::new(HashMap::new()),
}
}
}
#[async_trait::async_trait]
impl ClaimCheckRepository for TestRepo {
    /// Name given at construction time.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Stores `payload` under `key`, replacing any previous value.
    async fn set(&self, key: &str, payload: Body) -> Result<(), CamelError> {
        let mut slots = self.keys.lock().expect("mutex poisoned");
        slots.insert(key.to_string(), payload);
        Ok(())
    }

    /// Returns a clone of the value under `key`, or a `RouteError` if absent.
    async fn get(&self, key: &str) -> Result<Body, CamelError> {
        let slots = self.keys.lock().expect("mutex poisoned");
        let found = slots.get(key).cloned();
        found.ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
    }

    /// Takes the value under `key` out of the map, or fails with a
    /// `RouteError` if absent.
    async fn get_and_remove(&self, key: &str) -> Result<Body, CamelError> {
        let mut slots = self.keys.lock().expect("mutex poisoned");
        let taken = slots.remove(key);
        taken.ok_or_else(|| CamelError::RouteError(format!("Claim check key not found: {key}")))
    }

    /// Drops the value under `key`; a missing key is not an error.
    async fn remove(&self, key: &str) -> Result<(), CamelError> {
        let mut slots = self.keys.lock().expect("mutex poisoned");
        slots.remove(key);
        Ok(())
    }

    /// Appends `payload` to the queue for `key`, creating the queue on first use.
    async fn push(&self, key: &str, payload: Body) -> Result<(), CamelError> {
        let mut stacks = self.stacks.lock().expect("mutex poisoned");
        let stack = stacks.entry(key.to_string()).or_default();
        stack.push_back(payload);
        Ok(())
    }

    /// Removes the most recently pushed payload for `key`; fails with a
    /// `RouteError` when the key has no queue or the queue is empty.
    async fn pop(&self, key: &str) -> Result<Body, CamelError> {
        let mut stacks = self.stacks.lock().expect("mutex poisoned");
        let popped = stacks.get_mut(key).and_then(VecDeque::pop_back);
        popped.ok_or_else(|| {
            CamelError::RouteError(format!("Claim check stack empty for key: {key}"))
        })
    }
}
fn make_key_expr(key: &str) -> KeyExpression {
let k = key.to_string();
Arc::new(move |_ex: &Exchange| Ok(k.clone()))
}
/// Wraps `body` in a message and that message in a fresh exchange.
fn make_exchange(body: Body) -> Exchange {
    let message = Message::new(body);
    Exchange::new(message)
}
/// `Set` parks the original body in the repository and leaves the key as the body.
#[tokio::test]
async fn set_moves_body_to_repo() {
    let repo = Arc::new(TestRepo::new("test"));
    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("mykey"));

    let input = make_exchange(Body::Text("secret-data".to_string()));
    let outcome = service.oneshot(input).await.unwrap();
    assert_eq!(outcome.input.body, Body::Text("mykey".to_string()));

    let parked = repo.get("mykey").await.unwrap();
    assert_eq!(parked, Body::Text("secret-data".to_string()));
}
/// `Get` replaces the exchange body with the value stored under the key.
#[tokio::test]
async fn get_restores_body() {
    let repo = Arc::new(TestRepo::new("test"));
    let seeded = Body::Text("stashed-body".to_string());
    repo.set("k1", seeded).await.unwrap();

    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Get, make_key_expr("k1"));
    let outcome = service.oneshot(make_exchange(Body::Empty)).await.unwrap();

    assert_eq!(outcome.input.body, Body::Text("stashed-body".to_string()));
}
/// `GetAndRemove` restores the stored body and leaves nothing behind under the key.
#[tokio::test]
async fn get_and_remove_restores_and_deletes() {
    let repo = Arc::new(TestRepo::new("test"));
    let seeded = Body::Text("will-be-removed".to_string());
    repo.set("k2", seeded).await.unwrap();

    let key_expr = make_key_expr("k2");
    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::GetAndRemove, key_expr);
    let outcome = service.oneshot(make_exchange(Body::Empty)).await.unwrap();
    assert_eq!(outcome.input.body, Body::Text("will-be-removed".to_string()));

    // A second lookup must now fail because the entry was consumed.
    let err = repo.get("k2").await.unwrap_err();
    let is_not_found = matches!(&err, CamelError::RouteError(msg) if msg.contains("not found"));
    assert!(
        is_not_found,
        "expected RouteError with 'not found', got: {err:?}"
    );
}
/// Two pushes followed by two pops come back in reverse order.
#[tokio::test]
async fn push_pop_lifo() {
    let repo = Arc::new(TestRepo::new("test"));
    let first = Body::Text("first".to_string());
    let second = Body::Text("second".to_string());

    // Push "first", then "second", each through a fresh service instance.
    for payload in [first, second.clone()] {
        let pusher =
            ClaimCheckService::new(repo.clone(), ClaimCheckOp::Push, make_key_expr("stack-key"));
        pusher.oneshot(make_exchange(payload)).await.unwrap();
    }

    // Pops must yield "second" first, then "first".
    for expected in [second, Body::Text("first".to_string())] {
        let popper =
            ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("stack-key"));
        let outcome = popper.oneshot(make_exchange(Body::Empty)).await.unwrap();
        assert_eq!(outcome.input.body, expected);
    }
}
/// `Get` on an unknown key surfaces the repository's error through the service.
#[tokio::test]
async fn get_missing_propagates_error() {
    let repo = Arc::new(TestRepo::new("test"));
    let key_expr = make_key_expr("nonexistent");
    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Get, key_expr);

    let result = service.oneshot(make_exchange(Body::Empty)).await;

    let is_not_found =
        matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("not found"));
    assert!(
        is_not_found,
        "expected Err with 'not found', got: {result:?}"
    );
}
/// `Set` on a key that already holds a value replaces that value.
#[tokio::test]
async fn set_overwrites_existing() {
    let repo = Arc::new(TestRepo::new("test"));
    let previous = Body::Text("old".to_string());
    repo.set("k", previous).await.unwrap();

    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Set, make_key_expr("k"));
    let input = make_exchange(Body::Text("new".to_string()));
    service.oneshot(input).await.unwrap();

    let current = repo.get("k").await.unwrap();
    assert_eq!(current, Body::Text("new".to_string()));
}
/// `Pop` on a key with nothing pushed surfaces the repository's error.
#[tokio::test]
async fn pop_empty_stack_propagates_error() {
    let repo = Arc::new(TestRepo::new("test"));
    let service = ClaimCheckService::new(repo.clone(), ClaimCheckOp::Pop, make_key_expr("empty"));

    let result = service.oneshot(make_exchange(Body::Empty)).await;

    let is_empty_error =
        matches!(&result, Err(CamelError::RouteError(msg)) if msg.contains("empty"));
    assert!(
        is_empty_error,
        "expected Err with 'empty', got: {result:?}"
    );
}
}