1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::sync::Arc;
use std::time::Instant;
use future::BoxFuture;
use futures::prelude::*;
use redis::IntoConnectionInfo;
use tokio::time::timeout_at;
use trust_dns_resolver::TokioAsyncResolver;
use crate::connection_factory::ConnectionFactory;
use crate::error::{Error, InitializationResult};
use crate::pool_connection::ConnectionFlavour;
use crate::{Ping, PingState};
pub struct RedisRsFactory {
connects_to: Arc<String>,
}
impl RedisRsFactory {
pub fn new(connect_to: String) -> InitializationResult<Self> {
Ok(Self {
connects_to: (Arc::new(connect_to)),
})
}
}
impl ConnectionFactory for RedisRsFactory {
type Connection = ConnectionFlavour;
fn create_connection(&self) -> BoxFuture<Result<Self::Connection, Error>> {
async move {
let mut url = redis::parse_redis_url(&self.connects_to)
.map_err(|()| Error::message(format!("Invalid redis url: {}", self.connects_to)))?;
let resolver = TokioAsyncResolver::tokio_from_system_conf()
.await
.map_err(|err| Error::new("Failed to resolve", Some(err)))?;
let host = url
.host_str()
.ok_or_else(|| Error::message("Redis url has no host part"))?;
let addrs = resolver
.lookup_ip(host)
.await
.map_err(|err| Error::new("Failed to look up address", Some(err)))?;
let addr = addrs
.into_iter()
.next()
.ok_or_else(|| Error::message("No addresses were returned"))?;
url.set_ip_host(addr).ok();
let connection_info = url.into_connection_info().map_err(|err| {
Error::new("Failed to turn redis url into connection inf", Some(err))
})?;
let connection = redis::aio::connect_tokio(&connection_info).await?;
let connection = ConnectionFlavour::RedisRs(connection, self.connects_to.clone());
Ok(connection)
}
.boxed()
}
fn connecting_to(&self) -> &str {
&*self.connects_to
}
fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
use crate::commands::Commands;
let started_at = Instant::now();
let ping_future = async move {
let connection = self.create_connection().await;
let connect_time = Some(started_at.elapsed());
let uri = self.connecting_to().to_owned();
let mut connection = match connection {
Ok(connection) => connection,
Err(err) => {
return Ping {
uri,
state: PingState::failed_msg(format!(
"failed to create connection: {}",
err
)),
connect_time,
latency: None,
total_time: started_at.elapsed(),
}
}
};
let ping_started_at = Instant::now();
let ping_result = connection.ping().await;
let latency = Some(ping_started_at.elapsed());
let state = match ping_result {
Ok(()) => PingState::Ok,
Err(err) => PingState::failed_msg(format!("ping failed: {}", err)),
};
Ping {
uri: self.connecting_to().to_owned(),
state,
connect_time,
latency,
total_time: started_at.elapsed(),
}
};
timeout_at(timeout.into(), ping_future)
.unwrap_or_else(move |_| {
let total_time = started_at.elapsed();
let uri = self.connecting_to().to_owned();
let state = PingState::failed_msg(format!(
"ping to '{}' timed out after {:?}",
uri, total_time
));
Ping {
uri,
latency: None,
connect_time: None,
total_time,
state,
}
})
.boxed()
}
}
impl From<redis::RedisError> for Error {
fn from(err: redis::RedisError) -> Self {
Self::new("redis error", Some(err))
}
}