embedded_redis/network/
response.rs1use crate::network::protocol::Protocol;
2use alloc::vec::Vec;
3use bytes::Bytes;
4
5#[derive(Debug, Clone)]
7pub struct MemoryParameters {
8 pub buffer_size: usize,
10
11 pub frame_capacity: usize,
13
14 pub memory_limit: Option<usize>,
17}
18
19impl Default for MemoryParameters {
20 fn default() -> Self {
21 Self {
22 buffer_size: 256,
23 frame_capacity: 8,
24 memory_limit: None,
25 }
26 }
27}
28
29pub(crate) struct ResponseBuffer<P: Protocol> {
30 decoder: P,
31
32 buffer: Vec<u8>,
34
35 frames: Vec<Option<P::FrameType>>,
37
38 frame_count: usize,
40
41 frame_offset: usize,
45
46 faulty: bool,
48
49 limit: usize,
51}
52
53impl<P: Protocol> ResponseBuffer<P> {
54 pub fn new(protocol: P, parameters: MemoryParameters) -> ResponseBuffer<P> {
55 Self {
56 decoder: protocol,
57 buffer: Vec::with_capacity(parameters.buffer_size),
58 frames: Vec::with_capacity(parameters.frame_capacity),
59 frame_count: 0,
60 frame_offset: 0,
61 faulty: false,
62 limit: parameters.memory_limit.unwrap_or(0),
63 }
64 }
65
66 pub fn append(&mut self, data: &[u8]) {
68 if self.is_full() {
69 return;
70 }
71
72 self.buffer.extend_from_slice(data);
73 self.parse_frames();
74 }
75
76 pub fn take_frame(&mut self, mut index: usize) -> Option<P::FrameType> {
78 if index < self.frame_offset {
80 return None;
81 }
82
83 index -= self.frame_offset;
84
85 if self.frames.len() <= index {
87 return None;
88 }
89
90 let frame = self.frames[index].take();
91 if frame.is_some() {
92 self.frame_count -= 1;
93 }
94
95 if self.frame_count == 0 {
96 self.frame_offset += self.frames.len();
97 self.frames.clear();
98 }
99
100 frame
101 }
102
103 pub fn take_next_frame(&mut self) -> Option<P::FrameType> {
105 let index = self.frames.iter().position(|x| x.is_some())?;
106 self.take_frame(index + self.frame_offset)
107 }
108
109 fn parse_frames(&mut self) {
112 let mut start = 0;
114
115 let mut last_termination = None;
117
118 while !self.faulty {
119 let termination = self.parse_frame(start);
120 if termination.is_none() {
122 break;
123 }
124
125 start = termination.unwrap() + 1;
126 last_termination = termination;
127 }
128
129 if last_termination.is_none() {
131 return;
132 }
133
134 if (last_termination.unwrap() + 1) == self.buffer.len() {
136 return self.buffer.clear();
137 }
138
139 self.buffer.drain(..=last_termination.unwrap());
140 }
141
142 fn parse_frame(&mut self, start: usize) -> Option<usize> {
151 if start >= self.buffer.len() {
152 return None;
153 }
154
155 let bytes = Bytes::from(self.buffer[start..].to_vec());
156
157 let result = self.decoder.decode(&bytes);
158 if result.is_err() {
159 self.faulty = true;
160 return None;
161 }
162
163 if result.as_ref().unwrap().is_none() {
165 return None;
166 }
167
168 let frame = result.unwrap().unwrap();
169 self.frames.push(Some(frame.0));
170 self.frame_count += 1;
171 Some(frame.1 - 1 + start)
172 }
173
174 pub fn is_complete(&self, mut index: usize) -> bool {
176 if index < self.frame_offset {
177 return false;
178 }
179
180 index -= self.frame_offset;
181 self.frames.len() > index
182 }
183
184 pub fn is_faulty(&self) -> bool {
188 self.faulty
189 }
190
191 pub fn is_full(&self) -> bool {
193 if self.limit == 0 {
194 return false;
195 }
196
197 self.buffer.len() > self.limit
198 }
199
200 pub fn clear(&mut self) {
202 self.frames.clear();
203 self.buffer.clear();
204 self.frame_offset = 0;
205 self.frame_count = 0;
206 self.faulty = false;
207 }
208
209 #[cfg(test)]
210 pub fn pending_frame_count(&self) -> usize {
211 self.frame_count
212 }
213
214 #[cfg(test)]
215 pub fn frame_offset(&self) -> usize {
216 self.frame_offset
217 }
218}