1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
use crate::network::protocol::Protocol;
use alloc::vec::Vec;
use bytes::Bytes;

/// Tuning knobs for the memory that is reserved up front
#[derive(Debug, Clone)]
pub struct MemoryParameters {
    /// Pre-allocated size of the unparsed data buffer.
    /// Should correspond to the maximum expected response size.
    pub buffer_size: usize,

    /// Pre-allocated number of parsed frame slots.
    /// Should correspond to the expected number of parallel futures/commands.
    pub frame_capacity: usize,
}

impl Default for MemoryParameters {
    /// Defaults to a `buffer_size` of 256 and a `frame_capacity` of 8
    fn default() -> Self {
        const DEFAULT_BUFFER_SIZE: usize = 256;
        const DEFAULT_FRAME_CAPACITY: usize = 8;

        MemoryParameters {
            buffer_size: DEFAULT_BUFFER_SIZE,
            frame_capacity: DEFAULT_FRAME_CAPACITY,
        }
    }
}

/// Collects raw response bytes and the frames decoded from them.
///
/// Incoming data is gathered in `buffer`; every frame the decoder extracts is stored in
/// `frames`, from where it is handed out by its external index.
pub(crate) struct ResponseBuffer<P: Protocol> {
    /// Protocol implementation used to decode frames from the buffered bytes
    decoder: P,

    /// Unparsed data buffer
    buffer: Vec<u8>,

    /// Parsed frames. A slot is left as `None` once its frame has been taken.
    frames: Vec<Option<P::FrameType>>,

    /// Number of frames in `frames` that have not been taken yet
    frame_count: usize,

    /// Offset between external frame indexes (e.g. `is_complete()` or `take_frame()`) and
    /// positions in `frames`.
    /// This way each new frame gets a unique external index, while `frames` can be emptied
    /// as soon as all frames are taken.
    frame_offset: usize,

    /// Set once decoding the buffered data failed; only reset by `clear()`
    faulty: bool,
}

impl<P: Protocol> ResponseBuffer<P> {
    pub fn new(protocol: P, parameters: MemoryParameters) -> ResponseBuffer<P> {
        Self {
            decoder: protocol,
            buffer: Vec::with_capacity(parameters.buffer_size),
            frames: Vec::with_capacity(parameters.frame_capacity),
            frame_count: 0,
            frame_offset: 0,
            faulty: false,
        }
    }

    /// Appends received data to the buffer and parses all frames that became complete.
    ///
    /// While the buffer is faulty the data is dropped instead: parsing stays suspended until
    /// `clear()` is called, and `clear()` discards the buffered bytes anyway, so storing them
    /// would only let the buffer grow without bound.
    pub fn append(&mut self, data: &[u8]) {
        if self.faulty {
            return;
        }

        self.buffer.extend_from_slice(data);
        self.parse_frames();
    }

    /// Takes the frame with the given external index out of the buffer.
    ///
    /// Returns `None` if the index lies below the current frame offset, if no frame has been
    /// parsed for it (yet), or if the frame was already taken.
    ///
    /// As soon as no untaken frame is left, the frame vector is emptied and the frame offset
    /// is advanced by the number of removed slots.
    pub fn take_frame(&mut self, index: usize) -> Option<P::FrameType> {
        // Translate the external index into a position within `frames`;
        // indexes below the offset are invalid
        let slot = index.checked_sub(self.frame_offset)?;

        // A missing slot means the frame does not (yet) exist
        let taken = self.frames.get_mut(slot)?.take();

        if taken.is_some() {
            self.frame_count -= 1;
        }

        // All frames handed out: recycle the vector, keep external indexes unique
        if self.frame_count == 0 {
            self.frame_offset += self.frames.len();
            self.frames.clear();
        }

        taken
    }

    /// Takes the first frame that has not been taken yet.
    /// Returns `None` if no such frame exists.
    pub fn take_next_frame(&mut self) -> Option<P::FrameType> {
        // Position of the first slot that still holds a frame
        let first_pending = self.frames.iter().position(Option::is_some);

        // take_frame() expects the external index, so the offset is added back
        first_pending.and_then(|slot| self.take_frame(self.frame_offset + slot))
    }

    /// Parses buffer and extracts messages
    /// Buffer is drained to only contain non-complete messages
    fn parse_frames(&mut self) {
        // Start of next message
        let mut start = 0;

        // Position of last termination in buffer
        let mut last_termination = None;

        while !self.faulty {
            let termination = self.parse_frame(start);
            // No more frames left
            if termination.is_none() {
                break;
            }

            start = termination.unwrap() + 1;
            last_termination = termination;
        }

        // No message was found, so buffer can stay unchanged
        if last_termination.is_none() {
            return;
        }

        // No unparsed data remaining in buffer
        if (last_termination.unwrap() + 1) == self.buffer.len() {
            return self.buffer.clear();
        }

        self.buffer.drain(..=last_termination.unwrap());
    }

    /// Parses the next frame
    ///
    /// # Arguments
    ///
    /// * `start`: Start parsing at this position
    ///
    /// returns: Option<usize> Position/Index of last termination
    /// None is returned in case no message was found
    fn parse_frame(&mut self, start: usize) -> Option<usize> {
        if start >= self.buffer.len() {
            return None;
        }

        let bytes = Bytes::from(self.buffer[start..].to_vec());

        let result = self.decoder.decode(&bytes);
        if result.is_err() {
            self.faulty = true;
            return None;
        }

        // No frame found
        if result.as_ref().unwrap().is_none() {
            return None;
        }

        let frame = result.unwrap().unwrap();
        self.frames.push(Some(frame.0));
        self.frame_count += 1;
        Some(frame.1 - 1 + start)
    }

    /// Checks whether a frame has been parsed for the given external index.
    ///
    /// Returns `false` for indexes below the current frame offset and for indexes that have
    /// no slot in the frame vector (yet). Only the existence of the slot is checked, not
    /// whether its frame has already been taken.
    pub fn is_complete(&self, index: usize) -> bool {
        match index.checked_sub(self.frame_offset) {
            Some(slot) => slot < self.frames.len(),
            None => false,
        }
    }

    /// If true, a protocol violation was detected, i.e. decoding the buffered data failed.
    /// Since the cause (e.g. Redis bug, network fault, etc.) is unclear, this is a fatal problem.
    /// The mapping of message indexes can no longer be guaranteed from this point on.
    /// The flag stays set until `clear()` is called.
    pub fn is_faulty(&self) -> bool {
        self.faulty
    }

    /// Brings the buffer back to its initial state, e.g. after a fatal error.
    ///
    /// Drops all unparsed data and all parsed frames, restarts the external frame indexes
    /// at zero and removes the faulty flag.
    pub fn clear(&mut self) {
        // Bookkeeping
        self.faulty = false;
        self.frame_count = 0;
        self.frame_offset = 0;

        // Stored data
        self.buffer.clear();
        self.frames.clear();
    }

    /// Number of parsed frames that have not been taken yet (test builds only)
    #[cfg(test)]
    pub fn pending_frame_count(&self) -> usize {
        self.frame_count
    }

    /// Current offset between external frame indexes and positions in the frame vector
    /// (test builds only)
    #[cfg(test)]
    pub fn frame_offset(&self) -> usize {
        self.frame_offset
    }
}