raft/tracker/
inflights.rs

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17use std::cmp::Ordering;
18
/// A bounded ring buffer of inflight messages.
///
/// Entries live in `buffer` starting at `start` and wrap around at `cap`.
/// [`Inflights::free_to`] stops at the first entry greater than its argument,
/// so entries are expected to be added in non-decreasing order.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Inflights {
    // Position in `buffer` of the oldest inflight.
    start: usize,
    // Number of inflights currently held in the buffer.
    count: usize,

    // Ring buffer storage. It may be released while empty (see `reset` and
    // `maybe_free_buffer`) and is re-allocated by the next `add`.
    buffer: Vec<u64>,

    // Maximum number of inflights that may be held at once.
    cap: usize,

    // A smaller capacity requested through `set_cap` while inflights were
    // still outstanding. It is applied once the buffer becomes empty.
    incoming_cap: Option<usize>,
}

impl Inflights {
    /// Creates a new buffer able to hold up to `cap` inflight messages.
    pub fn new(cap: usize) -> Inflights {
        Inflights {
            buffer: Vec::with_capacity(cap),
            start: 0,
            count: 0,
            cap,
            incoming_cap: None,
        }
    }

    /// Adjusts the inflight buffer capacity. Setting it to `0` disables the
    /// progress, because [`Inflights::full`] then reports `true`.
    ///
    /// * Growing takes effect immediately.
    /// * Shrinking takes effect immediately only when the buffer is empty.
    ///   Otherwise the new value is remembered and applied once the buffer is
    ///   drained by [`Inflights::free_to`] or [`Inflights::reset`]; in the
    ///   meantime [`Inflights::full`] already honours the smaller limit.
    ///
    /// Calling this between a `full()` check and the matching `add()` can
    /// cause that `add()` to panic, since shrinking may make the buffer full.
    pub fn set_cap(&mut self, incoming_cap: usize) {
        match self.cap.cmp(&incoming_cap) {
            Ordering::Equal => self.incoming_cap = None,
            Ordering::Less => {
                if self.start + self.count <= self.cap {
                    // The live entries do not wrap, so the storage can simply
                    // be extended. An unallocated buffer is left alone: `add`
                    // allocates it with the new `cap` on demand.
                    if self.buffer.capacity() > 0 {
                        // `reserve_exact` keeps the allocation at the
                        // requested capacity; `reserve` may over-allocate by
                        // amortized doubling (e.g. 128 -> 130 would yield 256).
                        self.buffer
                            .reserve_exact(incoming_cap - self.buffer.len());
                    }
                } else {
                    // The live entries wrap around the end of the old buffer.
                    // Copy them into a fresh buffer in logical order so that
                    // they become contiguous from index 0.
                    debug_assert_eq!(self.cap, self.buffer.len());
                    let mut buffer = Vec::with_capacity(incoming_cap);
                    buffer.extend_from_slice(&self.buffer[self.start..]);
                    buffer.extend_from_slice(&self.buffer[0..self.count - (self.cap - self.start)]);
                    self.buffer = buffer;
                    self.start = 0;
                }
                self.cap = incoming_cap;
                self.incoming_cap = None;
            }
            Ordering::Greater => {
                if self.count == 0 {
                    // Nothing is in flight, so shrink right away.
                    self.cap = incoming_cap;
                    self.incoming_cap = None;
                    self.start = 0;
                    if self.buffer.capacity() > 0 {
                        self.buffer = Vec::with_capacity(incoming_cap);
                    }
                } else {
                    // Defer the shrink until the buffer has been drained.
                    self.incoming_cap = Some(incoming_cap);
                }
            }
        }
    }

    /// Returns true if the inflights is full.
    ///
    /// A pending smaller capacity (see [`Inflights::set_cap`]) is taken into
    /// account as well.
    #[inline]
    pub fn full(&self) -> bool {
        self.count == self.cap || self.incoming_cap.map_or(false, |cap| self.count >= cap)
    }

    /// Adds an inflight into inflights.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is already full (see [`Inflights::full`]).
    pub fn add(&mut self, inflight: u64) {
        if self.full() {
            panic!("cannot add into a full inflights")
        }

        // The storage may have been released while empty; allocate it again.
        if self.buffer.capacity() == 0 {
            debug_assert_eq!(self.count, 0);
            debug_assert_eq!(self.start, 0);
            debug_assert!(self.incoming_cap.is_none());
            self.buffer = Vec::with_capacity(self.cap);
        }

        // Slot right after the newest entry, wrapping around at `cap`.
        let mut next = self.start + self.count;
        if next >= self.cap {
            next -= self.cap;
        }
        assert!(next <= self.buffer.len());
        if next == self.buffer.len() {
            // The slot has never been used: grow the vector by one.
            self.buffer.push(inflight);
        } else {
            // Reuse a previously freed slot.
            self.buffer[next] = inflight;
        }
        self.count += 1;
    }

    /// Frees the inflights smaller or equal to the given `to` flight.
    ///
    /// If this empties the buffer while a smaller capacity is pending, the
    /// pending capacity is applied and the storage is re-allocated for it.
    pub fn free_to(&mut self, to: u64) {
        if self.count == 0 || to < self.buffer[self.start] {
            // out of the left side of the window
            return;
        }

        let mut i = 0usize;
        let mut idx = self.start;
        while i < self.count {
            if to < self.buffer[idx] {
                // found the first large inflight
                break;
            }

            // increase index and maybe rotate
            idx += 1;
            if idx >= self.cap {
                idx -= self.cap;
            }

            i += 1;
        }

        // free i inflights and set new start index
        self.count -= i;
        self.start = idx;

        if self.count == 0 {
            if let Some(incoming_cap) = self.incoming_cap.take() {
                self.start = 0;
                self.cap = incoming_cap;
                self.buffer = Vec::with_capacity(self.cap);
            }
        }
    }

    /// Frees the first buffer entry.
    ///
    /// Does nothing when the buffer is empty.
    #[inline]
    pub fn free_first_one(&mut self) {
        if self.count > 0 {
            let start = self.buffer[self.start];
            self.free_to(start);
        }
    }

    /// Frees all inflights.
    ///
    /// The storage is released and any pending capacity becomes the
    /// effective capacity.
    #[inline]
    pub fn reset(&mut self) {
        self.count = 0;
        self.start = 0;
        self.buffer = vec![];
        self.cap = self.incoming_cap.take().unwrap_or(self.cap);
    }

    // Number of inflight messages. It's for tests.
    #[doc(hidden)]
    #[inline]
    pub fn count(&self) -> usize {
        self.count
    }

    // Capacity of the internal buffer.
    #[doc(hidden)]
    #[inline]
    pub fn buffer_capacity(&self) -> usize {
        self.buffer.capacity()
    }

    // Whether buffer is allocated or not. It's for tests.
    #[doc(hidden)]
    #[inline]
    pub fn buffer_is_allocated(&self) -> bool {
        self.buffer_capacity() > 0
    }

    /// Frees unused memory: releases the storage if nothing is in flight.
    #[inline]
    pub fn maybe_free_buffer(&mut self) {
        if self.count == 0 {
            self.start = 0;
            self.buffer = vec![];
            debug_assert_eq!(self.buffer.capacity(), 0);
        }
    }
}
203
#[cfg(test)]
mod tests {
    use super::Inflights;

    /// Builds the expected state of a capacity-10 buffer with no pending
    /// capacity change.
    fn expected(start: usize, count: usize, buffer: &[u64]) -> Inflights {
        Inflights {
            start,
            count,
            buffer: buffer.to_vec(),
            cap: 10,
            incoming_cap: None,
        }
    }

    /// Adds every value of `values` to `inflights`, in order.
    fn add_all(inflights: &mut Inflights, values: std::ops::Range<u64>) {
        for value in values {
            inflights.add(value);
        }
    }

    #[test]
    fn test_inflight_add() {
        // Fill a fresh buffer in two halves without wrapping.
        let mut plain = Inflights::new(10);
        add_all(&mut plain, 0..5);
        assert_eq!(plain, expected(0, 5, &[0, 1, 2, 3, 4]));

        add_all(&mut plain, 5..10);
        assert_eq!(plain, expected(0, 10, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));

        // Start in the middle of the buffer so the second half wraps around.
        let mut rotated = Inflights::new(10);
        rotated.start = 5;
        rotated.buffer.extend_from_slice(&[0; 5]);

        add_all(&mut rotated, 0..5);
        assert_eq!(rotated, expected(5, 5, &[0, 0, 0, 0, 0, 0, 1, 2, 3, 4]));

        add_all(&mut rotated, 5..10);
        assert_eq!(rotated, expected(5, 10, &[5, 6, 7, 8, 9, 0, 1, 2, 3, 4]));
    }

    #[test]
    fn test_inflight_free_to() {
        let mut inflight = Inflights::new(10);
        add_all(&mut inflight, 0..10);

        // Free the left half of the window.
        inflight.free_to(4);
        assert_eq!(inflight, expected(5, 5, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));

        // Free all but the last entry.
        inflight.free_to(8);
        assert_eq!(inflight, expected(9, 1, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));

        // New entries wrap around to the front of the buffer.
        add_all(&mut inflight, 10..15);
        inflight.free_to(12);
        assert_eq!(
            inflight,
            expected(3, 2, &[10, 11, 12, 13, 14, 5, 6, 7, 8, 9])
        );

        // Free everything that is left.
        inflight.free_to(14);
        assert_eq!(
            inflight,
            expected(5, 0, &[10, 11, 12, 13, 14, 5, 6, 7, 8, 9])
        );
    }

    #[test]
    fn test_inflight_free_first_one() {
        let mut inflight = Inflights::new(10);
        add_all(&mut inflight, 0..10);

        inflight.free_first_one();

        assert_eq!(inflight, expected(1, 9, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
    }

    #[test]
    fn test_inflights_set_cap() {
        // Three buffers holding 16 items each, starting at 16, 112 and 120.
        let mut growing: Vec<Inflights> = [16u64, 112, 120]
            .iter()
            .map(|&start| {
                let mut inflight = Inflights::new(128);
                for i in 0..start {
                    inflight.add(i);
                }
                inflight.free_to(start - 1);
                for i in 0..16 {
                    inflight.add(i);
                }
                assert_eq!(inflight.count(), 16);
                assert_eq!(inflight.start, start as usize);
                inflight
            })
            .collect();

        // Enlarge the capacity.
        for (i, inflight) in growing.iter_mut().enumerate() {
            inflight.set_cap(1024);
            assert_eq!(inflight.cap, 1024);
            assert_eq!(inflight.incoming_cap, None);
            assert_eq!(inflight.buffer_capacity(), 1024);
            match i {
                // The internal buffer is extended directly.
                0 | 1 => assert_ne!(inflight.start, 0),
                // The internal buffer is re-allocated instead of extended.
                _ => assert_eq!(inflight.start, 0),
            }
        }

        // Three buffers described by (`start`, `count`, `buffer_cap`).
        let shapes: [(usize, u64, usize); 3] = [(1, 0, 0), (1, 0, 128), (1, 8, 128)];
        let mut shrinking: Vec<Inflights> = shapes
            .iter()
            .map(|&(start, count, buffer_cap)| {
                let mut inflight = Inflights::new(128);
                inflight.start = start;
                inflight.buffer = vec![0; buffer_cap];
                for i in 0..count {
                    inflight.add(i);
                }
                inflight
            })
            .collect();

        // Reduce the capacity.
        for (i, inflight) in shrinking.iter_mut().enumerate() {
            inflight.set_cap(64);
            if i < 2 {
                // Empty buffers shrink immediately.
                assert_eq!(inflight.cap, 64);
                assert_eq!(inflight.incoming_cap, None);
                assert_eq!(inflight.start, 0);
                let want_capacity = if i == 0 { 0 } else { 64 };
                assert_eq!(inflight.buffer.capacity(), want_capacity);
            } else {
                // A non-empty buffer only records the pending capacity.
                assert_eq!(inflight.cap, 128);
                assert_eq!(inflight.incoming_cap, Some(64));
                assert_eq!(inflight.start, 1);
                assert_eq!(inflight.buffer.capacity(), 128);
            }
        }

        // `incoming_cap` can be cleared if the buffer is freed totally.
        let mut drained = shrinking[2].clone();
        drained.free_to(7);
        assert_eq!(drained.cap, 64);
        assert_eq!(drained.incoming_cap, None);
        assert_eq!(drained.start, 0);

        // `incoming_cap` can be cleared when `cap` is enlarged.
        for &new_cap in &[128, 1024] {
            let mut enlarged = shrinking[2].clone();
            enlarged.set_cap(new_cap);
            assert_eq!(enlarged.cap, new_cap);
            assert_eq!(enlarged.incoming_cap, None);
        }
    }
}