1use std::fmt;
3use futures::{AsyncSink, Stream, StartSend, Poll, Async};
4use futures::sync::mpsc::{self, channel, Sender};
5use futures::sink::Sink;
6use futures::stream::Fuse;
7use futures::future::Future;
8use tokio_core::reactor::Handle;
9
10use metrics::Collect;
11use error_log::{ErrorLog, ShutdownReason};
12use config::{Queue, DefaultQueue, private};
13
14
15#[derive(Debug)]
22pub struct Pool<V, M> {
23 channel: Sender<V>,
24 metrics: M,
25}
26
/// Error returned by the `Sink` implementation of `Pool` when the underlying
/// channel refuses an item.
///
/// The rejected item is kept inside the error and can be taken back with
/// `into_inner`.
pub struct QueueError<V>(V);
31
32
33#[derive(Debug)]
35#[must_use = "futures do nothing unless polled"]
36struct ForwardFuture<S, M, E>
37 where S: Sink
38{
39 receiver: Fuse<mpsc::Receiver<S::SinkItem>>,
40 buffer: Option<S::SinkItem>,
41 metrics: M,
42 errors: E,
43 sink: S,
44}
45
46impl<I: 'static, M> private::NewQueue<I, M> for DefaultQueue {
47 type Pool = Pool<I, M>;
48 fn spawn_on<S, E>(self, pool: S, err: E, metrics: M, handle: &Handle)
49 -> Self::Pool
50 where S: Sink<SinkItem=I, SinkError=private::Done> + 'static,
51 E: ErrorLog + 'static,
52 M: Collect + 'static,
53 {
54 Queue(100).spawn_on(pool, err, metrics, handle)
55 }
56}
57
58impl<I: 'static, M> private::NewQueue<I, M> for Queue {
59 type Pool = Pool<I, M>;
60 fn spawn_on<S, E>(self, pool: S, e: E, metrics: M, handle: &Handle)
61 -> Self::Pool
62 where S: Sink<SinkItem=I, SinkError=private::Done> + 'static,
63 E: ErrorLog + 'static,
64 M: Collect + 'static,
65 {
66 let buf_size = self.0.saturating_sub(1);
68 let (tx, rx) = channel(buf_size);
69 handle.spawn(ForwardFuture {
70 receiver: rx.fuse(),
71 metrics: metrics.clone(),
72 errors: e,
73 sink: pool,
74 buffer: None,
75 });
76 return Pool {
77 channel: tx,
78 metrics,
79 };
80 }
81}
82
83
84trait AssertTraits: Clone + Send + Sync {}
85impl<V: Send, M: Collect> AssertTraits for Pool<V, M> {}
86
87impl<V, M: Clone> Clone for Pool<V, M> {
88 fn clone(&self) -> Self {
89 Pool {
90 channel: self.channel.clone(),
91 metrics: self.metrics.clone(),
92 }
93 }
94}
95
96impl<S, M, E> ForwardFuture<S, M, E>
97 where S: Sink<SinkError=private::Done>,
98 M: Collect,
99 E: ErrorLog,
100{
101 fn poll_forever(&mut self) -> Async<()> {
102 if let Some(item) = self.buffer.take() {
103 match self.sink.start_send(item) {
104 Ok(AsyncSink::Ready) => {
105 self.metrics.request_forwarded();
106 }
107 Ok(AsyncSink::NotReady(item)) => {
108 self.buffer = Some(item);
109 return Async::NotReady;
110 }
111 Err(private::Done) => return Async::Ready(()),
112 }
113 }
114
115 let was_done = self.receiver.is_done();
116 loop {
117 match self.receiver.poll() {
118 Ok(Async::Ready(Some(item))) => {
119 match self.sink.start_send(item) {
120 Ok(AsyncSink::Ready) => {
121 self.metrics.request_forwarded();
122 continue;
123 }
124 Ok(AsyncSink::NotReady(item)) => {
125 self.buffer = Some(item);
126 return Async::NotReady;
127 }
128 Err(private::Done) => return Async::Ready(()),
129 }
130 }
131 Ok(Async::Ready(None)) => {
132 if !was_done {
133 self.errors.pool_shutting_down(
134 ShutdownReason::RequestStreamClosed);
135 }
136 match self.sink.close() {
137 Ok(Async::NotReady) => {
138 return Async::NotReady;
139 }
140 Ok(Async::Ready(())) | Err(private::Done) => {
141 return Async::Ready(());
142 }
143 }
144 }
145 Ok(Async::NotReady) => match self.sink.poll_complete() {
146 Ok(_) => {
147 return Async::NotReady;
148 }
149 Err(private::Done) => {
150 return Async::Ready(());
151 }
152 }
153 Err(()) => unreachable!(),
155 }
156 }
157 }
158}
159
160impl<S, M, E> Future for ForwardFuture<S, M, E>
161 where S: Sink<SinkError=private::Done>,
162 M: Collect,
163 E: ErrorLog,
164{
165 type Item = ();
166 type Error = (); fn poll(&mut self) -> Result<Async<()>, ()> {
168 match self.poll_forever() {
169 Async::NotReady => Ok(Async::NotReady),
170 Async::Ready(()) => {
171 self.errors.pool_closed();
172 self.metrics.pool_closed();
173 Ok(Async::Ready(()))
174 }
175 }
176 }
177}
178
179
180impl<V, M> Sink for Pool<V, M>
181 where M: Collect,
182{
183 type SinkItem=V;
184 type SinkError=QueueError<V>;
185
186 fn start_send(&mut self, item: Self::SinkItem)
187 -> StartSend<Self::SinkItem, Self::SinkError>
188 {
189 match self.channel.start_send(item) {
190 Ok(AsyncSink::Ready) => {
191 self.metrics.request_queued();
192 Ok(AsyncSink::Ready)
193 }
194 Ok(AsyncSink::NotReady(item)) => Ok(AsyncSink::NotReady(item)),
195 Err(e) => Err(QueueError(e.into_inner())),
196 }
197 }
198 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
199 self.channel.poll_complete()
201 .map_err(|_| {
202 unreachable!();
207 })
208 }
209 fn close(&mut self) -> Poll<(), Self::SinkError> {
210 self.channel.close()
211 .map_err(|_| {
212 unreachable!();
217 })
218 }
219}
220
221impl<T> QueueError<T> {
222 pub fn into_inner(self) -> T {
224 self.0
225 }
226}
227
228impl<T> fmt::Display for QueueError<T> {
229 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
230 f.write_str("connection pool is closed")
231 }
232}
233
234impl<T> fmt::Debug for QueueError<T> {
235 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
236 f.write_str("QueueError(_)")
237 }
238}
239
240impl<T> ::std::error::Error for QueueError<T> {
241 fn description(&self) -> &str {
242 "QueueError"
243 }
244 fn cause(&self) -> Option<&::std::error::Error> {
245 None
246 }
247}