fluvio_protocol_api/
response.rs1use std::fs::File;
2use std::io::Cursor;
3use std::io::Error as IoError;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::path::Path;
7
8use tracing::debug;
9use tracing::trace;
10
11use crate::core::bytes::Buf;
12use crate::core::bytes::BufMut;
13use crate::core::Decoder;
14use crate::core::Encoder;
15use crate::core::Version;
16
17use crate::RequestHeader;
18
/// A response payload paired with the correlation id of the request it answers.
///
/// On the wire the correlation id is written and read before the payload.
#[derive(Debug, Default)]
pub struct ResponseMessage<P> {
    /// Correlation id; `from_header` copies it from the request header.
    pub correlation_id: i32,
    /// The response payload.
    pub response: P,
}
24
25impl<P> ResponseMessage<P> {
26 #[allow(unused)]
27 pub fn from_header(header: &RequestHeader, response: P) -> Self {
28 Self::new(header.correlation_id(), response)
29 }
30
31 pub fn new(correlation_id: i32, response: P) -> Self {
32 Self {
33 correlation_id,
34 response,
35 }
36 }
37}
38
39impl<P> ResponseMessage<P>
40where
41 P: Decoder,
42{
43 pub fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, IoError>
44 where
45 T: Buf,
46 {
47 let mut correlation_id: i32 = 0;
48 correlation_id.decode(src, version)?;
49 trace!("decoded correlation id: {}", correlation_id);
50
51 let response = P::decode_from(src, version)?;
52 Ok(ResponseMessage {
53 correlation_id,
54 response,
55 })
56 }
57
58 pub fn decode_from_file<H: AsRef<Path>>(
59 file_name: H,
60 version: Version,
61 ) -> Result<Self, IoError> {
62 debug!("decoding from file: {:#?}", file_name.as_ref());
63 let mut f = File::open(file_name)?;
64 let mut buffer: [u8; 1000] = [0; 1000];
65
66 f.read_exact(&mut buffer)?;
67 let data = buffer.to_vec();
68
69 let mut src = Cursor::new(&data);
70
71 let mut size: i32 = 0;
74 size.decode(&mut src, version)?;
75 trace!("decoded response size: {} bytes", size);
76
77 if src.remaining() < size as usize {
78 return Err(IoError::new(
79 ErrorKind::UnexpectedEof,
80 "not enought for response",
81 ));
82 }
83 Self::decode_from(&mut src, version)
84 }
85}
86
87impl<P> Encoder for ResponseMessage<P>
88where
89 P: Encoder + Default,
90{
91 fn write_size(&self, version: Version) -> usize {
92 self.correlation_id.write_size(version) + self.response.write_size(version)
93 }
94
95 fn encode<T>(&self, out: &mut T, version: Version) -> Result<(), IoError>
96 where
97 T: BufMut,
98 {
99 let len = self.write_size(version);
100 trace!(
101 "encoding kf response: {} version: {}, len: {}",
102 std::any::type_name::<P>(),
103 version,
104 len
105 );
106 self.correlation_id.encode(out, version)?;
107 self.response.encode(out, version)?;
108 Ok(())
109 }
110}