use crate::crc::{crc24, crc32};
use crate::frame::{
COMPRESSED_FRAME_HEADER_LENGTH, FRAME_TRAILER_LENGTH, PAYLOAD_SIZE_LIMIT,
UNCOMPRESSED_FRAME_HEADER_LENGTH,
};
use lz4_flex::block::get_maximum_output_size;
use lz4_flex::{compress, compress_into};
#[inline]
fn put3b(buffer: &mut [u8], value: i32) {
let value = value.to_le_bytes();
buffer[0] = value[0];
buffer[1] = value[1];
buffer[2] = value[2];
}
#[inline]
fn add_trailer(buffer: &mut Vec<u8>, payload_start: usize) {
buffer.reserve(4);
let crc = crc32(&buffer[payload_start..]).to_le_bytes();
buffer.push(crc[0]);
buffer.push(crc[1]);
buffer.push(crc[2]);
buffer.push(crc[3]);
}
pub trait FrameEncoder {
fn can_fit(&self, len: usize) -> bool;
fn reset(&mut self);
fn add_envelope(&mut self, envelope: Vec<u8>);
fn finalize_self_contained(&mut self) -> &[u8];
fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]);
fn has_envelopes(&self) -> bool;
}
#[derive(Clone, Debug, Default)]
pub struct LegacyFrameEncoder {
buffer: Vec<u8>,
}
impl FrameEncoder for LegacyFrameEncoder {
#[inline]
fn can_fit(&self, _len: usize) -> bool {
self.buffer.is_empty()
}
#[inline]
fn reset(&mut self) {
self.buffer.clear();
}
#[inline]
fn add_envelope(&mut self, envelope: Vec<u8>) {
self.buffer = envelope;
}
#[inline]
fn finalize_self_contained(&mut self) -> &[u8] {
&self.buffer
}
#[inline]
fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
self.buffer.clear();
self.buffer.extend_from_slice(envelope);
(envelope.len(), &self.buffer)
}
#[inline]
fn has_envelopes(&self) -> bool {
!self.buffer.is_empty()
}
}
#[derive(Clone, Debug)]
pub struct UncompressedFrameEncoder {
buffer: Vec<u8>,
}
impl FrameEncoder for UncompressedFrameEncoder {
#[inline]
fn can_fit(&self, len: usize) -> bool {
(self.buffer.len() - UNCOMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len)
< PAYLOAD_SIZE_LIMIT
}
#[inline]
fn reset(&mut self) {
self.buffer.truncate(UNCOMPRESSED_FRAME_HEADER_LENGTH);
}
#[inline]
fn add_envelope(&mut self, mut envelope: Vec<u8>) {
self.buffer.append(&mut envelope);
}
fn finalize_self_contained(&mut self) -> &[u8] {
self.write_header(true);
add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
&self.buffer
}
fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
let max_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
self.buffer.extend_from_slice(&envelope[..max_size]);
self.buffer.reserve(FRAME_TRAILER_LENGTH);
self.write_header(false);
add_trailer(&mut self.buffer, UNCOMPRESSED_FRAME_HEADER_LENGTH);
(max_size, &self.buffer)
}
#[inline]
fn has_envelopes(&self) -> bool {
self.buffer.len() > UNCOMPRESSED_FRAME_HEADER_LENGTH
}
}
impl Default for UncompressedFrameEncoder {
fn default() -> Self {
let mut buffer = vec![];
buffer.resize(UNCOMPRESSED_FRAME_HEADER_LENGTH, 0);
Self { buffer }
}
}
impl UncompressedFrameEncoder {
fn write_header(&mut self, self_contained: bool) {
let len = self.buffer.len();
debug_assert!(
len < (PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH),
"len: {} max: {}",
len,
PAYLOAD_SIZE_LIMIT + UNCOMPRESSED_FRAME_HEADER_LENGTH
);
let mut len = (len - UNCOMPRESSED_FRAME_HEADER_LENGTH) as u64;
if self_contained {
len |= 1 << 17;
}
put3b(self.buffer.as_mut_slice(), len as i32);
put3b(&mut self.buffer[3..], crc24(&len.to_le_bytes()[..3]));
}
}
#[derive(Clone, Debug)]
pub struct Lz4FrameEncoder {
buffer: Vec<u8>,
}
impl FrameEncoder for Lz4FrameEncoder {
#[inline]
fn can_fit(&self, len: usize) -> bool {
get_maximum_output_size(
(self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH).saturating_add(len),
) < PAYLOAD_SIZE_LIMIT
}
#[inline]
fn reset(&mut self) {
self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
}
#[inline]
fn add_envelope(&mut self, mut envelope: Vec<u8>) {
self.buffer.append(&mut envelope);
}
fn finalize_self_contained(&mut self) -> &[u8] {
let uncompressed_size = self.buffer.len() - COMPRESSED_FRAME_HEADER_LENGTH;
let mut compressed_payload = compress(&self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..]);
self.buffer.truncate(COMPRESSED_FRAME_HEADER_LENGTH);
self.buffer.append(&mut compressed_payload);
self.write_header(uncompressed_size, true);
add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
&self.buffer
}
fn finalize_non_self_contained(&mut self, envelope: &[u8]) -> (usize, &[u8]) {
let mut uncompressed_size = envelope.len().min(PAYLOAD_SIZE_LIMIT - 1);
let offset = uncompressed_size;
self.buffer.resize(
get_maximum_output_size(uncompressed_size)
+ COMPRESSED_FRAME_HEADER_LENGTH
+ FRAME_TRAILER_LENGTH, 0,
);
let mut compressed_size = compress_into(
&envelope[..uncompressed_size],
&mut self.buffer[COMPRESSED_FRAME_HEADER_LENGTH..],
)
.unwrap(); if compressed_size >= PAYLOAD_SIZE_LIMIT {
self.buffer[COMPRESSED_FRAME_HEADER_LENGTH
..(COMPRESSED_FRAME_HEADER_LENGTH + uncompressed_size)]
.copy_from_slice(&envelope[..uncompressed_size]);
compressed_size = uncompressed_size;
uncompressed_size = 0; }
self.buffer
.truncate(COMPRESSED_FRAME_HEADER_LENGTH + compressed_size);
self.write_header(uncompressed_size, false);
add_trailer(&mut self.buffer, COMPRESSED_FRAME_HEADER_LENGTH);
(offset, &self.buffer)
}
#[inline]
fn has_envelopes(&self) -> bool {
self.buffer.len() > COMPRESSED_FRAME_HEADER_LENGTH
}
}
impl Default for Lz4FrameEncoder {
fn default() -> Self {
let mut buffer = vec![];
buffer.resize(COMPRESSED_FRAME_HEADER_LENGTH, 0);
Self { buffer }
}
}
impl Lz4FrameEncoder {
fn write_header(&mut self, uncompressed_size: usize, self_contained: bool) {
let len = self.buffer.len();
debug_assert!(len < (PAYLOAD_SIZE_LIMIT + COMPRESSED_FRAME_HEADER_LENGTH));
let mut header =
(len - COMPRESSED_FRAME_HEADER_LENGTH) as u64 | ((uncompressed_size as u64) << 17);
if self_contained {
header |= 1 << 34;
}
let crc = crc24(&header.to_le_bytes()[..5]) as u64;
let header = header | (crc << 40);
self.buffer[..8].copy_from_slice(&header.to_le_bytes());
}
}