use crate::types::RedisBytes;
use crate::{RsmqConnection, RsmqMessage, RsmqResult};
use core::convert::TryFrom;
use serde::{de::DeserializeOwned, Serialize};
use std::future::Future;
use std::time::Duration;
/// Newtype wrapper around a value that is converted to and from
/// [`RedisBytes`] as JSON.
///
/// Converting a `Json<T>` into [`RedisBytes`] serializes the inner value with
/// `serde_json`; converting [`RedisBytes`] into a `Json<T>` parses the bytes as
/// JSON. The inner value is public and can be accessed as `.0`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Json<T>(pub T);
impl<T: Serialize> From<Json<T>> for RedisBytes {
fn from(json: Json<T>) -> RedisBytes {
let bytes = serde_json::to_vec(&json.0).expect(
"Json<T> -> RedisBytes failed; use RsmqJsonExt::send_json for fallible serialization",
);
RedisBytes::from(bytes)
}
}
impl<T: DeserializeOwned> TryFrom<RedisBytes> for Json<T> {
type Error = Vec<u8>;
fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
let raw = bytes.into_bytes();
match serde_json::from_slice(&raw) {
Ok(v) => Ok(Json(v)),
Err(_) => Err(raw),
}
}
}
/// Extension trait adding JSON send/receive helpers to any [`RsmqConnection`].
///
/// A blanket implementation in this file covers every `RsmqConnection + Send`
/// type, so the methods become available by importing this trait. Every
/// returned future is required to be `Send`.
pub trait RsmqJsonExt: RsmqConnection {
/// Serializes `message` as JSON and sends it to the queue `qname`.
///
/// In the blanket implementation the bytes produced by
/// `serde_json::to_vec` are passed to `send_message` together with `qname`
/// and `delay`, and its result is returned unchanged (presumably the id of
/// the queued message; confirm against `RsmqConnection::send_message`).
///
/// # Errors
///
/// Resolves to an error if serialization fails or if `send_message` fails.
fn send_json<T: Serialize + ?Sized>(
&mut self,
qname: &str,
message: &T,
delay: Option<Duration>,
) -> impl Future<Output = RsmqResult<String>> + Send;
/// Receives a message from the queue `qname` and decodes its payload from
/// JSON into `T`.
///
/// In the blanket implementation `qname` and `hidden` are forwarded to
/// `receive_message`; `Ok(None)` is produced when it yields no message.
///
/// # Errors
///
/// Resolves to an error if `receive_message` fails or if the payload is not
/// valid JSON for `T`.
fn receive_json<T: DeserializeOwned>(
&mut self,
qname: &str,
hidden: Option<Duration>,
) -> impl Future<Output = RsmqResult<Option<RsmqMessage<T>>>> + Send;
/// Pops a message from the queue `qname` and decodes its payload from JSON
/// into `T`.
///
/// In the blanket implementation `qname` is forwarded to `pop_message`;
/// `Ok(None)` is produced when it yields no message.
///
/// # Errors
///
/// Resolves to an error if `pop_message` fails or if the payload is not
/// valid JSON for `T`.
fn pop_json<T: DeserializeOwned>(
&mut self,
qname: &str,
) -> impl Future<Output = RsmqResult<Option<RsmqMessage<T>>>> + Send;
}
/// Blanket implementation of [`RsmqJsonExt`] for every connection type that is
/// `RsmqConnection + Send`.
impl<C> RsmqJsonExt for C
where
    C: RsmqConnection + Send,
{
    /// Serializes `message` to JSON bytes and sends them via `send_message`.
    ///
    /// Serialization runs when this method is called, not when the future is
    /// polled; a serialization error is stored and surfaces once the future
    /// is awaited, before `send_message` is invoked.
    fn send_json<T: Serialize + ?Sized>(
        &mut self,
        qname: &str,
        message: &T,
        delay: Option<Duration>,
    ) -> impl Future<Output = RsmqResult<String>> + Send {
        // Encode outside the async block so the future captures the encoded
        // result rather than `message` itself (presumably so the `Send`
        // bound holds without requiring `T: Sync`).
        let encoded = serde_json::to_vec(message);
        async move { self.send_message(qname, encoded?, delay).await }
    }

    /// Receives a raw `Vec<u8>` message via `receive_message` and decodes its
    /// payload from JSON with `decode_json`.
    async fn receive_json<T: DeserializeOwned>(
        &mut self,
        qname: &str,
        hidden: Option<Duration>,
    ) -> RsmqResult<Option<RsmqMessage<T>>> {
        let fetched = self.receive_message::<Vec<u8>>(qname, hidden).await?;
        decode_json(fetched)
    }

    /// Pops a raw `Vec<u8>` message via `pop_message` and decodes its payload
    /// from JSON with `decode_json`.
    ///
    /// NOTE(review): decoding happens after `pop_message` has returned, so a
    /// payload that fails to deserialize has presumably already been removed
    /// from the queue; confirm against `pop_message`.
    async fn pop_json<T: DeserializeOwned>(
        &mut self,
        qname: &str,
    ) -> RsmqResult<Option<RsmqMessage<T>>> {
        let popped = self.pop_message::<Vec<u8>>(qname).await?;
        decode_json(popped)
    }
}
/// Converts an optional raw-bytes message into an optional message whose
/// payload has been deserialized from JSON into `T`.
///
/// `None` passes straight through as `Ok(None)`. For `Some`, the `id`, `rc`,
/// `fr` and `sent` fields are carried over unchanged and only `message` is
/// replaced by the decoded value.
///
/// # Errors
///
/// Returns an error if the payload bytes are not valid JSON for `T`.
fn decode_json<T: DeserializeOwned>(
    raw: Option<RsmqMessage<Vec<u8>>>,
) -> RsmqResult<Option<RsmqMessage<T>>> {
    // Nothing was received: there is nothing to decode.
    let Some(RsmqMessage {
        id,
        message: payload,
        rc,
        fr,
        sent,
    }) = raw
    else {
        return Ok(None);
    };

    let message = serde_json::from_slice::<T>(&payload)?;
    Ok(Some(RsmqMessage {
        id,
        message,
        rc,
        fr,
        sent,
    }))
}
/// Synchronous (non-async) counterparts of the JSON helpers, compiled only
/// when the `sync` feature is enabled.
#[cfg(feature = "sync")]
mod sync {
    use super::decode_json;
    use crate::r#trait::RsmqConnectionSync;
    use crate::{RsmqMessage, RsmqResult};
    use serde::{de::DeserializeOwned, Serialize};
    use std::time::Duration;

    /// Extension trait adding JSON send/receive helpers to any
    /// [`RsmqConnectionSync`].
    ///
    /// A blanket implementation below covers every `RsmqConnectionSync` type.
    pub trait RsmqJsonExtSync: RsmqConnectionSync {
        /// Serializes `message` as JSON and sends it to the queue `qname`,
        /// forwarding `delay` to `send_message`.
        ///
        /// # Errors
        ///
        /// Returns an error if serialization fails or `send_message` fails.
        fn send_json<T: Serialize + ?Sized>(
            &mut self,
            qname: &str,
            message: &T,
            delay: Option<Duration>,
        ) -> RsmqResult<String>;

        /// Receives a message from `qname`, forwarding `hidden` to
        /// `receive_message`, and decodes its payload from JSON into `T`.
        ///
        /// Returns `Ok(None)` when `receive_message` yields no message.
        ///
        /// # Errors
        ///
        /// Returns an error if `receive_message` fails or the payload is not
        /// valid JSON for `T`.
        fn receive_json<T: DeserializeOwned>(
            &mut self,
            qname: &str,
            hidden: Option<Duration>,
        ) -> RsmqResult<Option<RsmqMessage<T>>>;

        /// Pops a message from `qname` via `pop_message` and decodes its
        /// payload from JSON into `T`.
        ///
        /// Returns `Ok(None)` when `pop_message` yields no message.
        ///
        /// # Errors
        ///
        /// Returns an error if `pop_message` fails or the payload is not
        /// valid JSON for `T`.
        fn pop_json<T: DeserializeOwned>(
            &mut self,
            qname: &str,
        ) -> RsmqResult<Option<RsmqMessage<T>>>;
    }

    /// Blanket implementation of [`RsmqJsonExtSync`] for every
    /// `RsmqConnectionSync` type.
    impl<C> RsmqJsonExtSync for C
    where
        C: RsmqConnectionSync,
    {
        fn send_json<T: Serialize + ?Sized>(
            &mut self,
            qname: &str,
            message: &T,
            delay: Option<Duration>,
        ) -> RsmqResult<String> {
            // A serialization failure returns early, before `send_message`
            // is invoked.
            self.send_message(qname, serde_json::to_vec(message)?, delay)
        }

        fn receive_json<T: DeserializeOwned>(
            &mut self,
            qname: &str,
            hidden: Option<Duration>,
        ) -> RsmqResult<Option<RsmqMessage<T>>> {
            let fetched = self.receive_message::<Vec<u8>>(qname, hidden)?;
            decode_json(fetched)
        }

        fn pop_json<T: DeserializeOwned>(
            &mut self,
            qname: &str,
        ) -> RsmqResult<Option<RsmqMessage<T>>> {
            let popped = self.pop_message::<Vec<u8>>(qname)?;
            decode_json(popped)
        }
    }
}
#[cfg(feature = "sync")]
pub use sync::RsmqJsonExtSync;