d_engine_core/replication/
batch_buffer.rs1use std::collections::VecDeque;
2use std::time::Duration;
3
4use tokio::time::Instant;
5use tracing::trace;
6
7pub struct BatchBuffer<E> {
8 pub(super) max_batch_size: usize,
9 pub(super) batch_timeout: Duration,
10 pub(super) buffer: VecDeque<E>,
11 pub(super) last_flush: Instant,
12}
13
14impl<E> BatchBuffer<E> {
15 pub fn new(
16 max_batch_size: usize,
17 batch_timeout: Duration,
18 ) -> Self {
19 Self {
20 max_batch_size,
21 batch_timeout,
22 buffer: VecDeque::with_capacity(max_batch_size),
23 last_flush: Instant::now(),
24 }
25 }
26
27 pub fn push(
28 &mut self,
29 request: E,
30 ) -> Option<usize> {
31 self.buffer.push_back(request);
32 trace!(
33 "BatchBuffer::push, self.max_batch_size={}, self.buffer.len()={}",
34 self.max_batch_size,
35 self.buffer.len()
36 );
37 if self.buffer.len() >= self.max_batch_size {
38 Some(self.buffer.len())
39 } else {
40 None
41 }
42 }
43
44 pub fn should_flush(&self) -> bool {
45 !self.buffer.is_empty() && self.last_flush.elapsed() > self.batch_timeout
46 }
47
48 pub fn take(&mut self) -> VecDeque<E> {
49 self.last_flush = Instant::now();
50 std::mem::take(&mut self.buffer)
51 }
52}