use crate::perf::{
compiler_optimization::CompileTimeOptimizedEventProcessor, simd::SIMDSerializer,
};
use anyhow::Result;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use crossbeam_queue::ArrayQueue;
use once_cell::sync::Lazy;
use solana_client::rpc_client::SerializableTransaction;
use solana_sdk::signature::Signature;
use solana_transaction_status::UiTransactionEncoding;
use std::sync::Arc;
const SERIALIZER_POOL_SIZE: usize = 10_000;
const SERIALIZER_BUFFER_SIZE: usize = 256 * 1024;
const SERIALIZER_PREWARM_BUFFERS: usize = 64;
pub struct ZeroAllocSerializer {
buffer_pool: Arc<ArrayQueue<Vec<u8>>>,
buffer_size: usize,
}
impl ZeroAllocSerializer {
pub fn new(pool_size: usize, buffer_size: usize) -> Self {
Self::new_with_prewarm(pool_size, buffer_size, SERIALIZER_PREWARM_BUFFERS)
}
fn new_with_prewarm(pool_size: usize, buffer_size: usize, prewarm_buffers: usize) -> Self {
let pool = ArrayQueue::new(pool_size);
let prewarm_count = prewarm_buffers.min(pool_size);
for _ in 0..prewarm_count {
let _ = pool.push(Vec::with_capacity(buffer_size));
}
Self { buffer_pool: Arc::new(pool), buffer_size }
}
pub fn serialize_zero_alloc<T: serde::Serialize>(
&self,
data: &T,
_label: &str,
) -> Result<Vec<u8>> {
let mut buffer =
self.buffer_pool.pop().unwrap_or_else(|| Vec::with_capacity(self.buffer_size));
let serialized = bincode::serialize(data)?;
buffer.clear();
buffer.extend_from_slice(&serialized);
Ok(buffer)
}
pub fn return_buffer(&self, buffer: Vec<u8>) {
let _ = self.buffer_pool.push(buffer);
}
pub fn get_pool_stats(&self) -> (usize, usize) {
let available = self.buffer_pool.len();
let capacity = self.buffer_pool.capacity();
(available, capacity)
}
}
static SERIALIZER: Lazy<Arc<ZeroAllocSerializer>> =
Lazy::new(|| Arc::new(ZeroAllocSerializer::new(SERIALIZER_POOL_SIZE, SERIALIZER_BUFFER_SIZE)));
static COMPILE_TIME_PROCESSOR: CompileTimeOptimizedEventProcessor =
CompileTimeOptimizedEventProcessor::new();
pub struct Base64Encoder;
impl Base64Encoder {
#[inline(always)]
pub fn encode(data: &[u8]) -> String {
let _route = if !data.is_empty() {
COMPILE_TIME_PROCESSOR.route_event_zero_cost(data[0])
} else {
0
};
SIMDSerializer::encode_base64_simd(data)
}
#[inline(always)]
pub fn serialize_and_encode<T: serde::Serialize>(
value: &T,
event_type: &str,
) -> Result<String> {
let serialized = SERIALIZER.serialize_zero_alloc(value, event_type)?;
let encoded = STANDARD.encode(&serialized);
SERIALIZER.return_buffer(serialized);
Ok(encoded)
}
}
pub struct PooledTxBufGuard(pub Vec<u8>);
impl std::ops::Deref for PooledTxBufGuard {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.0
}
}
impl Drop for PooledTxBufGuard {
fn drop(&mut self) {
if !self.0.is_empty() {
SERIALIZER.return_buffer(std::mem::take(&mut self.0));
}
}
}
pub fn serialize_transaction_bincode_sync(
transaction: &impl SerializableTransaction,
) -> Result<(PooledTxBufGuard, Signature)> {
let signature = transaction.get_signature();
let serialized_tx = SERIALIZER.serialize_zero_alloc(transaction, "transaction")?;
Ok((PooledTxBufGuard(serialized_tx), *signature))
}
pub fn return_serialization_buffer(buffer: Vec<u8>) {
SERIALIZER.return_buffer(buffer);
}
pub fn serialize_transaction_sync(
transaction: &impl SerializableTransaction,
encoding: UiTransactionEncoding,
) -> Result<(String, Signature)> {
let signature = transaction.get_signature();
let serialized_tx = SERIALIZER.serialize_zero_alloc(transaction, "transaction")?;
let serialized = match encoding {
UiTransactionEncoding::Base58 => bs58::encode(&serialized_tx).into_string(),
UiTransactionEncoding::Base64 => SIMDSerializer::encode_base64_simd(&serialized_tx),
_ => return Err(anyhow::anyhow!("Unsupported encoding")),
};
SERIALIZER.return_buffer(serialized_tx);
Ok((serialized, *signature))
}
pub async fn serialize_transaction(
transaction: &impl SerializableTransaction,
encoding: UiTransactionEncoding,
) -> Result<(String, Signature)> {
let signature = transaction.get_signature();
let serialized_tx = SERIALIZER.serialize_zero_alloc(transaction, "transaction")?;
let serialized = match encoding {
UiTransactionEncoding::Base58 => bs58::encode(&serialized_tx).into_string(),
UiTransactionEncoding::Base64 => SIMDSerializer::encode_base64_simd(&serialized_tx),
_ => return Err(anyhow::anyhow!("Unsupported encoding")),
};
SERIALIZER.return_buffer(serialized_tx);
Ok((serialized, *signature))
}
pub fn serialize_transactions_batch_sync(
transactions: &[impl SerializableTransaction],
encoding: UiTransactionEncoding,
) -> Result<Vec<String>> {
let mut results = Vec::with_capacity(transactions.len());
for tx in transactions {
let serialized_tx = SERIALIZER.serialize_zero_alloc(tx, "transaction")?;
let encoded = match encoding {
UiTransactionEncoding::Base58 => bs58::encode(&serialized_tx).into_string(),
UiTransactionEncoding::Base64 => SIMDSerializer::encode_base64_simd(&serialized_tx),
_ => return Err(anyhow::anyhow!("Unsupported encoding")),
};
SERIALIZER.return_buffer(serialized_tx);
results.push(encoded);
}
Ok(results)
}
pub async fn serialize_transactions_batch(
transactions: &[impl SerializableTransaction],
encoding: UiTransactionEncoding,
) -> Result<Vec<String>> {
let mut results = Vec::with_capacity(transactions.len());
for tx in transactions {
let serialized_tx = SERIALIZER.serialize_zero_alloc(tx, "transaction")?;
let encoded = match encoding {
UiTransactionEncoding::Base58 => bs58::encode(&serialized_tx).into_string(),
UiTransactionEncoding::Base64 => SIMDSerializer::encode_base64_simd(&serialized_tx),
_ => return Err(anyhow::anyhow!("Unsupported encoding")),
};
SERIALIZER.return_buffer(serialized_tx);
results.push(encoded);
}
Ok(results)
}
pub fn get_serializer_stats() -> (usize, usize) {
SERIALIZER.get_pool_stats()
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn test_base64_encode() {
let data = b"Hello, World!";
let encoded = Base64Encoder::encode(data);
assert!(!encoded.is_empty());
let decoded = STANDARD.decode(&encoded).unwrap();
assert_eq!(&decoded[..data.len()], data);
}
#[test]
fn test_serializer_stats() {
let (available, capacity) = get_serializer_stats();
assert!(available <= capacity);
assert_eq!(capacity, SERIALIZER_POOL_SIZE);
}
#[test]
fn test_serializer_prewarm_is_bounded() {
let serializer = ZeroAllocSerializer::new_with_prewarm(128, 1024, 8);
let (available, capacity) = serializer.get_pool_stats();
assert_eq!(capacity, 128);
assert_eq!(available, 8);
}
#[test]
fn test_serializer_lazy_alloc_and_return() {
let serializer = ZeroAllocSerializer::new_with_prewarm(8, 1024, 0);
let (available_before, capacity) = serializer.get_pool_stats();
assert_eq!(capacity, 8);
assert_eq!(available_before, 0);
let buf = serializer.serialize_zero_alloc(&"hello", "test").unwrap();
assert!(buf.capacity() >= 1024);
serializer.return_buffer(buf);
let (available_after, _) = serializer.get_pool_stats();
assert_eq!(available_after, 1);
}
fn legacy_eager_zero_fill_serializer(
pool_size: usize,
buffer_size: usize,
) -> ZeroAllocSerializer {
let pool = ArrayQueue::new(pool_size);
for _ in 0..pool_size {
let mut buffer = Vec::with_capacity(buffer_size);
buffer.resize(buffer_size, 0);
let _ = pool.push(buffer);
}
ZeroAllocSerializer { buffer_pool: Arc::new(pool), buffer_size }
}
#[test]
#[ignore = "manual perf benchmark"]
fn perf_serializer_cold_start_vs_legacy_eager() {
const POOL_SIZE: usize = 4096;
const BUFFER_SIZE: usize = 32 * 1024;
const PREWARM: usize = 64;
let payload = vec![7u8; 4096];
let legacy_init_start = Instant::now();
let legacy = legacy_eager_zero_fill_serializer(POOL_SIZE, BUFFER_SIZE);
let legacy_init = legacy_init_start.elapsed();
let current_init_start = Instant::now();
let current = ZeroAllocSerializer::new_with_prewarm(POOL_SIZE, BUFFER_SIZE, PREWARM);
let current_init = current_init_start.elapsed();
let legacy_first_start = Instant::now();
let legacy_buf = legacy.serialize_zero_alloc(&payload, "perf").unwrap();
let legacy_first = legacy_first_start.elapsed();
legacy.return_buffer(legacy_buf);
let current_first_start = Instant::now();
let current_buf = current.serialize_zero_alloc(&payload, "perf").unwrap();
let current_first = current_first_start.elapsed();
current.return_buffer(current_buf);
println!(
"[perf] serializer cold-start compare\n pool_size={POOL_SIZE} buffer_size={BUFFER_SIZE} prewarm={PREWARM}\n legacy_init={legacy_init:?} current_init={current_init:?}\n legacy_first_serialize={legacy_first:?} current_first_serialize={current_first:?}"
);
assert!(
current_init <= legacy_init,
"expected bounded prewarm init ({current_init:?}) to be <= legacy eager init ({legacy_init:?})"
);
}
#[test]
#[ignore = "manual perf benchmark"]
fn perf_serializer_lazy_growth_amortization() {
const POOL_SIZE: usize = 128;
const BUFFER_SIZE: usize = 256 * 1024;
let serializer = ZeroAllocSerializer::new_with_prewarm(POOL_SIZE, BUFFER_SIZE, 0);
let payload = vec![1u8; 8 * 1024];
let first_start = Instant::now();
let first_buf = serializer.serialize_zero_alloc(&payload, "perf").unwrap();
let first = first_start.elapsed();
serializer.return_buffer(first_buf);
let second_start = Instant::now();
let second_buf = serializer.serialize_zero_alloc(&payload, "perf").unwrap();
let second = second_start.elapsed();
serializer.return_buffer(second_buf);
let (available, capacity) = serializer.get_pool_stats();
println!(
"[perf] serializer lazy growth\n pool_size={POOL_SIZE} buffer_size={BUFFER_SIZE}\n first_serialize={first:?} second_serialize={second:?}\n available={available} capacity={capacity}"
);
assert!(available >= 1);
assert_eq!(capacity, POOL_SIZE);
}
}