1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
15#[derive(Debug)]
17pub struct JetStreamMessage<T> {
18 pub payload: T,
20 pub subject: String,
22 pub stream_sequence: u64,
24 pub consumer_sequence: u64,
26 pub reply: Option<String>,
28 raw: async_nats::jetstream::Message,
30}
31
32impl<T> JetStreamMessage<T> {
33 pub async fn ack(&self) -> Result<()> {
35 self.raw
36 .ack()
37 .await
38 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
39 }
40
41 pub async fn double_ack(&self) -> Result<()> {
43 self.raw
44 .double_ack()
45 .await
46 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
47 }
48
49 pub async fn nak(&self) -> Result<()> {
51 self.raw
52 .ack_with(async_nats::jetstream::AckKind::Nak(None))
53 .await
54 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
55 }
56
57 pub async fn nak_with_delay(&self, delay: std::time::Duration) -> Result<()> {
59 self.raw
60 .ack_with(async_nats::jetstream::AckKind::Nak(Some(delay)))
61 .await
62 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
63 }
64
65 pub async fn in_progress(&self) -> Result<()> {
67 self.raw
68 .ack_with(async_nats::jetstream::AckKind::Progress)
69 .await
70 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
71 }
72
73 pub async fn term(&self) -> Result<()> {
75 self.raw
76 .ack_with(async_nats::jetstream::AckKind::Term)
77 .await
78 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
79 }
80
81 pub fn raw(&self) -> &async_nats::jetstream::Message {
83 &self.raw
84 }
85}
86
87pub struct PullConsumer<T, C: CodecType> {
98 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
99 _marker: PhantomData<(T, C)>,
100}
101
102impl<T, C: CodecType> PullConsumer<T, C> {
103 pub(crate) fn new(
105 inner: async_nats::jetstream::consumer::Consumer<
106 async_nats::jetstream::consumer::pull::Config,
107 >,
108 ) -> Self {
109 Self {
110 inner,
111 _marker: PhantomData,
112 }
113 }
114
115 pub fn inner(
117 &self,
118 ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>
119 {
120 &self.inner
121 }
122
123 pub fn name(&self) -> &str {
125 &self.inner.cached_info().name
126 }
127}
128
129impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C> {
130 pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
158 let inner = self
159 .inner
160 .fetch()
161 .max_messages(batch_size)
162 .messages()
163 .await
164 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
165 Ok(PullBatch::new(inner))
166 }
167
168 pub fn fetch_builder(&self) -> FetchBuilder<T, C> {
170 FetchBuilder::new(self.inner.clone())
171 }
172
173 pub async fn messages(&self) -> Result<PullMessages<T, C>> {
203 let inner = self
204 .inner
205 .messages()
206 .await
207 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
208 Ok(PullMessages::new(inner))
209 }
210}
211
212pin_project! {
213 pub struct PullBatch<T, C: CodecType> {
215 #[pin]
216 inner: async_nats::jetstream::consumer::pull::Batch,
217 _marker: PhantomData<(T, C)>,
218 }
219}
220
221impl<T, C: CodecType> PullBatch<T, C> {
222 fn new(inner: async_nats::jetstream::consumer::pull::Batch) -> Self {
223 Self {
224 inner,
225 _marker: PhantomData,
226 }
227 }
228}
229
230impl<T: DeserializeOwned, C: CodecType> Stream for PullBatch<T, C> {
231 type Item = Result<JetStreamMessage<T>>;
232
233 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
234 let this = self.project();
235
236 match this.inner.poll_next(cx) {
237 Poll::Ready(Some(Ok(msg))) => {
238 let info = msg.info().ok();
239 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
240 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
241 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
242 payload,
243 subject: msg.subject.to_string(),
244 stream_sequence,
245 consumer_sequence,
246 reply: msg.reply.clone().map(|s| s.to_string()),
247 raw: msg,
248 });
249 Poll::Ready(Some(result))
250 }
251 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
252 crate::error::Error::JetStreamConsumer(e.to_string()),
253 ))),
254 Poll::Ready(None) => Poll::Ready(None),
255 Poll::Pending => Poll::Pending,
256 }
257 }
258}
259
260pin_project! {
261 pub struct PullMessages<T, C: CodecType> {
263 #[pin]
264 inner: async_nats::jetstream::consumer::pull::Stream,
265 _marker: PhantomData<(T, C)>,
266 }
267}
268
269impl<T, C: CodecType> PullMessages<T, C> {
270 fn new(inner: async_nats::jetstream::consumer::pull::Stream) -> Self {
271 Self {
272 inner,
273 _marker: PhantomData,
274 }
275 }
276}
277
278impl<T: DeserializeOwned, C: CodecType> Stream for PullMessages<T, C> {
279 type Item = Result<JetStreamMessage<T>>;
280
281 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
282 let this = self.project();
283
284 match this.inner.poll_next(cx) {
285 Poll::Ready(Some(Ok(msg))) => {
286 let info = msg.info().ok();
287 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
288 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
289 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
290 payload,
291 subject: msg.subject.to_string(),
292 stream_sequence,
293 consumer_sequence,
294 reply: msg.reply.clone().map(|s| s.to_string()),
295 raw: msg,
296 });
297 Poll::Ready(Some(result))
298 }
299 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
300 crate::error::Error::JetStreamConsumer(e.to_string()),
301 ))),
302 Poll::Ready(None) => Poll::Ready(None),
303 Poll::Pending => Poll::Pending,
304 }
305 }
306}
307
308pub struct FetchBuilder<T, C: CodecType> {
310 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
311 max_messages: usize,
312 max_bytes: Option<usize>,
313 expires: Option<std::time::Duration>,
314 idle_heartbeat: Option<std::time::Duration>,
315 _marker: PhantomData<(T, C)>,
316}
317
318impl<T, C: CodecType> FetchBuilder<T, C> {
319 fn new(
320 inner: async_nats::jetstream::consumer::Consumer<
321 async_nats::jetstream::consumer::pull::Config,
322 >,
323 ) -> Self {
324 Self {
325 inner,
326 max_messages: 10,
327 max_bytes: None,
328 expires: None,
329 idle_heartbeat: None,
330 _marker: PhantomData,
331 }
332 }
333
334 pub fn max_messages(mut self, max: usize) -> Self {
336 self.max_messages = max;
337 self
338 }
339
340 pub fn max_bytes(mut self, max: usize) -> Self {
342 self.max_bytes = Some(max);
343 self
344 }
345
346 pub fn expires(mut self, duration: std::time::Duration) -> Self {
348 self.expires = Some(duration);
349 self
350 }
351
352 pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
354 self.idle_heartbeat = Some(duration);
355 self
356 }
357}
358
359impl<T: DeserializeOwned, C: CodecType> FetchBuilder<T, C> {
360 pub async fn fetch(self) -> Result<PullBatch<T, C>> {
362 let mut fetch = self.inner.fetch().max_messages(self.max_messages);
363
364 if let Some(bytes) = self.max_bytes {
365 fetch = fetch.max_bytes(bytes);
366 }
367 if let Some(expires) = self.expires {
368 fetch = fetch.expires(expires);
369 }
370 if let Some(heartbeat) = self.idle_heartbeat {
371 fetch = fetch.heartbeat(heartbeat);
372 }
373
374 let inner = fetch
375 .messages()
376 .await
377 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
378 Ok(PullBatch::new(inner))
379 }
380}
381
382pub struct PushConsumer<T, C: CodecType> {
393 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>,
394 _marker: PhantomData<(T, C)>,
395}
396
397impl<T, C: CodecType> PushConsumer<T, C> {
398 pub(crate) fn new(
400 inner: async_nats::jetstream::consumer::Consumer<
401 async_nats::jetstream::consumer::push::Config,
402 >,
403 ) -> Self {
404 Self {
405 inner,
406 _marker: PhantomData,
407 }
408 }
409
410 pub fn inner(
412 &self,
413 ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>
414 {
415 &self.inner
416 }
417
418 pub fn name(&self) -> &str {
420 &self.inner.cached_info().name
421 }
422}
423
424impl<T: DeserializeOwned, C: CodecType> PushConsumer<T, C> {
425 pub async fn messages(&self) -> Result<PushMessages<T, C>> {
427 let inner = self
428 .inner
429 .messages()
430 .await
431 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
432 Ok(PushMessages::new(inner))
433 }
434}
435
436pin_project! {
437 pub struct PushMessages<T, C: CodecType> {
439 #[pin]
440 inner: async_nats::jetstream::consumer::push::Messages,
441 _marker: PhantomData<(T, C)>,
442 }
443}
444
445impl<T, C: CodecType> PushMessages<T, C> {
446 fn new(inner: async_nats::jetstream::consumer::push::Messages) -> Self {
447 Self {
448 inner,
449 _marker: PhantomData,
450 }
451 }
452}
453
454impl<T: DeserializeOwned, C: CodecType> Stream for PushMessages<T, C> {
455 type Item = Result<JetStreamMessage<T>>;
456
457 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
458 let this = self.project();
459
460 match this.inner.poll_next(cx) {
461 Poll::Ready(Some(Ok(msg))) => {
462 let info = msg.info().ok();
463 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
464 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
465 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
466 payload,
467 subject: msg.subject.to_string(),
468 stream_sequence,
469 consumer_sequence,
470 reply: msg.reply.clone().map(|s| s.to_string()),
471 raw: msg,
472 });
473 Poll::Ready(Some(result))
474 }
475 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
476 crate::error::Error::JetStreamConsumer(e.to_string()),
477 ))),
478 Poll::Ready(None) => Poll::Ready(None),
479 Poll::Pending => Poll::Pending,
480 }
481 }
482}
483
484pub struct PullConsumerBuilder<T, C: CodecType> {
490 stream: async_nats::jetstream::stream::Stream,
491 config: async_nats::jetstream::consumer::pull::Config,
492 _marker: PhantomData<(T, C)>,
493}
494
495impl<T, C: CodecType> PullConsumerBuilder<T, C> {
496 pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
498 Self {
499 stream,
500 config: async_nats::jetstream::consumer::pull::Config {
501 name: Some(name),
502 ..Default::default()
503 },
504 _marker: PhantomData,
505 }
506 }
507
508 pub fn durable(mut self) -> Self {
510 self.config.durable_name = self.config.name.clone();
511 self
512 }
513
514 pub fn durable_name(mut self, name: impl Into<String>) -> Self {
516 self.config.durable_name = Some(name.into());
517 self
518 }
519
520 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
522 self.config.filter_subject = subject.into();
523 self
524 }
525
526 pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
528 self.config.filter_subjects = subjects;
529 self
530 }
531
532 pub fn description(mut self, description: impl Into<String>) -> Self {
534 self.config.description = Some(description.into());
535 self
536 }
537
538 pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
540 self.config.ack_policy = policy.into();
541 self
542 }
543
544 pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
546 self.config.ack_wait = duration;
547 self
548 }
549
550 pub fn max_deliver(mut self, max: i64) -> Self {
552 self.config.max_deliver = max;
553 self
554 }
555
556 pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
558 self.config.replay_policy = policy.into();
559 self
560 }
561
562 pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
564 self.config.deliver_policy = policy.into();
565 self
566 }
567
568 pub fn start_sequence(mut self, seq: u64) -> Self {
570 self.config.deliver_policy =
571 async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence {
572 start_sequence: seq,
573 };
574 self
575 }
576
577 pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
579 self.config.deliver_policy =
580 async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
581 self
582 }
583
584 pub fn max_ack_pending(mut self, max: i64) -> Self {
586 self.config.max_ack_pending = max;
587 self
588 }
589
590 pub fn max_waiting(mut self, max: i64) -> Self {
592 self.config.max_waiting = max;
593 self
594 }
595
596 pub fn max_batch(mut self, max: i64) -> Self {
598 self.config.max_batch = max;
599 self
600 }
601
602 pub fn max_bytes(mut self, max: i64) -> Self {
604 self.config.max_bytes = max;
605 self
606 }
607
608 pub fn max_expires(mut self, duration: std::time::Duration) -> Self {
610 self.config.max_expires = duration;
611 self
612 }
613
614 pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
616 self.config.inactive_threshold = duration;
617 self
618 }
619
620 pub fn headers_only(mut self, headers_only: bool) -> Self {
622 self.config.headers_only = headers_only;
623 self
624 }
625
626 pub fn replicas(mut self, replicas: usize) -> Self {
628 self.config.num_replicas = replicas;
629 self
630 }
631
632 pub fn memory_storage(mut self, memory: bool) -> Self {
634 self.config.memory_storage = memory;
635 self
636 }
637
638 pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
640 self.config.backoff = backoff;
641 self
642 }
643
644 pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
646 self.config.metadata = metadata;
647 self
648 }
649
650 pub async fn create(self) -> Result<PullConsumer<T, C>> {
652 let inner = self
653 .stream
654 .create_consumer(self.config)
655 .await
656 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
657 Ok(PullConsumer::new(inner))
658 }
659
660 pub async fn create_or_update(self) -> Result<PullConsumer<T, C>> {
662 let name = self.config.name.clone().unwrap_or_default();
663 let inner = self
664 .stream
665 .get_or_create_consumer(&name, self.config)
666 .await
667 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
668 Ok(PullConsumer::new(inner))
669 }
670}
671
672pub struct PushConsumerBuilder<T, C: CodecType> {
678 stream: async_nats::jetstream::stream::Stream,
679 config: async_nats::jetstream::consumer::push::Config,
680 _marker: PhantomData<(T, C)>,
681}
682
683impl<T, C: CodecType> PushConsumerBuilder<T, C> {
684 pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
686 Self {
687 stream,
688 config: async_nats::jetstream::consumer::push::Config {
689 name: Some(name),
690 ..Default::default()
691 },
692 _marker: PhantomData,
693 }
694 }
695
696 pub fn durable(mut self) -> Self {
698 self.config.durable_name = self.config.name.clone();
699 self
700 }
701
702 pub fn durable_name(mut self, name: impl Into<String>) -> Self {
704 self.config.durable_name = Some(name.into());
705 self
706 }
707
708 pub fn deliver_subject(mut self, subject: impl Into<String>) -> Self {
710 self.config.deliver_subject = subject.into();
711 self
712 }
713
714 pub fn deliver_group(mut self, group: impl Into<String>) -> Self {
716 self.config.deliver_group = Some(group.into());
717 self
718 }
719
720 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
722 self.config.filter_subject = subject.into();
723 self
724 }
725
726 pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
728 self.config.filter_subjects = subjects;
729 self
730 }
731
732 pub fn description(mut self, description: impl Into<String>) -> Self {
734 self.config.description = Some(description.into());
735 self
736 }
737
738 pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
740 self.config.ack_policy = policy.into();
741 self
742 }
743
744 pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
746 self.config.ack_wait = duration;
747 self
748 }
749
750 pub fn max_deliver(mut self, max: i64) -> Self {
752 self.config.max_deliver = max;
753 self
754 }
755
756 pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
758 self.config.replay_policy = policy.into();
759 self
760 }
761
762 pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
764 self.config.deliver_policy = policy.into();
765 self
766 }
767
768 pub fn start_sequence(mut self, seq: u64) -> Self {
770 self.config.deliver_policy =
771 async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence {
772 start_sequence: seq,
773 };
774 self
775 }
776
777 pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
779 self.config.deliver_policy =
780 async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
781 self
782 }
783
784 pub fn rate_limit(mut self, limit: u64) -> Self {
786 self.config.rate_limit = limit;
787 self
788 }
789
790 pub fn max_ack_pending(mut self, max: i64) -> Self {
792 self.config.max_ack_pending = max;
793 self
794 }
795
796 pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
798 self.config.idle_heartbeat = duration;
799 self
800 }
801
802 pub fn flow_control(mut self, enabled: bool) -> Self {
804 self.config.flow_control = enabled;
805 self
806 }
807
808 pub fn headers_only(mut self, headers_only: bool) -> Self {
810 self.config.headers_only = headers_only;
811 self
812 }
813
814 pub fn replicas(mut self, replicas: usize) -> Self {
816 self.config.num_replicas = replicas;
817 self
818 }
819
820 pub fn memory_storage(mut self, memory: bool) -> Self {
822 self.config.memory_storage = memory;
823 self
824 }
825
826 pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
828 self.config.backoff = backoff;
829 self
830 }
831
832 pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
834 self.config.metadata = metadata;
835 self
836 }
837
838 pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
840 self.config.inactive_threshold = duration;
841 self
842 }
843
844 pub async fn create(self) -> Result<PushConsumer<T, C>> {
846 let inner = self
847 .stream
848 .create_consumer(self.config)
849 .await
850 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
851 Ok(PushConsumer::new(inner))
852 }
853
854 pub async fn create_or_update(self) -> Result<PushConsumer<T, C>> {
856 let name = self.config.name.clone().unwrap_or_default();
857 let inner = self
858 .stream
859 .get_or_create_consumer(&name, self.config)
860 .await
861 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
862 Ok(PushConsumer::new(inner))
863 }
864}
865
866#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
872pub enum AckPolicy {
873 #[default]
875 Explicit,
876 None,
878 All,
880}
881
882impl From<AckPolicy> for async_nats::jetstream::consumer::AckPolicy {
883 fn from(policy: AckPolicy) -> Self {
884 match policy {
885 AckPolicy::Explicit => async_nats::jetstream::consumer::AckPolicy::Explicit,
886 AckPolicy::None => async_nats::jetstream::consumer::AckPolicy::None,
887 AckPolicy::All => async_nats::jetstream::consumer::AckPolicy::All,
888 }
889 }
890}
891
892#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
894pub enum ReplayPolicy {
895 #[default]
897 Instant,
898 Original,
900}
901
902impl From<ReplayPolicy> for async_nats::jetstream::consumer::ReplayPolicy {
903 fn from(policy: ReplayPolicy) -> Self {
904 match policy {
905 ReplayPolicy::Instant => async_nats::jetstream::consumer::ReplayPolicy::Instant,
906 ReplayPolicy::Original => async_nats::jetstream::consumer::ReplayPolicy::Original,
907 }
908 }
909}
910
911#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
913pub enum DeliverPolicy {
914 #[default]
916 All,
917 Last,
919 New,
921 LastPerSubject,
923}
924
925impl From<DeliverPolicy> for async_nats::jetstream::consumer::DeliverPolicy {
926 fn from(policy: DeliverPolicy) -> Self {
927 match policy {
928 DeliverPolicy::All => async_nats::jetstream::consumer::DeliverPolicy::All,
929 DeliverPolicy::Last => async_nats::jetstream::consumer::DeliverPolicy::Last,
930 DeliverPolicy::New => async_nats::jetstream::consumer::DeliverPolicy::New,
931 DeliverPolicy::LastPerSubject => {
932 async_nats::jetstream::consumer::DeliverPolicy::LastPerSubject
933 }
934 }
935 }
936}