use crate::incremental::{StateChangelogBuffer, StateChangelogEntry};
/// Pulls entries out of a shared `StateChangelogBuffer` into a local, bounded
/// pending list that the caller later takes or clears.
pub struct ChangelogDrainer {
    /// Upper bound on how many entries a single `drain` call moves.
    max_batch_size: usize,
    /// Upper bound on the pending list; `drain` sheds the oldest entries to
    /// stay within it.
    max_pending: usize,
    /// Shared source of entries; `drain` removes entries from it with `pop`.
    buffer: std::sync::Arc<StateChangelogBuffer>,
    /// Entries moved out of `buffer` but not yet taken or cleared, oldest first.
    pending: Vec<StateChangelogEntry>,
    /// Running total of entries moved by `drain`; never decremented.
    total_drained: u64,
}
/// Default cap on the number of entries a `ChangelogDrainer` keeps pending
/// (2^18 = 256 Ki entries).
const DEFAULT_MAX_PENDING: usize = 1 << 18;
impl ChangelogDrainer {
    /// Creates a drainer that moves at most `max_batch_size` entries out of
    /// `buffer` per [`drain`](Self::drain) call.
    ///
    /// The pending limit starts at `DEFAULT_MAX_PENDING` (256 Ki entries); use
    /// [`with_max_pending`](Self::with_max_pending) to change it.
    #[must_use]
    pub fn new(buffer: std::sync::Arc<StateChangelogBuffer>, max_batch_size: usize) -> Self {
        Self {
            buffer,
            // `pending` is bounded by `max_pending`, so preallocating beyond the
            // initial limit only wastes memory. It would also panic/abort on a
            // capacity overflow for "unbounded" batch sizes such as `usize::MAX`.
            pending: Vec::with_capacity(max_batch_size.min(DEFAULT_MAX_PENDING)),
            max_batch_size,
            max_pending: DEFAULT_MAX_PENDING,
            total_drained: 0,
        }
    }

    /// Sets the maximum number of entries held in the pending list.
    ///
    /// A limit of 0 means [`drain`](Self::drain) never moves anything.
    #[must_use]
    pub fn with_max_pending(mut self, max_pending: usize) -> Self {
        self.max_pending = max_pending;
        self
    }

    /// Moves up to `max_batch_size` entries from the shared buffer into the
    /// pending list and returns how many were moved.
    ///
    /// The pending list is kept within `max_pending`. If it is already at (or
    /// above) that limit when new entries are available, the oldest pending
    /// entries are discarded down to `max_pending / 2` and a warning is logged.
    /// Nothing is discarded when the buffer is empty. If either limit is 0,
    /// the buffer is not touched at all and 0 is returned.
    pub fn drain(&mut self) -> usize {
        // Nothing can be accepted; return before popping so no entry is
        // removed from the buffer and then lost, and no bogus shed is logged.
        if self.max_batch_size == 0 || self.max_pending == 0 {
            return 0;
        }
        // Pop first: discarding pending entries is only justified when there
        // is genuinely new data that needs the room.
        let first = match self.buffer.pop() {
            Some(entry) => entry,
            None => return 0,
        };
        if self.pending.len() >= self.max_pending {
            self.shed_oldest();
        }
        // Either way `pending.len() < max_pending` here (shedding leaves
        // `max_pending / 2`), so `room` and `limit` are >= 1 and `first` fits.
        let room = self.max_pending.saturating_sub(self.pending.len());
        let limit = self.max_batch_size.min(room);
        self.pending.push(first);
        let mut count = 1;
        while count < limit {
            match self.buffer.pop() {
                Some(entry) => {
                    self.pending.push(entry);
                    count += 1;
                }
                None => break,
            }
        }
        self.total_drained += count as u64;
        count
    }

    /// Drops the oldest pending entries so that only `max_pending / 2` remain,
    /// logging how many were discarded.
    fn shed_oldest(&mut self) {
        let keep = self.max_pending / 2;
        let drop_count = self.pending.len().saturating_sub(keep);
        tracing::warn!(
            dropped = drop_count,
            kept = keep,
            max_pending = self.max_pending,
            "changelog drainer pending buffer at limit, shedding oldest entries"
        );
        self.pending.drain(..drop_count);
    }

    /// Removes and returns every pending entry, oldest first.
    ///
    /// The returned `Vec` takes the existing allocation; the drainer is left
    /// with a fresh, unallocated pending list.
    pub fn take_pending(&mut self) -> Vec<StateChangelogEntry> {
        std::mem::take(&mut self.pending)
    }

    /// Discards every pending entry while keeping the allocation for reuse.
    ///
    /// Does not change [`total_drained`](Self::total_drained).
    pub fn clear_pending(&mut self) {
        self.pending.clear();
    }

    /// Number of entries currently pending.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Borrows the pending entries, oldest first.
    #[must_use]
    pub fn pending(&self) -> &[StateChangelogEntry] {
        &self.pending
    }

    /// Cumulative number of entries moved out of the buffer by `drain`.
    ///
    /// Not reduced by shedding, `take_pending`, or `clear_pending`.
    #[must_use]
    pub fn total_drained(&self) -> u64 {
        self.total_drained
    }

    /// Borrows the shared buffer this drainer pulls from.
    #[must_use]
    pub fn buffer(&self) -> &StateChangelogBuffer {
        &self.buffer
    }
}
/// Debug output shows the configuration and counters only: the pending list is
/// summarized by its length and the shared buffer is omitted (`..`).
impl std::fmt::Debug for ChangelogDrainer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending_len = self.pending.len();
        let mut out = f.debug_struct("ChangelogDrainer");
        out.field("pending", &pending_len);
        out.field("max_batch_size", &self.max_batch_size);
        out.field("max_pending", &self.max_pending);
        out.field("total_drained", &self.total_drained);
        out.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::StateChangelogEntry;
    use std::sync::Arc;

    #[test]
    fn test_drainer_empty_buffer() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        let mut drainer = ChangelogDrainer::new(buf, 100);
        assert_eq!(drainer.drain(), 0);
        assert_eq!(drainer.pending_count(), 0);
        assert_eq!(drainer.total_drained(), 0);
    }

    #[test]
    fn test_drainer_basic_drain() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
        buf.push(StateChangelogEntry::delete(1, 300));
        let mut drainer = ChangelogDrainer::new(buf, 100);
        let count = drainer.drain();
        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);
        assert_eq!(drainer.total_drained(), 3);
    }

    #[test]
    fn test_drainer_take_pending() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
        let mut drainer = ChangelogDrainer::new(buf, 100);
        drainer.drain();
        let entries = drainer.take_pending();
        assert_eq!(entries.len(), 2);
        assert_eq!(drainer.pending_count(), 0);
        assert!(entries[0].is_put());
        assert_eq!(entries[0].key_hash, 100);
        assert!(entries[1].is_put());
        assert_eq!(entries[1].key_hash, 200);
    }

    #[test]
    fn test_drainer_respects_max_batch_size() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        for i in 0..10 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }
        let mut drainer = ChangelogDrainer::new(buf, 3);
        let count = drainer.drain();
        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);
        let count2 = drainer.drain();
        assert_eq!(count2, 3);
        assert_eq!(drainer.pending_count(), 6);
    }

    #[test]
    fn test_drainer_multiple_drain_cycles() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        let mut drainer = ChangelogDrainer::new(buf.clone(), 100);
        drainer.drain();
        let batch1 = drainer.take_pending();
        assert_eq!(batch1.len(), 1);
        buf.push(StateChangelogEntry::delete(2, 200));
        buf.push(StateChangelogEntry::put(2, 300, 20, 30));
        drainer.drain();
        let batch2 = drainer.take_pending();
        assert_eq!(batch2.len(), 2);
        assert_eq!(drainer.total_drained(), 3);
    }

    #[test]
    fn test_drainer_debug() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        let drainer = ChangelogDrainer::new(buf, 100);
        let debug = format!("{drainer:?}");
        assert!(debug.contains("ChangelogDrainer"));
        assert!(debug.contains("pending: 0"));
        assert!(debug.contains("max_pending"));
    }

    #[test]
    fn test_clear_pending_reuses_allocation() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
        let mut drainer = ChangelogDrainer::new(buf, 100);
        drainer.drain();
        assert_eq!(drainer.pending_count(), 2);
        drainer.clear_pending();
        assert_eq!(drainer.pending_count(), 0);
        assert_eq!(drainer.total_drained(), 2);
    }

    #[test]
    fn test_max_pending_bounds() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        for i in 0..10 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }
        let mut drainer = ChangelogDrainer::new(buf.clone(), 100).with_max_pending(6);
        // First drain fills pending exactly to the limit.
        let count = drainer.drain();
        assert_eq!(count, 6);
        assert_eq!(drainer.pending_count(), 6);
        // At the limit: the oldest 3 are shed, then 3 more fit.
        let count2 = drainer.drain();
        assert_eq!(count2, 3);
        assert_eq!(drainer.pending_count(), 6);
        assert_eq!(drainer.total_drained(), 9);
        // Shed again; only one entry is left in the buffer.
        let count3 = drainer.drain();
        assert_eq!(count3, 1);
        assert_eq!(drainer.pending_count(), 4);
        assert_eq!(drainer.total_drained(), 10);
    }

    #[test]
    fn test_max_pending_does_not_exceed() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        for i in 0..3 {
            buf.push(StateChangelogEntry::put(1, i, 0, 1));
        }
        let mut drainer = ChangelogDrainer::new(buf, 100).with_max_pending(5);
        let count = drainer.drain();
        assert_eq!(count, 3);
        assert_eq!(drainer.pending_count(), 3);
    }

    #[test]
    fn test_zero_limits_leave_buffer_untouched() {
        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        let mut by_pending = ChangelogDrainer::new(buf.clone(), 100).with_max_pending(0);
        assert_eq!(by_pending.drain(), 0);
        assert_eq!(by_pending.pending_count(), 0);
        assert_eq!(by_pending.total_drained(), 0);
        let mut by_batch = ChangelogDrainer::new(buf.clone(), 0);
        assert_eq!(by_batch.drain(), 0);
        assert_eq!(by_batch.pending_count(), 0);
        assert_eq!(by_batch.total_drained(), 0);
        // Neither drainer popped the entry, so it is still in the buffer.
        assert!(buf.pop().is_some());
    }
}