use crate::{
SdkError, Subscription,
types::{Error, Response, SubscriptionItem},
wit,
};
use std::time::Duration;
pub use time::OffsetDateTime;
pub use wit::NatsAuth;
/// Thin wrapper around `wit::NatsClient`.
///
/// Instances are created by the free functions `connect` and `connect_with_auth`
/// in this module; every method delegates to the wrapped value.
pub struct NatsClient {
// The wrapped `wit::NatsClient` that performs the actual NATS operations.
inner: wit::NatsClient,
}
impl NatsClient {
    /// Serializes `payload` as JSON and publishes it on `subject`.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if JSON serialization fails or if the underlying
    /// publish call reports an error.
    pub fn publish<S>(&self, subject: &str, payload: &S) -> Result<(), SdkError>
    where
        S: serde::Serialize,
    {
        let body = serde_json::to_vec(payload)?;
        self.inner.publish(subject, &body).map_err(Into::into)
    }

    /// Sends `payload` (serialized as JSON) as a request on `subject` and
    /// deserializes the JSON response body into `T`.
    ///
    /// `timeout` is forwarded unchanged to [`NatsClient::request_bytes`].
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the payload cannot be serialized, the request
    /// fails, or the response is not valid JSON for `T`.
    pub fn request<S, T>(&self, subject: &str, payload: &S, timeout: Option<Duration>) -> Result<T, SdkError>
    where
        S: serde::Serialize,
        T: for<'de> serde::Deserialize<'de>,
    {
        // Propagate serialization failures to the caller instead of panicking,
        // matching how `publish` treats the same error.
        let body = serde_json::to_vec(payload)?;
        let response = self.request_bytes(subject, &body, timeout)?;
        Ok(serde_json::from_slice(&response)?)
    }

    /// Sends the raw bytes `body` as a request on `subject` and returns the raw
    /// response payload.
    ///
    /// `timeout`, when present, is passed to the underlying client in whole
    /// milliseconds. What `None` means is decided by the underlying client and
    /// is not visible from this wrapper.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying request call reports an error.
    pub fn request_bytes(&self, subject: &str, body: &[u8], timeout: Option<Duration>) -> Result<Vec<u8>, SdkError> {
        // `as_millis` yields a `u128`; saturate at `u64::MAX` so an oversized
        // duration becomes "as long as representable" rather than wrapping
        // around to an arbitrary short timeout.
        let timeout_ms = timeout.map(|t| t.as_millis().min(u128::from(u64::MAX)) as u64);
        let response = self.inner.request(subject, body, timeout_ms)?;
        Ok(response.payload)
    }

    /// Subscribes to `subject`, optionally using a stream configuration.
    ///
    /// When `config` is `Some`, it is converted to its `wit` representation and
    /// passed by reference to the underlying subscribe call.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying subscribe call reports an error.
    pub fn subscribe(&self, subject: &str, config: Option<NatsStreamConfig>) -> Result<NatsSubscription, SdkError> {
        let subscription = self
            .inner
            .subscribe(subject, config.map(Into::into).as_ref())
            .map(Into::into)?;
        Ok(subscription)
    }

    /// Returns a handle to the key-value bucket named `bucket`.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying call reports an error.
    pub fn key_value(&self, bucket: &str) -> Result<NatsKeyValue, SdkError> {
        let store = self.inner.key_value(bucket)?;
        Ok(store.into())
    }
}
/// Thin wrapper around `wit::NatsKeyValue`, giving typed (JSON) and raw-byte
/// access to the entries of a key-value bucket.
pub struct NatsKeyValue {
// The wrapped `wit::NatsKeyValue` that all operations delegate to.
inner: wit::NatsKeyValue,
}
/// Wraps a raw `wit::NatsKeyValue` in the SDK-level [`NatsKeyValue`] type.
impl From<wit::NatsKeyValue> for NatsKeyValue {
    fn from(store: wit::NatsKeyValue) -> Self {
        Self { inner: store }
    }
}
impl NatsKeyValue {
    /// Looks up `key` and deserializes the stored bytes from JSON into `S`.
    ///
    /// Returns `Ok(None)` when the lookup yields no value.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the lookup fails or the stored bytes are not
    /// valid JSON for `S`.
    pub fn get<S>(&self, key: &str) -> Result<Option<S>, SdkError>
    where
        S: for<'a> serde::Deserialize<'a>,
    {
        self.get_bytes(key)?
            .map(|bytes| serde_json::from_slice(&bytes))
            .transpose()
            .map_err(Into::into)
    }

    /// Looks up `key` and returns the stored bytes as-is, or `None` when the
    /// lookup yields no value.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying lookup reports an error.
    pub fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, SdkError> {
        let found = self.inner.get(key)?;
        Ok(found)
    }

    /// Serializes `value` as JSON and stores it under `key` via
    /// [`NatsKeyValue::put_bytes`].
    ///
    /// Returns the `u64` reported by the underlying `put` call
    /// (NOTE(review): presumably the entry's revision number — confirm against
    /// the WIT definition).
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if serialization or the underlying call fails.
    pub fn put<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
    where
        S: serde::Serialize,
    {
        let encoded = serde_json::to_vec(value)?;
        self.put_bytes(key, &encoded)
    }

    /// Stores the raw bytes `value` under `key` and returns the `u64` reported
    /// by the underlying `put` call.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying call reports an error.
    pub fn put_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
        self.inner.put(key, value).map_err(Into::into)
    }

    /// Serializes `value` as JSON and passes it to [`NatsKeyValue::create_bytes`].
    ///
    /// NOTE(review): `create` presumably differs from `put` by refusing to
    /// overwrite an existing key — confirm against the WIT definition.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if serialization or the underlying call fails.
    pub fn create<S>(&self, key: &str, value: &S) -> Result<u64, SdkError>
    where
        S: serde::Serialize,
    {
        let encoded = serde_json::to_vec(value)?;
        self.create_bytes(key, &encoded)
    }

    /// Calls the underlying `create` with the raw bytes `value` for `key` and
    /// returns the `u64` it reports.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying call reports an error.
    pub fn create_bytes(&self, key: &str, value: &[u8]) -> Result<u64, SdkError> {
        self.inner.create(key, value).map_err(Into::into)
    }

    /// Deletes `key` through the underlying store.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the underlying call reports an error.
    pub fn delete(&self, key: &str) -> Result<(), SdkError> {
        self.inner.delete(key).map_err(Into::into)
    }
}
/// Thin wrapper around `wit::NatsSubscriber`, from which messages are pulled
/// one at a time with `next`.
pub struct NatsSubscription {
// The wrapped `wit::NatsSubscriber` that yields the messages.
inner: wit::NatsSubscriber,
}
/// Wraps a raw `wit::NatsSubscriber` in the SDK-level [`NatsSubscription`] type.
impl From<wit::NatsSubscriber> for NatsSubscription {
    fn from(subscriber: wit::NatsSubscriber) -> Self {
        Self { inner: subscriber }
    }
}
impl NatsSubscription {
pub fn next(&self) -> Result<Option<NatsMessage>, SdkError> {
self.inner.next().map_err(Into::into).map(|msg| msg.map(Into::into))
}
}
/// A received NATS message: a thin wrapper around `wit::NatsMessage` exposing
/// its subject and payload.
pub struct NatsMessage {
// The wrapped `wit::NatsMessage`; its `payload` and `subject` fields back the accessors.
inner: crate::wit::NatsMessage,
}
/// Wraps a raw `wit::NatsMessage` in the SDK-level [`NatsMessage`] type.
impl From<crate::wit::NatsMessage> for NatsMessage {
    fn from(message: crate::wit::NatsMessage) -> Self {
        Self { inner: message }
    }
}
impl NatsMessage {
    /// Deserializes the message payload from JSON into `S`.
    ///
    /// # Errors
    ///
    /// Returns an [`SdkError`] if the payload is not valid JSON for `S`.
    pub fn payload<S>(&self) -> Result<S, SdkError>
    where
        S: for<'de> serde::Deserialize<'de>,
    {
        let raw = self.payload_bytes();
        serde_json::from_slice(raw).map_err(Into::into)
    }

    /// Returns the message payload as raw bytes, without any decoding.
    pub fn payload_bytes(&self) -> &[u8] {
        self.inner.payload.as_slice()
    }

    /// Returns the subject stored on the message.
    pub fn subject(&self) -> &str {
        self.inner.subject.as_str()
    }
}
/// Connects to NATS using the given server addresses and no authentication.
///
/// Each item of `servers` is converted to a `String` with `to_string` before
/// being handed to the underlying `wit::NatsClient::connect`.
///
/// # Errors
///
/// Returns an [`SdkError`] if the underlying connect call reports an error.
pub fn connect(servers: impl IntoIterator<Item = impl ToString>) -> Result<NatsClient, SdkError> {
    let addresses: Vec<String> = servers.into_iter().map(|server| server.to_string()).collect();

    match crate::wit::NatsClient::connect(&addresses, None) {
        Ok(inner) => Ok(NatsClient { inner }),
        Err(err) => Err(err.into()),
    }
}
/// Connects to NATS using the given server addresses and the supplied
/// authentication settings.
///
/// Each item of `servers` is converted to a `String` with `to_string` before
/// being handed, together with `auth`, to the underlying
/// `wit::NatsClient::connect`.
///
/// # Errors
///
/// Returns an [`SdkError`] if the underlying connect call reports an error.
pub fn connect_with_auth(
    servers: impl IntoIterator<Item = impl ToString>,
    auth: &NatsAuth,
) -> Result<NatsClient, SdkError> {
    let addresses: Vec<String> = servers.into_iter().map(|server| server.to_string()).collect();

    match crate::wit::NatsClient::connect(&addresses, Some(auth)) {
        Ok(inner) => Ok(NatsClient { inner }),
        Err(err) => Err(err.into()),
    }
}
/// Adapts [`NatsSubscription`] to the SDK's generic [`Subscription`] trait.
impl Subscription for NatsSubscription {
    /// Pulls the next NATS message and hands its payload to `Response::json`,
    /// converting the result into a [`SubscriptionItem`].
    ///
    /// Returns `Ok(None)` when the subscription yields no message. Any receive
    /// error is reported as a formatted string converted into [`Error`].
    fn next(&mut self) -> Result<Option<SubscriptionItem>, Error> {
        // The path form selects the inherent `NatsSubscription::next(&self)`
        // rather than recursing into this trait method.
        match NatsSubscription::next(self) {
            Err(err) => Err(format!("Error receiving NATS message: {err}").into()),
            Ok(message) => Ok(message.map(|msg| Response::json(msg.inner.payload).into())),
        }
    }
}
/// Stream configuration for a subscription: a newtype over
/// `wit::NatsStreamConfig`, built with `NatsStreamConfig::new` and the
/// `with_*` methods.
pub struct NatsStreamConfig(wit::NatsStreamConfig);
impl From<NatsStreamConfig> for wit::NatsStreamConfig {
fn from(config: NatsStreamConfig) -> Self {
config.0
}
}
/// Delivery policy for a stream subscription; converts one-to-one into
/// `wit::NatsStreamDeliverPolicy`.
///
/// NOTE(review): the per-variant descriptions are inferred from the variant
/// names and NATS JetStream terminology — confirm against the WIT definition.
#[derive(Debug)]
pub enum NatsStreamDeliverPolicy {
/// Presumably: deliver every message available in the stream.
All,
/// Presumably: start with the last message in the stream.
Last,
/// Presumably: deliver only messages published after the subscription starts.
New,
/// Start at the given sequence number (forwarded unchanged).
ByStartSequence(u64),
/// Start at the given point in time; forwarded as milliseconds since the Unix epoch.
ByStartTime(OffsetDateTime),
/// Presumably: start with the last message of each subject.
LastPerSubject,
}
/// Maps each SDK deliver policy onto the matching `wit` variant.
///
/// `ByStartTime` is converted to whole milliseconds since the Unix epoch and
/// sent as `ByStartTimeMs`.
impl From<NatsStreamDeliverPolicy> for wit::NatsStreamDeliverPolicy {
    fn from(value: NatsStreamDeliverPolicy) -> Self {
        match value {
            NatsStreamDeliverPolicy::All => wit::NatsStreamDeliverPolicy::All,
            NatsStreamDeliverPolicy::Last => wit::NatsStreamDeliverPolicy::Last,
            NatsStreamDeliverPolicy::New => wit::NatsStreamDeliverPolicy::New,
            NatsStreamDeliverPolicy::ByStartSequence(seq) => wit::NatsStreamDeliverPolicy::ByStartSequence(seq),
            NatsStreamDeliverPolicy::ByStartTime(time) => {
                // `unix_timestamp_nanos` is a signed `i128`. A bare `as u64`
                // would wrap a pre-epoch (negative) timestamp into an enormous
                // value far in the future, so clamp into the `u64` range first:
                // pre-epoch times become 0, i.e. the epoch itself.
                let millis = time.unix_timestamp_nanos() / 1_000_000;
                let millis = millis.max(0).min(i128::from(u64::MAX)) as u64;
                wit::NatsStreamDeliverPolicy::ByStartTimeMs(millis)
            }
            NatsStreamDeliverPolicy::LastPerSubject => wit::NatsStreamDeliverPolicy::LastPerSubject,
        }
    }
}
impl NatsStreamConfig {
    /// Creates a stream configuration.
    ///
    /// * `stream_name` — stored as the config's `stream_name`.
    /// * `consumer_name` — stored as the config's `consumer_name`.
    /// * `deliver_policy` — converted to its `wit` representation.
    /// * `inactive_threshold` — stored in whole milliseconds, saturating at
    ///   `u64::MAX`.
    ///
    /// `durable_name` and `description` start out unset; use
    /// [`NatsStreamConfig::with_durable_name`] and
    /// [`NatsStreamConfig::with_description`] to set them.
    pub fn new(
        stream_name: String,
        consumer_name: String,
        deliver_policy: NatsStreamDeliverPolicy,
        inactive_threshold: Duration,
    ) -> Self {
        // `as_millis` yields a `u128`; saturate rather than let `as u64` wrap a
        // very large threshold around to an arbitrary small one.
        let inactive_threshold_ms = inactive_threshold.as_millis().min(u128::from(u64::MAX)) as u64;

        NatsStreamConfig(wit::NatsStreamConfig {
            stream_name,
            consumer_name,
            durable_name: None,
            deliver_policy: deliver_policy.into(),
            inactive_threshold_ms,
            description: None,
        })
    }

    /// Sets the durable name and returns the updated configuration.
    pub fn with_durable_name(mut self, durable_name: String) -> Self {
        self.0.durable_name = Some(durable_name);
        self
    }

    /// Sets the description and returns the updated configuration.
    pub fn with_description(mut self, description: String) -> Self {
        self.0.description = Some(description);
        self
    }
}