use core::{fmt, time};
use core::future::Future;
use std::borrow::Cow;
use redis::{Cmd, ToRedisArgs, RedisError, FromRedisValue};
use redis::aio::ConnectionManager;
use crate::types::{idents, RedisType, TimestampId, StreamId, TrimMethod, GroupInfo, PendingStats, EntryValue, PendingEntry, PendingParams, PendingParamsConfig, FetchParams, FetchParamsConfig, FetchResult, FetchEntries};
use crate::iters::{FetchIter, PendingIter};
/// Static configuration of a [`Queue`].
///
/// Derives `Debug` in addition to `Clone` so the configuration can be logged
/// and embedded in other `Debug`-deriving types.
#[derive(Clone, Debug)]
pub struct QueueConfig {
    /// Key of the Redis stream that every queue command is issued against.
    pub stream: Cow<'static, str>,
}
/// Handle to a single stream-backed queue.
///
/// Pairs the queue configuration with a `ConnectionManager`; cloning the
/// handle clones both fields.
#[derive(Clone)]
pub struct Queue {
// Names the stream key that commands are issued against.
config: QueueConfig,
// Connection handle; it is cloned for every operation (see `connection`).
conn: ConnectionManager,
}
impl Queue {
pub fn new(config: QueueConfig, conn: ConnectionManager) -> Self {
Self {
config,
conn,
}
}
/// Starts a command whose first argument is `name` and whose second
/// argument is the configured stream key.
fn cmd(&self, name: &str) -> Cmd {
    let stream_key: &str = &self.config.stream;
    let mut command = Cmd::new();
    command.arg(name);
    command.arg(stream_key);
    command
}
#[inline(always)]
pub fn connection(&self) -> ConnectionManager {
self.conn.clone()
}
pub async fn create_group(&self, group: &str) -> Result<(), RedisError> {
let mut conn = self.connection();
let mut cmd = self.cmd(idents::TYPE);
let typ: RedisType = cmd.query_async(&mut conn).await?;
match typ {
RedisType::Stream | RedisType::None => (),
_ => return Err((redis::ErrorKind::ClientError, "key is already used by non-stream type").into()),
}
cmd = Cmd::new();
cmd.arg(idents::XGROUP)
.arg(idents::CREATE)
.arg(self.config.stream.as_ref())
.arg(group)
.arg("$")
.arg(idents::MKSTREAM);
match cmd.query_async(&mut conn).await {
Ok(()) => Ok(()),
Err(error) => match error.code() {
Some(idents::BUSYGROUP) => Ok(()),
_ => Err(error),
},
}
}
/// Queries the server clock with the `idents::TIME` command on `conn`.
///
/// The reply is decoded as a pair of integers: the first is taken as whole
/// seconds and the second as microseconds, and their sum is returned.
async fn inner_time_ref(conn: &mut ConnectionManager) -> Result<time::Duration, RedisError> {
    let mut clock = Cmd::new();
    clock.arg(idents::TIME);
    let (secs, micros): (u64, u64) = clock.query_async(conn).await?;
    Ok(time::Duration::from_secs(secs) + time::Duration::from_micros(micros))
}
/// Owning variant of `inner_time_ref`: takes the connection by value so the
/// returned future does not borrow from the caller.
#[inline(always)]
async fn inner_time(mut conn: ConnectionManager) -> Result<time::Duration, RedisError> {
    let now = Self::inner_time_ref(&mut conn).await?;
    Ok(now)
}
/// Returns a future resolving to the server's current time.
///
/// The future owns its own clone of the connection manager.
#[inline(always)]
pub fn time(&self) -> impl Future<Output = Result<time::Duration, RedisError>> + Send {
    let conn = self.connection();
    Self::inner_time(conn)
}
/// Sends the `idents::XLEN` command for the stream and returns the reply
/// decoded as `usize`.
pub async fn len(&self) -> Result<usize, RedisError> {
    let mut conn = self.connection();
    self.cmd(idents::XLEN).query_async(&mut conn).await
}
/// Acknowledges the entries `ids` on behalf of `group` using the
/// `idents::XACK` command.
///
/// Returns the server's integer reply decoded as `usize`.
///
/// An empty `ids` slice returns `Ok(0)` without contacting the server: the
/// command requires at least one ID, so sending it with none would only
/// produce an arity error.
///
/// # Errors
///
/// Returns any error reported while executing the command.
pub async fn consume(&self, group: &str, ids: &[StreamId]) -> Result<usize, RedisError> {
    if ids.is_empty() {
        // Nothing to acknowledge; avoid a guaranteed-to-fail round trip.
        return Ok(0);
    }

    let mut conn = self.connection();
    let mut cmd = self.cmd(idents::XACK);
    cmd.arg(group).arg(ids);
    cmd.query_async(&mut conn).await
}
/// Removes the entries `ids` from the stream using the `idents::XDEL`
/// command.
///
/// Returns the server's integer reply decoded as `usize`.
///
/// An empty `ids` slice returns `Ok(0)` without contacting the server: the
/// command requires at least one ID, so sending it with none would only
/// produce an arity error.
///
/// # Errors
///
/// Returns any error reported while executing the command.
pub async fn delete(&self, ids: &[StreamId]) -> Result<usize, RedisError> {
    if ids.is_empty() {
        // Nothing to delete; avoid a guaranteed-to-fail round trip.
        return Ok(0);
    }

    let mut conn = self.connection();
    let mut cmd = self.cmd(idents::XDEL);
    cmd.arg(ids);
    cmd.query_async(&mut conn).await
}
/// Sends the `idents::XTRIM` command for the stream with `method` as its
/// argument and returns the reply decoded as `u64`.
pub async fn trim(&self, method: TrimMethod) -> Result<u64, RedisError> {
    let mut conn = self.connection();
    self.cmd(idents::XTRIM)
        .arg(method)
        .query_async(&mut conn)
        .await
}
/// Deletes the stream key with `DEL`, discarding the server's reply.
pub async fn purge(&self) -> Result<(), RedisError> {
    let mut conn = self.connection();
    let delete_key = self.cmd("DEL");
    delete_key.query_async(&mut conn).await
}
/// Queries consumer-group information for the stream
/// (`idents::XINFO` + `idents::GROUPS` + stream key) and decodes the reply
/// into a list of `GroupInfo`.
pub async fn groups_info(&self) -> Result<Vec<GroupInfo>, RedisError> {
    let mut conn = self.connection();
    let stream_key: &str = &self.config.stream;

    let mut info = Cmd::new();
    info.arg(idents::XINFO);
    info.arg(idents::GROUPS);
    info.arg(stream_key);
    info.query_async(&mut conn).await
}
/// Sends the `idents::XPENDING` command for the stream with only the group
/// name as argument and decodes the reply into `PendingStats`.
pub async fn pending_stats(&self, group: &str) -> Result<PendingStats, RedisError> {
    let mut conn = self.connection();
    self.cmd(idents::XPENDING)
        .arg(group)
        .query_async(&mut conn)
        .await
}
/// Appends `item` to the stream with the `idents::XADD` command.
///
/// `"*"` is sent in the ID position, leaving ID generation to the server;
/// the reply is decoded as the new entry's `StreamId`.
pub async fn append<T: ToRedisArgs>(&self, item: &EntryValue<T>) -> Result<StreamId, RedisError> {
    let mut conn = self.connection();
    let mut add = self.cmd(idents::XADD);
    add.arg("*");
    add.arg(item);
    add.query_async(&mut conn).await
}
/// Appends `item` with an explicit ID derived from the server clock plus
/// `delay`.
///
/// The server time is fetched first, `delay` is added with saturation, and
/// the sum is wrapped in a `TimestampId` that is sent in the ID position of
/// the `idents::XADD` command. Both commands use the same connection clone.
pub async fn append_delayed<T: ToRedisArgs>(&self, item: &EntryValue<T>, delay: time::Duration) -> Result<StreamId, RedisError> {
    let mut conn = self.connection();

    let server_now = Self::inner_time_ref(&mut conn).await?;
    let due_at = server_now.saturating_add(delay);
    let id = TimestampId::new(due_at);

    let mut add = self.cmd(idents::XADD);
    add.arg(id);
    add.arg(item);
    add.query_async(&mut conn).await
}
/// Lists pending entries matching `params`.
///
/// `params` and the queue configuration are combined into a
/// `PendingParamsConfig`, which supplies every argument after
/// `idents::XPENDING`; the reply is decoded into `PendingEntry` values.
pub async fn pending(&self, params: &PendingParams<'_>) -> Result<Vec<PendingEntry>, RedisError> {
    let mut conn = self.connection();
    let args = PendingParamsConfig {
        params,
        config: &self.config,
    };

    let mut query = Cmd::new();
    query.arg(idents::XPENDING);
    query.arg(&args);
    query.query_async(&mut conn).await
}
/// Reads entries for a consumer group with the `idents::XREADGROUP` command.
///
/// `params` and the queue configuration are combined into a
/// `FetchParamsConfig`, which supplies every argument after the command
/// name. The reply is decoded as an optional one-element tuple; when it is
/// absent, an empty `FetchResult` carrying the configured stream name is
/// returned instead.
pub async fn fetch<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchResult<T>, redis::RedisError> {
    let mut conn = self.connection();
    let args = FetchParamsConfig {
        params,
        config: &self.config,
    };

    let mut read_group = Cmd::new();
    read_group.arg(idents::XREADGROUP);
    read_group.arg(&args);

    let reply: Option<(FetchResult<T>,)> = read_group.query_async(&mut conn).await?;
    let result = match reply {
        Some((found,)) => found,
        // No reply payload: report an empty batch for this stream.
        None => FetchResult {
            stream: args.config.stream.clone().into_owned(),
            entries: Vec::new(),
        },
    };
    Ok(result)
}
/// Same request as `fetch`, but decodes the reply into `FetchEntries`,
/// which carries only the entries.
///
/// When the reply is absent, an empty `FetchEntries` is returned.
pub async fn fetch_entries<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchEntries<T>, redis::RedisError> {
    let mut conn = self.connection();
    let args = FetchParamsConfig {
        params,
        config: &self.config,
    };

    let mut read_group = Cmd::new();
    read_group.arg(idents::XREADGROUP);
    read_group.arg(&args);

    let reply: Option<(FetchEntries<T>,)> = read_group.query_async(&mut conn).await?;
    let result = match reply {
        Some((found,)) => found,
        // No reply payload: report an empty batch.
        None => FetchEntries {
            entries: Vec::new(),
        },
    };
    Ok(result)
}
/// Creates a `FetchIter` from `params` and a clone of this queue handle.
#[inline(always)]
pub fn fetch_iter<'a>(&self, params: FetchParams<'a>) -> FetchIter<'a> {
    let queue = self.clone();
    FetchIter::new(params, queue)
}
/// Creates a `PendingIter` from `params` and a clone of this queue handle.
#[inline(always)]
pub fn pending_iter<'a>(&self, params: PendingParams<'a>) -> PendingIter<'a> {
    let queue = self.clone();
    PendingIter::new(params, queue)
}
}
/// Debug output shows only the stream key, under the label `name`; the
/// connection manager is left out.
impl fmt::Debug for Queue {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Queue");
        out.field("name", &self.config.stream);
        out.finish()
    }
}