rocketmq_remoting/smart_encode_buffer.rs
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// smart_encode_buffer.rs
//
// High-performance adaptive EncodeBuffer with automatic shrink behavior.
// - Expands as needed (using BytesMut growth rules).
// - Tracks recent write sizes using EMA (exponential moving average).
// - Shrinks conservatively based on EMA, ratio trigger, and cooldown to avoid jitter.
// - Provides metrics for diagnostics.
//

use std::time::Duration;
use std::time::Instant;

use bytes::Bytes;
use bytes::BytesMut;

/// Configuration for EncodeBuffer adaptive behavior.
#[derive(Debug, Clone)]
pub struct EncodeBufferConfig {
    /// Minimum capacity to keep (never shrink below this).
    pub min_capacity: usize,
    /// EMA smoothing factor in (0, 1]. A higher alpha reacts faster to recent
    /// writes.
    pub ema_alpha: f64,
    /// Shrink trigger ratio: when current_capacity >= ema_size *
    /// shrink_ratio_trigger, a shrink is considered.
    pub shrink_ratio_trigger: f64,
    /// Shrink target factor: target_capacity = max(min_capacity, ema_size *
    /// shrink_target_factor).
    pub shrink_target_factor: f64,
    /// Minimum time between two shrink operations (cooldown).
    pub shrink_cooldown: Duration,
    /// Absolute threshold below which shrinking is never attempted (if
    /// capacity <= this, do not shrink).
    pub min_shrink_threshold: usize,
}

impl Default for EncodeBufferConfig {
    fn default() -> Self {
        Self {
            min_capacity: 8 * 1024,          // 8 KB
            ema_alpha: 0.05,                 // slow EMA to reduce jitter
            shrink_ratio_trigger: 3.0,       // conservative: capacity >= 3x recent need
            shrink_target_factor: 1.5,       // shrink to 1.5x of recent need
            shrink_cooldown: Duration::from_secs(30),
            min_shrink_threshold: 64 * 1024, // do not shrink small capacities (< 64 KB)
        }
    }
}
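
// Illustrative walk-through of the default policy (the 256 KB capacity and the
// 30 KB EMA below are assumed numbers, used only as an example): if the buffer
// once grew to 256 KB but the EMA of recent writes settles around 30 KB, then
// 256 KB >= 3.0 * 30 KB and 256 KB > the 64 KB threshold, so once the 30 s
// cooldown has elapsed the buffer shrinks to max(8 KB, 1.5 * 30 KB) = 45 KB.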

/// Runtime buffer statistics for diagnostics.
#[derive(Debug, Clone)]
pub struct BufferStats {
    pub current_capacity: usize,
    pub ema_size: f64,
    pub shrink_count: u64,
    pub expand_count: u64,
    pub historical_max: usize,
}

/// EncodeBuffer: a zero-copy friendly buffer for encoding operations with
/// automatic, conservative shrinking to prevent long-term memory bloat.
///
/// Design goals:
/// - Avoid frequent reallocation under a normal workload (keep capacity after expansion).
/// - Protect against a single large spike causing a permanently large allocation (occasional shrink).
/// - Avoid shrink jitter via a cooldown, the EMA, and an absolute threshold.
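///
/// A minimal usage sketch (illustrative only; marked `ignore` so it is not run
/// as a doctest):
///
/// ```ignore
/// let mut eb = EncodeBuffer::new();
/// eb.append(b"header");
/// eb.append(b"body");
/// let frame = eb.take_bytes(); // zero-copy Bytes; buffer length resets to 0
/// assert!(eb.is_empty());
/// ```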
pub struct EncodeBuffer {
    buf: BytesMut,
    cfg: EncodeBufferConfig,

    // Exponential moving average of recent write sizes (in bytes).
    ema_recent_size: f64,
    // Timestamp of last shrink operation.
    last_shrink: Instant,
    // The maximum len observed at runtime (diagnostic).
    historical_max_len: usize,

    // Diagnostics counters
    shrink_count: u64,
    expand_count: u64,
    last_capacity: usize,
}

impl Default for EncodeBuffer {
    fn default() -> Self {
        Self::new()
    }
}

impl EncodeBuffer {
    /// Create a new EncodeBuffer with default configuration.
    pub fn new() -> Self {
        Self::with_config(EncodeBufferConfig::default())
    }

    /// Create a new EncodeBuffer with custom configuration.
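    ///
    /// A sketch of overriding a subset of fields (the values are illustrative
    /// only):
    ///
    /// ```ignore
    /// let cfg = EncodeBufferConfig {
    ///     min_capacity: 4 * 1024,
    ///     ..Default::default()
    /// };
    /// let eb = EncodeBuffer::with_config(cfg);
    /// ```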
    pub fn with_config(cfg: EncodeBufferConfig) -> Self {
        let initial = std::cmp::max(cfg.min_capacity, 8);
        EncodeBuffer {
            buf: BytesMut::with_capacity(initial),
            cfg,
            // Initialize the EMA to a conservative value (twice the initial capacity)
            // to avoid an immediate shrink.
            ema_recent_size: (initial * 2) as f64,
            // Backdate last_shrink so the cooldown does not block the first shrink.
            last_shrink: Instant::now() - Duration::from_secs(60),

            historical_max_len: 0,
            shrink_count: 0,
            expand_count: 0,
            last_capacity: initial,
        }
    }

    /// Return current capacity.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buf.capacity()
    }

    /// Return current length (amount of data written).
    #[inline]
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// Return whether buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Append a slice into the buffer.
    #[inline]
    pub fn append(&mut self, data: &[u8]) {
        self.track_expansion();
        self.buf.extend_from_slice(data);
    }

    /// Append a Bytes value into the buffer (this copies into the internal BytesMut).
    /// Note: if you already have a complete chunk (a single Bytes) to send,
    /// send that Bytes directly instead of appending it to the buffer.
    #[inline]
    pub fn append_bytes(&mut self, bytes: &Bytes) {
        self.track_expansion();
        self.buf.extend_from_slice(bytes.as_ref());
    }

    /// Provide mutable access to the inner BytesMut for in-place encoding.
    /// Users that write directly should call `take_bytes` after writing.
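    ///
    /// An illustrative in-place write; it assumes `bytes::BufMut` is in scope
    /// for `put_u32`:
    ///
    /// ```ignore
    /// use bytes::BufMut;
    /// eb.buf_mut().put_u32(42);    // e.g. write a length prefix in place
    /// let frame = eb.take_bytes(); // then hand off the encoded frame
    /// ```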
    #[inline]
    pub fn buf_mut(&mut self) -> &mut BytesMut {
        self.track_expansion();
        &mut self.buf
    }

    /// Extract current contents as Bytes (zero-copy) and reset length to 0.
    /// This also updates the EMA and may trigger a shrink according to policy.
    pub fn take_bytes(&mut self) -> Bytes {
        let len = self.buf.len();
        if len > self.historical_max_len {
            self.historical_max_len = len;
        }

        let out = if len > 0 {
            // split_to(len) returns the first len bytes and leaves self.buf.len() == 0.
            // Note: after split_to, self.buf keeps only the capacity beyond the
            // split point (old capacity minus len), which may be small or zero.
            self.buf.split_to(len).freeze()
        } else {
            Bytes::new()
        };

        // Update EMA using the observed message size.
        self.update_ema(len);

        // Consider shrinking after updating the EMA.
        // Note: maybe_shrink allocates a new buffer with an appropriate capacity.
        self.maybe_shrink();

        // After split_to, the remaining buffer may have reduced capacity.
        // Ensure we have at least min_capacity for the next write. If a shrink
        // just ran, capacity is already >= min_capacity and this branch is skipped.
        let current_cap = self.buf.capacity();
        if current_cap < self.cfg.min_capacity && self.buf.is_empty() {
            // Replace the buffer with a new one at min_capacity.
            self.buf = BytesMut::with_capacity(self.cfg.min_capacity);
        }

        out
    }

    /// Force immediate shrink to configured min_capacity.
    pub fn force_shrink_to_min(&mut self) {
        let min = self.cfg.min_capacity;
        if self.buf.capacity() > min {
            self.do_shrink(min);
        }
    }

    /// Return historic maximum recorded length.
    pub fn historical_max(&self) -> usize {
        self.historical_max_len
    }

    /// Return diagnostic stats.
    pub fn stats(&self) -> BufferStats {
        BufferStats {
            current_capacity: self.capacity(),
            ema_size: self.ema_recent_size,
            shrink_count: self.shrink_count,
            expand_count: self.expand_count,
            historical_max: self.historical_max_len,
        }
    }

    // --- internal helpers ---

    /// Track capacity increases for diagnostics.
    #[inline]
    fn track_expansion(&mut self) {
        let cur = self.buf.capacity();
        if cur > self.last_capacity {
            self.expand_count = self.expand_count.saturating_add(1);
            self.last_capacity = cur;
        }
    }

    /// Update the EMA with the last written size.
    #[inline]
    fn update_ema(&mut self, last_size: usize) {
        let alpha = self.cfg.ema_alpha;
        // Guard against an invalid alpha: fall back to 0.05 if non-positive, cap at 1.0.
        let alpha = if alpha <= 0.0 {
            0.05
        } else if alpha > 1.0 {
            1.0
        } else {
            alpha
        };
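        // EMA recurrence: new_ema = alpha * last_size + (1 - alpha) * old_ema.
        // For example (illustrative numbers), with alpha = 0.05, an EMA of 10_000
        // and a 2_000-byte write give 0.05 * 2_000 + 0.95 * 10_000 = 9_600.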
        self.ema_recent_size = alpha * (last_size as f64) + (1.0 - alpha) * self.ema_recent_size;
    }

    /// Decide whether to shrink and perform shrink if conditions are met.
    fn maybe_shrink(&mut self) {
        let cap = self.buf.capacity();
        let ema = self.ema_recent_size.max(1.0);

        // Do not attempt shrink below an absolute threshold to avoid
        // over-fragmentation.
        if cap <= self.cfg.min_shrink_threshold {
            return;
        }

        // Conservative trigger: only shrink when capacity >= ema * ratio.
        if (cap as f64) >= ema * self.cfg.shrink_ratio_trigger {
            let now = Instant::now();
            if now.duration_since(self.last_shrink) >= self.cfg.shrink_cooldown {
                // Compute target capacity (rounded up), enforce min_capacity lower bound.
                let target = std::cmp::max(
                    self.cfg.min_capacity,
                    (ema * self.cfg.shrink_target_factor).ceil() as usize,
                );

                if target < cap {
                    self.do_shrink(target);
                }
            }
        }
    }

    /// Perform the actual shrink: allocate a new BytesMut with the target capacity.
    /// The allocation is wrapped in catch_unwind so that a panic (e.g. a capacity
    /// overflow) leaves the old buffer intact; a true out-of-memory condition
    /// aborts the process and cannot be caught here.
    fn do_shrink(&mut self, target: usize) {
        // Attempt to allocate in a panic-safe way.
        match std::panic::catch_unwind(|| BytesMut::with_capacity(target)) {
            Ok(mut new_buf) => {
                // If there are leftover bytes (there should be none after take_bytes), copy them.
                if !self.buf.is_empty() {
                    new_buf.extend_from_slice(&self.buf);
                }
                self.buf = new_buf;
                self.last_shrink = Instant::now();
                self.shrink_count = self.shrink_count.saturating_add(1);
                self.last_capacity = target;
            }
            Err(_) => {
                // An allocation panic occurred; do nothing and keep the existing buffer.
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Test that buffer expands when large writes occur.
    #[test]
    fn test_expand_on_large_write() {
        let mut eb = EncodeBuffer::new();
        let initial_cap = eb.capacity();
        // write bigger than initial capacity
        let big = vec![0u8; initial_cap * 4 + 10];
        eb.append(&big);
        assert!(
            eb.capacity() >= big.len(),
            "capacity did not expand as expected"
        );
        let _ = eb.take_bytes();
        assert_eq!(eb.len(), 0);
    }

    /// Test that EMA tracks message sizes and shrink logic is sound.
    #[test]
    fn test_shrink_after_spike() {
        // Configure with settings that allow shrinking
        let cfg = EncodeBufferConfig {
            min_capacity: 128,
            ema_alpha: 0.3,
            shrink_ratio_trigger: 3.0,
            shrink_target_factor: 1.8,
            shrink_cooldown: Duration::from_millis(5),
            min_shrink_threshold: 256,
        };

        let mut eb = EncodeBuffer::with_config(cfg);

        // Write increasing sizes to build up EMA
        for i in 1..=10 {
            eb.append(&vec![0u8; i * 100]);
            let _ = eb.take_bytes();
        }

        let stats_after_increase = eb.stats();
        println!(
            "After increasing writes - EMA: {:.1}",
            stats_after_increase.ema_size
        );

        // EMA should be somewhere in the middle range
        assert!(
            stats_after_increase.ema_size > 100.0 && stats_after_increase.ema_size < 1500.0,
            "EMA should reflect the varying message sizes"
        );

        // Now switch to very small writes
        for _ in 0..30 {
            eb.append(&[0u8; 20]);
            let _ = eb.take_bytes();
            std::thread::sleep(Duration::from_millis(1)); // Allow cooldown
        }

        let stats_final = eb.stats();
        println!(
            "After small writes - EMA: {:.1}, capacity: {}, shrinks: {}",
            stats_final.ema_size, stats_final.current_capacity, stats_final.shrink_count
        );

        // EMA should have decreased significantly
        assert!(
            stats_final.ema_size < stats_after_increase.ema_size,
            "EMA should decrease with small writes"
        );
        // Capacity should respect min_capacity
        assert!(
            stats_final.current_capacity >= 128,
            "Capacity should not go below min_capacity"
        );
    }

    /// Test EMA updates and stability (no excessive shrink under a series of
    /// moderate writes).
    #[test]
    fn test_ema_and_no_shrink_on_stable_load() {
        let cfg = EncodeBufferConfig {
            min_capacity: 64,
            ema_alpha: 0.5,            // faster response for test
            shrink_ratio_trigger: 3.0, // higher ratio to avoid premature shrink
            shrink_target_factor: 1.5,
            shrink_cooldown: Duration::from_secs(1),
            min_shrink_threshold: 256, // prevent shrink for small capacities
        };

        let min_cap = cfg.min_capacity;
        let mut eb = EncodeBuffer::with_config(cfg);
        let initial_cap = eb.capacity();
        println!("Initial capacity: {}", initial_cap);

        // simulate moderate writes
        for _ in 0..10 {
            eb.append(&[0u8; 32]);
            let _ = eb.take_bytes();
        }
        let stats = eb.stats();
        println!(
            "After 10 writes - capacity: {}, ema: {}",
            eb.capacity(),
            stats.ema_size
        );

        // EMA should converge towards 32 (approx)
        assert!(
            stats.ema_size > 1.0 && stats.ema_size < 1000.0,
            "EMA should be reasonable"
        );
        // Capacity should remain stable and not shrink below min_capacity
        assert!(
            eb.capacity() >= min_cap,
            "Capacity should not go below min_capacity"
        );
        // With stable moderate load, capacity should be stable
        assert!(
            eb.capacity() <= initial_cap * 2,
            "Capacity should not grow excessively"
        );
        // Should not have shrunk with stable load
        assert_eq!(
            stats.shrink_count, 0,
            "Should not shrink with stable moderate load"
        );
    }

    /// Jitter test: ensure frequent tiny spikes do not cause frequent shrinks.
    #[test]
    fn test_no_jitter_under_flapping() {
        let cfg = EncodeBufferConfig {
            min_capacity: 32,
            ema_alpha: 0.1, // slow EMA to resist jitter
            shrink_ratio_trigger: 2.0,
            shrink_target_factor: 1.0,
            shrink_cooldown: Duration::from_millis(50), // small cooldown for test
            min_shrink_threshold: 0,
        };

        let mut eb = EncodeBuffer::with_config(cfg);
        // simulate occasional large spikes but mostly small writes
        for i in 0..200 {
            if i % 50 == 0 {
                // large spike
                eb.append(&[0u8; 4096]);
            } else {
                eb.append(&[0u8; 16]);
            }
            let _ = eb.take_bytes();
        }

        let stats = eb.stats();
        // shrink_count should be relatively small (0 or a few), not many
        assert!(stats.shrink_count <= 10, "too many shrinks => jitter");
    }
}