maili_protocol/compression/shadow.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
//! Contains the shadow compressor for Optimism.
//!
//! This is a port of the [ShadowCompressor][sc] from the op-batcher.
//!
//! [sc]: https://github.com/ethereum-optimism/optimism/blob/develop/op-batcher/compressor/shadow_compressor.go#L18
use crate::{CompressorError, CompressorResult, CompressorWriter, Config, VariantCompressor};
/// The largest potential blow-up, in bytes, that we expect to see when compressing
/// arbitrary (e.g. random) data.
///
/// This accounts for a 2 byte header, a 4 byte digest, a 5 byte EOF indicator, and one
/// 5 byte flate block header for each 16k of potential data. Assuming frames are at most
/// 128k in size (the current max blob size) this is 2+4+5+(5*8) = 51 bytes. If larger
/// frames are ever used (e.g. should the max blob size increase) a larger blow-up might
/// be possible, but it would be highly unlikely, and the system still works if the
/// estimate is wrong -- we just end up writing one more tx for the overflow.
///
/// This is the value the [ShadowCompressor] bound starts at and is reset to.
const SAFE_COMPRESSION_OVERHEAD: u64 = 51;
/// The number of final bytes written to the output buffer when a `zlib.Writer` is closed.
///
/// It is added to the flushed length of the shadow compressor to estimate the size of
/// the output once it has been closed.
const CLOSE_OVERHEAD_ZLIB: u64 = 9;
/// Shadow Compressor
///
/// The shadow compressor contains two compression buffers: a "shadow" used for size
/// estimation, and the main compressor that holds the final compressed data. Every write
/// is sent to the shadow first, and the shadow is flushed whenever a concrete compressed
/// size is needed to compare against the target output size. The main compressor is not
/// flushed while writing, so the intent is that the final compressed data ends up no
/// larger than the size estimate.
///
/// There is one exception: a write that exceeds the target is still accepted when the
/// main compressor is empty. This allows an individual block larger than the target to
/// be included; note that such a block will be split across multiple channel frames.
#[derive(Debug, Clone)]
pub struct ShadowCompressor {
/// The compressor configuration. It supplies the target output size that writes are
/// checked against.
config: Config,
/// The inner [VariantCompressor] that holds the final compressed data.
compressor: VariantCompressor,
/// The shadow [VariantCompressor], used only for size estimation.
shadow: VariantCompressor,
/// Flags that the buffer is full. Once set, writes are rejected until the compressor
/// is reset.
is_full: bool,
/// An upper bound on the size of the compressed data.
bound: u64,
}
impl ShadowCompressor {
    /// Builds a [ShadowCompressor] from a configuration and two [VariantCompressor]s.
    ///
    /// `compressor` holds the final compressed data, while `shadow` is only used to
    /// estimate its size. The new instance starts out not full, with its bound set to
    /// [SAFE_COMPRESSION_OVERHEAD].
    pub const fn new(
        config: Config,
        compressor: VariantCompressor,
        shadow: VariantCompressor,
    ) -> Self {
        // Initial bookkeeping state: nothing has been written yet.
        let is_full = false;
        let bound = SAFE_COMPRESSION_OVERHEAD;
        Self { config, compressor, shadow, is_full, bound }
    }
}
impl From<Config> for ShadowCompressor {
    /// Creates a [ShadowCompressor] whose main and shadow compressors are both built
    /// from the compression algorithm selected in `config`.
    fn from(config: Config) -> Self {
        let algo = config.compression_algo;
        // The main compressor is constructed before the shadow one.
        Self::new(config, VariantCompressor::from(algo), VariantCompressor::from(algo))
    }
}
impl CompressorWriter for ShadowCompressor {
    /// Compresses `data`, unless doing so could push the output past the target size.
    ///
    /// The data is always written to the shadow compressor first. A cheap running upper
    /// bound (the previous bound plus the uncompressed length of `data`) is then compared
    /// against `config.target_output_size`. Only when that bound is exceeded is the shadow
    /// flushed, so that its actual compressed length can be used as the estimate.
    ///
    /// # Errors
    ///
    /// Returns [CompressorError::Full] if an earlier write already filled the compressor,
    /// or if this write would exceed the target and the main compressor already holds
    /// data. Errors from the underlying compressors are propagated unchanged.
    fn write(&mut self, data: &[u8]) -> CompressorResult<usize> {
        // If the buffer is full, error so the user can flush.
        if self.is_full {
            return Err(CompressorError::Full);
        }

        // Write to the shadow compressor.
        self.shadow.write(data)?;

        // In the worst case the data is incompressible, so the running bound grows by
        // the raw length of the data on top of what has been accounted for so far.
        let mut new_bound = self.bound.saturating_add(data.len() as u64);
        if new_bound > self.config.target_output_size {
            // Don't flush the shadow unless there's a chance we're over the size limit.
            self.shadow.flush()?;
            // Replace the pessimistic bound with the flushed size plus the close overhead.
            new_bound = (self.shadow.len() as u64).saturating_add(CLOSE_OVERHEAD_ZLIB);
            if new_bound > self.config.target_output_size {
                self.is_full = true;
                // Only error if the buffer has been written to; otherwise a single
                // write larger than the target could never be accepted.
                if self.compressor.len() > 0 {
                    return Err(CompressorError::Full);
                }
            }
        }

        // Update the bound and compress.
        self.bound = new_bound;
        self.compressor.write(data)
    }

    /// Returns the length reported by the main compressor.
    fn len(&self) -> usize {
        self.compressor.len()
    }

    /// Flushes the main compressor, i.e. the one that `len` and `read` operate on.
    fn flush(&mut self) -> CompressorResult<()> {
        self.compressor.flush()
    }

    /// Closes the main compressor, i.e. the one that `len` and `read` operate on.
    fn close(&mut self) -> CompressorResult<()> {
        self.compressor.close()
    }

    /// Resets both compressors, clears the full flag, and restores the bound to
    /// [SAFE_COMPRESSION_OVERHEAD].
    fn reset(&mut self) {
        self.compressor.reset();
        self.shadow.reset();
        self.is_full = false;
        self.bound = SAFE_COMPRESSION_OVERHEAD;
    }

    /// Reads from the main compressor into `buf`.
    fn read(&mut self, buf: &mut [u8]) -> CompressorResult<usize> {
        self.compressor.read(buf)
    }
}