// ruststream_nats/subscriber.rs
use async_nats::jetstream::consumer::pull::Stream as PullStream;
4use futures::{Stream, future::Either};
5use ruststream::Subscriber;
6use tokio_stream::StreamExt;
7use tracing::warn;
8
9use crate::{
10 error::NatsError,
11 message::{CoreMessage, JetStreamMessage, NatsMessage},
12};
13
14enum SubscriberKind {
15 Core {
16 inner: Option<async_nats::Subscriber>,
17 },
18 JetStream {
19 inner: Option<Box<PullStream>>,
20 stream_name: String,
21 },
22}
23
24pub struct NatsSubscriber {
30 subject: String,
31 kind: SubscriberKind,
32}
33
34impl std::fmt::Debug for NatsSubscriber {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 let mut s = f.debug_struct("NatsSubscriber");
37 s.field("subject", &self.subject);
38 match &self.kind {
39 SubscriberKind::Core { .. } => {
40 s.field("kind", &"core");
41 }
42 SubscriberKind::JetStream { stream_name, .. } => {
43 s.field("kind", &"jetstream").field("stream", stream_name);
44 }
45 }
46 s.finish_non_exhaustive()
47 }
48}
49
50impl NatsSubscriber {
51 pub(crate) const fn from_core(subject: String, inner: async_nats::Subscriber) -> Self {
52 Self {
53 subject,
54 kind: SubscriberKind::Core { inner: Some(inner) },
55 }
56 }
57
58 pub(crate) fn from_jetstream(subject: String, stream_name: String, inner: PullStream) -> Self {
59 Self {
60 subject,
61 kind: SubscriberKind::JetStream {
62 inner: Some(Box::new(inner)),
63 stream_name,
64 },
65 }
66 }
67}
68
69impl Subscriber for NatsSubscriber {
70 type Message = NatsMessage;
71 type Error = NatsError;
72
73 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
74 match &mut self.kind {
75 SubscriberKind::Core { inner } => {
76 let inner = inner
77 .take()
78 .expect("NatsSubscriber::stream called more than once");
79 Either::Left(
80 inner.map(|msg| Ok(NatsMessage::Core(Box::new(CoreMessage::new(msg))))),
81 )
82 }
83 SubscriberKind::JetStream { inner, .. } => {
84 let inner = *inner
85 .take()
86 .expect("NatsSubscriber::stream called more than once");
87 Either::Right(inner.map(|item| match item {
88 Ok(msg) => Ok(NatsMessage::JetStream(Box::new(JetStreamMessage::new(msg)))),
89 Err(err) => {
90 warn!(target: "ruststream::nats", error = %err, "jetstream fetch error");
91 Err(NatsError::JetStream(Box::new(err)))
92 }
93 }))
94 }
95 }
96 }
97}