1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
use crate::network::protocol::Protocol;
use alloc::vec::Vec;
use bytes::Bytes;
/// Parameters for memory optimization
#[derive(Debug, Clone)]
pub struct MemoryParameters {
/// Pre allocated unparsed buffer size. Should correspond to the maximum expected response size.
pub buffer_size: usize,
/// Pre allocated count of parsed frames. Should correspond to the expected number of parallel futures/commands.
pub frame_capacity: usize,
/// Optional buffer memory limit in bytes for preventing DOS attacks.
/// [CommandErrors::MemoryFull](crate::network::CommandErrors::MemoryFull) error is returned in case limit is reached.
pub memory_limit: Option<usize>,
}
impl Default for MemoryParameters {
fn default() -> Self {
Self {
buffer_size: 256,
frame_capacity: 8,
memory_limit: None,
}
}
}
pub(crate) struct ResponseBuffer<P: Protocol> {
decoder: P,
/// Unparsed data buffer
buffer: Vec<u8>,
/// Parsed frames
frames: Vec<Option<P::FrameType>>,
/// Number of non taken messages in message vector
frame_count: usize,
/// Frame index offset on external access (e.g. complete() or take_message())
/// So each new message gets an unique external index, while we can drain frame vector when
/// alle messages are taken
frame_offset: usize,
/// Received unknown message prefix
faulty: bool,
/// Memory limit in bytes. 0 in case if no limit is used.
limit: usize,
}
impl<P: Protocol> ResponseBuffer<P> {
pub fn new(protocol: P, parameters: MemoryParameters) -> ResponseBuffer<P> {
Self {
decoder: protocol,
buffer: Vec::with_capacity(parameters.buffer_size),
frames: Vec::with_capacity(parameters.frame_capacity),
frame_count: 0,
frame_offset: 0,
faulty: false,
limit: parameters.memory_limit.unwrap_or(0),
}
}
/// Appends data to buffer
pub fn append(&mut self, data: &[u8]) {
if self.is_full() {
return;
}
self.buffer.extend_from_slice(data);
self.parse_frames();
}
/// Takes the frame at the given index
pub fn take_frame(&mut self, mut index: usize) -> Option<P::FrameType> {
// Invalid index given
if index < self.frame_offset {
return None;
}
index -= self.frame_offset;
// Message does not (yet) exist
if self.frames.len() <= index {
return None;
}
let frame = self.frames[index].take();
if frame.is_some() {
self.frame_count -= 1;
}
if self.frame_count == 0 {
self.frame_offset += self.frames.len();
self.frames.clear();
}
frame
}
/// Takes the next frame. Returns None in case no complete frame exists.
pub fn take_next_frame(&mut self) -> Option<P::FrameType> {
let index = self.frames.iter().position(|x| x.is_some())?;
self.take_frame(index + self.frame_offset)
}
/// Parses buffer and extracts messages
/// Buffer is drained to only contain non-complete messages
fn parse_frames(&mut self) {
// Start of next message
let mut start = 0;
// Position of last termination in buffer
let mut last_termination = None;
while !self.faulty {
let termination = self.parse_frame(start);
// No more frames left
if termination.is_none() {
break;
}
start = termination.unwrap() + 1;
last_termination = termination;
}
// No message was found, so buffer can stay unchanged
if last_termination.is_none() {
return;
}
// No unparsed data remaining in buffer
if (last_termination.unwrap() + 1) == self.buffer.len() {
return self.buffer.clear();
}
self.buffer.drain(..=last_termination.unwrap());
}
/// Parses the next frame
///
/// # Arguments
///
/// * `start`: Start parsing at this position
///
/// returns: Option<usize> Position/Index of last termination
/// None is returned in case no message was found
fn parse_frame(&mut self, start: usize) -> Option<usize> {
if start >= self.buffer.len() {
return None;
}
let bytes = Bytes::from(self.buffer[start..].to_vec());
let result = self.decoder.decode(&bytes);
if result.is_err() {
self.faulty = true;
return None;
}
// No frame found
if result.as_ref().unwrap().is_none() {
return None;
}
let frame = result.unwrap().unwrap();
self.frames.push(Some(frame.0));
self.frame_count += 1;
Some(frame.1 - 1 + start)
}
/// Is message at index given index complete
pub fn is_complete(&self, mut index: usize) -> bool {
if index < self.frame_offset {
return false;
}
index -= self.frame_offset;
self.frames.len() > index
}
/// If true, an protocol violation was detected
/// Since the cause (e.g. Redis bug, network fault, etc.) is unclear, this is a fatal problem.
/// The mapping of messages indexes can no longer be guaranteed from this point on.
pub fn is_faulty(&self) -> bool {
self.faulty
}
/// Returns true if max. buffer size is reached
pub fn is_full(&self) -> bool {
if self.limit == 0 {
return false;
}
self.buffer.len() > self.limit
}
/// Resets the buffer in case of fatal error
pub fn clear(&mut self) {
self.frames.clear();
self.buffer.clear();
self.frame_offset = 0;
self.frame_count = 0;
self.faulty = false;
}
#[cfg(test)]
pub fn pending_frame_count(&self) -> usize {
self.frame_count
}
#[cfg(test)]
pub fn frame_offset(&self) -> usize {
self.frame_offset
}
}