//! Buffering of raw response bytes and extraction of complete protocol frames.
use crate::network::protocol::Protocol;
use alloc::vec::Vec;
use bytes::Bytes;
/// Parameters for memory optimization
///
/// Controls the initial allocations of the response buffer and an optional upper bound for
/// the amount of buffered, not yet parsed data.
#[derive(Debug, Clone)]
pub struct MemoryParameters {
/// Pre-allocated size (in bytes) of the buffer holding unparsed data.
/// Should correspond to the maximum expected response size.
pub buffer_size: usize,
/// Pre-allocated number of parsed frames.
/// Should correspond to the expected number of parallel futures/commands.
pub frame_capacity: usize,
/// Optional buffer memory limit in bytes for preventing DOS attacks.
/// [CommandErrors::MemoryFull](crate::network::CommandErrors::MemoryFull) error is returned in case limit is reached.
///
/// `None` disables the limit. Note that `Some(0)` is stored as `0` as well and is therefore
/// also treated as "no limit".
pub memory_limit: Option<usize>,
}
impl Default for MemoryParameters {
fn default() -> Self {
Self {
buffer_size: 256,
frame_capacity: 8,
memory_limit: None,
}
}
}
/// Buffer for received response data.
///
/// Appended bytes are collected in `buffer`, decoded with `decoder` and stored as frames,
/// which can be taken individually by their external index.
pub(crate) struct ResponseBuffer<P: Protocol> {
/// Protocol implementation used for decoding frames out of the unparsed data
decoder: P,
/// Unparsed data buffer
buffer: Vec<u8>,
/// Parsed frames. A slot becomes `None` once its frame was taken.
frames: Vec<Option<P::FrameType>>,
/// Number of frames in the frame vector which were not taken yet
frame_count: usize,
/// Frame index offset on external access (e.g. is_complete() or take_frame())
/// So each new frame gets a unique external index, while we can clear the frame vector when
/// all frames are taken
frame_offset: usize,
/// Set when the decoder returned an error for the received data
faulty: bool,
/// Memory limit in bytes for the unparsed data buffer. 0 if no limit is used.
limit: usize,
}
impl<P: Protocol> ResponseBuffer<P> {
    /// Creates an empty buffer decoding incoming data with the given protocol.
    ///
    /// `parameters.buffer_size` and `parameters.frame_capacity` are used as initial capacities
    /// of the unparsed data buffer and the frame vector. A missing memory limit is stored as `0`.
    pub fn new(protocol: P, parameters: MemoryParameters) -> ResponseBuffer<P> {
        Self {
            decoder: protocol,
            buffer: Vec::with_capacity(parameters.buffer_size),
            frames: Vec::with_capacity(parameters.frame_capacity),
            frame_count: 0,
            frame_offset: 0,
            faulty: false,
            limit: parameters.memory_limit.unwrap_or(0),
        }
    }

    /// Appends data to the buffer and parses all frames which became complete.
    ///
    /// The data is silently discarded if the buffer is already full (see [`Self::is_full`]).
    pub fn append(&mut self, data: &[u8]) {
        if self.is_full() {
            return;
        }

        self.buffer.extend_from_slice(data);
        self.parse_frames();
    }

    /// Takes the frame at the given external index.
    ///
    /// Returns `None` if the index lies below the current offset, the frame does not (yet)
    /// exist or it was already taken. Once the last pending frame is taken, the frame vector
    /// is cleared and the offset is advanced, so external indexes stay unique.
    pub fn take_frame(&mut self, index: usize) -> Option<P::FrameType> {
        // Indexes below the offset are invalid, `checked_sub` yields None for them
        let local_index = index.checked_sub(self.frame_offset)?;

        // `get_mut` yields None if the frame does not (yet) exist
        let frame = self.frames.get_mut(local_index)?.take();
        if frame.is_some() {
            self.frame_count -= 1;
        }

        if self.frame_count == 0 {
            self.frame_offset += self.frames.len();
            self.frames.clear();
        }

        frame
    }

    /// Takes the next frame. Returns None in case no complete frame exists.
    pub fn take_next_frame(&mut self) -> Option<P::FrameType> {
        let index = self.frames.iter().position(|x| x.is_some())?;
        self.take_frame(index + self.frame_offset)
    }

    /// Parses the buffer and extracts all complete frames.
    ///
    /// Afterwards the buffer only contains data which does not belong to a parsed frame.
    /// Nothing is parsed while the buffer is marked as faulty.
    fn parse_frames(&mut self) {
        // Start of the next frame, which equals the number of bytes consumed by parsed frames
        let mut start = 0;

        while !self.faulty {
            match self.parse_frame(start) {
                Some(termination) => start = termination + 1,
                // No more complete frames left (or decoding failed)
                None => break,
            }
        }

        // Removes the consumed bytes. This is a no-op if no frame was found.
        self.buffer.drain(..start);
    }

    /// Parses the next frame
    ///
    /// # Arguments
    ///
    /// * `start`: Start parsing at this position
    ///
    /// returns: Option<usize> Position/Index of last termination
    /// None is returned in case no frame was found or decoding failed.
    ///
    /// The buffer is marked as faulty if the decoder returns an error or reports a consumed
    /// length which is zero or exceeds the remaining data. Such a length would otherwise cause
    /// an arithmetic underflow, an endless loop or an out of range drain.
    fn parse_frame(&mut self, start: usize) -> Option<usize> {
        if start >= self.buffer.len() {
            return None;
        }

        let remaining = self.buffer.len() - start;
        let bytes = Bytes::from(self.buffer[start..].to_vec());

        let decoded = match self.decoder.decode(&bytes) {
            Ok(Some(decoded)) => decoded,
            // No (complete) frame found
            Ok(None) => return None,
            Err(_) => {
                self.faulty = true;
                return None;
            }
        };

        let consumed = decoded.1;
        if consumed == 0 || consumed > remaining {
            self.faulty = true;
            return None;
        }

        self.frames.push(Some(decoded.0));
        self.frame_count += 1;

        Some(start + consumed - 1)
    }

    /// Is the frame at the given external index complete.
    ///
    /// Also returns true for a frame which was already taken, as long as the frame vector
    /// was not cleared in the meantime.
    pub fn is_complete(&self, index: usize) -> bool {
        match index.checked_sub(self.frame_offset) {
            Some(local_index) => local_index < self.frames.len(),
            None => false,
        }
    }

    /// If true, a protocol violation was detected
    /// Since the cause (e.g. Redis bug, network fault, etc.) is unclear, this is a fatal problem.
    /// The mapping of frame indexes can no longer be guaranteed from this point on.
    pub fn is_faulty(&self) -> bool {
        self.faulty
    }

    /// Returns true if the unparsed data exceeds the memory limit.
    /// Always false if no limit is used.
    pub fn is_full(&self) -> bool {
        self.limit != 0 && self.buffer.len() > self.limit
    }

    /// Resets the buffer in case of fatal error
    ///
    /// Drops all frames and unparsed data and resets offset, counter and faulty state.
    pub fn clear(&mut self) {
        self.frames.clear();
        self.buffer.clear();
        self.frame_offset = 0;
        self.frame_count = 0;
        self.faulty = false;
    }

    /// Number of parsed frames which were not taken yet
    #[cfg(test)]
    pub fn pending_frame_count(&self) -> usize {
        self.frame_count
    }

    /// Current offset between external and internal frame indexes
    #[cfg(test)]
    pub fn frame_offset(&self) -> usize {
        self.frame_offset
    }
}