volans_request/codec/
json.rs1use std::{io, marker::PhantomData};
2
3use async_trait::async_trait;
4use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
5use serde::{Serialize, de::DeserializeOwned};
6use volans_swarm::StreamProtocol;
7
8use crate::Codec;
9
/// JSON codec for request/response exchanges, parameterised over the request
/// type `Req` and the response type `Resp`.
///
/// The codec only stores two byte limits; the message types are tracked through
/// [`PhantomData`] and no value of either type is ever held.
pub struct JsonCodec<Req, Resp> {
    /// Size limit, in bytes, applied to requests.
    request_size_maximum: u64,
    /// Size limit, in bytes, applied to responses.
    response_size_maximum: u64,
    /// Ties the codec to its message types without storing any value of them.
    phantom: PhantomData<(Req, Resp)>,
}

// `Clone` and `Debug` are written by hand instead of derived: the derive macros
// would require `Req`/`Resp` to be `Clone` (respectively `Debug`) even though the
// codec never stores a `Req` or `Resp`, which would needlessly restrict the
// message types the codec can be cloned or printed for.
impl<Req, Resp> Clone for JsonCodec<Req, Resp> {
    fn clone(&self) -> Self {
        Self {
            request_size_maximum: self.request_size_maximum,
            response_size_maximum: self.response_size_maximum,
            phantom: PhantomData,
        }
    }
}

impl<Req, Resp> std::fmt::Debug for JsonCodec<Req, Resp> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field set and order as the derived implementation would print.
        f.debug_struct("JsonCodec")
            .field("request_size_maximum", &self.request_size_maximum)
            .field("response_size_maximum", &self.response_size_maximum)
            .field("phantom", &self.phantom)
            .finish()
    }
}
16
17impl<Req, Resp> Default for JsonCodec<Req, Resp> {
18 fn default() -> Self {
19 JsonCodec {
20 request_size_maximum: 1024 * 1024,
21 response_size_maximum: 10 * 1024 * 1024,
22 phantom: PhantomData,
23 }
24 }
25}
26
27impl<Req, Resp> JsonCodec<Req, Resp> {
28 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn request_size_maximum(mut self, size: u64) -> Self {
33 self.request_size_maximum = size;
34 self
35 }
36
37 pub fn response_size_maximum(mut self, size: u64) -> Self {
38 self.response_size_maximum = size;
39 self
40 }
41}
42
43#[async_trait]
44impl<Req, Resp> Codec for JsonCodec<Req, Resp>
45where
46 Req: Send + Serialize + DeserializeOwned,
47 Resp: Send + Serialize + DeserializeOwned,
48{
49 type Protocol = StreamProtocol;
50 type Request = Req;
51 type Response = Resp;
52
53 async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
54 where
55 T: AsyncRead + Unpin + Send,
56 {
57 let mut buffer = Vec::new();
58 io.take(self.request_size_maximum)
59 .read_to_end(&mut buffer)
60 .await?;
61 Ok(serde_json::from_slice(buffer.as_slice())?)
62 }
63
64 async fn read_response<T>(
65 &mut self,
66 _: &Self::Protocol,
67 io: &mut T,
68 ) -> io::Result<Self::Response>
69 where
70 T: AsyncRead + Unpin + Send,
71 {
72 let mut buffer = Vec::new();
73 io.take(self.response_size_maximum)
74 .read_to_end(&mut buffer)
75 .await?;
76 Ok(serde_json::from_slice(buffer.as_slice())?)
77 }
78
79 async fn write_request<T>(
80 &mut self,
81 _: &Self::Protocol,
82 io: &mut T,
83 request: Self::Request,
84 ) -> io::Result<()>
85 where
86 T: AsyncWrite + Unpin + Send,
87 {
88 let data = serde_json::to_vec(&request)?;
89 io.write_all(&data).await?;
90 Ok(())
91 }
92
93 async fn write_response<T>(
94 &mut self,
95 _: &Self::Protocol,
96 io: &mut T,
97 response: Self::Response,
98 ) -> io::Result<()>
99 where
100 T: AsyncWrite + Unpin + Send,
101 {
102 let data = serde_json::to_vec(&response)?;
103 io.write_all(&data).await?;
104 Ok(())
105 }
106}