use std::marker::PhantomData;
/// A grow-only byte buffer managed through three raw pointers instead of a
/// `Vec`'s (ptr, len, cap) triple, so hot append paths work directly on the
/// write cursor.
///
/// Invariants: `start <= end <= capacity`; `[start, capacity)` is one live
/// allocation obtained from a `Vec<u8>`, and `[start, end)` is initialized.
pub struct UltraBuffer {
    // First byte of the allocation (the pointer handed back to `Vec` on drop).
    start: *mut u8,
    // Write cursor: one past the last initialized byte.
    end: *mut u8,
    // One past the end of the allocation.
    capacity: *mut u8,
    // Acts as if this type owned a `Vec<u8>` (drop-check / variance / Send-Sync
    // inference), without storing one.
    _marker: PhantomData<Vec<u8>>,
}
impl Default for UltraBuffer {
fn default() -> Self {
Self::new()
}
}
impl Drop for UltraBuffer {
    fn drop(&mut self) {
        // SAFETY: `start`, `len()` and `capacity()` describe exactly the
        // allocation of the `Vec<u8>` that was `mem::forget`-ed in
        // `with_capacity` / `reserve_slow`, so reassembling the `Vec` here and
        // letting it drop frees that allocation exactly once.
        unsafe {
            let _ = Vec::from_raw_parts(self.start, self.len(), self.capacity());
        }
    }
}
impl UltraBuffer {
    /// Creates an empty buffer with no heap allocation.
    #[inline]
    pub fn new() -> Self {
        Self::with_capacity(0)
    }
    /// Creates a buffer with room for at least `capacity` bytes by allocating
    /// through a `Vec<u8>` and dismantling it into raw pointers.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let mut vec: Vec<u8> = Vec::with_capacity(capacity);
        let start = vec.as_mut_ptr();
        let end = start;
        // SAFETY: `start + vec.capacity()` is one-past-the-end of the
        // allocation, which is a valid pointer to compute.
        let cap = unsafe { start.add(vec.capacity()) };
        // Ownership of the allocation transfers to the raw pointers; `Drop`
        // rebuilds the `Vec` to free it.
        std::mem::forget(vec);
        Self {
            start,
            end,
            capacity: cap,
            _marker: PhantomData,
        }
    }
    /// Number of initialized bytes (`end - start`).
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.end as usize - self.start as usize
    }
    /// True when no bytes have been written.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.end == self.start
    }
    /// Total allocated bytes (`capacity - start`).
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        self.capacity as usize - self.start as usize
    }
    /// Bytes writable before a reallocation is needed (`capacity - end`).
    #[inline(always)]
    pub fn remaining(&self) -> usize {
        self.capacity as usize - self.end as usize
    }
    /// Borrows the initialized bytes.
    #[inline(always)]
    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: by the struct invariant, `[start, start + len())` is
        // initialized and inside one live allocation.
        unsafe { std::slice::from_raw_parts(self.start, self.len()) }
    }
    /// Converts the buffer back into a `Vec<u8>` without copying.
    #[inline]
    pub fn into_vec(self) -> Vec<u8> {
        // SAFETY: the (ptr, len, cap) triple originated from a `Vec<u8>`.
        let vec = unsafe { Vec::from_raw_parts(self.start, self.len(), self.capacity()) };
        // Skip `Drop`, which would otherwise free the same allocation again.
        std::mem::forget(self);
        vec
    }
    /// Logically empties the buffer; the allocation is kept.
    #[inline(always)]
    pub fn clear(&mut self) {
        self.end = self.start;
    }
    /// Ensures at least `additional` writable bytes, growing if needed.
    pub fn reserve(&mut self, additional: usize) {
        if additional > self.remaining() {
            self.reserve_slow(additional);
        }
    }
    /// Grow path: rebuilds the `Vec`, lets it reallocate, then re-derives all
    /// three pointers. Kept out of line (`#[cold]`) so `reserve` stays cheap.
    #[cold]
    #[inline(never)]
    fn reserve_slow(&mut self, additional: usize) {
        let len = self.len();
        let cap = self.capacity();
        // SAFETY: the fields describe a live Vec allocation; `ManuallyDrop`
        // prevents the temporary from freeing it when this function returns.
        let mut vec =
            std::mem::ManuallyDrop::new(unsafe { Vec::from_raw_parts(self.start, len, cap) });
        vec.reserve(additional);
        // The reallocation may have moved the block; refresh every pointer.
        self.start = vec.as_mut_ptr();
        // SAFETY: `len <= capacity()`, so both offsets stay in the allocation.
        self.end = unsafe { self.start.add(len) };
        self.capacity = unsafe { self.start.add(vec.capacity()) };
    }
    /// Appends one byte without a capacity check.
    ///
    /// # Safety
    /// The caller must guarantee `remaining() >= 1` (e.g. via `reserve`).
    #[inline(always)]
    pub unsafe fn push_unchecked(&mut self, byte: u8) {
        debug_assert!(self.end < self.capacity);
        unsafe {
            std::ptr::write(self.end, byte);
            self.end = self.end.add(1);
        }
    }
    /// Appends a slice without a capacity check.
    ///
    /// # Safety
    /// The caller must guarantee `remaining() >= bytes.len()` (e.g. via
    /// `reserve`).
    #[inline(always)]
    pub unsafe fn extend_unchecked(&mut self, bytes: &[u8]) {
        debug_assert!(self.remaining() >= bytes.len());
        unsafe {
            std::ptr::copy_nonoverlapping(bytes.as_ptr(), self.end, bytes.len());
            self.end = self.end.add(bytes.len());
        }
    }
    /// Appends one byte, growing if necessary.
    #[inline(always)]
    pub fn push(&mut self, byte: u8) {
        self.reserve(1);
        // SAFETY: `reserve(1)` guarantees room for one byte.
        unsafe {
            self.push_unchecked(byte);
        }
    }
    /// Appends a slice, growing if necessary.
    #[inline(always)]
    pub fn extend(&mut self, bytes: &[u8]) {
        self.reserve(bytes.len());
        // SAFETY: `reserve` guarantees room for `bytes.len()` bytes.
        unsafe {
            self.extend_unchecked(bytes);
        }
    }
    /// Raw write cursor (one past the last initialized byte), for callers that
    /// fill bytes externally and then advance via `set_end`.
    #[inline(always)]
    pub fn end_ptr(&mut self) -> *mut u8 {
        self.end
    }
    /// Moves the write cursor.
    ///
    /// # Safety
    /// `new_end` must lie within this buffer's allocation
    /// (`start <= new_end <= capacity`) and every byte in `[start, new_end)`
    /// must be initialized.
    #[inline(always)]
    pub unsafe fn set_end(&mut self, new_end: *mut u8) {
        debug_assert!(new_end >= self.start && new_end <= self.capacity);
        self.end = new_end;
    }
}
/// Byte width used to store each integer of a packed column.
///
/// Discriminants are deliberately ordered so that a *narrower* width compares
/// *greater* (`Bits8 > Bits16 > Bits32 > Bits64`); `pack_*_adaptive` relies on
/// this ordering to test "does offset encoding allow a strictly narrower
/// width?" with a single `>` comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum IntPacking {
    Bits64 = 0, // 8 bytes per value
    Bits32 = 1, // 4 bytes per value
    Bits16 = 2, // 2 bytes per value
    Bits8 = 3,  // 1 byte per value
}
impl IntPacking {
    /// Narrowest packing able to represent every value in `0..=max` (u64).
    #[inline]
    pub fn from_max_u64(max: u64) -> Self {
        match max {
            m if m <= u8::MAX as u64 => IntPacking::Bits8,
            m if m <= u16::MAX as u64 => IntPacking::Bits16,
            m if m <= u32::MAX as u64 => IntPacking::Bits32,
            _ => IntPacking::Bits64,
        }
    }
    /// Narrowest packing able to represent every value in `0..=max` (u32).
    #[inline]
    pub fn from_max_u32(max: u32) -> Self {
        if max > u16::MAX as u32 {
            IntPacking::Bits32
        } else if max > u8::MAX as u32 {
            IntPacking::Bits16
        } else {
            IntPacking::Bits8
        }
    }
    /// Number of bytes each packed value occupies on the wire.
    #[inline(always)]
    pub fn bytes_per_value(&self) -> usize {
        match self {
            IntPacking::Bits8 => 1,
            IntPacking::Bits16 => 2,
            IntPacking::Bits32 => 4,
            IntPacking::Bits64 => 8,
        }
    }
}
/// Returns `(min, max)` over `values`, or `(0, 0)` for an empty slice.
///
/// Despite the name, the entire slice is always scanned — callers size output
/// buffers from the result, so it must be exact. `sample_size` therefore has
/// no effect on the returned value; the parameter is kept for interface
/// compatibility.
#[inline]
pub fn sample_minmax_u32(values: &[u32], sample_size: usize) -> (u32, u32) {
    let _ = sample_size; // result is exact regardless of sample size
    match values.split_first() {
        None => (0, 0),
        Some((&first, rest)) => rest
            .iter()
            .fold((first, first), |(lo, hi), &v| (lo.min(v), hi.max(v))),
    }
}
/// Returns `(min, max)` over `values`, or `(0, 0)` for an empty slice.
///
/// Like `sample_minmax_u32`, this scans the whole slice so the result is
/// exact; `sample_size` does not affect the returned value and is kept only
/// for interface compatibility.
#[inline]
pub fn sample_minmax_u64(values: &[u64], sample_size: usize) -> (u64, u64) {
    let _ = sample_size; // result is exact regardless of sample size
    match values.split_first() {
        None => (0, 0),
        Some((&first, rest)) => rest
            .iter()
            .fold((first, first), |(lo, hi), &v| (lo.min(v), hi.max(v))),
    }
}
/// Packs `values` into `buf` with a uniform byte width chosen adaptively,
/// optionally storing values relative to their minimum ("offset encoding").
///
/// Wire layout: one header byte `(packing as u8) << 1 | use_offset`, then the
/// 4-byte LE minimum if offset encoding is used, then `len * width` LE data
/// bytes. Returns the chosen packing and whether offsetting was applied.
pub fn pack_u32_adaptive(values: &[u32], buf: &mut UltraBuffer) -> (IntPacking, bool) {
    if values.is_empty() {
        return (IntPacking::Bits8, false);
    }
    // Exact min/max — the helper scans the entire slice, so the width chosen
    // below is sufficient for every value.
    let (min, max) = sample_minmax_u32(values, 16);
    let range = max.wrapping_sub(min);
    let basic_packing = IntPacking::from_max_u32(max);
    let offset_packing = IntPacking::from_max_u32(range);
    // Offset-encode only when it strictly narrows the width (IntPacking orders
    // narrower widths as greater) and the column is long enough to amortize
    // the 4-byte stored minimum.
    let use_offset = offset_packing > basic_packing && values.len() > 5;
    let packing = if use_offset {
        offset_packing
    } else {
        basic_packing
    };
    let data_size = values.len() * packing.bytes_per_value();
    let header_size = 1 + if use_offset { 4 } else { 0 };
    // One reservation covers the header and all unchecked data writes below.
    buf.reserve(header_size + data_size);
    let header = (packing as u8) << 1 | (use_offset as u8);
    // SAFETY: covered by the reservation above.
    unsafe {
        buf.push_unchecked(header);
    }
    if use_offset {
        // SAFETY: covered by the reservation above.
        unsafe {
            buf.extend_unchecked(&min.to_le_bytes());
        }
    }
    match packing {
        IntPacking::Bits8 => {
            // Redundant with the reservation above (data_size already covers
            // this), but harmless.
            buf.reserve(values.len());
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.push_unchecked((v.wrapping_sub(min)) as u8);
                    }
                }
            } else {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.push_unchecked(v as u8);
                    }
                }
            }
        }
        IntPacking::Bits16 => {
            buf.reserve(values.len() * 2);
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v.wrapping_sub(min) as u16).to_le_bytes());
                    }
                }
            } else {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v as u16).to_le_bytes());
                    }
                }
            }
        }
        IntPacking::Bits32 => {
            buf.reserve(values.len() * 4);
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&v.wrapping_sub(min).to_le_bytes());
                    }
                }
            } else {
                // On little-endian targets the in-memory layout already equals
                // the wire format, so the slice is copied as bytes in one shot.
                #[cfg(target_endian = "little")]
                // SAFETY: a `[u32]` may be viewed as `len * 4` bytes (u8 has
                // alignment 1 and the extent is the same); room was reserved.
                unsafe {
                    let bytes =
                        std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4);
                    buf.extend_unchecked(bytes);
                }
                #[cfg(target_endian = "big")]
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&v.to_le_bytes());
                    }
                }
            }
        }
        // from_max_u32 never returns Bits64, so this branch is impossible.
        IntPacking::Bits64 => unreachable!("u32 can't need 64-bit packing"),
    }
    (packing, use_offset)
}
/// Packs `values` into `buf` with a uniform byte width chosen adaptively,
/// optionally storing values relative to their minimum ("offset encoding").
///
/// Wire layout: one header byte `(packing as u8) << 1 | use_offset`, then the
/// 8-byte LE minimum if offset encoding is used, then `len * width` LE data
/// bytes. Returns the chosen packing and whether offsetting was applied.
pub fn pack_u64_adaptive(values: &[u64], buf: &mut UltraBuffer) -> (IntPacking, bool) {
    if values.is_empty() {
        return (IntPacking::Bits8, false);
    }
    // Exact min/max — the helper scans the entire slice, so the width chosen
    // below is sufficient for every value.
    let (min, max) = sample_minmax_u64(values, 16);
    let range = max.wrapping_sub(min);
    let basic_packing = IntPacking::from_max_u64(max);
    let offset_packing = IntPacking::from_max_u64(range);
    // Offset-encode only when it strictly narrows the width (IntPacking orders
    // narrower widths as greater) and the column is long enough to amortize
    // the 8-byte stored minimum.
    let use_offset = offset_packing > basic_packing && values.len() > 5;
    let packing = if use_offset {
        offset_packing
    } else {
        basic_packing
    };
    let data_size = values.len() * packing.bytes_per_value();
    let header_size = 1 + if use_offset { 8 } else { 0 };
    // One reservation covers the header and all unchecked data writes below.
    buf.reserve(header_size + data_size);
    let header = (packing as u8) << 1 | (use_offset as u8);
    // SAFETY: covered by the reservation above.
    unsafe {
        buf.push_unchecked(header);
    }
    if use_offset {
        // SAFETY: covered by the reservation above.
        unsafe {
            buf.extend_unchecked(&min.to_le_bytes());
        }
    }
    match packing {
        IntPacking::Bits8 => {
            // Redundant with the reservation above, but harmless.
            buf.reserve(values.len());
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.push_unchecked((v.wrapping_sub(min)) as u8);
                    }
                }
            } else {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.push_unchecked(v as u8);
                    }
                }
            }
        }
        IntPacking::Bits16 => {
            buf.reserve(values.len() * 2);
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v.wrapping_sub(min) as u16).to_le_bytes());
                    }
                }
            } else {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v as u16).to_le_bytes());
                    }
                }
            }
        }
        IntPacking::Bits32 => {
            buf.reserve(values.len() * 4);
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v.wrapping_sub(min) as u32).to_le_bytes());
                    }
                }
            } else {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&(v as u32).to_le_bytes());
                    }
                }
            }
        }
        IntPacking::Bits64 => {
            buf.reserve(values.len() * 8);
            if use_offset {
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&v.wrapping_sub(min).to_le_bytes());
                    }
                }
            } else {
                // On little-endian targets the in-memory layout already equals
                // the wire format, so the slice is copied as bytes in one shot.
                #[cfg(target_endian = "little")]
                // SAFETY: a `[u64]` may be viewed as `len * 8` bytes (u8 has
                // alignment 1 and the extent is the same); room was reserved.
                unsafe {
                    let bytes =
                        std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 8);
                    buf.extend_unchecked(bytes);
                }
                #[cfg(target_endian = "big")]
                for &v in values {
                    // SAFETY: covered by the reservations above.
                    unsafe {
                        buf.extend_unchecked(&v.to_le_bytes());
                    }
                }
            }
        }
    }
    (packing, use_offset)
}
/// Appends each string to `buf` as a length prefix followed by its raw bytes.
///
/// Lengths < 128 take one byte; lengths < 16384 take two LEB128-style bytes
/// (low 7 bits + continuation bit first); longer lengths use a full varint.
#[allow(dead_code)]
pub fn encode_strings_inline(strings: &[&str], buf: &mut UltraBuffer) {
    let total_len: usize = strings.iter().map(|s| s.len()).sum();
    // Budget the worst-case 10-byte varint per length prefix. Every unchecked
    // write below draws on this single up-front reservation; the previous
    // 4-bytes-per-string budget could be exceeded once a length needed a
    // varint longer than 4 bytes (len >= 2^28), letting `extend_unchecked`
    // write past the allocation (heap overflow).
    let header_size = strings.len() * 10;
    buf.reserve(header_size + total_len);
    for s in strings {
        let len = s.len();
        if len < 128 {
            // SAFETY: 1 prefix byte, covered by the reservation above.
            unsafe {
                buf.push_unchecked(len as u8);
            }
        } else if len < 16384 {
            // SAFETY: 2 prefix bytes, covered by the reservation above.
            unsafe {
                buf.push_unchecked((len as u8) | 0x80);
                buf.push_unchecked((len >> 7) as u8);
            }
        } else {
            // Writes at most 10 bytes, covered by the reservation above.
            encode_varint_to_ultra(len as u64, buf);
        }
        // SAFETY: `len` payload bytes, covered by the reservation above.
        unsafe {
            buf.extend_unchecked(s.as_bytes());
        }
    }
}
/// Appends `value` to `buf` as an LEB128 varint: 7 bits per byte, least
/// significant group first, continuation bit (0x80) set on every byte except
/// the last. At most 10 bytes are written, and exactly that much room is
/// reserved up front.
#[inline(always)]
pub fn encode_varint_to_ultra(mut value: u64, buf: &mut UltraBuffer) {
    buf.reserve(10);
    loop {
        let low = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // SAFETY: `reserve(10)` above covers the maximum varint length.
            unsafe { buf.push_unchecked(low) };
            return;
        }
        // SAFETY: `reserve(10)` above covers the maximum varint length.
        unsafe { buf.push_unchecked(low | 0x80) };
    }
}
/// Magic bytes identifying the ultra-encoded format: b"ULT" followed by 0x01.
pub const ULTRA_MAGIC: [u8; 4] = [0x55, 0x4C, 0x54, 0x01];
/// Format version byte written after the magic.
pub const ULTRA_VERSION: u8 = 1;
/// Column-oriented serialization into the ultra wire format.
///
/// `ultra_encode_slice` writes: `ULTRA_MAGIC`, version byte, one zero byte
/// (its meaning is not shown in this file — presumably flags/reserved;
/// confirm against the decoder), varint row count, varint column count, then
/// each column encoded by `ColumnCollectors::encode_all`.
pub trait UltraEncode {
    /// Number of columns one item expands into.
    fn column_count() -> usize;
    /// Appends every item's field values into the per-column collectors.
    fn collect_columns(items: &[Self], collectors: &mut ColumnCollectors)
    where
        Self: Sized;
    /// Serializes `items` into a standalone, fully framed byte vector.
    fn ultra_encode_slice(items: &[Self]) -> Vec<u8>
    where
        Self: Sized,
    {
        if items.is_empty() {
            let mut buf = UltraBuffer::with_capacity(16);
            buf.extend(&ULTRA_MAGIC);
            buf.push(ULTRA_VERSION);
            buf.push(0);
            // NOTE(review): the empty frame stops after the zero row count —
            // no column count or column data follows, unlike the non-empty
            // path below. Decoders must special-case `rows == 0`; confirm.
            encode_varint_to_ultra(0, &mut buf);
            return buf.into_vec();
        }
        // Rough pre-size (~20 bytes per row plus header slack) to limit
        // buffer regrowth during encoding.
        let estimated_size = items.len() * 20 + 64;
        let mut buf = UltraBuffer::with_capacity(estimated_size);
        buf.extend(&ULTRA_MAGIC);
        buf.push(ULTRA_VERSION);
        buf.push(0);
        encode_varint_to_ultra(items.len() as u64, &mut buf);
        encode_varint_to_ultra(Self::column_count() as u64, &mut buf);
        let mut collectors = ColumnCollectors::new(Self::column_count(), items.len());
        Self::collect_columns(items, &mut collectors);
        collectors.encode_all(&mut buf);
        buf.into_vec()
    }
}
/// Accumulates a `u32` column and adaptively packs it on `encode_to`.
pub struct DirectU32Encoder {
    // Values buffered until `encode_to` is called.
    values: Vec<u32>,
}
impl DirectU32Encoder {
    /// Creates an encoder with room for `cap` values.
    #[inline]
    pub fn with_capacity(cap: usize) -> Self {
        let values = Vec::with_capacity(cap);
        Self { values }
    }
    /// Buffers one value.
    #[inline(always)]
    pub fn push(&mut self, value: u32) {
        self.values.push(value);
    }
    /// Writes the `U32` column tag followed by the adaptively packed values.
    pub fn encode_to(&self, buf: &mut UltraBuffer) {
        buf.push(ColumnType::U32 as u8);
        pack_u32_adaptive(&self.values, buf);
    }
}
/// Streams length-prefixed strings directly into an owned byte buffer.
pub struct DirectStringEncoder {
    // Encoded bytes: per string, a length prefix then the raw UTF-8 bytes.
    data: UltraBuffer,
    // Number of strings pushed so far (not written to the output by
    // `encode_to` as the code stands).
    count: usize,
}
impl DirectStringEncoder {
    /// Creates an encoder pre-sized for roughly `cap` short strings
    /// (heuristic: ~16 bytes of prefix + payload per string).
    #[inline]
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            data: UltraBuffer::with_capacity(cap * 16),
            count: 0,
        }
    }
    /// Appends one string as a length prefix (1 byte for len < 128, 2 bytes
    /// for len < 16384, varint otherwise) followed by its raw bytes.
    #[inline(always)]
    pub fn push(&mut self, s: &str) {
        let len = s.len();
        // Reserve the payload plus the worst-case 10-byte varint prefix. The
        // previous `len + 4` budget could be exceeded by the varint fallback
        // once `len >= 2^28` (a 5+ byte varint), after which
        // `extend_unchecked` would write past the allocation (heap overflow).
        self.data.reserve(len + 10);
        if len < 128 {
            // SAFETY: 1 prefix byte, covered by the reservation above.
            unsafe {
                self.data.push_unchecked(len as u8);
            }
        } else if len < 16384 {
            // SAFETY: 2 prefix bytes, covered by the reservation above.
            unsafe {
                self.data.push_unchecked((len as u8) | 0x80);
                self.data.push_unchecked((len >> 7) as u8);
            }
        } else {
            // Writes at most 10 bytes, covered by the reservation above.
            encode_varint_to_ultra(len as u64, &mut self.data);
        }
        // SAFETY: `len` payload bytes, covered by the reservation above.
        unsafe {
            self.data.extend_unchecked(s.as_bytes());
        }
        self.count += 1;
    }
    /// Writes the `String` column tag followed by the accumulated bytes.
    pub fn encode_to(self, buf: &mut UltraBuffer) {
        buf.push(ColumnType::String as u8);
        buf.extend(self.data.as_slice());
    }
}
/// Alternative one-shot encoding entry point (cf. `UltraEncode`), intended for
/// implementations that drive the `Direct*Encoder` types themselves.
pub trait UltraEncodeDirect {
    /// Serializes `items` into a standalone byte vector.
    fn ultra_encode_direct(items: &[Self]) -> Vec<u8>
    where
        Self: Sized;
}
/// Tag byte written before each encoded column to identify its element type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ColumnType {
    U32 = 0,
    U64 = 1,
    I32 = 2,
    I64 = 3,
    F32 = 4,
    F64 = 5,
    Bool = 6,
    String = 7,
}
/// Per-column value accumulators filled by `UltraEncode::collect_columns`
/// and drained by `encode_all`.
pub struct ColumnCollectors {
    columns: Vec<ColumnData>,
}
/// Typed storage for a single column's values while they are being collected.
pub enum ColumnData {
    U32(Vec<u32>),
    U64(Vec<u64>),
    I32(Vec<i32>),
    I64(Vec<i64>),
    F32(Vec<f32>),
    F64(Vec<f64>),
    Bool(Vec<bool>),
    String(Vec<String>),
}
impl ColumnCollectors {
    /// Creates `column_count` collectors. Every column starts as `U32`;
    /// callers retype individual columns via `init_column` before pushing.
    pub fn new(column_count: usize, row_count: usize) -> Self {
        let columns = (0..column_count)
            .map(|_| ColumnData::U32(Vec::with_capacity(row_count)))
            .collect();
        Self { columns }
    }
    /// Replaces column `idx` with an empty collector of `col_type`.
    ///
    /// Panics if `idx` is out of range.
    pub fn init_column(&mut self, idx: usize, col_type: ColumnType, capacity: usize) {
        self.columns[idx] = match col_type {
            ColumnType::U32 => ColumnData::U32(Vec::with_capacity(capacity)),
            ColumnType::U64 => ColumnData::U64(Vec::with_capacity(capacity)),
            ColumnType::I32 => ColumnData::I32(Vec::with_capacity(capacity)),
            ColumnType::I64 => ColumnData::I64(Vec::with_capacity(capacity)),
            ColumnType::F32 => ColumnData::F32(Vec::with_capacity(capacity)),
            ColumnType::F64 => ColumnData::F64(Vec::with_capacity(capacity)),
            ColumnType::Bool => ColumnData::Bool(Vec::with_capacity(capacity)),
            ColumnType::String => ColumnData::String(Vec::with_capacity(capacity)),
        };
    }
    /// Appends `value` to column `col` if that column holds `U32` data.
    /// Note: a type mismatch silently drops the value (no panic, no error).
    #[inline(always)]
    pub fn push_u32(&mut self, col: usize, value: u32) {
        if let ColumnData::U32(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `U64` columns.
    #[inline(always)]
    pub fn push_u64(&mut self, col: usize, value: u64) {
        if let ColumnData::U64(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `I32` columns.
    #[inline(always)]
    pub fn push_i32(&mut self, col: usize, value: i32) {
        if let ColumnData::I32(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `I64` columns.
    #[inline(always)]
    pub fn push_i64(&mut self, col: usize, value: i64) {
        if let ColumnData::I64(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `F32` columns.
    #[inline(always)]
    pub fn push_f32(&mut self, col: usize, value: f32) {
        if let ColumnData::F32(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `F64` columns.
    #[inline(always)]
    pub fn push_f64(&mut self, col: usize, value: f64) {
        if let ColumnData::F64(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `Bool` columns.
    #[inline(always)]
    pub fn push_bool(&mut self, col: usize, value: bool) {
        if let ColumnData::Bool(ref mut v) = self.columns[col] {
            v.push(value);
        }
    }
    /// As `push_u32`, for `String` columns (the value is copied).
    #[inline(always)]
    pub fn push_string(&mut self, col: usize, value: &str) {
        if let ColumnData::String(ref mut v) = self.columns[col] {
            v.push(value.to_string());
        }
    }
    /// Encodes every column in order: a 1-byte `ColumnType` tag, then the
    /// column payload. Signed columns are zigzag-mapped to unsigned first so
    /// that small-magnitude negatives become small values and pack narrowly.
    pub fn encode_all(&self, buf: &mut UltraBuffer) {
        for col in &self.columns {
            match col {
                ColumnData::U32(values) => {
                    buf.push(ColumnType::U32 as u8);
                    pack_u32_adaptive(values, buf);
                }
                ColumnData::U64(values) => {
                    buf.push(ColumnType::U64 as u8);
                    pack_u64_adaptive(values, buf);
                }
                ColumnData::I32(values) => {
                    buf.push(ColumnType::I32 as u8);
                    // Zigzag: (v << 1) ^ (v >> 31) interleaves negatives and
                    // positives (0, -1, 1, -2, ...) before unsigned packing.
                    let unsigned: Vec<u32> = values
                        .iter()
                        .map(|&v| ((v << 1) ^ (v >> 31)) as u32)
                        .collect();
                    pack_u32_adaptive(&unsigned, buf);
                }
                ColumnData::I64(values) => {
                    buf.push(ColumnType::I64 as u8);
                    // Zigzag, 64-bit variant.
                    let unsigned: Vec<u64> = values
                        .iter()
                        .map(|&v| ((v << 1) ^ (v >> 63)) as u64)
                        .collect();
                    pack_u64_adaptive(&unsigned, buf);
                }
                ColumnData::F32(values) => {
                    buf.push(ColumnType::F32 as u8);
                    encode_f32_column(values, buf);
                }
                ColumnData::F64(values) => {
                    buf.push(ColumnType::F64 as u8);
                    encode_f64_column(values, buf);
                }
                ColumnData::Bool(values) => {
                    buf.push(ColumnType::Bool as u8);
                    encode_bool_column(values, buf);
                }
                ColumnData::String(values) => {
                    buf.push(ColumnType::String as u8);
                    encode_string_column(values, buf);
                }
            }
        }
    }
}
/// Appends `values` as raw little-endian `f32` bytes (4 per value).
fn encode_f32_column(values: &[f32], buf: &mut UltraBuffer) {
    buf.reserve(values.len() * 4);
    // On little-endian targets the in-memory representation already matches
    // the wire format, so the whole slice is copied in one memcpy.
    #[cfg(target_endian = "little")]
    // SAFETY: a `[f32]` may be viewed as `len * 4` bytes (u8 has alignment 1
    // and the extent is the same); the reserve above guarantees room.
    unsafe {
        let bytes = std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4);
        buf.extend_unchecked(bytes);
    }
    #[cfg(target_endian = "big")]
    for &v in values {
        // SAFETY: the reserve above covers 4 bytes per value.
        unsafe {
            buf.extend_unchecked(&v.to_le_bytes());
        }
    }
}
/// Appends `values` as raw little-endian `f64` bytes (8 per value).
fn encode_f64_column(values: &[f64], buf: &mut UltraBuffer) {
    buf.reserve(values.len() * 8);
    // On little-endian targets the in-memory representation already matches
    // the wire format, so the whole slice is copied in one memcpy.
    #[cfg(target_endian = "little")]
    // SAFETY: a `[f64]` may be viewed as `len * 8` bytes (u8 has alignment 1
    // and the extent is the same); the reserve above guarantees room.
    unsafe {
        let bytes = std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 8);
        buf.extend_unchecked(bytes);
    }
    #[cfg(target_endian = "big")]
    for &v in values {
        // SAFETY: the reserve above covers 8 bytes per value.
        unsafe {
            buf.extend_unchecked(&v.to_le_bytes());
        }
    }
}
/// Bit-packs `values` into `buf`, 8 flags per byte, LSB first; the final byte
/// is zero-padded in its unused high bits.
fn encode_bool_column(values: &[bool], buf: &mut UltraBuffer) {
    buf.reserve(values.len().div_ceil(8));
    for chunk in values.chunks(8) {
        let byte = chunk
            .iter()
            .enumerate()
            .fold(0u8, |acc, (i, &flag)| acc | ((flag as u8) << i));
        // SAFETY: room for every packed byte was reserved above.
        unsafe {
            buf.push_unchecked(byte);
        }
    }
}
/// Appends each string as a length prefix (1 byte for len < 128, 2 bytes for
/// len < 16384, varint otherwise) followed by its raw bytes.
fn encode_string_column(values: &[String], buf: &mut UltraBuffer) {
    let total_bytes: usize = values.iter().map(|s| s.len()).sum();
    // Budget the worst-case 10-byte varint per length prefix. Every unchecked
    // write below draws on this single reservation; the previous
    // 4-bytes-per-string budget could be exceeded once a length needed a
    // varint longer than 4 bytes (len >= 2^28), letting `extend_unchecked`
    // write past the allocation (heap overflow).
    let max_len_bytes = values.len() * 10;
    buf.reserve(max_len_bytes + total_bytes);
    for s in values {
        let len = s.len();
        if len < 128 {
            // SAFETY: 1 prefix byte, covered by the reservation above.
            unsafe {
                buf.push_unchecked(len as u8);
            }
        } else if len < 16384 {
            // SAFETY: 2 prefix bytes, covered by the reservation above.
            unsafe {
                buf.push_unchecked((len as u8) | 0x80);
                buf.push_unchecked((len >> 7) as u8);
            }
        } else {
            // Writes at most 10 bytes, covered by the reservation above.
            encode_varint_to_ultra(len as u64, buf);
        }
        // SAFETY: `len` payload bytes, covered by the reservation above.
        unsafe {
            buf.extend_unchecked(s.as_bytes());
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // push / len / as_slice round-trip without triggering a grow.
    #[test]
    fn test_ultra_buffer_basic() {
        let mut buf = UltraBuffer::with_capacity(100);
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
        assert_eq!(buf.len(), 3);
    }
    // Bulk append via extend.
    #[test]
    fn test_ultra_buffer_extend() {
        let mut buf = UltraBuffer::with_capacity(100);
        buf.extend(&[1, 2, 3, 4, 5]);
        assert_eq!(buf.as_slice(), &[1, 2, 3, 4, 5]);
    }
    // Repeated pushes past the initial capacity must grow the allocation.
    #[test]
    fn test_ultra_buffer_grow() {
        let mut buf = UltraBuffer::with_capacity(4);
        for i in 0..100u8 {
            buf.push(i);
        }
        assert_eq!(buf.len(), 100);
        assert!(buf.capacity() >= 100);
    }
    // Small values: 1 header byte + 10 one-byte values, no offset encoding.
    #[test]
    fn test_pack_u32_small_values() {
        let values = vec![1u32, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut buf = UltraBuffer::with_capacity(100);
        let (packing, offset) = pack_u32_adaptive(&values, &mut buf);
        assert_eq!(packing, IntPacking::Bits8);
        assert!(!offset);
        assert_eq!(buf.len(), 11);
    }
    // Narrow range with a large minimum should trigger offset encoding.
    #[test]
    fn test_pack_u32_with_offset() {
        let values: Vec<u32> = (1000..1010).collect();
        let mut buf = UltraBuffer::with_capacity(100);
        let (packing, offset) = pack_u32_adaptive(&values, &mut buf);
        assert_eq!(packing, IntPacking::Bits8);
        assert!(offset);
    }
    // Values above u16::MAX require full 32-bit packing.
    #[test]
    fn test_pack_u32_large_values() {
        let values = vec![100000u32, 200000, 300000, 400000, 500000];
        let mut buf = UltraBuffer::with_capacity(100);
        let (packing, _) = pack_u32_adaptive(&values, &mut buf);
        assert_eq!(packing, IntPacking::Bits32);
    }
    // 9 flags pack into 2 bytes, LSB-first within each byte.
    #[test]
    fn test_bool_column() {
        let values = vec![true, false, true, true, false, true, false, false, true];
        let mut buf = UltraBuffer::with_capacity(100);
        encode_bool_column(&values, &mut buf);
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.as_slice()[0], 0b00101101);
        assert_eq!(buf.as_slice()[1], 0b00000001);
    }
}