intercom_rs/client.rs
1//! NATS client wrapper with typed publish/subscribe operations.
2
3use std::{marker::PhantomData, sync::Arc};
4
5use async_nats::ServerAddr;
6use serde::{de::DeserializeOwned, Serialize};
7
8use crate::{
9 codec::CodecType,
10 error::{Error, Result},
11 jetstream::context::JetStreamContext,
12 publisher::Publisher,
13 subscriber::Subscriber,
14};
15
16/// A typed NATS client wrapper with configurable codec.
17///
18/// # Type Parameters
19///
20/// * `C` - The codec type to use for message serialization (e.g., `MsgPackCodec`, `JsonCodec`)
21///
22/// # Example
23///
24/// ```no_run
25/// use intercom::{Client, MsgPackCodec};
26///
27/// # async fn example() -> intercom::Result<()> {
28/// // Using MessagePack codec (default)
29/// let msgpack_client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
30/// # Ok(())
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct Client<C: CodecType> {
35 inner: Arc<async_nats::Client>,
36 _codec: PhantomData<C>,
37}
38
39impl<C: CodecType> Client<C> {
40 /// Connect to a NATS server.
41 ///
42 /// # Example
43 ///
44 /// ```no_run
45 /// use intercom::{Client, MsgPackCodec};
46 ///
47 /// # async fn example() -> intercom::Result<()> {
48 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
49 /// # Ok(())
50 /// # }
51 /// ```
52 pub async fn connect<A: ToServerAddrs>(addrs: A) -> Result<Self> {
53 let client = async_nats::connect(addrs).await?;
54 Ok(Self {
55 inner: Arc::new(client),
56 _codec: PhantomData,
57 })
58 }
59
60 /// Connect to a NATS server with custom options.
61 ///
62 /// # Example
63 ///
64 /// ```no_run
65 /// use intercom::{Client, MsgPackCodec};
66 ///
67 /// # async fn example() -> intercom::Result<()> {
68 /// let client = Client::<MsgPackCodec>::connect_with_options(
69 /// "nats://localhost:4222",
70 /// async_nats::ConnectOptions::new().name("my-app")
71 /// ).await?;
72 /// # Ok(())
73 /// # }
74 /// ```
75 pub async fn connect_with_options<A: ToServerAddrs>(
76 addrs: A,
77 options: async_nats::ConnectOptions,
78 ) -> Result<Self> {
79 let client = options.connect(addrs).await?;
80 Ok(Self {
81 inner: Arc::new(client),
82 _codec: PhantomData,
83 })
84 }
85
86 /// Get the underlying async-nats client.
87 pub fn inner(&self) -> &async_nats::Client {
88 &self.inner
89 }
90
91 /// Publish a typed message to a subject.
92 ///
93 /// # Type Parameters
94 ///
95 /// * `T` - The message type (must implement Serialize)
96 ///
97 /// # Example
98 ///
99 /// ```no_run
100 /// use intercom::{Client, MsgPackCodec};
101 /// use serde::{Deserialize, Serialize};
102 ///
103 /// #[derive(Serialize, Deserialize)]
104 /// struct MyMessage {
105 /// content: String,
106 /// }
107 ///
108 /// # async fn example() -> intercom::Result<()> {
109 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
110 ///
111 /// // Using turbofish syntax
112 /// client.publish::<MyMessage>("subject", &MyMessage { content: "hello".into() }).await?;
113 /// # Ok(())
114 /// # }
115 /// ```
116 pub async fn publish<T: Serialize>(&self, subject: &str, message: &T) -> Result<()> {
117 let data = C::encode(message)?;
118 self.inner
119 .publish(subject.to_string(), data.into())
120 .await?;
121 Ok(())
122 }
123
124 /// Publish a typed message and wait for a reply.
125 ///
126 /// # Type Parameters
127 ///
128 /// * `T` - The request message type (must implement Serialize)
129 /// * `R` - The response message type (must implement DeserializeOwned)
130 ///
131 /// # Example
132 ///
133 /// ```no_run
134 /// use intercom::{Client, MsgPackCodec};
135 /// use serde::{Deserialize, Serialize};
136 ///
137 /// #[derive(Serialize, Deserialize)]
138 /// struct Request { query: String }
139 ///
140 /// #[derive(Serialize, Deserialize)]
141 /// struct Response { result: String }
142 ///
143 /// # async fn example() -> intercom::Result<()> {
144 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
145 ///
146 /// let response = client.request::<Request, Response>(
147 /// "service",
148 /// &Request { query: "hello".into() }
149 /// ).await?;
150 /// # Ok(())
151 /// # }
152 /// ```
153 pub async fn request<T: Serialize, R: DeserializeOwned>(
154 &self,
155 subject: &str,
156 message: &T,
157 ) -> Result<R> {
158 let data = C::encode(message)?;
159 let response = self.inner.request(subject.to_string(), data.into()).await?;
160 C::decode(&response.payload)
161 }
162
163 /// Publish a message with a specific reply subject.
164 ///
165 /// # Type Parameters
166 ///
167 /// * `T` - The message type (must implement Serialize)
168 ///
169 /// # Example
170 ///
171 /// ```no_run
172 /// use intercom::{Client, MsgPackCodec};
173 /// use serde::{Deserialize, Serialize};
174 ///
175 /// #[derive(Serialize, Deserialize)]
176 /// struct MyMessage { content: String }
177 ///
178 /// # async fn example() -> intercom::Result<()> {
179 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
180 ///
181 /// client.publish_with_reply::<MyMessage>(
182 /// "subject",
183 /// "reply.subject",
184 /// &MyMessage { content: "hello".into() }
185 /// ).await?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 pub async fn publish_with_reply<T: Serialize>(
190 &self,
191 subject: &str,
192 reply: &str,
193 message: &T,
194 ) -> Result<()> {
195 let data = C::encode(message)?;
196 self.inner
197 .publish_with_reply(subject.to_string(), reply.to_string(), data.into())
198 .await?;
199 Ok(())
200 }
201
202 /// Subscribe to a subject with typed messages.
203 ///
204 /// Returns a [`Subscriber`] that implements [`futures::Stream`].
205 ///
206 /// # Type Parameters
207 ///
208 /// * `T` - The message type (must implement DeserializeOwned)
209 ///
210 /// # Example
211 ///
212 /// ```no_run
213 /// use intercom::{Client, MsgPackCodec};
214 /// use serde::{Deserialize, Serialize};
215 /// use futures::StreamExt;
216 ///
217 /// #[derive(Serialize, Deserialize, Debug)]
218 /// struct MyMessage { content: String }
219 ///
220 /// # async fn example() -> intercom::Result<()> {
221 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
222 ///
223 /// let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
224 ///
225 /// while let Some(result) = subscriber.next().await {
226 /// match result {
227 /// Ok(msg) => println!("Received: {:?}", msg.payload),
228 /// Err(e) => eprintln!("Error: {}", e),
229 /// }
230 /// }
231 /// # Ok(())
232 /// # }
233 /// ```
234 pub async fn subscribe<T: DeserializeOwned>(&self, subject: &str) -> Result<Subscriber<T, C>> {
235 let inner = self.inner.subscribe(subject.to_string()).await?;
236 Ok(Subscriber::new(inner))
237 }
238
239 /// Subscribe to a subject as part of a queue group.
240 ///
241 /// Messages are load-balanced across subscribers in the same queue group.
242 ///
243 /// # Type Parameters
244 ///
245 /// * `T` - The message type (must implement DeserializeOwned)
246 ///
247 /// # Example
248 ///
249 /// ```no_run
250 /// use intercom::{Client, MsgPackCodec};
251 /// use serde::{Deserialize, Serialize};
252 ///
253 /// #[derive(Serialize, Deserialize, Debug)]
254 /// struct MyMessage { content: String }
255 ///
256 /// # async fn example() -> intercom::Result<()> {
257 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
258 ///
259 /// let subscriber = client.queue_subscribe::<MyMessage>("subject", "my-queue").await?;
260 /// # Ok(())
261 /// # }
262 /// ```
263 pub async fn queue_subscribe<T: DeserializeOwned>(
264 &self,
265 subject: &str,
266 queue_group: &str,
267 ) -> Result<Subscriber<T, C>> {
268 let inner = self
269 .inner
270 .queue_subscribe(subject.to_string(), queue_group.to_string())
271 .await?;
272 Ok(Subscriber::new(inner))
273 }
274
275 /// Create a typed publisher for a subject.
276 ///
277 /// Returns a [`Publisher`] that implements [`futures::Sink`].
278 ///
279 /// # Type Parameters
280 ///
281 /// * `T` - The message type (must implement Serialize)
282 ///
283 /// # Example
284 ///
285 /// ```no_run
286 /// use intercom::{Client, MsgPackCodec};
287 /// use serde::{Deserialize, Serialize};
288 /// use futures::SinkExt;
289 ///
290 /// #[derive(Serialize, Deserialize)]
291 /// struct MyMessage { content: String }
292 ///
293 /// # async fn example() -> intercom::Result<()> {
294 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
295 ///
296 /// let mut publisher = client.publisher::<MyMessage>("subject");
297 ///
298 /// publisher.send(MyMessage { content: "hello".into() }).await?;
299 /// # Ok(())
300 /// # }
301 /// ```
302 pub fn publisher<T: Serialize>(&self, subject: &str) -> Publisher<T, C> {
303 Publisher::new(self.inner.clone(), subject.to_string())
304 }
305
306 /// Create a JetStream context.
307 ///
308 /// # Example
309 ///
310 /// ```no_run
311 /// use intercom::{Client, MsgPackCodec};
312 ///
313 /// # async fn example() -> intercom::Result<()> {
314 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
315 /// let jetstream = client.jetstream();
316 /// # Ok(())
317 /// # }
318 /// ```
319 pub fn jetstream(&self) -> JetStreamContext<C> {
320 JetStreamContext::new(async_nats::jetstream::new((*self.inner).clone()))
321 }
322
323 /// Create a JetStream context with a custom domain.
324 ///
325 /// # Example
326 ///
327 /// ```no_run
328 /// use intercom::{Client, MsgPackCodec};
329 ///
330 /// # async fn example() -> intercom::Result<()> {
331 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
332 /// let jetstream = client.jetstream_with_domain("my-domain");
333 /// # Ok(())
334 /// # }
335 /// ```
336 pub fn jetstream_with_domain(&self, domain: &str) -> JetStreamContext<C> {
337 JetStreamContext::new(async_nats::jetstream::with_domain(
338 (*self.inner).clone(),
339 domain,
340 ))
341 }
342
343 /// Create a JetStream context with a custom prefix.
344 ///
345 /// # Example
346 ///
347 /// ```no_run
348 /// use intercom::{Client, MsgPackCodec};
349 ///
350 /// # async fn example() -> intercom::Result<()> {
351 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
352 /// let jetstream = client.jetstream_with_prefix("my-prefix");
353 /// # Ok(())
354 /// # }
355 /// ```
356 pub fn jetstream_with_prefix(&self, prefix: &str) -> JetStreamContext<C> {
357 JetStreamContext::new(async_nats::jetstream::with_prefix(
358 (*self.inner).clone(),
359 prefix,
360 ))
361 }
362
363 /// Flush any pending messages.
364 pub async fn flush(&self) -> Result<()> {
365 self.inner
366 .flush()
367 .await
368 .map_err(|e| Error::Nats(e.into()))
369 }
370}
371
372/// Trait for types that can be converted to server addresses.
373pub trait ToServerAddrs: async_nats::ToServerAddrs {}
374
375impl ToServerAddrs for &str {}
376impl ToServerAddrs for String {}
377impl ToServerAddrs for &String {}
378impl ToServerAddrs for ServerAddr {}
379impl ToServerAddrs for &ServerAddr {}
380impl ToServerAddrs for Vec<ServerAddr> {}
381impl ToServerAddrs for &[ServerAddr] {}