intercom_rs/publisher.rs
1//! Typed publisher with Sink trait implementation.
2//!
3//! The [`Publisher`] type provides a typed interface for publishing messages to NATS.
4//! It implements the [`Sink`] trait for convenient stream-based publishing.
5//!
6//! **Note**: The Sink implementation uses fire-and-forget semantics for efficiency.
7//! For applications requiring acknowledgment of each message, use the
8//! [`Client::publish`](crate::Client::publish) method directly or the JetStream
9//! [`JetStreamContext::publish`](crate::JetStreamContext::publish) for guaranteed delivery.
10
11use std::{
12 collections::VecDeque,
13 marker::PhantomData,
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17};
18
19use futures::Sink;
20use serde::Serialize;
21
22use crate::{codec::CodecType, error::Error};
23
24/// A typed publisher that implements [`Sink`].
25///
26/// # Type Parameters
27///
28/// * `T` - The message type that will be serialized for publishing.
29/// * `C` - The codec type used for serialization.
30///
31/// # Note on Error Handling
32///
33/// The [`Sink`] implementation uses fire-and-forget semantics for efficiency.
34/// Publish errors are not propagated back through the Sink interface.
35/// For applications requiring guaranteed delivery or error handling:
36/// - Use [`Publisher::publish`] for direct publish with error handling
37/// - Use [`Client::publish`](crate::Client::publish) for basic NATS
38/// - Use [`JetStreamContext::publish`](crate::JetStreamContext::publish) for JetStream
39///
40/// # Example
41///
42/// ```no_run
43/// use intercom::{Client, MsgPackCodec};
44/// use serde::{Deserialize, Serialize};
45/// use futures::SinkExt;
46///
47/// #[derive(Serialize, Deserialize)]
48/// struct MyMessage {
49/// content: String,
50/// }
51///
52/// # async fn example() -> intercom::Result<()> {
53/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
54/// let mut publisher = client.publisher::<MyMessage>("subject");
55///
56/// // Use Sink trait methods
57/// publisher.send(MyMessage { content: "hello".into() }).await?;
58///
59/// // Or use feed + flush for batching
60/// publisher.feed(MyMessage { content: "msg1".into() }).await?;
61/// publisher.feed(MyMessage { content: "msg2".into() }).await?;
62/// publisher.flush().await?;
63/// # Ok(())
64/// # }
65/// ```
66pub struct Publisher<T, C: CodecType> {
67 client: Arc<async_nats::Client>,
68 subject: String,
69 buffer: VecDeque<Vec<u8>>,
70 _marker: PhantomData<(T, C)>,
71}
72
73impl<T, C: CodecType> Publisher<T, C> {
74 /// Create a new publisher for a subject.
75 pub(crate) fn new(client: Arc<async_nats::Client>, subject: String) -> Self {
76 Self {
77 client,
78 subject,
79 buffer: VecDeque::new(),
80 _marker: PhantomData,
81 }
82 }
83
84 /// Get the subject this publisher publishes to.
85 pub fn subject(&self) -> &str {
86 &self.subject
87 }
88
89 /// Get the underlying async-nats client.
90 pub fn client(&self) -> &async_nats::Client {
91 &self.client
92 }
93}
94
95impl<T: Serialize, C: CodecType> Publisher<T, C> {
96 /// Publish a message directly with error handling.
97 ///
98 /// Unlike the [`Sink`] implementation, this method returns any publish errors.
99 ///
100 /// # Example
101 ///
102 /// ```no_run
103 /// use intercom::{Client, MsgPackCodec};
104 /// use serde::{Deserialize, Serialize};
105 ///
106 /// #[derive(Serialize, Deserialize)]
107 /// struct MyMessage { content: String }
108 ///
109 /// # async fn example() -> intercom::Result<()> {
110 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
111 /// let publisher = client.publisher::<MyMessage>("subject");
112 ///
113 /// // Direct publish with error handling
114 /// publisher.publish(&MyMessage { content: "hello".into() }).await?;
115 /// # Ok(())
116 /// # }
117 /// ```
118 pub async fn publish(&self, message: &T) -> crate::error::Result<()> {
119 let data = C::encode(message)?;
120 self.client
121 .publish(self.subject.clone(), data.into())
122 .await?;
123 Ok(())
124 }
125}
126
127impl<T, C: CodecType> Clone for Publisher<T, C> {
128 fn clone(&self) -> Self {
129 Self {
130 client: self.client.clone(),
131 subject: self.subject.clone(),
132 buffer: VecDeque::new(),
133 _marker: PhantomData,
134 }
135 }
136}
137
138impl<T: Serialize + Unpin, C: CodecType + Unpin> Sink<T> for Publisher<T, C> {
139 type Error = Error;
140
141 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142 // The NATS client is always ready to accept messages
143 Poll::Ready(Ok(()))
144 }
145
146 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
147 let data = C::encode(&item)?;
148 let this = self.get_mut();
149 this.buffer.push_back(data);
150 Ok(())
151 }
152
153 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154 let this = self.get_mut();
155
156 // Publish all buffered messages
157 // Note: Uses fire-and-forget for Sink compatibility.
158 // Use Publisher::publish() for error handling.
159 while let Some(data) = this.buffer.pop_front() {
160 let client = this.client.clone();
161 let subject = this.subject.clone();
162 tokio::spawn(async move {
163 let _ = client.publish(subject, data.into()).await;
164 });
165 }
166
167 Poll::Ready(Ok(()))
168 }
169
170 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171 self.poll_flush(cx)
172 }
173}