use std::{
fmt::{self, Debug},
future::IntoFuture,
};
use bytes::Bytes;
use serde::Serialize;
use watermelon_proto::{
MessageBase, Subject,
headers::{HeaderMap, HeaderName, HeaderValue},
};
use crate::{
client::{Client, ClientClosedError, TryCommandError},
handler::HandlerCommand,
util::BoxFuture,
};
use super::Request;
/// A message to publish: a subject, an optional reply subject, headers and a payload.
///
/// Start building one with [`Publish::builder`]. A finished value can be bound
/// to a client with [`Publish::client`] or [`Publish::client_owned`], or turned
/// into a request with [`Publish::into_request`].
#[derive(Debug, Clone)]
pub struct Publish {
/// Subject of the message.
pub(super) subject: Subject,
/// Optional reply subject carried with the message; `None` when unset.
pub(super) reply_subject: Option<Subject>,
/// Headers carried with the message.
pub(super) headers: HeaderMap,
/// Raw payload bytes of the message.
pub(super) payload: Bytes,
}
/// Builder for a [`Publish`] that is not bound to any client.
///
/// Created by [`Publish::builder`] or [`PublishBuilder::subject`]; calling
/// [`PublishBuilder::payload`] finishes it and returns the [`Publish`].
#[derive(Debug)]
pub struct PublishBuilder {
/// The message being assembled.
publish: Publish,
}
/// Builder for a [`Publish`] that is bound to a borrowed [`Client`].
///
/// Calling [`ClientPublish::payload`] finishes it and returns a
/// [`DoClientPublish`]; [`ClientPublish::to_owned`] converts it into an
/// [`OwnedClientPublish`] holding a clone of the client.
pub struct ClientPublish<'a> {
/// Client the finished message will be handed to.
client: &'a Client,
/// The message being assembled.
publish: Publish,
}
/// A finished [`Publish`] paired with the borrowed [`Client`] that will send it.
///
/// Nothing is sent until the value is `.await`ed (it implements `IntoFuture`)
/// or [`DoClientPublish::try_publish`] is called.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DoClientPublish<'a> {
/// Client the message is enqueued on.
client: &'a Client,
/// The message to enqueue.
publish: Publish,
}
/// Builder for a [`Publish`] that owns the [`Client`] it is bound to.
///
/// Calling [`OwnedClientPublish::payload`] finishes it and returns a
/// [`DoOwnedClientPublish`].
pub struct OwnedClientPublish {
/// Client the finished message will be handed to.
client: Client,
/// The message being assembled.
publish: Publish,
}
/// A finished [`Publish`] paired with an owned [`Client`] that will send it.
///
/// Nothing is sent until the value is `.await`ed (it implements `IntoFuture`,
/// producing a `'static` future) or [`DoOwnedClientPublish::try_publish`] is called.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DoOwnedClientPublish {
/// Client the message is enqueued on.
client: Client,
/// The message to enqueue.
publish: Publish,
}
// Generates the builder methods shared by every publish builder in this file.
//
// The `impl` block that expands this macro must also define:
// - `fn publish_mut(&mut self) -> &mut Publish`, giving access to the message
//   under construction, and
// - `fn payload(self, payload: Bytes) -> $payload_t`, the finishing step.
//
// `$payload_t` is the type returned by that `payload` method.
macro_rules! publish {
    ($payload_t:ty) => {
        /// Sets the reply subject of the message; passing `None` clears it.
        #[must_use]
        pub fn reply_subject(mut self, reply_subject: Option<Subject>) -> Self {
            let inner = self.publish_mut();
            inner.reply_subject = reply_subject;
            self
        }

        /// Inserts a single header into the message's header map.
        #[must_use]
        pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
            let inner = self.publish_mut();
            inner.headers.insert(name, value);
            self
        }

        /// Replaces the message's whole header map with `headers`.
        #[must_use]
        pub fn headers(mut self, headers: HeaderMap) -> Self {
            let inner = self.publish_mut();
            inner.headers = headers;
            self
        }

        /// Serializes `payload` to JSON and uses the resulting bytes as the
        /// message payload, finishing the builder exactly like `payload` does.
        ///
        /// # Errors
        ///
        /// Returns the [`serde_json::Error`] produced when `payload` cannot
        /// be serialized.
        pub fn payload_json<T: Serialize>(
            self,
            payload: &T,
        ) -> Result<$payload_t, serde_json::Error> {
            serde_json::to_vec(payload).map(|encoded| self.payload(encoded.into()))
        }
    };
}
impl Publish {
    /// Starts building a message for `subject`.
    ///
    /// Shorthand for [`PublishBuilder::subject`].
    #[must_use]
    pub fn builder(subject: Subject) -> PublishBuilder {
        PublishBuilder::subject(subject)
    }

    /// Pairs this message with a borrowed `client`.
    ///
    /// The returned value does nothing until it is awaited or
    /// [`DoClientPublish::try_publish`] is called.
    pub fn client(self, client: &Client) -> DoClientPublish<'_> {
        let publish = self;
        DoClientPublish { client, publish }
    }

    /// Pairs this message with an owned `client`.
    ///
    /// The returned value does nothing until it is awaited or
    /// [`DoOwnedClientPublish::try_publish`] is called.
    pub fn client_owned(self, client: Client) -> DoOwnedClientPublish {
        let publish = self;
        DoOwnedClientPublish { client, publish }
    }

    /// Wraps this message in a [`Request`] with no response timeout set.
    pub fn into_request(self) -> Request {
        Request {
            response_timeout: None,
            publish: self,
        }
    }

    /// Converts this message into a [`MessageBase`], moving every field across unchanged.
    fn into_message_base(self) -> MessageBase {
        MessageBase {
            subject: self.subject,
            reply_subject: self.reply_subject,
            headers: self.headers,
            payload: self.payload,
        }
    }
}
impl PublishBuilder {
    /// Creates a builder for a message on `subject`.
    ///
    /// The message starts with no reply subject, an empty header map and an
    /// empty payload.
    #[must_use]
    pub fn subject(subject: Subject) -> Self {
        let publish = Publish {
            subject,
            reply_subject: None,
            headers: HeaderMap::new(),
            payload: Bytes::new(),
        };
        Self { publish }
    }

    // Shared builder methods: `reply_subject`, `header`, `headers`, `payload_json`.
    publish!(Publish);

    /// Sets the payload and finishes the builder, returning the built [`Publish`].
    #[must_use]
    pub fn payload(self, payload: Bytes) -> Publish {
        Publish {
            payload,
            ..self.publish
        }
    }

    /// Gives the macro-generated methods mutable access to the message being built.
    fn publish_mut(&mut self) -> &mut Publish {
        let Self { publish } = self;
        publish
    }
}
impl<'a> ClientPublish<'a> {
pub(crate) fn build(client: &'a Client, subject: Subject) -> Self {
Self {
client,
publish: PublishBuilder::subject(subject).publish,
}
}
publish!(DoClientPublish<'a>);
pub fn payload(mut self, payload: Bytes) -> DoClientPublish<'a> {
self.publish.payload = payload;
self.publish.client(self.client)
}
#[must_use]
pub fn to_owned(self) -> OwnedClientPublish {
OwnedClientPublish {
client: self.client.clone(),
publish: self.publish,
}
}
fn publish_mut(&mut self) -> &mut Publish {
&mut self.publish
}
}
impl OwnedClientPublish {
pub(crate) fn build(client: Client, subject: Subject) -> Self {
Self {
client,
publish: PublishBuilder::subject(subject).publish,
}
}
publish!(DoOwnedClientPublish);
pub fn payload(mut self, payload: Bytes) -> DoOwnedClientPublish {
self.publish.payload = payload;
self.publish.client_owned(self.client)
}
fn publish_mut(&mut self) -> &mut Publish {
&mut self.publish
}
}
impl DoClientPublish<'_> {
    /// Tries to enqueue the message on the client without awaiting.
    ///
    /// # Errors
    ///
    /// Returns the [`TryCommandError`] reported by the client's
    /// `try_enqueue_command` when the command cannot be enqueued.
    pub fn try_publish(self) -> Result<(), TryCommandError> {
        let Self { client, publish } = self;
        try_publish(client, publish)
    }
}
/// Awaiting a [`DoClientPublish`] enqueues its message on the borrowed client.
impl<'a> IntoFuture for DoClientPublish<'a> {
    type Output = Result<(), ClientClosedError>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Boxes a future that enqueues the message; no work happens until it is polled.
    fn into_future(self) -> Self::IntoFuture {
        // The local is named `message` so it does not shadow the free `publish` function.
        let Self {
            client,
            publish: message,
        } = self;
        Box::pin(async move { publish(client, message).await })
    }
}
impl DoOwnedClientPublish {
    /// Tries to enqueue the message on the owned client without awaiting.
    ///
    /// # Errors
    ///
    /// Returns the [`TryCommandError`] reported by the client's
    /// `try_enqueue_command` when the command cannot be enqueued.
    pub fn try_publish(self) -> Result<(), TryCommandError> {
        let Self { client, publish } = self;
        try_publish(&client, publish)
    }
}
/// Awaiting a [`DoOwnedClientPublish`] enqueues its message on the owned client.
impl IntoFuture for DoOwnedClientPublish {
    type Output = Result<(), ClientClosedError>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Boxes a `'static` future that enqueues the message; no work happens
    /// until it is polled.
    fn into_future(self) -> Self::IntoFuture {
        // The local is named `message` so it does not shadow the free `publish` function.
        let Self {
            client,
            publish: message,
        } = self;
        // The client is moved into the async block, so borrowing it there keeps
        // the future free of outside references.
        Box::pin(async move { publish(&client, message).await })
    }
}
/// Converts `publish` into a `HandlerCommand::Publish` and hands it to
/// `client.try_enqueue_command`, returning that call's result unchanged.
fn try_publish(client: &Client, publish: Publish) -> Result<(), TryCommandError> {
    let message = publish.into_message_base();
    client.try_enqueue_command(HandlerCommand::Publish { message })
}
/// Converts `publish` into a `HandlerCommand::Publish` and awaits
/// `client.enqueue_command` with it, returning that call's result unchanged.
async fn publish(client: &Client, publish: Publish) -> Result<(), ClientClosedError> {
    let message = publish.into_message_base();
    let command = HandlerCommand::Publish { message };
    client.enqueue_command(command).await
}
impl Debug for ClientPublish<'_> {
    /// Prints only the pending message; the client field is left out and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ClientPublish");
        out.field("publish", &self.publish);
        out.finish_non_exhaustive()
    }
}
impl Debug for DoClientPublish<'_> {
    /// Prints only the message to send; the client field is left out and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("DoClientPublish");
        out.field("publish", &self.publish);
        out.finish_non_exhaustive()
    }
}
impl Debug for OwnedClientPublish {
    /// Prints only the pending message; the client field is left out and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("OwnedClientPublish");
        out.field("publish", &self.publish);
        out.finish_non_exhaustive()
    }
}
impl Debug for DoOwnedClientPublish {
    /// Prints only the message to send; the client field is left out and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("DoOwnedClientPublish");
        out.field("publish", &self.publish);
        out.finish_non_exhaustive()
    }
}