kf_protocol_api/
response.rs1
2use std::fs::File;
3use std::io::Cursor;
4use std::io::Error as IoError;
5use std::io::ErrorKind;
6use std::io::Read;
7use std::path::Path;
8
9use log::debug;
10use log::trace;
11
12use kf_protocol::bytes::Buf;
13use kf_protocol::bytes::BufMut;
14use kf_protocol::Decoder;
15use kf_protocol::Encoder;
16use kf_protocol::Version;
17
18use crate::RequestHeader;
19
20#[derive(Debug, Default)]
21pub struct ResponseMessage<P> {
22 pub correlation_id: i32,
23 pub response: P,
24}
25
26
27impl<P> ResponseMessage<P> {
28
29
30 pub fn from_header(header: &RequestHeader,response: P) -> Self {
31 Self::new(header.correlation_id(),response)
32 }
33
34 pub fn new(correlation_id: i32, response: P) -> Self {
35 Self {
36 correlation_id,
37 response,
38 }
39 }
40
41}
42
43
44
45impl<P> ResponseMessage<P>
46 where P: Decoder
47{
48 pub fn decode_from<T>(src: &mut T,version: Version) -> Result<Self,IoError>
49 where T:Buf {
50
51 let mut correlation_id: i32 = 0;
52 correlation_id.decode(src,version)?;
53 trace!("decoded correlation id: {}",correlation_id);
54
55 let response = P::decode_from(src, version)?;
56 Ok(ResponseMessage {
57 correlation_id,
58 response
59 })
60
61 }
62
63
64 pub fn decode_from_file<H: AsRef<Path>>(
65 file_name: H,
66 version: Version
67 ) -> Result<Self, IoError> {
68
69 debug!("decoding from file: {:#?}", file_name.as_ref());
70 let mut f = File::open(file_name)?;
71 let mut buffer: [u8; 1000] = [0; 1000];
72
73 f.read(&mut buffer)?;
74 let data = buffer.to_vec();
75
76 let mut src = Cursor::new(&data);
77
78 let mut size: i32 = 0;
79 size.decode(&mut src,version)?;
80 trace!("decoded response size: {} bytes", size);
81
82 if src.remaining() < size as usize {
83 return Err(IoError::new(
84 ErrorKind::UnexpectedEof,
85 "not enought for response",
86 ));
87 }
88
89 Self::decode_from(&mut src,version)
90 }
91
92}
93
94impl<P> Encoder for ResponseMessage<P>
95where
96 P: Encoder + Default,
97{
98 fn write_size(&self,version: Version) -> usize {
99 self.correlation_id.write_size(version) + self.response.write_size(version)
100 }
101
102 fn encode<T>(&self, out: &mut T,version: Version) -> Result<(), IoError>
103 where
104 T: BufMut,
105 {
106 let len = self.write_size(version) as i32;
107 trace!("encoding kf response: {} version: {}, len: {}", std::any::type_name::<P>(),version, len);
108 len.encode(out,version)?;
109 self.correlation_id.encode(out,version)?;
110 self.response.encode(out,version)?;
111 Ok(())
112 }
113}