Skip to main content

intercom_rs/jetstream/
consumer.rs

1//! JetStream consumer types and builders.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
15/// A typed message from a JetStream consumer.
16#[derive(Debug)]
17pub struct JetStreamMessage<T> {
18    /// The decoded message payload.
19    pub payload: T,
20    /// The subject the message was published to.
21    pub subject: String,
22    /// The stream sequence number.
23    pub stream_sequence: u64,
24    /// The consumer sequence number.
25    pub consumer_sequence: u64,
26    /// The raw message for acknowledgment.
27    raw: async_nats::jetstream::Message,
28}
29
30impl<T> JetStreamMessage<T> {
31    /// Acknowledge the message.
32    pub async fn ack(&self) -> Result<()> {
33        self.raw
34            .ack()
35            .await
36            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
37    }
38
39    /// Acknowledge the message with double ack.
40    pub async fn double_ack(&self) -> Result<()> {
41        self.raw
42            .double_ack()
43            .await
44            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
45    }
46
47    /// Negatively acknowledge the message.
48    pub async fn nak(&self) -> Result<()> {
49        self.raw
50            .ack_with(async_nats::jetstream::AckKind::Nak(None))
51            .await
52            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
53    }
54
55    /// Negatively acknowledge with a delay before redelivery.
56    pub async fn nak_with_delay(&self, delay: std::time::Duration) -> Result<()> {
57        self.raw
58            .ack_with(async_nats::jetstream::AckKind::Nak(Some(delay)))
59            .await
60            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
61    }
62
63    /// Mark the message as in progress.
64    pub async fn in_progress(&self) -> Result<()> {
65        self.raw
66            .ack_with(async_nats::jetstream::AckKind::Progress)
67            .await
68            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
69    }
70
71    /// Terminate the message (will not be redelivered).
72    pub async fn term(&self) -> Result<()> {
73        self.raw
74            .ack_with(async_nats::jetstream::AckKind::Term)
75            .await
76            .map_err(|e| crate::error::Error::JetStream(e.to_string()))
77    }
78
79    /// Get the raw underlying JetStream message.
80    pub fn raw(&self) -> &async_nats::jetstream::Message {
81        &self.raw
82    }
83}
84
85// ============================================================================
86// Pull Consumer
87// ============================================================================
88
89/// A typed pull consumer with configurable codec.
90///
91/// # Type Parameters
92///
93/// * `T` - The message type
94/// * `C` - The codec type used for deserialization
95pub struct PullConsumer<T, C: CodecType> {
96    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
97    _marker: PhantomData<(T, C)>,
98}
99
100impl<T, C: CodecType> PullConsumer<T, C> {
101    /// Create a new pull consumer wrapper.
102    pub(crate) fn new(
103        inner: async_nats::jetstream::consumer::Consumer<
104            async_nats::jetstream::consumer::pull::Config,
105        >,
106    ) -> Self {
107        Self {
108            inner,
109            _marker: PhantomData,
110        }
111    }
112
113    /// Get the underlying async-nats consumer.
114    pub fn inner(
115        &self,
116    ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>
117    {
118        &self.inner
119    }
120
121    /// Get consumer name.
122    pub fn name(&self) -> &str {
123        &self.inner.cached_info().name
124    }
125}
126
127impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C> {
128    /// Fetch a batch of messages.
129    ///
130    /// # Example
131    ///
132    /// ```no_run
133    /// use intercom::{Client, MsgPackCodec};
134    /// use serde::{Deserialize, Serialize};
135    /// use futures::StreamExt;
136    ///
137    /// #[derive(Serialize, Deserialize, Debug)]
138    /// struct Event { id: u64 }
139    ///
140    /// # async fn example() -> intercom::Result<()> {
141    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
142    /// let jetstream = client.jetstream();
143    /// let stream = jetstream.get_stream("events").await?;
144    /// let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
145    ///
146    /// let mut messages = consumer.fetch(10).await?;
147    /// while let Some(result) = messages.next().await {
148    ///     let msg = result?;
149    ///     println!("Got: {:?}", msg.payload);
150    ///     msg.ack().await?;
151    /// }
152    /// # Ok(())
153    /// # }
154    /// ```
155    pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
156        let inner = self
157            .inner
158            .fetch()
159            .max_messages(batch_size)
160            .messages()
161            .await
162            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
163        Ok(PullBatch::new(inner))
164    }
165
166    /// Fetch messages with a builder for advanced options.
167    pub fn fetch_builder(&self) -> FetchBuilder<T, C> {
168        FetchBuilder::new(self.inner.clone())
169    }
170
171    /// Create a continuous message stream.
172    ///
173    /// Returns a [`Stream`] that continuously delivers messages.
174    ///
175    /// # Example
176    ///
177    /// ```no_run
178    /// use intercom::{Client, MsgPackCodec};
179    /// use serde::{Deserialize, Serialize};
180    /// use futures::StreamExt;
181    ///
182    /// #[derive(Serialize, Deserialize, Debug)]
183    /// struct Event { id: u64 }
184    ///
185    /// # async fn example() -> intercom::Result<()> {
186    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
187    /// let jetstream = client.jetstream();
188    /// let stream = jetstream.get_stream("events").await?;
189    /// let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
190    ///
191    /// let mut messages = consumer.messages().await?;
192    /// while let Some(result) = messages.next().await {
193    ///     let msg = result?;
194    ///     println!("Got: {:?}", msg.payload);
195    ///     msg.ack().await?;
196    /// }
197    /// # Ok(())
198    /// # }
199    /// ```
200    pub async fn messages(&self) -> Result<PullMessages<T, C>> {
201        let inner = self
202            .inner
203            .messages()
204            .await
205            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
206        Ok(PullMessages::new(inner))
207    }
208}
209
210pin_project! {
211    /// A batch of messages from a fetch operation.
212    pub struct PullBatch<T, C: CodecType> {
213        #[pin]
214        inner: async_nats::jetstream::consumer::pull::Batch,
215        _marker: PhantomData<(T, C)>,
216    }
217}
218
219impl<T, C: CodecType> PullBatch<T, C> {
220    fn new(inner: async_nats::jetstream::consumer::pull::Batch) -> Self {
221        Self {
222            inner,
223            _marker: PhantomData,
224        }
225    }
226}
227
228impl<T: DeserializeOwned, C: CodecType> Stream for PullBatch<T, C> {
229    type Item = Result<JetStreamMessage<T>>;
230
231    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232        let this = self.project();
233
234        match this.inner.poll_next(cx) {
235            Poll::Ready(Some(Ok(msg))) => {
236                let info = msg.info().ok();
237                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
238                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
239                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
240                    payload,
241                    subject: msg.subject.to_string(),
242                    stream_sequence,
243                    consumer_sequence,
244                    raw: msg,
245                });
246                Poll::Ready(Some(result))
247            }
248            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
249                crate::error::Error::JetStreamConsumer(e.to_string()),
250            ))),
251            Poll::Ready(None) => Poll::Ready(None),
252            Poll::Pending => Poll::Pending,
253        }
254    }
255}
256
257pin_project! {
258    /// A stream of messages from a pull consumer.
259    pub struct PullMessages<T, C: CodecType> {
260        #[pin]
261        inner: async_nats::jetstream::consumer::pull::Stream,
262        _marker: PhantomData<(T, C)>,
263    }
264}
265
266impl<T, C: CodecType> PullMessages<T, C> {
267    fn new(inner: async_nats::jetstream::consumer::pull::Stream) -> Self {
268        Self {
269            inner,
270            _marker: PhantomData,
271        }
272    }
273}
274
275impl<T: DeserializeOwned, C: CodecType> Stream for PullMessages<T, C> {
276    type Item = Result<JetStreamMessage<T>>;
277
278    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
279        let this = self.project();
280
281        match this.inner.poll_next(cx) {
282            Poll::Ready(Some(Ok(msg))) => {
283                let info = msg.info().ok();
284                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
285                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
286                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
287                    payload,
288                    subject: msg.subject.to_string(),
289                    stream_sequence,
290                    consumer_sequence,
291                    raw: msg,
292                });
293                Poll::Ready(Some(result))
294            }
295            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
296                crate::error::Error::JetStreamConsumer(e.to_string()),
297            ))),
298            Poll::Ready(None) => Poll::Ready(None),
299            Poll::Pending => Poll::Pending,
300        }
301    }
302}
303
304/// Builder for fetching messages from a pull consumer.
305pub struct FetchBuilder<T, C: CodecType> {
306    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
307    max_messages: usize,
308    max_bytes: Option<usize>,
309    expires: Option<std::time::Duration>,
310    idle_heartbeat: Option<std::time::Duration>,
311    _marker: PhantomData<(T, C)>,
312}
313
314impl<T, C: CodecType> FetchBuilder<T, C> {
315    fn new(
316        inner: async_nats::jetstream::consumer::Consumer<
317            async_nats::jetstream::consumer::pull::Config,
318        >,
319    ) -> Self {
320        Self {
321            inner,
322            max_messages: 10,
323            max_bytes: None,
324            expires: None,
325            idle_heartbeat: None,
326            _marker: PhantomData,
327        }
328    }
329
330    /// Set the maximum number of messages to fetch.
331    pub fn max_messages(mut self, max: usize) -> Self {
332        self.max_messages = max;
333        self
334    }
335
336    /// Set the maximum bytes to fetch.
337    pub fn max_bytes(mut self, max: usize) -> Self {
338        self.max_bytes = Some(max);
339        self
340    }
341
342    /// Set the expiration time.
343    pub fn expires(mut self, duration: std::time::Duration) -> Self {
344        self.expires = Some(duration);
345        self
346    }
347
348    /// Set the idle heartbeat interval.
349    pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
350        self.idle_heartbeat = Some(duration);
351        self
352    }
353}
354
355impl<T: DeserializeOwned, C: CodecType> FetchBuilder<T, C> {
356    /// Execute the fetch.
357    pub async fn fetch(self) -> Result<PullBatch<T, C>> {
358        let mut fetch = self.inner.fetch().max_messages(self.max_messages);
359
360        if let Some(bytes) = self.max_bytes {
361            fetch = fetch.max_bytes(bytes);
362        }
363        if let Some(expires) = self.expires {
364            fetch = fetch.expires(expires);
365        }
366        if let Some(heartbeat) = self.idle_heartbeat {
367            fetch = fetch.heartbeat(heartbeat);
368        }
369
370        let inner = fetch
371            .messages()
372            .await
373            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
374        Ok(PullBatch::new(inner))
375    }
376}
377
378// ============================================================================
379// Push Consumer
380// ============================================================================
381
382/// A typed push consumer with configurable codec.
383///
384/// # Type Parameters
385///
386/// * `T` - The message type
387/// * `C` - The codec type used for deserialization
388pub struct PushConsumer<T, C: CodecType> {
389    inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>,
390    _marker: PhantomData<(T, C)>,
391}
392
393impl<T, C: CodecType> PushConsumer<T, C> {
394    /// Create a new push consumer wrapper.
395    pub(crate) fn new(
396        inner: async_nats::jetstream::consumer::Consumer<
397            async_nats::jetstream::consumer::push::Config,
398        >,
399    ) -> Self {
400        Self {
401            inner,
402            _marker: PhantomData,
403        }
404    }
405
406    /// Get the underlying async-nats consumer.
407    pub fn inner(
408        &self,
409    ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>
410    {
411        &self.inner
412    }
413
414    /// Get consumer name.
415    pub fn name(&self) -> &str {
416        &self.inner.cached_info().name
417    }
418}
419
420impl<T: DeserializeOwned, C: CodecType> PushConsumer<T, C> {
421    /// Get a message stream from this push consumer.
422    pub async fn messages(&self) -> Result<PushMessages<T, C>> {
423        let inner = self
424            .inner
425            .messages()
426            .await
427            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
428        Ok(PushMessages::new(inner))
429    }
430}
431
432pin_project! {
433    /// A stream of messages from a push consumer.
434    pub struct PushMessages<T, C: CodecType> {
435        #[pin]
436        inner: async_nats::jetstream::consumer::push::Messages,
437        _marker: PhantomData<(T, C)>,
438    }
439}
440
441impl<T, C: CodecType> PushMessages<T, C> {
442    fn new(inner: async_nats::jetstream::consumer::push::Messages) -> Self {
443        Self {
444            inner,
445            _marker: PhantomData,
446        }
447    }
448}
449
450impl<T: DeserializeOwned, C: CodecType> Stream for PushMessages<T, C> {
451    type Item = Result<JetStreamMessage<T>>;
452
453    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
454        let this = self.project();
455
456        match this.inner.poll_next(cx) {
457            Poll::Ready(Some(Ok(msg))) => {
458                let info = msg.info().ok();
459                let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
460                let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
461                let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
462                    payload,
463                    subject: msg.subject.to_string(),
464                    stream_sequence,
465                    consumer_sequence,
466                    raw: msg,
467                });
468                Poll::Ready(Some(result))
469            }
470            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
471                crate::error::Error::JetStreamConsumer(e.to_string()),
472            ))),
473            Poll::Ready(None) => Poll::Ready(None),
474            Poll::Pending => Poll::Pending,
475        }
476    }
477}
478
479// ============================================================================
480// Pull Consumer Builder
481// ============================================================================
482
483/// Builder for creating pull consumers.
484pub struct PullConsumerBuilder<T, C: CodecType> {
485    stream: async_nats::jetstream::stream::Stream,
486    config: async_nats::jetstream::consumer::pull::Config,
487    _marker: PhantomData<(T, C)>,
488}
489
490impl<T, C: CodecType> PullConsumerBuilder<T, C> {
491    /// Create a new pull consumer builder.
492    pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
493        Self {
494            stream,
495            config: async_nats::jetstream::consumer::pull::Config {
496                name: Some(name),
497                ..Default::default()
498            },
499            _marker: PhantomData,
500        }
501    }
502
503    /// Make this a durable consumer.
504    pub fn durable(mut self) -> Self {
505        self.config.durable_name = self.config.name.clone();
506        self
507    }
508
509    /// Set the durable name (different from consumer name).
510    pub fn durable_name(mut self, name: impl Into<String>) -> Self {
511        self.config.durable_name = Some(name.into());
512        self
513    }
514
515    /// Set a filter subject.
516    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
517        self.config.filter_subject = subject.into();
518        self
519    }
520
521    /// Set multiple filter subjects.
522    pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
523        self.config.filter_subjects = subjects;
524        self
525    }
526
527    /// Set the description.
528    pub fn description(mut self, description: impl Into<String>) -> Self {
529        self.config.description = Some(description.into());
530        self
531    }
532
533    /// Set the ack policy.
534    pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
535        self.config.ack_policy = policy.into();
536        self
537    }
538
539    /// Set the ack wait duration.
540    pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
541        self.config.ack_wait = duration;
542        self
543    }
544
545    /// Set the maximum number of deliveries.
546    pub fn max_deliver(mut self, max: i64) -> Self {
547        self.config.max_deliver = max;
548        self
549    }
550
551    /// Set the replay policy.
552    pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
553        self.config.replay_policy = policy.into();
554        self
555    }
556
557    /// Set the deliver policy.
558    pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
559        self.config.deliver_policy = policy.into();
560        self
561    }
562
563    /// Start at a specific sequence.
564    pub fn start_sequence(mut self, seq: u64) -> Self {
565        self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { start_sequence: seq };
566        self
567    }
568
569    /// Start at a specific time.
570    pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
571        self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
572        self
573    }
574
575    /// Set the maximum ack pending.
576    pub fn max_ack_pending(mut self, max: i64) -> Self {
577        self.config.max_ack_pending = max;
578        self
579    }
580
581    /// Set the maximum waiting.
582    pub fn max_waiting(mut self, max: i64) -> Self {
583        self.config.max_waiting = max;
584        self
585    }
586
587    /// Set the maximum batch size.
588    pub fn max_batch(mut self, max: i64) -> Self {
589        self.config.max_batch = max;
590        self
591    }
592
593    /// Set the maximum bytes.
594    pub fn max_bytes(mut self, max: i64) -> Self {
595        self.config.max_bytes = max;
596        self
597    }
598
599    /// Set the maximum expiry.
600    pub fn max_expires(mut self, duration: std::time::Duration) -> Self {
601        self.config.max_expires = duration;
602        self
603    }
604
605    /// Set the inactive threshold.
606    pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
607        self.config.inactive_threshold = duration;
608        self
609    }
610
611    /// Set headers only.
612    pub fn headers_only(mut self, headers_only: bool) -> Self {
613        self.config.headers_only = headers_only;
614        self
615    }
616
617    /// Set the number of replicas.
618    pub fn replicas(mut self, replicas: usize) -> Self {
619        self.config.num_replicas = replicas;
620        self
621    }
622
623    /// Set memory storage.
624    pub fn memory_storage(mut self, memory: bool) -> Self {
625        self.config.memory_storage = memory;
626        self
627    }
628
629    /// Set the backoff durations for redelivery.
630    pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
631        self.config.backoff = backoff;
632        self
633    }
634
635    /// Set metadata.
636    pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
637        self.config.metadata = metadata;
638        self
639    }
640
641    /// Create the consumer.
642    pub async fn create(self) -> Result<PullConsumer<T, C>> {
643        let inner = self
644            .stream
645            .create_consumer(self.config)
646            .await
647            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
648        Ok(PullConsumer::new(inner))
649    }
650
651    /// Create or update the consumer.
652    pub async fn create_or_update(self) -> Result<PullConsumer<T, C>> {
653        let name = self.config.name.clone().unwrap_or_default();
654        let inner = self
655            .stream
656            .get_or_create_consumer(&name, self.config)
657            .await
658            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
659        Ok(PullConsumer::new(inner))
660    }
661}
662
663// ============================================================================
664// Push Consumer Builder
665// ============================================================================
666
667/// Builder for creating push consumers.
668pub struct PushConsumerBuilder<T, C: CodecType> {
669    stream: async_nats::jetstream::stream::Stream,
670    config: async_nats::jetstream::consumer::push::Config,
671    _marker: PhantomData<(T, C)>,
672}
673
674impl<T, C: CodecType> PushConsumerBuilder<T, C> {
675    /// Create a new push consumer builder.
676    pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
677        Self {
678            stream,
679            config: async_nats::jetstream::consumer::push::Config {
680                name: Some(name),
681                ..Default::default()
682            },
683            _marker: PhantomData,
684        }
685    }
686
687    /// Make this a durable consumer.
688    pub fn durable(mut self) -> Self {
689        self.config.durable_name = self.config.name.clone();
690        self
691    }
692
693    /// Set the durable name.
694    pub fn durable_name(mut self, name: impl Into<String>) -> Self {
695        self.config.durable_name = Some(name.into());
696        self
697    }
698
699    /// Set the deliver subject (required for push consumers).
700    pub fn deliver_subject(mut self, subject: impl Into<String>) -> Self {
701        self.config.deliver_subject = subject.into();
702        self
703    }
704
705    /// Set the deliver group (for queue groups).
706    pub fn deliver_group(mut self, group: impl Into<String>) -> Self {
707        self.config.deliver_group = Some(group.into());
708        self
709    }
710
711    /// Set a filter subject.
712    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
713        self.config.filter_subject = subject.into();
714        self
715    }
716
717    /// Set multiple filter subjects.
718    pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
719        self.config.filter_subjects = subjects;
720        self
721    }
722
723    /// Set the description.
724    pub fn description(mut self, description: impl Into<String>) -> Self {
725        self.config.description = Some(description.into());
726        self
727    }
728
729    /// Set the ack policy.
730    pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
731        self.config.ack_policy = policy.into();
732        self
733    }
734
735    /// Set the ack wait duration.
736    pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
737        self.config.ack_wait = duration;
738        self
739    }
740
741    /// Set the maximum number of deliveries.
742    pub fn max_deliver(mut self, max: i64) -> Self {
743        self.config.max_deliver = max;
744        self
745    }
746
747    /// Set the replay policy.
748    pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
749        self.config.replay_policy = policy.into();
750        self
751    }
752
753    /// Set the deliver policy.
754    pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
755        self.config.deliver_policy = policy.into();
756        self
757    }
758
759    /// Start at a specific sequence.
760    pub fn start_sequence(mut self, seq: u64) -> Self {
761        self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { start_sequence: seq };
762        self
763    }
764
765    /// Start at a specific time.
766    pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
767        self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
768        self
769    }
770
771    /// Set the rate limit.
772    pub fn rate_limit(mut self, limit: u64) -> Self {
773        self.config.rate_limit = limit;
774        self
775    }
776
777    /// Set the maximum ack pending.
778    pub fn max_ack_pending(mut self, max: i64) -> Self {
779        self.config.max_ack_pending = max;
780        self
781    }
782
783    /// Set the idle heartbeat.
784    pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
785        self.config.idle_heartbeat = duration;
786        self
787    }
788
789    /// Enable flow control.
790    pub fn flow_control(mut self, enabled: bool) -> Self {
791        self.config.flow_control = enabled;
792        self
793    }
794
795    /// Set headers only.
796    pub fn headers_only(mut self, headers_only: bool) -> Self {
797        self.config.headers_only = headers_only;
798        self
799    }
800
801    /// Set the number of replicas.
802    pub fn replicas(mut self, replicas: usize) -> Self {
803        self.config.num_replicas = replicas;
804        self
805    }
806
807    /// Set memory storage.
808    pub fn memory_storage(mut self, memory: bool) -> Self {
809        self.config.memory_storage = memory;
810        self
811    }
812
813    /// Set the backoff durations for redelivery.
814    pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
815        self.config.backoff = backoff;
816        self
817    }
818
819    /// Set metadata.
820    pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
821        self.config.metadata = metadata;
822        self
823    }
824
825    /// Set the inactive threshold.
826    pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
827        self.config.inactive_threshold = duration;
828        self
829    }
830
831    /// Create the consumer.
832    pub async fn create(self) -> Result<PushConsumer<T, C>> {
833        let inner = self
834            .stream
835            .create_consumer(self.config)
836            .await
837            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
838        Ok(PushConsumer::new(inner))
839    }
840
841    /// Create or update the consumer.
842    pub async fn create_or_update(self) -> Result<PushConsumer<T, C>> {
843        let name = self.config.name.clone().unwrap_or_default();
844        let inner = self
845            .stream
846            .get_or_create_consumer(&name, self.config)
847            .await
848            .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
849        Ok(PushConsumer::new(inner))
850    }
851}
852
853// ============================================================================
854// Consumer Policies
855// ============================================================================
856
/// Acknowledgment policy for consumers.
///
/// The default is [`AckPolicy::Explicit`]. The type is `Copy` and hashable, so
/// it can be used directly as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AckPolicy {
    /// Acknowledge each message individually.
    #[default]
    Explicit,
    /// Messages are considered acknowledged on delivery.
    None,
    /// Acknowledge all messages up to and including the acknowledged one.
    All,
}
868
869impl From<AckPolicy> for async_nats::jetstream::consumer::AckPolicy {
870    fn from(policy: AckPolicy) -> Self {
871        match policy {
872            AckPolicy::Explicit => async_nats::jetstream::consumer::AckPolicy::Explicit,
873            AckPolicy::None => async_nats::jetstream::consumer::AckPolicy::None,
874            AckPolicy::All => async_nats::jetstream::consumer::AckPolicy::All,
875        }
876    }
877}
878
/// Replay policy for consumers.
///
/// The default is [`ReplayPolicy::Instant`]. The type is `Copy` and hashable,
/// so it can be used directly as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ReplayPolicy {
    /// Replay messages as fast as possible.
    #[default]
    Instant,
    /// Replay messages at the original rate.
    Original,
}
888
889impl From<ReplayPolicy> for async_nats::jetstream::consumer::ReplayPolicy {
890    fn from(policy: ReplayPolicy) -> Self {
891        match policy {
892            ReplayPolicy::Instant => async_nats::jetstream::consumer::ReplayPolicy::Instant,
893            ReplayPolicy::Original => async_nats::jetstream::consumer::ReplayPolicy::Original,
894        }
895    }
896}
897
/// Deliver policy for consumers.
///
/// The default is [`DeliverPolicy::All`]. The type is `Copy` and hashable, so
/// it can be used directly as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DeliverPolicy {
    /// Deliver all messages.
    #[default]
    All,
    /// Deliver only the last message.
    Last,
    /// Deliver only new messages.
    New,
    /// Deliver the last message per subject.
    LastPerSubject,
}
911
912impl From<DeliverPolicy> for async_nats::jetstream::consumer::DeliverPolicy {
913    fn from(policy: DeliverPolicy) -> Self {
914        match policy {
915            DeliverPolicy::All => async_nats::jetstream::consumer::DeliverPolicy::All,
916            DeliverPolicy::Last => async_nats::jetstream::consumer::DeliverPolicy::Last,
917            DeliverPolicy::New => async_nats::jetstream::consumer::DeliverPolicy::New,
918            DeliverPolicy::LastPerSubject => {
919                async_nats::jetstream::consumer::DeliverPolicy::LastPerSubject
920            }
921        }
922    }
923}