connectrpc_axum/context/
envelope_compression.rs

1//! Envelope compression for Connect streaming RPCs.
2//!
3//! This module handles per-envelope compression in streaming Connect RPCs.
4//! HTTP body compression (for unary RPCs) is handled by Tower middleware.
5//!
6//! ## Architecture
7//!
8//! The Connect protocol uses two different compression mechanisms:
9//!
10//! - **Unary RPCs**: Use standard HTTP `Content-Encoding`/`Accept-Encoding` headers.
11//!   This is handled by Tower's `CompressionLayer`.
12//!
13//! - **Streaming RPCs**: Use `Connect-Content-Encoding`/`Connect-Accept-Encoding` headers.
14//!   Each message envelope is individually compressed. This module handles that.
15//!
16//! ## Codec Trait
17//!
18//! The [`Codec`] trait provides a simple `Bytes → Bytes` API for compression.
19//! This is intentionally simple because the envelope format requires full buffering anyway:
20//!
21//! ```text
22//! [flags:1][length:4][payload]
23//! ```
24//!
25//! We must read all `length` bytes before decompression, so streaming codecs
26//! provide no benefit for envelope compression.
27
28use crate::error::{Code, ConnectError};
29use bytes::Bytes;
30use flate2::Compression as GzipLevel;
31use flate2::read::GzDecoder;
32use flate2::write::GzEncoder;
33use std::io::{self, Read, Write};
34use std::sync::Arc;
35
36// ============================================================================
37// Codec Trait
38// ============================================================================
39
40/// Codec trait for per-message (envelope) compression.
41///
42/// Used for streaming Connect RPCs where each message is individually compressed.
43/// HTTP body compression for unary RPCs is handled by Tower middleware.
44///
45/// # Example
46///
47/// ```ignore
48/// use connectrpc_axum::context::message_compression::Codec;
49/// use bytes::Bytes;
50/// use std::io;
51///
52/// struct Lz4Codec;
53///
54/// impl Codec for Lz4Codec {
55///     fn name(&self) -> &'static str { "lz4" }
56///
57///     fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
58///         // ... lz4 compression
59///     }
60///
61///     fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
62///         // ... lz4 decompression
63///     }
64/// }
65/// ```
66pub trait Codec: Send + Sync + 'static {
67    /// The encoding name for HTTP headers (e.g., "gzip", "zstd", "br").
68    fn name(&self) -> &'static str;
69
70    /// Compress data.
71    fn compress(&self, data: &[u8]) -> io::Result<Bytes>;
72
73    /// Decompress data.
74    fn decompress(&self, data: &[u8]) -> io::Result<Bytes>;
75}
76
77// ============================================================================
78// Boxed Codec
79// ============================================================================
80
81/// A boxed codec for type-erased storage.
82///
83/// Use `Option<BoxedCodec>` where `None` represents identity (no compression).
84#[derive(Clone)]
85pub struct BoxedCodec(Arc<dyn Codec>);
86
87impl BoxedCodec {
88    /// Create a new boxed codec.
89    pub fn new<C: Codec>(codec: C) -> Self {
90        BoxedCodec(Arc::new(codec))
91    }
92
93    /// Get the codec name for HTTP headers.
94    pub fn name(&self) -> &'static str {
95        self.0.name()
96    }
97
98    /// Compress data.
99    pub fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
100        self.0.compress(data)
101    }
102
103    /// Decompress data.
104    pub fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
105        self.0.decompress(data)
106    }
107}
108
109impl std::fmt::Debug for BoxedCodec {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_tuple("BoxedCodec").field(&self.name()).finish()
112    }
113}
114
115// ============================================================================
116// Built-in Codecs
117// ============================================================================
118
/// Gzip codec using flate2.
#[derive(Debug, Clone, Copy)]
pub struct GzipCodec {
    /// Compression level (0-9). Default is 6.
    pub level: u32,
}

impl Default for GzipCodec {
    /// A gzip codec at level 6.
    fn default() -> Self {
        GzipCodec { level: 6 }
    }
}

impl GzipCodec {
    /// Create a new GzipCodec with the specified compression level.
    ///
    /// Level ranges from 0 (no compression) to 9 (best compression);
    /// larger values are lowered to 9.
    pub fn with_level(level: u32) -> Self {
        let level = if level > 9 { 9 } else { level };
        GzipCodec { level }
    }
}
142
143impl Codec for GzipCodec {
144    fn name(&self) -> &'static str {
145        "gzip"
146    }
147
148    fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
149        let mut encoder = GzEncoder::new(Vec::new(), GzipLevel::new(self.level));
150        encoder.write_all(data)?;
151        Ok(Bytes::from(encoder.finish()?))
152    }
153
154    fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
155        let mut decoder = GzDecoder::new(data);
156        let mut decompressed = Vec::new();
157        decoder.read_to_end(&mut decompressed)?;
158        Ok(Bytes::from(decompressed))
159    }
160}
161
162// ============================================================================
163// Deflate Codec (feature-gated)
164// ============================================================================
165
/// Deflate codec using flate2.
///
/// Requires the `compression-deflate` feature.
#[cfg(feature = "compression-deflate")]
#[derive(Debug, Clone, Copy)]
pub struct DeflateCodec {
    /// Compression level (0-9). Default is 6.
    pub level: u32,
}

#[cfg(feature = "compression-deflate")]
impl Default for DeflateCodec {
    /// A deflate codec at level 6.
    fn default() -> Self {
        DeflateCodec { level: 6 }
    }
}

#[cfg(feature = "compression-deflate")]
impl DeflateCodec {
    /// Create a new DeflateCodec with the specified compression level.
    ///
    /// Level ranges from 0 (no compression) to 9 (best compression);
    /// larger values are lowered to 9.
    pub fn with_level(level: u32) -> Self {
        let level = if level > 9 { 9 } else { level };
        DeflateCodec { level }
    }
}
194
#[cfg(feature = "compression-deflate")]
impl Codec for DeflateCodec {
    /// Encoding name used in HTTP headers.
    fn name(&self) -> &'static str {
        "deflate"
    }

    /// Compress `data` into a zlib-wrapped deflate stream.
    fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
        // HTTP "deflate" Content-Encoding uses zlib format (RFC 1950),
        // not raw DEFLATE (RFC 1951). This ensures compatibility with
        // tower-http and standard HTTP clients.
        use flate2::write::ZlibEncoder;

        // `level` is a public field, so it may have been set without going
        // through `with_level`; keep it inside the documented 0-9 range
        // before handing it to flate2.
        let level = GzipLevel::new(self.level.min(9));
        let mut encoder = ZlibEncoder::new(Vec::new(), level);
        encoder.write_all(data)?;
        encoder.finish().map(Bytes::from)
    }

    /// Decompress a complete zlib-wrapped deflate stream held in `data`.
    ///
    /// NOTE(review): the output buffer grows until the stream ends and no
    /// size limit is applied here; callers handling untrusted input may want
    /// to enforce one.
    fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
        // HTTP "deflate" Content-Encoding uses zlib format (RFC 1950),
        // not raw DEFLATE (RFC 1951). This ensures compatibility with
        // tower-http and standard HTTP clients.
        use flate2::read::ZlibDecoder;
        let mut decoder = ZlibDecoder::new(data);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed)?;
        Ok(Bytes::from(decompressed))
    }
}
222
223// ============================================================================
224// Brotli Codec (feature-gated)
225// ============================================================================
226
/// Brotli codec.
///
/// Requires the `compression-br` feature.
#[cfg(feature = "compression-br")]
#[derive(Debug, Clone, Copy)]
pub struct BrotliCodec {
    /// Compression quality (0-11). Default is 4.
    pub quality: u32,
}

#[cfg(feature = "compression-br")]
impl Default for BrotliCodec {
    /// A brotli codec at quality 4.
    fn default() -> Self {
        BrotliCodec { quality: 4 }
    }
}

#[cfg(feature = "compression-br")]
impl BrotliCodec {
    /// Create a new BrotliCodec with the specified quality level.
    ///
    /// Quality ranges from 0 (fastest) to 11 (best compression);
    /// larger values are lowered to 11.
    pub fn with_quality(quality: u32) -> Self {
        let quality = if quality > 11 { 11 } else { quality };
        BrotliCodec { quality }
    }
}
255
#[cfg(feature = "compression-br")]
impl Codec for BrotliCodec {
    /// Encoding name used in HTTP headers.
    fn name(&self) -> &'static str {
        "br"
    }

    /// Brotli-compress `data` at this codec's quality; all other encoder
    /// parameters keep their defaults.
    fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
        use brotli::enc::BrotliEncoderParams;
        use std::io::Cursor;

        let params = BrotliEncoderParams {
            quality: self.quality as i32,
            ..Default::default()
        };
        let mut source = Cursor::new(data);
        let mut compressed = Vec::new();
        brotli::enc::BrotliCompress(&mut source, &mut compressed, &params)?;
        Ok(Bytes::from(compressed))
    }

    /// Decompress a complete brotli stream held in `data`.
    fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
        use std::io::Cursor;

        let mut source = Cursor::new(data);
        let mut decompressed = Vec::new();
        brotli::BrotliDecompress(&mut source, &mut decompressed)?;
        Ok(Bytes::from(decompressed))
    }
}
279
280// ============================================================================
281// Zstd Codec (feature-gated)
282// ============================================================================
283
/// Zstd codec.
///
/// Requires the `compression-zstd` feature.
#[cfg(feature = "compression-zstd")]
#[derive(Debug, Clone, Copy)]
pub struct ZstdCodec {
    /// Compression level (1-22). Default is 3.
    pub level: i32,
}

#[cfg(feature = "compression-zstd")]
impl Default for ZstdCodec {
    /// A zstd codec at level 3.
    fn default() -> Self {
        ZstdCodec { level: 3 }
    }
}

#[cfg(feature = "compression-zstd")]
impl ZstdCodec {
    /// Create a new ZstdCodec with the specified compression level.
    ///
    /// Level ranges from 1 (fastest) to 22 (best compression); values outside
    /// that range are moved to the nearest bound.
    pub fn with_level(level: i32) -> Self {
        let level = level.max(1).min(22);
        ZstdCodec { level }
    }
}
312
#[cfg(feature = "compression-zstd")]
impl Codec for ZstdCodec {
    /// Encoding name used in HTTP headers.
    fn name(&self) -> &'static str {
        "zstd"
    }

    /// Zstd-compress `data` at this codec's level.
    fn compress(&self, data: &[u8]) -> io::Result<Bytes> {
        // `zstd::bulk::compress` already yields an `io::Result`, so the error
        // is propagated as-is instead of being re-wrapped as
        // `ErrorKind::Other`, which would hide its original kind.
        zstd::bulk::compress(data, self.level).map(Bytes::from)
    }

    /// Decompress a complete zstd stream held in `data`.
    ///
    /// NOTE(review): the output buffer grows until the stream ends and no
    /// size limit is applied here; callers handling untrusted input may want
    /// to enforce one.
    fn decompress(&self, data: &[u8]) -> io::Result<Bytes> {
        let mut decoder = zstd::Decoder::new(data)?;
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed)?;
        Ok(Bytes::from(decompressed))
    }
}
332
333// ============================================================================
334// Codec Resolution
335// ============================================================================
336
337/// Resolve a codec from an encoding name.
338///
339/// Returns `None` for identity (no compression needed).
340/// Returns `Some(BoxedCodec)` for supported encodings.
341/// Returns `Err` for unsupported encodings.
342pub fn resolve_codec(name: &str) -> Result<Option<BoxedCodec>, ConnectError> {
343    match name {
344        "" | "identity" => Ok(None),
345        "gzip" => Ok(Some(BoxedCodec::new(GzipCodec::default()))),
346        #[cfg(feature = "compression-deflate")]
347        "deflate" => Ok(Some(BoxedCodec::new(DeflateCodec::default()))),
348        #[cfg(feature = "compression-br")]
349        "br" => Ok(Some(BoxedCodec::new(BrotliCodec::default()))),
350        #[cfg(feature = "compression-zstd")]
351        "zstd" => Ok(Some(BoxedCodec::new(ZstdCodec::default()))),
352        other => Err(ConnectError::new(
353            Code::Unimplemented,
354            format!(
355                "unsupported compression \"{}\": supported encodings are {}",
356                other,
357                supported_encodings_str()
358            ),
359        )),
360    }
361}
362
/// Returns a comma-separated string of supported encodings for error messages.
///
/// The list depends on which of the `compression-deflate`, `compression-br`
/// and `compression-zstd` features are enabled; `gzip` and `identity` are
/// always present.
fn supported_encodings_str() -> &'static str {
    let deflate = cfg!(feature = "compression-deflate");
    let brotli = cfg!(feature = "compression-br");
    let zstd = cfg!(feature = "compression-zstd");

    match (deflate, brotli, zstd) {
        (true, true, true) => "gzip, deflate, br, zstd, identity",
        (true, true, false) => "gzip, deflate, br, identity",
        (true, false, true) => "gzip, deflate, zstd, identity",
        (false, true, true) => "gzip, br, zstd, identity",
        (true, false, false) => "gzip, deflate, identity",
        (false, true, false) => "gzip, br, identity",
        (false, false, true) => "gzip, zstd, identity",
        (false, false, false) => "gzip, identity",
    }
}
430
431// ============================================================================
432// Compression Helpers
433// ============================================================================
434
435/// Compress bytes using the specified codec.
436///
437/// If `codec` is `None`, returns the input unchanged (identity).
438pub fn compress_bytes(bytes: Bytes, codec: Option<&BoxedCodec>) -> io::Result<Bytes> {
439    match codec {
440        None => Ok(bytes), // identity: zero-copy passthrough
441        Some(c) => c.compress(&bytes),
442    }
443}
444
445/// Decompress bytes using the specified codec.
446///
447/// If `codec` is `None`, returns the input unchanged (identity).
448pub fn decompress_bytes(bytes: Bytes, codec: Option<&BoxedCodec>) -> io::Result<Bytes> {
449    match codec {
450        None => Ok(bytes), // identity: zero-copy passthrough
451        Some(c) => c.decompress(&bytes),
452    }
453}
454
455// ============================================================================
456// Compression Encoding Enum
457// ============================================================================
458
/// The compression encodings this build understands.
///
/// This enum is used for header parsing and negotiation.
/// Use [`resolve_codec`] to get the actual codec implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionEncoding {
    /// No compression; this is the default.
    #[default]
    Identity,
    /// `gzip`; always available.
    Gzip,
    /// `deflate`; requires the `compression-deflate` feature.
    #[cfg(feature = "compression-deflate")]
    Deflate,
    /// `br`; requires the `compression-br` feature.
    #[cfg(feature = "compression-br")]
    Brotli,
    /// `zstd`; requires the `compression-zstd` feature.
    #[cfg(feature = "compression-zstd")]
    Zstd,
}
475
476impl CompressionEncoding {
477    /// Parse from Content-Encoding or Connect-Content-Encoding header value.
478    /// Returns None for unsupported encodings (caller should return Unimplemented).
479    pub fn from_header(value: Option<&str>) -> Option<Self> {
480        match value {
481            None | Some("identity") | Some("") => Some(Self::Identity),
482            Some("gzip") => Some(Self::Gzip),
483            #[cfg(feature = "compression-deflate")]
484            Some("deflate") => Some(Self::Deflate),
485            #[cfg(feature = "compression-br")]
486            Some("br") => Some(Self::Brotli),
487            #[cfg(feature = "compression-zstd")]
488            Some("zstd") => Some(Self::Zstd),
489            _ => None, // unsupported
490        }
491    }
492
493    /// Get the header value string for this encoding.
494    pub fn as_str(&self) -> &'static str {
495        match self {
496            Self::Identity => "identity",
497            Self::Gzip => "gzip",
498            #[cfg(feature = "compression-deflate")]
499            Self::Deflate => "deflate",
500            #[cfg(feature = "compression-br")]
501            Self::Brotli => "br",
502            #[cfg(feature = "compression-zstd")]
503            Self::Zstd => "zstd",
504        }
505    }
506
507    /// Get the codec for this encoding.
508    ///
509    /// Returns `None` for identity, `Some(BoxedCodec)` for others.
510    pub fn codec(&self) -> Option<BoxedCodec> {
511        match self {
512            Self::Identity => None,
513            Self::Gzip => Some(BoxedCodec::new(GzipCodec::default())),
514            #[cfg(feature = "compression-deflate")]
515            Self::Deflate => Some(BoxedCodec::new(DeflateCodec::default())),
516            #[cfg(feature = "compression-br")]
517            Self::Brotli => Some(BoxedCodec::new(BrotliCodec::default())),
518            #[cfg(feature = "compression-zstd")]
519            Self::Zstd => Some(BoxedCodec::new(ZstdCodec::default())),
520        }
521    }
522
523    /// Get the codec for this encoding with the specified compression level.
524    ///
525    /// Returns `None` for identity, `Some(BoxedCodec)` for others.
526    /// The level is converted to algorithm-specific values matching tower-http behavior.
527    pub fn codec_with_level(&self, level: CompressionLevel) -> Option<BoxedCodec> {
528        match self {
529            Self::Identity => None,
530            Self::Gzip => Some(BoxedCodec::new(GzipCodec::with_level(level_to_flate2(level)))),
531            #[cfg(feature = "compression-deflate")]
532            Self::Deflate => Some(BoxedCodec::new(DeflateCodec::with_level(level_to_flate2(
533                level,
534            )))),
535            #[cfg(feature = "compression-br")]
536            Self::Brotli => Some(BoxedCodec::new(BrotliCodec::with_quality(level_to_brotli(
537                level,
538            )))),
539            #[cfg(feature = "compression-zstd")]
540            Self::Zstd => Some(BoxedCodec::new(ZstdCodec::with_level(level_to_zstd(level)))),
541        }
542    }
543}
544
545// ============================================================================
546// Compression Level Conversion
547// ============================================================================
548
549/// Convert CompressionLevel to flate2 gzip/deflate level (0-9).
550///
551/// Matches tower-http → async_compression → compression_codecs behavior:
552/// - `Fastest` → 1 (flate2::Compression::fast())
553/// - `Best` → 9 (flate2::Compression::best())
554/// - `Default` → 6 (flate2::Compression::default())
555/// - `Precise(n)` → n clamped to 0-9
556fn level_to_flate2(level: CompressionLevel) -> u32 {
557    match level {
558        CompressionLevel::Fastest => 1,
559        CompressionLevel::Best => 9,
560        CompressionLevel::Default => 6,
561        CompressionLevel::Precise(n) => n.clamp(0, 9) as u32,
562        _ => 6, // Future variants: use default
563    }
564}
565
/// Convert CompressionLevel to brotli quality (0-11).
///
/// tower-http overrides Default to 4 (NGINX default) for performance.
/// The brotli library default is 11, which is too slow for on-the-fly compression.
///
/// `Precise(n)` is clamped to 0-11; any other variant falls back to 4.
#[cfg(feature = "compression-br")]
fn level_to_brotli(level: CompressionLevel) -> u32 {
    const FASTEST: u32 = 0;
    // tower-http's custom default (not 11)
    const DEFAULT: u32 = 4;
    const BEST: u32 = 11;

    match level {
        CompressionLevel::Precise(n) => n.max(0).min(11) as u32,
        CompressionLevel::Fastest => FASTEST,
        CompressionLevel::Default => DEFAULT,
        CompressionLevel::Best => BEST,
        // Future variants: use default
        _ => DEFAULT,
    }
}
580
/// Convert CompressionLevel to zstd level (1-22).
///
/// Note: zstd supports negative levels but we follow tower-http/async-compression
/// which uses 1 as "fastest" (negative levels produce larger outputs).
///
/// `Precise(n)` is clamped to 1-22; any other variant falls back to 3.
#[cfg(feature = "compression-zstd")]
fn level_to_zstd(level: CompressionLevel) -> i32 {
    // OUR_FASTEST in async-compression
    const FASTEST: i32 = 1;
    // libzstd::DEFAULT_COMPRESSION_LEVEL
    const DEFAULT: i32 = 3;
    // libzstd max
    const BEST: i32 = 22;

    match level {
        CompressionLevel::Precise(n) => (n as i32).max(1).min(22),
        CompressionLevel::Fastest => FASTEST,
        CompressionLevel::Default => DEFAULT,
        CompressionLevel::Best => BEST,
        // Future variants: use default
        _ => DEFAULT,
    }
}
595
596// ============================================================================
597// Compression Configuration
598// ============================================================================
599
600/// Re-export tower-http's CompressionLevel for unified compression configuration.
601///
602/// This is used for both:
603/// - Tower's HTTP body compression (unary RPCs via `CompressionLayer`)
604/// - Envelope compression (streaming RPCs via `GzipCodec`, etc.)
605pub use tower_http::CompressionLevel;
606
607/// Server compression configuration.
608#[derive(Debug, Clone, Copy)]
609pub struct CompressionConfig {
610    /// Minimum bytes before compression is applied.
611    /// Default is 0 (compress everything), matching connect-go behavior.
612    /// Messages smaller than this threshold are sent uncompressed.
613    pub min_bytes: usize,
614    /// Compression level/quality.
615    /// Default is `CompressionLevel::Default` which varies by algorithm:
616    /// - gzip: level 4
617    /// - brotli: level 4
618    /// - zstd: level 3
619    /// - deflate: level 4
620    pub level: CompressionLevel,
621}
622
623impl Default for CompressionConfig {
624    fn default() -> Self {
625        // Connect-go default is 0 (compress everything)
626        Self {
627            min_bytes: 0,
628            level: CompressionLevel::Default,
629        }
630    }
631}
632
633impl CompressionConfig {
634    /// Create a new compression config with the specified minimum bytes threshold.
635    pub fn new(min_bytes: usize) -> Self {
636        Self {
637            min_bytes,
638            level: CompressionLevel::Default,
639        }
640    }
641
642    /// Set the compression level.
643    ///
644    /// # Examples
645    ///
646    /// ```rust
647    /// use connectrpc_axum::context::CompressionConfig;
648    /// use connectrpc_axum::context::CompressionLevel;
649    ///
650    /// // Use fastest compression
651    /// let config = CompressionConfig::default().level(CompressionLevel::Fastest);
652    ///
653    /// // Use best compression
654    /// let config = CompressionConfig::default().level(CompressionLevel::Best);
655    ///
656    /// // Use precise level (algorithm-specific)
657    /// let config = CompressionConfig::default().level(CompressionLevel::Precise(6));
658    /// ```
659    pub fn level(mut self, level: CompressionLevel) -> Self {
660        self.level = level;
661        self
662    }
663
664    /// Disable compression by setting threshold to usize::MAX.
665    pub fn disabled() -> Self {
666        Self {
667            min_bytes: usize::MAX,
668            level: CompressionLevel::Default,
669        }
670    }
671}
672
673// ============================================================================
674// Header Parsing
675// ============================================================================
676
/// Request header naming the encoding applied to each streaming request envelope.
pub const CONNECT_CONTENT_ENCODING: &str = "connect-content-encoding";

/// Request header listing the encodings the client accepts for streaming
/// response envelopes.
pub const CONNECT_ACCEPT_ENCODING: &str = "connect-accept-encoding";
682
683/// Negotiate response encoding from Accept-Encoding header.
684///
685/// Follows connect-go's approach: first supported encoding wins (client preference order).
686/// Respects `q=0` which means "not acceptable" per RFC 7231.
687pub fn negotiate_response_encoding(accept: Option<&str>) -> CompressionEncoding {
688    let Some(accept) = accept else {
689        return CompressionEncoding::Identity;
690    };
691
692    for token in accept.split(',') {
693        let token = token.trim();
694        if token.is_empty() {
695            continue;
696        }
697
698        // Parse "gzip;q=0.5" into encoding="gzip", q_value=Some("0.5")
699        let (encoding, q_value) = match token.split_once(';') {
700            Some((enc, params)) => {
701                let q = params.split(';').find_map(|p| p.trim().strip_prefix("q="));
702                (enc.trim(), q)
703            }
704            None => (token, None),
705        };
706
707        // Skip if q=0 (explicitly disabled)
708        if let Some(q) = q_value
709            && (q.trim() == "0" || q.trim() == "0.0" || q.trim() == "0.00" || q.trim() == "0.000")
710        {
711            continue;
712        }
713
714        // Return first supported encoding
715        match encoding {
716            "gzip" => return CompressionEncoding::Gzip,
717            #[cfg(feature = "compression-deflate")]
718            "deflate" => return CompressionEncoding::Deflate,
719            #[cfg(feature = "compression-br")]
720            "br" => return CompressionEncoding::Brotli,
721            #[cfg(feature = "compression-zstd")]
722            "zstd" => return CompressionEncoding::Zstd,
723            "identity" => return CompressionEncoding::Identity,
724            _ => continue,
725        }
726    }
727
728    CompressionEncoding::Identity
729}
730
731/// Per-envelope compression settings for streaming RPCs.
732///
733/// Parsed from `Connect-Content-Encoding` and `Connect-Accept-Encoding` headers.
734/// Only used for streaming - unary RPCs use Tower's HTTP body compression.
735#[derive(Debug, Clone, Copy)]
736pub struct EnvelopeCompression {
737    /// Encoding used for request envelopes (from `Connect-Content-Encoding`).
738    pub request: CompressionEncoding,
739    /// Negotiated encoding for response envelopes (from `Connect-Accept-Encoding`).
740    pub response: CompressionEncoding,
741}
742
743/// Parse envelope compression settings from streaming request headers.
744///
745/// Returns `Some(EnvelopeCompression)` for streaming RPCs, parsing:
746/// - `Connect-Content-Encoding` for per-envelope request compression
747/// - `Connect-Accept-Encoding` for per-envelope response compression negotiation
748///
749/// Returns `None` for unary RPCs (Tower handles HTTP body compression via
750/// `Content-Encoding`/`Accept-Encoding`).
751///
752/// Returns `Err(ConnectError)` if `Connect-Content-Encoding` is unsupported.
753pub fn parse_envelope_compression<B>(
754    req: &axum::http::Request<B>,
755    is_streaming: bool,
756) -> Result<Option<EnvelopeCompression>, ConnectError> {
757    // Unary compression is handled by Tower's CompressionLayer/DecompressionLayer
758    // via Content-Encoding/Accept-Encoding headers.
759    if !is_streaming {
760        return Ok(None);
761    }
762
763    // Parse Connect-Content-Encoding for streaming request compression
764    let content_encoding = req
765        .headers()
766        .get(CONNECT_CONTENT_ENCODING)
767        .and_then(|v| v.to_str().ok());
768
769    let request_encoding = match CompressionEncoding::from_header(content_encoding) {
770        Some(enc) => enc,
771        None => {
772            return Err(ConnectError::new(
773                Code::Unimplemented,
774                format!(
775                    "unsupported compression \"{}\": supported encodings are {}",
776                    content_encoding.unwrap_or(""),
777                    supported_encodings_str()
778                ),
779            ));
780        }
781    };
782
783    // Parse Connect-Accept-Encoding for streaming response compression
784    let accept_encoding = req
785        .headers()
786        .get(CONNECT_ACCEPT_ENCODING)
787        .and_then(|v| v.to_str().ok());
788    let response_encoding = negotiate_response_encoding(accept_encoding);
789
790    Ok(Some(EnvelopeCompression {
791        request: request_encoding,
792        response: response_encoding,
793    }))
794}
795
796#[cfg(test)]
797mod tests {
798    use super::*;
799
800    #[test]
801    fn test_compression_encoding_from_header() {
802        // Identity cases
803        assert_eq!(
804            CompressionEncoding::from_header(None),
805            Some(CompressionEncoding::Identity)
806        );
807        assert_eq!(
808            CompressionEncoding::from_header(Some("")),
809            Some(CompressionEncoding::Identity)
810        );
811        assert_eq!(
812            CompressionEncoding::from_header(Some("identity")),
813            Some(CompressionEncoding::Identity)
814        );
815
816        // Gzip
817        assert_eq!(
818            CompressionEncoding::from_header(Some("gzip")),
819            Some(CompressionEncoding::Gzip)
820        );
821
822        // Feature-gated: supported when feature enabled, None otherwise
823        #[cfg(feature = "compression-br")]
824        assert_eq!(
825            CompressionEncoding::from_header(Some("br")),
826            Some(CompressionEncoding::Brotli)
827        );
828        #[cfg(not(feature = "compression-br"))]
829        assert_eq!(CompressionEncoding::from_header(Some("br")), None);
830
831        #[cfg(feature = "compression-deflate")]
832        assert_eq!(
833            CompressionEncoding::from_header(Some("deflate")),
834            Some(CompressionEncoding::Deflate)
835        );
836        #[cfg(not(feature = "compression-deflate"))]
837        assert_eq!(CompressionEncoding::from_header(Some("deflate")), None);
838
839        #[cfg(feature = "compression-zstd")]
840        assert_eq!(
841            CompressionEncoding::from_header(Some("zstd")),
842            Some(CompressionEncoding::Zstd)
843        );
844        #[cfg(not(feature = "compression-zstd"))]
845        assert_eq!(CompressionEncoding::from_header(Some("zstd")), None);
846
847        // Always unsupported
848        assert_eq!(CompressionEncoding::from_header(Some("lz4")), None);
849    }
850
851    #[test]
852    fn test_compression_encoding_as_str() {
853        assert_eq!(CompressionEncoding::Identity.as_str(), "identity");
854        assert_eq!(CompressionEncoding::Gzip.as_str(), "gzip");
855    }
856
857    #[test]
858    fn test_compression_encoding_codec() {
859        // Identity returns None
860        assert!(CompressionEncoding::Identity.codec().is_none());
861
862        // Gzip returns Some
863        let codec = CompressionEncoding::Gzip.codec();
864        assert!(codec.is_some());
865        assert_eq!(codec.unwrap().name(), "gzip");
866    }
867
868    #[test]
869    fn test_negotiate_response_encoding() {
870        // Gzip requested
871        assert_eq!(
872            negotiate_response_encoding(Some("gzip")),
873            CompressionEncoding::Gzip
874        );
875        assert_eq!(
876            negotiate_response_encoding(Some("gzip, deflate, br")),
877            CompressionEncoding::Gzip
878        );
879
880        // deflate first - depends on feature
881        #[cfg(feature = "compression-deflate")]
882        assert_eq!(
883            negotiate_response_encoding(Some("deflate, gzip")),
884            CompressionEncoding::Deflate
885        );
886        #[cfg(not(feature = "compression-deflate"))]
887        assert_eq!(
888            negotiate_response_encoding(Some("deflate, gzip")),
889            CompressionEncoding::Gzip
890        );
891
892        // No gzip, only unsupported algorithms (when features disabled)
893        #[cfg(all(not(feature = "compression-deflate"), not(feature = "compression-br")))]
894        assert_eq!(
895            negotiate_response_encoding(Some("deflate, br")),
896            CompressionEncoding::Identity
897        );
898        // When deflate feature is enabled
899        #[cfg(all(feature = "compression-deflate", not(feature = "compression-br")))]
900        assert_eq!(
901            negotiate_response_encoding(Some("deflate, br")),
902            CompressionEncoding::Deflate
903        );
904        // When br feature is enabled (but not deflate)
905        #[cfg(all(not(feature = "compression-deflate"), feature = "compression-br"))]
906        assert_eq!(
907            negotiate_response_encoding(Some("deflate, br")),
908            CompressionEncoding::Brotli
909        );
910        // When both features are enabled
911        #[cfg(all(feature = "compression-deflate", feature = "compression-br"))]
912        assert_eq!(
913            negotiate_response_encoding(Some("deflate, br")),
914            CompressionEncoding::Deflate
915        );
916
917        assert_eq!(
918            negotiate_response_encoding(None),
919            CompressionEncoding::Identity
920        );
921        assert_eq!(
922            negotiate_response_encoding(Some("")),
923            CompressionEncoding::Identity
924        );
925    }
926
927    #[test]
928    fn test_negotiate_response_encoding_order() {
929        // First supported encoding wins (client preference order)
930        #[cfg(feature = "compression-br")]
931        assert_eq!(
932            negotiate_response_encoding(Some("br, gzip")),
933            CompressionEncoding::Brotli
934        );
935        #[cfg(not(feature = "compression-br"))]
936        assert_eq!(
937            negotiate_response_encoding(Some("br, gzip")),
938            CompressionEncoding::Gzip
939        );
940
941        assert_eq!(
942            negotiate_response_encoding(Some("gzip, identity")),
943            CompressionEncoding::Gzip
944        );
945        assert_eq!(
946            negotiate_response_encoding(Some("identity, gzip")),
947            CompressionEncoding::Identity
948        );
949
950        #[cfg(feature = "compression-br")]
951        assert_eq!(
952            negotiate_response_encoding(Some("br, zstd, gzip")),
953            CompressionEncoding::Brotli
954        );
955        #[cfg(all(not(feature = "compression-br"), feature = "compression-zstd"))]
956        assert_eq!(
957            negotiate_response_encoding(Some("br, zstd, gzip")),
958            CompressionEncoding::Zstd
959        );
960        #[cfg(all(not(feature = "compression-br"), not(feature = "compression-zstd")))]
961        assert_eq!(
962            negotiate_response_encoding(Some("br, zstd, gzip")),
963            CompressionEncoding::Gzip
964        );
965    }
966
967    #[test]
968    fn test_negotiate_response_encoding_q_values() {
969        // q=0 means "not acceptable" - should be skipped
970        assert_eq!(
971            negotiate_response_encoding(Some("gzip;q=0")),
972            CompressionEncoding::Identity
973        );
974        assert_eq!(
975            negotiate_response_encoding(Some("gzip;q=0, identity")),
976            CompressionEncoding::Identity
977        );
978        assert_eq!(
979            negotiate_response_encoding(Some("gzip;q=0.0")),
980            CompressionEncoding::Identity
981        );
982
983        // Non-zero q values should be accepted (we ignore the actual weight)
984        assert_eq!(
985            negotiate_response_encoding(Some("gzip;q=1")),
986            CompressionEncoding::Gzip
987        );
988        assert_eq!(
989            negotiate_response_encoding(Some("gzip;q=0.5")),
990            CompressionEncoding::Gzip
991        );
992        assert_eq!(
993            negotiate_response_encoding(Some("gzip;q=0.001")),
994            CompressionEncoding::Gzip
995        );
996
997        // Mixed: skip disabled, use first enabled
998        // When br feature is enabled, br;q=1 is accepted
999        #[cfg(feature = "compression-br")]
1000        assert_eq!(
1001            negotiate_response_encoding(Some("br;q=1, gzip;q=0, identity")),
1002            CompressionEncoding::Brotli
1003        );
1004        // When br feature is disabled, br is skipped, gzip;q=0 is skipped, identity is returned
1005        #[cfg(not(feature = "compression-br"))]
1006        assert_eq!(
1007            negotiate_response_encoding(Some("br;q=1, gzip;q=0, identity")),
1008            CompressionEncoding::Identity
1009        );
1010
1011        assert_eq!(
1012            negotiate_response_encoding(Some("gzip;q=0, identity;q=0")),
1013            CompressionEncoding::Identity
1014        );
1015    }
1016
1017    #[test]
1018    fn test_negotiate_response_encoding_whitespace() {
1019        // Handle various whitespace scenarios
1020        assert_eq!(
1021            negotiate_response_encoding(Some("  gzip  ")),
1022            CompressionEncoding::Gzip
1023        );
1024        assert_eq!(
1025            negotiate_response_encoding(Some("gzip ; q=0")),
1026            CompressionEncoding::Identity
1027        );
1028        assert_eq!(
1029            negotiate_response_encoding(Some("gzip;  q=0")),
1030            CompressionEncoding::Identity
1031        );
1032
1033        // br first with whitespace
1034        #[cfg(feature = "compression-br")]
1035        assert_eq!(
1036            negotiate_response_encoding(Some("br ,  gzip")),
1037            CompressionEncoding::Brotli
1038        );
1039        #[cfg(not(feature = "compression-br"))]
1040        assert_eq!(
1041            negotiate_response_encoding(Some("br ,  gzip")),
1042            CompressionEncoding::Gzip
1043        );
1044    }
1045
1046    #[test]
1047    fn test_gzip_codec_compress_decompress() {
1048        let codec = GzipCodec::default();
1049        assert_eq!(codec.name(), "gzip");
1050
1051        let original = b"Hello, World! This is a test message.";
1052        let compressed = codec.compress(original).unwrap();
1053        assert_ne!(&compressed[..], &original[..]);
1054
1055        let decompressed = codec.decompress(&compressed).unwrap();
1056        assert_eq!(&decompressed[..], &original[..]);
1057    }
1058
1059    #[test]
1060    fn test_gzip_codec_with_level() {
1061        let codec = GzipCodec::with_level(9);
1062        assert_eq!(codec.level, 9);
1063
1064        let original = b"Hello, World! This is a test message.";
1065        let compressed = codec.compress(original).unwrap();
1066        let decompressed = codec.decompress(&compressed).unwrap();
1067        assert_eq!(&decompressed[..], &original[..]);
1068    }
1069
1070    #[test]
1071    fn test_boxed_codec() {
1072        let codec = BoxedCodec::new(GzipCodec::default());
1073        assert_eq!(codec.name(), "gzip");
1074
1075        let original = b"Hello, World! This is a test message.";
1076        let compressed = codec.compress(original).unwrap();
1077        assert_ne!(&compressed[..], &original[..]);
1078
1079        let decompressed = codec.decompress(&compressed).unwrap();
1080        assert_eq!(&decompressed[..], &original[..]);
1081    }
1082
1083    #[test]
1084    fn test_compress_decompress_bytes_with_codec() {
1085        let codec = BoxedCodec::new(GzipCodec::default());
1086        let original = Bytes::from_static(b"Hello, World! This is a test message.");
1087
1088        let compressed = compress_bytes(original.clone(), Some(&codec)).unwrap();
1089        assert_ne!(compressed, original);
1090
1091        let decompressed = decompress_bytes(compressed, Some(&codec)).unwrap();
1092        assert_eq!(decompressed, original);
1093    }
1094
1095    #[test]
1096    fn test_compress_decompress_bytes_identity() {
1097        let original = Bytes::from_static(b"Hello, World!");
1098
1099        // None = identity, zero-copy passthrough
1100        let compressed = compress_bytes(original.clone(), None).unwrap();
1101        assert_eq!(compressed, original);
1102
1103        let decompressed = decompress_bytes(compressed, None).unwrap();
1104        assert_eq!(decompressed, original);
1105    }
1106
1107    #[test]
1108    fn test_resolve_codec() {
1109        // Identity
1110        assert!(resolve_codec("").unwrap().is_none());
1111        assert!(resolve_codec("identity").unwrap().is_none());
1112
1113        // Gzip
1114        let codec = resolve_codec("gzip").unwrap();
1115        assert!(codec.is_some());
1116        assert_eq!(codec.unwrap().name(), "gzip");
1117
1118        // Deflate - feature gated
1119        #[cfg(feature = "compression-deflate")]
1120        {
1121            let codec = resolve_codec("deflate").unwrap();
1122            assert!(codec.is_some());
1123            assert_eq!(codec.unwrap().name(), "deflate");
1124        }
1125        #[cfg(not(feature = "compression-deflate"))]
1126        assert!(resolve_codec("deflate").is_err());
1127
1128        // Brotli - feature gated
1129        #[cfg(feature = "compression-br")]
1130        {
1131            let codec = resolve_codec("br").unwrap();
1132            assert!(codec.is_some());
1133            assert_eq!(codec.unwrap().name(), "br");
1134        }
1135        #[cfg(not(feature = "compression-br"))]
1136        assert!(resolve_codec("br").is_err());
1137
1138        // Zstd - feature gated
1139        #[cfg(feature = "compression-zstd")]
1140        {
1141            let codec = resolve_codec("zstd").unwrap();
1142            assert!(codec.is_some());
1143            assert_eq!(codec.unwrap().name(), "zstd");
1144        }
1145        #[cfg(not(feature = "compression-zstd"))]
1146        assert!(resolve_codec("zstd").is_err());
1147
1148        // Always unsupported
1149        assert!(resolve_codec("lz4").is_err());
1150    }
1151
1152    #[test]
1153    fn test_decompress_invalid_gzip() {
1154        let codec = BoxedCodec::new(GzipCodec::default());
1155        let invalid = b"not valid gzip data";
1156        let result = codec.decompress(invalid);
1157        assert!(result.is_err());
1158    }
1159
1160    #[test]
1161    fn test_compression_config_default() {
1162        let config = CompressionConfig::default();
1163        // Connect-go default is 0 (compress everything)
1164        assert_eq!(config.min_bytes, 0);
1165    }
1166
1167    #[test]
1168    fn test_compression_config_new() {
1169        let config = CompressionConfig::new(512);
1170        assert_eq!(config.min_bytes, 512);
1171    }
1172
1173    #[test]
1174    fn test_compression_config_disabled() {
1175        let config = CompressionConfig::disabled();
1176        assert_eq!(config.min_bytes, usize::MAX);
1177    }
1178
1179    #[test]
1180    fn test_boxed_codec_debug() {
1181        let codec = BoxedCodec::new(GzipCodec::default());
1182        let debug_str = format!("{:?}", codec);
1183        assert!(debug_str.contains("BoxedCodec"));
1184        assert!(debug_str.contains("gzip"));
1185    }
1186
1187    // Feature-gated codec tests
1188    #[cfg(feature = "compression-deflate")]
1189    #[test]
1190    fn test_deflate_codec_compress_decompress() {
1191        let codec = DeflateCodec::default();
1192        assert_eq!(codec.name(), "deflate");
1193
1194        let original = b"Hello, World! This is a test message for deflate.";
1195        let compressed = codec.compress(original).unwrap();
1196        assert_ne!(&compressed[..], &original[..]);
1197
1198        let decompressed = codec.decompress(&compressed).unwrap();
1199        assert_eq!(&decompressed[..], &original[..]);
1200    }
1201
1202    #[cfg(feature = "compression-deflate")]
1203    #[test]
1204    fn test_deflate_codec_with_level() {
1205        let codec = DeflateCodec::with_level(9);
1206        assert_eq!(codec.level, 9);
1207
1208        let original = b"Hello, World! This is a test message.";
1209        let compressed = codec.compress(original).unwrap();
1210        let decompressed = codec.decompress(&compressed).unwrap();
1211        assert_eq!(&decompressed[..], &original[..]);
1212    }
1213
1214    #[cfg(feature = "compression-br")]
1215    #[test]
1216    fn test_brotli_codec_compress_decompress() {
1217        let codec = BrotliCodec::default();
1218        assert_eq!(codec.name(), "br");
1219
1220        let original = b"Hello, World! This is a test message for brotli.";
1221        let compressed = codec.compress(original).unwrap();
1222        assert_ne!(&compressed[..], &original[..]);
1223
1224        let decompressed = codec.decompress(&compressed).unwrap();
1225        assert_eq!(&decompressed[..], &original[..]);
1226    }
1227
1228    #[cfg(feature = "compression-br")]
1229    #[test]
1230    fn test_brotli_codec_with_quality() {
1231        let codec = BrotliCodec::with_quality(11);
1232        assert_eq!(codec.quality, 11);
1233
1234        let original = b"Hello, World! This is a test message.";
1235        let compressed = codec.compress(original).unwrap();
1236        let decompressed = codec.decompress(&compressed).unwrap();
1237        assert_eq!(&decompressed[..], &original[..]);
1238    }
1239
1240    #[cfg(feature = "compression-zstd")]
1241    #[test]
1242    fn test_zstd_codec_compress_decompress() {
1243        let codec = ZstdCodec::default();
1244        assert_eq!(codec.name(), "zstd");
1245
1246        let original = b"Hello, World! This is a test message for zstd.";
1247        let compressed = codec.compress(original).unwrap();
1248        assert_ne!(&compressed[..], &original[..]);
1249
1250        let decompressed = codec.decompress(&compressed).unwrap();
1251        assert_eq!(&decompressed[..], &original[..]);
1252    }
1253
1254    #[cfg(feature = "compression-zstd")]
1255    #[test]
1256    fn test_zstd_codec_with_level() {
1257        let codec = ZstdCodec::with_level(19);
1258        assert_eq!(codec.level, 19);
1259
1260        let original = b"Hello, World! This is a test message.";
1261        let compressed = codec.compress(original).unwrap();
1262        let decompressed = codec.decompress(&compressed).unwrap();
1263        assert_eq!(&decompressed[..], &original[..]);
1264    }
1265
1266    #[test]
1267    fn test_codec_with_level_produces_different_compression() {
1268        // Use sufficiently large data to see compression differences
1269        let original: Vec<u8> = (0..10000)
1270            .map(|i| ((i % 256) as u8).wrapping_add((i / 256) as u8))
1271            .collect();
1272
1273        // Test gzip with different levels
1274        let codec_fastest = CompressionEncoding::Gzip
1275            .codec_with_level(CompressionLevel::Fastest)
1276            .unwrap();
1277        let codec_best = CompressionEncoding::Gzip
1278            .codec_with_level(CompressionLevel::Best)
1279            .unwrap();
1280
1281        let compressed_fastest = codec_fastest.compress(&original).unwrap();
1282        let compressed_best = codec_best.compress(&original).unwrap();
1283
1284        // Best compression should produce smaller output (or equal in edge cases)
1285        assert!(
1286            compressed_best.len() <= compressed_fastest.len(),
1287            "Best compression ({}) should be <= fastest ({})",
1288            compressed_best.len(),
1289            compressed_fastest.len()
1290        );
1291
1292        // Both should decompress correctly
1293        let decompressed_fastest = codec_fastest.decompress(&compressed_fastest).unwrap();
1294        let decompressed_best = codec_best.decompress(&compressed_best).unwrap();
1295        assert_eq!(decompressed_fastest, original);
1296        assert_eq!(decompressed_best, original);
1297    }
1298
1299    #[test]
1300    fn test_codec_with_level_vs_default_codec() {
1301        // Verify that codec_with_level(Default) produces same results as codec()
1302        let original = b"Hello, World! This is a test message for compression level comparison.";
1303
1304        let codec_default = CompressionEncoding::Gzip.codec().unwrap();
1305        let codec_with_default = CompressionEncoding::Gzip
1306            .codec_with_level(CompressionLevel::Default)
1307            .unwrap();
1308
1309        let compressed_default = codec_default.compress(original).unwrap();
1310        let compressed_with_default = codec_with_default.compress(original).unwrap();
1311
1312        // Should produce identical output since both use default level
1313        assert_eq!(compressed_default, compressed_with_default);
1314    }
1315
1316    #[test]
1317    fn test_codec_with_level_identity_returns_none() {
1318        // Identity encoding should return None for any level
1319        assert!(CompressionEncoding::Identity
1320            .codec_with_level(CompressionLevel::Default)
1321            .is_none());
1322        assert!(CompressionEncoding::Identity
1323            .codec_with_level(CompressionLevel::Best)
1324            .is_none());
1325        assert!(CompressionEncoding::Identity
1326            .codec_with_level(CompressionLevel::Fastest)
1327            .is_none());
1328    }
1329}