grafbase_sdk/host_io/pubsub/
nats.rs

//! Client interface for interacting with the NATS messaging system
//!
//! This module provides a high-level client for connecting to and interacting with NATS servers.
//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{extension::resolver::Subscription, types, wit, Error};
9use std::time::Duration;
10
11pub use time::OffsetDateTime;
12
13/// A client for interacting with NATS servers
14pub struct NatsClient {
15    inner: wit::NatsClient,
16}
17
18impl NatsClient {
19    /// Publishes a message to the specified NATS subject
20    ///
21    /// # Arguments
22    ///
23    /// * `subject` - The NATS subject to publish to
24    /// * `payload` - The message payload as a byte slice
25    ///
26    /// # Returns
27    ///
28    /// Result indicating success or an error if the publish fails
29    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), Box<dyn std::error::Error>>
30    where
31        S: serde::Serialize,
32    {
33        Ok(self.inner.publish(subject, &serde_json::to_vec(payload).unwrap())?)
34    }
35
36    /// Sends a request to the specified NATS subject and waits for a response
37    ///
38    /// # Arguments
39    ///
40    /// * `subject` - The NATS subject to send the request to
41    /// * `payload` - The request payload to serialize and send
42    /// * `timeout` - Optional duration to wait for a response before timing out
43    ///
44    /// # Returns
45    ///
46    /// Result containing the deserialized response or an error if the request fails
47    pub fn request<S, T>(
48        &self,
49        subject: &str,
50        payload: &S,
51        timeout: Option<Duration>,
52    ) -> Result<T, Box<dyn std::error::Error>>
53    where
54        S: serde::Serialize,
55        T: for<'de> serde::Deserialize<'de>,
56    {
57        let timeout = timeout.map(|t| t.as_millis() as u64);
58        let body = serde_json::to_vec(payload).unwrap();
59        let response = self.inner.request(subject, &body, timeout)?;
60
61        Ok(serde_json::from_slice(&response.payload)?)
62    }
63
64    /// Subscribes to messages on the specified NATS subject
65    ///
66    /// # Arguments
67    ///
68    /// * `subject` - The NATS subject to subscribe to
69    ///
70    /// # Returns
71    ///
72    /// Result containing the subscription or an error if subscription fails
73    pub fn subscribe(
74        &self,
75        subject: &str,
76        config: Option<NatsStreamConfig>,
77    ) -> Result<NatsSubscription, Box<dyn std::error::Error>> {
78        let subscription = self
79            .inner
80            .subscribe(subject, config.map(Into::into).as_ref())
81            .map(Into::into)?;
82
83        Ok(subscription)
84    }
85
86    /// Gets a key-value store interface for the specified bucket
87    ///
88    /// # Arguments
89    ///
90    /// * `bucket` - The name of the JetStream KV bucket to access
91    ///
92    /// # Returns
93    ///
94    /// Result containing the key-value store interface or an error if retrieval fails
95    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, Box<dyn std::error::Error>> {
96        let store = self.inner.key_value(bucket)?;
97        Ok(store.into())
98    }
99}
100
101/// A key-value store for interacting with NATS JetStream KV buckets
102pub struct NatsKeyValue {
103    inner: wit::NatsKeyValue,
104}
105
106impl From<wit::NatsKeyValue> for NatsKeyValue {
107    fn from(inner: wit::NatsKeyValue) -> Self {
108        NatsKeyValue { inner }
109    }
110}
111
112impl NatsKeyValue {
113    /// Retrieves a value for the specified key
114    ///
115    /// # Arguments
116    ///
117    /// * `key` - The key to retrieve the value for
118    ///
119    /// # Returns
120    ///
121    /// Result containing the deserialized value if found, or None if the key doesn't exist
122    pub fn get<S>(&self, key: &str) -> Result<Option<S>, Box<dyn std::error::Error>>
123    where
124        S: for<'a> serde::Deserialize<'a>,
125    {
126        match self.inner.get(key)? {
127            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
128            None => Ok(None),
129        }
130    }
131
132    /// Stores a value for the specified key
133    ///
134    /// # Arguments
135    ///
136    /// * `key` - The key to store the value under
137    /// * `value` - The value to store, which will be serialized to JSON
138    ///
139    /// # Returns
140    ///
141    /// Result containing the revision number of the stored value
142    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, Box<dyn std::error::Error>>
143    where
144        S: serde::Serialize,
145    {
146        let value = serde_json::to_vec(value)?;
147
148        Ok(self.inner.put(key, &value)?)
149    }
150
151    /// Creates a new key-value pair, failing if the key already exists
152    ///
153    /// # Arguments
154    ///
155    /// * `key` - The key to create
156    /// * `value` - The value to store, which will be serialized to JSON
157    ///
158    /// # Returns
159    ///
160    /// Result containing the revision number of the created value
161    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, Box<dyn std::error::Error>>
162    where
163        S: serde::Serialize,
164    {
165        let value = serde_json::to_vec(value)?;
166        Ok(self.inner.create(key, &value)?)
167    }
168
169    /// Deletes the specified key and its associated value
170    ///
171    /// # Arguments
172    ///
173    /// * `key` - The key to delete
174    ///
175    /// # Returns
176    ///
177    /// Result indicating success or an error if deletion fails
178    pub fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
179        Ok(self.inner.delete(key)?)
180    }
181}
182
183/// A subscription to a NATS subject that receives messages published to that subject
184pub struct NatsSubscription {
185    inner: wit::NatsSubscriber,
186}
187
188impl From<wit::NatsSubscriber> for NatsSubscription {
189    fn from(inner: wit::NatsSubscriber) -> Self {
190        NatsSubscription { inner }
191    }
192}
193
194impl NatsSubscription {
195    /// Gets the next message from the subscription
196    ///
197    /// # Returns
198    ///
199    /// Result containing the next message or an error if retrieval fails
200    pub fn next(&self) -> Result<Option<NatsMessage>, Box<dyn std::error::Error>> {
201        Ok(self.inner.next()?.map(Into::into))
202    }
203}
204
205/// A message received from a NATS subscription containing the payload data
206pub struct NatsMessage {
207    inner: crate::wit::NatsMessage,
208}
209
210impl From<crate::wit::NatsMessage> for NatsMessage {
211    fn from(inner: crate::wit::NatsMessage) -> Self {
212        NatsMessage { inner }
213    }
214}
215
216impl NatsMessage {
217    /// Gets the payload data of the message
218    ///
219    /// # Returns
220    ///
221    /// Result containing the payload data or an error if retrieval fails
222    pub fn payload<S>(&self) -> anyhow::Result<S>
223    where
224        S: for<'de> serde::Deserialize<'de>,
225    {
226        Ok(serde_json::from_slice(&self.inner.payload)?)
227    }
228
229    /// Gets the subject of the message
230    ///
231    /// # Returns
232    ///
233    /// The NATS subject this message was published to
234    pub fn subject(&self) -> &str {
235        &self.inner.subject
236    }
237}
238
239/// Connects to one or more NATS servers
240///
241/// # Arguments
242///
243/// * `servers` - Iterator of server addresses to connect to
244///
245/// # Returns
246///
247/// Result containing the connected NATS client or an error if connection fails
248pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, Box<dyn std::error::Error>> {
249    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
250    let inner = crate::wit::NatsClient::connect(&servers, None)?;
251
252    Ok(NatsClient { inner })
253}
254
255/// Connects to one or more NATS servers with authentication
256///
257/// # Arguments
258///
259/// * `servers` - Iterator of server addresses to connect to
260/// * `auth` - Authentication credentials for connecting to the servers
261///
262/// # Returns
263///
264/// Result containing the connected NATS client or an error if connection fails
265pub fn connect_with_auth(
266    servers: impl IntoIterator<Item = impl ToString>,
267    auth: &crate::NatsAuth,
268) -> Result<NatsClient, Box<dyn std::error::Error>> {
269    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
270    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
271
272    Ok(NatsClient { inner })
273}
274
275impl Subscription for NatsSubscription {
276    fn next(&mut self) -> Result<Option<types::FieldOutput>, Error> {
277        let item = match NatsSubscription::next(self) {
278            Ok(Some(item)) => item,
279            Ok(None) => return Ok(None),
280            Err(e) => {
281                return Err(Error {
282                    extensions: Vec::new(),
283                    message: format!("Error receiving NATS message: {e}"),
284                })
285            }
286        };
287
288        let mut field_output = types::FieldOutput::default();
289
290        let payload: serde_json::Value = item.payload().map_err(|e| Error {
291            extensions: Vec::new(),
292            message: format!("Error parsing NATS value as JSON: {e}"),
293        })?;
294
295        field_output.push_value(payload);
296
297        Ok(Some(field_output))
298    }
299}
300
301/// Configuration for NATS JetStream consumers
302///
303/// This struct wraps the internal configuration for JetStream consumers
304/// and provides a builder pattern for easy configuration.
305pub struct NatsStreamConfig(wit::NatsStreamConfig);
306
307impl From<NatsStreamConfig> for wit::NatsStreamConfig {
308    fn from(config: NatsStreamConfig) -> Self {
309        config.0
310    }
311}
312
313/// Delivery policy for NATS JetStream consumers
314///
315/// This enum defines the various policies that determine how messages are delivered to
316/// JetStream consumers, such as delivering all messages, only the latest message,
317/// or messages from a specific sequence or time.
318#[derive(Debug)]
319pub enum NatsStreamDeliverPolicy {
320    /// All causes the consumer to receive the oldest messages still present in the system.
321    /// This is the default.
322    All,
323    /// Last will start the consumer with the last sequence received.
324    Last,
325    /// New will only deliver new messages that are received by the JetStream server after
326    /// the consumer is created.
327    New,
328    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
329    /// opt_start_seq parameter.
330    ByStartSequence(u64),
331    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
332    /// configured opt_start_time parameter.
333    ByStartTime(OffsetDateTime),
334    /// LastPerSubject will start the consumer with the last message for all subjects received.
335    LastPerSubject,
336}
337
338impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
339    fn from(value: NatsStreamDeliverPolicy) -> Self {
340        match value {
341            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
342            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
343            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
344            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
345            NatsStreamDeliverPolicy::ByStartTime(time) => {
346                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
347            }
348            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
349        }
350    }
351}
352
353impl NatsStreamConfig {
354    /// Creates a new JetStream consumer configuration
355    ///
356    /// # Arguments
357    ///
358    /// * `deliver_policy` - Determines how messages are delivered to the consumer
359    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
360    ///
361    /// # Returns
362    ///
363    /// A new `NatsStreamConfig` with the specified settings
364    pub fn new(
365        stream_name: String,
366        consumer_name: String,
367        deliver_policy: NatsStreamDeliverPolicy,
368        inactive_threshold: Duration,
369    ) -> Self {
370        NatsStreamConfig(wit::NatsStreamConfig {
371            stream_name,
372            consumer_name,
373            durable_name: None,
374            deliver_policy: deliver_policy.into(),
375            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
376            description: None,
377        })
378    }
379
380    /// Sets a durable name for the consumer
381    ///
382    /// Durable consumers maintain their state even when disconnected.
383    ///
384    /// # Arguments
385    ///
386    /// * `durable_name` - The durable name to use for this consumer
387    ///
388    /// # Returns
389    ///
390    /// The updated configuration
391    pub fn with_durable_name(mut self, durable_name: String) -> Self {
392        self.0.durable_name = Some(durable_name);
393        self
394    }
395
396    /// Sets a description for the consumer
397    ///
398    /// # Arguments
399    ///
400    /// * `description` - The description to use for this consumer
401    ///
402    /// # Returns
403    ///
404    /// The updated configuration
405    pub fn with_description(mut self, description: String) -> Self {
406        self.0.description = Some(description);
407        self
408    }
409}