ruststream_nats/context.rs
1//! Optional typed per-delivery context exposing native `JetStream` metadata.
2//!
3//! A handler reads native `JetStream` delivery metadata - the stream and consumer names, the stream
4//! and consumer sequence numbers, the server-side redelivery count, and the pending count - by
5//! declaring [`JetStreamContext`] as its per-delivery context and reading fields with the
6//! compile-time [`keys`]. The runtime builds the context once per delivery via
7//! [`BuildContext`](ruststream::BuildContext); resolving a key is a direct field read, with no
8//! hashing, boxing, or downcasting.
9//!
10//! This is purely additive: the default context is `()` (no fields), so existing handlers are
11//! unaffected. Opting in means changing the handler's context type to [`JetStreamContext`].
12//!
13//! These fields are genuinely native: they come from the `JetStream` `$JS.ACK` reply subject, not
14//! from the payload or the message [`Headers`](ruststream::Headers), so they are not reachable any
15//! other way. Core (non-JetStream) NATS deliveries carry no such metadata - their only native datum
16//! is the reply inbox, already surfaced as the `reply-to` header - so Core handlers should keep the
17//! default `()` context. A handler bound to [`JetStreamContext`] still works on a Core subscription;
18//! every key just reads `None` there.
19//!
20//! # Examples
21//!
22//! ```
23//! use ruststream::IncomingMessage;
24//! use ruststream::runtime::{Context, HandlerResult};
25//! use ruststream_nats::context::{JetStreamContext, keys};
26//!
//! async fn handle<M: IncomingMessage>(
//!     _msg: &M,
//!     ctx: &mut Context<'_, JetStreamContext>,
//! ) -> HandlerResult {
//!     // `None` on a core delivery; the stream sequence on a JetStream one.
//!     if let Some(seq) = ctx.context(keys::STREAM_SEQUENCE) {
//!         println!("stream sequence {seq}");
//!     }
//!     // The server-side delivery count distinguishes a first delivery from a redelivery.
//!     if ctx.context(keys::DELIVERED).is_some_and(|n| n > 1) {
//!         println!("redelivery");
//!     }
//!     HandlerResult::Ack
//! }
41//! ```
42
43use ruststream::BuildContext;
44
45use crate::message::NatsMessage;
46
47/// Native `JetStream` per-delivery metadata, built once per delivery from the broker message.
48///
49/// Read its fields by the compile-time [`keys`] (for example
50/// [`keys::STREAM_SEQUENCE`]). On a core (non-JetStream) delivery there is no such metadata, so the
51/// context is empty and every key reads `None`; the same handler therefore works on both kinds of
52/// subscription.
53///
54/// The context owns its fields (it copies them out of the delivery), so it does not borrow the
55/// message: numbers are a stack copy and the stream/consumer names are cloned once per delivery.
56///
57/// # Examples
58///
59/// ```
60/// use ruststream::runtime::Context;
61/// use ruststream_nats::context::{JetStreamContext, keys};
62///
63/// // The context a handler reads through; the runtime supplies it per delivery.
64/// fn read(ctx: &Context<'_, JetStreamContext>) -> Option<u64> {
65/// ctx.context(keys::STREAM_SEQUENCE)
66/// }
67/// ```
68#[derive(Debug, Clone, Default, PartialEq, Eq)]
69pub struct JetStreamContext {
70 /// `None` for a core delivery, or when the `JetStream` reply subject could not be parsed.
71 info: Option<JetStreamInfo>,
72}
73
/// Owned copy of the native metadata attached to a single `JetStream` delivery.
#[derive(Clone, Debug, Eq, PartialEq)]
struct JetStreamInfo {
    /// Name of the stream the message was delivered from.
    stream: String,
    /// Name of the consumer the message was delivered through.
    consumer: String,
    /// Sequence the message holds within its stream.
    stream_sequence: u64,
    /// Sequence the message holds within the consumer's delivery stream.
    consumer_sequence: u64,
    /// Number of times the server has delivered the message.
    delivered: i64,
    /// Number of messages still pending for the consumer behind this one.
    pending: u64,
}
84
85impl BuildContext<NatsMessage> for JetStreamContext {
86 fn build(msg: &NatsMessage) -> Self {
87 let info = match msg {
88 NatsMessage::JetStream(m) => m.info().map(|i| JetStreamInfo {
89 stream: i.stream.to_owned(),
90 consumer: i.consumer.to_owned(),
91 stream_sequence: i.stream_sequence,
92 consumer_sequence: i.consumer_sequence,
93 delivered: i.delivered,
94 pending: i.pending,
95 }),
96 NatsMessage::Core(_) => None,
97 };
98 Self { info }
99 }
100}
101
102// Compile-time guarantee that the context is buildable from the subscriber's actual delivery type,
103// so a handler declaring `Context<'_, JetStreamContext>` satisfies the runtime's
104// `Cx: BuildContext<S::Message>` mount bound. The JetStream `build` arm itself is only exercised
105// against a real server (`async_nats::jetstream::Message` has no in-process constructor), so this
106// type-level check is what validates the mount path here.
107const _: fn() = || {
108 fn assert_build_context<C: BuildContext<M>, M>() {}
109 assert_build_context::<
110 JetStreamContext,
111 <crate::NatsSubscriber as ruststream::Subscriber>::Message,
112 >();
113};
114
115/// Compile-time [`Field`](ruststream::Field) keys, one per native `JetStream` metadatum.
116///
117/// Each key is a zero-sized selector exported both as a type (for naming in bounds) and as a
118/// `const` value (for use at the call site, `ctx.context(keys::STREAM_SEQUENCE)`). Every key reads
119/// `None` on a core delivery or when the `JetStream` reply subject could not be parsed.
120pub mod keys {
121 use ruststream::Field;
122
123 use super::JetStreamContext;
124
125 /// Selector for the stream sequence number; see [`STREAM_SEQUENCE`].
126 #[derive(Debug, Clone, Copy)]
127 pub struct StreamSequence;
128
129 /// The monotonically increasing sequence the message holds within its stream.
130 pub const STREAM_SEQUENCE: StreamSequence = StreamSequence;
131
132 impl Field<JetStreamContext> for StreamSequence {
133 type Value<'a> = Option<u64>;
134 fn get(self, cx: &JetStreamContext) -> Option<u64> {
135 cx.info.as_ref().map(|i| i.stream_sequence)
136 }
137 }
138
139 /// Selector for the consumer sequence number; see [`CONSUMER_SEQUENCE`].
140 #[derive(Debug, Clone, Copy)]
141 pub struct ConsumerSequence;
142
143 /// The sequence the message holds within this consumer's delivery stream.
144 pub const CONSUMER_SEQUENCE: ConsumerSequence = ConsumerSequence;
145
146 impl Field<JetStreamContext> for ConsumerSequence {
147 type Value<'a> = Option<u64>;
148 fn get(self, cx: &JetStreamContext) -> Option<u64> {
149 cx.info.as_ref().map(|i| i.consumer_sequence)
150 }
151 }
152
153 /// Selector for the server-side delivery count; see [`DELIVERED`].
154 #[derive(Debug, Clone, Copy)]
155 pub struct Delivered;
156
157 /// The number of times the server has delivered this message.
158 ///
159 /// `1` on first delivery, higher on a redelivery (after a `nack` or an `ack_wait` timeout).
160 /// This is the native redelivery count, not reconstructable from the payload or headers.
161 pub const DELIVERED: Delivered = Delivered;
162
163 impl Field<JetStreamContext> for Delivered {
164 type Value<'a> = Option<i64>;
165 fn get(self, cx: &JetStreamContext) -> Option<i64> {
166 cx.info.as_ref().map(|i| i.delivered)
167 }
168 }
169
170 /// Selector for the pending count; see [`PENDING`].
171 #[derive(Debug, Clone, Copy)]
172 pub struct Pending;
173
174 /// The number of messages the server still has pending for this consumer behind this one.
175 pub const PENDING: Pending = Pending;
176
177 impl Field<JetStreamContext> for Pending {
178 type Value<'a> = Option<u64>;
179 fn get(self, cx: &JetStreamContext) -> Option<u64> {
180 cx.info.as_ref().map(|i| i.pending)
181 }
182 }
183
184 /// Selector for the stream name; see [`STREAM`].
185 #[derive(Debug, Clone, Copy)]
186 pub struct Stream;
187
188 /// The name of the stream this message was delivered from.
189 pub const STREAM: Stream = Stream;
190
191 impl Field<JetStreamContext> for Stream {
192 type Value<'a> = Option<&'a str>;
193 fn get(self, cx: &JetStreamContext) -> Option<&str> {
194 cx.info.as_ref().map(|i| i.stream.as_str())
195 }
196 }
197
198 /// Selector for the consumer name; see [`CONSUMER`].
199 #[derive(Debug, Clone, Copy)]
200 pub struct Consumer;
201
202 /// The name of the durable (or ephemeral) consumer this message was delivered through.
203 pub const CONSUMER: Consumer = Consumer;
204
205 impl Field<JetStreamContext> for Consumer {
206 type Value<'a> = Option<&'a str>;
207 fn get(self, cx: &JetStreamContext) -> Option<&str> {
208 cx.info.as_ref().map(|i| i.consumer.as_str())
209 }
210 }
211}
212
#[cfg(test)]
mod tests {
    use ruststream::{BuildContext, Field};

    use super::keys::{CONSUMER, CONSUMER_SEQUENCE, DELIVERED, PENDING, STREAM, STREAM_SEQUENCE};
    use super::{JetStreamContext, JetStreamInfo};
    use crate::message::{CoreMessage, NatsMessage};

    /// A context carrying a complete, distinct-valued set of `JetStream` metadata.
    fn populated() -> JetStreamContext {
        let info = JetStreamInfo {
            stream: String::from("ORDERS"),
            consumer: String::from("orders-worker"),
            stream_sequence: 42,
            consumer_sequence: 7,
            delivered: 3,
            pending: 5,
        };
        JetStreamContext { info: Some(info) }
    }

    #[test]
    fn keys_read_populated_jetstream_fields() {
        let cx = populated();

        // Numeric keys return copies of the stored values.
        assert_eq!(STREAM_SEQUENCE.get(&cx), Some(42));
        assert_eq!(CONSUMER_SEQUENCE.get(&cx), Some(7));
        assert_eq!(DELIVERED.get(&cx), Some(3));
        assert_eq!(PENDING.get(&cx), Some(5));

        // Name keys return borrowed string slices.
        assert_eq!(STREAM.get(&cx), Some("ORDERS"));
        assert_eq!(CONSUMER.get(&cx), Some("orders-worker"));
    }

    #[test]
    fn keys_read_none_without_jetstream_info() {
        let cx = JetStreamContext::default();

        assert!(STREAM_SEQUENCE.get(&cx).is_none());
        assert!(CONSUMER_SEQUENCE.get(&cx).is_none());
        assert!(DELIVERED.get(&cx).is_none());
        assert!(PENDING.get(&cx).is_none());
        assert!(STREAM.get(&cx).is_none());
        assert!(CONSUMER.get(&cx).is_none());
    }

    #[test]
    fn build_over_core_message_has_no_info() {
        // Only a core delivery can be constructed in-process; a JetStream
        // `async_nats::jetstream::Message` cannot, so the JetStream build path is validated at
        // compile time and exercised against a real server only. A core delivery must produce an
        // empty context (no native JetStream metadata).
        let raw = async_nats::Message {
            subject: "orders.created".into(),
            reply: None,
            payload: bytes::Bytes::from_static(b"{}"),
            headers: None,
            status: None,
            description: None,
            length: 2,
        };
        let msg = NatsMessage::Core(Box::new(CoreMessage::new(raw)));

        let cx = JetStreamContext::build(&msg);

        assert_eq!(cx, JetStreamContext::default());
        assert_eq!(STREAM_SEQUENCE.get(&cx), None);
    }
}