use super::ultra_encode::UltraBuffer;
use std::cell::RefCell;
use std::collections::HashMap;
// Per-thread reusable buffers backing `with_scratch` and `with_output`.
// Each is wrapped in a `RefCell` so the accessor functions can hand out a
// mutable reference through the shared thread-local handle.
thread_local! {
// Plain byte scratch space; starts as an empty `Vec` built in a `const` initializer.
static SCRATCH: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
// Output buffer; starts as `UltraBuffer::new()`.
static OUTPUT: RefCell<UltraBuffer> = RefCell::new(UltraBuffer::new());
}
/// Runs `f` with mutable access to the calling thread's scratch `Vec<u8>`.
///
/// The vector is cleared before `f` is invoked, so the closure always starts
/// from an empty buffer; whatever `f` returns is passed back to the caller.
///
/// # Panics
///
/// Panics if called again from inside `f` on the same thread, because the
/// underlying `RefCell` is already mutably borrowed.
#[inline]
pub fn with_scratch<T>(f: impl FnOnce(&mut Vec<u8>) -> T) -> T {
    SCRATCH.with(|cell| {
        let mut scratch = cell.borrow_mut();
        scratch.clear();
        f(&mut *scratch)
    })
}
/// Runs `f` with mutable access to the calling thread's output `UltraBuffer`.
///
/// The buffer is cleared first. If its capacity is below `estimated_size`, it
/// is replaced by a fresh `UltraBuffer::with_capacity(estimated_size)` before
/// `f` runs. The closure's return value is passed back to the caller.
///
/// # Panics
///
/// Panics if called again from inside `f` on the same thread, because the
/// underlying `RefCell` is already mutably borrowed.
#[inline]
pub fn with_output<T>(estimated_size: usize, f: impl FnOnce(&mut UltraBuffer) -> T) -> T {
    OUTPUT.with(|cell| {
        let mut out = cell.borrow_mut();
        out.clear();
        let too_small = out.capacity() < estimated_size;
        if too_small {
            *out = UltraBuffer::with_capacity(estimated_size);
        }
        f(&mut *out)
    })
}
/// Describes how the values of a single column are encoded.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldEncoding {
    /// Let the encoder pick a representation from the data it sees.
    Auto,
    /// Fixed-width unsigned 8-bit integer.
    U8,
    /// Fixed-width unsigned 16-bit integer.
    U16,
    /// Fixed-width unsigned 32-bit integer.
    U32,
    /// Fixed-width unsigned 64-bit integer.
    U64,
    /// Fixed-width signed 8-bit integer.
    I8,
    /// Fixed-width signed 16-bit integer.
    I16,
    /// Fixed-width signed 32-bit integer.
    I32,
    /// Fixed-width signed 64-bit integer.
    I64,
    /// 8-bit value stored relative to `offset`.
    U8Offset {
        /// Base value subtracted before storing.
        offset: i64,
    },
    /// 16-bit value stored relative to `offset`.
    U16Offset {
        /// Base value subtracted before storing.
        offset: i64,
    },
    /// 32-bit value stored relative to `offset`.
    U32Offset {
        /// Base value subtracted before storing.
        offset: i64,
    },
    /// Integer whose width is derived from a hinted `[min_hint, max_hint]` range.
    Compact {
        /// Expected smallest value; also used as the storage offset.
        min_hint: i64,
        /// Expected largest value.
        max_hint: i64,
    },
    /// Variable-length integer.
    VarInt,
    /// String stored through a dictionary of distinct values.
    Dictionary,
    /// String stored inline with a length prefix.
    Inline,
    /// Boolean; reported as 1 bit by [`FieldEncoding::bits`].
    Bool,
    /// 32-bit float.
    Float32,
    /// 64-bit float.
    Float64,
}

impl FieldEncoding {
    /// Builds a [`FieldEncoding::Compact`] for values expected within
    /// `min_hint..=max_hint`.
    #[inline]
    pub const fn compact(min_hint: i64, max_hint: i64) -> Self {
        FieldEncoding::Compact { min_hint, max_hint }
    }

    /// Builds a [`FieldEncoding::Compact`] for values expected within
    /// `0..=max_hint`.
    ///
    /// `max_hint` is reinterpreted as `i64`; values above `i64::MAX` therefore
    /// become negative hints, which [`FieldEncoding::bits`] still treats as a
    /// span needing 64 bits.
    #[inline]
    pub const fn compact_unsigned(max_hint: u64) -> Self {
        FieldEncoding::Compact {
            min_hint: 0,
            max_hint: max_hint as i64,
        }
    }

    /// Returns the fixed bit width of this encoding, or `None` when the width
    /// is not fixed (`Auto`, `VarInt`, `Dictionary`, `Inline`).
    ///
    /// For `Compact` the width is the smallest of 8/16/32/64 that can hold
    /// `max_hint - min_hint`. The difference is computed with wrapping `i64`
    /// arithmetic and reinterpreted as `u64`, which is exact whenever
    /// `max_hint >= min_hint` (including negative minimums and spans larger
    /// than `i64::MAX`). An inverted range wraps to a huge span and reports 64.
    #[inline]
    pub fn bits(&self) -> Option<u8> {
        match self {
            FieldEncoding::U8 | FieldEncoding::I8 | FieldEncoding::U8Offset { .. } => Some(8),
            FieldEncoding::U16 | FieldEncoding::I16 | FieldEncoding::U16Offset { .. } => Some(16),
            FieldEncoding::U32
            | FieldEncoding::I32
            | FieldEncoding::Float32
            | FieldEncoding::U32Offset { .. } => Some(32),
            FieldEncoding::U64 | FieldEncoding::I64 | FieldEncoding::Float64 => Some(64),
            FieldEncoding::Bool => Some(1),
            FieldEncoding::Compact { min_hint, max_hint } => {
                // Casting each bound to u64 *before* subtracting loses the
                // ordering of negative values; subtract in i64 first.
                let span = max_hint.wrapping_sub(*min_hint) as u64;
                Some(match span {
                    0..=0xFF => 8,
                    0x100..=0xFFFF => 16,
                    0x1_0000..=0xFFFF_FFFF => 32,
                    _ => 64,
                })
            }
            FieldEncoding::Auto
            | FieldEncoding::VarInt
            | FieldEncoding::Dictionary
            | FieldEncoding::Inline => None,
        }
    }

    /// Returns `true` for the signed fixed-width variants (`I8`..`I64`) and for
    /// `Compact`.
    #[inline]
    pub fn is_signed(&self) -> bool {
        matches!(
            self,
            FieldEncoding::I8
                | FieldEncoding::I16
                | FieldEncoding::I32
                | FieldEncoding::I64
                | FieldEncoding::Compact { .. }
        )
    }

    /// Returns the base value subtracted from each element before storage:
    /// the explicit `offset` of the `*Offset` variants, `min_hint` for
    /// `Compact`, and `0` for everything else.
    #[inline]
    pub fn offset(&self) -> i64 {
        match self {
            FieldEncoding::U8Offset { offset }
            | FieldEncoding::U16Offset { offset }
            | FieldEncoding::U32Offset { offset } => *offset,
            FieldEncoding::Compact { min_hint, .. } => *min_hint,
            _ => 0,
        }
    }
}
/// A single column description: its name and how its values are encoded.
#[derive(Debug, Clone)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Encoding applied to the column's values.
    pub encoding: FieldEncoding,
}

impl ColumnSchema {
    /// Creates a column description from anything convertible into a `String`
    /// plus the encoding to use.
    pub fn new(name: impl Into<String>, encoding: FieldEncoding) -> Self {
        let name: String = name.into();
        ColumnSchema { name, encoding }
    }
}
/// An ordered list of column descriptions.
#[derive(Debug, Clone, Default)]
pub struct TableSchema {
    /// Columns in declaration order.
    columns: Vec<ColumnSchema>,
}

impl TableSchema {
    /// Creates a schema with no columns.
    pub fn new() -> Self {
        Self::default()
    }

    /// Starts a fluent [`TableSchemaBuilder`].
    pub fn builder() -> TableSchemaBuilder {
        TableSchemaBuilder::new()
    }

    /// Appends a column to the end of the schema.
    pub fn add_column(&mut self, name: impl Into<String>, encoding: FieldEncoding) {
        let column = ColumnSchema::new(name, encoding);
        self.columns.push(column);
    }

    /// Returns all columns in declaration order.
    pub fn columns(&self) -> &[ColumnSchema] {
        self.columns.as_slice()
    }

    /// Returns the encoding of the column at `index`, or `None` when the index
    /// is out of range.
    pub fn encoding(&self, index: usize) -> Option<FieldEncoding> {
        let column = self.columns.get(index)?;
        Some(column.encoding)
    }

    /// Returns the encoding of the first column called `name`, or `None` when
    /// no column has that name.
    pub fn encoding_by_name(&self, name: &str) -> Option<FieldEncoding> {
        self.columns
            .iter()
            .find_map(|column| (column.name == name).then_some(column.encoding))
    }
}
/// Fluent builder that accumulates columns and produces a [`TableSchema`].
///
/// Every method consumes the builder and returns it, so calls can be chained.
#[derive(Debug, Default)]
pub struct TableSchemaBuilder {
    /// Columns added so far, in call order.
    columns: Vec<ColumnSchema>,
}

impl TableSchemaBuilder {
    /// Creates a builder with no columns.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends a column with an explicit encoding. All other column methods
    /// are shorthands for this one.
    pub fn column(mut self, name: impl Into<String>, encoding: FieldEncoding) -> Self {
        let column = ColumnSchema::new(name, encoding);
        self.columns.push(column);
        self
    }

    /// Appends a `FieldEncoding::U8` column.
    pub fn u8(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::U8)
    }

    /// Appends a `FieldEncoding::U16` column.
    pub fn u16(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::U16)
    }

    /// Appends a `FieldEncoding::U32` column.
    pub fn u32(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::U32)
    }

    /// Appends a `FieldEncoding::U64` column.
    pub fn u64(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::U64)
    }

    /// Appends a `FieldEncoding::I8` column.
    pub fn i8(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::I8)
    }

    /// Appends a `FieldEncoding::I16` column.
    pub fn i16(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::I16)
    }

    /// Appends a `FieldEncoding::I32` column.
    pub fn i32(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::I32)
    }

    /// Appends a `FieldEncoding::I64` column.
    pub fn i64(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::I64)
    }

    /// Appends a `FieldEncoding::Float32` column.
    pub fn f32(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Float32)
    }

    /// Appends a `FieldEncoding::Float64` column.
    pub fn f64(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Float64)
    }

    /// Appends a `FieldEncoding::Bool` column.
    pub fn bool(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Bool)
    }

    /// Appends a `FieldEncoding::U8Offset` column with the given base value.
    pub fn u8_offset(self, name: impl Into<String>, offset: i64) -> Self {
        self.column(name, FieldEncoding::U8Offset { offset })
    }

    /// Appends a `FieldEncoding::U16Offset` column with the given base value.
    pub fn u16_offset(self, name: impl Into<String>, offset: i64) -> Self {
        self.column(name, FieldEncoding::U16Offset { offset })
    }

    /// Appends a `FieldEncoding::U32Offset` column with the given base value.
    pub fn u32_offset(self, name: impl Into<String>, offset: i64) -> Self {
        self.column(name, FieldEncoding::U32Offset { offset })
    }

    /// Appends a `FieldEncoding::Dictionary` column (same as [`Self::dictionary`]).
    pub fn dict(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Dictionary)
    }

    /// Appends a `FieldEncoding::Inline` column (same as [`Self::inline`]).
    pub fn string(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Inline)
    }

    /// Appends a `FieldEncoding::Auto` column.
    pub fn auto(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Auto)
    }

    /// Appends a `FieldEncoding::Compact` column hinted to hold `min..=max`.
    pub fn compact(self, name: impl Into<String>, min: i64, max: i64) -> Self {
        let encoding = FieldEncoding::Compact {
            min_hint: min,
            max_hint: max,
        };
        self.column(name, encoding)
    }

    /// Appends a `FieldEncoding::VarInt` column.
    pub fn varint(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::VarInt)
    }

    /// Appends a `FieldEncoding::Dictionary` column (same as [`Self::dict`]).
    pub fn dictionary(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Dictionary)
    }

    /// Appends a `FieldEncoding::Inline` column (same as [`Self::string`]).
    pub fn inline(self, name: impl Into<String>) -> Self {
        self.column(name, FieldEncoding::Inline)
    }

    /// Finishes the builder, moving the accumulated columns into a
    /// [`TableSchema`].
    pub fn build(self) -> TableSchema {
        let Self { columns } = self;
        TableSchema { columns }
    }
}
/// Buffers the `i64` values of one column and serializes them according to a
/// [`FieldEncoding`].
pub struct AdaptiveIntEncoder {
    /// Values in push order.
    values: Vec<i64>,
    /// Encoding requested at construction time.
    encoding: FieldEncoding,
    /// Smallest value pushed so far; only maintained for `FieldEncoding::Auto`.
    actual_min: i64,
    /// Largest value pushed so far; only maintained for `FieldEncoding::Auto`.
    actual_max: i64,
}

impl AdaptiveIntEncoder {
    /// Creates an encoder for `encoding`, pre-allocating room for `capacity`
    /// values.
    pub fn new(encoding: FieldEncoding, capacity: usize) -> Self {
        Self {
            values: Vec::with_capacity(capacity),
            encoding,
            // Sentinels so the first pushed value becomes both min and max.
            actual_min: i64::MAX,
            actual_max: i64::MIN,
        }
    }

    /// Appends a value. The running min/max is tracked only in `Auto` mode,
    /// the only mode whose width depends on the observed data.
    #[inline(always)]
    pub fn push(&mut self, value: i64) {
        self.values.push(value);
        if matches!(self.encoding, FieldEncoding::Auto) {
            self.actual_min = self.actual_min.min(value);
            self.actual_max = self.actual_max.max(value);
        }
    }

    /// Appends a `u32` value, widened losslessly to `i64`.
    #[inline(always)]
    pub fn push_u32(&mut self, value: u32) {
        self.push(i64::from(value));
    }

    /// Serializes all buffered values into `buf`. Writes nothing when no value
    /// has been pushed.
    ///
    /// Layout for every mode except `VarInt`:
    /// one byte holding the bit width (8/16/32/64), then — only when the width
    /// is below 64 — the offset as an unsigned varint, then each value minus
    /// the offset truncated to the width, little-endian. At width 64 the raw
    /// values are written without any offset.
    ///
    /// Values outside the range implied by a fixed, `*Offset` or `Compact`
    /// encoding are truncated to the chosen width, not rejected.
    pub fn encode_to(&self, buf: &mut UltraBuffer) {
        if self.values.is_empty() {
            return;
        }
        let (bits, offset): (u8, i64) = match self.encoding {
            FieldEncoding::U8 | FieldEncoding::I8 => (8, 0),
            FieldEncoding::U16 | FieldEncoding::I16 => (16, 0),
            FieldEncoding::U32 | FieldEncoding::I32 => (32, 0),
            FieldEncoding::U64 | FieldEncoding::I64 => (64, 0),
            FieldEncoding::U8Offset { offset } => (8, offset),
            FieldEncoding::U16Offset { offset } => (16, offset),
            FieldEncoding::U32Offset { offset } => (32, offset),
            FieldEncoding::Auto => (
                Self::width_for_span(self.actual_min, self.actual_max),
                self.actual_min,
            ),
            FieldEncoding::Compact { min_hint, max_hint } => {
                (Self::width_for_span(min_hint, max_hint), min_hint)
            }
            FieldEncoding::VarInt => {
                self.encode_varint(buf);
                return;
            }
            // Non-integer encodings fall back to raw 64-bit values.
            FieldEncoding::Dictionary
            | FieldEncoding::Inline
            | FieldEncoding::Bool
            | FieldEncoding::Float32
            | FieldEncoding::Float64 => (64, 0),
        };
        buf.push(bits);
        if bits < 64 {
            encode_varint_fast(offset as u64, buf);
        }
        let count = self.values.len();
        match bits {
            8 => {
                buf.reserve(count);
                for &v in &self.values {
                    // wrapping_sub: same bits as `-` in release, no overflow
                    // panic in debug for extreme value/offset pairs.
                    let packed = v.wrapping_sub(offset) as u8;
                    // SAFETY: relies on `reserve(count)` above making room for
                    // one byte per value (UltraBuffer contract, defined elsewhere).
                    unsafe {
                        buf.push_unchecked(packed);
                    }
                }
            }
            16 => {
                buf.reserve(count * 2);
                for &v in &self.values {
                    let packed = v.wrapping_sub(offset) as u16;
                    // SAFETY: relies on `reserve(count * 2)` above.
                    unsafe {
                        buf.extend_unchecked(&packed.to_le_bytes());
                    }
                }
            }
            32 => {
                buf.reserve(count * 4);
                for &v in &self.values {
                    let packed = v.wrapping_sub(offset) as u32;
                    // SAFETY: relies on `reserve(count * 4)` above.
                    unsafe {
                        buf.extend_unchecked(&packed.to_le_bytes());
                    }
                }
            }
            64 => {
                buf.reserve(count * 8);
                for &v in &self.values {
                    // Full width: values are stored as-is, no offset applied.
                    // SAFETY: relies on `reserve(count * 8)` above.
                    unsafe {
                        buf.extend_unchecked(&v.to_le_bytes());
                    }
                }
            }
            _ => unreachable!(),
        }
    }

    /// Returns the smallest supported width (8/16/32/64 bits) able to hold
    /// `max - min`.
    ///
    /// The difference is taken with wrapping arithmetic and reinterpreted as
    /// `u64`: exact whenever `max >= min`, even when a plain `i64` subtraction
    /// would overflow (e.g. `i64::MIN..=i64::MAX`).
    #[inline]
    fn width_for_span(min: i64, max: i64) -> u8 {
        match max.wrapping_sub(min) as u64 {
            0..=0xFF => 8,
            0x100..=0xFFFF => 16,
            0x1_0000..=0xFFFF_FFFF => 32,
            _ => 64,
        }
    }

    /// `VarInt` mode: a `0` width marker followed by every value as a signed
    /// varint.
    fn encode_varint(&self, buf: &mut UltraBuffer) {
        buf.push(0);
        for &v in &self.values {
            encode_signed_varint_fast(v, buf);
        }
    }
}
/// Buffers the string values of one column and serializes them either through
/// a dictionary of distinct values or inline with length prefixes.
pub struct AdaptiveStringEncoder {
    /// Maps each distinct string to its dictionary index.
    dict: HashMap<String, u16>,
    /// Dictionary index of every pushed value, in push order.
    indices: Vec<u16>,
    /// Distinct strings in first-seen order; position equals dictionary index.
    dict_strings: Vec<String>,
    /// Length-prefixed bytes accumulated in inline mode.
    inline_data: UltraBuffer,
    /// Encoding requested at construction time.
    encoding: FieldEncoding,
    /// Number of values pushed.
    count: usize,
}

impl AdaptiveStringEncoder {
    /// Creates an encoder. `Dictionary` and `Auto` pre-allocate the dictionary
    /// structures; every other encoding pre-allocates the inline buffer
    /// (`capacity * 16` bytes).
    pub fn new(encoding: FieldEncoding, capacity: usize) -> Self {
        let use_dict = matches!(encoding, FieldEncoding::Dictionary | FieldEncoding::Auto);
        let (dict, indices, dict_strings, inline_data) = if use_dict {
            (
                HashMap::with_capacity(256),
                Vec::with_capacity(capacity),
                Vec::with_capacity(256),
                UltraBuffer::new(),
            )
        } else {
            (
                HashMap::new(),
                Vec::new(),
                Vec::new(),
                UltraBuffer::with_capacity(capacity * 16),
            )
        };
        Self {
            dict,
            indices,
            dict_strings,
            inline_data,
            encoding,
            count: 0,
        }
    }

    /// Appends a value: through the dictionary for `Dictionary`/`Auto`, inline
    /// for every other encoding.
    #[inline]
    pub fn push(&mut self, s: &str) {
        self.count += 1;
        match self.encoding {
            // `Auto` always used the dictionary path: the former size check had
            // two identical branches. An inline fallback could not work with
            // `encode_to` anyway, which emits either the dictionary or the
            // inline data, never both.
            FieldEncoding::Dictionary | FieldEncoding::Auto => self.push_dict(s),
            _ => self.push_inline(s),
        }
    }

    /// Records `s` via the dictionary, adding it on first sight.
    #[inline]
    fn push_dict(&mut self, s: &str) {
        let idx = if let Some(&known) = self.dict.get(s) {
            known
        } else {
            // NOTE(review): the index is a u16, so the 65537th distinct string
            // wraps to index 0 and silently aliases an earlier entry. Left
            // as-is because widening it changes the wire format — confirm the
            // intended limit with the decoder.
            let fresh = self.dict_strings.len() as u16;
            self.dict.insert(s.to_owned(), fresh);
            self.dict_strings.push(s.to_owned());
            fresh
        };
        self.indices.push(idx);
    }

    /// Appends `s` to the inline buffer as a varint length followed by the
    /// raw bytes.
    #[inline]
    fn push_inline(&mut self, s: &str) {
        let len = s.len();
        if len < 128 {
            // Lengths below 128 are a single-byte varint; write it directly.
            self.inline_data.reserve(len + 1);
            // SAFETY: relies on `reserve(len + 1)` above making room for the
            // prefix byte plus payload (UltraBuffer contract, defined elsewhere).
            unsafe {
                self.inline_data.push_unchecked(len as u8);
            }
        } else {
            // Write the prefix first (it reserves its own space), then reserve
            // exactly the payload. The previous single `reserve(len + 4)` was
            // too small once the prefix needs 5+ bytes (len >= 2^28).
            encode_varint_fast(len as u64, &mut self.inline_data);
            self.inline_data.reserve(len);
        }
        // SAFETY: both branches above reserved at least `len` bytes beyond
        // what has been written.
        unsafe {
            self.inline_data.extend_unchecked(s.as_bytes());
        }
    }

    /// Serializes everything pushed so far into `buf`, consuming the encoder.
    ///
    /// Layout: a marker byte (`1` = dictionary, `0` = inline).
    /// Dictionary: varint entry count, each entry as varint length + bytes,
    /// then one index per pushed value — two per byte (low nibble first) for
    /// up to 16 entries, one byte each for up to 256 entries, otherwise
    /// little-endian `u16`.
    /// Inline: the accumulated length-prefixed bytes.
    ///
    /// The number of values is not written; an odd count in nibble mode is
    /// padded with a zero high nibble.
    pub fn encode_to(self, buf: &mut UltraBuffer) {
        let use_dict = !self.dict_strings.is_empty();
        buf.push(if use_dict { 1 } else { 0 });
        if !use_dict {
            buf.extend(self.inline_data.as_slice());
            return;
        }

        let dict_size = self.dict_strings.len();
        encode_varint_fast(dict_size as u64, buf);
        for entry in &self.dict_strings {
            encode_varint_fast(entry.len() as u64, buf);
            buf.extend(entry.as_bytes());
        }

        if dict_size <= 16 {
            buf.reserve(self.indices.len().div_ceil(2));
            for pair in self.indices.chunks(2) {
                let low = pair[0] as u8;
                let high = pair.get(1).map_or(0, |&i| i as u8);
                // SAFETY: relies on the reserve above; one byte per chunk.
                unsafe {
                    buf.push_unchecked(low | (high << 4));
                }
            }
        } else if dict_size <= 256 {
            buf.reserve(self.indices.len());
            for &idx in &self.indices {
                // SAFETY: relies on the reserve above; one byte per index.
                unsafe {
                    buf.push_unchecked(idx as u8);
                }
            }
        } else {
            buf.reserve(self.indices.len() * 2);
            for &idx in &self.indices {
                // SAFETY: relies on the reserve above; two bytes per index.
                unsafe {
                    buf.extend_unchecked(&idx.to_le_bytes());
                }
            }
        }
    }
}
/// Appends `value` to `buf` as an unsigned varint: seven payload bits per
/// byte, least-significant group first, with the high bit set on every byte
/// except the last.
#[inline(always)]
pub fn encode_varint_fast(mut value: u64, buf: &mut UltraBuffer) {
    // A u64 needs at most ceil(64 / 7) = 10 bytes.
    buf.reserve(10);
    loop {
        let low7 = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // SAFETY: at most 10 bytes are written per call and `reserve(10)`
            // above is relied on to make room for them.
            unsafe {
                buf.push_unchecked(low7);
            }
            return;
        }
        // SAFETY: same reservation as above.
        unsafe {
            buf.push_unchecked(low7 | 0x80);
        }
    }
}
/// Appends `value` to `buf` as a zigzag-mapped varint, so values of small
/// magnitude — positive or negative — produce short encodings
/// (0 → 0, -1 → 1, 1 → 2, -2 → 3, ...).
#[inline(always)]
pub fn encode_signed_varint_fast(value: i64, buf: &mut UltraBuffer) {
    // Arithmetic shift: all zeros for non-negative input, all ones for negative.
    let sign_mask = value >> 63;
    let zigzag = (value << 1) ^ sign_mask;
    encode_varint_fast(zigzag as u64, buf);
}
/// Four-byte marker: ASCII `S`, `C`, `H` followed by `0x01`.
///
/// NOTE(review): presumably a schema-format tag with a trailing version byte;
/// nothing in the visible code reads or writes it — confirm against the
/// encoder/decoder that uses it.
pub const SCHEMA_MAGIC: [u8; 4] = [0x53, 0x43, 0x48, 0x01];
/// Implemented by row types that can describe themselves with a
/// [`TableSchema`] and serialize a slice of rows to bytes.
pub trait TableEncode {
/// Returns the [`TableSchema`] associated with the implementing type.
fn schema() -> TableSchema;
/// Encodes `items` into a byte vector.
///
/// NOTE(review): presumably the output follows the column encodings from
/// `schema()`; no implementation is visible in this file — confirm.
fn encode_with_schema(items: &[Self]) -> Vec<u8>
where
Self: Sized;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder keeps columns in call order and both lookup styles agree.
    #[test]
    fn test_table_schema_builder() {
        let built = TableSchema::builder()
            .u32("id")
            .string("name")
            .u8("age")
            .dict("city")
            .dict("department")
            .u32("salary")
            .u8("experience")
            .u8("project_count")
            .build();

        let column_count = built.columns().len();
        assert_eq!(column_count, 8);

        // Positional lookups.
        assert_eq!(built.encoding(0), Some(FieldEncoding::U32));
        assert_eq!(built.encoding(2), Some(FieldEncoding::U8));

        // Name-based lookups.
        assert_eq!(
            built.encoding_by_name("city"),
            Some(FieldEncoding::Dictionary)
        );
        assert_eq!(built.encoding_by_name("name"), Some(FieldEncoding::Inline));
    }

    /// Fixed encodings report their width; `Compact` derives it from the hints.
    #[test]
    fn test_field_encoding_bits() {
        assert_eq!(FieldEncoding::U8.bits(), Some(8));
        assert_eq!(FieldEncoding::U16.bits(), Some(16));
        assert_eq!(FieldEncoding::U32.bits(), Some(32));
        assert_eq!(FieldEncoding::U64.bits(), Some(64));
        assert_eq!(FieldEncoding::I8.bits(), Some(8));

        let narrow = FieldEncoding::compact(0, 100);
        let medium = FieldEncoding::compact(0, 1000);
        let wide = FieldEncoding::compact(0, 100_000);
        assert_eq!(narrow.bits(), Some(8));
        assert_eq!(medium.bits(), Some(16));
        assert_eq!(wide.bits(), Some(32));
    }

    /// Ten small values in `Auto` mode must encode to fewer than 20 bytes.
    #[test]
    fn test_adaptive_int_encoder_auto() {
        let mut encoder = AdaptiveIntEncoder::new(FieldEncoding::Auto, 10);
        for step in 0..10 {
            encoder.push(step * 10);
        }

        let mut out = UltraBuffer::with_capacity(100);
        encoder.encode_to(&mut out);
        assert!(out.len() < 20);
    }

    /// Three values inside a 100-wide hinted range encode to at most 10 bytes.
    #[test]
    fn test_adaptive_int_encoder_compact() {
        let mut encoder = AdaptiveIntEncoder::new(FieldEncoding::compact(1000, 1100), 5);
        encoder.push(1000);
        encoder.push(1050);
        encoder.push(1100);

        let mut out = UltraBuffer::with_capacity(100);
        encoder.encode_to(&mut out);
        assert!(out.len() <= 10);
    }

    /// Thirty values drawn from three distinct strings encode to under 100 bytes.
    #[test]
    fn test_adaptive_string_encoder_dict() {
        let mut encoder = AdaptiveStringEncoder::new(FieldEncoding::Dictionary, 100);
        for _ in 0..10 {
            encoder.push("NYC");
            encoder.push("LA");
            encoder.push("Chicago");
        }

        let mut out = UltraBuffer::with_capacity(200);
        encoder.encode_to(&mut out);
        assert!(out.len() < 100);
    }

    /// Inline mode: 1 marker byte + 2 * (1 length byte + 5 payload bytes) = 13.
    #[test]
    fn test_adaptive_string_encoder_inline() {
        let mut encoder = AdaptiveStringEncoder::new(FieldEncoding::Inline, 10);
        encoder.push("hello");
        encoder.push("world");

        let mut out = UltraBuffer::with_capacity(100);
        encoder.encode_to(&mut out);
        assert_eq!(out.len(), 13);
    }

    /// The scratch buffer is handed out empty on every call.
    #[test]
    fn test_thread_local_scratch() {
        let first_len = with_scratch(|scratch| {
            scratch.extend_from_slice(&[1, 2, 3]);
            scratch.len()
        });
        assert_eq!(first_len, 3);

        let second_len = with_scratch(|scratch| {
            assert!(scratch.is_empty());
            scratch.extend_from_slice(&[4, 5]);
            scratch.len()
        });
        assert_eq!(second_len, 2);
    }
}