rocketmq_remoting/smart_encode_buffer.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// smart_encode_buffer.rs
16//
17// High-performance adaptive EncodeBuffer with automatic shrink behavior.
18// - Expands as needed (using BytesMut growth rules).
19// - Tracks recent write sizes using EMA (exponential moving average).
20// - Shrinks conservatively based on EMA, ratio trigger, and cooldown to avoid jitter.
21// - Provides metrics for diagnostics.
22//
23
24use std::time::Duration;
25use std::time::Instant;
26
27use bytes::Bytes;
28use bytes::BytesMut;
29
/// Tuning knobs for the adaptive grow/shrink behavior of `EncodeBuffer`.
///
/// All fields are plain values; two configurations compare equal when every
/// field is equal (floating-point fields use the usual `f64` equality).
#[derive(Debug, Clone, PartialEq)]
pub struct EncodeBufferConfig {
    /// Capacity floor in bytes: automatic shrinking never targets less than
    /// this, and the buffer is re-allocated to this size when it drops below.
    pub min_capacity: usize,
    /// EMA smoothing factor, intended to lie in `(0, 1]`. A higher value makes
    /// the average follow recent writes more closely. Values `<= 0` fall back
    /// to `0.05` and values above `1` are clamped to `1`.
    pub ema_alpha: f64,
    /// Shrink trigger: a shrink is only considered once
    /// `current_capacity >= ema_size * shrink_ratio_trigger`.
    pub shrink_ratio_trigger: f64,
    /// Shrink target: the new capacity is
    /// `max(min_capacity, ceil(ema_size * shrink_target_factor))`.
    pub shrink_target_factor: f64,
    /// Cooldown: minimum time that must pass between two automatic shrinks.
    pub shrink_cooldown: Duration,
    /// Absolute lower bound for automatic shrinking: when the capacity is
    /// `<=` this value no shrink is attempted at all.
    pub min_shrink_threshold: usize,
}
50
51impl Default for EncodeBufferConfig {
52 fn default() -> Self {
53 Self {
54 min_capacity: 8 * 1024, // 8 KB
55 ema_alpha: 0.05, // slow EMA to reduce jitter
56 shrink_ratio_trigger: 3.0, // conservative: capacity >= 3x recent need
57 shrink_target_factor: 1.5, // shrink to 1.5x of recent need
58 shrink_cooldown: Duration::from_secs(30),
59 min_shrink_threshold: 64 * 1024, // do not shrink small capacities (<64KB)
60 }
61 }
62}
63
/// Point-in-time snapshot of an `EncodeBuffer`'s diagnostic counters.
///
/// Snapshots are plain data and can be compared with `==`.
#[derive(Debug, Clone, PartialEq)]
pub struct BufferStats {
    /// Capacity of the underlying buffer when the snapshot was taken.
    pub current_capacity: usize,
    /// Current exponential moving average of drained message sizes, in bytes.
    pub ema_size: f64,
    /// Number of shrink operations performed so far.
    pub shrink_count: u64,
    /// Number of capacity increases observed so far.
    pub expand_count: u64,
    /// Largest buffer length seen at drain time.
    pub historical_max: usize,
}
73
74/// EncodeBuffer: zero-copy friendly buffer for encoding operations with
75/// automatic, conservative shrinking to prevent long-term memory bloat.
76///
77/// Design goals:
78/// - Avoid frequent reallocation on normal workload (keep capacity after expansion).
79/// - Protect against single large spike causing permanent large allocation (occasional shrink).
80/// - Avoid shrink jitter by cooldown + EMA + absolute threshold.
81pub struct EncodeBuffer {
82 buf: BytesMut,
83 cfg: EncodeBufferConfig,
84
85 // Exponential moving average of recent write sizes (in bytes).
86 ema_recent_size: f64,
87 // Timestamp of last shrink operation.
88 last_shrink: Instant,
89 // The maximum len observed at runtime (diagnostic).
90 historical_max_len: usize,
91
92 // Diagnostics counters
93 shrink_count: u64,
94 expand_count: u64,
95 last_capacity: usize,
96}
97
98impl Default for EncodeBuffer {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl EncodeBuffer {
105 /// Create a new EncodeBuffer with default configuration.
106 pub fn new() -> Self {
107 Self::with_config(EncodeBufferConfig::default())
108 }
109
110 /// Create a new EncodeBuffer with custom configuration.
111 pub fn with_config(cfg: EncodeBufferConfig) -> Self {
112 let initial = std::cmp::max(cfg.min_capacity, 8);
113 EncodeBuffer {
114 buf: BytesMut::with_capacity(initial),
115 cfg,
116 // Initialize EMA to a conservative value: twice the initial to avoid immediate shrink.
117 ema_recent_size: (initial * 2) as f64,
118 //allow immediate shrink if triggered
119 last_shrink: Instant::now() - Duration::from_secs(60),
120
121 historical_max_len: 0,
122 shrink_count: 0,
123 expand_count: 0,
124 last_capacity: initial,
125 }
126 }
127
128 /// Return current capacity.
129 #[inline]
130 pub fn capacity(&self) -> usize {
131 self.buf.capacity()
132 }
133
134 /// Return current length (amount of data written).
135 #[inline]
136 pub fn len(&self) -> usize {
137 self.buf.len()
138 }
139
140 /// Return whether buffer is empty.
141 #[inline]
142 pub fn is_empty(&self) -> bool {
143 self.buf.is_empty()
144 }
145
146 /// Append a slice into the buffer.
147 #[inline]
148 pub fn append(&mut self, data: &[u8]) {
149 self.track_expansion();
150 self.buf.extend_from_slice(data);
151 }
152
153 /// Append a Bytes value into the buffer (will copy into internal BytesMut).
154 /// Note: if you already have a complete chunk (single Bytes) to send,
155 /// send the Bytes directly instead of appending them to the buffer.
156 #[inline]
157 pub fn append_bytes(&mut self, bytes: &Bytes) {
158 self.track_expansion();
159 self.buf.extend_from_slice(bytes.as_ref());
160 }
161
162 /// Provide mutable access to inner BytesMut for in-place encoding.
163 /// Users that write directly should call `take_bytes` after writing.
164 #[inline]
165 pub fn buf_mut(&mut self) -> &mut BytesMut {
166 self.track_expansion();
167 &mut self.buf
168 }
169
170 /// Extract current contents as Bytes (zero-copy) and reset length to 0.
171 /// This also updates EMA and may trigger a shrink according to policy.
172 pub fn take_bytes(&mut self) -> Bytes {
173 let len = self.buf.len();
174 if len > self.historical_max_len {
175 self.historical_max_len = len;
176 }
177
178 let out = if len > 0 {
179 // split_to(len) returns the first len bytes and leaves self.buf.len() == 0.
180 // Note: After split_to, the remaining capacity becomes 0.
181 self.buf.split_to(len).freeze()
182 } else {
183 Bytes::new()
184 };
185
186 // Update EMA using the observed message size.
187 self.update_ema(len);
188
189 // Consider shrinking after updating EMA.
190 // Note: maybe_shrink will allocate a new buffer with appropriate capacity
191 self.maybe_shrink();
192
193 // After split_to, the remaining buffer may have reduced capacity.
194 // Ensure we have at least min_capacity for next write.
195 // Only do this if we didn't just shrink (shrink already sets capacity correctly).
196 let current_cap = self.buf.capacity();
197 if current_cap < self.cfg.min_capacity && self.buf.is_empty() {
198 // Replace the buffer with a new one at min_capacity
199 self.buf = BytesMut::with_capacity(self.cfg.min_capacity);
200 }
201
202 out
203 }
204
205 /// Force immediate shrink to configured min_capacity.
206 pub fn force_shrink_to_min(&mut self) {
207 let min = self.cfg.min_capacity;
208 if self.buf.capacity() > min {
209 self.do_shrink(min);
210 }
211 }
212
213 /// Return historic maximum recorded length.
214 pub fn historical_max(&self) -> usize {
215 self.historical_max_len
216 }
217
218 /// Return diagnostic stats.
219 pub fn stats(&self) -> BufferStats {
220 BufferStats {
221 current_capacity: self.capacity(),
222 ema_size: self.ema_recent_size,
223 shrink_count: self.shrink_count,
224 expand_count: self.expand_count,
225 historical_max: self.historical_max_len,
226 }
227 }
228
229 // --- internal helpers ---
230
231 /// Track capacity increases for diagnostics.
232 #[inline]
233 fn track_expansion(&mut self) {
234 let cur = self.buf.capacity();
235 if cur > self.last_capacity {
236 self.expand_count = self.expand_count.saturating_add(1);
237 self.last_capacity = cur;
238 }
239 }
240
241 /// Update the EMA with the last written size.
242 #[inline]
243 fn update_ema(&mut self, last_size: usize) {
244 let alpha = self.cfg.ema_alpha;
245 // guard for invalid alpha
246 let alpha = if alpha <= 0.0 {
247 0.05
248 } else if alpha > 1.0 {
249 1.0
250 } else {
251 alpha
252 };
253 self.ema_recent_size = alpha * (last_size as f64) + (1.0 - alpha) * self.ema_recent_size;
254 }
255
256 /// Decide whether to shrink and perform shrink if conditions are met.
257 fn maybe_shrink(&mut self) {
258 let cap = self.buf.capacity();
259 let ema = self.ema_recent_size.max(1.0);
260
261 // Do not attempt shrink below an absolute threshold to avoid
262 // over-fragmentation.
263 if cap <= self.cfg.min_shrink_threshold {
264 return;
265 }
266
267 // Conservative trigger: only shrink when capacity >= ema * ratio.
268 if (cap as f64) >= ema * self.cfg.shrink_ratio_trigger {
269 let now = Instant::now();
270 if now.duration_since(self.last_shrink) >= self.cfg.shrink_cooldown {
271 // Compute target capacity (rounded up), enforce min_capacity lower bound.
272 let target = std::cmp::max(
273 self.cfg.min_capacity,
274 (ema * self.cfg.shrink_target_factor).ceil() as usize,
275 );
276
277 if target < cap {
278 self.do_shrink(target);
279 }
280 }
281 }
282 }
283
284 /// Perform the actual shrink: allocate a new BytesMut with target capacity.
285 /// If allocation fails (unlikely), we keep the old buffer to preserve
286 /// correctness.
287 fn do_shrink(&mut self, target: usize) {
288 // Attempt to allocate in a panic-safe way.
289 match std::panic::catch_unwind(|| BytesMut::with_capacity(target)) {
290 Ok(mut new_buf) => {
291 // If there are leftover bytes (should be none after take_bytes), copy them.
292 if !self.buf.is_empty() {
293 new_buf.extend_from_slice(&self.buf);
294 }
295 self.buf = new_buf;
296 self.last_shrink = Instant::now();
297 self.shrink_count = self.shrink_count.saturating_add(1);
298 self.last_capacity = target;
299 }
300 Err(_) => {
301 // Allocation panic occurred; do nothing to preserve existing
302 // buffer.
303 }
304 }
305 }
306}
307
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Performs one encode cycle: append `payload`, then drain and discard it.
    fn write_once(eb: &mut EncodeBuffer, payload: &[u8]) {
        eb.append(payload);
        drop(eb.take_bytes());
    }

    /// A write larger than the initial capacity must grow the buffer, and
    /// draining must leave it empty.
    #[test]
    fn test_expand_on_large_write() {
        let mut eb = EncodeBuffer::new();
        // Payload well beyond the initial capacity.
        let payload_len = eb.capacity() * 4 + 10;
        let payload = vec![0u8; payload_len];

        eb.append(&payload);
        assert!(eb.capacity() >= payload_len, "capacity did not expand as expected");

        drop(eb.take_bytes());
        assert_eq!(eb.len(), 0);
    }

    /// The EMA must follow message sizes up and down, and capacity must stay
    /// at or above `min_capacity` throughout.
    #[test]
    fn test_shrink_after_spike() {
        // Settings that make shrinking possible within the test's time frame.
        let mut eb = EncodeBuffer::with_config(EncodeBufferConfig {
            min_capacity: 128,
            ema_alpha: 0.3,
            shrink_ratio_trigger: 3.0,
            shrink_target_factor: 1.8,
            shrink_cooldown: Duration::from_millis(5),
            min_shrink_threshold: 256,
        });

        // Ramp up: 100, 200, ..., 1000 bytes.
        for step in 1..=10 {
            let payload = vec![0u8; step * 100];
            write_once(&mut eb, &payload);
        }

        let stats_after_increase = eb.stats();
        println!("After increasing writes - EMA: {:.1}", stats_after_increase.ema_size);

        // The average should sit somewhere inside the range of sizes written.
        let ema_mid = stats_after_increase.ema_size;
        assert!(
            ema_mid > 100.0 && ema_mid < 1500.0,
            "EMA should reflect the varying message sizes"
        );

        // Then a long run of tiny writes, pausing so the cooldown can expire.
        let tiny = [0u8; 20];
        for _ in 0..30 {
            write_once(&mut eb, &tiny);
            std::thread::sleep(Duration::from_millis(1));
        }

        let stats_final = eb.stats();
        println!(
            "After small writes - EMA: {:.1}, capacity: {}, shrinks: {}",
            stats_final.ema_size, stats_final.current_capacity, stats_final.shrink_count
        );

        // The average must have come down...
        assert!(
            stats_final.ema_size < stats_after_increase.ema_size,
            "EMA should decrease with small writes"
        );
        // ...while the capacity floor is respected.
        assert!(
            stats_final.current_capacity >= 128,
            "Capacity should not go below min_capacity"
        );
    }

    /// A steady stream of moderate writes must neither shrink nor balloon the
    /// buffer.
    #[test]
    fn test_ema_and_no_shrink_on_stable_load() {
        let cfg = EncodeBufferConfig {
            min_capacity: 64,
            ema_alpha: 0.5,            // faster response for the test
            shrink_ratio_trigger: 3.0, // high ratio to avoid premature shrink
            shrink_target_factor: 1.5,
            shrink_cooldown: Duration::from_secs(1),
            min_shrink_threshold: 256, // no shrinking for small capacities
        };
        let min_cap = cfg.min_capacity;

        let mut eb = EncodeBuffer::with_config(cfg);
        let initial_cap = eb.capacity();
        println!("Initial capacity: {}", initial_cap);

        // Ten identical, moderate writes.
        let moderate = [0u8; 32];
        (0..10).for_each(|_| write_once(&mut eb, &moderate));

        let stats = eb.stats();
        println!("After 10 writes - capacity: {}, ema: {}", eb.capacity(), stats.ema_size);

        // The EMA should be heading towards 32 (roughly).
        assert!(
            stats.ema_size > 1.0 && stats.ema_size < 1000.0,
            "EMA should be reasonable"
        );
        // Capacity stays within [min_capacity, 2 * initial].
        assert!(eb.capacity() >= min_cap, "Capacity should not go below min_capacity");
        assert!(eb.capacity() <= initial_cap * 2, "Capacity should not grow excessively");
        // And nothing was shrunk along the way.
        assert_eq!(stats.shrink_count, 0, "Should not shrink with stable moderate load");
    }

    /// Jitter test: occasional spikes among small writes must not cause
    /// frequent shrinks.
    #[test]
    fn test_no_jitter_under_flapping() {
        let mut eb = EncodeBuffer::with_config(EncodeBufferConfig {
            min_capacity: 32,
            ema_alpha: 0.1, // slow EMA to resist jitter
            shrink_ratio_trigger: 2.0,
            shrink_target_factor: 1.0,
            shrink_cooldown: Duration::from_millis(50), // short cooldown for the test
            min_shrink_threshold: 0,
        });

        let spike = [0u8; 4096];
        let small = [0u8; 16];

        // Mostly small writes with a large spike every 50th iteration.
        for round in 0..200 {
            let payload: &[u8] = if round % 50 == 0 { &spike } else { &small };
            write_once(&mut eb, payload);
        }

        // A handful of shrinks is acceptable; many would indicate jitter.
        let stats = eb.stats();
        assert!(stats.shrink_count <= 10, "too many shrinks => jitter");
    }
}
449}