use crate::errors::CrabbyError;
use crate::event::{Event, EventParts};
use crate::publish::{PreparedPublishPayload, Publisher};
use bytes::Bytes;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
#[cfg(feature = "cbor")]
use crate::publish::cbor_payload;
#[cfg(feature = "json")]
use crate::publish::json_payload;
#[cfg(any(feature = "json", feature = "cbor"))]
use serde::de::DeserializeOwned;
pub type ExtractFuture<T> = Pin<Box<dyn Future<Output = Result<T, CrabbyError>> + Send>>;
#[derive(Clone)]
#[doc(hidden)]
pub struct RuntimeState<S> {
pub(crate) app_state: S,
pub(crate) publisher: Publisher,
}
impl<S> RuntimeState<S> {
pub(crate) fn new(app_state: S, publisher: Publisher) -> Self {
Self {
app_state,
publisher,
}
}
}
pub trait FromRef<T> {
fn from_ref(input: &T) -> Self;
}
impl<T: Clone> FromRef<T> for T {
fn from_ref(input: &T) -> Self {
input.clone()
}
}
pub trait FromEventParts<S>: Sized {
fn from_event_parts(parts: &mut EventParts, state: &S) -> ExtractFuture<Self>;
}
pub trait FromEvent<S>: Sized {
fn from_event(event: Event, state: &S) -> ExtractFuture<Self>;
}
pub struct State<T>(pub T);
pub struct Subject(pub String);
pub struct Headers(pub Option<HashMap<String, String>>);
pub struct Body(pub Bytes);
pub struct Publish(pub Publisher);
#[cfg(feature = "json")]
pub struct Json<T>(pub T);
#[cfg(feature = "cbor")]
pub struct Cbor<T>(pub T);
impl<S, T> FromEventParts<RuntimeState<S>> for State<T>
where
T: FromRef<S> + Send + 'static,
S: Send + Sync + 'static,
{
fn from_event_parts(_parts: &mut EventParts, state: &RuntimeState<S>) -> ExtractFuture<Self> {
let value = T::from_ref(&state.app_state);
Box::pin(async move { Ok(State(value)) })
}
}
impl<S> FromEventParts<S> for Subject
where
S: Send + Sync + 'static,
{
fn from_event_parts(parts: &mut EventParts, _state: &S) -> ExtractFuture<Self> {
let subject = parts.subject.clone();
Box::pin(async move { Ok(Subject(subject)) })
}
}
impl<S> FromEventParts<S> for Headers
where
S: Send + Sync + 'static,
{
fn from_event_parts(parts: &mut EventParts, _state: &S) -> ExtractFuture<Self> {
let headers = parts.headers.clone();
Box::pin(async move { Ok(Headers(headers)) })
}
}
impl<S> FromEventParts<RuntimeState<S>> for Publish
where
S: Send + Sync + 'static,
{
fn from_event_parts(_parts: &mut EventParts, state: &RuntimeState<S>) -> ExtractFuture<Self> {
let publisher = state.publisher.clone();
Box::pin(async move { Ok(Publish(publisher)) })
}
}
impl<S> FromEvent<S> for Body
where
S: Send + Sync + 'static,
{
fn from_event(event: Event, _state: &S) -> ExtractFuture<Self> {
Box::pin(async move { Ok(Body(event.payload)) })
}
}
#[cfg(feature = "json")]
impl<S, T> FromEvent<S> for Json<T>
where
S: Send + Sync + 'static,
T: DeserializeOwned + Send + 'static,
{
fn from_event(event: Event, _state: &S) -> ExtractFuture<Self> {
Box::pin(async move {
let value = serde_json::from_slice(&event.payload)?;
Ok(Json(value))
})
}
}
#[cfg(feature = "cbor")]
impl<S, T> FromEvent<S> for Cbor<T>
where
S: Send + Sync + 'static,
T: DeserializeOwned + Send + 'static,
{
fn from_event(event: Event, _state: &S) -> ExtractFuture<Self> {
Box::pin(async move {
let value = ciborium::from_reader(event.payload.as_ref())?;
Ok(Cbor(value))
})
}
}
impl crate::publish::IntoPublishPayload for Body {
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
Ok(PreparedPublishPayload {
payload: self.0.to_vec(),
headers: None,
})
}
}
#[cfg(feature = "json")]
impl<T> crate::publish::IntoPublishPayload for Json<T>
where
T: serde::Serialize,
{
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
json_payload(self.0)
}
}
#[cfg(feature = "cbor")]
impl<T> crate::publish::IntoPublishPayload for Cbor<T>
where
T: serde::Serialize,
{
fn into_publish_payload(self) -> Result<PreparedPublishPayload, CrabbyError> {
cbor_payload(self.0)
}
}