grafbase_sdk/host_io/
nats.rs

1//! Client interface for interacting with NATS messaging system
2//!
3//! Ok(Some(field_output))
4//!
5//! This module provides a high-level client for connecting to and interacting with NATS servers.
6//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{
9    SdkError,
10    extension::field_resolver::Subscription,
11    types::{Error, SubscriptionOutput},
12    wit,
13};
14use std::time::Duration;
15
16pub use time::OffsetDateTime;
17pub use wit::NatsAuth;
18
19/// A client for interacting with NATS servers
20pub struct NatsClient {
21    inner: wit::NatsClient,
22}
23
24impl NatsClient {
25    /// Publishes a message in JSON formatto the specified NATS subject
26    ///
27    /// # Arguments
28    ///
29    /// * `subject` - The NATS subject to publish to
30    /// * `payload` - The message payload as a byte slice
31    ///
32    /// # Returns
33    ///
34    /// Result indicating success or an error if the publish fails
35    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
36    where
37        S: serde::Serialize,
38    {
39        self.inner
40            .publish(subject, &serde_json::to_vec(payload)?)
41            .map_err(Into::into)
42    }
43
44    /// Sends a request in JSON to the specified NATS subject and waits for a response
45    ///
46    /// # Arguments
47    ///
48    /// * `subject` - The NATS subject to send the request to
49    /// * `payload` - The request payload to serialize and send
50    /// * `timeout` - Optional duration to wait for a response before timing out
51    ///
52    /// # Returns
53    ///
54    /// Result containing the deserialized response or an error if the request fails
55    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
56    where
57        S: serde::Serialize,
58        T: for<'de> serde::Deserialize<'de>,
59    {
60        let body = serde_json::to_vec(payload).unwrap();
61        let response = self.request_bytes(subject, &body, timeout)?;
62
63        Ok(serde_json::from_slice(&response)?)
64    }
65
66    /// Sends a request to the specified NATS subject and waits for a response, returning raw bytes
67    ///
68    /// # Arguments
69    ///
70    /// * `subject` - The NATS subject to send the request to
71    /// * `body` - The raw byte payload to send
72    /// * `timeout` - Optional duration to wait for a response before timing out
73    ///
74    /// # Returns
75    ///
76    /// Result containing the raw byte response or an error if the request fails
77    pub fn request_bytes(&self, subject: &str, body: &[u8], timeout: Option<Duration>) -> Result<Vec<u8>, SdkError> {
78        let timeout = timeout.map(|t| t.as_millis() as u64);
79        let response = self.inner.request(subject, body, timeout)?;
80
81        Ok(response.payload)
82    }
83
84    /// Subscribes to messages on the specified NATS subject
85    ///
86    /// # Arguments
87    ///
88    /// * `subject` - The NATS subject to subscribe to
89    ///
90    /// # Returns
91    ///
92    /// Result containing the subscription or an error if subscription fails
93    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
94        let subscription = self
95            .inner
96            .subscribe(subject, config.map(Into::into).as_ref())
97            .map(Into::into)?;
98
99        Ok(subscription)
100    }
101
102    /// Gets a key-value store interface for the specified bucket
103    ///
104    /// # Arguments
105    ///
106    /// * `bucket` - The name of the JetStream KV bucket to access
107    ///
108    /// # Returns
109    ///
110    /// Result containing the key-value store interface or an error if retrieval fails
111    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
112        let store = self.inner.key_value(bucket)?;
113        Ok(store.into())
114    }
115}
116
117/// A key-value store for interacting with NATS JetStream KV buckets
118pub struct NatsKeyValue {
119    inner: wit::NatsKeyValue,
120}
121
122impl From<wit::NatsKeyValue> for NatsKeyValue {
123    fn from(inner: wit::NatsKeyValue) -> Self {
124        NatsKeyValue { inner }
125    }
126}
127
128impl NatsKeyValue {
129    /// Retrieves a value for the specified key in JSON format
130    ///
131    /// # Arguments
132    ///
133    /// * `key` - The key to retrieve the value for
134    ///
135    /// # Returns
136    ///
137    /// Result containing the deserialized value if found, or None if the key doesn't exist
138    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
139    where
140        S: for<'a> serde::Deserialize<'a>,
141    {
142        match self.get_bytes(key)? {
143            Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
144            None => Ok(None),
145        }
146    }
147
148    /// Retrieves the raw bytes for the specified key
149    ///
150    /// # Arguments
151    ///
152    /// * `key` - The key to retrieve the value for
153    ///
154    /// # Returns
155    ///
156    /// Result containing the raw byte value if found, or None if the key doesn't exist
157    pub fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, SdkError> {
158        match self.inner.get(key)? {
159            Some(value) => Ok(Some(value)),
160            None => Ok(None),
161        }
162    }
163
164    /// Stores a value for the specified key in JSON format
165    ///
166    /// # Arguments
167    ///
168    /// * `key` - The key to store the value under
169    /// * `value` - The value to store, which will be serialized to JSON
170    ///
171    /// # Returns
172    ///
173    /// Result containing the revision number of the stored value
174    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
175    where
176        S: serde::Serialize,
177    {
178        let value = serde_json::to_vec(value)?;
179        self.put_bytes(key, &value)
180    }
181
182    /// Stores raw bytes for the specified key
183    ///
184    /// # Arguments
185    ///
186    /// * `key` - The key to store the value under
187    /// * `value` - The raw byte value to store
188    ///
189    /// # Returns
190    ///
191    /// Result containing the revision number of the stored value
192    pub fn put_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
193        Ok(self.inner.put(key, value)?)
194    }
195
196    /// Creates a new key-value pair in JSON format, failing if the key already exists
197    ///
198    /// # Arguments
199    ///
200    /// * `key` - The key to create
201    /// * `value` - The value to store, which will be serialized to JSON
202    ///
203    /// # Returns
204    ///
205    /// Result containing the revision number of the created value
206    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
207    where
208        S: serde::Serialize,
209    {
210        let value = serde_json::to_vec(value)?;
211        self.create_bytes(key, &value)
212    }
213
214    /// Creates a new key-value pair with raw bytes, failing if the key already exists
215    ///
216    /// # Arguments
217    ///
218    /// * `key` - The key to create
219    /// * `value` - The raw byte value to store
220    ///
221    /// # Returns
222    ///
223    /// Result containing the revision number of the created value
224    pub fn create_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
225        Ok(self.inner.create(key, value)?)
226    }
227
228    /// Deletes the specified key and its associated value
229    ///
230    /// # Arguments
231    ///
232    /// * `key` - The key to delete
233    ///
234    /// # Returns
235    ///
236    /// Result indicating success or an error if deletion fails
237    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
238        Ok(self.inner.delete(key)?)
239    }
240}
241
242/// A subscription to a NATS subject that receives messages published to that subject
243pub struct NatsSubscription {
244    inner: wit::NatsSubscriber,
245}
246
247impl From<wit::NatsSubscriber> for NatsSubscription {
248    fn from(inner: wit::NatsSubscriber) -> Self {
249        NatsSubscription { inner }
250    }
251}
252
253impl NatsSubscription {
254    /// Gets the next message from the subscription
255    ///
256    /// # Returns
257    ///
258    /// Result containing the next message or an error if retrieval fails
259    pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
260        self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
261    }
262}
263
264/// A message received from a NATS subscription containing the payload data
265pub struct NatsMessage {
266    inner: crate::wit::NatsMessage,
267}
268
269impl From<crate::wit::NatsMessage> for NatsMessage {
270    fn from(inner: crate::wit::NatsMessage) -> Self {
271        NatsMessage { inner }
272    }
273}
274
275impl NatsMessage {
276    /// Gets the payload data of the message in JSON format
277    ///
278    /// # Returns
279    ///
280    /// Result containing the payload data or an error if retrieval fails
281    pub fn payload<S>(&self) -> Result<S, SdkError>
282    where
283        S: for<'de> serde::Deserialize<'de>,
284    {
285        Ok(serde_json::from_slice(self.payload_bytes())?)
286    }
287
288    /// Gets the raw bytes of the message payload
289    ///
290    /// # Returns
291    ///
292    /// A byte slice containing the raw message payload
293    pub fn payload_bytes(&self) -> &[u8] {
294        &self.inner.payload
295    }
296
297    /// Gets the subject of the message
298    ///
299    /// # Returns
300    ///
301    /// The NATS subject this message was published to
302    pub fn subject(&self) -> &str {
303        &self.inner.subject
304    }
305}
306
307/// Connects to one or more NATS servers
308///
309/// # Arguments
310///
311/// * `servers` - Iterator of server addresses to connect to
312///
313/// # Returns
314///
315/// Result containing the connected NATS client or an error if connection fails
316pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
317    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
318    let inner = crate::wit::NatsClient::connect(&servers, None)?;
319
320    Ok(NatsClient { inner })
321}
322
323/// Connects to one or more NATS servers with authentication
324///
325/// # Arguments
326///
327/// * `servers` - Iterator of server addresses to connect to
328/// * `auth` - Authentication credentials for connecting to the servers
329///
330/// # Returns
331///
332/// Result containing the connected NATS client or an error if connection fails
333pub fn connect_with_auth(
334    servers: impl IntoIterator<Item = impl ToString>,
335    auth: &NatsAuth,
336) -> Result<NatsClient, SdkError> {
337    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
338    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
339
340    Ok(NatsClient { inner })
341}
342
343impl Subscription for NatsSubscription {
344    fn next(&mut self) -> Result<Option<SubscriptionOutput>, Error> {
345        let item = match NatsSubscription::next(self) {
346            Ok(Some(item)) => item,
347            Ok(None) => return Ok(None),
348            Err(e) => return Err(format!("Error receiving NATS message: {e}").into()),
349        };
350
351        let mut builder = SubscriptionOutput::builder();
352
353        let payload: serde_json::Value = item
354            .payload()
355            .map_err(|e| format!("Error parsing NATS value as JSON: {e}"))?;
356
357        builder.push(payload)?;
358
359        Ok(Some(builder.build()))
360    }
361}
362
363/// Configuration for NATS JetStream consumers
364///
365/// This struct wraps the internal configuration for JetStream consumers
366/// and provides a builder pattern for easy configuration.
367pub struct NatsStreamConfig(wit::NatsStreamConfig);
368
369impl From<NatsStreamConfig> for wit::NatsStreamConfig {
370    fn from(config: NatsStreamConfig) -> Self {
371        config.0
372    }
373}
374
375/// Delivery policy for NATS JetStream consumers
376///
377/// This enum defines the various policies that determine how messages are delivered to
378/// JetStream consumers, such as delivering all messages, only the latest message,
379/// or messages from a specific sequence or time.
380#[derive(Debug)]
381pub enum NatsStreamDeliverPolicy {
382    /// All causes the consumer to receive the oldest messages still present in the system.
383    /// This is the default.
384    All,
385    /// Last will start the consumer with the last sequence received.
386    Last,
387    /// New will only deliver new messages that are received by the JetStream server after
388    /// the consumer is created.
389    New,
390    /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
391    /// opt_start_seq parameter.
392    ByStartSequence(u64),
393    /// ByStartTime will select the first message with a timestamp >= to the consumer’s
394    /// configured opt_start_time parameter.
395    ByStartTime(OffsetDateTime),
396    /// LastPerSubject will start the consumer with the last message for all subjects received.
397    LastPerSubject,
398}
399
400impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
401    fn from(value: NatsStreamDeliverPolicy) -> Self {
402        match value {
403            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
404            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
405            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
406            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
407            NatsStreamDeliverPolicy::ByStartTime(time) => {
408                wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
409            }
410            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
411        }
412    }
413}
414
415impl NatsStreamConfig {
416    /// Creates a new JetStream consumer configuration
417    ///
418    /// # Arguments
419    ///
420    /// * `deliver_policy` - Determines how messages are delivered to the consumer
421    /// * `inactive_threshold` - Duration after which a consumer is considered inactive
422    ///
423    /// # Returns
424    ///
425    /// A new `NatsStreamConfig` with the specified settings
426    pub fn new(
427        stream_name: String,
428        consumer_name: String,
429        deliver_policy: NatsStreamDeliverPolicy,
430        inactive_threshold: Duration,
431    ) -> Self {
432        NatsStreamConfig(wit::NatsStreamConfig {
433            stream_name,
434            consumer_name,
435            durable_name: None,
436            deliver_policy: deliver_policy.into(),
437            inactive_threshold_ms: inactive_threshold.as_millis() as u64,
438            description: None,
439        })
440    }
441
442    /// Sets a durable name for the consumer
443    ///
444    /// Durable consumers maintain their state even when disconnected.
445    ///
446    /// # Arguments
447    ///
448    /// * `durable_name` - The durable name to use for this consumer
449    ///
450    /// # Returns
451    ///
452    /// The updated configuration
453    pub fn with_durable_name(mut self, durable_name: String) -> Self {
454        self.0.durable_name = Some(durable_name);
455        self
456    }
457
458    /// Sets a description for the consumer
459    ///
460    /// # Arguments
461    ///
462    /// * `description` - The description to use for this consumer
463    ///
464    /// # Returns
465    ///
466    /// The updated configuration
467    pub fn with_description(mut self, description: String) -> Self {
468        self.0.description = Some(description);
469        self
470    }
471}