use crate::brokers::NatsBroker;
use crate::brokers::base::HeaderMap;
use crate::errors::CrabbyError;
use crate::publish::base::{
IntoPublishPayload, PreparedPublishPayload, PublishRequest, Publisher, Request, merge_headers,
};
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
type JsPublishFuture = Pin<Box<dyn Future<Output = Result<NatsPublishAck, CrabbyError>> + Send>>;
pub type NatsPublishAck = async_nats::jetstream::publish::PublishAck;
#[derive(Clone)]
pub struct NatsPublisher {
core: Publisher,
jetstream: async_nats::jetstream::Context,
}
impl NatsPublisher {
pub fn new(broker: NatsBroker) -> Self {
let core = Publisher::new(broker.clone());
let jetstream = async_nats::jetstream::new(broker.client());
Self { core, jetstream }
}
pub fn core(&self) -> Publisher {
self.core.clone()
}
pub fn publish<P>(&self, subject: &str, payload: P) -> PublishRequest
where
P: IntoPublishPayload,
{
self.core.publish(subject, payload)
}
pub fn request<P>(&self, subject: &str, payload: P) -> Request
where
P: IntoPublishPayload,
{
self.core.request(subject, payload)
}
pub fn js_publish<P>(&self, subject: &str, payload: P) -> NatsJsPublishRequest
where
P: IntoPublishPayload,
{
NatsJsPublishRequest {
jetstream: self.jetstream.clone(),
subject: subject.to_string(),
prepared: payload.into_publish_payload(),
extra_headers: None,
}
}
}
pub struct NatsJsPublishRequest {
jetstream: async_nats::jetstream::Context,
subject: String,
prepared: Result<PreparedPublishPayload, CrabbyError>,
extra_headers: Option<HeaderMap>,
}
impl NatsJsPublishRequest {
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.extra_headers = Some(headers);
self
}
}
impl IntoFuture for NatsJsPublishRequest {
type Output = Result<NatsPublishAck, CrabbyError>;
type IntoFuture = JsPublishFuture;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let prepared = self.prepared?;
let headers = merge_headers(prepared.headers, self.extra_headers);
let ack = if let Some(headers) = headers {
let mut nats_headers = async_nats::HeaderMap::new();
for (key, value) in headers {
nats_headers.insert(key.as_str(), value.as_str());
}
self.jetstream
.publish_with_headers(self.subject, nats_headers, prepared.payload.into())
.await?
.await?
} else {
self.jetstream
.publish(self.subject, prepared.payload.into())
.await?
.await?
};
Ok(ack)
})
}
}