grafbase_sdk/host_io/pubsub/
nats.rs

1//! Client interface for interacting with NATS messaging system
2//!
3//! Ok(Some(field_output))
4//!
5//! This module provides a high-level client for connecting to and interacting with NATS servers.
6//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{types, wit, Error};
9
10/// A client for interacting with NATS servers
11pub struct NatsClient {
12    inner: wit::NatsClient,
13}
14
15impl NatsClient {
16    /// Publishes a message to the specified NATS subject
17    ///
18    /// # Arguments
19    ///
20    /// * `subject` - The NATS subject to publish to
21    /// * `payload` - The message payload as a byte slice
22    ///
23    /// # Returns
24    ///
25    /// Result indicating success or an error if the publish fails
26    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), Box<dyn std::error::Error>>
27    where
28        S: serde::Serialize,
29    {
30        Ok(self.inner.publish(subject, &serde_json::to_vec(payload).unwrap())?)
31    }
32
33    /// Subscribes to messages on the specified NATS subject
34    ///
35    /// # Arguments
36    ///
37    /// * `subject` - The NATS subject to subscribe to
38    ///
39    /// # Returns
40    ///
41    /// Result containing the subscription or an error if subscription fails
42    pub fn subscribe(&self, subject: &str) -> Result<NatsSubscriber, Box<dyn std::error::Error>> {
43        Ok(self.inner.subscribe(subject).map(Into::into)?)
44    }
45}
46
47/// A subscription to a NATS subject that receives messages published to that subject
48pub struct NatsSubscriber {
49    inner: wit::NatsSubscriber,
50}
51
52impl From<wit::NatsSubscriber> for NatsSubscriber {
53    fn from(inner: wit::NatsSubscriber) -> Self {
54        NatsSubscriber { inner }
55    }
56}
57
58impl NatsSubscriber {
59    /// Gets the next message from the subscription
60    ///
61    /// # Returns
62    ///
63    /// Result containing the next message or an error if retrieval fails
64    pub fn next(&self) -> Option<NatsMessage> {
65        self.inner.next().map(Into::into)
66    }
67}
68
69/// A message received from a NATS subscription containing the payload data
70pub struct NatsMessage {
71    inner: crate::wit::NatsMessage,
72}
73
74impl From<crate::wit::NatsMessage> for NatsMessage {
75    fn from(inner: crate::wit::NatsMessage) -> Self {
76        NatsMessage { inner }
77    }
78}
79
80impl NatsMessage {
81    /// Gets the payload data of the message
82    ///
83    /// # Returns
84    ///
85    /// Result containing the payload data or an error if retrieval fails
86    pub fn payload<S>(&self) -> anyhow::Result<S>
87    where
88        S: for<'de> serde::Deserialize<'de>,
89    {
90        Ok(serde_json::from_slice(&self.inner.payload)?)
91    }
92
93    /// Gets the subject of the message
94    ///
95    /// # Returns
96    ///
97    /// The NATS subject this message was published to
98    pub fn subject(&self) -> &str {
99        &self.inner.subject
100    }
101}
102
103/// Connects to one or more NATS servers
104///
105/// # Arguments
106///
107/// * `servers` - Iterator of server addresses to connect to
108///
109/// # Returns
110///
111/// Result containing the connected NATS client or an error if connection fails
112pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, Box<dyn std::error::Error>> {
113    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
114    let inner = crate::wit::NatsClient::connect(&servers, None)?;
115
116    Ok(NatsClient { inner })
117}
118
119/// Connects to one or more NATS servers with authentication
120///
121/// # Arguments
122///
123/// * `servers` - Iterator of server addresses to connect to
124/// * `auth` - Authentication credentials for connecting to the servers
125///
126/// # Returns
127///
128/// Result containing the connected NATS client or an error if connection fails
129pub fn connect_with_auth(
130    servers: impl IntoIterator<Item = impl ToString>,
131    auth: &crate::NatsAuth,
132) -> Result<NatsClient, Box<dyn std::error::Error>> {
133    let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
134    let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
135
136    Ok(NatsClient { inner })
137}
138
139impl super::Subscriber for NatsSubscriber {
140    fn next(&mut self) -> Result<Option<types::FieldOutput>, Error> {
141        let item = match NatsSubscriber::next(self) {
142            Some(item) => item,
143            None => return Ok(None),
144        };
145
146        let mut field_output = types::FieldOutput::default();
147
148        let payload: serde_json::Value = item.payload().map_err(|e| Error {
149            extensions: Vec::new(),
150            message: format!("Error parsing NATS value as JSON: {e}"),
151        })?;
152
153        field_output.push_value(payload);
154
155        Ok(Some(field_output))
156    }
157}