use std::collections::{BTreeMap, BTreeSet};
use std::default::Default;
use std::error;
use std::ffi::{CString, NulError};
use std::fmt;
use std::hash::{BuildHasher, Hash};
use std::io;
use std::num::ParseIntError;
use std::str::{Utf8Error, from_utf8};
use std::string::FromUtf8Error;
use std::sync::{
Arc,
atomic::{AtomicIsize, Ordering},
};
use crate::cluster::routing::Redirect;
use num_bigint::BigInt;
pub(crate) use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use strum_macros::Display;
macro_rules! invalid_type_error {
($v:expr, $det:expr) => {{ fail!(invalid_type_error_inner!($v, $det)) }};
}
macro_rules! invalid_type_error_inner {
($v:expr, $det:expr) => {
Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!("{:?} (response was {:?})", $det, $v),
))
};
}
pub enum Expiry {
EX(usize),
PX(usize),
EXAT(usize),
PXAT(usize),
PERSIST,
}
#[derive(Clone, Copy)]
pub enum SetExpiry {
EX(usize),
PX(usize),
EXAT(usize),
PXAT(usize),
KEEPTTL,
}
#[derive(Clone, Copy)]
pub enum ExistenceCheck {
NX,
XX,
}
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum NumericBehavior {
NonNumeric,
NumberIsInteger,
NumberIsFloat,
}
#[derive(PartialEq, Eq, Copy, Clone, Debug, Display)]
#[non_exhaustive]
pub enum ErrorKind {
ResponseError,
ParseError,
AuthenticationFailed,
PermissionDenied,
TypeError,
ExecAbortError,
BusyLoadingError,
NoScriptError,
InvalidClientConfig,
Moved,
Ask,
TryAgain,
ClusterDown,
CrossSlot,
MasterDown,
IoError,
FatalSendError,
FatalReceiveError,
ClientError,
ExtensionError,
ReadOnly,
MasterNameNotFoundBySentinel,
NoValidReplicasFoundBySentinel,
EmptySentinelList,
NotBusy,
AllConnectionsUnavailable,
ConnectionNotFoundForRoute,
RESP3NotSupported,
NotAllSlotsCovered,
UserOperationError,
ProtocolDesync,
}
#[derive(PartialEq, Clone)]
pub enum Value {
Nil,
Int(i64),
BulkString(bytes::Bytes),
Array(Vec<Result<Value>>),
SimpleString(String),
Okay,
Map(Vec<(Value, Value)>),
Attribute {
data: Box<Value>,
attributes: Vec<(Value, Value)>,
},
Set(Vec<Value>),
Double(f64),
Boolean(bool),
VerbatimString {
format: VerbatimFormat,
text: String,
},
BigNumber(BigInt),
Push {
kind: PushKind,
data: Vec<Value>,
},
}
#[derive(PartialEq, Clone, Debug)]
pub enum VerbatimFormat {
Unknown(String),
Markdown,
Text,
}
#[derive(PartialEq, Clone, Debug)]
pub enum PushKind {
Disconnection,
Other(String),
Invalidate,
Message,
PMessage,
SMessage,
Unsubscribe,
PUnsubscribe,
SUnsubscribe,
Subscribe,
PSubscribe,
SSubscribe,
}
impl PushKind {
pub(crate) fn has_reply(&self) -> bool {
matches!(
self,
&PushKind::Unsubscribe
| &PushKind::PUnsubscribe
| &PushKind::Subscribe
| &PushKind::PSubscribe
| &PushKind::SSubscribe
)
}
}
impl fmt::Display for VerbatimFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VerbatimFormat::Markdown => write!(f, "mkd"),
VerbatimFormat::Unknown(val) => write!(f, "{val}"),
VerbatimFormat::Text => write!(f, "txt"),
}
}
}
impl fmt::Display for PushKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PushKind::Other(kind) => write!(f, "{kind}"),
PushKind::Invalidate => write!(f, "invalidate"),
PushKind::Message => write!(f, "message"),
PushKind::PMessage => write!(f, "pmessage"),
PushKind::SMessage => write!(f, "smessage"),
PushKind::Unsubscribe => write!(f, "unsubscribe"),
PushKind::PUnsubscribe => write!(f, "punsubscribe"),
PushKind::SUnsubscribe => write!(f, "sunsubscribe"),
PushKind::Subscribe => write!(f, "subscribe"),
PushKind::PSubscribe => write!(f, "psubscribe"),
PushKind::SSubscribe => write!(f, "ssubscribe"),
PushKind::Disconnection => write!(f, "disconnection"),
}
}
}
pub enum MapIter<'a> {
Map(std::slice::Iter<'a, (Value, Value)>),
}
impl<'a> Iterator for MapIter<'a> {
type Item = (&'a Value, &'a Value);
fn next(&mut self) -> Option<Self::Item> {
match self {
MapIter::Map(iter) => {
let (k, v) = iter.next()?;
Some((k, v))
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self {
MapIter::Map(iter) => iter.size_hint(),
}
}
}
pub enum OwnedMapIter {
Map(std::vec::IntoIter<(Value, Value)>),
}
impl Iterator for OwnedMapIter {
type Item = (Value, Value);
fn next(&mut self) -> Option<Self::Item> {
match self {
OwnedMapIter::Map(iter) => iter.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self {
OwnedMapIter::Map(iter) => iter.size_hint(),
}
}
}
impl Value {
pub fn looks_like_cursor(&self) -> bool {
match *self {
Value::Array(ref items) => {
if items.len() != 2 {
return false;
}
matches!(items[0], Ok(Value::BulkString(_))) && matches!(items[1], Ok(Value::Array(_)))
}
_ => false,
}
}
pub fn as_sequence(&self) -> Option<&Vec<Result<Value>>> {
match self {
Value::Array(items) => Some(items),
_ => None,
}
}
pub fn as_plain_sequence(&self) -> Option<&[Value]> {
match self {
Value::Set(items) => Some(&items[..]),
Value::Nil => Some(&[]),
_ => None,
}
}
pub fn into_sequence(self) -> std::result::Result<Vec<Result<Value>>, Value> {
match self {
Value::Array(items) => Ok(items),
Value::Set(items) => Ok(items.into_iter().map(Ok).collect()),
Value::Nil => Ok(vec![]),
_ => Err(self),
}
}
pub fn as_map_iter(&self) -> Option<MapIter<'_>> {
match self {
Value::Map(items) => Some(MapIter::Map(items.iter())),
_ => None,
}
}
pub fn into_map_iter(self) -> std::result::Result<OwnedMapIter, Value> {
match self {
Value::Map(items) => Ok(OwnedMapIter::Map(items.into_iter())),
_ => Err(self),
}
}
pub fn extract_error(self) -> Result<Self> {
match self {
Self::Array(val) => {
Self::first_error_in_results(&val)?;
Ok(Self::Array(val))
}
Self::Map(map) => Ok(Self::Map(Self::extract_error_map(map)?)),
Self::Attribute { data, attributes } => {
let data = Box::new((*data).extract_error()?);
let attributes = Self::extract_error_map(attributes)?;
Ok(Value::Attribute { data, attributes })
}
Self::Set(set) => Ok(Self::Set(Self::extract_error_plain_vec(set)?)),
Self::Push { kind, data } => Ok(Self::Push {
kind,
data: Self::extract_error_plain_vec(data)?,
}),
Value::BigNumber(_)
| Value::Boolean(_)
| Value::BulkString(_)
| Value::Double(_)
| Value::Int(_)
| Value::Nil
| Value::Okay
| Value::SimpleString(_)
| Value::VerbatimString { .. } => Ok(self),
}
}
fn first_error_in_results(items: &[Result<Value>]) -> Result<()> {
for item in items {
if let Err(e) = item {
return Err(e.clone());
}
}
Ok(())
}
pub fn extract_error_vec(vec: Vec<Result<Self>>) -> Result<Vec<Result<Self>>> {
Self::first_error_in_results(&vec)?;
Ok(vec)
}
fn extract_error_plain_vec(vec: Vec<Self>) -> Result<Vec<Self>> {
vec.into_iter()
.map(Self::extract_error)
.collect::<Result<Vec<_>>>()
}
fn extract_error_map(map: Vec<(Self, Self)>) -> Result<Vec<(Self, Self)>> {
let mut vec = Vec::with_capacity(map.len());
for (key, value) in map.into_iter() {
vec.push((key.extract_error()?, value.extract_error()?));
}
Ok(vec)
}
}
impl fmt::Debug for Value {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Value::Nil => write!(fmt, "nil"),
Value::Int(val) => write!(fmt, "int({val:?})"),
Value::BulkString(ref val) => match from_utf8(val) {
Ok(x) => write!(fmt, "bulk-string('{x:?}')"),
Err(_) => write!(fmt, "binary-data({val:?})"),
},
Value::Array(ref values) => {
write!(fmt, "array(")?;
fmt.debug_list()
.entries(values.iter().map(|r| match r {
Ok(v) => format!("{v:?}"),
Err(e) => format!("err({e:?})"),
}))
.finish()?;
write!(fmt, ")")
}
Value::Push { ref kind, ref data } => write!(fmt, "push({kind:?}, {data:?})"),
Value::Okay => write!(fmt, "ok"),
Value::SimpleString(ref s) => write!(fmt, "simple-string({s:?})"),
Value::Map(ref values) => write!(fmt, "map({values:?})"),
Value::Attribute {
ref data,
attributes: _,
} => write!(fmt, "attribute({data:?})"),
Value::Set(ref values) => write!(fmt, "set({values:?})"),
Value::Double(ref d) => write!(fmt, "double({d:?})"),
Value::Boolean(ref b) => write!(fmt, "boolean({b:?})"),
Value::VerbatimString {
ref format,
ref text,
} => {
write!(fmt, "verbatim-string({format:?},{text:?})")
}
Value::BigNumber(ref m) => write!(fmt, "big-number({m:?})"),
}
}
}
pub struct Error {
repr: ErrorRepr,
}
#[derive(Debug)]
enum ErrorRepr {
WithDescription(ErrorKind, &'static str),
WithDescriptionAndDetail(ErrorKind, &'static str, String),
ExtensionError(String, String),
IoError(io::Error),
}
impl Clone for Error {
fn clone(&self) -> Self {
Error {
repr: match &self.repr {
ErrorRepr::WithDescription(k, s) => ErrorRepr::WithDescription(*k, s),
ErrorRepr::WithDescriptionAndDetail(k, s, d) => {
ErrorRepr::WithDescriptionAndDetail(*k, s, d.clone())
}
ErrorRepr::ExtensionError(c, d) => {
ErrorRepr::ExtensionError(c.clone(), d.clone())
}
ErrorRepr::IoError(e) => ErrorRepr::IoError(io::Error::from(e.kind())),
},
}
}
}
impl PartialEq for Error {
fn eq(&self, other: &Error) -> bool {
match (&self.repr, &other.repr) {
(&ErrorRepr::WithDescription(kind_a, _), &ErrorRepr::WithDescription(kind_b, _)) => {
kind_a == kind_b
}
(
&ErrorRepr::WithDescriptionAndDetail(kind_a, _, _),
&ErrorRepr::WithDescriptionAndDetail(kind_b, _, _),
) => kind_a == kind_b,
(ErrorRepr::ExtensionError(a, _), ErrorRepr::ExtensionError(b, _)) => *a == *b,
_ => false,
}
}
}
impl From<tokio::time::error::Elapsed> for Error {
fn from(_: tokio::time::error::Elapsed) -> Error {
Error::from(io::Error::from(io::ErrorKind::TimedOut))
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error {
repr: ErrorRepr::IoError(err),
}
}
}
impl From<Utf8Error> for Error {
fn from(_: Utf8Error) -> Error {
Error {
repr: ErrorRepr::WithDescription(ErrorKind::TypeError, "Invalid UTF-8"),
}
}
}
impl From<NulError> for Error {
fn from(err: NulError) -> Error {
Error {
repr: ErrorRepr::WithDescriptionAndDetail(
ErrorKind::TypeError,
"Value contains interior nul terminator",
err.to_string(),
),
}
}
}
impl From<rustls::Error> for Error {
fn from(err: rustls::Error) -> Error {
Error {
repr: ErrorRepr::WithDescriptionAndDetail(
ErrorKind::IoError,
"TLS error",
err.to_string(),
),
}
}
}
impl From<rustls_pki_types::InvalidDnsNameError> for Error {
fn from(err: rustls_pki_types::InvalidDnsNameError) -> Error {
Error {
repr: ErrorRepr::WithDescriptionAndDetail(
ErrorKind::IoError,
"TLS Error",
err.to_string(),
),
}
}
}
impl From<FromUtf8Error> for Error {
fn from(_: FromUtf8Error) -> Error {
Error {
repr: ErrorRepr::WithDescription(ErrorKind::TypeError, "Cannot convert from UTF-8"),
}
}
}
impl From<ParseIntError> for Error {
fn from(_: ParseIntError) -> Error {
Error {
repr: ErrorRepr::WithDescription(
ErrorKind::TypeError,
"Cannot parse string as an integer",
),
}
}
}
impl From<(ErrorKind, &'static str)> for Error {
fn from((kind, desc): (ErrorKind, &'static str)) -> Error {
Error {
repr: ErrorRepr::WithDescription(kind, desc),
}
}
}
impl From<(ErrorKind, &'static str, String)> for Error {
fn from((kind, desc, detail): (ErrorKind, &'static str, String)) -> Error {
Error {
repr: ErrorRepr::WithDescriptionAndDetail(kind, desc, detail),
}
}
}
impl error::Error for Error {
#[allow(deprecated)]
fn description(&self) -> &str {
match self.repr {
ErrorRepr::WithDescription(_, desc) => desc,
ErrorRepr::WithDescriptionAndDetail(_, desc, _) => desc,
ErrorRepr::ExtensionError(_, _) => "extension error",
ErrorRepr::IoError(ref err) => err.description(),
}
}
fn cause(&self) -> Option<&dyn error::Error> {
match self.repr {
ErrorRepr::IoError(ref err) => Some(err as &dyn error::Error),
_ => None,
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
match self.repr {
ErrorRepr::WithDescription(kind, desc) => {
desc.fmt(f)?;
f.write_str(" - ")?;
fmt::Debug::fmt(&kind, f)
}
ErrorRepr::WithDescriptionAndDetail(kind, desc, ref detail) => {
desc.fmt(f)?;
f.write_str(" - ")?;
fmt::Debug::fmt(&kind, f)?;
f.write_str(": ")?;
detail.fmt(f)
}
ErrorRepr::ExtensionError(ref code, ref detail) => {
code.fmt(f)?;
f.write_str(": ")?;
detail.fmt(f)
}
ErrorRepr::IoError(ref err) => err.fmt(f),
}
}
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
fmt::Display::fmt(self, f)
}
}
#[derive(PartialEq, Eq, Hash, Debug, Copy, Clone)]
pub(crate) enum RetryMethod {
Reconnect,
ReconnectAndRetry,
NoRetry,
RetryImmediately,
WaitAndRetry,
AskRedirect,
MovedRedirect,
WaitAndRetryOnPrimaryRedirectOnReplica,
RefreshSlotsAndRetry,
}
impl Error {
pub(crate) fn append_detail(&mut self, extra: &str) {
self.repr = match std::mem::replace(&mut self.repr, ErrorRepr::WithDescription(ErrorKind::ClientError, "")) {
ErrorRepr::WithDescription(k, s) => {
ErrorRepr::WithDescriptionAndDetail(k, s, extra.to_string())
}
ErrorRepr::WithDescriptionAndDetail(k, s, existing) => {
ErrorRepr::WithDescriptionAndDetail(k, s, format!("{existing}; {extra}"))
}
ErrorRepr::ExtensionError(code, existing) => {
ErrorRepr::ExtensionError(code, format!("{existing}; {extra}"))
}
ErrorRepr::IoError(_) => {
ErrorRepr::WithDescriptionAndDetail(ErrorKind::IoError, "IO error", extra.to_string())
}
};
}
pub fn kind(&self) -> ErrorKind {
match self.repr {
ErrorRepr::WithDescription(kind, _)
| ErrorRepr::WithDescriptionAndDetail(kind, _, _) => kind,
ErrorRepr::ExtensionError(_, _) => ErrorKind::ExtensionError,
ErrorRepr::IoError(_) => ErrorKind::IoError,
}
}
pub fn detail(&self) -> Option<&str> {
match self.repr {
ErrorRepr::WithDescriptionAndDetail(_, _, ref detail)
| ErrorRepr::ExtensionError(_, ref detail) => Some(detail.as_str()),
_ => None,
}
}
pub fn err_code(&self) -> &str {
self.code().unwrap_or("")
}
pub fn code(&self) -> Option<&str> {
match self.kind() {
ErrorKind::ResponseError => Some("ERR"),
ErrorKind::ExecAbortError => Some("EXECABORT"),
ErrorKind::BusyLoadingError => Some("LOADING"),
ErrorKind::NoScriptError => Some("NOSCRIPT"),
ErrorKind::Moved => Some("MOVED"),
ErrorKind::Ask => Some("ASK"),
ErrorKind::TryAgain => Some("TRYAGAIN"),
ErrorKind::ClusterDown => Some("CLUSTERDOWN"),
ErrorKind::CrossSlot => Some("CROSSSLOT"),
ErrorKind::MasterDown => Some("MASTERDOWN"),
ErrorKind::ReadOnly => Some("READONLY"),
ErrorKind::NotBusy => Some("NOTBUSY"),
ErrorKind::PermissionDenied => Some("NOPERM"),
_ => match self.repr {
ErrorRepr::ExtensionError(ref code, _) => Some(code),
_ => None,
},
}
}
pub fn category(&self) -> &str {
match self.kind() {
ErrorKind::ResponseError => "response error",
ErrorKind::AuthenticationFailed => "authentication failed",
ErrorKind::PermissionDenied => "permission denied",
ErrorKind::TypeError => "type error",
ErrorKind::ExecAbortError => "script execution aborted",
ErrorKind::BusyLoadingError => "busy loading",
ErrorKind::NoScriptError => "no script",
ErrorKind::InvalidClientConfig => "invalid client config",
ErrorKind::Moved => "key moved",
ErrorKind::Ask => "key moved (ask)",
ErrorKind::TryAgain => "try again",
ErrorKind::ClusterDown => "cluster down",
ErrorKind::CrossSlot => "cross-slot",
ErrorKind::MasterDown => "master down",
ErrorKind::IoError => "I/O error",
ErrorKind::FatalSendError => {
"failed to send the request to the server due to a fatal error - the request was not transmitted"
}
ErrorKind::FatalReceiveError => {
"a fatal error occurred while attempting to receive a response from the server"
}
ErrorKind::ExtensionError => "extension error",
ErrorKind::ClientError => "client error",
ErrorKind::ReadOnly => "read-only",
ErrorKind::MasterNameNotFoundBySentinel => "master name not found by sentinel",
ErrorKind::NoValidReplicasFoundBySentinel => "no valid replicas found by sentinel",
ErrorKind::EmptySentinelList => "empty sentinel list",
ErrorKind::NotBusy => "not busy",
ErrorKind::AllConnectionsUnavailable => "no valid connections remain in the cluster",
ErrorKind::ConnectionNotFoundForRoute => "No connection found for the requested route",
ErrorKind::RESP3NotSupported => "resp3 is not supported by server",
ErrorKind::ParseError => "parse error",
ErrorKind::NotAllSlotsCovered => "not all slots are covered",
ErrorKind::UserOperationError => "Wrong usage of management operation",
ErrorKind::ProtocolDesync => "Response processing has gotten out of sync",
}
}
pub fn is_io_error(&self) -> bool {
self.as_io_error().is_some()
}
pub(crate) fn as_io_error(&self) -> Option<&io::Error> {
match &self.repr {
ErrorRepr::IoError(e) => Some(e),
_ => None,
}
}
pub fn is_cluster_error(&self) -> bool {
matches!(
self.kind(),
ErrorKind::Moved | ErrorKind::Ask | ErrorKind::TryAgain | ErrorKind::ClusterDown
)
}
pub fn is_connection_refusal(&self) -> bool {
match self.repr {
ErrorRepr::IoError(ref err) => {
#[allow(clippy::match_like_matches_macro)]
match err.kind() {
io::ErrorKind::ConnectionRefused => true,
io::ErrorKind::NotFound => cfg!(unix),
_ => false,
}
}
_ => false,
}
}
pub fn is_timeout(&self) -> bool {
match self.repr {
ErrorRepr::IoError(ref err) => matches!(
err.kind(),
io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock
),
_ => false,
}
}
pub fn is_connection_dropped(&self) -> bool {
if matches!(
self.kind(),
ErrorKind::FatalSendError | ErrorKind::FatalReceiveError
) {
return true;
}
match self.repr {
ErrorRepr::IoError(ref err) => matches!(
err.kind(),
io::ErrorKind::BrokenPipe
| io::ErrorKind::ConnectionReset
| io::ErrorKind::UnexpectedEof
),
_ => false,
}
}
pub fn is_unrecoverable_error(&self) -> bool {
match self.retry_method() {
RetryMethod::Reconnect => true,
RetryMethod::ReconnectAndRetry => true,
RetryMethod::NoRetry => false,
RetryMethod::RetryImmediately => false,
RetryMethod::WaitAndRetry => false,
RetryMethod::AskRedirect => false,
RetryMethod::MovedRedirect => false,
RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica => false,
RetryMethod::RefreshSlotsAndRetry => false,
}
}
pub fn redirect_node(&self) -> Option<(&str, u16)> {
match self.kind() {
ErrorKind::Ask | ErrorKind::Moved => (),
_ => return None,
}
let mut iter = self.detail()?.split_ascii_whitespace();
let slot_id: u16 = iter.next()?.parse().ok()?;
let addr = iter.next()?;
Some((addr, slot_id))
}
pub(crate) fn redirect(&self, should_exec_asking: bool) -> Option<Redirect> {
let node = self.redirect_node()?;
match self.kind() {
ErrorKind::Ask => Some(Redirect::Ask(node.0.to_string(), should_exec_asking)),
ErrorKind::Moved => Some(Redirect::Moved(node.0.to_string())),
_ => None,
}
}
pub(crate) fn clone_mostly(&self, ioerror_description: &'static str) -> Self {
let repr = match self.repr {
ErrorRepr::WithDescription(kind, desc) => ErrorRepr::WithDescription(kind, desc),
ErrorRepr::WithDescriptionAndDetail(kind, desc, ref detail) => {
ErrorRepr::WithDescriptionAndDetail(kind, desc, detail.clone())
}
ErrorRepr::ExtensionError(ref code, ref detail) => {
ErrorRepr::ExtensionError(code.clone(), detail.clone())
}
ErrorRepr::IoError(ref e) => ErrorRepr::IoError(io::Error::new(
e.kind(),
format!("{ioerror_description}: {e}"),
)),
};
Self { repr }
}
pub(crate) fn retry_method(&self) -> RetryMethod {
match self.kind() {
ErrorKind::Moved => RetryMethod::MovedRedirect,
ErrorKind::Ask => RetryMethod::AskRedirect,
ErrorKind::TryAgain => RetryMethod::WaitAndRetry,
ErrorKind::MasterDown => RetryMethod::WaitAndRetry,
ErrorKind::ClusterDown => RetryMethod::WaitAndRetry,
ErrorKind::MasterNameNotFoundBySentinel => RetryMethod::WaitAndRetry,
ErrorKind::NoValidReplicasFoundBySentinel => RetryMethod::WaitAndRetry,
ErrorKind::BusyLoadingError => RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica,
ErrorKind::ResponseError => RetryMethod::NoRetry,
ErrorKind::PermissionDenied => RetryMethod::NoRetry,
ErrorKind::ReadOnly => RetryMethod::RefreshSlotsAndRetry,
ErrorKind::ExtensionError => RetryMethod::NoRetry,
ErrorKind::ExecAbortError => RetryMethod::NoRetry,
ErrorKind::TypeError => RetryMethod::NoRetry,
ErrorKind::NoScriptError => RetryMethod::NoRetry,
ErrorKind::InvalidClientConfig => RetryMethod::NoRetry,
ErrorKind::CrossSlot => RetryMethod::NoRetry,
ErrorKind::ClientError => RetryMethod::NoRetry,
ErrorKind::EmptySentinelList => RetryMethod::NoRetry,
ErrorKind::NotBusy => RetryMethod::NoRetry,
ErrorKind::RESP3NotSupported => RetryMethod::NoRetry,
ErrorKind::ParseError => RetryMethod::Reconnect,
ErrorKind::AuthenticationFailed => RetryMethod::Reconnect,
ErrorKind::AllConnectionsUnavailable => RetryMethod::Reconnect,
ErrorKind::ConnectionNotFoundForRoute => RetryMethod::Reconnect,
ErrorKind::IoError => match &self.repr {
ErrorRepr::IoError(err) => match err.kind() {
io::ErrorKind::ConnectionRefused => RetryMethod::Reconnect,
io::ErrorKind::NotFound => RetryMethod::Reconnect,
io::ErrorKind::ConnectionReset => RetryMethod::Reconnect,
io::ErrorKind::ConnectionAborted => RetryMethod::Reconnect,
io::ErrorKind::NotConnected => RetryMethod::Reconnect,
io::ErrorKind::BrokenPipe => RetryMethod::Reconnect,
io::ErrorKind::UnexpectedEof => RetryMethod::Reconnect,
io::ErrorKind::PermissionDenied => RetryMethod::NoRetry,
io::ErrorKind::Unsupported => RetryMethod::NoRetry,
io::ErrorKind::TimedOut => RetryMethod::NoRetry,
_ => RetryMethod::RetryImmediately,
},
_ => RetryMethod::RetryImmediately,
},
ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry,
ErrorKind::FatalReceiveError => RetryMethod::Reconnect,
ErrorKind::FatalSendError => RetryMethod::ReconnectAndRetry,
ErrorKind::UserOperationError => RetryMethod::NoRetry,
ErrorKind::ProtocolDesync => RetryMethod::NoRetry,
}
}
}
pub fn make_extension_error(code: String, detail: Option<String>) -> Error {
Error {
repr: ErrorRepr::ExtensionError(
code,
match detail {
Some(x) => x,
None => "Unknown extension error encountered".to_string(),
},
),
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Clone)]
pub struct InfoDict {
map: HashMap<String, Value>,
}
impl InfoDict {
pub fn new(key_val_pairs: &str) -> InfoDict {
let mut map = HashMap::new();
for line in key_val_pairs.lines() {
if line.is_empty() || line.starts_with('#') {
continue;
}
let mut p = line.splitn(2, ':');
let (k, v) = match (p.next(), p.next()) {
(Some(k), Some(v)) => (k.to_string(), v.to_string()),
_ => continue,
};
map.insert(k, Value::SimpleString(v));
}
InfoDict { map }
}
pub fn get<T: FromValue>(&self, key: &str) -> Option<T> {
match self.find(&key) {
Some(x) => from_value(x).ok(),
None => None,
}
}
pub fn find(&self, key: &&str) -> Option<&Value> {
self.map.get(*key)
}
pub fn contains_key(&self, key: &&str) -> bool {
self.find(key).is_some()
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
}
impl Deref for InfoDict {
type Target = HashMap<String, Value>;
fn deref(&self) -> &Self::Target {
&self.map
}
}
pub trait Write {
fn write_arg(&mut self, arg: &[u8]);
fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
self.write_arg(arg.to_string().as_bytes())
}
}
impl Write for Vec<Vec<u8>> {
fn write_arg(&mut self, arg: &[u8]) {
self.push(arg.to_owned());
}
fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
self.push(arg.to_string().into_bytes())
}
}
pub trait ToArgs: Sized {
fn to_args(&self) -> Vec<Vec<u8>> {
let mut out = Vec::new();
self.write_args(&mut out);
out
}
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write;
fn describe_numeric_behavior(&self) -> NumericBehavior {
NumericBehavior::NonNumeric
}
fn is_single_arg(&self) -> bool {
true
}
#[doc(hidden)]
fn write_args_from_slice<W>(items: &[Self], out: &mut W)
where
W: ?Sized + Write,
{
Self::make_arg_iter_ref(items.iter(), out)
}
#[doc(hidden)]
fn make_arg_iter_ref<'a, I, W>(items: I, out: &mut W)
where
W: ?Sized + Write,
I: Iterator<Item = &'a Self>,
Self: 'a,
{
for item in items {
item.write_args(out);
}
}
#[doc(hidden)]
fn is_single_vec_arg(items: &[Self]) -> bool {
items.len() == 1 && items[0].is_single_arg()
}
}
macro_rules! itoa_based_to_args_impl {
($t:ty, $numeric:expr) => {
impl ToArgs for $t {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
let mut buf = ::itoa::Buffer::new();
let s = buf.format(*self);
out.write_arg(s.as_bytes())
}
fn describe_numeric_behavior(&self) -> NumericBehavior {
$numeric
}
}
};
}
macro_rules! non_zero_itoa_based_to_args_impl {
($t:ty, $numeric:expr) => {
impl ToArgs for $t {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
let mut buf = ::itoa::Buffer::new();
let s = buf.format(self.get());
out.write_arg(s.as_bytes())
}
fn describe_numeric_behavior(&self) -> NumericBehavior {
$numeric
}
}
};
}
macro_rules! ryu_based_to_redis_impl {
($t:ty, $numeric:expr) => {
impl ToArgs for $t {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
let mut buf = ::ryu::Buffer::new();
let s = buf.format(*self);
out.write_arg(s.as_bytes())
}
fn describe_numeric_behavior(&self) -> NumericBehavior {
$numeric
}
}
};
}
impl ToArgs for u8 {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
let mut buf = ::itoa::Buffer::new();
let s = buf.format(*self);
out.write_arg(s.as_bytes())
}
fn write_args_from_slice<W>(items: &[u8], out: &mut W)
where
W: ?Sized + Write,
{
out.write_arg(items);
}
fn is_single_vec_arg(_items: &[u8]) -> bool {
true
}
}
itoa_based_to_args_impl!(i8, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(i16, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(u16, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(i32, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(u32, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(i64, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(u64, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(isize, NumericBehavior::NumberIsInteger);
itoa_based_to_args_impl!(usize, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroU8, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroI8, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroU16, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroI16, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroU32, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroI32, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroU64, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroI64, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroUsize, NumericBehavior::NumberIsInteger);
non_zero_itoa_based_to_args_impl!(core::num::NonZeroIsize, NumericBehavior::NumberIsInteger);
ryu_based_to_redis_impl!(f32, NumericBehavior::NumberIsFloat);
ryu_based_to_redis_impl!(f64, NumericBehavior::NumberIsFloat);
impl ToArgs for bool {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
out.write_arg(if *self { b"1" } else { b"0" })
}
}
impl ToArgs for String {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
out.write_arg(self.as_bytes())
}
}
impl ToArgs for &str {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
out.write_arg(self.as_bytes())
}
}
impl<T: ToArgs> ToArgs for Vec<T> {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
ToArgs::write_args_from_slice(self, out)
}
fn is_single_arg(&self) -> bool {
ToArgs::is_single_vec_arg(&self[..])
}
}
impl<T: ToArgs> ToArgs for &[T] {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
ToArgs::write_args_from_slice(self, out)
}
fn is_single_arg(&self) -> bool {
ToArgs::is_single_vec_arg(self)
}
}
impl<T: ToArgs> ToArgs for Option<T> {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
if let Some(ref x) = *self {
x.write_args(out);
}
}
fn describe_numeric_behavior(&self) -> NumericBehavior {
match *self {
Some(ref x) => x.describe_numeric_behavior(),
None => NumericBehavior::NonNumeric,
}
}
fn is_single_arg(&self) -> bool {
match *self {
Some(ref x) => x.is_single_arg(),
None => false,
}
}
}
impl<T: ToArgs> ToArgs for &T {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
(*self).write_args(out)
}
fn is_single_arg(&self) -> bool {
(*self).is_single_arg()
}
}
impl<T: ToArgs + Hash + Eq, S: BuildHasher + Default> ToArgs
for std::collections::HashSet<T, S>
{
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
ToArgs::make_arg_iter_ref(self.iter(), out)
}
fn is_single_arg(&self) -> bool {
self.len() <= 1
}
}
impl<T: ToArgs + Hash + Eq + Ord> ToArgs for BTreeSet<T> {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
ToArgs::make_arg_iter_ref(self.iter(), out)
}
fn is_single_arg(&self) -> bool {
self.len() <= 1
}
}
impl<T: ToArgs + Hash + Eq + Ord, V: ToArgs> ToArgs for BTreeMap<T, V> {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
for (key, value) in self {
assert!(key.is_single_arg() && value.is_single_arg());
key.write_args(out);
value.write_args(out);
}
}
fn is_single_arg(&self) -> bool {
self.len() <= 1
}
}
impl<T: ToArgs + Hash + Eq + Ord, V: ToArgs> ToArgs
for std::collections::HashMap<T, V>
{
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
for (key, value) in self {
assert!(key.is_single_arg() && value.is_single_arg());
key.write_args(out);
value.write_args(out);
}
}
fn is_single_arg(&self) -> bool {
self.len() <= 1
}
}
macro_rules! to_args_for_tuple {
() => ();
($($name:ident,)+) => (
#[doc(hidden)]
impl<$($name: ToArgs),*> ToArgs for ($($name,)*) {
#[allow(non_snake_case, unused_variables)]
fn write_args<W>(&self, out: &mut W) where W: ?Sized + Write {
let ($(ref $name,)*) = *self;
$($name.write_args(out);)*
}
#[allow(non_snake_case, unused_variables)]
fn is_single_arg(&self) -> bool {
let mut n = 0u32;
$(let $name = (); n += 1;)*
n == 1
}
}
to_args_for_tuple_peel!($($name,)*);
)
}
macro_rules! to_args_for_tuple_peel {
($name:ident, $($other:ident,)*) => (to_args_for_tuple!($($other,)*);)
}
to_args_for_tuple! { T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, }
impl<T: ToArgs, const N: usize> ToArgs for &[T; N] {
fn write_args<W>(&self, out: &mut W)
where
W: ?Sized + Write,
{
ToArgs::write_args_from_slice(self.as_slice(), out)
}
fn is_single_arg(&self) -> bool {
ToArgs::is_single_vec_arg(self.as_slice())
}
}
fn vec_to_array<T, const N: usize>(items: Vec<T>, original_value: &Value) -> Result<[T; N]> {
match items.try_into() {
Ok(array) => Ok(array),
Err(items) => {
let msg = format!(
"Response has wrong dimension, expected {N}, got {}",
items.len()
);
invalid_type_error!(original_value, msg)
}
}
}
impl<T: FromValue, const N: usize> FromValue for [T; N] {
fn from_value(value: &Value) -> Result<[T; N]> {
match *value {
Value::BulkString(ref bytes) => match FromValue::from_byte_vec(bytes) {
Some(items) => vec_to_array(items, value),
None => {
let msg = format!(
"Conversion to Array[{}; {N}] failed",
std::any::type_name::<T>()
);
invalid_type_error!(value, msg)
}
},
Value::Array(ref items) => {
let items = FromValue::from_result_values(items)?;
vec_to_array(items, value)
}
Value::Nil => vec_to_array(vec![], value),
_ => invalid_type_error!(value, "Response type not array compatible"),
}
}
}
pub trait FromValue: Sized {
fn from_value(v: &Value) -> Result<Self>;
fn from_owned_value(v: Value) -> Result<Self> {
Self::from_value(&v)
}
fn from_values(items: &[Value]) -> Result<Vec<Self>> {
items
.iter()
.map(FromValue::from_value)
.collect()
}
fn from_result_values(items: &[Result<Value>]) -> Result<Vec<Self>> {
items
.iter()
.map(|r| match r {
Ok(v) => FromValue::from_value(v),
Err(e) => Err(e.clone()),
})
.collect()
}
fn from_owned_values(items: Vec<Value>) -> Result<Vec<Self>> {
items
.into_iter()
.map(FromValue::from_owned_value)
.collect()
}
fn from_owned_result_values(items: Vec<Result<Value>>) -> Result<Vec<Self>> {
items
.into_iter()
.map(|r| match r {
Ok(v) => FromValue::from_owned_value(v),
Err(e) => Err(e),
})
.collect()
}
fn from_byte_vec(_vec: &[u8]) -> Option<Vec<Self>> {
Self::from_owned_value(Value::BulkString(bytes::Bytes::copy_from_slice(_vec)))
.map(|rv| vec![rv])
.ok()
}
fn from_owned_byte_vec(_vec: Vec<u8>) -> Result<Vec<Self>> {
Self::from_owned_value(Value::BulkString(bytes::Bytes::from(_vec)))
.map(|rv| vec![rv])
}
}
fn get_inner_value(v: &Value) -> &Value {
if let Value::Attribute {
data,
attributes: _,
} = v
{
data.as_ref()
} else {
v
}
}
fn get_owned_inner_value(v: Value) -> Value {
if let Value::Attribute {
data,
attributes: _,
} = v
{
*data
} else {
v
}
}
macro_rules! from_value_for_int_internal {
($t:ty, $v:expr) => {{
let v = if let Value::Attribute {
data,
attributes: _,
} = $v
{
data
} else {
$v
};
match *v {
Value::Int(val) => <$t>::try_from(val).map_err(|_| {
Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!(
"i64 value {} out of range for {} (response was {:?})",
val,
stringify!($t),
v,
),
))
}),
Value::SimpleString(ref s) => match s.parse::<$t>() {
Ok(rv) => Ok(rv),
Err(_) => invalid_type_error!(v, "Could not convert from string."),
},
Value::BulkString(ref bytes) => match from_utf8(bytes)?.parse::<$t>() {
Ok(rv) => Ok(rv),
Err(_) => invalid_type_error!(v, "Could not convert from string."),
},
Value::Double(val) => {
if val.is_nan() || val.is_infinite() {
invalid_type_error!(
v,
format!("f64 value {} cannot be converted to {}", val, stringify!($t))
)
} else {
let truncated = val as i128;
<$t>::try_from(truncated).map_err(|_| {
Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!(
"f64 value {} out of range for {} (response was {:?})",
val,
stringify!($t),
v,
),
))
})
}
}
_ => invalid_type_error!(v, "Response type not convertible to numeric."),
}
}};
}
macro_rules! from_value_for_float_internal {
($t:ty, $v:expr) => {{
let v = if let Value::Attribute {
data,
attributes: _,
} = $v
{
data
} else {
$v
};
match *v {
Value::Int(val) => Ok(val as $t),
Value::SimpleString(ref s) => match s.parse::<$t>() {
Ok(rv) => Ok(rv),
Err(_) => invalid_type_error!(v, "Could not convert from string."),
},
Value::BulkString(ref bytes) => match from_utf8(bytes)?.parse::<$t>() {
Ok(rv) => Ok(rv),
Err(_) => invalid_type_error!(v, "Could not convert from string."),
},
Value::Double(val) => Ok(val as $t),
_ => invalid_type_error!(v, "Response type not convertible to numeric."),
}
}};
}
macro_rules! from_value_for_int {
($t:ty) => {
impl FromValue for $t {
fn from_value(v: &Value) -> Result<$t> {
from_value_for_int_internal!($t, v)
}
}
};
}
macro_rules! from_value_for_float {
($t:ty) => {
impl FromValue for $t {
fn from_value(v: &Value) -> Result<$t> {
from_value_for_float_internal!($t, v)
}
}
};
}
impl FromValue for u8 {
fn from_value(v: &Value) -> Result<u8> {
from_value_for_int_internal!(u8, v)
}
fn from_byte_vec(vec: &[u8]) -> Option<Vec<u8>> {
Some(vec.to_vec())
}
fn from_owned_byte_vec(vec: Vec<u8>) -> Result<Vec<u8>> {
Ok(vec)
}
}
from_value_for_int!(i8);
from_value_for_int!(i16);
from_value_for_int!(u16);
from_value_for_int!(i32);
from_value_for_int!(u32);
from_value_for_int!(i64);
from_value_for_int!(u64);
from_value_for_int!(i128);
from_value_for_int!(u128);
from_value_for_float!(f32);
from_value_for_float!(f64);
from_value_for_int!(isize);
from_value_for_int!(usize);
impl FromValue for bool {
fn from_value(v: &Value) -> Result<bool> {
let v = get_inner_value(v);
match *v {
Value::Nil => Ok(false),
Value::Int(val) => Ok(val != 0),
Value::SimpleString(ref s) => {
if &s[..] == "1" {
Ok(true)
} else if &s[..] == "0" {
Ok(false)
} else {
invalid_type_error!(v, "Response status not valid boolean");
}
}
Value::BulkString(ref bytes) => {
if &**bytes == b"1" {
Ok(true)
} else if &**bytes == b"0" {
Ok(false)
} else {
invalid_type_error!(v, "Response type not bool compatible.");
}
}
Value::Boolean(b) => Ok(b),
Value::Okay => Ok(true),
_ => invalid_type_error!(v, "Response type not bool compatible."),
}
}
}
impl FromValue for CString {
fn from_value(v: &Value) -> Result<CString> {
let v = get_inner_value(v);
match *v {
Value::BulkString(ref bytes) => Ok(CString::new(&bytes[..])?),
Value::Okay => Ok(CString::new("OK")?),
Value::SimpleString(ref val) => Ok(CString::new(val.as_bytes())?),
_ => invalid_type_error!(v, "Response type not CString compatible."),
}
}
fn from_owned_value(v: Value) -> Result<CString> {
let v = get_owned_inner_value(v);
match v {
Value::BulkString(bytes) => Ok(CString::new(bytes.to_vec())?),
Value::Okay => Ok(CString::new("OK")?),
Value::SimpleString(val) => Ok(CString::new(val)?),
_ => invalid_type_error!(v, "Response type not CString compatible."),
}
}
}
impl FromValue for String {
fn from_value(v: &Value) -> Result<String> {
let v = get_inner_value(v);
match *v {
Value::BulkString(ref bytes) => Ok(from_utf8(bytes)?.to_string()),
Value::Okay => Ok("OK".to_string()),
Value::SimpleString(ref val) => Ok(val.to_string()),
Value::VerbatimString {
format: _,
ref text,
} => Ok(text.to_string()),
Value::Double(ref val) => Ok(val.to_string()),
Value::Int(val) => Ok(val.to_string()),
_ => invalid_type_error!(v, "Response type not string compatible."),
}
}
fn from_owned_value(v: Value) -> Result<String> {
let v = get_owned_inner_value(v);
match v {
Value::BulkString(bytes) => Ok(String::from_utf8(bytes.to_vec())?),
Value::Okay => Ok("OK".to_string()),
Value::SimpleString(val) => Ok(val),
Value::VerbatimString { format: _, text } => Ok(text),
Value::Double(val) => Ok(val.to_string()),
Value::Int(val) => Ok(val.to_string()),
_ => invalid_type_error!(v, "Response type not string compatible."),
}
}
}
macro_rules! from_vec_from_value {
(<$T:ident> $Type:ty) => {
from_vec_from_value!(<$T> $Type; Into::into);
};
(<$T:ident> $Type:ty; $convert:expr) => {
impl<$T: FromValue> FromValue for $Type {
fn from_value(v: &Value) -> Result<$Type> {
match v {
Value::BulkString(bytes) => match FromValue::from_byte_vec(bytes) {
Some(x) => Ok($convert(x)),
None => invalid_type_error!(
v,
format!("Conversion to {} failed.", std::any::type_name::<$Type>())
),
},
Value::Array(items) => FromValue::from_result_values(items).map($convert),
Value::Set(items) => FromValue::from_values(items).map($convert),
Value::Map(items) => {
let mut n: Vec<T> = vec![];
for item in items {
match FromValue::from_value(&Value::Map(vec![item.clone()])) {
Ok(v) => {
n.push(v);
}
Err(e) => {
return Err(e);
}
}
}
Ok($convert(n))
}
Value::Nil => Ok($convert(Vec::new())),
_ => invalid_type_error!(v, "Response type not vector compatible."),
}
}
fn from_owned_value(v: Value) -> Result<$Type> {
match v {
Value::BulkString(bytes) => FromValue::from_owned_byte_vec(bytes.to_vec().into()).map($convert),
Value::Array(items) => FromValue::from_owned_result_values(items).map($convert),
Value::Set(items) => FromValue::from_owned_values(items).map($convert),
Value::Map(items) => {
let mut n: Vec<T> = vec![];
for item in items {
match FromValue::from_owned_value(Value::Map(vec![item])) {
Ok(v) => {
n.push(v);
}
Err(e) => {
return Err(e);
}
}
}
Ok($convert(n))
}
Value::Nil => Ok($convert(Vec::new())),
_ => invalid_type_error!(v, "Response type not vector compatible."),
}
}
}
};
}
from_vec_from_value!(<T> Vec<T>);
from_vec_from_value!(<T> std::sync::Arc<[T]>);
from_vec_from_value!(<T> Box<[T]>; Vec::into_boxed_slice);
impl<K: FromValue + Eq + Hash, V: FromValue, S: BuildHasher + Default> FromValue
for std::collections::HashMap<K, V, S>
{
fn from_value(v: &Value) -> Result<std::collections::HashMap<K, V, S>> {
let v = get_inner_value(v);
match v {
Value::Nil => Ok(Default::default()),
Value::Array(items) => {
if items.len() % 2 != 0 {
return Err(invalid_type_error_inner!(v, "Response type not hashmap compatible"));
}
let mut map = std::collections::HashMap::with_capacity_and_hasher(items.len() / 2, S::default());
let mut iter = items.iter();
while let (Some(k_res), Some(v_res)) = (iter.next(), iter.next()) {
let k_val = k_res.as_ref().map_err(|e| e.clone())?;
let v_val = v_res.as_ref().map_err(|e| e.clone())?;
map.insert(from_value(k_val)?, from_value(v_val)?);
}
Ok(map)
}
_ => v
.as_map_iter()
.ok_or_else(|| {
invalid_type_error_inner!(v, "Response type not hashmap compatible")
})?
.map(|(k, v)| Ok((from_value(k)?, from_value(v)?)))
.collect(),
}
}
fn from_owned_value(v: Value) -> Result<std::collections::HashMap<K, V, S>> {
let v = get_owned_inner_value(v);
match v {
Value::Nil => Ok(Default::default()),
Value::Array(items) => {
if items.len() % 2 != 0 {
return Err(invalid_type_error_inner!(Value::Array(items), "Response type not hashmap compatible"));
}
let mut map = std::collections::HashMap::with_capacity_and_hasher(items.len() / 2, S::default());
let mut iter = items.into_iter();
while let (Some(k_res), Some(v_res)) = (iter.next(), iter.next()) {
map.insert(from_owned_value(k_res?)?, from_owned_value(v_res?)?);
}
Ok(map)
}
_ => v
.into_map_iter()
.map_err(|v| invalid_type_error_inner!(v, "Response type not hashmap compatible"))?
.map(|(k, v)| Ok((from_owned_value(k)?, from_owned_value(v)?)))
.collect(),
}
}
}
impl<K: FromValue + Eq + Hash, V: FromValue> FromValue for BTreeMap<K, V>
where
K: Ord,
{
fn from_value(v: &Value) -> Result<BTreeMap<K, V>> {
let v = get_inner_value(v);
match v {
Value::Array(items) => {
if items.len() % 2 != 0 {
return Err(invalid_type_error_inner!(v, "Response type not btreemap compatible"));
}
let mut map = BTreeMap::new();
let mut iter = items.iter();
while let (Some(k_res), Some(v_res)) = (iter.next(), iter.next()) {
let k_val = k_res.as_ref().map_err(|e| e.clone())?;
let v_val = v_res.as_ref().map_err(|e| e.clone())?;
map.insert(from_value(k_val)?, from_value(v_val)?);
}
Ok(map)
}
_ => v
.as_map_iter()
.ok_or_else(|| invalid_type_error_inner!(v, "Response type not btreemap compatible"))?
.map(|(k, v)| Ok((from_value(k)?, from_value(v)?)))
.collect(),
}
}
fn from_owned_value(v: Value) -> Result<BTreeMap<K, V>> {
let v = get_owned_inner_value(v);
match v {
Value::Array(items) => {
if items.len() % 2 != 0 {
return Err(invalid_type_error_inner!(Value::Array(items), "Response type not btreemap compatible"));
}
let mut map = BTreeMap::new();
let mut iter = items.into_iter();
while let (Some(k_res), Some(v_res)) = (iter.next(), iter.next()) {
map.insert(from_owned_value(k_res?)?, from_owned_value(v_res?)?);
}
Ok(map)
}
_ => v
.into_map_iter()
.map_err(|v| invalid_type_error_inner!(v, "Response type not btreemap compatible"))?
.map(|(k, v)| Ok((from_owned_value(k)?, from_owned_value(v)?)))
.collect(),
}
}
}
impl<T: FromValue + Eq + Hash, S: BuildHasher + Default> FromValue
for std::collections::HashSet<T, S>
{
fn from_value(v: &Value) -> Result<std::collections::HashSet<T, S>> {
let v = get_inner_value(v);
match v {
Value::Array(items) => items
.iter()
.map(|r| {
let val = r.as_ref().map_err(|e| e.clone())?;
from_value(val)
})
.collect(),
Value::Set(items) => items.iter().map(|item| from_value(item)).collect(),
Value::Nil => Ok(Default::default()),
_ => Err(invalid_type_error_inner!(v, "Response type not hashset compatible")),
}
}
fn from_owned_value(v: Value) -> Result<std::collections::HashSet<T, S>> {
let v = get_owned_inner_value(v);
match v {
Value::Array(items) => items
.into_iter()
.map(|r| from_owned_value(r?))
.collect(),
Value::Set(items) => items
.into_iter()
.map(|item| from_owned_value(item))
.collect(),
Value::Nil => Ok(Default::default()),
_ => Err(invalid_type_error_inner!(v, "Response type not hashset compatible")),
}
}
}
impl<T: FromValue + Eq + Hash> FromValue for BTreeSet<T>
where
T: Ord,
{
fn from_value(v: &Value) -> Result<BTreeSet<T>> {
let v = get_inner_value(v);
match v {
Value::Array(items) => items
.iter()
.map(|r| {
let val = r.as_ref().map_err(|e| e.clone())?;
from_value(val)
})
.collect(),
Value::Set(items) => items.iter().map(|item| from_value(item)).collect(),
Value::Nil => Ok(Default::default()),
_ => Err(invalid_type_error_inner!(v, "Response type not btree_set compatible")),
}
}
fn from_owned_value(v: Value) -> Result<BTreeSet<T>> {
let v = get_owned_inner_value(v);
match v {
Value::Array(items) => items
.into_iter()
.map(|r| from_owned_value(r?))
.collect(),
Value::Set(items) => items
.into_iter()
.map(|item| from_owned_value(item))
.collect(),
Value::Nil => Ok(Default::default()),
_ => Err(invalid_type_error_inner!(v, "Response type not btree_set compatible")),
}
}
}
impl FromValue for Value {
fn from_value(v: &Value) -> Result<Value> {
Ok(v.clone())
}
fn from_owned_value(v: Value) -> Result<Self> {
Ok(v)
}
}
impl FromValue for () {
fn from_value(_v: &Value) -> Result<()> {
Ok(())
}
}
macro_rules! from_value_for_tuple {
() => ();
($($name:ident,)+) => (
#[doc(hidden)]
impl<$($name: FromValue),*> FromValue for ($($name,)*) {
#[allow(non_snake_case, unused_variables)]
fn from_value(v: &Value) -> Result<($($name,)*)> {
let v = get_inner_value(v);
match *v {
Value::Array(ref items) => {
let mut n = 0;
$(let $name = (); n += 1;)*
if items.len() != n {
invalid_type_error!(v, "Array response of wrong dimension")
}
let mut i = 0;
Ok(($({let $name = (); let val = items[{ i += 1; i - 1 }].as_ref().map_err(|e| e.clone())?; from_value(
val)?},)*))
}
Value::Map(ref items) => {
let mut n = 0;
$(let $name = (); n += 1;)*
if n != 2 {
invalid_type_error!(v, "Map response of wrong dimension")
}
let mut flatten_items = vec![];
for (k,v) in items {
flatten_items.push(k);
flatten_items.push(v);
}
let mut i = 0;
Ok(($({let $name = (); from_value(
&flatten_items[{ i += 1; i - 1 }])?},)*))
}
_ => invalid_type_error!(v, "Not a Array response")
}
}
#[allow(non_snake_case, unused_variables)]
fn from_owned_value(v: Value) -> Result<($($name,)*)> {
let v = get_owned_inner_value(v);
match v {
Value::Array(mut items) => {
let mut n = 0;
$(let $name = (); n += 1;)*
if items.len() != n {
invalid_type_error!(Value::Array(items), "Array response of wrong dimension")
}
let mut i = 0;
Ok(($({let $name = (); from_owned_value(
::std::mem::replace(&mut items[{ i += 1; i - 1 }], Ok(Value::Nil))?
)?},)*))
}
Value::Map(items) => {
let mut n = 0;
$(let $name = (); n += 1;)*
if n != 2 {
invalid_type_error!(Value::Map(items), "Map response of wrong dimension")
}
let mut flatten_items = vec![];
for (k,v) in items {
flatten_items.push(k);
flatten_items.push(v);
}
let mut i = 0;
Ok(($({let $name = (); from_value(
&flatten_items[{ i += 1; i - 1 }])?},)*))
}
_ => invalid_type_error!(v, "Not a Array response")
}
}
#[allow(non_snake_case, unused_variables)]
fn from_values(items: &[Value]) -> Result<Vec<($($name,)*)>> {
let mut n = 0;
$(let $name = (); n += 1;)*
let mut rv = vec![];
if items.len() == 0 {
return Ok(rv)
}
for item in items {
match item {
Value::Array(ch) => {
let unwrapped: Vec<&Value> = ch.iter().map(|r| r.as_ref().map_err(|e| e.clone())).collect::<Result<Vec<_>>>()?;
if let [$($name),*] = &unwrapped[..] {
rv.push(($(from_value($name)?),*),)
} else {
unreachable!()
};
},
_ => {
fail!(Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!("Expected Array value for tuple deserialization, got {:?}", item),
)));
},
}
}
if !rv.is_empty(){
return Ok(rv);
}
if let [$($name),*] = items{
rv.push(($(from_value($name)?),*),);
return Ok(rv);
}
let chunks = items.chunks_exact(n);
let remainder = chunks.remainder();
if !remainder.is_empty() {
fail!(Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!(
"Item count {} is not a multiple of tuple size {} ({} trailing elements would be lost)",
items.len(), n, remainder.len()
),
)));
}
for chunk in items.chunks_exact(n) {
match chunk {
[$($name),*] => rv.push(($(from_value($name)?),*),),
_ => {},
}
}
Ok(rv)
}
#[allow(non_snake_case, unused_variables)]
fn from_owned_values(mut items: Vec<Value>) -> Result<Vec<($($name,)*)>> {
let mut n = 0;
$(let $name = (); n += 1;)*
let mut rv = vec![];
if items.len() == 0 {
return Ok(rv)
}
for item in items.iter() {
match item {
Value::Array(ch) => {
let unwrapped: Vec<&Value> = ch.iter().map(|r| r.as_ref().map_err(|e| e.clone())).collect::<Result<Vec<_>>>()?;
if let [$($name),*] = &unwrapped[..] {
rv.push(($(from_value($name)?),*),)
} else {
unreachable!()
};
},
_ => {
fail!(Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!("Expected Array value for tuple deserialization, got {:?}", item),
)));
},
}
}
if !rv.is_empty(){
return Ok(rv);
}
let mut rv = Vec::with_capacity(items.len() / n);
if items.len() == 0 {
return Ok(rv)
}
let remainder = items.len() % n;
if remainder != 0 {
fail!(Error::from((
ErrorKind::TypeError,
"Response was of incompatible type",
format!(
"Item count {} is not a multiple of tuple size {} ({} trailing elements would be lost)",
items.len(), n, remainder
),
)));
}
for chunk in items.chunks_mut(n) {
match chunk {
[$($name),*] => rv.push(($(from_owned_value(std::mem::replace($name, Value::Nil))?),*),),
_ => unreachable!(),
}
}
Ok(rv)
}
}
from_value_for_tuple_peel!($($name,)*);
)
}
macro_rules! from_value_for_tuple_peel {
($name:ident, $($other:ident,)*) => (from_value_for_tuple!($($other,)*);)
}
from_value_for_tuple! { T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, }
impl FromValue for InfoDict {
fn from_value(v: &Value) -> Result<InfoDict> {
let v = get_inner_value(v);
let s: String = from_value(v)?;
Ok(InfoDict::new(&s))
}
fn from_owned_value(v: Value) -> Result<InfoDict> {
let v = get_owned_inner_value(v);
let s: String = from_owned_value(v)?;
Ok(InfoDict::new(&s))
}
}
impl<T: FromValue> FromValue for Option<T> {
fn from_value(v: &Value) -> Result<Option<T>> {
let v = get_inner_value(v);
if *v == Value::Nil {
return Ok(None);
}
Ok(Some(from_value(v)?))
}
fn from_owned_value(v: Value) -> Result<Option<T>> {
let v = get_owned_inner_value(v);
if v == Value::Nil {
return Ok(None);
}
Ok(Some(from_owned_value(v)?))
}
}
impl FromValue for bytes::Bytes {
fn from_value(v: &Value) -> Result<Self> {
let v = get_inner_value(v);
match v {
Value::BulkString(bytes_vec) => Ok(bytes::Bytes::copy_from_slice(bytes_vec.as_ref())),
_ => invalid_type_error!(v, "Not a bulk string"),
}
}
fn from_owned_value(v: Value) -> Result<Self> {
let v = get_owned_inner_value(v);
match v {
Value::BulkString(bytes_vec) => Ok(bytes_vec),
_ => invalid_type_error!(v, "Not a bulk string"),
}
}
}
pub fn from_value<T: FromValue>(v: &Value) -> Result<T> {
FromValue::from_value(v)
}
pub fn from_owned_value<T: FromValue>(v: Value) -> Result<T> {
FromValue::from_owned_value(v)
}
#[derive(Clone, Eq, PartialEq, Default, Debug, Copy)]
#[repr(C)]
pub enum ProtocolVersion {
RESP2,
#[default]
RESP3,
}
struct InflightSlotGuard(Arc<AtomicIsize>);
impl Drop for InflightSlotGuard {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
#[derive(Clone)]
pub struct InflightRequestTracker {
_guard: Arc<InflightSlotGuard>,
}
impl InflightRequestTracker {
pub fn try_new(counter: Arc<AtomicIsize>) -> Option<Self> {
loop {
let current = counter.load(Ordering::SeqCst);
if current <= 0 {
return None;
}
if counter
.compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
return Some(Self {
_guard: Arc::new(InflightSlotGuard(counter)),
});
}
}
}
}
#[cfg(test)]
mod inflight_tracker_tests {
use super::*;
#[test]
fn tracker_reserves_and_releases_slot() {
let counter = Arc::new(AtomicIsize::new(5));
let tracker = InflightRequestTracker::try_new(counter.clone()).unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 4); drop(tracker);
assert_eq!(counter.load(Ordering::Relaxed), 5); }
#[test]
fn try_new_returns_none_when_no_slots() {
let counter = Arc::new(AtomicIsize::new(0));
assert!(InflightRequestTracker::try_new(counter).is_none());
}
#[test]
fn cloned_tracker_releases_only_when_last_clone_drops() {
let counter = Arc::new(AtomicIsize::new(5));
let tracker = InflightRequestTracker::try_new(counter.clone()).unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 4);
let clone1 = tracker.clone();
let clone2 = tracker.clone();
drop(clone1);
assert_eq!(counter.load(Ordering::Relaxed), 4);
drop(tracker);
assert_eq!(counter.load(Ordering::Relaxed), 4);
drop(clone2);
assert_eq!(counter.load(Ordering::Relaxed), 5); }
}