Skip to main content

ruststream_nats/
subscriber.rs

1//! Unified NATS subscriber wrapping either a Core or a `JetStream` pull consumer.
2
3use async_nats::jetstream::consumer::{PullConsumer, pull::Stream as PullStream};
4use futures::stream::{poll_fn, unfold};
5use futures::{Stream, StreamExt, future::Either};
6use ruststream::{BatchSubscriber, Subscriber};
7use std::fmt::{Debug, Formatter};
8use std::{pin::Pin, task::Poll, time::Duration};
9use tracing::warn;
10
11use crate::{
12    error::NatsError,
13    message::{CoreMessage, JetStreamMessage, NatsMessage},
14};
15
16/// Cap on a Core NATS batch: [`BatchSubscriber::batches`] drains only what the client has
17/// already buffered locally, and this bounds one drain.
18const CORE_BATCH_LIMIT: usize = 256;
19
20enum SubscriberKind {
21    Core { inner: async_nats::Subscriber },
22    // Box the JetStream variant: PullConsumer is large (~1400 bytes) and the enum would otherwise
23    // penalise the Core path with the same footprint.
24    JetStream(Box<JetStreamKind>),
25}
26
27struct JetStreamKind {
28    inner: Pin<Box<PullStream>>,
29    consumer: PullConsumer,
30    stream_name: String,
31    pull_batch: usize,
32    pull_expires: Duration,
33}
34
35/// A NATS subscription.
36///
37/// Backed transparently by either a Core subscription (no ack) or a `JetStream` pull consumer
38/// (full ack/nack/term). Construct via [`crate::NatsBroker::subscribe`] with
39/// [`crate::SubscribeOptions`].
40pub struct NatsSubscriber {
41    subject: String,
42    kind: SubscriberKind,
43}
44
45impl Debug for NatsSubscriber {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        let mut s = f.debug_struct("NatsSubscriber");
48        s.field("subject", &self.subject);
49        match &self.kind {
50            SubscriberKind::Core { .. } => {
51                s.field("kind", &"core");
52            }
53            SubscriberKind::JetStream(js) => {
54                s.field("kind", &"jetstream")
55                    .field("stream", &js.stream_name);
56            }
57        }
58        s.finish_non_exhaustive()
59    }
60}
61
62impl NatsSubscriber {
63    pub(crate) const fn from_core(subject: String, inner: async_nats::Subscriber) -> Self {
64        Self {
65            subject,
66            kind: SubscriberKind::Core { inner },
67        }
68    }
69
70    pub(crate) fn from_jetstream(
71        subject: String,
72        stream_name: String,
73        inner: PullStream,
74        consumer: PullConsumer,
75        pull_batch: usize,
76        pull_expires: Duration,
77    ) -> Self {
78        Self {
79            subject,
80            kind: SubscriberKind::JetStream(Box::new(JetStreamKind {
81                inner: Box::pin(inner),
82                consumer,
83                stream_name,
84                pull_batch,
85                pull_expires,
86            })),
87        }
88    }
89}
90
91fn core_message(msg: async_nats::Message) -> NatsMessage {
92    NatsMessage::Core(Box::new(CoreMessage::new(msg)))
93}
94
95fn jetstream_message(msg: async_nats::jetstream::Message) -> NatsMessage {
96    NatsMessage::JetStream(Box::new(JetStreamMessage::new(msg)))
97}
98
99impl Subscriber for NatsSubscriber {
100    type Message = NatsMessage;
101    type Error = NatsError;
102
103    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
104        // Poll the inner subscription in place rather than moving it into the returned stream,
105        // so `stream` can be called again after the returned stream is dropped (the runtime and
106        // the conformance helpers re-enter it per call).
107        match &mut self.kind {
108            SubscriberKind::Core { inner } => Either::Left(
109                poll_fn(move |cx| Pin::new(&mut *inner).poll_next(cx))
110                    .map(|msg| Ok(core_message(msg))),
111            ),
112            SubscriberKind::JetStream(js) => Either::Right(
113                poll_fn(move |cx| js.inner.as_mut().poll_next(cx)).map(|item| match item {
114                    Ok(msg) => Ok(jetstream_message(msg)),
115                    Err(err) => {
116                        warn!(target: "ruststream::nats", error = %err, "jetstream fetch error");
117                        Err(NatsError::JetStream(Box::new(err)))
118                    }
119                }),
120            ),
121        }
122    }
123}
124
125impl BatchSubscriber for NatsSubscriber {
126    type Batch = Vec<NatsMessage>;
127
128    /// Returns a stream of message batches.
129    ///
130    /// `JetStream` batches natively: one stream item is one `fetch` of up to
131    /// [`pull_batch`](crate::SubscribeOptions::pull_batch) messages, waiting at most
132    /// [`pull_expires`](crate::SubscribeOptions::pull_expires) before delivering a partial batch
133    /// (an empty fetch is retried, so the stream never yields empty batches). Core NATS has no
134    /// wire-level batching; there a batch is whatever the client has already buffered locally
135    /// (at least one message, at most [`CORE_BATCH_LIMIT`]), with no added latency.
136    ///
137    /// Drive a subscriber through either [`Subscriber::stream`] or `batches`, not both at once:
138    /// on `JetStream` each issues its own pull requests, so deliveries would be split between
139    /// them.
140    ///
141    /// # Cancel safety
142    ///
143    /// Dropping the returned stream between items is allowed. On `JetStream`, dropping it
144    /// mid-fetch can leave already-fetched, undelivered messages to be redelivered after the
145    /// consumer's `ack_wait`.
146    fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
147        match &mut self.kind {
148            SubscriberKind::Core { inner } => Either::Left(poll_fn(move |cx| {
149                let first = match Pin::new(&mut *inner).poll_next(cx) {
150                    Poll::Pending => return Poll::Pending,
151                    Poll::Ready(None) => return Poll::Ready(None),
152                    Poll::Ready(Some(msg)) => msg,
153                };
154                let mut batch = vec![core_message(first)];
155                while batch.len() < CORE_BATCH_LIMIT {
156                    match Pin::new(&mut *inner).poll_next(cx) {
157                        Poll::Ready(Some(msg)) => batch.push(core_message(msg)),
158                        Poll::Ready(None) | Poll::Pending => break,
159                    }
160                }
161                Poll::Ready(Some(Ok(batch)))
162            })),
163            SubscriberKind::JetStream(js) => {
164                let max = js.pull_batch;
165                let expires = js.pull_expires;
166                Either::Right(unfold(&mut js.consumer, move |consumer| async move {
167                    loop {
168                        let fetch = consumer
169                            .fetch()
170                            .max_messages(max)
171                            .expires(expires)
172                            .messages()
173                            .await;
174                        let mut messages = match fetch {
175                            Ok(messages) => messages,
176                            // BatchError is a concrete sized type; wrap it.
177                            Err(err) => {
178                                return Some((Err(NatsError::JetStream(Box::new(err))), consumer));
179                            }
180                        };
181                        let mut batch = Vec::new();
182                        while let Some(item) = messages.next().await {
183                            match item {
184                                Ok(msg) => batch.push(jetstream_message(msg)),
185                                // crate::Error is already Box<dyn StdError + ...>; use directly.
186                                Err(err) => {
187                                    if batch.is_empty() {
188                                        return Some((Err(NatsError::JetStream(err)), consumer));
189                                    }
190                                    warn!(
191                                        target: "ruststream::nats",
192                                        error = %err,
193                                        "jetstream fetch error mid-batch; delivering the partial batch",
194                                    );
195                                    break;
196                                }
197                            }
198                        }
199                        if !batch.is_empty() {
200                            return Some((Ok(batch), consumer));
201                        }
202                        // An empty fetch only means `expires` elapsed with nothing pending.
203                    }
204                }))
205            }
206        }
207    }
208}