1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! Connection-pool endpoint for Finchers, backed by r2d2.
//!
//! The crate re-exports the whole of `r2d2` and adds `pool_endpoint`, which
//! turns an r2d2 `Pool` into a Finchers endpoint yielding pooled connections.

// The `extern_prelude` feature gate is only requested when the build sets the
// `finchers_inject_extern_prelude` cfg.
#![cfg_attr(finchers_inject_extern_prelude, feature(extern_prelude))]
#![doc(html_root_url = "https://docs.rs/finchers-r2d2/0.1.3")]
// Lints reported as warnings by default.
#![warn(
missing_docs,
missing_debug_implementations,
nonstandard_style,
rust_2018_idioms,
unused,
)]
// With the `finchers_deny_warnings` cfg set, warnings become hard errors, both
// in the crate itself and in its documentation tests.
#![cfg_attr(finchers_deny_warnings, deny(warnings))]
#![cfg_attr(finchers_deny_warnings, doc(test(attr(deny(warnings)))))]
extern crate finchers;
extern crate futures;
// `#[macro_use]` brings the `log` macros (e.g. `trace!`, used in `imp`) into scope.
#[macro_use]
extern crate log;
extern crate r2d2;
extern crate tokio_executor;
extern crate tokio_threadpool;
// NOTE(review): `current_thread` is defined in another file that is not visible here.
pub mod current_thread;
// Re-export the complete r2d2 API; `no_inline` asks rustdoc not to inline the
// re-exported items into this crate's documentation.
#[doc(no_inline)]
pub use r2d2::*;
// Public surface of the private `imp` module.
pub use imp::{pool_endpoint, PoolEndpoint};
mod imp {
use finchers::endpoint::{ApplyContext, ApplyResult, Endpoint};
use finchers::error;
use finchers::error::Error;
use futures::future::poll_fn;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use r2d2::{ManageConnection, Pool, PooledConnection};
use std::fmt;
use tokio_executor::DefaultExecutor;
use tokio_threadpool::blocking;
/// Creates an endpoint which yields connections taken from the given pool.
///
/// The returned `PoolEndpoint` starts with `preflight_before_spawn` enabled;
/// use `PoolEndpoint::preflight_before_spawn` to change that setting.
pub fn pool_endpoint<M>(pool: Pool<M>) -> PoolEndpoint<M>
where
    M: ManageConnection,
{
    // Preflight is switched on unless the caller opts out afterwards.
    let preflight_before_spawn = true;
    PoolEndpoint {
        preflight_before_spawn,
        pool,
    }
}
/// An endpoint which yields connections checked out from an r2d2 `Pool`.
///
/// Values of this type are created by `pool_endpoint`.
#[derive(Debug)]
pub struct PoolEndpoint<M: ManageConnection> {
    // The pool that connections are checked out from.
    pool: Pool<M>,
    // When `true`, `preflight` tries `Pool::try_get` before any task is spawned.
    preflight_before_spawn: bool,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would add an
// `M: Clone` bound, although `Pool<M>` can be cloned for every
// `M: ManageConnection`. Without the extra bound the endpoint is clonable for
// connection managers that are not `Clone` themselves.
impl<M> Clone for PoolEndpoint<M>
where
    M: ManageConnection,
{
    fn clone(&self) -> Self {
        Self {
            pool: self.pool.clone(),
            preflight_before_spawn: self.preflight_before_spawn,
        }
    }
}
impl<M> PoolEndpoint<M>
where
    M: ManageConnection,
{
    /// Sets whether a connection is first requested with `Pool::try_get`
    /// before a task performing the blocking `Pool::get` is spawned.
    ///
    /// Consumes the endpoint and returns it with the flag updated; the pool
    /// itself is left unchanged.
    pub fn preflight_before_spawn(mut self, enabled: bool) -> Self {
        self.preflight_before_spawn = enabled;
        self
    }

    /// Attempts to take a connection via `Pool::try_get`.
    ///
    /// Returns `None` without touching the pool when preflight is disabled,
    /// and otherwise whatever `try_get` returns.
    fn preflight(&self) -> Option<PooledConnection<M>> {
        if !self.preflight_before_spawn {
            return None;
        }
        self.pool.try_get()
    }
}
// Endpoint implementation: every application produces a fresh `PoolFuture`
// whose output is a one-element tuple holding the pooled connection.
impl<'a, M> Endpoint<'a> for PoolEndpoint<M>
where
    M: ManageConnection + 'a,
{
    type Output = (PooledConnection<M>,);
    type Future = PoolFuture<'a, M>;

    /// Builds the future that obtains a connection.
    ///
    /// The apply context is ignored and this never fails: no pool access
    /// happens here, it is deferred until the returned future is polled.
    fn apply(&'a self, _: &mut ApplyContext<'_>) -> ApplyResult<Self::Future> {
        let pending = PoolFuture {
            // No task exists yet; one may be spawned on the first poll.
            handle: None,
            endpoint: self,
        };
        Ok(pending)
    }
}
/// The future produced by applying a `PoolEndpoint`; it resolves to a
/// one-element tuple containing a `PooledConnection`.
pub struct PoolFuture<'a, M: ManageConnection> {
// The endpoint that created this future; supplies the pool and the preflight setting.
endpoint: &'a PoolEndpoint<M>,
// Handle to the spawned checkout task. It starts as `None` and is filled in
// by `poll` once a task has been spawned.
handle: Option<oneshot::SpawnHandle<PooledConnection<M>, Error>>,
}
// Manual `Debug` implementation; it is only available when both the manager
// and its connection type are themselves `Debug`.
impl<'a, M> fmt::Debug for PoolFuture<'a, M>
where
    M: ManageConnection + fmt::Debug,
    M::Connection: fmt::Debug,
{
    /// Formats the future as a struct named `PoolFuture` with its two fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("PoolFuture");
        repr.field("endpoint", &self.endpoint);
        repr.field("handle", &self.handle);
        repr.finish()
    }
}
impl<'a, M> Future for PoolFuture<'a, M>
where
M: ManageConnection,
{
type Item = (PooledConnection<M>,);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let Some(ref mut handle) = self.handle {
trace!("retrieved the connection from spawned task");
return handle.poll().map(|x| x.map(|conn| (conn,)));
}
if let Some(conn) = self.endpoint.preflight() {
trace!("retrieved the connection without spawning the task");
return Ok(Async::Ready((conn,)));
}
trace!("spawning the task for executing blocking section");
let pool = self.endpoint.pool.clone();
let future = poll_fn(move || match blocking(|| pool.get()) {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(res)) => res.map(Async::Ready).map_err(error::fail),
Err(blocking_err) => Err(error::fail(blocking_err)),
});
self.handle = Some(oneshot::spawn(future, &DefaultExecutor::current()));
}
}
}
}