use std::sync::Arc;
use std::time::Instant;
use future::BoxFuture;
use futures::prelude::*;
use redis::{Client as RedisClient, IntoConnectionInfo};
use tokio::time::timeout_at;
use trust_dns_resolver::TokioAsyncResolver;
use crate::connection_factory::ConnectionFactory;
use crate::error::{Error, InitializationResult};
use crate::pool_connection::ConnectionFlavour;
use crate::{Ping, PingState};
pub struct RedisRsFactory {
connects_to: Arc<String>,
}
impl RedisRsFactory {
pub fn new(connect_to: String) -> InitializationResult<Self> {
Ok(Self {
connects_to: (Arc::new(connect_to)),
})
}
}
impl ConnectionFactory for RedisRsFactory {
type Connection = ConnectionFlavour;
fn create_connection(&self) -> BoxFuture<Result<Self::Connection, Error>> {
async move {
let mut url = redis::parse_redis_url(&self.connects_to).ok_or_else(|| {
Error::message(format!("Invalid redis url: {}", self.connects_to))
})?;
let resolver = TokioAsyncResolver::tokio_from_system_conf()
.map_err(|err| Error::new("Failed to resolve", Some(err)))?;
let host = url
.host_str()
.ok_or_else(|| Error::message("Redis url has no host part"))?;
let addrs = resolver
.lookup_ip(host)
.await
.map_err(|err| Error::new("Failed to look up address", Some(err)))?;
let addr = addrs.into_iter().next().ok_or_else(|| {
Error::message(format!("No addresses were resolved for {}", host))
})?;
url.set_ip_host(addr).ok();
let connection_info = url.into_connection_info().map_err(|err| {
Error::new("Failed to turn redis url into connection inf", Some(err))
})?;
let client = RedisClient::open(connection_info)?;
let connection = client.get_async_connection().await?;
let connection = ConnectionFlavour::RedisRs(connection, self.connects_to.clone());
Ok(connection)
}
.boxed()
}
fn connecting_to(&self) -> &str {
&*self.connects_to
}
fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
use crate::redis_ops::RedisOps;
let started_at = Instant::now();
let ping_future = async move {
let connection = self.create_connection().await;
let connect_time = Some(started_at.elapsed());
let uri = self.connecting_to().to_owned();
let mut connection = match connection {
Ok(connection) => connection,
Err(err) => {
return Ping {
uri,
state: PingState::failed_msg(format!(
"failed to create connection: {}",
err
)),
connect_time,
latency: None,
total_time: started_at.elapsed(),
}
}
};
let ping_started_at = Instant::now();
let ping_result = connection.ping().await;
let latency = Some(ping_started_at.elapsed());
let state = match ping_result {
Ok(_) => PingState::Ok,
Err(err) => PingState::failed_msg(format!("ping failed: {}", err)),
};
Ping {
uri: self.connecting_to().to_owned(),
state,
connect_time,
latency,
total_time: started_at.elapsed(),
}
};
timeout_at(timeout.into(), ping_future)
.unwrap_or_else(move |_| {
let total_time = started_at.elapsed();
let uri = self.connecting_to().to_owned();
let state = PingState::failed_msg(format!(
"ping to '{}' timed out after {:?}",
uri, total_time
));
Ping {
uri,
latency: None,
connect_time: None,
total_time,
state,
}
})
.boxed()
}
}