grafbase_sdk/host_io/pubsub/
nats.rs

1//! Client interface for interacting with NATS messaging system
2//!
3//! Ok(Some(field_output))
4//!
5//! This module provides a high-level client for connecting to and interacting with NATS servers.
6//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{extension::resolver::Subscription, types::SubscriptionOutput, wit, Error, SdkError};
9use std::time::Duration;
10
11pub use time::OffsetDateTime;
12
13/// A client for interacting with NATS servers
14pub struct NatsClient {
15    inner: wit::NatsClient,
16}
17
18impl NatsClient {
19    /// Publishes a message in JSON formatto the specified NATS subject
20    ///
21    /// # Arguments
22    ///
23    /// * `subject` - The NATS subject to publish to
24    /// * `payload` - The message payload as a byte slice
25    ///
26    /// # Returns
27    ///
28    /// Result indicating success or an error if the publish fails
29    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
30    where
31        S: serde::Serialize,
32    {
33        self.inner
34            .publish(subject, &serde_json::to_vec(payload)?)
35            .map_err(Into::into)
36    }
37
38    /// Sends a request in JSON to the specified NATS subject and waits for a response
39    ///
40    /// # Arguments
41    ///
42    /// * `subject` - The NATS subject to send the request to
43    /// * `payload` - The request payload to serialize and send
44    /// * `timeout` - Optional duration to wait for a response before timing out
45    ///
46    /// # Returns
47    ///
48    /// Result containing the deserialized response or an error if the request fails
49    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
50    where
51        S: serde::Serialize,
52        T: for<'de> serde::Deserialize<'de>,
53    {
54        let body = serde_json::to_vec(payload).unwrap();
55        let response = self.request_bytes(subject, &body, timeout)?;
56
57        Ok(serde_json::from_slice(&response)?)
58    }
59
60    /// Sends a request to the specified NATS subject and waits for a response, returning raw bytes
61    ///
62    /// # Arguments
63    ///
64    /// * `subject` - The NATS subject to send the request to
65    /// * `body` - The raw byte payload to send
66    /// * `timeout` - Optional duration to wait for a response before timing out
67    ///
68    /// # Returns
69    ///
70    /// Result containing the raw byte response or an error if the request fails
71    pub fn request_bytes(&self, subject: &str, body: &[u8], timeout: Option<Duration>) -> Result<Vec<u8>, SdkError> {
72        let timeout = timeout.map(|t| t.as_millis() as u64);
73        let response = self.inner.request(subject, body, timeout)?;
74
75        Ok(response.payload)
76    }
77
78    /// Subscribes to messages on the specified NATS subject
79    ///
80    /// # Arguments
81    ///
82    /// * `subject` - The NATS subject to subscribe to
83    ///
84    /// # Returns
85    ///
86    /// Result containing the subscription or an error if subscription fails
87    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
88        let subscription = self
89            .inner
90            .subscribe(subject, config.map(Into::into).as_ref())
91            .map(Into::into)?;
92
93        Ok(subscription)
94    }
95
96    /// Gets a key-value store interface for the specified bucket
97    ///
98    /// # Arguments
99    ///
100    /// * `bucket` - The name of the JetStream KV bucket to access
101    ///
102    /// # Returns
103    ///
104    /// Result containing the key-value store interface or an error if retrieval fails
105    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
106        let store = self.inner.key_value(bucket)?;
107        Ok(store.into())
108    }
109}
110
111/// A key-value store for interacting with NATS JetStream KV buckets
112pub struct NatsKeyValue {
113    inner: wit::NatsKeyValue,
114}
115
116impl From<wit::NatsKeyValue> for NatsKeyValue {
117    fn from(inner: wit::NatsKeyValue) -> Self {
118        NatsKeyValue { inner }
119    }
120}
121
122impl NatsKeyValue {
123    /// Retrieves a value for the specified key in JSON format
124    ///
125    /// # Arguments
126    ///
127    /// * `key` - The key to retrieve the value for
128    ///
129    /// # Returns
130    ///
131    /// Result containing the deserialized value if found, or None if the key doesn't exist
132    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
133    where
134        S: for<'a> serde::Deserialize<'a>,
135    {
136        match self.get_bytes(key)? {
137            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
138            None => Ok(None),
139        }
140    }
141
142    /// Retrieves the raw bytes for the specified key
143    ///
144    /// # Arguments
145    ///
146    /// * `key` - The key to retrieve the value for
147    ///
148    /// # Returns
149    ///
150    /// Result containing the raw byte value if found, or None if the key doesn't exist
151    pub fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, SdkError> {
152        match self.inner.get(key)? {
153            Some(value) => Ok(Some(value)),
154            None => Ok(None),
155        }
156    }
157
158    /// Stores a value for the specified key in JSON format
159    ///
160    /// # Arguments
161    ///
162    /// * `key` - The key to store the value under
163    /// * `value` - The value to store, which will be serialized to JSON
164    ///
165    /// # Returns
166    ///
167    /// Result containing the revision number of the stored value
168    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
169    where
170        S: serde::Serialize,
171    {
172        let value = serde_json::to_vec(value)?;
173        self.put_bytes(key, &value)
174    }
175
176    /// Stores raw bytes for the specified key
177    ///
178    /// # Arguments
179    ///
180    /// * `key` - The key to store the value under
181    /// * `value` - The raw byte value to store
182    ///
183    /// # Returns
184    ///
185    /// Result containing the revision number of the stored value
186    pub fn put_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
187        Ok(self.inner.put(key, value)?)
188    }
189
190    /// Creates a new key-value pair in JSON format, failing if the key already exists
191    ///
192    /// # Arguments
193    ///
194    /// * `key` - The key to create
195    /// * `value` - The value to store, which will be serialized to JSON
196    ///
197    /// # Returns
198    ///
199    /// Result containing the revision number of the created value
200    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
201    where
202        S: serde::Serialize,
203    {
204        let value = serde_json::to_vec(value)?;
205        self.create_bytes(key, &value)
206    }
207
208    /// Creates a new key-value pair with raw bytes, failing if the key already exists
209    ///
210    /// # Arguments
211    ///
212    /// * `key` - The key to create
213    /// * `value` - The raw byte value to store
214    ///
215    /// # Returns
216    ///
217    /// Result containing the revision number of the created value
218    pub fn create_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
219        Ok(self.inner.create(key, value)?)
220    }
221
222    /// Deletes the specified key and its associated value
223    ///
224    /// # Arguments
225    ///
226    /// * `key` - The key to delete
227    ///
228    /// # Returns
229    ///
230    /// Result indicating success or an error if deletion fails
231    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
232        Ok(self.inner.delete(key)?)
233    }
234}
235
236/// A subscription to a NATS subject that receives messages published to that subject
237pub struct NatsSubscription {
238    inner: wit::NatsSubscriber,
239}
240
241impl From<wit::NatsSubscriber> for NatsSubscription {
242    fn from(inner: wit::NatsSubscriber) -> Self {
243        NatsSubscription { inner }
244    }
245}
246
247impl NatsSubscription {
248    /// Gets the next message from the subscription
249    ///
250    /// # Returns
251    ///
252    /// Result containing the next message or an error if retrieval fails
253    pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
254        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
255    }
256}
257
258/// A message received from a NATS subscription containing the payload data
259pub struct NatsMessage {
260    inner: crate::wit::NatsMessage,
261}
262
263impl From<crate::wit::NatsMessage> for NatsMessage {
264    fn from(inner: crate::wit::NatsMessage) -> Self {
265        NatsMessage { inner }
266    }
267}
268
269impl NatsMessage {
270    /// Gets the payload data of the message in JSON format
271    ///
272    /// # Returns
273    ///
274    /// Result containing the payload data or an error if retrieval fails
275    pub fn payload<S>(&self) -> anyhow::Result<S>
276    where
277        S: for<'de> serde::Deserialize<'de>,
278    {
279        Ok(serde_json::from_slice(self.payload_bytes())?)
280    }
281
282    /// Gets the raw bytes of the message payload
283    ///
284    /// # Returns
285    ///
286    /// A byte slice containing the raw message payload
287    pub fn payload_bytes(&self) -> &[u8] {
288        &self.inner.payload
289    }
290
291    /// Gets the subject of the message
292    ///
293    /// # Returns
294    ///
295    /// The NATS subject this message was published to
296    pub fn subject(&self) -> &str {
297        &self.inner.subject
298    }
299}
300
301/// Connects to one or more NATS servers
302///
303/// # Arguments
304///
305/// * `servers` - Iterator of server addresses to connect to
306///
307/// # Returns
308///
309/// Result containing the connected NATS client or an error if connection fails
310pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
311    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
312    let inner = crate::wit::NatsClient::connect(&servers, None)?;
313
314    Ok(NatsClient { inner })
315}
316
317/// Connects to one or more NATS servers with authentication
318///
319/// # Arguments
320///
321/// * `servers` - Iterator of server addresses to connect to
322/// * `auth` - Authentication credentials for connecting to the servers
323///
324/// # Returns
325///
326/// Result containing the connected NATS client or an error if connection fails
327pub fn connect_with_auth(
328    servers: impl IntoIterator<Item = impl ToString>,
329    auth: &crate::NatsAuth,
330) -> Result<NatsClient, SdkError> {
331    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
332    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
333
334    Ok(NatsClient { inner })
335}
336
337impl Subscription for NatsSubscription {
338    fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
339        let item = match NatsSubscription::next(self) {
340            Ok(Some(item)) => item,
341            Ok(None) => return Ok(None),
342            Err(e) => return Err(format!("Error receiving NATS message: {e}").into()),
343        };
344
345        let mut builder = SubscriptionOutput::builder();
346
347        let payload: serde_json::Value = item
348            .payload()
349            .map_err(|e| format!("Error parsing NATS value as JSON: {e}"))?;
350
351        builder.push(payload)?;
352
353        Ok(Some(builder.build()))
354    }
355}
356
357/// Configuration for NATS JetStream consumers
358///
359/// This struct wraps the internal configuration for JetStream consumers
360/// and provides a builder pattern for easy configuration.
361pub struct NatsStreamConfig(wit::NatsStreamConfig);
362
363impl From<NatsStreamConfig> for wit::NatsStreamConfig {
364    fn from(config: NatsStreamConfig) -> Self {
365        config.0
366    }
367}
368
369/// Delivery policy for NATS JetStream consumers
370///
371/// This enum defines the various policies that determine how messages are delivered to
372/// JetStream consumers, such as delivering all messages, only the latest message,
373/// or messages from a specific sequence or time.
374#[derive(Debug)]
375pub enum NatsStreamDeliverPolicy {
376    /// All causes the consumer to receive the oldest messages still present in the system.
377    /// This is the default.
378    All,
379    /// Last will start the consumer with the last sequence received.
380    Last,
381    /// New will only deliver new messages that are received by the JetStream server after
382    /// the consumer is created.
383    New,
384    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
385    /// opt_start_seq parameter.
386    ByStartSequence(u64),
387    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
388    /// configured opt_start_time parameter.
389    ByStartTime(OffsetDateTime),
390    /// LastPerSubject will start the consumer with the last message for all subjects received.
391    LastPerSubject,
392}
393
394impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
395    fn from(value: NatsStreamDeliverPolicy) -> Self {
396        match value {
397            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
398            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
399            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
400            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
401            NatsStreamDeliverPolicy::ByStartTime(time) => {
402                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
403            }
404            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
405        }
406    }
407}
408
409impl NatsStreamConfig {
410    /// Creates a new JetStream consumer configuration
411    ///
412    /// # Arguments
413    ///
414    /// * `deliver_policy` - Determines how messages are delivered to the consumer
415    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
416    ///
417    /// # Returns
418    ///
419    /// A new `NatsStreamConfig` with the specified settings
420    pub fn new(
421        stream_name: String,
422        consumer_name: String,
423        deliver_policy: NatsStreamDeliverPolicy,
424        inactive_threshold: Duration,
425    ) -> Self {
426        NatsStreamConfig(wit::NatsStreamConfig {
427            stream_name,
428            consumer_name,
429            durable_name: None,
430            deliver_policy: deliver_policy.into(),
431            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
432            description: None,
433        })
434    }
435
436    /// Sets a durable name for the consumer
437    ///
438    /// Durable consumers maintain their state even when disconnected.
439    ///
440    /// # Arguments
441    ///
442    /// * `durable_name` - The durable name to use for this consumer
443    ///
444    /// # Returns
445    ///
446    /// The updated configuration
447    pub fn with_durable_name(mut self, durable_name: String) -> Self {
448        self.0.durable_name = Some(durable_name);
449        self
450    }
451
452    /// Sets a description for the consumer
453    ///
454    /// # Arguments
455    ///
456    /// * `description` - The description to use for this consumer
457    ///
458    /// # Returns
459    ///
460    /// The updated configuration
461    pub fn with_description(mut self, description: String) -> Self {
462        self.0.description = Some(description);
463        self
464    }
465}