use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;
use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
#[derive(Clone)]
pub struct FilterService {
predicate: FilterPredicate,
sub_pipeline: BoxProcessor,
}
impl FilterService {
pub fn new(
predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
sub_pipeline: BoxProcessor,
) -> Self {
Self {
predicate: Arc::new(predicate),
sub_pipeline,
}
}
pub fn from_predicate(predicate: FilterPredicate, sub_pipeline: BoxProcessor) -> Self {
Self {
predicate,
sub_pipeline,
}
}
}
impl Service<Exchange> for FilterService {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.sub_pipeline.poll_ready(cx)
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
if (self.predicate)(&exchange) {
let fut = self.sub_pipeline.call(exchange);
Box::pin(fut)
} else {
Box::pin(async move { Ok(exchange) })
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_api::{Body, BoxProcessorExt, Message, Value};
use tower::ServiceExt;
fn passthrough() -> BoxProcessor {
BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
}
fn uppercase_body() -> BoxProcessor {
BoxProcessor::from_fn(|mut ex: Exchange| {
Box::pin(async move {
if let Body::Text(s) = &ex.input.body {
ex.input.body = Body::Text(s.to_uppercase());
}
Ok(ex)
})
})
}
fn failing() -> BoxProcessor {
BoxProcessor::from_fn(|_ex| {
Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
})
}
#[tokio::test]
async fn test_filter_passes_matching_exchange() {
let mut svc = FilterService::new(
|ex: &Exchange| ex.input.header("active").is_some(),
uppercase_body(),
);
let mut ex = Exchange::new(Message::new("hello"));
ex.input.set_header("active", Value::Bool(true));
let result = svc.ready().await.unwrap().call(ex).await.unwrap();
assert_eq!(result.input.body.as_text(), Some("HELLO"));
}
#[tokio::test]
async fn test_filter_blocks_non_matching_exchange() {
let mut svc = FilterService::new(
|ex: &Exchange| ex.input.header("active").is_some(),
uppercase_body(),
);
let ex = Exchange::new(Message::new("hello"));
let result = svc.ready().await.unwrap().call(ex).await.unwrap();
assert_eq!(result.input.body.as_text(), Some("hello"));
}
#[tokio::test]
async fn test_filter_sub_pipeline_transforms_body() {
let mut svc = FilterService::new(|_: &Exchange| true, uppercase_body());
let ex = Exchange::new(Message::new("world"));
let result = svc.ready().await.unwrap().call(ex).await.unwrap();
assert_eq!(result.input.body.as_text(), Some("WORLD"));
}
#[tokio::test]
async fn test_filter_sub_pipeline_error_propagates() {
let mut svc = FilterService::new(|_: &Exchange| true, failing());
let ex = Exchange::new(Message::new("x"));
let result = svc.ready().await.unwrap().call(ex).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("boom"));
}
#[tokio::test]
async fn test_filter_predicate_receives_original_exchange() {
let mut svc = FilterService::new(
|ex: &Exchange| ex.input.body.as_text() == Some("check"),
uppercase_body(),
);
let ex = Exchange::new(Message::new("check"));
let result = svc.ready().await.unwrap().call(ex).await.unwrap();
assert_eq!(result.input.body.as_text(), Some("CHECK"));
}
#[tokio::test]
async fn test_filter_clone_is_independent() {
let svc = FilterService::new(|_: &Exchange| true, passthrough());
let mut clone = svc.clone();
let ex = Exchange::new(Message::new("hi"));
let result = clone.ready().await.unwrap().call(ex).await.unwrap();
assert_eq!(result.input.body.as_text(), Some("hi"));
}
}