zus_common/
compression.rs

1use bytes::{Bytes, BytesMut};
2
3use crate::error::{Result, ZusError};
4
/// Prefix that identifies QuickLZ payloads during auto-detection (matching the C++ implementation).
const QUICKLZ_PREFIX: &[u8] = b"qlz";
/// Length of `QUICKLZ_PREFIX` in bytes, derived from the literal so the two can never drift apart.
const QUICKLZ_PREFIX_LEN: usize = QUICKLZ_PREFIX.len();
8
/// Selects the compression algorithm.
///
/// The explicit discriminants mirror Java's COMPRESSTYPE constants.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompressionType {
  /// Leave payloads uncompressed
  None = 0,
  /// QuickLZ (the default in the C++ and Java implementations)
  QuickLZ = 1,
  /// Snappy (a faster alternative)
  Snappy = 2,
}
19
20/// Compression configuration and utilities
21///
22/// **Default Configuration**:
23/// - Enabled: true
24/// - Threshold: 4KB (4096 bytes)
25/// - Type: Snappy
26///
27/// **Implementation Notes**:
28/// - C++ client: 4KB default (recommended)
29/// - C++ server: 1KB default (too aggressive, causes asymmetry)
30/// - Java: 8KB default (too conservative, less network savings)
31/// - **Rust: 4KB default (standardized best practice)**
32///
33/// **Configuration via config file**:
34/// ```ini
35/// [zusnet]
36/// COMPRESS=true                # Enable/disable compression
37/// COMPRESS_THRESHSIZE=4096     # Threshold in bytes (default: 4096)
38/// ```
39#[derive(Debug, Clone)]
40pub struct Compressor {
41  /// Compression enabled
42  pub enabled: bool,
43  /// Threshold size in bytes (only compress if data >= this size)
44  ///
45  /// **Recommended**: 4096 (4KB)
46  /// - Balances compression benefit vs CPU cost
47  /// - Standardized across all implementations
48  pub threshold_bytes: usize,
49  /// Compression type
50  ///
51  /// **Current**: Snappy (fast, good compression)
52  /// **Future**: QuickLZ support for backward compatibility
53  pub compression_type: CompressionType,
54}
55
56impl Compressor {
57  /// Create a new compressor with default settings
58  ///
59  /// **Compression Defaults**:
60  /// - Threshold: 4KB (4096 bytes) - Recommended standardized value
61  /// - Type: QuickLZ (default in C++ and Java)
62  /// - Enabled: true
63  ///
64  /// **Rationale for 4KB threshold**:
65  /// - C++ client default: 4KB
66  /// - C++ server default: 1KB (too aggressive)
67  /// - Java default: 8KB (too conservative)
68  /// - Production recommendation: 4KB balances CPU vs network savings
69  /// - Messages < 4KB compress poorly (low ratio, high CPU cost)
70  /// - Messages >= 4KB show 60-70% compression ratio
71  ///
72  /// **Rationale for QuickLZ as default**:
73  /// - Matches C++ and Java default algorithm
74  /// - Better compression ratio than Snappy (~70% vs ~60%)
75  /// - Slightly slower than Snappy but more compatible
76  /// - Auto-detection works with existing C++/Java services
77  ///
78  /// See: `/Users/junfeiwang/Documents/zus-analysis/compression_defaults_comparison.md`
79  pub fn new() -> Self {
80    Self {
81      enabled: true,
82      threshold_bytes: 4096,                      // 4KB - standardized across implementations
83      compression_type: CompressionType::QuickLZ, // Match C++/Java default
84    }
85  }
86
87  /// Create a compressor with custom settings
88  pub fn with_config(enabled: bool, threshold_bytes: usize, compression_type: CompressionType) -> Self {
89    Self {
90      enabled,
91      threshold_bytes,
92      compression_type,
93    }
94  }
95
96  /// Compress data if it meets the threshold
97  ///
98  /// Returns (compressed_data, was_compressed)
99  pub fn compress(&self, data: &[u8]) -> Result<(Bytes, bool)> {
100    // Don't compress if disabled or below threshold
101    if !self.enabled || data.len() < self.threshold_bytes {
102      return Ok((Bytes::copy_from_slice(data), false));
103    }
104
105    match self.compression_type {
106      | CompressionType::None => Ok((Bytes::copy_from_slice(data), false)),
107      | CompressionType::QuickLZ => self.compress_quicklz(data),
108      | CompressionType::Snappy => self.compress_snappy(data),
109    }
110  }
111
112  /// Decompress data
113  ///
114  /// Auto-detects compression type by checking prefixes:
115  /// - "Snappy\0" (7 bytes) → Snappy format
116  /// - "qlz" (3 bytes) → QuickLZ format
117  /// - Otherwise → Try QuickLZ, fallback to uncompressed if it fails
118  ///
119  /// This matches the C++ and Java auto-detection logic.
120  pub fn decompress(&self, data: &[u8]) -> Result<Bytes> {
121    if data.is_empty() {
122      return Ok(Bytes::new());
123    }
124
125    // Auto-detect Snappy by prefix (matching Java/C++ implementation)
126    if data.len() >= 7
127      && data[0] == b'S'
128      && data[1] == b'n'
129      && data[2] == b'a'
130      && data[3] == b'p'
131      && data[4] == b'p'
132      && data[5] == b'y'
133      && data[6] == 0
134    {
135      // Snappy format with prefix
136      return self.decompress_snappy(&data[7..]);
137    }
138
139    // Auto-detect QuickLZ by prefix
140    if data.len() >= QUICKLZ_PREFIX_LEN && &data[0..QUICKLZ_PREFIX_LEN] == QUICKLZ_PREFIX {
141      // QuickLZ format with prefix
142      return self.decompress_quicklz(&data[QUICKLZ_PREFIX_LEN..]);
143    }
144
145    // Fallback: Try QuickLZ without prefix (matching C++ behavior)
146    // If decompression fails, assume data is uncompressed and return as-is
147    match self.decompress_quicklz(data) {
148      | Ok(decompressed) => Ok(decompressed),
149      | Err(_) => {
150        // Not compressed, return original data
151        Ok(Bytes::copy_from_slice(data))
152      }
153    }
154  }
155
156  /// Compress using Snappy
157  ///
158  /// Prepends "Snappy\0" prefix for auto-detection (matching Java)
159  fn compress_snappy(&self, data: &[u8]) -> Result<(Bytes, bool)> {
160    let mut encoder = snap::raw::Encoder::new();
161    let compressed = encoder
162      .compress_vec(data)
163      .map_err(|e| ZusError::Protocol(format!("Snappy compression failed: {e}")))?;
164
165    // Prepend "Snappy\0" prefix
166    let mut result = BytesMut::with_capacity(7 + compressed.len());
167    result.extend_from_slice(b"Snappy\0");
168    result.extend_from_slice(&compressed);
169
170    Ok((result.freeze(), true))
171  }
172
173  /// Decompress Snappy data (without prefix)
174  fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
175    let mut decoder = snap::raw::Decoder::new();
176    let decompressed = decoder
177      .decompress_vec(data)
178      .map_err(|e| ZusError::Protocol(format!("Snappy decompression failed: {e}")))?;
179
180    Ok(Bytes::from(decompressed))
181  }
182
183  /// Compress using QuickLZ
184  ///
185  /// Returns raw QuickLZ data without prefix (matching Java/C++ behavior)
186  fn compress_quicklz(&self, data: &[u8]) -> Result<(Bytes, bool)> {
187    use quicklz::CompressionLevel;
188
189    // Use Lvl1 for speed (matching C++/Java behavior)
190    let compressed = quicklz::compress(data, CompressionLevel::Lvl1);
191
192    // Return raw compressed data without prefix
193    // Java expects raw QuickLZ data starting with the QuickLZ header byte
194    Ok((Bytes::from(compressed), true))
195  }
196
197  /// Decompress QuickLZ data (without prefix)
198  fn decompress_quicklz(&self, data: &[u8]) -> Result<Bytes> {
199    use std::io::Cursor;
200
201    let mut cursor = Cursor::new(data);
202    // QuickLZ can decompress to much larger size, use generous estimate
203    // Max 100MB decompressed (reasonable for RPC messages)
204    let max_decompressed_size = 100 * 1024 * 1024; // 100MB
205    let decompressed = quicklz::decompress(&mut cursor, max_decompressed_size)
206      .map_err(|e| ZusError::Protocol(format!("QuickLZ decompression failed: {e:?}")))?;
207
208    Ok(Bytes::from(decompressed))
209  }
210}
211
212impl Default for Compressor {
213  fn default() -> Self {
214    Self::new()
215  }
216}
217
#[cfg(test)]
mod tests {
  use super::*;

  /// Builds a repetitive 10,000-byte payload, i.e. larger than the 4KB default threshold.
  fn sample_payload() -> Vec<u8> {
    (0..10000).map(|i| (i % 256) as u8).collect()
  }

  #[test]
  fn test_quicklz_roundtrip() {
    let compressor = Compressor::new(); // Default is QuickLZ
    let data = sample_payload();

    let (compressed, was_compressed) = compressor.compress(&data).unwrap();
    assert!(was_compressed);
    assert!(compressed.len() < data.len()); // Should be smaller

    // QuickLZ raw format has no prefix; just verify it is non-empty and round-trips.
    assert!(!compressed.is_empty());

    let decompressed = compressor.decompress(&compressed).unwrap();
    assert_eq!(decompressed.as_ref(), data.as_slice());
  }

  #[test]
  fn test_snappy_roundtrip() {
    let compressor = Compressor::with_config(true, 4096, CompressionType::Snappy);
    let data = sample_payload();

    let (compressed, was_compressed) = compressor.compress(&data).unwrap();
    assert!(was_compressed);
    assert!(compressed.len() < data.len()); // Should be smaller

    // Verify Snappy prefix
    assert_eq!(&compressed[0..7], b"Snappy\0");

    let decompressed = compressor.decompress(&compressed).unwrap();
    assert_eq!(decompressed.as_ref(), data.as_slice());
  }

  #[test]
  fn test_below_threshold_not_compressed() {
    let compressor = Compressor::new();

    // Small data (below 4KB threshold)
    let data = b"Hello, world!";

    let (result, was_compressed) = compressor.compress(data).unwrap();
    assert!(!was_compressed);
    assert_eq!(result.as_ref(), data);
  }

  #[test]
  fn test_compression_disabled() {
    let compressor = Compressor::with_config(false, 4096, CompressionType::QuickLZ);

    // Large data (above threshold)
    let data = sample_payload();

    let (result, was_compressed) = compressor.compress(&data).unwrap();
    assert!(!was_compressed);
    assert_eq!(result.as_ref(), data.as_slice());
  }

  #[test]
  fn test_compression_type_none_passthrough() {
    // Enabled and above threshold, but the selected type is `None`: data must pass through.
    let compressor = Compressor::with_config(true, 0, CompressionType::None);
    let data = sample_payload();

    let (result, was_compressed) = compressor.compress(&data).unwrap();
    assert!(!was_compressed);
    assert_eq!(result.as_ref(), data.as_slice());
  }

  #[test]
  fn test_auto_detect_compression() {
    let data = sample_payload();

    // Test QuickLZ auto-detection
    let quicklz_compressor = Compressor::new(); // Default is QuickLZ
    let (compressed_qlz, _) = quicklz_compressor.compress(&data).unwrap();
    let decompressed_qlz = quicklz_compressor.decompress(&compressed_qlz).unwrap();
    assert_eq!(decompressed_qlz.as_ref(), data.as_slice());

    // Test Snappy auto-detection
    let snappy_compressor = Compressor::with_config(true, 4096, CompressionType::Snappy);
    let (compressed_snappy, _) = snappy_compressor.compress(&data).unwrap();
    let decompressed_snappy = snappy_compressor.decompress(&compressed_snappy).unwrap();
    assert_eq!(decompressed_snappy.as_ref(), data.as_slice());

    // Test cross-decompression (QuickLZ compressor can decompress Snappy)
    let decompressed_cross = quicklz_compressor.decompress(&compressed_snappy).unwrap();
    assert_eq!(decompressed_cross.as_ref(), data.as_slice());
  }

  #[test]
  fn test_quicklz_prefixed_payload() {
    // A raw QuickLZ payload with the "qlz" marker in front must be detected and decoded.
    let compressor = Compressor::new();
    let data = sample_payload();
    let (raw, was_compressed) = compressor.compress(&data).unwrap();
    assert!(was_compressed);

    let mut prefixed = QUICKLZ_PREFIX.to_vec();
    prefixed.extend_from_slice(&raw);

    let decompressed = compressor.decompress(&prefixed).unwrap();
    assert_eq!(decompressed.as_ref(), data.as_slice());
  }

  #[test]
  fn test_decompress_empty_input() {
    let compressor = Compressor::new();

    let decompressed = compressor.decompress(&[]).unwrap();
    assert!(decompressed.is_empty());
  }

  #[test]
  fn test_uncompressed_data() {
    let compressor = Compressor::new();

    let data = b"This is not compressed";

    // Should not fail on uncompressed data
    let decompressed = compressor.decompress(data).unwrap();
    assert_eq!(decompressed.as_ref(), data);
  }

  #[test]
  fn test_threshold_boundary() {
    let compressor = Compressor::new();

    // Test exactly at 4KB threshold boundary
    let data_4095: Vec<u8> = vec![0xFF; 4095]; // 4095 bytes (< 4KB)
    let data_4096: Vec<u8> = vec![0xFF; 4096]; // 4096 bytes (== 4KB)
    let data_4097: Vec<u8> = vec![0xFF; 4097]; // 4097 bytes (> 4KB)

    // Below threshold: should NOT compress
    let (_, compressed_4095) = compressor.compress(&data_4095).unwrap();
    assert!(
      !compressed_4095,
      "4095 bytes should NOT be compressed (< 4KB threshold)"
    );

    // Exactly at threshold: SHOULD compress
    let (_, compressed_4096) = compressor.compress(&data_4096).unwrap();
    assert!(compressed_4096, "4096 bytes SHOULD be compressed (>= 4KB threshold)");

    // Above threshold: SHOULD compress
    let (_, compressed_4097) = compressor.compress(&data_4097).unwrap();
    assert!(compressed_4097, "4097 bytes SHOULD be compressed (> 4KB threshold)");
  }

  #[test]
  fn test_standardized_threshold_vs_legacy() {
    // Test that new 4KB default differs from old implementations
    let rust_compressor = Compressor::new();
    assert_eq!(
      rust_compressor.threshold_bytes, 4096,
      "Rust should use 4KB (standardized)"
    );
    assert_eq!(
      rust_compressor.compression_type,
      CompressionType::QuickLZ,
      "Rust should use QuickLZ (matching C++/Java default)"
    );

    // Simulate legacy thresholds (all use QuickLZ like C++/Java)
    let cpp_client = Compressor::with_config(true, 4096, CompressionType::QuickLZ);
    let cpp_server = Compressor::with_config(true, 1024, CompressionType::QuickLZ);
    let java_legacy = Compressor::with_config(true, 8192, CompressionType::QuickLZ);

    // 2KB message
    let data_2kb: Vec<u8> = vec![0xFF; 2048];

    // C++ client (4KB): NOT compressed
    let (_, compressed_cpp_client) = cpp_client.compress(&data_2kb).unwrap();
    assert!(!compressed_cpp_client);

    // C++ server (1KB): COMPRESSED (asymmetric!)
    let (_, compressed_cpp_server) = cpp_server.compress(&data_2kb).unwrap();
    assert!(compressed_cpp_server);

    // Java (8KB): NOT compressed
    let (_, compressed_java) = java_legacy.compress(&data_2kb).unwrap();
    assert!(!compressed_java);

    // Rust (4KB): NOT compressed (matches C++ client)
    let (_, compressed_rust) = rust_compressor.compress(&data_2kb).unwrap();
    assert!(!compressed_rust);
    assert_eq!(
      compressed_rust, compressed_cpp_client,
      "Rust should match C++ client behavior (not C++ server)"
    );
  }
}