use std::marker::PhantomData;
use bytes::Bytes;
use crate::{
context::{InboundContext, OutboundContext},
traits::{Flow, Inbound, Outbound},
Error, Result,
};
pub struct JsonDecode<T> {
_marker: PhantomData<fn() -> T>,
}
impl<T> JsonDecode<T> {
pub fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
impl<T> Default for JsonDecode<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Inbound<String> for JsonDecode<T>
where
T: for<'de> serde::Deserialize<'de> + Send + 'static,
{
type Out = T;
async fn read(&mut self, _ctx: &mut InboundContext, msg: String) -> Result<Flow<Self::Out>> {
decode_json(msg.as_bytes())
}
}
impl<T> Inbound<Bytes> for JsonDecode<T>
where
T: for<'de> serde::Deserialize<'de> + Send + 'static,
{
type Out = T;
async fn read(&mut self, _ctx: &mut InboundContext, msg: Bytes) -> Result<Flow<Self::Out>> {
decode_json(&msg)
}
}
pub struct JsonEncode<T> {
_marker: PhantomData<fn() -> T>,
}
impl<T> JsonEncode<T> {
pub fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
impl<T> Default for JsonEncode<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Outbound<T> for JsonEncode<T>
where
T: serde::Serialize + Send + 'static,
{
type Out = String;
async fn write(&mut self, _ctx: &mut OutboundContext, msg: T) -> Result<Flow<Self::Out>> {
sonic_rs::to_string(&msg)
.map(Flow::Next)
.map_err(|err| Error::Encode(format!("json encode failed: {err}")))
}
}
fn decode_json<T>(src: &[u8]) -> Result<Flow<T>>
where
T: for<'de> serde::Deserialize<'de>,
{
sonic_rs::from_slice(src)
.map(Flow::Next)
.map_err(|err| Error::Decode(format!("json decode failed: {err}")))
}
#[cfg(test)]
mod tests {
use crate::{
codec::{JsonDecode, JsonEncode, LineCodec},
pipeline,
traits::{Flow, Inbound, Outbound},
Context, Handler, InboundContext, OutboundContext, Result,
};
#[derive(Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
struct Request {
value: String,
}
#[derive(Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
struct Response {
value: String,
}
struct Echo;
impl Handler<Request> for Echo {
type Write = Response;
async fn read(&mut self, ctx: &mut Context<Self::Write>, msg: Request) -> Result<()> {
ctx.write(Response { value: msg.value }).await
}
}
#[tokio::test]
async fn decodes_json_from_string() {
let mut decoder = JsonDecode::<Request>::new();
let mut ctx = InboundContext::new_datagram(crate::DatagramInfo::new(
1,
"127.0.0.1:1".parse().unwrap(),
"127.0.0.1:2".parse().unwrap(),
));
let decoded = decoder
.read(&mut ctx, r#"{"value":"hello"}"#.to_string())
.await
.unwrap();
assert!(matches!(
decoded,
Flow::Next(Request { value }) if value == "hello"
));
}
#[tokio::test]
async fn encodes_json_to_string() {
let mut encoder = JsonEncode::<Response>::new();
let mut ctx = OutboundContext::new_datagram(crate::DatagramInfo::new(
1,
"127.0.0.1:1".parse().unwrap(),
"127.0.0.1:2".parse().unwrap(),
));
let encoded = encoder
.write(
&mut ctx,
Response {
value: "hello".to_string(),
},
)
.await
.unwrap();
assert!(matches!(encoded, Flow::Next(json) if json == r#"{"value":"hello"}"#));
}
#[test]
fn composes_with_line_codec() {
let _pipeline = pipeline()
.codec(LineCodec::new())
.inbound(JsonDecode::<Request>::new())
.handler(Echo)
.outbound(JsonEncode::<Response>::new());
}
}