volans_request/codec/
json.rs

1use std::{io, marker::PhantomData};
2
3use async_trait::async_trait;
4use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
5use serde::{Serialize, de::DeserializeOwned};
6use volans_swarm::StreamProtocol;
7
8use crate::Codec;
9
/// JSON codec for request/response exchanges, generic over the request type
/// `Req` and the response type `Resp`.
///
/// The codec stores only two byte limits; no value of `Req` or `Resp` is ever
/// held inside it.
pub struct JsonCodec<Req, Resp> {
    /// Byte limit applied when reading a request.
    request_size_maximum: u64,
    /// Byte limit applied when reading a response.
    response_size_maximum: u64,
    /// Ties the codec to its message types without storing any values.
    phantom: PhantomData<(Req, Resp)>,
}

// `Clone` and `Debug` are written by hand because `#[derive]` would add
// `Req: Clone + Debug` and `Resp: Clone + Debug` bounds, even though the codec
// never contains a `Req` or `Resp`.
impl<Req, Resp> Clone for JsonCodec<Req, Resp> {
    fn clone(&self) -> Self {
        Self {
            request_size_maximum: self.request_size_maximum,
            response_size_maximum: self.response_size_maximum,
            phantom: PhantomData,
        }
    }
}

impl<Req, Resp> std::fmt::Debug for JsonCodec<Req, Resp> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derived implementation produced.
        f.debug_struct("JsonCodec")
            .field("request_size_maximum", &self.request_size_maximum)
            .field("response_size_maximum", &self.response_size_maximum)
            .field("phantom", &self.phantom)
            .finish()
    }
}
16
17impl<Req, Resp> Default for JsonCodec<Req, Resp> {
18    fn default() -> Self {
19        JsonCodec {
20            request_size_maximum: 1024 * 1024,
21            response_size_maximum: 10 * 1024 * 1024,
22            phantom: PhantomData,
23        }
24    }
25}
26
27impl<Req, Resp> JsonCodec<Req, Resp> {
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    pub fn request_size_maximum(mut self, size: u64) -> Self {
33        self.request_size_maximum = size;
34        self
35    }
36
37    pub fn response_size_maximum(mut self, size: u64) -> Self {
38        self.response_size_maximum = size;
39        self
40    }
41}
42
43#[async_trait]
44impl<Req, Resp> Codec for JsonCodec<Req, Resp>
45where
46    Req: Send + Serialize + DeserializeOwned,
47    Resp: Send + Serialize + DeserializeOwned,
48{
49    type Protocol = StreamProtocol;
50    type Request = Req;
51    type Response = Resp;
52
53    async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
54    where
55        T: AsyncRead + Unpin + Send,
56    {
57        let mut buffer = Vec::new();
58        io.take(self.request_size_maximum)
59            .read_to_end(&mut buffer)
60            .await?;
61        Ok(serde_json::from_slice(buffer.as_slice())?)
62    }
63
64    async fn read_response<T>(
65        &mut self,
66        _: &Self::Protocol,
67        io: &mut T,
68    ) -> io::Result<Self::Response>
69    where
70        T: AsyncRead + Unpin + Send,
71    {
72        let mut buffer = Vec::new();
73        io.take(self.response_size_maximum)
74            .read_to_end(&mut buffer)
75            .await?;
76        Ok(serde_json::from_slice(buffer.as_slice())?)
77    }
78
79    async fn write_request<T>(
80        &mut self,
81        _: &Self::Protocol,
82        io: &mut T,
83        request: Self::Request,
84    ) -> io::Result<()>
85    where
86        T: AsyncWrite + Unpin + Send,
87    {
88        let data = serde_json::to_vec(&request)?;
89        io.write_all(&data).await?;
90        Ok(())
91    }
92
93    async fn write_response<T>(
94        &mut self,
95        _: &Self::Protocol,
96        io: &mut T,
97        response: Self::Response,
98    ) -> io::Result<()>
99    where
100        T: AsyncWrite + Unpin + Send,
101    {
102        let data = serde_json::to_vec(&response)?;
103        io.write_all(&data).await?;
104        Ok(())
105    }
106}