use redis::{
from_redis_value, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value,
};
use std::collections::HashMap;
use std::str;
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum TsAggregationType {
Avg(u64),
Sum(u64),
Min(u64),
Max(u64),
Range(u64),
Count(u64),
First(u64),
Last(u64),
StdP(u64),
StdS(u64),
VarP(u64),
VarS(u64),
}
impl ToRedisArgs for TsAggregationType {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let (t, val) = match *self {
TsAggregationType::Avg(v) => ("avg", v),
TsAggregationType::Sum(v) => ("sum", v),
TsAggregationType::Min(v) => ("min", v),
TsAggregationType::Max(v) => ("max", v),
TsAggregationType::Range(v) => ("range", v),
TsAggregationType::Count(v) => ("count", v),
TsAggregationType::First(v) => ("first", v),
TsAggregationType::Last(v) => ("last", v),
TsAggregationType::StdP(v) => ("std.p", v),
TsAggregationType::StdS(v) => ("std.s", v),
TsAggregationType::VarP(v) => ("var.p", v),
TsAggregationType::VarS(v) => ("var.s", v),
};
out.write_arg(b"AGGREGATION");
out.write_arg(t.as_bytes());
val.write_redis_args(out);
}
}
#[derive(Default, Debug, Clone)]
pub struct TsOptions {
retention_time: Option<u64>,
uncompressed: bool,
labels: Option<Vec<Vec<u8>>>,
}
impl TsOptions {
pub fn retention_time(mut self, time: u64) -> Self {
self.retention_time = Some(time);
self
}
pub fn uncompressed(mut self, value: bool) -> Self {
self.uncompressed = value;
self
}
pub fn labels(mut self, labels: Vec<(&str, &str)>) -> Self {
if !labels.is_empty() {
self.labels = Some(ToRedisArgs::to_redis_args(&labels));
} else {
self.labels = None;
}
self
}
pub fn label(mut self, name: &str, value: &str) -> Self {
let mut l = ToRedisArgs::to_redis_args(&vec![(name, value)]);
let mut res: Vec<Vec<u8>> = vec![];
if let Some(mut cur) = self.labels {
res.append(&mut cur);
}
res.append(&mut l);
self.labels = Some(res);
self
}
}
impl ToRedisArgs for TsOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref rt) = self.retention_time {
out.write_arg(b"RETENTION");
out.write_arg(format!("{}", rt).as_bytes());
}
if self.uncompressed {
out.write_arg(b"UNCOMPRESSED");
}
if let Some(ref l) = self.labels {
out.write_arg(b"LABELS");
for arg in l {
out.write_arg(&arg);
}
}
}
}
#[derive(Debug, Default, Clone)]
pub struct TsFilterOptions {
with_labels: bool,
filters: Vec<TsFilter>,
}
impl TsFilterOptions {
pub fn with_labels(mut self, value: bool) -> Self {
self.with_labels = value;
self
}
pub fn equals<L: std::fmt::Display + ToRedisArgs, V: std::fmt::Display + ToRedisArgs>(
mut self,
name: L,
value: V,
) -> Self {
self.filters.push(TsFilter {
name: format!("{}", name),
value: format!("{}", value),
compare: TsCompare::Eq,
});
self
}
pub fn not_equals<L: std::fmt::Debug + ToRedisArgs, V: std::fmt::Debug + ToRedisArgs>(
mut self,
name: L,
value: V,
) -> Self {
self.filters.push(TsFilter {
name: format!("{:?}", name),
value: format!("{:?}", value),
compare: TsCompare::NotEq,
});
self
}
pub fn in_set<L: std::fmt::Debug + ToRedisArgs, V: std::fmt::Debug + ToRedisArgs>(
mut self,
name: L,
values: Vec<V>,
) -> Self {
let set = format!(
"({:?})",
values
.iter()
.map(|v| { format!("{:?}", v) })
.collect::<Vec<String>>()
.join(",")
);
self.filters.push(TsFilter {
name: format!("{:?}", name),
value: set,
compare: TsCompare::Eq,
});
self
}
pub fn not_in_set<L: std::fmt::Debug + ToRedisArgs, V: std::fmt::Debug + ToRedisArgs>(
mut self,
name: L,
values: Vec<V>,
) -> Self {
let set = format!(
"({:?})",
values
.iter()
.map(|v| { format!("{:?}", v) })
.collect::<Vec<String>>()
.join(",")
);
self.filters.push(TsFilter {
name: format!("{:?}", name),
value: set,
compare: TsCompare::NotEq,
});
self
}
pub fn has_label<L: std::fmt::Debug + ToRedisArgs>(mut self, name: L) -> Self {
self.filters.push(TsFilter {
name: format!("{:?}", name),
value: "".to_string(),
compare: TsCompare::NotEq,
});
self
}
pub fn not_has_label<L: std::fmt::Debug + ToRedisArgs>(mut self, name: L) -> Self {
self.filters.push(TsFilter {
name: format!("{:?}", name),
value: "".to_string(),
compare: TsCompare::Eq,
});
self
}
pub fn get_filters(self) -> Vec<TsFilter> {
self.filters
}
}
impl ToRedisArgs for TsFilterOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if self.with_labels {
out.write_arg(b"WITHLABELS");
}
out.write_arg(b"FILTER");
for f in self.filters.iter() {
f.write_redis_args(out)
}
}
}
#[derive(Debug, Default)]
pub struct TsInfo {
pub total_samples: u64,
pub memory_usage: u64,
pub first_timestamp: u64,
pub last_timestamp: u64,
pub retention_time: u64,
pub chunk_count: u64,
pub max_samples_per_chunk: u16,
pub labels: Vec<(String, String)>,
pub source_key: Option<String>,
pub rules: Vec<(String, u64, String)>,
}
impl FromRedisValue for TsInfo {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
match *v {
Value::Bulk(ref values) => {
let mut result = TsInfo::default();
let mut map: HashMap<String, Value> = HashMap::new();
for pair in values.chunks(2) {
map.insert(from_redis_value(&pair[0])?, pair[1].clone());
}
if let Some(v) = map.get("totalSamples") {
result.total_samples = from_redis_value(v)?;
}
if let Some(v) = map.get("memoryUsage") {
result.memory_usage = from_redis_value(v)?;
}
if let Some(v) = map.get("firstTimestamp") {
result.first_timestamp = from_redis_value(v)?;
}
if let Some(v) = map.get("lastTimestamp") {
result.last_timestamp = from_redis_value(v)?;
}
if let Some(v) = map.get("retentionTime") {
result.retention_time = from_redis_value(v)?;
}
if let Some(v) = map.get("chunkCount") {
result.chunk_count = from_redis_value(v)?;
}
if let Some(v) = map.get("maxSamplesPerChunk") {
result.max_samples_per_chunk = from_redis_value(v)?;
}
if let Some(v) = map.get("sourceKey") {
result.source_key = from_redis_value(v)?;
}
result.rules = match map.get("rules") {
Some(Value::Bulk(ref values)) => values
.iter()
.flat_map(|value| match value {
Value::Bulk(ref vs) => Some((
from_redis_value(&vs[0]).unwrap(),
from_redis_value(&vs[1]).unwrap(),
from_redis_value(&vs[2]).unwrap(),
)),
_ => None,
})
.collect(),
_ => vec![],
};
result.labels = match map.get("labels") {
Some(Value::Bulk(ref values)) => values
.iter()
.flat_map(|value| match value {
Value::Bulk(ref vs) => Some((
from_redis_value(&vs[0]).unwrap(),
from_redis_value(&vs[1]).unwrap(),
)),
_ => None,
})
.collect(),
_ => vec![],
};
Ok(result)
}
_ => Err(RedisError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"no_ts_info_data",
))),
}
}
}
#[derive(Debug)]
pub struct TsMget<TS: FromRedisValue, V: FromRedisValue> {
pub values: Vec<TsMgetEntry<TS, V>>,
}
impl<TS: std::default::Default + FromRedisValue, V: std::default::Default + FromRedisValue>
FromRedisValue for TsMget<TS, V>
{
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let res = match *v {
Value::Bulk(ref values) => TsMget {
values: FromRedisValue::from_redis_values(values)?,
},
_ => TsMget { values: vec![] },
};
Ok(res)
}
}
#[derive(Debug, Default)]
pub struct TsMgetEntry<TS: FromRedisValue, V: FromRedisValue> {
pub key: String,
pub labels: Vec<(String, String)>,
pub value: Option<(TS, V)>,
}
impl<TS: std::default::Default + FromRedisValue, V: std::default::Default + FromRedisValue>
FromRedisValue for TsMgetEntry<TS, V>
{
fn from_redis_value(v: &Value) -> RedisResult<Self> {
match *v {
Value::Bulk(ref values) => {
let mut result = TsMgetEntry::<TS, V>::default();
result.key = from_redis_value(&values[0])?;
result.labels = match values[1] {
Value::Bulk(ref vs) => vs
.iter()
.flat_map(|value| match value {
Value::Bulk(ref v) => Some((
from_redis_value(&v[0]).unwrap(),
from_redis_value(&v[1]).unwrap(),
)),
_ => None,
})
.collect(),
_ => vec![],
};
result.value = match values[2] {
Value::Bulk(ref vs) if !vs.is_empty() => Some((
from_redis_value(&vs[0]).unwrap(),
from_redis_value(&vs[1]).unwrap(),
)),
_ => None,
};
Ok(result)
}
_ => Err(RedisError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"no_mget_data",
))),
}
}
}
#[derive(Debug)]
pub struct TsRange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
pub values: Vec<(TS, V)>,
}
impl<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> FromRedisValue for TsRange<TS, V> {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
match *v {
Value::Bulk(ref values) => {
let items: Vec<TsValueReply<TS, V>> = FromRedisValue::from_redis_values(values)?;
Ok(TsRange {
values: items.iter().map(|i| (i.ts, i.value)).collect(),
})
}
_ => Err(RedisError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"no_range_data",
))),
}
}
}
#[derive(Debug)]
pub struct TsMrange<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
pub values: Vec<TsMrangeEntry<TS, V>>,
}
impl<
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
> FromRedisValue for TsMrange<TS, V>
{
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let res = match *v {
Value::Bulk(ref values) => TsMrange {
values: FromRedisValue::from_redis_values(values)?,
},
_ => TsMrange { values: vec![] },
};
Ok(res)
}
}
#[derive(Debug, Default)]
pub struct TsMrangeEntry<TS: FromRedisValue + Copy, V: FromRedisValue + Copy> {
pub key: String,
pub labels: Vec<(String, String)>,
pub values: Vec<(TS, V)>,
}
impl<
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
> FromRedisValue for TsMrangeEntry<TS, V>
{
fn from_redis_value(v: &Value) -> RedisResult<Self> {
match *v {
Value::Bulk(ref values) => {
let mut result = TsMrangeEntry::<TS, V>::default();
result.key = from_redis_value(&values[0]).unwrap();
result.labels = match values[1] {
Value::Bulk(ref vs) => vs
.iter()
.flat_map(|value| match value {
Value::Bulk(ref v) => Some((
from_redis_value(&v[0]).unwrap(),
from_redis_value(&v[1]).unwrap(),
)),
_ => None,
})
.collect(),
_ => vec![],
};
result.values = match values[2] {
Value::Bulk(ref vs) => {
let items: Vec<TsValueReply<TS, V>> =
FromRedisValue::from_redis_values(&vs)?;
items.iter().map(|i| (i.ts, i.value)).collect()
}
_ => vec![],
};
Ok(result)
}
_ => Err(RedisError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"no_mget_data",
))),
}
}
}
#[derive(Debug)]
struct TsValueReply<TS: FromRedisValue, V: FromRedisValue> {
pub ts: TS,
pub value: V,
}
impl<TS: FromRedisValue, V: FromRedisValue> FromRedisValue for TsValueReply<TS, V> {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
match *v {
Value::Bulk(ref values) => Ok(TsValueReply {
ts: from_redis_value(&values[0]).unwrap(),
value: from_redis_value(&values[1]).unwrap(),
}),
_ => Err(RedisError::from(std::io::Error::new(
std::io::ErrorKind::Other,
"no_value_data",
))),
}
}
}
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
enum TsCompare {
Eq,
NotEq,
}
impl ToRedisArgs for TsCompare {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let val = match *self {
TsCompare::Eq => "=",
TsCompare::NotEq => "!=",
};
val.write_redis_args(out);
}
}
#[derive(Debug, Clone)]
pub struct TsFilter {
name: String,
value: String,
compare: TsCompare,
}
impl ToRedisArgs for TsFilter {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let comp = match self.compare {
TsCompare::Eq => "=",
TsCompare::NotEq => "!=",
};
let arg = format!("{}{}{}", self.name, comp, self.value);
out.write_arg(arg.as_bytes());
}
}