tk_pool/
queue.rs

1//! A queue (buffer) of requests sent to connection pool
2use std::fmt;
3use futures::{AsyncSink, Stream, StartSend, Poll, Async};
4use futures::sync::mpsc::{self, channel, Sender};
5use futures::sink::Sink;
6use futures::stream::Fuse;
7use futures::future::Future;
8use tokio_core::reactor::Handle;
9
10use metrics::Collect;
11use error_log::{ErrorLog, ShutdownReason};
12use config::{Queue, DefaultQueue, private};
13
14
15/// Pool is an object you use to access a connection pool
16///
17/// Usually the whole logic of connection pool is spawned in another future.
18/// This object encapsulates a channel that is used to communicate with pool.
19/// This object also contains a clone of metrics collection object as it's
20/// very important to collect metrics at this side of a channel.
21#[derive(Debug)]
22pub struct Pool<V, M> {
23    channel: Sender<V>,
24    metrics: M,
25}
26
/// Error produced by the `Pool` sink once the underlying pool is closed
///
/// The error wraps the item that was passed to `start_send` and could not
/// be queued, so the caller is able to take it back.
pub struct QueueError<V>(V);
31
32
33/// This is similar to `Forward` from `futures` but has metrics and errors
34#[derive(Debug)]
35#[must_use = "futures do nothing unless polled"]
36struct ForwardFuture<S, M, E>
37    where S: Sink
38{
39     receiver: Fuse<mpsc::Receiver<S::SinkItem>>,
40     buffer: Option<S::SinkItem>,
41     metrics: M,
42     errors: E,
43     sink: S,
44}
45
46impl<I: 'static, M> private::NewQueue<I, M> for DefaultQueue {
47    type Pool = Pool<I, M>;
48    fn spawn_on<S, E>(self, pool: S, err: E, metrics: M, handle: &Handle)
49        -> Self::Pool
50        where S: Sink<SinkItem=I, SinkError=private::Done> + 'static,
51              E: ErrorLog + 'static,
52              M: Collect + 'static,
53    {
54        Queue(100).spawn_on(pool, err, metrics, handle)
55    }
56}
57
58impl<I: 'static, M> private::NewQueue<I, M> for Queue {
59    type Pool = Pool<I, M>;
60    fn spawn_on<S, E>(self, pool: S, e: E, metrics: M, handle: &Handle)
61        -> Self::Pool
62        where S: Sink<SinkItem=I, SinkError=private::Done> + 'static,
63              E: ErrorLog + 'static,
64              M: Collect + 'static,
65    {
66        // one item is buffered ForwardFuture
67        let buf_size = self.0.saturating_sub(1);
68        let (tx, rx) = channel(buf_size);
69        handle.spawn(ForwardFuture {
70            receiver: rx.fuse(),
71            metrics: metrics.clone(),
72            errors: e,
73            sink: pool,
74            buffer: None,
75        });
76        return Pool {
77            channel: tx,
78            metrics,
79        };
80    }
81}
82
83
84trait AssertTraits: Clone + Send + Sync {}
85impl<V: Send, M: Collect> AssertTraits for Pool<V, M> {}
86
87impl<V, M: Clone> Clone for Pool<V, M> {
88    fn clone(&self) -> Self {
89        Pool {
90            channel: self.channel.clone(),
91            metrics: self.metrics.clone(),
92        }
93    }
94}
95
96impl<S, M, E> ForwardFuture<S, M, E>
97    where S: Sink<SinkError=private::Done>,
98          M: Collect,
99          E: ErrorLog,
100{
101    fn poll_forever(&mut self) -> Async<()> {
102        if let Some(item) = self.buffer.take() {
103            match self.sink.start_send(item) {
104                Ok(AsyncSink::Ready) => {
105                    self.metrics.request_forwarded();
106                }
107                Ok(AsyncSink::NotReady(item)) => {
108                    self.buffer = Some(item);
109                    return Async::NotReady;
110                }
111                Err(private::Done) => return Async::Ready(()),
112            }
113        }
114
115        let was_done = self.receiver.is_done();
116        loop {
117            match self.receiver.poll() {
118                Ok(Async::Ready(Some(item))) => {
119                    match self.sink.start_send(item) {
120                        Ok(AsyncSink::Ready) => {
121                            self.metrics.request_forwarded();
122                            continue;
123                        }
124                        Ok(AsyncSink::NotReady(item)) => {
125                            self.buffer = Some(item);
126                            return Async::NotReady;
127                        }
128                        Err(private::Done) => return Async::Ready(()),
129                    }
130                }
131                Ok(Async::Ready(None)) => {
132                    if !was_done {
133                        self.errors.pool_shutting_down(
134                            ShutdownReason::RequestStreamClosed);
135                    }
136                    match self.sink.close() {
137                        Ok(Async::NotReady) => {
138                            return Async::NotReady;
139                        }
140                        Ok(Async::Ready(())) | Err(private::Done) => {
141                            return Async::Ready(());
142                        }
143                    }
144                }
145                Ok(Async::NotReady) => match self.sink.poll_complete() {
146                    Ok(_) => {
147                        return Async::NotReady;
148                    }
149                    Err(private::Done) => {
150                        return Async::Ready(());
151                    }
152                }
153                // No errors in channel receiver
154                Err(()) => unreachable!(),
155            }
156        }
157    }
158}
159
160impl<S, M, E> Future for ForwardFuture<S, M, E>
161    where S: Sink<SinkError=private::Done>,
162          M: Collect,
163          E: ErrorLog,
164{
165    type Item = ();
166    type Error = ();  // Really Void
167    fn poll(&mut self) -> Result<Async<()>, ()> {
168        match self.poll_forever() {
169            Async::NotReady => Ok(Async::NotReady),
170            Async::Ready(()) => {
171                self.errors.pool_closed();
172                self.metrics.pool_closed();
173                Ok(Async::Ready(()))
174            }
175        }
176    }
177}
178
179
180impl<V, M> Sink for Pool<V, M>
181    where M: Collect,
182{
183    type SinkItem=V;
184    type SinkError=QueueError<V>;
185
186    fn start_send(&mut self, item: Self::SinkItem)
187        -> StartSend<Self::SinkItem, Self::SinkError>
188    {
189        match self.channel.start_send(item) {
190            Ok(AsyncSink::Ready) => {
191                self.metrics.request_queued();
192                Ok(AsyncSink::Ready)
193            }
194            Ok(AsyncSink::NotReady(item)) => Ok(AsyncSink::NotReady(item)),
195            Err(e) => Err(QueueError(e.into_inner())),
196        }
197    }
198    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
199        // TODO(tailhook) turn closed flag into error
200        self.channel.poll_complete()
201        .map_err(|_| {
202            // In fact poll_complete of the channel does nothing for now
203            // Even if this is fixed there is no sense to return error for
204            // it because error contains a value SinkItem and there is no way
205            // to construct a value from nothing
206            unreachable!();
207        })
208    }
209    fn close(&mut self) -> Poll<(), Self::SinkError> {
210        self.channel.close()
211        .map_err(|_| {
212            // In fact close of the channel does nothing for now
213            // Even if this is fixed there is no sense to return error for
214            // it because error contains a value SinkItem and there is no way
215            // to construct a value from nothing
216            unreachable!();
217        })
218    }
219}
220
221impl<T> QueueError<T> {
222    /// Return ownership of contained message
223    pub fn into_inner(self) -> T {
224        self.0
225    }
226}
227
228impl<T> fmt::Display for QueueError<T> {
229    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
230        f.write_str("connection pool is closed")
231    }
232}
233
234impl<T> fmt::Debug for QueueError<T> {
235    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
236        f.write_str("QueueError(_)")
237    }
238}
239
240impl<T> ::std::error::Error for QueueError<T> {
241    fn description(&self) -> &str {
242        "QueueError"
243    }
244    fn cause(&self) -> Option<&::std::error::Error> {
245        None
246    }
247}