rocketmq_remoting/
smart_encode_buffer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17// smart_encode_buffer.rs
18//
19// High-performance adaptive EncodeBuffer with automatic shrink behavior.
20// - Expands as needed (using BytesMut growth rules).
21// - Tracks recent write sizes using EMA (exponential moving average).
22// - Shrinks conservatively based on EMA, ratio trigger, and cooldown to avoid jitter.
23// - Provides metrics for diagnostics.
24//
25
26use std::time::Duration;
27use std::time::Instant;
28
29use bytes::Bytes;
30use bytes::BytesMut;
31
/// Tuning knobs that control how an `EncodeBuffer` grows and, more
/// importantly, when it gives memory back.
#[derive(Debug, Clone)]
pub struct EncodeBufferConfig {
    /// Floor for the buffer capacity; shrinking never targets less than this.
    pub min_capacity: usize,
    /// Smoothing factor of the exponential moving average, expected in
    /// `(0, 1]`. Larger values make the average follow recent writes more
    /// closely. Non-positive values fall back to `0.05` and values above `1`
    /// are treated as `1`.
    pub ema_alpha: f64,
    /// A shrink is considered only once
    /// `capacity >= ema_size * shrink_ratio_trigger`.
    pub shrink_ratio_trigger: f64,
    /// Capacity chosen when shrinking:
    /// `max(min_capacity, ema_size * shrink_target_factor)`.
    pub shrink_target_factor: f64,
    /// Cooldown: the minimum time that must pass between two shrinks.
    pub shrink_cooldown: Duration,
    /// Capacities at or below this value are never shrunk, regardless of the
    /// ratio trigger.
    pub min_shrink_threshold: usize,
}
52
53impl Default for EncodeBufferConfig {
54    fn default() -> Self {
55        Self {
56            min_capacity: 8 * 1024,    // 8 KB
57            ema_alpha: 0.05,           // slow EMA to reduce jitter
58            shrink_ratio_trigger: 3.0, // conservative: capacity >= 3x recent need
59            shrink_target_factor: 1.5, // shrink to 1.5x of recent need
60            shrink_cooldown: Duration::from_secs(30),
61            min_shrink_threshold: 64 * 1024, // do not shrink small capacities (<64KB)
62        }
63    }
64}
65
/// Point-in-time snapshot of an `EncodeBuffer`'s counters, intended for
/// diagnostics and metrics reporting.
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Capacity of the underlying buffer when the snapshot was taken.
    pub current_capacity: usize,
    /// Exponential moving average of recently taken message sizes, in bytes.
    pub ema_size: f64,
    /// Number of shrink operations performed so far.
    pub shrink_count: u64,
    /// Number of capacity increases observed so far.
    pub expand_count: u64,
    /// Largest message length ever taken out of the buffer.
    pub historical_max: usize,
}
75
76/// EncodeBuffer: zero-copy friendly buffer for encoding operations with
77/// automatic, conservative shrinking to prevent long-term memory bloat.
78///
79/// Design goals:
80/// - Avoid frequent reallocation on normal workload (keep capacity after expansion).
81/// - Protect against single large spike causing permanent large allocation (occasional shrink).
82/// - Avoid shrink jitter by cooldown + EMA + absolute threshold.
83pub struct EncodeBuffer {
84    buf: BytesMut,
85    cfg: EncodeBufferConfig,
86
87    // Exponential moving average of recent write sizes (in bytes).
88    ema_recent_size: f64,
89    // Timestamp of last shrink operation.
90    last_shrink: Instant,
91    // The maximum len observed at runtime (diagnostic).
92    historical_max_len: usize,
93
94    // Diagnostics counters
95    shrink_count: u64,
96    expand_count: u64,
97    last_capacity: usize,
98}
99
100impl Default for EncodeBuffer {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl EncodeBuffer {
107    /// Create a new EncodeBuffer with default configuration.
108    pub fn new() -> Self {
109        Self::with_config(EncodeBufferConfig::default())
110    }
111
112    /// Create a new EncodeBuffer with custom configuration.
113    pub fn with_config(cfg: EncodeBufferConfig) -> Self {
114        let initial = std::cmp::max(cfg.min_capacity, 8);
115        EncodeBuffer {
116            buf: BytesMut::with_capacity(initial),
117            cfg,
118            // Initialize EMA to a conservative value: twice the initial to avoid immediate shrink.
119            ema_recent_size: (initial * 2) as f64,
120            //allow immediate shrink if triggered
121            last_shrink: Instant::now() - Duration::from_secs(60),
122
123            historical_max_len: 0,
124            shrink_count: 0,
125            expand_count: 0,
126            last_capacity: initial,
127        }
128    }
129
130    /// Return current capacity.
131    #[inline]
132    pub fn capacity(&self) -> usize {
133        self.buf.capacity()
134    }
135
136    /// Return current length (amount of data written).
137    #[inline]
138    pub fn len(&self) -> usize {
139        self.buf.len()
140    }
141
142    /// Return whether buffer is empty.
143    #[inline]
144    pub fn is_empty(&self) -> bool {
145        self.buf.is_empty()
146    }
147
148    /// Append a slice into the buffer.
149    #[inline]
150    pub fn append(&mut self, data: &[u8]) {
151        self.track_expansion();
152        self.buf.extend_from_slice(data);
153    }
154
155    /// Append a Bytes value into the buffer (will copy into internal BytesMut).
156    /// Note: if you already have a complete chunk (single Bytes) to send,
157    /// send the Bytes directly instead of appending them to the buffer.
158    #[inline]
159    pub fn append_bytes(&mut self, bytes: &Bytes) {
160        self.track_expansion();
161        self.buf.extend_from_slice(bytes.as_ref());
162    }
163
164    /// Provide mutable access to inner BytesMut for in-place encoding.
165    /// Users that write directly should call `take_bytes` after writing.
166    #[inline]
167    pub fn buf_mut(&mut self) -> &mut BytesMut {
168        self.track_expansion();
169        &mut self.buf
170    }
171
172    /// Extract current contents as Bytes (zero-copy) and reset length to 0.
173    /// This also updates EMA and may trigger a shrink according to policy.
174    pub fn take_bytes(&mut self) -> Bytes {
175        let len = self.buf.len();
176        if len > self.historical_max_len {
177            self.historical_max_len = len;
178        }
179
180        let out = if len > 0 {
181            // split_to(len) returns the first len bytes and leaves self.buf.len() == 0.
182            // Note: After split_to, the remaining capacity becomes 0.
183            self.buf.split_to(len).freeze()
184        } else {
185            Bytes::new()
186        };
187
188        // Update EMA using the observed message size.
189        self.update_ema(len);
190
191        // Consider shrinking after updating EMA.
192        // Note: maybe_shrink will allocate a new buffer with appropriate capacity
193        self.maybe_shrink();
194
195        // After split_to, the remaining buffer may have reduced capacity.
196        // Ensure we have at least min_capacity for next write.
197        // Only do this if we didn't just shrink (shrink already sets capacity correctly).
198        let current_cap = self.buf.capacity();
199        if current_cap < self.cfg.min_capacity && self.buf.is_empty() {
200            // Replace the buffer with a new one at min_capacity
201            self.buf = BytesMut::with_capacity(self.cfg.min_capacity);
202        }
203
204        out
205    }
206
207    /// Force immediate shrink to configured min_capacity.
208    pub fn force_shrink_to_min(&mut self) {
209        let min = self.cfg.min_capacity;
210        if self.buf.capacity() > min {
211            self.do_shrink(min);
212        }
213    }
214
215    /// Return historic maximum recorded length.
216    pub fn historical_max(&self) -> usize {
217        self.historical_max_len
218    }
219
220    /// Return diagnostic stats.
221    pub fn stats(&self) -> BufferStats {
222        BufferStats {
223            current_capacity: self.capacity(),
224            ema_size: self.ema_recent_size,
225            shrink_count: self.shrink_count,
226            expand_count: self.expand_count,
227            historical_max: self.historical_max_len,
228        }
229    }
230
231    // --- internal helpers ---
232
233    /// Track capacity increases for diagnostics.
234    #[inline]
235    fn track_expansion(&mut self) {
236        let cur = self.buf.capacity();
237        if cur > self.last_capacity {
238            self.expand_count = self.expand_count.saturating_add(1);
239            self.last_capacity = cur;
240        }
241    }
242
243    /// Update the EMA with the last written size.
244    #[inline]
245    fn update_ema(&mut self, last_size: usize) {
246        let alpha = self.cfg.ema_alpha;
247        // guard for invalid alpha
248        let alpha = if alpha <= 0.0 {
249            0.05
250        } else if alpha > 1.0 {
251            1.0
252        } else {
253            alpha
254        };
255        self.ema_recent_size = alpha * (last_size as f64) + (1.0 - alpha) * self.ema_recent_size;
256    }
257
258    /// Decide whether to shrink and perform shrink if conditions are met.
259    fn maybe_shrink(&mut self) {
260        let cap = self.buf.capacity();
261        let ema = self.ema_recent_size.max(1.0);
262
263        // Do not attempt shrink below an absolute threshold to avoid
264        // over-fragmentation.
265        if cap <= self.cfg.min_shrink_threshold {
266            return;
267        }
268
269        // Conservative trigger: only shrink when capacity >= ema * ratio.
270        if (cap as f64) >= ema * self.cfg.shrink_ratio_trigger {
271            let now = Instant::now();
272            if now.duration_since(self.last_shrink) >= self.cfg.shrink_cooldown {
273                // Compute target capacity (rounded up), enforce min_capacity lower bound.
274                let target = std::cmp::max(
275                    self.cfg.min_capacity,
276                    (ema * self.cfg.shrink_target_factor).ceil() as usize,
277                );
278
279                if target < cap {
280                    self.do_shrink(target);
281                }
282            }
283        }
284    }
285
286    /// Perform the actual shrink: allocate a new BytesMut with target capacity.
287    /// If allocation fails (unlikely), we keep the old buffer to preserve
288    /// correctness.
289    fn do_shrink(&mut self, target: usize) {
290        // Attempt to allocate in a panic-safe way.
291        match std::panic::catch_unwind(|| BytesMut::with_capacity(target)) {
292            Ok(mut new_buf) => {
293                // If there are leftover bytes (should be none after take_bytes), copy them.
294                if !self.buf.is_empty() {
295                    new_buf.extend_from_slice(&self.buf);
296                }
297                self.buf = new_buf;
298                self.last_shrink = Instant::now();
299                self.shrink_count = self.shrink_count.saturating_add(1);
300                self.last_capacity = target;
301            }
302            Err(_) => {
303                // Allocation panic occurred; do nothing to preserve existing
304                // buffer.
305            }
306        }
307    }
308}
309
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A write larger than the current capacity must grow the buffer, and
    /// `take_bytes` must hand back everything that was written.
    #[test]
    fn test_expand_on_large_write() {
        let mut eb = EncodeBuffer::new();
        let initial_cap = eb.capacity();
        // Write more than the initial capacity.
        let big = vec![0u8; initial_cap * 4 + 10];
        eb.append(&big);
        assert!(
            eb.capacity() >= big.len(),
            "capacity did not expand as expected"
        );
        let taken = eb.take_bytes();
        assert_eq!(
            taken.len(),
            big.len(),
            "take_bytes must return everything written"
        );
        assert_eq!(eb.len(), 0);
        assert!(eb.is_empty());
        assert_eq!(eb.historical_max(), big.len());
    }

    /// The EMA must follow message sizes up and back down, and the capacity
    /// must never drop below `min_capacity`.
    #[test]
    fn test_shrink_after_spike() {
        // Configure with settings that allow shrinking.
        let cfg = EncodeBufferConfig {
            min_capacity: 128,
            ema_alpha: 0.3,
            shrink_ratio_trigger: 3.0,
            shrink_target_factor: 1.8,
            shrink_cooldown: Duration::from_millis(5),
            min_shrink_threshold: 256,
        };

        let mut eb = EncodeBuffer::with_config(cfg);

        // Write increasing sizes to build up the EMA.
        for i in 1..=10 {
            eb.append(&vec![0u8; i * 100]);
            let _ = eb.take_bytes();
        }

        let stats_after_increase = eb.stats();
        println!(
            "After increasing writes - EMA: {:.1}",
            stats_after_increase.ema_size
        );

        // EMA should be somewhere in the middle range.
        assert!(
            stats_after_increase.ema_size > 100.0 && stats_after_increase.ema_size < 1500.0,
            "EMA should reflect the varying message sizes"
        );

        // Now switch to very small writes.
        for _ in 0..30 {
            eb.append(&[0u8; 20]);
            let _ = eb.take_bytes();
            std::thread::sleep(Duration::from_millis(1)); // Allow cooldown
        }

        let stats_final = eb.stats();
        println!(
            "After small writes - EMA: {:.1}, capacity: {}, shrinks: {}",
            stats_final.ema_size, stats_final.current_capacity, stats_final.shrink_count
        );

        // EMA should have decreased significantly.
        assert!(
            stats_final.ema_size < stats_after_increase.ema_size,
            "EMA should decrease with small writes"
        );
        // Capacity should respect min_capacity.
        assert!(
            stats_final.current_capacity >= 128,
            "Capacity should not go below min_capacity"
        );
    }

    /// A steady stream of moderate writes must converge the EMA onto the
    /// write size and must not cause any shrink.
    #[test]
    fn test_ema_and_no_shrink_on_stable_load() {
        let cfg = EncodeBufferConfig {
            min_capacity: 64,
            ema_alpha: 0.5,            // faster response for test
            shrink_ratio_trigger: 3.0, // higher ratio to avoid premature shrink
            shrink_target_factor: 1.5,
            shrink_cooldown: Duration::from_secs(1),
            min_shrink_threshold: 256, // prevent shrink for small capacities
        };

        let min_cap = cfg.min_capacity;
        let mut eb = EncodeBuffer::with_config(cfg);
        let initial_cap = eb.capacity();
        println!("Initial capacity: {}", initial_cap);

        // Simulate moderate writes.
        for _ in 0..10 {
            eb.append(&[0u8; 32]);
            let _ = eb.take_bytes();
        }
        let stats = eb.stats();
        println!(
            "After 10 writes - capacity: {}, ema: {}",
            eb.capacity(),
            stats.ema_size
        );

        // The EMA starts at 2 * 64 = 128 and halves its distance to 32 on
        // every take, so after ten takes it is 32 + 96 / 1024 (about 32.09).
        assert!(
            (stats.ema_size - 32.0).abs() < 1.0,
            "EMA should converge towards the steady write size"
        );
        // Capacity should remain stable and not shrink below min_capacity.
        assert!(
            eb.capacity() >= min_cap,
            "Capacity should not go below min_capacity"
        );
        // With stable moderate load, capacity should be stable.
        assert!(
            eb.capacity() <= initial_cap * 2,
            "Capacity should not grow excessively"
        );
        // Should not have shrunk with stable load.
        assert_eq!(
            stats.shrink_count, 0,
            "Should not shrink with stable moderate load"
        );
    }

    /// Jitter test: occasional spikes among mostly small writes must not cause
    /// frequent shrinks.
    #[test]
    fn test_no_jitter_under_flapping() {
        let cfg = EncodeBufferConfig {
            min_capacity: 32,
            ema_alpha: 0.1, // slow EMA to resist jitter
            shrink_ratio_trigger: 2.0,
            shrink_target_factor: 1.0,
            shrink_cooldown: Duration::from_millis(50), // small cooldown for test
            min_shrink_threshold: 0,
        };

        let mut eb = EncodeBuffer::with_config(cfg);
        // Simulate occasional large spikes but mostly small writes.
        for i in 0..200 {
            if i % 50 == 0 {
                // large spike
                eb.append(&[0u8; 4096]);
            } else {
                eb.append(&[0u8; 16]);
            }
            let _ = eb.take_bytes();
        }

        let stats = eb.stats();
        // shrink_count should be relatively small (0 or a few), not many.
        assert!(stats.shrink_count <= 10, "too many shrinks => jitter");
    }
}
470}