1#![doc(html_root_url = "https://docs.rs/tower-pipeline/0.1.0")]
38#![warn(
39 missing_debug_implementations,
40 missing_docs,
41 rust_2018_idioms,
42 unreachable_pub
43)]
44#![deny(broken_intra_doc_links)]
45#![allow(elided_lifetimes_in_paths, clippy::type_complexity)]
46#![cfg_attr(test, allow(clippy::float_cmp))]
47#![cfg_attr(docsrs, feature(doc_cfg))]
48
49use futures_util::ready;
50use pin_project_lite::pin_project;
51use std::future::Future;
52use std::{
53 pin::Pin,
54 task::{Context, Poll},
55};
56use tower_service::Service;
57
/// Two services composed back to back.
///
/// When used as a [`Service`], the request goes to the first service and the
/// response it produces is passed to the second service as its request.
#[derive(Clone, Copy, Debug, Default)]
pub struct Pipeline<A, B> {
    // Stage that receives the original request.
    first: A,
    // Stage that receives the first stage's response.
    second: B,
}

impl<A, B> Pipeline<A, B> {
    /// Creates a pipeline that runs `first` and then `second`.
    pub fn new(first: A, second: B) -> Self {
        Pipeline { first, second }
    }

    /// Returns a shared reference to the first service.
    pub fn first_as_ref(&self) -> &A {
        let Pipeline { first, .. } = self;
        first
    }

    /// Returns a mutable reference to the first service.
    pub fn first_as_mut(&mut self) -> &mut A {
        let Pipeline { first, .. } = self;
        first
    }

    /// Consumes the pipeline and returns the first service.
    pub fn into_first(self) -> A {
        let Pipeline { first, .. } = self;
        first
    }

    /// Returns a shared reference to the second service.
    pub fn second_as_ref(&self) -> &B {
        let Pipeline { second, .. } = self;
        second
    }

    /// Returns a mutable reference to the second service.
    pub fn second_as_mut(&mut self) -> &mut B {
        let Pipeline { second, .. } = self;
        second
    }

    /// Consumes the pipeline and returns the second service.
    pub fn into_second(self) -> B {
        let Pipeline { second, .. } = self;
        second
    }
}
101
102impl<R, A, B> Service<R> for Pipeline<A, B>
103where
104 A: Service<R>,
105 B: Service<A::Response> + Clone,
106 A::Error: Into<B::Error>,
107{
108 type Response = B::Response;
109 type Error = B::Error;
110 type Future = ResponseFuture<R, A, B>;
111
112 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113 self.first.poll_ready(cx).map_err(Into::into)
114 }
115
116 fn call(&mut self, req: R) -> Self::Future {
117 ResponseFuture {
118 state: State::FirstFuturePending {
119 future: self.first.call(req),
120 },
121 second: Some(self.second.clone()),
122 }
123 }
124}
125
126pin_project! {
127 pub struct ResponseFuture<R, A, B>
129 where
130 A: Service<R>,
131 B: Service<A::Response>,
132 {
133 #[pin]
134 state: State<R, A, B>,
135 second: Option<B>,
136 }
137}
138
139pin_project! {
140 #[project = StateProj]
141 enum State<R, A, B>
142 where
143 A: Service<R>,
144 B: Service<A::Response>,
145 {
146 FirstFuturePending { #[pin] future: A::Future },
147 PollReadySecond { first_res: Option<A::Response>, second: B },
148 SecondFuturePending { #[pin] future: B::Future },
149 }
150}
151
152impl<R, A, B> Future for ResponseFuture<R, A, B>
153where
154 A: Service<R>,
155 B: Service<A::Response>,
156 A::Error: Into<B::Error>,
157{
158 type Output = Result<B::Response, B::Error>;
159
160 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 loop {
162 let mut this = self.as_mut().project();
163
164 let new_state = match this.state.as_mut().project() {
165 StateProj::FirstFuturePending { future } => {
166 let first_res = ready!(future.poll(cx).map_err(Into::into)?);
167 let second = this.second.take().unwrap();
168 State::PollReadySecond {
169 first_res: Some(first_res),
170 second,
171 }
172 }
173
174 StateProj::PollReadySecond { first_res, second } => {
175 let _ready: () = ready!(second.poll_ready(cx)?);
176 State::SecondFuturePending {
177 future: second.call(first_res.take().unwrap()),
178 }
179 }
180
181 StateProj::SecondFuturePending { future } => return future.poll(cx),
182 };
183
184 this.state.set(new_state);
185 }
186 }
187}
188
189pub trait PipelineExt<R>: Service<R> {
191 fn pipeline<B>(self, second: B) -> Pipeline<Self, B>
193 where
194 Self: Service<R> + Sized,
195 B: Service<Self::Response> + Clone,
196 Self::Error: Into<B::Error>;
197}
198
199impl<R, T> PipelineExt<R> for T
200where
201 T: Service<R>,
202{
203 fn pipeline<B>(self, second: B) -> Pipeline<Self, B>
204 where
205 Self: Service<R> + Sized,
206 B: Service<Self::Response> + Clone,
207 Self::Error: Into<B::Error>,
208 {
209 Pipeline::new(self, second)
210 }
211}