use std::future::Future;
use std::pin::Pin;
use camel_api::{
BodyType, BoxProcessor, CamelError, Exchange, OutcomePipeline, OutcomeSegment, PipelineOutcome,
body_converter,
};
pub fn compose_outcome_segment(children: Vec<Box<dyn OutcomePipeline>>) -> OutcomeSegment {
if children.is_empty() {
return OutcomeSegment::new(Box::new(NoopSegment));
}
OutcomeSegment::new(Box::new(SequentialOutcomeSegment::new(children)))
}
#[derive(Clone)]
struct NoopSegment;
impl OutcomePipeline for NoopSegment {
fn clone_box(&self) -> Box<dyn OutcomePipeline> {
Box::new(NoopSegment)
}
fn run<'a>(
&'a mut self,
exchange: Exchange,
) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
Box::pin(async move { PipelineOutcome::Completed(exchange) })
}
}
#[derive(Clone)]
pub struct SequentialOutcomeSegment {
children: Vec<Box<dyn OutcomePipeline>>,
}
impl SequentialOutcomeSegment {
pub fn new(children: Vec<Box<dyn OutcomePipeline>>) -> Self {
Self { children }
}
}
impl OutcomePipeline for SequentialOutcomeSegment {
fn clone_box(&self) -> Box<dyn OutcomePipeline> {
Box::new(self.clone())
}
fn run<'a>(
&'a mut self,
mut exchange: Exchange,
) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
Box::pin(async move {
for child in self.children.iter_mut() {
match child.run(exchange).await {
PipelineOutcome::Completed(next) => {
if camel_api::is_camel_stop(&next) {
return PipelineOutcome::Stopped(next);
}
exchange = next;
}
other => return other,
}
}
PipelineOutcome::Completed(exchange)
})
}
}
#[derive(Clone)]
pub struct BodyCoercingSegment {
inner: Box<dyn OutcomePipeline>,
contract: BodyType,
}
impl BodyCoercingSegment {
pub fn new(inner: Box<dyn OutcomePipeline>, contract: BodyType) -> Self {
Self { inner, contract }
}
}
impl OutcomePipeline for BodyCoercingSegment {
fn clone_box(&self) -> Box<dyn OutcomePipeline> {
Box::new(self.clone())
}
fn run<'a>(
&'a mut self,
mut exchange: Exchange,
) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
let contract = self.contract;
Box::pin(async move {
let body = std::mem::replace(&mut exchange.input.body, camel_api::body::Body::Empty);
match body_converter::convert(body, contract) {
Ok(coerced) => {
exchange.input.body = coerced;
}
Err(e) => {
return PipelineOutcome::Failed(CamelError::TypeConversionFailed(format!(
"body coercion failed: {e}"
)));
}
}
self.inner.run(exchange).await
})
}
}
#[derive(Clone)]
pub struct BoxProcessorSegment {
processor: BoxProcessor,
}
impl BoxProcessorSegment {
pub fn new(processor: BoxProcessor) -> Self {
Self { processor }
}
}
impl OutcomePipeline for BoxProcessorSegment {
fn clone_box(&self) -> Box<dyn OutcomePipeline> {
Box::new(self.clone())
}
fn run<'a>(
&'a mut self,
exchange: Exchange,
) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
use tower::ServiceExt;
Box::pin(async move {
match self.processor.ready().await {
Ok(mut ready) => match tower::Service::call(&mut ready, exchange).await {
Ok(ex) => {
if camel_api::is_camel_stop(&ex) {
PipelineOutcome::Stopped(ex)
} else {
PipelineOutcome::Completed(ex)
}
}
Err(err) => PipelineOutcome::Failed(err),
},
Err(err) => PipelineOutcome::Failed(err),
}
})
}
}
#[derive(Clone)]
pub struct StopSegment;
impl OutcomePipeline for StopSegment {
fn clone_box(&self) -> Box<dyn OutcomePipeline> {
Box::new(StopSegment)
}
fn run<'a>(
&'a mut self,
exchange: Exchange,
) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
Box::pin(async move { PipelineOutcome::Stopped(exchange) })
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_api::BoxProcessorExt;
#[test]
fn compose_outcome_segment_empty_returns_noop() {
let seg = compose_outcome_segment(vec![]);
let _ = seg;
}
#[test]
fn compose_outcome_segment_single_child() {
let kids: Vec<Box<dyn OutcomePipeline>> = vec![Box::new(StopSegment)];
let seg = compose_outcome_segment(kids);
let _ = seg;
}
#[test]
fn body_coercing_segment_construction() {
let inner = Box::new(StopSegment);
let seg = BodyCoercingSegment::new(inner, BodyType::Text);
assert_eq!(seg.contract, BodyType::Text);
}
#[test]
fn box_processor_segment_construction() {
let bp = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
let seg = BoxProcessorSegment::new(bp);
let _ = seg;
}
#[test]
fn sequential_outcome_segment_construction() {
let kids: Vec<Box<dyn OutcomePipeline>> =
vec![Box::new(StopSegment), Box::new(NoopSegment)];
let seq = SequentialOutcomeSegment::new(kids);
assert_eq!(seq.children.len(), 2);
}
}