fluvio_protocol_api/
response.rs

1use std::fs::File;
2use std::io::Cursor;
3use std::io::Error as IoError;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::path::Path;
7
8use tracing::debug;
9use tracing::trace;
10
11use crate::core::bytes::Buf;
12use crate::core::bytes::BufMut;
13use crate::core::Decoder;
14use crate::core::Encoder;
15use crate::core::Version;
16
17use crate::RequestHeader;
18
/// A response payload paired with the correlation id it is sent under.
///
/// `P` is the payload type; the trait bounds it needs (decoding, encoding)
/// are placed on the individual `impl` blocks rather than on the struct.
#[derive(Default, Debug)]
pub struct ResponseMessage<P> {
    /// Correlation id carried alongside the payload.
    pub correlation_id: i32,
    /// The response payload itself.
    pub response: P,
}
24
25impl<P> ResponseMessage<P> {
26    #[allow(unused)]
27    pub fn from_header(header: &RequestHeader, response: P) -> Self {
28        Self::new(header.correlation_id(), response)
29    }
30
31    pub fn new(correlation_id: i32, response: P) -> Self {
32        Self {
33            correlation_id,
34            response,
35        }
36    }
37}
38
39impl<P> ResponseMessage<P>
40where
41    P: Decoder,
42{
43    pub fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, IoError>
44    where
45        T: Buf,
46    {
47        let mut correlation_id: i32 = 0;
48        correlation_id.decode(src, version)?;
49        trace!("decoded correlation id: {}", correlation_id);
50
51        let response = P::decode_from(src, version)?;
52        Ok(ResponseMessage {
53            correlation_id,
54            response,
55        })
56    }
57
58    pub fn decode_from_file<H: AsRef<Path>>(
59        file_name: H,
60        version: Version,
61    ) -> Result<Self, IoError> {
62        debug!("decoding from file: {:#?}", file_name.as_ref());
63        let mut f = File::open(file_name)?;
64        let mut buffer: [u8; 1000] = [0; 1000];
65
66        f.read_exact(&mut buffer)?;
67        let data = buffer.to_vec();
68
69        let mut src = Cursor::new(&data);
70
71        // ResponseMessage implementation of fluvio_protocol::storage::FileWrite trait first encodes the length
72        // of the ResponseMessage
73        let mut size: i32 = 0;
74        size.decode(&mut src, version)?;
75        trace!("decoded response size: {} bytes", size);
76
77        if src.remaining() < size as usize {
78            return Err(IoError::new(
79                ErrorKind::UnexpectedEof,
80                "not enought for response",
81            ));
82        }
83        Self::decode_from(&mut src, version)
84    }
85}
86
87impl<P> Encoder for ResponseMessage<P>
88where
89    P: Encoder + Default,
90{
91    fn write_size(&self, version: Version) -> usize {
92        self.correlation_id.write_size(version) + self.response.write_size(version)
93    }
94
95    fn encode<T>(&self, out: &mut T, version: Version) -> Result<(), IoError>
96    where
97        T: BufMut,
98    {
99        let len = self.write_size(version);
100        trace!(
101            "encoding kf response: {} version: {}, len: {}",
102            std::any::type_name::<P>(),
103            version,
104            len
105        );
106        self.correlation_id.encode(out, version)?;
107        self.response.encode(out, version)?;
108        Ok(())
109    }
110}