Skip to main content

rocketmq_remoting/
smart_encode_buffer.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// smart_encode_buffer.rs
16//
17// High-performance adaptive EncodeBuffer with automatic shrink behavior.
18// - Expands as needed (using BytesMut growth rules).
19// - Tracks recent write sizes using EMA (exponential moving average).
20// - Shrinks conservatively based on EMA, ratio trigger, and cooldown to avoid jitter.
21// - Provides metrics for diagnostics.
22//
23
24use std::time::Duration;
25use std::time::Instant;
26
27use bytes::Bytes;
28use bytes::BytesMut;
29
/// Tunables that govern how an `EncodeBuffer` adapts its capacity over time.
#[derive(Debug, Clone)]
pub struct EncodeBufferConfig {
    /// Capacity floor: shrinking never targets a size smaller than this.
    pub min_capacity: usize,
    /// Smoothing factor of the exponential moving average, expected to lie in
    /// `(0, 1]`. The larger the value, the faster the average follows recent
    /// writes.
    pub ema_alpha: f64,
    /// Ratio that arms the shrink logic: a shrink is only considered once
    /// `current_capacity >= ema_size * shrink_ratio_trigger`.
    pub shrink_ratio_trigger: f64,
    /// Multiplier used to size the buffer when shrinking:
    /// `target_capacity = max(min_capacity, ema_size * shrink_target_factor)`.
    pub shrink_target_factor: f64,
    /// Cooldown: the minimum time that must pass between two shrink
    /// operations.
    pub shrink_cooldown: Duration,
    /// Absolute lower bound for shrinking: a buffer whose capacity is at or
    /// below this value is never shrunk.
    pub min_shrink_threshold: usize,
}
50
51impl Default for EncodeBufferConfig {
52    fn default() -> Self {
53        Self {
54            min_capacity: 8 * 1024,    // 8 KB
55            ema_alpha: 0.05,           // slow EMA to reduce jitter
56            shrink_ratio_trigger: 3.0, // conservative: capacity >= 3x recent need
57            shrink_target_factor: 1.5, // shrink to 1.5x of recent need
58            shrink_cooldown: Duration::from_secs(30),
59            min_shrink_threshold: 64 * 1024, // do not shrink small capacities (<64KB)
60        }
61    }
62}
63
/// Point-in-time snapshot of an `EncodeBuffer`'s diagnostics.
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Capacity of the buffer when the snapshot was taken.
    pub current_capacity: usize,
    /// Exponential moving average of recent write sizes, in bytes.
    pub ema_size: f64,
    /// Number of shrink operations performed so far.
    pub shrink_count: u64,
    /// Number of capacity increases observed so far.
    pub expand_count: u64,
    /// Largest buffer length observed so far.
    pub historical_max: usize,
}
73
74/// EncodeBuffer: zero-copy friendly buffer for encoding operations with
75/// automatic, conservative shrinking to prevent long-term memory bloat.
76///
77/// Design goals:
78/// - Avoid frequent reallocation on normal workload (keep capacity after expansion).
79/// - Protect against single large spike causing permanent large allocation (occasional shrink).
80/// - Avoid shrink jitter by cooldown + EMA + absolute threshold.
81pub struct EncodeBuffer {
82    buf: BytesMut,
83    cfg: EncodeBufferConfig,
84
85    // Exponential moving average of recent write sizes (in bytes).
86    ema_recent_size: f64,
87    // Timestamp of last shrink operation.
88    last_shrink: Instant,
89    // The maximum len observed at runtime (diagnostic).
90    historical_max_len: usize,
91
92    // Diagnostics counters
93    shrink_count: u64,
94    expand_count: u64,
95    last_capacity: usize,
96}
97
98impl Default for EncodeBuffer {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl EncodeBuffer {
105    /// Create a new EncodeBuffer with default configuration.
106    pub fn new() -> Self {
107        Self::with_config(EncodeBufferConfig::default())
108    }
109
110    /// Create a new EncodeBuffer with custom configuration.
111    pub fn with_config(cfg: EncodeBufferConfig) -> Self {
112        let initial = std::cmp::max(cfg.min_capacity, 8);
113        EncodeBuffer {
114            buf: BytesMut::with_capacity(initial),
115            cfg,
116            // Initialize EMA to a conservative value: twice the initial to avoid immediate shrink.
117            ema_recent_size: (initial * 2) as f64,
118            //allow immediate shrink if triggered
119            last_shrink: Instant::now() - Duration::from_secs(60),
120
121            historical_max_len: 0,
122            shrink_count: 0,
123            expand_count: 0,
124            last_capacity: initial,
125        }
126    }
127
128    /// Return current capacity.
129    #[inline]
130    pub fn capacity(&self) -> usize {
131        self.buf.capacity()
132    }
133
134    /// Return current length (amount of data written).
135    #[inline]
136    pub fn len(&self) -> usize {
137        self.buf.len()
138    }
139
140    /// Return whether buffer is empty.
141    #[inline]
142    pub fn is_empty(&self) -> bool {
143        self.buf.is_empty()
144    }
145
146    /// Append a slice into the buffer.
147    #[inline]
148    pub fn append(&mut self, data: &[u8]) {
149        self.track_expansion();
150        self.buf.extend_from_slice(data);
151    }
152
153    /// Append a Bytes value into the buffer (will copy into internal BytesMut).
154    /// Note: if you already have a complete chunk (single Bytes) to send,
155    /// send the Bytes directly instead of appending them to the buffer.
156    #[inline]
157    pub fn append_bytes(&mut self, bytes: &Bytes) {
158        self.track_expansion();
159        self.buf.extend_from_slice(bytes.as_ref());
160    }
161
162    /// Provide mutable access to inner BytesMut for in-place encoding.
163    /// Users that write directly should call `take_bytes` after writing.
164    #[inline]
165    pub fn buf_mut(&mut self) -> &mut BytesMut {
166        self.track_expansion();
167        &mut self.buf
168    }
169
170    /// Extract current contents as Bytes (zero-copy) and reset length to 0.
171    /// This also updates EMA and may trigger a shrink according to policy.
172    pub fn take_bytes(&mut self) -> Bytes {
173        let len = self.buf.len();
174        if len > self.historical_max_len {
175            self.historical_max_len = len;
176        }
177
178        let out = if len > 0 {
179            // split_to(len) returns the first len bytes and leaves self.buf.len() == 0.
180            // Note: After split_to, the remaining capacity becomes 0.
181            self.buf.split_to(len).freeze()
182        } else {
183            Bytes::new()
184        };
185
186        // Update EMA using the observed message size.
187        self.update_ema(len);
188
189        // Consider shrinking after updating EMA.
190        // Note: maybe_shrink will allocate a new buffer with appropriate capacity
191        self.maybe_shrink();
192
193        // After split_to, the remaining buffer may have reduced capacity.
194        // Ensure we have at least min_capacity for next write.
195        // Only do this if we didn't just shrink (shrink already sets capacity correctly).
196        let current_cap = self.buf.capacity();
197        if current_cap < self.cfg.min_capacity && self.buf.is_empty() {
198            // Replace the buffer with a new one at min_capacity
199            self.buf = BytesMut::with_capacity(self.cfg.min_capacity);
200        }
201
202        out
203    }
204
205    /// Force immediate shrink to configured min_capacity.
206    pub fn force_shrink_to_min(&mut self) {
207        let min = self.cfg.min_capacity;
208        if self.buf.capacity() > min {
209            self.do_shrink(min);
210        }
211    }
212
213    /// Return historic maximum recorded length.
214    pub fn historical_max(&self) -> usize {
215        self.historical_max_len
216    }
217
218    /// Return diagnostic stats.
219    pub fn stats(&self) -> BufferStats {
220        BufferStats {
221            current_capacity: self.capacity(),
222            ema_size: self.ema_recent_size,
223            shrink_count: self.shrink_count,
224            expand_count: self.expand_count,
225            historical_max: self.historical_max_len,
226        }
227    }
228
229    // --- internal helpers ---
230
231    /// Track capacity increases for diagnostics.
232    #[inline]
233    fn track_expansion(&mut self) {
234        let cur = self.buf.capacity();
235        if cur > self.last_capacity {
236            self.expand_count = self.expand_count.saturating_add(1);
237            self.last_capacity = cur;
238        }
239    }
240
241    /// Update the EMA with the last written size.
242    #[inline]
243    fn update_ema(&mut self, last_size: usize) {
244        let alpha = self.cfg.ema_alpha;
245        // guard for invalid alpha
246        let alpha = if alpha <= 0.0 {
247            0.05
248        } else if alpha > 1.0 {
249            1.0
250        } else {
251            alpha
252        };
253        self.ema_recent_size = alpha * (last_size as f64) + (1.0 - alpha) * self.ema_recent_size;
254    }
255
256    /// Decide whether to shrink and perform shrink if conditions are met.
257    fn maybe_shrink(&mut self) {
258        let cap = self.buf.capacity();
259        let ema = self.ema_recent_size.max(1.0);
260
261        // Do not attempt shrink below an absolute threshold to avoid
262        // over-fragmentation.
263        if cap <= self.cfg.min_shrink_threshold {
264            return;
265        }
266
267        // Conservative trigger: only shrink when capacity >= ema * ratio.
268        if (cap as f64) >= ema * self.cfg.shrink_ratio_trigger {
269            let now = Instant::now();
270            if now.duration_since(self.last_shrink) >= self.cfg.shrink_cooldown {
271                // Compute target capacity (rounded up), enforce min_capacity lower bound.
272                let target = std::cmp::max(
273                    self.cfg.min_capacity,
274                    (ema * self.cfg.shrink_target_factor).ceil() as usize,
275                );
276
277                if target < cap {
278                    self.do_shrink(target);
279                }
280            }
281        }
282    }
283
284    /// Perform the actual shrink: allocate a new BytesMut with target capacity.
285    /// If allocation fails (unlikely), we keep the old buffer to preserve
286    /// correctness.
287    fn do_shrink(&mut self, target: usize) {
288        // Attempt to allocate in a panic-safe way.
289        match std::panic::catch_unwind(|| BytesMut::with_capacity(target)) {
290            Ok(mut new_buf) => {
291                // If there are leftover bytes (should be none after take_bytes), copy them.
292                if !self.buf.is_empty() {
293                    new_buf.extend_from_slice(&self.buf);
294                }
295                self.buf = new_buf;
296                self.last_shrink = Instant::now();
297                self.shrink_count = self.shrink_count.saturating_add(1);
298                self.last_capacity = target;
299            }
300            Err(_) => {
301                // Allocation panic occurred; do nothing to preserve existing
302                // buffer.
303            }
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A write larger than the initial capacity must grow the buffer, and
    /// taking the bytes must leave it empty.
    #[test]
    fn test_expand_on_large_write() {
        let mut buffer = EncodeBuffer::new();
        // Payload bigger than the initial capacity.
        let payload_len = buffer.capacity() * 4 + 10;
        let payload = vec![0u8; payload_len];

        buffer.append(&payload);
        assert!(buffer.capacity() >= payload_len, "capacity did not expand as expected");

        let _ = buffer.take_bytes();
        assert_eq!(buffer.len(), 0);
    }

    /// The EMA follows the message sizes and the shrink logic stays within its
    /// bounds after a run of large writes is followed by tiny ones.
    #[test]
    fn test_shrink_after_spike() {
        // Settings that allow shrinking to happen within the test.
        let mut buffer = EncodeBuffer::with_config(EncodeBufferConfig {
            min_capacity: 128,
            ema_alpha: 0.3,
            shrink_ratio_trigger: 3.0,
            shrink_target_factor: 1.8,
            shrink_cooldown: Duration::from_millis(5),
            min_shrink_threshold: 256,
        });

        // Ramp phase: writes of 100, 200, ..., 1000 bytes build up the EMA.
        for size in (100..=1000).step_by(100) {
            buffer.append(&vec![0u8; size]);
            let _ = buffer.take_bytes();
        }

        let ramp_ema = buffer.stats().ema_size;
        println!("After increasing writes - EMA: {:.1}", ramp_ema);

        // The EMA should sit somewhere in the middle of the range.
        assert!(
            ramp_ema > 100.0 && ramp_ema < 1500.0,
            "EMA should reflect the varying message sizes"
        );

        // Quiet phase: many very small writes.
        let tiny = [0u8; 20];
        for _ in 0..30 {
            buffer.append(&tiny);
            let _ = buffer.take_bytes();
            // Give the cooldown a chance to expire.
            std::thread::sleep(Duration::from_millis(1));
        }

        let BufferStats {
            ema_size,
            current_capacity,
            shrink_count,
            ..
        } = buffer.stats();
        println!(
            "After small writes - EMA: {:.1}, capacity: {}, shrinks: {}",
            ema_size, current_capacity, shrink_count
        );

        // The EMA must have come down...
        assert!(ema_size < ramp_ema, "EMA should decrease with small writes");
        // ...while the capacity floor is respected.
        assert!(current_capacity >= 128, "Capacity should not go below min_capacity");
    }

    /// A steady stream of moderate writes updates the EMA but must not cause
    /// any shrink.
    #[test]
    fn test_ema_and_no_shrink_on_stable_load() {
        let floor = 64;
        let mut buffer = EncodeBuffer::with_config(EncodeBufferConfig {
            min_capacity: floor,
            // Faster response for the test.
            ema_alpha: 0.5,
            // Higher ratio to avoid a premature shrink.
            shrink_ratio_trigger: 3.0,
            shrink_target_factor: 1.5,
            shrink_cooldown: Duration::from_secs(1),
            // Prevent shrinking for small capacities.
            min_shrink_threshold: 256,
        });
        let starting_capacity = buffer.capacity();
        println!("Initial capacity: {}", starting_capacity);

        // Ten moderate writes of 32 bytes each.
        let chunk = [0u8; 32];
        (0..10).for_each(|_| {
            buffer.append(&chunk);
            let _ = buffer.take_bytes();
        });

        let snapshot = buffer.stats();
        println!(
            "After 10 writes - capacity: {}, ema: {}",
            buffer.capacity(),
            snapshot.ema_size
        );

        // The EMA should be heading towards roughly 32.
        assert!(
            snapshot.ema_size > 1.0 && snapshot.ema_size < 1000.0,
            "EMA should be reasonable"
        );
        // The capacity stays at or above the floor...
        assert!(buffer.capacity() >= floor, "Capacity should not go below min_capacity");
        // ...does not balloon under a stable moderate load...
        assert!(
            buffer.capacity() <= starting_capacity * 2,
            "Capacity should not grow excessively"
        );
        // ...and no shrink has happened.
        assert_eq!(snapshot.shrink_count, 0, "Should not shrink with stable moderate load");
    }

    /// Jitter check: occasional large spikes among mostly small writes must not
    /// lead to frequent shrinks.
    #[test]
    fn test_no_jitter_under_flapping() {
        let mut buffer = EncodeBuffer::with_config(EncodeBufferConfig {
            min_capacity: 32,
            // Slow EMA to resist jitter.
            ema_alpha: 0.1,
            shrink_ratio_trigger: 2.0,
            shrink_target_factor: 1.0,
            // Small cooldown for the test.
            shrink_cooldown: Duration::from_millis(50),
            min_shrink_threshold: 0,
        });

        // Every 50th write is a 4096-byte spike; all others are 16 bytes.
        let spike = [0u8; 4096];
        for round in 0..200 {
            let payload = if round % 50 == 0 { &spike[..] } else { &spike[..16] };
            buffer.append(payload);
            let _ = buffer.take_bytes();
        }

        // Only a handful of shrinks (possibly none) is acceptable.
        let shrinks = buffer.stats().shrink_count;
        assert!(shrinks <= 10, "too many shrinks => jitter");
    }
}
449}