simulator-api 0.9.0

Wire-protocol types for the Solana simulator backtest WebSocket API
Documentation
//! Streaming zstd compression for the subscription WebSocket firehose.
//!
//! One compressor/decompressor per connection, retaining the zstd window across
//! messages (context-takeover) for a high ratio on the similar notification
//! frames. The server compresses each notification into a `Binary` frame; the
//! client decodes it. The pair is dropped on disconnect, so a reconnect starts a
//! fresh stream. The opt-in is carried in
//! [`crate::subscribe_config::SubscribeConfig`] so that it survives the management
//! WebSocket proxy.

use std::io::{self, Write};

use zstd::stream::write::{Decoder, Encoder};

/// zstd compression level used for the subscription firehose.
///
/// Deliberately low: the gain comes from context-takeover across similar
/// messages rather than from the level itself, and the session workers are
/// CPU-bound.
pub const WS_COMPRESSION_LEVEL: i32 = 3;

/// Streaming compressor owned by a single WebSocket connection.
///
/// All messages go through one long-lived zstd encoder. [`Self::compress`]
/// flushes after every message, so the bytes it returns carry that whole
/// message, while the encoder's window is kept and reused by the next call.
pub struct WsStreamCompressor {
    encoder: Encoder<'static, Vec<u8>>,
}

impl WsStreamCompressor {
    /// Build a compressor that encodes at zstd `level` into an initially empty
    /// in-memory buffer.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Encoder::new`.
    pub fn new(level: i32) -> io::Result<Self> {
        let encoder = Encoder::new(Vec::new(), level)?;
        Ok(Self { encoder })
    }

    /// Compress `message` and return the bytes the encoder produced for it.
    ///
    /// The returned chunk is part of one continuous stream: the encoder is
    /// flushed but never finished, so its window carries over to the next
    /// message. Chunks are therefore meant to be decoded in production order.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from writing to or flushing the encoder.
    pub fn compress(&mut self, message: &[u8]) -> io::Result<Vec<u8>> {
        let encoder = &mut self.encoder;
        encoder.write_all(message)?;
        // Flushing pushes out everything written so far as a zstd flush block
        // while leaving the stream open; the accumulated window stays
        // available to later messages.
        encoder.flush()?;
        let frame = take_buffer(encoder.get_mut());
        Ok(frame)
    }
}

/// Streaming decompressor owned by a single WebSocket connection; the
/// counterpart of [`WsStreamCompressor`].
///
/// Frames have to be supplied in exactly the order the compressor emitted
/// them.
pub struct WsStreamDecompressor {
    decoder: Decoder<'static, Vec<u8>>,
}

impl WsStreamDecompressor {
    /// Build a decompressor that decodes into an initially empty in-memory
    /// buffer.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Decoder::new`.
    pub fn new() -> io::Result<Self> {
        let decoder = Decoder::new(Vec::new())?;
        Ok(Self { decoder })
    }

    /// Decode one frame emitted by [`WsStreamCompressor::compress`] and return
    /// the message bytes it carried.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from writing to or flushing the decoder.
    pub fn decompress(&mut self, frame: &[u8]) -> io::Result<Vec<u8>> {
        let decoder = &mut self.decoder;
        decoder.write_all(frame)?;
        // Flush before draining the output buffer, mirroring the compressor.
        decoder.flush()?;
        let message = take_buffer(decoder.get_mut());
        Ok(message)
    }
}

/// Take the produced bytes out of a streaming codec's output buffer, leaving
/// behind a fresh, empty buffer pre-sized to the length of the frame just taken.
///
/// Pre-sizing avoids regrowing the `Vec` from zero capacity on every message on
/// the per-notification firehose path. The size hint is the taken frame's
/// *length*, not its capacity: capacity never shrinks, so sizing from it would
/// make a single oversized message force an equally large allocation for every
/// later message on the connection.
///
/// Returns the bytes that were in `buf`; `buf` is empty afterwards.
fn take_buffer(buf: &mut Vec<u8>) -> Vec<u8> {
    let next_capacity = buf.len();
    std::mem::replace(buf, Vec::with_capacity(next_capacity))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build `n` near-identical JSON notifications (only the slot and the
    /// signature differ), mimicking a transaction firehose.
    fn sample_messages(n: usize) -> Vec<Vec<u8>> {
        let mut batch = Vec::with_capacity(n);
        for i in 0..n {
            let slot = 423_816_307 + i;
            let text = format!(
                r#"{{"jsonrpc":"2.0","method":"transactionNotification","params":{{"subscription":1,"result":{{"slot":{},"transaction":{{"signatures":["sig{}"],"meta":{{"fee":5000,"computeUnitsConsumed":1234,"logMessages":["Program X invoke [1]","Program X success"]}}}}}}}}}}"#,
                slot, i
            );
            batch.push(text.into_bytes());
        }
        batch
    }

    /// Every message of a stream must come back unchanged when each frame is
    /// decoded right after it is produced.
    #[test]
    fn round_trips_a_stream_of_messages() {
        let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
        let mut decomp = WsStreamDecompressor::new().unwrap();

        for original in sample_messages(50) {
            let frame = comp.compress(&original).unwrap();
            let restored = decomp.decompress(&frame).unwrap();
            assert_eq!(
                restored, original,
                "decompressed frame must match the original"
            );
        }
    }

    /// An empty message followed by a 5 MiB message must both round-trip on
    /// the same compressor/decompressor pair.
    #[test]
    fn handles_empty_and_large_messages() {
        const LARGE_LEN: usize = 5 * 1024 * 1024;

        let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
        let mut decomp = WsStreamDecompressor::new().unwrap();

        let empty_frame = comp.compress(b"").unwrap();
        let restored_empty = decomp.decompress(&empty_frame).unwrap();
        assert_eq!(restored_empty, b"");

        let large = vec![b'a'; LARGE_LEN];
        let large_frame = comp.compress(&large).unwrap();
        let restored_large = decomp.decompress(&large_frame).unwrap();
        assert_eq!(restored_large, large);
    }
}