ruststream_nats/
subscriber.rs1use async_nats::jetstream::consumer::{PullConsumer, pull::Stream as PullStream};
4use futures::stream::{poll_fn, unfold};
5use futures::{Stream, StreamExt, future::Either};
6use ruststream::{BatchSubscriber, Subscriber};
7use std::fmt::{Debug, Formatter};
8use std::{pin::Pin, task::Poll, time::Duration};
9use tracing::warn;
10
11use crate::{
12 error::NatsError,
13 message::{CoreMessage, JetStreamMessage, NatsMessage},
14};
15
16const CORE_BATCH_LIMIT: usize = 256;
19
20enum SubscriberKind {
21 Core { inner: async_nats::Subscriber },
22 JetStream(Box<JetStreamKind>),
25}
26
27struct JetStreamKind {
28 inner: Pin<Box<PullStream>>,
29 consumer: PullConsumer,
30 stream_name: String,
31 pull_batch: usize,
32 pull_expires: Duration,
33}
34
35pub struct NatsSubscriber {
41 subject: String,
42 kind: SubscriberKind,
43}
44
45impl Debug for NatsSubscriber {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 let mut s = f.debug_struct("NatsSubscriber");
48 s.field("subject", &self.subject);
49 match &self.kind {
50 SubscriberKind::Core { .. } => {
51 s.field("kind", &"core");
52 }
53 SubscriberKind::JetStream(js) => {
54 s.field("kind", &"jetstream")
55 .field("stream", &js.stream_name);
56 }
57 }
58 s.finish_non_exhaustive()
59 }
60}
61
62impl NatsSubscriber {
63 pub(crate) const fn from_core(subject: String, inner: async_nats::Subscriber) -> Self {
64 Self {
65 subject,
66 kind: SubscriberKind::Core { inner },
67 }
68 }
69
70 pub(crate) fn from_jetstream(
71 subject: String,
72 stream_name: String,
73 inner: PullStream,
74 consumer: PullConsumer,
75 pull_batch: usize,
76 pull_expires: Duration,
77 ) -> Self {
78 Self {
79 subject,
80 kind: SubscriberKind::JetStream(Box::new(JetStreamKind {
81 inner: Box::pin(inner),
82 consumer,
83 stream_name,
84 pull_batch,
85 pull_expires,
86 })),
87 }
88 }
89}
90
91fn core_message(msg: async_nats::Message) -> NatsMessage {
92 NatsMessage::Core(Box::new(CoreMessage::new(msg)))
93}
94
95fn jetstream_message(msg: async_nats::jetstream::Message) -> NatsMessage {
96 NatsMessage::JetStream(Box::new(JetStreamMessage::new(msg)))
97}
98
99impl Subscriber for NatsSubscriber {
100 type Message = NatsMessage;
101 type Error = NatsError;
102
103 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
104 match &mut self.kind {
108 SubscriberKind::Core { inner } => Either::Left(
109 poll_fn(move |cx| Pin::new(&mut *inner).poll_next(cx))
110 .map(|msg| Ok(core_message(msg))),
111 ),
112 SubscriberKind::JetStream(js) => Either::Right(
113 poll_fn(move |cx| js.inner.as_mut().poll_next(cx)).map(|item| match item {
114 Ok(msg) => Ok(jetstream_message(msg)),
115 Err(err) => {
116 warn!(target: "ruststream::nats", error = %err, "jetstream fetch error");
117 Err(NatsError::JetStream(Box::new(err)))
118 }
119 }),
120 ),
121 }
122 }
123}
124
125impl BatchSubscriber for NatsSubscriber {
126 type Batch = Vec<NatsMessage>;
127
128 fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
147 match &mut self.kind {
148 SubscriberKind::Core { inner } => Either::Left(poll_fn(move |cx| {
149 let first = match Pin::new(&mut *inner).poll_next(cx) {
150 Poll::Pending => return Poll::Pending,
151 Poll::Ready(None) => return Poll::Ready(None),
152 Poll::Ready(Some(msg)) => msg,
153 };
154 let mut batch = vec![core_message(first)];
155 while batch.len() < CORE_BATCH_LIMIT {
156 match Pin::new(&mut *inner).poll_next(cx) {
157 Poll::Ready(Some(msg)) => batch.push(core_message(msg)),
158 Poll::Ready(None) | Poll::Pending => break,
159 }
160 }
161 Poll::Ready(Some(Ok(batch)))
162 })),
163 SubscriberKind::JetStream(js) => {
164 let max = js.pull_batch;
165 let expires = js.pull_expires;
166 Either::Right(unfold(&mut js.consumer, move |consumer| async move {
167 loop {
168 let fetch = consumer
169 .fetch()
170 .max_messages(max)
171 .expires(expires)
172 .messages()
173 .await;
174 let mut messages = match fetch {
175 Ok(messages) => messages,
176 Err(err) => {
178 return Some((Err(NatsError::JetStream(Box::new(err))), consumer));
179 }
180 };
181 let mut batch = Vec::new();
182 while let Some(item) = messages.next().await {
183 match item {
184 Ok(msg) => batch.push(jetstream_message(msg)),
185 Err(err) => {
187 if batch.is_empty() {
188 return Some((Err(NatsError::JetStream(err)), consumer));
189 }
190 warn!(
191 target: "ruststream::nats",
192 error = %err,
193 "jetstream fetch error mid-batch; delivering the partial batch",
194 );
195 break;
196 }
197 }
198 }
199 if !batch.is_empty() {
200 return Some((Ok(batch), consumer));
201 }
202 }
204 }))
205 }
206 }
207 }
208}