1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#![feature(generators)]
mod file_fetch;
mod file_produce;
pub use file_fetch::FilePartitionResponse;
pub use file_fetch::FileFetchResponse;
pub use file_fetch::FileTopicResponse;
pub use file_fetch::KfFileFetchRequest;
pub use file_produce::FileProduceRequest;
pub use file_produce::FileTopicRequest;
pub use file_produce::FilePartitionRequest;
pub use file_fetch::KfFileRecordSet;
use std::io::Error as IoError;
use log::trace;
use flv_future_aio::bytes::Bytes;
use flv_future_aio::bytes::BytesMut;
use flv_future_aio::fs::AsyncFileSlice;
use kf_protocol_core::Version;
use kf_protocol_api::Request;
use kf_protocol_api::RequestMessage;
use kf_protocol_api::ResponseMessage;
use kf_protocol_core::Encoder;
pub enum StoreValue {
Bytes(Bytes),
FileSlice(AsyncFileSlice),
}
impl std::fmt::Debug for StoreValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
StoreValue::Bytes(bytes) => write!(f, "StoreValue:Bytes with len: {}", bytes.len()),
StoreValue::FileSlice(slice) => write!(f, "StoreValue:FileSlice: {:#?}", slice),
}
}
}
pub trait FileWrite: Encoder {
fn file_encode(
&self,
src: &mut BytesMut,
_data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
self.encode(src, version)
}
}
impl<M> FileWrite for Vec<M>
where
M: FileWrite,
{
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
let len: i32 = self.len() as i32;
len.encode(src, version)?;
for v in self {
v.file_encode(src, data, version)?;
}
Ok(())
}
}
impl<P> FileWrite for ResponseMessage<P>
where
P: FileWrite + Default,
{
fn file_encode(
&self,
dest: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
let len = self.write_size(version) as i32;
log::debug!("encoding file write response: {} version: {}, len: {}", std::any::type_name::<P>(),version, len);
len.encode(dest, version)?;
trace!("encoding response correlation id: {}", self.correlation_id);
self.correlation_id.encode(dest, version)?;
trace!("encoding response");
self.response.file_encode(dest, data, version)?;
Ok(())
}
}
impl<R> FileWrite for RequestMessage<R>
where
R: FileWrite + Default + Request,
{
fn file_encode(
&self,
dest: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding response message");
let len = self.write_size(version) as i32;
trace!("file encoding response len: {}", len);
len.encode(dest, version)?;
trace!("file encoding header");
self.header.encode(dest, version)?;
trace!("encoding response");
self.request.file_encode(dest, data, version)?;
Ok(())
}
}