intercom_rs/
subscriber.rs

1//! Typed subscriber with Stream trait implementation.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
15/// A typed message received from a subscription.
16#[derive(Debug)]
17pub struct Message<T> {
18    /// The decoded message payload.
19    pub payload: T,
20    /// The subject the message was published to.
21    pub subject: String,
22    /// The reply subject, if any.
23    pub reply: Option<String>,
24    /// The raw underlying message (for advanced use cases).
25    raw: async_nats::Message,
26}
27
28impl<T> Message<T> {
29    /// Get the raw underlying NATS message.
30    pub fn raw(&self) -> &async_nats::Message {
31        &self.raw
32    }
33}
34
35pin_project! {
36    /// A typed subscriber that implements [`Stream`].
37    ///
38    /// # Type Parameters
39    ///
40    /// * `T` - The message type that will be deserialized from incoming messages.
41    /// * `C` - The codec type used for deserialization.
42    ///
43    /// # Example
44    ///
45    /// ```no_run
46    /// use intercom::{Client, MsgPackCodec};
47    /// use serde::{Deserialize, Serialize};
48    /// use futures::StreamExt;
49    ///
50    /// #[derive(Serialize, Deserialize, Debug)]
51    /// struct MyMessage {
52    ///     content: String,
53    /// }
54    ///
55    /// # async fn example() -> intercom::Result<()> {
56    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
57    /// let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
58    ///
59    /// while let Some(result) = subscriber.next().await {
60    ///     match result {
61    ///         Ok(msg) => println!("Received: {:?}", msg.payload),
62    ///         Err(e) => eprintln!("Decode error: {}", e),
63    ///     }
64    /// }
65    /// # Ok(())
66    /// # }
67    /// ```
68    pub struct Subscriber<T, C: CodecType> {
69        #[pin]
70        inner: async_nats::Subscriber,
71        _marker: PhantomData<(T, C)>,
72    }
73}
74
75impl<T, C: CodecType> Subscriber<T, C> {
76    /// Create a new subscriber wrapping an async-nats subscriber.
77    pub(crate) fn new(inner: async_nats::Subscriber) -> Self {
78        Self {
79            inner,
80            _marker: PhantomData,
81        }
82    }
83
84    /// Get the underlying async-nats subscriber.
85    pub fn inner(&self) -> &async_nats::Subscriber {
86        &self.inner
87    }
88
89    /// Unsubscribe from the subject.
90    pub async fn unsubscribe(mut self) -> Result<()> {
91        self.inner
92            .unsubscribe()
93            .await
94            .map_err(|e| crate::error::Error::Nats(e.into()))
95    }
96}
97
98impl<T: DeserializeOwned, C: CodecType> Stream for Subscriber<T, C> {
99    type Item = Result<Message<T>>;
100
101    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        let this = self.project();
103
104        match this.inner.poll_next(cx) {
105            Poll::Ready(Some(msg)) => {
106                let reply = msg.reply.as_ref().map(|r| r.to_string());
107                let result = C::decode(&msg.payload).map(|payload| Message {
108                    payload,
109                    subject: msg.subject.to_string(),
110                    reply,
111                    raw: msg,
112                });
113                Poll::Ready(Some(result))
114            }
115            Poll::Ready(None) => Poll::Ready(None),
116            Poll::Pending => Poll::Pending,
117        }
118    }
119}