Skip to main content

dynamo_runtime/pipeline/network/codec/
zero_copy_decoder.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Zero-copy TCP message decoder for high-concurrency scenarios
5//!
6//! This decoder eliminates message reconstruction copies by:
7//! 1. Reading into a reusable buffer
8//! 2. Parsing headers in-place
9//! 3. Splitting off exact message sizes (zero-copy via Bytes::split_to)
10//! 4. Returning Arc-counted Bytes that can be cloned cheaply
11
12use super::{
13    check_tcp_request_max_message_size, parse_tcp_request_frame_header, tcp_request_endpoint_len,
14    tcp_request_header_size, tcp_request_headers_len,
15};
16use crate::pipeline::network::get_tcp_max_message_size;
17use bytes::{Bytes, BytesMut};
18use std::io;
19use std::sync::OnceLock;
20use tokio::io::{AsyncRead, AsyncReadExt};
21
/// Capacity, in bytes, of a decoder buffer created by `ZeroCopyTcpDecoder::new`, and the
/// capacity an oversized, drained buffer is reset to.
const INITIAL_BUFFER_SIZE: usize = 256 * 1024; // 256KB
/// Shrink threshold, in bytes, used when `DYN_TCP_SHRINK_MESSAGE_SIZE` supplies no usable value.
const DEFAULT_SHRINK_SIZE: usize = 8 << 20; // 8MB

/// Process-wide cache for the resolved shrink threshold; populated on first use.
static SHRINK_MESSAGE_SIZE: OnceLock<usize> = OnceLock::new();
26
27/// Get the shrink message size threshold.
28fn get_shrink_message_size() -> usize {
29    *SHRINK_MESSAGE_SIZE.get_or_init(|| {
30        let max_size = get_tcp_max_message_size();
31        // Check for environment variable override
32        let env_result = std::env::var("DYN_TCP_SHRINK_MESSAGE_SIZE");
33        let env_shrink_size = env_result.as_ref().ok().and_then(|s| {
34            s.parse::<usize>().ok().or_else(|| {
35                tracing::warn!(
36                    env_var = "DYN_TCP_SHRINK_MESSAGE_SIZE",
37                    value = %s,
38                    "Invalid value for DYN_TCP_SHRINK_MESSAGE_SIZE, using default"
39                );
40                None
41            })
42        });
43
44        let resolved = resolve_shrink_message_size(max_size, env_shrink_size);
45
46        // Warn if the configured value was clamped
47        if let Some(configured) = env_shrink_size
48            && configured != resolved
49        {
50            tracing::warn!(
51                configured_size = configured,
52                resolved_size = resolved,
53                max_size = max_size,
54                initial_buffer_size = INITIAL_BUFFER_SIZE,
55                "DYN_TCP_SHRINK_MESSAGE_SIZE was clamped to valid range. Note the size is in bytes."
56            );
57        }
58
59        resolved
60    })
61}
62
63/// Resolve the shrink message size threshold based on configuration and constraints.
64///
65fn resolve_shrink_message_size(max_size: usize, env_shrink_size: Option<usize>) -> usize {
66    let configured_size = env_shrink_size.unwrap_or(DEFAULT_SHRINK_SIZE);
67
68    // Clamp to valid range: [INITIAL_BUFFER_SIZE, max_size]
69    configured_size
70        .min(max_size) // Don't exceed max message size
71        .max(INITIAL_BUFFER_SIZE) // Don't go below initial buffer size
72}
73
74/// Zero-copy streaming decoder that reuses buffers
75///
76/// This decoder maintains an internal buffer and only allocates when necessary.
77/// Messages are returned as Arc-counted Bytes slices, making cloning extremely cheap.
78/// The reusable buffer resets back to INITIAL_BUFFER_SIZE only when unread data
79/// is empty and capacity exceeds DYN_TCP_SHRINK_MESSAGE_SIZE.
80pub struct ZeroCopyTcpDecoder {
81    /// Reusable read buffer - grows as needed, shrinks when empty and oversized
82    read_buffer: BytesMut,
83    /// Maximum allowed message size
84    max_message_size: usize,
85    /// Threshold for shrinking buffer back to initial size when empty
86    shrink_threshold: usize,
87}
88
89impl ZeroCopyTcpDecoder {
90    /// Create a new decoder with default buffer size
91    pub fn new() -> Self {
92        Self::with_capacity(INITIAL_BUFFER_SIZE)
93    }
94
95    /// Create a new decoder with specific initial capacity
96    pub fn with_capacity(capacity: usize) -> Self {
97        Self {
98            read_buffer: BytesMut::with_capacity(capacity),
99            max_message_size: get_tcp_max_message_size(),
100            shrink_threshold: get_shrink_message_size(),
101        }
102    }
103
104    /// Read one complete message with ZERO copies
105    ///
106    /// This method:
107    /// 1. Ensures headers are buffered
108    /// 2. Parses headers in-place (no allocation)
109    /// 3. Ensures entire message is buffered
110    /// 4. Splits off exact message size (zero-copy pointer arithmetic)
111    /// 5. Returns Arc-counted Bytes (cheap to clone)
112    pub async fn read_message<R: AsyncRead + Unpin>(
113        &mut self,
114        reader: &mut R,
115    ) -> io::Result<TcpRequestMessageZeroCopy> {
116        // Fill buffer if needed
117        while self.read_buffer.len() < super::TCP_REQUEST_ENDPOINT_LEN_WIDTH {
118            let n = reader.read_buf(&mut self.read_buffer).await?;
119            if n == 0 {
120                if self.read_buffer.is_empty() {
121                    return Err(io::Error::new(
122                        io::ErrorKind::UnexpectedEof,
123                        "connection closed",
124                    ));
125                } else {
126                    return Err(io::Error::new(
127                        io::ErrorKind::UnexpectedEof,
128                        "incomplete message header",
129                    ));
130                }
131            }
132        }
133
134        // Parse endpoint path length (first 2 bytes) - NO COPY
135        let path_len = tcp_request_endpoint_len(&self.read_buffer)?;
136
137        // Ensure we have path + headers_len
138        let initial_header_size =
139            super::TCP_REQUEST_ENDPOINT_LEN_WIDTH + path_len + super::TCP_REQUEST_HEADERS_LEN_WIDTH;
140        while self.read_buffer.len() < initial_header_size {
141            let n = reader.read_buf(&mut self.read_buffer).await?;
142            if n == 0 {
143                return Err(io::Error::new(
144                    io::ErrorKind::UnexpectedEof,
145                    "incomplete message header",
146                ));
147            }
148        }
149
150        // Parse headers length (2 bytes after path) - NO COPY
151        let headers_len = tcp_request_headers_len(&self.read_buffer, path_len)?;
152
153        // Ensure we have headers + payload length
154        let full_header_size = tcp_request_header_size(path_len, headers_len);
155        while self.read_buffer.len() < full_header_size {
156            let n = reader.read_buf(&mut self.read_buffer).await?;
157            if n == 0 {
158                return Err(io::Error::new(
159                    io::ErrorKind::UnexpectedEof,
160                    "incomplete message header",
161                ));
162            }
163        }
164
165        let parsed = parse_tcp_request_frame_header(&self.read_buffer)?;
166
167        // Sanity check total message length (including all overhead)
168        check_tcp_request_max_message_size(parsed.total_len, self.max_message_size)?;
169
170        // Ensure entire message is buffered
171        while self.read_buffer.len() < parsed.total_len {
172            let n = reader.read_buf(&mut self.read_buffer).await?;
173            if n == 0 {
174                return Err(io::Error::new(
175                    io::ErrorKind::UnexpectedEof,
176                    format!(
177                        "incomplete message: expected {} bytes, got {}",
178                        parsed.total_len,
179                        self.read_buffer.len()
180                    ),
181                ));
182            }
183        }
184
185        // Split off exactly what we need - ZERO COPY!
186        // split_to() just advances the internal pointer, doesn't allocate or copy
187        let message_bytes = self.read_buffer.split_to(parsed.total_len).freeze();
188
189        // Shrink buffer if it grew too large and is now empty, could be optimized with lock-free buffer pool in the future.
190        if self.read_buffer.is_empty() && self.read_buffer.capacity() > self.shrink_threshold {
191            self.read_buffer = BytesMut::with_capacity(INITIAL_BUFFER_SIZE);
192        }
193
194        // Return zero-copy message wrapper
195        Ok(TcpRequestMessageZeroCopy::new(message_bytes, parsed))
196    }
197
198    /// Get the current buffer capacity
199    pub fn buffer_capacity(&self) -> usize {
200        self.read_buffer.capacity()
201    }
202
203    /// Get the current buffered data size
204    pub fn buffered_len(&self) -> usize {
205        self.read_buffer.len()
206    }
207}
208
209impl Default for ZeroCopyTcpDecoder {
210    fn default() -> Self {
211        Self::new()
212    }
213}
214
215/// Zero-copy message representation
216///
217/// This struct holds an Arc-counted Bytes buffer containing the entire message.
218/// All accessors return zero-copy slices or references into this buffer.
219#[derive(Clone)]
220pub struct TcpRequestMessageZeroCopy {
221    /// Entire message as Arc-counted buffer
222    /// Format: [path_len(2)][path(var)][headers_len(2)][headers(var)][payload_len(4)][payload(var)]
223    raw: Bytes,
224    parsed: super::TcpRequestWireHeader,
225}
226
227impl TcpRequestMessageZeroCopy {
228    /// Create a new zero-copy message from raw bytes
229    fn new(raw: Bytes, parsed: super::TcpRequestWireHeader) -> Self {
230        Self { raw, parsed }
231    }
232
233    /// Get endpoint path as a string slice (zero-copy)
234    ///
235    /// This returns a reference into the message buffer, no allocation.
236    pub fn endpoint_path(&self) -> Result<&str, std::str::Utf8Error> {
237        std::str::from_utf8(&self.raw[self.parsed.endpoint_start()..self.parsed.endpoint_end()])
238    }
239
240    /// Get endpoint path as bytes (zero-copy)
241    pub fn endpoint_path_bytes(&self) -> &[u8] {
242        &self.raw[self.parsed.endpoint_start()..self.parsed.endpoint_end()]
243    }
244
245    /// Get headers as bytes (zero-copy)
246    pub fn headers_bytes(&self) -> &[u8] {
247        &self.raw[self.parsed.headers_start()..self.parsed.headers_end()]
248    }
249
250    /// Get headers as a HashMap (requires parsing)
251    pub fn headers(&self) -> std::collections::HashMap<String, String> {
252        let headers_bytes = self.headers_bytes();
253        if headers_bytes.is_empty() {
254            return std::collections::HashMap::new();
255        }
256
257        // Parse headers from JSON format
258        serde_json::from_slice(headers_bytes).unwrap_or_default()
259    }
260
261    /// Get the payload length
262    #[inline]
263    fn payload_len(&self) -> usize {
264        self.parsed.payload_len
265    }
266
267    /// Get payload as zero-copy Bytes
268    ///
269    /// This returns an Arc-counted slice of the message buffer.
270    /// Cloning the returned Bytes is extremely cheap (just Arc clone).
271    pub fn payload(&self) -> Bytes {
272        self.raw.slice(self.parsed.payload_start()..) // ZERO COPY! Just Arc clone + offset
273    }
274
275    /// Get total message size in bytes
276    pub fn total_size(&self) -> usize {
277        self.raw.len()
278    }
279
280    /// Get the raw message bytes (for debugging)
281    pub fn raw_bytes(&self) -> &Bytes {
282        &self.raw
283    }
284}
285
286impl std::fmt::Debug for TcpRequestMessageZeroCopy {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("TcpRequestMessageZeroCopy")
289            .field("total_size", &self.total_size())
290            .field("endpoint_path", &self.endpoint_path().ok())
291            .field("payload_len", &self.payload_len())
292            .finish()
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    /// Serializes one request frame in wire order:
    /// `[path_len(2)][path][headers_len(2)][headers][payload_len(4)][payload]`, lengths big-endian.
    fn encode_frame(endpoint: &str, headers: &[u8], payload: &[u8]) -> Vec<u8> {
        let mut frame = Vec::new();
        frame.extend_from_slice(&(endpoint.len() as u16).to_be_bytes());
        frame.extend_from_slice(endpoint.as_bytes());
        frame.extend_from_slice(&(headers.len() as u16).to_be_bytes());
        frame.extend_from_slice(headers);
        frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
        frame.extend_from_slice(payload);
        frame
    }

    /// Feeds `frame` to `decoder` through an in-memory reader and returns the decode result.
    async fn decode_one(
        decoder: &mut ZeroCopyTcpDecoder,
        frame: &[u8],
    ) -> io::Result<TcpRequestMessageZeroCopy> {
        let mut reader = frame;
        decoder.read_message(&mut reader).await
    }

    #[test]
    fn test_resolve_shrink_message_size_edge_cases() {
        const MB: usize = 1024 * 1024;
        const KB: usize = 1024;

        // (max_size, env override, expected result, failure message)
        let cases = [
            // Max above the default, no override: the default is used as-is.
            (
                10 * MB,
                None,
                DEFAULT_SHRINK_SIZE,
                "10MB max should return default 8MB",
            ),
            // Max below the default: the default is capped by `.min()`.
            (MB, None, MB, "1MB max should be capped to 1MB"),
            // Max equal to the default.
            (
                DEFAULT_SHRINK_SIZE,
                None,
                DEFAULT_SHRINK_SIZE,
                "exact match should return default",
            ),
            // Override inside the valid range.
            (
                10 * MB,
                Some(2 * MB),
                2 * MB,
                "env var should be used when within bounds",
            ),
            // Override above the max.
            (
                10 * MB,
                Some(20 * MB),
                10 * MB,
                "env var should be capped to max_size",
            ),
            // Override below the initial buffer size (100KB < 256KB).
            (
                10 * MB,
                Some(100 * KB),
                INITIAL_BUFFER_SIZE,
                "env var should be clamped to INITIAL_BUFFER_SIZE",
            ),
            // Max itself below the initial buffer size (100KB < 256KB).
            (
                100 * KB,
                None,
                INITIAL_BUFFER_SIZE,
                "result should be clamped to INITIAL_BUFFER_SIZE",
            ),
        ];

        for (max_size, env_shrink_size, expected, why) in cases {
            let result = resolve_shrink_message_size(max_size, env_shrink_size);
            assert_eq!(result, expected, "{why}");
        }
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_basic() {
        // Smallest useful frame: short endpoint, no headers, short payload.
        let endpoint = "test/endpoint";
        let payload = b"Hello, World!";
        let message = encode_frame(endpoint, &[], payload);

        let mut decoder = ZeroCopyTcpDecoder::new();
        let msg = decode_one(&mut decoder, &message).await.unwrap();

        assert_eq!(msg.endpoint_path().unwrap(), endpoint);
        assert_eq!(msg.payload().as_ref(), payload);
        assert_eq!(msg.total_size(), message.len());
        assert_eq!(msg.headers().len(), 0); // Empty headers
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_allows_empty_and_long_endpoint_paths() {
        for endpoint in [String::new(), "x".repeat(2048)] {
            let payload = b"payload";
            let message = encode_frame(&endpoint, &[], payload);

            let mut decoder = ZeroCopyTcpDecoder::new();
            let msg = decode_one(&mut decoder, &message).await.unwrap();

            assert_eq!(msg.endpoint_path().unwrap(), endpoint.as_str());
            assert_eq!(msg.payload().as_ref(), payload);
        }
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_large_payload() {
        // 200KB payload, no headers.
        let endpoint = "large/endpoint";
        let payload = vec![0x42u8; 200 * 1024];
        let message = encode_frame(endpoint, &[], &payload);

        let mut decoder = ZeroCopyTcpDecoder::new();
        let msg = decode_one(&mut decoder, &message).await.unwrap();

        assert_eq!(msg.endpoint_path().unwrap(), endpoint);
        assert_eq!(msg.payload().len(), payload.len());
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_total_size_limit() {
        // The limit applies to the whole frame, so a payload of exactly `max_size`
        // must be rejected once the header overhead is added.
        let max_size = 1024; // 1KB limit
        let mut decoder = ZeroCopyTcpDecoder::with_capacity(256);
        decoder.max_message_size = max_size;

        let endpoint = "test/endpoint";
        let payload = vec![0x42u8; max_size]; // Payload equals max
        let message = encode_frame(endpoint, &[], &payload);

        // total_len = 2 + 13 + 2 + 0 + 4 + 1024 = 1045 bytes > 1024 max
        let result = decode_one(&mut decoder, &message).await;

        // Should fail with InvalidData error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        let text = err.to_string();
        assert!(text.contains("message too large"));
        assert!(text.contains("1045")); // total_len
        assert!(text.contains("1024")); // max_message_size
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_with_headers() {
        // Frame carrying a non-empty JSON header section.
        let endpoint = "api/v1/inference";
        let payload = b"Request payload data";

        let expected_headers = [
            ("traceparent", "00-abc123-def456-01"),
            ("user-agent", "test-client/1.0"),
            ("request-id", "req-12345"),
        ];
        let headers_map: std::collections::HashMap<String, String> = expected_headers
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect();
        let headers_json = serde_json::to_vec(&headers_map).unwrap();

        let message = encode_frame(endpoint, &headers_json, payload);

        let mut decoder = ZeroCopyTcpDecoder::new();
        let msg = decode_one(&mut decoder, &message).await.unwrap();

        // Endpoint, payload and total size all reflect the encoded frame.
        assert_eq!(msg.endpoint_path().unwrap(), endpoint);
        assert_eq!(msg.payload().as_ref(), payload);
        assert_eq!(msg.total_size(), message.len());

        // Parsed headers match what was encoded.
        let decoded_headers = msg.headers();
        assert_eq!(decoded_headers.len(), 3);
        for (name, value) in expected_headers {
            assert_eq!(decoded_headers.get(name).unwrap(), value);
        }

        // headers_bytes returns the raw JSON untouched.
        let headers_bytes = msg.headers_bytes();
        assert_eq!(headers_bytes, &headers_json[..]);
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_empty_vs_populated_headers() {
        // One decoder handles a header-less frame followed by one with headers.
        let endpoint = "test/endpoint";
        let payload = b"test data";
        let mut decoder = ZeroCopyTcpDecoder::new();

        // Frame 1: headers_len = 0, no header bytes.
        let message_empty = encode_frame(endpoint, &[], payload);
        let msg = decode_one(&mut decoder, &message_empty).await.unwrap();

        assert_eq!(msg.endpoint_path().unwrap(), endpoint);
        assert_eq!(msg.payload().as_ref(), payload);
        assert_eq!(msg.headers().len(), 0);
        assert_eq!(msg.headers_bytes().len(), 0);

        // Frame 2: a single header, decoded with the same decoder.
        let mut headers_map = std::collections::HashMap::new();
        headers_map.insert("x-test-header".to_string(), "test-value".to_string());
        let headers_json = serde_json::to_vec(&headers_map).unwrap();

        let message_with_headers = encode_frame(endpoint, &headers_json, payload);
        let msg = decode_one(&mut decoder, &message_with_headers)
            .await
            .unwrap();

        assert_eq!(msg.endpoint_path().unwrap(), endpoint);
        assert_eq!(msg.payload().as_ref(), payload);
        assert_eq!(msg.headers().len(), 1);
        assert_eq!(msg.headers().get("x-test-header").unwrap(), "test-value");
    }

    #[tokio::test]
    async fn test_zero_copy_decoder_buffer_shrinking() {
        // The buffer must fall back to its initial size after a large frame.
        // Limits are set on the decoder directly so the test does not depend on env vars.
        let endpoint = "test/endpoint";
        let small_payload = b"small";
        let large_payload = vec![0x42u8; 1024 * 1024]; // 1MB

        let mut decoder = ZeroCopyTcpDecoder::with_capacity(INITIAL_BUFFER_SIZE);
        decoder.max_message_size = 2 * 1024 * 1024; // 2MB max
        decoder.shrink_threshold = 512 * 1024; // 512KB shrink threshold

        assert!(decoder.buffer_capacity() <= INITIAL_BUFFER_SIZE);

        // Large frame: the buffer grows while reading and is expected to shrink afterwards.
        let large_message = encode_frame(endpoint, &[], &large_payload);
        decode_one(&mut decoder, &large_message).await.unwrap();

        assert!(
            decoder.buffer_capacity() <= INITIAL_BUFFER_SIZE,
            "buffer should shrink after large message, got capacity {}",
            decoder.buffer_capacity()
        );
        assert!(
            decoder.buffered_len() == 0,
            "buffer should be empty after read"
        );

        // Small frame: decoding still works with the shrunk buffer.
        let small_message = encode_frame(endpoint, &[], small_payload);
        let msg = decode_one(&mut decoder, &small_message).await.unwrap();
        assert_eq!(msg.payload().as_ref(), small_payload);
    }
}
628}