use crate::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value};
use std::collections::HashMap;
use std::io::{Error, ErrorKind};
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum StreamMaxlen {
Equals(usize),
Approx(usize),
}
impl ToRedisArgs for StreamMaxlen {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
let (ch, val) = match *self {
StreamMaxlen::Equals(v) => ("=", v),
StreamMaxlen::Approx(v) => ("~", v),
};
out.write_arg(b"MAXLEN");
out.write_arg(ch.as_bytes());
val.write_redis_args(out);
}
}
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
idle: Option<usize>,
time: Option<usize>,
retry: Option<usize>,
force: bool,
justid: bool,
}
impl StreamClaimOptions {
pub fn idle(mut self, ms: usize) -> Self {
self.idle = Some(ms);
self
}
pub fn time(mut self, ms_time: usize) -> Self {
self.time = Some(ms_time);
self
}
pub fn retry(mut self, count: usize) -> Self {
self.retry = Some(count);
self
}
pub fn with_force(mut self) -> Self {
self.force = true;
self
}
pub fn with_justid(mut self) -> Self {
self.justid = true;
self
}
}
impl ToRedisArgs for StreamClaimOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref ms) = self.idle {
out.write_arg(b"IDLE");
out.write_arg(format!("{}", ms).as_bytes());
}
if let Some(ref ms_time) = self.time {
out.write_arg(b"TIME");
out.write_arg(format!("{}", ms_time).as_bytes());
}
if let Some(ref count) = self.retry {
out.write_arg(b"RETRYCOUNT");
out.write_arg(format!("{}", count).as_bytes());
}
if self.force {
out.write_arg(b"FORCE");
}
if self.justid {
out.write_arg(b"JUSTID");
}
}
}
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
#[derive(Default, Debug)]
pub struct StreamReadOptions {
block: Option<usize>,
count: Option<usize>,
noack: Option<bool>,
group: SRGroup,
}
impl StreamReadOptions {
pub fn read_only(&self) -> bool {
self.group.is_none()
}
pub fn noack(mut self) -> Self {
self.noack = Some(true);
self
}
pub fn block(mut self, ms: usize) -> Self {
self.block = Some(ms);
self
}
pub fn count(mut self, n: usize) -> Self {
self.count = Some(n);
self
}
pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
mut self,
group_name: GN,
consumer_name: CN,
) -> Self {
self.group = Some((
ToRedisArgs::to_redis_args(&group_name),
ToRedisArgs::to_redis_args(&consumer_name),
));
self
}
}
impl ToRedisArgs for StreamReadOptions {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
if let Some(ref ms) = self.block {
out.write_arg(b"BLOCK");
out.write_arg(format!("{}", ms).as_bytes());
}
if let Some(ref n) = self.count {
out.write_arg(b"COUNT");
out.write_arg(format!("{}", n).as_bytes());
}
if let Some(ref group) = self.group {
if let Some(true) = self.noack {
out.write_arg(b"NOACK");
}
out.write_arg(b"GROUP");
for i in &group.0 {
out.write_arg(i);
}
for i in &group.1 {
out.write_arg(i);
}
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
pub keys: Vec<StreamKey>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
pub ids: Vec<StreamId>,
}
#[derive(Debug, Clone)]
pub enum StreamPendingReply {
Empty,
Data(StreamPendingData),
}
impl Default for StreamPendingReply {
fn default() -> StreamPendingReply {
StreamPendingReply::Empty
}
}
impl StreamPendingReply {
pub fn count(&self) -> usize {
match self {
StreamPendingReply::Empty => 0,
StreamPendingReply::Data(x) => x.count,
}
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
pub count: usize,
pub start_id: String,
pub end_id: String,
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
pub ids: Vec<StreamPendingId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
pub last_generated_id: String,
pub radix_tree_keys: usize,
pub groups: usize,
pub length: usize,
pub first_entry: StreamId,
pub last_entry: StreamId,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
pub consumers: Vec<StreamInfoConsumer>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
pub groups: Vec<StreamInfoGroup>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
pub name: String,
pub pending: usize,
pub idle: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
pub name: String,
pub consumers: usize,
pub pending: usize,
pub last_delivered_id: String,
}
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
pub id: String,
pub consumer: String,
pub last_delivered_ms: usize,
pub times_delivered: usize,
}
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
pub key: String,
pub ids: Vec<StreamId>,
}
#[derive(Default, Debug, Clone)]
pub struct StreamId {
pub id: String,
pub map: HashMap<String, Value>,
}
impl StreamId {
fn from_bulk_value(v: &Value) -> RedisResult<Self> {
let mut stream_id = StreamId::default();
if let Value::Bulk(ref values) = *v {
if let Some(v) = values.get(0) {
stream_id.id = from_redis_value(&v)?;
}
if let Some(v) = values.get(1) {
stream_id.map = from_redis_value(&v)?;
}
}
Ok(stream_id)
}
pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
match self.map.get(key) {
Some(ref x) => from_redis_value(*x).ok(),
None => None,
}
}
pub fn contains_key(&self, key: &&str) -> bool {
self.map.get(*key).is_some()
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
impl FromRedisValue for StreamReadReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let rows: SRRows = from_redis_value(v)?;
let keys = rows
.into_iter()
.flat_map(|row| {
row.into_iter().map(|(key, entry)| {
let ids = entry
.into_iter()
.flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
StreamKey { key, ids }
})
})
.collect();
Ok(StreamReadReply { keys })
}
}
impl FromRedisValue for StreamRangeReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
Ok(StreamRangeReply { ids })
}
}
impl FromRedisValue for StreamClaimReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
let ids: Vec<StreamId> = rows
.into_iter()
.flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
.collect();
Ok(StreamClaimReply { ids })
}
}
type SPRInner = (
usize,
Option<String>,
Option<String>,
Vec<Option<(String, String)>>,
);
impl FromRedisValue for StreamPendingReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
if count == 0 {
Ok(StreamPendingReply::Empty)
} else {
let mut result = StreamPendingData::default();
let start_id = start.ok_or_else(|| {
Error::new(
ErrorKind::Other,
"IllegalState: Non-zero pending expects start id",
)
})?;
let end_id = end.ok_or_else(|| {
Error::new(
ErrorKind::Other,
"IllegalState: Non-zero pending expects end id",
)
})?;
result.count = count;
result.start_id = start_id;
result.end_id = end_id;
for cd in consumer_data {
if let Some((name, pending)) = cd {
let mut info = StreamInfoConsumer::default();
info.name = name;
if let Ok(v) = pending.parse::<usize>() {
info.pending = v;
}
result.consumers.push(info);
}
}
Ok(StreamPendingReply::Data(result))
}
}
}
impl FromRedisValue for StreamPendingCountReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let mut reply = StreamPendingCountReply::default();
match v {
Value::Bulk(outer_tuple) => {
for outer in outer_tuple {
match outer {
Value::Bulk(inner_tuple) => match &inner_tuple[..] {
[Value::Data(id_bytes), Value::Data(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
{
let id = String::from_utf8(id_bytes.to_vec())?;
let consumer = String::from_utf8(consumer_bytes.to_vec())?;
let last_delivered_ms = *last_delivered_ms_u64 as usize;
let times_delivered = *times_delivered_u64 as usize;
reply.ids.push(StreamPendingId {
id,
consumer,
last_delivered_ms,
times_delivered,
});
}
_ => fail!((
crate::types::ErrorKind::TypeError,
"Cannot parse redis data (3)"
)),
},
_ => fail!((
crate::types::ErrorKind::TypeError,
"Cannot parse redis data (2)"
)),
}
}
}
_ => fail!((
crate::types::ErrorKind::TypeError,
"Cannot parse redis data (1)"
)),
};
Ok(reply)
}
}
impl FromRedisValue for StreamInfoStreamReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let map: HashMap<String, Value> = from_redis_value(v)?;
let mut reply = StreamInfoStreamReply::default();
if let Some(v) = &map.get("last-generated-id") {
reply.last_generated_id = from_redis_value(v)?;
}
if let Some(v) = &map.get("radix-tree-nodes") {
reply.radix_tree_keys = from_redis_value(v)?;
}
if let Some(v) = &map.get("groups") {
reply.groups = from_redis_value(v)?;
}
if let Some(v) = &map.get("length") {
reply.length = from_redis_value(v)?;
}
if let Some(v) = &map.get("first-entry") {
reply.first_entry = StreamId::from_bulk_value(v)?;
}
if let Some(v) = &map.get("last-entry") {
reply.last_entry = StreamId::from_bulk_value(v)?;
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoConsumersReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoConsumersReply::default();
for map in consumers {
let mut c = StreamInfoConsumer::default();
if let Some(v) = &map.get("name") {
c.name = from_redis_value(v)?;
}
if let Some(v) = &map.get("pending") {
c.pending = from_redis_value(v)?;
}
if let Some(v) = &map.get("idle") {
c.idle = from_redis_value(v)?;
}
reply.consumers.push(c);
}
Ok(reply)
}
}
impl FromRedisValue for StreamInfoGroupsReply {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
let mut reply = StreamInfoGroupsReply::default();
for map in groups {
let mut g = StreamInfoGroup::default();
if let Some(v) = &map.get("name") {
g.name = from_redis_value(v)?;
}
if let Some(v) = &map.get("pending") {
g.pending = from_redis_value(v)?;
}
if let Some(v) = &map.get("consumers") {
g.consumers = from_redis_value(v)?;
}
if let Some(v) = &map.get("last-delivered-id") {
g.last_delivered_id = from_redis_value(v)?;
}
reply.groups.push(g);
}
Ok(reply)
}
}