grafbase_sdk/host_io/pubsub/
nats.rs1use crate::{types, wit, Error};
9
10pub struct NatsClient {
12 inner: wit::NatsClient,
13}
14
15impl NatsClient {
16 pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), Box<dyn std::error::Error>>
27 where
28 S: serde::Serialize,
29 {
30 Ok(self.inner.publish(subject, &serde_json::to_vec(payload).unwrap())?)
31 }
32
33 pub fn subscribe(&self, subject: &str) -> Result<NatsSubscriber, Box<dyn std::error::Error>> {
43 Ok(self.inner.subscribe(subject).map(Into::into)?)
44 }
45}
46
47pub struct NatsSubscriber {
49 inner: wit::NatsSubscriber,
50}
51
52impl From<wit::NatsSubscriber> for NatsSubscriber {
53 fn from(inner: wit::NatsSubscriber) -> Self {
54 NatsSubscriber { inner }
55 }
56}
57
58impl NatsSubscriber {
59 pub fn next(&self) -> Option<NatsMessage> {
65 self.inner.next().map(Into::into)
66 }
67}
68
69pub struct NatsMessage {
71 inner: crate::wit::NatsMessage,
72}
73
74impl From<crate::wit::NatsMessage> for NatsMessage {
75 fn from(inner: crate::wit::NatsMessage) -> Self {
76 NatsMessage { inner }
77 }
78}
79
80impl NatsMessage {
81 pub fn payload<S>(&self) -> anyhow::Result<S>
87 where
88 S: for<'de> serde::Deserialize<'de>,
89 {
90 Ok(serde_json::from_slice(&self.inner.payload)?)
91 }
92
93 pub fn subject(&self) -> &str {
99 &self.inner.subject
100 }
101}
102
103pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, Box<dyn std::error::Error>> {
113 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
114 let inner = crate::wit::NatsClient::connect(&servers, None)?;
115
116 Ok(NatsClient { inner })
117}
118
119pub fn connect_with_auth(
130 servers: impl IntoIterator<Item = impl ToString>,
131 auth: &crate::NatsAuth,
132) -> Result<NatsClient, Box<dyn std::error::Error>> {
133 let servers: Vec<_> = servers.into_iter().map(|s| s.to_string()).collect();
134 let inner = crate::wit::NatsClient::connect(&servers, Some(auth))?;
135
136 Ok(NatsClient { inner })
137}
138
139impl super::Subscription for NatsSubscriber {
140 fn next(&mut self) -> Result<Option<types::FieldOutput>, Error> {
141 let item = match NatsSubscriber::next(self) {
142 Some(item) => item,
143 None => return Ok(None),
144 };
145
146 let mut field_output = types::FieldOutput::default();
147
148 let payload: serde_json::Value = item.payload().map_err(|e| Error {
149 extensions: Vec::new(),
150 message: format!("Error parsing NATS value as JSON: {e}"),
151 })?;
152
153 field_output.push_value(payload);
154
155 Ok(Some(field_output))
156 }
157}