use std::cell::{Cell, RefCell};
use std::io::{BufRead, BufReader, Write};
use std::net::{self, TcpStream};
use std::path::PathBuf;
use std::str::from_utf8;
use std::time::Duration;
use url;
use cmd::{cmd, pipe, Pipeline};
use parser::Parser;
use types::{
from_redis_value, ErrorKind, FromRedisValue, RedisError, RedisResult, ToRedisArgs, Value,
};
#[cfg(all(
feature = "with-system-unix-sockets",
not(feature = "with-unix-sockets")
))]
use std::os::unix::net::UnixStream;
#[cfg(feature = "with-unix-sockets")]
use unix_socket::UnixStream;
/// Port used for TCP connections when the connection URL does not carry one.
static DEFAULT_PORT: u16 = 6379;
/// Parses `input` as a URL and accepts it only when its scheme is one of
/// `redis`, `redis+unix` or `unix`.
///
/// Returns `Err(())` both when `input` is not a valid URL and when the URL
/// uses any other scheme.
pub fn parse_redis_url(input: &str) -> Result<url::Url, ()> {
    let parsed = url::Url::parse(input).map_err(|_| ())?;
    let recognised = match parsed.scheme() {
        "redis" | "redis+unix" | "unix" => true,
        _ => false,
    };
    if recognised {
        Ok(parsed)
    } else {
        Err(())
    }
}
/// The address of the server a connection is opened to.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectionAddr {
/// A TCP endpoint: host name (or address) and port.
Tcp(String, u16),
/// A Unix domain socket, identified by its filesystem path.
Unix(PathBuf),
}
impl ConnectionAddr {
    /// Reports whether this build of the crate can connect to the address.
    ///
    /// TCP addresses are always supported. Unix socket addresses are
    /// supported only when the crate is compiled with the
    /// `with-unix-sockets` or `with-system-unix-sockets` feature.
    pub fn is_supported(&self) -> bool {
        match *self {
            ConnectionAddr::Tcp(..) => true,
            ConnectionAddr::Unix(..) => cfg!(any(
                feature = "with-unix-sockets",
                feature = "with-system-unix-sockets"
            )),
        }
    }
}
/// Everything needed to open a connection and prepare it for use.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
/// Where to connect to (TCP endpoint or Unix socket path).
pub addr: Box<ConnectionAddr>,
/// Database number; `connect` issues `SELECT` for it when it is non-zero.
pub db: i64,
/// Optional password; `connect` issues `AUTH` with it when present.
pub passwd: Option<String>,
}
/// Conversion of a value into a `ConnectionInfo`.
///
/// Implemented in this file for `ConnectionInfo` itself, for `&str`
/// (parsed as a URL) and for `url::Url`.
pub trait IntoConnectionInfo {
/// Consumes `self` and produces the connection info, or an error when the
/// value does not describe a usable connection.
fn into_connection_info(self) -> RedisResult<ConnectionInfo>;
}
/// Identity conversion: a `ConnectionInfo` is already in its final form.
impl IntoConnectionInfo for ConnectionInfo {
// Never fails; the value is returned unchanged.
fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
Ok(self)
}
}
/// Treats the string as a connection URL.
impl<'a> IntoConnectionInfo for &'a str {
    /// Parses the string with `parse_redis_url` and converts the resulting
    /// URL.
    ///
    /// Fails with `ErrorKind::InvalidClientConfig` when the string is not
    /// accepted by `parse_redis_url`.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
        let parsed = match parse_redis_url(self) {
            Ok(parsed) => parsed,
            Err(()) => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
        };
        parsed.into_connection_info()
    }
}
/// Builds the connection info for a TCP (`redis://`) URL.
///
/// * host: taken from the URL; a URL without a host is rejected.
/// * port: taken from the URL, falling back to `DEFAULT_PORT`.
/// * db: the URL path with surrounding `/` removed; an empty path means
///   database 0, anything else must parse as an `i64`.
/// * password: the URL password, percent-decoded; it must decode to UTF-8.
///
/// Every rejection is reported as `ErrorKind::InvalidClientConfig`.
fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
    let host = match url.host() {
        None => fail!((ErrorKind::InvalidClientConfig, "Missing hostname")),
        Some(host) => host.to_string(),
    };
    let port = url.port().unwrap_or(DEFAULT_PORT);

    let db_segment = url.path().trim_matches('/');
    let db = if db_segment.is_empty() {
        0
    } else {
        match db_segment.parse::<i64>() {
            Ok(number) => number,
            Err(_) => fail!((ErrorKind::InvalidClientConfig, "Invalid database number")),
        }
    };

    let passwd = match url.password() {
        None => None,
        Some(encoded) => {
            match url::percent_encoding::percent_decode(encoded.as_bytes()).decode_utf8() {
                Ok(decoded) => Some(decoded.into_owned()),
                Err(_) => fail!((
                    ErrorKind::InvalidClientConfig,
                    "Password is not valid UTF-8 string"
                )),
            }
        }
    };

    Ok(ConnectionInfo {
        addr: Box::new(ConnectionAddr::Tcp(host, port)),
        db,
        passwd,
    })
}
/// Builds the connection info for a Unix socket (`unix://` or
/// `redis+unix://`) URL.
///
/// * path: the URL converted to a filesystem path; a URL that cannot be
///   converted is rejected.
/// * db: the value of the first `db` query parameter, which must parse as an
///   `i64`; database 0 when the parameter is absent.
/// * password: the URL password, percent-decoded (the URL stores it in
///   percent-encoded form, exactly as for TCP URLs); it must decode to UTF-8.
///
/// Every rejection is reported as `ErrorKind::InvalidClientConfig`.
#[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
    let path = match url.to_file_path() {
        Ok(path) => path,
        Err(_) => fail!((ErrorKind::InvalidClientConfig, "Missing path")),
    };

    let db = match url.query_pairs().find(|&(ref key, _)| key == "db") {
        Some((_, db)) => match db.parse::<i64>() {
            Ok(number) => number,
            Err(_) => fail!((ErrorKind::InvalidClientConfig, "Invalid database number")),
        },
        None => 0,
    };

    // `Url::password` hands back the percent-encoded form, so it has to be
    // decoded before it can be sent to the server.
    let passwd = match url.password() {
        None => None,
        Some(encoded) => {
            match url::percent_encoding::percent_decode(encoded.as_bytes()).decode_utf8() {
                Ok(decoded) => Some(decoded.into_owned()),
                Err(_) => fail!((
                    ErrorKind::InvalidClientConfig,
                    "Password is not valid UTF-8 string"
                )),
            }
        }
    };

    Ok(ConnectionInfo {
        addr: Box::new(ConnectionAddr::Unix(path)),
        db,
        passwd,
    })
}
/// Stand-in used when the crate is built without either unix-socket feature:
/// every Unix socket URL is rejected with `ErrorKind::InvalidClientConfig`.
#[cfg(not(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets")))]
fn url_to_unix_connection_info(_: url::Url) -> RedisResult<ConnectionInfo> {
// The URL itself is never inspected; the failure is unconditional.
fail!((
ErrorKind::InvalidClientConfig,
"Unix sockets are not available on this platform."
));
}
/// Dispatches on the URL scheme to the TCP or Unix socket conversion.
impl IntoConnectionInfo for url::Url {
    /// `redis` URLs describe TCP connections; `unix` and `redis+unix` URLs
    /// describe Unix socket connections. Any other scheme fails with
    /// `ErrorKind::InvalidClientConfig`.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
        // The scheme checks are finished before `self` is moved into a helper.
        let over_tcp = self.scheme() == "redis";
        let over_unix = match self.scheme() {
            "unix" | "redis+unix" => true,
            _ => false,
        };

        if over_tcp {
            return url_to_tcp_connection_info(self);
        }
        if over_unix {
            return url_to_unix_connection_info(self);
        }
        fail!((
            ErrorKind::InvalidClientConfig,
            "URL provided is not a redis URL"
        ));
    }
}
/// State of a connection that runs over a TCP socket.
struct TcpConnection {
// Buffered reader over the socket; writes go to the underlying stream
// obtained through `get_mut()`.
reader: BufReader<TcpStream>,
// Starts out `true`; cleared when a write fails with a dropped-connection
// error or when a response fails with `ErrorKind::ResponseError`.
open: bool,
}
/// State of a connection that runs over a Unix domain socket. Only compiled
/// when one of the unix-socket features is enabled.
#[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
struct UnixConnection {
// Buffered reader over the socket; writes go to the underlying stream
// obtained through `get_mut()`.
sock: BufReader<UnixStream>,
// Starts out `true`; cleared when a write fails with a dropped-connection
// error or when a response fails with `ErrorKind::ResponseError`.
open: bool,
}
/// The transport underneath a `Connection`: one variant per socket kind.
enum ActualConnection {
// Connection over a TCP socket.
Tcp(TcpConnection),
// Connection over a Unix domain socket (needs a unix-socket feature).
#[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
Unix(UnixConnection),
}
/// A connection to a server, as returned by `connect`.
pub struct Connection {
// The transport sits in a `RefCell` so that methods taking `&self` can
// still write to and read from the socket.
con: RefCell<ActualConnection>,
// Database number copied from the `ConnectionInfo`; returned by `get_db`.
db: i64,
// When set, the next packed-command request first tries to leave pubsub
// mode. `exit_pubsub` sets it when clearing the subscriptions failed and
// clears it when that succeeded.
pubsub: Cell<bool>,
}
/// A pubsub view over a `Connection`, obtained from `Connection::as_pubsub`.
///
/// It holds the connection mutably borrowed for its whole lifetime; dropping
/// it attempts to unsubscribe from everything.
pub struct PubSub<'a> {
// The borrowed connection all pubsub commands are sent on.
con: &'a mut Connection,
}
/// A message received through `PubSub::get_message`.
pub struct Msg {
// The message body, as the raw response value.
payload: Value,
// The channel the message was delivered on, as the raw response value.
channel: Value,
// The matched pattern; only set for messages of type "pmessage".
pattern: Option<Value>,
}
impl ActualConnection {
    /// Opens a socket to `addr` and wraps it in a `BufReader`.
    ///
    /// # Errors
    /// Propagates the I/O error from connecting. When the crate is built
    /// without either unix-socket feature, a `ConnectionAddr::Unix` address
    /// fails with `ErrorKind::InvalidClientConfig`.
    pub fn new(addr: &ConnectionAddr) -> RedisResult<ActualConnection> {
        let connection = match *addr {
            ConnectionAddr::Tcp(ref host, ref port) => {
                let stream = TcpStream::connect((host.as_str(), *port))?;
                ActualConnection::Tcp(TcpConnection {
                    reader: BufReader::new(stream),
                    open: true,
                })
            }
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ConnectionAddr::Unix(ref path) => ActualConnection::Unix(UnixConnection {
                sock: BufReader::new(UnixStream::connect(path)?),
                open: true,
            }),
            // The path is of no use in this build, so it is left unbound.
            #[cfg(not(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets")))]
            ConnectionAddr::Unix(_) => {
                fail!((
                    ErrorKind::InvalidClientConfig,
                    "Cannot connect to unix sockets \
                     on this platform"
                ));
            }
        };
        Ok(connection)
    }

    /// Records that the connection is no longer usable, without touching
    /// the socket.
    fn mark_closed(&mut self) {
        match *self {
            ActualConnection::Tcp(ref mut connection) => connection.open = false,
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(ref mut connection) => connection.open = false,
        }
    }

    /// Shuts the socket down in both directions (best effort, the outcome of
    /// the shutdown is ignored) and records the connection as closed.
    fn close_socket(&mut self) {
        match *self {
            ActualConnection::Tcp(ref mut connection) => {
                let _ = connection.reader.get_mut().shutdown(net::Shutdown::Both);
                connection.open = false;
            }
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(ref mut connection) => {
                let _ = connection.sock.get_mut().shutdown(net::Shutdown::Both);
                connection.open = false;
            }
        }
    }

    /// Writes all of `bytes` to the socket, bypassing the read buffer.
    ///
    /// Returns `Value::Okay` on success. When the write fails with an error
    /// for which `is_connection_dropped()` holds, the connection is marked
    /// as closed before the error is returned.
    pub fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
        let written = match *self {
            ActualConnection::Tcp(ref mut connection) => {
                connection.reader.get_mut().write_all(bytes)
            }
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(ref mut connection) => {
                connection.sock.get_mut().write_all(bytes)
            }
        };
        match written.map_err(RedisError::from) {
            Ok(()) => Ok(Value::Okay),
            Err(e) => {
                if e.is_connection_dropped() {
                    self.mark_closed();
                }
                Err(e)
            }
        }
    }

    /// Reads and parses one response from the socket.
    ///
    /// A failure of kind `ErrorKind::ResponseError` shuts the socket down
    /// and marks the connection closed. A failure for which
    /// `is_connection_dropped()` holds marks the connection closed, the same
    /// way `send_bytes` does, so that `is_open` stops reporting a dead
    /// connection as usable. The parse result is returned unchanged in
    /// every case.
    pub fn read_response(&mut self) -> RedisResult<Value> {
        let result = Parser::new(match *self {
            ActualConnection::Tcp(TcpConnection { ref mut reader, .. }) => reader as &mut BufRead,
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(UnixConnection { ref mut sock, .. }) => sock as &mut BufRead,
        })
        .parse_value();
        match result {
            Err(ref e) if e.kind() == ErrorKind::ResponseError => self.close_socket(),
            Err(ref e) if e.is_connection_dropped() => self.mark_closed(),
            _ => (),
        }
        result
    }

    /// Sets the write timeout of the underlying socket; `None` is passed
    /// through to the socket as given.
    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        match *self {
            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
                reader.get_ref().set_write_timeout(dur)?;
            }
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
                sock.get_ref().set_write_timeout(dur)?;
            }
        }
        Ok(())
    }

    /// Sets the read timeout of the underlying socket; `None` is passed
    /// through to the socket as given.
    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        match *self {
            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
                reader.get_ref().set_read_timeout(dur)?;
            }
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
                sock.get_ref().set_read_timeout(dur)?;
            }
        }
        Ok(())
    }

    /// Returns `false` once the connection has been marked closed by a
    /// failed write or read, `true` otherwise.
    pub fn is_open(&self) -> bool {
        match *self {
            ActualConnection::Tcp(TcpConnection { open, .. }) => open,
            #[cfg(any(feature = "with-unix-sockets", feature = "with-system-unix-sockets"))]
            ActualConnection::Unix(UnixConnection { open, .. }) => open,
        }
    }
}
/// Opens a connection described by `connection_info` and prepares it for use.
///
/// After the socket is open, `AUTH` is sent when a password is configured
/// and `SELECT` is sent when the database number is not 0.
///
/// # Errors
/// * errors from opening the socket are propagated;
/// * any `AUTH` outcome other than an OK reply becomes
///   `ErrorKind::AuthenticationFailed`;
/// * any `SELECT` outcome other than an OK reply becomes
///   `ErrorKind::ResponseError`.
pub fn connect(connection_info: &ConnectionInfo) -> RedisResult<Connection> {
    let connection = Connection {
        con: RefCell::new(ActualConnection::new(&connection_info.addr)?),
        db: connection_info.db,
        pubsub: Cell::new(false),
    };

    if let Some(ref passwd) = connection_info.passwd {
        match cmd("AUTH").arg(&**passwd).query::<Value>(&connection) {
            Ok(Value::Okay) => {}
            _ => fail!((
                ErrorKind::AuthenticationFailed,
                "Password authentication failed"
            )),
        }
    }

    let db = connection_info.db;
    if db != 0 {
        match cmd("SELECT").arg(db).query::<Value>(&connection) {
            Ok(Value::Okay) => {}
            _ => fail!((
                ErrorKind::ResponseError,
                "Redis server refused to switch database"
            )),
        }
    }

    Ok(connection)
}
/// The operations a command needs from something it can be sent over.
///
/// The behaviour notes below describe the `Connection` implementation in
/// this file; other implementors are not visible here.
pub trait ConnectionLike {
/// Sends one already-packed command and returns its response.
fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value>;
/// Sends a buffer of already-packed commands. `Connection` then reads
/// `offset + count` responses, discards the first `offset` of them and
/// returns the remaining `count`.
fn req_packed_commands(
&self,
cmd: &[u8],
offset: usize,
count: usize,
) -> RedisResult<Vec<Value>>;
/// Returns the database number associated with the connection.
fn get_db(&self) -> i64;
}
impl Connection {
    /// Writes an already-packed command to the socket without reading a
    /// response.
    pub fn send_packed_command(&self, cmd: &[u8]) -> RedisResult<()> {
        self.con.borrow_mut().send_bytes(cmd).map(|_| ())
    }

    /// Reads and parses a single response from the socket.
    pub fn recv_response(&self) -> RedisResult<Value> {
        self.con.borrow_mut().read_response()
    }

    /// Sets the write timeout of the underlying socket.
    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        self.con.borrow().set_write_timeout(dur)
    }

    /// Sets the read timeout of the underlying socket.
    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        self.con.borrow().set_read_timeout(dur)
    }

    /// Borrows the connection as a `PubSub` handle. The `pubsub` flag is
    /// left untouched here.
    pub fn as_pubsub<'a>(&'a mut self) -> PubSub<'a> {
        PubSub::new(self)
    }

    /// Tries to drop every subscription. The `pubsub` flag ends up cleared
    /// on success and raised on failure, so that a later request retries.
    fn exit_pubsub(&self) -> RedisResult<()> {
        let outcome = self.clear_active_subscriptions();
        self.pubsub.set(outcome.is_err());
        outcome
    }

    /// Sends `UNSUBSCRIBE` and `PUNSUBSCRIBE` without arguments and reads
    /// responses until one of each kind has been seen and the count carried
    /// by the latest response is zero.
    fn clear_active_subscriptions(&self) -> RedisResult<()> {
        {
            // The mutable borrow ends with this scope; `recv_response` below
            // needs to borrow the transport again.
            let mut con = self.con.borrow_mut();
            con.send_bytes(&cmd("UNSUBSCRIBE").get_packed_command())?;
            con.send_bytes(&cmd("PUNSUBSCRIBE").get_packed_command())?;
        }

        let (mut saw_unsub, mut saw_punsub) = (false, false);
        loop {
            let (kind, (), remaining): (Vec<u8>, (), isize) =
                from_redis_value(&self.recv_response()?)?;
            // Only the first byte of the response kind is inspected.
            match kind.first() {
                Some(&b'u') => saw_unsub = true,
                Some(&b'p') => saw_punsub = true,
                _ => {}
            }
            if saw_unsub && saw_punsub && remaining == 0 {
                return Ok(());
            }
        }
    }

    /// Reports whether the underlying transport is still considered open.
    pub fn is_open(&self) -> bool {
        self.con.borrow().is_open()
    }
}
/// Request/response handling for a plain `Connection`.
impl ConnectionLike for Connection {
    /// Sends one packed command and returns its response. When the `pubsub`
    /// flag is raised, the connection first tries to leave pubsub mode and
    /// gives up with that error if it cannot.
    fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value> {
        if self.pubsub.get() {
            self.exit_pubsub()?;
        }
        let mut transport = self.con.borrow_mut();
        transport.send_bytes(cmd)?;
        transport.read_response()
    }

    /// Sends a buffer of packed commands, reads `offset + count` responses
    /// and returns those after the first `offset`. The pubsub handling is
    /// the same as in `req_packed_command`.
    fn req_packed_commands(
        &self,
        cmd: &[u8],
        offset: usize,
        count: usize,
    ) -> RedisResult<Vec<Value>> {
        if self.pubsub.get() {
            self.exit_pubsub()?;
        }
        let mut transport = self.con.borrow_mut();
        transport.send_bytes(cmd)?;

        let mut replies = Vec::new();
        for position in 0..(offset + count) {
            let reply = transport.read_response()?;
            // The leading `offset` responses are read but not returned.
            if position < offset {
                continue;
            }
            replies.push(reply);
        }
        Ok(replies)
    }

    /// Returns the database number the connection was created with.
    fn get_db(&self) -> i64 {
        self.db
    }
}
impl<'a> PubSub<'a> {
fn new(con: &'a mut Connection) -> Self {
Self { con }
}
pub fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
let _: () = cmd("SUBSCRIBE").arg(channel).query(self.con)?;
Ok(())
}
pub fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
let _: () = cmd("PSUBSCRIBE").arg(pchannel).query(self.con)?;
Ok(())
}
pub fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
let _: () = cmd("UNSUBSCRIBE").arg(channel).query(self.con)?;
Ok(())
}
pub fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
let _: () = cmd("PUNSUBSCRIBE").arg(pchannel).query(self.con)?;
Ok(())
}
pub fn get_message(&self) -> RedisResult<Msg> {
loop {
let raw_msg: Vec<Value> = from_redis_value(&self.con.recv_response()?)?;
let mut iter = raw_msg.into_iter();
let msg_type: String = from_redis_value(&unwrap_or!(iter.next(), continue))?;
let mut pattern = None;
let payload;
let channel;
if msg_type == "message" {
channel = unwrap_or!(iter.next(), continue);
payload = unwrap_or!(iter.next(), continue);
} else if msg_type == "pmessage" {
pattern = Some(unwrap_or!(iter.next(), continue));
channel = unwrap_or!(iter.next(), continue);
payload = unwrap_or!(iter.next(), continue);
} else {
continue;
}
return Ok(Msg {
payload: payload,
channel: channel,
pattern: pattern,
});
}
}
pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
self.con.set_read_timeout(dur)
}
}
/// Leaving pubsub mode is attempted automatically when the handle goes away.
impl<'a> Drop for PubSub<'a> {
fn drop(&mut self) {
// `drop` cannot report failure, so the result is deliberately discarded.
// On failure `exit_pubsub` leaves the connection's `pubsub` flag raised.
let _ = self.con.exit_pubsub();
}
}
impl Msg {
    /// Converts the channel value into `T`.
    pub fn get_channel<T: FromRedisValue>(&self) -> RedisResult<T> {
        from_redis_value(&self.channel)
    }

    /// Returns the channel as a string slice, or `"?"` when the channel is
    /// not a data value or is not valid UTF-8.
    pub fn get_channel_name(&self) -> &str {
        if let Value::Data(ref bytes) = self.channel {
            if let Ok(name) = from_utf8(bytes) {
                return name;
            }
        }
        "?"
    }

    /// Converts the payload value into `T`.
    pub fn get_payload<T: FromRedisValue>(&self) -> RedisResult<T> {
        from_redis_value(&self.payload)
    }

    /// Returns the payload bytes, or an empty slice when the payload is not
    /// a data value.
    pub fn get_payload_bytes(&self) -> &[u8] {
        if let Value::Data(ref bytes) = self.payload {
            &bytes[..]
        } else {
            b""
        }
    }

    /// Reports whether the message carries a pattern.
    pub fn from_pattern(&self) -> bool {
        self.pattern.is_some()
    }

    /// Converts the pattern into `T`; a message without a pattern is
    /// converted from `Value::Nil`.
    pub fn get_pattern<T: FromRedisValue>(&self) -> RedisResult<T> {
        from_redis_value(self.pattern.as_ref().unwrap_or(&Value::Nil))
    }
}
/// Runs `func` in a `WATCH`-guarded retry loop.
///
/// Each round sends `WATCH` for `keys`, creates a fresh pipeline, switches
/// it to atomic mode and hands it to `func`:
///
/// * `Ok(Some(value))` — `UNWATCH` is sent and `value` is returned;
/// * `Ok(None)` — the round is repeated from the `WATCH`;
/// * `Err(e)` — the error is returned immediately.
///
/// Errors from `WATCH` and `UNWATCH` are returned as well.
pub fn transaction<
    K: ToRedisArgs,
    T: FromRedisValue,
    F: FnMut(&mut Pipeline) -> RedisResult<Option<T>>,
>(
    con: &ConnectionLike,
    keys: &[K],
    func: F,
) -> RedisResult<T> {
    let mut attempt = func;
    loop {
        let _: () = cmd("WATCH").arg(keys).query(con)?;
        let mut pipeline = pipe();
        if let Some(value) = attempt(pipeline.atomic())? {
            let _: () = cmd("UNWATCH").query(con)?;
            return Ok(value);
        }
    }
}