intercom_rs/jetstream/
context.rs

1//! JetStream context for managing streams and publishing messages.
2
3use std::marker::PhantomData;
4
5use serde::{de::DeserializeOwned, Serialize};
6
7use crate::{codec::CodecType, error::Result};
8
9use super::stream::{Stream, StreamBuilder};
10
11/// A typed JetStream context with configurable codec.
12///
13/// Provides methods for creating streams, publishing messages, and managing
14/// JetStream resources.
15///
16/// # Type Parameters
17///
18/// * `C` - The codec type used for message serialization
19#[derive(Clone)]
20pub struct JetStreamContext<C: CodecType> {
21    inner: async_nats::jetstream::Context,
22    _codec: PhantomData<C>,
23}
24
25impl<C: CodecType> JetStreamContext<C> {
26    /// Create a new JetStream context.
27    pub(crate) fn new(inner: async_nats::jetstream::Context) -> Self {
28        Self {
29            inner,
30            _codec: PhantomData,
31        }
32    }
33
34    /// Get the underlying async-nats JetStream context.
35    pub fn inner(&self) -> &async_nats::jetstream::Context {
36        &self.inner
37    }
38
39    /// Create a stream builder for configuring and creating a stream.
40    ///
41    /// # Example
42    ///
43    /// ```no_run
44    /// use intercom::{Client, MsgPackCodec};
45    ///
46    /// # async fn example() -> intercom::Result<()> {
47    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
48    /// let jetstream = client.jetstream();
49    ///
50    /// let stream = jetstream
51    ///     .stream_builder("my-stream")
52    ///     .subjects(vec!["events.>".to_string()])
53    ///     .max_messages(1_000_000)
54    ///     .create()
55    ///     .await?;
56    /// # Ok(())
57    /// # }
58    /// ```
59    pub fn stream_builder(&self, name: &str) -> StreamBuilder<C> {
60        StreamBuilder::new(self.inner.clone(), name.to_string())
61    }
62
63    /// Get an existing stream by name.
64    ///
65    /// # Example
66    ///
67    /// ```no_run
68    /// use intercom::{Client, MsgPackCodec};
69    ///
70    /// # async fn example() -> intercom::Result<()> {
71    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
72    /// let jetstream = client.jetstream();
73    ///
74    /// let stream = jetstream.get_stream("my-stream").await?;
75    /// # Ok(())
76    /// # }
77    /// ```
78    pub async fn get_stream(&self, name: &str) -> Result<Stream<C>> {
79        let inner = self
80            .inner
81            .get_stream(name)
82            .await
83            .map_err(|e| crate::error::Error::JetStreamStream(e.to_string()))?;
84        Ok(Stream::new(inner))
85    }
86
87    /// Delete a stream by name.
88    ///
89    /// # Example
90    ///
91    /// ```no_run
92    /// use intercom::{Client, MsgPackCodec};
93    ///
94    /// # async fn example() -> intercom::Result<()> {
95    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
96    /// let jetstream = client.jetstream();
97    ///
98    /// jetstream.delete_stream("my-stream").await?;
99    /// # Ok(())
100    /// # }
101    /// ```
102    pub async fn delete_stream(&self, name: &str) -> Result<()> {
103        self.inner
104            .delete_stream(name)
105            .await
106            .map_err(|e| crate::error::Error::JetStreamStream(e.to_string()))?;
107        Ok(())
108    }
109
110    /// Publish a typed message to a JetStream subject.
111    ///
112    /// # Type Parameters
113    ///
114    /// * `T` - The message type (must implement Serialize)
115    ///
116    /// # Example
117    ///
118    /// ```no_run
119    /// use intercom::{Client, MsgPackCodec};
120    /// use serde::{Deserialize, Serialize};
121    ///
122    /// #[derive(Serialize, Deserialize)]
123    /// struct Event { id: u64, data: String }
124    ///
125    /// # async fn example() -> intercom::Result<()> {
126    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
127    /// let jetstream = client.jetstream();
128    ///
129    /// let ack = jetstream.publish::<Event>("events.user", &Event {
130    ///     id: 1,
131    ///     data: "created".to_string(),
132    /// }).await?;
133    ///
134    /// println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub async fn publish<T: Serialize>(&self, subject: &str, message: &T) -> Result<PublishAck> {
139        let data = C::encode(message)?;
140        let ack = self
141            .inner
142            .publish(subject.to_string(), data.into())
143            .await
144            .map_err(|e| crate::error::Error::JetStream(e.to_string()))?
145            .await
146            .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
147        Ok(PublishAck {
148            stream: ack.stream.to_string(),
149            sequence: ack.sequence,
150            duplicate: ack.duplicate,
151        })
152    }
153
154    /// Publish a typed message and get the ack future immediately.
155    ///
156    /// This allows for more efficient batching by not waiting for the ack.
157    ///
158    /// # Type Parameters
159    ///
160    /// * `T` - The message type (must implement Serialize)
161    ///
162    /// # Example
163    ///
164    /// ```no_run
165    /// use intercom::{Client, MsgPackCodec};
166    /// use serde::{Deserialize, Serialize};
167    ///
168    /// #[derive(Serialize, Deserialize)]
169    /// struct Event { id: u64 }
170    ///
171    /// # async fn example() -> intercom::Result<()> {
172    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
173    /// let jetstream = client.jetstream();
174    ///
175    /// // Publish without waiting for ack
176    /// let ack_future = jetstream.publish_async::<Event>("events.user", &Event { id: 1 }).await?;
177    ///
178    /// // Do other work...
179    ///
180    /// // Now wait for the ack
181    /// let ack = ack_future.await?;
182    /// # Ok(())
183    /// # }
184    /// ```
185    pub async fn publish_async<T: Serialize>(
186        &self,
187        subject: &str,
188        message: &T,
189    ) -> Result<PublishAckFuture> {
190        let data = C::encode(message)?;
191        let future = self
192            .inner
193            .publish(subject.to_string(), data.into())
194            .await
195            .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
196        Ok(PublishAckFuture { inner: future })
197    }
198
199    /// Request-reply pattern over JetStream.
200    ///
201    /// # Type Parameters
202    ///
203    /// * `T` - The request message type
204    /// * `R` - The response message type
205    pub async fn request<T: Serialize, R: DeserializeOwned>(
206        &self,
207        subject: &str,
208        message: &T,
209    ) -> Result<R> {
210        let data = C::encode(message)?;
211        let response: async_nats::Message = self
212            .inner
213            .request(subject.to_string(), &bytes::Bytes::from(data))
214            .await
215            .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
216        C::decode(&response.payload)
217    }
218}
219
220/// Acknowledgment from a JetStream publish.
221#[derive(Debug, Clone)]
222pub struct PublishAck {
223    /// The stream name.
224    pub stream: String,
225    /// The sequence number.
226    pub sequence: u64,
227    /// Whether this was a duplicate.
228    pub duplicate: bool,
229}
230
231/// A future that resolves to a publish acknowledgment.
232pub struct PublishAckFuture {
233    inner: async_nats::jetstream::context::PublishAckFuture,
234}
235
236impl PublishAckFuture {
237    /// Wait for the acknowledgment.
238    pub async fn await_ack(self) -> Result<PublishAck> {
239        let ack = self
240            .inner
241            .await
242            .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
243        Ok(PublishAck {
244            stream: ack.stream.to_string(),
245            sequence: ack.sequence,
246            duplicate: ack.duplicate,
247        })
248    }
249}
250
251impl std::future::IntoFuture for PublishAckFuture {
252    type Output = Result<PublishAck>;
253    type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Self::Output> + Send>>;
254
255    fn into_future(self) -> Self::IntoFuture {
256        Box::pin(self.await_ack())
257    }
258}