d_engine_core/replication/
batch_buffer.rs

1use std::collections::VecDeque;
2use std::time::Duration;
3
4use tokio::time::Instant;
5use tracing::trace;
6
7pub struct BatchBuffer<E> {
8    pub(super) max_batch_size: usize,
9    pub(super) batch_timeout: Duration,
10    pub(super) buffer: VecDeque<E>,
11    pub(super) last_flush: Instant,
12}
13
14impl<E> BatchBuffer<E> {
15    pub fn new(
16        max_batch_size: usize,
17        batch_timeout: Duration,
18    ) -> Self {
19        Self {
20            max_batch_size,
21            batch_timeout,
22            buffer: VecDeque::with_capacity(max_batch_size),
23            last_flush: Instant::now(),
24        }
25    }
26
27    pub fn push(
28        &mut self,
29        request: E,
30    ) -> Option<usize> {
31        self.buffer.push_back(request);
32        trace!(
33            "BatchBuffer::push, self.max_batch_size={}, self.buffer.len()={}",
34            self.max_batch_size,
35            self.buffer.len()
36        );
37        if self.buffer.len() >= self.max_batch_size {
38            Some(self.buffer.len())
39        } else {
40            None
41        }
42    }
43
44    pub fn should_flush(&self) -> bool {
45        !self.buffer.is_empty() && self.last_flush.elapsed() > self.batch_timeout
46    }
47
48    pub fn take(&mut self) -> VecDeque<E> {
49        self.last_flush = Instant::now();
50        std::mem::take(&mut self.buffer)
51    }
52}