intercom_rs/
client.rs

1//! NATS client wrapper with typed publish/subscribe operations.
2
3use std::{marker::PhantomData, sync::Arc};
4
5use async_nats::ServerAddr;
6use serde::{de::DeserializeOwned, Serialize};
7
8use crate::{
9    codec::CodecType,
10    error::{Error, Result},
11    jetstream::context::JetStreamContext,
12    publisher::Publisher,
13    subscriber::Subscriber,
14};
15
16/// A typed NATS client wrapper with configurable codec.
17///
18/// # Type Parameters
19///
20/// * `C` - The codec type to use for message serialization (e.g., `MsgPackCodec`, `JsonCodec`)
21///
22/// # Example
23///
24/// ```no_run
25/// use intercom::{Client, MsgPackCodec};
26///
27/// # async fn example() -> intercom::Result<()> {
28/// // Using MessagePack codec (default)
29/// let msgpack_client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
30/// # Ok(())
31/// # }
32/// ```
33#[derive(Clone)]
34pub struct Client<C: CodecType> {
35    inner: Arc<async_nats::Client>,
36    _codec: PhantomData<C>,
37}
38
39impl<C: CodecType> Client<C> {
40    /// Connect to a NATS server.
41    ///
42    /// # Example
43    ///
44    /// ```no_run
45    /// use intercom::{Client, MsgPackCodec};
46    ///
47    /// # async fn example() -> intercom::Result<()> {
48    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
49    /// # Ok(())
50    /// # }
51    /// ```
52    pub async fn connect<A: ToServerAddrs>(addrs: A) -> Result<Self> {
53        let client = async_nats::connect(addrs).await?;
54        Ok(Self {
55            inner: Arc::new(client),
56            _codec: PhantomData,
57        })
58    }
59
60    /// Connect to a NATS server with custom options.
61    ///
62    /// # Example
63    ///
64    /// ```no_run
65    /// use intercom::{Client, MsgPackCodec};
66    ///
67    /// # async fn example() -> intercom::Result<()> {
68    /// let client = Client::<MsgPackCodec>::connect_with_options(
69    ///     "nats://localhost:4222",
70    ///     async_nats::ConnectOptions::new().name("my-app")
71    /// ).await?;
72    /// # Ok(())
73    /// # }
74    /// ```
75    pub async fn connect_with_options<A: ToServerAddrs>(
76        addrs: A,
77        options: async_nats::ConnectOptions,
78    ) -> Result<Self> {
79        let client = options.connect(addrs).await?;
80        Ok(Self {
81            inner: Arc::new(client),
82            _codec: PhantomData,
83        })
84    }
85
86    /// Get the underlying async-nats client.
87    pub fn inner(&self) -> &async_nats::Client {
88        &self.inner
89    }
90
91    /// Publish a typed message to a subject.
92    ///
93    /// # Type Parameters
94    ///
95    /// * `T` - The message type (must implement Serialize)
96    ///
97    /// # Example
98    ///
99    /// ```no_run
100    /// use intercom::{Client, MsgPackCodec};
101    /// use serde::{Deserialize, Serialize};
102    ///
103    /// #[derive(Serialize, Deserialize)]
104    /// struct MyMessage {
105    ///     content: String,
106    /// }
107    ///
108    /// # async fn example() -> intercom::Result<()> {
109    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
110    ///
111    /// // Using turbofish syntax
112    /// client.publish::<MyMessage>("subject", &MyMessage { content: "hello".into() }).await?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub async fn publish<T: Serialize>(&self, subject: &str, message: &T) -> Result<()> {
117        let data = C::encode(message)?;
118        self.inner
119            .publish(subject.to_string(), data.into())
120            .await?;
121        Ok(())
122    }
123
124    /// Publish a typed message and wait for a reply.
125    ///
126    /// # Type Parameters
127    ///
128    /// * `T` - The request message type (must implement Serialize)
129    /// * `R` - The response message type (must implement DeserializeOwned)
130    ///
131    /// # Example
132    ///
133    /// ```no_run
134    /// use intercom::{Client, MsgPackCodec};
135    /// use serde::{Deserialize, Serialize};
136    ///
137    /// #[derive(Serialize, Deserialize)]
138    /// struct Request { query: String }
139    ///
140    /// #[derive(Serialize, Deserialize)]
141    /// struct Response { result: String }
142    ///
143    /// # async fn example() -> intercom::Result<()> {
144    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
145    ///
146    /// let response = client.request::<Request, Response>(
147    ///     "service",
148    ///     &Request { query: "hello".into() }
149    /// ).await?;
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub async fn request<T: Serialize, R: DeserializeOwned>(
154        &self,
155        subject: &str,
156        message: &T,
157    ) -> Result<R> {
158        let data = C::encode(message)?;
159        let response = self.inner.request(subject.to_string(), data.into()).await?;
160        C::decode(&response.payload)
161    }
162
163    /// Publish a message with a specific reply subject.
164    ///
165    /// # Type Parameters
166    ///
167    /// * `T` - The message type (must implement Serialize)
168    ///
169    /// # Example
170    ///
171    /// ```no_run
172    /// use intercom::{Client, MsgPackCodec};
173    /// use serde::{Deserialize, Serialize};
174    ///
175    /// #[derive(Serialize, Deserialize)]
176    /// struct MyMessage { content: String }
177    ///
178    /// # async fn example() -> intercom::Result<()> {
179    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
180    ///
181    /// client.publish_with_reply::<MyMessage>(
182    ///     "subject",
183    ///     "reply.subject",
184    ///     &MyMessage { content: "hello".into() }
185    /// ).await?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    pub async fn publish_with_reply<T: Serialize>(
190        &self,
191        subject: &str,
192        reply: &str,
193        message: &T,
194    ) -> Result<()> {
195        let data = C::encode(message)?;
196        self.inner
197            .publish_with_reply(subject.to_string(), reply.to_string(), data.into())
198            .await?;
199        Ok(())
200    }
201
202    /// Subscribe to a subject with typed messages.
203    ///
204    /// Returns a [`Subscriber`] that implements [`futures::Stream`].
205    ///
206    /// # Type Parameters
207    ///
208    /// * `T` - The message type (must implement DeserializeOwned)
209    ///
210    /// # Example
211    ///
212    /// ```no_run
213    /// use intercom::{Client, MsgPackCodec};
214    /// use serde::{Deserialize, Serialize};
215    /// use futures::StreamExt;
216    ///
217    /// #[derive(Serialize, Deserialize, Debug)]
218    /// struct MyMessage { content: String }
219    ///
220    /// # async fn example() -> intercom::Result<()> {
221    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
222    ///
223    /// let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
224    ///
225    /// while let Some(result) = subscriber.next().await {
226    ///     match result {
227    ///         Ok(msg) => println!("Received: {:?}", msg.payload),
228    ///         Err(e) => eprintln!("Error: {}", e),
229    ///     }
230    /// }
231    /// # Ok(())
232    /// # }
233    /// ```
234    pub async fn subscribe<T: DeserializeOwned>(&self, subject: &str) -> Result<Subscriber<T, C>> {
235        let inner = self.inner.subscribe(subject.to_string()).await?;
236        Ok(Subscriber::new(inner))
237    }
238
239    /// Subscribe to a subject as part of a queue group.
240    ///
241    /// Messages are load-balanced across subscribers in the same queue group.
242    ///
243    /// # Type Parameters
244    ///
245    /// * `T` - The message type (must implement DeserializeOwned)
246    ///
247    /// # Example
248    ///
249    /// ```no_run
250    /// use intercom::{Client, MsgPackCodec};
251    /// use serde::{Deserialize, Serialize};
252    ///
253    /// #[derive(Serialize, Deserialize, Debug)]
254    /// struct MyMessage { content: String }
255    ///
256    /// # async fn example() -> intercom::Result<()> {
257    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
258    ///
259    /// let subscriber = client.queue_subscribe::<MyMessage>("subject", "my-queue").await?;
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub async fn queue_subscribe<T: DeserializeOwned>(
264        &self,
265        subject: &str,
266        queue_group: &str,
267    ) -> Result<Subscriber<T, C>> {
268        let inner = self
269            .inner
270            .queue_subscribe(subject.to_string(), queue_group.to_string())
271            .await?;
272        Ok(Subscriber::new(inner))
273    }
274
275    /// Create a typed publisher for a subject.
276    ///
277    /// Returns a [`Publisher`] that implements [`futures::Sink`].
278    ///
279    /// # Type Parameters
280    ///
281    /// * `T` - The message type (must implement Serialize)
282    ///
283    /// # Example
284    ///
285    /// ```no_run
286    /// use intercom::{Client, MsgPackCodec};
287    /// use serde::{Deserialize, Serialize};
288    /// use futures::SinkExt;
289    ///
290    /// #[derive(Serialize, Deserialize)]
291    /// struct MyMessage { content: String }
292    ///
293    /// # async fn example() -> intercom::Result<()> {
294    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
295    ///
296    /// let mut publisher = client.publisher::<MyMessage>("subject");
297    ///
298    /// publisher.send(MyMessage { content: "hello".into() }).await?;
299    /// # Ok(())
300    /// # }
301    /// ```
302    pub fn publisher<T: Serialize>(&self, subject: &str) -> Publisher<T, C> {
303        Publisher::new(self.inner.clone(), subject.to_string())
304    }
305
306    /// Create a JetStream context.
307    ///
308    /// # Example
309    ///
310    /// ```no_run
311    /// use intercom::{Client, MsgPackCodec};
312    ///
313    /// # async fn example() -> intercom::Result<()> {
314    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
315    /// let jetstream = client.jetstream();
316    /// # Ok(())
317    /// # }
318    /// ```
319    pub fn jetstream(&self) -> JetStreamContext<C> {
320        JetStreamContext::new(async_nats::jetstream::new((*self.inner).clone()))
321    }
322
323    /// Create a JetStream context with a custom domain.
324    ///
325    /// # Example
326    ///
327    /// ```no_run
328    /// use intercom::{Client, MsgPackCodec};
329    ///
330    /// # async fn example() -> intercom::Result<()> {
331    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
332    /// let jetstream = client.jetstream_with_domain("my-domain");
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn jetstream_with_domain(&self, domain: &str) -> JetStreamContext<C> {
337        JetStreamContext::new(async_nats::jetstream::with_domain(
338            (*self.inner).clone(),
339            domain,
340        ))
341    }
342
343    /// Create a JetStream context with a custom prefix.
344    ///
345    /// # Example
346    ///
347    /// ```no_run
348    /// use intercom::{Client, MsgPackCodec};
349    ///
350    /// # async fn example() -> intercom::Result<()> {
351    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
352    /// let jetstream = client.jetstream_with_prefix("my-prefix");
353    /// # Ok(())
354    /// # }
355    /// ```
356    pub fn jetstream_with_prefix(&self, prefix: &str) -> JetStreamContext<C> {
357        JetStreamContext::new(async_nats::jetstream::with_prefix(
358            (*self.inner).clone(),
359            prefix,
360        ))
361    }
362
363    /// Flush any pending messages.
364    pub async fn flush(&self) -> Result<()> {
365        self.inner
366            .flush()
367            .await
368            .map_err(|e| Error::Nats(e.into()))
369    }
370}
371
372/// Trait for types that can be converted to server addresses.
373pub trait ToServerAddrs: async_nats::ToServerAddrs {}
374
375impl ToServerAddrs for &str {}
376impl ToServerAddrs for String {}
377impl ToServerAddrs for &String {}
378impl ToServerAddrs for ServerAddr {}
379impl ToServerAddrs for &ServerAddr {}
380impl ToServerAddrs for Vec<ServerAddr> {}
381impl ToServerAddrs for &[ServerAddr] {}