Skip to main content

simulator_api/
ws_compression.rs

1//! Streaming zstd compression for the subscription WebSocket firehose.
2//!
3//! One compressor/decompressor per connection, retaining the zstd window across
4//! messages (context-takeover) for a high ratio on the similar notification
5//! frames. The server compresses each notification into a `Binary` frame; the
6//! client decodes it. The pair is dropped on disconnect, so a reconnect starts a
7//! fresh stream. Opt-in rides [`crate::subscribe_config::SubscribeConfig`] so it
8//! survives the management WebSocket proxy.
9
10use std::io::{self, Write};
11
12use zstd::stream::write::{Decoder, Encoder};
13
14/// zstd level for the firehose; low because the win is context-takeover across
15/// similar messages, not the level (and the session workers are CPU-bound).
16pub const WS_COMPRESSION_LEVEL: i32 = 3;
17
18/// Per-connection streaming compressor. Each [`Self::compress`] call flushes the
19/// zstd stream so the returned bytes independently decode to that one message,
20/// while the dictionary window carries over to the next call.
21pub struct WsStreamCompressor {
22    encoder: Encoder<'static, Vec<u8>>,
23}
24
25impl WsStreamCompressor {
26    pub fn new(level: i32) -> io::Result<Self> {
27        Ok(Self {
28            encoder: Encoder::new(Vec::new(), level)?,
29        })
30    }
31
32    /// Compress one message into a self-contained frame, retaining the stream
33    /// context (window) for the next message.
34    pub fn compress(&mut self, message: &[u8]) -> io::Result<Vec<u8>> {
35        self.encoder.write_all(message)?;
36        // `flush` emits a zstd flush block (everything written so far) but keeps
37        // the stream open, so the next message reuses the accumulated window.
38        self.encoder.flush()?;
39        Ok(take_buffer(self.encoder.get_mut()))
40    }
41}
42
43/// Per-connection streaming decompressor, the inverse of [`WsStreamCompressor`].
44/// Frames must be fed in the order they were produced.
45pub struct WsStreamDecompressor {
46    decoder: Decoder<'static, Vec<u8>>,
47}
48
49impl WsStreamDecompressor {
50    pub fn new() -> io::Result<Self> {
51        Ok(Self {
52            decoder: Decoder::new(Vec::new())?,
53        })
54    }
55
56    /// Decompress one frame produced by [`WsStreamCompressor::compress`],
57    /// returning the original message bytes.
58    pub fn decompress(&mut self, frame: &[u8]) -> io::Result<Vec<u8>> {
59        self.decoder.write_all(frame)?;
60        self.decoder.flush()?;
61        Ok(take_buffer(self.decoder.get_mut()))
62    }
63}
64
65/// Take the produced bytes out of a streaming codec's output buffer, leaving a
66/// fresh buffer pre-sized to the last frame. Avoids regrowing the `Vec` from
67/// zero capacity on every message on the per-notification firehose path.
68fn take_buffer(buf: &mut Vec<u8>) -> Vec<u8> {
69    std::mem::replace(buf, Vec::with_capacity(buf.capacity()))
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75
76    /// A batch of structurally-similar messages, like a transaction firehose.
77    fn sample_messages(n: usize) -> Vec<Vec<u8>> {
78        (0..n)
79            .map(|i| {
80                format!(
81                    r#"{{"jsonrpc":"2.0","method":"transactionNotification","params":{{"subscription":1,"result":{{"slot":{},"transaction":{{"signatures":["sig{}"],"meta":{{"fee":5000,"computeUnitsConsumed":1234,"logMessages":["Program X invoke [1]","Program X success"]}}}}}}}}}}"#,
82                    423_816_307 + i,
83                    i
84                )
85                .into_bytes()
86            })
87            .collect()
88    }
89
90    #[test]
91    fn round_trips_a_stream_of_messages() {
92        let messages = sample_messages(50);
93        let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
94        let mut decomp = WsStreamDecompressor::new().unwrap();
95
96        for msg in &messages {
97            let frame = comp.compress(msg).unwrap();
98            let back = decomp.decompress(&frame).unwrap();
99            assert_eq!(&back, msg, "decompressed frame must match the original");
100        }
101    }
102
103    #[test]
104    fn handles_empty_and_large_messages() {
105        let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
106        let mut decomp = WsStreamDecompressor::new().unwrap();
107
108        let empty = comp.compress(b"").unwrap();
109        assert_eq!(decomp.decompress(&empty).unwrap(), b"");
110
111        let large = vec![b'a'; 5 * 1024 * 1024];
112        let frame = comp.compress(&large).unwrap();
113        assert_eq!(decomp.decompress(&frame).unwrap(), large);
114    }
115}