intercom_rs/jetstream/context.rs
1//! JetStream context for managing streams and publishing messages.
2
3use std::marker::PhantomData;
4
5use serde::{de::DeserializeOwned, Serialize};
6
7use crate::{codec::CodecType, error::Result};
8
9use super::stream::{Stream, StreamBuilder};
10
11/// A typed JetStream context with configurable codec.
12///
13/// Provides methods for creating streams, publishing messages, and managing
14/// JetStream resources.
15///
16/// # Type Parameters
17///
18/// * `C` - The codec type used for message serialization
19#[derive(Clone)]
20pub struct JetStreamContext<C: CodecType> {
21 inner: async_nats::jetstream::Context,
22 _codec: PhantomData<C>,
23}
24
25impl<C: CodecType> JetStreamContext<C> {
26 /// Create a new JetStream context.
27 pub(crate) fn new(inner: async_nats::jetstream::Context) -> Self {
28 Self {
29 inner,
30 _codec: PhantomData,
31 }
32 }
33
34 /// Get the underlying async-nats JetStream context.
35 pub fn inner(&self) -> &async_nats::jetstream::Context {
36 &self.inner
37 }
38
39 /// Create a stream builder for configuring and creating a stream.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// use intercom::{Client, MsgPackCodec};
45 ///
46 /// # async fn example() -> intercom::Result<()> {
47 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
48 /// let jetstream = client.jetstream();
49 ///
50 /// let stream = jetstream
51 /// .stream_builder("my-stream")
52 /// .subjects(vec!["events.>".to_string()])
53 /// .max_messages(1_000_000)
54 /// .create()
55 /// .await?;
56 /// # Ok(())
57 /// # }
58 /// ```
59 pub fn stream_builder(&self, name: &str) -> StreamBuilder<C> {
60 StreamBuilder::new(self.inner.clone(), name.to_string())
61 }
62
63 /// Get an existing stream by name.
64 ///
65 /// # Example
66 ///
67 /// ```no_run
68 /// use intercom::{Client, MsgPackCodec};
69 ///
70 /// # async fn example() -> intercom::Result<()> {
71 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
72 /// let jetstream = client.jetstream();
73 ///
74 /// let stream = jetstream.get_stream("my-stream").await?;
75 /// # Ok(())
76 /// # }
77 /// ```
78 pub async fn get_stream(&self, name: &str) -> Result<Stream<C>> {
79 let inner = self
80 .inner
81 .get_stream(name)
82 .await
83 .map_err(|e| crate::error::Error::JetStreamStream(e.to_string()))?;
84 Ok(Stream::new(inner))
85 }
86
87 /// Delete a stream by name.
88 ///
89 /// # Example
90 ///
91 /// ```no_run
92 /// use intercom::{Client, MsgPackCodec};
93 ///
94 /// # async fn example() -> intercom::Result<()> {
95 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
96 /// let jetstream = client.jetstream();
97 ///
98 /// jetstream.delete_stream("my-stream").await?;
99 /// # Ok(())
100 /// # }
101 /// ```
102 pub async fn delete_stream(&self, name: &str) -> Result<()> {
103 self.inner
104 .delete_stream(name)
105 .await
106 .map_err(|e| crate::error::Error::JetStreamStream(e.to_string()))?;
107 Ok(())
108 }
109
110 /// Publish a typed message to a JetStream subject.
111 ///
112 /// # Type Parameters
113 ///
114 /// * `T` - The message type (must implement Serialize)
115 ///
116 /// # Example
117 ///
118 /// ```no_run
119 /// use intercom::{Client, MsgPackCodec};
120 /// use serde::{Deserialize, Serialize};
121 ///
122 /// #[derive(Serialize, Deserialize)]
123 /// struct Event { id: u64, data: String }
124 ///
125 /// # async fn example() -> intercom::Result<()> {
126 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
127 /// let jetstream = client.jetstream();
128 ///
129 /// let ack = jetstream.publish::<Event>("events.user", &Event {
130 /// id: 1,
131 /// data: "created".to_string(),
132 /// }).await?;
133 ///
134 /// println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub async fn publish<T: Serialize>(&self, subject: &str, message: &T) -> Result<PublishAck> {
139 let data = C::encode(message)?;
140 let ack = self
141 .inner
142 .publish(subject.to_string(), data.into())
143 .await
144 .map_err(|e| crate::error::Error::JetStream(e.to_string()))?
145 .await
146 .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
147 Ok(PublishAck {
148 stream: ack.stream.to_string(),
149 sequence: ack.sequence,
150 duplicate: ack.duplicate,
151 })
152 }
153
154 /// Publish a typed message and get the ack future immediately.
155 ///
156 /// This allows for more efficient batching by not waiting for the ack.
157 ///
158 /// # Type Parameters
159 ///
160 /// * `T` - The message type (must implement Serialize)
161 ///
162 /// # Example
163 ///
164 /// ```no_run
165 /// use intercom::{Client, MsgPackCodec};
166 /// use serde::{Deserialize, Serialize};
167 ///
168 /// #[derive(Serialize, Deserialize)]
169 /// struct Event { id: u64 }
170 ///
171 /// # async fn example() -> intercom::Result<()> {
172 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
173 /// let jetstream = client.jetstream();
174 ///
175 /// // Publish without waiting for ack
176 /// let ack_future = jetstream.publish_async::<Event>("events.user", &Event { id: 1 }).await?;
177 ///
178 /// // Do other work...
179 ///
180 /// // Now wait for the ack
181 /// let ack = ack_future.await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 pub async fn publish_async<T: Serialize>(
186 &self,
187 subject: &str,
188 message: &T,
189 ) -> Result<PublishAckFuture> {
190 let data = C::encode(message)?;
191 let future = self
192 .inner
193 .publish(subject.to_string(), data.into())
194 .await
195 .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
196 Ok(PublishAckFuture { inner: future })
197 }
198
199 /// Request-reply pattern over JetStream.
200 ///
201 /// # Type Parameters
202 ///
203 /// * `T` - The request message type
204 /// * `R` - The response message type
205 pub async fn request<T: Serialize, R: DeserializeOwned>(
206 &self,
207 subject: &str,
208 message: &T,
209 ) -> Result<R> {
210 let data = C::encode(message)?;
211 let response: async_nats::Message = self
212 .inner
213 .request(subject.to_string(), &bytes::Bytes::from(data))
214 .await
215 .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
216 C::decode(&response.payload)
217 }
218}
219
220/// Acknowledgment from a JetStream publish.
221#[derive(Debug, Clone)]
222pub struct PublishAck {
223 /// The stream name.
224 pub stream: String,
225 /// The sequence number.
226 pub sequence: u64,
227 /// Whether this was a duplicate.
228 pub duplicate: bool,
229}
230
231/// A future that resolves to a publish acknowledgment.
232pub struct PublishAckFuture {
233 inner: async_nats::jetstream::context::PublishAckFuture,
234}
235
236impl PublishAckFuture {
237 /// Wait for the acknowledgment.
238 pub async fn await_ack(self) -> Result<PublishAck> {
239 let ack = self
240 .inner
241 .await
242 .map_err(|e| crate::error::Error::JetStream(e.to_string()))?;
243 Ok(PublishAck {
244 stream: ack.stream.to_string(),
245 sequence: ack.sequence,
246 duplicate: ack.duplicate,
247 })
248 }
249}
250
251impl std::future::IntoFuture for PublishAckFuture {
252 type Output = Result<PublishAck>;
253 type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Self::Output> + Send>>;
254
255 fn into_future(self) -> Self::IntoFuture {
256 Box::pin(self.await_ack())
257 }
258}