grafbase_sdk/host_io/nats.rs
//! Client interface for interacting with the NATS messaging system
//!
//! This module provides a high-level client for connecting to and interacting with NATS servers.
//! It supports both authenticated and unauthenticated connections to one or more NATS servers.
7
8use crate::{
9 SdkError, Subscription,
10 types::{Error, Response, SubscriptionItem},
11 wit,
12};
13use std::time::Duration;
14
15pub use time::OffsetDateTime;
16pub use wit::NatsAuth;
17
18/// A client for interacting with NATS servers
19pub struct NatsClient {
20 inner: wit::NatsClient,
21}
22
23impl NatsClient {
24 /// Publishes a message in JSON formatto the specified NATS subject
25 ///
26 /// # Arguments
27 ///
28 /// * `subject` - The NATS subject to publish to
29 /// * `payload` - The message payload as a byte slice
30 ///
31 /// # Returns
32 ///
33 /// Result indicating success or an error if the publish fails
34 pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
35 where
36 S: serde::Serialize,
37 {
38 self.inner
39 .publish(subject, &serde_json::to_vec(payload)?)
40 .map_err(Into::into)
41 }
42
43 /// Sends a request in JSON to the specified NATS subject and waits for a response
44 ///
45 /// # Arguments
46 ///
47 /// * `subject` - The NATS subject to send the request to
48 /// * `payload` - The request payload to serialize and send
49 /// * `timeout` - Optional duration to wait for a response before timing out
50 ///
51 /// # Returns
52 ///
53 /// Result containing the deserialized response or an error if the request fails
54 pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
55 where
56 S: serde::Serialize,
57 T: for<'de> serde::Deserialize<'de>,
58 {
59 let body = serde_json::to_vec(payload).unwrap();
60 let response = self.request_bytes(subject, &body, timeout)?;
61
62 Ok(serde_json::from_slice(&response)?)
63 }
64
65 /// Sends a request to the specified NATS subject and waits for a response, returning raw bytes
66 ///
67 /// # Arguments
68 ///
69 /// * `subject` - The NATS subject to send the request to
70 /// * `body` - The raw byte payload to send
71 /// * `timeout` - Optional duration to wait for a response before timing out
72 ///
73 /// # Returns
74 ///
75 /// Result containing the raw byte response or an error if the request fails
76 pub fn request_bytes(&self, subject: &str, body: &[u8], timeout: Option<Duration>) -> Result<Vec<u8>, SdkError> {
77 let timeout = timeout.map(|t| t.as_millis() as u64);
78 let response = self.inner.request(subject, body, timeout)?;
79
80 Ok(response.payload)
81 }
82
83 /// Subscribes to messages on the specified NATS subject
84 ///
85 /// # Arguments
86 ///
87 /// * `subject` - The NATS subject to subscribe to
88 ///
89 /// # Returns
90 ///
91 /// Result containing the subscription or an error if subscription fails
92 pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
93 let subscription = self
94 .inner
95 .subscribe(subject, config.map(Into::into).as_ref())
96 .map(Into::into)?;
97
98 Ok(subscription)
99 }
100
101 /// Gets a key-value store interface for the specified bucket
102 ///
103 /// # Arguments
104 ///
105 /// * `bucket` - The name of the JetStream KV bucket to access
106 ///
107 /// # Returns
108 ///
109 /// Result containing the key-value store interface or an error if retrieval fails
110 pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
111 let store = self.inner.key_value(bucket)?;
112 Ok(store.into())
113 }
114}
115
116/// A key-value store for interacting with NATS JetStream KV buckets
117pub struct NatsKeyValue {
118 inner: wit::NatsKeyValue,
119}
120
121impl From<wit::NatsKeyValue> for NatsKeyValue {
122 fn from(inner: wit::NatsKeyValue) -> Self {
123 NatsKeyValue { inner }
124 }
125}
126
127impl NatsKeyValue {
128 /// Retrieves a value for the specified key in JSON format
129 ///
130 /// # Arguments
131 ///
132 /// * `key` - The key to retrieve the value for
133 ///
134 /// # Returns
135 ///
136 /// Result containing the deserialized value if found, or None if the key doesn't exist
137 pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
138 where
139 S: for<'a> serde::Deserialize<'a>,
140 {
141 match self.get_bytes(key)? {
142 Some(ref value) => Ok(Some(serde_json::from_slice(value)?)),
143 None => Ok(None),
144 }
145 }
146
147 /// Retrieves the raw bytes for the specified key
148 ///
149 /// # Arguments
150 ///
151 /// * `key` - The key to retrieve the value for
152 ///
153 /// # Returns
154 ///
155 /// Result containing the raw byte value if found, or None if the key doesn't exist
156 pub fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, SdkError> {
157 match self.inner.get(key)? {
158 Some(value) => Ok(Some(value)),
159 None => Ok(None),
160 }
161 }
162
163 /// Stores a value for the specified key in JSON format
164 ///
165 /// # Arguments
166 ///
167 /// * `key` - The key to store the value under
168 /// * `value` - The value to store, which will be serialized to JSON
169 ///
170 /// # Returns
171 ///
172 /// Result containing the revision number of the stored value
173 pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
174 where
175 S: serde::Serialize,
176 {
177 let value = serde_json::to_vec(value)?;
178 self.put_bytes(key, &value)
179 }
180
181 /// Stores raw bytes for the specified key
182 ///
183 /// # Arguments
184 ///
185 /// * `key` - The key to store the value under
186 /// * `value` - The raw byte value to store
187 ///
188 /// # Returns
189 ///
190 /// Result containing the revision number of the stored value
191 pub fn put_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
192 Ok(self.inner.put(key, value)?)
193 }
194
195 /// Creates a new key-value pair in JSON format, failing if the key already exists
196 ///
197 /// # Arguments
198 ///
199 /// * `key` - The key to create
200 /// * `value` - The value to store, which will be serialized to JSON
201 ///
202 /// # Returns
203 ///
204 /// Result containing the revision number of the created value
205 pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
206 where
207 S: serde::Serialize,
208 {
209 let value = serde_json::to_vec(value)?;
210 self.create_bytes(key, &value)
211 }
212
213 /// Creates a new key-value pair with raw bytes, failing if the key already exists
214 ///
215 /// # Arguments
216 ///
217 /// * `key` - The key to create
218 /// * `value` - The raw byte value to store
219 ///
220 /// # Returns
221 ///
222 /// Result containing the revision number of the created value
223 pub fn create_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
224 Ok(self.inner.create(key, value)?)
225 }
226
227 /// Deletes the specified key and its associated value
228 ///
229 /// # Arguments
230 ///
231 /// * `key` - The key to delete
232 ///
233 /// # Returns
234 ///
235 /// Result indicating success or an error if deletion fails
236 pub fn delete(&self, key: &str) -> Result<(), SdkError> {
237 Ok(self.inner.delete(key)?)
238 }
239}
240
241/// A subscription to a NATS subject that receives messages published to that subject
242pub struct NatsSubscription {
243 inner: wit::NatsSubscriber,
244}
245
246impl From<wit::NatsSubscriber> for NatsSubscription {
247 fn from(inner: wit::NatsSubscriber) -> Self {
248 NatsSubscription { inner }
249 }
250}
251
252impl NatsSubscription {
253 /// Gets the next message from the subscription
254 ///
255 /// # Returns
256 ///
257 /// Result containing the next message or an error if retrieval fails
258 pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
259 self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
260 }
261}
262
263/// A message received from a NATS subscription containing the payload data
264pub struct NatsMessage {
265 inner: crate::wit::NatsMessage,
266}
267
268impl From<crate::wit::NatsMessage> for NatsMessage {
269 fn from(inner: crate::wit::NatsMessage) -> Self {
270 NatsMessage { inner }
271 }
272}
273
274impl NatsMessage {
275 /// Gets the payload data of the message in JSON format
276 ///
277 /// # Returns
278 ///
279 /// Result containing the payload data or an error if retrieval fails
280 pub fn payload<S>(&self) -> Result<S, SdkError>
281 where
282 S: for<'de> serde::Deserialize<'de>,
283 {
284 Ok(serde_json::from_slice(self.payload_bytes())?)
285 }
286
287 /// Gets the raw bytes of the message payload
288 ///
289 /// # Returns
290 ///
291 /// A byte slice containing the raw message payload
292 pub fn payload_bytes(&self) -> &[u8] {
293 &self.inner.payload
294 }
295
296 /// Gets the subject of the message
297 ///
298 /// # Returns
299 ///
300 /// The NATS subject this message was published to
301 pub fn subject(&self) -> &str {
302 &self.inner.subject
303 }
304}
305
306/// Connects to one or more NATS servers
307///
308/// # Arguments
309///
310/// * `servers` - Iterator of server addresses to connect to
311///
312/// # Returns
313///
314/// Result containing the connected NATS client or an error if connection fails
315pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
316 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
317 let inner = crate::wit::NatsClient::connect(&servers, None)?;
318
319 Ok(NatsClient { inner })
320}
321
322/// Connects to one or more NATS servers with authentication
323///
324/// # Arguments
325///
326/// * `servers` - Iterator of server addresses to connect to
327/// * `auth` - Authentication credentials for connecting to the servers
328///
329/// # Returns
330///
331/// Result containing the connected NATS client or an error if connection fails
332pub fn connect_with_auth(
333 servers: impl IntoIterator<Item = impl ToString>,
334 auth: &NatsAuth,
335) -> Result<NatsClient, SdkError> {
336 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
337 let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
338
339 Ok(NatsClient { inner })
340}
341
342impl Subscription for NatsSubscription {
343 fn next(&mut self) -> Result<Option<SubscriptionItem>, Error> {
344 match NatsSubscription::next(self) {
345 Ok(Some(msg)) => Ok(Some(Response::json(msg.inner.payload).into())),
346 Ok(None) => Ok(None),
347 Err(err) => Err(format!("Error receiving NATS message: {err}").into()),
348 }
349 }
350}
351
352/// Configuration for NATS JetStream consumers
353///
354/// This struct wraps the internal configuration for JetStream consumers
355/// and provides a builder pattern for easy configuration.
356pub struct NatsStreamConfig(wit::NatsStreamConfig);
357
358impl From<NatsStreamConfig> for wit::NatsStreamConfig {
359 fn from(config: NatsStreamConfig) -> Self {
360 config.0
361 }
362}
363
364/// Delivery policy for NATS JetStream consumers
365///
366/// This enum defines the various policies that determine how messages are delivered to
367/// JetStream consumers, such as delivering all messages, only the latest message,
368/// or messages from a specific sequence or time.
369#[derive(Debug)]
370pub enum NatsStreamDeliverPolicy {
371 /// All causes the consumer to receive the oldest messages still present in the system.
372 /// This is the default.
373 All,
374 /// Last will start the consumer with the last sequence received.
375 Last,
376 /// New will only deliver new messages that are received by the JetStream server after
377 /// the consumer is created.
378 New,
379 /// ByStartSequence will look for a defined starting sequence to the consumer’s configured
380 /// opt_start_seq parameter.
381 ByStartSequence(u64),
382 /// ByStartTime will select the first message with a timestamp >= to the consumer’s
383 /// configured opt_start_time parameter.
384 ByStartTime(OffsetDateTime),
385 /// LastPerSubject will start the consumer with the last message for all subjects received.
386 LastPerSubject,
387}
388
389impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
390 fn from(value: NatsStreamDeliverPolicy) -> Self {
391 match value {
392 NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
393 NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
394 NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
395 NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
396 NatsStreamDeliverPolicy::ByStartTime(time) => {
397 wit::NatsStreamDeliverPolicy::ByStartTimeMs((time.unix_timestamp_nanos() / 1_000_000) as u64)
398 }
399 NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
400 }
401 }
402}
403
404impl NatsStreamConfig {
405 /// Creates a new JetStream consumer configuration
406 ///
407 /// # Arguments
408 ///
409 /// * `deliver_policy` - Determines how messages are delivered to the consumer
410 /// * `inactive_threshold` - Duration after which a consumer is considered inactive
411 ///
412 /// # Returns
413 ///
414 /// A new `NatsStreamConfig` with the specified settings
415 pub fn new(
416 stream_name: String,
417 consumer_name: String,
418 deliver_policy: NatsStreamDeliverPolicy,
419 inactive_threshold: Duration,
420 ) -> Self {
421 NatsStreamConfig(wit::NatsStreamConfig {
422 stream_name,
423 consumer_name,
424 durable_name: None,
425 deliver_policy: deliver_policy.into(),
426 inactive_threshold_ms: inactive_threshold.as_millis() as u64,
427 description: None,
428 })
429 }
430
431 /// Sets a durable name for the consumer
432 ///
433 /// Durable consumers maintain their state even when disconnected.
434 ///
435 /// # Arguments
436 ///
437 /// * `durable_name` - The durable name to use for this consumer
438 ///
439 /// # Returns
440 ///
441 /// The updated configuration
442 pub fn with_durable_name(mut self, durable_name: String) -> Self {
443 self.0.durable_name = Some(durable_name);
444 self
445 }
446
447 /// Sets a description for the consumer
448 ///
449 /// # Arguments
450 ///
451 /// * `description` - The description to use for this consumer
452 ///
453 /// # Returns
454 ///
455 /// The updated configuration
456 pub fn with_description(mut self, description: String) -> Self {
457 self.0.description = Some(description);
458 self
459 }
460}