intercom_rs/jetstream/
consumer.rs

1//! JetStream consumer types and builders.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
/// A typed message from a JetStream consumer.
///
/// Pairs the payload decoded as `T` with delivery metadata copied out of the
/// underlying `async-nats` message. The raw message is kept privately so the
/// acknowledgment methods on this type can respond to the server.
#[derive(Debug)]
pub struct JetStreamMessage<T> {
    /// The decoded message payload.
    pub payload: T,
    /// The subject the message was published to.
    pub subject: String,
    /// The stream sequence number.
    ///
    /// The consumer streams in this module store `0` here when the message
    /// metadata cannot be parsed.
    pub stream_sequence: u64,
    /// The consumer sequence number.
    ///
    /// The consumer streams in this module store `0` here when the message
    /// metadata cannot be parsed.
    pub consumer_sequence: u64,
    /// The reply subject for request-reply patterns.
    pub reply: Option<String>,
    /// The raw message for acknowledgment.
    raw: async_nats::jetstream::Message,
}
31
32impl<T> JetStreamMessage<T> {
33    /// Acknowledge the message.
34    pub async fn ack(&self) -> Result<()> {
35        self.raw
36            .ack()
37            .await
38            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
39    }
40
41    /// Acknowledge the message with double ack.
42    pub async fn double_ack(&self) -> Result<()> {
43        self.raw
44            .double_ack()
45            .await
46            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
47    }
48
49    /// Negatively acknowledge the message.
50    pub async fn nak(&self) -> Result<()> {
51        self.raw
52            .ack_with(async_nats::jetstream::AckKind::Nak(None))
53            .await
54            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
55    }
56
57    /// Negatively acknowledge with a delay before redelivery.
58    pub async fn nak_with_delay(&self, delay: std::time::Duration) -> Result<()> {
59        self.raw
60            .ack_with(async_nats::jetstream::AckKind::Nak(Some(delay)))
61            .await
62            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
63    }
64
65    /// Mark the message as in progress.
66    pub async fn in_progress(&self) -> Result<()> {
67        self.raw
68            .ack_with(async_nats::jetstream::AckKind::Progress)
69            .await
70            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
71    }
72
73    /// Terminate the message (will not be redelivered).
74    pub async fn term(&self) -> Result<()> {
75        self.raw
76            .ack_with(async_nats::jetstream::AckKind::Term)
77            .await
78            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
79    }
80
81    /// Get the raw underlying JetStream message.
82    pub fn raw(&self) -> &async_nats::jetstream::Message {
83        &self.raw
84    }
85}
86
87// ============================================================================
88// Pull Consumer
89// ============================================================================
90
/// A typed pull consumer with configurable codec.
///
/// Wraps an `async-nats` pull consumer and remembers, at the type level only,
/// which message type and codec its batches and streams decode with.
///
/// # Type Parameters
///
/// * `T` - The message type
/// * `C` - The codec type used for deserialization
pub struct PullConsumer<T, C: CodecType> {
    // The wrapped async-nats consumer that performs the actual requests.
    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
    // Zero-sized marker: no `T` or `C` value is stored in the consumer.
    _marker: PhantomData<(T, C)>,
}
101
102impl<T, C: CodecType> PullConsumer<T, C> {
103    /// Create a new pull consumer wrapper.
104    pub(crate) fn new(
105        inner: async_nats::jetstream::consumer::Consumer<
106            async_nats::jetstream::consumer::pull::Config,
107        >,
108    ) -> Self {
109        Self {
110            inner,
111            _marker: PhantomData,
112        }
113    }
114
115    /// Get the underlying async-nats consumer.
116    pub fn inner(
117        &self,
118    ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>
119    {
120        &self.inner
121    }
122
123    /// Get consumer name.
124    pub fn name(&self) -> &str {
125        &self.inner.cached_info().name
126    }
127}
128
129impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C> {
130    /// Fetch a batch of messages.
131    ///
132    /// # Example
133    ///
134    /// ```no_run
135    /// use intercom::{Client, MsgPackCodec};
136    /// use serde::{Deserialize, Serialize};
137    /// use futures::StreamExt;
138    ///
139    /// #[derive(Serialize, Deserialize, Debug)]
140    /// struct Event { id: u64 }
141    ///
142    /// # async fn example() -> intercom::Result<()> {
143    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
144    /// let jetstream = client.jetstream();
145    /// let stream = jetstream.get_stream("events").await?;
146    /// let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
147    ///
148    /// let mut messages = consumer.fetch(10).await?;
149    /// while let Some(result) = messages.next().await {
150    ///     let msg = result?;
151    ///     println!("Got: {:?}", msg.payload);
152    ///     msg.ack().await?;
153    /// }
154    /// # Ok(())
155    /// # }
156    /// ```
157    pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
158        let inner = self
159            .inner
160            .fetch()
161            .max_messages(batch_size)
162            .messages()
163            .await
164            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
165        Ok(PullBatch::new(inner))
166    }
167
168    /// Fetch messages with a builder for advanced options.
169    pub fn fetch_builder(&self) -> FetchBuilder<T, C> {
170        FetchBuilder::new(self.inner.clone())
171    }
172
173    /// Create a continuous message stream.
174    ///
175    /// Returns a [`Stream`] that continuously delivers messages.
176    ///
177    /// # Example
178    ///
179    /// ```no_run
180    /// use intercom::{Client, MsgPackCodec};
181    /// use serde::{Deserialize, Serialize};
182    /// use futures::StreamExt;
183    ///
184    /// #[derive(Serialize, Deserialize, Debug)]
185    /// struct Event { id: u64 }
186    ///
187    /// # async fn example() -> intercom::Result<()> {
188    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
189    /// let jetstream = client.jetstream();
190    /// let stream = jetstream.get_stream("events").await?;
191    /// let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
192    ///
193    /// let mut messages = consumer.messages().await?;
194    /// while let Some(result) = messages.next().await {
195    ///     let msg = result?;
196    ///     println!("Got: {:?}", msg.payload);
197    ///     msg.ack().await?;
198    /// }
199    /// # Ok(())
200    /// # }
201    /// ```
202    pub async fn messages(&self) -> Result<PullMessages<T, C>> {
203        let inner = self
204            .inner
205            .messages()
206            .await
207            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
208        Ok(PullMessages::new(inner))
209    }
210}
211
212pin_project! {
213    /// A batch of messages from a fetch operation.
214    pub struct PullBatch<T, C: CodecType> {
215        #[pin]
216        inner: async_nats::jetstream::consumer::pull::Batch,
217        _marker: PhantomData<(T, C)>,
218    }
219}
220
221impl<T, C: CodecType> PullBatch<T, C> {
222    fn new(inner: async_nats::jetstream::consumer::pull::Batch) -> Self {
223        Self {
224            inner,
225            _marker: PhantomData,
226        }
227    }
228}
229
230impl<T: DeserializeOwned, C: CodecType> Stream for PullBatch<T, C> {
231    type Item = Result<JetStreamMessage<T>>;
232
233    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
234        let this = self.project();
235
236        match this.inner.poll_next(cx) {
237            Poll::Ready(Some(Ok(msg))) => {
238                let info = msg.info().ok();
239                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
240                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
241                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
242                    payload,
243                    subject: msg.subject.to_string(),
244                    stream_sequence,
245                    consumer_sequence,
246                    reply: msg.reply.clone().map(|s| s.to_string()),
247                    raw: msg,
248                });
249                Poll::Ready(Some(result))
250            }
251            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
252                crate::error::Error::JetStreamConsumer(e.to_string()),
253            ))),
254            Poll::Ready(None) => Poll::Ready(None),
255            Poll::Pending => Poll::Pending,
256        }
257    }
258}
259
260pin_project! {
261    /// A stream of messages from a pull consumer.
262    pub struct PullMessages<T, C: CodecType> {
263        #[pin]
264        inner: async_nats::jetstream::consumer::pull::Stream,
265        _marker: PhantomData<(T, C)>,
266    }
267}
268
269impl<T, C: CodecType> PullMessages<T, C> {
270    fn new(inner: async_nats::jetstream::consumer::pull::Stream) -> Self {
271        Self {
272            inner,
273            _marker: PhantomData,
274        }
275    }
276}
277
278impl<T: DeserializeOwned, C: CodecType> Stream for PullMessages<T, C> {
279    type Item = Result<JetStreamMessage<T>>;
280
281    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
282        let this = self.project();
283
284        match this.inner.poll_next(cx) {
285            Poll::Ready(Some(Ok(msg))) => {
286                let info = msg.info().ok();
287                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
288                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
289                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
290                    payload,
291                    subject: msg.subject.to_string(),
292                    stream_sequence,
293                    consumer_sequence,
294                    reply: msg.reply.clone().map(|s| s.to_string()),
295                    raw: msg,
296                });
297                Poll::Ready(Some(result))
298            }
299            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
300                crate::error::Error::JetStreamConsumer(e.to_string()),
301            ))),
302            Poll::Ready(None) => Poll::Ready(None),
303            Poll::Pending => Poll::Pending,
304        }
305    }
306}
307
308/// Builder for fetching messages from a pull consumer.
309pub struct FetchBuilder<T, C: CodecType> {
310    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
311    max_messages: usize,
312    max_bytes: Option<usize>,
313    expires: Option<std::time::Duration>,
314    idle_heartbeat: Option<std::time::Duration>,
315    _marker: PhantomData<(T, C)>,
316}
317
318impl<T, C: CodecType> FetchBuilder<T, C> {
319    fn new(
320        inner: async_nats::jetstream::consumer::Consumer<
321            async_nats::jetstream::consumer::pull::Config,
322        >,
323    ) -> Self {
324        Self {
325            inner,
326            max_messages: 10,
327            max_bytes: None,
328            expires: None,
329            idle_heartbeat: None,
330            _marker: PhantomData,
331        }
332    }
333
334    /// Set the maximum number of messages to fetch.
335    pub fn max_messages(mut self, max: usize) -> Self {
336        self.max_messages = max;
337        self
338    }
339
340    /// Set the maximum bytes to fetch.
341    pub fn max_bytes(mut self, max: usize) -> Self {
342        self.max_bytes = Some(max);
343        self
344    }
345
346    /// Set the expiration time.
347    pub fn expires(mut self, duration: std::time::Duration) -> Self {
348        self.expires = Some(duration);
349        self
350    }
351
352    /// Set the idle heartbeat interval.
353    pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
354        self.idle_heartbeat = Some(duration);
355        self
356    }
357}
358
359impl<T: DeserializeOwned, C: CodecType> FetchBuilder<T, C> {
360    /// Execute the fetch.
361    pub async fn fetch(self) -> Result<PullBatch<T, C>> {
362        let mut fetch = self.inner.fetch().max_messages(self.max_messages);
363
364        if let Some(bytes) = self.max_bytes {
365            fetch = fetch.max_bytes(bytes);
366        }
367        if let Some(expires) = self.expires {
368            fetch = fetch.expires(expires);
369        }
370        if let Some(heartbeat) = self.idle_heartbeat {
371            fetch = fetch.heartbeat(heartbeat);
372        }
373
374        let inner = fetch
375            .messages()
376            .await
377            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
378        Ok(PullBatch::new(inner))
379    }
380}
381
382// ============================================================================
383// Push Consumer
384// ============================================================================
385
/// A typed push consumer with configurable codec.
///
/// Wraps an `async-nats` push consumer and remembers, at the type level only,
/// which message type and codec its message stream decodes with.
///
/// # Type Parameters
///
/// * `T` - The message type
/// * `C` - The codec type used for deserialization
pub struct PushConsumer<T, C: CodecType> {
    // The wrapped async-nats consumer that performs the actual requests.
    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>,
    // Zero-sized marker: no `T` or `C` value is stored in the consumer.
    _marker: PhantomData<(T, C)>,
}
396
397impl<T, C: CodecType> PushConsumer<T, C> {
398    /// Create a new push consumer wrapper.
399    pub(crate) fn new(
400        inner: async_nats::jetstream::consumer::Consumer<
401            async_nats::jetstream::consumer::push::Config,
402        >,
403    ) -> Self {
404        Self {
405            inner,
406            _marker: PhantomData,
407        }
408    }
409
410    /// Get the underlying async-nats consumer.
411    pub fn inner(
412        &self,
413    ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>
414    {
415        &self.inner
416    }
417
418    /// Get consumer name.
419    pub fn name(&self) -> &str {
420        &self.inner.cached_info().name
421    }
422}
423
424impl<T: DeserializeOwned, C: CodecType> PushConsumer<T, C> {
425    /// Get a message stream from this push consumer.
426    pub async fn messages(&self) -> Result<PushMessages<T, C>> {
427        let inner = self
428            .inner
429            .messages()
430            .await
431            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
432        Ok(PushMessages::new(inner))
433    }
434}
435
436pin_project! {
437    /// A stream of messages from a push consumer.
438    pub struct PushMessages<T, C: CodecType> {
439        #[pin]
440        inner: async_nats::jetstream::consumer::push::Messages,
441        _marker: PhantomData<(T, C)>,
442    }
443}
444
445impl<T, C: CodecType> PushMessages<T, C> {
446    fn new(inner: async_nats::jetstream::consumer::push::Messages) -> Self {
447        Self {
448            inner,
449            _marker: PhantomData,
450        }
451    }
452}
453
454impl<T: DeserializeOwned, C: CodecType> Stream for PushMessages<T, C> {
455    type Item = Result<JetStreamMessage<T>>;
456
457    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
458        let this = self.project();
459
460        match this.inner.poll_next(cx) {
461            Poll::Ready(Some(Ok(msg))) => {
462                let info = msg.info().ok();
463                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
464                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
465                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
466                    payload,
467                    subject: msg.subject.to_string(),
468                    stream_sequence,
469                    consumer_sequence,
470                    reply: msg.reply.clone().map(|s| s.to_string()),
471                    raw: msg,
472                });
473                Poll::Ready(Some(result))
474            }
475            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
476                crate::error::Error::JetStreamConsumer(e.to_string()),
477            ))),
478            Poll::Ready(None) => Poll::Ready(None),
479            Poll::Pending => Poll::Pending,
480        }
481    }
482}
483
484// ============================================================================
485// Pull Consumer Builder
486// ============================================================================
487
488/// Builder for creating pull consumers.
489pub struct PullConsumerBuilder<T, C: CodecType> {
490    stream: async_nats::jetstream::stream::Stream,
491    config: async_nats::jetstream::consumer::pull::Config,
492    _marker: PhantomData<(T, C)>,
493}
494
495impl<T, C: CodecType> PullConsumerBuilder<T, C> {
496    /// Create a new pull consumer builder.
497    pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
498        Self {
499            stream,
500            config: async_nats::jetstream::consumer::pull::Config {
501                name: Some(name),
502                ..Default::default()
503            },
504            _marker: PhantomData,
505        }
506    }
507
508    /// Make this a durable consumer.
509    pub fn durable(mut self) -> Self {
510        self.config.durable_name = self.config.name.clone();
511        self
512    }
513
514    /// Set the durable name (different from consumer name).
515    pub fn durable_name(mut self, name: impl Into<String>) -> Self {
516        self.config.durable_name = Some(name.into());
517        self
518    }
519
520    /// Set a filter subject.
521    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
522        self.config.filter_subject = subject.into();
523        self
524    }
525
526    /// Set multiple filter subjects.
527    pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
528        self.config.filter_subjects = subjects;
529        self
530    }
531
532    /// Set the description.
533    pub fn description(mut self, description: impl Into<String>) -> Self {
534        self.config.description = Some(description.into());
535        self
536    }
537
538    /// Set the ack policy.
539    pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
540        self.config.ack_policy = policy.into();
541        self
542    }
543
544    /// Set the ack wait duration.
545    pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
546        self.config.ack_wait = duration;
547        self
548    }
549
550    /// Set the maximum number of deliveries.
551    pub fn max_deliver(mut self, max: i64) -> Self {
552        self.config.max_deliver = max;
553        self
554    }
555
556    /// Set the replay policy.
557    pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
558        self.config.replay_policy = policy.into();
559        self
560    }
561
562    /// Set the deliver policy.
563    pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
564        self.config.deliver_policy = policy.into();
565        self
566    }
567
568    /// Start at a specific sequence.
569    pub fn start_sequence(mut self, seq: u64) -> Self {
570        self.config.deliver_policy =
571            async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence {
572                start_sequence: seq,
573            };
574        self
575    }
576
577    /// Start at a specific time.
578    pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
579        self.config.deliver_policy =
580            async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
581        self
582    }
583
584    /// Set the maximum ack pending.
585    pub fn max_ack_pending(mut self, max: i64) -> Self {
586        self.config.max_ack_pending = max;
587        self
588    }
589
590    /// Set the maximum waiting.
591    pub fn max_waiting(mut self, max: i64) -> Self {
592        self.config.max_waiting = max;
593        self
594    }
595
596    /// Set the maximum batch size.
597    pub fn max_batch(mut self, max: i64) -> Self {
598        self.config.max_batch = max;
599        self
600    }
601
602    /// Set the maximum bytes.
603    pub fn max_bytes(mut self, max: i64) -> Self {
604        self.config.max_bytes = max;
605        self
606    }
607
608    /// Set the maximum expiry.
609    pub fn max_expires(mut self, duration: std::time::Duration) -> Self {
610        self.config.max_expires = duration;
611        self
612    }
613
614    /// Set the inactive threshold.
615    pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
616        self.config.inactive_threshold = duration;
617        self
618    }
619
620    /// Set headers only.
621    pub fn headers_only(mut self, headers_only: bool) -> Self {
622        self.config.headers_only = headers_only;
623        self
624    }
625
626    /// Set the number of replicas.
627    pub fn replicas(mut self, replicas: usize) -> Self {
628        self.config.num_replicas = replicas;
629        self
630    }
631
632    /// Set memory storage.
633    pub fn memory_storage(mut self, memory: bool) -> Self {
634        self.config.memory_storage = memory;
635        self
636    }
637
638    /// Set the backoff durations for redelivery.
639    pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
640        self.config.backoff = backoff;
641        self
642    }
643
644    /// Set metadata.
645    pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
646        self.config.metadata = metadata;
647        self
648    }
649
650    /// Create the consumer.
651    pub async fn create(self) -> Result<PullConsumer<T, C>> {
652        let inner = self
653            .stream
654            .create_consumer(self.config)
655            .await
656            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
657        Ok(PullConsumer::new(inner))
658    }
659
660    /// Create or update the consumer.
661    pub async fn create_or_update(self) -> Result<PullConsumer<T, C>> {
662        let name = self.config.name.clone().unwrap_or_default();
663        let inner = self
664            .stream
665            .get_or_create_consumer(&name, self.config)
666            .await
667            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
668        Ok(PullConsumer::new(inner))
669    }
670}
671
672// ============================================================================
673// Push Consumer Builder
674// ============================================================================
675
676/// Builder for creating push consumers.
677pub struct PushConsumerBuilder<T, C: CodecType> {
678    stream: async_nats::jetstream::stream::Stream,
679    config: async_nats::jetstream::consumer::push::Config,
680    _marker: PhantomData<(T, C)>,
681}
682
683impl<T, C: CodecType> PushConsumerBuilder<T, C> {
684    /// Create a new push consumer builder.
685    pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
686        Self {
687            stream,
688            config: async_nats::jetstream::consumer::push::Config {
689                name: Some(name),
690                ..Default::default()
691            },
692            _marker: PhantomData,
693        }
694    }
695
696    /// Make this a durable consumer.
697    pub fn durable(mut self) -> Self {
698        self.config.durable_name = self.config.name.clone();
699        self
700    }
701
702    /// Set the durable name.
703    pub fn durable_name(mut self, name: impl Into<String>) -> Self {
704        self.config.durable_name = Some(name.into());
705        self
706    }
707
708    /// Set the deliver subject (required for push consumers).
709    pub fn deliver_subject(mut self, subject: impl Into<String>) -> Self {
710        self.config.deliver_subject = subject.into();
711        self
712    }
713
714    /// Set the deliver group (for queue groups).
715    pub fn deliver_group(mut self, group: impl Into<String>) -> Self {
716        self.config.deliver_group = Some(group.into());
717        self
718    }
719
720    /// Set a filter subject.
721    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
722        self.config.filter_subject = subject.into();
723        self
724    }
725
726    /// Set multiple filter subjects.
727    pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
728        self.config.filter_subjects = subjects;
729        self
730    }
731
732    /// Set the description.
733    pub fn description(mut self, description: impl Into<String>) -> Self {
734        self.config.description = Some(description.into());
735        self
736    }
737
738    /// Set the ack policy.
739    pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
740        self.config.ack_policy = policy.into();
741        self
742    }
743
744    /// Set the ack wait duration.
745    pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
746        self.config.ack_wait = duration;
747        self
748    }
749
750    /// Set the maximum number of deliveries.
751    pub fn max_deliver(mut self, max: i64) -> Self {
752        self.config.max_deliver = max;
753        self
754    }
755
756    /// Set the replay policy.
757    pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
758        self.config.replay_policy = policy.into();
759        self
760    }
761
762    /// Set the deliver policy.
763    pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
764        self.config.deliver_policy = policy.into();
765        self
766    }
767
768    /// Start at a specific sequence.
769    pub fn start_sequence(mut self, seq: u64) -> Self {
770        self.config.deliver_policy =
771            async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence {
772                start_sequence: seq,
773            };
774        self
775    }
776
777    /// Start at a specific time.
778    pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
779        self.config.deliver_policy =
780            async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
781        self
782    }
783
784    /// Set the rate limit.
785    pub fn rate_limit(mut self, limit: u64) -> Self {
786        self.config.rate_limit = limit;
787        self
788    }
789
790    /// Set the maximum ack pending.
791    pub fn max_ack_pending(mut self, max: i64) -> Self {
792        self.config.max_ack_pending = max;
793        self
794    }
795
796    /// Set the idle heartbeat.
797    pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
798        self.config.idle_heartbeat = duration;
799        self
800    }
801
802    /// Enable flow control.
803    pub fn flow_control(mut self, enabled: bool) -> Self {
804        self.config.flow_control = enabled;
805        self
806    }
807
808    /// Set headers only.
809    pub fn headers_only(mut self, headers_only: bool) -> Self {
810        self.config.headers_only = headers_only;
811        self
812    }
813
814    /// Set the number of replicas.
815    pub fn replicas(mut self, replicas: usize) -> Self {
816        self.config.num_replicas = replicas;
817        self
818    }
819
820    /// Set memory storage.
821    pub fn memory_storage(mut self, memory: bool) -> Self {
822        self.config.memory_storage = memory;
823        self
824    }
825
826    /// Set the backoff durations for redelivery.
827    pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
828        self.config.backoff = backoff;
829        self
830    }
831
832    /// Set metadata.
833    pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
834        self.config.metadata = metadata;
835        self
836    }
837
838    /// Set the inactive threshold.
839    pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
840        self.config.inactive_threshold = duration;
841        self
842    }
843
844    /// Create the consumer.
845    pub async fn create(self) -> Result<PushConsumer<T, C>> {
846        let inner = self
847            .stream
848            .create_consumer(self.config)
849            .await
850            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
851        Ok(PushConsumer::new(inner))
852    }
853
854    /// Create or update the consumer.
855    pub async fn create_or_update(self) -> Result<PushConsumer<T, C>> {
856        let name = self.config.name.clone().unwrap_or_default();
857        let inner = self
858            .stream
859            .get_or_create_consumer(&name, self.config)
860            .await
861            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
862        Ok(PushConsumer::new(inner))
863    }
864}
865
866// ============================================================================
867// Consumer Policies
868// ============================================================================
869
/// How a consumer expects delivered messages to be acknowledged.
///
/// Defaults to [`AckPolicy::Explicit`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum AckPolicy {
    /// Every message must be acknowledged on its own.
    #[default]
    Explicit,
    /// Delivery itself counts as acknowledgment.
    None,
    /// Acknowledging a message also acknowledges every message before it.
    All,
}
881
882impl From<AckPolicy> for async_nats::jetstream::consumer::AckPolicy {
883    fn from(policy: AckPolicy) -> Self {
884        match policy {
885            AckPolicy::Explicit => async_nats::jetstream::consumer::AckPolicy::Explicit,
886            AckPolicy::None => async_nats::jetstream::consumer::AckPolicy::None,
887            AckPolicy::All => async_nats::jetstream::consumer::AckPolicy::All,
888        }
889    }
890}
891
/// The pace at which a consumer replays stored messages.
///
/// Defaults to [`ReplayPolicy::Instant`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ReplayPolicy {
    /// Replay as fast as possible.
    #[default]
    Instant,
    /// Replay at the rate the messages were originally received.
    Original,
}
901
902impl From<ReplayPolicy> for async_nats::jetstream::consumer::ReplayPolicy {
903    fn from(policy: ReplayPolicy) -> Self {
904        match policy {
905            ReplayPolicy::Instant => async_nats::jetstream::consumer::ReplayPolicy::Instant,
906            ReplayPolicy::Original => async_nats::jetstream::consumer::ReplayPolicy::Original,
907        }
908    }
909}
910
/// Where in the stream a consumer starts delivering from.
///
/// Defaults to [`DeliverPolicy::All`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DeliverPolicy {
    /// Deliver every message.
    #[default]
    All,
    /// Deliver only the last message.
    Last,
    /// Deliver only new messages.
    New,
    /// Deliver the last message for each subject.
    LastPerSubject,
}
924
925impl From<DeliverPolicy> for async_nats::jetstream::consumer::DeliverPolicy {
926    fn from(policy: DeliverPolicy) -> Self {
927        match policy {
928            DeliverPolicy::All => async_nats::jetstream::consumer::DeliverPolicy::All,
929            DeliverPolicy::Last => async_nats::jetstream::consumer::DeliverPolicy::Last,
930            DeliverPolicy::New => async_nats::jetstream::consumer::DeliverPolicy::New,
931            DeliverPolicy::LastPerSubject => {
932                async_nats::jetstream::consumer::DeliverPolicy::LastPerSubject
933            }
934        }
935    }
936}