use std::time::Duration;
use std::time::Instant;
use bytes::Bytes;
use bytes::BytesMut;
#[derive(Debug, Clone)]
pub struct EncodeBufferConfig {
pub min_capacity: usize,
pub ema_alpha: f64,
pub shrink_ratio_trigger: f64,
pub shrink_target_factor: f64,
pub shrink_cooldown: Duration,
pub min_shrink_threshold: usize,
}
impl Default for EncodeBufferConfig {
fn default() -> Self {
Self {
min_capacity: 8 * 1024, ema_alpha: 0.05, shrink_ratio_trigger: 3.0, shrink_target_factor: 1.5, shrink_cooldown: Duration::from_secs(30),
min_shrink_threshold: 64 * 1024, }
}
}
#[derive(Debug, Clone)]
pub struct BufferStats {
pub current_capacity: usize,
pub ema_size: f64,
pub shrink_count: u64,
pub expand_count: u64,
pub historical_max: usize,
}
pub struct EncodeBuffer {
buf: BytesMut,
cfg: EncodeBufferConfig,
ema_recent_size: f64,
last_shrink: Instant,
historical_max_len: usize,
shrink_count: u64,
expand_count: u64,
last_capacity: usize,
}
impl Default for EncodeBuffer {
fn default() -> Self {
Self::new()
}
}
impl EncodeBuffer {
pub fn new() -> Self {
Self::with_config(EncodeBufferConfig::default())
}
pub fn with_config(cfg: EncodeBufferConfig) -> Self {
let initial = std::cmp::max(cfg.min_capacity, 8);
EncodeBuffer {
buf: BytesMut::with_capacity(initial),
cfg,
ema_recent_size: (initial * 2) as f64,
last_shrink: Instant::now() - Duration::from_secs(60),
historical_max_len: 0,
shrink_count: 0,
expand_count: 0,
last_capacity: initial,
}
}
#[inline]
pub fn capacity(&self) -> usize {
self.buf.capacity()
}
#[inline]
pub fn len(&self) -> usize {
self.buf.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
#[inline]
pub fn append(&mut self, data: &[u8]) {
self.track_expansion();
self.buf.extend_from_slice(data);
}
#[inline]
pub fn append_bytes(&mut self, bytes: &Bytes) {
self.track_expansion();
self.buf.extend_from_slice(bytes.as_ref());
}
#[inline]
pub fn buf_mut(&mut self) -> &mut BytesMut {
self.track_expansion();
&mut self.buf
}
pub fn take_bytes(&mut self) -> Bytes {
let len = self.buf.len();
if len > self.historical_max_len {
self.historical_max_len = len;
}
let out = if len > 0 {
self.buf.split_to(len).freeze()
} else {
Bytes::new()
};
self.update_ema(len);
self.maybe_shrink();
let current_cap = self.buf.capacity();
if current_cap < self.cfg.min_capacity && self.buf.is_empty() {
self.buf = BytesMut::with_capacity(self.cfg.min_capacity);
}
out
}
pub fn force_shrink_to_min(&mut self) {
let min = self.cfg.min_capacity;
if self.buf.capacity() > min {
self.do_shrink(min);
}
}
pub fn historical_max(&self) -> usize {
self.historical_max_len
}
pub fn stats(&self) -> BufferStats {
BufferStats {
current_capacity: self.capacity(),
ema_size: self.ema_recent_size,
shrink_count: self.shrink_count,
expand_count: self.expand_count,
historical_max: self.historical_max_len,
}
}
#[inline]
fn track_expansion(&mut self) {
let cur = self.buf.capacity();
if cur > self.last_capacity {
self.expand_count = self.expand_count.saturating_add(1);
self.last_capacity = cur;
}
}
#[inline]
fn update_ema(&mut self, last_size: usize) {
let alpha = self.cfg.ema_alpha;
let alpha = if alpha <= 0.0 {
0.05
} else if alpha > 1.0 {
1.0
} else {
alpha
};
self.ema_recent_size = alpha * (last_size as f64) + (1.0 - alpha) * self.ema_recent_size;
}
fn maybe_shrink(&mut self) {
let cap = self.buf.capacity();
let ema = self.ema_recent_size.max(1.0);
if cap <= self.cfg.min_shrink_threshold {
return;
}
if (cap as f64) >= ema * self.cfg.shrink_ratio_trigger {
let now = Instant::now();
if now.duration_since(self.last_shrink) >= self.cfg.shrink_cooldown {
let target = std::cmp::max(
self.cfg.min_capacity,
(ema * self.cfg.shrink_target_factor).ceil() as usize,
);
if target < cap {
self.do_shrink(target);
}
}
}
}
fn do_shrink(&mut self, target: usize) {
match std::panic::catch_unwind(|| BytesMut::with_capacity(target)) {
Ok(mut new_buf) => {
if !self.buf.is_empty() {
new_buf.extend_from_slice(&self.buf);
}
self.buf = new_buf;
self.last_shrink = Instant::now();
self.shrink_count = self.shrink_count.saturating_add(1);
self.last_capacity = target;
}
Err(_) => {
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
fn test_expand_on_large_write() {
let mut eb = EncodeBuffer::new();
let initial_cap = eb.capacity();
let big = vec![0u8; initial_cap * 4 + 10];
eb.append(&big);
assert!(eb.capacity() >= big.len(), "capacity did not expand as expected");
let _ = eb.take_bytes();
assert_eq!(eb.len(), 0);
}
#[test]
fn test_shrink_after_spike() {
let cfg = EncodeBufferConfig {
min_capacity: 128,
ema_alpha: 0.3,
shrink_ratio_trigger: 3.0,
shrink_target_factor: 1.8,
shrink_cooldown: Duration::from_millis(5),
min_shrink_threshold: 256,
};
let mut eb = EncodeBuffer::with_config(cfg);
for i in 1..=10 {
eb.append(&vec![0u8; i * 100]);
let _ = eb.take_bytes();
}
let stats_after_increase = eb.stats();
println!("After increasing writes - EMA: {:.1}", stats_after_increase.ema_size);
assert!(
stats_after_increase.ema_size > 100.0 && stats_after_increase.ema_size < 1500.0,
"EMA should reflect the varying message sizes"
);
for _ in 0..30 {
eb.append(&[0u8; 20]);
let _ = eb.take_bytes();
std::thread::sleep(Duration::from_millis(1)); }
let stats_final = eb.stats();
println!(
"After small writes - EMA: {:.1}, capacity: {}, shrinks: {}",
stats_final.ema_size, stats_final.current_capacity, stats_final.shrink_count
);
assert!(
stats_final.ema_size < stats_after_increase.ema_size,
"EMA should decrease with small writes"
);
assert!(
stats_final.current_capacity >= 128,
"Capacity should not go below min_capacity"
);
}
#[test]
fn test_ema_and_no_shrink_on_stable_load() {
let cfg = EncodeBufferConfig {
min_capacity: 64,
ema_alpha: 0.5, shrink_ratio_trigger: 3.0, shrink_target_factor: 1.5,
shrink_cooldown: Duration::from_secs(1),
min_shrink_threshold: 256, };
let min_cap = cfg.min_capacity;
let mut eb = EncodeBuffer::with_config(cfg);
let initial_cap = eb.capacity();
println!("Initial capacity: {}", initial_cap);
for _ in 0..10 {
eb.append(&[0u8; 32]);
let _ = eb.take_bytes();
}
let stats = eb.stats();
println!("After 10 writes - capacity: {}, ema: {}", eb.capacity(), stats.ema_size);
assert!(
stats.ema_size > 1.0 && stats.ema_size < 1000.0,
"EMA should be reasonable"
);
assert!(eb.capacity() >= min_cap, "Capacity should not go below min_capacity");
assert!(eb.capacity() <= initial_cap * 2, "Capacity should not grow excessively");
assert_eq!(stats.shrink_count, 0, "Should not shrink with stable moderate load");
}
#[test]
fn test_no_jitter_under_flapping() {
let cfg = EncodeBufferConfig {
min_capacity: 32,
ema_alpha: 0.1, shrink_ratio_trigger: 2.0,
shrink_target_factor: 1.0,
shrink_cooldown: Duration::from_millis(50), min_shrink_threshold: 0,
};
let mut eb = EncodeBuffer::with_config(cfg);
for i in 0..200 {
if i % 50 == 0 {
eb.append(&[0u8; 4096]);
} else {
eb.append(&[0u8; 16]);
}
let _ = eb.take_bytes();
}
let stats = eb.stats();
assert!(stats.shrink_count <= 10, "too many shrinks => jitter");
}
}