use ruststream::{BuildContext, Field};
use crate::message::RedisMessage;
use crate::pubsub::RedisPubSubMessage;
/// Per-message context for stream deliveries.
///
/// Holds an optional entry id and an optional consumer group name. Both are
/// `None` in the [`Default`] value.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StreamContext {
    /// Entry id of the message, if one is available.
    entry_id: Option<String>,
    /// Consumer group of the message, if one is available.
    consumer_group: Option<String>,
}

impl StreamContext {
    /// Creates a context from an already-known entry id and consumer group.
    ///
    /// Either argument may be `None`; the values are stored as given.
    #[must_use]
    pub fn new(entry_id: Option<String>, consumer_group: Option<String>) -> Self {
        StreamContext {
            entry_id,
            consumer_group,
        }
    }

    /// Returns the stored entry id as a borrowed string slice, or `None` if
    /// no id was recorded.
    #[must_use]
    pub fn entry_id(&self) -> Option<&str> {
        let Self { entry_id, .. } = self;
        entry_id.as_deref()
    }

    /// Returns the stored consumer group as a borrowed string slice, or
    /// `None` if no group was recorded.
    #[must_use]
    pub fn consumer_group(&self) -> Option<&str> {
        let Self { consumer_group, .. } = self;
        consumer_group.as_deref()
    }
}
impl BuildContext<RedisMessage> for StreamContext {
    /// Builds a [`StreamContext`] by copying the id and group reported by
    /// `msg` into owned strings.
    fn build(msg: &RedisMessage) -> Self {
        // Read the id first, then the group, mirroring the field order.
        let entry_id = msg.id().map(str::to_owned);
        let consumer_group = msg.group().map(str::to_owned);
        Self::new(entry_id, consumer_group)
    }
}
/// Per-message context for pub/sub deliveries.
///
/// Holds the channel name and a flag recording whether the message came from
/// a pattern. The [`Default`] value has an empty channel and a `false` flag.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PubSubContext {
    /// Name of the channel associated with the message.
    channel: String,
    /// Whether the message is flagged as originating from a pattern.
    from_pattern: bool,
}

impl PubSubContext {
    /// Creates a context for `channel`, converting it into an owned `String`.
    #[must_use]
    pub fn new(channel: impl Into<String>, from_pattern: bool) -> Self {
        let channel: String = channel.into();
        PubSubContext {
            channel,
            from_pattern,
        }
    }

    /// Returns the channel name.
    #[must_use]
    pub fn channel(&self) -> &str {
        self.channel.as_str()
    }

    /// Returns the stored pattern flag.
    #[must_use]
    pub fn from_pattern(&self) -> bool {
        let Self { from_pattern, .. } = self;
        *from_pattern
    }
}
impl BuildContext<RedisPubSubMessage> for PubSubContext {
    /// Builds a [`PubSubContext`] from `msg`, taking an owned copy of its
    /// channel and its pattern flag.
    fn build(msg: &RedisPubSubMessage) -> Self {
        // Read the channel first, then the flag, mirroring the field order.
        let channel = msg.channel().to_owned();
        let from_pattern = msg.from_pattern();
        Self {
            channel,
            from_pattern,
        }
    }
}
/// Unit-struct field selectors for the context types of the parent module.
///
/// Each selector implements [`Field`] for one context type and forwards to
/// the accessor of the same meaning on that context.
pub mod keys {
    use super::{Field, PubSubContext, StreamContext};

    /// Selects the entry id of a [`StreamContext`].
    #[derive(Debug, Clone, Copy, Default)]
    pub struct EntryId;

    impl Field<StreamContext> for EntryId {
        type Value<'a> = Option<&'a str>;

        /// Returns `cx.entry_id()`.
        fn get(self, cx: &StreamContext) -> Self::Value<'_> {
            cx.entry_id()
        }
    }

    /// Selects the consumer group of a [`StreamContext`].
    #[derive(Debug, Clone, Copy, Default)]
    pub struct ConsumerGroup;

    impl Field<StreamContext> for ConsumerGroup {
        type Value<'a> = Option<&'a str>;

        /// Returns `cx.consumer_group()`.
        fn get(self, cx: &StreamContext) -> Self::Value<'_> {
            cx.consumer_group()
        }
    }

    /// Selects the channel name of a [`PubSubContext`].
    #[derive(Debug, Clone, Copy, Default)]
    pub struct Channel;

    impl Field<PubSubContext> for Channel {
        type Value<'a> = &'a str;

        /// Returns `cx.channel()`.
        fn get(self, cx: &PubSubContext) -> Self::Value<'_> {
            cx.channel()
        }
    }

    /// Selects the pattern flag of a [`PubSubContext`].
    #[derive(Debug, Clone, Copy, Default)]
    pub struct FromPattern;

    impl Field<PubSubContext> for FromPattern {
        type Value<'a> = bool;

        /// Returns `cx.from_pattern()`.
        fn get(self, cx: &PubSubContext) -> Self::Value<'_> {
            cx.from_pattern()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::keys::{Channel, ConsumerGroup, EntryId, FromPattern};
    use super::{PubSubContext, StreamContext};
    use ruststream::Field;

    /// Both stream selectors return the values the context was built with.
    #[test]
    fn stream_keys_read_native_fields() {
        let entry = "1700000000000-0";
        let group = "workers";
        let cx = StreamContext::new(Some(String::from(entry)), Some(String::from(group)));

        assert_eq!(EntryId.get(&cx), Some(entry));
        assert_eq!(ConsumerGroup.get(&cx), Some(group));
    }

    /// Both stream selectors return `None` when the context holds no values.
    #[test]
    fn stream_keys_absent_when_settled() {
        let cx = StreamContext::new(None, None);

        assert_eq!(EntryId.get(&cx), None);
        assert_eq!(ConsumerGroup.get(&cx), None);
    }

    /// The pub/sub selectors return the channel and flag passed to `new`,
    /// for both an exact and a pattern-matched context.
    #[test]
    fn pubsub_keys_read_channel_and_pattern_flag() {
        let cases = [("events", false), ("events.user", true)];

        for (channel, from_pattern) in cases {
            let cx = PubSubContext::new(channel, from_pattern);
            assert_eq!(Channel.get(&cx), channel);
            assert_eq!(FromPattern.get(&cx), from_pattern);
        }
    }
}