grafbase_sdk/host_io/pubsub/
nats.rs

1//! Client interface for interacting with NATS messaging system
2//!
3//! Ok(Some(field_output))
4//!
5//! This module provides a high-level client for connecting to and interacting with NATS servers.
6//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{
9    extension::resolver::Subscription,
10    types::{Error, SubscriptionOutput},
11    wit, SdkError,
12};
13use std::time::Duration;
14
15pub use time::OffsetDateTime;
16pub use wit::NatsAuth;
17
18/// A client for interacting with NATS servers
19pub struct NatsClient {
20    inner: wit::NatsClient,
21}
22
23impl NatsClient {
24    /// Publishes a message in JSON formatto the specified NATS subject
25    ///
26    /// # Arguments
27    ///
28    /// * `subject` - The NATS subject to publish to
29    /// * `payload` - The message payload as a byte slice
30    ///
31    /// # Returns
32    ///
33    /// Result indicating success or an error if the publish fails
34    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
35    where
36        S: serde::Serialize,
37    {
38        self.inner
39            .publish(subject, &serde_json::to_vec(payload)?)
40            .map_err(Into::into)
41    }
42
43    /// Sends a request in JSON to the specified NATS subject and waits for a response
44    ///
45    /// # Arguments
46    ///
47    /// * `subject` - The NATS subject to send the request to
48    /// * `payload` - The request payload to serialize and send
49    /// * `timeout` - Optional duration to wait for a response before timing out
50    ///
51    /// # Returns
52    ///
53    /// Result containing the deserialized response or an error if the request fails
54    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
55    where
56        S: serde::Serialize,
57        T: for<'de> serde::Deserialize<'de>,
58    {
59        let body = serde_json::to_vec(payload).unwrap();
60        let response = self.request_bytes(subject, &body, timeout)?;
61
62        Ok(serde_json::from_slice(&response)?)
63    }
64
65    /// Sends a request to the specified NATS subject and waits for a response, returning raw bytes
66    ///
67    /// # Arguments
68    ///
69    /// * `subject` - The NATS subject to send the request to
70    /// * `body` - The raw byte payload to send
71    /// * `timeout` - Optional duration to wait for a response before timing out
72    ///
73    /// # Returns
74    ///
75    /// Result containing the raw byte response or an error if the request fails
76    pub fn request_bytes(&self, subject: &str, body: &[u8], timeout: Option<Duration>) -> Result<Vec<u8>, SdkError> {
77        let timeout = timeout.map(|t| t.as_millis() as u64);
78        let response = self.inner.request(subject, body, timeout)?;
79
80        Ok(response.payload)
81    }
82
83    /// Subscribes to messages on the specified NATS subject
84    ///
85    /// # Arguments
86    ///
87    /// * `subject` - The NATS subject to subscribe to
88    ///
89    /// # Returns
90    ///
91    /// Result containing the subscription or an error if subscription fails
92    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
93        let subscription = self
94            .inner
95            .subscribe(subject, config.map(Into::into).as_ref())
96            .map(Into::into)?;
97
98        Ok(subscription)
99    }
100
101    /// Gets a key-value store interface for the specified bucket
102    ///
103    /// # Arguments
104    ///
105    /// * `bucket` - The name of the JetStream KV bucket to access
106    ///
107    /// # Returns
108    ///
109    /// Result containing the key-value store interface or an error if retrieval fails
110    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
111        let store = self.inner.key_value(bucket)?;
112        Ok(store.into())
113    }
114}
115
116/// A key-value store for interacting with NATS JetStream KV buckets
117pub struct NatsKeyValue {
118    inner: wit::NatsKeyValue,
119}
120
121impl From<wit::NatsKeyValue> for NatsKeyValue {
122    fn from(inner: wit::NatsKeyValue) -> Self {
123        NatsKeyValue { inner }
124    }
125}
126
127impl NatsKeyValue {
128    /// Retrieves a value for the specified key in JSON format
129    ///
130    /// # Arguments
131    ///
132    /// * `key` - The key to retrieve the value for
133    ///
134    /// # Returns
135    ///
136    /// Result containing the deserialized value if found, or None if the key doesn't exist
137    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
138    where
139        S: for<'a> serde::Deserialize<'a>,
140    {
141        match self.get_bytes(key)? {
142            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
143            None => Ok(None),
144        }
145    }
146
147    /// Retrieves the raw bytes for the specified key
148    ///
149    /// # Arguments
150    ///
151    /// * `key` - The key to retrieve the value for
152    ///
153    /// # Returns
154    ///
155    /// Result containing the raw byte value if found, or None if the key doesn't exist
156    pub fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, SdkError> {
157        match self.inner.get(key)? {
158            Some(value) => Ok(Some(value)),
159            None => Ok(None),
160        }
161    }
162
163    /// Stores a value for the specified key in JSON format
164    ///
165    /// # Arguments
166    ///
167    /// * `key` - The key to store the value under
168    /// * `value` - The value to store, which will be serialized to JSON
169    ///
170    /// # Returns
171    ///
172    /// Result containing the revision number of the stored value
173    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
174    where
175        S: serde::Serialize,
176    {
177        let value = serde_json::to_vec(value)?;
178        self.put_bytes(key, &value)
179    }
180
181    /// Stores raw bytes for the specified key
182    ///
183    /// # Arguments
184    ///
185    /// * `key` - The key to store the value under
186    /// * `value` - The raw byte value to store
187    ///
188    /// # Returns
189    ///
190    /// Result containing the revision number of the stored value
191    pub fn put_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
192        Ok(self.inner.put(key, value)?)
193    }
194
195    /// Creates a new key-value pair in JSON format, failing if the key already exists
196    ///
197    /// # Arguments
198    ///
199    /// * `key` - The key to create
200    /// * `value` - The value to store, which will be serialized to JSON
201    ///
202    /// # Returns
203    ///
204    /// Result containing the revision number of the created value
205    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
206    where
207        S: serde::Serialize,
208    {
209        let value = serde_json::to_vec(value)?;
210        self.create_bytes(key, &value)
211    }
212
213    /// Creates a new key-value pair with raw bytes, failing if the key already exists
214    ///
215    /// # Arguments
216    ///
217    /// * `key` - The key to create
218    /// * `value` - The raw byte value to store
219    ///
220    /// # Returns
221    ///
222    /// Result containing the revision number of the created value
223    pub fn create_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
224        Ok(self.inner.create(key, value)?)
225    }
226
227    /// Deletes the specified key and its associated value
228    ///
229    /// # Arguments
230    ///
231    /// * `key` - The key to delete
232    ///
233    /// # Returns
234    ///
235    /// Result indicating success or an error if deletion fails
236    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
237        Ok(self.inner.delete(key)?)
238    }
239}
240
241/// A subscription to a NATS subject that receives messages published to that subject
242pub struct NatsSubscription {
243    inner: wit::NatsSubscriber,
244}
245
246impl From<wit::NatsSubscriber> for NatsSubscription {
247    fn from(inner: wit::NatsSubscriber) -> Self {
248        NatsSubscription { inner }
249    }
250}
251
252impl NatsSubscription {
253    /// Gets the next message from the subscription
254    ///
255    /// # Returns
256    ///
257    /// Result containing the next message or an error if retrieval fails
258    pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
259        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
260    }
261}
262
263/// A message received from a NATS subscription containing the payload data
264pub struct NatsMessage {
265    inner: crate::wit::NatsMessage,
266}
267
268impl From<crate::wit::NatsMessage> for NatsMessage {
269    fn from(inner: crate::wit::NatsMessage) -> Self {
270        NatsMessage { inner }
271    }
272}
273
274impl NatsMessage {
275    /// Gets the payload data of the message in JSON format
276    ///
277    /// # Returns
278    ///
279    /// Result containing the payload data or an error if retrieval fails
280    pub fn payload<S>(&self) -> Result<S, SdkError>
281    where
282        S: for<'de> serde::Deserialize<'de>,
283    {
284        Ok(serde_json::from_slice(self.payload_bytes())?)
285    }
286
287    /// Gets the raw bytes of the message payload
288    ///
289    /// # Returns
290    ///
291    /// A byte slice containing the raw message payload
292    pub fn payload_bytes(&self) -> &[u8] {
293        &self.inner.payload
294    }
295
296    /// Gets the subject of the message
297    ///
298    /// # Returns
299    ///
300    /// The NATS subject this message was published to
301    pub fn subject(&self) -> &str {
302        &self.inner.subject
303    }
304}
305
306/// Connects to one or more NATS servers
307///
308/// # Arguments
309///
310/// * `servers` - Iterator of server addresses to connect to
311///
312/// # Returns
313///
314/// Result containing the connected NATS client or an error if connection fails
315pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
316    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
317    let inner = crate::wit::NatsClient::connect(&servers, None)?;
318
319    Ok(NatsClient { inner })
320}
321
322/// Connects to one or more NATS servers with authentication
323///
324/// # Arguments
325///
326/// * `servers` - Iterator of server addresses to connect to
327/// * `auth` - Authentication credentials for connecting to the servers
328///
329/// # Returns
330///
331/// Result containing the connected NATS client or an error if connection fails
332pub fn connect_with_auth(
333    servers: impl IntoIterator<Item = impl ToString>,
334    auth: &NatsAuth,
335) -> Result<NatsClient, SdkError> {
336    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
337    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
338
339    Ok(NatsClient { inner })
340}
341
342impl Subscription for NatsSubscription {
343    fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
344        let item = match NatsSubscription::next(self) {
345            Ok(Some(item)) => item,
346            Ok(None) => return Ok(None),
347            Err(e) => return Err(format!("Error receiving NATS message: {e}").into()),
348        };
349
350        let mut builder = SubscriptionOutput::builder();
351
352        let payload: serde_json::Value = item
353            .payload()
354            .map_err(|e| format!("Error parsing NATS value as JSON: {e}"))?;
355
356        builder.push(payload)?;
357
358        Ok(Some(builder.build()))
359    }
360}
361
362/// Configuration for NATS JetStream consumers
363///
364/// This struct wraps the internal configuration for JetStream consumers
365/// and provides a builder pattern for easy configuration.
366pub struct NatsStreamConfig(wit::NatsStreamConfig);
367
368impl From<NatsStreamConfig> for wit::NatsStreamConfig {
369    fn from(config: NatsStreamConfig) -> Self {
370        config.0
371    }
372}
373
374/// Delivery policy for NATS JetStream consumers
375///
376/// This enum defines the various policies that determine how messages are delivered to
377/// JetStream consumers, such as delivering all messages, only the latest message,
378/// or messages from a specific sequence or time.
379#[derive(Debug)]
380pub enum NatsStreamDeliverPolicy {
381    /// All causes the consumer to receive the oldest messages still present in the system.
382    /// This is the default.
383    All,
384    /// Last will start the consumer with the last sequence received.
385    Last,
386    /// New will only deliver new messages that are received by the JetStream server after
387    /// the consumer is created.
388    New,
389    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
390    /// opt_start_seq parameter.
391    ByStartSequence(u64),
392    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
393    /// configured opt_start_time parameter.
394    ByStartTime(OffsetDateTime),
395    /// LastPerSubject will start the consumer with the last message for all subjects received.
396    LastPerSubject,
397}
398
399impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
400    fn from(value: NatsStreamDeliverPolicy) -> Self {
401        match value {
402            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
403            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
404            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
405            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
406            NatsStreamDeliverPolicy::ByStartTime(time) => {
407                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
408            }
409            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
410        }
411    }
412}
413
414impl NatsStreamConfig {
415    /// Creates a new JetStream consumer configuration
416    ///
417    /// # Arguments
418    ///
419    /// * `deliver_policy` - Determines how messages are delivered to the consumer
420    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
421    ///
422    /// # Returns
423    ///
424    /// A new `NatsStreamConfig` with the specified settings
425    pub fn new(
426        stream_name: String,
427        consumer_name: String,
428        deliver_policy: NatsStreamDeliverPolicy,
429        inactive_threshold: Duration,
430    ) -> Self {
431        NatsStreamConfig(wit::NatsStreamConfig {
432            stream_name,
433            consumer_name,
434            durable_name: None,
435            deliver_policy: deliver_policy.into(),
436            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
437            description: None,
438        })
439    }
440
441    /// Sets a durable name for the consumer
442    ///
443    /// Durable consumers maintain their state even when disconnected.
444    ///
445    /// # Arguments
446    ///
447    /// * `durable_name` - The durable name to use for this consumer
448    ///
449    /// # Returns
450    ///
451    /// The updated configuration
452    pub fn with_durable_name(mut self, durable_name: String) -> Self {
453        self.0.durable_name = Some(durable_name);
454        self
455    }
456
457    /// Sets a description for the consumer
458    ///
459    /// # Arguments
460    ///
461    /// * `description` - The description to use for this consumer
462    ///
463    /// # Returns
464    ///
465    /// The updated configuration
466    pub fn with_description(mut self, description: String) -> Self {
467        self.0.description = Some(description);
468        self
469    }
470}