1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
15#[derive(Debug)]
17pub struct JetStreamMessage<T> {
18 pub payload: T,
20 pub subject: String,
22 pub stream_sequence: u64,
24 pub consumer_sequence: u64,
26 raw: async_nats::jetstream::Message,
28}
29
30impl<T> JetStreamMessage<T> {
31 pub async fn ack(&self) -> Result<()> {
33 self.raw
34 .ack()
35 .await
36 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
37 }
38
39 pub async fn double_ack(&self) -> Result<()> {
41 self.raw
42 .double_ack()
43 .await
44 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
45 }
46
47 pub async fn nak(&self) -> Result<()> {
49 self.raw
50 .ack_with(async_nats::jetstream::AckKind::Nak(None))
51 .await
52 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
53 }
54
55 pub async fn nak_with_delay(&self, delay: std::time::Duration) -> Result<()> {
57 self.raw
58 .ack_with(async_nats::jetstream::AckKind::Nak(Some(delay)))
59 .await
60 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
61 }
62
63 pub async fn in_progress(&self) -> Result<()> {
65 self.raw
66 .ack_with(async_nats::jetstream::AckKind::Progress)
67 .await
68 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
69 }
70
71 pub async fn term(&self) -> Result<()> {
73 self.raw
74 .ack_with(async_nats::jetstream::AckKind::Term)
75 .await
76 .map_err(|e| crate::error::Error::JetStream(e.to_string()))
77 }
78
79 pub fn raw(&self) -> &async_nats::jetstream::Message {
81 &self.raw
82 }
83}
84
85pub struct PullConsumer<T, C: CodecType> {
96 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
97 _marker: PhantomData<(T, C)>,
98}
99
100impl<T, C: CodecType> PullConsumer<T, C> {
101 pub(crate) fn new(
103 inner: async_nats::jetstream::consumer::Consumer<
104 async_nats::jetstream::consumer::pull::Config,
105 >,
106 ) -> Self {
107 Self {
108 inner,
109 _marker: PhantomData,
110 }
111 }
112
113 pub fn inner(
115 &self,
116 ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>
117 {
118 &self.inner
119 }
120
121 pub fn name(&self) -> &str {
123 &self.inner.cached_info().name
124 }
125}
126
127impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C> {
128 pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
156 let inner = self
157 .inner
158 .fetch()
159 .max_messages(batch_size)
160 .messages()
161 .await
162 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
163 Ok(PullBatch::new(inner))
164 }
165
166 pub fn fetch_builder(&self) -> FetchBuilder<T, C> {
168 FetchBuilder::new(self.inner.clone())
169 }
170
171 pub async fn messages(&self) -> Result<PullMessages<T, C>> {
201 let inner = self
202 .inner
203 .messages()
204 .await
205 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
206 Ok(PullMessages::new(inner))
207 }
208}
209
210pin_project! {
211 pub struct PullBatch<T, C: CodecType> {
213 #[pin]
214 inner: async_nats::jetstream::consumer::pull::Batch,
215 _marker: PhantomData<(T, C)>,
216 }
217}
218
219impl<T, C: CodecType> PullBatch<T, C> {
220 fn new(inner: async_nats::jetstream::consumer::pull::Batch) -> Self {
221 Self {
222 inner,
223 _marker: PhantomData,
224 }
225 }
226}
227
228impl<T: DeserializeOwned, C: CodecType> Stream for PullBatch<T, C> {
229 type Item = Result<JetStreamMessage<T>>;
230
231 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232 let this = self.project();
233
234 match this.inner.poll_next(cx) {
235 Poll::Ready(Some(Ok(msg))) => {
236 let info = msg.info().ok();
237 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
238 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
239 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
240 payload,
241 subject: msg.subject.to_string(),
242 stream_sequence,
243 consumer_sequence,
244 raw: msg,
245 });
246 Poll::Ready(Some(result))
247 }
248 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
249 crate::error::Error::JetStreamConsumer(e.to_string()),
250 ))),
251 Poll::Ready(None) => Poll::Ready(None),
252 Poll::Pending => Poll::Pending,
253 }
254 }
255}
256
257pin_project! {
258 pub struct PullMessages<T, C: CodecType> {
260 #[pin]
261 inner: async_nats::jetstream::consumer::pull::Stream,
262 _marker: PhantomData<(T, C)>,
263 }
264}
265
266impl<T, C: CodecType> PullMessages<T, C> {
267 fn new(inner: async_nats::jetstream::consumer::pull::Stream) -> Self {
268 Self {
269 inner,
270 _marker: PhantomData,
271 }
272 }
273}
274
275impl<T: DeserializeOwned, C: CodecType> Stream for PullMessages<T, C> {
276 type Item = Result<JetStreamMessage<T>>;
277
278 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
279 let this = self.project();
280
281 match this.inner.poll_next(cx) {
282 Poll::Ready(Some(Ok(msg))) => {
283 let info = msg.info().ok();
284 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
285 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
286 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
287 payload,
288 subject: msg.subject.to_string(),
289 stream_sequence,
290 consumer_sequence,
291 raw: msg,
292 });
293 Poll::Ready(Some(result))
294 }
295 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
296 crate::error::Error::JetStreamConsumer(e.to_string()),
297 ))),
298 Poll::Ready(None) => Poll::Ready(None),
299 Poll::Pending => Poll::Pending,
300 }
301 }
302}
303
304pub struct FetchBuilder<T, C: CodecType> {
306 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
307 max_messages: usize,
308 max_bytes: Option<usize>,
309 expires: Option<std::time::Duration>,
310 idle_heartbeat: Option<std::time::Duration>,
311 _marker: PhantomData<(T, C)>,
312}
313
314impl<T, C: CodecType> FetchBuilder<T, C> {
315 fn new(
316 inner: async_nats::jetstream::consumer::Consumer<
317 async_nats::jetstream::consumer::pull::Config,
318 >,
319 ) -> Self {
320 Self {
321 inner,
322 max_messages: 10,
323 max_bytes: None,
324 expires: None,
325 idle_heartbeat: None,
326 _marker: PhantomData,
327 }
328 }
329
330 pub fn max_messages(mut self, max: usize) -> Self {
332 self.max_messages = max;
333 self
334 }
335
336 pub fn max_bytes(mut self, max: usize) -> Self {
338 self.max_bytes = Some(max);
339 self
340 }
341
342 pub fn expires(mut self, duration: std::time::Duration) -> Self {
344 self.expires = Some(duration);
345 self
346 }
347
348 pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
350 self.idle_heartbeat = Some(duration);
351 self
352 }
353}
354
355impl<T: DeserializeOwned, C: CodecType> FetchBuilder<T, C> {
356 pub async fn fetch(self) -> Result<PullBatch<T, C>> {
358 let mut fetch = self.inner.fetch().max_messages(self.max_messages);
359
360 if let Some(bytes) = self.max_bytes {
361 fetch = fetch.max_bytes(bytes);
362 }
363 if let Some(expires) = self.expires {
364 fetch = fetch.expires(expires);
365 }
366 if let Some(heartbeat) = self.idle_heartbeat {
367 fetch = fetch.heartbeat(heartbeat);
368 }
369
370 let inner = fetch
371 .messages()
372 .await
373 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
374 Ok(PullBatch::new(inner))
375 }
376}
377
378pub struct PushConsumer<T, C: CodecType> {
389 inner: async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>,
390 _marker: PhantomData<(T, C)>,
391}
392
393impl<T, C: CodecType> PushConsumer<T, C> {
394 pub(crate) fn new(
396 inner: async_nats::jetstream::consumer::Consumer<
397 async_nats::jetstream::consumer::push::Config,
398 >,
399 ) -> Self {
400 Self {
401 inner,
402 _marker: PhantomData,
403 }
404 }
405
406 pub fn inner(
408 &self,
409 ) -> &async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>
410 {
411 &self.inner
412 }
413
414 pub fn name(&self) -> &str {
416 &self.inner.cached_info().name
417 }
418}
419
420impl<T: DeserializeOwned, C: CodecType> PushConsumer<T, C> {
421 pub async fn messages(&self) -> Result<PushMessages<T, C>> {
423 let inner = self
424 .inner
425 .messages()
426 .await
427 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
428 Ok(PushMessages::new(inner))
429 }
430}
431
432pin_project! {
433 pub struct PushMessages<T, C: CodecType> {
435 #[pin]
436 inner: async_nats::jetstream::consumer::push::Messages,
437 _marker: PhantomData<(T, C)>,
438 }
439}
440
441impl<T, C: CodecType> PushMessages<T, C> {
442 fn new(inner: async_nats::jetstream::consumer::push::Messages) -> Self {
443 Self {
444 inner,
445 _marker: PhantomData,
446 }
447 }
448}
449
450impl<T: DeserializeOwned, C: CodecType> Stream for PushMessages<T, C> {
451 type Item = Result<JetStreamMessage<T>>;
452
453 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
454 let this = self.project();
455
456 match this.inner.poll_next(cx) {
457 Poll::Ready(Some(Ok(msg))) => {
458 let info = msg.info().ok();
459 let stream_sequence = info.as_ref().map(|i| i.stream_sequence).unwrap_or(0);
460 let consumer_sequence = info.as_ref().map(|i| i.consumer_sequence).unwrap_or(0);
461 let result = C::decode(&msg.payload).map(|payload| JetStreamMessage {
462 payload,
463 subject: msg.subject.to_string(),
464 stream_sequence,
465 consumer_sequence,
466 raw: msg,
467 });
468 Poll::Ready(Some(result))
469 }
470 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
471 crate::error::Error::JetStreamConsumer(e.to_string()),
472 ))),
473 Poll::Ready(None) => Poll::Ready(None),
474 Poll::Pending => Poll::Pending,
475 }
476 }
477}
478
479pub struct PullConsumerBuilder<T, C: CodecType> {
485 stream: async_nats::jetstream::stream::Stream,
486 config: async_nats::jetstream::consumer::pull::Config,
487 _marker: PhantomData<(T, C)>,
488}
489
490impl<T, C: CodecType> PullConsumerBuilder<T, C> {
491 pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
493 Self {
494 stream,
495 config: async_nats::jetstream::consumer::pull::Config {
496 name: Some(name),
497 ..Default::default()
498 },
499 _marker: PhantomData,
500 }
501 }
502
503 pub fn durable(mut self) -> Self {
505 self.config.durable_name = self.config.name.clone();
506 self
507 }
508
509 pub fn durable_name(mut self, name: impl Into<String>) -> Self {
511 self.config.durable_name = Some(name.into());
512 self
513 }
514
515 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
517 self.config.filter_subject = subject.into();
518 self
519 }
520
521 pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
523 self.config.filter_subjects = subjects;
524 self
525 }
526
527 pub fn description(mut self, description: impl Into<String>) -> Self {
529 self.config.description = Some(description.into());
530 self
531 }
532
533 pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
535 self.config.ack_policy = policy.into();
536 self
537 }
538
539 pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
541 self.config.ack_wait = duration;
542 self
543 }
544
545 pub fn max_deliver(mut self, max: i64) -> Self {
547 self.config.max_deliver = max;
548 self
549 }
550
551 pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
553 self.config.replay_policy = policy.into();
554 self
555 }
556
557 pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
559 self.config.deliver_policy = policy.into();
560 self
561 }
562
563 pub fn start_sequence(mut self, seq: u64) -> Self {
565 self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { start_sequence: seq };
566 self
567 }
568
569 pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
571 self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
572 self
573 }
574
575 pub fn max_ack_pending(mut self, max: i64) -> Self {
577 self.config.max_ack_pending = max;
578 self
579 }
580
581 pub fn max_waiting(mut self, max: i64) -> Self {
583 self.config.max_waiting = max;
584 self
585 }
586
587 pub fn max_batch(mut self, max: i64) -> Self {
589 self.config.max_batch = max;
590 self
591 }
592
593 pub fn max_bytes(mut self, max: i64) -> Self {
595 self.config.max_bytes = max;
596 self
597 }
598
599 pub fn max_expires(mut self, duration: std::time::Duration) -> Self {
601 self.config.max_expires = duration;
602 self
603 }
604
605 pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
607 self.config.inactive_threshold = duration;
608 self
609 }
610
611 pub fn headers_only(mut self, headers_only: bool) -> Self {
613 self.config.headers_only = headers_only;
614 self
615 }
616
617 pub fn replicas(mut self, replicas: usize) -> Self {
619 self.config.num_replicas = replicas;
620 self
621 }
622
623 pub fn memory_storage(mut self, memory: bool) -> Self {
625 self.config.memory_storage = memory;
626 self
627 }
628
629 pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
631 self.config.backoff = backoff;
632 self
633 }
634
635 pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
637 self.config.metadata = metadata;
638 self
639 }
640
641 pub async fn create(self) -> Result<PullConsumer<T, C>> {
643 let inner = self
644 .stream
645 .create_consumer(self.config)
646 .await
647 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
648 Ok(PullConsumer::new(inner))
649 }
650
651 pub async fn create_or_update(self) -> Result<PullConsumer<T, C>> {
653 let name = self.config.name.clone().unwrap_or_default();
654 let inner = self
655 .stream
656 .get_or_create_consumer(&name, self.config)
657 .await
658 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
659 Ok(PullConsumer::new(inner))
660 }
661}
662
663pub struct PushConsumerBuilder<T, C: CodecType> {
669 stream: async_nats::jetstream::stream::Stream,
670 config: async_nats::jetstream::consumer::push::Config,
671 _marker: PhantomData<(T, C)>,
672}
673
674impl<T, C: CodecType> PushConsumerBuilder<T, C> {
675 pub(crate) fn new(stream: async_nats::jetstream::stream::Stream, name: String) -> Self {
677 Self {
678 stream,
679 config: async_nats::jetstream::consumer::push::Config {
680 name: Some(name),
681 ..Default::default()
682 },
683 _marker: PhantomData,
684 }
685 }
686
687 pub fn durable(mut self) -> Self {
689 self.config.durable_name = self.config.name.clone();
690 self
691 }
692
693 pub fn durable_name(mut self, name: impl Into<String>) -> Self {
695 self.config.durable_name = Some(name.into());
696 self
697 }
698
699 pub fn deliver_subject(mut self, subject: impl Into<String>) -> Self {
701 self.config.deliver_subject = subject.into();
702 self
703 }
704
705 pub fn deliver_group(mut self, group: impl Into<String>) -> Self {
707 self.config.deliver_group = Some(group.into());
708 self
709 }
710
711 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
713 self.config.filter_subject = subject.into();
714 self
715 }
716
717 pub fn filter_subjects(mut self, subjects: Vec<String>) -> Self {
719 self.config.filter_subjects = subjects;
720 self
721 }
722
723 pub fn description(mut self, description: impl Into<String>) -> Self {
725 self.config.description = Some(description.into());
726 self
727 }
728
729 pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
731 self.config.ack_policy = policy.into();
732 self
733 }
734
735 pub fn ack_wait(mut self, duration: std::time::Duration) -> Self {
737 self.config.ack_wait = duration;
738 self
739 }
740
741 pub fn max_deliver(mut self, max: i64) -> Self {
743 self.config.max_deliver = max;
744 self
745 }
746
747 pub fn replay_policy(mut self, policy: ReplayPolicy) -> Self {
749 self.config.replay_policy = policy.into();
750 self
751 }
752
753 pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
755 self.config.deliver_policy = policy.into();
756 self
757 }
758
759 pub fn start_sequence(mut self, seq: u64) -> Self {
761 self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartSequence { start_sequence: seq };
762 self
763 }
764
765 pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
767 self.config.deliver_policy = async_nats::jetstream::consumer::DeliverPolicy::ByStartTime { start_time: time };
768 self
769 }
770
771 pub fn rate_limit(mut self, limit: u64) -> Self {
773 self.config.rate_limit = limit;
774 self
775 }
776
777 pub fn max_ack_pending(mut self, max: i64) -> Self {
779 self.config.max_ack_pending = max;
780 self
781 }
782
783 pub fn idle_heartbeat(mut self, duration: std::time::Duration) -> Self {
785 self.config.idle_heartbeat = duration;
786 self
787 }
788
789 pub fn flow_control(mut self, enabled: bool) -> Self {
791 self.config.flow_control = enabled;
792 self
793 }
794
795 pub fn headers_only(mut self, headers_only: bool) -> Self {
797 self.config.headers_only = headers_only;
798 self
799 }
800
801 pub fn replicas(mut self, replicas: usize) -> Self {
803 self.config.num_replicas = replicas;
804 self
805 }
806
807 pub fn memory_storage(mut self, memory: bool) -> Self {
809 self.config.memory_storage = memory;
810 self
811 }
812
813 pub fn backoff(mut self, backoff: Vec<std::time::Duration>) -> Self {
815 self.config.backoff = backoff;
816 self
817 }
818
819 pub fn metadata(mut self, metadata: std::collections::HashMap<String, String>) -> Self {
821 self.config.metadata = metadata;
822 self
823 }
824
825 pub fn inactive_threshold(mut self, duration: std::time::Duration) -> Self {
827 self.config.inactive_threshold = duration;
828 self
829 }
830
831 pub async fn create(self) -> Result<PushConsumer<T, C>> {
833 let inner = self
834 .stream
835 .create_consumer(self.config)
836 .await
837 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
838 Ok(PushConsumer::new(inner))
839 }
840
841 pub async fn create_or_update(self) -> Result<PushConsumer<T, C>> {
843 let name = self.config.name.clone().unwrap_or_default();
844 let inner = self
845 .stream
846 .get_or_create_consumer(&name, self.config)
847 .await
848 .map_err(|e| crate::error::Error::JetStreamConsumer(e.to_string()))?;
849 Ok(PushConsumer::new(inner))
850 }
851}
852
853#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
859pub enum AckPolicy {
860 #[default]
862 Explicit,
863 None,
865 All,
867}
868
869impl From<AckPolicy> for async_nats::jetstream::consumer::AckPolicy {
870 fn from(policy: AckPolicy) -> Self {
871 match policy {
872 AckPolicy::Explicit => async_nats::jetstream::consumer::AckPolicy::Explicit,
873 AckPolicy::None => async_nats::jetstream::consumer::AckPolicy::None,
874 AckPolicy::All => async_nats::jetstream::consumer::AckPolicy::All,
875 }
876 }
877}
878
879#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
881pub enum ReplayPolicy {
882 #[default]
884 Instant,
885 Original,
887}
888
889impl From<ReplayPolicy> for async_nats::jetstream::consumer::ReplayPolicy {
890 fn from(policy: ReplayPolicy) -> Self {
891 match policy {
892 ReplayPolicy::Instant => async_nats::jetstream::consumer::ReplayPolicy::Instant,
893 ReplayPolicy::Original => async_nats::jetstream::consumer::ReplayPolicy::Original,
894 }
895 }
896}
897
898#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
900pub enum DeliverPolicy {
901 #[default]
903 All,
904 Last,
906 New,
908 LastPerSubject,
910}
911
912impl From<DeliverPolicy> for async_nats::jetstream::consumer::DeliverPolicy {
913 fn from(policy: DeliverPolicy) -> Self {
914 match policy {
915 DeliverPolicy::All => async_nats::jetstream::consumer::DeliverPolicy::All,
916 DeliverPolicy::Last => async_nats::jetstream::consumer::DeliverPolicy::Last,
917 DeliverPolicy::New => async_nats::jetstream::consumer::DeliverPolicy::New,
918 DeliverPolicy::LastPerSubject => {
919 async_nats::jetstream::consumer::DeliverPolicy::LastPerSubject
920 }
921 }
922 }
923}