ruststream_fred/context.rs
1//! Optional typed per-delivery context exposing native Redis metadata, one struct per transport.
2//!
3//! A handler can read native Redis metadata for the message it is processing by compile-time
4//! [`Field`] key, with no hashing, boxing, or downcasting. The runtime builds the context value once
5//! per delivery (via [`BuildContext`]) from the concrete broker message, then the handler reads a
6//! field with `ctx.context(key)`.
7//!
8//! This is purely additive. A handler that declares the default `()` context (the vast majority) is
9//! unaffected: the blanket `impl BuildContext<M> for ()` still applies, so opting in costs nothing
10//! to those who do not.
11//!
12//! # What is exposed
13//!
14//! Only genuinely-native metadata that is not already reachable off the payload or
15//! [`Headers`](ruststream::Headers) is surfaced here:
16//!
17//! * [`StreamContext`] (Redis Streams) - the stream entry id and the consumer group. The native
18//! reclaim delivery-count and idle time stay header-surfaced
19//! ([`DELIVERY_COUNT_HEADER`](crate::DELIVERY_COUNT_HEADER) /
20//! [`IDLE_MS_HEADER`](crate::IDLE_MS_HEADER)) and are deliberately not duplicated.
21//! * [`PubSubContext`] (Redis Pub/Sub) - the concrete channel the message arrived on and whether it
22//! matched through a `PSUBSCRIBE` pattern (for a pattern subscription the channel differs from the
23//! registered glob).
24//!
25//! # Examples
26//!
27//! ```
28//! use ruststream::runtime::{Context, HandlerResult};
29//! use ruststream_fred::context::{StreamContext, keys};
30//!
31//! // A handler over the Streams transport reading the native entry id and consumer group.
32//! async fn handle(order: &Vec<u8>, ctx: &mut Context<'_, StreamContext>) -> HandlerResult {
33//! if let Some(id) = ctx.context(keys::EntryId) {
34//! let _ = id; // e.g. log the stream entry id `1700000000000-0`
35//! }
36//! let _group = ctx.context(keys::ConsumerGroup);
37//! HandlerResult::Ack
38//! }
39//! # let _ = handle;
40//! ```
41
42use ruststream::{BuildContext, Field};
43
44use crate::message::RedisMessage;
45use crate::pubsub::RedisPubSubMessage;
46
47/// Per-delivery context for a Redis Streams delivery ([`RedisMessage`]).
48///
49/// Built once per delivery from the message. Read its fields by [`keys`] key off a
50/// [`Context`](ruststream::runtime::Context).
51#[derive(Debug, Clone, Default, PartialEq, Eq)]
52pub struct StreamContext {
53 entry_id: Option<String>,
54 consumer_group: Option<String>,
55}
56
57impl StreamContext {
58 /// Constructs a context directly from its native fields (mainly for tests).
59 #[must_use]
60 pub fn new(entry_id: Option<String>, consumer_group: Option<String>) -> Self {
61 Self {
62 entry_id,
63 consumer_group,
64 }
65 }
66
67 /// The stream entry id (for example `1700000000000-0`) this delivery was read at.
68 #[must_use]
69 pub fn entry_id(&self) -> Option<&str> {
70 self.entry_id.as_deref()
71 }
72
73 /// The consumer group this delivery was read through.
74 #[must_use]
75 pub fn consumer_group(&self) -> Option<&str> {
76 self.consumer_group.as_deref()
77 }
78}
79
80impl BuildContext<RedisMessage> for StreamContext {
81 fn build(msg: &RedisMessage) -> Self {
82 Self {
83 entry_id: msg.id().map(str::to_owned),
84 consumer_group: msg.group().map(str::to_owned),
85 }
86 }
87}
88
89/// Per-delivery context for a Redis Pub/Sub delivery ([`RedisPubSubMessage`]).
90#[derive(Debug, Clone, Default, PartialEq, Eq)]
91pub struct PubSubContext {
92 channel: String,
93 from_pattern: bool,
94}
95
96impl PubSubContext {
97 /// Constructs a context directly from its native fields (mainly for tests).
98 #[must_use]
99 pub fn new(channel: impl Into<String>, from_pattern: bool) -> Self {
100 Self {
101 channel: channel.into(),
102 from_pattern,
103 }
104 }
105
106 /// The concrete channel this message arrived on (the matched channel, not the subscription
107 /// glob, for a pattern subscription).
108 #[must_use]
109 pub fn channel(&self) -> &str {
110 &self.channel
111 }
112
113 /// Whether the delivery matched through a `PSUBSCRIBE` pattern rather than an exact subscribe.
114 #[must_use]
115 pub fn from_pattern(&self) -> bool {
116 self.from_pattern
117 }
118}
119
120impl BuildContext<RedisPubSubMessage> for PubSubContext {
121 fn build(msg: &RedisPubSubMessage) -> Self {
122 Self {
123 channel: msg.channel().to_owned(),
124 from_pattern: msg.from_pattern(),
125 }
126 }
127}
128
129/// Compile-time [`Field`] keys, one per native field, read with `ctx.context(key)`.
130///
131/// Each key is a zero-sized selector implementing [`Field`] only for the context type that carries
132/// its field, so applying a key to the wrong transport's context is a compile error.
133pub mod keys {
134 use super::{Field, PubSubContext, StreamContext};
135
136 /// Reads the stream entry id off a [`StreamContext`].
137 #[derive(Debug, Clone, Copy, Default)]
138 pub struct EntryId;
139
140 impl Field<StreamContext> for EntryId {
141 type Value<'a> = Option<&'a str>;
142 fn get(self, src: &StreamContext) -> Option<&str> {
143 src.entry_id()
144 }
145 }
146
147 /// Reads the consumer group off a [`StreamContext`].
148 #[derive(Debug, Clone, Copy, Default)]
149 pub struct ConsumerGroup;
150
151 impl Field<StreamContext> for ConsumerGroup {
152 type Value<'a> = Option<&'a str>;
153 fn get(self, src: &StreamContext) -> Option<&str> {
154 src.consumer_group()
155 }
156 }
157
158 /// Reads the concrete channel off a [`PubSubContext`].
159 #[derive(Debug, Clone, Copy, Default)]
160 pub struct Channel;
161
162 impl Field<PubSubContext> for Channel {
163 type Value<'a> = &'a str;
164 fn get(self, src: &PubSubContext) -> &str {
165 src.channel()
166 }
167 }
168
169 /// Reads whether a [`PubSubContext`] delivery matched through a `PSUBSCRIBE` pattern.
170 #[derive(Debug, Clone, Copy, Default)]
171 pub struct FromPattern;
172
173 impl Field<PubSubContext> for FromPattern {
174 type Value<'a> = bool;
175 fn get(self, src: &PubSubContext) -> bool {
176 src.from_pattern()
177 }
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::keys::{Channel, ConsumerGroup, EntryId, FromPattern};
184 use super::{PubSubContext, StreamContext};
185 use ruststream::Field;
186
187 #[test]
188 fn stream_keys_read_native_fields() {
189 let cx = StreamContext::new(
190 Some("1700000000000-0".to_owned()),
191 Some("workers".to_owned()),
192 );
193 assert_eq!(EntryId.get(&cx), Some("1700000000000-0"));
194 assert_eq!(ConsumerGroup.get(&cx), Some("workers"));
195 }
196
197 #[test]
198 fn stream_keys_absent_when_settled() {
199 let cx = StreamContext::new(None, None);
200 assert_eq!(EntryId.get(&cx), None);
201 assert_eq!(ConsumerGroup.get(&cx), None);
202 }
203
204 #[test]
205 fn pubsub_keys_read_channel_and_pattern_flag() {
206 let exact = PubSubContext::new("events", false);
207 assert_eq!(Channel.get(&exact), "events");
208 assert!(!FromPattern.get(&exact));
209
210 let matched = PubSubContext::new("events.user", true);
211 assert_eq!(Channel.get(&matched), "events.user");
212 assert!(FromPattern.get(&matched));
213 }
214}