use crate::brokers::Broker;
use crate::brokers::base::{BrokerMessage, HeaderMap};
use crate::errors::CrabbyError;
use bytes::Bytes;
#[cfg(any(feature = "json", feature = "cbor"))]
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::future::Future;
use std::future::IntoFuture;
use std::pin::Pin;
use std::sync::Arc;
type PublishFuture = Pin<Box<dyn Future<Output = Result<(), CrabbyError>> + Send>>;
type RequestFuture = Pin<Box<dyn Future<Output = Result<Reply, CrabbyError>> + Send>>;
#[doc(hidden)]
pub struct PreparedPublishPayload {
pub(crate) payload: Vec<u8>,
pub(crate) headers: Option<HeaderMap>,
}
trait PublishBackend: Send + Sync {
fn publish(
&self,
subject: String,
payload: Vec<u8>,
headers: Option<HeaderMap>,
) -> PublishFuture;
fn request(
&self,
subject: String,
payload: Vec<u8>,
headers: Option<HeaderMap>,
) -> RequestFuture;
}
impl<B> PublishBackend for B
where
B: Broker + Clone,
{
fn publish(
&self,
subject: String,
payload: Vec<u8>,
headers: Option<HeaderMap>,
) -> PublishFuture {
let broker = self.clone();
Box::pin(async move { broker.publish(&subject, &payload, headers.as_ref()).await })
}
fn request(
&self,
subject: String,
payload: Vec<u8>,
headers: Option<HeaderMap>,
) -> RequestFuture {
let broker = self.clone();
Box::pin(async move {
let reply = broker.request(&subject, &payload, headers.as_ref()).await?;
Ok(Reply::from_broker_message(reply))
})
}
}
#[derive(Clone)]
pub struct Publisher {
inner: Arc<dyn PublishBackend>,
}
impl Publisher {
pub fn new<B>(broker: B) -> Self
where
B: Broker + Clone,
{
Self {
inner: Arc::new(broker),
}
}
pub fn publish<P>(&self, subject: &str, payload: P) -> PublishRequest
where
P: IntoPublishPayload,
{
PublishRequest {
publisher: self.clone(),
subject: subject.to_string(),
prepared: payload.into_publish_payload(),
extra_headers: None,
}
}
pub fn request<P>(&self, subject: &str, payload: P) -> Request
where
P: IntoPublishPayload,
{
Request {
publisher: self.clone(),
subject: subject.to_string(),
prepared: payload.into_publish_payload(),
extra_headers: None,
}
}
}
#[doc(hidden)]
pub trait IntoPublishPayload {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError>;
}
pub struct PublishRequest {
publisher: Publisher,
subject: String,
prepared: Result<PreparedPublishPayload, CrabbyError>,
extra_headers: Option<HeaderMap>,
}
impl PublishRequest {
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.extra_headers = Some(headers);
self
}
}
pub struct Request {
publisher: Publisher,
subject: String,
prepared: Result<PreparedPublishPayload, CrabbyError>,
extra_headers: Option<HeaderMap>,
}
impl Request {
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.extra_headers = Some(headers);
self
}
}
impl IntoFuture for PublishRequest {
type Output = Result<(), CrabbyError>;
type IntoFuture = PublishFuture;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let prepared = self.prepared?;
let headers = merge_headers(prepared.headers, self.extra_headers);
self.publisher
.inner
.publish(self.subject, prepared.payload, headers)
.await
})
}
}
impl IntoFuture for Request {
type Output = Result<Reply, CrabbyError>;
type IntoFuture = RequestFuture;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let prepared = self.prepared?;
let headers = merge_headers(prepared.headers, self.extra_headers);
self.publisher
.inner
.request(self.subject, prepared.payload, headers)
.await
})
}
}
pub struct Reply {
subject: String,
payload: Vec<u8>,
headers: Option<HeaderMap>,
}
impl Reply {
fn from_broker_message(message: BrokerMessage) -> Self {
Self {
subject: message.subject,
payload: message.payload,
headers: message.headers,
}
}
pub fn subject(&self) -> &str {
&self.subject
}
pub fn payload(&self) -> &[u8] {
&self.payload
}
pub fn headers(&self) -> Option<&HeaderMap> {
self.headers.as_ref()
}
pub fn into_bytes(self) -> Vec<u8> {
self.payload
}
#[cfg(feature = "json")]
pub fn into_json<T>(self) -> Result<T, CrabbyError>
where
T: DeserializeOwned,
{
Ok(serde_json::from_slice(&self.payload)?)
}
#[cfg(feature = "cbor")]
pub fn into_cbor<T>(self) -> Result<T, CrabbyError>
where
T: DeserializeOwned,
{
Ok(ciborium::from_reader(self.payload.as_slice())?)
}
}
pub fn merge_headers(base: Option<HeaderMap>, extra: Option<HeaderMap>) -> Option<HeaderMap> {
match (base, extra) {
(None, None) => None,
(Some(headers), None) | (None, Some(headers)) => Some(headers),
(Some(mut base), Some(extra)) => {
base.extend(extra);
Some(base)
}
}
}
impl IntoPublishPayload for Vec<u8> {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
Ok(PreparedPublishPayload {
payload: self,
headers: None,
})
}
}
impl IntoPublishPayload for Bytes {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
Ok(PreparedPublishPayload {
payload: self.to_vec(),
headers: None,
})
}
}
impl IntoPublishPayload for String {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
Ok(PreparedPublishPayload {
payload: self.into_bytes(),
headers: None,
})
}
}
impl IntoPublishPayload for &str {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
Ok(PreparedPublishPayload {
payload: self.as_bytes().to_vec(),
headers: None,
})
}
}
#[cfg(feature = "json")]
pub fn json_payload<T>(value: T) -> Result<PreparedPublishPayload, CrabbyError>
where
T: Serialize,
{
let mut headers = HeaderMap::new();
headers.insert("content-type".to_string(), "application/json".to_string());
Ok(PreparedPublishPayload {
payload: serde_json::to_vec(&value)?,
headers: Some(headers),
})
}
#[cfg(feature = "cbor")]
pub fn cbor_payload<T>(value: T) -> Result<PreparedPublishPayload, CrabbyError>
where
T: Serialize,
{
let mut headers = HeaderMap::new();
headers.insert("content-type".to_string(), "application/cbor".to_string());
let mut payload = Vec::new();
ciborium::into_writer(&value, &mut payload)?;
Ok(PreparedPublishPayload {
payload,
headers: Some(headers),
})
}