simulator_api/
ws_compression.rs1use std::io::{self, Write};
11
12use zstd::stream::write::{Decoder, Encoder};
13
14pub const WS_COMPRESSION_LEVEL: i32 = 3;
17
18pub struct WsStreamCompressor {
22 encoder: Encoder<'static, Vec<u8>>,
23}
24
25impl WsStreamCompressor {
26 pub fn new(level: i32) -> io::Result<Self> {
27 Ok(Self {
28 encoder: Encoder::new(Vec::new(), level)?,
29 })
30 }
31
32 pub fn compress(&mut self, message: &[u8]) -> io::Result<Vec<u8>> {
35 self.encoder.write_all(message)?;
36 self.encoder.flush()?;
39 Ok(take_buffer(self.encoder.get_mut()))
40 }
41}
42
43pub struct WsStreamDecompressor {
46 decoder: Decoder<'static, Vec<u8>>,
47}
48
49impl WsStreamDecompressor {
50 pub fn new() -> io::Result<Self> {
51 Ok(Self {
52 decoder: Decoder::new(Vec::new())?,
53 })
54 }
55
56 pub fn decompress(&mut self, frame: &[u8]) -> io::Result<Vec<u8>> {
59 self.decoder.write_all(frame)?;
60 self.decoder.flush()?;
61 Ok(take_buffer(self.decoder.get_mut()))
62 }
63}
64
65fn take_buffer(buf: &mut Vec<u8>) -> Vec<u8> {
69 std::mem::replace(buf, Vec::with_capacity(buf.capacity()))
70}
71
72#[cfg(test)]
73mod tests {
74 use super::*;
75
76 fn sample_messages(n: usize) -> Vec<Vec<u8>> {
78 (0..n)
79 .map(|i| {
80 format!(
81 r#"{{"jsonrpc":"2.0","method":"transactionNotification","params":{{"subscription":1,"result":{{"slot":{},"transaction":{{"signatures":["sig{}"],"meta":{{"fee":5000,"computeUnitsConsumed":1234,"logMessages":["Program X invoke [1]","Program X success"]}}}}}}}}}}"#,
82 423_816_307 + i,
83 i
84 )
85 .into_bytes()
86 })
87 .collect()
88 }
89
90 #[test]
91 fn round_trips_a_stream_of_messages() {
92 let messages = sample_messages(50);
93 let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
94 let mut decomp = WsStreamDecompressor::new().unwrap();
95
96 for msg in &messages {
97 let frame = comp.compress(msg).unwrap();
98 let back = decomp.decompress(&frame).unwrap();
99 assert_eq!(&back, msg, "decompressed frame must match the original");
100 }
101 }
102
103 #[test]
104 fn handles_empty_and_large_messages() {
105 let mut comp = WsStreamCompressor::new(WS_COMPRESSION_LEVEL).unwrap();
106 let mut decomp = WsStreamDecompressor::new().unwrap();
107
108 let empty = comp.compress(b"").unwrap();
109 assert_eq!(decomp.decompress(&empty).unwrap(), b"");
110
111 let large = vec![b'a'; 5 * 1024 * 1024];
112 let frame = comp.compress(&large).unwrap();
113 assert_eq!(decomp.decompress(&frame).unwrap(), large);
114 }
115}