#[cfg(feature="with-unix-sockets")]
use std::path::PathBuf;
use std::io::{Read, BufReader, Write};
use std::net::{self, TcpStream};
use std::str::from_utf8;
use std::cell::RefCell;
use std::collections::HashSet;
use std::time::Duration;
use url;
use cmd::{cmd, pipe, Pipeline};
use types::{RedisResult, Value, ToRedisArgs, FromRedisValue, from_redis_value,
ErrorKind};
use parser::Parser;
#[cfg(feature="with-unix-sockets")]
use unix_socket::UnixStream;
/// Port used for TCP connections when the connection URL does not name one.
static DEFAULT_PORT: u16 = 6379;
/// Parses `input` as a URL and accepts it only when its scheme is `redis`
/// or `unix`.
///
/// Returns `Err(())` both when the string is not a valid URL and when the
/// URL uses any other scheme.
pub fn parse_redis_url(input: &str) -> Result<url::Url, ()> {
    let parsed = match url::Url::parse(input) {
        Ok(parsed) => parsed,
        Err(_) => return Err(()),
    };
    // Evaluate the scheme check in its own scope so the borrow of `parsed`
    // has ended before the value is moved into the result.
    let accepted = {
        let scheme = parsed.scheme();
        scheme == "redis" || scheme == "unix"
    };
    if accepted {
        Ok(parsed)
    } else {
        Err(())
    }
}
/// Address of the server a connection should be opened to.
#[derive(Clone, Debug)]
pub enum ConnectionAddr {
/// A TCP endpoint, given as a host string and a port.
Tcp(String, u16),
/// Filesystem path of a Unix socket. Only available when the
/// `with-unix-sockets` feature is enabled.
#[cfg(feature="with-unix-sockets")]
Unix(PathBuf),
}
/// Everything needed to open a connection: the address, the database to
/// select and an optional password.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
/// Address to connect to.
pub addr: Box<ConnectionAddr>,
/// Database number; `connect` issues `SELECT` for it when it is non-zero.
pub db: i64,
/// Password that `connect` sends with `AUTH`, if one is set.
pub passwd: Option<String>,
}
/// Conversion of a value into a `ConnectionInfo`.
///
/// This file implements it for `ConnectionInfo` itself, for `&str` (parsed
/// as a URL) and for `url::Url`.
pub trait IntoConnectionInfo {
/// Consumes `self` and produces the connection info, or an error when the
/// value does not describe a usable connection.
fn into_connection_info(self) -> RedisResult<ConnectionInfo>;
}
/// A `ConnectionInfo` is already in its final form, so the conversion
/// returns it unchanged and never fails.
impl IntoConnectionInfo for ConnectionInfo {
fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
Ok(self)
}
}
/// Strings are interpreted as `redis://` or `unix://` URLs.
impl<'a> IntoConnectionInfo for &'a str {
    /// Parses the string with `parse_redis_url` and converts the resulting
    /// URL; fails with `InvalidClientConfig` when the string is rejected.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
        if let Ok(parsed) = parse_redis_url(self) {
            return parsed.into_connection_info();
        }
        fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse"));
    }
}
/// Builds TCP connection info from a `redis://` URL.
///
/// * host: required, otherwise `InvalidClientConfig` ("Missing hostname").
/// * port: falls back to `DEFAULT_PORT` when absent.
/// * database: the URL path with surrounding `/` removed; an empty path
///   means database 0, anything that is not an `i64` is rejected.
/// * password: taken from the URL's password component, if any.
fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
    let host = match url.host() {
        Some(host) => host.to_string(),
        None => fail!((ErrorKind::InvalidClientConfig, "Missing hostname")),
    };
    let port = url.port().unwrap_or(DEFAULT_PORT);
    let db = match url.path().trim_matches('/') {
        "" => 0,
        number => unwrap_or!(number.parse::<i64>().ok(),
                             fail!((ErrorKind::InvalidClientConfig, "Invalid database number"))),
    };
    let passwd = url.password().map(|pw| pw.to_string());

    Ok(ConnectionInfo {
        addr: Box::new(ConnectionAddr::Tcp(host, port)),
        db: db,
        passwd: passwd,
    })
}
/// Builds Unix-socket connection info from a `unix://` URL.
///
/// * socket path: the URL converted to a file path; failure to convert is
///   reported as `InvalidClientConfig` ("Missing path").
/// * database: value of the first `db` query parameter, defaulting to 0
///   when the parameter is absent; a value that is not an `i64` is rejected.
/// * password: taken from the URL's password component, if any.
#[cfg(feature="with-unix-sockets")]
fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
    let socket_path = unwrap_or!(url.to_file_path().ok(),
                                 fail!((ErrorKind::InvalidClientConfig, "Missing path")));
    let db_param = url.query_pairs().into_iter().find(|&(ref key, _)| key == "db");
    let db = match db_param {
        Some((_, raw)) => unwrap_or!(raw.parse::<i64>().ok(),
                                     fail!((ErrorKind::InvalidClientConfig, "Invalid database number"))),
        None => 0,
    };
    let passwd = url.password().map(|pw| pw.to_string());

    Ok(ConnectionInfo {
        addr: Box::new(ConnectionAddr::Unix(socket_path)),
        db: db,
        passwd: passwd,
    })
}
/// Stand-in used when the `with-unix-sockets` feature is disabled: every
/// `unix://` URL is rejected with `InvalidClientConfig`.
#[cfg(not(feature="with-unix-sockets"))]
fn url_to_unix_connection_info(_: url::Url) -> RedisResult<ConnectionInfo> {
fail!((ErrorKind::InvalidClientConfig,
"This version of redis-rs is not compiled with Unix socket support."));
}
/// URLs are dispatched on their scheme.
impl IntoConnectionInfo for url::Url {
    /// `redis` URLs become TCP connection info, `unix` URLs become Unix
    /// socket connection info; any other scheme fails with
    /// `InvalidClientConfig`.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
        // Decide before moving `self` into one of the converters.
        let is_tcp = self.scheme() == "redis";
        let is_unix = self.scheme() == "unix";
        if is_tcp {
            return url_to_tcp_connection_info(self);
        }
        if is_unix {
            return url_to_unix_connection_info(self);
        }
        fail!((ErrorKind::InvalidClientConfig, "URL provided is not a redis URL"));
    }
}
/// The transport underneath a `Connection`.
enum ActualConnection {
/// A TCP stream wrapped in a `BufReader`.
Tcp(BufReader<TcpStream>),
/// A Unix socket stream. Only available when the `with-unix-sockets`
/// feature is enabled.
#[cfg(feature="with-unix-sockets")]
Unix(UnixStream),
}
/// An open connection to a server.
pub struct Connection {
// Kept in a `RefCell` so that methods taking `&self` can still obtain the
// mutable access that sending and receiving require.
con: RefCell<ActualConnection>,
// Database number copied from the `ConnectionInfo` used to connect.
db: i64,
}
/// A connection used for publish/subscribe, together with the set of
/// subscriptions made through it.
pub struct PubSub {
// Connection the subscription commands are sent on and messages read from.
con: Connection,
// Channels added by `subscribe` and removed by `unsubscribe`.
channels: HashSet<Vec<u8>>,
// Patterns added by `psubscribe` and removed by `punsubscribe`.
pchannels: HashSet<Vec<u8>>,
}
/// A single message received through a `PubSub` connection.
pub struct Msg {
// Message body.
payload: Value,
// Channel the message arrived on.
channel: Value,
// Set only for `pmessage` replies: the pattern element of the reply.
pattern: Option<Value>,
}
impl ActualConnection {
    /// Opens the transport described by `addr`.
    ///
    /// TCP streams are wrapped in a `BufReader`; Unix sockets are stored
    /// as-is. Any error from establishing the connection is returned.
    pub fn new(addr: &ConnectionAddr) -> RedisResult<ActualConnection> {
        Ok(match *addr {
            ConnectionAddr::Tcp(ref host, ref port) => {
                let host: &str = &*host;
                let tcp = try!(TcpStream::connect((host, *port)));
                let buffered = BufReader::new(tcp);
                ActualConnection::Tcp(buffered)
            }
            #[cfg(feature="with-unix-sockets")]
            ConnectionAddr::Unix(ref path) => {
                ActualConnection::Unix(try!(UnixStream::connect(path)))
            }
        })
    }

    /// Writes all of `bytes` to the underlying stream.
    ///
    /// Returns `Value::Okay` once the whole buffer has been handed to the
    /// stream, or the I/O error that interrupted the write.
    pub fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
        // For TCP, write to the raw stream underneath the `BufReader`.
        let w = match *self {
            ActualConnection::Tcp(ref mut reader) => {
                reader.get_mut() as &mut Write
            }
            #[cfg(feature="with-unix-sockets")]
            ActualConnection::Unix(ref mut sock) => {
                &mut *sock as &mut Write
            }
        };
        // `write` is allowed to accept only a prefix of the buffer and
        // report success, which would silently truncate the command.
        // `write_all` keeps writing until everything has been sent.
        try!(w.write_all(bytes));
        Ok(Value::Okay)
    }

    /// Reads and parses one response from the stream.
    ///
    /// When the parser reports `ErrorKind::ResponseError` the socket is shut
    /// down in both directions before the error is returned.
    pub fn read_response(&mut self) -> RedisResult<Value> {
        let result = Parser::new(match *self {
            ActualConnection::Tcp(ref mut reader) => {
                reader as &mut Read
            }
            #[cfg(feature="with-unix-sockets")]
            ActualConnection::Unix(ref mut sock) => {
                &mut *sock as &mut Read
            }
        }).parse_value();
        match result {
            Err(ref e) if e.kind() == ErrorKind::ResponseError => {
                // Best effort: a failure to shut down is deliberately
                // ignored, the original error is what the caller gets.
                match *self {
                    ActualConnection::Tcp(ref mut reader) => {
                        let _ = reader.get_mut().shutdown(net::Shutdown::Both);
                    }
                    #[cfg(feature="with-unix-sockets")]
                    ActualConnection::Unix(ref mut sock) => {
                        let _ = sock.shutdown(net::Shutdown::Both);
                    }
                }
            }
            _ => ()
        }
        result
    }

    /// Sets the write timeout of the underlying socket; `None` is passed
    /// through to the socket unchanged.
    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        match *self {
            ActualConnection::Tcp(ref reader) => {
                try!(reader.get_ref().set_write_timeout(dur));
            }
            #[cfg(feature="with-unix-sockets")]
            ActualConnection::Unix(ref sock) => {
                try!(sock.set_write_timeout(dur));
            }
        }
        Ok(())
    }

    /// Sets the read timeout of the underlying socket; `None` is passed
    /// through to the socket unchanged.
    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        match *self {
            ActualConnection::Tcp(ref reader) => {
                try!(reader.get_ref().set_read_timeout(dur));
            }
            #[cfg(feature="with-unix-sockets")]
            ActualConnection::Unix(ref sock) => {
                try!(sock.set_read_timeout(dur));
            }
        }
        Ok(())
    }
}
/// Opens a connection described by `connection_info`.
///
/// After the transport is established, `AUTH` is sent when a password is
/// configured and `SELECT` is sent when the database number is non-zero.
/// Any reply other than `Value::Okay` to either command is an error
/// (`AuthenticationFailed` and `ResponseError` respectively).
pub fn connect(connection_info: &ConnectionInfo) -> RedisResult<Connection> {
    let transport = try!(ActualConnection::new(&connection_info.addr));
    let connection = Connection {
        con: RefCell::new(transport),
        db: connection_info.db,
    };

    if let Some(ref passwd) = connection_info.passwd {
        let reply = cmd("AUTH").arg(&**passwd).query::<Value>(&connection);
        match reply {
            Ok(Value::Okay) => {}
            _ => { fail!((ErrorKind::AuthenticationFailed,
                          "Password authentication failed")); }
        }
    }

    let target_db = connection_info.db;
    if target_db != 0 {
        let reply = cmd("SELECT").arg(target_db).query::<Value>(&connection);
        match reply {
            Ok(Value::Okay) => {}
            _ => fail!((ErrorKind::ResponseError, "Redis server refused to switch database"))
        }
    }

    Ok(connection)
}
/// Opens a connection with `connect` and wraps it in a `PubSub` that starts
/// with no recorded subscriptions.
pub fn connect_pubsub(connection_info: &ConnectionInfo) -> RedisResult<PubSub> {
    let con = try!(connect(connection_info));
    let pubsub = PubSub {
        con: con,
        channels: HashSet::new(),
        pchannels: HashSet::new(),
    };
    Ok(pubsub)
}
/// Interface of anything that packed commands can be sent to.
pub trait ConnectionLike {
/// Sends one packed command and returns its response.
fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value>;
/// Sends a buffer of packed commands and returns a slice of the responses.
/// In the `Connection` implementation `offset + count` responses are read
/// and the first `offset` of them are discarded.
fn req_packed_commands(&self, cmd: &[u8],
offset: usize, count: usize) -> RedisResult<Vec<Value>>;
/// Returns the database number associated with the connection.
fn get_db(&self) -> i64;
}
impl Connection {
    /// Sends an already packed command without waiting for a response.
    pub fn send_packed_command(&self, cmd: &[u8]) -> RedisResult<()> {
        let mut transport = self.con.borrow_mut();
        // The value returned by `send_bytes` carries no information here.
        transport.send_bytes(cmd).map(|_| ())
    }

    /// Reads one response from the connection.
    pub fn recv_response(&self) -> RedisResult<Value> {
        let mut transport = self.con.borrow_mut();
        transport.read_response()
    }

    /// Sets the write timeout of the underlying socket.
    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        let transport = self.con.borrow();
        transport.set_write_timeout(dur)
    }

    /// Sets the read timeout of the underlying socket.
    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        let transport = self.con.borrow();
        transport.set_read_timeout(dur)
    }
}
impl ConnectionLike for Connection {
    /// Sends `cmd` and returns the single response that follows it.
    fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value> {
        let mut transport = self.con.borrow_mut();
        let _ = try!(transport.send_bytes(cmd));
        transport.read_response()
    }

    /// Sends `cmd`, reads `offset + count` responses and returns the last
    /// `count` of them. Reading stops at the first response that is an error.
    fn req_packed_commands(&self, cmd: &[u8],
                           offset: usize, count: usize) -> RedisResult<Vec<Value>> {
        let mut transport = self.con.borrow_mut();
        let _ = try!(transport.send_bytes(cmd));

        let total = offset + count;
        let mut replies = vec![];
        for position in 0..total {
            let reply = try!(transport.read_response());
            if position < offset {
                // Leading responses are consumed but not returned.
                continue;
            }
            replies.push(reply);
        }
        Ok(replies)
    }

    /// Returns the database number stored when the connection was created.
    fn get_db(&self) -> i64 {
        self.db
    }
}
impl PubSub {
    /// Concatenates all redis arguments produced by `channel` into a single
    /// byte vector, which is used as the subscription key.
    fn get_channel<T: ToRedisArgs>(&mut self, channel: &T) -> Vec<u8> {
        let args = channel.to_redis_args();
        let mut name = vec![];
        for part in args.iter() {
            name.extend(part.iter().cloned());
        }
        name
    }

    /// Sends `SUBSCRIBE` for `channel` and, once the command succeeded,
    /// records the channel.
    pub fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
        let name = self.get_channel(&channel);
        try!(cmd("SUBSCRIBE").arg(&*name).query::<()>(&self.con));
        self.channels.insert(name);
        Ok(())
    }

    /// Sends `PSUBSCRIBE` for `pchannel` and, once the command succeeded,
    /// records the pattern.
    pub fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
        let name = self.get_channel(&pchannel);
        try!(cmd("PSUBSCRIBE").arg(&*name).query::<()>(&self.con));
        self.pchannels.insert(name);
        Ok(())
    }

    /// Sends `UNSUBSCRIBE` for `channel` and, once the command succeeded,
    /// forgets the channel.
    pub fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
        let name = self.get_channel(&channel);
        try!(cmd("UNSUBSCRIBE").arg(&*name).query::<()>(&self.con));
        self.channels.remove(&name);
        Ok(())
    }

    /// Sends `PUNSUBSCRIBE` for `pchannel` and, once the command succeeded,
    /// forgets the pattern.
    pub fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
        let name = self.get_channel(&pchannel);
        try!(cmd("PUNSUBSCRIBE").arg(&*name).query::<()>(&self.con));
        self.pchannels.remove(&name);
        Ok(())
    }

    /// Reads responses until one of type `message` or `pmessage` arrives and
    /// returns it.
    ///
    /// Responses of any other type, and responses with too few elements,
    /// are skipped. Read and conversion errors are returned to the caller.
    pub fn get_message(&self) -> RedisResult<Msg> {
        loop {
            let response = try!(self.con.recv_response());
            let raw_msg: Vec<Value> = try!(from_redis_value(&response));
            let mut parts = raw_msg.into_iter();

            let head = unwrap_or!(parts.next(), continue);
            let kind: String = try!(from_redis_value(&head));

            // A `pmessage` carries the pattern as an extra element ahead of
            // the channel; a plain `message` has none.
            let pattern = match &kind[..] {
                "message" => None,
                "pmessage" => Some(unwrap_or!(parts.next(), continue)),
                _ => continue,
            };
            let channel = unwrap_or!(parts.next(), continue);
            let payload = unwrap_or!(parts.next(), continue);

            return Ok(Msg {
                payload: payload,
                channel: channel,
                pattern: pattern,
            });
        }
    }

    /// Sets the read timeout of the underlying connection.
    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
        self.con.set_read_timeout(dur)
    }
}
impl Msg {
    /// Converts the channel the message arrived on into `T`.
    pub fn get_channel<T: FromRedisValue>(&self) -> RedisResult<T> {
        from_redis_value(&self.channel)
    }

    /// Returns the channel name as a string slice.
    ///
    /// Yields `"?"` when the channel is not a `Value::Data` or its bytes are
    /// not valid UTF-8.
    pub fn get_channel_name(&self) -> &str {
        match self.channel {
            Value::Data(ref bytes) => from_utf8(bytes).unwrap_or("?"),
            _ => "?"
        }
    }

    /// Converts the message payload into `T`.
    pub fn get_payload<T: FromRedisValue>(&self) -> RedisResult<T> {
        from_redis_value(&self.payload)
    }

    /// Returns the raw bytes of the message payload.
    ///
    /// Yields an empty slice when the payload is not a `Value::Data`.
    pub fn get_payload_bytes(&self) -> &[u8] {
        // This must inspect the payload; reading `self.channel` here would
        // hand back the channel name instead of the message body.
        match self.payload {
            Value::Data(ref bytes) => bytes,
            _ => b""
        }
    }

    /// Returns `true` when the message carries a pattern, which
    /// `PubSub::get_message` sets only for `pmessage` replies.
    pub fn from_pattern(&self) -> bool {
        self.pattern.is_some()
    }

    /// Converts the pattern into `T`.
    ///
    /// When the message has no pattern the conversion is applied to
    /// `Value::Nil`, so the result depends on how `T` handles nil.
    pub fn get_pattern<T: FromRedisValue>(&self) -> RedisResult<T> {
        match self.pattern {
            None => from_redis_value(&Value::Nil),
            Some(ref x) => from_redis_value(x),
        }
    }
}
/// Runs `func` in a `WATCH`-guarded retry loop.
///
/// Each round issues `WATCH` for `keys`, creates a fresh pipeline, switches
/// it to atomic mode and hands it to `func`. When `func` returns `Ok(None)`
/// the round is repeated from the `WATCH`; when it returns `Ok(Some(value))`
/// an `UNWATCH` is sent and `value` is returned. Any error from `func` or
/// from the `WATCH`/`UNWATCH` commands ends the loop and is returned.
pub fn transaction<K: ToRedisArgs, T: FromRedisValue, F: FnMut(&mut Pipeline) -> RedisResult<Option<T>>>
    (con: &ConnectionLike, keys: &[K], func: F) -> RedisResult<T> {
    let mut attempt = func;
    loop {
        try!(cmd("WATCH").arg(keys).query::<()>(con));

        let mut pipeline = pipe();
        let outcome: Option<T> = try!(attempt(pipeline.atomic()));

        if let Some(value) = outcome {
            // Clear any watch still registered on the connection before
            // handing the result back.
            try!(cmd("UNWATCH").query::<()>(con));
            return Ok(value);
        }
    }
}