tower_pipeline/
lib.rs

1//! A [Tower] [`Service`] combinator that "pipelines" two services.
2//!
3//! A [`Pipeline`] is a [`Service`] consisting of two other [`Service`]s where the response of the
4//! first is the request of the second. This is analagous to [function composition] but for
5//! services.
6//!
7//! ```
8//! use tower_pipeline::PipelineExt;
9//! use tower::{service_fn, BoxError, ServiceExt};
10//!
11//! # #[tokio::main]
12//! # async fn main() {
13//! // service that returns the length of a string
14//! let length_svc = service_fn(|input: &'static str| async move {
15//!     Ok::<_, BoxError>(input.len())
16//! });
17//!
18//! // service that doubles its input
19//! let double_svc = service_fn(|input: usize| async move {
20//!     Ok::<_, BoxError>(input * 2)
21//! });
22//!
23//! // combine our two services
24//! let combined = length_svc.pipeline(double_svc);
25//!
26//! // call the service
27//! let result = combined.oneshot("rust").await.unwrap();
28//!
29//! assert_eq!(result, 8);
30//! # }
31//! ```
32//!
33//! [Tower]: https://crates.io/crates/tower
34//! [`Service`]: tower_service::Service
35//! [function composition]: https://en.wikipedia.org/wiki/Function_composition
36
37#![doc(html_root_url = "https://docs.rs/tower-pipeline/0.1.0")]
38#![warn(
39    missing_debug_implementations,
40    missing_docs,
41    rust_2018_idioms,
42    unreachable_pub
43)]
44#![deny(broken_intra_doc_links)]
45#![allow(elided_lifetimes_in_paths, clippy::type_complexity)]
46#![cfg_attr(test, allow(clippy::float_cmp))]
47#![cfg_attr(docsrs, feature(doc_cfg))]
48
49use futures_util::ready;
50use pin_project_lite::pin_project;
51use std::future::Future;
52use std::{
53    pin::Pin,
54    task::{Context, Poll},
55};
56use tower_service::Service;
57
58/// Two services combined where the response of the first is the request of the second.
59#[derive(Debug, Clone, Copy, Default)]
60pub struct Pipeline<A, B> {
61    first: A,
62    second: B,
63}
64
65impl<A, B> Pipeline<A, B> {
66    /// Create a new [`Pipeline`] from two [`Service`]s.
67    pub fn new(first: A, second: B) -> Self {
68        Self { first, second }
69    }
70
71    /// Get a reference to the first service.
72    pub fn first_as_ref(&self) -> &A {
73        &self.first
74    }
75
76    /// Get a mutable reference to the first service.
77    pub fn first_as_mut(&mut self) -> &mut A {
78        &mut self.first
79    }
80
81    /// Consume `self`, returning the first service
82    pub fn into_first(self) -> A {
83        self.first
84    }
85
86    /// Get a reference to the second service.
87    pub fn second_as_ref(&self) -> &B {
88        &self.second
89    }
90
91    /// Get a mutable reference to the second service.
92    pub fn second_as_mut(&mut self) -> &mut B {
93        &mut self.second
94    }
95
96    /// Consume `self`, returning the second service
97    pub fn into_second(self) -> B {
98        self.second
99    }
100}
101
102impl<R, A, B> Service<R> for Pipeline<A, B>
103where
104    A: Service<R>,
105    B: Service<A::Response> + Clone,
106    A::Error: Into<B::Error>,
107{
108    type Response = B::Response;
109    type Error = B::Error;
110    type Future = ResponseFuture<R, A, B>;
111
112    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113        self.first.poll_ready(cx).map_err(Into::into)
114    }
115
116    fn call(&mut self, req: R) -> Self::Future {
117        ResponseFuture {
118            state: State::FirstFuturePending {
119                future: self.first.call(req),
120            },
121            second: Some(self.second.clone()),
122        }
123    }
124}
125
126pin_project! {
127    /// Response future of [`Pipeline`].
128    pub struct ResponseFuture<R, A, B>
129    where
130        A: Service<R>,
131        B: Service<A::Response>,
132    {
133        #[pin]
134        state: State<R, A, B>,
135        second: Option<B>,
136    }
137}
138
139pin_project! {
140    #[project = StateProj]
141    enum State<R, A, B>
142    where
143        A: Service<R>,
144        B: Service<A::Response>,
145    {
146        FirstFuturePending { #[pin] future: A::Future },
147        PollReadySecond { first_res: Option<A::Response>, second: B },
148        SecondFuturePending { #[pin] future: B::Future },
149    }
150}
151
152impl<R, A, B> Future for ResponseFuture<R, A, B>
153where
154    A: Service<R>,
155    B: Service<A::Response>,
156    A::Error: Into<B::Error>,
157{
158    type Output = Result<B::Response, B::Error>;
159
160    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        loop {
162            let mut this = self.as_mut().project();
163
164            let new_state = match this.state.as_mut().project() {
165                StateProj::FirstFuturePending { future } => {
166                    let first_res = ready!(future.poll(cx).map_err(Into::into)?);
167                    let second = this.second.take().unwrap();
168                    State::PollReadySecond {
169                        first_res: Some(first_res),
170                        second,
171                    }
172                }
173
174                StateProj::PollReadySecond { first_res, second } => {
175                    let _ready: () = ready!(second.poll_ready(cx)?);
176                    State::SecondFuturePending {
177                        future: second.call(first_res.take().unwrap()),
178                    }
179                }
180
181                StateProj::SecondFuturePending { future } => return future.poll(cx),
182            };
183
184            this.state.set(new_state);
185        }
186    }
187}
188
189/// An extension trait for easily pipelining [`Service`]s.
190pub trait PipelineExt<R>: Service<R> {
191    /// Construct a [`Pipeline`].
192    fn pipeline<B>(self, second: B) -> Pipeline<Self, B>
193    where
194        Self: Service<R> + Sized,
195        B: Service<Self::Response> + Clone,
196        Self::Error: Into<B::Error>;
197}
198
199impl<R, T> PipelineExt<R> for T
200where
201    T: Service<R>,
202{
203    fn pipeline<B>(self, second: B) -> Pipeline<Self, B>
204    where
205        Self: Service<R> + Sized,
206        B: Service<Self::Response> + Clone,
207        Self::Error: Into<B::Error>,
208    {
209        Pipeline::new(self, second)
210    }
211}