use crate::{
commands,
interfaces::ClientLike,
prelude::{Error, FredResult, Key},
types::{
timeseries::{
Aggregator,
DuplicatePolicy,
Encoding,
GetLabels,
GetTimestamp,
GroupBy,
RangeAggregation,
Timestamp,
},
FromValue,
Map,
},
};
use bytes_utils::Str;
use fred_macros::rm_send_if;
use futures::Future;
#[cfg_attr(docsrs, doc(cfg(feature = "i-time-series")))]
#[rm_send_if(feature = "glommio")]
pub trait TimeSeriesInterface: ClientLike {
fn ts_add<R, K, T, L>(
&self,
key: K,
timestamp: T,
value: f64,
retention: Option<u64>,
encoding: Option<Encoding>,
chunk_size: Option<u64>,
on_duplicate: Option<DuplicatePolicy>,
labels: L,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
T: TryInto<Timestamp> + Send,
T::Error: Into<Error> + Send,
L: TryInto<Map> + Send,
L::Error: Into<Error>,
{
async move {
into!(key);
try_into!(timestamp, labels);
commands::timeseries::ts_add(
self,
key,
timestamp,
value,
retention,
encoding,
chunk_size,
on_duplicate,
labels,
)
.await?
.convert()
}
}
fn ts_alter<R, K, L>(
&self,
key: K,
retention: Option<u64>,
chunk_size: Option<u64>,
duplicate_policy: Option<DuplicatePolicy>,
labels: L,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
L: TryInto<Map> + Send,
L::Error: Into<Error>,
{
async move {
into!(key);
try_into!(labels);
commands::timeseries::ts_alter(self, key, retention, chunk_size, duplicate_policy, labels)
.await?
.convert()
}
}
fn ts_create<R, K, L>(
&self,
key: K,
retention: Option<u64>,
encoding: Option<Encoding>,
chunk_size: Option<u64>,
duplicate_policy: Option<DuplicatePolicy>,
labels: L,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
L: TryInto<Map> + Send,
L::Error: Into<Error>,
{
async move {
into!(key);
try_into!(labels);
commands::timeseries::ts_create(self, key, retention, encoding, chunk_size, duplicate_policy, labels)
.await?
.convert()
}
}
fn ts_createrule<R, S, D>(
&self,
src: S,
dest: D,
aggregation: (Aggregator, u64),
align_timestamp: Option<u64>,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
S: Into<Key> + Send,
D: Into<Key> + Send,
{
async move {
into!(src, dest);
commands::timeseries::ts_createrule(self, src, dest, aggregation, align_timestamp)
.await?
.convert()
}
}
fn ts_decrby<R, K, L>(
&self,
key: K,
subtrahend: f64,
timestamp: Option<Timestamp>,
retention: Option<u64>,
uncompressed: bool,
chunk_size: Option<u64>,
labels: L,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
L: TryInto<Map> + Send,
L::Error: Into<Error> + Send,
{
async move {
into!(key);
try_into!(labels);
commands::timeseries::ts_decrby(
self,
key,
subtrahend,
timestamp,
retention,
uncompressed,
chunk_size,
labels,
)
.await?
.convert()
}
}
fn ts_del<R, K>(&self, key: K, from: i64, to: i64) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
{
async move {
into!(key);
commands::timeseries::ts_del(self, key, from, to).await?.convert()
}
}
fn ts_deleterule<R, S, D>(&self, src: S, dest: D) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
S: Into<Key> + Send,
D: Into<Key> + Send,
{
async move {
into!(src, dest);
commands::timeseries::ts_deleterule(self, src, dest).await?.convert()
}
}
fn ts_get<R, K>(&self, key: K, latest: bool) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
{
async move {
into!(key);
commands::timeseries::ts_get(self, key, latest).await?.convert()
}
}
fn ts_incrby<R, K, L>(
&self,
key: K,
addend: f64,
timestamp: Option<Timestamp>,
retention: Option<u64>,
uncompressed: bool,
chunk_size: Option<u64>,
labels: L,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
L: TryInto<Map> + Send,
L::Error: Into<Error> + Send,
{
async move {
into!(key);
try_into!(labels);
commands::timeseries::ts_incrby(
self,
key,
addend,
timestamp,
retention,
uncompressed,
chunk_size,
labels,
)
.await?
.convert()
}
}
fn ts_info<R, K>(&self, key: K, debug: bool) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
{
async move {
into!(key);
commands::timeseries::ts_info(self, key, debug).await?.convert()
}
}
fn ts_madd<R, K, I>(&self, samples: I) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
I: IntoIterator<Item = (K, Timestamp, f64)> + Send,
{
async move {
let samples: Vec<_> = samples
.into_iter()
.map(|(key, ts, val)| (key.into(), ts, val))
.collect();
commands::timeseries::ts_madd(self, samples).await?.convert()
}
}
fn ts_mget<R, L, S, I>(
&self,
latest: bool,
labels: Option<L>,
filters: I,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
L: Into<GetLabels> + Send,
S: Into<Str> + Send,
I: IntoIterator<Item = S> + Send,
{
async move {
let labels = labels.map(|l| l.into());
let filters = filters.into_iter().map(|s| s.into()).collect();
commands::timeseries::ts_mget(self, latest, labels, filters)
.await?
.convert()
}
}
fn ts_mrange<R, F, T, I, S, J>(
&self,
from: F,
to: T,
latest: bool,
filter_by_ts: I,
filter_by_value: Option<(i64, i64)>,
labels: Option<GetLabels>,
count: Option<u64>,
aggregation: Option<RangeAggregation>,
filters: J,
group_by: Option<GroupBy>,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
F: TryInto<GetTimestamp> + Send,
F::Error: Into<Error> + Send,
T: TryInto<GetTimestamp> + Send,
T::Error: Into<Error> + Send,
S: Into<Str> + Send,
I: IntoIterator<Item = i64> + Send,
J: IntoIterator<Item = S> + Send,
{
async move {
try_into!(from, to);
let filters = filters.into_iter().map(|s| s.into()).collect();
let filter_by_ts = filter_by_ts.into_iter().collect();
commands::timeseries::ts_mrange(
self,
from,
to,
latest,
filter_by_ts,
filter_by_value,
labels,
count,
aggregation,
filters,
group_by,
)
.await?
.convert()
}
}
fn ts_mrevrange<R, F, T, I, S, J>(
&self,
from: F,
to: T,
latest: bool,
filter_by_ts: I,
filter_by_value: Option<(i64, i64)>,
labels: Option<GetLabels>,
count: Option<u64>,
aggregation: Option<RangeAggregation>,
filters: J,
group_by: Option<GroupBy>,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
F: TryInto<GetTimestamp> + Send,
F::Error: Into<Error> + Send,
T: TryInto<GetTimestamp> + Send,
T::Error: Into<Error> + Send,
S: Into<Str> + Send,
I: IntoIterator<Item = i64> + Send,
J: IntoIterator<Item = S> + Send,
{
async move {
try_into!(from, to);
let filters = filters.into_iter().map(|s| s.into()).collect();
let filter_by_ts = filter_by_ts.into_iter().collect();
commands::timeseries::ts_mrevrange(
self,
from,
to,
latest,
filter_by_ts,
filter_by_value,
labels,
count,
aggregation,
filters,
group_by,
)
.await?
.convert()
}
}
fn ts_queryindex<R, S, I>(&self, filters: I) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
S: Into<Str> + Send,
I: IntoIterator<Item = S> + Send,
{
async move {
let filters = filters.into_iter().map(|s| s.into()).collect();
commands::timeseries::ts_queryindex(self, filters).await?.convert()
}
}
fn ts_range<R, K, F, T, I>(
&self,
key: K,
from: F,
to: T,
latest: bool,
filter_by_ts: I,
filter_by_value: Option<(i64, i64)>,
count: Option<u64>,
aggregation: Option<RangeAggregation>,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
F: TryInto<GetTimestamp> + Send,
F::Error: Into<Error> + Send,
T: TryInto<GetTimestamp> + Send,
T::Error: Into<Error> + Send,
I: IntoIterator<Item = i64> + Send,
{
async move {
into!(key);
try_into!(from, to);
let filter_by_ts = filter_by_ts.into_iter().collect();
commands::timeseries::ts_range(
self,
key,
from,
to,
latest,
filter_by_ts,
filter_by_value,
count,
aggregation,
)
.await?
.convert()
}
}
fn ts_revrange<R, K, F, T, I>(
&self,
key: K,
from: F,
to: T,
latest: bool,
filter_by_ts: I,
filter_by_value: Option<(i64, i64)>,
count: Option<u64>,
aggregation: Option<RangeAggregation>,
) -> impl Future<Output = FredResult<R>> + Send
where
R: FromValue,
K: Into<Key> + Send,
F: TryInto<GetTimestamp> + Send,
F::Error: Into<Error> + Send,
T: TryInto<GetTimestamp> + Send,
T::Error: Into<Error> + Send,
I: IntoIterator<Item = i64> + Send,
{
async move {
into!(key);
try_into!(from, to);
let filter_by_ts = filter_by_ts.into_iter().collect();
commands::timeseries::ts_revrange(
self,
key,
from,
to,
latest,
filter_by_ts,
filter_by_value,
count,
aggregation,
)
.await?
.convert()
}
}
}