Skip to main content

ruststream_nats/
subscriber.rs

1//! Unified NATS subscriber wrapping either a Core or a `JetStream` pull consumer.
2
3use async_nats::jetstream::consumer::pull::Stream as PullStream;
4use futures::{Stream, future::Either};
5use ruststream::Subscriber;
6use tokio_stream::StreamExt;
7use tracing::warn;
8
9use crate::{
10    error::NatsError,
11    message::{CoreMessage, JetStreamMessage, NatsMessage},
12};
13
14enum SubscriberKind {
15    Core {
16        inner: Option<async_nats::Subscriber>,
17    },
18    JetStream {
19        inner: Option<Box<PullStream>>,
20        stream_name: String,
21    },
22}
23
24/// A NATS subscription.
25///
26/// Backed transparently by either a Core subscription (no ack) or a `JetStream` pull consumer
27/// (full ack/nack/term). Construct via [`crate::NatsBroker::subscribe`] with
28/// [`crate::SubscribeOptions`].
29pub struct NatsSubscriber {
30    subject: String,
31    kind: SubscriberKind,
32}
33
34impl std::fmt::Debug for NatsSubscriber {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        let mut s = f.debug_struct("NatsSubscriber");
37        s.field("subject", &self.subject);
38        match &self.kind {
39            SubscriberKind::Core { .. } => {
40                s.field("kind", &"core");
41            }
42            SubscriberKind::JetStream { stream_name, .. } => {
43                s.field("kind", &"jetstream").field("stream", stream_name);
44            }
45        }
46        s.finish_non_exhaustive()
47    }
48}
49
50impl NatsSubscriber {
51    pub(crate) const fn from_core(subject: String, inner: async_nats::Subscriber) -> Self {
52        Self {
53            subject,
54            kind: SubscriberKind::Core { inner: Some(inner) },
55        }
56    }
57
58    pub(crate) fn from_jetstream(subject: String, stream_name: String, inner: PullStream) -> Self {
59        Self {
60            subject,
61            kind: SubscriberKind::JetStream {
62                inner: Some(Box::new(inner)),
63                stream_name,
64            },
65        }
66    }
67}
68
69impl Subscriber for NatsSubscriber {
70    type Message = NatsMessage;
71    type Error = NatsError;
72
73    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
74        match &mut self.kind {
75            SubscriberKind::Core { inner } => {
76                let inner = inner
77                    .take()
78                    .expect("NatsSubscriber::stream called more than once");
79                Either::Left(
80                    inner.map(|msg| Ok(NatsMessage::Core(Box::new(CoreMessage::new(msg))))),
81                )
82            }
83            SubscriberKind::JetStream { inner, .. } => {
84                let inner = *inner
85                    .take()
86                    .expect("NatsSubscriber::stream called more than once");
87                Either::Right(inner.map(|item| match item {
88                    Ok(msg) => Ok(NatsMessage::JetStream(Box::new(JetStreamMessage::new(msg)))),
89                    Err(err) => {
90                        warn!(target: "ruststream::nats", error = %err, "jetstream fetch error");
91                        Err(NatsError::JetStream(Box::new(err)))
92                    }
93                }))
94            }
95        }
96    }
97}