grafbase_sdk/host_io/pubsub/
nats.rs

//! Client interface for interacting with the NATS messaging system.
//!
//! This module provides a high-level client for connecting to and interacting with NATS servers.
//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{extension::resolver::Subscription, types::SubscriptionOutput, wit, Error, SdkError};
9use std::time::Duration;
10
11pub use time::OffsetDateTime;
12
13/// A client for interacting with NATS servers
14pub struct NatsClient {
15    inner: wit::NatsClient,
16}
17
18impl NatsClient {
19    /// Publishes a message to the specified NATS subject
20    ///
21    /// # Arguments
22    ///
23    /// * `subject` - The NATS subject to publish to
24    /// * `payload` - The message payload as a byte slice
25    ///
26    /// # Returns
27    ///
28    /// Result indicating success or an error if the publish fails
29    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
30    where
31        S: serde::Serialize,
32    {
33        self.inner
34            .publish(subject, &serde_json::to_vec(payload)?)
35            .map_err(Into::into)
36    }
37
38    /// Sends a request to the specified NATS subject and waits for a response
39    ///
40    /// # Arguments
41    ///
42    /// * `subject` - The NATS subject to send the request to
43    /// * `payload` - The request payload to serialize and send
44    /// * `timeout` - Optional duration to wait for a response before timing out
45    ///
46    /// # Returns
47    ///
48    /// Result containing the deserialized response or an error if the request fails
49    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
50    where
51        S: serde::Serialize,
52        T: for<'de> serde::Deserialize<'de>,
53    {
54        let timeout = timeout.map(|t| t.as_millis() as u64);
55        let body = serde_json::to_vec(payload).unwrap();
56        let response = self.inner.request(subject, &body, timeout)?;
57
58        Ok(serde_json::from_slice(&response.payload)?)
59    }
60
61    /// Subscribes to messages on the specified NATS subject
62    ///
63    /// # Arguments
64    ///
65    /// * `subject` - The NATS subject to subscribe to
66    ///
67    /// # Returns
68    ///
69    /// Result containing the subscription or an error if subscription fails
70    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
71        let subscription = self
72            .inner
73            .subscribe(subject, config.map(Into::into).as_ref())
74            .map(Into::into)?;
75
76        Ok(subscription)
77    }
78
79    /// Gets a key-value store interface for the specified bucket
80    ///
81    /// # Arguments
82    ///
83    /// * `bucket` - The name of the JetStream KV bucket to access
84    ///
85    /// # Returns
86    ///
87    /// Result containing the key-value store interface or an error if retrieval fails
88    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
89        let store = self.inner.key_value(bucket)?;
90        Ok(store.into())
91    }
92}
93
94/// A key-value store for interacting with NATS JetStream KV buckets
95pub struct NatsKeyValue {
96    inner: wit::NatsKeyValue,
97}
98
99impl From<wit::NatsKeyValue> for NatsKeyValue {
100    fn from(inner: wit::NatsKeyValue) -> Self {
101        NatsKeyValue { inner }
102    }
103}
104
105impl NatsKeyValue {
106    /// Retrieves a value for the specified key
107    ///
108    /// # Arguments
109    ///
110    /// * `key` - The key to retrieve the value for
111    ///
112    /// # Returns
113    ///
114    /// Result containing the deserialized value if found, or None if the key doesn't exist
115    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
116    where
117        S: for<'a> serde::Deserialize<'a>,
118    {
119        match self.inner.get(key)? {
120            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
121            None => Ok(None),
122        }
123    }
124
125    /// Stores a value for the specified key
126    ///
127    /// # Arguments
128    ///
129    /// * `key` - The key to store the value under
130    /// * `value` - The value to store, which will be serialized to JSON
131    ///
132    /// # Returns
133    ///
134    /// Result containing the revision number of the stored value
135    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
136    where
137        S: serde::Serialize,
138    {
139        let value = serde_json::to_vec(value)?;
140
141        Ok(self.inner.put(key, &value)?)
142    }
143
144    /// Creates a new key-value pair, failing if the key already exists
145    ///
146    /// # Arguments
147    ///
148    /// * `key` - The key to create
149    /// * `value` - The value to store, which will be serialized to JSON
150    ///
151    /// # Returns
152    ///
153    /// Result containing the revision number of the created value
154    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
155    where
156        S: serde::Serialize,
157    {
158        let value = serde_json::to_vec(value)?;
159        Ok(self.inner.create(key, &value)?)
160    }
161
162    /// Deletes the specified key and its associated value
163    ///
164    /// # Arguments
165    ///
166    /// * `key` - The key to delete
167    ///
168    /// # Returns
169    ///
170    /// Result indicating success or an error if deletion fails
171    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
172        Ok(self.inner.delete(key)?)
173    }
174}
175
176/// A subscription to a NATS subject that receives messages published to that subject
177pub struct NatsSubscription {
178    inner: wit::NatsSubscriber,
179}
180
181impl From<wit::NatsSubscriber> for NatsSubscription {
182    fn from(inner: wit::NatsSubscriber) -> Self {
183        NatsSubscription { inner }
184    }
185}
186
187impl NatsSubscription {
188    /// Gets the next message from the subscription
189    ///
190    /// # Returns
191    ///
192    /// Result containing the next message or an error if retrieval fails
193    pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
194        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
195    }
196}
197
198/// A message received from a NATS subscription containing the payload data
199pub struct NatsMessage {
200    inner: crate::wit::NatsMessage,
201}
202
203impl From<crate::wit::NatsMessage> for NatsMessage {
204    fn from(inner: crate::wit::NatsMessage) -> Self {
205        NatsMessage { inner }
206    }
207}
208
209impl NatsMessage {
210    /// Gets the payload data of the message
211    ///
212    /// # Returns
213    ///
214    /// Result containing the payload data or an error if retrieval fails
215    pub fn payload<S>(&self) -> anyhow::Result<S>
216    where
217        S: for<'de> serde::Deserialize<'de>,
218    {
219        Ok(serde_json::from_slice(&self.inner.payload)?)
220    }
221
222    /// Gets the subject of the message
223    ///
224    /// # Returns
225    ///
226    /// The NATS subject this message was published to
227    pub fn subject(&self) -> &str {
228        &self.inner.subject
229    }
230}
231
232/// Connects to one or more NATS servers
233///
234/// # Arguments
235///
236/// * `servers` - Iterator of server addresses to connect to
237///
238/// # Returns
239///
240/// Result containing the connected NATS client or an error if connection fails
241pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
242    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
243    let inner = crate::wit::NatsClient::connect(&servers, None)?;
244
245    Ok(NatsClient { inner })
246}
247
248/// Connects to one or more NATS servers with authentication
249///
250/// # Arguments
251///
252/// * `servers` - Iterator of server addresses to connect to
253/// * `auth` - Authentication credentials for connecting to the servers
254///
255/// # Returns
256///
257/// Result containing the connected NATS client or an error if connection fails
258pub fn connect_with_auth(
259    servers: impl IntoIterator<Item = impl ToString>,
260    auth: &crate::NatsAuth,
261) -> Result<NatsClient, SdkError> {
262    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
263    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
264
265    Ok(NatsClient { inner })
266}
267
268impl Subscription for NatsSubscription {
269    fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
270        let item = match NatsSubscription::next(self) {
271            Ok(Some(item)) => item,
272            Ok(None) => return Ok(None),
273            Err(e) => return Err(format!("Error receiving NATS message: {e}").into()),
274        };
275
276        let mut builder = SubscriptionOutput::builder();
277
278        let payload: serde_json::Value = item
279            .payload()
280            .map_err(|e| format!("Error parsing NATS value as JSON: {e}"))?;
281
282        builder.push(payload)?;
283
284        Ok(Some(builder.build()))
285    }
286}
287
288/// Configuration for NATS JetStream consumers
289///
290/// This struct wraps the internal configuration for JetStream consumers
291/// and provides a builder pattern for easy configuration.
292pub struct NatsStreamConfig(wit::NatsStreamConfig);
293
294impl From<NatsStreamConfig> for wit::NatsStreamConfig {
295    fn from(config: NatsStreamConfig) -> Self {
296        config.0
297    }
298}
299
300/// Delivery policy for NATS JetStream consumers
301///
302/// This enum defines the various policies that determine how messages are delivered to
303/// JetStream consumers, such as delivering all messages, only the latest message,
304/// or messages from a specific sequence or time.
305#[derive(Debug)]
306pub enum NatsStreamDeliverPolicy {
307    /// All causes the consumer to receive the oldest messages still present in the system.
308    /// This is the default.
309    All,
310    /// Last will start the consumer with the last sequence received.
311    Last,
312    /// New will only deliver new messages that are received by the JetStream server after
313    /// the consumer is created.
314    New,
315    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
316    /// opt_start_seq parameter.
317    ByStartSequence(u64),
318    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
319    /// configured opt_start_time parameter.
320    ByStartTime(OffsetDateTime),
321    /// LastPerSubject will start the consumer with the last message for all subjects received.
322    LastPerSubject,
323}
324
325impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
326    fn from(value: NatsStreamDeliverPolicy) -> Self {
327        match value {
328            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
329            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
330            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
331            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
332            NatsStreamDeliverPolicy::ByStartTime(time) => {
333                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
334            }
335            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
336        }
337    }
338}
339
340impl NatsStreamConfig {
341    /// Creates a new JetStream consumer configuration
342    ///
343    /// # Arguments
344    ///
345    /// * `deliver_policy` - Determines how messages are delivered to the consumer
346    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
347    ///
348    /// # Returns
349    ///
350    /// A new `NatsStreamConfig` with the specified settings
351    pub fn new(
352        stream_name: String,
353        consumer_name: String,
354        deliver_policy: NatsStreamDeliverPolicy,
355        inactive_threshold: Duration,
356    ) -> Self {
357        NatsStreamConfig(wit::NatsStreamConfig {
358            stream_name,
359            consumer_name,
360            durable_name: None,
361            deliver_policy: deliver_policy.into(),
362            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
363            description: None,
364        })
365    }
366
367    /// Sets a durable name for the consumer
368    ///
369    /// Durable consumers maintain their state even when disconnected.
370    ///
371    /// # Arguments
372    ///
373    /// * `durable_name` - The durable name to use for this consumer
374    ///
375    /// # Returns
376    ///
377    /// The updated configuration
378    pub fn with_durable_name(mut self, durable_name: String) -> Self {
379        self.0.durable_name = Some(durable_name);
380        self
381    }
382
383    /// Sets a description for the consumer
384    ///
385    /// # Arguments
386    ///
387    /// * `description` - The description to use for this consumer
388    ///
389    /// # Returns
390    ///
391    /// The updated configuration
392    pub fn with_description(mut self, description: String) -> Self {
393        self.0.description = Some(description);
394        self
395    }
396}