use super::adaptive_encode::CodecAnalyzer;
use super::dictionary::StringDictionary;
use super::schema::{SchemaRegistry, SchemaType};
use super::stats_collector::StatisticsCollector;
use super::varint::*;
use super::{FLAG_CODEC_METADATA, FLAG_DICTIONARY, TBF_MAGIC, TBF_VERSION, TypeTag};
use std::collections::HashMap;
/// Header mode (stored in the upper nibble of the flags byte): every value
/// carries its own type tags; no schema table is emitted.
pub const MODE_SELF_DESCRIBING: u8 = 0x00;
/// Header mode (upper nibble of the flags byte): a schema table precedes the
/// payload and values are encoded against it.
pub const MODE_SCHEMA: u8 = 0x01;
/// Marker byte for schema-encoded sequences — not referenced in this file;
/// presumably consumed by the encoder/decoder elsewhere (confirm).
pub const MARKER_SCHEMA_SEQ: u8 = 0xFE;
/// Incremental serializer for the TBF binary format.
///
/// The value payload accumulates in `buf` while side tables (string
/// dictionary, schema registry, codec metadata) are collected separately;
/// `into_bytes` assembles header, side tables, payload, and an optional
/// statistics footer into the final stream.
pub struct TbfSerializer {
    /// Encoded value payload; written after the header and side tables.
    pub(crate) buf: Vec<u8>,
    /// Interned strings; string values are encoded as dictionary indices.
    pub(crate) dict: StringDictionary,
    /// Registered schemas; a non-empty registry switches output to schema mode.
    pub(crate) schemas: SchemaRegistry,
    /// Encoded schema table, filled during `into_bytes`.
    pub(crate) schema_buf: Vec<u8>,
    /// Encoded codec-metadata section, filled during `into_bytes`.
    pub(crate) codec_metadata_buf: Vec<u8>,
    /// Per-sequence encoding state (schema detection, current fields).
    pub(crate) context: EncodingContext,
    /// Current nesting depth; 0 means we are at the root value.
    pub(crate) depth: usize,
    /// Optional statistics collector; encoded as a trailing footer.
    pub(crate) stats: Option<StatisticsCollector>,
    /// Optional adaptive codec analyzer that samples values to pick a codec.
    pub(crate) codec_analyzer: Option<CodecAnalyzer>,
    /// Chosen codec per field — presumably keyed by field name/path; confirm
    /// against the code that populates it (not visible in this file).
    pub(crate) selected_codecs: HashMap<String, super::adaptive_encode::CompressionCodec>,
    /// Metadata entries for codecs applied to the payload.
    pub(crate) codec_metadata: Vec<super::codec_encode::CodecMetadata>,
}
/// Mutable state tracked while encoding a sequence of structs, used to
/// detect a shared schema from the first element.
#[derive(Debug, Clone, Default)]
pub struct EncodingContext {
    /// True while a sequence is being encoded.
    pub in_sequence: bool,
    /// Number of elements seen in the current sequence.
    pub seq_element_count: usize,
    /// Schema detected from the first element, if any.
    pub seq_schema: Option<DetectedSchema>,
    /// (name, type) pairs collected from the first element's fields.
    pub current_fields: Vec<(String, SchemaType)>,
    /// Registry index of the sequence schema, if one was registered.
    pub seq_schema_idx: Option<u32>,
    /// True once `seq_schema` is set and subsequent elements are encoded
    /// against it.
    pub schema_mode: bool,
}
/// Field names and matching types inferred from the first element of a
/// homogeneous sequence (`fields[i]` pairs with `types[i]`).
#[derive(Debug, Clone)]
pub struct DetectedSchema {
    /// Field names in declaration order.
    pub fields: Vec<String>,
    /// Field types, parallel to `fields`.
    pub types: Vec<SchemaType>,
}
impl TbfSerializer {
pub fn new() -> Self {
Self {
buf: Vec::with_capacity(1024),
dict: StringDictionary::new(),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: None,
codec_analyzer: None,
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
buf: Vec::with_capacity(capacity),
dict: StringDictionary::with_capacity(capacity / 32),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: None,
codec_analyzer: None,
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn with_statistics() -> Self {
Self {
buf: Vec::with_capacity(1024),
dict: StringDictionary::new(),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: Some(StatisticsCollector::new()),
codec_analyzer: None,
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn with_capacity_and_statistics(capacity: usize) -> Self {
Self {
buf: Vec::with_capacity(capacity),
dict: StringDictionary::with_capacity(capacity / 32),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: Some(StatisticsCollector::new()),
codec_analyzer: None,
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn with_codecs() -> Self {
Self {
buf: Vec::with_capacity(1024),
dict: StringDictionary::new(),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: None,
codec_analyzer: Some(CodecAnalyzer::new(100)),
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn with_codecs_and_statistics() -> Self {
Self {
buf: Vec::with_capacity(1024),
dict: StringDictionary::new(),
schemas: SchemaRegistry::new(),
schema_buf: Vec::new(),
codec_metadata_buf: Vec::new(),
context: EncodingContext::default(),
depth: 0,
stats: Some(StatisticsCollector::new()),
codec_analyzer: Some(CodecAnalyzer::new(100)),
selected_codecs: HashMap::new(),
codec_metadata: Vec::new(),
}
}
pub fn into_bytes(mut self) -> Vec<u8> {
let mut dict_buf = Vec::new();
self.dict.encode(&mut dict_buf);
self.schemas.encode(&mut self.schema_buf, &mut self.dict);
dict_buf.clear();
self.dict.encode(&mut dict_buf);
if !self.codec_metadata.is_empty() {
use super::varint::encode_varint;
encode_varint(
self.codec_metadata.len() as u64,
&mut self.codec_metadata_buf,
);
for metadata in &self.codec_metadata {
let encoded = metadata.encode();
encode_varint(encoded.len() as u64, &mut self.codec_metadata_buf);
self.codec_metadata_buf.extend_from_slice(&encoded);
}
}
let mode = if self.schemas.is_empty() {
MODE_SELF_DESCRIBING
} else {
MODE_SCHEMA
};
let mut flags = FLAG_DICTIONARY;
if !self.codec_metadata_buf.is_empty() {
flags |= FLAG_CODEC_METADATA;
}
flags |= mode << 4;
let mut result = Vec::with_capacity(
8 + dict_buf.len()
+ self.schema_buf.len()
+ self.codec_metadata_buf.len()
+ self.buf.len(),
);
result.extend_from_slice(&TBF_MAGIC);
result.push(TBF_VERSION);
result.push(flags);
result.extend_from_slice(&[0u8; 2]);
result.extend_from_slice(&dict_buf);
if mode == MODE_SCHEMA {
result.extend_from_slice(&self.schema_buf);
}
if !self.codec_metadata_buf.is_empty() {
result.extend_from_slice(&self.codec_metadata_buf);
}
result.extend_from_slice(&self.buf);
if let Some(stats) = self.stats
&& let Ok(stats_bytes) = stats.encode_all()
{
let footer_offset = result.len() as u64;
result.extend_from_slice(&stats_bytes);
result.extend_from_slice(&footer_offset.to_le_bytes());
}
result
}
pub fn output(&self) -> &[u8] {
&self.buf
}
pub fn add_codec_metadata(&mut self, metadata: super::codec_encode::CodecMetadata) {
self.codec_metadata.push(metadata);
}
pub fn codec_metadata_count(&self) -> usize {
self.codec_metadata.len()
}
#[inline(always)]
pub(crate) fn write_tag(&mut self, tag: TypeTag) {
self.buf.push(tag as u8);
}
#[inline(always)]
pub(crate) fn write_varint(&mut self, value: u64) {
encode_varint(value, &mut self.buf);
}
#[inline(always)]
pub(crate) fn write_signed_varint(&mut self, value: i64) {
encode_signed_varint(value, &mut self.buf);
}
#[inline]
pub(crate) fn write_string(&mut self, s: &str) {
let idx = self.dict.intern(s);
encode_varint(idx as u64, &mut self.buf);
}
#[inline]
pub(crate) fn write_typed_value_bool(&mut self, v: bool) {
self.buf.push(if v { 1 } else { 0 });
}
#[inline]
pub(crate) fn write_typed_value_int(&mut self, v: i64) {
encode_signed_varint(v, &mut self.buf);
}
#[inline]
pub(crate) fn write_typed_value_uint(&mut self, v: u64) {
encode_varint(v, &mut self.buf);
}
#[inline]
pub(crate) fn write_typed_value_f32(&mut self, v: f32) {
self.buf.extend_from_slice(&v.to_le_bytes());
}
#[inline]
pub(crate) fn write_typed_value_f64(&mut self, v: f64) {
self.buf.extend_from_slice(&v.to_le_bytes());
}
#[inline]
pub(crate) fn write_typed_value_string(&mut self, s: &str) {
let idx = self.dict.intern(s);
encode_varint(idx as u64, &mut self.buf);
}
#[inline]
pub(crate) fn enter(&mut self) {
self.depth += 1;
}
#[inline]
pub(crate) fn leave(&mut self) {
self.depth -= 1;
}
#[inline]
pub(crate) fn is_root(&self) -> bool {
self.depth == 0
}
pub(crate) fn begin_sequence(&mut self, len: Option<usize>) -> SequenceEncoder<'_> {
SequenceEncoder::new(self, len)
}
pub(crate) fn in_schema_mode(&self) -> bool {
self.context.schema_mode && self.context.seq_schema.is_some()
}
pub(crate) fn get_expected_field_type(&self, field_idx: usize) -> Option<SchemaType> {
self.context
.seq_schema
.as_ref()
.and_then(|s| s.types.get(field_idx).copied())
}
}
impl Default for TbfSerializer {
fn default() -> Self {
Self::new()
}
}
/// Drives schema detection while a sequence of structs is serialized:
/// the first element's fields are recorded, and if any were seen the
/// serializer switches into schema mode for the remaining elements.
pub struct SequenceEncoder<'a> {
    // Serializer whose context this encoder mutates.
    serializer: &'a mut TbfSerializer,
    // Declared sequence length, if known. NOTE(review): stored but never
    // read in this file — confirm it is used elsewhere or remove.
    len: Option<usize>,
    // 1-based count of elements seen so far.
    element_count: usize,
    // Set when the first element yielded a schema. NOTE(review): write-only
    // in this file — confirm external use.
    schema_detected: bool,
    // NOTE(review): initialized but never used in this file; possibly
    // reserved for buffering/re-encoding the first element — confirm.
    first_element_buf: Vec<u8>,
    // NOTE(review): initialized but never used in this file — confirm.
    first_element_fields: Vec<(String, SchemaType)>,
}
impl<'a> SequenceEncoder<'a> {
fn new(serializer: &'a mut TbfSerializer, len: Option<usize>) -> Self {
Self {
serializer,
len,
element_count: 0,
schema_detected: false,
first_element_buf: Vec::new(),
first_element_fields: Vec::new(),
}
}
pub fn before_element(&mut self) {
self.element_count += 1;
}
pub fn struct_started(&mut self, field_count: usize) {
if self.element_count == 1 {
self.serializer.context.current_fields = Vec::with_capacity(field_count);
}
}
pub fn field_serialized(&mut self, name: &str, typ: SchemaType) {
if self.element_count == 1 {
self.serializer
.context
.current_fields
.push((name.to_string(), typ));
}
}
pub fn first_struct_ended(&mut self) {
if self.element_count == 1 && !self.serializer.context.current_fields.is_empty() {
let fields: Vec<String> = self
.serializer
.context
.current_fields
.iter()
.map(|(n, _)| n.clone())
.collect();
let types: Vec<SchemaType> = self
.serializer
.context
.current_fields
.iter()
.map(|(_, t)| *t)
.collect();
let detected = DetectedSchema { fields, types };
self.serializer.context.seq_schema = Some(detected);
self.serializer.context.schema_mode = true;
self.schema_detected = true;
}
}
pub fn finish(self) {
self.serializer.context.in_sequence = false;
self.serializer.context.seq_element_count = 0;
self.serializer.context.seq_schema = None;
self.serializer.context.schema_mode = false;
self.serializer.context.current_fields.clear();
}
}
/// Tracks position while serializing one struct's fields against a detected
/// sequence schema, exposing the expected type of the current field.
pub struct SchemaStructSerializer<'a> {
    // Owning serializer (kept for lifetime/context access).
    serializer: &'a mut TbfSerializer,
    // Index of the field currently being serialized.
    field_idx: usize,
    // Whether the struct is being encoded against a schema.
    schema_mode: bool,
    // Snapshot of the schema's field types, taken at construction.
    expected_types: Option<Vec<SchemaType>>,
}
impl<'a> SchemaStructSerializer<'a> {
pub fn new(serializer: &'a mut TbfSerializer, schema_mode: bool) -> Self {
let expected_types = if schema_mode {
serializer
.context
.seq_schema
.as_ref()
.map(|s| s.types.clone())
} else {
None
};
Self {
serializer,
field_idx: 0,
schema_mode,
expected_types,
}
}
pub fn expected_type(&self) -> Option<SchemaType> {
self.expected_types
.as_ref()
.and_then(|types| types.get(self.field_idx).copied())
}
pub fn next_field(&mut self) {
self.field_idx += 1;
}
pub fn is_schema_mode(&self) -> bool {
self.schema_mode
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tbf::adaptive_encode::CompressionCodec;
    use serde_json::json;

    // Constructor wiring: codecs on, stats off.
    #[test]
    fn test_serializer_with_codecs_creation() {
        let serializer = TbfSerializer::with_codecs();
        assert!(serializer.codec_analyzer.is_some());
        assert!(serializer.stats.is_none());
        assert!(serializer.selected_codecs.is_empty());
    }

    // Constructor wiring: codecs and stats both on.
    #[test]
    fn test_serializer_with_codecs_and_statistics() {
        let serializer = TbfSerializer::with_codecs_and_statistics();
        assert!(serializer.codec_analyzer.is_some());
        assert!(serializer.stats.is_some());
        assert!(serializer.selected_codecs.is_empty());
    }

    // Plain `new()` enables neither feature.
    #[test]
    fn test_serializer_without_codecs_unchanged() {
        let serializer = TbfSerializer::new();
        assert!(serializer.codec_analyzer.is_none());
        assert!(serializer.stats.is_none());
        assert!(serializer.selected_codecs.is_empty());
    }

    // Analyzer is reachable and picks Delta for a monotonic integer run.
    #[test]
    fn test_codec_analyzer_accessibility() {
        let mut serializer = TbfSerializer::with_codecs();
        if let Some(analyzer) = &mut serializer.codec_analyzer {
            for i in 0..20 {
                analyzer.add_sample(Some(json!(i)));
            }
            let codec = analyzer.choose_codec();
            assert_eq!(codec, CompressionCodec::Delta);
        } else {
            panic!("codec_analyzer should be Some");
        }
    }

    // Even an empty serializer produces a non-empty stream (header + tables).
    #[test]
    fn test_codec_roundtrip_basic() {
        let _value = json!({
            "id": 1,
            "name": "test"
        });
        let serializer = TbfSerializer::new();
        let bytes = serializer.into_bytes();
        assert!(!bytes.is_empty());
    }

    // selected_codecs behaves as a plain per-field map.
    #[test]
    fn test_selected_codecs_storage() {
        let mut serializer = TbfSerializer::with_codecs();
        serializer
            .selected_codecs
            .insert("field1".to_string(), CompressionCodec::Dictionary);
        serializer
            .selected_codecs
            .insert("field2".to_string(), CompressionCodec::Delta);
        assert_eq!(serializer.selected_codecs.len(), 2);
        assert_eq!(
            serializer.selected_codecs.get("field1"),
            Some(&CompressionCodec::Dictionary)
        );
        assert_eq!(
            serializer.selected_codecs.get("field2"),
            Some(&CompressionCodec::Delta)
        );
    }

    // Codecs and statistics coexist without touching other defaults.
    #[test]
    fn test_codec_and_stats_together() {
        let serializer = TbfSerializer::with_codecs_and_statistics();
        assert!(serializer.codec_analyzer.is_some());
        assert!(serializer.stats.is_some());
        assert_eq!(serializer.depth, 0);
        assert!(serializer.buf.is_empty());
    }

    // add_codec_metadata accumulates entries in order.
    #[test]
    fn test_codec_metadata_collection() {
        use crate::tbf::codec_encode::CodecMetadata;
        let mut serializer = TbfSerializer::new();
        serializer.add_codec_metadata(CodecMetadata::Delta { initial_value: 100 });
        serializer.add_codec_metadata(CodecMetadata::Dictionary {
            dictionary_size: 50,
        });
        assert_eq!(serializer.codec_metadata_count(), 2);
    }

    // Queued metadata sets FLAG_CODEC_METADATA (0x04) in the flags byte.
    // The flags byte sits at offset 5 — implies a 4-byte magic + 1 version
    // byte (confirm against TBF_MAGIC).
    #[test]
    fn test_codec_metadata_binary_encoding() {
        use crate::tbf::codec_encode::CodecMetadata;
        let mut serializer = TbfSerializer::new();
        serializer.add_codec_metadata(CodecMetadata::Delta { initial_value: 42 });
        let bytes = serializer.into_bytes();
        assert!(bytes.len() > 8);
        let flags = bytes[5];
        assert!(flags & 0x04 != 0, "Codec metadata flag should be set");
    }

    // Same flag check with multiple metadata variants and codecs enabled.
    #[test]
    fn test_codec_metadata_format_section() {
        use crate::tbf::codec_encode::CodecMetadata;
        let mut serializer = TbfSerializer::with_codecs();
        serializer.add_codec_metadata(CodecMetadata::RLE);
        serializer.add_codec_metadata(CodecMetadata::Dictionary {
            dictionary_size: 100,
        });
        let bytes = serializer.into_bytes();
        assert!(bytes.len() > 8);
        let flags = bytes[5];
        assert!(flags & 0x04 != 0);
    }

    // Flag still set when statistics are enabled alongside metadata.
    #[test]
    fn test_codec_metadata_with_statistics() {
        use crate::tbf::codec_encode::CodecMetadata;
        let mut serializer = TbfSerializer::with_codecs_and_statistics();
        serializer.add_codec_metadata(CodecMetadata::Delta { initial_value: 0 });
        let bytes = serializer.into_bytes();
        assert!(bytes.len() > 8);
        let flags = bytes[5];
        assert!(flags & 0x04 != 0);
    }

    // No metadata queued → flag must stay clear.
    #[test]
    fn test_no_codec_metadata_no_flag() {
        let serializer = TbfSerializer::new();
        let bytes = serializer.into_bytes();
        let flags = bytes[5];
        assert!(
            flags & 0x04 == 0,
            "Codec metadata flag should not be set when no codecs"
        );
    }

    // Multiple entries are all counted and still set the flag.
    #[test]
    fn test_multiple_codec_metadata_entries() {
        use crate::tbf::codec_encode::CodecMetadata;
        let mut serializer = TbfSerializer::new();
        for i in 0..5 {
            serializer.add_codec_metadata(CodecMetadata::Delta {
                initial_value: i as i64,
            });
        }
        assert_eq!(serializer.codec_metadata_count(), 5);
        let bytes = serializer.into_bytes();
        let flags = bytes[5];
        assert!(flags & 0x04 != 0);
    }
}