grafbase_sdk/host_io/pubsub/
nats.rs

1//! Client interface for interacting with NATS messaging system
2//!
//! Payloads and key-value entries are serialized to and from JSON with `serde_json`.
4//!
5//! This module provides a high-level client for connecting to and interacting with NATS servers.
6//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{
9    extension::resolver::Subscription,
10    types::{self},
11    wit, Error, SdkError,
12};
13use std::time::Duration;
14
15pub use time::OffsetDateTime;
16
17/// A client for interacting with NATS servers
18pub struct NatsClient {
19    inner: wit::NatsClient,
20}
21
22impl NatsClient {
23    /// Publishes a message to the specified NATS subject
24    ///
25    /// # Arguments
26    ///
27    /// * `subject` - The NATS subject to publish to
28    /// * `payload` - The message payload as a byte slice
29    ///
30    /// # Returns
31    ///
32    /// Result indicating success or an error if the publish fails
33    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
34    where
35        S: serde::Serialize,
36    {
37        self.inner
38            .publish(subject, &serde_json::to_vec(payload)?)
39            .map_err(Into::into)
40    }
41
42    /// Sends a request to the specified NATS subject and waits for a response
43    ///
44    /// # Arguments
45    ///
46    /// * `subject` - The NATS subject to send the request to
47    /// * `payload` - The request payload to serialize and send
48    /// * `timeout` - Optional duration to wait for a response before timing out
49    ///
50    /// # Returns
51    ///
52    /// Result containing the deserialized response or an error if the request fails
53    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
54    where
55        S: serde::Serialize,
56        T: for<'de> serde::Deserialize<'de>,
57    {
58        let timeout = timeout.map(|t| t.as_millis() as u64);
59        let body = serde_json::to_vec(payload).unwrap();
60        let response = self.inner.request(subject, &body, timeout)?;
61
62        Ok(serde_json::from_slice(&response.payload)?)
63    }
64
65    /// Subscribes to messages on the specified NATS subject
66    ///
67    /// # Arguments
68    ///
69    /// * `subject` - The NATS subject to subscribe to
70    ///
71    /// # Returns
72    ///
73    /// Result containing the subscription or an error if subscription fails
74    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
75        let subscription = self
76            .inner
77            .subscribe(subject, config.map(Into::into).as_ref())
78            .map(Into::into)?;
79
80        Ok(subscription)
81    }
82
83    /// Gets a key-value store interface for the specified bucket
84    ///
85    /// # Arguments
86    ///
87    /// * `bucket` - The name of the JetStream KV bucket to access
88    ///
89    /// # Returns
90    ///
91    /// Result containing the key-value store interface or an error if retrieval fails
92    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
93        let store = self.inner.key_value(bucket)?;
94        Ok(store.into())
95    }
96}
97
98/// A key-value store for interacting with NATS JetStream KV buckets
99pub struct NatsKeyValue {
100    inner: wit::NatsKeyValue,
101}
102
103impl From<wit::NatsKeyValue> for NatsKeyValue {
104    fn from(inner: wit::NatsKeyValue) -> Self {
105        NatsKeyValue { inner }
106    }
107}
108
109impl NatsKeyValue {
110    /// Retrieves a value for the specified key
111    ///
112    /// # Arguments
113    ///
114    /// * `key` - The key to retrieve the value for
115    ///
116    /// # Returns
117    ///
118    /// Result containing the deserialized value if found, or None if the key doesn't exist
119    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
120    where
121        S: for<'a> serde::Deserialize<'a>,
122    {
123        match self.inner.get(key)? {
124            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
125            None => Ok(None),
126        }
127    }
128
129    /// Stores a value for the specified key
130    ///
131    /// # Arguments
132    ///
133    /// * `key` - The key to store the value under
134    /// * `value` - The value to store, which will be serialized to JSON
135    ///
136    /// # Returns
137    ///
138    /// Result containing the revision number of the stored value
139    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
140    where
141        S: serde::Serialize,
142    {
143        let value = serde_json::to_vec(value)?;
144
145        Ok(self.inner.put(key, &value)?)
146    }
147
148    /// Creates a new key-value pair, failing if the key already exists
149    ///
150    /// # Arguments
151    ///
152    /// * `key` - The key to create
153    /// * `value` - The value to store, which will be serialized to JSON
154    ///
155    /// # Returns
156    ///
157    /// Result containing the revision number of the created value
158    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
159    where
160        S: serde::Serialize,
161    {
162        let value = serde_json::to_vec(value)?;
163        Ok(self.inner.create(key, &value)?)
164    }
165
166    /// Deletes the specified key and its associated value
167    ///
168    /// # Arguments
169    ///
170    /// * `key` - The key to delete
171    ///
172    /// # Returns
173    ///
174    /// Result indicating success or an error if deletion fails
175    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
176        Ok(self.inner.delete(key)?)
177    }
178}
179
180/// A subscription to a NATS subject that receives messages published to that subject
181pub struct NatsSubscription {
182    inner: wit::NatsSubscriber,
183}
184
185impl From<wit::NatsSubscriber> for NatsSubscription {
186    fn from(inner: wit::NatsSubscriber) -> Self {
187        NatsSubscription { inner }
188    }
189}
190
191impl NatsSubscription {
192    /// Gets the next message from the subscription
193    ///
194    /// # Returns
195    ///
196    /// Result containing the next message or an error if retrieval fails
197    pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
198        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
199    }
200}
201
202/// A message received from a NATS subscription containing the payload data
203pub struct NatsMessage {
204    inner: crate::wit::NatsMessage,
205}
206
207impl From<crate::wit::NatsMessage> for NatsMessage {
208    fn from(inner: crate::wit::NatsMessage) -> Self {
209        NatsMessage { inner }
210    }
211}
212
213impl NatsMessage {
214    /// Gets the payload data of the message
215    ///
216    /// # Returns
217    ///
218    /// Result containing the payload data or an error if retrieval fails
219    pub fn payload<S>(&self) -> anyhow::Result<S>
220    where
221        S: for<'de> serde::Deserialize<'de>,
222    {
223        Ok(serde_json::from_slice(&self.inner.payload)?)
224    }
225
226    /// Gets the subject of the message
227    ///
228    /// # Returns
229    ///
230    /// The NATS subject this message was published to
231    pub fn subject(&self) -> &str {
232        &self.inner.subject
233    }
234}
235
236/// Connects to one or more NATS servers
237///
238/// # Arguments
239///
240/// * `servers` - Iterator of server addresses to connect to
241///
242/// # Returns
243///
244/// Result containing the connected NATS client or an error if connection fails
245pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
246    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
247    let inner = crate::wit::NatsClient::connect(&servers, None)?;
248
249    Ok(NatsClient { inner })
250}
251
252/// Connects to one or more NATS servers with authentication
253///
254/// # Arguments
255///
256/// * `servers` - Iterator of server addresses to connect to
257/// * `auth` - Authentication credentials for connecting to the servers
258///
259/// # Returns
260///
261/// Result containing the connected NATS client or an error if connection fails
262pub fn connect_with_auth(
263    servers: impl IntoIterator<Item = impl ToString>,
264    auth: &crate::NatsAuth,
265) -> Result<NatsClient, SdkError> {
266    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
267    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
268
269    Ok(NatsClient { inner })
270}
271
272impl Subscription for NatsSubscription {
273    fn next(&mut self) -> Result<Option<types::FieldOutput>, Error> {
274        let item = match NatsSubscription::next(self) {
275            Ok(Some(item)) => item,
276            Ok(None) => return Ok(None),
277            Err(e) => return Err(format!("Error receiving NATS message: {e}").into()),
278        };
279
280        let mut field_output = types::FieldOutput::default();
281
282        let payload: serde_json::Value = item
283            .payload()
284            .map_err(|e| format!("Error parsing NATS value as JSON: {e}"))?;
285
286        field_output.push_value(payload);
287
288        Ok(Some(field_output))
289    }
290}
291
292/// Configuration for NATS JetStream consumers
293///
294/// This struct wraps the internal configuration for JetStream consumers
295/// and provides a builder pattern for easy configuration.
296pub struct NatsStreamConfig(wit::NatsStreamConfig);
297
298impl From<NatsStreamConfig> for wit::NatsStreamConfig {
299    fn from(config: NatsStreamConfig) -> Self {
300        config.0
301    }
302}
303
304/// Delivery policy for NATS JetStream consumers
305///
306/// This enum defines the various policies that determine how messages are delivered to
307/// JetStream consumers, such as delivering all messages, only the latest message,
308/// or messages from a specific sequence or time.
309#[derive(Debug)]
310pub enum NatsStreamDeliverPolicy {
311    /// All causes the consumer to receive the oldest messages still present in the system.
312    /// This is the default.
313    All,
314    /// Last will start the consumer with the last sequence received.
315    Last,
316    /// New will only deliver new messages that are received by the JetStream server after
317    /// the consumer is created.
318    New,
319    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
320    /// opt_start_seq parameter.
321    ByStartSequence(u64),
322    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
323    /// configured opt_start_time parameter.
324    ByStartTime(OffsetDateTime),
325    /// LastPerSubject will start the consumer with the last message for all subjects received.
326    LastPerSubject,
327}
328
329impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
330    fn from(value: NatsStreamDeliverPolicy) -> Self {
331        match value {
332            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
333            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
334            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
335            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
336            NatsStreamDeliverPolicy::ByStartTime(time) => {
337                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
338            }
339            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
340        }
341    }
342}
343
344impl NatsStreamConfig {
345    /// Creates a new JetStream consumer configuration
346    ///
347    /// # Arguments
348    ///
349    /// * `deliver_policy` - Determines how messages are delivered to the consumer
350    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
351    ///
352    /// # Returns
353    ///
354    /// A new `NatsStreamConfig` with the specified settings
355    pub fn new(
356        stream_name: String,
357        consumer_name: String,
358        deliver_policy: NatsStreamDeliverPolicy,
359        inactive_threshold: Duration,
360    ) -> Self {
361        NatsStreamConfig(wit::NatsStreamConfig {
362            stream_name,
363            consumer_name,
364            durable_name: None,
365            deliver_policy: deliver_policy.into(),
366            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
367            description: None,
368        })
369    }
370
371    /// Sets a durable name for the consumer
372    ///
373    /// Durable consumers maintain their state even when disconnected.
374    ///
375    /// # Arguments
376    ///
377    /// * `durable_name` - The durable name to use for this consumer
378    ///
379    /// # Returns
380    ///
381    /// The updated configuration
382    pub fn with_durable_name(mut self, durable_name: String) -> Self {
383        self.0.durable_name = Some(durable_name);
384        self
385    }
386
387    /// Sets a description for the consumer
388    ///
389    /// # Arguments
390    ///
391    /// * `description` - The description to use for this consumer
392    ///
393    /// # Returns
394    ///
395    /// The updated configuration
396    pub fn with_description(mut self, description: String) -> Self {
397        self.0.description = Some(description);
398        self
399    }
400}