intercom_rs/subscriber.rs
1//! Typed subscriber with Stream trait implementation.
2
3use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::Stream;
10use pin_project_lite::pin_project;
11use serde::de::DeserializeOwned;
12
13use crate::{codec::CodecType, error::Result};
14
15/// A typed message received from a subscription.
16#[derive(Debug)]
17pub struct Message<T> {
18 /// The decoded message payload.
19 pub payload: T,
20 /// The subject the message was published to.
21 pub subject: String,
22 /// The reply subject, if any.
23 pub reply: Option<String>,
24 /// The raw underlying message (for advanced use cases).
25 raw: async_nats::Message,
26}
27
28impl<T> Message<T> {
29 /// Get the raw underlying NATS message.
30 pub fn raw(&self) -> &async_nats::Message {
31 &self.raw
32 }
33}
34
35pin_project! {
36 /// A typed subscriber that implements [`Stream`].
37 ///
38 /// # Type Parameters
39 ///
40 /// * `T` - The message type that will be deserialized from incoming messages.
41 /// * `C` - The codec type used for deserialization.
42 ///
43 /// # Example
44 ///
45 /// ```no_run
46 /// use intercom::{Client, MsgPackCodec};
47 /// use serde::{Deserialize, Serialize};
48 /// use futures::StreamExt;
49 ///
50 /// #[derive(Serialize, Deserialize, Debug)]
51 /// struct MyMessage {
52 /// content: String,
53 /// }
54 ///
55 /// # async fn example() -> intercom::Result<()> {
56 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
57 /// let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
58 ///
59 /// while let Some(result) = subscriber.next().await {
60 /// match result {
61 /// Ok(msg) => println!("Received: {:?}", msg.payload),
62 /// Err(e) => eprintln!("Decode error: {}", e),
63 /// }
64 /// }
65 /// # Ok(())
66 /// # }
67 /// ```
68 pub struct Subscriber<T, C: CodecType> {
69 #[pin]
70 inner: async_nats::Subscriber,
71 _marker: PhantomData<(T, C)>,
72 }
73}
74
75impl<T, C: CodecType> Subscriber<T, C> {
76 /// Create a new subscriber wrapping an async-nats subscriber.
77 pub(crate) fn new(inner: async_nats::Subscriber) -> Self {
78 Self {
79 inner,
80 _marker: PhantomData,
81 }
82 }
83
84 /// Get the underlying async-nats subscriber.
85 pub fn inner(&self) -> &async_nats::Subscriber {
86 &self.inner
87 }
88
89 /// Unsubscribe from the subject.
90 pub async fn unsubscribe(mut self) -> Result<()> {
91 self.inner
92 .unsubscribe()
93 .await
94 .map_err(|e| crate::error::Error::Nats(e.into()))
95 }
96}
97
98impl<T: DeserializeOwned, C: CodecType> Stream for Subscriber<T, C> {
99 type Item = Result<Message<T>>;
100
101 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 let this = self.project();
103
104 match this.inner.poll_next(cx) {
105 Poll::Ready(Some(msg)) => {
106 let reply = msg.reply.as_ref().map(|r| r.to_string());
107 let result = C::decode(&msg.payload).map(|payload| Message {
108 payload,
109 subject: msg.subject.to_string(),
110 reply,
111 raw: msg,
112 });
113 Poll::Ready(Some(result))
114 }
115 Poll::Ready(None) => Poll::Ready(None),
116 Poll::Pending => Poll::Pending,
117 }
118 }
119}