1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::fs::File;
use std::io::Cursor;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Read;
use std::path::Path;
use tracing::debug;
use tracing::trace;
use crate::core::bytes::Buf;
use crate::core::bytes::BufMut;
use crate::core::Decoder;
use crate::core::Encoder;
use crate::core::Version;
use crate::RequestHeader;
#[derive(Debug, Default)]
pub struct ResponseMessage<P> {
pub correlation_id: i32,
pub response: P,
}
impl<P> ResponseMessage<P> {
#[allow(unused)]
pub fn from_header(header: &RequestHeader, response: P) -> Self {
Self::new(header.correlation_id(), response)
}
pub fn new(correlation_id: i32, response: P) -> Self {
Self {
correlation_id,
response,
}
}
}
impl<P> ResponseMessage<P>
where
P: Decoder,
{
pub fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, IoError>
where
T: Buf,
{
let mut correlation_id: i32 = 0;
correlation_id.decode(src, version)?;
trace!("decoded correlation id: {}", correlation_id);
let response = P::decode_from(src, version)?;
Ok(ResponseMessage {
correlation_id,
response,
})
}
pub fn decode_from_file<H: AsRef<Path>>(
file_name: H,
version: Version,
) -> Result<Self, IoError> {
debug!("decoding from file: {:#?}", file_name.as_ref());
let mut f = File::open(file_name)?;
let mut buffer: [u8; 1000] = [0; 1000];
f.read_exact(&mut buffer)?;
let data = buffer.to_vec();
let mut src = Cursor::new(&data);
let mut size: i32 = 0;
size.decode(&mut src, version)?;
trace!("decoded response size: {} bytes", size);
if src.remaining() < size as usize {
return Err(IoError::new(
ErrorKind::UnexpectedEof,
"not enought for response",
));
}
Self::decode_from(&mut src, version)
}
}
impl<P> Encoder for ResponseMessage<P>
where
P: Encoder + Default,
{
fn write_size(&self, version: Version) -> usize {
self.correlation_id.write_size(version) + self.response.write_size(version)
}
fn encode<T>(&self, out: &mut T, version: Version) -> Result<(), IoError>
where
T: BufMut,
{
let len = self.write_size(version);
trace!(
"encoding kf response: {} version: {}, len: {}",
std::any::type_name::<P>(),
version,
len
);
self.correlation_id.encode(out, version)?;
self.response.encode(out, version)?;
Ok(())
}
}