intercom_rs/
publisher.rs

1//! Typed publisher with Sink trait implementation.
2//!
3//! The [`Publisher`] type provides a typed interface for publishing messages to NATS.
4//! It implements the [`Sink`] trait for convenient stream-based publishing.
5//!
6//! **Note**: The Sink implementation uses fire-and-forget semantics for efficiency.
7//! For applications requiring acknowledgment of each message, use the
8//! [`Client::publish`](crate::Client::publish) method directly or the JetStream
9//! [`JetStreamContext::publish`](crate::JetStreamContext::publish) for guaranteed delivery.
10
11use std::{
12    collections::VecDeque,
13    marker::PhantomData,
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17};
18
19use futures::Sink;
20use serde::Serialize;
21
22use crate::{codec::CodecType, error::Error};
23
24/// A typed publisher that implements [`Sink`].
25///
26/// # Type Parameters
27///
28/// * `T` - The message type that will be serialized for publishing.
29/// * `C` - The codec type used for serialization.
30///
31/// # Note on Error Handling
32///
33/// The [`Sink`] implementation uses fire-and-forget semantics for efficiency.
34/// Publish errors are not propagated back through the Sink interface.
35/// For applications requiring guaranteed delivery or error handling:
36/// - Use [`Publisher::publish`] for direct publish with error handling
37/// - Use [`Client::publish`](crate::Client::publish) for basic NATS
38/// - Use [`JetStreamContext::publish`](crate::JetStreamContext::publish) for JetStream
39///
40/// # Example
41///
42/// ```no_run
43/// use intercom::{Client, MsgPackCodec};
44/// use serde::{Deserialize, Serialize};
45/// use futures::SinkExt;
46///
47/// #[derive(Serialize, Deserialize)]
48/// struct MyMessage {
49///     content: String,
50/// }
51///
52/// # async fn example() -> intercom::Result<()> {
53/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
54/// let mut publisher = client.publisher::<MyMessage>("subject");
55///
56/// // Use Sink trait methods
57/// publisher.send(MyMessage { content: "hello".into() }).await?;
58///
59/// // Or use feed + flush for batching
60/// publisher.feed(MyMessage { content: "msg1".into() }).await?;
61/// publisher.feed(MyMessage { content: "msg2".into() }).await?;
62/// publisher.flush().await?;
63/// # Ok(())
64/// # }
65/// ```
66pub struct Publisher<T, C: CodecType> {
67    client: Arc<async_nats::Client>,
68    subject: String,
69    buffer: VecDeque<Vec<u8>>,
70    _marker: PhantomData<(T, C)>,
71}
72
73impl<T, C: CodecType> Publisher<T, C> {
74    /// Create a new publisher for a subject.
75    pub(crate) fn new(client: Arc<async_nats::Client>, subject: String) -> Self {
76        Self {
77            client,
78            subject,
79            buffer: VecDeque::new(),
80            _marker: PhantomData,
81        }
82    }
83
84    /// Get the subject this publisher publishes to.
85    pub fn subject(&self) -> &str {
86        &self.subject
87    }
88
89    /// Get the underlying async-nats client.
90    pub fn client(&self) -> &async_nats::Client {
91        &self.client
92    }
93}
94
95impl<T: Serialize, C: CodecType> Publisher<T, C> {
96    /// Publish a message directly with error handling.
97    ///
98    /// Unlike the [`Sink`] implementation, this method returns any publish errors.
99    ///
100    /// # Example
101    ///
102    /// ```no_run
103    /// use intercom::{Client, MsgPackCodec};
104    /// use serde::{Deserialize, Serialize};
105    ///
106    /// #[derive(Serialize, Deserialize)]
107    /// struct MyMessage { content: String }
108    ///
109    /// # async fn example() -> intercom::Result<()> {
110    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
111    /// let publisher = client.publisher::<MyMessage>("subject");
112    ///
113    /// // Direct publish with error handling
114    /// publisher.publish(&MyMessage { content: "hello".into() }).await?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub async fn publish(&self, message: &T) -> crate::error::Result<()> {
119        let data = C::encode(message)?;
120        self.client
121            .publish(self.subject.clone(), data.into())
122            .await?;
123        Ok(())
124    }
125}
126
127impl<T, C: CodecType> Clone for Publisher<T, C> {
128    fn clone(&self) -> Self {
129        Self {
130            client: self.client.clone(),
131            subject: self.subject.clone(),
132            buffer: VecDeque::new(),
133            _marker: PhantomData,
134        }
135    }
136}
137
138impl<T: Serialize + Unpin, C: CodecType + Unpin> Sink<T> for Publisher<T, C> {
139    type Error = Error;
140
141    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142        // The NATS client is always ready to accept messages
143        Poll::Ready(Ok(()))
144    }
145
146    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
147        let data = C::encode(&item)?;
148        let this = self.get_mut();
149        this.buffer.push_back(data);
150        Ok(())
151    }
152
153    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154        let this = self.get_mut();
155
156        // Publish all buffered messages
157        // Note: Uses fire-and-forget for Sink compatibility.
158        // Use Publisher::publish() for error handling.
159        while let Some(data) = this.buffer.pop_front() {
160            let client = this.client.clone();
161            let subject = this.subject.clone();
162            tokio::spawn(async move {
163                let _ = client.publish(subject, data.into()).await;
164            });
165        }
166
167        Poll::Ready(Ok(()))
168    }
169
170    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171        self.poll_flush(cx)
172    }
173}