Skip to main content

connectrpc_axum_core/
compression.rs

1//! Compression configuration types.
2//!
3//! This module provides configuration types for compression in ConnectRPC:
4//! - [`CompressionEncoding`]: Supported compression algorithms
5//! - [`CompressionLevel`]: Compression quality settings
6//! - [`CompressionConfig`]: Server/client compression configuration
7
8use crate::codec::BoxedCodec;
9
10#[cfg(feature = "compression-gzip-stream")]
11use crate::codec::GzipCodec;
12
13#[cfg(feature = "compression-deflate-stream")]
14use crate::codec::DeflateCodec;
15
16#[cfg(feature = "compression-br-stream")]
17use crate::codec::BrotliCodec;
18
19#[cfg(feature = "compression-zstd-stream")]
20use crate::codec::ZstdCodec;
21
/// Compression algorithms that can be named in an encoding header.
///
/// Values of this enum are produced by header parsing and by response
/// negotiation. The enum itself carries no compression logic; call
/// [`CompressionEncoding::codec()`] to obtain a codec implementation.
///
/// Every variant except `Identity` exists only when the matching Cargo
/// feature is enabled.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionEncoding {
    /// No compression (header value `identity`); this is the default.
    #[default]
    Identity,
    /// Header value `gzip`.
    #[cfg(feature = "compression-gzip-stream")]
    Gzip,
    /// Header value `deflate`.
    #[cfg(feature = "compression-deflate-stream")]
    Deflate,
    /// Header value `br`.
    #[cfg(feature = "compression-br-stream")]
    Brotli,
    /// Header value `zstd`.
    #[cfg(feature = "compression-zstd-stream")]
    Zstd,
}
39
40impl CompressionEncoding {
41    /// Parse from Content-Encoding or Connect-Content-Encoding header value.
42    /// Returns None for unsupported encodings (caller should return Unimplemented).
43    pub fn from_header(value: Option<&str>) -> Option<Self> {
44        match value {
45            None | Some("identity") | Some("") => Some(Self::Identity),
46            #[cfg(feature = "compression-gzip-stream")]
47            Some("gzip") => Some(Self::Gzip),
48            #[cfg(feature = "compression-deflate-stream")]
49            Some("deflate") => Some(Self::Deflate),
50            #[cfg(feature = "compression-br-stream")]
51            Some("br") => Some(Self::Brotli),
52            #[cfg(feature = "compression-zstd-stream")]
53            Some("zstd") => Some(Self::Zstd),
54            _ => None, // unsupported
55        }
56    }
57
58    /// Get the header value string for this encoding.
59    pub fn as_str(&self) -> &'static str {
60        match self {
61            Self::Identity => "identity",
62            #[cfg(feature = "compression-gzip-stream")]
63            Self::Gzip => "gzip",
64            #[cfg(feature = "compression-deflate-stream")]
65            Self::Deflate => "deflate",
66            #[cfg(feature = "compression-br-stream")]
67            Self::Brotli => "br",
68            #[cfg(feature = "compression-zstd-stream")]
69            Self::Zstd => "zstd",
70        }
71    }
72
73    /// Returns true if this encoding is identity (no compression).
74    pub fn is_identity(&self) -> bool {
75        matches!(self, Self::Identity)
76    }
77
78    /// Get the codec for this encoding.
79    ///
80    /// Returns `None` for identity, `Some(BoxedCodec)` for others.
81    pub fn codec(&self) -> Option<BoxedCodec> {
82        match self {
83            Self::Identity => None,
84            #[cfg(feature = "compression-gzip-stream")]
85            Self::Gzip => Some(BoxedCodec::new(GzipCodec::default())),
86            #[cfg(feature = "compression-deflate-stream")]
87            Self::Deflate => Some(BoxedCodec::new(DeflateCodec::default())),
88            #[cfg(feature = "compression-br-stream")]
89            Self::Brotli => Some(BoxedCodec::new(BrotliCodec::default())),
90            #[cfg(feature = "compression-zstd-stream")]
91            Self::Zstd => Some(BoxedCodec::new(ZstdCodec::default())),
92        }
93    }
94
95    /// Get the codec for this encoding with the specified compression level.
96    ///
97    /// Returns `None` for identity, `Some(BoxedCodec)` for others.
98    #[allow(unused_variables)]
99    pub fn codec_with_level(&self, level: CompressionLevel) -> Option<BoxedCodec> {
100        match self {
101            Self::Identity => None,
102            #[cfg(feature = "compression-gzip-stream")]
103            Self::Gzip => Some(BoxedCodec::new(GzipCodec::with_level(level_to_flate2(
104                level,
105            )))),
106            #[cfg(feature = "compression-deflate-stream")]
107            Self::Deflate => Some(BoxedCodec::new(DeflateCodec::with_level(level_to_flate2(
108                level,
109            )))),
110            #[cfg(feature = "compression-br-stream")]
111            Self::Brotli => Some(BoxedCodec::new(BrotliCodec::with_quality(level_to_brotli(
112                level,
113            )))),
114            #[cfg(feature = "compression-zstd-stream")]
115            Self::Zstd => Some(BoxedCodec::new(ZstdCodec::with_level(level_to_zstd(level)))),
116        }
117    }
118}
119
/// How hard a codec should work to shrink the data.
///
/// The type is declared locally rather than borrowed from tower-http so that
/// both client-side and server-side code can use it.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionLevel {
    /// Favor speed; gives the lowest compression ratio.
    Fastest,
    /// Favor size; gives the highest ratio and is the slowest.
    Best,
    /// Whatever default the conversion for the chosen algorithm picks.
    #[default]
    Default,
    /// An explicit numeric level whose meaning depends on the algorithm.
    Precise(u32),
}
136
137impl CompressionLevel {
138    /// Create a compression level with a precise value.
139    ///
140    /// The value interpretation is algorithm-specific:
141    /// - gzip/deflate: 0-9 (0=no compression, 9=best)
142    /// - brotli: 0-11 (0=fastest, 11=best)
143    /// - zstd: 1-22 (1=fastest, 22=best)
144    pub fn precise(level: u32) -> Self {
145        CompressionLevel::Precise(level)
146    }
147}
148
/// Translate a [`CompressionLevel`] into a flate2 level on the 0-9 scale.
///
/// The mapping is intended to match tower-http → async_compression behavior:
/// - `Fastest` → 1
/// - `Best` → 9
/// - `Default` → 6
/// - `Precise(n)` → n, capped at 9
#[cfg(any(
    feature = "compression-gzip-stream",
    feature = "compression-deflate-stream"
))]
fn level_to_flate2(level: CompressionLevel) -> u32 {
    /// Highest level on the flate2 scale.
    const MAX_LEVEL: u32 = 9;

    match level {
        // The lower bound is 0, which an unsigned value can never go below,
        // so only the upper bound needs enforcing.
        CompressionLevel::Precise(requested) => requested.min(MAX_LEVEL),
        CompressionLevel::Default => 6,
        CompressionLevel::Fastest => 1,
        CompressionLevel::Best => MAX_LEVEL,
    }
}
168
/// Translate a [`CompressionLevel`] into a brotli quality on the 0-11 scale.
///
/// `Default` maps to 4 rather than brotli's own default; this mirrors
/// tower-http, which overrides it to the NGINX default for performance.
#[cfg(feature = "compression-br-stream")]
fn level_to_brotli(level: CompressionLevel) -> u32 {
    /// Highest quality on the brotli scale.
    const MAX_QUALITY: u32 = 11;

    match level {
        // The lower bound is 0, which an unsigned value can never go below,
        // so only the upper bound needs enforcing.
        CompressionLevel::Precise(requested) => requested.min(MAX_QUALITY),
        CompressionLevel::Default => 4, // tower-http's custom default
        CompressionLevel::Fastest => 0,
        CompressionLevel::Best => MAX_QUALITY,
    }
}
181
/// Translate a [`CompressionLevel`] into a zstd level on the 1-22 scale.
///
/// - `Fastest` → 1
/// - `Best` → 22
/// - `Default` → 3
/// - `Precise(n)` → n clamped to 1-22
#[cfg(feature = "compression-zstd-stream")]
fn level_to_zstd(level: CompressionLevel) -> i32 {
    match level {
        CompressionLevel::Fastest => 1,
        CompressionLevel::Best => 22,
        CompressionLevel::Default => 3,
        // Clamp while the value is still unsigned. Casting first would wrap
        // any `n > i32::MAX` to a negative number, which then clamps to 1
        // (fastest) instead of 22 (best). After clamping the value is at most
        // 22, so the cast to `i32` is lossless.
        CompressionLevel::Precise(n) => n.clamp(1, 22) as i32,
    }
}
192
193/// Compression configuration.
194///
195/// Used to configure compression behavior for both client and server.
196#[derive(Debug, Clone, Copy)]
197pub struct CompressionConfig {
198    /// Minimum bytes before compression is applied.
199    /// Default is 0 (compress everything), matching connect-go behavior.
200    /// Messages smaller than this threshold are sent uncompressed.
201    pub min_bytes: usize,
202    /// Compression level/quality.
203    pub level: CompressionLevel,
204}
205
206impl Default for CompressionConfig {
207    fn default() -> Self {
208        Self {
209            min_bytes: 0,
210            level: CompressionLevel::Default,
211        }
212    }
213}
214
215impl CompressionConfig {
216    /// Create a new compression config with the specified minimum bytes threshold.
217    pub fn new(min_bytes: usize) -> Self {
218        Self {
219            min_bytes,
220            level: CompressionLevel::Default,
221        }
222    }
223
224    /// Set the compression level.
225    pub fn level(mut self, level: CompressionLevel) -> Self {
226        self.level = level;
227        self
228    }
229
230    /// Disable compression by setting threshold to usize::MAX.
231    pub fn disabled() -> Self {
232        Self {
233            min_bytes: usize::MAX,
234            level: CompressionLevel::Default,
235        }
236    }
237
238    /// Check if compression is effectively disabled.
239    pub fn is_disabled(&self) -> bool {
240        self.min_bytes == usize::MAX
241    }
242}
243
/// List the encodings this build supports, for use in error messages.
///
/// The result is a comma-separated string in the fixed order
/// gzip, deflate, br, zstd, identity. An algorithm appears only when its
/// Cargo feature is enabled; `identity` is always present.
pub fn supported_encodings_str() -> &'static str {
    // Each flag is a compile-time constant reflecting one Cargo feature.
    const GZIP: bool = cfg!(feature = "compression-gzip-stream");
    const DEFLATE: bool = cfg!(feature = "compression-deflate-stream");
    const BR: bool = cfg!(feature = "compression-br-stream");
    const ZSTD: bool = cfg!(feature = "compression-zstd-stream");

    // One row per feature combination: (gzip, deflate, br, zstd).
    match (GZIP, DEFLATE, BR, ZSTD) {
        (true, true, true, true) => "gzip, deflate, br, zstd, identity",
        (true, true, true, false) => "gzip, deflate, br, identity",
        (true, true, false, true) => "gzip, deflate, zstd, identity",
        (true, true, false, false) => "gzip, deflate, identity",
        (true, false, true, true) => "gzip, br, zstd, identity",
        (true, false, true, false) => "gzip, br, identity",
        (true, false, false, true) => "gzip, zstd, identity",
        (true, false, false, false) => "gzip, identity",
        (false, true, true, true) => "deflate, br, zstd, identity",
        (false, true, true, false) => "deflate, br, identity",
        (false, true, false, true) => "deflate, zstd, identity",
        (false, true, false, false) => "deflate, identity",
        (false, false, true, true) => "br, zstd, identity",
        (false, false, true, false) => "br, identity",
        (false, false, false, true) => "zstd, identity",
        (false, false, false, false) => "identity",
    }
}
393
394/// Negotiate response encoding from Accept-Encoding header.
395///
396/// Follows connect-go's approach: first supported encoding wins (client preference order).
397/// Respects `q=0` which means "not acceptable" per RFC 7231.
398pub fn negotiate_response_encoding(accept: Option<&str>) -> CompressionEncoding {
399    let Some(accept) = accept else {
400        return CompressionEncoding::Identity;
401    };
402
403    for token in accept.split(',') {
404        let token = token.trim();
405        if token.is_empty() {
406            continue;
407        }
408
409        // Parse "gzip;q=0.5" into encoding="gzip", q_value=Some("0.5")
410        let (encoding, q_value) = match token.split_once(';') {
411            Some((enc, params)) => {
412                let q = params.split(';').find_map(|p| p.trim().strip_prefix("q="));
413                (enc.trim(), q)
414            }
415            None => (token, None),
416        };
417
418        // Skip if q=0 (explicitly disabled)
419        if let Some(q) = q_value {
420            let q = q.trim();
421            if q == "0" || q == "0.0" || q == "0.00" || q == "0.000" {
422                continue;
423            }
424        }
425
426        // Return first supported encoding
427        match encoding {
428            #[cfg(feature = "compression-gzip-stream")]
429            "gzip" => return CompressionEncoding::Gzip,
430            #[cfg(feature = "compression-deflate-stream")]
431            "deflate" => return CompressionEncoding::Deflate,
432            #[cfg(feature = "compression-br-stream")]
433            "br" => return CompressionEncoding::Brotli,
434            #[cfg(feature = "compression-zstd-stream")]
435            "zstd" => return CompressionEncoding::Zstd,
436            "identity" => return CompressionEncoding::Identity,
437            _ => continue,
438        }
439    }
440
441    CompressionEncoding::Identity
442}
443
/// Name of the header that carries the compression applied to a Connect
/// streaming request.
pub const CONNECT_CONTENT_ENCODING: &'static str = "connect-content-encoding";

/// Name of the header used to negotiate compression of a Connect streaming
/// response.
pub const CONNECT_ACCEPT_ENCODING: &'static str = "connect-accept-encoding";
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Absent, empty and explicit identity headers all parse to `Identity`;
    /// an unknown name is rejected.
    #[test]
    fn test_compression_encoding_from_header_identity() {
        for header in [None, Some(""), Some("identity")] {
            assert_eq!(
                CompressionEncoding::from_header(header),
                Some(CompressionEncoding::Identity)
            );
        }
        assert_eq!(CompressionEncoding::from_header(Some("lz4")), None);
    }

    /// `gzip` parses to the gzip variant when the feature is enabled.
    #[cfg(feature = "compression-gzip-stream")]
    #[test]
    fn test_compression_encoding_from_header_gzip() {
        let parsed = CompressionEncoding::from_header(Some("gzip"));
        assert_eq!(parsed, Some(CompressionEncoding::Gzip));
    }

    /// Identity renders as the `identity` header token.
    #[test]
    fn test_compression_encoding_as_str_identity() {
        let encoding = CompressionEncoding::Identity;
        assert_eq!(encoding.as_str(), "identity");
    }

    /// Gzip renders as the `gzip` header token.
    #[cfg(feature = "compression-gzip-stream")]
    #[test]
    fn test_compression_encoding_as_str_gzip() {
        let encoding = CompressionEncoding::Gzip;
        assert_eq!(encoding.as_str(), "gzip");
    }

    /// Identity has no codec; gzip yields a codec that reports its name.
    #[cfg(feature = "compression-gzip-stream")]
    #[test]
    fn test_compression_encoding_codec() {
        assert!(CompressionEncoding::Identity.codec().is_none());

        let codec = CompressionEncoding::Gzip
            .codec()
            .expect("gzip encoding should provide a codec");
        assert_eq!(codec.name(), "gzip");
    }

    /// The `precise` constructor wraps its argument in `Precise`.
    #[test]
    fn test_compression_level_precise() {
        let level = CompressionLevel::precise(5);
        assert_eq!(level, CompressionLevel::Precise(5));
    }

    /// The default config compresses everything at the default level.
    #[test]
    fn test_compression_config_default() {
        let CompressionConfig { min_bytes, level } = CompressionConfig::default();
        assert_eq!(min_bytes, 0);
        assert_eq!(level, CompressionLevel::Default);
    }

    /// `new` stores the requested threshold.
    #[test]
    fn test_compression_config_new() {
        assert_eq!(CompressionConfig::new(512).min_bytes, 512);
    }

    /// `disabled` uses the `usize::MAX` sentinel and reports itself disabled.
    #[test]
    fn test_compression_config_disabled() {
        let disabled = CompressionConfig::disabled();
        assert_eq!(disabled.min_bytes, usize::MAX);
        assert!(disabled.is_disabled());
    }

    /// A missing or empty Accept-Encoding header negotiates identity.
    #[test]
    fn test_negotiate_response_encoding_identity() {
        for header in [None, Some("")] {
            assert_eq!(
                negotiate_response_encoding(header),
                CompressionEncoding::Identity
            );
        }
    }

    /// Gzip is chosen when it is the first acceptable entry.
    #[cfg(feature = "compression-gzip-stream")]
    #[test]
    fn test_negotiate_response_encoding_gzip() {
        for header in ["gzip", "gzip, identity"] {
            assert_eq!(
                negotiate_response_encoding(Some(header)),
                CompressionEncoding::Gzip
            );
        }
    }

    /// Zero weights exclude an encoding; non-zero weights do not.
    #[cfg(feature = "compression-gzip-stream")]
    #[test]
    fn test_negotiate_response_encoding_q_values() {
        // q=0 means "not acceptable"
        for header in ["gzip;q=0", "gzip;q=0, identity"] {
            assert_eq!(
                negotiate_response_encoding(Some(header)),
                CompressionEncoding::Identity
            );
        }

        // Non-zero q values should be accepted
        for header in ["gzip;q=1", "gzip;q=0.5"] {
            assert_eq!(
                negotiate_response_encoding(Some(header)),
                CompressionEncoding::Gzip
            );
        }
    }
}