Skip to main content

ruststream_fred/
context.rs

1//! Optional typed per-delivery context exposing native Redis metadata, one struct per transport.
2//!
3//! A handler can read native Redis metadata for the message it is processing by compile-time
4//! [`Field`] key, with no hashing, boxing, or downcasting. The runtime builds the context value once
5//! per delivery (via [`BuildContext`]) from the concrete broker message, then the handler reads a
6//! field with `ctx.context(key)`.
7//!
//! This is purely additive. A handler that declares the default `()` context (the vast majority) is
//! unaffected: the blanket `impl BuildContext<M> for ()` still applies, so the feature costs
//! nothing for handlers that do not opt in.
11//!
12//! # What is exposed
13//!
14//! Only genuinely-native metadata that is not already reachable off the payload or
15//! [`Headers`](ruststream::Headers) is surfaced here:
16//!
17//! * [`StreamContext`] (Redis Streams) - the stream entry id and the consumer group. The native
18//!   reclaim delivery-count and idle time stay header-surfaced
19//!   ([`DELIVERY_COUNT_HEADER`](crate::DELIVERY_COUNT_HEADER) /
20//!   [`IDLE_MS_HEADER`](crate::IDLE_MS_HEADER)) and are deliberately not duplicated.
21//! * [`PubSubContext`] (Redis Pub/Sub) - the concrete channel the message arrived on and whether it
22//!   matched through a `PSUBSCRIBE` pattern (for a pattern subscription the channel differs from the
23//!   registered glob).
24//!
25//! # Examples
26//!
27//! ```
28//! use ruststream::runtime::{Context, HandlerResult};
29//! use ruststream_fred::context::{StreamContext, keys};
30//!
31//! // A handler over the Streams transport reading the native entry id and consumer group.
32//! async fn handle(order: &Vec<u8>, ctx: &mut Context<'_, StreamContext>) -> HandlerResult {
33//!     if let Some(id) = ctx.context(keys::EntryId) {
34//!         let _ = id; // e.g. log the stream entry id `1700000000000-0`
35//!     }
36//!     let _group = ctx.context(keys::ConsumerGroup);
37//!     HandlerResult::Ack
38//! }
39//! # let _ = handle;
40//! ```
41
42use ruststream::{BuildContext, Field};
43
44use crate::message::RedisMessage;
45use crate::pubsub::RedisPubSubMessage;
46
/// Native Redis Streams metadata captured for a single delivery ([`RedisMessage`]).
///
/// One value is built per delivery from the message; handlers read individual fields through the
/// selectors in [`keys`] on a [`Context`](ruststream::runtime::Context).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamContext {
    /// Stream entry id, when the context carries one.
    entry_id: Option<String>,
    /// Consumer group, when the context carries one.
    consumer_group: Option<String>,
}

impl StreamContext {
    /// Assembles a context from already-extracted native fields.
    ///
    /// Mostly useful in tests, where no broker message is at hand.
    #[must_use]
    pub fn new(entry_id: Option<String>, consumer_group: Option<String>) -> Self {
        Self {
            entry_id,
            consumer_group,
        }
    }

    /// Returns the stream entry id this delivery was read at (for example `1700000000000-0`),
    /// or `None` when the context holds no id.
    #[must_use]
    pub fn entry_id(&self) -> Option<&str> {
        let Self { entry_id, .. } = self;
        entry_id.as_deref()
    }

    /// Returns the consumer group this delivery was read through, or `None` when the context
    /// holds no group.
    #[must_use]
    pub fn consumer_group(&self) -> Option<&str> {
        let Self { consumer_group, .. } = self;
        consumer_group.as_deref()
    }
}
79
80impl BuildContext<RedisMessage> for StreamContext {
81    fn build(msg: &RedisMessage) -> Self {
82        Self {
83            entry_id: msg.id().map(str::to_owned),
84            consumer_group: msg.group().map(str::to_owned),
85        }
86    }
87}
88
/// Native Redis Pub/Sub metadata captured for a single delivery ([`RedisPubSubMessage`]).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PubSubContext {
    /// Concrete channel the message arrived on.
    channel: String,
    /// `true` when the delivery matched through a `PSUBSCRIBE` pattern.
    from_pattern: bool,
}

impl PubSubContext {
    /// Assembles a context from already-extracted native fields.
    ///
    /// Mostly useful in tests, where no broker message is at hand. `channel` accepts anything
    /// convertible into a `String`.
    #[must_use]
    pub fn new(channel: impl Into<String>, from_pattern: bool) -> Self {
        let channel = channel.into();
        Self {
            channel,
            from_pattern,
        }
    }

    /// Returns the concrete channel this message arrived on. For a pattern subscription this
    /// is the matched channel, not the subscription glob.
    #[must_use]
    pub fn channel(&self) -> &str {
        self.channel.as_str()
    }

    /// Returns `true` when the delivery matched through a `PSUBSCRIBE` pattern rather than an
    /// exact subscribe.
    #[must_use]
    pub fn from_pattern(&self) -> bool {
        let Self { from_pattern, .. } = self;
        *from_pattern
    }
}
119
120impl BuildContext<RedisPubSubMessage> for PubSubContext {
121    fn build(msg: &RedisPubSubMessage) -> Self {
122        Self {
123            channel: msg.channel().to_owned(),
124            from_pattern: msg.from_pattern(),
125        }
126    }
127}
128
129/// Compile-time [`Field`] keys, one per native field, read with `ctx.context(key)`.
130///
131/// Each key is a zero-sized selector implementing [`Field`] only for the context type that carries
132/// its field, so applying a key to the wrong transport's context is a compile error.
133pub mod keys {
134    use super::{Field, PubSubContext, StreamContext};
135
136    /// Reads the stream entry id off a [`StreamContext`].
137    #[derive(Debug, Clone, Copy, Default)]
138    pub struct EntryId;
139
140    impl Field<StreamContext> for EntryId {
141        type Value<'a> = Option<&'a str>;
142        fn get(self, src: &StreamContext) -> Option<&str> {
143            src.entry_id()
144        }
145    }
146
147    /// Reads the consumer group off a [`StreamContext`].
148    #[derive(Debug, Clone, Copy, Default)]
149    pub struct ConsumerGroup;
150
151    impl Field<StreamContext> for ConsumerGroup {
152        type Value<'a> = Option<&'a str>;
153        fn get(self, src: &StreamContext) -> Option<&str> {
154            src.consumer_group()
155        }
156    }
157
158    /// Reads the concrete channel off a [`PubSubContext`].
159    #[derive(Debug, Clone, Copy, Default)]
160    pub struct Channel;
161
162    impl Field<PubSubContext> for Channel {
163        type Value<'a> = &'a str;
164        fn get(self, src: &PubSubContext) -> &str {
165            src.channel()
166        }
167    }
168
169    /// Reads whether a [`PubSubContext`] delivery matched through a `PSUBSCRIBE` pattern.
170    #[derive(Debug, Clone, Copy, Default)]
171    pub struct FromPattern;
172
173    impl Field<PubSubContext> for FromPattern {
174        type Value<'a> = bool;
175        fn get(self, src: &PubSubContext) -> bool {
176            src.from_pattern()
177        }
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::keys::{Channel, ConsumerGroup, EntryId, FromPattern};
    use super::{PubSubContext, StreamContext};
    use ruststream::Field;

    /// Both stream keys return the values the context was constructed with.
    #[test]
    fn stream_keys_read_native_fields() {
        let entry_id = String::from("1700000000000-0");
        let group = String::from("workers");
        let cx = StreamContext::new(Some(entry_id), Some(group));

        assert_eq!(EntryId.get(&cx), Some("1700000000000-0"));
        assert_eq!(ConsumerGroup.get(&cx), Some("workers"));
    }

    /// Both stream keys yield `None` when the context carries neither value.
    #[test]
    fn stream_keys_absent_when_settled() {
        let cx = StreamContext::new(None, None);

        assert_eq!(EntryId.get(&cx), None);
        assert_eq!(ConsumerGroup.get(&cx), None);
    }

    /// The Pub/Sub keys echo the channel and pattern flag for exact and pattern deliveries.
    #[test]
    fn pubsub_keys_read_channel_and_pattern_flag() {
        let cases = [("events", false), ("events.user", true)];

        for (channel, via_pattern) in cases {
            let cx = PubSubContext::new(channel, via_pattern);

            assert_eq!(Channel.get(&cx), channel);
            assert_eq!(FromPattern.get(&cx), via_pattern);
        }
    }
}