embedded_redis/network/
response.rs

1use crate::network::protocol::Protocol;
2use alloc::vec::Vec;
3use bytes::Bytes;
4
/// Parameters for memory optimization
///
/// The sizes are only used for pre-allocation, so they are hints and not hard limits.
/// The only hard limit is `memory_limit`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryParameters {
    /// Pre-allocated size of the unparsed data buffer in bytes.
    /// Should correspond to the maximum expected response size.
    pub buffer_size: usize,

    /// Pre-allocated number of parsed frames.
    /// Should correspond to the expected number of parallel futures/commands.
    pub frame_capacity: usize,

    /// Optional buffer memory limit in bytes for preventing DOS attacks.
    /// [CommandErrors::MemoryFull](crate::network::CommandErrors::MemoryFull) error is returned in case limit is reached.
    ///
    /// A value of `Some(0)` is handled like `None` (no limit).
    pub memory_limit: Option<usize>,
}
18
19impl Default for MemoryParameters {
20    fn default() -> Self {
21        Self {
22            buffer_size: 256,
23            frame_capacity: 8,
24            memory_limit: None,
25        }
26    }
27}
28
29pub(crate) struct ResponseBuffer<P: Protocol> {
30    decoder: P,
31
32    /// Unparsed data buffer
33    buffer: Vec<u8>,
34
35    /// Parsed frames
36    frames: Vec<Option<P::FrameType>>,
37
38    /// Number of non taken messages in message vector
39    frame_count: usize,
40
41    /// Frame index offset on external access (e.g. complete() or take_message())
42    /// So each new message gets an unique external index, while we can drain frame vector when
43    /// alle messages are taken
44    frame_offset: usize,
45
46    /// Received unknown message prefix
47    faulty: bool,
48
49    /// Memory limit in bytes. 0 in case if no limit is used.
50    limit: usize,
51}
52
53impl<P: Protocol> ResponseBuffer<P> {
54    pub fn new(protocol: P, parameters: MemoryParameters) -> ResponseBuffer<P> {
55        Self {
56            decoder: protocol,
57            buffer: Vec::with_capacity(parameters.buffer_size),
58            frames: Vec::with_capacity(parameters.frame_capacity),
59            frame_count: 0,
60            frame_offset: 0,
61            faulty: false,
62            limit: parameters.memory_limit.unwrap_or(0),
63        }
64    }
65
66    /// Appends data to buffer
67    pub fn append(&mut self, data: &[u8]) {
68        if self.is_full() {
69            return;
70        }
71
72        self.buffer.extend_from_slice(data);
73        self.parse_frames();
74    }
75
76    /// Takes the frame at the given index
77    pub fn take_frame(&mut self, mut index: usize) -> Option<P::FrameType> {
78        // Invalid index given
79        if index < self.frame_offset {
80            return None;
81        }
82
83        index -= self.frame_offset;
84
85        // Message does not (yet) exist
86        if self.frames.len() <= index {
87            return None;
88        }
89
90        let frame = self.frames[index].take();
91        if frame.is_some() {
92            self.frame_count -= 1;
93        }
94
95        if self.frame_count == 0 {
96            self.frame_offset += self.frames.len();
97            self.frames.clear();
98        }
99
100        frame
101    }
102
103    /// Takes the next frame. Returns None in case no complete frame exists.
104    pub fn take_next_frame(&mut self) -> Option<P::FrameType> {
105        let index = self.frames.iter().position(|x| x.is_some())?;
106        self.take_frame(index + self.frame_offset)
107    }
108
109    /// Parses buffer and extracts messages
110    /// Buffer is drained to only contain non-complete messages
111    fn parse_frames(&mut self) {
112        // Start of next message
113        let mut start = 0;
114
115        // Position of last termination in buffer
116        let mut last_termination = None;
117
118        while !self.faulty {
119            let termination = self.parse_frame(start);
120            // No more frames left
121            if termination.is_none() {
122                break;
123            }
124
125            start = termination.unwrap() + 1;
126            last_termination = termination;
127        }
128
129        // No message was found, so buffer can stay unchanged
130        if last_termination.is_none() {
131            return;
132        }
133
134        // No unparsed data remaining in buffer
135        if (last_termination.unwrap() + 1) == self.buffer.len() {
136            return self.buffer.clear();
137        }
138
139        self.buffer.drain(..=last_termination.unwrap());
140    }
141
142    /// Parses the next frame
143    ///
144    /// # Arguments
145    ///
146    /// * `start`: Start parsing at this position
147    ///
148    /// returns: Option<usize> Position/Index of last termination
149    /// None is returned in case no message was found
150    fn parse_frame(&mut self, start: usize) -> Option<usize> {
151        if start >= self.buffer.len() {
152            return None;
153        }
154
155        let bytes = Bytes::from(self.buffer[start..].to_vec());
156
157        let result = self.decoder.decode(&bytes);
158        if result.is_err() {
159            self.faulty = true;
160            return None;
161        }
162
163        // No frame found
164        if result.as_ref().unwrap().is_none() {
165            return None;
166        }
167
168        let frame = result.unwrap().unwrap();
169        self.frames.push(Some(frame.0));
170        self.frame_count += 1;
171        Some(frame.1 - 1 + start)
172    }
173
174    /// Is message at index given index complete
175    pub fn is_complete(&self, mut index: usize) -> bool {
176        if index < self.frame_offset {
177            return false;
178        }
179
180        index -= self.frame_offset;
181        self.frames.len() > index
182    }
183
184    /// If true, an protocol violation was detected
185    /// Since the cause (e.g. Redis bug, network fault, etc.) is unclear, this is a fatal problem.
186    /// The mapping of messages indexes can no longer be guaranteed from this point on.
187    pub fn is_faulty(&self) -> bool {
188        self.faulty
189    }
190
191    /// Returns true if max. buffer size is reached
192    pub fn is_full(&self) -> bool {
193        if self.limit == 0 {
194            return false;
195        }
196
197        self.buffer.len() > self.limit
198    }
199
200    /// Resets the buffer in case of fatal error
201    pub fn clear(&mut self) {
202        self.frames.clear();
203        self.buffer.clear();
204        self.frame_offset = 0;
205        self.frame_count = 0;
206        self.faulty = false;
207    }
208
209    #[cfg(test)]
210    pub fn pending_frame_count(&self) -> usize {
211        self.frame_count
212    }
213
214    #[cfg(test)]
215    pub fn frame_offset(&self) -> usize {
216        self.frame_offset
217    }
218}